fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

merging: lock index dir during merge (#382)

This updates mainly `doMerge`, which drives the merge process. Previously we didn't lock the index dir which can potentially lead to race conditions with indexing and cleanup.

Changes:
- lock index dir during merging. We release the lock after each compound shard we create
- set default merge interval from 1h to 8h
- recompute candidates before each merge, because candidates might change between two merges
- remove simulation mode
- the debug command is now curl-based and respects the lock

author
Stefan Hengl
committer
GitHub
date (Jun 30, 2022, 10:11 AM +0200) commit 9208d95a parent 629b7ab3
+259 -190
+5 -25
cmd/zoekt-sourcegraph-indexserver/debug.go
··· 7 7 "flag" 8 8 "fmt" 9 9 "log" 10 - "os" 11 10 "strconv" 12 11 13 12 "github.com/peterbourgon/ff/v3/ffcli" 14 - 15 - "github.com/google/zoekt/build" 16 13 ) 17 14 18 15 func debugIndex() *ffcli.Command { ··· 75 72 } 76 73 } 77 74 78 - func debugMerge() *ffcli.Command { 79 - fs := flag.NewFlagSet("debug merge", flag.ExitOnError) 80 - simulate := fs.Bool("simulate", false, "if set, merging will be simulated") 81 - targetSize := fs.Int64("merge_target_size", getEnvWithDefaultInt64("SRC_TARGET_SIZE", 2000), "the target size of compound shards in MiB") 82 - index := fs.String("index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use") 83 - dbg := fs.Bool("debug", srcLogLevelIsDebug(), "turn on more verbose logging.") 84 - 85 - return &ffcli.Command{ 86 - Name: "merge", 87 - FlagSet: fs, 88 - ShortUsage: "merge [flags] <dir>", 89 - ShortHelp: "run a full merge operation inside dir", 90 - Exec: func(ctx context.Context, args []string) error { 91 - if *dbg { 92 - debug = log.New(os.Stderr, "", log.LstdFlags) 93 - } 94 - return doMerge(*index, *targetSize*1024*1024, *simulate) 95 - }, 96 - } 97 - } 98 - 99 75 func debugCmd() *ffcli.Command { 100 76 fs := flag.NewFlagSet("debug", flag.ExitOnError) 101 77 ··· 114 90 list the repositories that are OWNED by this instance. If indexed=true (default), the list may contain repositories 115 91 that this instance holds temporarily, for example during rebalancing. 116 92 93 + curl http://localhost:6072/debug/merge 94 + start a full merge operation in the index directory. You can check the status with 95 + "curl http://localhost:6072/metrics -sS | grep index_shard_merging_running". It is only possible 96 + to trigger one merge operation at a time. 97 + 117 98 curl http://localhost:6072/debug/queue 118 99 list the repositories in the indexing queue, sorted by descending priority. 119 100 ··· 133 114 FlagSet: fs, 134 115 Subcommands: []*ffcli.Command{ 135 116 debugIndex(), 136 - debugMerge(), 137 117 debugMeta(), 138 118 debugTrigrams(), 139 119 },
+20 -5
cmd/zoekt-sourcegraph-indexserver/main.go
··· 358 358 go func() { 359 359 for range jitterTicker(s.MergeInterval, syscall.SIGUSR1) { 360 360 if s.shardMerging { 361 - err := doMerge(s.IndexDir, s.TargetSizeBytes, false) 362 - if err != nil { 363 - log.Printf("error during merging: %s", err) 364 - } 361 + s.doMerge() 365 362 } 366 363 } 367 364 }() ··· 598 595 599 596 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 600 597 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 598 + mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 601 599 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 602 600 } 603 601 ··· 706 704 } 707 705 } 708 706 707 + // handleDebugMerge triggers a merge even if shard merging is not enabled. Users 708 + // can run this command during periods of low usage (evenings, weekends) to 709 + // trigger an initial merge run. In the steady-state, merges happen rarely, even 710 + // on busy instances, and users can rely on automatic merging instead. 711 + func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 712 + 713 + // A merge operation can take very long, depending on the number merges and the 714 + // target size of the compound shards. We run the merge in the background and 715 + // return immediately to the user. 716 + // 717 + // We track the status of the merge with metricShardMergingRunning. 718 + go func() { 719 + s.doMerge() 720 + }() 721 + w.Write([]byte("merging enqueued\n")) 722 + } 723 + 709 724 func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 710 725 indexed := listIndexed(s.IndexDir) 711 726 ··· 978 993 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 979 994 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 980 995 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", 24*time.Hour, "run vacuum this often") 981 - fs.DurationVar(&rc.mergeInterval, "merge_interval", time.Hour, "run merge this often") 996 + fs.DurationVar(&rc.mergeInterval, "merge_interval", 8*time.Hour, "run merge this often") 982 997 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_TARGET_SIZE", 2000), "the target size of compound shards in MiB") 983 998 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB") 984 999 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of concurrent index jobs to run.")
+73 -89
cmd/zoekt-sourcegraph-indexserver/merge.go
··· 1 1 package main 2 2 3 3 import ( 4 - "bytes" 5 4 "fmt" 6 - "io" 5 + "log" 7 6 "os" 8 7 "os/exec" 9 8 "path/filepath" 10 9 "strconv" 11 10 "time" 12 11 13 - "github.com/google/zoekt" 14 12 "github.com/grafana/regexp" 15 13 "github.com/prometheus/client_golang/prometheus" 16 14 "github.com/prometheus/client_golang/prometheus/promauto" 15 + "go.uber.org/atomic" 17 16 "gopkg.in/natefinch/lumberjack.v2" 17 + 18 + "github.com/google/zoekt" 18 19 ) 19 20 20 21 var reCompound = regexp.MustCompile(`compound-.*\.zoekt`) ··· 30 31 Buckets: prometheus.LinearBuckets(30, 30, 10), 31 32 }, []string{"error"}) 32 33 33 - // doMerge drives the merge process. 34 - func doMerge(dir string, targetSizeBytes int64, simulate bool) error { 34 + func pickCandidates(shards []candidate, targetSizeBytes int64) compound { 35 + c := compound{} 36 + for _, shard := range shards { 37 + c.add(shard) 38 + if c.size >= targetSizeBytes { 39 + return c 40 + } 41 + } 42 + return compound{} 43 + } 44 + 45 + var mergeRunning atomic.Bool 46 + 47 + func defaultMergeCmd(args ...string) *exec.Cmd { 48 + cmd := exec.Command("zoekt-merge-index", "merge") 49 + cmd.Args = append(cmd.Args, args...) 50 + return cmd 51 + } 52 + 53 + // doMerge drives the merge process. It holds the lock on s.indexDir for the 54 + // duration of 1 merge, which might be several minutes, depending on the target 55 + // size of the compound shard. 56 + func (s *Server) doMerge() { 57 + s.merge(defaultMergeCmd) 58 + } 59 + 60 + // same as doMerge but with a configurable merge command. 61 + func (s *Server) merge(mergeCmd func(args ...string) *exec.Cmd) { 62 + 63 + // Guard against the user triggering competing merge jobs with the debug 64 + // command. 65 + if !mergeRunning.CAS(false, true) { 66 + log.Printf("merge already running") 67 + return 68 + } 69 + defer mergeRunning.Store(false) 70 + 35 71 metricShardMergingRunning.Set(1) 36 72 defer metricShardMergingRunning.Set(0) 37 73 38 74 wc := &lumberjack.Logger{ 39 - Filename: filepath.Join(dir, "zoekt-merge-log.tsv"), 75 + Filename: filepath.Join(s.IndexDir, "zoekt-merge-log.tsv"), 40 76 MaxSize: 100, // Megabyte 41 77 MaxBackups: 5, 42 78 } 43 79 44 - if simulate { 45 - debug.Println("simulating") 46 - } 80 + // We keep creating compound shards until we run out of shards to merge or until 81 + // we encounter an error during merging. 82 + next := true 83 + for next { 84 + next = false 85 + s.muIndexDir.Global(func() { 86 + candidates, excluded := loadCandidates(s.IndexDir) 87 + log.Printf("loadCandidates: candidates=%d excluded=%d", len(candidates), excluded) 47 88 48 - shards, excluded := loadCandidates(dir) 49 - debug.Printf("merging: found %d candidate shards, %d shards were excluded\n", len(shards), excluded) 50 - if len(shards) == 0 { 51 - return nil 52 - } 89 + c := pickCandidates(candidates, s.TargetSizeBytes) 90 + if len(c.shards) <= 1 { 91 + log.Printf("could not find enough shards to build a compound shard") 92 + return 93 + } 94 + log.Printf("start merging: shards=%d total_size=%.2fMiB", len(c.shards), float64(c.size)/(1024*1024)) 53 95 54 - compounds, _ := generateCompounds(shards, targetSizeBytes) 55 - debug.Printf("merging: generated %d compounds\n", len(compounds)) 56 - if len(compounds) == 0 { 57 - return nil 58 - } 96 + var paths []string 97 + for _, p := range c.shards { 98 + paths = append(paths, p.path) 99 + } 59 100 60 - var totalSizeBytes int64 = 0 61 - totalShards := 0 62 - for ix, comp := range compounds { 63 - debug.Printf("compound %d: merging %d shards with total size %.2f MiB\n", ix, len(comp.shards), float64(comp.size)/(1024*1024)) 64 - if !simulate { 65 101 start := time.Now() 66 - stdOut, stdErr, err := callMerge(comp.shards) 102 + out, err := mergeCmd(paths...).CombinedOutput() 103 + 67 104 metricShardMergingDuration.WithLabelValues(strconv.FormatBool(err != nil)).Observe(time.Since(start).Seconds()) 68 - debug.Printf("callMerge: OUT: %s, ERR: %s\n", string(stdOut), string(stdErr)) 69 105 if err != nil { 70 - debug.Printf("error during merging compound %d, stdErr: %s, err: %s\n", ix, stdErr, err) 71 - continue 106 + log.Printf("mergeCmd: out=%s, err=%s", out, err) 107 + return 72 108 } 73 - // for len(comp.shards)<=1, callMerge is a NOP. Hence there is no need to log 74 - // anything here. 75 - if len(comp.shards) > 1 { 76 - newCompoundName := reCompound.Find(stdErr) 77 - now := time.Now() 78 - for _, s := range comp.shards { 79 - _, _ = fmt.Fprintf(wc, "%s\t%s\t%s\t%s\n", now.UTC().Format(time.RFC3339), "merge", filepath.Base(s.path), string(newCompoundName)) 80 - } 109 + 110 + newCompoundName := reCompound.Find(out) 111 + now := time.Now() 112 + for _, s := range c.shards { 113 + _, _ = fmt.Fprintf(wc, "%s\t%s\t%s\t%s\n", now.UTC().Format(time.RFC3339), "merge", filepath.Base(s.path), string(newCompoundName)) 81 114 } 82 - } 83 - totalShards += len(comp.shards) 84 - totalSizeBytes += comp.size 115 + 116 + next = true 117 + }) 85 118 } 86 - 87 - debug.Printf("total size: %.2f MiB, number of shards merged: %d\n", float64(totalSizeBytes)/(1024*1024), totalShards) 88 - return nil 89 119 } 90 120 91 121 type candidate struct { ··· 95 125 sizeBytes int64 96 126 } 97 127 98 - // loadCandidates returns all shards eligable for merging. 128 + // loadCandidates returns all shards eligible for merging. 99 129 func loadCandidates(dir string) ([]candidate, int) { 100 130 excluded := 0 101 131 ··· 189 219 c.shards = append(c.shards, cand) 190 220 c.size += cand.sizeBytes 191 221 } 192 - 193 - // generateCompounds groups simple shards into compound shards without performing 194 - // the actual merge. Shards that are not contained in any of the compound shards 195 - // are returned in the second argument. 196 - func generateCompounds(shards []candidate, targetSizeBytes int64) ([]compound, []candidate) { 197 - compounds := make([]compound, 0) 198 - cur := compound{} 199 - for _, s := range shards { 200 - cur.add(s) 201 - if cur.size > targetSizeBytes { 202 - compounds = append(compounds, cur) 203 - cur = compound{} 204 - } 205 - } 206 - return compounds, cur.shards 207 - } 208 - 209 - // callMerge calls zoekt-merge-index and captures its output. callMerge is a NOP 210 - // if len(shards) <= 1. 211 - func callMerge(shards []candidate) ([]byte, []byte, error) { 212 - if len(shards) <= 1 { 213 - return nil, nil, nil 214 - } 215 - 216 - cmd := exec.Command("zoekt-merge-index", "merge", "-") 217 - 218 - outBuf := &bytes.Buffer{} 219 - errBuf := &bytes.Buffer{} 220 - cmd.Stdout = outBuf 221 - cmd.Stderr = errBuf 222 - 223 - wc, err := cmd.StdinPipe() 224 - if err != nil { 225 - return nil, nil, err 226 - } 227 - 228 - go func() { 229 - for _, s := range shards { 230 - _, _ = io.WriteString(wc, fmt.Sprintf("%s\n", s.path)) 231 - } 232 - _ = wc.Close() 233 - }() 234 - 235 - err = cmd.Run() 236 - return outBuf.Bytes(), errBuf.Bytes(), err 237 - }
+161 -71
cmd/zoekt-sourcegraph-indexserver/merge_test.go
··· 1 1 package main 2 2 3 3 import ( 4 - "math/rand" 4 + "crypto/sha1" 5 + "fmt" 6 + "io" 5 7 "os" 8 + "os/exec" 6 9 "path/filepath" 7 10 "strings" 8 11 "testing" 9 - "testing/quick" 10 12 11 13 "github.com/google/zoekt" 12 14 "github.com/google/zoekt/build" ··· 42 44 } 43 45 } 44 46 45 - // genTestCompounds is a helper that generates compounds from n shards with sizes 46 - // in (0, targetSize]. 47 - func genTestCompounds(t *testing.T, n uint8, targetSize int64) ([]compound, []candidate, int64) { 48 - t.Helper() 47 + func TestDoNotDeleteSingleShards(t *testing.T) { 48 + dir := t.TempDir() 49 49 50 - candidates := make([]candidate, 0, n) 51 - var totalSize int64 52 - var i uint8 53 - for i = 0; i < n; i++ { 54 - thisSize := rand.Int63n(targetSize) + 1 55 - candidates = append(candidates, candidate{"", thisSize}) 56 - totalSize += thisSize 50 + // Create a test shard. 51 + opts := build.Options{ 52 + IndexDir: dir, 53 + RepositoryDescription: zoekt.Repository{Name: "test-repo"}, 54 + } 55 + opts.SetDefaults() 56 + b, err := build.NewBuilder(opts) 57 + if err != nil { 58 + t.Fatalf("NewBuilder: %v", err) 59 + } 60 + if err := b.AddFile("F", []byte(strings.Repeat("abc", 100))); err != nil { 61 + t.Fatalf("AddFile: %v", err) 62 + } 63 + if err := b.Finish(); err != nil { 64 + t.Errorf("Finish: %v", err) 57 65 } 58 66 59 - compounds, excluded := generateCompounds(candidates, targetSize) 60 - return compounds, excluded, totalSize 61 - } 67 + s := &Server{IndexDir: dir, TargetSizeBytes: 2000 * 1024 * 1024} 68 + s.merge(helperCallMerge) 62 69 63 - func TestEitherMergedOrExcluded(t *testing.T) { 64 - // n is uint8 to keep the slices reasonably small and the tests performant. 65 - f := func(n uint8) bool { 66 - compounds, excluded, wantTotalSize := genTestCompounds(t, n, 10) 67 - shardCount := len(excluded) 68 - var gotTotalSize int64 69 - for _, c := range compounds { 70 - shardCount += len(c.shards) 71 - gotTotalSize += c.size 72 - } 73 - for _, c := range excluded { 74 - gotTotalSize += c.sizeBytes 75 - } 76 - if shardCount != int(n) { 77 - t.Logf("shards: want %d, got %d", int(n), shardCount) 78 - return false 79 - } 80 - if gotTotalSize != wantTotalSize { 81 - t.Logf("total size: want %d, got %d", wantTotalSize, gotTotalSize) 82 - return false 83 - } 84 - return true 70 + _, err = os.Stat(filepath.Join(dir, "test-repo_v16.00000.zoekt")) 71 + if err != nil { 72 + t.Fatal(err) 85 73 } 74 + } 86 75 87 - if err := quick.Check(f, nil); err != nil { 88 - t.Fatal(err) 76 + func helperCallMerge(s ...string) *exec.Cmd { 77 + cs := []string{"-test.run=TestCallMerge", "--"} 78 + cs = append(cs, s...) 79 + env := []string{ 80 + "GO_TEST_WANT_CALL_MERGE=1", 89 81 } 82 + cmd := exec.Command(os.Args[0], cs...) 83 + cmd.Env = append(env, os.Environ()...) 84 + return cmd 90 85 } 91 86 92 - func TestCompoundsHaveSizeAboveTargetSize(t *testing.T) { 93 - f := func(n uint8, targetSize int64) bool { 94 - if targetSize <= 0 { 95 - return true 96 - } 87 + func TestCallMerge(t *testing.T) { 88 + if os.Getenv("GO_TEST_WANT_CALL_MERGE") != "1" { 89 + return 90 + } 91 + defer os.Exit(0) 97 92 98 - compounds, _, _ := genTestCompounds(t, n, targetSize) 99 - for _, c := range compounds { 100 - if c.size < targetSize { 101 - return false 102 - } 93 + args := os.Args 94 + for len(args) > 0 { 95 + if args[0] == "--" { 96 + args = args[1:] 97 + break 103 98 } 104 - return true 99 + args = args[1:] 105 100 } 106 101 107 - if err := quick.Check(f, nil); err != nil { 108 - t.Fatal(err) 102 + // We mock the merge process by deleting the input shards and creating an empty 103 + // compound shard with a proper name. 104 + h := sha1.New() 105 + for _, a := range args { 106 + h.Write([]byte(filepath.Base(a))) 107 + h.Write([]byte{0}) 108 + _ = os.Remove(a) 109 109 } 110 + 111 + compoundShardName := filepath.Join(filepath.Dir(args[1]), fmt.Sprintf("compound-%x_v%d.%05d.zoekt", h.Sum(nil), 17, 0)) 112 + f, _ := os.Create(compoundShardName) 113 + _ = f.Close() 114 + 115 + // Just like zoekt-merge-index, we write the name of the compound shard to 116 + // stdout. 117 + _, _ = fmt.Fprint(os.Stdout, compoundShardName) 110 118 } 111 119 112 - func TestDoNotDeleteSingleShards(t *testing.T) { 113 - dir := t.TempDir() 120 + func TestMerge(t *testing.T) { 114 121 115 - // Create a test shard. 116 - opts := build.Options{ 117 - IndexDir: dir, 118 - RepositoryDescription: zoekt.Repository{Name: "test-repo"}, 122 + // A fixed set of shards gives us reliable shard sizes which makes it easy to 123 + // define a cutoff with targetSizeBytes. 124 + m := []string{ 125 + "../../testdata/shards/repo_v16.00000.zoekt", 126 + "../../testdata/shards/repo2_v16.00000.zoekt", 127 + "../../testdata/shards/ctagsrepo_v16.00000.zoekt", 119 128 } 120 - opts.SetDefaults() 121 - b, err := build.NewBuilder(opts) 122 - if err != nil { 123 - t.Fatalf("NewBuilder: %v", err) 129 + 130 + testCases := []struct { 131 + name string 132 + targetSizeBytes int64 133 + wantCompound int 134 + wantSimple int 135 + }{ 136 + { 137 + name: "3 shards", 138 + targetSizeBytes: 6 * 1024, 139 + wantCompound: 1, 140 + wantSimple: 0, 141 + }, 142 + { 143 + name: "2 shards", 144 + targetSizeBytes: 4 * 1024, 145 + wantCompound: 1, 146 + wantSimple: 1, 147 + }, 148 + { 149 + // This is a pathological case where the target size of a compound shard is 150 + // smaller than the size of a simple shard. In realistic scenarios, 151 + // targetSizeBytes should be 100x or more of a typical shard size. 152 + name: "target size too small", 153 + targetSizeBytes: 2 * 1024, 154 + wantCompound: 0, 155 + wantSimple: 3, 156 + }, 157 + { 158 + name: "target size too big", 159 + targetSizeBytes: 10 * 1024, 160 + wantCompound: 0, 161 + wantSimple: 3, 162 + }, 163 + { 164 + name: "target size 0", 165 + targetSizeBytes: 0, 166 + wantCompound: 0, 167 + wantSimple: 3, 168 + }, 124 169 } 125 - if err := b.AddFile("F", []byte(strings.Repeat("abc", 100))); err != nil { 126 - t.Fatalf("AddFile: %v", err) 170 + 171 + checkCount := func(dir string, pattern string, want int) { 172 + have, err := filepath.Glob(filepath.Join(dir, pattern)) 173 + if err != nil { 174 + t.Fatal(err) 175 + } 176 + if len(have) != want { 177 + t.Fatalf("want %d, have %d", want, len(have)) 178 + } 127 179 } 128 - if err := b.Finish(); err != nil { 129 - t.Errorf("Finish: %v", err) 180 + 181 + for _, tc := range testCases { 182 + t.Run(tc.name, func(t *testing.T) { 183 + dir := t.TempDir() 184 + _, err := copyTestShards(dir, m) 185 + if err != nil { 186 + t.Fatal(err) 187 + } 188 + 189 + s := &Server{ 190 + IndexDir: dir, 191 + TargetSizeBytes: tc.targetSizeBytes, 192 + } 193 + 194 + s.merge(helperCallMerge) 195 + 196 + checkCount(dir, "compound-*", tc.wantCompound) 197 + checkCount(dir, "*_v16.00000.zoekt", tc.wantSimple) 198 + }) 130 199 } 131 200 132 - err = doMerge(dir, 2000*1024*1024, false) 201 + } 202 + 203 + func copyTestShards(dstDir string, srcShards []string) ([]string, error) { 204 + var tmpShards []string 205 + for _, s := range srcShards { 206 + dst := filepath.Join(dstDir, filepath.Base(s)) 207 + tmpShards = append(tmpShards, dst) 208 + if err := copyFile(s, dst); err != nil { 209 + return nil, err 210 + } 211 + } 212 + return tmpShards, nil 213 + } 214 + 215 + func copyFile(src, dst string) (err error) { 216 + s, err := os.Open(src) 133 217 if err != nil { 134 - t.Fatal(err) 218 + return err 135 219 } 220 + defer s.Close() 136 221 137 - _, err = os.Stat(filepath.Join(dir, "test-repo_v16.00000.zoekt")) 222 + d, err := os.Create(dst) 138 223 if err != nil { 139 - t.Fatal(err) 224 + return err 225 + } 226 + if _, err := io.Copy(d, s); err != nil { 227 + d.Close() 228 + return err 140 229 } 230 + return d.Close() 141 231 }