// Fork of https://github.com/sourcegraph/zoekt
1package main
2
3import (
4 "fmt"
5 "os"
6 "os/exec"
7 "path/filepath"
8 "sort"
9 "strings"
10 "time"
11
12 "github.com/grafana/regexp"
13 "github.com/prometheus/client_golang/prometheus"
14 "github.com/prometheus/client_golang/prometheus/promauto"
15 "github.com/sourcegraph/zoekt"
16 "github.com/sourcegraph/zoekt/index"
17)
18
19var metricCleanupDuration = promauto.NewHistogram(prometheus.HistogramOpts{
20 Name: "index_cleanup_duration_seconds",
21 Help: "The duration of one cleanup run",
22 Buckets: prometheus.LinearBuckets(1, 1, 10),
23})
24
25// cleanup trashes shards in indexDir that do not exist in repos. For repos
26// that do not exist in indexDir, but do in indexDir/.trash it will move them
27// back into indexDir. Additionally it uses now to remove shards that have
28// been in the trash for 24 hours. It also deletes .tmp files older than 4 hours.
29func cleanup(indexDir string, repos []uint32, now time.Time, shardMerging bool) {
30 start := time.Now()
31 trashDir := filepath.Join(indexDir, ".trash")
32 if err := os.MkdirAll(trashDir, 0o755); err != nil {
33 errorLog.Printf("failed to create trash dir: %v", err)
34 }
35
36 trash := getShards(trashDir)
37 tombtones := getTombstonedRepos(indexDir)
38 indexShards := getShards(indexDir)
39
40 // trash: Remove old shards and conflicts with index
41 minAge := now.Add(-24 * time.Hour)
42 for repo, shards := range trash {
43 old := false
44 for _, shard := range shards {
45 if shard.ModTime.Before(minAge) {
46 old = true
47 } else if shard.ModTime.After(now) {
48 debugLog.Printf("trashed shard %s has timestamp in the future, reseting to now", shard.Path)
49 _ = os.Chtimes(shard.Path, now, now)
50 }
51 }
52
53 if _, conflicts := indexShards[repo]; !conflicts && !old {
54 continue
55 }
56
57 infoLog.Printf("removing old shards from trash for %v", repo)
58 removeAll(shards...)
59 delete(trash, repo)
60 }
61
62 // tombstones: Remove tombstones that conflict with index or trash. After this,
63 // tombstones only contain repos that are neither in the trash nor in the index.
64 for repo := range tombtones {
65 if _, conflicts := indexShards[repo]; conflicts {
66 delete(tombtones, repo)
67 }
68 // Trash takes precedence over tombstones.
69 if _, conflicts := trash[repo]; conflicts {
70 delete(tombtones, repo)
71 }
72 }
73
74 // index: We are ID based, but store shards by name still. If we end up with
75 // shards that have the same ID but different names delete and start over.
76 // This can happen when a repository is renamed. In future we should make
77 // shard file names based on ID.
78 for repo, shards := range indexShards {
79 if consistentRepoName(shards) {
80 continue
81 }
82
83 // prevent further processing since we will delete
84 delete(indexShards, repo)
85
86 // This should be rare, so give an informative log message.
87 var paths []string
88 for _, shard := range shards {
89 paths = append(paths, filepath.Base(shard.Path))
90 }
91 infoLog.Printf("removing shards for %v due to multiple repository names: %s", repo, strings.Join(paths, " "))
92
93 // We may be in both normal and compound shards in this case. First
94 // tombstone the compound shards so we don't just rm them.
95 simple := shards[:0]
96 for _, s := range shards {
97 if shardMerging && maybeSetTombstone([]shard{s}, repo) {
98 continue
99 }
100
101 simple = append(simple, s)
102 }
103
104 if len(simple) == 0 {
105 continue
106 }
107
108 removeAll(simple...)
109 }
110
111 // index: Move missing repos from trash into index
112 // index: Restore deleted or tombstoned repos.
113 for _, repo := range repos {
114 // Delete from index so that index will only contain shards to be
115 // trashed.
116 delete(indexShards, repo)
117
118 if shards, ok := trash[repo]; ok {
119 infoLog.Printf("restoring shards from trash for %v", repo)
120 moveAll(indexDir, shards)
121 continue
122 }
123
124 if s, ok := tombtones[repo]; ok {
125 infoLog.Printf("removing tombstone for %v", repo)
126 err := index.UnsetTombstone(s.Path, repo)
127 if err != nil {
128 errorLog.Printf("error removing tombstone for %v: %s", repo, err)
129 }
130 }
131 }
132
133 // index: Move non-existent repos into trash
134 for repo, shards := range indexShards {
135 // Best-effort touch. If touch fails, we will just remove from the
136 // trash sooner.
137 for _, shard := range shards {
138 _ = os.Chtimes(shard.Path, now, now)
139 }
140
141 if shardMerging && maybeSetTombstone(shards, repo) {
142 continue
143 }
144 moveAll(trashDir, shards)
145 }
146
147 // Remove .tmp files from crashed indexer runs-- for example, if an indexer
148 // OOMs, it will leave around .tmp files, usually in a loop. We can remove
149 // the files now since cleanup runs with a global lock (no indexing jobs
150 // running at the same time).
151 if failures, err := filepath.Glob(filepath.Join(indexDir, "*.tmp")); err != nil {
152 errorLog.Printf("Glob: %v", err)
153 } else {
154 for _, f := range failures {
155 st, err := os.Stat(f)
156 if err != nil {
157 errorLog.Printf("Stat(%q): %v", f, err)
158 continue
159 }
160 if !st.IsDir() {
161 infoLog.Printf("removing tmp file: %s", f)
162 os.Remove(f)
163 }
164 }
165 }
166
167 metricCleanupDuration.Observe(time.Since(start).Seconds())
168}
169
// shard describes one repository's entry in one index file on disk. A file
// whose metadata lists several repositories (e.g. a compound shard) yields
// several shard values that share the same Path.
type shard struct {
	// RepoID is the repository's ID.
	RepoID uint32
	// RepoName is the repository name recorded in the file's metadata.
	RepoName string
	// Path is the location of the .zoekt file.
	Path string
	// ModTime is the file's modification time when produced by getShards;
	// getTombstonedRepos stores the repository's latest commit date here instead.
	ModTime time.Time
	// RepoTombstone reports whether the repository is tombstoned in this file.
	RepoTombstone bool
}
177
178func getShards(dir string) map[uint32][]shard {
179 d, err := os.Open(dir)
180 if err != nil {
181 debugLog.Printf("failed to getShards: %s", dir)
182 return nil
183 }
184 defer d.Close()
185 names, _ := d.Readdirnames(-1)
186 sort.Strings(names)
187
188 shards := make(map[uint32][]shard, len(names))
189 for _, n := range names {
190 path := filepath.Join(dir, n)
191 fi, err := os.Stat(path)
192 if err != nil {
193 debugLog.Printf("stat failed: %v", err)
194 continue
195 }
196 if fi.IsDir() || filepath.Ext(path) != ".zoekt" {
197 continue
198 }
199
200 repos, _, err := index.ReadMetadataPathAlive(path)
201 if err != nil {
202 debugLog.Printf("failed to read shard: %v", err)
203 continue
204 }
205
206 for _, repo := range repos {
207 shards[repo.ID] = append(shards[repo.ID], shard{
208 RepoID: repo.ID,
209 RepoName: repo.Name,
210 Path: path,
211 ModTime: fi.ModTime(),
212 RepoTombstone: repo.Tombstone,
213 })
214 }
215 }
216 return shards
217}
218
219// getTombstonedRepos return a map of tombstoned repositories in dir. If a
220// repository is tombstoned in more than one compound shard, only the latest one,
221// as determined by the date of the latest commit, is returned.
222func getTombstonedRepos(dir string) map[uint32]shard {
223 paths, err := filepath.Glob(filepath.Join(dir, "compound-*.zoekt"))
224 if err != nil {
225 return nil
226 }
227 if len(paths) == 0 {
228 return nil
229 }
230
231 m := make(map[uint32]shard)
232
233 for _, p := range paths {
234 repos, _, err := index.ReadMetadataPath(p)
235 if err != nil {
236 continue
237 }
238 for _, repo := range repos {
239 if !repo.Tombstone {
240 continue
241 }
242 if v, ok := m[repo.ID]; ok && v.ModTime.After(repo.LatestCommitDate) {
243 continue
244 }
245 m[repo.ID] = shard{
246 RepoID: repo.ID,
247 RepoName: repo.Name,
248 Path: p,
249 ModTime: repo.LatestCommitDate,
250 RepoTombstone: repo.Tombstone,
251 }
252 }
253 }
254 return m
255}
256
// incompleteRE matches the names of files that removeIncompleteShards treats as
// incomplete shards: ".zoekt" followed by one or more digits at the end of the
// name, optionally followed by one more extension (for example
// "x.zoekt123" or "x.zoekt123.meta").
var incompleteRE = regexp.MustCompile(`\.zoekt[0-9]+(\.\w+)?$`)
258
259func removeIncompleteShards(dir string) {
260 d, err := os.Open(dir)
261 if err != nil {
262 debugLog.Printf("failed to removeIncompleteShards: %s", dir)
263 return
264 }
265 defer d.Close()
266
267 names, _ := d.Readdirnames(-1)
268 for _, n := range names {
269 if incompleteRE.MatchString(n) {
270 path := filepath.Join(dir, n)
271 if err := os.Remove(path); err != nil {
272 debugLog.Printf("failed to remove incomplete shard %s: %v", path, err)
273 } else {
274 debugLog.Printf("cleaned up incomplete shard %s", path)
275 }
276 }
277 }
278}
279
280func removeAll(shards ...shard) {
281 // Note on error handling here: We only expect this to fail due to
282 // IsNotExist, which is fine. Additionally this shouldn't fail
283 // partially. But if it does, and the file still exists, then we have the
284 // potential for a partial index for a repo. However, this should be
285 // exceedingly rare due to it being a mix of partial failure on something in
286 // trash + an admin re-adding a repository.
287 for _, shard := range shards {
288 paths, err := index.IndexFilePaths(shard.Path)
289 if err != nil {
290 debugLog.Printf("failed to remove shard %s: %v", shard.Path, err)
291 }
292 for _, p := range paths {
293 if err := os.Remove(p); err != nil {
294 debugLog.Printf("failed to remove shard file %s: %v", p, err)
295 }
296 }
297 }
298}
299
300func moveAll(dstDir string, shards []shard) {
301 for i, shard := range shards {
302 paths, err := index.IndexFilePaths(shard.Path)
303 if err != nil {
304 errorLog.Printf("failed to stat shard paths, deleting all shards for %s: %v", shard.RepoName, err)
305 removeAll(shards...)
306 return
307 }
308
309 // Remove all files in dstDir for shard. This is to avoid cases like not
310 // overwriting an old meta file.
311 dstShard := shard
312 dstShard.Path = filepath.Join(dstDir, filepath.Base(shard.Path))
313 removeAll(dstShard)
314
315 // HACK we do not yet support tombstones in compound shard. So we avoid
316 // needing to deal with it by always deleting the whole compound shard.
317 if strings.HasPrefix(filepath.Base(shard.Path), "compound-") {
318 infoLog.Printf("HACK removing compound shard since we don't support tombstoning: %s", shard.Path)
319 removeAll(shard)
320 continue
321 }
322
323 // Rename all paths, stop at first failure
324 for _, p := range paths {
325 dst := filepath.Join(dstDir, filepath.Base(p))
326 err = os.Rename(p, dst)
327 if err != nil {
328 break
329 }
330 }
331
332 if err != nil {
333 errorLog.Printf("failed to move shard, deleting all shards for %s: %v", shard.RepoName, err)
334 removeAll(dstShard) // some files may have moved to dst
335 removeAll(shards...)
336 return
337 }
338
339 // update shards so partial failure removes the dst path
340 shards[i] = dstShard
341 }
342}
343
344// consistentRepoName returns true if the list of shards have a unique
345// repository name.
346func consistentRepoName(shards []shard) bool {
347 if len(shards) <= 1 {
348 return true
349 }
350 name := shards[0].RepoName
351 for _, shard := range shards[1:] {
352 if shard.RepoName != name {
353 return false
354 }
355 }
356 return true
357}
358
359// maybeSetTombstone will call zoekt.SetTombstone for repoID if shards
360// represents a compound shard. It returns true if shards represents a
361// compound shard.
362func maybeSetTombstone(shards []shard, repoID uint32) bool {
363 // 1 repo can be split across many simple shards but it should only be contained
364 // in 1 compound shard. Hence we check that len(shards)==1 and only consider the
365 // shard at index 0.
366 if len(shards) != 1 || !strings.HasPrefix(filepath.Base(shards[0].Path), "compound-") {
367 return false
368 }
369
370 if err := index.SetTombstone(shards[0].Path, repoID); err != nil {
371 errorLog.Printf("error setting tombstone for %d in shard %s: %s. Removing shard\n", repoID, shards[0].Path, err)
372 _ = os.Remove(shards[0].Path)
373 }
374 return true
375}
376
377var metricVacuumRunning = promauto.NewGauge(prometheus.GaugeOpts{
378 Name: "index_vacuum_running",
379 Help: "Set to 1 if indexserver's vacuum job is running.",
380})
381
382var metricNumberShards = promauto.NewGauge(prometheus.GaugeOpts{
383 Name: "index_number_shards",
384 Help: "The number of total shards.",
385})
386
387var metricNumberCompoundShards = promauto.NewGauge(prometheus.GaugeOpts{
388 Name: "index_number_compound_shards",
389 Help: "The number of compound shards.",
390})
391
392// vacuum removes tombstoned repos from compound shards and removes compound
393// shards if they shrink below minSizeBytes. Vacuum locks the index directory for
394// each compound shard it vacuums.
395func (s *Server) vacuum() {
396 metricVacuumRunning.Set(1)
397 defer metricVacuumRunning.Set(0)
398
399 d, err := os.Open(s.IndexDir)
400 if err != nil {
401 return
402 }
403 defer d.Close()
404 fns, _ := d.Readdirnames(-1)
405
406 for _, fn := range fns {
407 // We could run this over all shards, but based on our current setup, simple
408 // shards won't have tombstones but instead will be moved to .trash.
409 if !strings.HasPrefix(fn, "compound-") || !strings.HasSuffix(fn, ".zoekt") {
410 continue
411 }
412
413 path := filepath.Join(s.IndexDir, fn)
414 info, err := os.Stat(path)
415 if err != nil {
416 errorLog.Printf("vacuum stat failed: %v", err)
417 continue
418 }
419
420 if info.Size() < s.mergeOpts.minSizeBytes {
421 cmd := exec.Command("zoekt-merge-index", "explode", path)
422
423 var b []byte
424 s.muIndexDir.Global(func() {
425 b, err = cmd.CombinedOutput()
426 })
427
428 if err != nil {
429 errorLog.Printf("failed to explode compound shard: shard=%s out=%s err=%s", path, string(b), err)
430 }
431 infoLog.Printf("exploded compound shard: shard=%s", path)
432 continue
433 }
434
435 s.muIndexDir.Global(func() {
436 _, err = removeTombstones(path)
437 })
438
439 if err != nil {
440 errorLog.Printf("error while removing tombstones in %s: %s", path, err)
441 }
442 }
443}
444
// mockMerger, when non-nil, is called by removeTombstones instead of running
// "zoekt-merge-index merge". It is nil unless something assigns it (presumably
// only tests do).
var mockMerger func() error
446
447// removeTombstones removes all tombstones from a compound shard at fn by merging
448// the compound shard with itself.
449func removeTombstones(fn string) ([]*zoekt.Repository, error) {
450 var runMerge func() error
451 if mockMerger != nil {
452 runMerge = mockMerger
453 } else {
454 runMerge = exec.Command("zoekt-merge-index", "merge", fn).Run
455 }
456
457 repos, _, err := index.ReadMetadataPath(fn)
458 if err != nil {
459 return nil, fmt.Errorf("zoekt.ReadMetadataPath: %s", err)
460 }
461
462 var tombstones []*zoekt.Repository
463 for _, r := range repos {
464 if r.Tombstone {
465 tombstones = append(tombstones, r)
466 }
467 }
468 if len(tombstones) == 0 {
469 return nil, nil
470 }
471
472 defer func() {
473 paths, err := index.IndexFilePaths(fn)
474 if err != nil {
475 return
476 }
477 for _, path := range paths {
478 os.Remove(path)
479 }
480 }()
481 err = runMerge()
482 if err != nil {
483 return nil, fmt.Errorf("runMerge: %s", err)
484 }
485 return tombstones, nil
486}