fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

Indexing: limit shard parallelism when index concurrency is set (#699)

By default, `zoekt-sourcegraph-indexserver` builds one repo at a time. For each
repo, shards are built in parallel using a number of threads equal to available
CPUs. There are two ways to adjust the indexing concurrency:
1. Passing `cpu_fraction`, which limits the available CPUs for parallel shard
building
2. Passing `index_concurrency` (or setting the `SRC_INDEX_CONCURRENCY`
environment variable), to index more than one repo at once

If you set `index_concurrency` to some number greater than 1, then indexing
will use more threads than available CPUs. This seems undesirable, especially
if you set `cpu_fraction`, since you'd expect that to put an upper bound on CPU
usage.

This changes the shard-level parallelism to `available CPUs / index_concurrency`
(rounded up), to bound the CPU usage as expected.

+61 -8
+8 -5
cmd/zoekt-sourcegraph-indexserver/main.go
··· 641 641 } 642 642 643 643 func (s *Server) indexArgs(opts IndexOptions) *indexArgs { 644 + parallelism := math.Ceil(float64(s.CPUCount) / float64(s.IndexConcurrency)) 644 645 return &indexArgs{ 645 646 IndexOptions: opts, 646 647 647 648 IndexDir: s.IndexDir, 648 - Parallelism: s.CPUCount, 649 + Parallelism: int(parallelism), 649 650 650 651 Incremental: true, 651 652 ··· 1409 1410 } 1410 1411 } 1411 1412 1412 - if conf.indexConcurrency < 1 { 1413 - conf.indexConcurrency = 1 1414 - } 1415 - 1416 1413 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction))) 1417 1414 if cpuCount < 1 { 1418 1415 cpuCount = 1 1416 + } 1417 + 1418 + if conf.indexConcurrency < 1 { 1419 + conf.indexConcurrency = 1 1420 + } else if conf.indexConcurrency > int64(cpuCount) { 1421 + conf.indexConcurrency = int64(cpuCount) 1419 1422 } 1420 1423 1421 1424 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)
+53 -3
cmd/zoekt-sourcegraph-indexserver/main_test.go
··· 33 33 } 34 34 35 35 s := &Server{ 36 - Sourcegraph: newSourcegraphClient(root, "", WithBatchSize(0)), 37 - IndexDir: "/testdata/index", 38 - CPUCount: 6, 36 + Sourcegraph: newSourcegraphClient(root, "", WithBatchSize(0)), 37 + IndexDir: "/testdata/index", 38 + CPUCount: 6, 39 + IndexConcurrency: 1, 39 40 } 40 41 want := &indexArgs{ 41 42 IndexOptions: IndexOptions{ ··· 49 50 got := s.indexArgs(IndexOptions{Name: "testName"}) 50 51 if !cmp.Equal(got, want) { 51 52 t.Errorf("mismatch (-want +got):\n%s", cmp.Diff(want, got)) 53 + } 54 + } 55 + 56 + func TestServer_parallelism(t *testing.T) { 57 + root, err := url.Parse("http://api.test") 58 + if err != nil { 59 + t.Fatal(err) 60 + } 61 + 62 + cases := []struct { 63 + name string 64 + cpuCount int 65 + indexConcurrency int 66 + wantParallelism int 67 + }{ 68 + { 69 + name: "CPU count divides evenly", 70 + cpuCount: 16, 71 + indexConcurrency: 2, 72 + wantParallelism: 8, 73 + }, 74 + { 75 + name: "round parallelism up", 76 + cpuCount: 4, 77 + indexConcurrency: 3, 78 + wantParallelism: 2, 79 + }, 80 + { 81 + name: "no shard level parallelism", 82 + cpuCount: 4, 83 + indexConcurrency: 4, 84 + wantParallelism: 1, 85 + }, 86 + } 87 + 88 + for _, tt := range cases { 89 + t.Run(tt.name, func(t *testing.T) { 90 + s := &Server{ 91 + Sourcegraph: newSourcegraphClient(root, "", WithBatchSize(0)), 92 + IndexDir: "/testdata/index", 93 + CPUCount: tt.cpuCount, 94 + IndexConcurrency: tt.indexConcurrency, 95 + } 96 + 97 + got := s.indexArgs(IndexOptions{Name: "testName"}) 98 + if !cmp.Equal(got.Parallelism, tt.wantParallelism) { 99 + t.Errorf("mismatch, want: %d, got: %d", tt.wantParallelism, got.Parallelism) 100 + } 101 + }) 52 102 } 53 103 } 54 104