fork of https://github.com/sourcegraph/zoekt
1package main
2
3import (
4 "bytes"
5 "os"
6 "os/exec"
7 "path/filepath"
8 "strconv"
9 "time"
10
11 "github.com/grafana/regexp"
12 "github.com/prometheus/client_golang/prometheus"
13 "github.com/prometheus/client_golang/prometheus/promauto"
14 "github.com/sourcegraph/zoekt/index"
15 "go.uber.org/atomic"
16)
17
18var metricShardMergingRunning = promauto.NewGauge(prometheus.GaugeOpts{
19 Name: "index_shard_merging_running",
20 Help: "Set to 1 if indexserver's merge job is running.",
21})
22
23var metricShardMergingDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
24 Name: "index_shard_merging_duration_seconds",
25 Help: "The duration of 1 shard merge operation.",
26 Buckets: prometheus.LinearBuckets(30, 30, 10),
27}, []string{"error"})
28
29func pickCandidates(shards []candidate, targetSizeBytes int64) compound {
30 c := compound{}
31 for _, shard := range shards {
32 c.add(shard)
33 if c.size >= targetSizeBytes {
34 return c
35 }
36 }
37 return compound{}
38}
39
40var mergeRunning atomic.Bool
41
// defaultMergeCmd returns a command that runs
// "zoekt-merge-index merge <args...>". It only constructs the command; the
// caller is responsible for wiring up output and running it.
func defaultMergeCmd(args ...string) *exec.Cmd {
	cmdArgs := make([]string, 0, len(args)+1)
	cmdArgs = append(cmdArgs, "merge")
	cmdArgs = append(cmdArgs, args...)
	return exec.Command("zoekt-merge-index", cmdArgs...)
}
47
48// doMerge drives the merge process. It holds the lock on s.indexDir for the
49// duration of 1 merge, which might be several minutes, depending on the target
50// size of the compound shard.
51func (s *Server) doMerge() {
52 s.merge(defaultMergeCmd)
53}
54
// merge is the same as doMerge but with a configurable merge command.
//
// It repeatedly builds compound shards in s.IndexDir. Each iteration holds the
// global index-directory lock (s.muIndexDir) while it:
//   - loads the merge candidates;
//   - picks enough of them to reach s.mergeOpts.targetSizeBytes;
//   - runs mergeCmd on their paths.
//
// The loop stops once fewer than two shards can be picked or once a merge
// command fails.
//
// Only one merge runs at a time: a call made while another merge is in
// progress logs a message and returns immediately.
func (s *Server) merge(mergeCmd func(args ...string) *exec.Cmd) {
	// Guard against the user triggering competing merge jobs with the debug
	// command.
	if !mergeRunning.CompareAndSwap(false, true) {
		infoLog.Printf("merge already running")
		return
	}
	defer mergeRunning.Store(false)

	metricShardMergingRunning.Set(1)
	defer metricShardMergingRunning.Set(0)

	// We keep creating compound shards until we run out of shards to merge or
	// until we encounter an error during merging.
	for {
		merged := false
		s.muIndexDir.Global(func() {
			candidates, excluded := loadCandidates(s.IndexDir, s.mergeOpts)
			infoLog.Printf("loadCandidates: candidates=%d excluded=%d", len(candidates), excluded)

			c := pickCandidates(candidates, s.mergeOpts.targetSizeBytes)
			if len(c.shards) <= 1 {
				infoLog.Printf("could not find enough shards to build a compound shard")
				return
			}
			infoLog.Printf("start merging: shards=%d total_size=%.2fMiB", len(c.shards), float64(c.size)/(1024*1024))

			paths := make([]string, 0, len(c.shards))
			for _, p := range c.shards {
				paths = append(paths, p.path)
			}

			start := time.Now()

			cmd := mergeCmd(paths...)

			// zoekt-merge-index writes the full path of the new compound shard to stdout.
			var stdoutBuf, stderrBuf bytes.Buffer
			cmd.Stdout = &stdoutBuf
			cmd.Stderr = &stderrBuf

			err := cmd.Run()

			durationSeconds := time.Since(start).Seconds()
			metricShardMergingDuration.WithLabelValues(strconv.FormatBool(err != nil)).Observe(durationSeconds)
			if err != nil {
				errorLog.Printf("error merging shards: stdout=%s, stderr=%s, durationSeconds=%.2f err=%s", stdoutBuf.String(), stderrBuf.String(), durationSeconds, err)
				return
			}

			// Trim surrounding whitespace (e.g. a trailing newline, if the
			// command prints one) so the shard path does not split the log line.
			infoLog.Printf("finished merging: shard=%s durationSeconds=%.2f", bytes.TrimSpace(stdoutBuf.Bytes()), durationSeconds)

			merged = true
		})
		if !merged {
			return
		}
	}
}
114
// candidate is a shard file that loadCandidates found eligible for merging.
type candidate struct {
	// path is the shard file's location, joined from the index directory and
	// the file name.
	path string

	// sizeBytes is the file size as reported by os.Stat.
	sizeBytes int64
}
121
// loadCandidates returns all shards in dir that are eligible for merging,
// together with the number of .zoekt files that isExcluded rejected.
//
// Only regular .zoekt files are considered. Failures are handled best-effort:
//   - if dir cannot be opened, no candidates are returned;
//   - if listing dir fails part-way, the names read so far are still used;
//   - entries that cannot be stat'ed are skipped.
//
// Candidates are returned in directory order (not sorted).
func loadCandidates(dir string, opts mergeOpts) ([]candidate, int) {
	excluded := 0

	d, err := os.Open(dir)
	if err != nil {
		debugLog.Printf("failed to load candidates: %s: %s", dir, err)
		return []candidate{}, excluded
	}
	defer d.Close()

	// Readdirnames(-1) returns the names read up to the point of failure
	// along with the error, so log it and continue with the partial listing.
	names, err := d.Readdirnames(-1)
	if err != nil {
		debugLog.Printf("failed to list %s: %s", dir, err)
	}

	candidates := make([]candidate, 0, len(names))
	for _, n := range names {
		// Filter on the extension first so unrelated files are never stat'ed.
		if filepath.Ext(n) != ".zoekt" {
			continue
		}
		path := filepath.Join(dir, n)

		fi, err := os.Stat(path)
		if err != nil {
			debugLog.Printf("stat failed for %s: %s", n, err)
			continue
		}

		if fi.IsDir() {
			continue
		}

		if isExcluded(path, fi, opts) {
			excluded++
			continue
		}

		candidates = append(candidates, candidate{
			path:      path,
			sizeBytes: fi.Size(),
		})
	}
	return candidates, excluded
}
160
161var reShard = regexp.MustCompile(`\.[0-9]{5}\.zoekt$`)
162
163func hasMultipleShards(path string) bool {
164 if !reShard.MatchString(path) {
165 return false
166 }
167 secondShard := reShard.ReplaceAllString(path, ".00001.zoekt")
168 _, err := os.Stat(secondShard)
169 return !os.IsNotExist(err)
170}
171
172type mergeOpts struct {
173 // targetSizeBytes is the target size in bytes for compound shards. The higher
174 // the value the more repositories a compound shard will contain and the bigger
175 // the potential for saving MEM. The savings in MEM come at the cost of a
176 // degraded search performance.
177 targetSizeBytes int64
178
179 // compound shards smaller than minSizeBytes will be deleted by vacuum.
180 minSizeBytes int64
181
182 // vacuumInterval is how often indexserver scans compound shards to remove
183 // tombstones.
184 vacuumInterval time.Duration
185
186 // mergeInterval defines how often indexserver runs the merge operation in
187 // the index directory.
188 mergeInterval time.Duration
189
190 // number of days since the last commit until we consider the shard for
191 // merging. For example, a value of 7 means that only repos that have been
192 // inactive for 7 days will be considered for merging.
193 minAgeDays int
194}
195
// isExcluded returns true if a shard should not be merged, false otherwise.
//
// A shard is excluded when any of the following holds:
//   - it is one of several numbered shards of the same repository (see
//     hasMultipleShards);
//   - its metadata cannot be read;
//   - it does not contain exactly one repository;
//   - its repository's latest commit is newer than opts.minAgeDays days.
//
// We need path and FileInfo because FileInfo does not contain the full path, see
// discussion here https://github.com/golang/go/issues/32300.
func isExcluded(path string, fi os.FileInfo, opts mergeOpts) bool {
	if hasMultipleShards(path) {
		return true
	}

	repos, _, err := index.ReadMetadataPath(path)
	if err != nil {
		debugLog.Printf("failed to load metadata for %s: %s\n", fi.Name(), err)
		return true
	}

	// Exclude compound shards from being merge targets. Why? We want repositories in a
	// compound shard to be ordered based on their priority. The easiest way to
	// enforce this is to delete the compound shard once it drops below a certain
	// size (handled by cleanup), reindex the repositories and merge them with other
	// shards in the correct order.
	//
	// A shard without any repository metadata is excluded as well: there is
	// nothing to merge, and indexing repos[0] below would panic.
	if len(repos) != 1 {
		return true
	}

	// Only merge repositories that have been inactive for at least minAgeDays.
	cutoff := time.Now().AddDate(0, 0, -opts.minAgeDays)
	return repos[0].LatestCommitDate.After(cutoff)
}
226
227type compound struct {
228 shards []candidate
229 size int64
230}
231
232func (c *compound) add(cand candidate) {
233 c.shards = append(c.shards, cand)
234 c.size += cand.sizeBytes
235}