fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

at tngl 40 kB View raw
1// Copyright 2016 Google Inc. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package search 16 17import ( 18 "context" 19 "fmt" 20 "log" 21 "math" 22 "os" 23 "runtime" 24 "runtime/debug" 25 "slices" 26 "sort" 27 "strconv" 28 "sync" 29 "time" 30 31 "github.com/prometheus/client_golang/prometheus" 32 "github.com/prometheus/client_golang/prometheus/promauto" 33 sglog "github.com/sourcegraph/log" 34 "go.uber.org/atomic" 35 "golang.org/x/sync/semaphore" 36 37 "github.com/sourcegraph/zoekt" 38 "github.com/sourcegraph/zoekt/index" 39 "github.com/sourcegraph/zoekt/internal/tenant/systemtenant" 40 "github.com/sourcegraph/zoekt/internal/trace" 41 "github.com/sourcegraph/zoekt/query" 42) 43 44var ( 45 shardRecoveryLogger = sync.OnceValue(func() sglog.Logger { 46 return sglog.Scoped("searchShards") 47 }) 48 49 metricShardsLoaded = promauto.NewGauge(prometheus.GaugeOpts{ 50 Name: "zoekt_shards_loaded", 51 Help: "The number of shards currently loaded", 52 }) 53 metricShardsLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 54 Name: "zoekt_shards_loaded_total", 55 Help: "The total number of shards loaded", 56 }) 57 metricShardsLoadFailedTotal = promauto.NewCounter(prometheus.CounterOpts{ 58 Name: "zoekt_shards_load_failed_total", 59 Help: "The total number of shard loads that failed", 60 }) 61 62 metricSearchRunning = promauto.NewGauge(prometheus.GaugeOpts{ 63 Name: "zoekt_search_running", 64 Help: "The number of concurrent search 
requests running", 65 }) 66 metricSearchShardRunning = promauto.NewGauge(prometheus.GaugeOpts{ 67 Name: "zoekt_search_shard_running", 68 Help: "The number of concurrent search requests in a shard running", 69 }) 70 metricSearchFailedTotal = promauto.NewCounter(prometheus.CounterOpts{ 71 Name: "zoekt_search_failed_total", 72 Help: "The total number of search requests that failed", 73 }) 74 metricSearchDuration = promauto.NewHistogram(prometheus.HistogramOpts{ 75 Name: "zoekt_search_duration_seconds", 76 Help: "The duration a search request took in seconds", 77 Buckets: prometheus.DefBuckets, // DefBuckets good for service timings 78 }) 79 80 // A Counter per Stat. Name should match field in zoekt.Stats. 81 metricSearchContentBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 82 Name: "zoekt_search_content_loaded_bytes_total", 83 Help: "Total amount of I/O for reading contents", 84 }) 85 metricSearchIndexBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 86 Name: "zoekt_search_index_loaded_bytes_total", 87 Help: "Total amount of I/O for reading from index", 88 }) 89 metricSearchCrashesTotal = promauto.NewCounter(prometheus.CounterOpts{ 90 Name: "zoekt_search_crashes_total", 91 Help: "Total number of search shards that had a crash", 92 }) 93 metricSearchFileCountTotal = promauto.NewCounter(prometheus.CounterOpts{ 94 Name: "zoekt_search_file_count_total", 95 Help: "Total number of files containing a match", 96 }) 97 metricSearchShardFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{ 98 Name: "zoekt_search_shard_files_considered_total", 99 Help: "Total number of files in shards that we considered", 100 }) 101 metricSearchFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{ 102 Name: "zoekt_search_files_considered_total", 103 Help: "Total files that we evaluated. 
Equivalent to files for which all atom matches (including negations) evaluated to true", 104 }) 105 metricSearchFilesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 106 Name: "zoekt_search_files_loaded_total", 107 Help: "Total files for which we loaded file content to verify substring matches", 108 }) 109 metricSearchFilesSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{ 110 Name: "zoekt_search_files_skipped_total", 111 Help: "Total candidate files whose contents weren't examined because we gathered enough matches", 112 }) 113 metricSearchShardsSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{ 114 Name: "zoekt_search_shards_skipped_total", 115 Help: "Total shards that we did not process because a query was canceled", 116 }) 117 metricSearchMatchCountTotal = promauto.NewCounter(prometheus.CounterOpts{ 118 Name: "zoekt_search_match_count_total", 119 Help: "Total number of non-overlapping matches", 120 }) 121 metricSearchNgramMatchesTotal = promauto.NewCounter(prometheus.CounterOpts{ 122 Name: "zoekt_search_ngram_matches_total", 123 Help: "Total number of candidate matches as a result of searching ngrams", 124 }) 125 metricSearchNgramLookupsTotal = promauto.NewCounter(prometheus.CounterOpts{ 126 Name: "zoekt_search_ngram_lookups_total", 127 Help: "Total number of times we accessed an ngram in the index", 128 }) 129 metricSearchRegexpsConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{ 130 Name: "zoekt_search_regexps_considered_total", 131 Help: "Total number of times regexp was called on files that we evaluated", 132 }) 133 134 metricListRunning = promauto.NewGauge(prometheus.GaugeOpts{ 135 Name: "zoekt_list_running", 136 Help: "The number of concurrent list requests running", 137 }) 138 metricListShardRunning = promauto.NewGauge(prometheus.GaugeOpts{ 139 Name: "zoekt_list_shard_running", 140 Help: "The number of concurrent list requests in a shard running", 141 }) 142 metricShardsBatchReplaceDurationSeconds = 
promauto.NewHistogram(prometheus.HistogramOpts{ 143 Name: "zoekt_shards_batch_replace_duration_seconds", 144 Help: "The time it takes to replace a batch of Searchers.", 145 Buckets: []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30}, 146 }) 147 metricListAllRepos = promauto.NewGauge(prometheus.GaugeOpts{ 148 Name: "zoekt_list_all_stats_repos", 149 Help: "The last List(true) value for RepoStats.Repos. Repos is used for aggregrating the number of repositories.", 150 }) 151 metricListAllShards = promauto.NewGauge(prometheus.GaugeOpts{ 152 Name: "zoekt_list_all_stats_shards", 153 Help: "The last List(true) value for RepoStats.Shards. Shards is the total number of search shards.", 154 }) 155 metricListAllDocuments = promauto.NewGauge(prometheus.GaugeOpts{ 156 Name: "zoekt_list_all_stats_documents", 157 Help: "The last List(true) value for RepoStats.Documents. Documents holds the number of documents or files.", 158 }) 159 metricListAllIndexBytes = promauto.NewGauge(prometheus.GaugeOpts{ 160 Name: "zoekt_list_all_stats_index_bytes", 161 Help: "The last List(true) value for RepoStats.IndexBytes. IndexBytes is the amount of RAM used for index overhead.", 162 }) 163 metricListAllContentBytes = promauto.NewGauge(prometheus.GaugeOpts{ 164 Name: "zoekt_list_all_stats_content_bytes", 165 Help: "The last List(true) value for RepoStats.ContentBytes. 
ContentBytes is the amount of RAM used for raw content.", 166 }) 167 metricListAllNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{ 168 Name: "zoekt_list_all_stats_new_lines_count", 169 Help: "The last List(true) value for RepoStats.NewLinesCount.", 170 }) 171 metricListAllDefaultBranchNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{ 172 Name: "zoekt_list_all_stats_default_branch_new_lines_count", 173 Help: "The last List(true) value for RepoStats.DefaultBranchNewLinesCount.", 174 }) 175 metricListAllOtherBranchesNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{ 176 Name: "zoekt_list_all_stats_other_branches_new_lines_count", 177 Help: "The last List(true) value for RepoStats.OtherBranchesNewLinesCount.", 178 }) 179) 180 181type rankedShard struct { 182 zoekt.Searcher 183 184 priority float64 // maximum priority across all repos in the shard 185 186 // We have out of band ranking on compound shards which can change even if 187 // the shard file does not. So we compute a rank in getShards. We store 188 // repos here to avoid the cost of List in the search request path. 189 // 190 // repos is nil only if that call failed. 191 repos []*zoekt.Repository 192} 193 194// loaded stores the state we compute when updating the state of shards from 195// disk. 196type loaded struct { 197 // shards is the currently loaded shards sorted by decreasing rank and 198 // should not be mutated. 199 shards []*rankedShard 200 201 // ready is true if sharded searcher has finished loading all initial 202 // shards on startup. 203 ready bool 204} 205 206type shardedSearcher struct { 207 // Limit the number of parallel queries. Since searching is 208 // CPU bound, we can't do better than #CPU queries in 209 // parallel. If we do so, we just create more memory 210 // pressure. 
211 sched scheduler 212 213 mu sync.Mutex // protects writes to shards 214 shards map[string]*rankedShard 215 216 ready atomic.Bool 217 ranked atomic.Value 218} 219 220func newShardedSearcher(n int64) *shardedSearcher { 221 ss := &shardedSearcher{ 222 shards: make(map[string]*rankedShard), 223 sched: newScheduler(n), 224 } 225 return ss 226} 227 228// NewDirectorySearcher returns a searcher instance that loads all 229// shards corresponding to a glob into memory. 230func NewDirectorySearcher(dir string) (zoekt.Streamer, error) { 231 return newDirectorySearcher(dir, true) 232} 233 234// NewDirectorySearcherFast is like NewDirectorySearcher, but does not block 235// on the initial loading of shards. 236// 237// This exists since in the case of zoekt-webserver we are happy with having 238// partial availability since that is better than no availability on large 239// instances. 240func NewDirectorySearcherFast(dir string) (zoekt.Streamer, error) { 241 return newDirectorySearcher(dir, false) 242} 243 244func newDirectorySearcher(dir string, waitUntilReady bool) (zoekt.Streamer, error) { 245 ss := newShardedSearcher(int64(runtime.GOMAXPROCS(0))) 246 tl := &loader{ 247 ss: ss, 248 } 249 dw, err := newDirectoryWatcher(dir, tl) 250 if err != nil { 251 return nil, err 252 } 253 254 if waitUntilReady { 255 if err := dw.WaitUntilReady(); err != nil { 256 return nil, err 257 } 258 } 259 260 ds := &directorySearcher{ 261 Streamer: ss, 262 directoryWatcher: dw, 263 } 264 265 return &typeRepoSearcher{Streamer: ds}, nil 266} 267 268type directorySearcher struct { 269 zoekt.Streamer 270 271 directoryWatcher *DirectoryWatcher 272} 273 274func (s *directorySearcher) Close() { 275 // We need to Stop directoryWatcher first since it calls load/unload on 276 // Searcher. 
277 s.directoryWatcher.Stop() 278 s.Streamer.Close() 279} 280 281type loader struct { 282 ss *shardedSearcher 283} 284 285func (tl *loader) load(keys ...string) { 286 // This is called with all keys on startup, so once this function has 287 // finished running shardedSearcher will be ready. 288 defer tl.ss.markReady() 289 290 if len(keys) == 0 { 291 // If there's nothing to load, we exit early here, but we want to mark 292 // ourselves as ready. 293 return 294 } 295 296 var ( 297 mu sync.Mutex // synchronizes writes to the shards map 298 wg sync.WaitGroup // used to wait for all shards to load 299 sem = semaphore.NewWeighted(int64(runtime.GOMAXPROCS(0))) 300 loadedShards = make(map[string]zoekt.Searcher) 301 ) 302 303 publishLoaded := func() { 304 mu.Lock() 305 chunk := loadedShards 306 loadedShards = make(map[string]zoekt.Searcher) 307 mu.Unlock() 308 tl.ss.replace(chunk) 309 } 310 311 log.Printf("[INFO] loading %d shard(s): %s", len(keys), humanTruncateList(keys, 5)) 312 313 lastProgress := time.Now() 314 for i, key := range keys { 315 // If taking a while to start-up occasionally give a progress message 316 if time.Since(lastProgress) > 5*time.Second { 317 log.Printf("[INFO] still need to load %d shards...", len(keys)-i) 318 lastProgress = time.Now() 319 320 publishLoaded() 321 } 322 323 _ = sem.Acquire(context.Background(), 1) 324 wg.Add(1) 325 326 go func(key string) { 327 defer sem.Release(1) 328 defer wg.Done() 329 330 shard, err := loadShard(key) 331 if err != nil { 332 metricShardsLoadFailedTotal.Inc() 333 log.Printf("[ERROR] reloading: %s, err %v ", key, err) 334 return 335 } 336 metricShardsLoadedTotal.Inc() 337 338 mu.Lock() 339 loadedShards[key] = shard 340 mu.Unlock() 341 }(key) 342 } 343 344 wg.Wait() 345 346 publishLoaded() 347} 348 349func (tl *loader) drop(keys ...string) { 350 shards := make(map[string]zoekt.Searcher, len(keys)) 351 for _, key := range keys { 352 shards[key] = nil 353 } 354 tl.ss.replace(shards) 355} 356 357func (ss 
*shardedSearcher) String() string { 358 return "shardedSearcher" 359} 360 361// Close closes references to open files. It may be called only once. 362func (ss *shardedSearcher) Close() { 363 ss.mu.Lock() 364 shards := make(map[string]zoekt.Searcher, len(ss.shards)) 365 for k := range ss.shards { 366 shards[k] = nil 367 } 368 ss.mu.Unlock() 369 370 ss.replace(shards) 371} 372 373func selectRepoSet(shards []*rankedShard, q query.Q) ([]*rankedShard, query.Q) { 374 and, ok := q.(*query.And) 375 if ok { 376 return doSelectRepoSet(shards, and) 377 } 378 379 // We have queries which look like (reposet ...) and we want to do the same 380 // optimizations. To simplify we just always wrap the query in And and then 381 // on the return value call Simplify to unwrap. In particular this is 382 // important for List calls. 383 and = &query.And{Children: []query.Q{q}} 384 shards, q = doSelectRepoSet(shards, and) 385 return shards, query.Simplify(q) 386} 387 388func doSelectRepoSet(shards []*rankedShard, and *query.And) ([]*rankedShard, query.Q) { 389 // (and (reposet ...) (q)) 390 // (and true (q)) with a filtered shards 391 // (and false) // noop 392 393 // (and (repobranches ...) (q)) 394 // (and (repobranches ...) (q)) 395 396 // Note: we also support (and (repo ...) (q)) even though sourcegraph does 397 // not generate those sorts of queries. This is to support manual testing. 
398 399 hasReposForPredicate := func(pred func(repo *zoekt.Repository) bool) func(repos []*zoekt.Repository) (any, all bool) { 400 return func(repos []*zoekt.Repository) (any, all bool) { 401 any = false 402 all = true 403 for _, repo := range repos { 404 b := pred(repo) 405 any = any || b 406 all = all && b 407 } 408 return any, all 409 } 410 } 411 412 for i, c := range and.Children { 413 var setSize int 414 var hasRepos func([]*zoekt.Repository) (bool, bool) 415 switch setQuery := c.(type) { 416 case *query.RepoSet: 417 setSize = len(setQuery.Set) 418 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 419 return setQuery.Set[repo.Name] 420 }) 421 case *query.RepoIDs: 422 setSize = int(setQuery.Repos.GetCardinality()) 423 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 424 return setQuery.Repos.Contains(repo.ID) 425 }) 426 case *query.Repo: 427 setSize = 0 428 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 429 return setQuery.Regexp.MatchString(repo.Name) 430 }) 431 case *query.BranchesRepos: 432 for _, br := range setQuery.List { 433 setSize += int(br.Repos.GetCardinality()) 434 } 435 436 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 437 for _, br := range setQuery.List { 438 if br.Repos.Contains(repo.ID) { 439 return true 440 } 441 } 442 return false 443 }) 444 case *query.Meta: 445 // Meta queries filter repositories based on metadata fields. 446 // By checking this at the shard level, we can skip entire shards 447 // that don't contain any matching repositories, avoiding expensive 448 // I/O operations. 
449 setSize = 0 // Unknown size, we'll filter based on metadata 450 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 451 if repo.Metadata == nil { 452 return false 453 } 454 v, ok := repo.Metadata[setQuery.Field] 455 if !ok { 456 return false 457 } 458 return setQuery.Value.MatchString(v) 459 }) 460 default: 461 continue 462 } 463 464 // setSize may be larger than the number of shards we have. The size of 465 // filtered is bounded by min(len(set), len(shards)) 466 if setSize > len(shards) { 467 setSize = len(shards) 468 } 469 470 filtered := make([]*rankedShard, 0, setSize) 471 filteredAll := true 472 473 for _, s := range shards { 474 if s.repos == nil { 475 // repos is nil if we failed to List the shard. This shouldn't 476 // happen, but if it does we don't know what is in it and must search 477 // it without simplifying the query. 478 filtered = append(filtered, s) 479 filteredAll = false 480 } else if any, all := hasRepos(s.repos); any { 481 filtered = append(filtered, s) 482 filteredAll = filteredAll && all 483 } 484 } 485 486 // We don't need to adjust the query since we are returning an empty set 487 // of shards to search. 488 if len(filtered) == 0 { 489 return filtered, and 490 } 491 492 // We can't simplify the query since we are searching shards which contain 493 // repos we aren't supposed to search. 494 if !filteredAll { 495 return filtered, and 496 } 497 498 // We don't want to mutate the original and, so we clone it before 499 // mutating it. 500 and = &query.And{Children: slices.Clone(and.Children)} 501 502 // This optimization allows us to avoid the work done by 503 // indexData.simplify for each shard. 504 // 505 // For example if our query is (and (reposet foo bar) (content baz)) 506 // then at this point filtered is [foo bar] and q is the same. For each 507 // shard indexData.simplify will simplify to (and true (content baz)) -> 508 // (content baz). This work can be done now once, rather than per shard. 
509 switch c := c.(type) { 510 case *query.RepoSet, *query.RepoIDs, *query.Repo, *query.Meta: 511 and.Children[i] = &query.Const{Value: true} 512 return filtered, query.Simplify(and) 513 514 case *query.BranchesRepos: 515 // We can only replace if all the repos want the same branches. We 516 // simplify and just check that we are requesting 1 branch. The common 517 // case is just asking for HEAD, so this should be effective. 518 if len(c.List) != 1 { 519 return filtered, and 520 } 521 522 // Every repo wants the same branches, so we can replace RepoBranches 523 // with a list of branch queries. 524 and.Children[i] = &query.Branch{Pattern: c.List[0].Branch, Exact: true} 525 return filtered, query.Simplify(and) 526 } 527 528 // Stop after first RepoSet, otherwise we might append duplicate 529 // shards to `filtered` 530 return filtered, and 531 } 532 533 return shards, and 534} 535 536func (ss *shardedSearcher) Search(ctx context.Context, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) { 537 tr, ctx := trace.New(ctx, "shardedSearcher.Search", "") 538 tr.LazyLog(q, true) 539 tr.LazyPrintf("opts: %+v", opts) 540 defer func() { 541 if sr != nil { 542 tr.LazyPrintf("num files: %d", len(sr.Files)) 543 tr.LazyPrintf("stats: %+v", sr.Stats) 544 } 545 if err != nil { 546 tr.LazyPrintf("error: %v", err) 547 tr.SetError(err) 548 } 549 tr.Finish() 550 }() 551 ctx, cancel := context.WithCancel(ctx) 552 defer cancel() 553 554 collectSender := newCollectSender(opts) 555 556 start := time.Now() 557 proc, err := ss.sched.Acquire(ctx) 558 if err != nil { 559 return nil, err 560 } 561 defer proc.Release() 562 tr.LazyPrintf("acquired process") 563 564 wait := time.Since(start) 565 start = time.Now() 566 567 loaded := ss.getLoaded() 568 done, err := streamSearch(ctx, proc, q, opts, loaded.shards, collectSender) 569 defer done() 570 if err != nil { 571 return nil, err 572 } 573 574 aggregate, ok := collectSender.Done() 575 if !ok { 576 aggregate = 
&zoekt.SearchResult{ 577 RepoURLs: map[string]string{}, 578 LineFragments: map[string]string{}, 579 } 580 } 581 582 copyFiles(aggregate) 583 584 if !loaded.ready { 585 // We may have missed results due to not being fully loaded. 586 aggregate.Stats.Crashes++ 587 } 588 589 aggregate.Stats.Wait = wait 590 aggregate.Stats.Duration = time.Since(start) 591 592 return aggregate, nil 593} 594 595func (ss *shardedSearcher) StreamSearch(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, sender zoekt.Sender) (err error) { 596 tr, ctx := trace.New(ctx, "shardedSearcher.StreamSearch", "") 597 defer func() { 598 if err != nil { 599 tr.LazyPrintf("error: %v", err) 600 tr.SetError(err) 601 } 602 tr.Finish() 603 }() 604 605 start := time.Now() 606 proc, err := ss.sched.Acquire(ctx) 607 if err != nil { 608 return err 609 } 610 defer proc.Release() 611 tr.LazyPrintf("acquired process") 612 613 loaded := ss.getLoaded() 614 shards := loaded.shards 615 616 maxPendingPriority := math.Inf(-1) 617 if len(shards) > 0 { 618 maxPendingPriority = shards[0].priority 619 } 620 621 stillLoadingCrashes := 0 622 if !loaded.ready { 623 // We may have missed results due to not being fully loaded. 624 stillLoadingCrashes++ 625 } 626 627 sender.Send(&zoekt.SearchResult{ 628 Stats: zoekt.Stats{ 629 Crashes: stillLoadingCrashes, 630 Wait: time.Since(start), 631 }, 632 Progress: zoekt.Progress{ 633 MaxPendingPriority: maxPendingPriority, 634 }, 635 }) 636 637 // Matches flow from the shards up the stack in the following order: 638 // 639 // 1. Search shards 640 // 2. flushCollectSender (aggregate) 641 // 3. limitSender (limit) 642 // 4. copyFileSender (copy) 643 // 644 // For streaming, the wrapping has to happen in the inverted order. 
645 sender = copyFileSender(sender) 646 647 if truncator, hasLimits := index.NewDisplayTruncator(opts); hasLimits { 648 var cancel context.CancelFunc 649 ctx, cancel = context.WithCancel(ctx) 650 defer cancel() 651 sender = limitSender(cancel, sender, truncator) 652 } 653 654 sender, flush := newFlushCollectSender(opts, sender) 655 656 done, err := streamSearch(ctx, proc, q, opts, shards, sender) 657 658 // Even though streaming is done, we may have results sitting in a buffer we 659 // need to flush. So we need to send those before calling done. 660 flush() 661 done() 662 663 return err 664} 665 666// streamSearch is an internal helper since both Search and StreamSearch are 667// largely similar. 668// 669// done must always be called, even if err is non-nil. The SearchResults sent 670// via sender contain references to the underlying mmap data that the garbage 671// collector can't see. Calling done informs the garbage collector it is free 672// to collect those shards. The caller must call copyFiles on any 673// SearchResults it returns/streams out before calling done. 674func streamSearch(ctx context.Context, proc *process, q query.Q, opts *zoekt.SearchOptions, shards []*rankedShard, sender zoekt.Sender) (done func(), err error) { 675 tr, ctx := trace.New(ctx, "shardedSearcher.streamSearch", "") 676 overallStart := time.Now() 677 metricSearchRunning.Inc() 678 defer func() { 679 metricSearchRunning.Dec() 680 metricSearchDuration.Observe(time.Since(overallStart).Seconds()) 681 if err != nil { 682 metricSearchFailedTotal.Inc() 683 684 tr.LazyPrintf("error: %v", err) 685 tr.SetError(err) 686 } 687 tr.Finish() 688 }() 689 690 // Select the subset of shards that we will search over for the given query. 
691 { 692 beforeLen := len(shards) 693 beforeQ := q 694 shards, q = selectRepoSet(shards, q) 695 tr.LazyPrintf("selectRepoSet shards=%d->%d q=%s->%s", beforeLen, len(shards), beforeQ, q) 696 } 697 698 if len(shards) == 0 { 699 return func() {}, nil 700 } 701 702 var cancel context.CancelFunc 703 if opts.MaxWallTime == 0 { 704 ctx, cancel = context.WithCancel(ctx) 705 } else { 706 ctx, cancel = context.WithTimeout(ctx, opts.MaxWallTime) 707 } 708 709 defer cancel() 710 711 // We set the number of workers to GOMAXPROCS, or the number of shards, 712 // whichever is smaller. 713 workers := min(runtime.GOMAXPROCS(0), len(shards)) 714 715 type result struct { 716 priority float64 717 *zoekt.SearchResult 718 err error 719 } 720 721 var ( 722 // buffered channels to continue searching when sending back results 723 // takes a while / blocks. The maximum pending result set is workers * 2. 724 results = make(chan *result, workers) 725 search = make(chan *rankedShard, workers) 726 wg sync.WaitGroup 727 ) 728 729 // Start workers that receive shards from the search channel, search them, 730 // and send the results to the results channel. This process is repeated 731 // until the search channel is closed. 732 // 733 // Note: Making "search" a buffered channel has the effect of limiting the number of parallel shard searches. 734 // Since searching is mostly CPU bound, limiting parallel shard searches also reduces the peak working set. 
735 wg.Add(workers) 736 for range workers { 737 go func() { 738 defer wg.Done() 739 for s := range search { 740 sr, err := searchOneShard(ctx, s, q, opts) 741 r := &result{priority: s.priority, SearchResult: sr, err: err} 742 results <- r 743 } 744 }() 745 } 746 747 go func() { 748 wg.Wait() 749 close(results) 750 }() 751 752 var ( 753 pending = make(prioritySlice, 0, workers) 754 shard = 0 755 next = shards[shard] 756 757 // We need a separate nil-able reference to the same channel so we can close(search) for the worker 758 // go-routines to finish but also set work to nil in order for the select statement below to ignore 759 // that case when we want to stop a search. This is needed because sending on a closed channel panics. 760 work = search 761 ) 762 763 stop := func() { 764 if work != nil { 765 close(search) 766 work = nil 767 next = nil 768 } 769 } 770 771 // tracked so we can stop when we hit TotalMaxMatchCount 772 var totalMatchCount int 773 774search: 775 for { 776 // At the top of each iteration, have the proc associated with this search yield its won "timeslice" 777 // to possibly allow other searches to make progress 778 _ = proc.Yield(ctx) // Note: we let searchOneShard handle context errors 779 780 select { 781 case work <- next: // is there a worker available to search the next shard? 782 pending.append(next.priority) 783 784 shard++ 785 if shard == len(shards) { 786 stop() 787 } else { 788 next = shards[shard] 789 } 790 case r, ok := <-results: // is there a result to send back? 791 if !ok { 792 break search 793 } 794 795 // delete this result's priority from pending before computing the new max pending priority 796 pending.remove(r.priority) 797 798 if r.err != nil { 799 // Set final error and stop searching new shards, but consume any pending 800 // search results. 801 stop() 802 err = r.err 803 continue 804 } 805 806 // Update the match count statistics and stop searching new shards if we've 807 // reached the limit set in the options. 
808 totalMatchCount += r.SearchResult.Stats.MatchCount 809 if opts.TotalMaxMatchCount > 0 && totalMatchCount > opts.TotalMaxMatchCount { 810 stop() 811 } 812 813 observeMetrics(r.SearchResult) 814 815 r.Priority = r.priority 816 r.MaxPendingPriority = pending.max() 817 818 sendByRepository(r.SearchResult, opts, sender) // send the result back to the client 819 } 820 } 821 822 return func() { runtime.KeepAlive(shards) }, err 823} 824 825// sendByRepository splits a zoekt.SearchResult by repository and calls 826// sender.Send for each batch. Ranking in Sourcegraph expects zoekt.SearchResult 827// to contain results with the same zoekt.SearchResult.priority only. 828// 829// We split by repository instead of by priority because it is easier to set 830// RepoURLs and LineFragments in zoekt.SearchResult. 831func sendByRepository(result *zoekt.SearchResult, opts *zoekt.SearchOptions, sender zoekt.Sender) { 832 if len(result.RepoURLs) <= 1 || len(result.Files) == 0 { 833 index.SortFiles(result.Files) 834 sender.Send(result) 835 return 836 } 837 838 send := func(repoURL string, a, b int, stats zoekt.Stats) { 839 index.SortFiles(result.Files[a:b]) 840 841 // FileMatch.Repository holds the repo's unique key (its URL), which is 842 // also the key used for RepoURLs/LineFragments. 
843 filteredRepoURLs := map[string]string{repoURL: result.RepoURLs[repoURL]} 844 filteredLineFragments := map[string]string{repoURL: result.LineFragments[repoURL]} 845 846 // Filter RepoURLs and LineFragments to only those of repoName and its 847 // subRepositories if there are subRepositories 848 for _, file := range result.Files[a:b] { 849 name := file.SubRepositoryName 850 if name == "" { 851 continue 852 } 853 _, repoSet := filteredRepoURLs[name] 854 url, ok := result.RepoURLs[name] 855 if !repoSet && ok { 856 filteredRepoURLs[name] = url 857 } 858 _, fragSet := filteredLineFragments[name] 859 frag, ok := result.LineFragments[name] 860 if !fragSet && ok { 861 filteredLineFragments[name] = frag 862 } 863 } 864 865 sender.Send(&zoekt.SearchResult{ 866 Stats: stats, 867 Progress: zoekt.Progress{ 868 Priority: result.Files[a].RepositoryPriority, 869 MaxPendingPriority: result.MaxPendingPriority, 870 }, 871 Files: result.Files[a:b], 872 RepoURLs: filteredRepoURLs, 873 LineFragments: filteredLineFragments, 874 }) 875 } 876 877 var startIndex, endIndex int 878 curRepoID := result.Files[0].RepositoryID 879 curRepoURL := result.Files[0].Repository 880 881 fm := zoekt.FileMatch{} 882 for endIndex, fm = range result.Files { 883 if curRepoID != fm.RepositoryID { 884 // Stats must stay aggregate-able, hence we sent the aggregate stats with the 885 // last event. 
886 send(curRepoURL, startIndex, endIndex, zoekt.Stats{}) 887 888 startIndex = endIndex 889 curRepoID = fm.RepositoryID 890 curRepoURL = fm.Repository 891 } 892 } 893 894 send(curRepoURL, startIndex, endIndex+1, result.Stats) 895} 896 897func observeMetrics(sr *zoekt.SearchResult) { 898 metricSearchContentBytesLoadedTotal.Add(float64(sr.Stats.ContentBytesLoaded)) 899 metricSearchIndexBytesLoadedTotal.Add(float64(sr.Stats.IndexBytesLoaded)) 900 metricSearchCrashesTotal.Add(float64(sr.Stats.Crashes)) 901 metricSearchFileCountTotal.Add(float64(sr.Stats.FileCount)) 902 metricSearchShardFilesConsideredTotal.Add(float64(sr.Stats.ShardFilesConsidered)) 903 metricSearchFilesConsideredTotal.Add(float64(sr.Stats.FilesConsidered)) 904 metricSearchFilesLoadedTotal.Add(float64(sr.Stats.FilesLoaded)) 905 metricSearchFilesSkippedTotal.Add(float64(sr.Stats.FilesSkipped)) 906 metricSearchShardsSkippedTotal.Add(float64(sr.Stats.ShardsSkipped)) 907 metricSearchMatchCountTotal.Add(float64(sr.Stats.MatchCount)) 908 metricSearchNgramMatchesTotal.Add(float64(sr.Stats.NgramMatches)) 909 metricSearchNgramLookupsTotal.Add(float64(sr.Stats.NgramLookups)) 910 metricSearchRegexpsConsideredTotal.Add(float64(sr.Stats.RegexpsConsidered)) 911} 912 913func copySlice(src *[]byte) { 914 if *src == nil { 915 return 916 } 917 dst := make([]byte, len(*src)) 918 copy(dst, *src) 919 *src = dst 920} 921 922func copyFiles(sr *zoekt.SearchResult) { 923 for i := range sr.Files { 924 copySlice(&sr.Files[i].Content) 925 copySlice(&sr.Files[i].Checksum) 926 for l := range sr.Files[i].LineMatches { 927 copySlice(&sr.Files[i].LineMatches[l].Line) 928 copySlice(&sr.Files[i].LineMatches[l].Before) 929 copySlice(&sr.Files[i].LineMatches[l].After) 930 } 931 for c := range sr.Files[i].ChunkMatches { 932 copySlice(&sr.Files[i].ChunkMatches[c].Content) 933 } 934 } 935} 936 937func logShardCrash(operation string, s zoekt.Searcher, q query.Q, recovered any, stack []byte) { 938 fields := []sglog.Field{ 939 
sglog.String("operation", operation),
		sglog.String("shard", s.String()),
		sglog.String("query", q.String()),
		sglog.String("stacktrace", string(stack)),
	}

	if err, ok := recovered.(error); ok {
		fields = append(fields, sglog.Error(err))
	} else {
		fields = append(fields, sglog.String("panic", fmt.Sprint(recovered)))
	}

	shardRecoveryLogger().Error("crashed shard", fields...)
}

