fork of https://github.com/sourcegraph/zoekt
1// Copyright 2016 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package shards
16
17import (
18 "context"
19 "fmt"
20 "log"
21 "math"
22 "os"
23 "runtime"
24 "runtime/debug"
25 "slices"
26 "sort"
27 "strconv"
28 "sync"
29 "time"
30
31 "golang.org/x/sync/semaphore"
32
33 "github.com/prometheus/client_golang/prometheus"
34 "github.com/prometheus/client_golang/prometheus/promauto"
35 "go.uber.org/atomic"
36
37 "github.com/sourcegraph/zoekt"
38 "github.com/sourcegraph/zoekt/query"
39 "github.com/sourcegraph/zoekt/trace"
40)
41
42var (
43 metricShardsLoaded = promauto.NewGauge(prometheus.GaugeOpts{
44 Name: "zoekt_shards_loaded",
45 Help: "The number of shards currently loaded",
46 })
47 metricShardsLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
48 Name: "zoekt_shards_loaded_total",
49 Help: "The total number of shards loaded",
50 })
51 metricShardsLoadFailedTotal = promauto.NewCounter(prometheus.CounterOpts{
52 Name: "zoekt_shards_load_failed_total",
53 Help: "The total number of shard loads that failed",
54 })
55
56 metricSearchRunning = promauto.NewGauge(prometheus.GaugeOpts{
57 Name: "zoekt_search_running",
58 Help: "The number of concurrent search requests running",
59 })
60 metricSearchShardRunning = promauto.NewGauge(prometheus.GaugeOpts{
61 Name: "zoekt_search_shard_running",
62 Help: "The number of concurrent search requests in a shard running",
63 })
64 metricSearchFailedTotal = promauto.NewCounter(prometheus.CounterOpts{
65 Name: "zoekt_search_failed_total",
66 Help: "The total number of search requests that failed",
67 })
68 metricSearchDuration = promauto.NewHistogram(prometheus.HistogramOpts{
69 Name: "zoekt_search_duration_seconds",
70 Help: "The duration a search request took in seconds",
71 Buckets: prometheus.DefBuckets, // DefBuckets good for service timings
72 })
73
74 // A Counter per Stat. Name should match field in zoekt.Stats.
75 metricSearchContentBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
76 Name: "zoekt_search_content_loaded_bytes_total",
77 Help: "Total amount of I/O for reading contents",
78 })
79 metricSearchIndexBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
80 Name: "zoekt_search_index_loaded_bytes_total",
81 Help: "Total amount of I/O for reading from index",
82 })
83 metricSearchCrashesTotal = promauto.NewCounter(prometheus.CounterOpts{
84 Name: "zoekt_search_crashes_total",
85 Help: "Total number of search shards that had a crash",
86 })
87 metricSearchFileCountTotal = promauto.NewCounter(prometheus.CounterOpts{
88 Name: "zoekt_search_file_count_total",
89 Help: "Total number of files containing a match",
90 })
91 metricSearchShardFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{
92 Name: "zoekt_search_shard_files_considered_total",
93 Help: "Total number of files in shards that we considered",
94 })
95 metricSearchFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{
96 Name: "zoekt_search_files_considered_total",
97 Help: "Total files that we evaluated. Equivalent to files for which all atom matches (including negations) evaluated to true",
98 })
99 metricSearchFilesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
100 Name: "zoekt_search_files_loaded_total",
101 Help: "Total files for which we loaded file content to verify substring matches",
102 })
103 metricSearchFilesSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{
104 Name: "zoekt_search_files_skipped_total",
105 Help: "Total candidate files whose contents weren't examined because we gathered enough matches",
106 })
107 metricSearchShardsSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{
108 Name: "zoekt_search_shards_skipped_total",
109 Help: "Total shards that we did not process because a query was canceled",
110 })
111 metricSearchMatchCountTotal = promauto.NewCounter(prometheus.CounterOpts{
112 Name: "zoekt_search_match_count_total",
113 Help: "Total number of non-overlapping matches",
114 })
115 metricSearchNgramMatchesTotal = promauto.NewCounter(prometheus.CounterOpts{
116 Name: "zoekt_search_ngram_matches_total",
117 Help: "Total number of candidate matches as a result of searching ngrams",
118 })
119 metricSearchNgramLookupsTotal = promauto.NewCounter(prometheus.CounterOpts{
120 Name: "zoekt_search_ngram_lookups_total",
121 Help: "Total number of times we accessed an ngram in the index",
122 })
123 metricSearchRegexpsConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{
124 Name: "zoekt_search_regexps_considered_total",
125 Help: "Total number of times regexp was called on files that we evaluated",
126 })
127
128 metricListRunning = promauto.NewGauge(prometheus.GaugeOpts{
129 Name: "zoekt_list_running",
130 Help: "The number of concurrent list requests running",
131 })
132 metricListShardRunning = promauto.NewGauge(prometheus.GaugeOpts{
133 Name: "zoekt_list_shard_running",
134 Help: "The number of concurrent list requests in a shard running",
135 })
136 metricShardsBatchReplaceDurationSeconds = promauto.NewHistogram(prometheus.HistogramOpts{
137 Name: "zoekt_shards_batch_replace_duration_seconds",
138 Help: "The time it takes to replace a batch of Searchers.",
139 Buckets: []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30},
140 })
141 metricListAllRepos = promauto.NewGauge(prometheus.GaugeOpts{
142 Name: "zoekt_list_all_stats_repos",
143 Help: "The last List(true) value for RepoStats.Repos. Repos is used for aggregrating the number of repositories.",
144 })
145 metricListAllShards = promauto.NewGauge(prometheus.GaugeOpts{
146 Name: "zoekt_list_all_stats_shards",
147 Help: "The last List(true) value for RepoStats.Shards. Shards is the total number of search shards.",
148 })
149 metricListAllDocuments = promauto.NewGauge(prometheus.GaugeOpts{
150 Name: "zoekt_list_all_stats_documents",
151 Help: "The last List(true) value for RepoStats.Documents. Documents holds the number of documents or files.",
152 })
153 metricListAllIndexBytes = promauto.NewGauge(prometheus.GaugeOpts{
154 Name: "zoekt_list_all_stats_index_bytes",
155 Help: "The last List(true) value for RepoStats.IndexBytes. IndexBytes is the amount of RAM used for index overhead.",
156 })
157 metricListAllContentBytes = promauto.NewGauge(prometheus.GaugeOpts{
158 Name: "zoekt_list_all_stats_content_bytes",
159 Help: "The last List(true) value for RepoStats.ContentBytes. ContentBytes is the amount of RAM used for raw content.",
160 })
161 metricListAllNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{
162 Name: "zoekt_list_all_stats_new_lines_count",
163 Help: "The last List(true) value for RepoStats.NewLinesCount.",
164 })
165 metricListAllDefaultBranchNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{
166 Name: "zoekt_list_all_stats_default_branch_new_lines_count",
167 Help: "The last List(true) value for RepoStats.DefaultBranchNewLinesCount.",
168 })
169 metricListAllOtherBranchesNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{
170 Name: "zoekt_list_all_stats_other_branches_new_lines_count",
171 Help: "The last List(true) value for RepoStats.OtherBranchesNewLinesCount.",
172 })
173)
174
// rankedShard is a loaded shard plus the ranking data the search path needs
// to order and pre-filter shards without calling into the shard itself.
type rankedShard struct {
	zoekt.Searcher

	priority float64 // maximum priority across all repos in the shard

	// We have out of band ranking on compound shards which can change even if
	// the shard file does not. So we compute a rank in getShards. We store
	// repos here to avoid the cost of List in the search request path.
	// doSelectRepoSet also consults repos to decide whether the shard can
	// contain a repository a query is restricted to.
	repos []*zoekt.Repository
}
185
// loaded stores the state we compute when updating the state of shards from
// disk. A *loaded snapshot is shared between concurrent requests, so it must
// be treated as read-only.
type loaded struct {
	// shards is the currently loaded shards sorted by decreasing rank and
	// should not be mutated.
	shards []*rankedShard

	// ready is true if sharded searcher has finished loading all initial
	// shards on startup. While it is false, Search, StreamSearch and List
	// report one crash to signal that results may be incomplete.
	ready bool
}
197
// shardedSearcher searches a set of shards in parallel. Each request works on
// a read-only snapshot of the loaded shards obtained via getLoaded.
type shardedSearcher struct {
	// Limit the number of parallel queries. Since searching is
	// CPU bound, we can't do better than #CPU queries in
	// parallel. If we do so, we just create more memory
	// pressure.
	sched scheduler

	mu     sync.Mutex // protects writes to shards
	shards map[string]*rankedShard // keyed by the same keys passed to loader.load/drop

	// NOTE(review): ready and ranked are maintained by methods outside this
	// view (markReady, replace, getLoaded); presumably ranked caches the
	// current *loaded snapshot — confirm.
	ready  atomic.Bool
	ranked atomic.Value
}
211
212func newShardedSearcher(n int64) *shardedSearcher {
213 ss := &shardedSearcher{
214 shards: make(map[string]*rankedShard),
215 sched: newScheduler(n),
216 }
217 return ss
218}
219
220// NewDirectorySearcher returns a searcher instance that loads all
221// shards corresponding to a glob into memory.
222func NewDirectorySearcher(dir string) (zoekt.Streamer, error) {
223 return newDirectorySearcher(dir, true)
224}
225
226// NewDirectorySearcherFast is like NewDirectorySearcher, but does not block
227// on the initial loading of shards.
228//
229// This exists since in the case of zoekt-webserver we are happy with having
230// partial availability since that is better than no availability on large
231// instances.
232func NewDirectorySearcherFast(dir string) (zoekt.Streamer, error) {
233 return newDirectorySearcher(dir, false)
234}
235
236func newDirectorySearcher(dir string, waitUntilReady bool) (zoekt.Streamer, error) {
237 ss := newShardedSearcher(int64(runtime.GOMAXPROCS(0)))
238 tl := &loader{
239 ss: ss,
240 }
241 dw, err := newDirectoryWatcher(dir, tl)
242 if err != nil {
243 return nil, err
244 }
245
246 if waitUntilReady {
247 if err := dw.WaitUntilReady(); err != nil {
248 return nil, err
249 }
250 }
251
252 ds := &directorySearcher{
253 Streamer: ss,
254 directoryWatcher: dw,
255 }
256
257 return &typeRepoSearcher{Streamer: ds}, nil
258}
259
// directorySearcher pairs a Streamer with the DirectoryWatcher that loads
// shards into it, so that closing the searcher also stops the watcher.
type directorySearcher struct {
	zoekt.Streamer

	directoryWatcher *DirectoryWatcher
}
265
266func (s *directorySearcher) Close() {
267 // We need to Stop directoryWatcher first since it calls load/unload on
268 // Searcher.
269 s.directoryWatcher.Stop()
270 s.Streamer.Close()
271}
272
// loader is handed to newDirectoryWatcher and applies the watcher's
// load/drop requests to the wrapped shardedSearcher.
type loader struct {
	ss *shardedSearcher
}
276
277func (tl *loader) load(keys ...string) {
278 // This is called with all keys on startup, so once this function has
279 // finished running shardedSearcher will be ready.
280 defer tl.ss.markReady()
281
282 if len(keys) == 0 {
283 // If there's nothing to load, we exit early here, but we want to mark
284 // ourselves as ready.
285 return
286 }
287
288 var (
289 mu sync.Mutex // synchronizes writes to the shards map
290 wg sync.WaitGroup // used to wait for all shards to load
291 sem = semaphore.NewWeighted(int64(runtime.GOMAXPROCS(0)))
292 loadedShards = make(map[string]zoekt.Searcher)
293 )
294
295 publishLoaded := func() {
296 mu.Lock()
297 chunk := loadedShards
298 loadedShards = make(map[string]zoekt.Searcher)
299 mu.Unlock()
300 tl.ss.replace(chunk)
301 }
302
303 log.Printf("loading %d shard(s): %s", len(keys), humanTruncateList(keys, 5))
304
305 lastProgress := time.Now()
306 for i, key := range keys {
307 // If taking a while to start-up occasionally give a progress message
308 if time.Since(lastProgress) > 5*time.Second {
309 log.Printf("still need to load %d shards...", len(keys)-i)
310 lastProgress = time.Now()
311
312 publishLoaded()
313 }
314
315 _ = sem.Acquire(context.Background(), 1)
316 wg.Add(1)
317
318 go func(key string) {
319 defer sem.Release(1)
320 defer wg.Done()
321
322 shard, err := loadShard(key)
323 if err != nil {
324 metricShardsLoadFailedTotal.Inc()
325 log.Printf("reloading: %s, err %v ", key, err)
326 return
327 }
328 metricShardsLoadedTotal.Inc()
329
330 mu.Lock()
331 loadedShards[key] = shard
332 mu.Unlock()
333 }(key)
334 }
335
336 wg.Wait()
337
338 publishLoaded()
339}
340
341func (tl *loader) drop(keys ...string) {
342 shards := make(map[string]zoekt.Searcher, len(keys))
343 for _, key := range keys {
344 shards[key] = nil
345 }
346 tl.ss.replace(shards)
347}
348
349func (ss *shardedSearcher) String() string {
350 return "shardedSearcher"
351}
352
353// Close closes references to open files. It may be called only once.
354func (ss *shardedSearcher) Close() {
355 ss.mu.Lock()
356 shards := make(map[string]zoekt.Searcher, len(ss.shards))
357 for k := range ss.shards {
358 shards[k] = nil
359 }
360 ss.mu.Unlock()
361
362 ss.replace(shards)
363}
364
365func selectRepoSet(shards []*rankedShard, q query.Q) ([]*rankedShard, query.Q) {
366 and, ok := q.(*query.And)
367 if ok {
368 return doSelectRepoSet(shards, and)
369 }
370
371 // We have queries which look like (reposet ...) and we want to do the same
372 // optimizations. To simplify we just always wrap the query in And and then
373 // on the return value call Simplify to unwrap. In particular this is
374 // important for List calls.
375 and = &query.And{Children: []query.Q{q}}
376 shards, q = doSelectRepoSet(shards, and)
377 return shards, query.Simplify(q)
378}
379
380func doSelectRepoSet(shards []*rankedShard, and *query.And) ([]*rankedShard, query.Q) {
381 // (and (reposet ...) (q))
382 // (and true (q)) with a filtered shards
383 // (and false) // noop
384
385 // (and (repobranches ...) (q))
386 // (and (repobranches ...) (q))
387
388 // Note: we also support (and (repo ...) (q)) even though sourcegraph does
389 // not generate those sorts of queries. This is to support manual testing.
390
391 hasReposForPredicate := func(pred func(repo *zoekt.Repository) bool) func(repos []*zoekt.Repository) (any, all bool) {
392 return func(repos []*zoekt.Repository) (any, all bool) {
393 any = false
394 all = true
395 for _, repo := range repos {
396 b := pred(repo)
397 any = any || b
398 all = all && b
399 }
400 return any, all
401 }
402 }
403
404 for i, c := range and.Children {
405 var setSize int
406 var hasRepos func([]*zoekt.Repository) (bool, bool)
407 switch setQuery := c.(type) {
408 case *query.RepoSet:
409 setSize = len(setQuery.Set)
410 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
411 return setQuery.Set[repo.Name]
412 })
413 case *query.RepoIDs:
414 setSize = int(setQuery.Repos.GetCardinality())
415 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
416 return setQuery.Repos.Contains(repo.ID)
417 })
418 case *query.Repo:
419 setSize = 0
420 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
421 return setQuery.Regexp.MatchString(repo.Name)
422 })
423 case *query.BranchesRepos:
424 for _, br := range setQuery.List {
425 setSize += int(br.Repos.GetCardinality())
426 }
427
428 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
429 for _, br := range setQuery.List {
430 if br.Repos.Contains(repo.ID) {
431 return true
432 }
433 }
434 return false
435 })
436 default:
437 continue
438 }
439
440 // setSize may be larger than the number of shards we have. The size of
441 // filtered is bounded by min(len(set), len(shards))
442 if setSize > len(shards) {
443 setSize = len(shards)
444 }
445
446 filtered := make([]*rankedShard, 0, setSize)
447 filteredAll := true
448
449 for _, s := range shards {
450 if any, all := hasRepos(s.repos); any {
451 filtered = append(filtered, s)
452 filteredAll = filteredAll && all
453 }
454 }
455
456 // We don't need to adjust the query since we are returning an empty set
457 // of shards to search.
458 if len(filtered) == 0 {
459 return filtered, and
460 }
461
462 // We can't simplify the query since we are searching shards which contain
463 // repos we aren't supposed to search.
464 if !filteredAll {
465 return filtered, and
466 }
467
468 // We don't want to mutate the original and, so we clone it before
469 // mutating it.
470 and = &query.And{Children: slices.Clone(and.Children)}
471
472 // This optimization allows us to avoid the work done by
473 // indexData.simplify for each shard.
474 //
475 // For example if our query is (and (reposet foo bar) (content baz))
476 // then at this point filtered is [foo bar] and q is the same. For each
477 // shard indexData.simplify will simplify to (and true (content baz)) ->
478 // (content baz). This work can be done now once, rather than per shard.
479 switch c := c.(type) {
480 case *query.RepoSet, *query.RepoIDs, *query.Repo:
481 and.Children[i] = &query.Const{Value: true}
482 return filtered, query.Simplify(and)
483
484 case *query.BranchesRepos:
485 // We can only replace if all the repos want the same branches. We
486 // simplify and just check that we are requesting 1 branch. The common
487 // case is just asking for HEAD, so this should be effective.
488 if len(c.List) != 1 {
489 return filtered, and
490 }
491
492 // Every repo wants the same branches, so we can replace RepoBranches
493 // with a list of branch queries.
494 and.Children[i] = &query.Branch{Pattern: c.List[0].Branch, Exact: true}
495 return filtered, query.Simplify(and)
496 }
497
498 // Stop after first RepoSet, otherwise we might append duplicate
499 // shards to `filtered`
500 return filtered, and
501 }
502
503 return shards, and
504}
505
506func (ss *shardedSearcher) Search(ctx context.Context, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) {
507 tr, ctx := trace.New(ctx, "shardedSearcher.Search", "")
508 defer func() {
509 tr.Finish()
510 }()
511 ctx, cancel := context.WithCancel(ctx)
512 defer cancel()
513
514 collectSender := newCollectSender(opts)
515
516 start := time.Now()
517 proc, err := ss.sched.Acquire(ctx)
518 if err != nil {
519 return nil, err
520 }
521 defer proc.Release()
522 tr.LazyPrintf("acquired process")
523
524 wait := time.Since(start)
525 start = time.Now()
526
527 loaded := ss.getLoaded()
528 done, err := streamSearch(ctx, proc, q, opts, loaded.shards, collectSender)
529 defer done()
530 if err != nil {
531 return nil, err
532 }
533
534 aggregate, ok := collectSender.Done()
535 if !ok {
536 aggregate = &zoekt.SearchResult{
537 RepoURLs: map[string]string{},
538 LineFragments: map[string]string{},
539 }
540 }
541
542 copyFiles(aggregate)
543
544 if !loaded.ready {
545 // We may have missed results due to not being fully loaded.
546 aggregate.Stats.Crashes++
547 }
548
549 aggregate.Stats.Wait = wait
550 aggregate.Stats.Duration = time.Since(start)
551
552 return aggregate, nil
553}
554
555func (ss *shardedSearcher) StreamSearch(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, sender zoekt.Sender) (err error) {
556 tr, ctx := trace.New(ctx, "shardedSearcher.StreamSearch", "")
557 defer func() {
558 if err != nil {
559 tr.LazyPrintf("error: %v", err)
560 tr.SetError(err)
561 }
562 tr.Finish()
563 }()
564
565 start := time.Now()
566 proc, err := ss.sched.Acquire(ctx)
567 if err != nil {
568 return err
569 }
570 defer proc.Release()
571 tr.LazyPrintf("acquired process")
572
573 loaded := ss.getLoaded()
574 shards := loaded.shards
575
576 maxPendingPriority := math.Inf(-1)
577 if len(shards) > 0 {
578 maxPendingPriority = shards[0].priority
579 }
580
581 stillLoadingCrashes := 0
582 if !loaded.ready {
583 // We may have missed results due to not being fully loaded.
584 stillLoadingCrashes++
585 }
586
587 sender.Send(&zoekt.SearchResult{
588 Stats: zoekt.Stats{
589 Crashes: stillLoadingCrashes,
590 Wait: time.Since(start),
591 },
592 Progress: zoekt.Progress{
593 MaxPendingPriority: maxPendingPriority,
594 },
595 })
596
597 // Matches flow from the shards up the stack in the following order:
598 //
599 // 1. Search shards
600 // 2. flushCollectSender (aggregate)
601 // 3. limitSender (limit)
602 // 4. copyFileSender (copy)
603 //
604 // For streaming, the wrapping has to happen in the inverted order.
605 sender = copyFileSender(sender)
606
607 if truncator, hasLimits := zoekt.NewDisplayTruncator(opts); hasLimits {
608 var cancel context.CancelFunc
609 ctx, cancel = context.WithCancel(ctx)
610 defer cancel()
611 sender = limitSender(cancel, sender, truncator)
612 }
613
614 sender, flush := newFlushCollectSender(opts, sender)
615
616 done, err := streamSearch(ctx, proc, q, opts, shards, sender)
617
618 // Even though streaming is done, we may have results sitting in a buffer we
619 // need to flush. So we need to send those before calling done.
620 flush()
621 done()
622
623 return err
624}
625
// streamSearch is an internal helper since both Search and StreamSearch are
// largely similar.
//
// It narrows shards with selectRepoSet and then fans the remaining shards out
// to min(GOMAXPROCS, len(shards)) worker goroutines. Shards are dispatched in
// slice order, which loaded.shards keeps sorted by decreasing rank. Each shard
// result is forwarded to sender via sendByRepository as it arrives.
//
// Dispatch of new shards stops early in two cases:
//   - a shard returns an error: in-flight results are still drained, and the
//     last error seen is returned;
//   - the accumulated MatchCount exceeds opts.TotalMaxMatchCount (when
//     positive).
//
// A non-zero opts.MaxWallTime bounds the whole search with a timeout.
//
// done must always be called, even if err is non-nil. The SearchResults sent
// via sender contain references to the underlying mmap data that the garbage
// collector can't see. Calling done informs the garbage collector it is free
// to collect those shards. The caller must call copyFiles on any
// SearchResults it returns/streams out before calling done.
func streamSearch(ctx context.Context, proc *process, q query.Q, opts *zoekt.SearchOptions, shards []*rankedShard, sender zoekt.Sender) (done func(), err error) {
	tr, ctx := trace.New(ctx, "shardedSearcher.streamSearch", "")
	overallStart := time.Now()
	metricSearchRunning.Inc()
	defer func() {
		metricSearchRunning.Dec()
		metricSearchDuration.Observe(time.Since(overallStart).Seconds())
		if err != nil {
			metricSearchFailedTotal.Inc()

			tr.LazyPrintf("error: %v", err)
			tr.SetError(err)
		}
		tr.Finish()
	}()

	// Select the subset of shards that we will search over for the given query.
	{
		beforeLen := len(shards)
		beforeQ := q
		shards, q = selectRepoSet(shards, q)
		tr.LazyPrintf("selectRepoSet shards=%d->%d q=%s->%s", beforeLen, len(shards), beforeQ, q)
	}

	if len(shards) == 0 {
		return func() {}, nil
	}

	var cancel context.CancelFunc
	if opts.MaxWallTime == 0 {
		ctx, cancel = context.WithCancel(ctx)
	} else {
		ctx, cancel = context.WithTimeout(ctx, opts.MaxWallTime)
	}

	defer cancel()

	// We set the number of workers to GOMAXPROCS, or the number of shards,
	// whichever is smaller.
	workers := runtime.GOMAXPROCS(0)
	if workers > len(shards) {
		workers = len(shards)
	}

	// result is what a worker reports for one shard. priority is the shard's
	// priority, used to maintain the pending set below.
	type result struct {
		priority float64
		*zoekt.SearchResult
		err error
	}

	var (
		// buffered channels to continue searching when sending back results
		// takes a while / blocks. The maximum pending result set is workers * 2.
		results = make(chan *result, workers)
		search  = make(chan *rankedShard, workers)
		wg      sync.WaitGroup
	)

	// Start workers that receive shards from the search channel, search them,
	// and send the results to the results channel. This process is repeated
	// until the search channel is closed.
	//
	// Note: parallelism is bounded by the number of worker goroutines, each of
	// which searches one shard at a time; the buffer on "search" only lets the
	// loop below queue shards ahead of the workers. Since searching is mostly
	// CPU bound, limiting parallel shard searches also reduces the peak working set.
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for s := range search {
				sr, err := searchOneShard(ctx, s, q, opts)
				r := &result{priority: s.priority, SearchResult: sr, err: err}
				results <- r
			}
		}()
	}

	// Close results once every worker has exited. This is what ends the loop
	// below, after search is closed.
	go func() {
		wg.Wait()
		close(results)
	}()

	var (
		// pending holds the priorities of shards handed to workers whose
		// results have not arrived yet; pending.max() becomes MaxPendingPriority.
		pending = make(prioritySlice, 0, workers)
		shard   = 0
		next    = shards[shard]

		// We need a separate nil-able reference to the same channel so we can close(search) for the worker
		// go-routines to finish but also set work to nil in order for the select statement below to ignore
		// that case when we want to stop a search. This is needed because sending on a closed channel panics.
		work = search
	)

	// stop ends dispatching of new shards. It is idempotent: only the first
	// call closes search.
	stop := func() {
		if work != nil {
			close(search)
			work = nil
			next = nil
		}
	}

	// tracked so we can stop when we hit TotalMaxMatchCount
	var totalMatchCount int

