fork of https://github.com/sourcegraph/zoekt
1// Copyright 2016 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package shards
16
17import (
18 "context"
19 "fmt"
20 "log"
21 "math"
22 "os"
23 "runtime"
24 "runtime/debug"
25 "slices"
26 "sort"
27 "strconv"
28 "sync"
29 "time"
30
31 "golang.org/x/sync/semaphore"
32
33 "github.com/prometheus/client_golang/prometheus"
34 "github.com/prometheus/client_golang/prometheus/promauto"
35 "go.uber.org/atomic"
36
37 "github.com/sourcegraph/zoekt"
38 "github.com/sourcegraph/zoekt/query"
39 "github.com/sourcegraph/zoekt/trace"
40)
41
// Prometheus metrics exported by the sharded searcher. All of them are
// created through promauto.
var (
	// Shard loading metrics.
	metricShardsLoaded = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_shards_loaded",
		Help: "The number of shards currently loaded",
	})
	metricShardsLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_shards_loaded_total",
		Help: "The total number of shards loaded",
	})
	metricShardsLoadFailedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_shards_load_failed_total",
		Help: "The total number of shard loads that failed",
	})

	// Search request metrics.
	metricSearchRunning = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_search_running",
		Help: "The number of concurrent search requests running",
	})
	metricSearchShardRunning = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_search_shard_running",
		Help: "The number of concurrent search requests in a shard running",
	})
	metricSearchFailedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_failed_total",
		Help: "The total number of search requests that failed",
	})
	metricSearchDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "zoekt_search_duration_seconds",
		Help:    "The duration a search request took in seconds",
		Buckets: prometheus.DefBuckets, // DefBuckets good for service timings
	})

	// A Counter per Stat. Name should match field in zoekt.Stats.
	metricSearchContentBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_content_loaded_bytes_total",
		Help: "Total amount of I/O for reading contents",
	})
	metricSearchIndexBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_index_loaded_bytes_total",
		Help: "Total amount of I/O for reading from index",
	})
	metricSearchCrashesTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_crashes_total",
		Help: "Total number of search shards that had a crash",
	})
	metricSearchFileCountTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_file_count_total",
		Help: "Total number of files containing a match",
	})
	metricSearchShardFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_shard_files_considered_total",
		Help: "Total number of files in shards that we considered",
	})
	metricSearchFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_files_considered_total",
		Help: "Total files that we evaluated. Equivalent to files for which all atom matches (including negations) evaluated to true",
	})
	metricSearchFilesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_files_loaded_total",
		Help: "Total files for which we loaded file content to verify substring matches",
	})
	metricSearchFilesSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_files_skipped_total",
		Help: "Total candidate files whose contents weren't examined because we gathered enough matches",
	})
	metricSearchShardsSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_shards_skipped_total",
		Help: "Total shards that we did not process because a query was canceled",
	})
	metricSearchMatchCountTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_match_count_total",
		Help: "Total number of non-overlapping matches",
	})
	metricSearchNgramMatchesTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_ngram_matches_total",
		Help: "Total number of candidate matches as a result of searching ngrams",
	})
	metricSearchNgramLookupsTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_ngram_lookups_total",
		Help: "Total number of times we accessed an ngram in the index",
	})
	metricSearchRegexpsConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_regexps_considered_total",
		Help: "Total number of times regexp was called on files that we evaluated",
	})

	// List request metrics.
	metricListRunning = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_running",
		Help: "The number of concurrent list requests running",
	})
	metricListShardRunning = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_shard_running",
		Help: "The number of concurrent list requests in a shard running",
	})
	// Timing of batched Searcher replacement.
	metricShardsBatchReplaceDurationSeconds = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "zoekt_shards_batch_replace_duration_seconds",
		Help:    "The time it takes to replace a batch of Searchers.",
		Buckets: []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30},
	})

	// Gauges holding the aggregate RepoStats of the most recent List call
	// that matched everything.
	metricListAllRepos = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_all_stats_repos",
		Help: "The last List(true) value for RepoStats.Repos. Repos is used for aggregrating the number of repositories.",
	})
	metricListAllShards = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_all_stats_shards",
		Help: "The last List(true) value for RepoStats.Shards. Shards is the total number of search shards.",
	})
	metricListAllDocuments = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_all_stats_documents",
		Help: "The last List(true) value for RepoStats.Documents. Documents holds the number of documents or files.",
	})
	metricListAllIndexBytes = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_all_stats_index_bytes",
		Help: "The last List(true) value for RepoStats.IndexBytes. IndexBytes is the amount of RAM used for index overhead.",
	})
	metricListAllContentBytes = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_all_stats_content_bytes",
		Help: "The last List(true) value for RepoStats.ContentBytes. ContentBytes is the amount of RAM used for raw content.",
	})
	metricListAllNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_all_stats_new_lines_count",
		Help: "The last List(true) value for RepoStats.NewLinesCount.",
	})
	metricListAllDefaultBranchNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_all_stats_default_branch_new_lines_count",
		Help: "The last List(true) value for RepoStats.DefaultBranchNewLinesCount.",
	})
	metricListAllOtherBranchesNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_all_stats_other_branches_new_lines_count",
		Help: "The last List(true) value for RepoStats.OtherBranchesNewLinesCount.",
	})
)
174
// rankedShard is a zoekt.Searcher for a single shard together with the
// ranking and repository metadata used to order and filter shards.
type rankedShard struct {
	zoekt.Searcher

	priority float64 // maximum priority across all repos in the shard

	// We have out of band ranking on compound shards which can change even if
	// the shard file does not. So we compute a rank in getShards. We store
	// repos here to avoid the cost of List in the search request path.
	//
	// repos is nil only if that call failed.
	repos []*zoekt.Repository
}
187
// loaded stores the state we compute when updating the state of shards from
// disk. Search, StreamSearch and List each read one loaded value per request
// via getLoaded.
type loaded struct {
	// shards is the currently loaded shards sorted by decreasing rank and
	// should not be mutated.
	shards []*rankedShard

	// ready is true if sharded searcher has finished loading all initial
	// shards on startup. Requests served while it is false report one extra
	// crash to signal that results may be incomplete.
	ready bool
}
199
// shardedSearcher answers Search, StreamSearch and List requests by fanning
// them out over the currently loaded shards.
type shardedSearcher struct {
	// Limit the number of parallel queries. Since searching is
	// CPU bound, we can't do better than #CPU queries in
	// parallel. If we do so, we just create more memory
	// pressure.
	sched scheduler

	mu     sync.Mutex // protects writes to shards
	shards map[string]*rankedShard // loaded shards indexed by key

	// NOTE(review): ready and ranked are not read in this part of the file;
	// presumably ready is set by markReady and ranked caches the state
	// returned by getLoaded — confirm against their definitions.
	ready  atomic.Bool
	ranked atomic.Value
}
213
214func newShardedSearcher(n int64) *shardedSearcher {
215 ss := &shardedSearcher{
216 shards: make(map[string]*rankedShard),
217 sched: newScheduler(n),
218 }
219 return ss
220}
221
222// NewDirectorySearcher returns a searcher instance that loads all
223// shards corresponding to a glob into memory.
224func NewDirectorySearcher(dir string) (zoekt.Streamer, error) {
225 return newDirectorySearcher(dir, true)
226}
227
228// NewDirectorySearcherFast is like NewDirectorySearcher, but does not block
229// on the initial loading of shards.
230//
231// This exists since in the case of zoekt-webserver we are happy with having
232// partial availability since that is better than no availability on large
233// instances.
234func NewDirectorySearcherFast(dir string) (zoekt.Streamer, error) {
235 return newDirectorySearcher(dir, false)
236}
237
238func newDirectorySearcher(dir string, waitUntilReady bool) (zoekt.Streamer, error) {
239 ss := newShardedSearcher(int64(runtime.GOMAXPROCS(0)))
240 tl := &loader{
241 ss: ss,
242 }
243 dw, err := newDirectoryWatcher(dir, tl)
244 if err != nil {
245 return nil, err
246 }
247
248 if waitUntilReady {
249 if err := dw.WaitUntilReady(); err != nil {
250 return nil, err
251 }
252 }
253
254 ds := &directorySearcher{
255 Streamer: ss,
256 directoryWatcher: dw,
257 }
258
259 return &typeRepoSearcher{Streamer: ds}, nil
260}
261
// directorySearcher couples a Streamer with the DirectoryWatcher that feeds
// it, so that Close can shut both down.
type directorySearcher struct {
	zoekt.Streamer

	directoryWatcher *DirectoryWatcher
}
267
268func (s *directorySearcher) Close() {
269 // We need to Stop directoryWatcher first since it calls load/unload on
270 // Searcher.
271 s.directoryWatcher.Stop()
272 s.Streamer.Close()
273}
274
// loader applies shard load and drop requests to a shardedSearcher. It is
// handed to newDirectoryWatcher by newDirectorySearcher.
type loader struct {
	ss *shardedSearcher
}
278
279func (tl *loader) load(keys ...string) {
280 // This is called with all keys on startup, so once this function has
281 // finished running shardedSearcher will be ready.
282 defer tl.ss.markReady()
283
284 if len(keys) == 0 {
285 // If there's nothing to load, we exit early here, but we want to mark
286 // ourselves as ready.
287 return
288 }
289
290 var (
291 mu sync.Mutex // synchronizes writes to the shards map
292 wg sync.WaitGroup // used to wait for all shards to load
293 sem = semaphore.NewWeighted(int64(runtime.GOMAXPROCS(0)))
294 loadedShards = make(map[string]zoekt.Searcher)
295 )
296
297 publishLoaded := func() {
298 mu.Lock()
299 chunk := loadedShards
300 loadedShards = make(map[string]zoekt.Searcher)
301 mu.Unlock()
302 tl.ss.replace(chunk)
303 }
304
305 log.Printf("loading %d shard(s): %s", len(keys), humanTruncateList(keys, 5))
306
307 lastProgress := time.Now()
308 for i, key := range keys {
309 // If taking a while to start-up occasionally give a progress message
310 if time.Since(lastProgress) > 5*time.Second {
311 log.Printf("still need to load %d shards...", len(keys)-i)
312 lastProgress = time.Now()
313
314 publishLoaded()
315 }
316
317 _ = sem.Acquire(context.Background(), 1)
318 wg.Add(1)
319
320 go func(key string) {
321 defer sem.Release(1)
322 defer wg.Done()
323
324 shard, err := loadShard(key)
325 if err != nil {
326 metricShardsLoadFailedTotal.Inc()
327 log.Printf("reloading: %s, err %v ", key, err)
328 return
329 }
330 metricShardsLoadedTotal.Inc()
331
332 mu.Lock()
333 loadedShards[key] = shard
334 mu.Unlock()
335 }(key)
336 }
337
338 wg.Wait()
339
340 publishLoaded()
341}
342
343func (tl *loader) drop(keys ...string) {
344 shards := make(map[string]zoekt.Searcher, len(keys))
345 for _, key := range keys {
346 shards[key] = nil
347 }
348 tl.ss.replace(shards)
349}
350
351func (ss *shardedSearcher) String() string {
352 return "shardedSearcher"
353}
354
355// Close closes references to open files. It may be called only once.
356func (ss *shardedSearcher) Close() {
357 ss.mu.Lock()
358 shards := make(map[string]zoekt.Searcher, len(ss.shards))
359 for k := range ss.shards {
360 shards[k] = nil
361 }
362 ss.mu.Unlock()
363
364 ss.replace(shards)
365}
366
367func selectRepoSet(shards []*rankedShard, q query.Q) ([]*rankedShard, query.Q) {
368 and, ok := q.(*query.And)
369 if ok {
370 return doSelectRepoSet(shards, and)
371 }
372
373 // We have queries which look like (reposet ...) and we want to do the same
374 // optimizations. To simplify we just always wrap the query in And and then
375 // on the return value call Simplify to unwrap. In particular this is
376 // important for List calls.
377 and = &query.And{Children: []query.Q{q}}
378 shards, q = doSelectRepoSet(shards, and)
379 return shards, query.Simplify(q)
380}
381
// doSelectRepoSet looks for the first child of and that restricts the set of
// repositories (RepoSet, RepoIDs, Repo or BranchesRepos) and returns only the
// shards that contain at least one matching repository, plus the query to run
// against them.
//
// If every repository of every returned shard matches, the restricting child
// is redundant: it is replaced by a constant true (or, for a BranchesRepos
// with a single entry, by an exact Branch query) in a clone of and, and the
// simplified clone is returned. Otherwise and is returned as is. If no child
// is a repository restriction, shards and and are returned unchanged.
func doSelectRepoSet(shards []*rankedShard, and *query.And) ([]*rankedShard, query.Q) {
	// (and (reposet ...) (q))
	// (and true (q)) with a filtered shards
	// (and false) // noop

	// (and (repobranches ...) (q))
	// (and (repobranches ...) (q))

	// Note: we also support (and (repo ...) (q)) even though sourcegraph does
	// not generate those sorts of queries. This is to support manual testing.

	// hasReposForPredicate lifts a per-repository predicate to a function
	// over a shard's repositories that reports whether any and whether all of
	// them satisfy the predicate. For an empty list it reports any=false,
	// all=true.
	hasReposForPredicate := func(pred func(repo *zoekt.Repository) bool) func(repos []*zoekt.Repository) (any, all bool) {
		return func(repos []*zoekt.Repository) (any, all bool) {
			any = false
			all = true
			for _, repo := range repos {
				b := pred(repo)
				any = any || b
				all = all && b
			}
			return any, all
		}
	}

	for i, c := range and.Children {
		// setSize is an upper bound on the number of matching repositories,
		// used only to pre-size filtered.
		var setSize int
		var hasRepos func([]*zoekt.Repository) (bool, bool)
		switch setQuery := c.(type) {
		case *query.RepoSet:
			setSize = len(setQuery.Set)
			hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
				return setQuery.Set[repo.Name]
			})
		case *query.RepoIDs:
			setSize = int(setQuery.Repos.GetCardinality())
			hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
				return setQuery.Repos.Contains(repo.ID)
			})
		case *query.Repo:
			// A regexp gives no size hint.
			setSize = 0
			hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
				return setQuery.Regexp.MatchString(repo.Name)
			})
		case *query.BranchesRepos:
			for _, br := range setQuery.List {
				setSize += int(br.Repos.GetCardinality())
			}

			hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
				for _, br := range setQuery.List {
					if br.Repos.Contains(repo.ID) {
						return true
					}
				}
				return false
			})
		default:
			// Not a repository restriction; look at the next child.
			continue
		}

		// setSize may be larger than the number of shards we have. The size of
		// filtered is bounded by min(len(set), len(shards))
		if setSize > len(shards) {
			setSize = len(shards)
		}

		filtered := make([]*rankedShard, 0, setSize)
		// filteredAll stays true only while every repository in every kept
		// shard matches the restriction.
		filteredAll := true

		for _, s := range shards {
			if s.repos == nil {
				// repos is nil if we failed to List the shard. This shouldn't
				// happen, but if it does we don't know what is in it and must search
				// it without simplifying the query.
				filtered = append(filtered, s)
				filteredAll = false
			} else if any, all := hasRepos(s.repos); any {
				filtered = append(filtered, s)
				filteredAll = filteredAll && all
			}
		}

		// We don't need to adjust the query since we are returning an empty set
		// of shards to search.
		if len(filtered) == 0 {
			return filtered, and
		}

		// We can't simplify the query since we are searching shards which contain
		// repos we aren't supposed to search.
		if !filteredAll {
			return filtered, and
		}

		// We don't want to mutate the original and, so we clone it before
		// mutating it.
		and = &query.And{Children: slices.Clone(and.Children)}

		// This optimization allows us to avoid the work done by
		// indexData.simplify for each shard.
		//
		// For example if our query is (and (reposet foo bar) (content baz))
		// then at this point filtered is [foo bar] and q is the same. For each
		// shard indexData.simplify will simplify to (and true (content baz)) ->
		// (content baz). This work can be done now once, rather than per shard.
		switch c := c.(type) {
		case *query.RepoSet, *query.RepoIDs, *query.Repo:
			and.Children[i] = &query.Const{Value: true}
			return filtered, query.Simplify(and)

		case *query.BranchesRepos:
			// We can only replace if all the repos want the same branches. We
			// simplify and just check that we are requesting 1 branch. The common
			// case is just asking for HEAD, so this should be effective.
			if len(c.List) != 1 {
				return filtered, and
			}

			// Every repo wants the same branches, so we can replace RepoBranches
			// with a list of branch queries.
			and.Children[i] = &query.Branch{Pattern: c.List[0].Branch, Exact: true}
			return filtered, query.Simplify(and)
		}

		// Stop after first RepoSet, otherwise we might append duplicate
		// shards to `filtered`
		//
		// NOTE(review): every type that gets past the first switch returns in
		// the switch above, so this return looks unreachable today and acts
		// as a safety net.
		return filtered, and
	}

	return shards, and
}
513
514func (ss *shardedSearcher) Search(ctx context.Context, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) {
515 tr, ctx := trace.New(ctx, "shardedSearcher.Search", "")
516 defer func() {
517 tr.Finish()
518 }()
519 ctx, cancel := context.WithCancel(ctx)
520 defer cancel()
521
522 collectSender := newCollectSender(opts)
523
524 start := time.Now()
525 proc, err := ss.sched.Acquire(ctx)
526 if err != nil {
527 return nil, err
528 }
529 defer proc.Release()
530 tr.LazyPrintf("acquired process")
531
532 wait := time.Since(start)
533 start = time.Now()
534
535 loaded := ss.getLoaded()
536 done, err := streamSearch(ctx, proc, q, opts, loaded.shards, collectSender)
537 defer done()
538 if err != nil {
539 return nil, err
540 }
541
542 aggregate, ok := collectSender.Done()
543 if !ok {
544 aggregate = &zoekt.SearchResult{
545 RepoURLs: map[string]string{},
546 LineFragments: map[string]string{},
547 }
548 }
549
550 copyFiles(aggregate)
551
552 if !loaded.ready {
553 // We may have missed results due to not being fully loaded.
554 aggregate.Stats.Crashes++
555 }
556
557 aggregate.Stats.Wait = wait
558 aggregate.Stats.Duration = time.Since(start)
559
560 return aggregate, nil
561}
562
// StreamSearch runs q against all loaded shards and streams results to sender
// as they become available.
//
// Before any shard is searched it sends one initial event carrying the time
// spent waiting for the scheduler, the highest pending shard priority and, if
// the initial shards have not finished loading, a crash count of one. It
// returns the error from acquiring the scheduler or from the shard search; a
// non-nil error is also recorded on the request trace.
func (ss *shardedSearcher) StreamSearch(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, sender zoekt.Sender) (err error) {
	tr, ctx := trace.New(ctx, "shardedSearcher.StreamSearch", "")
	defer func() {
		if err != nil {
			tr.LazyPrintf("error: %v", err)
			tr.SetError(err)
		}
		tr.Finish()
	}()

	start := time.Now()
	proc, err := ss.sched.Acquire(ctx)
	if err != nil {
		return err
	}
	defer proc.Release()
	tr.LazyPrintf("acquired process")

	loaded := ss.getLoaded()
	shards := loaded.shards

	// shards is sorted by decreasing rank, so the first entry carries the
	// highest priority. With no shards there is nothing pending.
	maxPendingPriority := math.Inf(-1)
	if len(shards) > 0 {
		maxPendingPriority = shards[0].priority
	}

	stillLoadingCrashes := 0
	if !loaded.ready {
		// We may have missed results due to not being fully loaded.
		stillLoadingCrashes++
	}

	// Initial event, sent on the unwrapped sender before any shard results.
	sender.Send(&zoekt.SearchResult{
		Stats: zoekt.Stats{
			Crashes: stillLoadingCrashes,
			Wait:    time.Since(start),
		},
		Progress: zoekt.Progress{
			MaxPendingPriority: maxPendingPriority,
		},
	})

	// Matches flow from the shards up the stack in the following order:
	//
	// 1. Search shards
	// 2. flushCollectSender (aggregate)
	// 3. limitSender (limit)
	// 4. copyFileSender (copy)
	//
	// For streaming, the wrapping has to happen in the inverted order.
	sender = copyFileSender(sender)

	if truncator, hasLimits := zoekt.NewDisplayTruncator(opts); hasLimits {
		// The limiting sender receives cancel so that it can stop the search.
		var cancel context.CancelFunc
		ctx, cancel = context.WithCancel(ctx)
		defer cancel()
		sender = limitSender(cancel, sender, truncator)
	}

	sender, flush := newFlushCollectSender(opts, sender)

	done, err := streamSearch(ctx, proc, q, opts, shards, sender)

	// Even though streaming is done, we may have results sitting in a buffer we
	// need to flush. So we need to send those before calling done.
	flush()
	done()

	return err
}
633
// streamSearch is an internal helper since both Search and StreamSearch are
// largely similar.
//
// It narrows shards with selectRepoSet, searches the remaining shards with a
// pool of at most GOMAXPROCS workers in rank order, and forwards each shard's
// result to sender. Feeding of new shards stops once a shard returns an error
// or opts.TotalMaxMatchCount is exceeded; results of shards already in flight
// are still consumed. If opts.MaxWallTime is non-zero it bounds the search
// via a context timeout.
//
// done must always be called, even if err is non-nil. The SearchResults sent
// via sender contain references to the underlying mmap data that the garbage
// collector can't see. Calling done informs the garbage collector it is free
// to collect those shards. The caller must call copyFiles on any
// SearchResults it returns/streams out before calling done.
func streamSearch(ctx context.Context, proc *process, q query.Q, opts *zoekt.SearchOptions, shards []*rankedShard, sender zoekt.Sender) (done func(), err error) {
	tr, ctx := trace.New(ctx, "shardedSearcher.streamSearch", "")
	overallStart := time.Now()
	metricSearchRunning.Inc()
	defer func() {
		metricSearchRunning.Dec()
		metricSearchDuration.Observe(time.Since(overallStart).Seconds())
		if err != nil {
			metricSearchFailedTotal.Inc()

			tr.LazyPrintf("error: %v", err)
			tr.SetError(err)
		}
		tr.Finish()
	}()

	// Select the subset of shards that we will search over for the given query.
	{
		beforeLen := len(shards)
		beforeQ := q
		shards, q = selectRepoSet(shards, q)
		tr.LazyPrintf("selectRepoSet shards=%d->%d q=%s->%s", beforeLen, len(shards), beforeQ, q)
	}

	if len(shards) == 0 {
		// Nothing to search, so there is nothing to keep alive either.
		return func() {}, nil
	}

	var cancel context.CancelFunc
	if opts.MaxWallTime == 0 {
		ctx, cancel = context.WithCancel(ctx)
	} else {
		ctx, cancel = context.WithTimeout(ctx, opts.MaxWallTime)
	}

	defer cancel()

	// We set the number of workers to GOMAXPROCS, or the number of shards,
	// whichever is smaller.
	workers := runtime.GOMAXPROCS(0)
	if workers > len(shards) {
		workers = len(shards)
	}

	// result is what a worker reports for one shard.
	type result struct {
		priority float64
		*zoekt.SearchResult
		err error
	}

	var (
		// buffered channels to continue searching when sending back results
		// takes a while / blocks. The maximum pending result set is workers * 2.
		results = make(chan *result, workers)
		search  = make(chan *rankedShard, workers)
		wg      sync.WaitGroup
	)

	// Start workers that receive shards from the search channel, search them,
	// and send the results to the results channel. This process is repeated
	// until the search channel is closed.
	//
	// Note: Making "search" a buffered channel has the effect of limiting the number of parallel shard searches.
	// Since searching is mostly CPU bound, limiting parallel shard searches also reduces the peak working set.
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for s := range search {
				sr, err := searchOneShard(ctx, s, q, opts)
				r := &result{priority: s.priority, SearchResult: sr, err: err}
				results <- r
			}
		}()
	}

	// Close results once every worker has exited so that the loop below
	// terminates.
	go func() {
		wg.Wait()
		close(results)
	}()

	var (
		pending = make(prioritySlice, 0, workers)
		shard   = 0
		next    = shards[shard]

		// We need a separate nil-able reference to the same channel so we can close(search) for the worker
		// go-routines to finish but also set work to nil in order for the select statement below to ignore
		// that case when we want to stop a search. This is needed because sending on a closed channel panics.
		work = search
	)

	// stop ends the feeding of shards to the workers. It is safe to call more
	// than once.
	stop := func() {
		if work != nil {
			close(search)
			work = nil
			next = nil
		}
	}

	// tracked so we can stop when we hit TotalMaxMatchCount
	var totalMatchCount int

