fork of https://github.com/sourcegraph/zoekt
1package main
2
3import (
4 "bytes"
5 "os"
6 "os/exec"
7 "path/filepath"
8 "strconv"
9 "time"
10
11 "github.com/grafana/regexp"
12 "github.com/prometheus/client_golang/prometheus"
13 "github.com/prometheus/client_golang/prometheus/promauto"
14 "go.uber.org/atomic"
15
16 "github.com/sourcegraph/zoekt"
17)
18
19var metricShardMergingRunning = promauto.NewGauge(prometheus.GaugeOpts{
20 Name: "index_shard_merging_running",
21 Help: "Set to 1 if indexserver's merge job is running.",
22})
23
24var metricShardMergingDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
25 Name: "index_shard_merging_duration_seconds",
26 Help: "The duration of 1 shard merge operation.",
27 Buckets: prometheus.LinearBuckets(30, 30, 10),
28}, []string{"error"})
29
30func pickCandidates(shards []candidate, targetSizeBytes int64) compound {
31 c := compound{}
32 for _, shard := range shards {
33 c.add(shard)
34 if c.size >= targetSizeBytes {
35 return c
36 }
37 }
38 return compound{}
39}
40
41var mergeRunning atomic.Bool
42
// defaultMergeCmd builds, without starting it, the command
//
//	zoekt-merge-index merge <args...>
//
// where args are appended verbatim after the "merge" subcommand.
func defaultMergeCmd(args ...string) *exec.Cmd {
	argv := make([]string, 0, len(args)+1)
	argv = append(argv, "merge")
	argv = append(argv, args...)
	return exec.Command("zoekt-merge-index", argv...)
}
48
49// doMerge drives the merge process. It holds the lock on s.indexDir for the
50// duration of 1 merge, which might be several minutes, depending on the target
51// size of the compound shard.
52func (s *Server) doMerge() {
53 s.merge(defaultMergeCmd)
54}
55
56// same as doMerge but with a configurable merge command.
57func (s *Server) merge(mergeCmd func(args ...string) *exec.Cmd) {
58 // Guard against the user triggering competing merge jobs with the debug
59 // command.
60 if !mergeRunning.CompareAndSwap(false, true) {
61 infoLog.Printf("merge already running")
62 return
63 }
64 defer mergeRunning.Store(false)
65
66 metricShardMergingRunning.Set(1)
67 defer metricShardMergingRunning.Set(0)
68
69 // We keep creating compound shards until we run out of shards to merge or until
70 // we encounter an error during merging.
71 next := true
72 for next {
73 next = false
74 s.muIndexDir.Global(func() {
75 candidates, excluded := loadCandidates(s.IndexDir, s.mergeOpts)
76 infoLog.Printf("loadCandidates: candidates=%d excluded=%d", len(candidates), excluded)
77
78 c := pickCandidates(candidates, s.mergeOpts.targetSizeBytes)
79 if len(c.shards) <= 1 {
80 infoLog.Printf("could not find enough shards to build a compound shard")
81 return
82 }
83 infoLog.Printf("start merging: shards=%d total_size=%.2fMiB", len(c.shards), float64(c.size)/(1024*1024))
84
85 var paths []string
86 for _, p := range c.shards {
87 paths = append(paths, p.path)
88 }
89
90 start := time.Now()
91
92 cmd := mergeCmd(paths...)
93
94 // zoekt-merge-index writes the full path of the new compound shard to stdout.
95 stdoutBuf := &bytes.Buffer{}
96 stderrBuf := &bytes.Buffer{}
97 cmd.Stdout = stdoutBuf
98 cmd.Stderr = stderrBuf
99
100 err := cmd.Run()
101
102 durationSeconds := time.Since(start).Seconds()
103 metricShardMergingDuration.WithLabelValues(strconv.FormatBool(err != nil)).Observe(durationSeconds)
104 if err != nil {
105 errorLog.Printf("error merging shards: stdout=%s, stderr=%s, durationSeconds=%.2f err=%s", stdoutBuf.String(), stderrBuf.String(), durationSeconds, err)
106 return
107 }
108
109 infoLog.Printf("finished merging: shard=%s durationSeconds=%.2f", stdoutBuf.String(), durationSeconds)
110
111 next = true
112 })
113 }
114}
115
// candidate describes a single shard file that may become part of a compound
// shard.
type candidate struct {
	// path is the location of the shard file on disk.
	path string

	// sizeBytes is the size of the shard file as reported by os.Stat.
	sizeBytes int64
}
122
123// loadCandidates returns all shards eligible for merging.
124func loadCandidates(dir string, opts mergeOpts) ([]candidate, int) {
125 excluded := 0
126
127 d, err := os.Open(dir)
128 if err != nil {
129 debugLog.Printf("failed to load candidates: %s", dir)
130 return []candidate{}, excluded
131 }
132 defer d.Close()
133 names, _ := d.Readdirnames(-1)
134
135 candidates := make([]candidate, 0, len(names))
136 for _, n := range names {
137 path := filepath.Join(dir, n)
138
139 fi, err := os.Stat(path)
140 if err != nil {
141 debugLog.Printf("stat failed for %s: %s", n, err)
142 continue
143 }
144
145 if fi.IsDir() || filepath.Ext(path) != ".zoekt" {
146 continue
147 }
148
149 if isExcluded(path, fi, opts) {
150 excluded++
151 continue
152 }
153
154 candidates = append(candidates, candidate{
155 path: path,
156 sizeBytes: fi.Size(),
157 })
158 }
159 return candidates, excluded
160}
161
// reShard matches the shard-number suffix of a shard file name, i.e. a dot,
// five digits and the extension ".zoekt" at the very end of the path.
var reShard = regexp.MustCompile(`\.[0-9]{5}\.zoekt$`)