// searchOneShard runs s.Search while tracking metricSearchShardRunning.
//
// A panic inside s.Search is recovered and logged via logShardCrash instead of
// propagating. In that case the returned result has Stats.Crashes set to 1 and
// err is nil, so the caller treats the shard as crashed rather than failed.
func searchOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) {
	metricSearchShardRunning.Inc()
	defer func() {
		metricSearchShardRunning.Dec()
		if e := recover(); e != nil {
			logShardCrash("search", s, q, e, debug.Stack())

			if sr == nil {
				sr = &zoekt.SearchResult{}
			}
			sr.Stats.Crashes = 1
		}
	}()

	return s.Search(ctx, q, opts)
}

// shardListResult is the outcome of listing a single shard.
type shardListResult struct {
	rl  *zoekt.RepoList
	err error
}

// listOneShard lists s and sends exactly one shardListResult on sink.
//
// If s.List panics, the panic is recovered and logged via logShardCrash, and a
// RepoList with Crashes set to 1 (and a nil error) is sent instead.
func listOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.ListOptions, sink chan shardListResult) {
	metricListShardRunning.Inc()
	defer func() {
		metricListShardRunning.Dec()
		if r := recover(); r != nil {
			logShardCrash("list", s, q, r, debug.Stack())
			sink <- shardListResult{
				&zoekt.RepoList{Crashes: 1}, nil,
			}
		}
	}()

	ms, err := s.List(ctx, q, opts)
	sink <- shardListResult{ms, err}
}

