// Forked from https://github.com/sourcegraph/zoekt
1package main
2
3import (
4 "bytes"
5 "log"
6 "os"
7 "os/exec"
8 "path/filepath"
9 "strconv"
10 "time"
11
12 "github.com/grafana/regexp"
13 "github.com/prometheus/client_golang/prometheus"
14 "github.com/prometheus/client_golang/prometheus/promauto"
15 "go.uber.org/atomic"
16
17 "github.com/sourcegraph/zoekt"
18)
19
20var metricShardMergingRunning = promauto.NewGauge(prometheus.GaugeOpts{
21 Name: "index_shard_merging_running",
22 Help: "Set to 1 if indexserver's merge job is running.",
23})
24
25var metricShardMergingDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
26 Name: "index_shard_merging_duration_seconds",
27 Help: "The duration of 1 shard merge operation.",
28 Buckets: prometheus.LinearBuckets(30, 30, 10),
29}, []string{"error"})
30
31func pickCandidates(shards []candidate, targetSizeBytes int64) compound {
32 c := compound{}
33 for _, shard := range shards {
34 c.add(shard)
35 if c.size >= targetSizeBytes {
36 return c
37 }
38 }
39 return compound{}
40}
41
42var mergeRunning atomic.Bool
43
// defaultMergeCmd builds the command `zoekt-merge-index merge <args...>`.
// (*Server).merge passes the paths of the shards to be merged as args.
func defaultMergeCmd(args ...string) *exec.Cmd {
	argv := append([]string{"merge"}, args...)
	return exec.Command("zoekt-merge-index", argv...)
}
49
50// doMerge drives the merge process. It holds the lock on s.indexDir for the
51// duration of 1 merge, which might be several minutes, depending on the target
52// size of the compound shard.
53func (s *Server) doMerge() {
54 s.merge(defaultMergeCmd)
55}
56
57// same as doMerge but with a configurable merge command.
58func (s *Server) merge(mergeCmd func(args ...string) *exec.Cmd) {
59 // Guard against the user triggering competing merge jobs with the debug
60 // command.
61 if !mergeRunning.CompareAndSwap(false, true) {
62 log.Printf("merge already running")
63 return
64 }
65 defer mergeRunning.Store(false)
66
67 metricShardMergingRunning.Set(1)
68 defer metricShardMergingRunning.Set(0)
69
70 // We keep creating compound shards until we run out of shards to merge or until
71 // we encounter an error during merging.
72 next := true
73 for next {
74 next = false
75 s.muIndexDir.Global(func() {
76 candidates, excluded := loadCandidates(s.IndexDir, s.mergeOpts)
77 log.Printf("loadCandidates: candidates=%d excluded=%d", len(candidates), excluded)
78
79 c := pickCandidates(candidates, s.mergeOpts.targetSizeBytes)
80 if len(c.shards) <= 1 {
81 log.Printf("could not find enough shards to build a compound shard")
82 return
83 }
84 log.Printf("start merging: shards=%d total_size=%.2fMiB", len(c.shards), float64(c.size)/(1024*1024))
85
86 var paths []string
87 for _, p := range c.shards {
88 paths = append(paths, p.path)
89 }
90
91 start := time.Now()
92
93 cmd := mergeCmd(paths...)
94
95 // zoekt-merge-index writes the full path of the new compound shard to stdout.
96 stdoutBuf := &bytes.Buffer{}
97 stderrBuf := &bytes.Buffer{}
98 cmd.Stdout = stdoutBuf
99 cmd.Stderr = stderrBuf
100
101 err := cmd.Run()
102
103 durationSeconds := time.Since(start).Seconds()
104 metricShardMergingDuration.WithLabelValues(strconv.FormatBool(err != nil)).Observe(durationSeconds)
105 if err != nil {
106 log.Printf("error merging shards: stdout=%s, stderr=%s, durationSeconds=%.2f err=%s", stdoutBuf.String(), stderrBuf.String(), durationSeconds, err)
107 return
108 }
109
110 log.Printf("finished merging: shard=%s durationSeconds=%.2f", stdoutBuf.String(), durationSeconds)
111
112 next = true
113 })
114 }
115}
116
// candidate is a shard file that loadCandidates found eligible for merging.
type candidate struct {
	// path is the shard's location: the index directory joined with the file name.
	path string

	// sizeBytes is the file size as reported by os.Stat. It is summed up by
	// compound.add to decide when a compound shard has reached its target size.
	sizeBytes int64
}
123
124// loadCandidates returns all shards eligible for merging.
125func loadCandidates(dir string, opts mergeOpts) ([]candidate, int) {
126 excluded := 0
127
128 d, err := os.Open(dir)
129 if err != nil {
130 debug.Printf("failed to load candidates: %s", dir)
131 return []candidate{}, excluded
132 }
133 defer d.Close()
134 names, _ := d.Readdirnames(-1)
135
136 candidates := make([]candidate, 0, len(names))
137 for _, n := range names {
138 path := filepath.Join(dir, n)
139
140 fi, err := os.Stat(path)
141 if err != nil {
142 debug.Printf("stat failed for %s: %s", n, err)
143 continue
144 }
145
146 if fi.IsDir() || filepath.Ext(path) != ".zoekt" {
147 continue
148 }
149
150 if isExcluded(path, fi, opts) {
151 excluded++
152 continue
153 }
154
155 candidates = append(candidates, candidate{
156 path: path,
157 sizeBytes: fi.Size(),
158 })
159 }
160 return candidates, excluded
161}
162
// reShard matches the numbered ".NNNNN.zoekt" suffix of a repository's shard files.
var reShard = regexp.MustCompile(`\.[0-9]{5}\.zoekt$`)

