fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3import ( 4 "log" 5 "os" 6 "os/exec" 7 "path/filepath" 8 "strconv" 9 "time" 10 11 "github.com/grafana/regexp" 12 "github.com/prometheus/client_golang/prometheus" 13 "github.com/prometheus/client_golang/prometheus/promauto" 14 "go.uber.org/atomic" 15 16 "github.com/sourcegraph/zoekt" 17) 18 19var reCompound = regexp.MustCompile(`compound-.*\.zoekt`) 20 21var metricShardMergingRunning = promauto.NewGauge(prometheus.GaugeOpts{ 22 Name: "index_shard_merging_running", 23 Help: "Set to 1 if indexserver's merge job is running.", 24}) 25 26var metricShardMergingDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 27 Name: "index_shard_merging_duration_seconds", 28 Help: "The duration of 1 shard merge operation.", 29 Buckets: prometheus.LinearBuckets(30, 30, 10), 30}, []string{"error"}) 31 32func pickCandidates(shards []candidate, targetSizeBytes int64) compound { 33 c := compound{} 34 for _, shard := range shards { 35 c.add(shard) 36 if c.size >= targetSizeBytes { 37 return c 38 } 39 } 40 return compound{} 41} 42 43var mergeRunning atomic.Bool 44 45func defaultMergeCmd(args ...string) *exec.Cmd { 46 cmd := exec.Command("zoekt-merge-index", "merge") 47 cmd.Args = append(cmd.Args, args...) 48 return cmd 49} 50 51// doMerge drives the merge process. It holds the lock on s.indexDir for the 52// duration of 1 merge, which might be several minutes, depending on the target 53// size of the compound shard. 54func (s *Server) doMerge() { 55 s.merge(defaultMergeCmd) 56} 57 58// same as doMerge but with a configurable merge command. 59func (s *Server) merge(mergeCmd func(args ...string) *exec.Cmd) { 60 61 // Guard against the user triggering competing merge jobs with the debug 62 // command. 63 if !mergeRunning.CompareAndSwap(false, true) { 64 log.Printf("merge already running") 65 return 66 } 67 defer mergeRunning.Store(false) 68 69 metricShardMergingRunning.Set(1) 70 defer metricShardMergingRunning.Set(0) 71 72 // We keep creating compound shards until we run out of shards to merge or until 73 // we encounter an error during merging. 74 next := true 75 for next { 76 next = false 77 s.muIndexDir.Global(func() { 78 candidates, excluded := loadCandidates(s.IndexDir, s.mergeOpts) 79 log.Printf("loadCandidates: candidates=%d excluded=%d", len(candidates), excluded) 80 81 c := pickCandidates(candidates, s.mergeOpts.targetSizeBytes) 82 if len(c.shards) <= 1 { 83 log.Printf("could not find enough shards to build a compound shard") 84 return 85 } 86 log.Printf("start merging: shards=%d total_size=%.2fMiB", len(c.shards), float64(c.size)/(1024*1024)) 87 88 var paths []string 89 for _, p := range c.shards { 90 paths = append(paths, p.path) 91 } 92 93 start := time.Now() 94 out, err := mergeCmd(paths...).CombinedOutput() 95 96 metricShardMergingDuration.WithLabelValues(strconv.FormatBool(err != nil)).Observe(time.Since(start).Seconds()) 97 if err != nil { 98 log.Printf("mergeCmd: out=%s, err=%s", out, err) 99 return 100 } 101 102 next = true 103 }) 104 } 105} 106 107type candidate struct { 108 path string 109 110 // The size as reported by os.Stat. 111 sizeBytes int64 112} 113 114// loadCandidates returns all shards eligible for merging. 115func loadCandidates(dir string, opts mergeOpts) ([]candidate, int) { 116 excluded := 0 117 118 d, err := os.Open(dir) 119 if err != nil { 120 debug.Printf("failed to load candidates: %s", dir) 121 return []candidate{}, excluded 122 } 123 defer d.Close() 124 names, _ := d.Readdirnames(-1) 125 126 candidates := make([]candidate, 0, len(names)) 127 for _, n := range names { 128 path := filepath.Join(dir, n) 129 130 fi, err := os.Stat(path) 131 if err != nil { 132 debug.Printf("stat failed for %s: %s", n, err) 133 continue 134 } 135 136 if fi.IsDir() || filepath.Ext(path) != ".zoekt" { 137 continue 138 } 139 140 if isExcluded(path, fi, opts) { 141 excluded++ 142 continue 143 } 144 145 candidates = append(candidates, candidate{ 146 path: path, 147 sizeBytes: fi.Size(), 148 }) 149 } 150 return candidates, excluded 151} 152 153var reShard = regexp.MustCompile(`\.[0-9]{5}\.zoekt$`) 154 155func hasMultipleShards(path string) bool { 156 if !reShard.MatchString(path) { 157 return false 158 } 159 secondShard := reShard.ReplaceAllString(path, ".00001.zoekt") 160 _, err := os.Stat(secondShard) 161 return !os.IsNotExist(err) 162} 163 164type mergeOpts struct { 165 // targetSizeBytes is the target size in bytes for compound shards. The higher 166 // the value the more repositories a compound shard will contain and the bigger 167 // the potential for saving MEM. The savings in MEM come at the cost of a 168 // degraded search performance. 169 targetSizeBytes int64 170 171 // compound shards smaller than minSizeBytes will be deleted by vacuum. 172 minSizeBytes int64 173 174 // vacuumInterval is how often indexserver scans compound shards to remove 175 // tombstones. 176 vacuumInterval time.Duration 177 178 // mergeInterval defines how often indexserver runs the merge operation in 179 // the index directory. 180 mergeInterval time.Duration 181 182 // number of days since the last commit until we consider the shard for 183 // merging. For example, a value of 7 means that only repos that have been 184 // inactive for 7 days will be considered for merging. 185 minAgeDays int 186 187 // the MAX priority a shard can have to be considered for merging. 188 maxPriority float64 189} 190 191// isExcluded returns true if a shard should not be merged, false otherwise. 192// 193// We need path and FileInfo because FileInfo does not contain the full path, see 194// discussion here https://github.com/golang/go/issues/32300. 195func isExcluded(path string, fi os.FileInfo, opts mergeOpts) bool { 196 if hasMultipleShards(path) { 197 return true 198 } 199 200 repos, _, err := zoekt.ReadMetadataPath(path) 201 if err != nil { 202 debug.Printf("failed to load metadata for %s\n", fi.Name()) 203 return true 204 } 205 206 // Exclude compound shards from being merge targets. Why? We want repositories in a 207 // compound shard to be ordered based on their priority. The easiest way to 208 // enforce this is to delete the compound shard once it drops below a certain 209 // size (handled by cleanup), reindex the repositories and merge them with other 210 // shards in the correct order. 211 if len(repos) > 1 { 212 return true 213 } 214 215 if repos[0].LatestCommitDate.After(time.Now().AddDate(0, 0, -opts.minAgeDays)) { 216 return true 217 } 218 219 if priority, err := strconv.ParseFloat(repos[0].RawConfig["priority"], 64); err == nil && priority > opts.maxPriority { 220 return true 221 } 222 223 return false 224} 225 226type compound struct { 227 shards []candidate 228 size int64 229} 230 231func (c *compound) add(cand candidate) { 232 c.shards = append(c.shards, cand) 233 c.size += cand.sizeBytes 234}