fork of https://github.com/sourcegraph/zoekt
1// Copyright 2016 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package shards
16
17import (
18 "context"
19 "fmt"
20 "log"
21 "math"
22 "os"
23 "runtime"
24 "runtime/debug"
25 "sort"
26 "strconv"
27 "sync"
28 "time"
29
30 "golang.org/x/sync/semaphore"
31
32 "github.com/prometheus/client_golang/prometheus"
33 "github.com/prometheus/client_golang/prometheus/promauto"
34 "go.uber.org/atomic"
35
36 "github.com/sourcegraph/zoekt"
37 "github.com/sourcegraph/zoekt/query"
38 "github.com/sourcegraph/zoekt/trace"
39)
40
41var (
42 metricShardsLoaded = promauto.NewGauge(prometheus.GaugeOpts{
43 Name: "zoekt_shards_loaded",
44 Help: "The number of shards currently loaded",
45 })
46 metricShardsLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
47 Name: "zoekt_shards_loaded_total",
48 Help: "The total number of shards loaded",
49 })
50 metricShardsLoadFailedTotal = promauto.NewCounter(prometheus.CounterOpts{
51 Name: "zoekt_shards_load_failed_total",
52 Help: "The total number of shard loads that failed",
53 })
54
55 metricSearchRunning = promauto.NewGauge(prometheus.GaugeOpts{
56 Name: "zoekt_search_running",
57 Help: "The number of concurrent search requests running",
58 })
59 metricSearchShardRunning = promauto.NewGauge(prometheus.GaugeOpts{
60 Name: "zoekt_search_shard_running",
61 Help: "The number of concurrent search requests in a shard running",
62 })
63 metricSearchFailedTotal = promauto.NewCounter(prometheus.CounterOpts{
64 Name: "zoekt_search_failed_total",
65 Help: "The total number of search requests that failed",
66 })
67 metricSearchDuration = promauto.NewHistogram(prometheus.HistogramOpts{
68 Name: "zoekt_search_duration_seconds",
69 Help: "The duration a search request took in seconds",
70 Buckets: prometheus.DefBuckets, // DefBuckets good for service timings
71 })
72
73 // A Counter per Stat. Name should match field in zoekt.Stats.
74 metricSearchContentBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
75 Name: "zoekt_search_content_loaded_bytes_total",
76 Help: "Total amount of I/O for reading contents",
77 })
78 metricSearchIndexBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
79 Name: "zoekt_search_index_loaded_bytes_total",
80 Help: "Total amount of I/O for reading from index",
81 })
82 metricSearchCrashesTotal = promauto.NewCounter(prometheus.CounterOpts{
83 Name: "zoekt_search_crashes_total",
84 Help: "Total number of search shards that had a crash",
85 })
86 metricSearchFileCountTotal = promauto.NewCounter(prometheus.CounterOpts{
87 Name: "zoekt_search_file_count_total",
88 Help: "Total number of files containing a match",
89 })
90 metricSearchShardFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{
91 Name: "zoekt_search_shard_files_considered_total",
92 Help: "Total number of files in shards that we considered",
93 })
94 metricSearchFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{
95 Name: "zoekt_search_files_considered_total",
96 Help: "Total files that we evaluated. Equivalent to files for which all atom matches (including negations) evaluated to true",
97 })
98 metricSearchFilesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{
99 Name: "zoekt_search_files_loaded_total",
100 Help: "Total files for which we loaded file content to verify substring matches",
101 })
102 metricSearchFilesSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{
103 Name: "zoekt_search_files_skipped_total",
104 Help: "Total candidate files whose contents weren't examined because we gathered enough matches",
105 })
106 metricSearchShardsSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{
107 Name: "zoekt_search_shards_skipped_total",
108 Help: "Total shards that we did not process because a query was canceled",
109 })
110 metricSearchMatchCountTotal = promauto.NewCounter(prometheus.CounterOpts{
111 Name: "zoekt_search_match_count_total",
112 Help: "Total number of non-overlapping matches",
113 })
114 metricSearchNgramMatchesTotal = promauto.NewCounter(prometheus.CounterOpts{
115 Name: "zoekt_search_ngram_matches_total",
116 Help: "Total number of candidate matches as a result of searching ngrams",
117 })
118
119 metricListRunning = promauto.NewGauge(prometheus.GaugeOpts{
120 Name: "zoekt_list_running",
121 Help: "The number of concurrent list requests running",
122 })
123 metricListShardRunning = promauto.NewGauge(prometheus.GaugeOpts{
124 Name: "zoekt_list_shard_running",
125 Help: "The number of concurrent list requests in a shard running",
126 })
127 metricShardsBatchReplaceDurationSeconds = promauto.NewHistogram(prometheus.HistogramOpts{
128 Name: "zoekt_shards_batch_replace_duration_seconds",
129 Help: "The time it takes to replace a batch of Searchers.",
130 Buckets: []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30},
131 })
132 metricListAllRepos = promauto.NewGauge(prometheus.GaugeOpts{
133 Name: "zoekt_list_all_stats_repos",
134 Help: "The last List(true) value for RepoStats.Repos. Repos is used for aggregrating the number of repositories.",
135 })
136 metricListAllShards = promauto.NewGauge(prometheus.GaugeOpts{
137 Name: "zoekt_list_all_stats_shards",
138 Help: "The last List(true) value for RepoStats.Shards. Shards is the total number of search shards.",
139 })
140 metricListAllDocuments = promauto.NewGauge(prometheus.GaugeOpts{
141 Name: "zoekt_list_all_stats_documents",
142 Help: "The last List(true) value for RepoStats.Documents. Documents holds the number of documents or files.",
143 })
144 metricListAllIndexBytes = promauto.NewGauge(prometheus.GaugeOpts{
145 Name: "zoekt_list_all_stats_index_bytes",
146 Help: "The last List(true) value for RepoStats.IndexBytes. IndexBytes is the amount of RAM used for index overhead.",
147 })
148 metricListAllContentBytes = promauto.NewGauge(prometheus.GaugeOpts{
149 Name: "zoekt_list_all_stats_content_bytes",
150 Help: "The last List(true) value for RepoStats.ContentBytes. ContentBytes is the amount of RAM used for raw content.",
151 })
152 metricListAllNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{
153 Name: "zoekt_list_all_stats_new_lines_count",
154 Help: "The last List(true) value for RepoStats.NewLinesCount.",
155 })
156 metricListAllDefaultBranchNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{
157 Name: "zoekt_list_all_stats_default_branch_new_lines_count",
158 Help: "The last List(true) value for RepoStats.DefaultBranchNewLinesCount.",
159 })
160 metricListAllOtherBranchesNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{
161 Name: "zoekt_list_all_stats_other_branches_new_lines_count",
162 Help: "The last List(true) value for RepoStats.OtherBranchesNewLinesCount.",
163 })
164)
165
// rankedShard is a loaded shard (the embedded zoekt.Searcher) annotated with
// the data used to order and filter shards at search time. Instances are
// built by mkRankedShard.
type rankedShard struct {
	zoekt.Searcher

	// priority is the maximum "priority" value found in the RawConfig of the
	// shard's repos (0 when none is set or parseable).
	priority float64 // maximum priority across all repos in the shard

	// We have out of band ranking on compound shards which can change even if
	// the shard file does not, so the rank is computed when the shard is
	// (re)loaded (mkRankedShard). We store repos here to avoid the cost of
	// List in the search request path; selectRepoSet reads them to filter
	// shards by repository.
	repos []*zoekt.Repository
}
176
// loaded stores the state we compute when updating the state of shards from
// disk. getLoaded returns it as a snapshot for a single request.
type loaded struct {
	// shards is the currently loaded shards sorted by decreasing rank and
	// should not be mutated: the same slice is shared by concurrent requests.
	shards []*rankedShard

	// ready is true if sharded searcher has finished loading all initial
	// shards on startup. While false, Search, StreamSearch and List add one
	// crash to their results to signal that results may be incomplete.
	ready bool
}
188
// shardedSearcher fans Search, StreamSearch and List requests out over the
// set of currently loaded shards.
type shardedSearcher struct {
	// Limit the number of parallel queries. Since searching is
	// CPU bound, we can't do better than #CPU queries in
	// parallel. If we do so, we just create more memory
	// pressure.
	sched scheduler

	// mu protects writes to shards; Close also holds it while reading the
	// keys of shards.
	mu sync.Mutex // protects writes to shards
	// shards maps the key passed to replace (presumably the shard file path —
	// TODO confirm against the DirectoryWatcher) to its loaded shard.
	shards map[string]*rankedShard

	// ready becomes true once markReady has been called after the startup
	// load.
	ready atomic.Bool
	// ranked holds the []*rankedShard snapshot handed out by getLoaded.
	ranked atomic.Value
}
202
203func newShardedSearcher(n int64) *shardedSearcher {
204 ss := &shardedSearcher{
205 shards: make(map[string]*rankedShard),
206 sched: newScheduler(n),
207 }
208 return ss
209}
210
211// NewDirectorySearcher returns a searcher instance that loads all
212// shards corresponding to a glob into memory.
213func NewDirectorySearcher(dir string) (zoekt.Streamer, error) {
214 return newDirectorySearcher(dir, true)
215}
216
217// NewDirectorySearcherFast is like NewDirectorySearcher, but does not block
218// on the initial loading of shards.
219//
220// This exists since in the case of zoekt-webserver we are happy with having
221// partial availability since that is better than no availability on large
222// instances.
223func NewDirectorySearcherFast(dir string) (zoekt.Streamer, error) {
224 return newDirectorySearcher(dir, false)
225}
226
227func newDirectorySearcher(dir string, waitUntilReady bool) (zoekt.Streamer, error) {
228 ss := newShardedSearcher(int64(runtime.GOMAXPROCS(0)))
229 tl := &loader{
230 ss: ss,
231 }
232 dw, err := newDirectoryWatcher(dir, tl)
233 if err != nil {
234 return nil, err
235 }
236
237 if waitUntilReady {
238 if err := dw.WaitUntilReady(); err != nil {
239 return nil, err
240 }
241 }
242
243 ds := &directorySearcher{
244 Streamer: ss,
245 directoryWatcher: dw,
246 }
247
248 return &typeRepoSearcher{Streamer: ds}, nil
249}
250
// directorySearcher couples a searcher with the DirectoryWatcher that loads
// and drops its shards, so that Close can shut both down in the right order.
type directorySearcher struct {
	zoekt.Streamer

	directoryWatcher *DirectoryWatcher
}
256
257func (s *directorySearcher) Close() {
258 // We need to Stop directoryWatcher first since it calls load/unload on
259 // Searcher.
260 s.directoryWatcher.Stop()
261 s.Streamer.Close()
262}
263
// loader is handed to the DirectoryWatcher; the watcher calls its load and
// drop methods, which forward the shard changes to ss via replace.
type loader struct {
	ss *shardedSearcher
}
267
268func (tl *loader) load(keys ...string) {
269 // This is called with all keys on startup, so once this function has
270 // finished running shardedSearcher will be ready.
271 defer tl.ss.markReady()
272
273 if len(keys) == 0 {
274 // If there's nothing to load, we exit early here, but we want to mark
275 // ourselves as ready.
276 return
277 }
278
279 var (
280 mu sync.Mutex // synchronizes writes to the shards map
281 wg sync.WaitGroup // used to wait for all shards to load
282 sem = semaphore.NewWeighted(int64(runtime.GOMAXPROCS(0)))
283 loadedShards = make(map[string]zoekt.Searcher)
284 )
285
286 publishLoaded := func() {
287 mu.Lock()
288 chunk := loadedShards
289 loadedShards = make(map[string]zoekt.Searcher)
290 mu.Unlock()
291 tl.ss.replace(chunk)
292 }
293
294 log.Printf("loading %d shard(s): %s", len(keys), humanTruncateList(keys, 5))
295
296 lastProgress := time.Now()
297 for i, key := range keys {
298 // If taking a while to start-up occasionally give a progress message
299 if time.Since(lastProgress) > 5*time.Second {
300 log.Printf("still need to load %d shards...", len(keys)-i)
301 lastProgress = time.Now()
302
303 publishLoaded()
304 }
305
306 _ = sem.Acquire(context.Background(), 1)
307 wg.Add(1)
308
309 go func(key string) {
310 defer sem.Release(1)
311 defer wg.Done()
312
313 shard, err := loadShard(key)
314 if err != nil {
315 metricShardsLoadFailedTotal.Inc()
316 log.Printf("reloading: %s, err %v ", key, err)
317 return
318 }
319 metricShardsLoadedTotal.Inc()
320
321 mu.Lock()
322 loadedShards[key] = shard
323 mu.Unlock()
324 }(key)
325 }
326
327 wg.Wait()
328
329 publishLoaded()
330}
331
332func (tl *loader) drop(keys ...string) {
333 shards := make(map[string]zoekt.Searcher, len(keys))
334 for _, key := range keys {
335 shards[key] = nil
336 }
337 tl.ss.replace(shards)
338}
339
340func (ss *shardedSearcher) String() string {
341 return "shardedSearcher"
342}
343
344// Close closes references to open files. It may be called only once.
345func (ss *shardedSearcher) Close() {
346 ss.mu.Lock()
347 shards := make(map[string]zoekt.Searcher, len(ss.shards))
348 for k := range ss.shards {
349 shards[k] = nil
350 }
351 ss.mu.Unlock()
352
353 ss.replace(shards)
354}
355
356func selectRepoSet(shards []*rankedShard, q query.Q) ([]*rankedShard, query.Q) {
357 and, ok := q.(*query.And)
358 if !ok {
359 return shards, q
360 }
361
362 // (and (reposet ...) (q))
363 // (and true (q)) with a filtered shards
364 // (and false) // noop
365
366 // (and (repobranches ...) (q))
367 // (and (repobranches ...) (q))
368
369 hasReposForPredicate := func(pred func(repo *zoekt.Repository) bool) func(repos []*zoekt.Repository) (any, all bool) {
370 return func(repos []*zoekt.Repository) (any, all bool) {
371 any = false
372 all = true
373 for _, repo := range repos {
374 b := pred(repo)
375 any = any || b
376 all = all && b
377 }
378 return any, all
379 }
380 }
381
382 for i, c := range and.Children {
383 var setSize int
384 var hasRepos func([]*zoekt.Repository) (bool, bool)
385 switch setQuery := c.(type) {
386 case *query.RepoSet:
387 setSize = len(setQuery.Set)
388 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
389 return setQuery.Set[repo.Name]
390 })
391 case *query.RepoIDs:
392 setSize = int(setQuery.Repos.GetCardinality())
393 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
394 return setQuery.Repos.Contains(repo.ID)
395 })
396 case *query.BranchesRepos:
397 for _, br := range setQuery.List {
398 setSize += int(br.Repos.GetCardinality())
399 }
400
401 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool {
402 for _, br := range setQuery.List {
403 if br.Repos.Contains(repo.ID) {
404 return true
405 }
406 }
407 return false
408 })
409 default:
410 continue
411 }
412
413 // setSize may be larger than the number of shards we have. The size of
414 // filtered is bounded by min(len(set), len(shards))
415 if setSize > len(shards) {
416 setSize = len(shards)
417 }
418
419 filtered := make([]*rankedShard, 0, setSize)
420 filteredAll := true
421
422 for _, s := range shards {
423 if any, all := hasRepos(s.repos); any {
424 filtered = append(filtered, s)
425 filteredAll = filteredAll && all
426 }
427 }
428
429 // We don't need to adjust the query since we are returning an empty set
430 // of shards to search.
431 if len(filtered) == 0 {
432 return filtered, and
433 }
434
435 // We can't simplify the query since we are searching shards which contain
436 // repos we aren't supposed to search.
437 if !filteredAll {
438 return filtered, and
439 }
440
441 // This optimization allows us to avoid the work done by
442 // indexData.simplify for each shard.
443 //
444 // For example if our query is (and (reposet foo bar) (content baz))
445 // then at this point filtered is [foo bar] and q is the same. For each
446 // shard indexData.simplify will simplify to (and true (content baz)) ->
447 // (content baz). This work can be done now once, rather than per shard.
448 switch c := c.(type) {
449 case *query.RepoSet:
450 and.Children[i] = &query.Const{Value: true}
451 return filtered, query.Simplify(and)
452
453 case *query.RepoIDs:
454 and.Children[i] = &query.Const{Value: true}
455 return filtered, query.Simplify(and)
456
457 case *query.BranchesRepos:
458 // We can only replace if all the repos want the same branches. We
459 // simplify and just check that we are requesting 1 branch. The common
460 // case is just asking for HEAD, so this should be effective.
461 if len(c.List) != 1 {
462 return filtered, and
463 }
464
465 // Every repo wants the same branches, so we can replace RepoBranches
466 // with a list of branch queries.
467 and.Children[i] = &query.Branch{Pattern: c.List[0].Branch, Exact: true}
468 return filtered, query.Simplify(and)
469 }
470
471 // Stop after first RepoSet, otherwise we might append duplicate
472 // shards to `filtered`
473 return filtered, and
474 }
475
476 return shards, and
477}
478
479func (ss *shardedSearcher) Search(ctx context.Context, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) {
480 tr, ctx := trace.New(ctx, "shardedSearcher.Search", "")
481 defer func() {
482 if sr != nil {
483 tr.LazyPrintf("num files: %d", len(sr.Files))
484 tr.LazyPrintf("stats: %+v", sr.Stats)
485 }
486 tr.Finish()
487 }()
488 ctx, cancel := context.WithCancel(ctx)
489 defer cancel()
490
491 collectSender := newCollectSender(opts)
492
493 start := time.Now()
494 proc, err := ss.sched.Acquire(ctx)
495 if err != nil {
496 return nil, err
497 }
498 defer proc.Release()
499 tr.LazyPrintf("acquired process")
500
501 wait := time.Since(start)
502 start = time.Now()
503
504 loaded := ss.getLoaded()
505 done, err := streamSearch(ctx, proc, q, opts, loaded.shards, collectSender)
506 defer done()
507 if err != nil {
508 return nil, err
509 }
510
511 aggregate, ok := collectSender.Done()
512 if !ok {
513 aggregate = &zoekt.SearchResult{
514 RepoURLs: map[string]string{},
515 LineFragments: map[string]string{},
516 }
517 }
518
519 copyFiles(aggregate)
520
521 if !loaded.ready {
522 // We may have missed results due to not being fully loaded.
523 aggregate.Stats.Crashes++
524 }
525
526 aggregate.Stats.Wait = wait
527 aggregate.Stats.Duration = time.Since(start)
528
529 return aggregate, nil
530}
531
532func (ss *shardedSearcher) StreamSearch(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, sender zoekt.Sender) (err error) {
533 tr, ctx := trace.New(ctx, "shardedSearcher.StreamSearch", "")
534 defer func() {
535 if err != nil {
536 tr.LazyPrintf("error: %v", err)
537 tr.SetError(err)
538 }
539 tr.Finish()
540 }()
541
542 start := time.Now()
543 proc, err := ss.sched.Acquire(ctx)
544 if err != nil {
545 return err
546 }
547 defer proc.Release()
548 tr.LazyPrintf("acquired process")
549
550 loaded := ss.getLoaded()
551 shards := loaded.shards
552
553 maxPendingPriority := math.Inf(-1)
554 if len(shards) > 0 {
555 maxPendingPriority = shards[0].priority
556 }
557
558 stillLoadingCrashes := 0
559 if !loaded.ready {
560 // We may have missed results due to not being fully loaded.
561 stillLoadingCrashes++
562 }
563
564 sender.Send(&zoekt.SearchResult{
565 Stats: zoekt.Stats{
566 Crashes: stillLoadingCrashes,
567 Wait: time.Since(start),
568 },
569 Progress: zoekt.Progress{
570 MaxPendingPriority: maxPendingPriority,
571 },
572 })
573
574 // Matches flow from the shards up the stack in the following order:
575 //
576 // 1. Search shards
577 // 2. flushCollectSender (aggregate)
578 // 3. limitSender (limit)
579 // 4. copyFileSender (copy)
580 //
581 // For streaming, the wrapping has to happen in the inverted order.
582 sender = copyFileSender(sender)
583
584 if opts.MaxDocDisplayCount > 0 {
585 var cancel context.CancelFunc
586 ctx, cancel = context.WithCancel(ctx)
587 defer cancel()
588 sender = limitSender(cancel, sender, opts.MaxDocDisplayCount)
589 }
590
591 sender, flush := newFlushCollectSender(opts, sender)
592
593 done, err := streamSearch(ctx, proc, q, opts, shards, sender)
594
595 // Even though streaming is done, we may have results sitting in a buffer we
596 // need to flush. So we need to send those before calling done.
597 flush()
598 done()
599
600 return err
601}
602
// streamSearch is the shared implementation behind Search and StreamSearch.
//
// It first narrows shards (and possibly simplifies q) with selectRepoSet. It
// then searches the remaining shards, in the given order, on
// min(GOMAXPROCS, len(shards)) worker goroutines under a context that also
// expires after opts.MaxWallTime when that is non-zero. Each successful shard
// result is recorded in the search metrics and forwarded to sender via
// sendByRepository. The result is tagged with the shard's priority and with
// the maximum priority of shards dispatched but not yet finished.
//
// No new shards are dispatched once any shard returns an error, or once the
// accumulated MatchCount exceeds opts.TotalMaxMatchCount (when positive).
// Results of shards already in flight are still consumed. Successful ones are
// still forwarded; when several shards fail, the last error received is
// returned.
//
// done must always be called, even if err is non-nil. The SearchResults sent
// via sender contain references to the underlying mmap data that the garbage
// collector can't see. Calling done informs the garbage collector it is free
// to collect those shards. The caller must call copyFiles on any
// SearchResults it returns/streams out before calling done.
func streamSearch(ctx context.Context, proc *process, q query.Q, opts *zoekt.SearchOptions, shards []*rankedShard, sender zoekt.Sender) (done func(), err error) {
	tr, ctx := trace.New(ctx, "shardedSearcher.streamSearch", "")
	tr.LazyLog(q, true)
	tr.LazyPrintf("opts: %+v", opts)
	overallStart := time.Now()
	metricSearchRunning.Inc()
	// Runs on every return path: updates the running/duration metrics and,
	// when err is set, counts the failure and records it on the trace.
	defer func() {
		metricSearchRunning.Dec()
		metricSearchDuration.Observe(time.Since(overallStart).Seconds())
		if err != nil {
			metricSearchFailedTotal.Inc()

			tr.LazyPrintf("error: %v", err)
			tr.SetError(err)
		}
		tr.Finish()
	}()

	tr.LazyPrintf("before selectRepoSet shards:%d", len(shards))
	// Select the subset of shards that we will search over for the given query.
	// selectRepoSet may also return a simplified query.
	shards, q = selectRepoSet(shards, q)
	tr.LazyPrintf("after selectRepoSet shards:%d %s", len(shards), q)

	// Nothing to search, so done has nothing to keep alive.
	if len(shards) == 0 {
		return func() {}, nil
	}

	// Derive a context that is cancelled when streamSearch returns and, when
	// opts.MaxWallTime is set, also expires after that duration.
	var cancel context.CancelFunc
	if opts.MaxWallTime == 0 {
		ctx, cancel = context.WithCancel(ctx)
	} else {
		ctx, cancel = context.WithTimeout(ctx, opts.MaxWallTime)
	}

	defer cancel()

	// We set the number of workers to GOMAXPROCS, or the number of shards,
	// whichever is smaller.
	workers := runtime.GOMAXPROCS(0)
	if workers > len(shards) {
		workers = len(shards)
	}

	// result pairs a shard's search outcome with that shard's priority, which
	// is needed to update the pending set when the result arrives.
	type result struct {
		priority float64
		*zoekt.SearchResult
		err error
	}

	var (
		// buffered channels to continue searching when sending back results
		// takes a while / blocks. The maximum pending result set is workers * 2.
		results = make(chan *result, workers)
		search  = make(chan *rankedShard, workers)
		wg      sync.WaitGroup
	)

	// Start workers that receive shards from the search channel, search them,
	// and send the results to the results channel. This process is repeated
	// until the search channel is closed.
	//
	// Note: the number of workers is what bounds the number of parallel shard
	// searches; the buffer on "search" only lets the loop below queue shards
	// ahead of them. Since searching is mostly CPU bound, limiting parallel
	// shard searches also reduces the peak working set.
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for s := range search {
				sr, err := searchOneShard(ctx, s, q, opts)
				r := &result{priority: s.priority, SearchResult: sr, err: err}
				results <- r
			}
		}()
	}

	// Close results once every worker has exited. This is the signal that ends
	// the loop below.
	go func() {
		wg.Wait()
		close(results)
	}()

	var (
		// pending holds the priorities of shards that were dispatched but
		// whose results have not been received yet.
		pending = make(prioritySlice, 0, workers)
		shard   = 0
		next    = shards[shard]

		// We need a separate nil-able reference to the same channel so we can close(search) for the worker
		// go-routines to finish but also set work to nil in order for the select statement below to ignore
		// that case when we want to stop a search. This is needed because sending on a closed channel panics.
		work = search
	)

	// stop ends dispatching of new shards. It is idempotent: only the first
	// call closes the search channel.
	stop := func() {
		if work != nil {
			close(search)
			work = nil
			next = nil
		}
	}

	// tracked so we can stop when we hit TotalMaxMatchCount
	var totalMatchCount int