// hasMultipleShards reports whether the numbered shard at path belongs to a
// repository whose index has a second shard (".00001.zoekt") next to it.
// Paths without a numbered suffix always yield false. Any os.Stat error other
// than "does not exist" is treated as if the second shard were present.
func hasMultipleShards(path string) bool {
	if reShard.MatchString(path) {
		_, statErr := os.Stat(reShard.ReplaceAllString(path, ".00001.zoekt"))
		return !os.IsNotExist(statErr)
	}
	return false
}
173
// mergeOpts bundles the settings that control how indexserver builds and
// maintains compound shards.
type mergeOpts struct {
	// targetSizeBytes is the combined size at which a compound shard is
	// considered complete. Larger values pack more repositories into each
	// compound shard. That saves more memory, but degrades search performance.
	targetSizeBytes int64

	// minSizeBytes is the threshold below which vacuum deletes a compound shard.
	minSizeBytes int64

	// vacuumInterval is how often indexserver scans compound shards to remove
	// tombstones.
	vacuumInterval time.Duration

	// mergeInterval is how often indexserver runs the merge operation in the
	// index directory.
	mergeInterval time.Duration

	// minAgeDays is the number of days since a repository's last commit before
	// its shard is considered for merging. For example, 7 means only repos that
	// have been inactive for at least 7 days are merged.
	minAgeDays int
}
197
198// isExcluded returns true if a shard should not be merged, false otherwise.
199//
200// We need path and FileInfo because FileInfo does not contain the full path, see
201// discussion here https://github.com/golang/go/issues/32300.
202func isExcluded(path string, fi os.FileInfo, opts mergeOpts) bool {
203 if hasMultipleShards(path) {
204 return true
205 }
206
207 repos, _, err := zoekt.ReadMetadataPath(path)
208 if err != nil {
209 debug.Printf("failed to load metadata for %s\n", fi.Name())
210 return true
211 }
212
213 // Exclude compound shards from being merge targets. Why? We want repositories in a
214 // compound shard to be ordered based on their priority. The easiest way to
215 // enforce this is to delete the compound shard once it drops below a certain
216 // size (handled by cleanup), reindex the repositories and merge them with other
217 // shards in the correct order.
218 if len(repos) > 1 {
219 return true
220 }
221
222 if repos[0].LatestCommitDate.After(time.Now().AddDate(0, 0, -opts.minAgeDays)) {
223 return true
224 }
225
226 return false
227}
228
229type compound struct {
230 shards []candidate
231 size int64
232}
233
234func (c *compound) add(cand candidate) {
235 c.shards = append(c.shards, cand)
236 c.size += cand.sizeBytes
237}