Fork of https://github.com/sourcegraph/zoekt.
1package main
2
3import (
4 "bytes"
5 "context"
6 "os"
7 "os/exec"
8 "path/filepath"
9 "strconv"
10 "time"
11
12 "github.com/grafana/regexp"
13 "github.com/prometheus/client_golang/prometheus"
14 "github.com/prometheus/client_golang/prometheus/promauto"
15 "go.uber.org/atomic"
16
17 "github.com/sourcegraph/zoekt/index"
18 "github.com/sourcegraph/zoekt/internal/tenant"
19)
20
21var metricShardMergingRunning = promauto.NewGauge(prometheus.GaugeOpts{
22 Name: "index_shard_merging_running",
23 Help: "Set to 1 if indexserver's merge job is running.",
24})
25
26var metricShardMergingDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
27 Name: "index_shard_merging_duration_seconds",
28 Help: "The duration of 1 shard merge operation.",
29 Buckets: prometheus.LinearBuckets(30, 30, 10),
30}, []string{"error"})
31
32func pickCandidates(shards []candidate, targetSizeBytes int64) compound {
33 c := compound{}
34 for _, shard := range shards {
35 c.add(shard)
36 if c.size >= targetSizeBytes {
37 return c
38 }
39 }
40 return compound{}
41}
42
43var mergeRunning atomic.Bool
44
// defaultMergeCmd returns the command line `zoekt-merge-index merge <args...>`.
// doMerge uses it, passing the paths of the shards to combine as args.
func defaultMergeCmd(args ...string) *exec.Cmd {
	return exec.Command("zoekt-merge-index", append([]string{"merge"}, args...)...)
}
50
// defaultExplodeCmd returns the command line
// `zoekt-merge-index explode <args...>`.
func defaultExplodeCmd(args ...string) *exec.Cmd {
	return exec.Command("zoekt-merge-index", append([]string{"explode"}, args...)...)
}
56
57// doMerge drives the merge process. It holds the lock on s.indexDir for the
58// duration of 1 merge, which might be several minutes, depending on the target
59// size of the compound shard.
60func (s *Server) doMerge() {
61 s.merge(defaultMergeCmd)
62}
63
64// same as doMerge but with a configurable merge command.
65func (s *Server) merge(mergeCmd func(args ...string) *exec.Cmd) {
66 // Guard against the user triggering competing merge jobs with the debug
67 // command.
68 if !mergeRunning.CompareAndSwap(false, true) {
69 infoLog.Printf("merge already running")
70 return
71 }
72 defer mergeRunning.Store(false)
73
74 metricShardMergingRunning.Set(1)
75 defer metricShardMergingRunning.Set(0)
76
77 // We keep creating compound shards until we run out of shards to merge or until
78 // we encounter an error during merging.
79 next := true
80 for next {
81 next = false
82 s.muIndexDir.Global(func() {
83 candidates, excluded := loadCandidates(s.IndexDir, s.mergeOpts)
84 infoLog.Printf("loadCandidates: candidates=%d excluded=%d", len(candidates), excluded)
85
86 c := pickCandidates(candidates, s.mergeOpts.targetSizeBytes)
87 if len(c.shards) <= 1 {
88 infoLog.Printf("could not find enough shards to build a compound shard")
89 return
90 }
91 infoLog.Printf("start merging: shards=%d total_size=%.2fMiB", len(c.shards), float64(c.size)/(1024*1024))
92
93 var paths []string
94 for _, p := range c.shards {
95 paths = append(paths, p.path)
96 }
97
98 start := time.Now()
99
100 cmd := mergeCmd(paths...)
101
102 // zoekt-merge-index writes the full path of the new compound shard to stdout.
103 stdoutBuf := &bytes.Buffer{}
104 stderrBuf := &bytes.Buffer{}
105 cmd.Stdout = stdoutBuf
106 cmd.Stderr = stderrBuf
107
108 err := cmd.Run()
109
110 durationSeconds := time.Since(start).Seconds()
111 metricShardMergingDuration.WithLabelValues(strconv.FormatBool(err != nil)).Observe(durationSeconds)
112 if err != nil {
113 errorLog.Printf("error merging shards: stdout=%s, stderr=%s, durationSeconds=%.2f err=%s", stdoutBuf.String(), stderrBuf.String(), durationSeconds, err)
114 return
115 }
116
117 infoLog.Printf("finished merging: shard=%s durationSeconds=%.2f", stdoutBuf.String(), durationSeconds)
118
119 next = true
120 })
121 }
122}
123
// candidate is a shard file that loadCandidates considers eligible for merging.
type candidate struct {
	// path is the shard file's location, i.e. the scanned directory joined
	// with the file name.
	path string

	// sizeBytes is the shard's size as reported by os.Stat.
	sizeBytes int64
}
130
131// loadCandidates returns all shards eligible for merging.
132func loadCandidates(dir string, opts mergeOpts) ([]candidate, int) {
133 excluded := 0
134
135 d, err := os.Open(dir)
136 if err != nil {
137 debugLog.Printf("failed to load candidates: %s", dir)
138 return []candidate{}, excluded
139 }
140 defer d.Close()
141 names, _ := d.Readdirnames(-1)
142
143 candidates := make([]candidate, 0, len(names))
144 for _, n := range names {
145 path := filepath.Join(dir, n)
146
147 fi, err := os.Stat(path)
148 if err != nil {
149 debugLog.Printf("stat failed for %s: %s", n, err)
150 continue
151 }
152
153 if fi.IsDir() || filepath.Ext(path) != ".zoekt" {
154 continue
155 }
156
157 if isExcluded(path, fi, opts) {
158 excluded++
159 continue
160 }
161
162 candidates = append(candidates, candidate{
163 path: path,
164 sizeBytes: fi.Size(),
165 })
166 }
167 return candidates, excluded
168}
169
// reShard matches the ".<five-digit shard number>.zoekt" suffix of a numbered
// shard file name such as "name.00000.zoekt".
var reShard = regexp.MustCompile(`\.[0-9]{5}\.zoekt$`)

