fork of https://github.com/sourcegraph/zoekt
1// Copyright 2016 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package shards
16
17import (
18 "context"
19 "fmt"
20 "log"
21 "math"
22 "os"
23 "runtime"
24 "runtime/debug"
25 "sort"
26 "strconv"
27 "sync"
28 "time"
29
30 "golang.org/x/sync/semaphore"
31
32 "github.com/prometheus/client_golang/prometheus"
33 "github.com/prometheus/client_golang/prometheus/promauto"
34 "go.uber.org/atomic"
35
36 "github.com/sourcegraph/zoekt"
37 "github.com/sourcegraph/zoekt/query"
38 "github.com/sourcegraph/zoekt/trace"
39)
40
41var (
42 metricShardsLoaded = promauto.NewGauge(prometheus.GaugeOpts{
43 Name: "zoekt_shards_loaded",
44 Help: "The number of shards currently loaded",
45 })
46 metricShardsLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
47 Name: "zoekt_shards_loaded_total",
48 Help: "The total number of shards loaded",
49 })
50 metricShardsLoadFailedTotal = promauto.NewCounter(prometheus.CounterOpts{
51 Name: "zoekt_shards_load_failed_total",
52 Help: "The total number of shard loads that failed",
53 })
54
55 metricSearchRunning = promauto.NewGauge(prometheus.GaugeOpts{
56 Name: "zoekt_search_running",
57 Help: "The number of concurrent search requests running",
58 })
59 metricSearchShardRunning = promauto.NewGauge(prometheus.GaugeOpts{
60 Name: "zoekt_search_shard_running",
61 Help: "The number of concurrent search requests in a shard running",
62 })
63 metricSearchFailedTotal = promauto.NewCounter(prometheus.CounterOpts{
64 Name: "zoekt_search_failed_total",
65 Help: "The total number of search requests that failed",
66 })
67 metricSearchDuration = promauto.NewHistogram(prometheus.HistogramOpts{
68 Name: "zoekt_search_duration_seconds",
69 Help: "The duration a search request took in seconds",
70 Buckets: prometheus.DefBuckets, // DefBuckets good for service timings
71 })
72
73 // A Counter per Stat. Name should match field in zoekt.Stats.
74 metricSearchContentBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
75 Name: "zoekt_search_content_loaded_bytes_total",
76 Help: "Total amount of I/O for reading contents",
77 })
78 metricSearchIndexBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
79 Name: "zoekt_search_index_loaded_bytes_total",
80 Help: "Total amount of I/O for reading from index",
81 })
82 metricSearchCrashesTotal = promauto.NewCounter(prometheus.CounterOpts{
83 Name: "zoekt_search_crashes_total",
84 Help: "Total number of search shards that had a crash",
85 })
86 metricSearchFileCountTotal = promauto.NewCounter(prometheus.CounterOpts{
87 Name: "zoekt_search_file_count_total",
88 Help: "Total number of files containing a match",
89 })
90 metricSearchShardFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{
91 Name: "zoekt_search_shard_files_considered_total",
92 Help: "Total number of files in shards that we considered",
93 })
94 metricSearchFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{
95 Name: "zoekt_search_files_considered_total",
96 Help: "Total files that we evaluated. Equivalent to files for which all atom matches (including negations) evaluated to true",
97 })
98 metricSearchFilesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
99 Name: "zoekt_search_files_loaded_total",
100 Help: "Total files for which we loaded file content to verify substring matches",
101 })
102 metricSearchFilesSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{
103 Name: "zoekt_search_files_skipped_total",
104 Help: "Total candidate files whose contents weren't examined because we gathered enough matches",
105 })
106 metricSearchShardsSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{
107 Name: "zoekt_search_shards_skipped_total",
108 Help: "Total shards that we did not process because a query was canceled",
109 })
110 metricSearchMatchCountTotal = promauto.NewCounter(prometheus.CounterOpts{
111 Name: "zoekt_search_match_count_total",
112 Help: "Total number of non-overlapping matches",
113 })
114 metricSearchNgramMatchesTotal = promauto.NewCounter(prometheus.CounterOpts{
115 Name: "zoekt_search_ngram_matches_total",
116 Help: "Total number of candidate matches as a result of searching ngrams",
117 })
118 metricSearchNgramLookupsTotal = promauto.NewCounter(prometheus.CounterOpts{
119 Name: "zoekt_search_ngram_lookups_total",
120 Help: "Total number of times we accessed an ngram in the index",
121 })
122 metricSearchRegexpsConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{
123 Name: "zoekt_search_regexps_considered_total",
124 Help: "Total number of times regexp was called on files that we evaluated",
125 })
126
127 metricListRunning = promauto.NewGauge(prometheus.GaugeOpts{
128 Name: "zoekt_list_running",
129 Help: "The number of concurrent list requests running",
130 })
131 metricListShardRunning = promauto.NewGauge(prometheus.GaugeOpts{
132 Name: "zoekt_list_shard_running",
133 Help: "The number of concurrent list requests in a shard running",
134 })
135 metricShardsBatchReplaceDurationSeconds = promauto.NewHistogram(prometheus.HistogramOpts{
136 Name: "zoekt_shards_batch_replace_duration_seconds",
137 Help: "The time it takes to replace a batch of Searchers.",
138 Buckets: []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30},
139 })
140 metricListAllRepos = promauto.NewGauge(prometheus.GaugeOpts{
141 Name: "zoekt_list_all_stats_repos",
142 Help: "The last List(true) value for RepoStats.Repos. Repos is used for aggregrating the number of repositories.",
143 })
144 metricListAllShards = promauto.NewGauge(prometheus.GaugeOpts{
145 Name: "zoekt_list_all_stats_shards",
146 Help: "The last List(true) value for RepoStats.Shards. Shards is the total number of search shards.",
147 })
148 metricListAllDocuments = promauto.NewGauge(prometheus.GaugeOpts{
149 Name: "zoekt_list_all_stats_documents",
150 Help: "The last List(true) value for RepoStats.Documents. Documents holds the number of documents or files.",
151 })
152 metricListAllIndexBytes = promauto.NewGauge(prometheus.GaugeOpts{
153 Name: "zoekt_list_all_stats_index_bytes",
154 Help: "The last List(true) value for RepoStats.IndexBytes. IndexBytes is the amount of RAM used for index overhead.",
155 })
156 metricListAllContentBytes = promauto.NewGauge(prometheus.GaugeOpts{
157 Name: "zoekt_list_all_stats_content_bytes",
158 Help: "The last List(true) value for RepoStats.ContentBytes. ContentBytes is the amount of RAM used for raw content.",
159 })
160 metricListAllNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{
161 Name: "zoekt_list_all_stats_new_lines_count",
162 Help: "The last List(true) value for RepoStats.NewLinesCount.",
163 })
164 metricListAllDefaultBranchNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{
165 Name: "zoekt_list_all_stats_default_branch_new_lines_count",
166 Help: "The last List(true) value for RepoStats.DefaultBranchNewLinesCount.",
167 })
168 metricListAllOtherBranchesNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{
169 Name: "zoekt_list_all_stats_other_branches_new_lines_count",
170 Help: "The last List(true) value for RepoStats.OtherBranchesNewLinesCount.",
171 })
172)
173
174type rankedShard struct {
175 zoekt.Searcher
176
177 priority float64 // maximum priority across all repos in the shard
178
179 // We have out of band ranking on compound shards which can change even if
180 // the shard file does not. So we compute a rank in getShards. We store
181 // repos here to avoid the cost of List in the search request path.
182 repos []*zoekt.Repository
183}
184
185// loaded stores the state we compute when updating the state of shards from
186// disk.
187type loaded struct {
188 // shards is the currently loaded shards sorted by decreasing rank and
189 // should not be mutated.
190 shards []*rankedShard
191
192 // ready is true if sharded searcher has finished loading all initial
193 // shards on startup.
194 ready bool
195}
196
197type shardedSearcher struct {
198 // Limit the number of parallel queries. Since searching is
199 // CPU bound, we can't do better than #CPU queries in
200 // parallel. If we do so, we just create more memory
201 // pressure.
202 sched scheduler
203
204 mu sync.Mutex // protects writes to shards
205 shards map[string]*rankedShard
206
207 ready atomic.Bool
208 ranked atomic.Value
209}
210
211func newShardedSearcher(n int64) *shardedSearcher {
212 ss := &shardedSearcher{
213 shards: make(map[string]*rankedShard),
214 sched: newScheduler(n),
215 }
216 return ss
217}
218
219// NewDirectorySearcher returns a searcher instance that loads all
220// shards corresponding to a glob into memory.
221func NewDirectorySearcher(dir string) (zoekt.Streamer, error) {
222 return newDirectorySearcher(dir, true)
223}
224
225// NewDirectorySearcherFast is like NewDirectorySearcher, but does not block
226// on the initial loading of shards.
227//
228// This exists since in the case of zoekt-webserver we are happy with having
229// partial availability since that is better than no availability on large
230// instances.
231func NewDirectorySearcherFast(dir string) (zoekt.Streamer, error) {
232 return newDirectorySearcher(dir, false)
233}
234
235func newDirectorySearcher(dir string, waitUntilReady bool) (zoekt.Streamer, error) {
236 ss := newShardedSearcher(int64(runtime.GOMAXPROCS(0)))
237 tl := &loader{
238 ss: ss,
239 }
240 dw, err := newDirectoryWatcher(dir, tl)
241 if err != nil {
242 return nil, err
243 }
244
245 if waitUntilReady {
246 if err := dw.WaitUntilReady(); err != nil {
247 return nil, err
248 }
249 }
250
251 ds := &directorySearcher{
252 Streamer: ss,
253 directoryWatcher: dw,
254 }
255
256 return &typeRepoSearcher{Streamer: ds}, nil
257}
258
259type directorySearcher struct {
260 zoekt.Streamer
261
262 directoryWatcher *DirectoryWatcher
263}
264
265func (s *directorySearcher) Close() {
266 // We need to Stop directoryWatcher first since it calls load/unload on
267 // Searcher.
268 s.directoryWatcher.Stop()
269 s.Streamer.Close()
270}
271
272type loader struct {
273 ss *shardedSearcher
274}
275
276func (tl *loader) load(keys ...string) {
277 // This is called with all keys on startup, so once this function has
278 // finished running shardedSearcher will be ready.
279 defer tl.ss.markReady()
280
281 if len(keys) == 0 {
282 // If there's nothing to load, we exit early here, but we want to mark
283 // ourselves as ready.
284 return
285 }
286
287 var (
288 mu sync.Mutex // synchronizes writes to the shards map
289 wg sync.WaitGroup // used to wait for all shards to load
290 sem = semaphore.NewWeighted(int64(runtime.GOMAXPROCS(0)))
291 loadedShards = make(map[string]zoekt.Searcher)
292 )
293
294 publishLoaded := func() {
295 mu.Lock()
296 chunk := loadedShards
297 loadedShards = make(map[string]zoekt.Searcher)
298 mu.Unlock()
299 tl.ss.replace(chunk)
300 }
301
302 log.Printf("loading %d shard(s): %s", len(keys), humanTruncateList(keys, 5))
303
304 lastProgress := time.Now()
305 for i, key := range keys {
306 // If taking a while to start-up occasionally give a progress message
307 if time.Since(lastProgress) > 5*time.Second {
308 log.Printf("still need to load %d shards...", len(keys)-i)
309 lastProgress = time.Now()
310
311 publishLoaded()
312 }
313
314 _ = sem.Acquire(context.Background(), 1)
315 wg.Add(1)
316
317 go func(key string) {
318 defer sem.Release(1)
319 defer wg.Done()
320
321 shard, err := loadShard(key)
322 if err != nil {
323 metricShardsLoadFailedTotal.Inc()
324 log.Printf("reloading: %s, err %v ", key, err)
325 return
326 }
327 metricShardsLoadedTotal.Inc()
328
329 mu.Lock()
330 loadedShards[key] = shard
331 mu.Unlock()
332 }(key)
333 }
334
335 wg.Wait()
336
337 publishLoaded()
338}
339
340func (tl *loader) drop(keys ...string) {
341 shards := make(map[string]zoekt.Searcher, len(keys))
342 for _, key := range keys {
343 shards[key] = nil
344 }
345 tl.ss.replace(shards)
346}
347
348func (ss *shardedSearcher) String() string {
349 return "shardedSearcher"
350}
351
352// Close closes references to open files. It may be called only once.
353func (ss *shardedSearcher) Close() {
354 ss.mu.Lock()
355 shards := make(map[string]zoekt.Searcher, len(ss.shards))
356 for k := range ss.shards {
357 shards[k] = nil
358 }
359 ss.mu.Unlock()
360
361 ss.replace(shards)
362}
363
364func selectRepoSet(shards []*rankedShard, q query.Q) ([]*rankedShard, query.Q) {
365 and, ok := q.(*query.And)
366 if !ok {
367 return shards, q
368 }
369
370 // (and (reposet ...) (q))
371 // (and true (q)) with a filtered shards
372 // (and false) // noop
373
374 // (and (repobranches ...) (q))
375 // (and (repobranches ...) (q))
376
377 hasReposForPredicate := func(pred func(repo *zoekt.Repository) bool) func(repos []*zoekt.Repository) (any, all bool) {
378 return func(repos []*zoekt.Repository) (any, all bool) {
379 any = false
380 all = true
381 for _, repo := range repos {
382 b := pred(repo)
383 any = any || b
384 all = all && b
385 }
386 return any, all
387 }
388 }
389
390 for i, c := range and.Children {
391 var setSize int
392 var hasRepos func([]*zoekt.Repository) (bool, bool)
393 switch setQuery := c.(type) {
394 case *query.RepoSet:
395 setSize = len(setQuery.Set)
396 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
397 return setQuery.Set[repo.Name]
398 })
399 case *query.RepoIDs:
400 setSize = int(setQuery.Repos.GetCardinality())
401 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
402 return setQuery.Repos.Contains(repo.ID)
403 })
404 case *query.BranchesRepos:
405 for _, br := range setQuery.List {
406 setSize += int(br.Repos.GetCardinality())
407 }
408
409 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
410 for _, br := range setQuery.List {
411 if br.Repos.Contains(repo.ID) {
412 return true
413 }
414 }
415 return false
416 })
417 default:
418 continue
419 }
420
421 // setSize may be larger than the number of shards we have. The size of
422 // filtered is bounded by min(len(set), len(shards))
423 if setSize > len(shards) {
424 setSize = len(shards)
425 }
426
427 filtered := make([]*rankedShard, 0, setSize)
428 filteredAll := true
429
430 for _, s := range shards {
431 if any, all := hasRepos(s.repos); any {
432 filtered = append(filtered, s)
433 filteredAll = filteredAll && all
434 }
435 }
436
437 // We don't need to adjust the query since we are returning an empty set
438 // of shards to search.
439 if len(filtered) == 0 {
440 return filtered, and
441 }
442
443 // We can't simplify the query since we are searching shards which contain
444 // repos we aren't supposed to search.
445 if !filteredAll {
446 return filtered, and
447 }
448
449 // This optimization allows us to avoid the work done by
450 // indexData.simplify for each shard.
451 //
452 // For example if our query is (and (reposet foo bar) (content baz))
453 // then at this point filtered is [foo bar] and q is the same. For each
454 // shard indexData.simplify will simplify to (and true (content baz)) ->
455 // (content baz). This work can be done now once, rather than per shard.
456 switch c := c.(type) {
457 case *query.RepoSet:
458 and.Children[i] = &query.Const{Value: true}
459 return filtered, query.Simplify(and)
460
461 case *query.RepoIDs:
462 and.Children[i] = &query.Const{Value: true}
463 return filtered, query.Simplify(and)
464
465 case *query.BranchesRepos:
466 // We can only replace if all the repos want the same branches. We
467 // simplify and just check that we are requesting 1 branch. The common
468 // case is just asking for HEAD, so this should be effective.
469 if len(c.List) != 1 {
470 return filtered, and
471 }
472
473 // Every repo wants the same branches, so we can replace RepoBranches
474 // with a list of branch queries.
475 and.Children[i] = &query.Branch{Pattern: c.List[0].Branch, Exact: true}
476 return filtered, query.Simplify(and)
477 }
478
479 // Stop after first RepoSet, otherwise we might append duplicate
480 // shards to `filtered`
481 return filtered, and
482 }
483
484 return shards, and
485}
486
487func (ss *shardedSearcher) Search(ctx context.Context, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) {
488 tr, ctx := trace.New(ctx, "shardedSearcher.Search", "")
489 defer func() {
490 if sr != nil {
491 tr.LazyPrintf("num files: %d", len(sr.Files))
492 tr.LazyPrintf("stats: %+v", sr.Stats)
493 }
494 tr.Finish()
495 }()
496 ctx, cancel := context.WithCancel(ctx)
497 defer cancel()
498
499 collectSender := newCollectSender(opts)
500
501 start := time.Now()
502 proc, err := ss.sched.Acquire(ctx)
503 if err != nil {
504 return nil, err
505 }
506 defer proc.Release()
507 tr.LazyPrintf("acquired process")
508
509 wait := time.Since(start)
510 start = time.Now()
511
512 loaded := ss.getLoaded()
513 done, err := streamSearch(ctx, proc, q, opts, loaded.shards, collectSender)
514 defer done()
515 if err != nil {
516 return nil, err
517 }
518
519 aggregate, ok := collectSender.Done()
520 if !ok {
521 aggregate = &zoekt.SearchResult{
522 RepoURLs: map[string]string{},
523 LineFragments: map[string]string{},
524 }
525 }
526
527 copyFiles(aggregate)
528
529 if !loaded.ready {
530 // We may have missed results due to not being fully loaded.
531 aggregate.Stats.Crashes++
532 }
533
534 aggregate.Stats.Wait = wait
535 aggregate.Stats.Duration = time.Since(start)
536
537 return aggregate, nil
538}
539
540func (ss *shardedSearcher) StreamSearch(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, sender zoekt.Sender) (err error) {
541 tr, ctx := trace.New(ctx, "shardedSearcher.StreamSearch", "")
542 defer func() {
543 if err != nil {
544 tr.LazyPrintf("error: %v", err)
545 tr.SetError(err)
546 }
547 tr.Finish()
548 }()
549
550 start := time.Now()
551 proc, err := ss.sched.Acquire(ctx)
552 if err != nil {
553 return err
554 }
555 defer proc.Release()
556 tr.LazyPrintf("acquired process")
557
558 loaded := ss.getLoaded()
559 shards := loaded.shards
560
561 maxPendingPriority := math.Inf(-1)
562 if len(shards) > 0 {
563 maxPendingPriority = shards[0].priority
564 }
565
566 stillLoadingCrashes := 0
567 if !loaded.ready {
568 // We may have missed results due to not being fully loaded.
569 stillLoadingCrashes++
570 }
571
572 sender.Send(&zoekt.SearchResult{
573 Stats: zoekt.Stats{
574 Crashes: stillLoadingCrashes,
575 Wait: time.Since(start),
576 },
577 Progress: zoekt.Progress{
578 MaxPendingPriority: maxPendingPriority,
579 },
580 })
581
582 // Matches flow from the shards up the stack in the following order:
583 //
584 // 1. Search shards
585 // 2. flushCollectSender (aggregate)
586 // 3. limitSender (limit)
587 // 4. copyFileSender (copy)
588 //
589 // For streaming, the wrapping has to happen in the inverted order.
590 sender = copyFileSender(sender)
591
592 if opts.MaxDocDisplayCount > 0 {
593 var cancel context.CancelFunc
594 ctx, cancel = context.WithCancel(ctx)
595 defer cancel()
596 sender = limitSender(cancel, sender, opts.MaxDocDisplayCount)
597 }
598
599 sender, flush := newFlushCollectSender(opts, sender)
600
601 done, err := streamSearch(ctx, proc, q, opts, shards, sender)
602
603 // Even though streaming is done, we may have results sitting in a buffer we
604 // need to flush. So we need to send those before calling done.
605 flush()
606 done()
607
608 return err
609}
610
611// streamSearch is an internal helper since both Search and StreamSearch are
612// largely similar.
613//
614// done must always be called, even if err is non-nil. The SearchResults sent
615// via sender contain references to the underlying mmap data that the garbage
616// collector can't see. Calling done informs the garbage collector it is free
617// to collect those shards. The caller must call copyFiles on any
618// SearchResults it returns/streams out before calling done.
619func streamSearch(ctx context.Context, proc *process, q query.Q, opts *zoekt.SearchOptions, shards []*rankedShard, sender zoekt.Sender) (done func(), err error) {
620 tr, ctx := trace.New(ctx, "shardedSearcher.streamSearch", "")
621 tr.LazyLog(q, true)
622 tr.LazyPrintf("opts: %+v", opts)
623 overallStart := time.Now()
624 metricSearchRunning.Inc()
625 defer func() {
626 metricSearchRunning.Dec()
627 metricSearchDuration.Observe(time.Since(overallStart).Seconds())
628 if err != nil {
629 metricSearchFailedTotal.Inc()
630
631 tr.LazyPrintf("error: %v", err)
632 tr.SetError(err)
633 }
634 tr.Finish()
635 }()
636
637 tr.LazyPrintf("before selectRepoSet shards:%d", len(shards))
638 // Select the subset of shards that we will search over for the given query.
639 shards, q = selectRepoSet(shards, q)
640 tr.LazyPrintf("after selectRepoSet shards:%d %s", len(shards), q)
641
642 if len(shards) == 0 {
643 return func() {}, nil
644 }
645
646 var cancel context.CancelFunc
647 if opts.MaxWallTime == 0 {
648 ctx, cancel = context.WithCancel(ctx)
649 } else {
650 ctx, cancel = context.WithTimeout(ctx, opts.MaxWallTime)
651 }
652
653 defer cancel()
654
655 // We set the number of workers to GOMAXPROCS, or the number of shards,
656 // whichever is smaller.
657 workers := runtime.GOMAXPROCS(0)
658 if workers > len(shards) {
659 workers = len(shards)
660 }
661
662 type result struct {
663 priority float64
664 *zoekt.SearchResult
665 err error
666 }
667
668 var (
669 // buffered channels to continue searching when sending back results
670 // takes a while / blocks. The maximum pending result set is workers * 2.
671 results = make(chan *result, workers)
672 search = make(chan *rankedShard, workers)
673 wg sync.WaitGroup
674 )
675
676 // Start workers that receive shards from the search channel, search them,
677 // and send the results to the results channel. This process is repeated
678 // until the search channel is closed.
679 //
680 // Note: Making "search" a buffered channel has the effect of limiting the number of parallel shard searches.
681 // Since searching is mostly CPU bound, limiting parallel shard searches also reduces the peak working set.
682 wg.Add(workers)
683 for i := 0; i < workers; i++ {
684 go func() {
685 defer wg.Done()
686 for s := range search {
687 sr, err := searchOneShard(ctx, s, q, opts)
688 r := &result{priority: s.priority, SearchResult: sr, err: err}
689 results <- r
690 }
691 }()
692 }
693
694 go func() {
695 wg.Wait()
696 close(results)
697 }()
698
699 var (
700 pending = make(prioritySlice, 0, workers)
701 shard = 0
702 next = shards[shard]
703
704 // We need a separate nil-able reference to the same channel so we can close(search) for the worker
705 // go-routines to finish but also set work to nil in order for the select statement below to ignore
706 // that case when we want to stop a search. This is needed because sending on a closed channel panics.
707 work = search
708 )
709
710 stop := func() {
711 if work != nil {
712 close(search)
713 work = nil
714 next = nil
715 }
716 }
717
718 // tracked so we can stop when we hit TotalMaxMatchCount
719 var totalMatchCount int
720
721search:
722 for {
723 // At the top of each iteration, have the proc associated with this search yield its won "timeslice"
724 // to possibly allow other searches to make progress
725 _ = proc.Yield(ctx) // Note: we let searchOneShard handle context errors
726
727 select {
728 case work <- next: // is there a worker available to search the next shard?
729 pending.append(next.priority)
730
731 shard++
732 if shard == len(shards) {
733 stop()
734 } else {
735 next = shards[shard]
736 }
737 case r, ok := <-results: // is there a result to send back?
738 if !ok {
739 break search
740 }
741
742 // delete this result's priority from pending before computing the new max pending priority
743 pending.remove(r.priority)
744
745 if r.err != nil {
746 // Set final error and stop searching new shards, but consume any pending
747 // search results.
748 stop()
749 err = r.err
750 continue
751 }
752
753 // Update the match count statistics and stop searching new shards if we've
754 // reached the limit set in the options.
755 totalMatchCount += r.SearchResult.Stats.MatchCount
756 if opts.TotalMaxMatchCount > 0 && totalMatchCount > opts.TotalMaxMatchCount {
757 stop()
758 }
759
760 observeMetrics(r.SearchResult)
761
762 r.Priority = r.priority
763 r.MaxPendingPriority = pending.max()
764
765 sendByRepository(r.SearchResult, opts, sender) // send the result back to the client
766 }
767 }
768
769 return func() { runtime.KeepAlive(shards) }, err
770}
771
772// sendByRepository splits a zoekt.SearchResult by repository and calls
773// sender.Send for each batch. Ranking in Sourcegraph expects zoekt.SearchResult
774// to contain results with the same zoekt.SearchResult.Priority only.
775//
776// We split by repository instead of by priority because it is easier to set
777// RepoURLs and LineFragments in zoekt.SearchResult.
778func sendByRepository(result *zoekt.SearchResult, opts *zoekt.SearchOptions, sender zoekt.Sender) {
779
780 if len(result.RepoURLs) <= 1 || len(result.Files) == 0 {
781 zoekt.SortFiles(result.Files)
782 sender.Send(result)
783 return
784 }
785
786 send := func(repoName string, a, b int, stats zoekt.Stats) {
787 zoekt.SortFiles(result.Files[a:b])
788 sender.Send(&zoekt.SearchResult{
789 Stats: stats,
790 Progress: zoekt.Progress{
791 Priority: result.Files[a].RepositoryPriority,
792 MaxPendingPriority: result.MaxPendingPriority,
793 },
794 Files: result.Files[a:b],
795 RepoURLs: map[string]string{repoName: result.RepoURLs[repoName]},
796 LineFragments: map[string]string{repoName: result.LineFragments[repoName]},
797 })
798 }
799
800 var startIndex, endIndex int
801 curRepoID := result.Files[0].RepositoryID
802 curRepoName := result.Files[0].Repository
803
804 fm := zoekt.FileMatch{}
805 for endIndex, fm = range result.Files {
806 if curRepoID != fm.RepositoryID {
807 // Stats must stay aggregate-able, hence we sent the aggregate stats with the
808 // last event.
809 send(curRepoName, startIndex, endIndex, zoekt.Stats{})
810
811 startIndex = endIndex
812 curRepoID = fm.RepositoryID
813 curRepoName = fm.Repository
814 }
815 }
816
817 send(curRepoName, startIndex, endIndex+1, result.Stats)
818}
819
820func observeMetrics(sr *zoekt.SearchResult) {
821 metricSearchContentBytesLoadedTotal.Add(float64(sr.Stats.ContentBytesLoaded))
822 metricSearchIndexBytesLoadedTotal.Add(float64(sr.Stats.IndexBytesLoaded))
823 metricSearchCrashesTotal.Add(float64(sr.Stats.Crashes))
824 metricSearchFileCountTotal.Add(float64(sr.Stats.FileCount))
825 metricSearchShardFilesConsideredTotal.Add(float64(sr.Stats.ShardFilesConsidered))
826 metricSearchFilesConsideredTotal.Add(float64(sr.Stats.FilesConsidered))
827 metricSearchFilesLoadedTotal.Add(float64(sr.Stats.FilesLoaded))
828 metricSearchFilesSkippedTotal.Add(float64(sr.Stats.FilesSkipped))
829 metricSearchShardsSkippedTotal.Add(float64(sr.Stats.ShardsSkipped))
830 metricSearchMatchCountTotal.Add(float64(sr.Stats.MatchCount))
831 metricSearchNgramMatchesTotal.Add(float64(sr.Stats.NgramMatches))
832 metricSearchNgramLookupsTotal.Add(float64(sr.Stats.NgramLookups))
833 metricSearchRegexpsConsideredTotal.Add(float64(sr.Stats.RegexpsConsidered))
834}
835
// copySlice replaces *src with a freshly allocated copy of its contents so
// that it no longer shares memory with the original backing array. A nil
// slice is left untouched; an empty non-nil slice stays empty and non-nil.
func copySlice(src *[]byte) {
	orig := *src
	if orig == nil {
		return
	}

	*src = append(make([]byte, 0, len(orig)), orig...)
}
844
845func copyFiles(sr *zoekt.SearchResult) {
846 for i := range sr.Files {
847 copySlice(&sr.Files[i].Content)
848 copySlice(&sr.Files[i].Checksum)
849 for l := range sr.Files[i].LineMatches {
850 copySlice(&sr.Files[i].LineMatches[l].Line)
851 copySlice(&sr.Files[i].LineMatches[l].Before)
852 copySlice(&sr.Files[i].LineMatches[l].After)
853 }
854 for c := range sr.Files[i].ChunkMatches {
855 copySlice(&sr.Files[i].ChunkMatches[c].Content)
856 }
857 }
858}
859
860func searchOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) {
861 metricSearchShardRunning.Inc()
862 defer func() {
863 metricSearchShardRunning.Dec()
864 if e := recover(); e != nil {
865 log.Printf("crashed shard: %s: %#v, %s", s, e, debug.Stack())
866
867 if sr == nil {
868 sr = &zoekt.SearchResult{}
869 }
870 sr.Stats.Crashes = 1
871 }
872 }()
873
874 return s.Search(ctx, q, opts)
875}
876
877type shardListResult struct {
878 rl *zoekt.RepoList
879 err error
880}
881
882func listOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.ListOptions, sink chan shardListResult) {
883 metricListShardRunning.Inc()
884 defer func() {
885 metricListShardRunning.Dec()
886 if r := recover(); r != nil {
887 log.Printf("crashed shard: %s: %s, %s", s.String(), r, debug.Stack())
888 sink <- shardListResult{
889 &zoekt.RepoList{Crashes: 1}, nil,
890 }
891 }
892 }()
893
894 ms, err := s.List(ctx, q, opts)
895 sink <- shardListResult{ms, err}
896}
897
898func (ss *shardedSearcher) List(ctx context.Context, r query.Q, opts *zoekt.ListOptions) (rl *zoekt.RepoList, err error) {
899 tr, ctx := trace.New(ctx, "shardedSearcher.List", "")
900 tr.LazyLog(r, true)
901 tr.LazyPrintf("opts: %s", opts)
902 metricListRunning.Inc()
903 defer func() {
904 metricListRunning.Dec()
905 if rl != nil {
906 tr.LazyPrintf("repos size: %d", len(rl.Repos))
907 tr.LazyPrintf("crashes: %d", rl.Crashes)
908 tr.LazyPrintf("minimal size: %d", len(rl.Minimal))
909 }
910 if err != nil {
911 tr.LazyPrintf("error: %v", err)
912 tr.SetError(err)
913 }
914 tr.Finish()
915 }()
916
917 r = query.Simplify(r)
918 isAll := false
919 if c, ok := r.(*query.Const); ok {
920 isAll = c.Value
921 }
922
923 proc, err := ss.sched.Acquire(ctx)
924 if err != nil {
925 return nil, err
926 }
927 defer proc.Release()
928 tr.LazyPrintf("acquired process")
929
930 loaded := ss.getLoaded()
931 shards := loaded.shards
932 shardCount := len(shards)
933 all := make(chan shardListResult, shardCount)
934 tr.LazyPrintf("shardCount: %d", len(shards))
935
936 feeder := make(chan zoekt.Searcher, len(shards))
937 for _, s := range shards {
938 feeder <- s
939 }
940 close(feeder)
941
942 for i := 0; i < runtime.GOMAXPROCS(0); i++ {
943 go func() {
944 for s := range feeder {
945 listOneShard(ctx, s, r, opts, all)
946 }
947 }()
948 }
949
950 stillLoadingCrashes := 0
951 if !loaded.ready {
952 // We may have missed results due to not being fully loaded.
953 stillLoadingCrashes++
954 }
955
956 agg := zoekt.RepoList{
957 Crashes: stillLoadingCrashes,
958 Minimal: map[uint32]*zoekt.MinimalRepoListEntry{},
959 ReposMap: zoekt.ReposMap{},
960 }
961
962 uniq := map[string]*zoekt.RepoListEntry{}
963
964 for range shards {
965 r := <-all
966 if r.err != nil {
967 return nil, r.err
968 }
969
970 agg.Crashes += r.rl.Crashes
971 agg.Stats.Add(&r.rl.Stats)
972
973 for _, r := range r.rl.Repos {
974 prev, ok := uniq[r.Repository.Name]
975 if !ok {
976 cp := *r // We need to copy because we mutate r.Stats when merging duplicates
977 uniq[r.Repository.Name] = &cp
978 } else {
979 prev.Stats.Add(&r.Stats)
980 }
981 }
982
983 for id, r := range r.rl.Minimal {
984 _, ok := agg.Minimal[id]
985 if !ok {
986 agg.Minimal[id] = r
987 }
988 }
989
990 for id, r := range r.rl.ReposMap {
991 agg.ReposMap[id] = r
992 }
993 }
994
995 agg.Repos = make([]*zoekt.RepoListEntry, 0, len(uniq))
996 for _, r := range uniq {
997 agg.Repos = append(agg.Repos, r)
998 }
999
1000 // Only one of these fields is populated and in all cases the size of that
1001 // field is the number of Repos.
1002 //
1003 // Note: we don't just add individual Stats.Repos since a repository can
1004 // have multiple shards.
1005 agg.Stats.Repos = len(uniq) + len(agg.Minimal) + len(agg.ReposMap)
1006
1007 if isAll && len(agg.Repos) > 0 {
1008 reportListAllMetrics(agg.Repos)
1009 }
1010
1011 return &agg, nil
1012}
1013
1014func reportListAllMetrics(repos []*zoekt.RepoListEntry) {
1015 var stats zoekt.RepoStats
1016 for _, r := range repos {
1017 stats.Add(&r.Stats)
1018 }
1019
1020 metricListAllRepos.Set(float64(stats.Repos))
1021 metricListAllIndexBytes.Set(float64(stats.IndexBytes))
1022 metricListAllContentBytes.Set(float64(stats.ContentBytes))
1023 metricListAllDocuments.Set(float64(stats.Documents))
1024 metricListAllShards.Set(float64(stats.Shards))
1025 metricListAllNewLinesCount.Set(float64(stats.NewLinesCount))
1026 metricListAllDefaultBranchNewLinesCount.Set(float64(stats.DefaultBranchNewLinesCount))
1027 metricListAllOtherBranchesNewLinesCount.Set(float64(stats.OtherBranchesNewLinesCount))
1028}
1029
1030// getLoaded returns the currently loaded shards. Shared so do not mutate.
1031func (s *shardedSearcher) getLoaded() loaded {
1032 // next commit will store the true value of this, for now we keep the
1033 // backwards compatible behaviour.
1034 ready := s.ready.Load()
1035 // ranked is loaded after ready to avoid a race were ready is true but
1036 // ranked is still not the final set of shards.
1037 ranked, _ := s.ranked.Load().([]*rankedShard)
1038 return loaded{
1039 shards: ranked,
1040 ready: ready,
1041 }
1042}
1043
1044func mkRankedShard(s zoekt.Searcher) *rankedShard {
1045 q := query.Const{Value: true}
1046 result, err := s.List(context.Background(), &q, nil)
1047 if err != nil {
1048 return &rankedShard{Searcher: s}
1049 }
1050 if len(result.Repos) == 0 {
1051 return &rankedShard{Searcher: s}
1052 }
1053
1054 var (
1055 maxPriority float64
1056 repos = make([]*zoekt.Repository, 0, len(result.Repos))
1057 )
1058 for i := range result.Repos {
1059 repo := &result.Repos[i].Repository
1060 repos = append(repos, repo)
1061 if repo.RawConfig != nil {
1062 priority, _ := strconv.ParseFloat(repo.RawConfig["priority"], 64)
1063 if priority > maxPriority {
1064 maxPriority = priority
1065 }
1066 }
1067 }
1068
1069 return &rankedShard{
1070 Searcher: s,
1071 repos: repos,
1072 priority: maxPriority,
1073 }
1074}
1075
1076// markReady should be called once all shards have been passed into replace on
1077// startup. Once s is marked as ready it stops reporting a Crash in the
1078// response Stats.
1079func (s *shardedSearcher) markReady() {
1080 s.ready.CompareAndSwap(false, true)
1081}
1082
1083func (s *shardedSearcher) replace(shards map[string]zoekt.Searcher) {
1084 if len(shards) == 0 {
1085 return
1086 }
1087
1088 defer func(began time.Time) {
1089 metricShardsBatchReplaceDurationSeconds.Observe(time.Since(began).Seconds())
1090 }(time.Now())
1091
1092 s.mu.Lock()
1093 defer s.mu.Unlock()
1094
1095 for key, shard := range shards {
1096 var r *rankedShard
1097 if shard != nil {
1098 r = mkRankedShard(shard)
1099 }
1100
1101 old := s.shards[key]
1102 if shard == nil {
1103 delete(s.shards, key)
1104 } else {
1105 s.shards[key] = r
1106 }
1107
1108 if old != nil && old.Searcher != nil {
1109 // _ ___ /^^\ /^\ /^^\_
1110 // _ _@)@) \ ,,/ '` ~ `'~~ ', `\.
1111 // _/o\_ _ _ _/~`.`...'~\ ./~~..,'`','',.,' ' ~:
1112 // / `,'.~,~.~ . , . , ~|, ,/ .,' , ,. .. ,,. `, ~\_
1113 // ( ' _' _ '_` _ ' . , `\_/ .' ..' ' ` ` `.. `, \_
1114 // ~V~ V~ V~ V~ ~\ ` ' . ' , ' .,.,''`.,.''`.,.``. ', \_
1115 // _/\ /\ /\ /\_/, . ' , `_/~\_ .' .,. ,, , _/~\_ `. `. '., \_
1116 // < ~ ~ '~`'~'`, ., . `_: ::: \_ ' `_/ ::: \_ `.,' . ', \_
1117 // \ ' `_ '`_ _ ',/ _::_::_ \ _ _/ _::_::_ \ `.,'.,`., \-,-,-,_,_,
1118 // `'~~ `'~~ `'~~ `'~~ \(_)(_)(_)/ `~~' \(_)(_)(_)/ ~'`\_.._,._,'_;_;_;_;_;
1119 //
1120 // We can't just call Close now, because there may be ongoing searches
1121 // which have old in the shards list. Previously we used an exclusive
1122 // lock to guarantee there were no concurrent searches. However, that
1123 // led to blocking on the read path.
1124 //
1125 // We could introduce granular locking per rankedShard to know when
1126 // there are no more references. However, this becomes tricky in
1127 // practice. Instead we rely on the garbage collector noticing old is no
1128 // longer used. We take care in our searchers to runtime.KeepAlive until
1129 // we have stopped referencing the underling mmap data.
1130 runtime.SetFinalizer(old, func(r *rankedShard) {
1131 r.Close()
1132 })
1133 }
1134 }
1135
1136 ranked := make([]*rankedShard, 0, len(s.shards))
1137 for _, r := range s.shards {
1138 ranked = append(ranked, r)
1139 }
1140
1141 sort.Slice(ranked, func(i, j int) bool {
1142 priorityDiff := ranked[i].priority - ranked[j].priority
1143 if priorityDiff != 0 {
1144 return priorityDiff > 0
1145 }
1146 if len(ranked[i].repos) == 0 || len(ranked[j].repos) == 0 {
1147 // Protect against empty names which can happen if we fail to List or
1148 // the shard is full of tombstones. Prefer the shard which has names.
1149 return len(ranked[i].repos) >= len(ranked[j].repos)
1150 }
1151 return ranked[i].repos[0].Name < ranked[j].repos[0].Name
1152 })
1153
1154 s.ranked.Store(ranked)
1155
1156 metricShardsLoaded.Set(float64(len(ranked)))
1157}
1158
1159func loadShard(fn string) (zoekt.Searcher, error) {
1160 f, err := os.Open(fn)
1161 if err != nil {
1162 return nil, err
1163 }
1164
1165 iFile, err := zoekt.NewIndexFile(f)
1166 if err != nil {
1167 return nil, err
1168 }
1169 s, err := zoekt.NewSearcher(iFile)
1170 if err != nil {
1171 iFile.Close()
1172 return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err)
1173 }
1174
1175 return s, nil
1176}
1177
// prioritySlice is a minimal unordered collection of priorities supporting
// three operations: add a value, remove a value, and report the maximum.
// remove and max are linear scans. Per the original author's note this is
// acceptable because the number of entries is bounded by GOMAXPROCS (i.e. the
// number of cpu cores) through the way shardedSearcher uses it.
type prioritySlice []float64

// append adds pri to the collection.
func (p *prioritySlice) append(pri float64) {
	*p = append(*p, pri)
}

// remove deletes the first entry equal to pri, if there is one. Element order
// is not preserved: the last entry is moved into the vacated slot.
func (p *prioritySlice) remove(pri float64) {
	s := *p
	last := len(s) - 1
	for i := 0; i <= last; i++ {
		if s[i] != pri {
			continue
		}
		// Overwrite the match with the tail element (a self-assignment when
		// the match is the tail), then drop the tail.
		s[i] = s[last]
		*p = s[:last]
		return
	}
}

// max returns the largest priority in the collection, or negative infinity
// when the collection is empty.
func (p *prioritySlice) max() float64 {
	best := math.Inf(-1)
	for i := range *p {
		if v := (*p)[i]; v > best {
			best = v
		}
	}
	return best
}