fork of https://github.com/sourcegraph/zoekt
1package main
2
3import (
4 "log"
5 "os"
6 "os/exec"
7 "path/filepath"
8 "strconv"
9 "time"
10
11 "github.com/grafana/regexp"
12 "github.com/prometheus/client_golang/prometheus"
13 "github.com/prometheus/client_golang/prometheus/promauto"
14 "go.uber.org/atomic"
15
16 "github.com/sourcegraph/zoekt"
17)
18
19var metricShardMergingRunning = promauto.NewGauge(prometheus.GaugeOpts{
20 Name: "index_shard_merging_running",
21 Help: "Set to 1 if indexserver's merge job is running.",
22})
23
24var metricShardMergingDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
25 Name: "index_shard_merging_duration_seconds",
26 Help: "The duration of 1 shard merge operation.",
27 Buckets: prometheus.LinearBuckets(30, 30, 10),
28}, []string{"error"})
29
30func pickCandidates(shards []candidate, targetSizeBytes int64) compound {
31 c := compound{}
32 for _, shard := range shards {
33 c.add(shard)
34 if c.size >= targetSizeBytes {
35 return c
36 }
37 }
38 return compound{}
39}
40
var (
	// mergeRunning is true while merge is executing. merge flips it with a
	// compare-and-swap so that a second, concurrent merge request (for
	// example one triggered through the debug command) returns immediately.
	mergeRunning atomic.Bool
)
42
// defaultMergeCmd returns the command line "zoekt-merge-index merge <args...>".
// merge passes the paths of the shards it wants combined as args.
func defaultMergeCmd(args ...string) *exec.Cmd {
	cmdArgs := append([]string{"merge"}, args...)
	return exec.Command("zoekt-merge-index", cmdArgs...)
}
48
49// doMerge drives the merge process. It holds the lock on s.indexDir for the
50// duration of 1 merge, which might be several minutes, depending on the target
51// size of the compound shard.
52func (s *Server) doMerge() {
53 s.merge(defaultMergeCmd)
54}
55
56// same as doMerge but with a configurable merge command.
57func (s *Server) merge(mergeCmd func(args ...string) *exec.Cmd) {
58 // Guard against the user triggering competing merge jobs with the debug
59 // command.
60 if !mergeRunning.CompareAndSwap(false, true) {
61 log.Printf("merge already running")
62 return
63 }
64 defer mergeRunning.Store(false)
65
66 metricShardMergingRunning.Set(1)
67 defer metricShardMergingRunning.Set(0)
68
69 // We keep creating compound shards until we run out of shards to merge or until
70 // we encounter an error during merging.
71 next := true
72 for next {
73 next = false
74 s.muIndexDir.Global(func() {
75 candidates, excluded := loadCandidates(s.IndexDir, s.mergeOpts)
76 log.Printf("loadCandidates: candidates=%d excluded=%d", len(candidates), excluded)
77
78 c := pickCandidates(candidates, s.mergeOpts.targetSizeBytes)
79 if len(c.shards) <= 1 {
80 log.Printf("could not find enough shards to build a compound shard")
81 return
82 }
83 log.Printf("start merging: shards=%d total_size=%.2fMiB", len(c.shards), float64(c.size)/(1024*1024))
84
85 var paths []string
86 for _, p := range c.shards {
87 paths = append(paths, p.path)
88 }
89
90 start := time.Now()
91 out, err := mergeCmd(paths...).CombinedOutput()
92
93 metricShardMergingDuration.WithLabelValues(strconv.FormatBool(err != nil)).Observe(time.Since(start).Seconds())
94 if err != nil {
95 log.Printf("mergeCmd: out=%s, err=%s", out, err)
96 return
97 }
98
99 next = true
100 })
101 }
102}
103
// candidate is a shard file in the index directory that is eligible for
// merging into a compound shard.
type candidate struct {
	// path is the location of the shard file (index dir joined with its name).
	path string

	// sizeBytes is the shard's file size as reported by os.Stat.
	sizeBytes int64
}
110
111// loadCandidates returns all shards eligible for merging.
112func loadCandidates(dir string, opts mergeOpts) ([]candidate, int) {
113 excluded := 0
114
115 d, err := os.Open(dir)
116 if err != nil {
117 debug.Printf("failed to load candidates: %s", dir)
118 return []candidate{}, excluded
119 }
120 defer d.Close()
121 names, _ := d.Readdirnames(-1)
122
123 candidates := make([]candidate, 0, len(names))
124 for _, n := range names {
125 path := filepath.Join(dir, n)
126
127 fi, err := os.Stat(path)
128 if err != nil {
129 debug.Printf("stat failed for %s: %s", n, err)
130 continue
131 }
132
133 if fi.IsDir() || filepath.Ext(path) != ".zoekt" {
134 continue
135 }
136
137 if isExcluded(path, fi, opts) {
138 excluded++
139 continue
140 }
141
142 candidates = append(candidates, candidate{
143 path: path,
144 sizeBytes: fi.Size(),
145 })
146 }
147 return candidates, excluded
148}
149
// reShard matches a shard file name ending in a five-digit shard number
// followed by ".zoekt", e.g. "foo.00000.zoekt".
var reShard = regexp.MustCompile(`\.[0-9]{5}\.zoekt$`)

