fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3import ( 4 "fmt" 5 "os" 6 "os/exec" 7 "path/filepath" 8 "sort" 9 "strings" 10 "time" 11 12 "github.com/grafana/regexp" 13 "github.com/prometheus/client_golang/prometheus" 14 "github.com/prometheus/client_golang/prometheus/promauto" 15 16 "github.com/sourcegraph/zoekt" 17) 18 19var metricCleanupDuration = promauto.NewHistogram(prometheus.HistogramOpts{ 20 Name: "index_cleanup_duration_seconds", 21 Help: "The duration of one cleanup run", 22 Buckets: prometheus.LinearBuckets(1, 1, 10), 23}) 24 25// cleanup trashes shards in indexDir that do not exist in repos. For repos 26// that do not exist in indexDir, but do in indexDir/.trash it will move them 27// back into indexDir. Additionally it uses now to remove shards that have 28// been in the trash for 24 hours. It also deletes .tmp files older than 4 hours. 29func cleanup(indexDir string, repos []uint32, now time.Time, shardMerging bool) { 30 start := time.Now() 31 trashDir := filepath.Join(indexDir, ".trash") 32 if err := os.MkdirAll(trashDir, 0o755); err != nil { 33 errorLog.Printf("failed to create trash dir: %v", err) 34 } 35 36 trash := getShards(trashDir) 37 tombtones := getTombstonedRepos(indexDir) 38 index := getShards(indexDir) 39 40 // trash: Remove old shards and conflicts with index 41 minAge := now.Add(-24 * time.Hour) 42 for repo, shards := range trash { 43 old := false 44 for _, shard := range shards { 45 if shard.ModTime.Before(minAge) { 46 old = true 47 } else if shard.ModTime.After(now) { 48 debugLog.Printf("trashed shard %s has timestamp in the future, reseting to now", shard.Path) 49 _ = os.Chtimes(shard.Path, now, now) 50 } 51 } 52 53 if _, conflicts := index[repo]; !conflicts && !old { 54 continue 55 } 56 57 infoLog.Printf("removing old shards from trash for %v", repo) 58 removeAll(shards...) 59 delete(trash, repo) 60 } 61 62 // tombstones: Remove tombstones that conflict with index or trash. After this, 63 // tombstones only contain repos that are neither in the trash nor in the index. 64 for repo := range tombtones { 65 if _, conflicts := index[repo]; conflicts { 66 delete(tombtones, repo) 67 } 68 // Trash takes precedence over tombstones. 69 if _, conflicts := trash[repo]; conflicts { 70 delete(tombtones, repo) 71 } 72 } 73 74 // index: We are ID based, but store shards by name still. If we end up with 75 // shards that have the same ID but different names delete and start over. 76 // This can happen when a repository is renamed. In future we should make 77 // shard file names based on ID. 78 for repo, shards := range index { 79 if consistentRepoName(shards) { 80 continue 81 } 82 83 // prevent further processing since we will delete 84 delete(index, repo) 85 86 // This should be rare, so give an informative log message. 87 var paths []string 88 for _, shard := range shards { 89 paths = append(paths, filepath.Base(shard.Path)) 90 } 91 infoLog.Printf("removing shards for %v due to multiple repository names: %s", repo, strings.Join(paths, " ")) 92 93 // We may be in both normal and compound shards in this case. First 94 // tombstone the compound shards so we don't just rm them. 95 simple := shards[:0] 96 for _, s := range shards { 97 if shardMerging && maybeSetTombstone([]shard{s}, repo) { 98 continue 99 } 100 101 simple = append(simple, s) 102 } 103 104 if len(simple) == 0 { 105 continue 106 } 107 108 removeAll(simple...) 109 } 110 111 // index: Move missing repos from trash into index 112 // index: Restore deleted or tombstoned repos. 113 for _, repo := range repos { 114 // Delete from index so that index will only contain shards to be 115 // trashed. 116 delete(index, repo) 117 118 if shards, ok := trash[repo]; ok { 119 infoLog.Printf("restoring shards from trash for %v", repo) 120 moveAll(indexDir, shards) 121 continue 122 } 123 124 if s, ok := tombtones[repo]; ok { 125 infoLog.Printf("removing tombstone for %v", repo) 126 err := zoekt.UnsetTombstone(s.Path, repo) 127 if err != nil { 128 errorLog.Printf("error removing tombstone for %v: %s", repo, err) 129 } 130 } 131 } 132 133 // index: Move non-existent repos into trash 134 for repo, shards := range index { 135 // Best-effort touch. If touch fails, we will just remove from the 136 // trash sooner. 137 for _, shard := range shards { 138 _ = os.Chtimes(shard.Path, now, now) 139 } 140 141 if shardMerging && maybeSetTombstone(shards, repo) { 142 continue 143 } 144 moveAll(trashDir, shards) 145 } 146 147 // Remove .tmp files from crashed indexer runs-- for example, if an indexer 148 // OOMs, it will leave around .tmp files, usually in a loop. We can remove 149 // the files now since cleanup runs with a global lock (no indexing jobs 150 // running at the same time). 151 if failures, err := filepath.Glob(filepath.Join(indexDir, "*.tmp")); err != nil { 152 errorLog.Printf("Glob: %v", err) 153 } else { 154 for _, f := range failures { 155 st, err := os.Stat(f) 156 if err != nil { 157 errorLog.Printf("Stat(%q): %v", f, err) 158 continue 159 } 160 if !st.IsDir() { 161 infoLog.Printf("removing tmp file: %s", f) 162 os.Remove(f) 163 } 164 } 165 } 166 167 metricCleanupDuration.Observe(time.Since(start).Seconds()) 168} 169 170type shard struct { 171 RepoID uint32 172 RepoName string 173 Path string 174 ModTime time.Time 175 RepoTombstone bool 176} 177 178func getShards(dir string) map[uint32][]shard { 179 d, err := os.Open(dir) 180 if err != nil { 181 debugLog.Printf("failed to getShards: %s", dir) 182 return nil 183 } 184 defer d.Close() 185 names, _ := d.Readdirnames(-1) 186 sort.Strings(names) 187 188 shards := make(map[uint32][]shard, len(names)) 189 for _, n := range names { 190 path := filepath.Join(dir, n) 191 fi, err := os.Stat(path) 192 if err != nil { 193 debugLog.Printf("stat failed: %v", err) 194 continue 195 } 196 if fi.IsDir() || filepath.Ext(path) != ".zoekt" { 197 continue 198 } 199 200 repos, _, err := zoekt.ReadMetadataPathAlive(path) 201 if err != nil { 202 debugLog.Printf("failed to read shard: %v", err) 203 continue 204 } 205 206 for _, repo := range repos { 207 shards[repo.ID] = append(shards[repo.ID], shard{ 208 RepoID: repo.ID, 209 RepoName: repo.Name, 210 Path: path, 211 ModTime: fi.ModTime(), 212 RepoTombstone: repo.Tombstone, 213 }) 214 } 215 } 216 return shards 217} 218 219// getTombstonedRepos return a map of tombstoned repositories in dir. If a 220// repository is tombstoned in more than one compound shard, only the latest one, 221// as determined by the date of the latest commit, is returned. 222func getTombstonedRepos(dir string) map[uint32]shard { 223 paths, err := filepath.Glob(filepath.Join(dir, "compound-*.zoekt")) 224 if err != nil { 225 return nil 226 } 227 if len(paths) == 0 { 228 return nil 229 } 230 231 m := make(map[uint32]shard) 232 233 for _, p := range paths { 234 repos, _, err := zoekt.ReadMetadataPath(p) 235 if err != nil { 236 continue 237 } 238 for _, repo := range repos { 239 if !repo.Tombstone { 240 continue 241 } 242 if v, ok := m[repo.ID]; ok && v.ModTime.After(repo.LatestCommitDate) { 243 continue 244 } 245 m[repo.ID] = shard{ 246 RepoID: repo.ID, 247 RepoName: repo.Name, 248 Path: p, 249 ModTime: repo.LatestCommitDate, 250 RepoTombstone: repo.Tombstone, 251 } 252 } 253 } 254 return m 255} 256 257var incompleteRE = regexp.MustCompile(`\.zoekt[0-9]+(\.\w+)?$`) 258 259func removeIncompleteShards(dir string) { 260 d, err := os.Open(dir) 261 if err != nil { 262 debugLog.Printf("failed to removeIncompleteShards: %s", dir) 263 return 264 } 265 defer d.Close() 266 267 names, _ := d.Readdirnames(-1) 268 for _, n := range names { 269 if incompleteRE.MatchString(n) { 270 path := filepath.Join(dir, n) 271 if err := os.Remove(path); err != nil { 272 debugLog.Printf("failed to remove incomplete shard %s: %v", path, err) 273 } else { 274 debugLog.Printf("cleaned up incomplete shard %s", path) 275 } 276 } 277 } 278} 279 280func removeAll(shards ...shard) { 281 // Note on error handling here: We only expect this to fail due to 282 // IsNotExist, which is fine. Additionally this shouldn't fail 283 // partially. But if it does, and the file still exists, then we have the 284 // potential for a partial index for a repo. However, this should be 285 // exceedingly rare due to it being a mix of partial failure on something in 286 // trash + an admin re-adding a repository. 287 for _, shard := range shards { 288 paths, err := zoekt.IndexFilePaths(shard.Path) 289 if err != nil { 290 debugLog.Printf("failed to remove shard %s: %v", shard.Path, err) 291 } 292 for _, p := range paths { 293 if err := os.Remove(p); err != nil { 294 debugLog.Printf("failed to remove shard file %s: %v", p, err) 295 } 296 } 297 } 298} 299 300func moveAll(dstDir string, shards []shard) { 301 for i, shard := range shards { 302 paths, err := zoekt.IndexFilePaths(shard.Path) 303 if err != nil { 304 errorLog.Printf("failed to stat shard paths, deleting all shards for %s: %v", shard.RepoName, err) 305 removeAll(shards...) 306 return 307 } 308 309 // Remove all files in dstDir for shard. This is to avoid cases like not 310 // overwriting an old meta file. 311 dstShard := shard 312 dstShard.Path = filepath.Join(dstDir, filepath.Base(shard.Path)) 313 removeAll(dstShard) 314 315 // HACK we do not yet support tombstones in compound shard. So we avoid 316 // needing to deal with it by always deleting the whole compound shard. 317 if strings.HasPrefix(filepath.Base(shard.Path), "compound-") { 318 infoLog.Printf("HACK removing compound shard since we don't support tombstoning: %s", shard.Path) 319 removeAll(shard) 320 continue 321 } 322 323 // Rename all paths, stop at first failure 324 for _, p := range paths { 325 dst := filepath.Join(dstDir, filepath.Base(p)) 326 err = os.Rename(p, dst) 327 if err != nil { 328 break 329 } 330 } 331 332 if err != nil { 333 errorLog.Printf("failed to move shard, deleting all shards for %s: %v", shard.RepoName, err) 334 removeAll(dstShard) // some files may have moved to dst 335 removeAll(shards...) 336 return 337 } 338 339 // update shards so partial failure removes the dst path 340 shards[i] = dstShard 341 } 342} 343 344// consistentRepoName returns true if the list of shards have a unique 345// repository name. 346func consistentRepoName(shards []shard) bool { 347 if len(shards) <= 1 { 348 return true 349 } 350 name := shards[0].RepoName 351 for _, shard := range shards[1:] { 352 if shard.RepoName != name { 353 return false 354 } 355 } 356 return true 357} 358 359// maybeSetTombstone will call zoekt.SetTombstone for repoID if shards 360// represents a compound shard. It returns true if shards represents a 361// compound shard. 362func maybeSetTombstone(shards []shard, repoID uint32) bool { 363 // 1 repo can be split across many simple shards but it should only be contained 364 // in 1 compound shard. Hence we check that len(shards)==1 and only consider the 365 // shard at index 0. 366 if len(shards) != 1 || !strings.HasPrefix(filepath.Base(shards[0].Path), "compound-") { 367 return false 368 } 369 370 if err := zoekt.SetTombstone(shards[0].Path, repoID); err != nil { 371 errorLog.Printf("error setting tombstone for %d in shard %s: %s. Removing shard\n", repoID, shards[0].Path, err) 372 _ = os.Remove(shards[0].Path) 373 } 374 return true 375} 376 377var metricVacuumRunning = promauto.NewGauge(prometheus.GaugeOpts{ 378 Name: "index_vacuum_running", 379 Help: "Set to 1 if indexserver's vacuum job is running.", 380}) 381 382var metricNumberShards = promauto.NewGauge(prometheus.GaugeOpts{ 383 Name: "index_number_shards", 384 Help: "The number of total shards.", 385}) 386 387var metricNumberCompoundShards = promauto.NewGauge(prometheus.GaugeOpts{ 388 Name: "index_number_compound_shards", 389 Help: "The number of compound shards.", 390}) 391 392// vacuum removes tombstoned repos from compound shards and removes compound 393// shards if they shrink below minSizeBytes. Vacuum locks the index directory for 394// each compound shard it vacuums. 395func (s *Server) vacuum() { 396 metricVacuumRunning.Set(1) 397 defer metricVacuumRunning.Set(0) 398 399 d, err := os.Open(s.IndexDir) 400 if err != nil { 401 return 402 } 403 defer d.Close() 404 fns, _ := d.Readdirnames(-1) 405 406 for _, fn := range fns { 407 // We could run this over all shards, but based on our current setup, simple 408 // shards won't have tombstones but instead will be moved to .trash. 409 if !strings.HasPrefix(fn, "compound-") || !strings.HasSuffix(fn, ".zoekt") { 410 continue 411 } 412 413 path := filepath.Join(s.IndexDir, fn) 414 info, err := os.Stat(path) 415 if err != nil { 416 errorLog.Printf("vacuum stat failed: %v", err) 417 continue 418 } 419 420 if info.Size() < s.mergeOpts.minSizeBytes { 421 cmd := exec.Command("zoekt-merge-index", "explode", path) 422 423 var b []byte 424 s.muIndexDir.Global(func() { 425 b, err = cmd.CombinedOutput() 426 }) 427 428 if err != nil { 429 errorLog.Printf("failed to explode compound shard: shard=%s out=%s err=%s", path, string(b), err) 430 } 431 infoLog.Printf("exploded compound shard: shard=%s", path) 432 continue 433 } 434 435 s.muIndexDir.Global(func() { 436 _, err = removeTombstones(path) 437 }) 438 439 if err != nil { 440 errorLog.Printf("error while removing tombstones in %s: %s", path, err) 441 } 442 } 443} 444 445var mockMerger func() error 446 447// removeTombstones removes all tombstones from a compound shard at fn by merging 448// the compound shard with itself. 449func removeTombstones(fn string) ([]*zoekt.Repository, error) { 450 var runMerge func() error 451 if mockMerger != nil { 452 runMerge = mockMerger 453 } else { 454 runMerge = exec.Command("zoekt-merge-index", "merge", fn).Run 455 } 456 457 repos, _, err := zoekt.ReadMetadataPath(fn) 458 if err != nil { 459 return nil, fmt.Errorf("zoekt.ReadMetadataPath: %s", err) 460 } 461 462 var tombstones []*zoekt.Repository 463 for _, r := range repos { 464 if r.Tombstone { 465 tombstones = append(tombstones, r) 466 } 467 } 468 if len(tombstones) == 0 { 469 return nil, nil 470 } 471 472 defer func() { 473 paths, err := zoekt.IndexFilePaths(fn) 474 if err != nil { 475 return 476 } 477 for _, path := range paths { 478 os.Remove(path) 479 } 480 }() 481 err = runMerge() 482 if err != nil { 483 return nil, fmt.Errorf("runMerge: %s", err) 484 } 485 return tombstones, nil 486}