search:
	for {
		// At the top of each iteration, have the proc associated with this search yield its own "timeslice"
		// to possibly allow other searches to make progress
		_ = proc.Yield(ctx) // Note: we let searchOneShard handle context errors

		select {
		case work <- next: // is there a worker available to search the next shard?
			pending.append(next.priority)

			shard++
			if shard == len(shards) {
				stop()
			} else {
				next = shards[shard]
			}
		case r, ok := <-results: // is there a result to send back?
			if !ok {
				break search
			}

			// delete this result's priority from pending before computing the new max pending priority
			pending.remove(r.priority)

			if r.err != nil {
				// Set final error and stop searching new shards, but consume any pending
				// search results.
				stop()
				err = r.err
				continue
			}

			// Update the match count statistics and stop searching new shards if we've
			// reached the limit set in the options.
			// NOTE(review): this triggers only once the count *exceeds*
			// TotalMaxMatchCount, not when it equals it — confirm intended.
			totalMatchCount += r.SearchResult.Stats.MatchCount
			if opts.TotalMaxMatchCount > 0 && totalMatchCount > opts.TotalMaxMatchCount {
				stop()
			}

			observeMetrics(r.SearchResult)

			r.Priority = r.priority
			r.MaxPendingPriority = pending.max()

			sendByRepository(r.SearchResult, opts, sender) // send the result back to the client
		}
	}

	// Keep shards reachable until the caller has copied everything it needs
	// out of the mmap'd data (see the done contract above).
	return func() { runtime.KeepAlive(shards) }, err
}
787
788// sendByRepository splits a zoekt.SearchResult by repository and calls
789// sender.Send for each batch. Ranking in Sourcegraph expects zoekt.SearchResult
790// to contain results with the same zoekt.SearchResult.Priority only.
791//
792// We split by repository instead of by priority because it is easier to set
793// RepoURLs and LineFragments in zoekt.SearchResult.
794func sendByRepository(result *zoekt.SearchResult, opts *zoekt.SearchOptions, sender zoekt.Sender) {
795 if len(result.RepoURLs) <= 1 || len(result.Files) == 0 {
796 zoekt.SortFiles(result.Files)
797 sender.Send(result)
798 return
799 }
800
801 send := func(repoName string, a, b int, stats zoekt.Stats) {
802 zoekt.SortFiles(result.Files[a:b])
803 sender.Send(&zoekt.SearchResult{
804 Stats: stats,
805 Progress: zoekt.Progress{
806 Priority: result.Files[a].RepositoryPriority,
807 MaxPendingPriority: result.MaxPendingPriority,
808 },
809 Files: result.Files[a:b],
810 RepoURLs: map[string]string{repoName: result.RepoURLs[repoName]},
811 LineFragments: map[string]string{repoName: result.LineFragments[repoName]},
812 })
813 }
814
815 var startIndex, endIndex int
816 curRepoID := result.Files[0].RepositoryID
817 curRepoName := result.Files[0].Repository
818
819 fm := zoekt.FileMatch{}
820 for endIndex, fm = range result.Files {
821 if curRepoID != fm.RepositoryID {
822 // Stats must stay aggregate-able, hence we sent the aggregate stats with the
823 // last event.
824 send(curRepoName, startIndex, endIndex, zoekt.Stats{})
825
826 startIndex = endIndex
827 curRepoID = fm.RepositoryID
828 curRepoName = fm.Repository
829 }
830 }
831
832 send(curRepoName, startIndex, endIndex+1, result.Stats)
833}
834
835func observeMetrics(sr *zoekt.SearchResult) {
836 metricSearchContentBytesLoadedTotal.Add(float64(sr.Stats.ContentBytesLoaded))
837 metricSearchIndexBytesLoadedTotal.Add(float64(sr.Stats.IndexBytesLoaded))
838 metricSearchCrashesTotal.Add(float64(sr.Stats.Crashes))
839 metricSearchFileCountTotal.Add(float64(sr.Stats.FileCount))
840 metricSearchShardFilesConsideredTotal.Add(float64(sr.Stats.ShardFilesConsidered))
841 metricSearchFilesConsideredTotal.Add(float64(sr.Stats.FilesConsidered))
842 metricSearchFilesLoadedTotal.Add(float64(sr.Stats.FilesLoaded))
843 metricSearchFilesSkippedTotal.Add(float64(sr.Stats.FilesSkipped))
844 metricSearchShardsSkippedTotal.Add(float64(sr.Stats.ShardsSkipped))
845 metricSearchMatchCountTotal.Add(float64(sr.Stats.MatchCount))
846 metricSearchNgramMatchesTotal.Add(float64(sr.Stats.NgramMatches))
847 metricSearchNgramLookupsTotal.Add(float64(sr.Stats.NgramLookups))
848 metricSearchRegexpsConsideredTotal.Add(float64(sr.Stats.RegexpsConsidered))
849}
850
// copySlice replaces *src with a freshly allocated copy of its contents, so
// the result no longer aliases the original backing array. A nil slice is
// left nil; a non-nil empty slice becomes a new non-nil empty slice. The
// copy's capacity equals its length.
func copySlice(src *[]byte) {
	orig := *src
	if orig == nil {
		return
	}
	*src = append(make([]byte, 0, len(orig)), orig...)
}
859
860func copyFiles(sr *zoekt.SearchResult) {
861 for i := range sr.Files {
862 copySlice(&sr.Files[i].Content)
863 copySlice(&sr.Files[i].Checksum)
864 for l := range sr.Files[i].LineMatches {
865 copySlice(&sr.Files[i].LineMatches[l].Line)
866 copySlice(&sr.Files[i].LineMatches[l].Before)
867 copySlice(&sr.Files[i].LineMatches[l].After)
868 }
869 for c := range sr.Files[i].ChunkMatches {
870 copySlice(&sr.Files[i].ChunkMatches[c].Content)
871 }
872 }
873}
874
875func searchOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) {
876 metricSearchShardRunning.Inc()
877 defer func() {
878 metricSearchShardRunning.Dec()
879 if e := recover(); e != nil {
880 log.Printf("crashed shard: %s: %#v, %s", s, e, debug.Stack())
881
882 if sr == nil {
883 sr = &zoekt.SearchResult{}
884 }
885 sr.Stats.Crashes = 1
886 }
887 }()
888
889 return s.Search(ctx, q, opts)
890}
891
// shardListResult carries the outcome of listing one shard from listOneShard
// back to List: either a RepoList or an error.
type shardListResult struct {
	rl  *zoekt.RepoList
	err error
}
896
897func listOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.ListOptions, sink chan shardListResult) {
898 metricListShardRunning.Inc()
899 defer func() {
900 metricListShardRunning.Dec()
901 if r := recover(); r != nil {
902 log.Printf("crashed shard: %s: %s, %s", s.String(), r, debug.Stack())
903 sink <- shardListResult{
904 &zoekt.RepoList{Crashes: 1}, nil,
905 }
906 }
907 }()
908
909 ms, err := s.List(ctx, q, opts)
910 sink <- shardListResult{ms, err}
911}
912
913func (ss *shardedSearcher) List(ctx context.Context, q query.Q, opts *zoekt.ListOptions) (rl *zoekt.RepoList, err error) {
914 tr, ctx := trace.New(ctx, "shardedSearcher.List", "")
915 metricListRunning.Inc()
916 defer func() {
917 metricListRunning.Dec()
918 if rl != nil {
919 tr.LazyPrintf("repos.size=%d reposmap.size=%d crashes=%d", len(rl.Repos), len(rl.ReposMap), rl.Crashes)
920 }
921 if err != nil {
922 tr.LazyPrintf("error: %v", err)
923 tr.SetError(err)
924 }
925 tr.Finish()
926 }()
927
928 q = query.Simplify(q)
929 isAll := false
930 if c, ok := q.(*query.Const); ok {
931 isAll = c.Value
932 }
933
934 proc, err := ss.sched.Acquire(ctx)
935 if err != nil {
936 return nil, err
937 }
938 defer proc.Release()
939 tr.LazyPrintf("acquired process")
940
941 loaded := ss.getLoaded()
942 shards := loaded.shards
943
944 // Setup what we return now, since we may short circuit if there are no
945 // shards to search.
946 stillLoadingCrashes := 0
947 if !loaded.ready {
948 // We may have missed results due to not being fully loaded.
949 stillLoadingCrashes++
950 }
951 agg := zoekt.RepoList{
952 Crashes: stillLoadingCrashes,
953 ReposMap: zoekt.ReposMap{},
954 Repos: []*zoekt.RepoListEntry{},
955 }
956
957 // PERF: Select the subset of shards that we will search over for the given
958 // query. A common List query only asks for a specific repo, so this is an
959 // important optimization.
960 {
961 beforeLen := len(shards)
962 beforeQ := q
963 shards, q = selectRepoSet(shards, q)
964 tr.LazyPrintf("selectRepoSet shards=%d->%d q=%s->%s", beforeLen, len(shards), beforeQ, q)
965 }
966
967 if len(shards) == 0 {
968 return &agg, nil
969 }
970
971 shardCount := len(shards)
972 all := make(chan shardListResult, shardCount)
973 feeder := make(chan zoekt.Searcher, len(shards))
974 for _, s := range shards {
975 feeder <- s
976 }
977 close(feeder)
978
979 for i := 0; i < runtime.GOMAXPROCS(0); i++ {
980 go func() {
981 for s := range feeder {
982 listOneShard(ctx, s, q, opts, all)
983 }
984 }()
985 }
986
987 uniq := map[string]*zoekt.RepoListEntry{}
988
989 for range shards {
990 r := <-all
991 if r.err != nil {
992 return nil, r.err
993 }
994
995 agg.Crashes += r.rl.Crashes
996 agg.Stats.Add(&r.rl.Stats)
997
998 for _, r := range r.rl.Repos {
999 prev, ok := uniq[r.Repository.Name]
1000 if !ok {
1001 cp := *r // We need to copy because we mutate r.Stats when merging duplicates
1002 uniq[r.Repository.Name] = &cp
1003 } else {
1004 prev.Stats.Add(&r.Stats)
1005 }
1006 }
1007
1008 for id, r := range r.rl.ReposMap {
1009 _, ok := agg.ReposMap[id]
1010 if !ok {
1011 agg.ReposMap[id] = r
1012 }
1013 }
1014 }
1015
1016 agg.Repos = make([]*zoekt.RepoListEntry, 0, len(uniq))
1017 for _, r := range uniq {
1018 agg.Repos = append(agg.Repos, r)
1019 }
1020
1021 // Only one of these fields is populated and in all cases the size of that
1022 // field is the number of Repos.
1023 //
1024 // Note: we don't just add individual Stats.Repos since a repository can
1025 // have multiple shards.
1026 agg.Stats.Repos = len(uniq) + len(agg.ReposMap)
1027
1028 if isAll && len(agg.Repos) > 0 {
1029 reportListAllMetrics(agg.Repos)
1030 }
1031
1032 return &agg, nil
1033}
1034
1035func reportListAllMetrics(repos []*zoekt.RepoListEntry) {
1036 var stats zoekt.RepoStats
1037 for _, r := range repos {
1038 stats.Add(&r.Stats)
1039 }
1040
1041 metricListAllRepos.Set(float64(stats.Repos))
1042 metricListAllIndexBytes.Set(float64(stats.IndexBytes))
1043 metricListAllContentBytes.Set(float64(stats.ContentBytes))
1044 metricListAllDocuments.Set(float64(stats.Documents))
1045 metricListAllShards.Set(float64(stats.Shards))
1046 metricListAllNewLinesCount.Set(float64(stats.NewLinesCount))
1047 metricListAllDefaultBranchNewLinesCount.Set(float64(stats.DefaultBranchNewLinesCount))
1048 metricListAllOtherBranchesNewLinesCount.Set(float64(stats.OtherBranchesNewLinesCount))
1049}
1050
1051// getLoaded returns the currently loaded shards. Shared so do not mutate.
1052func (s *shardedSearcher) getLoaded() loaded {
1053 // next commit will store the true value of this, for now we keep the
1054 // backwards compatible behaviour.
1055 ready := s.ready.Load()
1056 // ranked is loaded after ready to avoid a race were ready is true but
1057 // ranked is still not the final set of shards.
1058 ranked, _ := s.ranked.Load().([]*rankedShard)
1059 return loaded{
1060 shards: ranked,
1061 ready: ready,
1062 }
1063}
1064
1065func mkRankedShard(s zoekt.Searcher) *rankedShard {
1066 q := query.Const{Value: true}
1067 result, err := s.List(context.Background(), &q, nil)
1068 if err != nil {
1069 return &rankedShard{Searcher: s}
1070 }
1071 if len(result.Repos) == 0 {
1072 return &rankedShard{Searcher: s}
1073 }
1074
1075 var (
1076 maxPriority float64
1077 repos = make([]*zoekt.Repository, 0, len(result.Repos))
1078 )
1079 for i := range result.Repos {
1080 repo := &result.Repos[i].Repository
1081 repos = append(repos, repo)
1082 if repo.RawConfig != nil {
1083 priority, _ := strconv.ParseFloat(repo.RawConfig["priority"], 64)
1084 if priority > maxPriority {
1085 maxPriority = priority
1086 }
1087 }
1088 }
1089
1090 return &rankedShard{
1091 Searcher: s,
1092 repos: repos,
1093 priority: maxPriority,
1094 }
1095}
1096
1097// markReady should be called once all shards have been passed into replace on
1098// startup. Once s is marked as ready it stops reporting a Crash in the
1099// response Stats.
1100func (s *shardedSearcher) markReady() {
1101 s.ready.CompareAndSwap(false, true)
1102}
1103
1104func (s *shardedSearcher) replace(shards map[string]zoekt.Searcher) {
1105 if len(shards) == 0 {
1106 return
1107 }
1108
1109 defer func(began time.Time) {
1110 metricShardsBatchReplaceDurationSeconds.Observe(time.Since(began).Seconds())
1111 }(time.Now())
1112
1113 s.mu.Lock()
1114 defer s.mu.Unlock()
1115
1116 for key, shard := range shards {
1117 var r *rankedShard
1118 if shard != nil {
1119 r = mkRankedShard(shard)
1120 }
1121
1122 old := s.shards[key]
1123 if shard == nil {
1124 delete(s.shards, key)
1125 } else {
1126 s.shards[key] = r
1127 }
1128
1129 if old != nil && old.Searcher != nil {
1130 // _ ___ /^^\ /^\ /^^\_
1131 // _ _@)@) \ ,,/ '` ~ `'~~ ', `\.
1132 // _/o\_ _ _ _/~`.`...'~\ ./~~..,'`','',.,' ' ~:
1133 // / `,'.~,~.~ . , . , ~|, ,/ .,' , ,. .. ,,. `, ~\_
1134 // ( ' _' _ '_` _ ' . , `\_/ .' ..' ' ` ` `.. `, \_
1135 // ~V~ V~ V~ V~ ~\ ` ' . ' , ' .,.,''`.,.''`.,.``. ', \_
1136 // _/\ /\ /\ /\_/, . ' , `_/~\_ .' .,. ,, , _/~\_ `. `. '., \_
1137 // < ~ ~ '~`'~'`, ., . `_: ::: \_ ' `_/ ::: \_ `.,' . ', \_
1138 // \ ' `_ '`_ _ ',/ _::_::_ \ _ _/ _::_::_ \ `.,'.,`., \-,-,-,_,_,
1139 // `'~~ `'~~ `'~~ `'~~ \(_)(_)(_)/ `~~' \(_)(_)(_)/ ~'`\_.._,._,'_;_;_;_;_;
1140 //
1141 // We can't just call Close now, because there may be ongoing searches
1142 // which have old in the shards list. Previously we used an exclusive
1143 // lock to guarantee there were no concurrent searches. However, that
1144 // led to blocking on the read path.
1145 //
1146 // We could introduce granular locking per rankedShard to know when
1147 // there are no more references. However, this becomes tricky in
1148 // practice. Instead we rely on the garbage collector noticing old is no
1149 // longer used. We take care in our searchers to runtime.KeepAlive until
1150 // we have stopped referencing the underling mmap data.
1151 runtime.SetFinalizer(old, func(r *rankedShard) {
1152 r.Close()
1153 })
1154 }
1155 }
1156
1157 ranked := make([]*rankedShard, 0, len(s.shards))
1158 for _, r := range s.shards {
1159 ranked = append(ranked, r)
1160 }
1161
1162 sort.Slice(ranked, func(i, j int) bool {
1163 priorityDiff := ranked[i].priority - ranked[j].priority
1164 if priorityDiff != 0 {
1165 return priorityDiff > 0
1166 }
1167 if len(ranked[i].repos) == 0 || len(ranked[j].repos) == 0 {
1168 // Protect against empty names which can happen if we fail to List or
1169 // the shard is full of tombstones. Prefer the shard which has names.
1170 return len(ranked[i].repos) >= len(ranked[j].repos)
1171 }
1172 return ranked[i].repos[0].Name < ranked[j].repos[0].Name
1173 })
1174
1175 s.ranked.Store(ranked)
1176
1177 metricShardsLoaded.Set(float64(len(ranked)))
1178}
1179
1180func loadShard(fn string) (zoekt.Searcher, error) {
1181 f, err := os.Open(fn)
1182 if err != nil {
1183 return nil, err
1184 }
1185
1186 iFile, err := zoekt.NewIndexFile(f)
1187 if err != nil {
1188 return nil, err
1189 }
1190 s, err := zoekt.NewSearcher(iFile)
1191 if err != nil {
1192 iFile.Close()
1193 return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err)
1194 }
1195
1196 return s, nil
1197}
1198
// prioritySlice is an unordered multiset of float64 priorities supporting
// three operations: add a value, remove one occurrence of a value, and query
// the largest value. Each operation is at most one linear scan. That is fine
// as long as the slice stays small; callers are expected to bound it by
// GOMAXPROCS.
type prioritySlice []float64

// append adds pri to the set. Duplicates are kept.
func (p *prioritySlice) append(pri float64) {
	*p = append(*p, pri)
}

// remove deletes one occurrence of pri, if present, by moving the last
// element into its slot, so element order is not preserved. Values are
// matched with ==, which means a NaN entry can never be removed.
func (p *prioritySlice) remove(pri float64) {
	s := *p
	for i := range s {
		if s[i] != pri {
			continue
		}
		last := len(s) - 1
		s[i] = s[last] // a harmless self-assignment when i == last
		*p = s[:last]
		return
	}
}

// max returns the largest value in the set, or -Inf when the set is empty.
// NaN entries never compare greater, so they are effectively ignored.
func (p *prioritySlice) max() float64 {
	s := *p
	best := math.Inf(-1)
	for i := range s {
		if s[i] > best {
			best = s[i]
		}
	}
	return best
}