// This file is a fork of https://github.com/sourcegraph/zoekt.
1package main
2
3import (
4 "log"
5 "os"
6 "os/exec"
7 "path/filepath"
8 "strconv"
9 "time"
10
11 "github.com/grafana/regexp"
12 "github.com/prometheus/client_golang/prometheus"
13 "github.com/prometheus/client_golang/prometheus/promauto"
14 "go.uber.org/atomic"
15
16 "github.com/sourcegraph/zoekt"
17)
18
19var reCompound = regexp.MustCompile(`compound-.*\.zoekt`)
20
21var metricShardMergingRunning = promauto.NewGauge(prometheus.GaugeOpts{
22 Name: "index_shard_merging_running",
23 Help: "Set to 1 if indexserver's merge job is running.",
24})
25
26var metricShardMergingDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
27 Name: "index_shard_merging_duration_seconds",
28 Help: "The duration of 1 shard merge operation.",
29 Buckets: prometheus.LinearBuckets(30, 30, 10),
30}, []string{"error"})
31
32func pickCandidates(shards []candidate, targetSizeBytes int64) compound {
33 c := compound{}
34 for _, shard := range shards {
35 c.add(shard)
36 if c.size >= targetSizeBytes {
37 return c
38 }
39 }
40 return compound{}
41}
42
// mergeRunning is flipped from false to true by merge via CompareAndSwap and
// reset when merge returns, so a second concurrent call (for example one
// triggered through the debug command) bails out instead of competing.
var mergeRunning atomic.Bool
44
// defaultMergeCmd returns a command that runs
// `zoekt-merge-index merge <args...>`, where args are the shard paths to merge.
func defaultMergeCmd(args ...string) *exec.Cmd {
	argv := append([]string{"merge"}, args...)
	return exec.Command("zoekt-merge-index", argv...)
}
50
51// doMerge drives the merge process. It holds the lock on s.indexDir for the
52// duration of 1 merge, which might be several minutes, depending on the target
53// size of the compound shard.
54func (s *Server) doMerge() {
55 s.merge(defaultMergeCmd)
56}
57
58// same as doMerge but with a configurable merge command.
59func (s *Server) merge(mergeCmd func(args ...string) *exec.Cmd) {
60 // Guard against the user triggering competing merge jobs with the debug
61 // command.
62 if !mergeRunning.CompareAndSwap(false, true) {
63 log.Printf("merge already running")
64 return
65 }
66 defer mergeRunning.Store(false)
67
68 metricShardMergingRunning.Set(1)
69 defer metricShardMergingRunning.Set(0)
70
71 // We keep creating compound shards until we run out of shards to merge or until
72 // we encounter an error during merging.
73 next := true
74 for next {
75 next = false
76 s.muIndexDir.Global(func() {
77 candidates, excluded := loadCandidates(s.IndexDir, s.mergeOpts)
78 log.Printf("loadCandidates: candidates=%d excluded=%d", len(candidates), excluded)
79
80 c := pickCandidates(candidates, s.mergeOpts.targetSizeBytes)
81 if len(c.shards) <= 1 {
82 log.Printf("could not find enough shards to build a compound shard")
83 return
84 }
85 log.Printf("start merging: shards=%d total_size=%.2fMiB", len(c.shards), float64(c.size)/(1024*1024))
86
87 var paths []string
88 for _, p := range c.shards {
89 paths = append(paths, p.path)
90 }
91
92 start := time.Now()
93 out, err := mergeCmd(paths...).CombinedOutput()
94
95 metricShardMergingDuration.WithLabelValues(strconv.FormatBool(err != nil)).Observe(time.Since(start).Seconds())
96 if err != nil {
97 log.Printf("mergeCmd: out=%s, err=%s", out, err)
98 return
99 }
100
101 next = true
102 })
103 }
104}
105
// candidate is a single shard file that may be folded into a compound shard.
type candidate struct {
	// path is the shard's location: the index directory joined with the file name.
	path string

	// sizeBytes is the file size reported by os.Stat for path.
	sizeBytes int64
}
112
113// loadCandidates returns all shards eligible for merging.
114func loadCandidates(dir string, opts mergeOpts) ([]candidate, int) {
115 excluded := 0
116
117 d, err := os.Open(dir)
118 if err != nil {
119 debug.Printf("failed to load candidates: %s", dir)
120 return []candidate{}, excluded
121 }
122 defer d.Close()
123 names, _ := d.Readdirnames(-1)
124
125 candidates := make([]candidate, 0, len(names))
126 for _, n := range names {
127 path := filepath.Join(dir, n)
128
129 fi, err := os.Stat(path)
130 if err != nil {
131 debug.Printf("stat failed for %s: %s", n, err)
132 continue
133 }
134
135 if fi.IsDir() || filepath.Ext(path) != ".zoekt" {
136 continue
137 }
138
139 if isExcluded(path, fi, opts) {
140 excluded++
141 continue
142 }
143
144 candidates = append(candidates, candidate{
145 path: path,
146 sizeBytes: fi.Size(),
147 })
148 }
149 return candidates, excluded
150}
151
// reShard matches a five-digit shard number followed by ".zoekt" at the end of
// a path, e.g. ".00000.zoekt".
var reShard = regexp.MustCompile(`\.[0-9]{5}\.zoekt$`)

