fork of https://github.com/sourcegraph/zoekt

indexserver: add IndexConcurrency for concurrent index jobs (#390)

This is a tunable which we only expect to be used when bootstrapping a
large number of repos to be indexed. The previous commit introduced some
concurrency control to prevent processing the same repository
concurrently. This allows us to naively spin up N goroutines to process
the queue. Note: the queue already contains concurrency control.
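
As a rough illustration of the pattern described above, the sketch below starts N goroutines that drain a shared queue, with a small guard that stops two workers from handling the same repository at once. It is not the indexserver's code: `inFlight`, `runWorkers`, and the choice to skip a busy repository are all assumptions made for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// inFlight records which repositories are being indexed right now, so that
// two workers never index the same repository at the same time.
// (Hypothetical type, not taken from zoekt.)
type inFlight struct {
	mu    sync.Mutex
	repos map[string]struct{}
}

func newInFlight() *inFlight {
	return &inFlight{repos: make(map[string]struct{})}
}

// tryStart claims name and reports true, or reports false if another worker
// already holds the claim.
func (f *inFlight) tryStart(name string) bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	if _, busy := f.repos[name]; busy {
		return false
	}
	f.repos[name] = struct{}{}
	return true
}

// done releases the claim on name.
func (f *inFlight) done(name string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	delete(f.repos, name)
}

// runWorkers starts `concurrency` goroutines that drain jobs. It returns how
// many jobs were indexed and how many were skipped because the repository
// was already in flight. Skipping is an assumption of this sketch.
func runWorkers(jobs []string, concurrency int, index func(string)) (indexed, skipped int) {
	if concurrency < 1 {
		concurrency = 1 // same clamp the commit applies to the flag value
	}
	queue := make(chan string)
	guard := newInFlight()

	var wg sync.WaitGroup
	var mu sync.Mutex // protects indexed and skipped
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for name := range queue {
				if !guard.tryStart(name) {
					mu.Lock()
					skipped++
					mu.Unlock()
					continue
				}
				index(name)
				guard.done(name)
				mu.Lock()
				indexed++
				mu.Unlock()
			}
		}()
	}
	for _, name := range jobs {
		queue <- name
	}
	close(queue)
	wg.Wait()
	return indexed, skipped
}

func main() {
	jobs := []string{"repo-a", "repo-b", "repo-c", "repo-a"}
	indexed, skipped := runWorkers(jobs, 3, func(string) {})
	// Every job is either indexed or skipped, never both and never lost.
	fmt.Println(indexed+skipped == len(jobs))
}
```

Because the guard makes duplicate work safe, the worker loop itself can stay naive, which is the property the commit relies on.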

What is still needed is better observability into what is currently
being processed. The net/trace instrumentation does expose this, but we
will need something smoother if we polish up this feature.

Meta: The amount of copy paste to add a new variable is a bit of a
smell, but we can address that in a later commit.

Test Plan: go test. Manually start up a fresh instance with tens of repos
to index and observe concurrent indexing.

plz-review-url: https://plz.review/review/6342

+19 -1
cmd/zoekt-sourcegraph-indexserver/main.go
···
 	// IndexDir is the index directory to use.
 	IndexDir string
 
+	// IndexConcurrency is the number of repositories we index at once.
+	IndexConcurrency int
+
 	// Interval is how often we sync with Sourcegraph.
 	Interval time.Duration
 
···
 		}
 	}()
 
-	// In the current goroutine process the queue forever.
+	for i := 0; i < s.IndexConcurrency; i++ {
+		go s.processQueue()
+	}
+
+	// block forever
+	select {}
+}
+
+func (s *Server) processQueue() {
 	for {
 		if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
 			time.Sleep(time.Second)
···
 	root string
 	interval time.Duration
 	index string
+	indexConcurrency int64
 	listen string
 	hostname string
 	cpuFraction float64
···
 	fs.DurationVar(&rc.mergeInterval, "merge_interval", time.Hour, "run merge this often")
 	fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_TARGET_SIZE", 2000), "the target size of compound shards in MiB")
 	fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB")
+	fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of concurrent index jobs to run.")
 	fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use")
 	fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
 	fs.StringVar(&rc.hostname, "hostname", hostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
···
 		}
 	}
 
+	if conf.indexConcurrency < 1 {
+		conf.indexConcurrency = 1
+	}
+
 	cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction)))
 	if cpuCount < 1 {
 		cpuCount = 1
···
 	return &Server{
 		Sourcegraph: sg,
 		IndexDir: conf.index,
+		IndexConcurrency: int(conf.indexConcurrency),
 		Interval: conf.interval,
 		VacuumInterval: conf.vacuumInterval,
 		MergeInterval: conf.mergeInterval,