Fork of https://github.com/sourcegraph/zoekt.
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Copyright 2016 Google Inc. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package shards 16 17import ( 18 "context" 19 "fmt" 20 "log" 21 "math" 22 "os" 23 "runtime" 24 "runtime/debug" 25 "slices" 26 "sort" 27 "strconv" 28 "sync" 29 "time" 30 31 "golang.org/x/sync/semaphore" 32 33 "github.com/prometheus/client_golang/prometheus" 34 "github.com/prometheus/client_golang/prometheus/promauto" 35 "go.uber.org/atomic" 36 37 "github.com/sourcegraph/zoekt" 38 "github.com/sourcegraph/zoekt/query" 39 "github.com/sourcegraph/zoekt/trace" 40) 41 42var ( 43 metricShardsLoaded = promauto.NewGauge(prometheus.GaugeOpts{ 44 Name: "zoekt_shards_loaded", 45 Help: "The number of shards currently loaded", 46 }) 47 metricShardsLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 48 Name: "zoekt_shards_loaded_total", 49 Help: "The total number of shards loaded", 50 }) 51 metricShardsLoadFailedTotal = promauto.NewCounter(prometheus.CounterOpts{ 52 Name: "zoekt_shards_load_failed_total", 53 Help: "The total number of shard loads that failed", 54 }) 55 56 metricSearchRunning = promauto.NewGauge(prometheus.GaugeOpts{ 57 Name: "zoekt_search_running", 58 Help: "The number of concurrent search requests running", 59 }) 60 metricSearchShardRunning = promauto.NewGauge(prometheus.GaugeOpts{ 61 Name: "zoekt_search_shard_running", 62 Help: "The number of concurrent search requests in a shard running", 63 }) 64 metricSearchFailedTotal = 
promauto.NewCounter(prometheus.CounterOpts{ 65 Name: "zoekt_search_failed_total", 66 Help: "The total number of search requests that failed", 67 }) 68 metricSearchDuration = promauto.NewHistogram(prometheus.HistogramOpts{ 69 Name: "zoekt_search_duration_seconds", 70 Help: "The duration a search request took in seconds", 71 Buckets: prometheus.DefBuckets, // DefBuckets good for service timings 72 }) 73 74 // A Counter per Stat. Name should match field in zoekt.Stats. 75 metricSearchContentBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 76 Name: "zoekt_search_content_loaded_bytes_total", 77 Help: "Total amount of I/O for reading contents", 78 }) 79 metricSearchIndexBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 80 Name: "zoekt_search_index_loaded_bytes_total", 81 Help: "Total amount of I/O for reading from index", 82 }) 83 metricSearchCrashesTotal = promauto.NewCounter(prometheus.CounterOpts{ 84 Name: "zoekt_search_crashes_total", 85 Help: "Total number of search shards that had a crash", 86 }) 87 metricSearchFileCountTotal = promauto.NewCounter(prometheus.CounterOpts{ 88 Name: "zoekt_search_file_count_total", 89 Help: "Total number of files containing a match", 90 }) 91 metricSearchShardFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{ 92 Name: "zoekt_search_shard_files_considered_total", 93 Help: "Total number of files in shards that we considered", 94 }) 95 metricSearchFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{ 96 Name: "zoekt_search_files_considered_total", 97 Help: "Total files that we evaluated. 
Equivalent to files for which all atom matches (including negations) evaluated to true", 98 }) 99 metricSearchFilesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 100 Name: "zoekt_search_files_loaded_total", 101 Help: "Total files for which we loaded file content to verify substring matches", 102 }) 103 metricSearchFilesSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{ 104 Name: "zoekt_search_files_skipped_total", 105 Help: "Total candidate files whose contents weren't examined because we gathered enough matches", 106 }) 107 metricSearchShardsSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{ 108 Name: "zoekt_search_shards_skipped_total", 109 Help: "Total shards that we did not process because a query was canceled", 110 }) 111 metricSearchMatchCountTotal = promauto.NewCounter(prometheus.CounterOpts{ 112 Name: "zoekt_search_match_count_total", 113 Help: "Total number of non-overlapping matches", 114 }) 115 metricSearchNgramMatchesTotal = promauto.NewCounter(prometheus.CounterOpts{ 116 Name: "zoekt_search_ngram_matches_total", 117 Help: "Total number of candidate matches as a result of searching ngrams", 118 }) 119 metricSearchNgramLookupsTotal = promauto.NewCounter(prometheus.CounterOpts{ 120 Name: "zoekt_search_ngram_lookups_total", 121 Help: "Total number of times we accessed an ngram in the index", 122 }) 123 metricSearchRegexpsConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{ 124 Name: "zoekt_search_regexps_considered_total", 125 Help: "Total number of times regexp was called on files that we evaluated", 126 }) 127 128 metricListRunning = promauto.NewGauge(prometheus.GaugeOpts{ 129 Name: "zoekt_list_running", 130 Help: "The number of concurrent list requests running", 131 }) 132 metricListShardRunning = promauto.NewGauge(prometheus.GaugeOpts{ 133 Name: "zoekt_list_shard_running", 134 Help: "The number of concurrent list requests in a shard running", 135 }) 136 metricShardsBatchReplaceDurationSeconds = 
promauto.NewHistogram(prometheus.HistogramOpts{ 137 Name: "zoekt_shards_batch_replace_duration_seconds", 138 Help: "The time it takes to replace a batch of Searchers.", 139 Buckets: []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30}, 140 }) 141 metricListAllRepos = promauto.NewGauge(prometheus.GaugeOpts{ 142 Name: "zoekt_list_all_stats_repos", 143 Help: "The last List(true) value for RepoStats.Repos. Repos is used for aggregrating the number of repositories.", 144 }) 145 metricListAllShards = promauto.NewGauge(prometheus.GaugeOpts{ 146 Name: "zoekt_list_all_stats_shards", 147 Help: "The last List(true) value for RepoStats.Shards. Shards is the total number of search shards.", 148 }) 149 metricListAllDocuments = promauto.NewGauge(prometheus.GaugeOpts{ 150 Name: "zoekt_list_all_stats_documents", 151 Help: "The last List(true) value for RepoStats.Documents. Documents holds the number of documents or files.", 152 }) 153 metricListAllIndexBytes = promauto.NewGauge(prometheus.GaugeOpts{ 154 Name: "zoekt_list_all_stats_index_bytes", 155 Help: "The last List(true) value for RepoStats.IndexBytes. IndexBytes is the amount of RAM used for index overhead.", 156 }) 157 metricListAllContentBytes = promauto.NewGauge(prometheus.GaugeOpts{ 158 Name: "zoekt_list_all_stats_content_bytes", 159 Help: "The last List(true) value for RepoStats.ContentBytes. 
ContentBytes is the amount of RAM used for raw content.", 160 }) 161 metricListAllNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{ 162 Name: "zoekt_list_all_stats_new_lines_count", 163 Help: "The last List(true) value for RepoStats.NewLinesCount.", 164 }) 165 metricListAllDefaultBranchNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{ 166 Name: "zoekt_list_all_stats_default_branch_new_lines_count", 167 Help: "The last List(true) value for RepoStats.DefaultBranchNewLinesCount.", 168 }) 169 metricListAllOtherBranchesNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{ 170 Name: "zoekt_list_all_stats_other_branches_new_lines_count", 171 Help: "The last List(true) value for RepoStats.OtherBranchesNewLinesCount.", 172 }) 173) 174 175type rankedShard struct { 176 zoekt.Searcher 177 178 priority float64 // maximum priority across all repos in the shard 179 180 // We have out of band ranking on compound shards which can change even if 181 // the shard file does not. So we compute a rank in getShards. We store 182 // repos here to avoid the cost of List in the search request path. 183 // 184 // repos is nil only if that call failed. 185 repos []*zoekt.Repository 186} 187 188// loaded stores the state we compute when updating the state of shards from 189// disk. 190type loaded struct { 191 // shards is the currently loaded shards sorted by decreasing rank and 192 // should not be mutated. 193 shards []*rankedShard 194 195 // ready is true if sharded searcher has finished loading all initial 196 // shards on startup. 197 ready bool 198} 199 200type shardedSearcher struct { 201 // Limit the number of parallel queries. Since searching is 202 // CPU bound, we can't do better than #CPU queries in 203 // parallel. If we do so, we just create more memory 204 // pressure. 
205 sched scheduler 206 207 mu sync.Mutex // protects writes to shards 208 shards map[string]*rankedShard 209 210 ready atomic.Bool 211 ranked atomic.Value 212} 213 214func newShardedSearcher(n int64) *shardedSearcher { 215 ss := &shardedSearcher{ 216 shards: make(map[string]*rankedShard), 217 sched: newScheduler(n), 218 } 219 return ss 220} 221 222// NewDirectorySearcher returns a searcher instance that loads all 223// shards corresponding to a glob into memory. 224func NewDirectorySearcher(dir string) (zoekt.Streamer, error) { 225 return newDirectorySearcher(dir, true) 226} 227 228// NewDirectorySearcherFast is like NewDirectorySearcher, but does not block 229// on the initial loading of shards. 230// 231// This exists since in the case of zoekt-webserver we are happy with having 232// partial availability since that is better than no availability on large 233// instances. 234func NewDirectorySearcherFast(dir string) (zoekt.Streamer, error) { 235 return newDirectorySearcher(dir, false) 236} 237 238func newDirectorySearcher(dir string, waitUntilReady bool) (zoekt.Streamer, error) { 239 ss := newShardedSearcher(int64(runtime.GOMAXPROCS(0))) 240 tl := &loader{ 241 ss: ss, 242 } 243 dw, err := newDirectoryWatcher(dir, tl) 244 if err != nil { 245 return nil, err 246 } 247 248 if waitUntilReady { 249 if err := dw.WaitUntilReady(); err != nil { 250 return nil, err 251 } 252 } 253 254 ds := &directorySearcher{ 255 Streamer: ss, 256 directoryWatcher: dw, 257 } 258 259 return &typeRepoSearcher{Streamer: ds}, nil 260} 261 262type directorySearcher struct { 263 zoekt.Streamer 264 265 directoryWatcher *DirectoryWatcher 266} 267 268func (s *directorySearcher) Close() { 269 // We need to Stop directoryWatcher first since it calls load/unload on 270 // Searcher. 
271 s.directoryWatcher.Stop() 272 s.Streamer.Close() 273} 274 275type loader struct { 276 ss *shardedSearcher 277} 278 279func (tl *loader) load(keys ...string) { 280 // This is called with all keys on startup, so once this function has 281 // finished running shardedSearcher will be ready. 282 defer tl.ss.markReady() 283 284 if len(keys) == 0 { 285 // If there's nothing to load, we exit early here, but we want to mark 286 // ourselves as ready. 287 return 288 } 289 290 var ( 291 mu sync.Mutex // synchronizes writes to the shards map 292 wg sync.WaitGroup // used to wait for all shards to load 293 sem = semaphore.NewWeighted(int64(runtime.GOMAXPROCS(0))) 294 loadedShards = make(map[string]zoekt.Searcher) 295 ) 296 297 publishLoaded := func() { 298 mu.Lock() 299 chunk := loadedShards 300 loadedShards = make(map[string]zoekt.Searcher) 301 mu.Unlock() 302 tl.ss.replace(chunk) 303 } 304 305 log.Printf("loading %d shard(s): %s", len(keys), humanTruncateList(keys, 5)) 306 307 lastProgress := time.Now() 308 for i, key := range keys { 309 // If taking a while to start-up occasionally give a progress message 310 if time.Since(lastProgress) > 5*time.Second { 311 log.Printf("still need to load %d shards...", len(keys)-i) 312 lastProgress = time.Now() 313 314 publishLoaded() 315 } 316 317 _ = sem.Acquire(context.Background(), 1) 318 wg.Add(1) 319 320 go func(key string) { 321 defer sem.Release(1) 322 defer wg.Done() 323 324 shard, err := loadShard(key) 325 if err != nil { 326 metricShardsLoadFailedTotal.Inc() 327 log.Printf("reloading: %s, err %v ", key, err) 328 return 329 } 330 metricShardsLoadedTotal.Inc() 331 332 mu.Lock() 333 loadedShards[key] = shard 334 mu.Unlock() 335 }(key) 336 } 337 338 wg.Wait() 339 340 publishLoaded() 341} 342 343func (tl *loader) drop(keys ...string) { 344 shards := make(map[string]zoekt.Searcher, len(keys)) 345 for _, key := range keys { 346 shards[key] = nil 347 } 348 tl.ss.replace(shards) 349} 350 351func (ss *shardedSearcher) String() string { 
352 return "shardedSearcher" 353} 354 355// Close closes references to open files. It may be called only once. 356func (ss *shardedSearcher) Close() { 357 ss.mu.Lock() 358 shards := make(map[string]zoekt.Searcher, len(ss.shards)) 359 for k := range ss.shards { 360 shards[k] = nil 361 } 362 ss.mu.Unlock() 363 364 ss.replace(shards) 365} 366 367func selectRepoSet(shards []*rankedShard, q query.Q) ([]*rankedShard, query.Q) { 368 and, ok := q.(*query.And) 369 if ok { 370 return doSelectRepoSet(shards, and) 371 } 372 373 // We have queries which look like (reposet ...) and we want to do the same 374 // optimizations. To simplify we just always wrap the query in And and then 375 // on the return value call Simplify to unwrap. In particular this is 376 // important for List calls. 377 and = &query.And{Children: []query.Q{q}} 378 shards, q = doSelectRepoSet(shards, and) 379 return shards, query.Simplify(q) 380} 381 382func doSelectRepoSet(shards []*rankedShard, and *query.And) ([]*rankedShard, query.Q) { 383 // (and (reposet ...) (q)) 384 // (and true (q)) with a filtered shards 385 // (and false) // noop 386 387 // (and (repobranches ...) (q)) 388 // (and (repobranches ...) (q)) 389 390 // Note: we also support (and (repo ...) (q)) even though sourcegraph does 391 // not generate those sorts of queries. This is to support manual testing. 
392 393 hasReposForPredicate := func(pred func(repo *zoekt.Repository) bool) func(repos []*zoekt.Repository) (any, all bool) { 394 return func(repos []*zoekt.Repository) (any, all bool) { 395 any = false 396 all = true 397 for _, repo := range repos { 398 b := pred(repo) 399 any = any || b 400 all = all && b 401 } 402 return any, all 403 } 404 } 405 406 for i, c := range and.Children { 407 var setSize int 408 var hasRepos func([]*zoekt.Repository) (bool, bool) 409 switch setQuery := c.(type) { 410 case *query.RepoSet: 411 setSize = len(setQuery.Set) 412 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 413 return setQuery.Set[repo.Name] 414 }) 415 case *query.RepoIDs: 416 setSize = int(setQuery.Repos.GetCardinality()) 417 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 418 return setQuery.Repos.Contains(repo.ID) 419 }) 420 case *query.Repo: 421 setSize = 0 422 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 423 return setQuery.Regexp.MatchString(repo.Name) 424 }) 425 case *query.BranchesRepos: 426 for _, br := range setQuery.List { 427 setSize += int(br.Repos.GetCardinality()) 428 } 429 430 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 431 for _, br := range setQuery.List { 432 if br.Repos.Contains(repo.ID) { 433 return true 434 } 435 } 436 return false 437 }) 438 default: 439 continue 440 } 441 442 // setSize may be larger than the number of shards we have. The size of 443 // filtered is bounded by min(len(set), len(shards)) 444 if setSize > len(shards) { 445 setSize = len(shards) 446 } 447 448 filtered := make([]*rankedShard, 0, setSize) 449 filteredAll := true 450 451 for _, s := range shards { 452 if s.repos == nil { 453 // repos is nil if we failed to List the shard. This shouldn't 454 // happen, but if it does we don't know what is in it and must search 455 // it without simplifying the query. 
456 filtered = append(filtered, s) 457 filteredAll = false 458 } else if any, all := hasRepos(s.repos); any { 459 filtered = append(filtered, s) 460 filteredAll = filteredAll && all 461 } 462 } 463 464 // We don't need to adjust the query since we are returning an empty set 465 // of shards to search. 466 if len(filtered) == 0 { 467 return filtered, and 468 } 469 470 // We can't simplify the query since we are searching shards which contain 471 // repos we aren't supposed to search. 472 if !filteredAll { 473 return filtered, and 474 } 475 476 // We don't want to mutate the original and, so we clone it before 477 // mutating it. 478 and = &query.And{Children: slices.Clone(and.Children)} 479 480 // This optimization allows us to avoid the work done by 481 // indexData.simplify for each shard. 482 // 483 // For example if our query is (and (reposet foo bar) (content baz)) 484 // then at this point filtered is [foo bar] and q is the same. For each 485 // shard indexData.simplify will simplify to (and true (content baz)) -> 486 // (content baz). This work can be done now once, rather than per shard. 487 switch c := c.(type) { 488 case *query.RepoSet, *query.RepoIDs, *query.Repo: 489 and.Children[i] = &query.Const{Value: true} 490 return filtered, query.Simplify(and) 491 492 case *query.BranchesRepos: 493 // We can only replace if all the repos want the same branches. We 494 // simplify and just check that we are requesting 1 branch. The common 495 // case is just asking for HEAD, so this should be effective. 496 if len(c.List) != 1 { 497 return filtered, and 498 } 499 500 // Every repo wants the same branches, so we can replace RepoBranches 501 // with a list of branch queries. 
502 and.Children[i] = &query.Branch{Pattern: c.List[0].Branch, Exact: true} 503 return filtered, query.Simplify(and) 504 } 505 506 // Stop after first RepoSet, otherwise we might append duplicate 507 // shards to `filtered` 508 return filtered, and 509 } 510 511 return shards, and 512} 513 514func (ss *shardedSearcher) Search(ctx context.Context, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) { 515 tr, ctx := trace.New(ctx, "shardedSearcher.Search", "") 516 defer func() { 517 tr.Finish() 518 }() 519 ctx, cancel := context.WithCancel(ctx) 520 defer cancel() 521 522 collectSender := newCollectSender(opts) 523 524 start := time.Now() 525 proc, err := ss.sched.Acquire(ctx) 526 if err != nil { 527 return nil, err 528 } 529 defer proc.Release() 530 tr.LazyPrintf("acquired process") 531 532 wait := time.Since(start) 533 start = time.Now() 534 535 loaded := ss.getLoaded() 536 done, err := streamSearch(ctx, proc, q, opts, loaded.shards, collectSender) 537 defer done() 538 if err != nil { 539 return nil, err 540 } 541 542 aggregate, ok := collectSender.Done() 543 if !ok { 544 aggregate = &zoekt.SearchResult{ 545 RepoURLs: map[string]string{}, 546 LineFragments: map[string]string{}, 547 } 548 } 549 550 copyFiles(aggregate) 551 552 if !loaded.ready { 553 // We may have missed results due to not being fully loaded. 
554 aggregate.Stats.Crashes++ 555 } 556 557 aggregate.Stats.Wait = wait 558 aggregate.Stats.Duration = time.Since(start) 559 560 return aggregate, nil 561} 562 563func (ss *shardedSearcher) StreamSearch(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, sender zoekt.Sender) (err error) { 564 tr, ctx := trace.New(ctx, "shardedSearcher.StreamSearch", "") 565 defer func() { 566 if err != nil { 567 tr.LazyPrintf("error: %v", err) 568 tr.SetError(err) 569 } 570 tr.Finish() 571 }() 572 573 start := time.Now() 574 proc, err := ss.sched.Acquire(ctx) 575 if err != nil { 576 return err 577 } 578 defer proc.Release() 579 tr.LazyPrintf("acquired process") 580 581 loaded := ss.getLoaded() 582 shards := loaded.shards 583 584 maxPendingPriority := math.Inf(-1) 585 if len(shards) > 0 { 586 maxPendingPriority = shards[0].priority 587 } 588 589 stillLoadingCrashes := 0 590 if !loaded.ready { 591 // We may have missed results due to not being fully loaded. 592 stillLoadingCrashes++ 593 } 594 595 sender.Send(&zoekt.SearchResult{ 596 Stats: zoekt.Stats{ 597 Crashes: stillLoadingCrashes, 598 Wait: time.Since(start), 599 }, 600 Progress: zoekt.Progress{ 601 MaxPendingPriority: maxPendingPriority, 602 }, 603 }) 604 605 // Matches flow from the shards up the stack in the following order: 606 // 607 // 1. Search shards 608 // 2. flushCollectSender (aggregate) 609 // 3. limitSender (limit) 610 // 4. copyFileSender (copy) 611 // 612 // For streaming, the wrapping has to happen in the inverted order. 
613 sender = copyFileSender(sender) 614 615 if truncator, hasLimits := zoekt.NewDisplayTruncator(opts); hasLimits { 616 var cancel context.CancelFunc 617 ctx, cancel = context.WithCancel(ctx) 618 defer cancel() 619 sender = limitSender(cancel, sender, truncator) 620 } 621 622 sender, flush := newFlushCollectSender(opts, sender) 623 624 done, err := streamSearch(ctx, proc, q, opts, shards, sender) 625 626 // Even though streaming is done, we may have results sitting in a buffer we 627 // need to flush. So we need to send those before calling done. 628 flush() 629 done() 630 631 return err 632} 633 634// streamSearch is an internal helper since both Search and StreamSearch are 635// largely similar. 636// 637// done must always be called, even if err is non-nil. The SearchResults sent 638// via sender contain references to the underlying mmap data that the garbage 639// collector can't see. Calling done informs the garbage collector it is free 640// to collect those shards. The caller must call copyFiles on any 641// SearchResults it returns/streams out before calling done. 642func streamSearch(ctx context.Context, proc *process, q query.Q, opts *zoekt.SearchOptions, shards []*rankedShard, sender zoekt.Sender) (done func(), err error) { 643 tr, ctx := trace.New(ctx, "shardedSearcher.streamSearch", "") 644 overallStart := time.Now() 645 metricSearchRunning.Inc() 646 defer func() { 647 metricSearchRunning.Dec() 648 metricSearchDuration.Observe(time.Since(overallStart).Seconds()) 649 if err != nil { 650 metricSearchFailedTotal.Inc() 651 652 tr.LazyPrintf("error: %v", err) 653 tr.SetError(err) 654 } 655 tr.Finish() 656 }() 657 658 // Select the subset of shards that we will search over for the given query. 
659 { 660 beforeLen := len(shards) 661 beforeQ := q 662 shards, q = selectRepoSet(shards, q) 663 tr.LazyPrintf("selectRepoSet shards=%d->%d q=%s->%s", beforeLen, len(shards), beforeQ, q) 664 } 665 666 if len(shards) == 0 { 667 return func() {}, nil 668 } 669 670 var cancel context.CancelFunc 671 if opts.MaxWallTime == 0 { 672 ctx, cancel = context.WithCancel(ctx) 673 } else { 674 ctx, cancel = context.WithTimeout(ctx, opts.MaxWallTime) 675 } 676 677 defer cancel() 678 679 // We set the number of workers to GOMAXPROCS, or the number of shards, 680 // whichever is smaller. 681 workers := runtime.GOMAXPROCS(0) 682 if workers > len(shards) { 683 workers = len(shards) 684 } 685 686 type result struct { 687 priority float64 688 *zoekt.SearchResult 689 err error 690 } 691 692 var ( 693 // buffered channels to continue searching when sending back results 694 // takes a while / blocks. The maximum pending result set is workers * 2. 695 results = make(chan *result, workers) 696 search = make(chan *rankedShard, workers) 697 wg sync.WaitGroup 698 ) 699 700 // Start workers that receive shards from the search channel, search them, 701 // and send the results to the results channel. This process is repeated 702 // until the search channel is closed. 703 // 704 // Note: Making "search" a buffered channel has the effect of limiting the number of parallel shard searches. 705 // Since searching is mostly CPU bound, limiting parallel shard searches also reduces the peak working set. 
706 wg.Add(workers) 707 for i := 0; i < workers; i++ { 708 go func() { 709 defer wg.Done() 710 for s := range search { 711 sr, err := searchOneShard(ctx, s, q, opts) 712 r := &result{priority: s.priority, SearchResult: sr, err: err} 713 results <- r 714 } 715 }() 716 } 717 718 go func() { 719 wg.Wait() 720 close(results) 721 }() 722 723 var ( 724 pending = make(prioritySlice, 0, workers) 725 shard = 0 726 next = shards[shard] 727 728 // We need a separate nil-able reference to the same channel so we can close(search) for the worker 729 // go-routines to finish but also set work to nil in order for the select statement below to ignore 730 // that case when we want to stop a search. This is needed because sending on a closed channel panics. 731 work = search 732 ) 733 734 stop := func() { 735 if work != nil { 736 close(search) 737 work = nil 738 next = nil 739 } 740 } 741 742 // tracked so we can stop when we hit TotalMaxMatchCount 743 var totalMatchCount int 744 745search: 746 for { 747 // At the top of each iteration, have the proc associated with this search yield its won "timeslice" 748 // to possibly allow other searches to make progress 749 _ = proc.Yield(ctx) // Note: we let searchOneShard handle context errors 750 751 select { 752 case work <- next: // is there a worker available to search the next shard? 753 pending.append(next.priority) 754 755 shard++ 756 if shard == len(shards) { 757 stop() 758 } else { 759 next = shards[shard] 760 } 761 case r, ok := <-results: // is there a result to send back? 762 if !ok { 763 break search 764 } 765 766 // delete this result's priority from pending before computing the new max pending priority 767 pending.remove(r.priority) 768 769 if r.err != nil { 770 // Set final error and stop searching new shards, but consume any pending 771 // search results. 
772 stop() 773 err = r.err 774 continue 775 } 776 777 // Update the match count statistics and stop searching new shards if we've 778 // reached the limit set in the options. 779 totalMatchCount += r.SearchResult.Stats.MatchCount 780 if opts.TotalMaxMatchCount > 0 && totalMatchCount > opts.TotalMaxMatchCount { 781 stop() 782 } 783 784 observeMetrics(r.SearchResult) 785 786 r.Priority = r.priority 787 r.MaxPendingPriority = pending.max() 788 789 sendByRepository(r.SearchResult, opts, sender) // send the result back to the client 790 } 791 } 792 793 return func() { runtime.KeepAlive(shards) }, err 794} 795 796// sendByRepository splits a zoekt.SearchResult by repository and calls 797// sender.Send for each batch. Ranking in Sourcegraph expects zoekt.SearchResult 798// to contain results with the same zoekt.SearchResult.Priority only. 799// 800// We split by repository instead of by priority because it is easier to set 801// RepoURLs and LineFragments in zoekt.SearchResult. 802func sendByRepository(result *zoekt.SearchResult, opts *zoekt.SearchOptions, sender zoekt.Sender) { 803 if len(result.RepoURLs) <= 1 || len(result.Files) == 0 { 804 zoekt.SortFiles(result.Files) 805 sender.Send(result) 806 return 807 } 808 809 send := func(repoName string, a, b int, stats zoekt.Stats) { 810 zoekt.SortFiles(result.Files[a:b]) 811 sender.Send(&zoekt.SearchResult{ 812 Stats: stats, 813 Progress: zoekt.Progress{ 814 Priority: result.Files[a].RepositoryPriority, 815 MaxPendingPriority: result.MaxPendingPriority, 816 }, 817 Files: result.Files[a:b], 818 RepoURLs: map[string]string{repoName: result.RepoURLs[repoName]}, 819 LineFragments: map[string]string{repoName: result.LineFragments[repoName]}, 820 }) 821 } 822 823 var startIndex, endIndex int 824 curRepoID := result.Files[0].RepositoryID 825 curRepoName := result.Files[0].Repository 826 827 fm := zoekt.FileMatch{} 828 for endIndex, fm = range result.Files { 829 if curRepoID != fm.RepositoryID { 830 // Stats must stay 
aggregate-able, hence we sent the aggregate stats with the 831 // last event. 832 send(curRepoName, startIndex, endIndex, zoekt.Stats{}) 833 834 startIndex = endIndex 835 curRepoID = fm.RepositoryID 836 curRepoName = fm.Repository 837 } 838 } 839 840 send(curRepoName, startIndex, endIndex+1, result.Stats) 841} 842 843func observeMetrics(sr *zoekt.SearchResult) { 844 metricSearchContentBytesLoadedTotal.Add(float64(sr.Stats.ContentBytesLoaded)) 845 metricSearchIndexBytesLoadedTotal.Add(float64(sr.Stats.IndexBytesLoaded)) 846 metricSearchCrashesTotal.Add(float64(sr.Stats.Crashes)) 847 metricSearchFileCountTotal.Add(float64(sr.Stats.FileCount)) 848 metricSearchShardFilesConsideredTotal.Add(float64(sr.Stats.ShardFilesConsidered)) 849 metricSearchFilesConsideredTotal.Add(float64(sr.Stats.FilesConsidered)) 850 metricSearchFilesLoadedTotal.Add(float64(sr.Stats.FilesLoaded)) 851 metricSearchFilesSkippedTotal.Add(float64(sr.Stats.FilesSkipped)) 852 metricSearchShardsSkippedTotal.Add(float64(sr.Stats.ShardsSkipped)) 853 metricSearchMatchCountTotal.Add(float64(sr.Stats.MatchCount)) 854 metricSearchNgramMatchesTotal.Add(float64(sr.Stats.NgramMatches)) 855 metricSearchNgramLookupsTotal.Add(float64(sr.Stats.NgramLookups)) 856 metricSearchRegexpsConsideredTotal.Add(float64(sr.Stats.RegexpsConsidered)) 857} 858 859func copySlice(src *[]byte) { 860 if *src == nil { 861 return 862 } 863 dst := make([]byte, len(*src)) 864 copy(dst, *src) 865 *src = dst 866} 867 868func copyFiles(sr *zoekt.SearchResult) { 869 for i := range sr.Files { 870 copySlice(&sr.Files[i].Content) 871 copySlice(&sr.Files[i].Checksum) 872 for l := range sr.Files[i].LineMatches { 873 copySlice(&sr.Files[i].LineMatches[l].Line) 874 copySlice(&sr.Files[i].LineMatches[l].Before) 875 copySlice(&sr.Files[i].LineMatches[l].After) 876 } 877 for c := range sr.Files[i].ChunkMatches { 878 copySlice(&sr.Files[i].ChunkMatches[c].Content) 879 } 880 } 881} 882 883func searchOneShard(ctx context.Context, s zoekt.Searcher, q 
query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) { 884 metricSearchShardRunning.Inc() 885 defer func() { 886 metricSearchShardRunning.Dec() 887 if e := recover(); e != nil { 888 log.Printf("crashed shard: %s: %#v, %s", s, e, debug.Stack()) 889 890 if sr == nil { 891 sr = &zoekt.SearchResult{} 892 } 893 sr.Stats.Crashes = 1 894 } 895 }() 896 897 return s.Search(ctx, q, opts) 898} 899 900type shardListResult struct { 901 rl *zoekt.RepoList 902 err error 903} 904 905func listOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.ListOptions, sink chan shardListResult) { 906 metricListShardRunning.Inc() 907 defer func() { 908 metricListShardRunning.Dec() 909 if r := recover(); r != nil { 910 log.Printf("crashed shard: %s: %s, %s", s.String(), r, debug.Stack()) 911 sink <- shardListResult{ 912 &zoekt.RepoList{Crashes: 1}, nil, 913 } 914 } 915 }() 916 917 ms, err := s.List(ctx, q, opts) 918 sink <- shardListResult{ms, err} 919} 920 921func (ss *shardedSearcher) List(ctx context.Context, q query.Q, opts *zoekt.ListOptions) (rl *zoekt.RepoList, err error) { 922 tr, ctx := trace.New(ctx, "shardedSearcher.List", "") 923 metricListRunning.Inc() 924 defer func() { 925 metricListRunning.Dec() 926 if rl != nil { 927 tr.LazyPrintf("repos.size=%d reposmap.size=%d crashes=%d", len(rl.Repos), len(rl.ReposMap), rl.Crashes) 928 } 929 if err != nil { 930 tr.LazyPrintf("error: %v", err) 931 tr.SetError(err) 932 } 933 tr.Finish() 934 }() 935 936 q = query.Simplify(q) 937 isAll := false 938 if c, ok := q.(*query.Const); ok { 939 isAll = c.Value 940 } 941 942 proc, err := ss.sched.Acquire(ctx) 943 if err != nil { 944 return nil, err 945 } 946 defer proc.Release() 947 tr.LazyPrintf("acquired process") 948 949 loaded := ss.getLoaded() 950 shards := loaded.shards 951 952 // Setup what we return now, since we may short circuit if there are no 953 // shards to search. 
954 stillLoadingCrashes := 0 955 if !loaded.ready { 956 // We may have missed results due to not being fully loaded. 957 stillLoadingCrashes++ 958 } 959 agg := zoekt.RepoList{ 960 Crashes: stillLoadingCrashes, 961 ReposMap: zoekt.ReposMap{}, 962 Repos: []*zoekt.RepoListEntry{}, 963 } 964 965 // PERF: Select the subset of shards that we will search over for the given 966 // query. A common List query only asks for a specific repo, so this is an 967 // important optimization. 968 { 969 beforeLen := len(shards) 970 beforeQ := q 971 shards, q = selectRepoSet(shards, q) 972 tr.LazyPrintf("selectRepoSet shards=%d->%d q=%s->%s", beforeLen, len(shards), beforeQ, q) 973 } 974 975 if len(shards) == 0 { 976 return &agg, nil 977 } 978 979 shardCount := len(shards) 980 all := make(chan shardListResult, shardCount) 981 feeder := make(chan zoekt.Searcher, len(shards)) 982 for _, s := range shards { 983 feeder <- s 984 } 985 close(feeder) 986 987 for i := 0; i < runtime.GOMAXPROCS(0); i++ { 988 go func() { 989 for s := range feeder { 990 listOneShard(ctx, s, q, opts, all) 991 } 992 }() 993 } 994 995 uniq := map[string]*zoekt.RepoListEntry{} 996 997 for range shards { 998 r := <-all 999 if r.err != nil { 1000 return nil, r.err 1001 } 1002 1003 agg.Crashes += r.rl.Crashes 1004 agg.Stats.Add(&r.rl.Stats) 1005 1006 for _, r := range r.rl.Repos { 1007 prev, ok := uniq[r.Repository.Name] 1008 if !ok { 1009 cp := *r // We need to copy because we mutate r.Stats when merging duplicates 1010 uniq[r.Repository.Name] = &cp 1011 } else { 1012 prev.Stats.Add(&r.Stats) 1013 } 1014 } 1015 1016 for id, r := range r.rl.ReposMap { 1017 _, ok := agg.ReposMap[id] 1018 if !ok { 1019 agg.ReposMap[id] = r 1020 } 1021 } 1022 } 1023 1024 agg.Repos = make([]*zoekt.RepoListEntry, 0, len(uniq)) 1025 for _, r := range uniq { 1026 agg.Repos = append(agg.Repos, r) 1027 } 1028 1029 // Only one of these fields is populated and in all cases the size of that 1030 // field is the number of Repos. 
1031 // 1032 // Note: we don't just add individual Stats.Repos since a repository can 1033 // have multiple shards. 1034 agg.Stats.Repos = len(uniq) + len(agg.ReposMap) 1035 1036 if isAll && len(agg.Repos) > 0 { 1037 reportListAllMetrics(agg.Repos) 1038 } 1039 1040 return &agg, nil 1041} 1042 1043func reportListAllMetrics(repos []*zoekt.RepoListEntry) { 1044 var stats zoekt.RepoStats 1045 for _, r := range repos { 1046 stats.Add(&r.Stats) 1047 } 1048 1049 metricListAllRepos.Set(float64(stats.Repos)) 1050 metricListAllIndexBytes.Set(float64(stats.IndexBytes)) 1051 metricListAllContentBytes.Set(float64(stats.ContentBytes)) 1052 metricListAllDocuments.Set(float64(stats.Documents)) 1053 metricListAllShards.Set(float64(stats.Shards)) 1054 metricListAllNewLinesCount.Set(float64(stats.NewLinesCount)) 1055 metricListAllDefaultBranchNewLinesCount.Set(float64(stats.DefaultBranchNewLinesCount)) 1056 metricListAllOtherBranchesNewLinesCount.Set(float64(stats.OtherBranchesNewLinesCount)) 1057} 1058 1059// getLoaded returns the currently loaded shards. Shared so do not mutate. 1060func (s *shardedSearcher) getLoaded() loaded { 1061 // next commit will store the true value of this, for now we keep the 1062 // backwards compatible behaviour. 1063 ready := s.ready.Load() 1064 // ranked is loaded after ready to avoid a race were ready is true but 1065 // ranked is still not the final set of shards. 
1066 ranked, _ := s.ranked.Load().([]*rankedShard) 1067 return loaded{ 1068 shards: ranked, 1069 ready: ready, 1070 } 1071} 1072 1073func mkRankedShard(s zoekt.Searcher) *rankedShard { 1074 q := query.Const{Value: true} 1075 result, err := s.List(context.Background(), &q, nil) 1076 if err != nil { 1077 log.Printf("mkRankedShard(%s): failed to cache repository list: %v", s, err) 1078 return &rankedShard{Searcher: s} 1079 } 1080 1081 var ( 1082 maxPriority float64 1083 repos = make([]*zoekt.Repository, 0, len(result.Repos)) 1084 ) 1085 for i := range result.Repos { 1086 repo := &result.Repos[i].Repository 1087 repos = append(repos, repo) 1088 if repo.RawConfig != nil { 1089 priority, _ := strconv.ParseFloat(repo.RawConfig["priority"], 64) 1090 if priority > maxPriority { 1091 maxPriority = priority 1092 } 1093 } 1094 } 1095 1096 return &rankedShard{ 1097 Searcher: s, 1098 repos: repos, 1099 priority: maxPriority, 1100 } 1101} 1102 1103// markReady should be called once all shards have been passed into replace on 1104// startup. Once s is marked as ready it stops reporting a Crash in the 1105// response Stats. 1106func (s *shardedSearcher) markReady() { 1107 s.ready.CompareAndSwap(false, true) 1108} 1109 1110func (s *shardedSearcher) replace(shards map[string]zoekt.Searcher) { 1111 if len(shards) == 0 { 1112 return 1113 } 1114 1115 defer func(began time.Time) { 1116 metricShardsBatchReplaceDurationSeconds.Observe(time.Since(began).Seconds()) 1117 }(time.Now()) 1118 1119 s.mu.Lock() 1120 defer s.mu.Unlock() 1121 1122 for key, shard := range shards { 1123 var r *rankedShard 1124 if shard != nil { 1125 r = mkRankedShard(shard) 1126 } 1127 1128 old := s.shards[key] 1129 if shard == nil { 1130 delete(s.shards, key) 1131 } else { 1132 s.shards[key] = r 1133 } 1134 1135 if old != nil && old.Searcher != nil { 1136 // _ ___ /^^\ /^\ /^^\_ 1137 // _ _@)@) \ ,,/ '` ~ `'~~ ', `\. 1138 // _/o\_ _ _ _/~`.`...'~\ ./~~..,'`','',.,' ' ~: 1139 // / `,'.~,~.~ . , . , ~|, ,/ .,' , ,. .. 
,,. `, ~\_ 1140 // ( ' _' _ '_` _ ' . , `\_/ .' ..' ' ` ` `.. `, \_ 1141 // ~V~ V~ V~ V~ ~\ ` ' . ' , ' .,.,''`.,.''`.,.``. ', \_ 1142 // _/\ /\ /\ /\_/, . ' , `_/~\_ .' .,. ,, , _/~\_ `. `. '., \_ 1143 // < ~ ~ '~`'~'`, ., . `_: ::: \_ ' `_/ ::: \_ `.,' . ', \_ 1144 // \ ' `_ '`_ _ ',/ _::_::_ \ _ _/ _::_::_ \ `.,'.,`., \-,-,-,_,_, 1145 // `'~~ `'~~ `'~~ `'~~ \(_)(_)(_)/ `~~' \(_)(_)(_)/ ~'`\_.._,._,'_;_;_;_;_; 1146 // 1147 // We can't just call Close now, because there may be ongoing searches 1148 // which have old in the shards list. Previously we used an exclusive 1149 // lock to guarantee there were no concurrent searches. However, that 1150 // led to blocking on the read path. 1151 // 1152 // We could introduce granular locking per rankedShard to know when 1153 // there are no more references. However, this becomes tricky in 1154 // practice. Instead we rely on the garbage collector noticing old is no 1155 // longer used. We take care in our searchers to runtime.KeepAlive until 1156 // we have stopped referencing the underling mmap data. 1157 runtime.SetFinalizer(old, func(r *rankedShard) { 1158 r.Close() 1159 }) 1160 } 1161 } 1162 1163 ranked := make([]*rankedShard, 0, len(s.shards)) 1164 for _, r := range s.shards { 1165 ranked = append(ranked, r) 1166 } 1167 1168 sort.Slice(ranked, func(i, j int) bool { 1169 priorityDiff := ranked[i].priority - ranked[j].priority 1170 if priorityDiff != 0 { 1171 return priorityDiff > 0 1172 } 1173 if len(ranked[i].repos) == 0 || len(ranked[j].repos) == 0 { 1174 // Protect against empty names which can happen if we fail to List or 1175 // the shard is full of tombstones. Prefer the shard which has names. 
1176 return len(ranked[i].repos) >= len(ranked[j].repos) 1177 } 1178 return ranked[i].repos[0].Name < ranked[j].repos[0].Name 1179 }) 1180 1181 s.ranked.Store(ranked) 1182 1183 metricShardsLoaded.Set(float64(len(ranked))) 1184} 1185 1186func loadShard(fn string) (zoekt.Searcher, error) { 1187 f, err := os.Open(fn) 1188 if err != nil { 1189 return nil, err 1190 } 1191 1192 iFile, err := zoekt.NewIndexFile(f) 1193 if err != nil { 1194 return nil, err 1195 } 1196 s, err := zoekt.NewSearcher(iFile) 1197 if err != nil { 1198 iFile.Close() 1199 return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err) 1200 } 1201 1202 return s, nil 1203} 1204 1205// prioritySlice is a trivial implementation of an array that provides three 1206// things: appending a value, removing a value, and getting the array's max. 1207// Operations take O(n) time, which is acceptable because N is restricted to 1208// GOMAXPROCS (i.e., number of cpu cores) by the shardedSearcher interface. 1209type prioritySlice []float64 1210 1211func (p *prioritySlice) append(pri float64) { 1212 *p = append(*p, pri) 1213} 1214 1215func (p *prioritySlice) remove(pri float64) { 1216 for i, opri := range *p { 1217 if opri == pri { 1218 if i != len(*p)-1 { 1219 // swap to make this element the tail 1220 (*p)[i] = (*p)[len(*p)-1] 1221 } 1222 // pop the end off 1223 *p = (*p)[:len(*p)-1] 1224 break 1225 } 1226 } 1227} 1228 1229func (p *prioritySlice) max() float64 { 1230 // remove() and max() could be combined, but this is easier to read and 1231 // the expected performance difference from the extra lock and loop is 1232 // almost certainly irrelevant. 1233 maxPri := math.Inf(-1) 1234 for _, pri := range *p { 1235 if pri > maxPri { 1236 maxPri = pri 1237 } 1238 } 1239 return maxPri 1240}