// List returns the repositories matching q across all loaded shards.
//
// Shards are first narrowed with selectRepoSet, then listed concurrently by a
// bounded pool of workers. Entries for the same repository URL coming from
// different shards are merged by summing their Stats. The returned Crashes
// count includes one extra crash while startup loading has not completed,
// plus whatever each shard reports (listOneShard reports 1 for a shard whose
// List panicked). The first shard error aborts the call and is returned.
// When q simplifies to a true constant, the aggregate is also exported via
// reportListAllMetrics.
func (ss *shardedSearcher) List(ctx context.Context, q query.Q, opts *zoekt.ListOptions) (rl *zoekt.RepoList, err error) {
	tr, ctx := trace.New(ctx, "shardedSearcher.List", "")
	metricListRunning.Inc()
	defer func() {
		metricListRunning.Dec()
		if rl != nil {
			tr.LazyPrintf("repos.size=%d reposmap.size=%d crashes=%d stats=%+v", len(rl.Repos), len(rl.ReposMap), rl.Crashes, rl.Stats)
		}
		if err != nil {
			tr.LazyPrintf("error: %v", err)
			tr.SetError(err)
		}
		tr.Finish()
	}()

	q = query.Simplify(q)
	isAll := false
	if c, ok := q.(*query.Const); ok {
		isAll = c.Value
	}

	proc, err := ss.sched.Acquire(ctx)
	if err != nil {
		return nil, err
	}
	defer proc.Release()
	tr.LazyPrintf("acquired process")

	loaded := ss.getLoaded()
	shards := loaded.shards

	// Setup what we return now, since we may short circuit if there are no
	// shards to search.
	stillLoadingCrashes := 0
	if !loaded.ready {
		// We may have missed results due to not being fully loaded.
		stillLoadingCrashes++
	}
	agg := zoekt.RepoList{
		Crashes:  stillLoadingCrashes,
		ReposMap: zoekt.ReposMap{},
		Repos:    []*zoekt.RepoListEntry{},
	}

	// PERF: Select the subset of shards that we will search over for the given
	// query. A common List query only asks for a specific repo, so this is an
	// important optimization.
	{
		beforeLen := len(shards)
		beforeQ := q
		shards, q = selectRepoSet(shards, q)
		tr.LazyPrintf("selectRepoSet shards=%d->%d q=%s->%s", beforeLen, len(shards), beforeQ, q)
	}

	if len(shards) == 0 {
		return &agg, nil
	}

	// If we return early on a shard error, cancel the listing still in flight
	// on the other shards. Otherwise that work keeps running after the
	// scheduler slot has been released. On the success path every shard has
	// already finished, so this cancel is a no-op.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	shardCount := len(shards)
	// Every shard produces exactly one result (see listOneShard). Buffering
	// shardCount results means workers never block on send, even if we stop
	// reading early.
	all := make(chan shardListResult, shardCount)
	feeder := make(chan zoekt.Searcher, shardCount)
	for _, s := range shards {
		feeder <- s
	}
	close(feeder)

	// Never start more workers than there are shards to list.
	for range min(runtime.GOMAXPROCS(0), shardCount) {
		go func() {
			for s := range feeder {
				listOneShard(ctx, s, q, opts, all)
			}
		}()
	}

	uniq := map[string]*zoekt.RepoListEntry{}

	for range shardCount {
		res := <-all
		if res.err != nil {
			return nil, res.err
		}

		agg.Crashes += res.rl.Crashes
		agg.Stats.Add(&res.rl.Stats)

		for _, entry := range res.rl.Repos {
			prev, ok := uniq[entry.Repository.URL]
			if !ok {
				cp := *entry // We need to copy because we mutate Stats when merging duplicates.
				uniq[entry.Repository.URL] = &cp
			} else {
				prev.Stats.Add(&entry.Stats)
			}
		}

		for id, entry := range res.rl.ReposMap {
			if _, ok := agg.ReposMap[id]; !ok {
				agg.ReposMap[id] = entry
			}
		}
	}

	agg.Repos = make([]*zoekt.RepoListEntry, 0, len(uniq))
	for _, entry := range uniq {
		agg.Repos = append(agg.Repos, entry)
	}

	// Only one of these fields is populated and in all cases the size of that
	// field is the number of Repos.
	//
	// Note: we don't just add individual Stats.Repos since a repository can
	// have multiple shards.
	agg.Stats.Repos = len(uniq) + len(agg.ReposMap)

	if isAll && len(agg.Repos) > 0 {
		reportListAllMetrics(agg.Repos)
	}

	return &agg, nil
}