search:
	for {
		// At the top of each iteration, have the proc associated with this search yield its own "timeslice"
		// to possibly allow other searches to make progress
		_ = proc.Yield(ctx) // Note: we let searchOneShard handle context errors

		select {
		case work <- next: // is there a worker available to search the next shard?
			pending.append(next.priority)

			shard++
			if shard == len(shards) {
				stop()
			} else {
				next = shards[shard]
			}
		case r, ok := <-results: // is there a result to send back?
			if !ok {
				// All workers are done and every result has been consumed.
				break search
			}

			// delete this result's priority from pending before computing the new max pending priority
			pending.remove(r.priority)

			if r.err != nil {
				// Set final error and stop searching new shards, but consume any pending
				// search results.
				stop()
				err = r.err
				continue
			}

			// Update the match count statistics and stop searching new shards if we've
			// reached the limit set in the options.
			totalMatchCount += r.SearchResult.Stats.MatchCount
			if opts.TotalMaxMatchCount > 0 && totalMatchCount > opts.TotalMaxMatchCount {
				stop()
			}

			observeMetrics(r.SearchResult)

			r.Priority = r.priority
			r.MaxPendingPriority = pending.max()

			sendByRepository(r.SearchResult, opts, sender) // send the result back to the client
		}
	}

	return func() { runtime.KeepAlive(shards) }, err
}
795
796// sendByRepository splits a zoekt.SearchResult by repository and calls
797// sender.Send for each batch. Ranking in Sourcegraph expects zoekt.SearchResult
798// to contain results with the same zoekt.SearchResult.Priority only.
799//
800// We split by repository instead of by priority because it is easier to set
801// RepoURLs and LineFragments in zoekt.SearchResult.
802func sendByRepository(result *zoekt.SearchResult, opts *zoekt.SearchOptions, sender zoekt.Sender) {
803 if len(result.RepoURLs) <= 1 || len(result.Files) == 0 {
804 zoekt.SortFiles(result.Files)
805 sender.Send(result)
806 return
807 }
808
809 send := func(repoName string, a, b int, stats zoekt.Stats) {
810 zoekt.SortFiles(result.Files[a:b])
811 sender.Send(&zoekt.SearchResult{
812 Stats: stats,
813 Progress: zoekt.Progress{
814 Priority: result.Files[a].RepositoryPriority,
815 MaxPendingPriority: result.MaxPendingPriority,
816 },
817 Files: result.Files[a:b],
818 RepoURLs: map[string]string{repoName: result.RepoURLs[repoName]},
819 LineFragments: map[string]string{repoName: result.LineFragments[repoName]},
820 })
821 }
822
823 var startIndex, endIndex int
824 curRepoID := result.Files[0].RepositoryID
825 curRepoName := result.Files[0].Repository
826
827 fm := zoekt.FileMatch{}
828 for endIndex, fm = range result.Files {
829 if curRepoID != fm.RepositoryID {
830 // Stats must stay aggregate-able, hence we sent the aggregate stats with the
831 // last event.
832 send(curRepoName, startIndex, endIndex, zoekt.Stats{})
833
834 startIndex = endIndex
835 curRepoID = fm.RepositoryID
836 curRepoName = fm.Repository
837 }
838 }
839
840 send(curRepoName, startIndex, endIndex+1, result.Stats)
841}
842
843func observeMetrics(sr *zoekt.SearchResult) {
844 metricSearchContentBytesLoadedTotal.Add(float64(sr.Stats.ContentBytesLoaded))
845 metricSearchIndexBytesLoadedTotal.Add(float64(sr.Stats.IndexBytesLoaded))
846 metricSearchCrashesTotal.Add(float64(sr.Stats.Crashes))
847 metricSearchFileCountTotal.Add(float64(sr.Stats.FileCount))
848 metricSearchShardFilesConsideredTotal.Add(float64(sr.Stats.ShardFilesConsidered))
849 metricSearchFilesConsideredTotal.Add(float64(sr.Stats.FilesConsidered))
850 metricSearchFilesLoadedTotal.Add(float64(sr.Stats.FilesLoaded))
851 metricSearchFilesSkippedTotal.Add(float64(sr.Stats.FilesSkipped))
852 metricSearchShardsSkippedTotal.Add(float64(sr.Stats.ShardsSkipped))
853 metricSearchMatchCountTotal.Add(float64(sr.Stats.MatchCount))
854 metricSearchNgramMatchesTotal.Add(float64(sr.Stats.NgramMatches))
855 metricSearchNgramLookupsTotal.Add(float64(sr.Stats.NgramLookups))
856 metricSearchRegexpsConsideredTotal.Add(float64(sr.Stats.RegexpsConsidered))
857}
858
// copySlice replaces *src with a freshly allocated copy of its contents so
// that it no longer shares memory with the original backing array. A nil
// slice is left untouched; an empty non-nil slice stays non-nil.
func copySlice(src *[]byte) {
	original := *src
	if original == nil {
		return
	}

	clone := make([]byte, len(original))
	copy(clone, original)
	*src = clone
}
867
868func copyFiles(sr *zoekt.SearchResult) {
869 for i := range sr.Files {
870 copySlice(&sr.Files[i].Content)
871 copySlice(&sr.Files[i].Checksum)
872 for l := range sr.Files[i].LineMatches {
873 copySlice(&sr.Files[i].LineMatches[l].Line)
874 copySlice(&sr.Files[i].LineMatches[l].Before)
875 copySlice(&sr.Files[i].LineMatches[l].After)
876 }
877 for c := range sr.Files[i].ChunkMatches {
878 copySlice(&sr.Files[i].ChunkMatches[c].Content)
879 }
880 }
881}
882
883func searchOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) {
884 metricSearchShardRunning.Inc()
885 defer func() {
886 metricSearchShardRunning.Dec()
887 if e := recover(); e != nil {
888 log.Printf("crashed shard: %s: %#v, %s", s, e, debug.Stack())
889
890 if sr == nil {
891 sr = &zoekt.SearchResult{}
892 }
893 sr.Stats.Crashes = 1
894 }
895 }()
896
897 return s.Search(ctx, q, opts)
898}
899
// shardListResult is the outcome of listing a single shard, as sent by
// listOneShard to its sink channel.
type shardListResult struct {
	rl  *zoekt.RepoList // result of the shard's List call
	err error           // error of the shard's List call
}
904
905func listOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.ListOptions, sink chan shardListResult) {
906 metricListShardRunning.Inc()
907 defer func() {
908 metricListShardRunning.Dec()
909 if r := recover(); r != nil {
910 log.Printf("crashed shard: %s: %s, %s", s.String(), r, debug.Stack())
911 sink <- shardListResult{
912 &zoekt.RepoList{Crashes: 1}, nil,
913 }
914 }
915 }()
916
917 ms, err := s.List(ctx, q, opts)
918 sink <- shardListResult{ms, err}
919}
920
921func (ss *shardedSearcher) List(ctx context.Context, q query.Q, opts *zoekt.ListOptions) (rl *zoekt.RepoList, err error) {
922 tr, ctx := trace.New(ctx, "shardedSearcher.List", "")
923 metricListRunning.Inc()
924 defer func() {
925 metricListRunning.Dec()
926 if rl != nil {
927 tr.LazyPrintf("repos.size=%d reposmap.size=%d crashes=%d", len(rl.Repos), len(rl.ReposMap), rl.Crashes)
928 }
929 if err != nil {
930 tr.LazyPrintf("error: %v", err)
931 tr.SetError(err)
932 }
933 tr.Finish()
934 }()
935
936 q = query.Simplify(q)
937 isAll := false
938 if c, ok := q.(*query.Const); ok {
939 isAll = c.Value
940 }
941
942 proc, err := ss.sched.Acquire(ctx)
943 if err != nil {
944 return nil, err
945 }
946 defer proc.Release()
947 tr.LazyPrintf("acquired process")
948
949 loaded := ss.getLoaded()
950 shards := loaded.shards
951
952 // Setup what we return now, since we may short circuit if there are no
953 // shards to search.
954 stillLoadingCrashes := 0
955 if !loaded.ready {
956 // We may have missed results due to not being fully loaded.
957 stillLoadingCrashes++
958 }
959 agg := zoekt.RepoList{
960 Crashes: stillLoadingCrashes,
961 ReposMap: zoekt.ReposMap{},
962 Repos: []*zoekt.RepoListEntry{},
963 }
964
965 // PERF: Select the subset of shards that we will search over for the given
966 // query. A common List query only asks for a specific repo, so this is an
967 // important optimization.
968 {
969 beforeLen := len(shards)
970 beforeQ := q
971 shards, q = selectRepoSet(shards, q)
972 tr.LazyPrintf("selectRepoSet shards=%d->%d q=%s->%s", beforeLen, len(shards), beforeQ, q)
973 }
974
975 if len(shards) == 0 {
976 return &agg, nil
977 }
978
979 shardCount := len(shards)
980 all := make(chan shardListResult, shardCount)
981 feeder := make(chan zoekt.Searcher, len(shards))
982 for _, s := range shards {
983 feeder <- s
984 }
985 close(feeder)
986
987 for i := 0; i < runtime.GOMAXPROCS(0); i++ {
988 go func() {
989 for s := range feeder {
990 listOneShard(ctx, s, q, opts, all)
991 }
992 }()
993 }
994
995 uniq := map[string]*zoekt.RepoListEntry{}
996
997 for range shards {
998 r := <-all
999 if r.err != nil {
1000 return nil, r.err
1001 }
1002
1003 agg.Crashes += r.rl.Crashes
1004 agg.Stats.Add(&r.rl.Stats)
1005
1006 for _, r := range r.rl.Repos {
1007 prev, ok := uniq[r.Repository.Name]
1008 if !ok {
1009 cp := *r // We need to copy because we mutate r.Stats when merging duplicates
1010 uniq[r.Repository.Name] = &cp
1011 } else {
1012 prev.Stats.Add(&r.Stats)
1013 }
1014 }
1015
1016 for id, r := range r.rl.ReposMap {
1017 _, ok := agg.ReposMap[id]
1018 if !ok {
1019 agg.ReposMap[id] = r
1020 }
1021 }
1022 }
1023
1024 agg.Repos = make([]*zoekt.RepoListEntry, 0, len(uniq))
1025 for _, r := range uniq {
1026 agg.Repos = append(agg.Repos, r)
1027 }
1028
1029 // Only one of these fields is populated and in all cases the size of that
1030 // field is the number of Repos.
1031 //
1032 // Note: we don't just add individual Stats.Repos since a repository can
1033 // have multiple shards.
1034 agg.Stats.Repos = len(uniq) + len(agg.ReposMap)
1035
1036 if isAll && len(agg.Repos) > 0 {
1037 reportListAllMetrics(agg.Repos)
1038 }
1039
1040 return &agg, nil
1041}
1042
1043func reportListAllMetrics(repos []*zoekt.RepoListEntry) {
1044 var stats zoekt.RepoStats
1045 for _, r := range repos {
1046 stats.Add(&r.Stats)
1047 }
1048
1049 metricListAllRepos.Set(float64(stats.Repos))
1050 metricListAllIndexBytes.Set(float64(stats.IndexBytes))
1051 metricListAllContentBytes.Set(float64(stats.ContentBytes))
1052 metricListAllDocuments.Set(float64(stats.Documents))
1053 metricListAllShards.Set(float64(stats.Shards))
1054 metricListAllNewLinesCount.Set(float64(stats.NewLinesCount))
1055 metricListAllDefaultBranchNewLinesCount.Set(float64(stats.DefaultBranchNewLinesCount))
1056 metricListAllOtherBranchesNewLinesCount.Set(float64(stats.OtherBranchesNewLinesCount))
1057}
1058
1059// getLoaded returns the currently loaded shards. Shared so do not mutate.
1060func (s *shardedSearcher) getLoaded() loaded {
1061 // next commit will store the true value of this, for now we keep the
1062 // backwards compatible behaviour.
1063 ready := s.ready.Load()
1064 // ranked is loaded after ready to avoid a race were ready is true but
1065 // ranked is still not the final set of shards.
1066 ranked, _ := s.ranked.Load().([]*rankedShard)
1067 return loaded{
1068 shards: ranked,
1069 ready: ready,
1070 }
1071}
1072
1073func mkRankedShard(s zoekt.Searcher) *rankedShard {
1074 q := query.Const{Value: true}
1075 result, err := s.List(context.Background(), &q, nil)
1076 if err != nil {
1077 log.Printf("mkRankedShard(%s): failed to cache repository list: %v", s, err)
1078 return &rankedShard{Searcher: s}
1079 }
1080
1081 var (
1082 maxPriority float64
1083 repos = make([]*zoekt.Repository, 0, len(result.Repos))
1084 )
1085 for i := range result.Repos {
1086 repo := &result.Repos[i].Repository
1087 repos = append(repos, repo)
1088 if repo.RawConfig != nil {
1089 priority, _ := strconv.ParseFloat(repo.RawConfig["priority"], 64)
1090 if priority > maxPriority {
1091 maxPriority = priority
1092 }
1093 }
1094 }
1095
1096 return &rankedShard{
1097 Searcher: s,
1098 repos: repos,
1099 priority: maxPriority,
1100 }
1101}
1102
1103// markReady should be called once all shards have been passed into replace on
1104// startup. Once s is marked as ready it stops reporting a Crash in the
1105// response Stats.
1106func (s *shardedSearcher) markReady() {
1107 s.ready.CompareAndSwap(false, true)
1108}
1109
1110func (s *shardedSearcher) replace(shards map[string]zoekt.Searcher) {
1111 if len(shards) == 0 {
1112 return
1113 }
1114
1115 defer func(began time.Time) {
1116 metricShardsBatchReplaceDurationSeconds.Observe(time.Since(began).Seconds())
1117 }(time.Now())
1118
1119 s.mu.Lock()
1120 defer s.mu.Unlock()
1121
1122 for key, shard := range shards {
1123 var r *rankedShard
1124 if shard != nil {
1125 r = mkRankedShard(shard)
1126 }
1127
1128 old := s.shards[key]
1129 if shard == nil {
1130 delete(s.shards, key)
1131 } else {
1132 s.shards[key] = r
1133 }
1134
1135 if old != nil && old.Searcher != nil {
1136 // _ ___ /^^\ /^\ /^^\_
1137 // _ _@)@) \ ,,/ '` ~ `'~~ ', `\.
1138 // _/o\_ _ _ _/~`.`...'~\ ./~~..,'`','',.,' ' ~:
1139 // / `,'.~,~.~ . , . , ~|, ,/ .,' , ,. .. ,,. `, ~\_
1140 // ( ' _' _ '_` _ ' . , `\_/ .' ..' ' ` ` `.. `, \_
1141 // ~V~ V~ V~ V~ ~\ ` ' . ' , ' .,.,''`.,.''`.,.``. ', \_
1142 // _/\ /\ /\ /\_/, . ' , `_/~\_ .' .,. ,, , _/~\_ `. `. '., \_
1143 // < ~ ~ '~`'~'`, ., . `_: ::: \_ ' `_/ ::: \_ `.,' . ', \_
1144 // \ ' `_ '`_ _ ',/ _::_::_ \ _ _/ _::_::_ \ `.,'.,`., \-,-,-,_,_,
1145 // `'~~ `'~~ `'~~ `'~~ \(_)(_)(_)/ `~~' \(_)(_)(_)/ ~'`\_.._,._,'_;_;_;_;_;
1146 //
1147 // We can't just call Close now, because there may be ongoing searches
1148 // which have old in the shards list. Previously we used an exclusive
1149 // lock to guarantee there were no concurrent searches. However, that
1150 // led to blocking on the read path.
1151 //
1152 // We could introduce granular locking per rankedShard to know when
1153 // there are no more references. However, this becomes tricky in
1154 // practice. Instead we rely on the garbage collector noticing old is no
1155 // longer used. We take care in our searchers to runtime.KeepAlive until
1156 // we have stopped referencing the underling mmap data.
1157 runtime.SetFinalizer(old, func(r *rankedShard) {
1158 r.Close()
1159 })
1160 }
1161 }
1162
1163 ranked := make([]*rankedShard, 0, len(s.shards))
1164 for _, r := range s.shards {
1165 ranked = append(ranked, r)
1166 }
1167
1168 sort.Slice(ranked, func(i, j int) bool {
1169 priorityDiff := ranked[i].priority - ranked[j].priority
1170 if priorityDiff != 0 {
1171 return priorityDiff > 0
1172 }
1173 if len(ranked[i].repos) == 0 || len(ranked[j].repos) == 0 {
1174 // Protect against empty names which can happen if we fail to List or
1175 // the shard is full of tombstones. Prefer the shard which has names.
1176 return len(ranked[i].repos) >= len(ranked[j].repos)
1177 }
1178 return ranked[i].repos[0].Name < ranked[j].repos[0].Name
1179 })
1180
1181 s.ranked.Store(ranked)
1182
1183 metricShardsLoaded.Set(float64(len(ranked)))
1184}
1185
1186func loadShard(fn string) (zoekt.Searcher, error) {
1187 f, err := os.Open(fn)
1188 if err != nil {
1189 return nil, err
1190 }
1191
1192 iFile, err := zoekt.NewIndexFile(f)
1193 if err != nil {
1194 return nil, err
1195 }
1196 s, err := zoekt.NewSearcher(iFile)
1197 if err != nil {
1198 iFile.Close()
1199 return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err)
1200 }
1201
1202 return s, nil
1203}
1204
// prioritySlice is a deliberately simple collection of priorities supporting
// three operations: adding a value, removing one occurrence of a value, and
// reporting the largest value. remove and max scan the slice linearly, which
// is acceptable because the number of entries is restricted to GOMAXPROCS
// (i.e., the number of cpu cores) by the shardedSearcher interface.
type prioritySlice []float64

// append adds pri to the end of the slice.
func (p *prioritySlice) append(pri float64) {
	grown := append(*p, pri)
	*p = grown
}

// remove deletes the first element equal to pri, if there is one. The order
// of the remaining elements is not preserved: the last element is moved into
// the freed position and the slice is shortened by one.
func (p *prioritySlice) remove(pri float64) {
	s := *p
	last := len(s) - 1
	for i := 0; i <= last; i++ {
		if s[i] != pri {
			continue
		}
		// Overwrite the match with the tail element (a no-op when the match
		// is the tail itself), then drop the tail.
		s[i] = s[last]
		*p = s[:last]
		return
	}
}

// max returns the largest value in the slice, or negative infinity when the
// slice is empty.
func (p *prioritySlice) max() float64 {
	// remove() and max() could be fused into a single pass, but keeping them
	// separate is easier to read and the cost of the extra lock and loop is
	// almost certainly irrelevant.
	largest := math.Inf(-1)
	for i := range *p {
		if v := (*p)[i]; v > largest {
			largest = v
		}
	}
	return largest
}