// hasMultipleShards reports whether a second shard file (same name with the
// ".00001.zoekt" suffix) exists next to path. Presumably this means the
// repository was indexed into more than one shard. Paths without a numbered
// shard suffix always yield false. A stat error other than "not exist" is
// treated as "exists", so the caller errs on the side of exclusion.
func hasMultipleShards(path string) bool {
	if reShard.MatchString(path) {
		sibling := reShard.ReplaceAllString(path, ".00001.zoekt")
		_, statErr := os.Stat(sibling)
		return !os.IsNotExist(statErr)
	}
	return false
}
160
// mergeOpts configures how indexserver combines shards into compound shards
// and how it maintains compound shards afterwards.
type mergeOpts struct {
	// targetSizeBytes is the combined size a set of shards has to reach
	// before they are merged into a compound shard. Higher values put more
	// repositories into one compound shard, which increases the potential
	// memory savings at the cost of degraded search performance.
	targetSizeBytes int64

	// minSizeBytes is the size below which compound shards are deleted by
	// vacuum.
	minSizeBytes int64

	// vacuumInterval is how often indexserver scans compound shards to
	// remove tombstones.
	vacuumInterval time.Duration

	// mergeInterval is how often indexserver runs the merge operation in the
	// index directory.
	mergeInterval time.Duration

	// minAgeDays is how many days must have passed since a repository's
	// latest commit before its shard is considered for merging. For example,
	// 7 means only repositories inactive for at least 7 days qualify.
	minAgeDays int

	// maxPriority is the highest priority a shard may have and still be
	// considered for merging.
	maxPriority float64
}
187
188// isExcluded returns true if a shard should not be merged, false otherwise.
189//
190// We need path and FileInfo because FileInfo does not contain the full path, see
191// discussion here https://github.com/golang/go/issues/32300.
192func isExcluded(path string, fi os.FileInfo, opts mergeOpts) bool {
193 if hasMultipleShards(path) {
194 return true
195 }
196
197 repos, _, err := zoekt.ReadMetadataPath(path)
198 if err != nil {
199 debug.Printf("failed to load metadata for %s\n", fi.Name())
200 return true
201 }
202
203 // Exclude compound shards from being merge targets. Why? We want repositories in a
204 // compound shard to be ordered based on their priority. The easiest way to
205 // enforce this is to delete the compound shard once it drops below a certain
206 // size (handled by cleanup), reindex the repositories and merge them with other
207 // shards in the correct order.
208 if len(repos) > 1 {
209 return true
210 }
211
212 if repos[0].LatestCommitDate.After(time.Now().AddDate(0, 0, -opts.minAgeDays)) {
213 return true
214 }
215
216 if priority, err := strconv.ParseFloat(repos[0].RawConfig["priority"], 64); err == nil && priority > opts.maxPriority {
217 return true
218 }
219
220 return false
221}
222
223type compound struct {
224 shards []candidate
225 size int64
226}
227
228func (c *compound) add(cand candidate) {
229 c.shards = append(c.shards, cand)
230 c.size += cand.sizeBytes
231}