fork of https://github.com/sourcegraph/zoekt
1// Copyright 2016 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package search
16
17import (
18 "context"
19 "fmt"
20 "log"
21 "math"
22 "os"
23 "runtime"
24 "runtime/debug"
25 "slices"
26 "sort"
27 "strconv"
28 "sync"
29 "time"
30
31 "github.com/prometheus/client_golang/prometheus"
32 "github.com/prometheus/client_golang/prometheus/promauto"
33 "go.uber.org/atomic"
34 "golang.org/x/sync/semaphore"
35
36 "github.com/sourcegraph/zoekt"
37 "github.com/sourcegraph/zoekt/index"
38 "github.com/sourcegraph/zoekt/internal/tenant/systemtenant"
39 "github.com/sourcegraph/zoekt/internal/trace"
40 "github.com/sourcegraph/zoekt/query"
41)
42
// Prometheus metrics exported by this package. They are all registered via
// promauto at package initialisation.
var (
	// Shard loading metrics.
	metricShardsLoaded = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_shards_loaded",
		Help: "The number of shards currently loaded",
	})
	metricShardsLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_shards_loaded_total",
		Help: "The total number of shards loaded",
	})
	metricShardsLoadFailedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_shards_load_failed_total",
		Help: "The total number of shard loads that failed",
	})

	// Request-level search metrics.
	metricSearchRunning = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_search_running",
		Help: "The number of concurrent search requests running",
	})
	metricSearchShardRunning = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_search_shard_running",
		Help: "The number of concurrent search requests in a shard running",
	})
	metricSearchFailedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_failed_total",
		Help: "The total number of search requests that failed",
	})
	metricSearchDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "zoekt_search_duration_seconds",
		Help:    "The duration a search request took in seconds",
		Buckets: prometheus.DefBuckets, // DefBuckets good for service timings
	})

	// A Counter per Stat. Name should match field in zoekt.Stats.
	// These are incremented from the per-shard results in observeMetrics.
	metricSearchContentBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_content_loaded_bytes_total",
		Help: "Total amount of I/O for reading contents",
	})
	metricSearchIndexBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_index_loaded_bytes_total",
		Help: "Total amount of I/O for reading from index",
	})
	metricSearchCrashesTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_crashes_total",
		Help: "Total number of search shards that had a crash",
	})
	metricSearchFileCountTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_file_count_total",
		Help: "Total number of files containing a match",
	})
	metricSearchShardFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_shard_files_considered_total",
		Help: "Total number of files in shards that we considered",
	})
	metricSearchFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_files_considered_total",
		Help: "Total files that we evaluated. Equivalent to files for which all atom matches (including negations) evaluated to true",
	})
	metricSearchFilesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_files_loaded_total",
		Help: "Total files for which we loaded file content to verify substring matches",
	})
	metricSearchFilesSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_files_skipped_total",
		Help: "Total candidate files whose contents weren't examined because we gathered enough matches",
	})
	metricSearchShardsSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_shards_skipped_total",
		Help: "Total shards that we did not process because a query was canceled",
	})
	metricSearchMatchCountTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_match_count_total",
		Help: "Total number of non-overlapping matches",
	})
	metricSearchNgramMatchesTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_ngram_matches_total",
		Help: "Total number of candidate matches as a result of searching ngrams",
	})
	metricSearchNgramLookupsTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_ngram_lookups_total",
		Help: "Total number of times we accessed an ngram in the index",
	})
	metricSearchRegexpsConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_regexps_considered_total",
		Help: "Total number of times regexp was called on files that we evaluated",
	})

	// List request metrics.
	metricListRunning = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_running",
		Help: "The number of concurrent list requests running",
	})
	metricListShardRunning = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_shard_running",
		Help: "The number of concurrent list requests in a shard running",
	})
	metricShardsBatchReplaceDurationSeconds = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "zoekt_shards_batch_replace_duration_seconds",
		Help:    "The time it takes to replace a batch of Searchers.",
		Buckets: []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30},
	})

	// Gauges mirroring the RepoStats fields of the most recent list-all
	// request (see each Help string for the field being mirrored).
	metricListAllRepos = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_all_stats_repos",
		Help: "The last List(true) value for RepoStats.Repos. Repos is used for aggregrating the number of repositories.",
	})
	metricListAllShards = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_all_stats_shards",
		Help: "The last List(true) value for RepoStats.Shards. Shards is the total number of search shards.",
	})
	metricListAllDocuments = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_all_stats_documents",
		Help: "The last List(true) value for RepoStats.Documents. Documents holds the number of documents or files.",
	})
	metricListAllIndexBytes = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_all_stats_index_bytes",
		Help: "The last List(true) value for RepoStats.IndexBytes. IndexBytes is the amount of RAM used for index overhead.",
	})
	metricListAllContentBytes = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_all_stats_content_bytes",
		Help: "The last List(true) value for RepoStats.ContentBytes. ContentBytes is the amount of RAM used for raw content.",
	})
	metricListAllNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_all_stats_new_lines_count",
		Help: "The last List(true) value for RepoStats.NewLinesCount.",
	})
	metricListAllDefaultBranchNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_all_stats_default_branch_new_lines_count",
		Help: "The last List(true) value for RepoStats.DefaultBranchNewLinesCount.",
	})
	metricListAllOtherBranchesNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_list_all_stats_other_branches_new_lines_count",
		Help: "The last List(true) value for RepoStats.OtherBranchesNewLinesCount.",
	})
)
175
// rankedShard is a shard Searcher together with the ranking data and the
// cached repository list that the request path uses to order and filter
// shards without having to call List on the shard.
type rankedShard struct {
	zoekt.Searcher

	priority float64 // maximum priority across all repos in the shard

	// We have out of band ranking on compound shards which can change even if
	// the shard file does not. So we compute a rank in getShards. We store
	// repos here to avoid the cost of List in the search request path.
	//
	// repos is nil only if that call failed. Consumers such as
	// doSelectRepoSet treat a nil repos as "contents unknown" and always
	// search the shard.
	repos []*zoekt.Repository
}
188
// loaded stores the state we compute when updating the state of shards from
// disk. Request handlers obtain it through shardedSearcher.getLoaded.
type loaded struct {
	// shards is the currently loaded shards sorted by decreasing rank and
	// should not be mutated.
	shards []*rankedShard

	// ready is true if sharded searcher has finished loading all initial
	// shards on startup. While it is false, Search and StreamSearch report
	// one extra crash in their stats to signal that results may be
	// incomplete.
	ready bool
}
200
// shardedSearcher fans search and list requests out over a set of loaded
// shards.
type shardedSearcher struct {
	// Limit the number of parallel queries. Since searching is
	// CPU bound, we can't do better than #CPU queries in
	// parallel. If we do so, we just create more memory
	// pressure.
	sched scheduler

	mu     sync.Mutex // protects writes to shards
	shards map[string]*rankedShard // keyed by shard key as passed to loader.load/drop

	// NOTE(review): ready and ranked are not written or read in this part of
	// the file; presumably ready is set by markReady and ranked caches the
	// sorted shard list returned by getLoaded — confirm against those methods.
	ready  atomic.Bool
	ranked atomic.Value
}
214
215func newShardedSearcher(n int64) *shardedSearcher {
216 ss := &shardedSearcher{
217 shards: make(map[string]*rankedShard),
218 sched: newScheduler(n),
219 }
220 return ss
221}
222
// NewDirectorySearcher returns a searcher instance that loads all
// shards corresponding to a glob into memory.
//
// It waits for the initial load of the shards found under dir to finish
// before returning, and returns any error reported while getting ready.
func NewDirectorySearcher(dir string) (zoekt.Streamer, error) {
	return newDirectorySearcher(dir, true)
}
228
// NewDirectorySearcherFast is like NewDirectorySearcher, but does not block
// on the initial loading of shards.
//
// This exists since in the case of zoekt-webserver we are happy with having
// partial availability since that is better than no availability on large
// instances.
//
// Requests served before loading has finished report an extra crash in their
// stats (see shardedSearcher.Search and StreamSearch).
func NewDirectorySearcherFast(dir string) (zoekt.Streamer, error) {
	return newDirectorySearcher(dir, false)
}
238
239func newDirectorySearcher(dir string, waitUntilReady bool) (zoekt.Streamer, error) {
240 ss := newShardedSearcher(int64(runtime.GOMAXPROCS(0)))
241 tl := &loader{
242 ss: ss,
243 }
244 dw, err := newDirectoryWatcher(dir, tl)
245 if err != nil {
246 return nil, err
247 }
248
249 if waitUntilReady {
250 if err := dw.WaitUntilReady(); err != nil {
251 return nil, err
252 }
253 }
254
255 ds := &directorySearcher{
256 Streamer: ss,
257 directoryWatcher: dw,
258 }
259
260 return &typeRepoSearcher{Streamer: ds}, nil
261}
262
// directorySearcher couples a Streamer with the DirectoryWatcher that feeds
// it shards, so that both can be shut down together in Close.
type directorySearcher struct {
	zoekt.Streamer

	directoryWatcher *DirectoryWatcher
}
268
// Close stops the directory watcher and then closes the wrapped Streamer.
func (s *directorySearcher) Close() {
	// We need to Stop directoryWatcher first since it calls load/unload on
	// Searcher.
	s.directoryWatcher.Stop()
	s.Streamer.Close()
}
275
// loader applies shard additions (load) and removals (drop) to a
// shardedSearcher. newDirectorySearcher hands it to the directory watcher.
type loader struct {
	ss *shardedSearcher
}
279
280func (tl *loader) load(keys ...string) {
281 // This is called with all keys on startup, so once this function has
282 // finished running shardedSearcher will be ready.
283 defer tl.ss.markReady()
284
285 if len(keys) == 0 {
286 // If there's nothing to load, we exit early here, but we want to mark
287 // ourselves as ready.
288 return
289 }
290
291 var (
292 mu sync.Mutex // synchronizes writes to the shards map
293 wg sync.WaitGroup // used to wait for all shards to load
294 sem = semaphore.NewWeighted(int64(runtime.GOMAXPROCS(0)))
295 loadedShards = make(map[string]zoekt.Searcher)
296 )
297
298 publishLoaded := func() {
299 mu.Lock()
300 chunk := loadedShards
301 loadedShards = make(map[string]zoekt.Searcher)
302 mu.Unlock()
303 tl.ss.replace(chunk)
304 }
305
306 log.Printf("[INFO] loading %d shard(s): %s", len(keys), humanTruncateList(keys, 5))
307
308 lastProgress := time.Now()
309 for i, key := range keys {
310 // If taking a while to start-up occasionally give a progress message
311 if time.Since(lastProgress) > 5*time.Second {
312 log.Printf("[INFO] still need to load %d shards...", len(keys)-i)
313 lastProgress = time.Now()
314
315 publishLoaded()
316 }
317
318 _ = sem.Acquire(context.Background(), 1)
319 wg.Add(1)
320
321 go func(key string) {
322 defer sem.Release(1)
323 defer wg.Done()
324
325 shard, err := loadShard(key)
326 if err != nil {
327 metricShardsLoadFailedTotal.Inc()
328 log.Printf("[ERROR] reloading: %s, err %v ", key, err)
329 return
330 }
331 metricShardsLoadedTotal.Inc()
332
333 mu.Lock()
334 loadedShards[key] = shard
335 mu.Unlock()
336 }(key)
337 }
338
339 wg.Wait()
340
341 publishLoaded()
342}
343
344func (tl *loader) drop(keys ...string) {
345 shards := make(map[string]zoekt.Searcher, len(keys))
346 for _, key := range keys {
347 shards[key] = nil
348 }
349 tl.ss.replace(shards)
350}
351
// String returns the fixed name "shardedSearcher".
func (ss *shardedSearcher) String() string {
	return "shardedSearcher"
}
355
356// Close closes references to open files. It may be called only once.
357func (ss *shardedSearcher) Close() {
358 ss.mu.Lock()
359 shards := make(map[string]zoekt.Searcher, len(ss.shards))
360 for k := range ss.shards {
361 shards[k] = nil
362 }
363 ss.mu.Unlock()
364
365 ss.replace(shards)
366}
367
368func selectRepoSet(shards []*rankedShard, q query.Q) ([]*rankedShard, query.Q) {
369 and, ok := q.(*query.And)
370 if ok {
371 return doSelectRepoSet(shards, and)
372 }
373
374 // We have queries which look like (reposet ...) and we want to do the same
375 // optimizations. To simplify we just always wrap the query in And and then
376 // on the return value call Simplify to unwrap. In particular this is
377 // important for List calls.
378 and = &query.And{Children: []query.Q{q}}
379 shards, q = doSelectRepoSet(shards, and)
380 return shards, query.Simplify(q)
381}
382
// doSelectRepoSet filters shards using the first child of and that selects
// repositories (RepoSet, RepoIDs, Repo, BranchesRepos or Meta) and returns
// the shards that contain at least one selected repository, plus the query
// to run against them.
//
// If every repository in every kept shard is selected, the selecting child
// is redundant and is replaced (by a true constant, or by a Branch query for
// a single-entry BranchesRepos) in a clone of and; the input and itself is
// never mutated. In all other cases the original and is returned unchanged.
// If no child selects repositories, shards and and are returned as is.
func doSelectRepoSet(shards []*rankedShard, and *query.And) ([]*rankedShard, query.Q) {
	// (and (reposet ...) (q))
	// (and true (q)) with a filtered shards
	// (and false) // noop

	// (and (repobranches ...) (q))
	// (and (repobranches ...) (q))

	// Note: we also support (and (repo ...) (q)) even though sourcegraph does
	// not generate those sorts of queries. This is to support manual testing.

	// hasReposForPredicate lifts a per-repository predicate to a function
	// over a shard's repository list, reporting whether any and whether all
	// repositories satisfy it. For an empty list it reports any=false,
	// all=true.
	hasReposForPredicate := func(pred func(repo *zoekt.Repository) bool) func(repos []*zoekt.Repository) (any, all bool) {
		return func(repos []*zoekt.Repository) (any, all bool) {
			any = false
			all = true
			for _, repo := range repos {
				b := pred(repo)
				any = any || b
				all = all && b
			}
			return any, all
		}
	}

	for i, c := range and.Children {
		// setSize is only a capacity hint for filtered; 0 means unknown.
		var setSize int
		var hasRepos func([]*zoekt.Repository) (bool, bool)
		switch setQuery := c.(type) {
		case *query.RepoSet:
			setSize = len(setQuery.Set)
			hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
				return setQuery.Set[repo.Name]
			})
		case *query.RepoIDs:
			setSize = int(setQuery.Repos.GetCardinality())
			hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
				return setQuery.Repos.Contains(repo.ID)
			})
		case *query.Repo:
			setSize = 0
			hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
				return setQuery.Regexp.MatchString(repo.Name)
			})
		case *query.BranchesRepos:
			for _, br := range setQuery.List {
				setSize += int(br.Repos.GetCardinality())
			}

			hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
				for _, br := range setQuery.List {
					if br.Repos.Contains(repo.ID) {
						return true
					}
				}
				return false
			})
		case *query.Meta:
			// Meta queries filter repositories based on metadata fields.
			// By checking this at the shard level, we can skip entire shards
			// that don't contain any matching repositories, avoiding expensive
			// I/O operations.
			setSize = 0 // Unknown size, we'll filter based on metadata
			hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
				if repo.Metadata == nil {
					return false
				}
				v, ok := repo.Metadata[setQuery.Field]
				if !ok {
					return false
				}
				return setQuery.Value.MatchString(v)
			})
		default:
			// Not a repository-selecting child; look at the next one.
			continue
		}

		// setSize may be larger than the number of shards we have. The size of
		// filtered is bounded by min(len(set), len(shards))
		if setSize > len(shards) {
			setSize = len(shards)
		}

		filtered := make([]*rankedShard, 0, setSize)
		// filteredAll stays true only while every repo of every kept shard
		// is selected by the child.
		filteredAll := true

		for _, s := range shards {
			if s.repos == nil {
				// repos is nil if we failed to List the shard. This shouldn't
				// happen, but if it does we don't know what is in it and must search
				// it without simplifying the query.
				filtered = append(filtered, s)
				filteredAll = false
			} else if any, all := hasRepos(s.repos); any {
				filtered = append(filtered, s)
				filteredAll = filteredAll && all
			}
		}

		// We don't need to adjust the query since we are returning an empty set
		// of shards to search.
		if len(filtered) == 0 {
			return filtered, and
		}

		// We can't simplify the query since we are searching shards which contain
		// repos we aren't supposed to search.
		if !filteredAll {
			return filtered, and
		}

		// We don't want to mutate the original and, so we clone it before
		// mutating it.
		and = &query.And{Children: slices.Clone(and.Children)}

		// This optimization allows us to avoid the work done by
		// indexData.simplify for each shard.
		//
		// For example if our query is (and (reposet foo bar) (content baz))
		// then at this point filtered is [foo bar] and q is the same. For each
		// shard indexData.simplify will simplify to (and true (content baz)) ->
		// (content baz). This work can be done now once, rather than per shard.
		switch c := c.(type) {
		case *query.RepoSet, *query.RepoIDs, *query.Repo, *query.Meta:
			and.Children[i] = &query.Const{Value: true}
			return filtered, query.Simplify(and)

		case *query.BranchesRepos:
			// We can only replace if all the repos want the same branches. We
			// simplify and just check that we are requesting 1 branch. The common
			// case is just asking for HEAD, so this should be effective.
			if len(c.List) != 1 {
				return filtered, and
			}

			// Every repo wants the same branches, so we can replace RepoBranches
			// with a list of branch queries.
			and.Children[i] = &query.Branch{Pattern: c.List[0].Branch, Exact: true}
			return filtered, query.Simplify(and)
		}

		// Stop after first RepoSet, otherwise we might append duplicate
		// shards to `filtered`
		return filtered, and
	}

	return shards, and
}
530
531func (ss *shardedSearcher) Search(ctx context.Context, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) {
532 tr, ctx := trace.New(ctx, "shardedSearcher.Search", "")
533 tr.LazyLog(q, true)
534 tr.LazyPrintf("opts: %+v", opts)
535 defer func() {
536 if sr != nil {
537 tr.LazyPrintf("num files: %d", len(sr.Files))
538 tr.LazyPrintf("stats: %+v", sr.Stats)
539 }
540 if err != nil {
541 tr.LazyPrintf("error: %v", err)
542 tr.SetError(err)
543 }
544 tr.Finish()
545 }()
546 ctx, cancel := context.WithCancel(ctx)
547 defer cancel()
548
549 collectSender := newCollectSender(opts)
550
551 start := time.Now()
552 proc, err := ss.sched.Acquire(ctx)
553 if err != nil {
554 return nil, err
555 }
556 defer proc.Release()
557 tr.LazyPrintf("acquired process")
558
559 wait := time.Since(start)
560 start = time.Now()
561
562 loaded := ss.getLoaded()
563 done, err := streamSearch(ctx, proc, q, opts, loaded.shards, collectSender)
564 defer done()
565 if err != nil {
566 return nil, err
567 }
568
569 aggregate, ok := collectSender.Done()
570 if !ok {
571 aggregate = &zoekt.SearchResult{
572 RepoURLs: map[string]string{},
573 LineFragments: map[string]string{},
574 }
575 }
576
577 copyFiles(aggregate)
578
579 if !loaded.ready {
580 // We may have missed results due to not being fully loaded.
581 aggregate.Stats.Crashes++
582 }
583
584 aggregate.Stats.Wait = wait
585 aggregate.Stats.Duration = time.Since(start)
586
587 return aggregate, nil
588}
589
590func (ss *shardedSearcher) StreamSearch(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, sender zoekt.Sender) (err error) {
591 tr, ctx := trace.New(ctx, "shardedSearcher.StreamSearch", "")
592 defer func() {
593 if err != nil {
594 tr.LazyPrintf("error: %v", err)
595 tr.SetError(err)
596 }
597 tr.Finish()
598 }()
599
600 start := time.Now()
601 proc, err := ss.sched.Acquire(ctx)
602 if err != nil {
603 return err
604 }
605 defer proc.Release()
606 tr.LazyPrintf("acquired process")
607
608 loaded := ss.getLoaded()
609 shards := loaded.shards
610
611 maxPendingPriority := math.Inf(-1)
612 if len(shards) > 0 {
613 maxPendingPriority = shards[0].priority
614 }
615
616 stillLoadingCrashes := 0
617 if !loaded.ready {
618 // We may have missed results due to not being fully loaded.
619 stillLoadingCrashes++
620 }
621
622 sender.Send(&zoekt.SearchResult{
623 Stats: zoekt.Stats{
624 Crashes: stillLoadingCrashes,
625 Wait: time.Since(start),
626 },
627 Progress: zoekt.Progress{
628 MaxPendingPriority: maxPendingPriority,
629 },
630 })
631
632 // Matches flow from the shards up the stack in the following order:
633 //
634 // 1. Search shards
635 // 2. flushCollectSender (aggregate)
636 // 3. limitSender (limit)
637 // 4. copyFileSender (copy)
638 //
639 // For streaming, the wrapping has to happen in the inverted order.
640 sender = copyFileSender(sender)
641
642 if truncator, hasLimits := index.NewDisplayTruncator(opts); hasLimits {
643 var cancel context.CancelFunc
644 ctx, cancel = context.WithCancel(ctx)
645 defer cancel()
646 sender = limitSender(cancel, sender, truncator)
647 }
648
649 sender, flush := newFlushCollectSender(opts, sender)
650
651 done, err := streamSearch(ctx, proc, q, opts, shards, sender)
652
653 // Even though streaming is done, we may have results sitting in a buffer we
654 // need to flush. So we need to send those before calling done.
655 flush()
656 done()
657
658 return err
659}
660
// streamSearch is an internal helper since both Search and StreamSearch are
// largely similar.
//
// It narrows shards with selectRepoSet, searches the remaining shards in
// rank order with up to GOMAXPROCS workers, and forwards each shard's result
// to sender via sendByRepository. Dispatching of new shards stops once a
// shard returns an error or opts.TotalMaxMatchCount is exceeded; results of
// shards already in flight are still consumed. If opts.MaxWallTime is
// non-zero it is applied as a timeout to the shard searches. The returned
// err is the error of the last failing shard, if any.
//
// done must always be called, even if err is non-nil. The SearchResults sent
// via sender contain references to the underlying mmap data that the garbage
// collector can't see. Calling done informs the garbage collector it is free
// to collect those shards. The caller must call copyFiles on any
// SearchResults it returns/streams out before calling done.
func streamSearch(ctx context.Context, proc *process, q query.Q, opts *zoekt.SearchOptions, shards []*rankedShard, sender zoekt.Sender) (done func(), err error) {
	tr, ctx := trace.New(ctx, "shardedSearcher.streamSearch", "")
	overallStart := time.Now()
	metricSearchRunning.Inc()
	defer func() {
		metricSearchRunning.Dec()
		metricSearchDuration.Observe(time.Since(overallStart).Seconds())
		if err != nil {
			metricSearchFailedTotal.Inc()

			tr.LazyPrintf("error: %v", err)
			tr.SetError(err)
		}
		tr.Finish()
	}()

	// Select the subset of shards that we will search over for the given query.
	{
		beforeLen := len(shards)
		beforeQ := q
		shards, q = selectRepoSet(shards, q)
		tr.LazyPrintf("selectRepoSet shards=%d->%d q=%s->%s", beforeLen, len(shards), beforeQ, q)
	}

	if len(shards) == 0 {
		return func() {}, nil
	}

	var cancel context.CancelFunc
	if opts.MaxWallTime == 0 {
		ctx, cancel = context.WithCancel(ctx)
	} else {
		ctx, cancel = context.WithTimeout(ctx, opts.MaxWallTime)
	}

	defer cancel()

	// We set the number of workers to GOMAXPROCS, or the number of shards,
	// whichever is smaller.
	workers := min(runtime.GOMAXPROCS(0), len(shards))

	// result is one shard's outcome tagged with the priority of that shard.
	type result struct {
		priority float64
		*zoekt.SearchResult
		err error
	}

	var (
		// buffered channels to continue searching when sending back results
		// takes a while / blocks. The maximum pending result set is workers * 2.
		results = make(chan *result, workers)
		search  = make(chan *rankedShard, workers)
		wg      sync.WaitGroup
	)

	// Start workers that receive shards from the search channel, search them,
	// and send the results to the results channel. This process is repeated
	// until the search channel is closed.
	//
	// Note: Making "search" a buffered channel has the effect of limiting the number of parallel shard searches.
	// Since searching is mostly CPU bound, limiting parallel shard searches also reduces the peak working set.
	wg.Add(workers)
	for range workers {
		go func() {
			defer wg.Done()
			for s := range search {
				sr, err := searchOneShard(ctx, s, q, opts)
				r := &result{priority: s.priority, SearchResult: sr, err: err}
				results <- r
			}
		}()
	}

	// Close results once every worker has exited so the loop below can
	// terminate.
	go func() {
		wg.Wait()
		close(results)
	}()

	var (
		pending = make(prioritySlice, 0, workers)
		shard   = 0
		next    = shards[shard]

		// We need a separate nil-able reference to the same channel so we can close(search) for the worker
		// go-routines to finish but also set work to nil in order for the select statement below to ignore
		// that case when we want to stop a search. This is needed because sending on a closed channel panics.
		work = search
	)

	// stop ends dispatching of further shards. It is safe to call more than
	// once; only the first call closes the search channel.
	stop := func() {
		if work != nil {
			close(search)
			work = nil
			next = nil
		}
	}

	// tracked so we can stop when we hit TotalMaxMatchCount
	var totalMatchCount int

search:
	for {
		// At the top of each iteration, have the proc associated with this search yield its own "timeslice"
		// to possibly allow other searches to make progress
		_ = proc.Yield(ctx) // Note: we let searchOneShard handle context errors

		select {
		case work <- next: // is there a worker available to search the next shard?
			pending.append(next.priority)

			shard++
			if shard == len(shards) {
				stop()
			} else {
				next = shards[shard]
			}
		case r, ok := <-results: // is there a result to send back?
			if !ok {
				// All workers are done and every result has been consumed.
				break search
			}

			// delete this result's priority from pending before computing the new max pending priority
			pending.remove(r.priority)

			if r.err != nil {
				// Set final error and stop searching new shards, but consume any pending
				// search results.
				stop()
				err = r.err
				continue
			}

			// Update the match count statistics and stop searching new shards if we've
			// reached the limit set in the options.
			totalMatchCount += r.SearchResult.Stats.MatchCount
			if opts.TotalMaxMatchCount > 0 && totalMatchCount > opts.TotalMaxMatchCount {
				stop()
			}

			observeMetrics(r.SearchResult)

			r.Priority = r.priority
			r.MaxPendingPriority = pending.max()

			sendByRepository(r.SearchResult, opts, sender) // send the result back to the client
		}
	}

	// Keep the shards reachable until the caller signals it no longer needs
	// the data referenced by the results.
	return func() { runtime.KeepAlive(shards) }, err
}
819
820// sendByRepository splits a zoekt.SearchResult by repository and calls
821// sender.Send for each batch. Ranking in Sourcegraph expects zoekt.SearchResult
822// to contain results with the same zoekt.SearchResult.priority only.
823//
824// We split by repository instead of by priority because it is easier to set
825// RepoURLs and LineFragments in zoekt.SearchResult.
826func sendByRepository(result *zoekt.SearchResult, opts *zoekt.SearchOptions, sender zoekt.Sender) {
827 if len(result.RepoURLs) <= 1 || len(result.Files) == 0 {
828 index.SortFiles(result.Files)
829 sender.Send(result)
830 return
831 }
832
833 send := func(repoName string, a, b int, stats zoekt.Stats) {
834 index.SortFiles(result.Files[a:b])
835
836 filteredRepoURLs := map[string]string{repoName: result.RepoURLs[repoName]}
837 filteredLineFragments := map[string]string{repoName: result.LineFragments[repoName]}
838
839 // Filter RepoURLs and LineFragments to only those of repoName and its
840 // subRepositories if there are subRepositories
841 for _, file := range result.Files[a:b] {
842 name := file.SubRepositoryName
843 if name == "" {
844 continue
845 }
846 _, repoSet := filteredRepoURLs[name]
847 url, ok := result.RepoURLs[name]
848 if !repoSet && ok {
849 filteredRepoURLs[name] = url
850 }
851 _, fragSet := filteredLineFragments[name]
852 frag, ok := result.LineFragments[name]
853 if !fragSet && ok {
854 filteredLineFragments[name] = frag
855 }
856 }
857
858 sender.Send(&zoekt.SearchResult{
859 Stats: stats,
860 Progress: zoekt.Progress{
861 Priority: result.Files[a].RepositoryPriority,
862 MaxPendingPriority: result.MaxPendingPriority,
863 },
864 Files: result.Files[a:b],
865 RepoURLs: filteredRepoURLs,
866 LineFragments: filteredLineFragments,
867 })
868 }
869
870 var startIndex, endIndex int
871 curRepoID := result.Files[0].RepositoryID
872 curRepoName := result.Files[0].Repository
873
874 fm := zoekt.FileMatch{}
875 for endIndex, fm = range result.Files {
876 if curRepoID != fm.RepositoryID {
877 // Stats must stay aggregate-able, hence we sent the aggregate stats with the
878 // last event.
879 send(curRepoName, startIndex, endIndex, zoekt.Stats{})
880
881 startIndex = endIndex
882 curRepoID = fm.RepositoryID
883 curRepoName = fm.Repository
884 }
885 }
886
887 send(curRepoName, startIndex, endIndex+1, result.Stats)
888}
889
890func observeMetrics(sr *zoekt.SearchResult) {
891 metricSearchContentBytesLoadedTotal.Add(float64(sr.Stats.ContentBytesLoaded))
892 metricSearchIndexBytesLoadedTotal.Add(float64(sr.Stats.IndexBytesLoaded))
893 metricSearchCrashesTotal.Add(float64(sr.Stats.Crashes))
894 metricSearchFileCountTotal.Add(float64(sr.Stats.FileCount))
895 metricSearchShardFilesConsideredTotal.Add(float64(sr.Stats.ShardFilesConsidered))
896 metricSearchFilesConsideredTotal.Add(float64(sr.Stats.FilesConsidered))
897 metricSearchFilesLoadedTotal.Add(float64(sr.Stats.FilesLoaded))
898 metricSearchFilesSkippedTotal.Add(float64(sr.Stats.FilesSkipped))
899 metricSearchShardsSkippedTotal.Add(float64(sr.Stats.ShardsSkipped))
900 metricSearchMatchCountTotal.Add(float64(sr.Stats.MatchCount))
901 metricSearchNgramMatchesTotal.Add(float64(sr.Stats.NgramMatches))
902 metricSearchNgramLookupsTotal.Add(float64(sr.Stats.NgramLookups))
903 metricSearchRegexpsConsideredTotal.Add(float64(sr.Stats.RegexpsConsidered))
904}
905
// copySlice replaces *src with a freshly allocated copy of its contents so
// that it no longer shares memory with the original backing array. A nil
// slice is left untouched.
func copySlice(src *[]byte) {
	original := *src
	if original == nil {
		return
	}

	duplicate := make([]byte, len(original))
	copy(duplicate, original)
	*src = duplicate
}
914
915func copyFiles(sr *zoekt.SearchResult) {
916 for i := range sr.Files {
917 copySlice(&sr.Files[i].Content)
918 copySlice(&sr.Files[i].Checksum)
919 for l := range sr.Files[i].LineMatches {
920 copySlice(&sr.Files[i].LineMatches[l].Line)
921 copySlice(&sr.Files[i].LineMatches[l].Before)
922 copySlice(&sr.Files[i].LineMatches[l].After)
923 }
924 for c := range sr.Files[i].ChunkMatches {
925 copySlice(&sr.Files[i].ChunkMatches[c].Content)
926 }
927 }
928}
929
// searchOneShard runs s.Search and converts a panic inside the shard into a
// crash statistic instead of taking down the process.
//
// While the search runs, metricSearchShardRunning is incremented. If the
// search panics, the panic and stack are logged and the function returns a
// result (allocated if necessary) with Stats.Crashes set to 1; err is left
// at whatever the named result held, which is nil when the panic happened
// inside s.Search.
func searchOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) {
	metricSearchShardRunning.Inc()
	defer func() {
		metricSearchShardRunning.Dec()
		if e := recover(); e != nil {
			log.Printf("[ERROR] crashed shard: %s: %#v, %s", s, e, debug.Stack())

			// The named result is still nil if Search panicked before
			// returning.
			if sr == nil {
				sr = &zoekt.SearchResult{}
			}
			sr.Stats.Crashes = 1
		}
	}()

	return s.Search(ctx, q, opts)
}
946
// shardListResult is the outcome of listing one shard, as sent by
// listOneShard: the RepoList and error returned by the shard's List, or a
// RepoList with Crashes set to 1 and a nil error if the shard panicked.
type shardListResult struct {
	rl  *zoekt.RepoList
	err error
}
951
952func listOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.ListOptions, sink chan shardListResult) {
953 metricListShardRunning.Inc()
954 defer func() {
955 metricListShardRunning.Dec()
956 if r := recover(); r != nil {
957 log.Printf("[ERROR] crashed shard: %s: %s, %s", s.String(), r, debug.Stack())
958 sink <- shardListResult{
959 &zoekt.RepoList{Crashes: 1}, nil,
960 }
961 }
962 }()
963
964 ms, err := s.List(ctx, q, opts)
965 sink <- shardListResult{ms, err}
966}
967
968func (ss *shardedSearcher) List(ctx context.Context, q query.Q, opts *zoekt.ListOptions) (rl *zoekt.RepoList, err error) {
969 tr, ctx := trace.New(ctx, "shardedSearcher.List", "")
970 metricListRunning.Inc()
971 defer func() {
972 metricListRunning.Dec()
973 if rl != nil {
974 tr.LazyPrintf("repos.size=%d reposmap.size=%d crashes=%d stats=%+v", len(rl.Repos), len(rl.ReposMap), rl.Crashes, rl.Stats)
975 }
976 if err != nil {
977 tr.LazyPrintf("error: %v", err)
978 tr.SetError(err)
979 }
980 tr.Finish()
981 }()
982
983 q = query.Simplify(q)
984 isAll := false
985 if c, ok := q.(*query.Const); ok {
986 isAll = c.Value
987 }
988
989 proc, err := ss.sched.Acquire(ctx)
990 if err != nil {
991 return nil, err
992 }
993 defer proc.Release()
994 tr.LazyPrintf("acquired process")
995
996 loaded := ss.getLoaded()
997 shards := loaded.shards
998
999 // Setup what we return now, since we may short circuit if there are no
1000 // shards to search.
1001 stillLoadingCrashes := 0
1002 if !loaded.ready {
1003 // We may have missed results due to not being fully loaded.
1004 stillLoadingCrashes++
1005 }
1006 agg := zoekt.RepoList{
1007 Crashes: stillLoadingCrashes,
1008 ReposMap: zoekt.ReposMap{},
1009 Repos: []*zoekt.RepoListEntry{},
1010 }
1011
1012 // PERF: Select the subset of shards that we will search over for the given
1013 // query. A common List query only asks for a specific repo, so this is an
1014 // important optimization.
1015 {
1016 beforeLen := len(shards)
1017 beforeQ := q
1018 shards, q = selectRepoSet(shards, q)
1019 tr.LazyPrintf("selectRepoSet shards=%d->%d q=%s->%s", beforeLen, len(shards), beforeQ, q)
1020 }
1021
1022 if len(shards) == 0 {
1023 return &agg, nil
1024 }
1025
1026 shardCount := len(shards)
1027 all := make(chan shardListResult, shardCount)
1028 feeder := make(chan zoekt.Searcher, len(shards))
1029 for _, s := range shards {
1030 feeder <- s
1031 }
1032 close(feeder)
1033
1034 for range runtime.GOMAXPROCS(0) {
1035 go func() {
1036 for s := range feeder {
1037 listOneShard(ctx, s, q, opts, all)
1038 }
1039 }()
1040 }
1041
1042 uniq := map[string]*zoekt.RepoListEntry{}
1043
1044 for range shards {
1045 r := <-all
1046 if r.err != nil {
1047 return nil, r.err
1048 }
1049
1050 agg.Crashes += r.rl.Crashes
1051 agg.Stats.Add(&r.rl.Stats)
1052
1053 for _, r := range r.rl.Repos {
1054 prev, ok := uniq[r.Repository.Name]
1055 if !ok {
1056 cp := *r // We need to copy because we mutate r.Stats when merging duplicates
1057 uniq[r.Repository.Name] = &cp
1058 } else {
1059 prev.Stats.Add(&r.Stats)
1060 }
1061 }
1062
1063 for id, r := range r.rl.ReposMap {
1064 _, ok := agg.ReposMap[id]
1065 if !ok {
1066 agg.ReposMap[id] = r
1067 }
1068 }
1069 }
1070
1071 agg.Repos = make([]*zoekt.RepoListEntry, 0, len(uniq))
1072 for _, r := range uniq {
1073 agg.Repos = append(agg.Repos, r)
1074 }
1075
1076 // Only one of these fields is populated and in all cases the size of that
1077 // field is the number of Repos.
1078 //
1079 // Note: we don't just add individual Stats.Repos since a repository can
1080 // have multiple shards.
1081 agg.Stats.Repos = len(uniq) + len(agg.ReposMap)
1082
1083 if isAll && len(agg.Repos) > 0 {
1084 reportListAllMetrics(agg.Repos)
1085 }
1086
1087 return &agg, nil
1088}
1089
1090func reportListAllMetrics(repos []*zoekt.RepoListEntry) {
1091 var stats zoekt.RepoStats
1092 for _, r := range repos {
1093 stats.Add(&r.Stats)
1094 }
1095
1096 metricListAllRepos.Set(float64(stats.Repos))
1097 metricListAllIndexBytes.Set(float64(stats.IndexBytes))
1098 metricListAllContentBytes.Set(float64(stats.ContentBytes))
1099 metricListAllDocuments.Set(float64(stats.Documents))
1100 metricListAllShards.Set(float64(stats.Shards))
1101 metricListAllNewLinesCount.Set(float64(stats.NewLinesCount))
1102 metricListAllDefaultBranchNewLinesCount.Set(float64(stats.DefaultBranchNewLinesCount))
1103 metricListAllOtherBranchesNewLinesCount.Set(float64(stats.OtherBranchesNewLinesCount))
1104}
1105
1106// getLoaded returns the currently loaded shards. Shared so do not mutate.
1107func (s *shardedSearcher) getLoaded() loaded {
1108 // next commit will store the true value of this, for now we keep the
1109 // backwards compatible behaviour.
1110 ready := s.ready.Load()
1111 // ranked is loaded after ready to avoid a race were ready is true but
1112 // ranked is still not the final set of shards.
1113 ranked, _ := s.ranked.Load().([]*rankedShard)
1114 return loaded{
1115 shards: ranked,
1116 ready: ready,
1117 }
1118}
1119
1120func mkRankedShard(s zoekt.Searcher) *rankedShard {
1121 q := query.Const{Value: true}
1122 // We need to use WithUnsafeContext here, otherwise we cannot return a proper
1123 // rankedShard. On the user request path we use selectRepoSet which relies on
1124 // rankedShard.repos being set.
1125 result, err := s.List(systemtenant.WithUnsafeContext(context.Background()), &q, nil)
1126 if err != nil {
1127 log.Printf("[ERROR] mkRankedShard(%s): failed to cache repository list: %v", s, err)
1128 return &rankedShard{Searcher: s}
1129 }
1130
1131 var (
1132 maxPriority float64
1133 repos = make([]*zoekt.Repository, 0, len(result.Repos))
1134 )
1135 for i := range result.Repos {
1136 repo := &result.Repos[i].Repository
1137 repos = append(repos, repo)
1138 if repo.RawConfig != nil {
1139 priority, _ := strconv.ParseFloat(repo.RawConfig["priority"], 64)
1140 if priority > maxPriority {
1141 maxPriority = priority
1142 }
1143 }
1144 }
1145
1146 return &rankedShard{
1147 Searcher: s,
1148 repos: repos,
1149 priority: maxPriority,
1150 }
1151}
1152
// markReady should be called once all shards have been passed into replace on
// startup. Once s is marked as ready it stops reporting a Crash in the
// response Stats.
//
// The flag only ever moves from false to true; calling markReady again once
// it is set has no effect.
func (s *shardedSearcher) markReady() {
	s.ready.CompareAndSwap(false, true)
}
1159
1160func (s *shardedSearcher) replace(shards map[string]zoekt.Searcher) {
1161 if len(shards) == 0 {
1162 return
1163 }
1164
1165 defer func(began time.Time) {
1166 metricShardsBatchReplaceDurationSeconds.Observe(time.Since(began).Seconds())
1167 }(time.Now())
1168
1169 s.mu.Lock()
1170 defer s.mu.Unlock()
1171
1172 for key, shard := range shards {
1173 var r *rankedShard
1174 if shard != nil {
1175 r = mkRankedShard(shard)
1176 }
1177
1178 old := s.shards[key]
1179 if shard == nil {
1180 delete(s.shards, key)
1181 } else {
1182 s.shards[key] = r
1183 }
1184
1185 if old != nil && old.Searcher != nil {
1186 // _ ___ /^^\ /^\ /^^\_
1187 // _ _@)@) \ ,,/ '` ~ `'~~ ', `\.
1188 // _/o\_ _ _ _/~`.`...'~\ ./~~..,'`','',.,' ' ~:
1189 // / `,'.~,~.~ . , . , ~|, ,/ .,' , ,. .. ,,. `, ~\_
1190 // ( ' _' _ '_` _ ' . , `\_/ .' ..' ' ` ` `.. `, \_
1191 // ~V~ V~ V~ V~ ~\ ` ' . ' , ' .,.,''`.,.''`.,.``. ', \_
1192 // _/\ /\ /\ /\_/, . ' , `_/~\_ .' .,. ,, , _/~\_ `. `. '., \_
1193 // < ~ ~ '~`'~'`, ., . `_: ::: \_ ' `_/ ::: \_ `.,' . ', \_
1194 // \ ' `_ '`_ _ ',/ _::_::_ \ _ _/ _::_::_ \ `.,'.,`., \-,-,-,_,_,
1195 // `'~~ `'~~ `'~~ `'~~ \(_)(_)(_)/ `~~' \(_)(_)(_)/ ~'`\_.._,._,'_;_;_;_;_;
1196 //
1197 // We can't just call Close now, because there may be ongoing searches
1198 // which have old in the shards list. Previously we used an exclusive
1199 // lock to guarantee there were no concurrent searches. However, that
1200 // led to blocking on the read path.
1201 //
1202 // We could introduce granular locking per rankedShard to know when
1203 // there are no more references. However, this becomes tricky in
1204 // practice. Instead we rely on the garbage collector noticing old is no
1205 // longer used. We take care in our searchers to runtime.KeepAlive until
1206 // we have stopped referencing the underling mmap data.
1207 runtime.SetFinalizer(old, func(r *rankedShard) {
1208 r.Close()
1209 })
1210 }
1211 }
1212
1213 ranked := make([]*rankedShard, 0, len(s.shards))
1214 for _, r := range s.shards {
1215 ranked = append(ranked, r)
1216 }
1217
1218 sort.Slice(ranked, func(i, j int) bool {
1219 priorityDiff := ranked[i].priority - ranked[j].priority
1220 if priorityDiff != 0 {
1221 return priorityDiff > 0
1222 }
1223 if len(ranked[i].repos) == 0 || len(ranked[j].repos) == 0 {
1224 // Protect against empty names which can happen if we fail to List or
1225 // the shard is full of tombstones. Prefer the shard which has names.
1226 return len(ranked[i].repos) >= len(ranked[j].repos)
1227 }
1228 return ranked[i].repos[0].Name < ranked[j].repos[0].Name
1229 })
1230
1231 s.ranked.Store(ranked)
1232
1233 metricShardsLoaded.Set(float64(len(ranked)))
1234}
1235
1236func loadShard(fn string) (zoekt.Searcher, error) {
1237 f, err := os.Open(fn)
1238 if err != nil {
1239 return nil, err
1240 }
1241
1242 iFile, err := index.NewIndexFile(f)
1243 if err != nil {
1244 return nil, err
1245 }
1246 s, err := index.NewSearcher(iFile)
1247 if err != nil {
1248 iFile.Close()
1249 return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err)
1250 }
1251
1252 return s, nil
1253}
1254
// prioritySlice is a deliberately simple collection of priorities supporting
// three operations: adding a value, removing a value, and reporting the
// largest value. Every operation is at worst linear in the number of stored
// values, which is acceptable because N is restricted to GOMAXPROCS (i.e.,
// number of cpu cores) by the shardedSearcher interface.
type prioritySlice []float64

// append adds pri to the collection.
func (p *prioritySlice) append(pri float64) {
	grown := append(*p, pri)
	*p = grown
}

// remove deletes the first stored value equal to pri, if there is one. The
// order of the remaining values is not preserved: the last value is moved
// into the vacated position.
func (p *prioritySlice) remove(pri float64) {
	values := *p
	last := len(values) - 1
	for i := 0; i <= last; i++ {
		if values[i] != pri {
			continue
		}
		// Overwrite the match with the tail element, then drop the tail.
		values[i] = values[last]
		*p = values[:last]
		return
	}
}

// max returns the largest stored value, or negative infinity when the
// collection is empty.
//
// remove() and max() could be combined, but keeping them separate is easier
// to read and the cost of the extra lock and loop is almost certainly
// irrelevant.
func (p *prioritySlice) max() float64 {
	values := *p
	largest := math.Inf(-1)
	for i := range values {
		if values[i] > largest {
			largest = values[i]
		}
	}
	return largest
}