search:
	for {
		// At the top of each iteration, have the proc associated with this search yield its own "timeslice"
		// to possibly allow other searches to make progress
		_ = proc.Yield(ctx) // Note: we let searchOneShard handle context errors

		select {
		case work <- next: // is there a worker available to search the next shard?
			pending.append(next.priority)

			shard++
			if shard == len(shards) {
				stop()
			} else {
				next = shards[shard]
			}
		case r, ok := <-results: // is there a result to send back?
			if !ok {
				break search
			}

			// delete this result's priority from pending before computing the new max pending priority
			pending.remove(r.priority)

			if r.err != nil {
				// Set final error and stop searching new shards, but consume any pending
				// search results.
				stop()
				err = r.err
				continue
			}

			// Update the match count statistics and stop searching new shards once
			// we've exceeded the limit set in the options.
			totalMatchCount += r.SearchResult.Stats.MatchCount
			if opts.TotalMaxMatchCount > 0 && totalMatchCount > opts.TotalMaxMatchCount {
				stop()
			}

			observeMetrics(r.SearchResult)

			r.Priority = r.priority
			r.MaxPendingPriority = pending.max()

			sendByRepository(r.SearchResult, opts, sender) // send the result back to the client
		}
	}

	// done keeps the searched shards reachable until the caller has copied
	// everything it needs out of the results.
	return func() { runtime.KeepAlive(shards) }, err
}
763
764// sendByRepository splits a zoekt.SearchResult by repository and calls
765// sender.Send for each batch. Ranking in Sourcegraph expects zoekt.SearchResult
766// to contain results with the same zoekt.SearchResult.Priority only.
767//
768// We split by repository instead of by priority because it is easier to set
769// RepoURLs and LineFragments in zoekt.SearchResult.
770func sendByRepository(result *zoekt.SearchResult, opts *zoekt.SearchOptions, sender zoekt.Sender) {
771
772 if len(result.RepoURLs) <= 1 || len(result.Files) == 0 {
773 zoekt.SortFiles(result.Files)
774 sender.Send(result)
775 return
776 }
777
778 send := func(repoName string, a, b int, stats zoekt.Stats) {
779 zoekt.SortFiles(result.Files[a:b])
780 sender.Send(&zoekt.SearchResult{
781 Stats: stats,
782 Progress: zoekt.Progress{
783 Priority: result.Files[a].RepositoryPriority,
784 MaxPendingPriority: result.MaxPendingPriority,
785 },
786 Files: result.Files[a:b],
787 RepoURLs: map[string]string{repoName: result.RepoURLs[repoName]},
788 LineFragments: map[string]string{repoName: result.LineFragments[repoName]},
789 })
790 }
791
792 var startIndex, endIndex int
793 curRepoID := result.Files[0].RepositoryID
794 curRepoName := result.Files[0].Repository
795
796 fm := zoekt.FileMatch{}
797 for endIndex, fm = range result.Files {
798 if curRepoID != fm.RepositoryID {
799 // Stats must stay aggregate-able, hence we sent the aggregate stats with the
800 // last event.
801 send(curRepoName, startIndex, endIndex, zoekt.Stats{})
802
803 startIndex = endIndex
804 curRepoID = fm.RepositoryID
805 curRepoName = fm.Repository
806 }
807 }
808
809 send(curRepoName, startIndex, endIndex+1, result.Stats)
810}
811
812func observeMetrics(sr *zoekt.SearchResult) {
813 metricSearchContentBytesLoadedTotal.Add(float64(sr.Stats.ContentBytesLoaded))
814 metricSearchIndexBytesLoadedTotal.Add(float64(sr.Stats.IndexBytesLoaded))
815 metricSearchCrashesTotal.Add(float64(sr.Stats.Crashes))
816 metricSearchFileCountTotal.Add(float64(sr.Stats.FileCount))
817 metricSearchShardFilesConsideredTotal.Add(float64(sr.Stats.ShardFilesConsidered))
818 metricSearchFilesConsideredTotal.Add(float64(sr.Stats.FilesConsidered))
819 metricSearchFilesLoadedTotal.Add(float64(sr.Stats.FilesLoaded))
820 metricSearchFilesSkippedTotal.Add(float64(sr.Stats.FilesSkipped))
821 metricSearchShardsSkippedTotal.Add(float64(sr.Stats.ShardsSkipped))
822 metricSearchMatchCountTotal.Add(float64(sr.Stats.MatchCount))
823 metricSearchNgramMatchesTotal.Add(float64(sr.Stats.NgramMatches))
824}
825
// copySlice replaces *src with a freshly allocated copy of its contents, so
// the result no longer shares a backing array with the original. A nil slice
// is left nil; a non-nil empty slice becomes a new non-nil empty slice.
func copySlice(src *[]byte) {
	if orig := *src; orig != nil {
		*src = append(make([]byte, 0, len(orig)), orig...)
	}
}
834
835func copyFiles(sr *zoekt.SearchResult) {
836 for i := range sr.Files {
837 copySlice(&sr.Files[i].Content)
838 copySlice(&sr.Files[i].Checksum)
839 for l := range sr.Files[i].LineMatches {
840 copySlice(&sr.Files[i].LineMatches[l].Line)
841 copySlice(&sr.Files[i].LineMatches[l].Before)
842 copySlice(&sr.Files[i].LineMatches[l].After)
843 }
844 for c := range sr.Files[i].ChunkMatches {
845 copySlice(&sr.Files[i].ChunkMatches[c].Content)
846 }
847 }
848}
849
850func searchOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) {
851 metricSearchShardRunning.Inc()
852 defer func() {
853 metricSearchShardRunning.Dec()
854 if e := recover(); e != nil {
855 log.Printf("crashed shard: %s: %#v, %s", s, e, debug.Stack())
856
857 if sr == nil {
858 sr = &zoekt.SearchResult{}
859 }
860 sr.Stats.Crashes = 1
861 }
862 }()
863
864 return s.Search(ctx, q, opts)
865}
866
// shardListResult carries the outcome of listing a single shard from
// listOneShard back to List.
type shardListResult struct {
	rl  *zoekt.RepoList
	err error
}
871
872func listOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.ListOptions, sink chan shardListResult) {
873 metricListShardRunning.Inc()
874 defer func() {
875 metricListShardRunning.Dec()
876 if r := recover(); r != nil {
877 log.Printf("crashed shard: %s: %s, %s", s.String(), r, debug.Stack())
878 sink <- shardListResult{
879 &zoekt.RepoList{Crashes: 1}, nil,
880 }
881 }
882 }()
883
884 ms, err := s.List(ctx, q, opts)
885 sink <- shardListResult{ms, err}
886}
887
888func (ss *shardedSearcher) List(ctx context.Context, r query.Q, opts *zoekt.ListOptions) (rl *zoekt.RepoList, err error) {
889 tr, ctx := trace.New(ctx, "shardedSearcher.List", "")
890 tr.LazyLog(r, true)
891 tr.LazyPrintf("opts: %s", opts)
892 metricListRunning.Inc()
893 defer func() {
894 metricListRunning.Dec()
895 if rl != nil {
896 tr.LazyPrintf("repos size: %d", len(rl.Repos))
897 tr.LazyPrintf("crashes: %d", rl.Crashes)
898 tr.LazyPrintf("minimal size: %d", len(rl.Minimal))
899 }
900 if err != nil {
901 tr.LazyPrintf("error: %v", err)
902 tr.SetError(err)
903 }
904 tr.Finish()
905 }()
906
907 r = query.Simplify(r)
908 isAll := false
909 if c, ok := r.(*query.Const); ok {
910 isAll = c.Value
911 }
912
913 proc, err := ss.sched.Acquire(ctx)
914 if err != nil {
915 return nil, err
916 }
917 defer proc.Release()
918 tr.LazyPrintf("acquired process")
919
920 loaded := ss.getLoaded()
921 shards := loaded.shards
922 shardCount := len(shards)
923 all := make(chan shardListResult, shardCount)
924 tr.LazyPrintf("shardCount: %d", len(shards))
925
926 feeder := make(chan zoekt.Searcher, len(shards))
927 for _, s := range shards {
928 feeder <- s
929 }
930 close(feeder)
931
932 for i := 0; i < runtime.GOMAXPROCS(0); i++ {
933 go func() {
934 for s := range feeder {
935 listOneShard(ctx, s, r, opts, all)
936 }
937 }()
938 }
939
940 stillLoadingCrashes := 0
941 if !loaded.ready {
942 // We may have missed results due to not being fully loaded.
943 stillLoadingCrashes++
944 }
945
946 agg := zoekt.RepoList{
947 Crashes: stillLoadingCrashes,
948 Minimal: map[uint32]*zoekt.MinimalRepoListEntry{},
949 ReposMap: zoekt.ReposMap{},
950 }
951
952 uniq := map[string]*zoekt.RepoListEntry{}
953
954 for range shards {
955 r := <-all
956 if r.err != nil {
957 return nil, r.err
958 }
959
960 agg.Crashes += r.rl.Crashes
961 agg.Stats.Add(&r.rl.Stats)
962
963 for _, r := range r.rl.Repos {
964 prev, ok := uniq[r.Repository.Name]
965 if !ok {
966 cp := *r // We need to copy because we mutate r.Stats when merging duplicates
967 uniq[r.Repository.Name] = &cp
968 } else {
969 prev.Stats.Add(&r.Stats)
970 }
971 }
972
973 for id, r := range r.rl.Minimal {
974 _, ok := agg.Minimal[id]
975 if !ok {
976 agg.Minimal[id] = r
977 }
978 }
979
980 for id, r := range r.rl.ReposMap {
981 agg.ReposMap[id] = r
982 }
983 }
984
985 agg.Repos = make([]*zoekt.RepoListEntry, 0, len(uniq))
986 for _, r := range uniq {
987 agg.Repos = append(agg.Repos, r)
988 }
989
990 if isAll && len(agg.Repos) > 0 {
991 reportListAllMetrics(agg.Repos)
992 }
993
994 return &agg, nil
995}
996
997func reportListAllMetrics(repos []*zoekt.RepoListEntry) {
998 var stats zoekt.RepoStats
999 for _, r := range repos {
1000 stats.Add(&r.Stats)
1001 }
1002
1003 metricListAllRepos.Set(float64(stats.Repos))
1004 metricListAllIndexBytes.Set(float64(stats.IndexBytes))
1005 metricListAllContentBytes.Set(float64(stats.ContentBytes))
1006 metricListAllDocuments.Set(float64(stats.Documents))
1007 metricListAllShards.Set(float64(stats.Shards))
1008 metricListAllNewLinesCount.Set(float64(stats.NewLinesCount))
1009 metricListAllDefaultBranchNewLinesCount.Set(float64(stats.DefaultBranchNewLinesCount))
1010 metricListAllOtherBranchesNewLinesCount.Set(float64(stats.OtherBranchesNewLinesCount))
1011}
1012
1013// getLoaded returns the currently loaded shards. Shared so do not mutate.
1014func (s *shardedSearcher) getLoaded() loaded {
1015 // next commit will store the true value of this, for now we keep the
1016 // backwards compatible behaviour.
1017 ready := s.ready.Load()
1018 // ranked is loaded after ready to avoid a race were ready is true but
1019 // ranked is still not the final set of shards.
1020 ranked, _ := s.ranked.Load().([]*rankedShard)
1021 return loaded{
1022 shards: ranked,
1023 ready: ready,
1024 }
1025}
1026
1027func mkRankedShard(s zoekt.Searcher) *rankedShard {
1028 q := query.Const{Value: true}
1029 result, err := s.List(context.Background(), &q, nil)
1030 if err != nil {
1031 return &rankedShard{Searcher: s}
1032 }
1033 if len(result.Repos) == 0 {
1034 return &rankedShard{Searcher: s}
1035 }
1036
1037 var (
1038 maxPriority float64
1039 repos = make([]*zoekt.Repository, 0, len(result.Repos))
1040 )
1041 for i := range result.Repos {
1042 repo := &result.Repos[i].Repository
1043 repos = append(repos, repo)
1044 if repo.RawConfig != nil {
1045 priority, _ := strconv.ParseFloat(repo.RawConfig["priority"], 64)
1046 if priority > maxPriority {
1047 maxPriority = priority
1048 }
1049 }
1050 }
1051
1052 return &rankedShard{
1053 Searcher: s,
1054 repos: repos,
1055 priority: maxPriority,
1056 }
1057}
1058
1059// markReady should be called once all shards have been passed into replace on
1060// startup. Once s is marked as ready it stops reporting a Crash in the
1061// response Stats.
1062func (s *shardedSearcher) markReady() {
1063 s.ready.CompareAndSwap(false, true)
1064}
1065
1066func (s *shardedSearcher) replace(shards map[string]zoekt.Searcher) {
1067 if len(shards) == 0 {
1068 return
1069 }
1070
1071 defer func(began time.Time) {
1072 metricShardsBatchReplaceDurationSeconds.Observe(time.Since(began).Seconds())
1073 }(time.Now())
1074
1075 s.mu.Lock()
1076 defer s.mu.Unlock()
1077
1078 for key, shard := range shards {
1079 var r *rankedShard
1080 if shard != nil {
1081 r = mkRankedShard(shard)
1082 }
1083
1084 old := s.shards[key]
1085 if shard == nil {
1086 delete(s.shards, key)
1087 } else {
1088 s.shards[key] = r
1089 }
1090
1091 if old != nil && old.Searcher != nil {
1092 // _ ___ /^^\ /^\ /^^\_
1093 // _ _@)@) \ ,,/ '` ~ `'~~ ', `\.
1094 // _/o\_ _ _ _/~`.`...'~\ ./~~..,'`','',.,' ' ~:
1095 // / `,'.~,~.~ . , . , ~|, ,/ .,' , ,. .. ,,. `, ~\_
1096 // ( ' _' _ '_` _ ' . , `\_/ .' ..' ' ` ` `.. `, \_
1097 // ~V~ V~ V~ V~ ~\ ` ' . ' , ' .,.,''`.,.''`.,.``. ', \_
1098 // _/\ /\ /\ /\_/, . ' , `_/~\_ .' .,. ,, , _/~\_ `. `. '., \_
1099 // < ~ ~ '~`'~'`, ., . `_: ::: \_ ' `_/ ::: \_ `.,' . ', \_
1100 // \ ' `_ '`_ _ ',/ _::_::_ \ _ _/ _::_::_ \ `.,'.,`., \-,-,-,_,_,
1101 // `'~~ `'~~ `'~~ `'~~ \(_)(_)(_)/ `~~' \(_)(_)(_)/ ~'`\_.._,._,'_;_;_;_;_;
1102 //
1103 // We can't just call Close now, because there may be ongoing searches
1104 // which have old in the shards list. Previously we used an exclusive
1105 // lock to guarantee there were no concurrent searches. However, that
1106 // led to blocking on the read path.
1107 //
1108 // We could introduce granular locking per rankedShard to know when
1109 // there are no more references. However, this becomes tricky in
1110 // practice. Instead we rely on the garbage collector noticing old is no
1111 // longer used. We take care in our searchers to runtime.KeepAlive until
1112 // we have stopped referencing the underling mmap data.
1113 runtime.SetFinalizer(old, func(r *rankedShard) {
1114 r.Close()
1115 })
1116 }
1117 }
1118
1119 ranked := make([]*rankedShard, 0, len(s.shards))
1120 for _, r := range s.shards {
1121 ranked = append(ranked, r)
1122 }
1123
1124 sort.Slice(ranked, func(i, j int) bool {
1125 priorityDiff := ranked[i].priority - ranked[j].priority
1126 if priorityDiff != 0 {
1127 return priorityDiff > 0
1128 }
1129 if len(ranked[i].repos) == 0 || len(ranked[j].repos) == 0 {
1130 // Protect against empty names which can happen if we fail to List or
1131 // the shard is full of tombstones. Prefer the shard which has names.
1132 return len(ranked[i].repos) >= len(ranked[j].repos)
1133 }
1134 return ranked[i].repos[0].Name < ranked[j].repos[0].Name
1135 })
1136
1137 s.ranked.Store(ranked)
1138
1139 metricShardsLoaded.Set(float64(len(ranked)))
1140}
1141
1142func loadShard(fn string) (zoekt.Searcher, error) {
1143 f, err := os.Open(fn)
1144 if err != nil {
1145 return nil, err
1146 }
1147
1148 iFile, err := zoekt.NewIndexFile(f)
1149 if err != nil {
1150 return nil, err
1151 }
1152 s, err := zoekt.NewSearcher(iFile)
1153 if err != nil {
1154 iFile.Close()
1155 return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err)
1156 }
1157
1158 return s, nil
1159}
1160
// prioritySlice is an unordered multiset of float64 priorities supporting
// three operations: add a value, remove one occurrence of a value, and find
// the maximum. Removal and max are linear scans, which is acceptable because
// N is restricted to GOMAXPROCS (i.e., number of cpu cores) by the
// shardedSearcher interface. The methods do no locking of their own.
type prioritySlice []float64

// append adds pri to the set.
func (p *prioritySlice) append(pri float64) {
	*p = append(*p, pri)
}

// remove deletes a single element equal to pri, if there is one. The last
// element is moved into the freed slot, so element order is not preserved.
// Matching uses ==, so a NaN can never be removed.
func (p *prioritySlice) remove(pri float64) {
	vals := *p
	for i := range vals {
		if vals[i] != pri {
			continue
		}
		last := len(vals) - 1
		vals[i] = vals[last] // self-assignment when i == last
		*p = vals[:last]
		return
	}
}

// max returns the largest element, or -Inf when the set is empty. NaN
// elements never compare greater and are therefore ignored.
func (p *prioritySlice) max() float64 {
	highest := math.Inf(-1)
	for i := range *p {
		if v := (*p)[i]; v > highest {
			highest = v
		}
	}
	return highest
}