fork of https://github.com/sourcegraph/zoekt
1// Copyright 2016 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package shards
16
17import (
18 "context"
19 "fmt"
20 "log"
21 "math"
22 "os"
23 "runtime"
24 "runtime/debug"
25 "slices"
26 "sort"
27 "strconv"
28 "sync"
29 "time"
30
31 "github.com/sourcegraph/zoekt/index"
32 "golang.org/x/sync/semaphore"
33
34 "github.com/prometheus/client_golang/prometheus"
35 "github.com/prometheus/client_golang/prometheus/promauto"
36 "go.uber.org/atomic"
37
38 "github.com/sourcegraph/zoekt"
39 "github.com/sourcegraph/zoekt/internal/tenant/systemtenant"
40 "github.com/sourcegraph/zoekt/internal/trace"
41 "github.com/sourcegraph/zoekt/query"
42)
43
// Prometheus metrics exported by the sharded searcher. All of them are
// registered through promauto when the package is initialised.
var (
	// Shard loading.
	metricShardsLoaded = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_shards_loaded",
		Help: "The number of shards currently loaded",
	})
	metricShardsLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_shards_loaded_total",
		Help: "The total number of shards loaded",
	})
	metricShardsLoadFailedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_shards_load_failed_total",
		Help: "The total number of shard loads that failed",
	})

	// Search requests: in-flight gauges, failures and latency.
	metricSearchRunning = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_search_running",
		Help: "The number of concurrent search requests running",
	})
	metricSearchShardRunning = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_search_shard_running",
		Help: "The number of concurrent search requests in a shard running",
	})
	metricSearchFailedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_failed_total",
		Help: "The total number of search requests that failed",
	})
	metricSearchDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "zoekt_search_duration_seconds",
		Help:    "The duration a search request took in seconds",
		Buckets: prometheus.DefBuckets, // DefBuckets good for service timings
	})

	// A Counter per Stat. Name should match field in zoekt.Stats.
	// These are fed from observeMetrics with the Stats of each shard result.
	metricSearchContentBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_content_loaded_bytes_total",
		Help: "Total amount of I/O for reading contents",
	})
	metricSearchIndexBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_index_loaded_bytes_total",
		Help: "Total amount of I/O for reading from index",
	})
	metricSearchCrashesTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_crashes_total",
		Help: "Total number of search shards that had a crash",
	})
	metricSearchFileCountTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_file_count_total",
		Help: "Total number of files containing a match",
	})
	metricSearchShardFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_shard_files_considered_total",
		Help: "Total number of files in shards that we considered",
	})
	metricSearchFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_files_considered_total",
		Help: "Total files that we evaluated. Equivalent to files for which all atom matches (including negations) evaluated to true",
	})
	metricSearchFilesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_files_loaded_total",
		Help: "Total files for which we loaded file content to verify substring matches",
	})
	metricSearchFilesSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_files_skipped_total",
		Help: "Total candidate files whose contents weren't examined because we gathered enough matches",
	})
	metricSearchShardsSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_shards_skipped_total",
		Help: "Total shards that we did not process because a query was canceled",
	})
	metricSearchMatchCountTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_match_count_total",
		Help: "Total number of non-overlapping matches",
	})
	metricSearchNgramMatchesTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_ngram_matches_total",
		Help: "Total number of candidate matches as a result of searching ngrams",
	})
	metricSearchNgramLookupsTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_ngram_lookups_total",
		Help: "Total number of times we accessed an ngram in the index",
	})
	metricSearchRegexpsConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_regexps_considered_total",
		Help: "Total number of times regexp was called on files that we evaluated",
	})

	// List requests: in-flight gauges.
	metricListRunning = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_running",
		Help: "The number of concurrent list requests running",
	})
	metricListShardRunning = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_shard_running",
		Help: "The number of concurrent list requests in a shard running",
	})
	// Latency of replacing a batch of shard searchers.
	metricShardsBatchReplaceDurationSeconds = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "zoekt_shards_batch_replace_duration_seconds",
		Help:    "The time it takes to replace a batch of Searchers.",
		Buckets: []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30},
	})

	// Gauges holding the RepoStats fields of the most recent List call whose
	// query matched everything (see each Help string). They are not updated
	// in this block of the file.
	metricListAllRepos = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_all_stats_repos",
		Help: "The last List(true) value for RepoStats.Repos. Repos is used for aggregrating the number of repositories.",
	})
	metricListAllShards = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_all_stats_shards",
		Help: "The last List(true) value for RepoStats.Shards. Shards is the total number of search shards.",
	})
	metricListAllDocuments = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_all_stats_documents",
		Help: "The last List(true) value for RepoStats.Documents. Documents holds the number of documents or files.",
	})
	metricListAllIndexBytes = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_all_stats_index_bytes",
		Help: "The last List(true) value for RepoStats.IndexBytes. IndexBytes is the amount of RAM used for index overhead.",
	})
	metricListAllContentBytes = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_all_stats_content_bytes",
		Help: "The last List(true) value for RepoStats.ContentBytes. ContentBytes is the amount of RAM used for raw content.",
	})
	metricListAllNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_all_stats_new_lines_count",
		Help: "The last List(true) value for RepoStats.NewLinesCount.",
	})
	metricListAllDefaultBranchNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_all_stats_default_branch_new_lines_count",
		Help: "The last List(true) value for RepoStats.DefaultBranchNewLinesCount.",
	})
	metricListAllOtherBranchesNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_all_stats_other_branches_new_lines_count",
		Help: "The last List(true) value for RepoStats.OtherBranchesNewLinesCount.",
	})
)
176
// rankedShard is a shard's zoekt.Searcher together with the ranking and
// repository metadata that the sharded searcher keeps alongside it.
type rankedShard struct {
	// Searcher is the underlying searcher for the shard.
	zoekt.Searcher

	priority float64 // maximum priority across all repos in the shard

	// We have out of band ranking on compound shards which can change even if
	// the shard file does not. So we compute a rank in getShards. We store
	// repos here to avoid the cost of List in the search request path.
	//
	// repos is nil only if that call failed. selectRepoSet treats a nil repos
	// as "contents unknown" and always keeps such a shard.
	repos []*zoekt.Repository
}
189
// loaded stores the state we compute when updating the state of shards from
// disk. Search, StreamSearch and List each take one value via getLoaded and
// use it for the whole request.
type loaded struct {
	// shards is the currently loaded shards sorted by decreasing rank and
	// should not be mutated.
	shards []*rankedShard

	// ready is true if sharded searcher has finished loading all initial
	// shards on startup. While it is false, requests report one extra crash
	// to signal that results may be incomplete.
	ready bool
}
201
// shardedSearcher fans Search, StreamSearch and List requests out over a set
// of loaded shards.
type shardedSearcher struct {
	// Limit the number of parallel queries. Since searching is
	// CPU bound, we can't do better than #CPU queries in
	// parallel. If we do so, we just create more memory
	// pressure.
	sched scheduler

	mu     sync.Mutex // protects writes to shards
	shards map[string]*rankedShard // loaded shards indexed by key

	// ready and ranked are not accessed in this part of the file.
	// NOTE(review): presumably ready is set by markReady and ranked caches the
	// value returned by getLoaded -- confirm against their definitions.
	ready  atomic.Bool
	ranked atomic.Value
}
215
216func newShardedSearcher(n int64) *shardedSearcher {
217 ss := &shardedSearcher{
218 shards: make(map[string]*rankedShard),
219 sched: newScheduler(n),
220 }
221 return ss
222}
223
224// NewDirectorySearcher returns a searcher instance that loads all
225// shards corresponding to a glob into memory.
226func NewDirectorySearcher(dir string) (zoekt.Streamer, error) {
227 return newDirectorySearcher(dir, true)
228}
229
230// NewDirectorySearcherFast is like NewDirectorySearcher, but does not block
231// on the initial loading of shards.
232//
233// This exists since in the case of zoekt-webserver we are happy with having
234// partial availability since that is better than no availability on large
235// instances.
236func NewDirectorySearcherFast(dir string) (zoekt.Streamer, error) {
237 return newDirectorySearcher(dir, false)
238}
239
240func newDirectorySearcher(dir string, waitUntilReady bool) (zoekt.Streamer, error) {
241 ss := newShardedSearcher(int64(runtime.GOMAXPROCS(0)))
242 tl := &loader{
243 ss: ss,
244 }
245 dw, err := newDirectoryWatcher(dir, tl)
246 if err != nil {
247 return nil, err
248 }
249
250 if waitUntilReady {
251 if err := dw.WaitUntilReady(); err != nil {
252 return nil, err
253 }
254 }
255
256 ds := &directorySearcher{
257 Streamer: ss,
258 directoryWatcher: dw,
259 }
260
261 return &typeRepoSearcher{Streamer: ds}, nil
262}
263
// directorySearcher pairs a zoekt.Streamer with the DirectoryWatcher that
// feeds it, so that both can be shut down together by Close.
type directorySearcher struct {
	// Streamer serves the requests; newDirectorySearcher sets it to the
	// shardedSearcher.
	zoekt.Streamer

	// directoryWatcher is stopped by Close before the Streamer is closed.
	directoryWatcher *DirectoryWatcher
}
269
// Close stops the directory watcher and then closes the wrapped Streamer.
func (s *directorySearcher) Close() {
	// We need to Stop directoryWatcher first since it calls load/unload on
	// Searcher.
	s.directoryWatcher.Stop()
	s.Streamer.Close()
}
276
// loader applies shard load and drop requests to a shardedSearcher. It is
// what newDirectorySearcher hands to the directory watcher.
type loader struct {
	// ss is the searcher whose shard set is updated by load and drop.
	ss *shardedSearcher
}
280
281func (tl *loader) load(keys ...string) {
282 // This is called with all keys on startup, so once this function has
283 // finished running shardedSearcher will be ready.
284 defer tl.ss.markReady()
285
286 if len(keys) == 0 {
287 // If there's nothing to load, we exit early here, but we want to mark
288 // ourselves as ready.
289 return
290 }
291
292 var (
293 mu sync.Mutex // synchronizes writes to the shards map
294 wg sync.WaitGroup // used to wait for all shards to load
295 sem = semaphore.NewWeighted(int64(runtime.GOMAXPROCS(0)))
296 loadedShards = make(map[string]zoekt.Searcher)
297 )
298
299 publishLoaded := func() {
300 mu.Lock()
301 chunk := loadedShards
302 loadedShards = make(map[string]zoekt.Searcher)
303 mu.Unlock()
304 tl.ss.replace(chunk)
305 }
306
307 log.Printf("[INFO] loading %d shard(s): %s", len(keys), humanTruncateList(keys, 5))
308
309 lastProgress := time.Now()
310 for i, key := range keys {
311 // If taking a while to start-up occasionally give a progress message
312 if time.Since(lastProgress) > 5*time.Second {
313 log.Printf("[INFO] still need to load %d shards...", len(keys)-i)
314 lastProgress = time.Now()
315
316 publishLoaded()
317 }
318
319 _ = sem.Acquire(context.Background(), 1)
320 wg.Add(1)
321
322 go func(key string) {
323 defer sem.Release(1)
324 defer wg.Done()
325
326 shard, err := loadShard(key)
327 if err != nil {
328 metricShardsLoadFailedTotal.Inc()
329 log.Printf("[ERROR] reloading: %s, err %v ", key, err)
330 return
331 }
332 metricShardsLoadedTotal.Inc()
333
334 mu.Lock()
335 loadedShards[key] = shard
336 mu.Unlock()
337 }(key)
338 }
339
340 wg.Wait()
341
342 publishLoaded()
343}
344
345func (tl *loader) drop(keys ...string) {
346 shards := make(map[string]zoekt.Searcher, len(keys))
347 for _, key := range keys {
348 shards[key] = nil
349 }
350 tl.ss.replace(shards)
351}
352
353func (ss *shardedSearcher) String() string {
354 return "shardedSearcher"
355}
356
357// Close closes references to open files. It may be called only once.
358func (ss *shardedSearcher) Close() {
359 ss.mu.Lock()
360 shards := make(map[string]zoekt.Searcher, len(ss.shards))
361 for k := range ss.shards {
362 shards[k] = nil
363 }
364 ss.mu.Unlock()
365
366 ss.replace(shards)
367}
368
369func selectRepoSet(shards []*rankedShard, q query.Q) ([]*rankedShard, query.Q) {
370 and, ok := q.(*query.And)
371 if ok {
372 return doSelectRepoSet(shards, and)
373 }
374
375 // We have queries which look like (reposet ...) and we want to do the same
376 // optimizations. To simplify we just always wrap the query in And and then
377 // on the return value call Simplify to unwrap. In particular this is
378 // important for List calls.
379 and = &query.And{Children: []query.Q{q}}
380 shards, q = doSelectRepoSet(shards, and)
381 return shards, query.Simplify(q)
382}
383
384func doSelectRepoSet(shards []*rankedShard, and *query.And) ([]*rankedShard, query.Q) {
385 // (and (reposet ...) (q))
386 // (and true (q)) with a filtered shards
387 // (and false) // noop
388
389 // (and (repobranches ...) (q))
390 // (and (repobranches ...) (q))
391
392 // Note: we also support (and (repo ...) (q)) even though sourcegraph does
393 // not generate those sorts of queries. This is to support manual testing.
394
395 hasReposForPredicate := func(pred func(repo *zoekt.Repository) bool) func(repos []*zoekt.Repository) (any, all bool) {
396 return func(repos []*zoekt.Repository) (any, all bool) {
397 any = false
398 all = true
399 for _, repo := range repos {
400 b := pred(repo)
401 any = any || b
402 all = all && b
403 }
404 return any, all
405 }
406 }
407
408 for i, c := range and.Children {
409 var setSize int
410 var hasRepos func([]*zoekt.Repository) (bool, bool)
411 switch setQuery := c.(type) {
412 case *query.RepoSet:
413 setSize = len(setQuery.Set)
414 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
415 return setQuery.Set[repo.Name]
416 })
417 case *query.RepoIDs:
418 setSize = int(setQuery.Repos.GetCardinality())
419 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
420 return setQuery.Repos.Contains(repo.ID)
421 })
422 case *query.Repo:
423 setSize = 0
424 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
425 return setQuery.Regexp.MatchString(repo.Name)
426 })
427 case *query.BranchesRepos:
428 for _, br := range setQuery.List {
429 setSize += int(br.Repos.GetCardinality())
430 }
431
432 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
433 for _, br := range setQuery.List {
434 if br.Repos.Contains(repo.ID) {
435 return true
436 }
437 }
438 return false
439 })
440 default:
441 continue
442 }
443
444 // setSize may be larger than the number of shards we have. The size of
445 // filtered is bounded by min(len(set), len(shards))
446 if setSize > len(shards) {
447 setSize = len(shards)
448 }
449
450 filtered := make([]*rankedShard, 0, setSize)
451 filteredAll := true
452
453 for _, s := range shards {
454 if s.repos == nil {
455 // repos is nil if we failed to List the shard. This shouldn't
456 // happen, but if it does we don't know what is in it and must search
457 // it without simplifying the query.
458 filtered = append(filtered, s)
459 filteredAll = false
460 } else if any, all := hasRepos(s.repos); any {
461 filtered = append(filtered, s)
462 filteredAll = filteredAll && all
463 }
464 }
465
466 // We don't need to adjust the query since we are returning an empty set
467 // of shards to search.
468 if len(filtered) == 0 {
469 return filtered, and
470 }
471
472 // We can't simplify the query since we are searching shards which contain
473 // repos we aren't supposed to search.
474 if !filteredAll {
475 return filtered, and
476 }
477
478 // We don't want to mutate the original and, so we clone it before
479 // mutating it.
480 and = &query.And{Children: slices.Clone(and.Children)}
481
482 // This optimization allows us to avoid the work done by
483 // indexData.simplify for each shard.
484 //
485 // For example if our query is (and (reposet foo bar) (content baz))
486 // then at this point filtered is [foo bar] and q is the same. For each
487 // shard indexData.simplify will simplify to (and true (content baz)) ->
488 // (content baz). This work can be done now once, rather than per shard.
489 switch c := c.(type) {
490 case *query.RepoSet, *query.RepoIDs, *query.Repo:
491 and.Children[i] = &query.Const{Value: true}
492 return filtered, query.Simplify(and)
493
494 case *query.BranchesRepos:
495 // We can only replace if all the repos want the same branches. We
496 // simplify and just check that we are requesting 1 branch. The common
497 // case is just asking for HEAD, so this should be effective.
498 if len(c.List) != 1 {
499 return filtered, and
500 }
501
502 // Every repo wants the same branches, so we can replace RepoBranches
503 // with a list of branch queries.
504 and.Children[i] = &query.Branch{Pattern: c.List[0].Branch, Exact: true}
505 return filtered, query.Simplify(and)
506 }
507
508 // Stop after first RepoSet, otherwise we might append duplicate
509 // shards to `filtered`
510 return filtered, and
511 }
512
513 return shards, and
514}
515
516func (ss *shardedSearcher) Search(ctx context.Context, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) {
517 tr, ctx := trace.New(ctx, "shardedSearcher.Search", "")
518 tr.LazyLog(q, true)
519 tr.LazyPrintf("opts: %+v", opts)
520 defer func() {
521 if sr != nil {
522 tr.LazyPrintf("num files: %d", len(sr.Files))
523 tr.LazyPrintf("stats: %+v", sr.Stats)
524 }
525 if err != nil {
526 tr.LazyPrintf("error: %v", err)
527 tr.SetError(err)
528 }
529 tr.Finish()
530 }()
531 ctx, cancel := context.WithCancel(ctx)
532 defer cancel()
533
534 collectSender := newCollectSender(opts)
535
536 start := time.Now()
537 proc, err := ss.sched.Acquire(ctx)
538 if err != nil {
539 return nil, err
540 }
541 defer proc.Release()
542 tr.LazyPrintf("acquired process")
543
544 wait := time.Since(start)
545 start = time.Now()
546
547 loaded := ss.getLoaded()
548 done, err := streamSearch(ctx, proc, q, opts, loaded.shards, collectSender)
549 defer done()
550 if err != nil {
551 return nil, err
552 }
553
554 aggregate, ok := collectSender.Done()
555 if !ok {
556 aggregate = &zoekt.SearchResult{
557 RepoURLs: map[string]string{},
558 LineFragments: map[string]string{},
559 }
560 }
561
562 copyFiles(aggregate)
563
564 if !loaded.ready {
565 // We may have missed results due to not being fully loaded.
566 aggregate.Stats.Crashes++
567 }
568
569 aggregate.Stats.Wait = wait
570 aggregate.Stats.Duration = time.Since(start)
571
572 return aggregate, nil
573}
574
575func (ss *shardedSearcher) StreamSearch(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, sender zoekt.Sender) (err error) {
576 tr, ctx := trace.New(ctx, "shardedSearcher.StreamSearch", "")
577 defer func() {
578 if err != nil {
579 tr.LazyPrintf("error: %v", err)
580 tr.SetError(err)
581 }
582 tr.Finish()
583 }()
584
585 start := time.Now()
586 proc, err := ss.sched.Acquire(ctx)
587 if err != nil {
588 return err
589 }
590 defer proc.Release()
591 tr.LazyPrintf("acquired process")
592
593 loaded := ss.getLoaded()
594 shards := loaded.shards
595
596 maxPendingPriority := math.Inf(-1)
597 if len(shards) > 0 {
598 maxPendingPriority = shards[0].priority
599 }
600
601 stillLoadingCrashes := 0
602 if !loaded.ready {
603 // We may have missed results due to not being fully loaded.
604 stillLoadingCrashes++
605 }
606
607 sender.Send(&zoekt.SearchResult{
608 Stats: zoekt.Stats{
609 Crashes: stillLoadingCrashes,
610 Wait: time.Since(start),
611 },
612 Progress: zoekt.Progress{
613 MaxPendingPriority: maxPendingPriority,
614 },
615 })
616
617 // Matches flow from the shards up the stack in the following order:
618 //
619 // 1. Search shards
620 // 2. flushCollectSender (aggregate)
621 // 3. limitSender (limit)
622 // 4. copyFileSender (copy)
623 //
624 // For streaming, the wrapping has to happen in the inverted order.
625 sender = copyFileSender(sender)
626
627 if truncator, hasLimits := index.NewDisplayTruncator(opts); hasLimits {
628 var cancel context.CancelFunc
629 ctx, cancel = context.WithCancel(ctx)
630 defer cancel()
631 sender = limitSender(cancel, sender, truncator)
632 }
633
634 sender, flush := newFlushCollectSender(opts, sender)
635
636 done, err := streamSearch(ctx, proc, q, opts, shards, sender)
637
638 // Even though streaming is done, we may have results sitting in a buffer we
639 // need to flush. So we need to send those before calling done.
640 flush()
641 done()
642
643 return err
644}
645
// streamSearch is an internal helper since both Search and StreamSearch are
// largely similar.
//
// It narrows shards with selectRepoSet, searches the remaining shards with up
// to GOMAXPROCS concurrent workers in the order given, and forwards each
// shard's result to sender via sendByRepository. Dispatching new shards stops
// after the first shard error or once opts.TotalMaxMatchCount is exceeded;
// results of shards already dispatched are still consumed. If
// opts.MaxWallTime is non-zero it bounds the search with a timeout.
//
// The returned err is the error of the last failing shard, if any.
//
// done must always be called, even if err is non-nil. The SearchResults sent
// via sender contain references to the underlying mmap data that the garbage
// collector can't see. Calling done informs the garbage collector it is free
// to collect those shards. The caller must call copyFiles on any
// SearchResults it returns/streams out before calling done.
func streamSearch(ctx context.Context, proc *process, q query.Q, opts *zoekt.SearchOptions, shards []*rankedShard, sender zoekt.Sender) (done func(), err error) {
	tr, ctx := trace.New(ctx, "shardedSearcher.streamSearch", "")
	overallStart := time.Now()
	metricSearchRunning.Inc()
	defer func() {
		metricSearchRunning.Dec()
		metricSearchDuration.Observe(time.Since(overallStart).Seconds())
		if err != nil {
			metricSearchFailedTotal.Inc()

			tr.LazyPrintf("error: %v", err)
			tr.SetError(err)
		}
		tr.Finish()
	}()

	// Select the subset of shards that we will search over for the given query.
	{
		beforeLen := len(shards)
		beforeQ := q
		shards, q = selectRepoSet(shards, q)
		tr.LazyPrintf("selectRepoSet shards=%d->%d q=%s->%s", beforeLen, len(shards), beforeQ, q)
	}

	// Nothing to search, so there is nothing for done to keep alive.
	if len(shards) == 0 {
		return func() {}, nil
	}

	// Derive a context that is cancelled when we return, and that also times
	// out after MaxWallTime if one is set.
	var cancel context.CancelFunc
	if opts.MaxWallTime == 0 {
		ctx, cancel = context.WithCancel(ctx)
	} else {
		ctx, cancel = context.WithTimeout(ctx, opts.MaxWallTime)
	}

	defer cancel()

	// We set the number of workers to GOMAXPROCS, or the number of shards,
	// whichever is smaller.
	workers := runtime.GOMAXPROCS(0)
	if workers > len(shards) {
		workers = len(shards)
	}

	// result is what a worker reports for one shard: the shard's priority plus
	// either its SearchResult or the error returned by the search.
	type result struct {
		priority float64
		*zoekt.SearchResult
		err error
	}

	var (
		// buffered channels to continue searching when sending back results
		// takes a while / blocks. The maximum pending result set is workers * 2.
		results = make(chan *result, workers)
		search  = make(chan *rankedShard, workers)
		wg      sync.WaitGroup
	)

	// Start workers that receive shards from the search channel, search them,
	// and send the results to the results channel. This process is repeated
	// until the search channel is closed.
	//
	// Note: Making "search" a buffered channel has the effect of limiting the number of parallel shard searches.
	// Since searching is mostly CPU bound, limiting parallel shard searches also reduces the peak working set.
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for s := range search {
				sr, err := searchOneShard(ctx, s, q, opts)
				r := &result{priority: s.priority, SearchResult: sr, err: err}
				results <- r
			}
		}()
	}

	// Close results once every worker has exited, which ends the loop below.
	go func() {
		wg.Wait()
		close(results)
	}()

	var (
		// pending holds the priorities of shards that were dispatched but whose
		// results have not been received yet.
		pending = make(prioritySlice, 0, workers)
		shard   = 0
		next    = shards[shard]

		// We need a separate nil-able reference to the same channel so we can close(search) for the worker
		// go-routines to finish but also set work to nil in order for the select statement below to ignore
		// that case when we want to stop a search. This is needed because sending on a closed channel panics.
		work = search
	)

	// stop ends the dispatching of shards. It is safe to call more than once.
	stop := func() {
		if work != nil {
			close(search)
			work = nil
			next = nil
		}
	}

	// tracked so we can stop when we hit TotalMaxMatchCount
	var totalMatchCount int

