fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3import ( 4 "bytes" 5 "log" 6 "os" 7 "os/exec" 8 "path/filepath" 9 "strconv" 10 "time" 11 12 "github.com/grafana/regexp" 13 "github.com/prometheus/client_golang/prometheus" 14 "github.com/prometheus/client_golang/prometheus/promauto" 15 "go.uber.org/atomic" 16 17 "github.com/sourcegraph/zoekt" 18) 19 20var metricShardMergingRunning = promauto.NewGauge(prometheus.GaugeOpts{ 21 Name: "index_shard_merging_running", 22 Help: "Set to 1 if indexserver's merge job is running.", 23}) 24 25var metricShardMergingDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 26 Name: "index_shard_merging_duration_seconds", 27 Help: "The duration of 1 shard merge operation.", 28 Buckets: prometheus.LinearBuckets(30, 30, 10), 29}, []string{"error"}) 30 31func pickCandidates(shards []candidate, targetSizeBytes int64) compound { 32 c := compound{} 33 for _, shard := range shards { 34 c.add(shard) 35 if c.size >= targetSizeBytes { 36 return c 37 } 38 } 39 return compound{} 40} 41 42var mergeRunning atomic.Bool 43 44func defaultMergeCmd(args ...string) *exec.Cmd { 45 cmd := exec.Command("zoekt-merge-index", "merge") 46 cmd.Args = append(cmd.Args, args...) 47 return cmd 48} 49 50// doMerge drives the merge process. It holds the lock on s.indexDir for the 51// duration of 1 merge, which might be several minutes, depending on the target 52// size of the compound shard. 53func (s *Server) doMerge() { 54 s.merge(defaultMergeCmd) 55} 56 57// same as doMerge but with a configurable merge command. 58func (s *Server) merge(mergeCmd func(args ...string) *exec.Cmd) { 59 // Guard against the user triggering competing merge jobs with the debug 60 // command. 61 if !mergeRunning.CompareAndSwap(false, true) { 62 log.Printf("merge already running") 63 return 64 } 65 defer mergeRunning.Store(false) 66 67 metricShardMergingRunning.Set(1) 68 defer metricShardMergingRunning.Set(0) 69 70 // We keep creating compound shards until we run out of shards to merge or until 71 // we encounter an error during merging. 72 next := true 73 for next { 74 next = false 75 s.muIndexDir.Global(func() { 76 candidates, excluded := loadCandidates(s.IndexDir, s.mergeOpts) 77 log.Printf("loadCandidates: candidates=%d excluded=%d", len(candidates), excluded) 78 79 c := pickCandidates(candidates, s.mergeOpts.targetSizeBytes) 80 if len(c.shards) <= 1 { 81 log.Printf("could not find enough shards to build a compound shard") 82 return 83 } 84 log.Printf("start merging: shards=%d total_size=%.2fMiB", len(c.shards), float64(c.size)/(1024*1024)) 85 86 var paths []string 87 for _, p := range c.shards { 88 paths = append(paths, p.path) 89 } 90 91 start := time.Now() 92 93 cmd := mergeCmd(paths...) 94 95 // zoekt-merge-index writes the full path of the new compound shard to stdout. 96 stdoutBuf := &bytes.Buffer{} 97 stderrBuf := &bytes.Buffer{} 98 cmd.Stdout = stdoutBuf 99 cmd.Stderr = stderrBuf 100 101 err := cmd.Run() 102 103 durationSeconds := time.Since(start).Seconds() 104 metricShardMergingDuration.WithLabelValues(strconv.FormatBool(err != nil)).Observe(durationSeconds) 105 if err != nil { 106 log.Printf("error merging shards: stdout=%s, stderr=%s, durationSeconds=%.2f err=%s", stdoutBuf.String(), stderrBuf.String(), durationSeconds, err) 107 return 108 } 109 110 log.Printf("finished merging: shard=%s durationSeconds=%.2f", stdoutBuf.String(), durationSeconds) 111 112 next = true 113 }) 114 } 115} 116 117type candidate struct { 118 path string 119 120 // The size as reported by os.Stat. 121 sizeBytes int64 122} 123 124// loadCandidates returns all shards eligible for merging. 125func loadCandidates(dir string, opts mergeOpts) ([]candidate, int) { 126 excluded := 0 127 128 d, err := os.Open(dir) 129 if err != nil { 130 debug.Printf("failed to load candidates: %s", dir) 131 return []candidate{}, excluded 132 } 133 defer d.Close() 134 names, _ := d.Readdirnames(-1) 135 136 candidates := make([]candidate, 0, len(names)) 137 for _, n := range names { 138 path := filepath.Join(dir, n) 139 140 fi, err := os.Stat(path) 141 if err != nil { 142 debug.Printf("stat failed for %s: %s", n, err) 143 continue 144 } 145 146 if fi.IsDir() || filepath.Ext(path) != ".zoekt" { 147 continue 148 } 149 150 if isExcluded(path, fi, opts) { 151 excluded++ 152 continue 153 } 154 155 candidates = append(candidates, candidate{ 156 path: path, 157 sizeBytes: fi.Size(), 158 }) 159 } 160 return candidates, excluded 161} 162 163var reShard = regexp.MustCompile(`\.[0-9]{5}\.zoekt$`) 164 165func hasMultipleShards(path string) bool { 166 if !reShard.MatchString(path) { 167 return false 168 } 169 secondShard := reShard.ReplaceAllString(path, ".00001.zoekt") 170 _, err := os.Stat(secondShard) 171 return !os.IsNotExist(err) 172} 173 174type mergeOpts struct { 175 // targetSizeBytes is the target size in bytes for compound shards. The higher 176 // the value the more repositories a compound shard will contain and the bigger 177 // the potential for saving MEM. The savings in MEM come at the cost of a 178 // degraded search performance. 179 targetSizeBytes int64 180 181 // compound shards smaller than minSizeBytes will be deleted by vacuum. 182 minSizeBytes int64 183 184 // vacuumInterval is how often indexserver scans compound shards to remove 185 // tombstones. 186 vacuumInterval time.Duration 187 188 // mergeInterval defines how often indexserver runs the merge operation in 189 // the index directory. 190 mergeInterval time.Duration 191 192 // number of days since the last commit until we consider the shard for 193 // merging. For example, a value of 7 means that only repos that have been 194 // inactive for 7 days will be considered for merging. 195 minAgeDays int 196} 197 198// isExcluded returns true if a shard should not be merged, false otherwise. 199// 200// We need path and FileInfo because FileInfo does not contain the full path, see 201// discussion here https://github.com/golang/go/issues/32300. 202func isExcluded(path string, fi os.FileInfo, opts mergeOpts) bool { 203 if hasMultipleShards(path) { 204 return true 205 } 206 207 repos, _, err := zoekt.ReadMetadataPath(path) 208 if err != nil { 209 debug.Printf("failed to load metadata for %s\n", fi.Name()) 210 return true 211 } 212 213 // Exclude compound shards from being merge targets. Why? We want repositories in a 214 // compound shard to be ordered based on their priority. The easiest way to 215 // enforce this is to delete the compound shard once it drops below a certain 216 // size (handled by cleanup), reindex the repositories and merge them with other 217 // shards in the correct order. 218 if len(repos) > 1 { 219 return true 220 } 221 222 if repos[0].LatestCommitDate.After(time.Now().AddDate(0, 0, -opts.minAgeDays)) { 223 return true 224 } 225 226 return false 227} 228 229type compound struct { 230 shards []candidate 231 size int64 232} 233 234func (c *compound) add(cand candidate) { 235 c.shards = append(c.shards, cand) 236 c.size += cand.sizeBytes 237}