// reportListAllMetrics sums the Stats of every entry in repos and publishes
// the totals through the metricListAll* metrics.
func reportListAllMetrics(repos []*zoekt.RepoListEntry) {
	var stats zoekt.RepoStats
	for _, r := range repos {
		stats.Add(&r.Stats)
	}

	metricListAllRepos.Set(float64(stats.Repos))
	metricListAllIndexBytes.Set(float64(stats.IndexBytes))
	metricListAllContentBytes.Set(float64(stats.ContentBytes))
	metricListAllDocuments.Set(float64(stats.Documents))
	metricListAllShards.Set(float64(stats.Shards))
	metricListAllNewLinesCount.Set(float64(stats.NewLinesCount))
	metricListAllDefaultBranchNewLinesCount.Set(float64(stats.DefaultBranchNewLinesCount))
	metricListAllOtherBranchesNewLinesCount.Set(float64(stats.OtherBranchesNewLinesCount))
}

// getLoaded returns the currently loaded shards together with the ready flag
// set by markReady. The returned slice is shared, so do not mutate it.
func (s *shardedSearcher) getLoaded() loaded {
	// NOTE(review): an earlier comment said a later commit would store the
	// true value of ready and that this kept backwards-compatible behaviour.
	// The code below reads the stored flag; confirm whether that note still
	// applies.
	ready := s.ready.Load()
	// ranked is loaded after ready to avoid a race where ready is true but
	// ranked is still not the final set of shards.
	ranked, _ := s.ranked.Load().([]*rankedShard)
	return loaded{
		shards: ranked,
		ready:  ready,
	}
}

