fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

indexserver: expose all merge options (#527)

With this change we expose "the maximum allowed priority" and "the minimum age
of the latest commit" as command-line flags and environment variables.

At the same time we clean up the Server struct a bit and bundle all
fields related to shard merging in a dedicated options struct.

The motivation is to make shard merging fully configurable before
rolling it out to all customers.

+73 -39
+1 -1
cmd/zoekt-sourcegraph-indexserver/cleanup.go
··· 438 438 continue 439 439 } 440 440 441 - if info.Size() < s.minSizeBytes { 441 + if info.Size() < s.mergeOpts.minSizeBytes { 442 442 cmd := exec.Command("zoekt-merge-index", "explode", path) 443 443 444 444 var b []byte
+35 -28
cmd/zoekt-sourcegraph-indexserver/main.go
··· 151 151 152 152 // Interval is how often we sync with Sourcegraph. 153 153 Interval time.Duration 154 - 155 - // VacuumInterval is how often indexserver scans compound shards to remove 156 - // tombstones. 157 - VacuumInterval time.Duration 158 - 159 - // MergeInterval defines how often indexserver runs the merge operation in the index 160 - // directory. 161 - MergeInterval time.Duration 162 - 163 - // TargetSizeBytes is the target size in bytes for compound shards. The higher 164 - // the value the more repositories a compound shard will contain and the bigger 165 - // the potential for saving MEM. The savings in MEM come at the cost of a 166 - // degraded search performance. 167 - TargetSizeBytes int64 168 - 169 - // Compound shards smaller than minSizeBytes will be deleted by vacuum. 170 - minSizeBytes int64 171 - 172 154 // CPUCount is the amount of parallelism to use when indexing a 173 155 // repository. 174 156 CPUCount int ··· 194 176 repositoriesSkipSymbolsCalculationAllowList map[string]struct{} 195 177 196 178 hostname string 179 + 180 + mergeOpts mergeOpts 197 181 } 198 182 199 183 var debug = log.New(io.Discard, "", log.LstdFlags) ··· 360 344 }() 361 345 362 346 go func() { 363 - for range jitterTicker(s.VacuumInterval, unix.SIGUSR1) { 347 + for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 364 348 if s.shardMerging { 365 349 s.vacuum() 366 350 } ··· 368 352 }() 369 353 370 354 go func() { 371 - for range jitterTicker(s.MergeInterval, unix.SIGUSR1) { 355 + for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 372 356 if s.shardMerging { 373 357 s.doMerge() 374 358 } ··· 1064 1048 return i 1065 1049 } 1066 1050 1051 + func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1052 + v := os.Getenv(k) 1053 + if v == "" { 1054 + return defaultVal 1055 + } 1056 + f, err := strconv.ParseFloat(v, 64) 1057 + if err != nil { 1058 + log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1059 + } 1060 + return f 
1061 + } 1062 + 1067 1063 func getEnvWithDefaultString(k string, defaultVal string) string { 1068 1064 v := os.Getenv(k) 1069 1065 if v == "" { ··· 1151 1147 mergeInterval time.Duration 1152 1148 targetSize int64 1153 1149 minSize int64 1150 + minAgeDays int 1151 + maxPriority float64 1154 1152 1155 1153 // config values related to backoff indexing repos with one or more consecutive failures 1156 1154 backoffDuration time.Duration ··· 1160 1158 func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1161 1159 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1162 1160 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1163 - fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", 24*time.Hour, "run vacuum this often") 1164 - fs.DurationVar(&rc.mergeInterval, "merge_interval", 8*time.Hour, "run merge this often") 1165 - fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_TARGET_SIZE", 2000), "the target size of compound shards in MiB") 1166 - fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB") 1167 1161 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of concurrent index jobs to run.") 1168 1162 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use") 1169 1163 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") ··· 1172 1166 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. 
See https://pkg.go.dev/runtime#SetBlockProfileRate") 1173 1167 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.") 1174 1168 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.") 1169 + 1170 + // flags related to shard merging 1171 + fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1172 + fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1173 + fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB") 1174 + fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB") 1175 + fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. 
Shards with newer commits are excluded from merging.") 1176 + fs.Float64Var(&rc.maxPriority, "merge_max_priority", getEnvWithDefaultFloat64("SRC_MERGE_MAX_PRIORITY", 100), "the maximum priority a shard can have to be considered for merging.") 1177 + 1175 1178 } 1176 1179 1177 1180 func startServer(conf rootConfig) error { ··· 1352 1355 IndexDir: conf.index, 1353 1356 IndexConcurrency: int(conf.indexConcurrency), 1354 1357 Interval: conf.interval, 1355 - VacuumInterval: conf.vacuumInterval, 1356 - MergeInterval: conf.mergeInterval, 1357 1358 CPUCount: cpuCount, 1358 - TargetSizeBytes: conf.targetSize * 1024 * 1024, 1359 1359 queue: *q, 1360 - minSizeBytes: conf.minSize * 1024 * 1024, 1361 1360 shardMerging: zoekt.ShardMergingEnabled(), 1362 1361 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1363 1362 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1364 1363 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1365 1364 hostname: conf.hostname, 1365 + mergeOpts: mergeOpts{ 1366 + vacuumInterval: conf.vacuumInterval, 1367 + mergeInterval: conf.mergeInterval, 1368 + targetSizeBytes: conf.targetSize * 1024 * 1024, 1369 + minSizeBytes: conf.minSize * 1024 * 1024, 1370 + minAgeDays: conf.minAgeDays, 1371 + maxPriority: conf.maxPriority, 1372 + }, 1366 1373 }, err 1367 1374 } 1368 1375
+34 -7
cmd/zoekt-sourcegraph-indexserver/merge.go
··· 83 83 for next { 84 84 next = false 85 85 s.muIndexDir.Global(func() { 86 - candidates, excluded := loadCandidates(s.IndexDir) 86 + candidates, excluded := loadCandidates(s.IndexDir, s.mergeOpts) 87 87 log.Printf("loadCandidates: candidates=%d excluded=%d", len(candidates), excluded) 88 88 89 - c := pickCandidates(candidates, s.TargetSizeBytes) 89 + c := pickCandidates(candidates, s.mergeOpts.targetSizeBytes) 90 90 if len(c.shards) <= 1 { 91 91 log.Printf("could not find enough shards to build a compound shard") 92 92 return ··· 126 126 } 127 127 128 128 // loadCandidates returns all shards eligible for merging. 129 - func loadCandidates(dir string) ([]candidate, int) { 129 + func loadCandidates(dir string, opts mergeOpts) ([]candidate, int) { 130 130 excluded := 0 131 131 132 132 d, err := os.Open(dir) ··· 151 151 continue 152 152 } 153 153 154 - if isExcluded(path, fi) { 154 + if isExcluded(path, fi, opts) { 155 155 excluded++ 156 156 continue 157 157 } ··· 175 175 return !os.IsNotExist(err) 176 176 } 177 177 178 + type mergeOpts struct { 179 + // targetSizeBytes is the target size in bytes for compound shards. The higher 180 + // the value the more repositories a compound shard will contain and the bigger 181 + // the potential for saving MEM. The savings in MEM come at the cost of a 182 + // degraded search performance. 183 + targetSizeBytes int64 184 + 185 + // compound shards smaller than minSizeBytes will be deleted by vacuum. 186 + minSizeBytes int64 187 + 188 + // vacuumInterval is how often indexserver scans compound shards to remove 189 + // tombstones. 190 + vacuumInterval time.Duration 191 + 192 + // mergeInterval defines how often indexserver runs the merge operation in 193 + // the index directory. 194 + mergeInterval time.Duration 195 + 196 + // number of days since the last commit until we consider the shard for 197 + // merging. 
For example, a value of 7 means that only repos that have been 198 + // inactive for 7 days will be considered for merging. 199 + minAgeDays int 200 + 201 + // the MAX priority a shard can have to be considered for merging. 202 + maxPriority float64 203 + } 204 + 178 205 // isExcluded returns true if a shard should not be merged, false otherwise. 179 206 // 180 207 // We need path and FileInfo because FileInfo does not contain the full path, see 181 208 // discussion here https://github.com/golang/go/issues/32300. 182 - func isExcluded(path string, fi os.FileInfo) bool { 209 + func isExcluded(path string, fi os.FileInfo, opts mergeOpts) bool { 183 210 if hasMultipleShards(path) { 184 211 return true 185 212 } ··· 199 226 return true 200 227 } 201 228 202 - if repos[0].LatestCommitDate.After(time.Now().AddDate(0, 0, -7)) { 229 + if repos[0].LatestCommitDate.After(time.Now().AddDate(0, 0, -opts.minAgeDays)) { 203 230 return true 204 231 } 205 232 206 - if priority, err := strconv.ParseFloat(repos[0].RawConfig["priority"], 64); err == nil && priority > 100 { 233 + if priority, err := strconv.ParseFloat(repos[0].RawConfig["priority"], 64); err == nil && priority > opts.maxPriority { 207 234 return true 208 235 } 209 236
+3 -3
cmd/zoekt-sourcegraph-indexserver/merge_test.go
··· 64 64 t.Errorf("Finish: %v", err) 65 65 } 66 66 67 - s := &Server{IndexDir: dir, TargetSizeBytes: 2000 * 1024 * 1024} 67 + s := &Server{IndexDir: dir, mergeOpts: mergeOpts{targetSizeBytes: 2000 * 1024 * 1024}} 68 68 s.merge(helperCallMerge) 69 69 70 70 _, err = os.Stat(filepath.Join(dir, "test-repo_v16.00000.zoekt")) ··· 187 187 } 188 188 189 189 s := &Server{ 190 - IndexDir: dir, 191 - TargetSizeBytes: tc.targetSizeBytes, 190 + IndexDir: dir, 191 + mergeOpts: mergeOpts{targetSizeBytes: tc.targetSizeBytes}, 192 192 } 193 193 194 194 s.merge(helperCallMerge)