// Fork of https://github.com/sourcegraph/zoekt.
1package main
2
3import (
4 "fmt"
5 "log"
6 "os"
7 "os/exec"
8 "path/filepath"
9 "strconv"
10 "time"
11
12 "github.com/grafana/regexp"
13 "github.com/prometheus/client_golang/prometheus"
14 "github.com/prometheus/client_golang/prometheus/promauto"
15 "go.uber.org/atomic"
16 "gopkg.in/natefinch/lumberjack.v2"
17
18 "github.com/sourcegraph/zoekt"
19)
20
// reCompound extracts the base name of the newly created compound shard from
// the output of the merge command. The name may not contain whitespace or a
// path separator, and the match is lazy, so a leading directory is dropped and
// the match cannot run past the compound shard into other .zoekt names printed
// on the same line.
var reCompound = regexp.MustCompile(`compound-[^\s/]*?\.zoekt`)
22
23var metricShardMergingRunning = promauto.NewGauge(prometheus.GaugeOpts{
24 Name: "index_shard_merging_running",
25 Help: "Set to 1 if indexserver's merge job is running.",
26})
27
28var metricShardMergingDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
29 Name: "index_shard_merging_duration_seconds",
30 Help: "The duration of 1 shard merge operation.",
31 Buckets: prometheus.LinearBuckets(30, 30, 10),
32}, []string{"error"})
33
34func pickCandidates(shards []candidate, targetSizeBytes int64) compound {
35 c := compound{}
36 for _, shard := range shards {
37 c.add(shard)
38 if c.size >= targetSizeBytes {
39 return c
40 }
41 }
42 return compound{}
43}
44
45var mergeRunning atomic.Bool
46
// defaultMergeCmd returns a command that runs `zoekt-merge-index merge` with
// args appended as further command-line arguments.
func defaultMergeCmd(args ...string) *exec.Cmd {
	return exec.Command("zoekt-merge-index", append([]string{"merge"}, args...)...)
}
52
53// doMerge drives the merge process. It holds the lock on s.indexDir for the
54// duration of 1 merge, which might be several minutes, depending on the target
55// size of the compound shard.
56func (s *Server) doMerge() {
57 s.merge(defaultMergeCmd)
58}
59
60// same as doMerge but with a configurable merge command.
61func (s *Server) merge(mergeCmd func(args ...string) *exec.Cmd) {
62
63 // Guard against the user triggering competing merge jobs with the debug
64 // command.
65 if !mergeRunning.CompareAndSwap(false, true) {
66 log.Printf("merge already running")
67 return
68 }
69 defer mergeRunning.Store(false)
70
71 metricShardMergingRunning.Set(1)
72 defer metricShardMergingRunning.Set(0)
73
74 wc := &lumberjack.Logger{
75 Filename: filepath.Join(s.IndexDir, "zoekt-merge-log.tsv"),
76 MaxSize: 100, // Megabyte
77 MaxBackups: 5,
78 }
79
80 // We keep creating compound shards until we run out of shards to merge or until
81 // we encounter an error during merging.
82 next := true
83 for next {
84 next = false
85 s.muIndexDir.Global(func() {
86 candidates, excluded := loadCandidates(s.IndexDir, s.mergeOpts)
87 log.Printf("loadCandidates: candidates=%d excluded=%d", len(candidates), excluded)
88
89 c := pickCandidates(candidates, s.mergeOpts.targetSizeBytes)
90 if len(c.shards) <= 1 {
91 log.Printf("could not find enough shards to build a compound shard")
92 return
93 }
94 log.Printf("start merging: shards=%d total_size=%.2fMiB", len(c.shards), float64(c.size)/(1024*1024))
95
96 var paths []string
97 for _, p := range c.shards {
98 paths = append(paths, p.path)
99 }
100
101 start := time.Now()
102 out, err := mergeCmd(paths...).CombinedOutput()
103
104 metricShardMergingDuration.WithLabelValues(strconv.FormatBool(err != nil)).Observe(time.Since(start).Seconds())
105 if err != nil {
106 log.Printf("mergeCmd: out=%s, err=%s", out, err)
107 return
108 }
109
110 newCompoundName := reCompound.Find(out)
111 now := time.Now()
112 for _, s := range c.shards {
113 _, _ = fmt.Fprintf(wc, "%s\t%s\t%s\t%s\n", now.UTC().Format(time.RFC3339), "merge", filepath.Base(s.path), string(newCompoundName))
114 }
115
116 next = true
117 })
118 }
119}
120
// candidate is a shard file that may be merged into a compound shard.
type candidate struct {
	// path is the location of the shard file.
	path string

	// sizeBytes is the file size as reported by os.Stat.
	sizeBytes int64
}
127
128// loadCandidates returns all shards eligible for merging.
129func loadCandidates(dir string, opts mergeOpts) ([]candidate, int) {
130 excluded := 0
131
132 d, err := os.Open(dir)
133 if err != nil {
134 debug.Printf("failed to load candidates: %s", dir)
135 return []candidate{}, excluded
136 }
137 defer d.Close()
138 names, _ := d.Readdirnames(-1)
139
140 candidates := make([]candidate, 0, len(names))
141 for _, n := range names {
142 path := filepath.Join(dir, n)
143
144 fi, err := os.Stat(path)
145 if err != nil {
146 debug.Printf("stat failed for %s: %s", n, err)
147 continue
148 }
149
150 if fi.IsDir() || filepath.Ext(path) != ".zoekt" {
151 continue
152 }
153
154 if isExcluded(path, fi, opts) {
155 excluded++
156 continue
157 }
158
159 candidates = append(candidates, candidate{
160 path: path,
161 sizeBytes: fi.Size(),
162 })
163 }
164 return candidates, excluded
165}
166
// reShard matches the five-digit shard-number suffix (".NNNNN.zoekt") of a
// shard file name.
var reShard = regexp.MustCompile(`\.[0-9]{5}\.zoekt$`)