// mkRankedShard wraps s in a rankedShard. It caches the repositories s
// contains and the highest "priority" value found in their RawConfig.
//
// If listing s fails, the error is logged and a rankedShard with no cached
// repos and zero priority is returned. Because the running maximum starts at
// 0, a shard whose repos only carry negative (or unparseable) priorities also
// ends up with priority 0.
func mkRankedShard(s zoekt.Searcher) *rankedShard {
	q := query.Const{Value: true}
	// We need to use WithUnsafeContext here, otherwise we cannot return a proper
	// rankedShard. On the user request path we use selectRepoSet which relies on
	// rankedShard.repos being set.
	result, err := s.List(systemtenant.WithUnsafeContext(context.Background()), &q, nil)
	if err != nil {
		log.Printf("[ERROR] mkRankedShard(%s): failed to cache repository list: %v", s, err)
		return &rankedShard{Searcher: s}
	}

	var (
		maxPriority float64
		repos       = make([]*zoekt.Repository, 0, len(result.Repos))
	)
	for i := range result.Repos {
		repo := &result.Repos[i].Repository
		repos = append(repos, repo)
		if repo.RawConfig != nil {
			// The parse error is deliberately ignored: ParseFloat yields 0 for
			// malformed input, which never raises maxPriority above its start.
			priority, _ := strconv.ParseFloat(repo.RawConfig["priority"], 64)
			if priority > maxPriority {
				maxPriority = priority
			}
		}
	}

	return &rankedShard{
		Searcher: s,
		repos:    repos,
		priority: maxPriority,
	}
}

