// Fork of https://github.com/sourcegraph/zoekt.
1// Copyright 2016 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package search
16
17import (
18 "context"
19 "fmt"
20 "log"
21 "math"
22 "os"
23 "runtime"
24 "runtime/debug"
25 "slices"
26 "sort"
27 "strconv"
28 "sync"
29 "time"
30
31 "github.com/prometheus/client_golang/prometheus"
32 "github.com/prometheus/client_golang/prometheus/promauto"
33 sglog "github.com/sourcegraph/log"
34 "go.uber.org/atomic"
35 "golang.org/x/sync/semaphore"
36
37 "github.com/sourcegraph/zoekt"
38 "github.com/sourcegraph/zoekt/index"
39 "github.com/sourcegraph/zoekt/internal/tenant/systemtenant"
40 "github.com/sourcegraph/zoekt/internal/trace"
41 "github.com/sourcegraph/zoekt/query"
42)
43
44var (
45 shardRecoveryLogger = sync.OnceValue(func() sglog.Logger {
46 return sglog.Scoped("searchShards")
47 })
48
49 metricShardsLoaded = promauto.NewGauge(prometheus.GaugeOpts{
50 Name: "zoekt_shards_loaded",
51 Help: "The number of shards currently loaded",
52 })
53 metricShardsLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
54 Name: "zoekt_shards_loaded_total",
55 Help: "The total number of shards loaded",
56 })
57 metricShardsLoadFailedTotal = promauto.NewCounter(prometheus.CounterOpts{
58 Name: "zoekt_shards_load_failed_total",
59 Help: "The total number of shard loads that failed",
60 })
61
62 metricSearchRunning = promauto.NewGauge(prometheus.GaugeOpts{
63 Name: "zoekt_search_running",
64 Help: "The number of concurrent search requests running",
65 })
66 metricSearchShardRunning = promauto.NewGauge(prometheus.GaugeOpts{
67 Name: "zoekt_search_shard_running",
68 Help: "The number of concurrent search requests in a shard running",
69 })
70 metricSearchFailedTotal = promauto.NewCounter(prometheus.CounterOpts{
71 Name: "zoekt_search_failed_total",
72 Help: "The total number of search requests that failed",
73 })
74 metricSearchDuration = promauto.NewHistogram(prometheus.HistogramOpts{
75 Name: "zoekt_search_duration_seconds",
76 Help: "The duration a search request took in seconds",
77 Buckets: prometheus.DefBuckets, // DefBuckets good for service timings
78 })
79
80 // A Counter per Stat. Name should match field in zoekt.Stats.
81 metricSearchContentBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
82 Name: "zoekt_search_content_loaded_bytes_total",
83 Help: "Total amount of I/O for reading contents",
84 })
85 metricSearchIndexBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
86 Name: "zoekt_search_index_loaded_bytes_total",
87 Help: "Total amount of I/O for reading from index",
88 })
89 metricSearchCrashesTotal = promauto.NewCounter(prometheus.CounterOpts{
90 Name: "zoekt_search_crashes_total",
91 Help: "Total number of search shards that had a crash",
92 })
93 metricSearchFileCountTotal = promauto.NewCounter(prometheus.CounterOpts{
94 Name: "zoekt_search_file_count_total",
95 Help: "Total number of files containing a match",
96 })
97 metricSearchShardFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{
98 Name: "zoekt_search_shard_files_considered_total",
99 Help: "Total number of files in shards that we considered",
100 })
101 metricSearchFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{
102 Name: "zoekt_search_files_considered_total",
103 Help: "Total files that we evaluated. Equivalent to files for which all atom matches (including negations) evaluated to true",
104 })
105 metricSearchFilesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
106 Name: "zoekt_search_files_loaded_total",
107 Help: "Total files for which we loaded file content to verify substring matches",
108 })
109 metricSearchFilesSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{
110 Name: "zoekt_search_files_skipped_total",
111 Help: "Total candidate files whose contents weren't examined because we gathered enough matches",
112 })
113 metricSearchShardsSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{
114 Name: "zoekt_search_shards_skipped_total",
115 Help: "Total shards that we did not process because a query was canceled",
116 })
117 metricSearchMatchCountTotal = promauto.NewCounter(prometheus.CounterOpts{
118 Name: "zoekt_search_match_count_total",
119 Help: "Total number of non-overlapping matches",
120 })
121 metricSearchNgramMatchesTotal = promauto.NewCounter(prometheus.CounterOpts{
122 Name: "zoekt_search_ngram_matches_total",
123 Help: "Total number of candidate matches as a result of searching ngrams",
124 })
125 metricSearchNgramLookupsTotal = promauto.NewCounter(prometheus.CounterOpts{
126 Name: "zoekt_search_ngram_lookups_total",
127 Help: "Total number of times we accessed an ngram in the index",
128 })
129 metricSearchRegexpsConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{
130 Name: "zoekt_search_regexps_considered_total",
131 Help: "Total number of times regexp was called on files that we evaluated",
132 })
133
134 metricListRunning = promauto.NewGauge(prometheus.GaugeOpts{
135 Name: "zoekt_list_running",
136 Help: "The number of concurrent list requests running",
137 })
138 metricListShardRunning = promauto.NewGauge(prometheus.GaugeOpts{
139 Name: "zoekt_list_shard_running",
140 Help: "The number of concurrent list requests in a shard running",
141 })
142 metricShardsBatchReplaceDurationSeconds = promauto.NewHistogram(prometheus.HistogramOpts{
143 Name: "zoekt_shards_batch_replace_duration_seconds",
144 Help: "The time it takes to replace a batch of Searchers.",
145 Buckets: []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30},
146 })
147 metricListAllRepos = promauto.NewGauge(prometheus.GaugeOpts{
148 Name: "zoekt_list_all_stats_repos",
149 Help: "The last List(true) value for RepoStats.Repos. Repos is used for aggregrating the number of repositories.",
150 })
151 metricListAllShards = promauto.NewGauge(prometheus.GaugeOpts{
152 Name: "zoekt_list_all_stats_shards",
153 Help: "The last List(true) value for RepoStats.Shards. Shards is the total number of search shards.",
154 })
155 metricListAllDocuments = promauto.NewGauge(prometheus.GaugeOpts{
156 Name: "zoekt_list_all_stats_documents",
157 Help: "The last List(true) value for RepoStats.Documents. Documents holds the number of documents or files.",
158 })
159 metricListAllIndexBytes = promauto.NewGauge(prometheus.GaugeOpts{
160 Name: "zoekt_list_all_stats_index_bytes",
161 Help: "The last List(true) value for RepoStats.IndexBytes. IndexBytes is the amount of RAM used for index overhead.",
162 })
163 metricListAllContentBytes = promauto.NewGauge(prometheus.GaugeOpts{
164 Name: "zoekt_list_all_stats_content_bytes",
165 Help: "The last List(true) value for RepoStats.ContentBytes. ContentBytes is the amount of RAM used for raw content.",
166 })
167 metricListAllNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{
168 Name: "zoekt_list_all_stats_new_lines_count",
169 Help: "The last List(true) value for RepoStats.NewLinesCount.",
170 })
171 metricListAllDefaultBranchNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{
172 Name: "zoekt_list_all_stats_default_branch_new_lines_count",
173 Help: "The last List(true) value for RepoStats.DefaultBranchNewLinesCount.",
174 })
175 metricListAllOtherBranchesNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{
176 Name: "zoekt_list_all_stats_other_branches_new_lines_count",
177 Help: "The last List(true) value for RepoStats.OtherBranchesNewLinesCount.",
178 })
179)
180
// rankedShard is a loaded shard together with the data used to order and
// filter it during a search: the priority consulted when streaming results,
// and the list of repositories consulted by doSelectRepoSet.
type rankedShard struct {
	zoekt.Searcher

	priority float64 // maximum priority across all repos in the shard

	// We have out of band ranking on compound shards which can change even if
	// the shard file does not. So we compute a rank in getShards. We store
	// repos here to avoid the cost of List in the search request path.
	//
	// repos is nil only if that call failed.
	repos []*zoekt.Repository
}
193
// loaded stores the state we compute when updating the state of shards from
// disk. Searches and list requests take a snapshot of it via getLoaded.
type loaded struct {
	// shards is the currently loaded shards sorted by decreasing rank and
	// should not be mutated.
	shards []*rankedShard

	// ready is true if sharded searcher has finished loading all initial
	// shards on startup. While it is false, Search, StreamSearch and List
	// report one extra crash to signal that results may be incomplete.
	ready bool
}
205
// shardedSearcher searches a set of loaded shards concurrently and merges or
// streams their results.
type shardedSearcher struct {
	// Limit the number of parallel queries. Since searching is
	// CPU bound, we can't do better than #CPU queries in
	// parallel. If we do so, we just create more memory
	// pressure.
	sched scheduler

	// shards maps a shard key (as passed to loader.load/drop; presumably the
	// shard file path — TODO confirm) to the loaded shard.
	mu     sync.Mutex // protects writes to shards
	shards map[string]*rankedShard

	// NOTE(review): presumably ready is set by markReady once the initial
	// load has finished, and ranked caches the snapshot returned by
	// getLoaded; both are maintained outside this view — confirm.
	ready  atomic.Bool
	ranked atomic.Value
}
219
220func newShardedSearcher(n int64) *shardedSearcher {
221 ss := &shardedSearcher{
222 shards: make(map[string]*rankedShard),
223 sched: newScheduler(n),
224 }
225 return ss
226}
227
228// NewDirectorySearcher returns a searcher instance that loads all
229// shards corresponding to a glob into memory.
230func NewDirectorySearcher(dir string) (zoekt.Streamer, error) {
231 return newDirectorySearcher(dir, true)
232}
233
234// NewDirectorySearcherFast is like NewDirectorySearcher, but does not block
235// on the initial loading of shards.
236//
237// This exists since in the case of zoekt-webserver we are happy with having
238// partial availability since that is better than no availability on large
239// instances.
240func NewDirectorySearcherFast(dir string) (zoekt.Streamer, error) {
241 return newDirectorySearcher(dir, false)
242}
243
244func newDirectorySearcher(dir string, waitUntilReady bool) (zoekt.Streamer, error) {
245 ss := newShardedSearcher(int64(runtime.GOMAXPROCS(0)))
246 tl := &loader{
247 ss: ss,
248 }
249 dw, err := newDirectoryWatcher(dir, tl)
250 if err != nil {
251 return nil, err
252 }
253
254 if waitUntilReady {
255 if err := dw.WaitUntilReady(); err != nil {
256 return nil, err
257 }
258 }
259
260 ds := &directorySearcher{
261 Streamer: ss,
262 directoryWatcher: dw,
263 }
264
265 return &typeRepoSearcher{Streamer: ds}, nil
266}
267
// directorySearcher pairs the searcher built by newDirectorySearcher with
// the DirectoryWatcher that feeds it shards, so that closing the searcher
// also stops the watcher.
type directorySearcher struct {
	zoekt.Streamer

	directoryWatcher *DirectoryWatcher
}
273
274func (s *directorySearcher) Close() {
275 // We need to Stop directoryWatcher first since it calls load/unload on
276 // Searcher.
277 s.directoryWatcher.Stop()
278 s.Streamer.Close()
279}
280
// loader receives load/drop callbacks from the directory watcher and applies
// them to the wrapped shardedSearcher.
type loader struct {
	ss *shardedSearcher
}
284
285func (tl *loader) load(keys ...string) {
286 // This is called with all keys on startup, so once this function has
287 // finished running shardedSearcher will be ready.
288 defer tl.ss.markReady()
289
290 if len(keys) == 0 {
291 // If there's nothing to load, we exit early here, but we want to mark
292 // ourselves as ready.
293 return
294 }
295
296 var (
297 mu sync.Mutex // synchronizes writes to the shards map
298 wg sync.WaitGroup // used to wait for all shards to load
299 sem = semaphore.NewWeighted(int64(runtime.GOMAXPROCS(0)))
300 loadedShards = make(map[string]zoekt.Searcher)
301 )
302
303 publishLoaded := func() {
304 mu.Lock()
305 chunk := loadedShards
306 loadedShards = make(map[string]zoekt.Searcher)
307 mu.Unlock()
308 tl.ss.replace(chunk)
309 }
310
311 log.Printf("[INFO] loading %d shard(s): %s", len(keys), humanTruncateList(keys, 5))
312
313 lastProgress := time.Now()
314 for i, key := range keys {
315 // If taking a while to start-up occasionally give a progress message
316 if time.Since(lastProgress) > 5*time.Second {
317 log.Printf("[INFO] still need to load %d shards...", len(keys)-i)
318 lastProgress = time.Now()
319
320 publishLoaded()
321 }
322
323 _ = sem.Acquire(context.Background(), 1)
324 wg.Add(1)
325
326 go func(key string) {
327 defer sem.Release(1)
328 defer wg.Done()
329
330 shard, err := loadShard(key)
331 if err != nil {
332 metricShardsLoadFailedTotal.Inc()
333 log.Printf("[ERROR] reloading: %s, err %v ", key, err)
334 return
335 }
336 metricShardsLoadedTotal.Inc()
337
338 mu.Lock()
339 loadedShards[key] = shard
340 mu.Unlock()
341 }(key)
342 }
343
344 wg.Wait()
345
346 publishLoaded()
347}
348
349func (tl *loader) drop(keys ...string) {
350 shards := make(map[string]zoekt.Searcher, len(keys))
351 for _, key := range keys {
352 shards[key] = nil
353 }
354 tl.ss.replace(shards)
355}
356
357func (ss *shardedSearcher) String() string {
358 return "shardedSearcher"
359}
360
361// Close closes references to open files. It may be called only once.
362func (ss *shardedSearcher) Close() {
363 ss.mu.Lock()
364 shards := make(map[string]zoekt.Searcher, len(ss.shards))
365 for k := range ss.shards {
366 shards[k] = nil
367 }
368 ss.mu.Unlock()
369
370 ss.replace(shards)
371}
372
373func selectRepoSet(shards []*rankedShard, q query.Q) ([]*rankedShard, query.Q) {
374 and, ok := q.(*query.And)
375 if ok {
376 return doSelectRepoSet(shards, and)
377 }
378
379 // We have queries which look like (reposet ...) and we want to do the same
380 // optimizations. To simplify we just always wrap the query in And and then
381 // on the return value call Simplify to unwrap. In particular this is
382 // important for List calls.
383 and = &query.And{Children: []query.Q{q}}
384 shards, q = doSelectRepoSet(shards, and)
385 return shards, query.Simplify(q)
386}
387
388func doSelectRepoSet(shards []*rankedShard, and *query.And) ([]*rankedShard, query.Q) {
389 // (and (reposet ...) (q))
390 // (and true (q)) with a filtered shards
391 // (and false) // noop
392
393 // (and (repobranches ...) (q))
394 // (and (repobranches ...) (q))
395
396 // Note: we also support (and (repo ...) (q)) even though sourcegraph does
397 // not generate those sorts of queries. This is to support manual testing.
398
399 hasReposForPredicate := func(pred func(repo *zoekt.Repository) bool) func(repos []*zoekt.Repository) (any, all bool) {
400 return func(repos []*zoekt.Repository) (any, all bool) {
401 any = false
402 all = true
403 for _, repo := range repos {
404 b := pred(repo)
405 any = any || b
406 all = all && b
407 }
408 return any, all
409 }
410 }
411
412 for i, c := range and.Children {
413 var setSize int
414 var hasRepos func([]*zoekt.Repository) (bool, bool)
415 switch setQuery := c.(type) {
416 case *query.RepoSet:
417 setSize = len(setQuery.Set)
418 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
419 return setQuery.Set[repo.Name]
420 })
421 case *query.RepoIDs:
422 setSize = int(setQuery.Repos.GetCardinality())
423 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
424 return setQuery.Repos.Contains(repo.ID)
425 })
426 case *query.Repo:
427 setSize = 0
428 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
429 return setQuery.Regexp.MatchString(repo.Name)
430 })
431 case *query.BranchesRepos:
432 for _, br := range setQuery.List {
433 setSize += int(br.Repos.GetCardinality())
434 }
435
436 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
437 for _, br := range setQuery.List {
438 if br.Repos.Contains(repo.ID) {
439 return true
440 }
441 }
442 return false
443 })
444 case *query.Meta:
445 // Meta queries filter repositories based on metadata fields.
446 // By checking this at the shard level, we can skip entire shards
447 // that don't contain any matching repositories, avoiding expensive
448 // I/O operations.
449 setSize = 0 // Unknown size, we'll filter based on metadata
450 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
451 if repo.Metadata == nil {
452 return false
453 }
454 v, ok := repo.Metadata[setQuery.Field]
455 if !ok {
456 return false
457 }
458 return setQuery.Value.MatchString(v)
459 })
460 default:
461 continue
462 }
463
464 // setSize may be larger than the number of shards we have. The size of
465 // filtered is bounded by min(len(set), len(shards))
466 if setSize > len(shards) {
467 setSize = len(shards)
468 }
469
470 filtered := make([]*rankedShard, 0, setSize)
471 filteredAll := true
472
473 for _, s := range shards {
474 if s.repos == nil {
475 // repos is nil if we failed to List the shard. This shouldn't
476 // happen, but if it does we don't know what is in it and must search
477 // it without simplifying the query.
478 filtered = append(filtered, s)
479 filteredAll = false
480 } else if any, all := hasRepos(s.repos); any {
481 filtered = append(filtered, s)
482 filteredAll = filteredAll && all
483 }
484 }
485
486 // We don't need to adjust the query since we are returning an empty set
487 // of shards to search.
488 if len(filtered) == 0 {
489 return filtered, and
490 }
491
492 // We can't simplify the query since we are searching shards which contain
493 // repos we aren't supposed to search.
494 if !filteredAll {
495 return filtered, and
496 }
497
498 // We don't want to mutate the original and, so we clone it before
499 // mutating it.
500 and = &query.And{Children: slices.Clone(and.Children)}
501
502 // This optimization allows us to avoid the work done by
503 // indexData.simplify for each shard.
504 //
505 // For example if our query is (and (reposet foo bar) (content baz))
506 // then at this point filtered is [foo bar] and q is the same. For each
507 // shard indexData.simplify will simplify to (and true (content baz)) ->
508 // (content baz). This work can be done now once, rather than per shard.
509 switch c := c.(type) {
510 case *query.RepoSet, *query.RepoIDs, *query.Repo, *query.Meta:
511 and.Children[i] = &query.Const{Value: true}
512 return filtered, query.Simplify(and)
513
514 case *query.BranchesRepos:
515 // We can only replace if all the repos want the same branches. We
516 // simplify and just check that we are requesting 1 branch. The common
517 // case is just asking for HEAD, so this should be effective.
518 if len(c.List) != 1 {
519 return filtered, and
520 }
521
522 // Every repo wants the same branches, so we can replace RepoBranches
523 // with a list of branch queries.
524 and.Children[i] = &query.Branch{Pattern: c.List[0].Branch, Exact: true}
525 return filtered, query.Simplify(and)
526 }
527
528 // Stop after first RepoSet, otherwise we might append duplicate
529 // shards to `filtered`
530 return filtered, and
531 }
532
533 return shards, and
534}
535
536func (ss *shardedSearcher) Search(ctx context.Context, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) {
537 tr, ctx := trace.New(ctx, "shardedSearcher.Search", "")
538 tr.LazyLog(q, true)
539 tr.LazyPrintf("opts: %+v", opts)
540 defer func() {
541 if sr != nil {
542 tr.LazyPrintf("num files: %d", len(sr.Files))
543 tr.LazyPrintf("stats: %+v", sr.Stats)
544 }
545 if err != nil {
546 tr.LazyPrintf("error: %v", err)
547 tr.SetError(err)
548 }
549 tr.Finish()
550 }()
551 ctx, cancel := context.WithCancel(ctx)
552 defer cancel()
553
554 collectSender := newCollectSender(opts)
555
556 start := time.Now()
557 proc, err := ss.sched.Acquire(ctx)
558 if err != nil {
559 return nil, err
560 }
561 defer proc.Release()
562 tr.LazyPrintf("acquired process")
563
564 wait := time.Since(start)
565 start = time.Now()
566
567 loaded := ss.getLoaded()
568 done, err := streamSearch(ctx, proc, q, opts, loaded.shards, collectSender)
569 defer done()
570 if err != nil {
571 return nil, err
572 }
573
574 aggregate, ok := collectSender.Done()
575 if !ok {
576 aggregate = &zoekt.SearchResult{
577 RepoURLs: map[string]string{},
578 LineFragments: map[string]string{},
579 }
580 }
581
582 copyFiles(aggregate)
583
584 if !loaded.ready {
585 // We may have missed results due to not being fully loaded.
586 aggregate.Stats.Crashes++
587 }
588
589 aggregate.Stats.Wait = wait
590 aggregate.Stats.Duration = time.Since(start)
591
592 return aggregate, nil
593}
594
// StreamSearch runs q over every loaded shard and streams results to sender
// as shards complete.
//
// Before any shard is searched, it sends one event carrying:
//   - Stats.Wait: the time spent acquiring a scheduler process;
//   - Stats.Crashes: 1 if the initial shard load has not finished, else 0;
//   - Progress.MaxPendingPriority: the priority of the first shard in rank
//     order, or -Inf when no shards are loaded.
//
// If opts sets display limits, sender is wrapped so that the search can be
// canceled once those limits are reached.
func (ss *shardedSearcher) StreamSearch(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, sender zoekt.Sender) (err error) {
	tr, ctx := trace.New(ctx, "shardedSearcher.StreamSearch", "")
	defer func() {
		if err != nil {
			tr.LazyPrintf("error: %v", err)
			tr.SetError(err)
		}
		tr.Finish()
	}()

	start := time.Now()
	proc, err := ss.sched.Acquire(ctx)
	if err != nil {
		return err
	}
	defer proc.Release()
	tr.LazyPrintf("acquired process")

	loaded := ss.getLoaded()
	shards := loaded.shards

	// shards is sorted by decreasing rank, so the first shard bounds the
	// priority of everything still to come.
	maxPendingPriority := math.Inf(-1)
	if len(shards) > 0 {
		maxPendingPriority = shards[0].priority
	}

	stillLoadingCrashes := 0
	if !loaded.ready {
		// We may have missed results due to not being fully loaded.
		stillLoadingCrashes++
	}

	sender.Send(&zoekt.SearchResult{
		Stats: zoekt.Stats{
			Crashes: stillLoadingCrashes,
			Wait:    time.Since(start),
		},
		Progress: zoekt.Progress{
			MaxPendingPriority: maxPendingPriority,
		},
	})

	// Matches flow from the shards up the stack in the following order:
	//
	// 1. Search shards
	// 2. flushCollectSender (aggregate)
	// 3. limitSender (limit)
	// 4. copyFileSender (copy)
	//
	// For streaming, the wrapping has to happen in the inverted order.
	sender = copyFileSender(sender)

	if truncator, hasLimits := index.NewDisplayTruncator(opts); hasLimits {
		// limitSender is handed cancel, presumably so it can stop the search
		// once the display limits are hit — confirm in limitSender.
		var cancel context.CancelFunc
		ctx, cancel = context.WithCancel(ctx)
		defer cancel()
		sender = limitSender(cancel, sender, truncator)
	}

	sender, flush := newFlushCollectSender(opts, sender)

	done, err := streamSearch(ctx, proc, q, opts, shards, sender)

	// Even though streaming is done, we may have results sitting in a buffer we
	// need to flush. So we need to send those before calling done.
	flush()
	done()

	return err
}
665
// streamSearch is an internal helper since both Search and StreamSearch are
// largely similar.
//
// It first narrows shards (and possibly simplifies q) with selectRepoSet.
// The remaining shards are handed out, in the order given, to a pool of
// min(GOMAXPROCS, len(shards)) worker goroutines. Each shard's result is
// forwarded to sender through sendByRepository as it arrives.
//
// Handing out new shards stops early in two cases:
//   - a shard returns an error; the last error seen is returned;
//   - the accumulated match count exceeds opts.TotalMaxMatchCount, when that
//     is positive.
//
// In both cases, results from shards already handed out are still drained,
// and the non-error ones are still sent. A non-zero opts.MaxWallTime bounds
// the whole search with a context timeout.
//
// done must always be called, even if err is non-nil. The SearchResults sent
// via sender contain references to the underlying mmap data that the garbage
// collector can't see. Calling done informs the garbage collector it is free
// to collect those shards. The caller must call copyFiles on any
// SearchResults it returns/streams out before calling done.
func streamSearch(ctx context.Context, proc *process, q query.Q, opts *zoekt.SearchOptions, shards []*rankedShard, sender zoekt.Sender) (done func(), err error) {
	tr, ctx := trace.New(ctx, "shardedSearcher.streamSearch", "")
	overallStart := time.Now()
	metricSearchRunning.Inc()
	defer func() {
		metricSearchRunning.Dec()
		metricSearchDuration.Observe(time.Since(overallStart).Seconds())
		if err != nil {
			metricSearchFailedTotal.Inc()

			tr.LazyPrintf("error: %v", err)
			tr.SetError(err)
		}
		tr.Finish()
	}()

	// Select the subset of shards that we will search over for the given query.
	{
		beforeLen := len(shards)
		beforeQ := q
		shards, q = selectRepoSet(shards, q)
		tr.LazyPrintf("selectRepoSet shards=%d->%d q=%s->%s", beforeLen, len(shards), beforeQ, q)
	}

	if len(shards) == 0 {
		return func() {}, nil
	}

	// The derived context is canceled when streamSearch returns. If
	// opts.MaxWallTime is set, it also times out after that duration.
	var cancel context.CancelFunc
	if opts.MaxWallTime == 0 {
		ctx, cancel = context.WithCancel(ctx)
	} else {
		ctx, cancel = context.WithTimeout(ctx, opts.MaxWallTime)
	}

	defer cancel()

	// We set the number of workers to GOMAXPROCS, or the number of shards,
	// whichever is smaller.
	workers := min(runtime.GOMAXPROCS(0), len(shards))

	// result pairs a shard's search outcome with that shard's priority, so
	// the priority can be removed from pending when the result arrives.
	type result struct {
		priority float64
		*zoekt.SearchResult
		err error
	}

	var (
		// buffered channels to continue searching when sending back results
		// takes a while / blocks. The maximum pending result set is workers * 2.
		results = make(chan *result, workers)
		search  = make(chan *rankedShard, workers)
		wg      sync.WaitGroup
	)

	// Start workers that receive shards from the search channel, search them,
	// and send the results to the results channel. This process is repeated
	// until the search channel is closed.
	//
	// Note: the fixed number of workers is what bounds the number of parallel
	// shard searches. The buffer on "search" only lets the dispatcher queue up
	// to `workers` shards ahead of them. Since searching is mostly CPU bound,
	// limiting parallel shard searches also reduces the peak working set.
	wg.Add(workers)
	for range workers {
		go func() {
			defer wg.Done()
			for s := range search {
				sr, err := searchOneShard(ctx, s, q, opts)
				r := &result{priority: s.priority, SearchResult: sr, err: err}
				results <- r
			}
		}()
	}

	// Close results once every worker has exited. That is what ends the
	// dispatch loop below.
	go func() {
		wg.Wait()
		close(results)
	}()

	var (
		// pending holds the priorities of shards handed to workers whose
		// results have not been received yet.
		pending = make(prioritySlice, 0, workers)
		shard   = 0
		next    = shards[shard]

		// We need a separate nil-able reference to the same channel so we can close(search) for the worker
		// go-routines to finish but also set work to nil in order for the select statement below to ignore
		// that case when we want to stop a search. This is needed because sending on a closed channel panics.
		work = search
	)

	// stop closes search, so workers exit once it is drained, and disables the
	// dispatch case of the select below. Calling it again is a no-op.
	stop := func() {
		if work != nil {
			close(search)
			work = nil
			next = nil
		}
	}

	// tracked so we can stop when we hit TotalMaxMatchCount
	var totalMatchCount int

search:
	for {
		// At the top of each iteration, have the proc associated with this search yield its own "timeslice"
		// to possibly allow other searches to make progress
		_ = proc.Yield(ctx) // Note: we let searchOneShard handle context errors

		select {
		case work <- next: // is there a worker available to search the next shard?
			pending.append(next.priority)

			shard++
			if shard == len(shards) {
				stop()
			} else {
				next = shards[shard]
			}
		case r, ok := <-results: // is there a result to send back?
			if !ok {
				break search
			}

			// delete this result's priority from pending before computing the new max pending priority
			pending.remove(r.priority)

			if r.err != nil {
				// Set final error and stop searching new shards, but consume any pending
				// search results.
				stop()
				err = r.err
				continue
			}

			// Update the match count statistics and stop searching new shards if we've
			// exceeded the limit set in the options.
			totalMatchCount += r.SearchResult.Stats.MatchCount
			if opts.TotalMaxMatchCount > 0 && totalMatchCount > opts.TotalMaxMatchCount {
				stop()
			}

			observeMetrics(r.SearchResult)

			r.Priority = r.priority
			r.MaxPendingPriority = pending.max()

			sendByRepository(r.SearchResult, opts, sender) // send the result back to the client
		}
	}

	return func() { runtime.KeepAlive(shards) }, err
}
824
825// sendByRepository splits a zoekt.SearchResult by repository and calls
826// sender.Send for each batch. Ranking in Sourcegraph expects zoekt.SearchResult
827// to contain results with the same zoekt.SearchResult.priority only.
828//
829// We split by repository instead of by priority because it is easier to set
830// RepoURLs and LineFragments in zoekt.SearchResult.
831func sendByRepository(result *zoekt.SearchResult, opts *zoekt.SearchOptions, sender zoekt.Sender) {
832 if len(result.RepoURLs) <= 1 || len(result.Files) == 0 {
833 index.SortFiles(result.Files)
834 sender.Send(result)
835 return
836 }
837
838 send := func(repoURL string, a, b int, stats zoekt.Stats) {
839 index.SortFiles(result.Files[a:b])
840
841 // FileMatch.Repository holds the repo's unique key (its URL), which is
842 // also the key used for RepoURLs/LineFragments.
843 filteredRepoURLs := map[string]string{repoURL: result.RepoURLs[repoURL]}
844 filteredLineFragments := map[string]string{repoURL: result.LineFragments[repoURL]}
845
846 // Filter RepoURLs and LineFragments to only those of repoName and its
847 // subRepositories if there are subRepositories
848 for _, file := range result.Files[a:b] {
849 name := file.SubRepositoryName
850 if name == "" {
851 continue
852 }
853 _, repoSet := filteredRepoURLs[name]
854 url, ok := result.RepoURLs[name]
855 if !repoSet && ok {
856 filteredRepoURLs[name] = url
857 }
858 _, fragSet := filteredLineFragments[name]
859 frag, ok := result.LineFragments[name]
860 if !fragSet && ok {
861 filteredLineFragments[name] = frag
862 }
863 }
864
865 sender.Send(&zoekt.SearchResult{
866 Stats: stats,
867 Progress: zoekt.Progress{
868 Priority: result.Files[a].RepositoryPriority,
869 MaxPendingPriority: result.MaxPendingPriority,
870 },
871 Files: result.Files[a:b],
872 RepoURLs: filteredRepoURLs,
873 LineFragments: filteredLineFragments,
874 })
875 }
876
877 var startIndex, endIndex int
878 curRepoID := result.Files[0].RepositoryID
879 curRepoURL := result.Files[0].Repository
880
881 fm := zoekt.FileMatch{}
882 for endIndex, fm = range result.Files {
883 if curRepoID != fm.RepositoryID {
884 // Stats must stay aggregate-able, hence we sent the aggregate stats with the
885 // last event.
886 send(curRepoURL, startIndex, endIndex, zoekt.Stats{})
887
888 startIndex = endIndex
889 curRepoID = fm.RepositoryID
890 curRepoURL = fm.Repository
891 }
892 }
893
894 send(curRepoURL, startIndex, endIndex+1, result.Stats)
895}
896
897func observeMetrics(sr *zoekt.SearchResult) {
898 metricSearchContentBytesLoadedTotal.Add(float64(sr.Stats.ContentBytesLoaded))
899 metricSearchIndexBytesLoadedTotal.Add(float64(sr.Stats.IndexBytesLoaded))
900 metricSearchCrashesTotal.Add(float64(sr.Stats.Crashes))
901 metricSearchFileCountTotal.Add(float64(sr.Stats.FileCount))
902 metricSearchShardFilesConsideredTotal.Add(float64(sr.Stats.ShardFilesConsidered))
903 metricSearchFilesConsideredTotal.Add(float64(sr.Stats.FilesConsidered))
904 metricSearchFilesLoadedTotal.Add(float64(sr.Stats.FilesLoaded))
905 metricSearchFilesSkippedTotal.Add(float64(sr.Stats.FilesSkipped))
906 metricSearchShardsSkippedTotal.Add(float64(sr.Stats.ShardsSkipped))
907 metricSearchMatchCountTotal.Add(float64(sr.Stats.MatchCount))
908 metricSearchNgramMatchesTotal.Add(float64(sr.Stats.NgramMatches))
909 metricSearchNgramLookupsTotal.Add(float64(sr.Stats.NgramLookups))
910 metricSearchRegexpsConsideredTotal.Add(float64(sr.Stats.RegexpsConsidered))
911}
912
// copySlice replaces *src with a freshly allocated copy of its contents,
// detaching it from the memory it previously referenced. A nil slice is
// left as nil; a non-nil empty slice becomes a new non-nil empty slice.
func copySlice(src *[]byte) {
	if orig := *src; orig != nil {
		*src = append(make([]byte, 0, len(orig)), orig...)
	}
}
921
922func copyFiles(sr *zoekt.SearchResult) {
923 for i := range sr.Files {
924 copySlice(&sr.Files[i].Content)
925 copySlice(&sr.Files[i].Checksum)
926 for l := range sr.Files[i].LineMatches {
927 copySlice(&sr.Files[i].LineMatches[l].Line)
928 copySlice(&sr.Files[i].LineMatches[l].Before)
929 copySlice(&sr.Files[i].LineMatches[l].After)
930 }
931 for c := range sr.Files[i].ChunkMatches {
932 copySlice(&sr.Files[i].ChunkMatches[c].Content)
933 }
934 }
935}
936
937func logShardCrash(operation string, s zoekt.Searcher, q query.Q, recovered any, stack []byte) {
938 fields := []sglog.Field{
939 sglog.String("operation", operation),
940 sglog.String("shard", s.String()),
941 sglog.String("query", q.String()),
942 sglog.String("stacktrace", string(stack)),
943 }
944
945 if err, ok := recovered.(error); ok {
946 fields = append(fields, sglog.Error(err))
947 } else {
948 fields = append(fields, sglog.String("panic", fmt.Sprint(recovered)))
949 }
950
951 shardRecoveryLogger().Error("crashed shard", fields...)
952}
953
954func searchOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) {
955 metricSearchShardRunning.Inc()
956 defer func() {
957 metricSearchShardRunning.Dec()
958 if e := recover(); e != nil {
959 logShardCrash("search", s, q, e, debug.Stack())
960
961 if sr == nil {
962 sr = &zoekt.SearchResult{}
963 }
964 sr.Stats.Crashes = 1
965 }
966 }()
967
968 return s.Search(ctx, q, opts)
969}
970
// shardListResult carries the outcome of listing a single shard from a
// listOneShard call back to the aggregating List call. Composite literals of
// this type are written positionally elsewhere in the file, so the field order
// is significant.
type shardListResult struct {
	// rl is the shard's repository list; on a recovered panic it is a
	// RepoList with only Crashes set to 1.
	rl *zoekt.RepoList
	// err is the error returned by the shard's List, if any.
	err error
}
975
976func listOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.ListOptions, sink chan shardListResult) {
977 metricListShardRunning.Inc()
978 defer func() {
979 metricListShardRunning.Dec()
980 if r := recover(); r != nil {
981 logShardCrash("list", s, q, r, debug.Stack())
982 sink <- shardListResult{
983 &zoekt.RepoList{Crashes: 1}, nil,
984 }
985 }
986 }()
987
988 ms, err := s.List(ctx, q, opts)
989 sink <- shardListResult{ms, err}
990}
991
992func (ss *shardedSearcher) List(ctx context.Context, q query.Q, opts *zoekt.ListOptions) (rl *zoekt.RepoList, err error) {
993 tr, ctx := trace.New(ctx, "shardedSearcher.List", "")
994 metricListRunning.Inc()
995 defer func() {
996 metricListRunning.Dec()
997 if rl != nil {
998 tr.LazyPrintf("repos.size=%d reposmap.size=%d crashes=%d stats=%+v", len(rl.Repos), len(rl.ReposMap), rl.Crashes, rl.Stats)
999 }
1000 if err != nil {
1001 tr.LazyPrintf("error: %v", err)
1002 tr.SetError(err)
1003 }
1004 tr.Finish()
1005 }()
1006
1007 q = query.Simplify(q)
1008 isAll := false
1009 if c, ok := q.(*query.Const); ok {
1010 isAll = c.Value
1011 }
1012
1013 proc, err := ss.sched.Acquire(ctx)
1014 if err != nil {
1015 return nil, err
1016 }
1017 defer proc.Release()
1018 tr.LazyPrintf("acquired process")
1019
1020 loaded := ss.getLoaded()
1021 shards := loaded.shards
1022
1023 // Setup what we return now, since we may short circuit if there are no
1024 // shards to search.
1025 stillLoadingCrashes := 0
1026 if !loaded.ready {
1027 // We may have missed results due to not being fully loaded.
1028 stillLoadingCrashes++
1029 }
1030 agg := zoekt.RepoList{
1031 Crashes: stillLoadingCrashes,
1032 ReposMap: zoekt.ReposMap{},
1033 Repos: []*zoekt.RepoListEntry{},
1034 }
1035
1036 // PERF: Select the subset of shards that we will search over for the given
1037 // query. A common List query only asks for a specific repo, so this is an
1038 // important optimization.
1039 {
1040 beforeLen := len(shards)
1041 beforeQ := q
1042 shards, q = selectRepoSet(shards, q)
1043 tr.LazyPrintf("selectRepoSet shards=%d->%d q=%s->%s", beforeLen, len(shards), beforeQ, q)
1044 }
1045
1046 if len(shards) == 0 {
1047 return &agg, nil
1048 }
1049
1050 shardCount := len(shards)
1051 all := make(chan shardListResult, shardCount)
1052 feeder := make(chan zoekt.Searcher, len(shards))
1053 for _, s := range shards {
1054 feeder <- s
1055 }
1056 close(feeder)
1057
1058 for range runtime.GOMAXPROCS(0) {
1059 go func() {
1060 for s := range feeder {
1061 listOneShard(ctx, s, q, opts, all)
1062 }
1063 }()
1064 }
1065
1066 uniq := map[string]*zoekt.RepoListEntry{}
1067
1068 for range shards {
1069 r := <-all
1070 if r.err != nil {
1071 return nil, r.err
1072 }
1073
1074 agg.Crashes += r.rl.Crashes
1075 agg.Stats.Add(&r.rl.Stats)
1076
1077 for _, r := range r.rl.Repos {
1078 prev, ok := uniq[r.Repository.URL]
1079 if !ok {
1080 cp := *r // We need to copy because we mutate r.Stats when merging duplicates
1081 uniq[r.Repository.URL] = &cp
1082 } else {
1083 prev.Stats.Add(&r.Stats)
1084 }
1085 }
1086
1087 for id, r := range r.rl.ReposMap {
1088 _, ok := agg.ReposMap[id]
1089 if !ok {
1090 agg.ReposMap[id] = r
1091 }
1092 }
1093 }
1094
1095 agg.Repos = make([]*zoekt.RepoListEntry, 0, len(uniq))
1096 for _, r := range uniq {
1097 agg.Repos = append(agg.Repos, r)
1098 }
1099
1100 // Only one of these fields is populated and in all cases the size of that
1101 // field is the number of Repos.
1102 //
1103 // Note: we don't just add individual Stats.Repos since a repository can
1104 // have multiple shards.
1105 agg.Stats.Repos = len(uniq) + len(agg.ReposMap)
1106
1107 if isAll && len(agg.Repos) > 0 {
1108 reportListAllMetrics(agg.Repos)
1109 }
1110
1111 return &agg, nil
1112}
1113
1114func reportListAllMetrics(repos []*zoekt.RepoListEntry) {
1115 var stats zoekt.RepoStats
1116 for _, r := range repos {
1117 stats.Add(&r.Stats)
1118 }
1119
1120 metricListAllRepos.Set(float64(stats.Repos))
1121 metricListAllIndexBytes.Set(float64(stats.IndexBytes))
1122 metricListAllContentBytes.Set(float64(stats.ContentBytes))
1123 metricListAllDocuments.Set(float64(stats.Documents))
1124 metricListAllShards.Set(float64(stats.Shards))
1125 metricListAllNewLinesCount.Set(float64(stats.NewLinesCount))
1126 metricListAllDefaultBranchNewLinesCount.Set(float64(stats.DefaultBranchNewLinesCount))
1127 metricListAllOtherBranchesNewLinesCount.Set(float64(stats.OtherBranchesNewLinesCount))
1128}
1129
1130// getLoaded returns the currently loaded shards. Shared so do not mutate.
1131func (s *shardedSearcher) getLoaded() loaded {
1132 // next commit will store the true value of this, for now we keep the
1133 // backwards compatible behaviour.
1134 ready := s.ready.Load()
1135 // ranked is loaded after ready to avoid a race were ready is true but
1136 // ranked is still not the final set of shards.
1137 ranked, _ := s.ranked.Load().([]*rankedShard)
1138 return loaded{
1139 shards: ranked,
1140 ready: ready,
1141 }
1142}
1143
1144func mkRankedShard(s zoekt.Searcher) *rankedShard {
1145 q := query.Const{Value: true}
1146 // We need to use WithUnsafeContext here, otherwise we cannot return a proper
1147 // rankedShard. On the user request path we use selectRepoSet which relies on
1148 // rankedShard.repos being set.
1149 result, err := s.List(systemtenant.WithUnsafeContext(context.Background()), &q, nil)
1150 if err != nil {
1151 log.Printf("[ERROR] mkRankedShard(%s): failed to cache repository list: %v", s, err)
1152 return &rankedShard{Searcher: s}
1153 }
1154
1155 var (
1156 maxPriority float64
1157 repos = make([]*zoekt.Repository, 0, len(result.Repos))
1158 )
1159 for i := range result.Repos {
1160 repo := &result.Repos[i].Repository
1161 repos = append(repos, repo)
1162 if repo.RawConfig != nil {
1163 priority, _ := strconv.ParseFloat(repo.RawConfig["priority"], 64)
1164 if priority > maxPriority {
1165 maxPriority = priority
1166 }
1167 }
1168 }
1169
1170 return &rankedShard{
1171 Searcher: s,
1172 repos: repos,
1173 priority: maxPriority,
1174 }
1175}
1176
1177// markReady should be called once all shards have been passed into replace on
1178// startup. Once s is marked as ready it stops reporting a Crash in the
1179// response Stats.
1180func (s *shardedSearcher) markReady() {
1181 s.ready.CompareAndSwap(false, true)
1182}
1183
1184func (s *shardedSearcher) replace(shards map[string]zoekt.Searcher) {
1185 if len(shards) == 0 {
1186 return
1187 }
1188
1189 defer func(began time.Time) {
1190 metricShardsBatchReplaceDurationSeconds.Observe(time.Since(began).Seconds())
1191 }(time.Now())
1192
1193 s.mu.Lock()
1194 defer s.mu.Unlock()
1195
1196 for key, shard := range shards {
1197 var r *rankedShard
1198 if shard != nil {
1199 r = mkRankedShard(shard)
1200 }
1201
1202 old := s.shards[key]
1203 if shard == nil {
1204 delete(s.shards, key)
1205 } else {
1206 s.shards[key] = r
1207 }
1208
1209 if old != nil && old.Searcher != nil {
1210 // _ ___ /^^\ /^\ /^^\_
1211 // _ _@)@) \ ,,/ '` ~ `'~~ ', `\.
1212 // _/o\_ _ _ _/~`.`...'~\ ./~~..,'`','',.,' ' ~:
1213 // / `,'.~,~.~ . , . , ~|, ,/ .,' , ,. .. ,,. `, ~\_
1214 // ( ' _' _ '_` _ ' . , `\_/ .' ..' ' ` ` `.. `, \_
1215 // ~V~ V~ V~ V~ ~\ ` ' . ' , ' .,.,''`.,.''`.,.``. ', \_
1216 // _/\ /\ /\ /\_/, . ' , `_/~\_ .' .,. ,, , _/~\_ `. `. '., \_
1217 // < ~ ~ '~`'~'`, ., . `_: ::: \_ ' `_/ ::: \_ `.,' . ', \_
1218 // \ ' `_ '`_ _ ',/ _::_::_ \ _ _/ _::_::_ \ `.,'.,`., \-,-,-,_,_,
1219 // `'~~ `'~~ `'~~ `'~~ \(_)(_)(_)/ `~~' \(_)(_)(_)/ ~'`\_.._,._,'_;_;_;_;_;
1220 //
1221 // We can't just call Close now, because there may be ongoing searches
1222 // which have old in the shards list. Previously we used an exclusive
1223 // lock to guarantee there were no concurrent searches. However, that
1224 // led to blocking on the read path.
1225 //
1226 // We could introduce granular locking per rankedShard to know when
1227 // there are no more references. However, this becomes tricky in
1228 // practice. Instead we rely on the garbage collector noticing old is no
1229 // longer used. We take care in our searchers to runtime.KeepAlive until
1230 // we have stopped referencing the underling mmap data.
1231 runtime.SetFinalizer(old, func(r *rankedShard) {
1232 r.Close()
1233 })
1234 }
1235 }
1236
1237 ranked := make([]*rankedShard, 0, len(s.shards))
1238 for _, r := range s.shards {
1239 ranked = append(ranked, r)
1240 }
1241
1242 sort.Slice(ranked, func(i, j int) bool {
1243 priorityDiff := ranked[i].priority - ranked[j].priority
1244 if priorityDiff != 0 {
1245 return priorityDiff > 0
1246 }
1247 if len(ranked[i].repos) == 0 || len(ranked[j].repos) == 0 {
1248 // Protect against empty names which can happen if we fail to List or
1249 // the shard is full of tombstones. Prefer the shard which has names.
1250 return len(ranked[i].repos) >= len(ranked[j].repos)
1251 }
1252 return ranked[i].repos[0].Name < ranked[j].repos[0].Name
1253 })
1254
1255 s.ranked.Store(ranked)
1256
1257 metricShardsLoaded.Set(float64(len(ranked)))
1258}
1259
1260func loadShard(fn string) (zoekt.Searcher, error) {
1261 f, err := os.Open(fn)
1262 if err != nil {
1263 return nil, err
1264 }
1265
1266 iFile, err := index.NewIndexFile(f)
1267 if err != nil {
1268 return nil, err
1269 }
1270 s, err := index.NewSearcher(iFile)
1271 if err != nil {
1272 iFile.Close()
1273 return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err)
1274 }
1275
1276 return s, nil
1277}
1278
// prioritySlice is a small, unordered bag of float64 priorities. It supports
// three operations: adding a value, removing one occurrence of a value, and
// reading the largest value. Every operation is a linear scan. That is
// acceptable because the shardedSearcher interface bounds the number of
// entries by GOMAXPROCS (i.e., number of cpu cores).
type prioritySlice []float64

// append adds pri to the bag.
func (p *prioritySlice) append(pri float64) {
	*p = append(*p, pri)
}

// remove deletes the first entry equal to pri, if there is one. Order is not
// preserved: the last entry is moved into the vacated slot before the slice is
// shortened by one.
func (p *prioritySlice) remove(pri float64) {
	s := *p
	for i := range s {
		if s[i] != pri {
			continue
		}
		last := len(s) - 1
		s[i] = s[last]
		*p = s[:last]
		return
	}
}

// max returns the largest entry, or negative infinity when the bag is empty.
// NaN entries never win the comparison and are effectively ignored.
func (p *prioritySlice) max() float64 {
	// Kept separate from remove for readability; any cost from the second
	// scan is negligible at these sizes.
	best := math.Inf(-1)
	for i := range *p {
		if v := (*p)[i]; v > best {
			best = v
		}
	}
	return best
}