fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3import ( 4 "fmt" 5 "log" 6 "os" 7 "os/exec" 8 "path/filepath" 9 "strconv" 10 "time" 11 12 "github.com/grafana/regexp" 13 "github.com/prometheus/client_golang/prometheus" 14 "github.com/prometheus/client_golang/prometheus/promauto" 15 "go.uber.org/atomic" 16 "gopkg.in/natefinch/lumberjack.v2" 17 18 "github.com/sourcegraph/zoekt" 19) 20 21var reCompound = regexp.MustCompile(`compound-.*\.zoekt`) 22 23var metricShardMergingRunning = promauto.NewGauge(prometheus.GaugeOpts{ 24 Name: "index_shard_merging_running", 25 Help: "Set to 1 if indexserver's merge job is running.", 26}) 27 28var metricShardMergingDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 29 Name: "index_shard_merging_duration_seconds", 30 Help: "The duration of 1 shard merge operation.", 31 Buckets: prometheus.LinearBuckets(30, 30, 10), 32}, []string{"error"}) 33 34func pickCandidates(shards []candidate, targetSizeBytes int64) compound { 35 c := compound{} 36 for _, shard := range shards { 37 c.add(shard) 38 if c.size >= targetSizeBytes { 39 return c 40 } 41 } 42 return compound{} 43} 44 45var mergeRunning atomic.Bool 46 47func defaultMergeCmd(args ...string) *exec.Cmd { 48 cmd := exec.Command("zoekt-merge-index", "merge") 49 cmd.Args = append(cmd.Args, args...) 50 return cmd 51} 52 53// doMerge drives the merge process. It holds the lock on s.indexDir for the 54// duration of 1 merge, which might be several minutes, depending on the target 55// size of the compound shard. 56func (s *Server) doMerge() { 57 s.merge(defaultMergeCmd) 58} 59 60// same as doMerge but with a configurable merge command. 61func (s *Server) merge(mergeCmd func(args ...string) *exec.Cmd) { 62 63 // Guard against the user triggering competing merge jobs with the debug 64 // command. 65 if !mergeRunning.CompareAndSwap(false, true) { 66 log.Printf("merge already running") 67 return 68 } 69 defer mergeRunning.Store(false) 70 71 metricShardMergingRunning.Set(1) 72 defer metricShardMergingRunning.Set(0) 73 74 wc := &lumberjack.Logger{ 75 Filename: filepath.Join(s.IndexDir, "zoekt-merge-log.tsv"), 76 MaxSize: 100, // Megabyte 77 MaxBackups: 5, 78 } 79 80 // We keep creating compound shards until we run out of shards to merge or until 81 // we encounter an error during merging. 82 next := true 83 for next { 84 next = false 85 s.muIndexDir.Global(func() { 86 candidates, excluded := loadCandidates(s.IndexDir, s.mergeOpts) 87 log.Printf("loadCandidates: candidates=%d excluded=%d", len(candidates), excluded) 88 89 c := pickCandidates(candidates, s.mergeOpts.targetSizeBytes) 90 if len(c.shards) <= 1 { 91 log.Printf("could not find enough shards to build a compound shard") 92 return 93 } 94 log.Printf("start merging: shards=%d total_size=%.2fMiB", len(c.shards), float64(c.size)/(1024*1024)) 95 96 var paths []string 97 for _, p := range c.shards { 98 paths = append(paths, p.path) 99 } 100 101 start := time.Now() 102 out, err := mergeCmd(paths...).CombinedOutput() 103 104 metricShardMergingDuration.WithLabelValues(strconv.FormatBool(err != nil)).Observe(time.Since(start).Seconds()) 105 if err != nil { 106 log.Printf("mergeCmd: out=%s, err=%s", out, err) 107 return 108 } 109 110 newCompoundName := reCompound.Find(out) 111 now := time.Now() 112 for _, s := range c.shards { 113 _, _ = fmt.Fprintf(wc, "%s\t%s\t%s\t%s\n", now.UTC().Format(time.RFC3339), "merge", filepath.Base(s.path), string(newCompoundName)) 114 } 115 116 next = true 117 }) 118 } 119} 120 121type candidate struct { 122 path string 123 124 // The size as reported by os.Stat. 125 sizeBytes int64 126} 127 128// loadCandidates returns all shards eligible for merging. 129func loadCandidates(dir string, opts mergeOpts) ([]candidate, int) { 130 excluded := 0 131 132 d, err := os.Open(dir) 133 if err != nil { 134 debug.Printf("failed to load candidates: %s", dir) 135 return []candidate{}, excluded 136 } 137 defer d.Close() 138 names, _ := d.Readdirnames(-1) 139 140 candidates := make([]candidate, 0, len(names)) 141 for _, n := range names { 142 path := filepath.Join(dir, n) 143 144 fi, err := os.Stat(path) 145 if err != nil { 146 debug.Printf("stat failed for %s: %s", n, err) 147 continue 148 } 149 150 if fi.IsDir() || filepath.Ext(path) != ".zoekt" { 151 continue 152 } 153 154 if isExcluded(path, fi, opts) { 155 excluded++ 156 continue 157 } 158 159 candidates = append(candidates, candidate{ 160 path: path, 161 sizeBytes: fi.Size(), 162 }) 163 } 164 return candidates, excluded 165} 166 167var reShard = regexp.MustCompile(`\.[0-9]{5}\.zoekt$`) 168 169func hasMultipleShards(path string) bool { 170 if !reShard.MatchString(path) { 171 return false 172 } 173 secondShard := reShard.ReplaceAllString(path, ".00001.zoekt") 174 _, err := os.Stat(secondShard) 175 return !os.IsNotExist(err) 176} 177 178type mergeOpts struct { 179 // targetSizeBytes is the target size in bytes for compound shards. The higher 180 // the value the more repositories a compound shard will contain and the bigger 181 // the potential for saving MEM. The savings in MEM come at the cost of a 182 // degraded search performance. 183 targetSizeBytes int64 184 185 // compound shards smaller than minSizeBytes will be deleted by vacuum. 186 minSizeBytes int64 187 188 // vacuumInterval is how often indexserver scans compound shards to remove 189 // tombstones. 190 vacuumInterval time.Duration 191 192 // mergeInterval defines how often indexserver runs the merge operation in 193 // the index directory. 194 mergeInterval time.Duration 195 196 // number of days since the last commit until we consider the shard for 197 // merging. For example, a value of 7 means that only repos that have been 198 // inactive for 7 days will be considered for merging. 199 minAgeDays int 200 201 // the MAX priority a shard can have to be considered for merging. 202 maxPriority float64 203} 204 205// isExcluded returns true if a shard should not be merged, false otherwise. 206// 207// We need path and FileInfo because FileInfo does not contain the full path, see 208// discussion here https://github.com/golang/go/issues/32300. 209func isExcluded(path string, fi os.FileInfo, opts mergeOpts) bool { 210 if hasMultipleShards(path) { 211 return true 212 } 213 214 repos, _, err := zoekt.ReadMetadataPath(path) 215 if err != nil { 216 debug.Printf("failed to load metadata for %s\n", fi.Name()) 217 return true 218 } 219 220 // Exclude compound shards from being merge targets. Why? We want repositories in a 221 // compound shard to be ordered based on their priority. The easiest way to 222 // enforce this is to delete the compound shard once it drops below a certain 223 // size (handled by cleanup), reindex the repositories and merge them with other 224 // shards in the correct order. 225 if len(repos) > 1 { 226 return true 227 } 228 229 if repos[0].LatestCommitDate.After(time.Now().AddDate(0, 0, -opts.minAgeDays)) { 230 return true 231 } 232 233 if priority, err := strconv.ParseFloat(repos[0].RawConfig["priority"], 64); err == nil && priority > opts.maxPriority { 234 return true 235 } 236 237 return false 238} 239 240type compound struct { 241 shards []candidate 242 size int64 243} 244 245func (c *compound) add(cand candidate) { 246 c.shards = append(c.shards, cand) 247 c.size += cand.sizeBytes 248}