// markReady should be called once all shards have been passed into replace on
// startup. Once s is marked as ready it stops reporting a Crash in the
// response Stats.
1180func (s *shardedSearcher) markReady() { 1181 s.ready.CompareAndSwap(false, true) 1182} 1183 1184func (s *shardedSearcher) replace(shards map[string]zoekt.Searcher) { 1185 if len(shards) == 0 { 1186 return 1187 } 1188 1189 defer func(began time.Time) { 1190 metricShardsBatchReplaceDurationSeconds.Observe(time.Since(began).Seconds()) 1191 }(time.Now()) 1192 1193 s.mu.Lock() 1194 defer s.mu.Unlock() 1195 1196 for key, shard := range shards { 1197 var r *rankedShard 1198 if shard != nil { 1199 r = mkRankedShard(shard) 1200 } 1201 1202 old := s.shards[key] 1203 if shard == nil { 1204 delete(s.shards, key) 1205 } else { 1206 s.shards[key] = r 1207 } 1208 1209 if old != nil && old.Searcher != nil { 1210 // _ ___ /^^\ /^\ /^^\_ 1211 // _ _@)@) \ ,,/ '` ~ `'~~ ', `\. 1212 // _/o\_ _ _ _/~`.`...'~\ ./~~..,'`','',.,' ' ~: 1213 // / `,'.~,~.~ . , . , ~|, ,/ .,' , ,. .. ,,. `, ~\_ 1214 // ( ' _' _ '_` _ ' . , `\_/ .' ..' ' ` ` `.. `, \_ 1215 // ~V~ V~ V~ V~ ~\ ` ' . ' , ' .,.,''`.,.''`.,.``. ', \_ 1216 // _/\ /\ /\ /\_/, . ' , `_/~\_ .' .,. ,, , _/~\_ `. `. '., \_ 1217 // < ~ ~ '~`'~'`, ., . `_: ::: \_ ' `_/ ::: \_ `.,' . ', \_ 1218 // \ ' `_ '`_ _ ',/ _::_::_ \ _ _/ _::_::_ \ `.,'.,`., \-,-,-,_,_, 1219 // `'~~ `'~~ `'~~ `'~~ \(_)(_)(_)/ `~~' \(_)(_)(_)/ ~'`\_.._,._,'_;_;_;_;_; 1220 // 1221 // We can't just call Close now, because there may be ongoing searches 1222 // which have old in the shards list. Previously we used an exclusive 1223 // lock to guarantee there were no concurrent searches. However, that 1224 // led to blocking on the read path. 1225 // 1226 // We could introduce granular locking per rankedShard to know when 1227 // there are no more references. However, this becomes tricky in 1228 // practice. Instead we rely on the garbage collector noticing old is no 1229 // longer used. We take care in our searchers to runtime.KeepAlive until 1230 // we have stopped referencing the underling mmap data. 
1231 runtime.SetFinalizer(old, func(r *rankedShard) { 1232 r.Close() 1233 }) 1234 } 1235 } 1236 1237 ranked := make([]*rankedShard, 0, len(s.shards)) 1238 for _, r := range s.shards { 1239 ranked = append(ranked, r) 1240 } 1241 1242 sort.Slice(ranked, func(i, j int) bool { 1243 priorityDiff := ranked[i].priority - ranked[j].priority 1244 if priorityDiff != 0 { 1245 return priorityDiff > 0 1246 } 1247 if len(ranked[i].repos) == 0 || len(ranked[j].repos) == 0 { 1248 // Protect against empty names which can happen if we fail to List or 1249 // the shard is full of tombstones. Prefer the shard which has names. 1250 return len(ranked[i].repos) >= len(ranked[j].repos) 1251 } 1252 return ranked[i].repos[0].Name < ranked[j].repos[0].Name 1253 }) 1254 1255 s.ranked.Store(ranked) 1256 1257 metricShardsLoaded.Set(float64(len(ranked))) 1258} 1259 1260func loadShard(fn string) (zoekt.Searcher, error) { 1261 f, err := os.Open(fn) 1262 if err != nil { 1263 return nil, err 1264 } 1265 1266 iFile, err := index.NewIndexFile(f) 1267 if err != nil { 1268 return nil, err 1269 } 1270 s, err := index.NewSearcher(iFile) 1271 if err != nil { 1272 iFile.Close() 1273 return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err) 1274 } 1275 1276 return s, nil 1277} 1278 1279// prioritySlice is a trivial implementation of an array that provides three 1280// things: appending a value, removing a value, and getting the array's max. 1281// Operations take O(n) time, which is acceptable because N is restricted to 1282// GOMAXPROCS (i.e., number of cpu cores) by the shardedSearcher interface. 
type prioritySlice []float64

// append records pri as one more tracked priority.
func (p *prioritySlice) append(pri float64) {
	grown := append(*p, pri)
	*p = grown
}

// remove drops the first element equal to pri, if any. The last element is
// moved into the vacated slot, so the order of the remaining elements is not
// preserved. When pri is absent, p is left untouched.
func (p *prioritySlice) remove(pri float64) {
	vals := *p
	last := len(vals) - 1
	for idx := 0; idx <= last; idx++ {
		if vals[idx] != pri {
			continue
		}
		// Overwrite the match with the tail element (a self-assignment when
		// the match is the tail), then shrink by one.
		vals[idx] = vals[last]
		*p = vals[:last]
		return
	}
}

// max returns the largest tracked priority, or negative infinity when no
// priorities are tracked.
func (p *prioritySlice) max() float64 {
	best := math.Inf(-1)
	for idx := range *p {
		if v := (*p)[idx]; v > best {
			best = v
		}
	}
	return best
}