// Fork of https://github.com/sourcegraph/zoekt.
1package main
2
3import (
4 "fmt"
5 "log"
6 "os"
7 "os/exec"
8 "path/filepath"
9 "sort"
10 "strings"
11 "time"
12
13 "github.com/grafana/regexp"
14 "github.com/prometheus/client_golang/prometheus"
15 "github.com/prometheus/client_golang/prometheus/promauto"
16 "gopkg.in/natefinch/lumberjack.v2"
17
18 "github.com/sourcegraph/zoekt"
19)
20
21var metricCleanupDuration = promauto.NewHistogram(prometheus.HistogramOpts{
22 Name: "index_cleanup_duration_seconds",
23 Help: "The duration of one cleanup run",
24 Buckets: prometheus.LinearBuckets(1, 1, 10),
25})
26
27// cleanup trashes shards in indexDir that do not exist in repos. For repos
28// that do not exist in indexDir, but do in indexDir/.trash it will move them
29// back into indexDir. Additionally it uses now to remove shards that have
30// been in the trash for 24 hours. It also deletes .tmp files older than 4 hours.
31func cleanup(indexDir string, repos []uint32, now time.Time, shardMerging bool) {
32 start := time.Now()
33 trashDir := filepath.Join(indexDir, ".trash")
34 if err := os.MkdirAll(trashDir, 0755); err != nil {
35 log.Printf("failed to create trash dir: %v", err)
36 }
37
38 trash := getShards(trashDir)
39 tombtones := getTombstonedRepos(indexDir)
40 index := getShards(indexDir)
41
42 // trash: Remove old shards and conflicts with index
43 minAge := now.Add(-24 * time.Hour)
44 for repo, shards := range trash {
45 old := false
46 for _, shard := range shards {
47 if shard.ModTime.Before(minAge) {
48 old = true
49 } else if shard.ModTime.After(now) {
50 debug.Printf("trashed shard %s has timestamp in the future, reseting to now", shard.Path)
51 _ = os.Chtimes(shard.Path, now, now)
52 }
53 }
54
55 if _, conflicts := index[repo]; !conflicts && !old {
56 continue
57 }
58
59 log.Printf("removing old shards from trash for %v", repo)
60 removeAll(shards...)
61 delete(trash, repo)
62 }
63
64 // tombstones: Remove tombstones that conflict with index or trash. After this,
65 // tombstones only contain repos that are neither in the trash nor in the index.
66 for repo := range tombtones {
67 if _, conflicts := index[repo]; conflicts {
68 delete(tombtones, repo)
69 }
70 // Trash takes precedence over tombstones.
71 if _, conflicts := trash[repo]; conflicts {
72 delete(tombtones, repo)
73 }
74 }
75
76 // index: We are ID based, but store shards by name still. If we end up with
77 // shards that have the same ID but different names delete and start over.
78 // This can happen when a repository is renamed. In future we should make
79 // shard file names based on ID.
80 for repo, shards := range index {
81 if consistentRepoName(shards) {
82 continue
83 }
84
85 // prevent further processing since we will delete
86 delete(index, repo)
87
88 // This should be rare, so give an informative log message.
89 var paths []string
90 for _, shard := range shards {
91 paths = append(paths, filepath.Base(shard.Path))
92 }
93 log.Printf("removing shards for %v due to multiple repository names: %s", repo, strings.Join(paths, " "))
94
95 // We may be in both normal and compound shards in this case. First
96 // tombstone the compound shards so we don't just rm them.
97 simple := shards[:0]
98 for _, s := range shards {
99 if shardMerging && maybeSetTombstone([]shard{s}, repo) {
100 shardsLog(indexDir, "tombname", []shard{s})
101 } else {
102 simple = append(simple, s)
103 }
104 }
105
106 if len(simple) == 0 {
107 continue
108 }
109
110 removeAll(simple...)
111 shardsLog(indexDir, "removename", simple)
112 }
113
114 // index: Move missing repos from trash into index
115 // index: Restore deleted or tombstoned repos.
116 for _, repo := range repos {
117 // Delete from index so that index will only contain shards to be
118 // trashed.
119 delete(index, repo)
120
121 if shards, ok := trash[repo]; ok {
122 log.Printf("restoring shards from trash for %v", repo)
123 moveAll(indexDir, shards)
124 shardsLog(indexDir, "restore", shards)
125 continue
126 }
127
128 if s, ok := tombtones[repo]; ok {
129 log.Printf("removing tombstone for %v", repo)
130 err := zoekt.UnsetTombstone(s.Path, repo)
131 if err != nil {
132 log.Printf("error removing tombstone for %v: %s", repo, err)
133 } else {
134 shardsLog(indexDir, "untomb", []shard{s})
135 }
136 }
137 }
138
139 // index: Move non-existent repos into trash
140 for repo, shards := range index {
141 // Best-effort touch. If touch fails, we will just remove from the
142 // trash sooner.
143 for _, shard := range shards {
144 _ = os.Chtimes(shard.Path, now, now)
145 }
146
147 if shardMerging && maybeSetTombstone(shards, repo) {
148 shardsLog(indexDir, "tomb", shards)
149 continue
150 }
151 moveAll(trashDir, shards)
152 shardsLog(indexDir, "remove", shards)
153 }
154
155 // Remove .tmp files from crashed indexer runs-- for example, if an indexer
156 // OOMs, it will leave around .tmp files, usually in a loop. We can remove
157 // the files now since cleanup runs with a global lock (no indexing jobs
158 // running at the same time).
159 if failures, err := filepath.Glob(filepath.Join(indexDir, "*.tmp")); err != nil {
160 log.Printf("Glob: %v", err)
161 } else {
162 for _, f := range failures {
163 st, err := os.Stat(f)
164 if err != nil {
165 log.Printf("Stat(%q): %v", f, err)
166 continue
167 }
168 if !st.IsDir() {
169 log.Printf("removing tmp file: %s", f)
170 os.Remove(f)
171 }
172 }
173 }
174
175 metricCleanupDuration.Observe(time.Since(start).Seconds())
176}
177
// shard describes one repository's presence in an index file. A compound
// shard that contains several repositories yields one shard value per
// repository.
type shard struct {
	// RepoID is the ID of the repository.
	RepoID uint32

	// RepoName is the repository name recorded in the shard's metadata.
	RepoName string

	// Path is the path of the .zoekt file holding the repository.
	Path string

	// ModTime is the file's modification time for values produced by
	// getShards. For values produced by getTombstonedRepos it holds the
	// repository's latest commit date instead.
	ModTime time.Time

	// RepoTombstone reports whether the repository is tombstoned in this shard.
	RepoTombstone bool
}
185
186func getShards(dir string) map[uint32][]shard {
187 d, err := os.Open(dir)
188 if err != nil {
189 debug.Printf("failed to getShards: %s", dir)
190 return nil
191 }
192 defer d.Close()
193 names, _ := d.Readdirnames(-1)
194 sort.Strings(names)
195
196 shards := make(map[uint32][]shard, len(names))
197 for _, n := range names {
198 path := filepath.Join(dir, n)
199 fi, err := os.Stat(path)
200 if err != nil {
201 debug.Printf("stat failed: %v", err)
202 continue
203 }
204 if fi.IsDir() || filepath.Ext(path) != ".zoekt" {
205 continue
206 }
207
208 repos, _, err := zoekt.ReadMetadataPathAlive(path)
209 if err != nil {
210 debug.Printf("failed to read shard: %v", err)
211 continue
212 }
213
214 for _, repo := range repos {
215 shards[repo.ID] = append(shards[repo.ID], shard{
216 RepoID: repo.ID,
217 RepoName: repo.Name,
218 Path: path,
219 ModTime: fi.ModTime(),
220 RepoTombstone: repo.Tombstone,
221 })
222 }
223 }
224 return shards
225}
226
227// getTombstonedRepos return a map of tombstoned repositories in dir. If a
228// repository is tombstoned in more than one compound shard, only the latest one,
229// as determined by the date of the latest commit, is returned.
230func getTombstonedRepos(dir string) map[uint32]shard {
231 paths, err := filepath.Glob(filepath.Join(dir, "compound-*.zoekt"))
232 if err != nil {
233 return nil
234 }
235 if len(paths) == 0 {
236 return nil
237 }
238
239 m := make(map[uint32]shard)
240
241 for _, p := range paths {
242 repos, _, err := zoekt.ReadMetadataPath(p)
243 if err != nil {
244 continue
245 }
246 for _, repo := range repos {
247 if !repo.Tombstone {
248 continue
249 }
250 if v, ok := m[repo.ID]; ok && v.ModTime.After(repo.LatestCommitDate) {
251 continue
252 }
253 m[repo.ID] = shard{
254 RepoID: repo.ID,
255 RepoName: repo.Name,
256 Path: p,
257 ModTime: repo.LatestCommitDate,
258 RepoTombstone: repo.Tombstone,
259 }
260 }
261 }
262 return m
263}
264
// incompleteRE matches file names that end in ".zoekt" followed by one or more
// digits, optionally followed by a single extension, e.g. "x.zoekt123" or
// "x.zoekt123.meta". removeIncompleteShards deletes such files (presumably
// temporary output of an interrupted shard build; confirm).
var (
	incompleteRE = regexp.MustCompile(`\.zoekt[0-9]+(\.\w+)?$`)
)
266
267func removeIncompleteShards(dir string) {
268 d, err := os.Open(dir)
269 if err != nil {
270 debug.Printf("failed to removeIncompleteShards: %s", dir)
271 return
272 }
273 defer d.Close()
274
275 names, _ := d.Readdirnames(-1)
276 for _, n := range names {
277 if incompleteRE.MatchString(n) {
278 path := filepath.Join(dir, n)
279 if err := os.Remove(path); err != nil {
280 debug.Printf("failed to remove incomplete shard %s: %v", path, err)
281 } else {
282 debug.Printf("cleaned up incomplete shard %s", path)
283 }
284 }
285 }
286}
287
288func removeAll(shards ...shard) {
289 // Note on error handling here: We only expect this to fail due to
290 // IsNotExist, which is fine. Additionally this shouldn't fail
291 // partially. But if it does, and the file still exists, then we have the
292 // potential for a partial index for a repo. However, this should be
293 // exceedingly rare due to it being a mix of partial failure on something in
294 // trash + an admin re-adding a repository.
295 for _, shard := range shards {
296 paths, err := zoekt.IndexFilePaths(shard.Path)
297 if err != nil {
298 debug.Printf("failed to remove shard %s: %v", shard.Path, err)
299 }
300 for _, p := range paths {
301 if err := os.Remove(p); err != nil {
302 debug.Printf("failed to remove shard file %s: %v", p, err)
303 }
304 }
305 }
306}
307
308func moveAll(dstDir string, shards []shard) {
309 for i, shard := range shards {
310 paths, err := zoekt.IndexFilePaths(shard.Path)
311 if err != nil {
312 log.Printf("failed to stat shard paths, deleting all shards for %s: %v", shard.RepoName, err)
313 removeAll(shards...)
314 return
315 }
316
317 // Remove all files in dstDir for shard. This is to avoid cases like not
318 // overwriting an old meta file.
319 dstShard := shard
320 dstShard.Path = filepath.Join(dstDir, filepath.Base(shard.Path))
321 removeAll(dstShard)
322
323 // HACK we do not yet support tombstones in compound shard. So we avoid
324 // needing to deal with it by always deleting the whole compound shard.
325 if strings.HasPrefix(filepath.Base(shard.Path), "compound-") {
326 log.Printf("HACK removing compound shard since we don't support tombstoning: %s", shard.Path)
327 removeAll(shard)
328 continue
329 }
330
331 // Rename all paths, stop at first failure
332 for _, p := range paths {
333 dst := filepath.Join(dstDir, filepath.Base(p))
334 err = os.Rename(p, dst)
335 if err != nil {
336 break
337 }
338 }
339
340 if err != nil {
341 log.Printf("failed to move shard, deleting all shards for %s: %v", shard.RepoName, err)
342 removeAll(dstShard) // some files may have moved to dst
343 removeAll(shards...)
344 return
345 }
346
347 // update shards so partial failure removes the dst path
348 shards[i] = dstShard
349 }
350}
351
352// consistentRepoName returns true if the list of shards have a unique
353// repository name.
354func consistentRepoName(shards []shard) bool {
355 if len(shards) <= 1 {
356 return true
357 }
358 name := shards[0].RepoName
359 for _, shard := range shards[1:] {
360 if shard.RepoName != name {
361 return false
362 }
363 }
364 return true
365}
366
367// maybeSetTombstone will call zoekt.SetTombstone for repoID if shards
368// represents a compound shard. It returns true if shards represents a
369// compound shard.
370func maybeSetTombstone(shards []shard, repoID uint32) bool {
371 // 1 repo can be split across many simple shards but it should only be contained
372 // in 1 compound shard. Hence we check that len(shards)==1 and only consider the
373 // shard at index 0.
374 if len(shards) != 1 || !strings.HasPrefix(filepath.Base(shards[0].Path), "compound-") {
375 return false
376 }
377
378 if err := zoekt.SetTombstone(shards[0].Path, repoID); err != nil {
379 log.Printf("error setting tombstone for %d in shard %s: %s. Removing shard\n", repoID, shards[0].Path, err)
380 _ = os.Remove(shards[0].Path)
381 }
382 return true
383}
384
385func shardsLog(indexDir, action string, shards []shard) {
386 shardLogger := &lumberjack.Logger{
387 Filename: filepath.Join(indexDir, "zoekt-indexserver-shard-log.tsv"),
388 MaxSize: 100, // Megabyte
389 MaxBackups: 5,
390 }
391 defer shardLogger.Close()
392
393 for _, s := range shards {
394 shardName := filepath.Base(s.Path)
395 var shardSize int64
396 if fi, err := os.Stat(filepath.Join(indexDir, shardName)); err == nil {
397 shardSize = fi.Size()
398 }
399 _, _ = fmt.Fprintf(shardLogger, "%s\t%s\t%s\t%d\t%s\t%d\n", time.Now().UTC().Format(time.RFC3339), action, shardName, shardSize, s.RepoName, s.RepoID)
400 }
401}
402
403var metricVacuumRunning = promauto.NewGauge(prometheus.GaugeOpts{
404 Name: "index_vacuum_running",
405 Help: "Set to 1 if indexserver's vacuum job is running.",
406})
407
408var metricNumberCompoundShards = promauto.NewGauge(prometheus.GaugeOpts{
409 Name: "index_number_compound_shards",
410 Help: "The number of compound shards.",
411})
412
413// vacuum removes tombstoned repos from compound shards and removes compound
414// shards if they shrink below minSizeBytes. Vacuum locks the index directory for
415// each compound shard it vacuums.
416func (s *Server) vacuum() {
417 metricVacuumRunning.Set(1)
418 defer metricVacuumRunning.Set(0)
419
420 d, err := os.Open(s.IndexDir)
421 if err != nil {
422 return
423 }
424 defer d.Close()
425 fns, _ := d.Readdirnames(-1)
426
427 for _, fn := range fns {
428 // We could run this over all shards, but based on our current setup, simple
429 // shards won't have tombstones but instead will be moved to .trash.
430 if !strings.HasPrefix(fn, "compound-") || !strings.HasSuffix(fn, ".zoekt") {
431 continue
432 }
433
434 path := filepath.Join(s.IndexDir, fn)
435 info, err := os.Stat(path)
436 if err != nil {
437 debug.Printf("vacuum stat failed: %v", err)
438 continue
439 }
440
441 if info.Size() < s.mergeOpts.minSizeBytes {
442 cmd := exec.Command("zoekt-merge-index", "explode", path)
443
444 var b []byte
445 s.muIndexDir.Global(func() {
446 b, err = cmd.CombinedOutput()
447 })
448
449 if err != nil {
450 debug.Printf("failed to explode compound shard %s: %s", path, string(b))
451 } else {
452 shardsLog(s.IndexDir, "explode", []shard{{Path: path}})
453 }
454 continue
455 }
456
457 var removed []*zoekt.Repository
458 s.muIndexDir.Global(func() {
459 removed, err = removeTombstones(path)
460 })
461
462 if err != nil {
463 debug.Printf("error while removing tombstones in %s: %s", fn, err)
464 }
465 for _, repo := range removed {
466 shardsLog(s.IndexDir, "vac", []shard{{
467 RepoID: repo.ID,
468 RepoName: repo.Name,
469 Path: filepath.Join(s.IndexDir, fn),
470 ModTime: info.ModTime(),
471 }})
472 }
473 }
474}
475
// mockMerger, when non-nil, replaces the `zoekt-merge-index merge` invocation
// in removeTombstones. It is a test hook and is nil by default.
var (
	mockMerger func() error
)
477
478// removeTombstones removes all tombstones from a compound shard at fn by merging
479// the compound shard with itself.
480func removeTombstones(fn string) ([]*zoekt.Repository, error) {
481 var runMerge func() error
482 if mockMerger != nil {
483 runMerge = mockMerger
484 } else {
485 runMerge = exec.Command("zoekt-merge-index", "merge", fn).Run
486 }
487
488 repos, _, err := zoekt.ReadMetadataPath(fn)
489 if err != nil {
490 return nil, fmt.Errorf("zoekt.ReadMetadataPath: %s", err)
491 }
492
493 var tombstones []*zoekt.Repository
494 for _, r := range repos {
495 if r.Tombstone {
496 tombstones = append(tombstones, r)
497 }
498 }
499 if len(tombstones) == 0 {
500 return nil, nil
501 }
502
503 defer func() {
504 paths, err := zoekt.IndexFilePaths(fn)
505 if err != nil {
506 return
507 }
508 for _, path := range paths {
509 os.Remove(path)
510 }
511 }()
512 err = runMerge()
513 if err != nil {
514 return nil, fmt.Errorf("runMerge: %s", err)
515 }
516 return tombstones, nil
517}