Fork of https://github.com/sourcegraph/zoekt.
0

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3import ( 4 "bytes" 5 "context" 6 "os" 7 "os/exec" 8 "path/filepath" 9 "strconv" 10 "time" 11 12 "github.com/grafana/regexp" 13 "github.com/prometheus/client_golang/prometheus" 14 "github.com/prometheus/client_golang/prometheus/promauto" 15 "go.uber.org/atomic" 16 17 "github.com/sourcegraph/zoekt/index" 18 "github.com/sourcegraph/zoekt/internal/tenant" 19) 20 21var metricShardMergingRunning = promauto.NewGauge(prometheus.GaugeOpts{ 22 Name: "index_shard_merging_running", 23 Help: "Set to 1 if indexserver's merge job is running.", 24}) 25 26var metricShardMergingDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 27 Name: "index_shard_merging_duration_seconds", 28 Help: "The duration of 1 shard merge operation.", 29 Buckets: prometheus.LinearBuckets(30, 30, 10), 30}, []string{"error"}) 31 32func pickCandidates(shards []candidate, targetSizeBytes int64) compound { 33 c := compound{} 34 for _, shard := range shards { 35 c.add(shard) 36 if c.size >= targetSizeBytes { 37 return c 38 } 39 } 40 return compound{} 41} 42 43var mergeRunning atomic.Bool 44 45func defaultMergeCmd(args ...string) *exec.Cmd { 46 cmd := exec.Command("zoekt-merge-index", "merge") 47 cmd.Args = append(cmd.Args, args...) 48 return cmd 49} 50 51func defaultExplodeCmd(args ...string) *exec.Cmd { 52 cmd := exec.Command("zoekt-merge-index", "explode") 53 cmd.Args = append(cmd.Args, args...) 54 return cmd 55} 56 57// doMerge drives the merge process. It holds the lock on s.indexDir for the 58// duration of 1 merge, which might be several minutes, depending on the target 59// size of the compound shard. 60func (s *Server) doMerge() { 61 s.merge(defaultMergeCmd) 62} 63 64// same as doMerge but with a configurable merge command. 65func (s *Server) merge(mergeCmd func(args ...string) *exec.Cmd) { 66 // Guard against the user triggering competing merge jobs with the debug 67 // command. 
68 if !mergeRunning.CompareAndSwap(false, true) { 69 infoLog.Printf("merge already running") 70 return 71 } 72 defer mergeRunning.Store(false) 73 74 metricShardMergingRunning.Set(1) 75 defer metricShardMergingRunning.Set(0) 76 77 // We keep creating compound shards until we run out of shards to merge or until 78 // we encounter an error during merging. 79 next := true 80 for next { 81 next = false 82 s.muIndexDir.Global(func() { 83 candidates, excluded := loadCandidates(s.IndexDir, s.mergeOpts) 84 infoLog.Printf("loadCandidates: candidates=%d excluded=%d", len(candidates), excluded) 85 86 c := pickCandidates(candidates, s.mergeOpts.targetSizeBytes) 87 if len(c.shards) <= 1 { 88 infoLog.Printf("could not find enough shards to build a compound shard") 89 return 90 } 91 infoLog.Printf("start merging: shards=%d total_size=%.2fMiB", len(c.shards), float64(c.size)/(1024*1024)) 92 93 var paths []string 94 for _, p := range c.shards { 95 paths = append(paths, p.path) 96 } 97 98 start := time.Now() 99 100 cmd := mergeCmd(paths...) 101 102 // zoekt-merge-index writes the full path of the new compound shard to stdout. 103 stdoutBuf := &bytes.Buffer{} 104 stderrBuf := &bytes.Buffer{} 105 cmd.Stdout = stdoutBuf 106 cmd.Stderr = stderrBuf 107 108 err := cmd.Run() 109 110 durationSeconds := time.Since(start).Seconds() 111 metricShardMergingDuration.WithLabelValues(strconv.FormatBool(err != nil)).Observe(durationSeconds) 112 if err != nil { 113 errorLog.Printf("error merging shards: stdout=%s, stderr=%s, durationSeconds=%.2f err=%s", stdoutBuf.String(), stderrBuf.String(), durationSeconds, err) 114 return 115 } 116 117 infoLog.Printf("finished merging: shard=%s durationSeconds=%.2f", stdoutBuf.String(), durationSeconds) 118 119 next = true 120 }) 121 } 122} 123 124type candidate struct { 125 path string 126 127 // The size as reported by os.Stat. 128 sizeBytes int64 129} 130 131// loadCandidates returns all shards eligible for merging. 
132func loadCandidates(dir string, opts mergeOpts) ([]candidate, int) { 133 excluded := 0 134 135 d, err := os.Open(dir) 136 if err != nil { 137 debugLog.Printf("failed to load candidates: %s", dir) 138 return []candidate{}, excluded 139 } 140 defer d.Close() 141 names, _ := d.Readdirnames(-1) 142 143 candidates := make([]candidate, 0, len(names)) 144 for _, n := range names { 145 path := filepath.Join(dir, n) 146 147 fi, err := os.Stat(path) 148 if err != nil { 149 debugLog.Printf("stat failed for %s: %s", n, err) 150 continue 151 } 152 153 if fi.IsDir() || filepath.Ext(path) != ".zoekt" { 154 continue 155 } 156 157 if isExcluded(path, fi, opts) { 158 excluded++ 159 continue 160 } 161 162 candidates = append(candidates, candidate{ 163 path: path, 164 sizeBytes: fi.Size(), 165 }) 166 } 167 return candidates, excluded 168} 169 170var reShard = regexp.MustCompile(`\.[0-9]{5}\.zoekt$`) 171 172func hasMultipleShards(path string) bool { 173 if !reShard.MatchString(path) { 174 return false 175 } 176 secondShard := reShard.ReplaceAllString(path, ".00001.zoekt") 177 _, err := os.Stat(secondShard) 178 return !os.IsNotExist(err) 179} 180 181type mergeOpts struct { 182 // targetSizeBytes is the target size in bytes for compound shards. The higher 183 // the value the more repositories a compound shard will contain and the bigger 184 // the potential for saving MEM. The savings in MEM come at the cost of a 185 // degraded search performance. 186 targetSizeBytes int64 187 188 // compound shards smaller than minSizeBytes will be deleted by vacuum. 189 minSizeBytes int64 190 191 // vacuumInterval is how often indexserver scans compound shards to remove 192 // tombstones. 193 vacuumInterval time.Duration 194 195 // mergeInterval defines how often indexserver runs the merge operation in 196 // the index directory. 197 mergeInterval time.Duration 198 199 // number of days since the last commit until we consider the shard for 200 // merging. 
For example, a value of 7 means that only repos that have been 201 // inactive for 7 days will be considered for merging. 202 minAgeDays int 203} 204 205// isExcluded returns true if a shard should not be merged, false otherwise. 206// 207// We need path and FileInfo because FileInfo does not contain the full path, see 208// discussion here https://github.com/golang/go/issues/32300. 209func isExcluded(path string, fi os.FileInfo, opts mergeOpts) bool { 210 if hasMultipleShards(path) { 211 return true 212 } 213 214 repos, _, err := index.ReadMetadataPath(path) 215 if err != nil { 216 debugLog.Printf("failed to load metadata for %s\n", fi.Name()) 217 return true 218 } 219 220 // Exclude compound shards from being merge targets. Why? We want repositories in a 221 // compound shard to be ordered based on their priority. The easiest way to 222 // enforce this is to delete the compound shard once it drops below a certain 223 // size (handled by cleanup), reindex the repositories and merge them with other 224 // shards in the correct order. 225 if len(repos) > 1 { 226 return true 227 } 228 229 if repos[0].LatestCommitDate.After(time.Now().AddDate(0, 0, -opts.minAgeDays)) { 230 return true 231 } 232 233 return false 234} 235 236type compound struct { 237 shards []candidate 238 size int64 239} 240 241func (c *compound) add(cand candidate) { 242 c.shards = append(c.shards, cand) 243 c.size += cand.sizeBytes 244} 245 246// explodeTenantCompoundShards explodes all compound shards that have repos from 247// the tenant in question. The caller must hold the global lock. 
248func (s *Server) explodeTenantCompoundShards(ctx context.Context, explodeFunc func(path string) error) error { 249 tnt, err := tenant.FromContext(ctx) 250 if err != nil { 251 return err 252 } 253 254 paths, err := filepath.Glob(filepath.Join(s.IndexDir, "compound-*")) 255 if err != nil { 256 return err 257 } 258 if len(paths) == 0 { 259 return nil 260 } 261 262nextCompoundShard: 263 for _, path := range paths { 264 // We don't use ReadMetadataPathAlive because we want to detect 265 // tombstoned repos, too. 266 repos, _, err := index.ReadMetadataPath(path) 267 if err != nil { 268 return err 269 } 270 for _, repo := range repos { 271 if repo.TenantID == tnt.ID() { 272 err := explodeFunc(path) 273 if err != nil { 274 return err 275 } 276 277 continue nextCompoundShard 278 } 279 } 280 } 281 return nil 282}