// Fork of https://github.com/sourcegraph/zoekt
1package main
2
3import (
4 "bytes"
5 "context"
6 "os"
7 "os/exec"
8 "path/filepath"
9 "strconv"
10 "time"
11
12 "github.com/grafana/regexp"
13 "github.com/prometheus/client_golang/prometheus"
14 "github.com/prometheus/client_golang/prometheus/promauto"
15 "github.com/sourcegraph/zoekt/index"
16 "github.com/sourcegraph/zoekt/internal/tenant"
17 "go.uber.org/atomic"
18)
19
// metricShardMergingRunning is a gauge that merge sets to 1 when a merge job
// starts and resets to 0 when the job returns.
var metricShardMergingRunning = promauto.NewGauge(prometheus.GaugeOpts{
	Name: "index_shard_merging_running",
	Help: "Set to 1 if indexserver's merge job is running.",
})
24
// metricShardMergingDuration records how long a single invocation of the merge
// command took, in seconds. The "error" label is "true" if the command failed
// and "false" otherwise (see merge).
var metricShardMergingDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
	Name: "index_shard_merging_duration_seconds",
	Help: "The duration of 1 shard merge operation.",
	// 10 linear buckets, 30s wide, starting at 30s.
	Buckets: prometheus.LinearBuckets(30, 30, 10),
}, []string{"error"})
30
31func pickCandidates(shards []candidate, targetSizeBytes int64) compound {
32 c := compound{}
33 for _, shard := range shards {
34 c.add(shard)
35 if c.size >= targetSizeBytes {
36 return c
37 }
38 }
39 return compound{}
40}
41
// mergeRunning is true while a merge job is in progress. merge flips it with a
// compare-and-swap so that a second, concurrent invocation returns immediately.
var mergeRunning atomic.Bool
43
// defaultMergeCmd builds the command "zoekt-merge-index merge <args...>". The
// command is returned unstarted; the caller wires up its output and runs it.
func defaultMergeCmd(args ...string) *exec.Cmd {
	argv := make([]string, 0, len(args)+1)
	argv = append(argv, "merge")
	argv = append(argv, args...)
	return exec.Command("zoekt-merge-index", argv...)
}
49
// defaultExplodeCmd builds the command "zoekt-merge-index explode <args...>".
// The command is returned unstarted.
func defaultExplodeCmd(args ...string) *exec.Cmd {
	argv := make([]string, 0, len(args)+1)
	argv = append(argv, "explode")
	argv = append(argv, args...)
	return exec.Command("zoekt-merge-index", argv...)
}
55
// doMerge drives the merge process using the default merge command
// (zoekt-merge-index merge). Each individual merge runs inside
// s.muIndexDir.Global, i.e. the index directory stays locked for the duration
// of 1 merge, which might be several minutes, depending on the target size of
// the compound shard.
func (s *Server) doMerge() {
	s.merge(defaultMergeCmd)
}
62
63// same as doMerge but with a configurable merge command.
64func (s *Server) merge(mergeCmd func(args ...string) *exec.Cmd) {
65 // Guard against the user triggering competing merge jobs with the debug
66 // command.
67 if !mergeRunning.CompareAndSwap(false, true) {
68 infoLog.Printf("merge already running")
69 return
70 }
71 defer mergeRunning.Store(false)
72
73 metricShardMergingRunning.Set(1)
74 defer metricShardMergingRunning.Set(0)
75
76 // We keep creating compound shards until we run out of shards to merge or until
77 // we encounter an error during merging.
78 next := true
79 for next {
80 next = false
81 s.muIndexDir.Global(func() {
82 candidates, excluded := loadCandidates(s.IndexDir, s.mergeOpts)
83 infoLog.Printf("loadCandidates: candidates=%d excluded=%d", len(candidates), excluded)
84
85 c := pickCandidates(candidates, s.mergeOpts.targetSizeBytes)
86 if len(c.shards) <= 1 {
87 infoLog.Printf("could not find enough shards to build a compound shard")
88 return
89 }
90 infoLog.Printf("start merging: shards=%d total_size=%.2fMiB", len(c.shards), float64(c.size)/(1024*1024))
91
92 var paths []string
93 for _, p := range c.shards {
94 paths = append(paths, p.path)
95 }
96
97 start := time.Now()
98
99 cmd := mergeCmd(paths...)
100
101 // zoekt-merge-index writes the full path of the new compound shard to stdout.
102 stdoutBuf := &bytes.Buffer{}
103 stderrBuf := &bytes.Buffer{}
104 cmd.Stdout = stdoutBuf
105 cmd.Stderr = stderrBuf
106
107 err := cmd.Run()
108
109 durationSeconds := time.Since(start).Seconds()
110 metricShardMergingDuration.WithLabelValues(strconv.FormatBool(err != nil)).Observe(durationSeconds)
111 if err != nil {
112 errorLog.Printf("error merging shards: stdout=%s, stderr=%s, durationSeconds=%.2f err=%s", stdoutBuf.String(), stderrBuf.String(), durationSeconds, err)
113 return
114 }
115
116 infoLog.Printf("finished merging: shard=%s durationSeconds=%.2f", stdoutBuf.String(), durationSeconds)
117
118 next = true
119 })
120 }
121}
122
// candidate is a shard on disk that is eligible for merging.
type candidate struct {
	// path is the path of the shard file (index directory joined with the file
	// name, see loadCandidates).
	path string

	// The size as reported by os.Stat.
	sizeBytes int64
}
129
130// loadCandidates returns all shards eligible for merging.
131func loadCandidates(dir string, opts mergeOpts) ([]candidate, int) {
132 excluded := 0
133
134 d, err := os.Open(dir)
135 if err != nil {
136 debugLog.Printf("failed to load candidates: %s", dir)
137 return []candidate{}, excluded
138 }
139 defer d.Close()
140 names, _ := d.Readdirnames(-1)
141
142 candidates := make([]candidate, 0, len(names))
143 for _, n := range names {
144 path := filepath.Join(dir, n)
145
146 fi, err := os.Stat(path)
147 if err != nil {
148 debugLog.Printf("stat failed for %s: %s", n, err)
149 continue
150 }
151
152 if fi.IsDir() || filepath.Ext(path) != ".zoekt" {
153 continue
154 }
155
156 if isExcluded(path, fi, opts) {
157 excluded++
158 continue
159 }
160
161 candidates = append(candidates, candidate{
162 path: path,
163 sizeBytes: fi.Size(),
164 })
165 }
166 return candidates, excluded
167}
168
// reShard matches the numbered suffix of a shard file name, for example
// ".00000.zoekt".
var reShard = regexp.MustCompile(`\.[0-9]{5}\.zoekt$`)

