fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3import ( 4 "fmt" 5 "log" 6 "os" 7 "os/exec" 8 "path/filepath" 9 "sort" 10 "strings" 11 "time" 12 13 "github.com/grafana/regexp" 14 "github.com/prometheus/client_golang/prometheus" 15 "github.com/prometheus/client_golang/prometheus/promauto" 16 "gopkg.in/natefinch/lumberjack.v2" 17 18 "github.com/sourcegraph/zoekt" 19) 20 21var metricCleanupDuration = promauto.NewHistogram(prometheus.HistogramOpts{ 22 Name: "index_cleanup_duration_seconds", 23 Help: "The duration of one cleanup run", 24 Buckets: prometheus.LinearBuckets(1, 1, 10), 25}) 26 27// cleanup trashes shards in indexDir that do not exist in repos. For repos 28// that do not exist in indexDir, but do in indexDir/.trash it will move them 29// back into indexDir. Additionally it uses now to remove shards that have 30// been in the trash for 24 hours. It also deletes .tmp files older than 4 hours. 31func cleanup(indexDir string, repos []uint32, now time.Time, shardMerging bool) { 32 start := time.Now() 33 trashDir := filepath.Join(indexDir, ".trash") 34 if err := os.MkdirAll(trashDir, 0755); err != nil { 35 log.Printf("failed to create trash dir: %v", err) 36 } 37 38 trash := getShards(trashDir) 39 tombtones := getTombstonedRepos(indexDir) 40 index := getShards(indexDir) 41 42 // trash: Remove old shards and conflicts with index 43 minAge := now.Add(-24 * time.Hour) 44 for repo, shards := range trash { 45 old := false 46 for _, shard := range shards { 47 if shard.ModTime.Before(minAge) { 48 old = true 49 } else if shard.ModTime.After(now) { 50 debug.Printf("trashed shard %s has timestamp in the future, reseting to now", shard.Path) 51 _ = os.Chtimes(shard.Path, now, now) 52 } 53 } 54 55 if _, conflicts := index[repo]; !conflicts && !old { 56 continue 57 } 58 59 log.Printf("removing old shards from trash for %v", repo) 60 removeAll(shards...) 61 delete(trash, repo) 62 } 63 64 // tombstones: Remove tombstones that conflict with index or trash. After this, 65 // tombstones only contain repos that are neither in the trash nor in the index. 66 for repo := range tombtones { 67 if _, conflicts := index[repo]; conflicts { 68 delete(tombtones, repo) 69 } 70 // Trash takes precedence over tombstones. 71 if _, conflicts := trash[repo]; conflicts { 72 delete(tombtones, repo) 73 } 74 } 75 76 // index: We are ID based, but store shards by name still. If we end up with 77 // shards that have the same ID but different names delete and start over. 78 // This can happen when a repository is renamed. In future we should make 79 // shard file names based on ID. 80 for repo, shards := range index { 81 if consistentRepoName(shards) { 82 continue 83 } 84 85 // prevent further processing since we will delete 86 delete(index, repo) 87 88 // This should be rare, so give an informative log message. 89 var paths []string 90 for _, shard := range shards { 91 paths = append(paths, filepath.Base(shard.Path)) 92 } 93 log.Printf("removing shards for %v due to multiple repository names: %s", repo, strings.Join(paths, " ")) 94 95 // We may be in both normal and compound shards in this case. First 96 // tombstone the compound shards so we don't just rm them. 97 simple := shards[:0] 98 for _, s := range shards { 99 if shardMerging && maybeSetTombstone([]shard{s}, repo) { 100 shardsLog(indexDir, "tombname", []shard{s}) 101 } else { 102 simple = append(simple, s) 103 } 104 } 105 106 if len(simple) == 0 { 107 continue 108 } 109 110 removeAll(simple...) 111 shardsLog(indexDir, "removename", simple) 112 } 113 114 // index: Move missing repos from trash into index 115 // index: Restore deleted or tombstoned repos. 116 for _, repo := range repos { 117 // Delete from index so that index will only contain shards to be 118 // trashed. 119 delete(index, repo) 120 121 if shards, ok := trash[repo]; ok { 122 log.Printf("restoring shards from trash for %v", repo) 123 moveAll(indexDir, shards) 124 shardsLog(indexDir, "restore", shards) 125 continue 126 } 127 128 if s, ok := tombtones[repo]; ok { 129 log.Printf("removing tombstone for %v", repo) 130 err := zoekt.UnsetTombstone(s.Path, repo) 131 if err != nil { 132 log.Printf("error removing tombstone for %v: %s", repo, err) 133 } else { 134 shardsLog(indexDir, "untomb", []shard{s}) 135 } 136 } 137 } 138 139 // index: Move non-existent repos into trash 140 for repo, shards := range index { 141 // Best-effort touch. If touch fails, we will just remove from the 142 // trash sooner. 143 for _, shard := range shards { 144 _ = os.Chtimes(shard.Path, now, now) 145 } 146 147 if shardMerging && maybeSetTombstone(shards, repo) { 148 shardsLog(indexDir, "tomb", shards) 149 continue 150 } 151 moveAll(trashDir, shards) 152 shardsLog(indexDir, "remove", shards) 153 } 154 155 // Remove .tmp files from crashed indexer runs-- for example, if an indexer 156 // OOMs, it will leave around .tmp files, usually in a loop. We can remove 157 // the files now since cleanup runs with a global lock (no indexing jobs 158 // running at the same time). 159 if failures, err := filepath.Glob(filepath.Join(indexDir, "*.tmp")); err != nil { 160 log.Printf("Glob: %v", err) 161 } else { 162 for _, f := range failures { 163 st, err := os.Stat(f) 164 if err != nil { 165 log.Printf("Stat(%q): %v", f, err) 166 continue 167 } 168 if !st.IsDir() { 169 log.Printf("removing tmp file: %s", f) 170 os.Remove(f) 171 } 172 } 173 } 174 175 metricCleanupDuration.Observe(time.Since(start).Seconds()) 176} 177 178type shard struct { 179 RepoID uint32 180 RepoName string 181 Path string 182 ModTime time.Time 183 RepoTombstone bool 184} 185 186func getShards(dir string) map[uint32][]shard { 187 d, err := os.Open(dir) 188 if err != nil { 189 debug.Printf("failed to getShards: %s", dir) 190 return nil 191 } 192 defer d.Close() 193 names, _ := d.Readdirnames(-1) 194 sort.Strings(names) 195 196 shards := make(map[uint32][]shard, len(names)) 197 for _, n := range names { 198 path := filepath.Join(dir, n) 199 fi, err := os.Stat(path) 200 if err != nil { 201 debug.Printf("stat failed: %v", err) 202 continue 203 } 204 if fi.IsDir() || filepath.Ext(path) != ".zoekt" { 205 continue 206 } 207 208 repos, _, err := zoekt.ReadMetadataPathAlive(path) 209 if err != nil { 210 debug.Printf("failed to read shard: %v", err) 211 continue 212 } 213 214 for _, repo := range repos { 215 shards[repo.ID] = append(shards[repo.ID], shard{ 216 RepoID: repo.ID, 217 RepoName: repo.Name, 218 Path: path, 219 ModTime: fi.ModTime(), 220 RepoTombstone: repo.Tombstone, 221 }) 222 } 223 } 224 return shards 225} 226 227// getTombstonedRepos return a map of tombstoned repositories in dir. If a 228// repository is tombstoned in more than one compound shard, only the latest one, 229// as determined by the date of the latest commit, is returned. 230func getTombstonedRepos(dir string) map[uint32]shard { 231 paths, err := filepath.Glob(filepath.Join(dir, "compound-*.zoekt")) 232 if err != nil { 233 return nil 234 } 235 if len(paths) == 0 { 236 return nil 237 } 238 239 m := make(map[uint32]shard) 240 241 for _, p := range paths { 242 repos, _, err := zoekt.ReadMetadataPath(p) 243 if err != nil { 244 continue 245 } 246 for _, repo := range repos { 247 if !repo.Tombstone { 248 continue 249 } 250 if v, ok := m[repo.ID]; ok && v.ModTime.After(repo.LatestCommitDate) { 251 continue 252 } 253 m[repo.ID] = shard{ 254 RepoID: repo.ID, 255 RepoName: repo.Name, 256 Path: p, 257 ModTime: repo.LatestCommitDate, 258 RepoTombstone: repo.Tombstone, 259 } 260 } 261 } 262 return m 263} 264 265var incompleteRE = regexp.MustCompile(`\.zoekt[0-9]+(\.\w+)?$`) 266 267func removeIncompleteShards(dir string) { 268 d, err := os.Open(dir) 269 if err != nil { 270 debug.Printf("failed to removeIncompleteShards: %s", dir) 271 return 272 } 273 defer d.Close() 274 275 names, _ := d.Readdirnames(-1) 276 for _, n := range names { 277 if incompleteRE.MatchString(n) { 278 path := filepath.Join(dir, n) 279 if err := os.Remove(path); err != nil { 280 debug.Printf("failed to remove incomplete shard %s: %v", path, err) 281 } else { 282 debug.Printf("cleaned up incomplete shard %s", path) 283 } 284 } 285 } 286} 287 288func removeAll(shards ...shard) { 289 // Note on error handling here: We only expect this to fail due to 290 // IsNotExist, which is fine. Additionally this shouldn't fail 291 // partially. But if it does, and the file still exists, then we have the 292 // potential for a partial index for a repo. However, this should be 293 // exceedingly rare due to it being a mix of partial failure on something in 294 // trash + an admin re-adding a repository. 295 for _, shard := range shards { 296 paths, err := zoekt.IndexFilePaths(shard.Path) 297 if err != nil { 298 debug.Printf("failed to remove shard %s: %v", shard.Path, err) 299 } 300 for _, p := range paths { 301 if err := os.Remove(p); err != nil { 302 debug.Printf("failed to remove shard file %s: %v", p, err) 303 } 304 } 305 } 306} 307 308func moveAll(dstDir string, shards []shard) { 309 for i, shard := range shards { 310 paths, err := zoekt.IndexFilePaths(shard.Path) 311 if err != nil { 312 log.Printf("failed to stat shard paths, deleting all shards for %s: %v", shard.RepoName, err) 313 removeAll(shards...) 314 return 315 } 316 317 // Remove all files in dstDir for shard. This is to avoid cases like not 318 // overwriting an old meta file. 319 dstShard := shard 320 dstShard.Path = filepath.Join(dstDir, filepath.Base(shard.Path)) 321 removeAll(dstShard) 322 323 // HACK we do not yet support tombstones in compound shard. So we avoid 324 // needing to deal with it by always deleting the whole compound shard. 325 if strings.HasPrefix(filepath.Base(shard.Path), "compound-") { 326 log.Printf("HACK removing compound shard since we don't support tombstoning: %s", shard.Path) 327 removeAll(shard) 328 continue 329 } 330 331 // Rename all paths, stop at first failure 332 for _, p := range paths { 333 dst := filepath.Join(dstDir, filepath.Base(p)) 334 err = os.Rename(p, dst) 335 if err != nil { 336 break 337 } 338 } 339 340 if err != nil { 341 log.Printf("failed to move shard, deleting all shards for %s: %v", shard.RepoName, err) 342 removeAll(dstShard) // some files may have moved to dst 343 removeAll(shards...) 344 return 345 } 346 347 // update shards so partial failure removes the dst path 348 shards[i] = dstShard 349 } 350} 351 352// consistentRepoName returns true if the list of shards have a unique 353// repository name. 354func consistentRepoName(shards []shard) bool { 355 if len(shards) <= 1 { 356 return true 357 } 358 name := shards[0].RepoName 359 for _, shard := range shards[1:] { 360 if shard.RepoName != name { 361 return false 362 } 363 } 364 return true 365} 366 367// maybeSetTombstone will call zoekt.SetTombstone for repoID if shards 368// represents a compound shard. It returns true if shards represents a 369// compound shard. 370func maybeSetTombstone(shards []shard, repoID uint32) bool { 371 // 1 repo can be split across many simple shards but it should only be contained 372 // in 1 compound shard. Hence we check that len(shards)==1 and only consider the 373 // shard at index 0. 374 if len(shards) != 1 || !strings.HasPrefix(filepath.Base(shards[0].Path), "compound-") { 375 return false 376 } 377 378 if err := zoekt.SetTombstone(shards[0].Path, repoID); err != nil { 379 log.Printf("error setting tombstone for %d in shard %s: %s. Removing shard\n", repoID, shards[0].Path, err) 380 _ = os.Remove(shards[0].Path) 381 } 382 return true 383} 384 385func shardsLog(indexDir, action string, shards []shard) { 386 shardLogger := &lumberjack.Logger{ 387 Filename: filepath.Join(indexDir, "zoekt-indexserver-shard-log.tsv"), 388 MaxSize: 100, // Megabyte 389 MaxBackups: 5, 390 } 391 defer shardLogger.Close() 392 393 for _, s := range shards { 394 shardName := filepath.Base(s.Path) 395 var shardSize int64 396 if fi, err := os.Stat(filepath.Join(indexDir, shardName)); err == nil { 397 shardSize = fi.Size() 398 } 399 _, _ = fmt.Fprintf(shardLogger, "%s\t%s\t%s\t%d\t%s\t%d\n", time.Now().UTC().Format(time.RFC3339), action, shardName, shardSize, s.RepoName, s.RepoID) 400 } 401} 402 403var metricVacuumRunning = promauto.NewGauge(prometheus.GaugeOpts{ 404 Name: "index_vacuum_running", 405 Help: "Set to 1 if indexserver's vacuum job is running.", 406}) 407 408var metricNumberCompoundShards = promauto.NewGauge(prometheus.GaugeOpts{ 409 Name: "index_number_compound_shards", 410 Help: "The number of compound shards.", 411}) 412 413// vacuum removes tombstoned repos from compound shards and removes compound 414// shards if they shrink below minSizeBytes. Vacuum locks the index directory for 415// each compound shard it vacuums. 416func (s *Server) vacuum() { 417 metricVacuumRunning.Set(1) 418 defer metricVacuumRunning.Set(0) 419 420 d, err := os.Open(s.IndexDir) 421 if err != nil { 422 return 423 } 424 defer d.Close() 425 fns, _ := d.Readdirnames(-1) 426 427 for _, fn := range fns { 428 // We could run this over all shards, but based on our current setup, simple 429 // shards won't have tombstones but instead will be moved to .trash. 430 if !strings.HasPrefix(fn, "compound-") || !strings.HasSuffix(fn, ".zoekt") { 431 continue 432 } 433 434 path := filepath.Join(s.IndexDir, fn) 435 info, err := os.Stat(path) 436 if err != nil { 437 debug.Printf("vacuum stat failed: %v", err) 438 continue 439 } 440 441 if info.Size() < s.mergeOpts.minSizeBytes { 442 cmd := exec.Command("zoekt-merge-index", "explode", path) 443 444 var b []byte 445 s.muIndexDir.Global(func() { 446 b, err = cmd.CombinedOutput() 447 }) 448 449 if err != nil { 450 debug.Printf("failed to explode compound shard %s: %s", path, string(b)) 451 } else { 452 shardsLog(s.IndexDir, "explode", []shard{{Path: path}}) 453 } 454 continue 455 } 456 457 var removed []*zoekt.Repository 458 s.muIndexDir.Global(func() { 459 removed, err = removeTombstones(path) 460 }) 461 462 if err != nil { 463 debug.Printf("error while removing tombstones in %s: %s", fn, err) 464 } 465 for _, repo := range removed { 466 shardsLog(s.IndexDir, "vac", []shard{{ 467 RepoID: repo.ID, 468 RepoName: repo.Name, 469 Path: filepath.Join(s.IndexDir, fn), 470 ModTime: info.ModTime(), 471 }}) 472 } 473 } 474} 475 476var mockMerger func() error 477 478// removeTombstones removes all tombstones from a compound shard at fn by merging 479// the compound shard with itself. 480func removeTombstones(fn string) ([]*zoekt.Repository, error) { 481 var runMerge func() error 482 if mockMerger != nil { 483 runMerge = mockMerger 484 } else { 485 runMerge = exec.Command("zoekt-merge-index", "merge", fn).Run 486 } 487 488 repos, _, err := zoekt.ReadMetadataPath(fn) 489 if err != nil { 490 return nil, fmt.Errorf("zoekt.ReadMetadataPath: %s", err) 491 } 492 493 var tombstones []*zoekt.Repository 494 for _, r := range repos { 495 if r.Tombstone { 496 tombstones = append(tombstones, r) 497 } 498 } 499 if len(tombstones) == 0 { 500 return nil, nil 501 } 502 503 defer func() { 504 paths, err := zoekt.IndexFilePaths(fn) 505 if err != nil { 506 return 507 } 508 for _, path := range paths { 509 os.Remove(path) 510 } 511 }() 512 err = runMerge() 513 if err != nil { 514 return nil, fmt.Errorf("runMerge: %s", err) 515 } 516 return tombstones, nil 517}