// hasMultipleShards reports whether the shard at path has a sibling shard
// with number 00001. Paths without a shard-number suffix never do. For all
// other paths the suffix is swapped for ".00001.zoekt" and the result is
// stat-ed; any outcome other than "does not exist" counts as a second shard.
func hasMultipleShards(path string) bool {
	loc := reShard.FindStringIndex(path)
	if loc == nil {
		return false
	}

	secondShard := path[:loc[0]] + ".00001.zoekt"
	if _, err := os.Stat(secondShard); os.IsNotExist(err) {
		return false
	}
	return true
}
172
// mergeOpts bundles the settings that control merging and vacuuming of
// compound shards.
type mergeOpts struct {
	// targetSizeBytes is the size, in bytes, a compound shard should reach.
	// A higher value packs more repositories into one compound shard, which
	// increases the potential memory savings but degrades search performance.
	targetSizeBytes int64

	// minSizeBytes is the lower size limit for compound shards: vacuum
	// deletes compound shards that are smaller than this.
	minSizeBytes int64

	// vacuumInterval is the interval at which indexserver scans compound
	// shards to remove tombstones.
	vacuumInterval time.Duration

	// mergeInterval is the interval at which indexserver runs the merge
	// operation in the index directory.
	mergeInterval time.Duration

	// minAgeDays is the number of days that must have passed since the last
	// commit before a shard is considered for merging. With a value of 7,
	// for example, only repos that have been inactive for 7 days are merged.
	minAgeDays int
}
196
197// isExcluded returns true if a shard should not be merged, false otherwise.
198//
199// We need path and FileInfo because FileInfo does not contain the full path, see
200// discussion here https://github.com/golang/go/issues/32300.
201func isExcluded(path string, fi os.FileInfo, opts mergeOpts) bool {
202 if hasMultipleShards(path) {
203 return true
204 }
205
206 repos, _, err := zoekt.ReadMetadataPath(path)
207 if err != nil {
208 debugLog.Printf("failed to load metadata for %s\n", fi.Name())
209 return true
210 }
211
212 // Exclude compound shards from being merge targets. Why? We want repositories in a
213 // compound shard to be ordered based on their priority. The easiest way to
214 // enforce this is to delete the compound shard once it drops below a certain
215 // size (handled by cleanup), reindex the repositories and merge them with other
216 // shards in the correct order.
217 if len(repos) > 1 {
218 return true
219 }
220
221 if repos[0].LatestCommitDate.After(time.Now().AddDate(0, 0, -opts.minAgeDays)) {
222 return true
223 }
224
225 return false
226}
227
228type compound struct {
229 shards []candidate
230 size int64
231}
232
233func (c *compound) add(cand candidate) {
234 c.shards = append(c.shards, cand)
235 c.size += cand.sizeBytes
236}