fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3import ( 4 "bytes" 5 "context" 6 "os" 7 "os/exec" 8 "path/filepath" 9 "strconv" 10 "time" 11 12 "github.com/grafana/regexp" 13 "github.com/prometheus/client_golang/prometheus" 14 "github.com/prometheus/client_golang/prometheus/promauto" 15 "github.com/sourcegraph/zoekt/index" 16 "github.com/sourcegraph/zoekt/internal/tenant" 17 "go.uber.org/atomic" 18) 19 20var metricShardMergingRunning = promauto.NewGauge(prometheus.GaugeOpts{ 21 Name: "index_shard_merging_running", 22 Help: "Set to 1 if indexserver's merge job is running.", 23}) 24 25var metricShardMergingDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 26 Name: "index_shard_merging_duration_seconds", 27 Help: "The duration of 1 shard merge operation.", 28 Buckets: prometheus.LinearBuckets(30, 30, 10), 29}, []string{"error"}) 30 31func pickCandidates(shards []candidate, targetSizeBytes int64) compound { 32 c := compound{} 33 for _, shard := range shards { 34 c.add(shard) 35 if c.size >= targetSizeBytes { 36 return c 37 } 38 } 39 return compound{} 40} 41 42var mergeRunning atomic.Bool 43 44func defaultMergeCmd(args ...string) *exec.Cmd { 45 cmd := exec.Command("zoekt-merge-index", "merge") 46 cmd.Args = append(cmd.Args, args...) 47 return cmd 48} 49 50func defaultExplodeCmd(args ...string) *exec.Cmd { 51 cmd := exec.Command("zoekt-merge-index", "explode") 52 cmd.Args = append(cmd.Args, args...) 53 return cmd 54} 55 56// doMerge drives the merge process. It holds the lock on s.indexDir for the 57// duration of 1 merge, which might be several minutes, depending on the target 58// size of the compound shard. 59func (s *Server) doMerge() { 60 s.merge(defaultMergeCmd) 61} 62 63// same as doMerge but with a configurable merge command. 64func (s *Server) merge(mergeCmd func(args ...string) *exec.Cmd) { 65 // Guard against the user triggering competing merge jobs with the debug 66 // command. 67 if !mergeRunning.CompareAndSwap(false, true) { 68 infoLog.Printf("merge already running") 69 return 70 } 71 defer mergeRunning.Store(false) 72 73 metricShardMergingRunning.Set(1) 74 defer metricShardMergingRunning.Set(0) 75 76 // We keep creating compound shards until we run out of shards to merge or until 77 // we encounter an error during merging. 78 next := true 79 for next { 80 next = false 81 s.muIndexDir.Global(func() { 82 candidates, excluded := loadCandidates(s.IndexDir, s.mergeOpts) 83 infoLog.Printf("loadCandidates: candidates=%d excluded=%d", len(candidates), excluded) 84 85 c := pickCandidates(candidates, s.mergeOpts.targetSizeBytes) 86 if len(c.shards) <= 1 { 87 infoLog.Printf("could not find enough shards to build a compound shard") 88 return 89 } 90 infoLog.Printf("start merging: shards=%d total_size=%.2fMiB", len(c.shards), float64(c.size)/(1024*1024)) 91 92 var paths []string 93 for _, p := range c.shards { 94 paths = append(paths, p.path) 95 } 96 97 start := time.Now() 98 99 cmd := mergeCmd(paths...) 100 101 // zoekt-merge-index writes the full path of the new compound shard to stdout. 102 stdoutBuf := &bytes.Buffer{} 103 stderrBuf := &bytes.Buffer{} 104 cmd.Stdout = stdoutBuf 105 cmd.Stderr = stderrBuf 106 107 err := cmd.Run() 108 109 durationSeconds := time.Since(start).Seconds() 110 metricShardMergingDuration.WithLabelValues(strconv.FormatBool(err != nil)).Observe(durationSeconds) 111 if err != nil { 112 errorLog.Printf("error merging shards: stdout=%s, stderr=%s, durationSeconds=%.2f err=%s", stdoutBuf.String(), stderrBuf.String(), durationSeconds, err) 113 return 114 } 115 116 infoLog.Printf("finished merging: shard=%s durationSeconds=%.2f", stdoutBuf.String(), durationSeconds) 117 118 next = true 119 }) 120 } 121} 122 123type candidate struct { 124 path string 125 126 // The size as reported by os.Stat. 127 sizeBytes int64 128} 129 130// loadCandidates returns all shards eligible for merging. 131func loadCandidates(dir string, opts mergeOpts) ([]candidate, int) { 132 excluded := 0 133 134 d, err := os.Open(dir) 135 if err != nil { 136 debugLog.Printf("failed to load candidates: %s", dir) 137 return []candidate{}, excluded 138 } 139 defer d.Close() 140 names, _ := d.Readdirnames(-1) 141 142 candidates := make([]candidate, 0, len(names)) 143 for _, n := range names { 144 path := filepath.Join(dir, n) 145 146 fi, err := os.Stat(path) 147 if err != nil { 148 debugLog.Printf("stat failed for %s: %s", n, err) 149 continue 150 } 151 152 if fi.IsDir() || filepath.Ext(path) != ".zoekt" { 153 continue 154 } 155 156 if isExcluded(path, fi, opts) { 157 excluded++ 158 continue 159 } 160 161 candidates = append(candidates, candidate{ 162 path: path, 163 sizeBytes: fi.Size(), 164 }) 165 } 166 return candidates, excluded 167} 168 169var reShard = regexp.MustCompile(`\.[0-9]{5}\.zoekt$`) 170 171func hasMultipleShards(path string) bool { 172 if !reShard.MatchString(path) { 173 return false 174 } 175 secondShard := reShard.ReplaceAllString(path, ".00001.zoekt") 176 _, err := os.Stat(secondShard) 177 return !os.IsNotExist(err) 178} 179 180type mergeOpts struct { 181 // targetSizeBytes is the target size in bytes for compound shards. The higher 182 // the value the more repositories a compound shard will contain and the bigger 183 // the potential for saving MEM. The savings in MEM come at the cost of a 184 // degraded search performance. 185 targetSizeBytes int64 186 187 // compound shards smaller than minSizeBytes will be deleted by vacuum. 188 minSizeBytes int64 189 190 // vacuumInterval is how often indexserver scans compound shards to remove 191 // tombstones. 192 vacuumInterval time.Duration 193 194 // mergeInterval defines how often indexserver runs the merge operation in 195 // the index directory. 196 mergeInterval time.Duration 197 198 // number of days since the last commit until we consider the shard for 199 // merging. For example, a value of 7 means that only repos that have been 200 // inactive for 7 days will be considered for merging. 201 minAgeDays int 202} 203 204// isExcluded returns true if a shard should not be merged, false otherwise. 205// 206// We need path and FileInfo because FileInfo does not contain the full path, see 207// discussion here https://github.com/golang/go/issues/32300. 208func isExcluded(path string, fi os.FileInfo, opts mergeOpts) bool { 209 if hasMultipleShards(path) { 210 return true 211 } 212 213 repos, _, err := index.ReadMetadataPath(path) 214 if err != nil { 215 debugLog.Printf("failed to load metadata for %s\n", fi.Name()) 216 return true 217 } 218 219 // Exclude compound shards from being merge targets. Why? We want repositories in a 220 // compound shard to be ordered based on their priority. The easiest way to 221 // enforce this is to delete the compound shard once it drops below a certain 222 // size (handled by cleanup), reindex the repositories and merge them with other 223 // shards in the correct order. 224 if len(repos) > 1 { 225 return true 226 } 227 228 if repos[0].LatestCommitDate.After(time.Now().AddDate(0, 0, -opts.minAgeDays)) { 229 return true 230 } 231 232 return false 233} 234 235type compound struct { 236 shards []candidate 237 size int64 238} 239 240func (c *compound) add(cand candidate) { 241 c.shards = append(c.shards, cand) 242 c.size += cand.sizeBytes 243} 244 245// explodeTenantCompoundShards explodes all compound shards that have repos from 246// the tenant in question. The caller must hold the global lock. 247func (s *Server) explodeTenantCompoundShards(ctx context.Context, explodeFunc func(path string) error) error { 248 tnt, err := tenant.FromContext(ctx) 249 if err != nil { 250 return err 251 } 252 253 paths, err := filepath.Glob(filepath.Join(s.IndexDir, "compound-*")) 254 if err != nil { 255 return err 256 } 257 if len(paths) == 0 { 258 return nil 259 } 260 261nextCompoundShard: 262 for _, path := range paths { 263 // We don't use ReadMetadataPathAlive because we want to detect 264 // tombstoned repos, too. 265 repos, _, err := index.ReadMetadataPath(path) 266 if err != nil { 267 return err 268 } 269 for _, repo := range repos { 270 if repo.TenantID == tnt.ID() { 271 err := explodeFunc(path) 272 if err != nil { 273 return err 274 } 275 276 continue nextCompoundShard 277 } 278 } 279 } 280 return nil 281}