fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3import ( 4 "bytes" 5 "os" 6 "os/exec" 7 "path/filepath" 8 "strconv" 9 "time" 10 11 "github.com/grafana/regexp" 12 "github.com/prometheus/client_golang/prometheus" 13 "github.com/prometheus/client_golang/prometheus/promauto" 14 "github.com/sourcegraph/zoekt/index" 15 "go.uber.org/atomic" 16) 17 18var metricShardMergingRunning = promauto.NewGauge(prometheus.GaugeOpts{ 19 Name: "index_shard_merging_running", 20 Help: "Set to 1 if indexserver's merge job is running.", 21}) 22 23var metricShardMergingDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 24 Name: "index_shard_merging_duration_seconds", 25 Help: "The duration of 1 shard merge operation.", 26 Buckets: prometheus.LinearBuckets(30, 30, 10), 27}, []string{"error"}) 28 29func pickCandidates(shards []candidate, targetSizeBytes int64) compound { 30 c := compound{} 31 for _, shard := range shards { 32 c.add(shard) 33 if c.size >= targetSizeBytes { 34 return c 35 } 36 } 37 return compound{} 38} 39 40var mergeRunning atomic.Bool 41 42func defaultMergeCmd(args ...string) *exec.Cmd { 43 cmd := exec.Command("zoekt-merge-index", "merge") 44 cmd.Args = append(cmd.Args, args...) 45 return cmd 46} 47 48// doMerge drives the merge process. It holds the lock on s.indexDir for the 49// duration of 1 merge, which might be several minutes, depending on the target 50// size of the compound shard. 51func (s *Server) doMerge() { 52 s.merge(defaultMergeCmd) 53} 54 55// same as doMerge but with a configurable merge command. 56func (s *Server) merge(mergeCmd func(args ...string) *exec.Cmd) { 57 // Guard against the user triggering competing merge jobs with the debug 58 // command. 59 if !mergeRunning.CompareAndSwap(false, true) { 60 infoLog.Printf("merge already running") 61 return 62 } 63 defer mergeRunning.Store(false) 64 65 metricShardMergingRunning.Set(1) 66 defer metricShardMergingRunning.Set(0) 67 68 // We keep creating compound shards until we run out of shards to merge or until 69 // we encounter an error during merging. 70 next := true 71 for next { 72 next = false 73 s.muIndexDir.Global(func() { 74 candidates, excluded := loadCandidates(s.IndexDir, s.mergeOpts) 75 infoLog.Printf("loadCandidates: candidates=%d excluded=%d", len(candidates), excluded) 76 77 c := pickCandidates(candidates, s.mergeOpts.targetSizeBytes) 78 if len(c.shards) <= 1 { 79 infoLog.Printf("could not find enough shards to build a compound shard") 80 return 81 } 82 infoLog.Printf("start merging: shards=%d total_size=%.2fMiB", len(c.shards), float64(c.size)/(1024*1024)) 83 84 var paths []string 85 for _, p := range c.shards { 86 paths = append(paths, p.path) 87 } 88 89 start := time.Now() 90 91 cmd := mergeCmd(paths...) 92 93 // zoekt-merge-index writes the full path of the new compound shard to stdout. 94 stdoutBuf := &bytes.Buffer{} 95 stderrBuf := &bytes.Buffer{} 96 cmd.Stdout = stdoutBuf 97 cmd.Stderr = stderrBuf 98 99 err := cmd.Run() 100 101 durationSeconds := time.Since(start).Seconds() 102 metricShardMergingDuration.WithLabelValues(strconv.FormatBool(err != nil)).Observe(durationSeconds) 103 if err != nil { 104 errorLog.Printf("error merging shards: stdout=%s, stderr=%s, durationSeconds=%.2f err=%s", stdoutBuf.String(), stderrBuf.String(), durationSeconds, err) 105 return 106 } 107 108 infoLog.Printf("finished merging: shard=%s durationSeconds=%.2f", stdoutBuf.String(), durationSeconds) 109 110 next = true 111 }) 112 } 113} 114 115type candidate struct { 116 path string 117 118 // The size as reported by os.Stat. 119 sizeBytes int64 120} 121 122// loadCandidates returns all shards eligible for merging. 123func loadCandidates(dir string, opts mergeOpts) ([]candidate, int) { 124 excluded := 0 125 126 d, err := os.Open(dir) 127 if err != nil { 128 debugLog.Printf("failed to load candidates: %s", dir) 129 return []candidate{}, excluded 130 } 131 defer d.Close() 132 names, _ := d.Readdirnames(-1) 133 134 candidates := make([]candidate, 0, len(names)) 135 for _, n := range names { 136 path := filepath.Join(dir, n) 137 138 fi, err := os.Stat(path) 139 if err != nil { 140 debugLog.Printf("stat failed for %s: %s", n, err) 141 continue 142 } 143 144 if fi.IsDir() || filepath.Ext(path) != ".zoekt" { 145 continue 146 } 147 148 if isExcluded(path, fi, opts) { 149 excluded++ 150 continue 151 } 152 153 candidates = append(candidates, candidate{ 154 path: path, 155 sizeBytes: fi.Size(), 156 }) 157 } 158 return candidates, excluded 159} 160 161var reShard = regexp.MustCompile(`\.[0-9]{5}\.zoekt$`) 162 163func hasMultipleShards(path string) bool { 164 if !reShard.MatchString(path) { 165 return false 166 } 167 secondShard := reShard.ReplaceAllString(path, ".00001.zoekt") 168 _, err := os.Stat(secondShard) 169 return !os.IsNotExist(err) 170} 171 172type mergeOpts struct { 173 // targetSizeBytes is the target size in bytes for compound shards. The higher 174 // the value the more repositories a compound shard will contain and the bigger 175 // the potential for saving MEM. The savings in MEM come at the cost of a 176 // degraded search performance. 177 targetSizeBytes int64 178 179 // compound shards smaller than minSizeBytes will be deleted by vacuum. 180 minSizeBytes int64 181 182 // vacuumInterval is how often indexserver scans compound shards to remove 183 // tombstones. 184 vacuumInterval time.Duration 185 186 // mergeInterval defines how often indexserver runs the merge operation in 187 // the index directory. 188 mergeInterval time.Duration 189 190 // number of days since the last commit until we consider the shard for 191 // merging. For example, a value of 7 means that only repos that have been 192 // inactive for 7 days will be considered for merging. 193 minAgeDays int 194} 195 196// isExcluded returns true if a shard should not be merged, false otherwise. 197// 198// We need path and FileInfo because FileInfo does not contain the full path, see 199// discussion here https://github.com/golang/go/issues/32300. 200func isExcluded(path string, fi os.FileInfo, opts mergeOpts) bool { 201 if hasMultipleShards(path) { 202 return true 203 } 204 205 repos, _, err := index.ReadMetadataPath(path) 206 if err != nil { 207 debugLog.Printf("failed to load metadata for %s\n", fi.Name()) 208 return true 209 } 210 211 // Exclude compound shards from being merge targets. Why? We want repositories in a 212 // compound shard to be ordered based on their priority. The easiest way to 213 // enforce this is to delete the compound shard once it drops below a certain 214 // size (handled by cleanup), reindex the repositories and merge them with other 215 // shards in the correct order. 216 if len(repos) > 1 { 217 return true 218 } 219 220 if repos[0].LatestCommitDate.After(time.Now().AddDate(0, 0, -opts.minAgeDays)) { 221 return true 222 } 223 224 return false 225} 226 227type compound struct { 228 shards []candidate 229 size int64 230} 231 232func (c *compound) add(cand candidate) { 233 c.shards = append(c.shards, cand) 234 c.size += cand.sizeBytes 235}