fork of https://github.com/sourcegraph/zoekt
1package main
2
3import (
4 "log"
5 "os"
6 "os/exec"
7 "path/filepath"
8 "strconv"
9 "time"
10
11 "github.com/grafana/regexp"
12 "github.com/prometheus/client_golang/prometheus"
13 "github.com/prometheus/client_golang/prometheus/promauto"
14 "go.uber.org/atomic"
15
16 "github.com/sourcegraph/zoekt"
17)
18
19var reCompound = regexp.MustCompile(`compound-.*\.zoekt`)
20
21var metricShardMergingRunning = promauto.NewGauge(prometheus.GaugeOpts{
22 Name: "index_shard_merging_running",
23 Help: "Set to 1 if indexserver's merge job is running.",
24})
25
26var metricShardMergingDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
27 Name: "index_shard_merging_duration_seconds",
28 Help: "The duration of 1 shard merge operation.",
29 Buckets: prometheus.LinearBuckets(30, 30, 10),
30}, []string{"error"})
31
32func pickCandidates(shards []candidate, targetSizeBytes int64) compound {
33 c := compound{}
34 for _, shard := range shards {
35 c.add(shard)
36 if c.size >= targetSizeBytes {
37 return c
38 }
39 }
40 return compound{}
41}
42
43var mergeRunning atomic.Bool
44
// defaultMergeCmd builds (but does not start) the command
// "zoekt-merge-index merge <args...>".
func defaultMergeCmd(args ...string) *exec.Cmd {
	return exec.Command("zoekt-merge-index", append([]string{"merge"}, args...)...)
}
50
51// doMerge drives the merge process. It holds the lock on s.indexDir for the
52// duration of 1 merge, which might be several minutes, depending on the target
53// size of the compound shard.
54func (s *Server) doMerge() {
55 s.merge(defaultMergeCmd)
56}
57
58// same as doMerge but with a configurable merge command.
59func (s *Server) merge(mergeCmd func(args ...string) *exec.Cmd) {
60
61 // Guard against the user triggering competing merge jobs with the debug
62 // command.
63 if !mergeRunning.CompareAndSwap(false, true) {
64 log.Printf("merge already running")
65 return
66 }
67 defer mergeRunning.Store(false)
68
69 metricShardMergingRunning.Set(1)
70 defer metricShardMergingRunning.Set(0)
71
72 // We keep creating compound shards until we run out of shards to merge or until
73 // we encounter an error during merging.
74 next := true
75 for next {
76 next = false
77 s.muIndexDir.Global(func() {
78 candidates, excluded := loadCandidates(s.IndexDir, s.mergeOpts)
79 log.Printf("loadCandidates: candidates=%d excluded=%d", len(candidates), excluded)
80
81 c := pickCandidates(candidates, s.mergeOpts.targetSizeBytes)
82 if len(c.shards) <= 1 {
83 log.Printf("could not find enough shards to build a compound shard")
84 return
85 }
86 log.Printf("start merging: shards=%d total_size=%.2fMiB", len(c.shards), float64(c.size)/(1024*1024))
87
88 var paths []string
89 for _, p := range c.shards {
90 paths = append(paths, p.path)
91 }
92
93 start := time.Now()
94 out, err := mergeCmd(paths...).CombinedOutput()
95
96 metricShardMergingDuration.WithLabelValues(strconv.FormatBool(err != nil)).Observe(time.Since(start).Seconds())
97 if err != nil {
98 log.Printf("mergeCmd: out=%s, err=%s", out, err)
99 return
100 }
101
102 next = true
103 })
104 }
105}
106
// candidate describes a shard file that is eligible for merging.
type candidate struct {
	// path is the location of the shard file. loadCandidates builds it by
	// joining the index directory with the file name.
	path string

	// sizeBytes is the file size as reported by os.Stat.
	sizeBytes int64
}
113
114// loadCandidates returns all shards eligible for merging.
115func loadCandidates(dir string, opts mergeOpts) ([]candidate, int) {
116 excluded := 0
117
118 d, err := os.Open(dir)
119 if err != nil {
120 debug.Printf("failed to load candidates: %s", dir)
121 return []candidate{}, excluded
122 }
123 defer d.Close()
124 names, _ := d.Readdirnames(-1)
125
126 candidates := make([]candidate, 0, len(names))
127 for _, n := range names {
128 path := filepath.Join(dir, n)
129
130 fi, err := os.Stat(path)
131 if err != nil {
132 debug.Printf("stat failed for %s: %s", n, err)
133 continue
134 }
135
136 if fi.IsDir() || filepath.Ext(path) != ".zoekt" {
137 continue
138 }
139
140 if isExcluded(path, fi, opts) {
141 excluded++
142 continue
143 }
144
145 candidates = append(candidates, candidate{
146 path: path,
147 sizeBytes: fi.Size(),
148 })
149 }
150 return candidates, excluded
151}
152
153var reShard = regexp.MustCompile(`\.[0-9]{5}\.zoekt$`)
154
155func hasMultipleShards(path string) bool {
156 if !reShard.MatchString(path) {
157 return false
158 }
159 secondShard := reShard.ReplaceAllString(path, ".00001.zoekt")
160 _, err := os.Stat(secondShard)
161 return !os.IsNotExist(err)
162}
163
// mergeOpts bundles the settings that control shard merging and vacuuming.
type mergeOpts struct {
	// targetSizeBytes is the size, in bytes, that a compound shard should
	// reach. A larger value means more repositories per compound shard and
	// more potential for saving memory, at the cost of degraded search
	// performance.
	targetSizeBytes int64

	// minSizeBytes is the size below which vacuum deletes a compound shard.
	minSizeBytes int64

	// vacuumInterval is how often indexserver scans compound shards to remove
	// tombstones.
	vacuumInterval time.Duration

	// mergeInterval is how often indexserver runs the merge operation in the
	// index directory.
	mergeInterval time.Duration

	// minAgeDays is the number of days that must have passed since the last
	// commit before a shard is considered for merging. For example, with a
	// value of 7 only repositories that have been inactive for 7 days are
	// considered.
	minAgeDays int

	// maxPriority is the highest priority a shard may have and still be
	// considered for merging.
	maxPriority float64
}
190
191// isExcluded returns true if a shard should not be merged, false otherwise.
192//
193// We need path and FileInfo because FileInfo does not contain the full path, see
194// discussion here https://github.com/golang/go/issues/32300.
195func isExcluded(path string, fi os.FileInfo, opts mergeOpts) bool {
196 if hasMultipleShards(path) {
197 return true
198 }
199
200 repos, _, err := zoekt.ReadMetadataPath(path)
201 if err != nil {
202 debug.Printf("failed to load metadata for %s\n", fi.Name())
203 return true
204 }
205
206 // Exclude compound shards from being merge targets. Why? We want repositories in a
207 // compound shard to be ordered based on their priority. The easiest way to
208 // enforce this is to delete the compound shard once it drops below a certain
209 // size (handled by cleanup), reindex the repositories and merge them with other
210 // shards in the correct order.
211 if len(repos) > 1 {
212 return true
213 }
214
215 if repos[0].LatestCommitDate.After(time.Now().AddDate(0, 0, -opts.minAgeDays)) {
216 return true
217 }
218
219 if priority, err := strconv.ParseFloat(repos[0].RawConfig["priority"], 64); err == nil && priority > opts.maxPriority {
220 return true
221 }
222
223 return false
224}
225
226type compound struct {
227 shards []candidate
228 size int64
229}
230
231func (c *compound) add(cand candidate) {
232 c.shards = append(c.shards, cand)
233 c.size += cand.sizeBytes
234}