fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3import ( 4 "bytes" 5 "os" 6 "os/exec" 7 "path/filepath" 8 "strconv" 9 "time" 10 11 "github.com/grafana/regexp" 12 "github.com/prometheus/client_golang/prometheus" 13 "github.com/prometheus/client_golang/prometheus/promauto" 14 "go.uber.org/atomic" 15 16 "github.com/sourcegraph/zoekt" 17) 18 19var metricShardMergingRunning = promauto.NewGauge(prometheus.GaugeOpts{ 20 Name: "index_shard_merging_running", 21 Help: "Set to 1 if indexserver's merge job is running.", 22}) 23 24var metricShardMergingDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 25 Name: "index_shard_merging_duration_seconds", 26 Help: "The duration of 1 shard merge operation.", 27 Buckets: prometheus.LinearBuckets(30, 30, 10), 28}, []string{"error"}) 29 30func pickCandidates(shards []candidate, targetSizeBytes int64) compound { 31 c := compound{} 32 for _, shard := range shards { 33 c.add(shard) 34 if c.size >= targetSizeBytes { 35 return c 36 } 37 } 38 return compound{} 39} 40 41var mergeRunning atomic.Bool 42 43func defaultMergeCmd(args ...string) *exec.Cmd { 44 cmd := exec.Command("zoekt-merge-index", "merge") 45 cmd.Args = append(cmd.Args, args...) 46 return cmd 47} 48 49// doMerge drives the merge process. It holds the lock on s.indexDir for the 50// duration of 1 merge, which might be several minutes, depending on the target 51// size of the compound shard. 52func (s *Server) doMerge() { 53 s.merge(defaultMergeCmd) 54} 55 56// same as doMerge but with a configurable merge command. 57func (s *Server) merge(mergeCmd func(args ...string) *exec.Cmd) { 58 // Guard against the user triggering competing merge jobs with the debug 59 // command. 60 if !mergeRunning.CompareAndSwap(false, true) { 61 infoLog.Printf("merge already running") 62 return 63 } 64 defer mergeRunning.Store(false) 65 66 metricShardMergingRunning.Set(1) 67 defer metricShardMergingRunning.Set(0) 68 69 // We keep creating compound shards until we run out of shards to merge or until 70 // we encounter an error during merging. 71 next := true 72 for next { 73 next = false 74 s.muIndexDir.Global(func() { 75 candidates, excluded := loadCandidates(s.IndexDir, s.mergeOpts) 76 infoLog.Printf("loadCandidates: candidates=%d excluded=%d", len(candidates), excluded) 77 78 c := pickCandidates(candidates, s.mergeOpts.targetSizeBytes) 79 if len(c.shards) <= 1 { 80 infoLog.Printf("could not find enough shards to build a compound shard") 81 return 82 } 83 infoLog.Printf("start merging: shards=%d total_size=%.2fMiB", len(c.shards), float64(c.size)/(1024*1024)) 84 85 var paths []string 86 for _, p := range c.shards { 87 paths = append(paths, p.path) 88 } 89 90 start := time.Now() 91 92 cmd := mergeCmd(paths...) 93 94 // zoekt-merge-index writes the full path of the new compound shard to stdout. 95 stdoutBuf := &bytes.Buffer{} 96 stderrBuf := &bytes.Buffer{} 97 cmd.Stdout = stdoutBuf 98 cmd.Stderr = stderrBuf 99 100 err := cmd.Run() 101 102 durationSeconds := time.Since(start).Seconds() 103 metricShardMergingDuration.WithLabelValues(strconv.FormatBool(err != nil)).Observe(durationSeconds) 104 if err != nil { 105 errorLog.Printf("error merging shards: stdout=%s, stderr=%s, durationSeconds=%.2f err=%s", stdoutBuf.String(), stderrBuf.String(), durationSeconds, err) 106 return 107 } 108 109 infoLog.Printf("finished merging: shard=%s durationSeconds=%.2f", stdoutBuf.String(), durationSeconds) 110 111 next = true 112 }) 113 } 114} 115 116type candidate struct { 117 path string 118 119 // The size as reported by os.Stat. 120 sizeBytes int64 121} 122 123// loadCandidates returns all shards eligible for merging. 124func loadCandidates(dir string, opts mergeOpts) ([]candidate, int) { 125 excluded := 0 126 127 d, err := os.Open(dir) 128 if err != nil { 129 debugLog.Printf("failed to load candidates: %s", dir) 130 return []candidate{}, excluded 131 } 132 defer d.Close() 133 names, _ := d.Readdirnames(-1) 134 135 candidates := make([]candidate, 0, len(names)) 136 for _, n := range names { 137 path := filepath.Join(dir, n) 138 139 fi, err := os.Stat(path) 140 if err != nil { 141 debugLog.Printf("stat failed for %s: %s", n, err) 142 continue 143 } 144 145 if fi.IsDir() || filepath.Ext(path) != ".zoekt" { 146 continue 147 } 148 149 if isExcluded(path, fi, opts) { 150 excluded++ 151 continue 152 } 153 154 candidates = append(candidates, candidate{ 155 path: path, 156 sizeBytes: fi.Size(), 157 }) 158 } 159 return candidates, excluded 160} 161 162var reShard = regexp.MustCompile(`\.[0-9]{5}\.zoekt$`) 163 164func hasMultipleShards(path string) bool { 165 if !reShard.MatchString(path) { 166 return false 167 } 168 secondShard := reShard.ReplaceAllString(path, ".00001.zoekt") 169 _, err := os.Stat(secondShard) 170 return !os.IsNotExist(err) 171} 172 173type mergeOpts struct { 174 // targetSizeBytes is the target size in bytes for compound shards. The higher 175 // the value the more repositories a compound shard will contain and the bigger 176 // the potential for saving MEM. The savings in MEM come at the cost of a 177 // degraded search performance. 178 targetSizeBytes int64 179 180 // compound shards smaller than minSizeBytes will be deleted by vacuum. 181 minSizeBytes int64 182 183 // vacuumInterval is how often indexserver scans compound shards to remove 184 // tombstones. 185 vacuumInterval time.Duration 186 187 // mergeInterval defines how often indexserver runs the merge operation in 188 // the index directory. 189 mergeInterval time.Duration 190 191 // number of days since the last commit until we consider the shard for 192 // merging. For example, a value of 7 means that only repos that have been 193 // inactive for 7 days will be considered for merging. 194 minAgeDays int 195} 196 197// isExcluded returns true if a shard should not be merged, false otherwise. 198// 199// We need path and FileInfo because FileInfo does not contain the full path, see 200// discussion here https://github.com/golang/go/issues/32300. 201func isExcluded(path string, fi os.FileInfo, opts mergeOpts) bool { 202 if hasMultipleShards(path) { 203 return true 204 } 205 206 repos, _, err := zoekt.ReadMetadataPath(path) 207 if err != nil { 208 debugLog.Printf("failed to load metadata for %s\n", fi.Name()) 209 return true 210 } 211 212 // Exclude compound shards from being merge targets. Why? We want repositories in a 213 // compound shard to be ordered based on their priority. The easiest way to 214 // enforce this is to delete the compound shard once it drops below a certain 215 // size (handled by cleanup), reindex the repositories and merge them with other 216 // shards in the correct order. 217 if len(repos) > 1 { 218 return true 219 } 220 221 if repos[0].LatestCommitDate.After(time.Now().AddDate(0, 0, -opts.minAgeDays)) { 222 return true 223 } 224 225 return false 226} 227 228type compound struct { 229 shards []candidate 230 size int64 231} 232 233func (c *compound) add(cand candidate) { 234 c.shards = append(c.shards, cand) 235 c.size += cand.sizeBytes 236}