// hasMultipleShards reports whether path belongs to a repository that is
// split over more than one shard, i.e. whether the sibling shard numbered
// 00001 exists. Paths without a shard-number suffix never qualify. A stat error
// other than "does not exist" is treated as "has multiple shards".
func hasMultipleShards(path string) bool {
	if reShard.MatchString(path) {
		_, statErr := os.Stat(reShard.ReplaceAllString(path, ".00001.zoekt"))
		return !os.IsNotExist(statErr)
	}
	return false
}
177
// mergeOpts bundles the settings that control shard merging and vacuuming.
type mergeOpts struct {
	// targetSizeBytes is the size, in bytes, a compound shard should reach.
	// Larger values pack more repositories into one compound shard, which
	// saves more memory but degrades search performance.
	targetSizeBytes int64

	// minSizeBytes is the threshold below which vacuum deletes a compound
	// shard.
	minSizeBytes int64

	// vacuumInterval is how often indexserver scans compound shards to remove
	// tombstones.
	vacuumInterval time.Duration

	// mergeInterval is how often indexserver runs the merge operation in the
	// index directory.
	mergeInterval time.Duration

	// minAgeDays is the number of days since the last commit before a shard is
	// considered for merging. With a value of 7, only repositories that have
	// been inactive for 7 days are merged.
	minAgeDays int

	// maxPriority is the highest priority a shard may have and still be
	// considered for merging.
	maxPriority float64
}
204
205// isExcluded returns true if a shard should not be merged, false otherwise.
206//
207// We need path and FileInfo because FileInfo does not contain the full path, see
208// discussion here https://github.com/golang/go/issues/32300.
209func isExcluded(path string, fi os.FileInfo, opts mergeOpts) bool {
210 if hasMultipleShards(path) {
211 return true
212 }
213
214 repos, _, err := zoekt.ReadMetadataPath(path)
215 if err != nil {
216 debug.Printf("failed to load metadata for %s\n", fi.Name())
217 return true
218 }
219
220 // Exclude compound shards from being merge targets. Why? We want repositories in a
221 // compound shard to be ordered based on their priority. The easiest way to
222 // enforce this is to delete the compound shard once it drops below a certain
223 // size (handled by cleanup), reindex the repositories and merge them with other
224 // shards in the correct order.
225 if len(repos) > 1 {
226 return true
227 }
228
229 if repos[0].LatestCommitDate.After(time.Now().AddDate(0, 0, -opts.minAgeDays)) {
230 return true
231 }
232
233 if priority, err := strconv.ParseFloat(repos[0].RawConfig["priority"], 64); err == nil && priority > opts.maxPriority {
234 return true
235 }
236
237 return false
238}
239
240type compound struct {
241 shards []candidate
242 size int64
243}
244
245func (c *compound) add(cand candidate) {
246 c.shards = append(c.shards, cand)
247 c.size += cand.sizeBytes
248}