// hasMultipleShards reports whether path is a numbered shard whose sibling with
// shard number 00001 also exists, i.e. the repository was split across more
// than one shard. Paths without a shard number always yield false. Any Stat
// error other than "does not exist" is treated as "has multiple shards".
func hasMultipleShards(path string) bool {
	if reShard.MatchString(path) {
		sibling := reShard.ReplaceAllString(path, ".00001.zoekt")
		_, statErr := os.Stat(sibling)
		return !os.IsNotExist(statErr)
	}
	return false
}
180
// mergeOpts configures how indexserver builds and maintains compound shards.
type mergeOpts struct {
	// targetSizeBytes is the size at which pickCandidates stops adding shards
	// to a compound. Larger values pack more repositories into one compound
	// shard, which saves more memory at the cost of degraded search
	// performance.
	targetSizeBytes int64

	// minSizeBytes is the threshold below which compound shards are deleted
	// by vacuum.
	minSizeBytes int64

	// vacuumInterval is how often indexserver scans compound shards to remove
	// tombstones.
	vacuumInterval time.Duration

	// mergeInterval is how often indexserver runs the merge operation in the
	// index directory.
	mergeInterval time.Duration

	// minAgeDays is the number of days a repository must have gone without a
	// commit before its shard is considered for merging. For example, 7 means
	// only repositories inactive for at least 7 days are merged.
	minAgeDays int
}
204
205// isExcluded returns true if a shard should not be merged, false otherwise.
206//
207// We need path and FileInfo because FileInfo does not contain the full path, see
208// discussion here https://github.com/golang/go/issues/32300.
209func isExcluded(path string, fi os.FileInfo, opts mergeOpts) bool {
210 if hasMultipleShards(path) {
211 return true
212 }
213
214 repos, _, err := index.ReadMetadataPath(path)
215 if err != nil {
216 debugLog.Printf("failed to load metadata for %s\n", fi.Name())
217 return true
218 }
219
220 // Exclude compound shards from being merge targets. Why? We want repositories in a
221 // compound shard to be ordered based on their priority. The easiest way to
222 // enforce this is to delete the compound shard once it drops below a certain
223 // size (handled by cleanup), reindex the repositories and merge them with other
224 // shards in the correct order.
225 if len(repos) > 1 {
226 return true
227 }
228
229 if repos[0].LatestCommitDate.After(time.Now().AddDate(0, 0, -opts.minAgeDays)) {
230 return true
231 }
232
233 return false
234}
235
236type compound struct {
237 shards []candidate
238 size int64
239}
240
241func (c *compound) add(cand candidate) {
242 c.shards = append(c.shards, cand)
243 c.size += cand.sizeBytes
244}
245
246// explodeTenantCompoundShards explodes all compound shards that have repos from
247// the tenant in question. The caller must hold the global lock.
248func (s *Server) explodeTenantCompoundShards(ctx context.Context, explodeFunc func(path string) error) error {
249 tnt, err := tenant.FromContext(ctx)
250 if err != nil {
251 return err
252 }
253
254 paths, err := filepath.Glob(filepath.Join(s.IndexDir, "compound-*"))
255 if err != nil {
256 return err
257 }
258 if len(paths) == 0 {
259 return nil
260 }
261
262nextCompoundShard:
263 for _, path := range paths {
264 // We don't use ReadMetadataPathAlive because we want to detect
265 // tombstoned repos, too.
266 repos, _, err := index.ReadMetadataPath(path)
267 if err != nil {
268 return err
269 }
270 for _, repo := range repos {
271 if repo.TenantID == tnt.ID() {
272 err := explodeFunc(path)
273 if err != nil {
274 return err
275 }
276
277 continue nextCompoundShard
278 }
279 }
280 }
281 return nil
282}