// hasMultipleShards reports whether the shard at path belongs to an index that
// is split across more than one shard file. Paths without a numbered suffix
// never qualify. Otherwise the answer is derived from probing for the sibling
// file with suffix ".00001.zoekt": anything other than a "does not exist"
// error from os.Stat counts as "has multiple shards".
func hasMultipleShards(path string) bool {
	// The pattern is anchored at the end of the string, so there is at most
	// one match and it always extends to the end of path.
	loc := reShard.FindStringIndex(path)
	if loc == nil {
		return false
	}

	secondShard := path[:loc[0]] + ".00001.zoekt"
	if _, err := os.Stat(secondShard); os.IsNotExist(err) {
		return false
	}
	return true
}
179
// mergeOpts bundles the settings that control merging and vacuuming of
// compound shards.
type mergeOpts struct {
	// targetSizeBytes is the target size in bytes for compound shards. The higher
	// the value the more repositories a compound shard will contain and the bigger
	// the potential for saving memory. The savings in memory come at the cost of
	// a degraded search performance.
	targetSizeBytes int64

	// minSizeBytes is the lower size bound for compound shards: compound shards
	// smaller than minSizeBytes will be deleted by vacuum.
	minSizeBytes int64

	// vacuumInterval is how often indexserver scans compound shards to remove
	// tombstones.
	vacuumInterval time.Duration

	// mergeInterval defines how often indexserver runs the merge operation in
	// the index directory.
	mergeInterval time.Duration

	// minAgeDays is the number of days since the last commit until we consider
	// the shard for merging. For example, a value of 7 means that only repos
	// that have been inactive for 7 days will be considered for merging.
	minAgeDays int
}
203
204// isExcluded returns true if a shard should not be merged, false otherwise.
205//
206// We need path and FileInfo because FileInfo does not contain the full path, see
207// discussion here https://github.com/golang/go/issues/32300.
208func isExcluded(path string, fi os.FileInfo, opts mergeOpts) bool {
209 if hasMultipleShards(path) {
210 return true
211 }
212
213 repos, _, err := index.ReadMetadataPath(path)
214 if err != nil {
215 debugLog.Printf("failed to load metadata for %s\n", fi.Name())
216 return true
217 }
218
219 // Exclude compound shards from being merge targets. Why? We want repositories in a
220 // compound shard to be ordered based on their priority. The easiest way to
221 // enforce this is to delete the compound shard once it drops below a certain
222 // size (handled by cleanup), reindex the repositories and merge them with other
223 // shards in the correct order.
224 if len(repos) > 1 {
225 return true
226 }
227
228 if repos[0].LatestCommitDate.After(time.Now().AddDate(0, 0, -opts.minAgeDays)) {
229 return true
230 }
231
232 return false
233}
234
235type compound struct {
236 shards []candidate
237 size int64
238}
239
240func (c *compound) add(cand candidate) {
241 c.shards = append(c.shards, cand)
242 c.size += cand.sizeBytes
243}
244
245// explodeTenantCompoundShards explodes all compound shards that have repos from
246// the tenant in question. The caller must hold the global lock.
247func (s *Server) explodeTenantCompoundShards(ctx context.Context, explodeFunc func(path string) error) error {
248 tnt, err := tenant.FromContext(ctx)
249 if err != nil {
250 return err
251 }
252
253 paths, err := filepath.Glob(filepath.Join(s.IndexDir, "compound-*"))
254 if err != nil {
255 return err
256 }
257 if len(paths) == 0 {
258 return nil
259 }
260
261nextCompoundShard:
262 for _, path := range paths {
263 // We don't use ReadMetadataPathAlive because we want to detect
264 // tombstoned repos, too.
265 repos, _, err := index.ReadMetadataPath(path)
266 if err != nil {
267 return err
268 }
269 for _, repo := range repos {
270 if repo.TenantID == tnt.ID() {
271 err := explodeFunc(path)
272 if err != nil {
273 return err
274 }
275
276 continue nextCompoundShard
277 }
278 }
279 }
280 return nil
281}