fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3import ( 4 "fmt" 5 "log" 6 "os" 7 "os/exec" 8 "path/filepath" 9 "sort" 10 "strings" 11 "time" 12 13 "github.com/grafana/regexp" 14 "github.com/prometheus/client_golang/prometheus" 15 "github.com/prometheus/client_golang/prometheus/promauto" 16 17 "github.com/sourcegraph/zoekt" 18) 19 20var metricCleanupDuration = promauto.NewHistogram(prometheus.HistogramOpts{ 21 Name: "index_cleanup_duration_seconds", 22 Help: "The duration of one cleanup run", 23 Buckets: prometheus.LinearBuckets(1, 1, 10), 24}) 25 26// cleanup trashes shards in indexDir that do not exist in repos. For repos 27// that do not exist in indexDir, but do in indexDir/.trash it will move them 28// back into indexDir. Additionally it uses now to remove shards that have 29// been in the trash for 24 hours. It also deletes .tmp files older than 4 hours. 30func cleanup(indexDir string, repos []uint32, now time.Time, shardMerging bool) { 31 start := time.Now() 32 trashDir := filepath.Join(indexDir, ".trash") 33 if err := os.MkdirAll(trashDir, 0755); err != nil { 34 log.Printf("failed to create trash dir: %v", err) 35 } 36 37 trash := getShards(trashDir) 38 tombtones := getTombstonedRepos(indexDir) 39 index := getShards(indexDir) 40 41 // trash: Remove old shards and conflicts with index 42 minAge := now.Add(-24 * time.Hour) 43 for repo, shards := range trash { 44 old := false 45 for _, shard := range shards { 46 if shard.ModTime.Before(minAge) { 47 old = true 48 } else if shard.ModTime.After(now) { 49 debug.Printf("trashed shard %s has timestamp in the future, reseting to now", shard.Path) 50 _ = os.Chtimes(shard.Path, now, now) 51 } 52 } 53 54 if _, conflicts := index[repo]; !conflicts && !old { 55 continue 56 } 57 58 log.Printf("removing old shards from trash for %v", repo) 59 removeAll(shards...) 60 delete(trash, repo) 61 } 62 63 // tombstones: Remove tombstones that conflict with index or trash. After this, 64 // tombstones only contain repos that are neither in the trash nor in the index. 65 for repo := range tombtones { 66 if _, conflicts := index[repo]; conflicts { 67 delete(tombtones, repo) 68 } 69 // Trash takes precedence over tombstones. 70 if _, conflicts := trash[repo]; conflicts { 71 delete(tombtones, repo) 72 } 73 } 74 75 // index: We are ID based, but store shards by name still. If we end up with 76 // shards that have the same ID but different names delete and start over. 77 // This can happen when a repository is renamed. In future we should make 78 // shard file names based on ID. 79 for repo, shards := range index { 80 if consistentRepoName(shards) { 81 continue 82 } 83 84 // prevent further processing since we will delete 85 delete(index, repo) 86 87 // This should be rare, so give an informative log message. 88 var paths []string 89 for _, shard := range shards { 90 paths = append(paths, filepath.Base(shard.Path)) 91 } 92 log.Printf("removing shards for %v due to multiple repository names: %s", repo, strings.Join(paths, " ")) 93 94 // We may be in both normal and compound shards in this case. First 95 // tombstone the compound shards so we don't just rm them. 96 simple := shards[:0] 97 for _, s := range shards { 98 if shardMerging && maybeSetTombstone([]shard{s}, repo) { 99 continue 100 } 101 102 simple = append(simple, s) 103 } 104 105 if len(simple) == 0 { 106 continue 107 } 108 109 removeAll(simple...) 110 } 111 112 // index: Move missing repos from trash into index 113 // index: Restore deleted or tombstoned repos. 114 for _, repo := range repos { 115 // Delete from index so that index will only contain shards to be 116 // trashed. 117 delete(index, repo) 118 119 if shards, ok := trash[repo]; ok { 120 log.Printf("restoring shards from trash for %v", repo) 121 moveAll(indexDir, shards) 122 continue 123 } 124 125 if s, ok := tombtones[repo]; ok { 126 log.Printf("removing tombstone for %v", repo) 127 err := zoekt.UnsetTombstone(s.Path, repo) 128 if err != nil { 129 log.Printf("error removing tombstone for %v: %s", repo, err) 130 } 131 } 132 } 133 134 // index: Move non-existent repos into trash 135 for repo, shards := range index { 136 // Best-effort touch. If touch fails, we will just remove from the 137 // trash sooner. 138 for _, shard := range shards { 139 _ = os.Chtimes(shard.Path, now, now) 140 } 141 142 if shardMerging && maybeSetTombstone(shards, repo) { 143 continue 144 } 145 moveAll(trashDir, shards) 146 } 147 148 // Remove .tmp files from crashed indexer runs-- for example, if an indexer 149 // OOMs, it will leave around .tmp files, usually in a loop. We can remove 150 // the files now since cleanup runs with a global lock (no indexing jobs 151 // running at the same time). 152 if failures, err := filepath.Glob(filepath.Join(indexDir, "*.tmp")); err != nil { 153 log.Printf("Glob: %v", err) 154 } else { 155 for _, f := range failures { 156 st, err := os.Stat(f) 157 if err != nil { 158 log.Printf("Stat(%q): %v", f, err) 159 continue 160 } 161 if !st.IsDir() { 162 log.Printf("removing tmp file: %s", f) 163 os.Remove(f) 164 } 165 } 166 } 167 168 metricCleanupDuration.Observe(time.Since(start).Seconds()) 169} 170 171type shard struct { 172 RepoID uint32 173 RepoName string 174 Path string 175 ModTime time.Time 176 RepoTombstone bool 177} 178 179func getShards(dir string) map[uint32][]shard { 180 d, err := os.Open(dir) 181 if err != nil { 182 debug.Printf("failed to getShards: %s", dir) 183 return nil 184 } 185 defer d.Close() 186 names, _ := d.Readdirnames(-1) 187 sort.Strings(names) 188 189 shards := make(map[uint32][]shard, len(names)) 190 for _, n := range names { 191 path := filepath.Join(dir, n) 192 fi, err := os.Stat(path) 193 if err != nil { 194 debug.Printf("stat failed: %v", err) 195 continue 196 } 197 if fi.IsDir() || filepath.Ext(path) != ".zoekt" { 198 continue 199 } 200 201 repos, _, err := zoekt.ReadMetadataPathAlive(path) 202 if err != nil { 203 debug.Printf("failed to read shard: %v", err) 204 continue 205 } 206 207 for _, repo := range repos { 208 shards[repo.ID] = append(shards[repo.ID], shard{ 209 RepoID: repo.ID, 210 RepoName: repo.Name, 211 Path: path, 212 ModTime: fi.ModTime(), 213 RepoTombstone: repo.Tombstone, 214 }) 215 } 216 } 217 return shards 218} 219 220// getTombstonedRepos return a map of tombstoned repositories in dir. If a 221// repository is tombstoned in more than one compound shard, only the latest one, 222// as determined by the date of the latest commit, is returned. 223func getTombstonedRepos(dir string) map[uint32]shard { 224 paths, err := filepath.Glob(filepath.Join(dir, "compound-*.zoekt")) 225 if err != nil { 226 return nil 227 } 228 if len(paths) == 0 { 229 return nil 230 } 231 232 m := make(map[uint32]shard) 233 234 for _, p := range paths { 235 repos, _, err := zoekt.ReadMetadataPath(p) 236 if err != nil { 237 continue 238 } 239 for _, repo := range repos { 240 if !repo.Tombstone { 241 continue 242 } 243 if v, ok := m[repo.ID]; ok && v.ModTime.After(repo.LatestCommitDate) { 244 continue 245 } 246 m[repo.ID] = shard{ 247 RepoID: repo.ID, 248 RepoName: repo.Name, 249 Path: p, 250 ModTime: repo.LatestCommitDate, 251 RepoTombstone: repo.Tombstone, 252 } 253 } 254 } 255 return m 256} 257 258var incompleteRE = regexp.MustCompile(`\.zoekt[0-9]+(\.\w+)?$`) 259 260func removeIncompleteShards(dir string) { 261 d, err := os.Open(dir) 262 if err != nil { 263 debug.Printf("failed to removeIncompleteShards: %s", dir) 264 return 265 } 266 defer d.Close() 267 268 names, _ := d.Readdirnames(-1) 269 for _, n := range names { 270 if incompleteRE.MatchString(n) { 271 path := filepath.Join(dir, n) 272 if err := os.Remove(path); err != nil { 273 debug.Printf("failed to remove incomplete shard %s: %v", path, err) 274 } else { 275 debug.Printf("cleaned up incomplete shard %s", path) 276 } 277 } 278 } 279} 280 281func removeAll(shards ...shard) { 282 // Note on error handling here: We only expect this to fail due to 283 // IsNotExist, which is fine. Additionally this shouldn't fail 284 // partially. But if it does, and the file still exists, then we have the 285 // potential for a partial index for a repo. However, this should be 286 // exceedingly rare due to it being a mix of partial failure on something in 287 // trash + an admin re-adding a repository. 288 for _, shard := range shards { 289 paths, err := zoekt.IndexFilePaths(shard.Path) 290 if err != nil { 291 debug.Printf("failed to remove shard %s: %v", shard.Path, err) 292 } 293 for _, p := range paths { 294 if err := os.Remove(p); err != nil { 295 debug.Printf("failed to remove shard file %s: %v", p, err) 296 } 297 } 298 } 299} 300 301func moveAll(dstDir string, shards []shard) { 302 for i, shard := range shards { 303 paths, err := zoekt.IndexFilePaths(shard.Path) 304 if err != nil { 305 log.Printf("failed to stat shard paths, deleting all shards for %s: %v", shard.RepoName, err) 306 removeAll(shards...) 307 return 308 } 309 310 // Remove all files in dstDir for shard. This is to avoid cases like not 311 // overwriting an old meta file. 312 dstShard := shard 313 dstShard.Path = filepath.Join(dstDir, filepath.Base(shard.Path)) 314 removeAll(dstShard) 315 316 // HACK we do not yet support tombstones in compound shard. So we avoid 317 // needing to deal with it by always deleting the whole compound shard. 318 if strings.HasPrefix(filepath.Base(shard.Path), "compound-") { 319 log.Printf("HACK removing compound shard since we don't support tombstoning: %s", shard.Path) 320 removeAll(shard) 321 continue 322 } 323 324 // Rename all paths, stop at first failure 325 for _, p := range paths { 326 dst := filepath.Join(dstDir, filepath.Base(p)) 327 err = os.Rename(p, dst) 328 if err != nil { 329 break 330 } 331 } 332 333 if err != nil { 334 log.Printf("failed to move shard, deleting all shards for %s: %v", shard.RepoName, err) 335 removeAll(dstShard) // some files may have moved to dst 336 removeAll(shards...) 337 return 338 } 339 340 // update shards so partial failure removes the dst path 341 shards[i] = dstShard 342 } 343} 344 345// consistentRepoName returns true if the list of shards have a unique 346// repository name. 347func consistentRepoName(shards []shard) bool { 348 if len(shards) <= 1 { 349 return true 350 } 351 name := shards[0].RepoName 352 for _, shard := range shards[1:] { 353 if shard.RepoName != name { 354 return false 355 } 356 } 357 return true 358} 359 360// maybeSetTombstone will call zoekt.SetTombstone for repoID if shards 361// represents a compound shard. It returns true if shards represents a 362// compound shard. 363func maybeSetTombstone(shards []shard, repoID uint32) bool { 364 // 1 repo can be split across many simple shards but it should only be contained 365 // in 1 compound shard. Hence we check that len(shards)==1 and only consider the 366 // shard at index 0. 367 if len(shards) != 1 || !strings.HasPrefix(filepath.Base(shards[0].Path), "compound-") { 368 return false 369 } 370 371 if err := zoekt.SetTombstone(shards[0].Path, repoID); err != nil { 372 log.Printf("error setting tombstone for %d in shard %s: %s. Removing shard\n", repoID, shards[0].Path, err) 373 _ = os.Remove(shards[0].Path) 374 } 375 return true 376} 377 378var metricVacuumRunning = promauto.NewGauge(prometheus.GaugeOpts{ 379 Name: "index_vacuum_running", 380 Help: "Set to 1 if indexserver's vacuum job is running.", 381}) 382 383var metricNumberCompoundShards = promauto.NewGauge(prometheus.GaugeOpts{ 384 Name: "index_number_compound_shards", 385 Help: "The number of compound shards.", 386}) 387 388// vacuum removes tombstoned repos from compound shards and removes compound 389// shards if they shrink below minSizeBytes. Vacuum locks the index directory for 390// each compound shard it vacuums. 391func (s *Server) vacuum() { 392 metricVacuumRunning.Set(1) 393 defer metricVacuumRunning.Set(0) 394 395 d, err := os.Open(s.IndexDir) 396 if err != nil { 397 return 398 } 399 defer d.Close() 400 fns, _ := d.Readdirnames(-1) 401 402 for _, fn := range fns { 403 // We could run this over all shards, but based on our current setup, simple 404 // shards won't have tombstones but instead will be moved to .trash. 405 if !strings.HasPrefix(fn, "compound-") || !strings.HasSuffix(fn, ".zoekt") { 406 continue 407 } 408 409 path := filepath.Join(s.IndexDir, fn) 410 info, err := os.Stat(path) 411 if err != nil { 412 debug.Printf("vacuum stat failed: %v", err) 413 continue 414 } 415 416 if info.Size() < s.mergeOpts.minSizeBytes { 417 cmd := exec.Command("zoekt-merge-index", "explode", path) 418 419 var b []byte 420 s.muIndexDir.Global(func() { 421 b, err = cmd.CombinedOutput() 422 }) 423 424 if err != nil { 425 debug.Printf("failed to explode compound shard %s: %s", path, string(b)) 426 } 427 continue 428 } 429 430 s.muIndexDir.Global(func() { 431 _, err = removeTombstones(path) 432 }) 433 434 if err != nil { 435 debug.Printf("error while removing tombstones in %s: %s", fn, err) 436 } 437 } 438} 439 440var mockMerger func() error 441 442// removeTombstones removes all tombstones from a compound shard at fn by merging 443// the compound shard with itself. 444func removeTombstones(fn string) ([]*zoekt.Repository, error) { 445 var runMerge func() error 446 if mockMerger != nil { 447 runMerge = mockMerger 448 } else { 449 runMerge = exec.Command("zoekt-merge-index", "merge", fn).Run 450 } 451 452 repos, _, err := zoekt.ReadMetadataPath(fn) 453 if err != nil { 454 return nil, fmt.Errorf("zoekt.ReadMetadataPath: %s", err) 455 } 456 457 var tombstones []*zoekt.Repository 458 for _, r := range repos { 459 if r.Tombstone { 460 tombstones = append(tombstones, r) 461 } 462 } 463 if len(tombstones) == 0 { 464 return nil, nil 465 } 466 467 defer func() { 468 paths, err := zoekt.IndexFilePaths(fn) 469 if err != nil { 470 return 471 } 472 for _, path := range paths { 473 os.Remove(path) 474 } 475 }() 476 err = runMerge() 477 if err != nil { 478 return nil, fmt.Errorf("runMerge: %s", err) 479 } 480 return tombstones, nil 481}