search:
	for {
		// At the top of each iteration, have the proc associated with this search yield its own "timeslice"
		// to possibly allow other searches to make progress
		_ = proc.Yield(ctx) // Note: we let searchOneShard handle context errors

		select {
		case work <- next: // is there a worker available to search the next shard?
			pending.append(next.priority)

			shard++
			if shard == len(shards) {
				// All shards have been dispatched.
				stop()
			} else {
				next = shards[shard]
			}
		case r, ok := <-results: // is there a result to send back?
			if !ok {
				// results is closed: all workers are done.
				break search
			}

			// delete this result's priority from pending before computing the new max pending priority
			pending.remove(r.priority)

			if r.err != nil {
				// Set final error and stop searching new shards, but consume any pending
				// search results.
				stop()
				err = r.err
				continue
			}

			// Update the match count statistics and stop searching new shards if we've
			// reached the limit set in the options.
			totalMatchCount += r.SearchResult.Stats.MatchCount
			if opts.TotalMaxMatchCount > 0 && totalMatchCount > opts.TotalMaxMatchCount {
				stop()
			}

			observeMetrics(r.SearchResult)

			// Annotate the result with the shard's priority and the highest
			// priority among the shards still outstanding.
			r.Priority = r.priority
			r.MaxPendingPriority = pending.max()

			sendByRepository(r.SearchResult, opts, sender) // send the result back to the client
		}
	}

	// The returned done keeps shards reachable until the caller invokes it.
	return func() { runtime.KeepAlive(shards) }, err
}
807
808// sendByRepository splits a zoekt.SearchResult by repository and calls
809// sender.Send for each batch. Ranking in Sourcegraph expects zoekt.SearchResult
810// to contain results with the same zoekt.SearchResult.priority only.
811//
812// We split by repository instead of by priority because it is easier to set
813// RepoURLs and LineFragments in zoekt.SearchResult.
814func sendByRepository(result *zoekt.SearchResult, opts *zoekt.SearchOptions, sender zoekt.Sender) {
815 if len(result.RepoURLs) <= 1 || len(result.Files) == 0 {
816 index.SortFiles(result.Files)
817 sender.Send(result)
818 return
819 }
820
821 send := func(repoName string, a, b int, stats zoekt.Stats) {
822 index.SortFiles(result.Files[a:b])
823 sender.Send(&zoekt.SearchResult{
824 Stats: stats,
825 Progress: zoekt.Progress{
826 Priority: result.Files[a].RepositoryPriority,
827 MaxPendingPriority: result.MaxPendingPriority,
828 },
829 Files: result.Files[a:b],
830 RepoURLs: map[string]string{repoName: result.RepoURLs[repoName]},
831 LineFragments: map[string]string{repoName: result.LineFragments[repoName]},
832 })
833 }
834
835 var startIndex, endIndex int
836 curRepoID := result.Files[0].RepositoryID
837 curRepoName := result.Files[0].Repository
838
839 fm := zoekt.FileMatch{}
840 for endIndex, fm = range result.Files {
841 if curRepoID != fm.RepositoryID {
842 // Stats must stay aggregate-able, hence we sent the aggregate stats with the
843 // last event.
844 send(curRepoName, startIndex, endIndex, zoekt.Stats{})
845
846 startIndex = endIndex
847 curRepoID = fm.RepositoryID
848 curRepoName = fm.Repository
849 }
850 }
851
852 send(curRepoName, startIndex, endIndex+1, result.Stats)
853}
854
855func observeMetrics(sr *zoekt.SearchResult) {
856 metricSearchContentBytesLoadedTotal.Add(float64(sr.Stats.ContentBytesLoaded))
857 metricSearchIndexBytesLoadedTotal.Add(float64(sr.Stats.IndexBytesLoaded))
858 metricSearchCrashesTotal.Add(float64(sr.Stats.Crashes))
859 metricSearchFileCountTotal.Add(float64(sr.Stats.FileCount))
860 metricSearchShardFilesConsideredTotal.Add(float64(sr.Stats.ShardFilesConsidered))
861 metricSearchFilesConsideredTotal.Add(float64(sr.Stats.FilesConsidered))
862 metricSearchFilesLoadedTotal.Add(float64(sr.Stats.FilesLoaded))
863 metricSearchFilesSkippedTotal.Add(float64(sr.Stats.FilesSkipped))
864 metricSearchShardsSkippedTotal.Add(float64(sr.Stats.ShardsSkipped))
865 metricSearchMatchCountTotal.Add(float64(sr.Stats.MatchCount))
866 metricSearchNgramMatchesTotal.Add(float64(sr.Stats.NgramMatches))
867 metricSearchNgramLookupsTotal.Add(float64(sr.Stats.NgramLookups))
868 metricSearchRegexpsConsideredTotal.Add(float64(sr.Stats.RegexpsConsidered))
869}
870
// copySlice replaces *src with a freshly allocated copy of its contents, so
// that it no longer shares memory with the original backing array. A nil
// slice is left untouched; an empty non-nil slice stays empty and non-nil.
func copySlice(src *[]byte) {
	orig := *src
	if orig == nil {
		return
	}
	*src = append(make([]byte, 0, len(orig)), orig...)
}
879
880func copyFiles(sr *zoekt.SearchResult) {
881 for i := range sr.Files {
882 copySlice(&sr.Files[i].Content)
883 copySlice(&sr.Files[i].Checksum)
884 for l := range sr.Files[i].LineMatches {
885 copySlice(&sr.Files[i].LineMatches[l].Line)
886 copySlice(&sr.Files[i].LineMatches[l].Before)
887 copySlice(&sr.Files[i].LineMatches[l].After)
888 }
889 for c := range sr.Files[i].ChunkMatches {
890 copySlice(&sr.Files[i].ChunkMatches[c].Content)
891 }
892 }
893}
894
895func searchOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) {
896 metricSearchShardRunning.Inc()
897 defer func() {
898 metricSearchShardRunning.Dec()
899 if e := recover(); e != nil {
900 log.Printf("[ERROR] crashed shard: %s: %#v, %s", s, e, debug.Stack())
901
902 if sr == nil {
903 sr = &zoekt.SearchResult{}
904 }
905 sr.Stats.Crashes = 1
906 }
907 }()
908
909 return s.Search(ctx, q, opts)
910}
911
// shardListResult is the outcome of listing one shard, as sent by
// listOneShard on its sink channel.
type shardListResult struct {
	rl  *zoekt.RepoList // result of the shard's List call, or a crash marker after a panic
	err error           // error returned by the shard's List call, if any
}
916
917func listOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.ListOptions, sink chan shardListResult) {
918 metricListShardRunning.Inc()
919 defer func() {
920 metricListShardRunning.Dec()
921 if r := recover(); r != nil {
922 log.Printf("[ERROR] crashed shard: %s: %s, %s", s.String(), r, debug.Stack())
923 sink <- shardListResult{
924 &zoekt.RepoList{Crashes: 1}, nil,
925 }
926 }
927 }()
928
929 ms, err := s.List(ctx, q, opts)
930 sink <- shardListResult{ms, err}
931}
932
933func (ss *shardedSearcher) List(ctx context.Context, q query.Q, opts *zoekt.ListOptions) (rl *zoekt.RepoList, err error) {
934 tr, ctx := trace.New(ctx, "shardedSearcher.List", "")
935 metricListRunning.Inc()
936 defer func() {
937 metricListRunning.Dec()
938 if rl != nil {
939 tr.LazyPrintf("repos.size=%d reposmap.size=%d crashes=%d stats=%+v", len(rl.Repos), len(rl.ReposMap), rl.Crashes, rl.Stats)
940 }
941 if err != nil {
942 tr.LazyPrintf("error: %v", err)
943 tr.SetError(err)
944 }
945 tr.Finish()
946 }()
947
948 q = query.Simplify(q)
949 isAll := false
950 if c, ok := q.(*query.Const); ok {
951 isAll = c.Value
952 }
953
954 proc, err := ss.sched.Acquire(ctx)
955 if err != nil {
956 return nil, err
957 }
958 defer proc.Release()
959 tr.LazyPrintf("acquired process")
960
961 loaded := ss.getLoaded()
962 shards := loaded.shards
963
964 // Setup what we return now, since we may short circuit if there are no
965 // shards to search.
966 stillLoadingCrashes := 0
967 if !loaded.ready {
968 // We may have missed results due to not being fully loaded.
969 stillLoadingCrashes++
970 }
971 agg := zoekt.RepoList{
972 Crashes: stillLoadingCrashes,
973 ReposMap: zoekt.ReposMap{},
974 Repos: []*zoekt.RepoListEntry{},
975 }
976
977 // PERF: Select the subset of shards that we will search over for the given
978 // query. A common List query only asks for a specific repo, so this is an
979 // important optimization.
980 {
981 beforeLen := len(shards)
982 beforeQ := q
983 shards, q = selectRepoSet(shards, q)
984 tr.LazyPrintf("selectRepoSet shards=%d->%d q=%s->%s", beforeLen, len(shards), beforeQ, q)
985 }
986
987 if len(shards) == 0 {
988 return &agg, nil
989 }
990
991 shardCount := len(shards)
992 all := make(chan shardListResult, shardCount)
993 feeder := make(chan zoekt.Searcher, len(shards))
994 for _, s := range shards {
995 feeder <- s
996 }
997 close(feeder)
998
999 for i := 0; i < runtime.GOMAXPROCS(0); i++ {
1000 go func() {
1001 for s := range feeder {
1002 listOneShard(ctx, s, q, opts, all)
1003 }
1004 }()
1005 }
1006
1007 uniq := map[string]*zoekt.RepoListEntry{}
1008
1009 for range shards {
1010 r := <-all
1011 if r.err != nil {
1012 return nil, r.err
1013 }
1014
1015 agg.Crashes += r.rl.Crashes
1016 agg.Stats.Add(&r.rl.Stats)
1017
1018 for _, r := range r.rl.Repos {
1019 prev, ok := uniq[r.Repository.Name]
1020 if !ok {
1021 cp := *r // We need to copy because we mutate r.Stats when merging duplicates
1022 uniq[r.Repository.Name] = &cp
1023 } else {
1024 prev.Stats.Add(&r.Stats)
1025 }
1026 }
1027
1028 for id, r := range r.rl.ReposMap {
1029 _, ok := agg.ReposMap[id]
1030 if !ok {
1031 agg.ReposMap[id] = r
1032 }
1033 }
1034 }
1035
1036 agg.Repos = make([]*zoekt.RepoListEntry, 0, len(uniq))
1037 for _, r := range uniq {
1038 agg.Repos = append(agg.Repos, r)
1039 }
1040
1041 // Only one of these fields is populated and in all cases the size of that
1042 // field is the number of Repos.
1043 //
1044 // Note: we don't just add individual Stats.Repos since a repository can
1045 // have multiple shards.
1046 agg.Stats.Repos = len(uniq) + len(agg.ReposMap)
1047
1048 if isAll && len(agg.Repos) > 0 {
1049 reportListAllMetrics(agg.Repos)
1050 }
1051
1052 return &agg, nil
1053}
1054
1055func reportListAllMetrics(repos []*zoekt.RepoListEntry) {
1056 var stats zoekt.RepoStats
1057 for _, r := range repos {
1058 stats.Add(&r.Stats)
1059 }
1060
1061 metricListAllRepos.Set(float64(stats.Repos))
1062 metricListAllIndexBytes.Set(float64(stats.IndexBytes))
1063 metricListAllContentBytes.Set(float64(stats.ContentBytes))
1064 metricListAllDocuments.Set(float64(stats.Documents))
1065 metricListAllShards.Set(float64(stats.Shards))
1066 metricListAllNewLinesCount.Set(float64(stats.NewLinesCount))
1067 metricListAllDefaultBranchNewLinesCount.Set(float64(stats.DefaultBranchNewLinesCount))
1068 metricListAllOtherBranchesNewLinesCount.Set(float64(stats.OtherBranchesNewLinesCount))
1069}
1070
1071// getLoaded returns the currently loaded shards. Shared so do not mutate.
1072func (s *shardedSearcher) getLoaded() loaded {
1073 // next commit will store the true value of this, for now we keep the
1074 // backwards compatible behaviour.
1075 ready := s.ready.Load()
1076 // ranked is loaded after ready to avoid a race were ready is true but
1077 // ranked is still not the final set of shards.
1078 ranked, _ := s.ranked.Load().([]*rankedShard)
1079 return loaded{
1080 shards: ranked,
1081 ready: ready,
1082 }
1083}
1084
1085func mkRankedShard(s zoekt.Searcher) *rankedShard {
1086 q := query.Const{Value: true}
1087 // We need to use WithUnsafeContext here, otherwise we cannot return a proper
1088 // rankedShard. On the user request path we use selectRepoSet which relies on
1089 // rankedShard.repos being set.
1090 result, err := s.List(systemtenant.WithUnsafeContext(context.Background()), &q, nil)
1091 if err != nil {
1092 log.Printf("[ERROR] mkRankedShard(%s): failed to cache repository list: %v", s, err)
1093 return &rankedShard{Searcher: s}
1094 }
1095
1096 var (
1097 maxPriority float64
1098 repos = make([]*zoekt.Repository, 0, len(result.Repos))
1099 )
1100 for i := range result.Repos {
1101 repo := &result.Repos[i].Repository
1102 repos = append(repos, repo)
1103 if repo.RawConfig != nil {
1104 priority, _ := strconv.ParseFloat(repo.RawConfig["priority"], 64)
1105 if priority > maxPriority {
1106 maxPriority = priority
1107 }
1108 }
1109 }
1110
1111 return &rankedShard{
1112 Searcher: s,
1113 repos: repos,
1114 priority: maxPriority,
1115 }
1116}
1117
1118// markReady should be called once all shards have been passed into replace on
1119// startup. Once s is marked as ready it stops reporting a Crash in the
1120// response Stats.
1121func (s *shardedSearcher) markReady() {
1122 s.ready.CompareAndSwap(false, true)
1123}
1124
1125func (s *shardedSearcher) replace(shards map[string]zoekt.Searcher) {
1126 if len(shards) == 0 {
1127 return
1128 }
1129
1130 defer func(began time.Time) {
1131 metricShardsBatchReplaceDurationSeconds.Observe(time.Since(began).Seconds())
1132 }(time.Now())
1133
1134 s.mu.Lock()
1135 defer s.mu.Unlock()
1136
1137 for key, shard := range shards {
1138 var r *rankedShard
1139 if shard != nil {
1140 r = mkRankedShard(shard)
1141 }
1142
1143 old := s.shards[key]
1144 if shard == nil {
1145 delete(s.shards, key)
1146 } else {
1147 s.shards[key] = r
1148 }
1149
1150 if old != nil && old.Searcher != nil {
1151 // _ ___ /^^\ /^\ /^^\_
1152 // _ _@)@) \ ,,/ '` ~ `'~~ ', `\.
1153 // _/o\_ _ _ _/~`.`...'~\ ./~~..,'`','',.,' ' ~:
1154 // / `,'.~,~.~ . , . , ~|, ,/ .,' , ,. .. ,,. `, ~\_
1155 // ( ' _' _ '_` _ ' . , `\_/ .' ..' ' ` ` `.. `, \_
1156 // ~V~ V~ V~ V~ ~\ ` ' . ' , ' .,.,''`.,.''`.,.``. ', \_
1157 // _/\ /\ /\ /\_/, . ' , `_/~\_ .' .,. ,, , _/~\_ `. `. '., \_
1158 // < ~ ~ '~`'~'`, ., . `_: ::: \_ ' `_/ ::: \_ `.,' . ', \_
1159 // \ ' `_ '`_ _ ',/ _::_::_ \ _ _/ _::_::_ \ `.,'.,`., \-,-,-,_,_,
1160 // `'~~ `'~~ `'~~ `'~~ \(_)(_)(_)/ `~~' \(_)(_)(_)/ ~'`\_.._,._,'_;_;_;_;_;
1161 //
1162 // We can't just call Close now, because there may be ongoing searches
1163 // which have old in the shards list. Previously we used an exclusive
1164 // lock to guarantee there were no concurrent searches. However, that
1165 // led to blocking on the read path.
1166 //
1167 // We could introduce granular locking per rankedShard to know when
1168 // there are no more references. However, this becomes tricky in
1169 // practice. Instead we rely on the garbage collector noticing old is no
1170 // longer used. We take care in our searchers to runtime.KeepAlive until
1171 // we have stopped referencing the underling mmap data.
1172 runtime.SetFinalizer(old, func(r *rankedShard) {
1173 r.Close()
1174 })
1175 }
1176 }
1177
1178 ranked := make([]*rankedShard, 0, len(s.shards))
1179 for _, r := range s.shards {
1180 ranked = append(ranked, r)
1181 }
1182
1183 sort.Slice(ranked, func(i, j int) bool {
1184 priorityDiff := ranked[i].priority - ranked[j].priority
1185 if priorityDiff != 0 {
1186 return priorityDiff > 0
1187 }
1188 if len(ranked[i].repos) == 0 || len(ranked[j].repos) == 0 {
1189 // Protect against empty names which can happen if we fail to List or
1190 // the shard is full of tombstones. Prefer the shard which has names.
1191 return len(ranked[i].repos) >= len(ranked[j].repos)
1192 }
1193 return ranked[i].repos[0].Name < ranked[j].repos[0].Name
1194 })
1195
1196 s.ranked.Store(ranked)
1197
1198 metricShardsLoaded.Set(float64(len(ranked)))
1199}
1200
1201func loadShard(fn string) (zoekt.Searcher, error) {
1202 f, err := os.Open(fn)
1203 if err != nil {
1204 return nil, err
1205 }
1206
1207 iFile, err := index.NewIndexFile(f)
1208 if err != nil {
1209 return nil, err
1210 }
1211 s, err := index.NewSearcher(iFile)
1212 if err != nil {
1213 iFile.Close()
1214 return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err)
1215 }
1216
1217 return s, nil
1218}
1219
// prioritySlice is a deliberately simple collection of priorities that
// supports three operations: adding a value, removing a value, and reporting
// the largest value. Every operation is O(n), which is acceptable because N is
// restricted to GOMAXPROCS (i.e., number of cpu cores) by the shardedSearcher
// interface.
type prioritySlice []float64

// append adds pri to the collection.
func (p *prioritySlice) append(pri float64) {
	grown := append(*p, pri)
	*p = grown
}

// remove deletes the first element equal to pri, if there is one. The order of
// the remaining elements is not preserved: the tail element is moved into the
// vacated slot.
func (p *prioritySlice) remove(pri float64) {
	s := *p
	last := len(s) - 1
	for i := range s {
		if s[i] != pri {
			continue
		}
		// Overwrite the match with the tail element (a no-op when the match is
		// the tail itself), then drop the tail.
		s[i] = s[last]
		*p = s[:last]
		return
	}
}

// max returns the largest value in the collection, or negative infinity when
// the collection is empty.
func (p *prioritySlice) max() float64 {
	// remove() and max() could be combined into a single pass, but keeping
	// them separate is easier to read and the cost of the extra loop is
	// almost certainly irrelevant.
	s := *p
	best := math.Inf(-1)
	for i := 0; i < len(s); i++ {
		if s[i] > best {
			best = s[i]
		}
	}
	return best
}