fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3import ( 4 "log" 5 "os" 6 "os/exec" 7 "path/filepath" 8 "strconv" 9 "time" 10 11 "github.com/grafana/regexp" 12 "github.com/prometheus/client_golang/prometheus" 13 "github.com/prometheus/client_golang/prometheus/promauto" 14 "go.uber.org/atomic" 15 16 "github.com/sourcegraph/zoekt" 17) 18 19var metricShardMergingRunning = promauto.NewGauge(prometheus.GaugeOpts{ 20 Name: "index_shard_merging_running", 21 Help: "Set to 1 if indexserver's merge job is running.", 22}) 23 24var metricShardMergingDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 25 Name: "index_shard_merging_duration_seconds", 26 Help: "The duration of 1 shard merge operation.", 27 Buckets: prometheus.LinearBuckets(30, 30, 10), 28}, []string{"error"}) 29 30func pickCandidates(shards []candidate, targetSizeBytes int64) compound { 31 c := compound{} 32 for _, shard := range shards { 33 c.add(shard) 34 if c.size >= targetSizeBytes { 35 return c 36 } 37 } 38 return compound{} 39} 40 41var mergeRunning atomic.Bool 42 43func defaultMergeCmd(args ...string) *exec.Cmd { 44 cmd := exec.Command("zoekt-merge-index", "merge") 45 cmd.Args = append(cmd.Args, args...) 46 return cmd 47} 48 49// doMerge drives the merge process. It holds the lock on s.indexDir for the 50// duration of 1 merge, which might be several minutes, depending on the target 51// size of the compound shard. 52func (s *Server) doMerge() { 53 s.merge(defaultMergeCmd) 54} 55 56// same as doMerge but with a configurable merge command. 57func (s *Server) merge(mergeCmd func(args ...string) *exec.Cmd) { 58 // Guard against the user triggering competing merge jobs with the debug 59 // command. 60 if !mergeRunning.CompareAndSwap(false, true) { 61 log.Printf("merge already running") 62 return 63 } 64 defer mergeRunning.Store(false) 65 66 metricShardMergingRunning.Set(1) 67 defer metricShardMergingRunning.Set(0) 68 69 // We keep creating compound shards until we run out of shards to merge or until 70 // we encounter an error during merging. 71 next := true 72 for next { 73 next = false 74 s.muIndexDir.Global(func() { 75 candidates, excluded := loadCandidates(s.IndexDir, s.mergeOpts) 76 log.Printf("loadCandidates: candidates=%d excluded=%d", len(candidates), excluded) 77 78 c := pickCandidates(candidates, s.mergeOpts.targetSizeBytes) 79 if len(c.shards) <= 1 { 80 log.Printf("could not find enough shards to build a compound shard") 81 return 82 } 83 log.Printf("start merging: shards=%d total_size=%.2fMiB", len(c.shards), float64(c.size)/(1024*1024)) 84 85 var paths []string 86 for _, p := range c.shards { 87 paths = append(paths, p.path) 88 } 89 90 start := time.Now() 91 out, err := mergeCmd(paths...).CombinedOutput() 92 93 metricShardMergingDuration.WithLabelValues(strconv.FormatBool(err != nil)).Observe(time.Since(start).Seconds()) 94 if err != nil { 95 log.Printf("mergeCmd: out=%s, err=%s", out, err) 96 return 97 } 98 99 next = true 100 }) 101 } 102} 103 104type candidate struct { 105 path string 106 107 // The size as reported by os.Stat. 108 sizeBytes int64 109} 110 111// loadCandidates returns all shards eligible for merging. 112func loadCandidates(dir string, opts mergeOpts) ([]candidate, int) { 113 excluded := 0 114 115 d, err := os.Open(dir) 116 if err != nil { 117 debug.Printf("failed to load candidates: %s", dir) 118 return []candidate{}, excluded 119 } 120 defer d.Close() 121 names, _ := d.Readdirnames(-1) 122 123 candidates := make([]candidate, 0, len(names)) 124 for _, n := range names { 125 path := filepath.Join(dir, n) 126 127 fi, err := os.Stat(path) 128 if err != nil { 129 debug.Printf("stat failed for %s: %s", n, err) 130 continue 131 } 132 133 if fi.IsDir() || filepath.Ext(path) != ".zoekt" { 134 continue 135 } 136 137 if isExcluded(path, fi, opts) { 138 excluded++ 139 continue 140 } 141 142 candidates = append(candidates, candidate{ 143 path: path, 144 sizeBytes: fi.Size(), 145 }) 146 } 147 return candidates, excluded 148} 149 150var reShard = regexp.MustCompile(`\.[0-9]{5}\.zoekt$`) 151 152func hasMultipleShards(path string) bool { 153 if !reShard.MatchString(path) { 154 return false 155 } 156 secondShard := reShard.ReplaceAllString(path, ".00001.zoekt") 157 _, err := os.Stat(secondShard) 158 return !os.IsNotExist(err) 159} 160 161type mergeOpts struct { 162 // targetSizeBytes is the target size in bytes for compound shards. The higher 163 // the value the more repositories a compound shard will contain and the bigger 164 // the potential for saving MEM. The savings in MEM come at the cost of a 165 // degraded search performance. 166 targetSizeBytes int64 167 168 // compound shards smaller than minSizeBytes will be deleted by vacuum. 169 minSizeBytes int64 170 171 // vacuumInterval is how often indexserver scans compound shards to remove 172 // tombstones. 173 vacuumInterval time.Duration 174 175 // mergeInterval defines how often indexserver runs the merge operation in 176 // the index directory. 177 mergeInterval time.Duration 178 179 // number of days since the last commit until we consider the shard for 180 // merging. For example, a value of 7 means that only repos that have been 181 // inactive for 7 days will be considered for merging. 182 minAgeDays int 183 184 // the MAX priority a shard can have to be considered for merging. 185 maxPriority float64 186} 187 188// isExcluded returns true if a shard should not be merged, false otherwise. 189// 190// We need path and FileInfo because FileInfo does not contain the full path, see 191// discussion here https://github.com/golang/go/issues/32300. 192func isExcluded(path string, fi os.FileInfo, opts mergeOpts) bool { 193 if hasMultipleShards(path) { 194 return true 195 } 196 197 repos, _, err := zoekt.ReadMetadataPath(path) 198 if err != nil { 199 debug.Printf("failed to load metadata for %s\n", fi.Name()) 200 return true 201 } 202 203 // Exclude compound shards from being merge targets. Why? We want repositories in a 204 // compound shard to be ordered based on their priority. The easiest way to 205 // enforce this is to delete the compound shard once it drops below a certain 206 // size (handled by cleanup), reindex the repositories and merge them with other 207 // shards in the correct order. 208 if len(repos) > 1 { 209 return true 210 } 211 212 if repos[0].LatestCommitDate.After(time.Now().AddDate(0, 0, -opts.minAgeDays)) { 213 return true 214 } 215 216 if priority, err := strconv.ParseFloat(repos[0].RawConfig["priority"], 64); err == nil && priority > opts.maxPriority { 217 return true 218 } 219 220 return false 221} 222 223type compound struct { 224 shards []candidate 225 size int64 226} 227 228func (c *compound) add(cand candidate) { 229 c.shards = append(c.shards, cand) 230 c.size += cand.sizeBytes 231}