fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

merging: tolerate process interruptions during merging (#289)

Currently, the call to `zoekt-merge-index` does not tolerate process interruptions, which can potentially lead to duplicate indexes. This could be the root cause for the duplicate indexes we see in production.

With this PR

- `merge.Merge` returns a temporary compound shard (*.tmp)
- `zoekt-merge-index` calls `merge.Merge` and is responsible for deleting the input shards and renaming the temporary compound shard returned by `merge.Merge`.
- `zoekt-sourcegraph-indexserver` just calls `zoekt-merge-index` and always expects a consistent state.

A positive side-effect is that both merge and explode now behave the same when called from the command line: both remove the input shards and leave the output shards behind.

+116 -54
+5 -2
build/builder_test.go
··· 453 453 files = append(files, indexFile) 454 454 } 455 455 456 - // merge all the normal shards into a compound shard 457 - _, err := zoekt.Merge(indexDir, files...) 456 + // merge all the simple shards into a compound shard 457 + tmpName, dstName, err := zoekt.Merge(indexDir, files...) 458 458 if err != nil { 459 459 t.Fatalf("merging index files into compound shard: %s", err) 460 + } 461 + if err := os.Rename(tmpName, dstName); err != nil { 462 + t.Fatal(err) 460 463 } 461 464 }
+27 -8
cmd/zoekt-merge-index/main.go
··· 29 29 files = append(files, indexFile) 30 30 } 31 31 32 - _, err := zoekt.Merge(dstDir, files...) 33 - return err 32 + tmpName, dstName, err := zoekt.Merge(dstDir, files...) 33 + if err != nil { 34 + return err 35 + } 36 + 37 + // Delete input shards. 38 + for _, name := range names { 39 + paths, err := zoekt.IndexFilePaths(name) 40 + if err != nil { 41 + return fmt.Errorf("zoekt-merge-index: %w", err) 42 + } 43 + for _, p := range paths { 44 + if err := os.Remove(p); err != nil { 45 + return fmt.Errorf("zoekt-merge-index: failed to remove simple shard: %w", err) 46 + } 47 + } 48 + } 49 + 50 + // We only rename the compound shard if all simple shards could be deleted in the 51 + // previous step. This guarantees we won't have duplicate indexes. 52 + if err := os.Rename(tmpName, dstName); err != nil { 53 + return fmt.Errorf("zoekt-merge-index: failed to rename compound shard: %w", err) 54 + } 55 + return nil 34 56 } 35 57 36 58 func mergeCmd(paths []string) error { ··· 45 67 } 46 68 log.Printf("merging %d paths from stdin", len(paths)) 47 69 } 48 - err := merge(filepath.Dir(paths[0]), paths) 49 - if err != nil { 50 - return err 51 - } 52 - return nil 70 + 71 + return merge(filepath.Dir(paths[0]), paths) 53 72 } 54 73 55 - // explode splits a shard into indiviual shards and places them in dstDir. 74 + // explode splits a shard into individual shards and places them in dstDir. 56 75 // If it returns without error, the input shard was deleted and the first 57 76 // result contains the list of all new shards. 58 77 //
+50 -10
cmd/zoekt-merge-index/main_test.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "io" 6 + "os" 5 7 "path/filepath" 6 8 "sort" 7 9 "testing" ··· 12 14 ) 13 15 14 16 func TestMerge(t *testing.T) { 15 - dir := t.TempDir() 16 - 17 17 v16Shards, err := filepath.Glob("../../testdata/shards/*_v16.*.zoekt") 18 18 if err != nil { 19 19 t.Fatal(err) 20 20 } 21 21 sort.Strings(v16Shards) 22 - t.Log(v16Shards) 23 22 24 - err = merge(dir, v16Shards) 23 + testShards, err := copyTestShards(t.TempDir(), v16Shards) 24 + if err != nil { 25 + t.Fatal(err) 26 + } 27 + t.Log(testShards) 28 + 29 + dir := t.TempDir() 30 + err = merge(dir, testShards) 25 31 if err != nil { 26 32 t.Fatal(err) 27 33 } ··· 55 61 56 62 // Merge 2 simple shards and then explode them. 57 63 func TestExplode(t *testing.T) { 58 - dir := t.TempDir() 59 - 60 64 v16Shards, err := filepath.Glob("../../testdata/shards/repo*_v16.*.zoekt") 61 65 if err != nil { 62 66 t.Fatal(err) 63 67 } 64 68 sort.Strings(v16Shards) 65 - t.Log(v16Shards) 69 + 70 + testShards, err := copyTestShards(t.TempDir(), v16Shards) 71 + if err != nil { 72 + t.Fatal(err) 73 + } 74 + t.Log(testShards) 66 75 67 - err = merge(dir, v16Shards) 76 + dir := t.TempDir() 77 + err = merge(dir, testShards) 68 78 if err != nil { 69 79 t.Fatal(err) 70 80 } ··· 92 102 t.Fatal(err) 93 103 } 94 104 95 - if len(exploded) != len(v16Shards) { 96 - t.Fatalf("the number of simple shards before %d and after %d should be the same", len(v16Shards), len(exploded)) 105 + if len(exploded) != len(testShards) { 106 + t.Fatalf("the number of simple shards before %d and after %d should be the same", len(testShards), len(exploded)) 97 107 } 98 108 99 109 ss, err := shards.NewDirectorySearcher(dir) ··· 139 149 }) 140 150 } 141 151 } 152 + 153 + func copyTestShards(dstDir string, srcShards []string) ([]string, error) { 154 + var tmpShards []string 155 + for _, s := range srcShards { 156 + dst := filepath.Join(dstDir, filepath.Base(s)) 157 + tmpShards = append(tmpShards, dst) 158 + if err := copyFile(s, dst); err != nil { 
159 + return nil, err 160 + } 161 + } 162 + return tmpShards, nil 163 + } 164 + 165 + func copyFile(src, dst string) (err error) { 166 + s, err := os.Open(src) 167 + if err != nil { 168 + return err 169 + } 170 + defer s.Close() 171 + 172 + d, err := os.Create(dst) 173 + if err != nil { 174 + return err 175 + } 176 + if _, err := io.Copy(d, s); err != nil { 177 + d.Close() 178 + return err 179 + } 180 + return d.Close() 181 + }
+9 -4
cmd/zoekt-sourcegraph-indexserver/cleanup_test.go
··· 480 480 } 481 481 482 482 // create a compound shard. 483 - fn, err := merge(dir, repoFns) 483 + tmpFn, dstFn, err := merge(dir, repoFns) 484 484 if err != nil { 485 485 t.Fatal(err) 486 486 } 487 487 for _, old := range repoFns { 488 - os.Remove(old) 488 + if err := os.Remove(old); err != nil { 489 + t.Fatal(err) 490 + } 491 + } 492 + if err := os.Rename(tmpFn, dstFn); err != nil { 493 + t.Fatal(err) 489 494 } 490 - return fn 495 + return dstFn 491 496 } 492 497 493 498 func mergeHelper(t *testing.T, fn string) error { ··· 505 510 } 506 511 defer indexFile.Close() 507 512 508 - _, err = zoekt.Merge(filepath.Dir(fn), indexFile) 513 + _, _, err = zoekt.Merge(filepath.Dir(fn), indexFile) 509 514 return err 510 515 }
-15
cmd/zoekt-sourcegraph-indexserver/merge.go
··· 233 233 }() 234 234 235 235 err = cmd.Run() 236 - // If err==nil we can safely delete the candidate shards. In case err!=nil we 237 - // don't know if a compound shard was created or not, so it is best to just 238 - // delete the candidate shards to avoid duplicate results in case a compound 239 - // shard was created after all. 240 - for _, s := range shards { 241 - paths, err := zoekt.IndexFilePaths(s.path) 242 - if err != nil { 243 - debug.Printf("failed to remove s %s: %v", s.path, err) 244 - } 245 - for _, p := range paths { 246 - if err := os.Remove(p); err != nil { 247 - debug.Printf("failed to remove shard file %s: %v", p, err) 248 - } 249 - } 250 - } 251 236 return outBuf.Bytes(), errBuf.Bytes(), err 252 237 }
+8 -5
cmd/zoekt-sourcegraph-indexserver/meta_test.go
··· 62 62 // create a compound shard. Use a new indexdir to avoid the need to cleanup 63 63 // old shards. 64 64 dir = t.TempDir() 65 - fn, err := merge(dir, repoFns) 65 + tmpFn, dstFn, err := merge(dir, repoFns) 66 66 if err != nil { 67 67 t.Fatal(err) 68 68 } 69 + if err := os.Rename(tmpFn, dstFn); err != nil { 70 + t.Fatal(err) 71 + } 69 72 70 73 readPublic := func() []string { 71 - repos, _, _ := zoekt.ReadMetadataPath(fn) 72 74 var public []string 75 + repos, _, _ := zoekt.ReadMetadataPath(dstFn) 73 76 for _, r := range repos { 74 77 public = append(public, r.RawConfig["public"]) 75 78 } ··· 99 102 } 100 103 } 101 104 102 - func merge(dstDir string, names []string) (string, error) { 105 + func merge(dstDir string, names []string) (string, string, error) { 103 106 var files []zoekt.IndexFile 104 107 for _, fn := range names { 105 108 f, err := os.Open(fn) 106 109 if err != nil { 107 - return "", err 110 + return "", "", err 108 111 } 109 112 defer f.Close() 110 113 111 114 indexFile, err := zoekt.NewIndexFile(f) 112 115 if err != nil { 113 - return "", err 116 + return "", "", err 114 117 } 115 118 defer indexFile.Close() 116 119
+11 -8
merge.go
··· 13 13 "sort" 14 14 ) 15 15 16 - // Merge files into a compound shard fn in the directory dstDir. 17 - func Merge(dstDir string, files ...IndexFile) (fn string, _ error) { 16 + // Merge files into a compound shard in dstDir. Merge returns tmpName and a 17 + // dstName. It is the responsibility of the caller to delete the input shards and 18 + // rename the temporary compound shard from tmpName to dstName. 19 + func Merge(dstDir string, files ...IndexFile) (tmpName, dstName string, _ error) { 18 20 var ds []*indexData 19 21 for _, f := range files { 20 22 searcher, err := NewSearcher(f) 21 23 if err != nil { 22 - return "", err 24 + return "", "", err 23 25 } 24 26 ds = append(ds, searcher.(*indexData)) 25 27 } 26 28 27 29 ib, err := merge(ds...) 28 30 if err != nil { 29 - return "", err 31 + return "", "", err 30 32 } 31 33 32 34 hasher := sha1.New() ··· 40 42 } 41 43 } 42 44 43 - fn = filepath.Join(dstDir, fmt.Sprintf("compound-%x_v%d.%05d.zoekt", hasher.Sum(nil), NextIndexFormatVersion, 0)) 44 - if err := builderWriteAll(fn, ib); err != nil { 45 - return "", err 45 + dstName = filepath.Join(dstDir, fmt.Sprintf("compound-%x_v%d.%05d.zoekt", hasher.Sum(nil), NextIndexFormatVersion, 0)) 46 + tmpName = dstName + ".tmp" 47 + if err := builderWriteAll(tmpName, ib); err != nil { 48 + return "", "", err 46 49 } 47 - return fn, nil 50 + return tmpName, dstName, nil 48 51 } 49 52 50 53 func builderWriteAll(fn string, ib *IndexBuilder) error {
+6 -2
merge_test.go
··· 50 50 } 51 51 52 52 tmpDir := t.TempDir() 53 - cs, err := Merge(tmpDir, files...) 53 + tmpName, dstName, err := Merge(tmpDir, files...) 54 + if err != nil { 55 + t.Fatal(err) 56 + } 57 + err = os.Rename(tmpName, dstName) 54 58 if err != nil { 55 59 t.Fatal(err) 56 60 } 57 61 58 62 // explode 59 - f, err := os.Open(cs) 63 + f, err := os.Open(dstName) 60 64 if err != nil { 61 65 t.Fatal(err) 62 66 }