// hasMultipleShards reports whether path belongs to an index split across
// several shard files. Two conditions must hold:
//   - the path ends in a ".NNNNN.zoekt" suffix;
//   - a sibling with the same name but suffix ".00001.zoekt" is present.
//
// Paths without the suffix are never considered multi-shard. A Stat error
// other than "does not exist" is treated as if the second shard were present.
func hasMultipleShards(path string) bool {
	if reShard.MatchString(path) {
		second := reShard.ReplaceAllString(path, ".00001.zoekt")
		_, statErr := os.Stat(second)
		if os.IsNotExist(statErr) {
			return false
		}
		return true
	}
	return false
}
162
// mergeOpts holds the tuning knobs for building compound shards and for the
// periodic merge and vacuum jobs.
type mergeOpts struct {
	// targetSizeBytes is the combined size (in bytes) the selected shards must
	// reach before a compound shard is built. Larger values pack more
	// repositories into each compound shard. That increases the potential
	// memory savings, at the price of degraded search performance.
	targetSizeBytes int64

	// minSizeBytes is the size below which compound shards are deleted by vacuum.
	minSizeBytes int64

	// vacuumInterval is the period between scans of compound shards that
	// remove tombstones.
	vacuumInterval time.Duration

	// mergeInterval is the period between merge runs over the index directory.
	mergeInterval time.Duration

	// minAgeDays is how many days must have passed since a repository's latest
	// commit before its shard is eligible for merging. For example, 7 means
	// only repositories inactive for 7 days are merged.
	minAgeDays int

	// maxPriority is the highest repository priority a shard may have and
	// still be considered for merging.
	maxPriority float64
}
189
190// isExcluded returns true if a shard should not be merged, false otherwise.
191//
192// We need path and FileInfo because FileInfo does not contain the full path, see
193// discussion here https://github.com/golang/go/issues/32300.
194func isExcluded(path string, fi os.FileInfo, opts mergeOpts) bool {
195 if hasMultipleShards(path) {
196 return true
197 }
198
199 repos, _, err := zoekt.ReadMetadataPath(path)
200 if err != nil {
201 debug.Printf("failed to load metadata for %s\n", fi.Name())
202 return true
203 }
204
205 // Exclude compound shards from being merge targets. Why? We want repositories in a
206 // compound shard to be ordered based on their priority. The easiest way to
207 // enforce this is to delete the compound shard once it drops below a certain
208 // size (handled by cleanup), reindex the repositories and merge them with other
209 // shards in the correct order.
210 if len(repos) > 1 {
211 return true
212 }
213
214 if repos[0].LatestCommitDate.After(time.Now().AddDate(0, 0, -opts.minAgeDays)) {
215 return true
216 }
217
218 if priority, err := strconv.ParseFloat(repos[0].RawConfig["priority"], 64); err == nil && priority > opts.maxPriority {
219 return true
220 }
221
222 return false
223}
224
225type compound struct {
226 shards []candidate
227 size int64
228}
229
230func (c *compound) add(cand candidate) {
231 c.shards = append(c.shards, cand)
232 c.size += cand.sizeBytes
233}