fork of https://github.com/sourcegraph/zoekt
1package main
2
3import (
4 "fmt"
5 "log"
6 "os"
7 "os/exec"
8 "path/filepath"
9 "sort"
10 "strings"
11 "time"
12
13 "github.com/grafana/regexp"
14 "github.com/prometheus/client_golang/prometheus"
15 "github.com/prometheus/client_golang/prometheus/promauto"
16
17 "github.com/sourcegraph/zoekt"
18)
19
20var metricCleanupDuration = promauto.NewHistogram(prometheus.HistogramOpts{
21 Name: "index_cleanup_duration_seconds",
22 Help: "The duration of one cleanup run",
23 Buckets: prometheus.LinearBuckets(1, 1, 10),
24})
25
26// cleanup trashes shards in indexDir that do not exist in repos. For repos
27// that do not exist in indexDir, but do in indexDir/.trash it will move them
28// back into indexDir. Additionally it uses now to remove shards that have
29// been in the trash for 24 hours. It also deletes .tmp files older than 4 hours.
30func cleanup(indexDir string, repos []uint32, now time.Time, shardMerging bool) {
31 start := time.Now()
32 trashDir := filepath.Join(indexDir, ".trash")
33 if err := os.MkdirAll(trashDir, 0755); err != nil {
34 log.Printf("failed to create trash dir: %v", err)
35 }
36
37 trash := getShards(trashDir)
38 tombtones := getTombstonedRepos(indexDir)
39 index := getShards(indexDir)
40
41 // trash: Remove old shards and conflicts with index
42 minAge := now.Add(-24 * time.Hour)
43 for repo, shards := range trash {
44 old := false
45 for _, shard := range shards {
46 if shard.ModTime.Before(minAge) {
47 old = true
48 } else if shard.ModTime.After(now) {
49 debug.Printf("trashed shard %s has timestamp in the future, reseting to now", shard.Path)
50 _ = os.Chtimes(shard.Path, now, now)
51 }
52 }
53
54 if _, conflicts := index[repo]; !conflicts && !old {
55 continue
56 }
57
58 log.Printf("removing old shards from trash for %v", repo)
59 removeAll(shards...)
60 delete(trash, repo)
61 }
62
63 // tombstones: Remove tombstones that conflict with index or trash. After this,
64 // tombstones only contain repos that are neither in the trash nor in the index.
65 for repo := range tombtones {
66 if _, conflicts := index[repo]; conflicts {
67 delete(tombtones, repo)
68 }
69 // Trash takes precedence over tombstones.
70 if _, conflicts := trash[repo]; conflicts {
71 delete(tombtones, repo)
72 }
73 }
74
75 // index: We are ID based, but store shards by name still. If we end up with
76 // shards that have the same ID but different names delete and start over.
77 // This can happen when a repository is renamed. In future we should make
78 // shard file names based on ID.
79 for repo, shards := range index {
80 if consistentRepoName(shards) {
81 continue
82 }
83
84 // prevent further processing since we will delete
85 delete(index, repo)
86
87 // This should be rare, so give an informative log message.
88 var paths []string
89 for _, shard := range shards {
90 paths = append(paths, filepath.Base(shard.Path))
91 }
92 log.Printf("removing shards for %v due to multiple repository names: %s", repo, strings.Join(paths, " "))
93
94 // We may be in both normal and compound shards in this case. First
95 // tombstone the compound shards so we don't just rm them.
96 simple := shards[:0]
97 for _, s := range shards {
98 if shardMerging && maybeSetTombstone([]shard{s}, repo) {
99 continue
100 }
101
102 simple = append(simple, s)
103 }
104
105 if len(simple) == 0 {
106 continue
107 }
108
109 removeAll(simple...)
110 }
111
112 // index: Move missing repos from trash into index
113 // index: Restore deleted or tombstoned repos.
114 for _, repo := range repos {
115 // Delete from index so that index will only contain shards to be
116 // trashed.
117 delete(index, repo)
118
119 if shards, ok := trash[repo]; ok {
120 log.Printf("restoring shards from trash for %v", repo)
121 moveAll(indexDir, shards)
122 continue
123 }
124
125 if s, ok := tombtones[repo]; ok {
126 log.Printf("removing tombstone for %v", repo)
127 err := zoekt.UnsetTombstone(s.Path, repo)
128 if err != nil {
129 log.Printf("error removing tombstone for %v: %s", repo, err)
130 }
131 }
132 }
133
134 // index: Move non-existent repos into trash
135 for repo, shards := range index {
136 // Best-effort touch. If touch fails, we will just remove from the
137 // trash sooner.
138 for _, shard := range shards {
139 _ = os.Chtimes(shard.Path, now, now)
140 }
141
142 if shardMerging && maybeSetTombstone(shards, repo) {
143 continue
144 }
145 moveAll(trashDir, shards)
146 }
147
148 // Remove .tmp files from crashed indexer runs-- for example, if an indexer
149 // OOMs, it will leave around .tmp files, usually in a loop. We can remove
150 // the files now since cleanup runs with a global lock (no indexing jobs
151 // running at the same time).
152 if failures, err := filepath.Glob(filepath.Join(indexDir, "*.tmp")); err != nil {
153 log.Printf("Glob: %v", err)
154 } else {
155 for _, f := range failures {
156 st, err := os.Stat(f)
157 if err != nil {
158 log.Printf("Stat(%q): %v", f, err)
159 continue
160 }
161 if !st.IsDir() {
162 log.Printf("removing tmp file: %s", f)
163 os.Remove(f)
164 }
165 }
166 }
167
168 metricCleanupDuration.Observe(time.Since(start).Seconds())
169}
170
// shard describes the presence of one repository in one shard file on disk.
// A shard file that holds several repositories yields one shard value per
// repository, all sharing the same Path.
type shard struct {
	// RepoID is the repository ID read from the shard's metadata.
	RepoID uint32

	// RepoName is the repository name read from the shard's metadata.
	RepoName string

	// Path is the location of the shard file.
	Path string

	// ModTime is the shard file's modification time for values built by
	// getShards, and the repository's latest commit date for values built by
	// getTombstonedRepos.
	ModTime time.Time

	// RepoTombstone mirrors the Tombstone flag of the repository's metadata.
	RepoTombstone bool
}
178
179func getShards(dir string) map[uint32][]shard {
180 d, err := os.Open(dir)
181 if err != nil {
182 debug.Printf("failed to getShards: %s", dir)
183 return nil
184 }
185 defer d.Close()
186 names, _ := d.Readdirnames(-1)
187 sort.Strings(names)
188
189 shards := make(map[uint32][]shard, len(names))
190 for _, n := range names {
191 path := filepath.Join(dir, n)
192 fi, err := os.Stat(path)
193 if err != nil {
194 debug.Printf("stat failed: %v", err)
195 continue
196 }
197 if fi.IsDir() || filepath.Ext(path) != ".zoekt" {
198 continue
199 }
200
201 repos, _, err := zoekt.ReadMetadataPathAlive(path)
202 if err != nil {
203 debug.Printf("failed to read shard: %v", err)
204 continue
205 }
206
207 for _, repo := range repos {
208 shards[repo.ID] = append(shards[repo.ID], shard{
209 RepoID: repo.ID,
210 RepoName: repo.Name,
211 Path: path,
212 ModTime: fi.ModTime(),
213 RepoTombstone: repo.Tombstone,
214 })
215 }
216 }
217 return shards
218}
219
220// getTombstonedRepos return a map of tombstoned repositories in dir. If a
221// repository is tombstoned in more than one compound shard, only the latest one,
222// as determined by the date of the latest commit, is returned.
223func getTombstonedRepos(dir string) map[uint32]shard {
224 paths, err := filepath.Glob(filepath.Join(dir, "compound-*.zoekt"))
225 if err != nil {
226 return nil
227 }
228 if len(paths) == 0 {
229 return nil
230 }
231
232 m := make(map[uint32]shard)
233
234 for _, p := range paths {
235 repos, _, err := zoekt.ReadMetadataPath(p)
236 if err != nil {
237 continue
238 }
239 for _, repo := range repos {
240 if !repo.Tombstone {
241 continue
242 }
243 if v, ok := m[repo.ID]; ok && v.ModTime.After(repo.LatestCommitDate) {
244 continue
245 }
246 m[repo.ID] = shard{
247 RepoID: repo.ID,
248 RepoName: repo.Name,
249 Path: p,
250 ModTime: repo.LatestCommitDate,
251 RepoTombstone: repo.Tombstone,
252 }
253 }
254 }
255 return m
256}
257
258var incompleteRE = regexp.MustCompile(`\.zoekt[0-9]+(\.\w+)?$`)
259
260func removeIncompleteShards(dir string) {
261 d, err := os.Open(dir)
262 if err != nil {
263 debug.Printf("failed to removeIncompleteShards: %s", dir)
264 return
265 }
266 defer d.Close()
267
268 names, _ := d.Readdirnames(-1)
269 for _, n := range names {
270 if incompleteRE.MatchString(n) {
271 path := filepath.Join(dir, n)
272 if err := os.Remove(path); err != nil {
273 debug.Printf("failed to remove incomplete shard %s: %v", path, err)
274 } else {
275 debug.Printf("cleaned up incomplete shard %s", path)
276 }
277 }
278 }
279}
280
281func removeAll(shards ...shard) {
282 // Note on error handling here: We only expect this to fail due to
283 // IsNotExist, which is fine. Additionally this shouldn't fail
284 // partially. But if it does, and the file still exists, then we have the
285 // potential for a partial index for a repo. However, this should be
286 // exceedingly rare due to it being a mix of partial failure on something in
287 // trash + an admin re-adding a repository.
288 for _, shard := range shards {
289 paths, err := zoekt.IndexFilePaths(shard.Path)
290 if err != nil {
291 debug.Printf("failed to remove shard %s: %v", shard.Path, err)
292 }
293 for _, p := range paths {
294 if err := os.Remove(p); err != nil {
295 debug.Printf("failed to remove shard file %s: %v", p, err)
296 }
297 }
298 }
299}
300
301func moveAll(dstDir string, shards []shard) {
302 for i, shard := range shards {
303 paths, err := zoekt.IndexFilePaths(shard.Path)
304 if err != nil {
305 log.Printf("failed to stat shard paths, deleting all shards for %s: %v", shard.RepoName, err)
306 removeAll(shards...)
307 return
308 }
309
310 // Remove all files in dstDir for shard. This is to avoid cases like not
311 // overwriting an old meta file.
312 dstShard := shard
313 dstShard.Path = filepath.Join(dstDir, filepath.Base(shard.Path))
314 removeAll(dstShard)
315
316 // HACK we do not yet support tombstones in compound shard. So we avoid
317 // needing to deal with it by always deleting the whole compound shard.
318 if strings.HasPrefix(filepath.Base(shard.Path), "compound-") {
319 log.Printf("HACK removing compound shard since we don't support tombstoning: %s", shard.Path)
320 removeAll(shard)
321 continue
322 }
323
324 // Rename all paths, stop at first failure
325 for _, p := range paths {
326 dst := filepath.Join(dstDir, filepath.Base(p))
327 err = os.Rename(p, dst)
328 if err != nil {
329 break
330 }
331 }
332
333 if err != nil {
334 log.Printf("failed to move shard, deleting all shards for %s: %v", shard.RepoName, err)
335 removeAll(dstShard) // some files may have moved to dst
336 removeAll(shards...)
337 return
338 }
339
340 // update shards so partial failure removes the dst path
341 shards[i] = dstShard
342 }
343}
344
345// consistentRepoName returns true if the list of shards have a unique
346// repository name.
347func consistentRepoName(shards []shard) bool {
348 if len(shards) <= 1 {
349 return true
350 }
351 name := shards[0].RepoName
352 for _, shard := range shards[1:] {
353 if shard.RepoName != name {
354 return false
355 }
356 }
357 return true
358}
359
360// maybeSetTombstone will call zoekt.SetTombstone for repoID if shards
361// represents a compound shard. It returns true if shards represents a
362// compound shard.
363func maybeSetTombstone(shards []shard, repoID uint32) bool {
364 // 1 repo can be split across many simple shards but it should only be contained
365 // in 1 compound shard. Hence we check that len(shards)==1 and only consider the
366 // shard at index 0.
367 if len(shards) != 1 || !strings.HasPrefix(filepath.Base(shards[0].Path), "compound-") {
368 return false
369 }
370
371 if err := zoekt.SetTombstone(shards[0].Path, repoID); err != nil {
372 log.Printf("error setting tombstone for %d in shard %s: %s. Removing shard\n", repoID, shards[0].Path, err)
373 _ = os.Remove(shards[0].Path)
374 }
375 return true
376}
377
378var metricVacuumRunning = promauto.NewGauge(prometheus.GaugeOpts{
379 Name: "index_vacuum_running",
380 Help: "Set to 1 if indexserver's vacuum job is running.",
381})
382
383var metricNumberCompoundShards = promauto.NewGauge(prometheus.GaugeOpts{
384 Name: "index_number_compound_shards",
385 Help: "The number of compound shards.",
386})
387
388// vacuum removes tombstoned repos from compound shards and removes compound
389// shards if they shrink below minSizeBytes. Vacuum locks the index directory for
390// each compound shard it vacuums.
391func (s *Server) vacuum() {
392 metricVacuumRunning.Set(1)
393 defer metricVacuumRunning.Set(0)
394
395 d, err := os.Open(s.IndexDir)
396 if err != nil {
397 return
398 }
399 defer d.Close()
400 fns, _ := d.Readdirnames(-1)
401
402 for _, fn := range fns {
403 // We could run this over all shards, but based on our current setup, simple
404 // shards won't have tombstones but instead will be moved to .trash.
405 if !strings.HasPrefix(fn, "compound-") || !strings.HasSuffix(fn, ".zoekt") {
406 continue
407 }
408
409 path := filepath.Join(s.IndexDir, fn)
410 info, err := os.Stat(path)
411 if err != nil {
412 debug.Printf("vacuum stat failed: %v", err)
413 continue
414 }
415
416 if info.Size() < s.mergeOpts.minSizeBytes {
417 cmd := exec.Command("zoekt-merge-index", "explode", path)
418
419 var b []byte
420 s.muIndexDir.Global(func() {
421 b, err = cmd.CombinedOutput()
422 })
423
424 if err != nil {
425 debug.Printf("failed to explode compound shard %s: %s", path, string(b))
426 }
427 continue
428 }
429
430 s.muIndexDir.Global(func() {
431 _, err = removeTombstones(path)
432 })
433
434 if err != nil {
435 debug.Printf("error while removing tombstones in %s: %s", fn, err)
436 }
437 }
438}
439
440var mockMerger func() error
441
442// removeTombstones removes all tombstones from a compound shard at fn by merging
443// the compound shard with itself.
444func removeTombstones(fn string) ([]*zoekt.Repository, error) {
445 var runMerge func() error
446 if mockMerger != nil {
447 runMerge = mockMerger
448 } else {
449 runMerge = exec.Command("zoekt-merge-index", "merge", fn).Run
450 }
451
452 repos, _, err := zoekt.ReadMetadataPath(fn)
453 if err != nil {
454 return nil, fmt.Errorf("zoekt.ReadMetadataPath: %s", err)
455 }
456
457 var tombstones []*zoekt.Repository
458 for _, r := range repos {
459 if r.Tombstone {
460 tombstones = append(tombstones, r)
461 }
462 }
463 if len(tombstones) == 0 {
464 return nil, nil
465 }
466
467 defer func() {
468 paths, err := zoekt.IndexFilePaths(fn)
469 if err != nil {
470 return
471 }
472 for _, path := range paths {
473 os.Remove(path)
474 }
475 }()
476 err = runMerge()
477 if err != nil {
478 return nil, fmt.Errorf("runMerge: %s", err)
479 }
480 return tombstones, nil
481}