fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Copyright 2016 Google Inc. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package shards 16 17import ( 18 "context" 19 "fmt" 20 "log" 21 "math" 22 "os" 23 "runtime" 24 "runtime/debug" 25 "slices" 26 "sort" 27 "strconv" 28 "sync" 29 "time" 30 31 "github.com/sourcegraph/zoekt/index" 32 "golang.org/x/sync/semaphore" 33 34 "github.com/prometheus/client_golang/prometheus" 35 "github.com/prometheus/client_golang/prometheus/promauto" 36 "go.uber.org/atomic" 37 38 "github.com/sourcegraph/zoekt" 39 "github.com/sourcegraph/zoekt/internal/tenant/systemtenant" 40 "github.com/sourcegraph/zoekt/internal/trace" 41 "github.com/sourcegraph/zoekt/query" 42) 43 44var ( 45 metricShardsLoaded = promauto.NewGauge(prometheus.GaugeOpts{ 46 Name: "zoekt_shards_loaded", 47 Help: "The number of shards currently loaded", 48 }) 49 metricShardsLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 50 Name: "zoekt_shards_loaded_total", 51 Help: "The total number of shards loaded", 52 }) 53 metricShardsLoadFailedTotal = promauto.NewCounter(prometheus.CounterOpts{ 54 Name: "zoekt_shards_load_failed_total", 55 Help: "The total number of shard loads that failed", 56 }) 57 58 metricSearchRunning = promauto.NewGauge(prometheus.GaugeOpts{ 59 Name: "zoekt_search_running", 60 Help: "The number of concurrent search requests running", 61 }) 62 metricSearchShardRunning = promauto.NewGauge(prometheus.GaugeOpts{ 63 Name: "zoekt_search_shard_running", 64 Help: "The number of concurrent search requests in a shard running", 65 }) 66 metricSearchFailedTotal = promauto.NewCounter(prometheus.CounterOpts{ 67 Name: "zoekt_search_failed_total", 68 Help: "The total number of search requests that failed", 69 }) 70 metricSearchDuration = promauto.NewHistogram(prometheus.HistogramOpts{ 71 Name: "zoekt_search_duration_seconds", 72 Help: "The duration a search request took in seconds", 73 Buckets: prometheus.DefBuckets, // DefBuckets good for service timings 74 }) 75 76 // A Counter per Stat. Name should match field in zoekt.Stats. 77 metricSearchContentBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 78 Name: "zoekt_search_content_loaded_bytes_total", 79 Help: "Total amount of I/O for reading contents", 80 }) 81 metricSearchIndexBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 82 Name: "zoekt_search_index_loaded_bytes_total", 83 Help: "Total amount of I/O for reading from index", 84 }) 85 metricSearchCrashesTotal = promauto.NewCounter(prometheus.CounterOpts{ 86 Name: "zoekt_search_crashes_total", 87 Help: "Total number of search shards that had a crash", 88 }) 89 metricSearchFileCountTotal = promauto.NewCounter(prometheus.CounterOpts{ 90 Name: "zoekt_search_file_count_total", 91 Help: "Total number of files containing a match", 92 }) 93 metricSearchShardFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{ 94 Name: "zoekt_search_shard_files_considered_total", 95 Help: "Total number of files in shards that we considered", 96 }) 97 metricSearchFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{ 98 Name: "zoekt_search_files_considered_total", 99 Help: "Total files that we evaluated. Equivalent to files for which all atom matches (including negations) evaluated to true", 100 }) 101 metricSearchFilesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 102 Name: "zoekt_search_files_loaded_total", 103 Help: "Total files for which we loaded file content to verify substring matches", 104 }) 105 metricSearchFilesSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{ 106 Name: "zoekt_search_files_skipped_total", 107 Help: "Total candidate files whose contents weren't examined because we gathered enough matches", 108 }) 109 metricSearchShardsSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{ 110 Name: "zoekt_search_shards_skipped_total", 111 Help: "Total shards that we did not process because a query was canceled", 112 }) 113 metricSearchMatchCountTotal = promauto.NewCounter(prometheus.CounterOpts{ 114 Name: "zoekt_search_match_count_total", 115 Help: "Total number of non-overlapping matches", 116 }) 117 metricSearchNgramMatchesTotal = promauto.NewCounter(prometheus.CounterOpts{ 118 Name: "zoekt_search_ngram_matches_total", 119 Help: "Total number of candidate matches as a result of searching ngrams", 120 }) 121 metricSearchNgramLookupsTotal = promauto.NewCounter(prometheus.CounterOpts{ 122 Name: "zoekt_search_ngram_lookups_total", 123 Help: "Total number of times we accessed an ngram in the index", 124 }) 125 metricSearchRegexpsConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{ 126 Name: "zoekt_search_regexps_considered_total", 127 Help: "Total number of times regexp was called on files that we evaluated", 128 }) 129 130 metricListRunning = promauto.NewGauge(prometheus.GaugeOpts{ 131 Name: "zoekt_list_running", 132 Help: "The number of concurrent list requests running", 133 }) 134 metricListShardRunning = promauto.NewGauge(prometheus.GaugeOpts{ 135 Name: "zoekt_list_shard_running", 136 Help: "The number of concurrent list requests in a shard running", 137 }) 138 metricShardsBatchReplaceDurationSeconds = promauto.NewHistogram(prometheus.HistogramOpts{ 139 Name: "zoekt_shards_batch_replace_duration_seconds", 140 Help: "The time it takes to replace a batch of Searchers.", 141 Buckets: []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30}, 142 }) 143 metricListAllRepos = promauto.NewGauge(prometheus.GaugeOpts{ 144 Name: "zoekt_list_all_stats_repos", 145 Help: "The last List(true) value for RepoStats.Repos. Repos is used for aggregrating the number of repositories.", 146 }) 147 metricListAllShards = promauto.NewGauge(prometheus.GaugeOpts{ 148 Name: "zoekt_list_all_stats_shards", 149 Help: "The last List(true) value for RepoStats.Shards. Shards is the total number of search shards.", 150 }) 151 metricListAllDocuments = promauto.NewGauge(prometheus.GaugeOpts{ 152 Name: "zoekt_list_all_stats_documents", 153 Help: "The last List(true) value for RepoStats.Documents. Documents holds the number of documents or files.", 154 }) 155 metricListAllIndexBytes = promauto.NewGauge(prometheus.GaugeOpts{ 156 Name: "zoekt_list_all_stats_index_bytes", 157 Help: "The last List(true) value for RepoStats.IndexBytes. IndexBytes is the amount of RAM used for index overhead.", 158 }) 159 metricListAllContentBytes = promauto.NewGauge(prometheus.GaugeOpts{ 160 Name: "zoekt_list_all_stats_content_bytes", 161 Help: "The last List(true) value for RepoStats.ContentBytes. ContentBytes is the amount of RAM used for raw content.", 162 }) 163 metricListAllNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{ 164 Name: "zoekt_list_all_stats_new_lines_count", 165 Help: "The last List(true) value for RepoStats.NewLinesCount.", 166 }) 167 metricListAllDefaultBranchNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{ 168 Name: "zoekt_list_all_stats_default_branch_new_lines_count", 169 Help: "The last List(true) value for RepoStats.DefaultBranchNewLinesCount.", 170 }) 171 metricListAllOtherBranchesNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{ 172 Name: "zoekt_list_all_stats_other_branches_new_lines_count", 173 Help: "The last List(true) value for RepoStats.OtherBranchesNewLinesCount.", 174 }) 175) 176 177type rankedShard struct { 178 zoekt.Searcher 179 180 priority float64 // maximum priority across all repos in the shard 181 182 // We have out of band ranking on compound shards which can change even if 183 // the shard file does not. So we compute a rank in getShards. We store 184 // repos here to avoid the cost of List in the search request path. 185 // 186 // repos is nil only if that call failed. 187 repos []*zoekt.Repository 188} 189 190// loaded stores the state we compute when updating the state of shards from 191// disk. 192type loaded struct { 193 // shards is the currently loaded shards sorted by decreasing rank and 194 // should not be mutated. 195 shards []*rankedShard 196 197 // ready is true if sharded searcher has finished loading all initial 198 // shards on startup. 199 ready bool 200} 201 202type shardedSearcher struct { 203 // Limit the number of parallel queries. Since searching is 204 // CPU bound, we can't do better than #CPU queries in 205 // parallel. If we do so, we just create more memory 206 // pressure. 207 sched scheduler 208 209 mu sync.Mutex // protects writes to shards 210 shards map[string]*rankedShard 211 212 ready atomic.Bool 213 ranked atomic.Value 214} 215 216func newShardedSearcher(n int64) *shardedSearcher { 217 ss := &shardedSearcher{ 218 shards: make(map[string]*rankedShard), 219 sched: newScheduler(n), 220 } 221 return ss 222} 223 224// NewDirectorySearcher returns a searcher instance that loads all 225// shards corresponding to a glob into memory. 226func NewDirectorySearcher(dir string) (zoekt.Streamer, error) { 227 return newDirectorySearcher(dir, true) 228} 229 230// NewDirectorySearcherFast is like NewDirectorySearcher, but does not block 231// on the initial loading of shards. 232// 233// This exists since in the case of zoekt-webserver we are happy with having 234// partial availability since that is better than no availability on large 235// instances. 236func NewDirectorySearcherFast(dir string) (zoekt.Streamer, error) { 237 return newDirectorySearcher(dir, false) 238} 239 240func newDirectorySearcher(dir string, waitUntilReady bool) (zoekt.Streamer, error) { 241 ss := newShardedSearcher(int64(runtime.GOMAXPROCS(0))) 242 tl := &loader{ 243 ss: ss, 244 } 245 dw, err := newDirectoryWatcher(dir, tl) 246 if err != nil { 247 return nil, err 248 } 249 250 if waitUntilReady { 251 if err := dw.WaitUntilReady(); err != nil { 252 return nil, err 253 } 254 } 255 256 ds := &directorySearcher{ 257 Streamer: ss, 258 directoryWatcher: dw, 259 } 260 261 return &typeRepoSearcher{Streamer: ds}, nil 262} 263 264type directorySearcher struct { 265 zoekt.Streamer 266 267 directoryWatcher *DirectoryWatcher 268} 269 270func (s *directorySearcher) Close() { 271 // We need to Stop directoryWatcher first since it calls load/unload on 272 // Searcher. 273 s.directoryWatcher.Stop() 274 s.Streamer.Close() 275} 276 277type loader struct { 278 ss *shardedSearcher 279} 280 281func (tl *loader) load(keys ...string) { 282 // This is called with all keys on startup, so once this function has 283 // finished running shardedSearcher will be ready. 284 defer tl.ss.markReady() 285 286 if len(keys) == 0 { 287 // If there's nothing to load, we exit early here, but we want to mark 288 // ourselves as ready. 289 return 290 } 291 292 var ( 293 mu sync.Mutex // synchronizes writes to the shards map 294 wg sync.WaitGroup // used to wait for all shards to load 295 sem = semaphore.NewWeighted(int64(runtime.GOMAXPROCS(0))) 296 loadedShards = make(map[string]zoekt.Searcher) 297 ) 298 299 publishLoaded := func() { 300 mu.Lock() 301 chunk := loadedShards 302 loadedShards = make(map[string]zoekt.Searcher) 303 mu.Unlock() 304 tl.ss.replace(chunk) 305 } 306 307 log.Printf("[INFO] loading %d shard(s): %s", len(keys), humanTruncateList(keys, 5)) 308 309 lastProgress := time.Now() 310 for i, key := range keys { 311 // If taking a while to start-up occasionally give a progress message 312 if time.Since(lastProgress) > 5*time.Second { 313 log.Printf("[INFO] still need to load %d shards...", len(keys)-i) 314 lastProgress = time.Now() 315 316 publishLoaded() 317 } 318 319 _ = sem.Acquire(context.Background(), 1) 320 wg.Add(1) 321 322 go func(key string) { 323 defer sem.Release(1) 324 defer wg.Done() 325 326 shard, err := loadShard(key) 327 if err != nil { 328 metricShardsLoadFailedTotal.Inc() 329 log.Printf("[ERROR] reloading: %s, err %v ", key, err) 330 return 331 } 332 metricShardsLoadedTotal.Inc() 333 334 mu.Lock() 335 loadedShards[key] = shard 336 mu.Unlock() 337 }(key) 338 } 339 340 wg.Wait() 341 342 publishLoaded() 343} 344 345func (tl *loader) drop(keys ...string) { 346 shards := make(map[string]zoekt.Searcher, len(keys)) 347 for _, key := range keys { 348 shards[key] = nil 349 } 350 tl.ss.replace(shards) 351} 352 353func (ss *shardedSearcher) String() string { 354 return "shardedSearcher" 355} 356 357// Close closes references to open files. It may be called only once. 358func (ss *shardedSearcher) Close() { 359 ss.mu.Lock() 360 shards := make(map[string]zoekt.Searcher, len(ss.shards)) 361 for k := range ss.shards { 362 shards[k] = nil 363 } 364 ss.mu.Unlock() 365 366 ss.replace(shards) 367} 368 369func selectRepoSet(shards []*rankedShard, q query.Q) ([]*rankedShard, query.Q) { 370 and, ok := q.(*query.And) 371 if ok { 372 return doSelectRepoSet(shards, and) 373 } 374 375 // We have queries which look like (reposet ...) and we want to do the same 376 // optimizations. To simplify we just always wrap the query in And and then 377 // on the return value call Simplify to unwrap. In particular this is 378 // important for List calls. 379 and = &query.And{Children: []query.Q{q}} 380 shards, q = doSelectRepoSet(shards, and) 381 return shards, query.Simplify(q) 382} 383 384func doSelectRepoSet(shards []*rankedShard, and *query.And) ([]*rankedShard, query.Q) { 385 // (and (reposet ...) (q)) 386 // (and true (q)) with a filtered shards 387 // (and false) // noop 388 389 // (and (repobranches ...) (q)) 390 // (and (repobranches ...) (q)) 391 392 // Note: we also support (and (repo ...) (q)) even though sourcegraph does 393 // not generate those sorts of queries. This is to support manual testing. 394 395 hasReposForPredicate := func(pred func(repo *zoekt.Repository) bool) func(repos []*zoekt.Repository) (any, all bool) { 396 return func(repos []*zoekt.Repository) (any, all bool) { 397 any = false 398 all = true 399 for _, repo := range repos { 400 b := pred(repo) 401 any = any || b 402 all = all && b 403 } 404 return any, all 405 } 406 } 407 408 for i, c := range and.Children { 409 var setSize int 410 var hasRepos func([]*zoekt.Repository) (bool, bool) 411 switch setQuery := c.(type) { 412 case *query.RepoSet: 413 setSize = len(setQuery.Set) 414 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 415 return setQuery.Set[repo.Name] 416 }) 417 case *query.RepoIDs: 418 setSize = int(setQuery.Repos.GetCardinality()) 419 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 420 return setQuery.Repos.Contains(repo.ID) 421 }) 422 case *query.Repo: 423 setSize = 0 424 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 425 return setQuery.Regexp.MatchString(repo.Name) 426 }) 427 case *query.BranchesRepos: 428 for _, br := range setQuery.List { 429 setSize += int(br.Repos.GetCardinality()) 430 } 431 432 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 433 for _, br := range setQuery.List { 434 if br.Repos.Contains(repo.ID) { 435 return true 436 } 437 } 438 return false 439 }) 440 default: 441 continue 442 } 443 444 // setSize may be larger than the number of shards we have. The size of 445 // filtered is bounded by min(len(set), len(shards)) 446 if setSize > len(shards) { 447 setSize = len(shards) 448 } 449 450 filtered := make([]*rankedShard, 0, setSize) 451 filteredAll := true 452 453 for _, s := range shards { 454 if s.repos == nil { 455 // repos is nil if we failed to List the shard. This shouldn't 456 // happen, but if it does we don't know what is in it and must search 457 // it without simplifying the query. 458 filtered = append(filtered, s) 459 filteredAll = false 460 } else if any, all := hasRepos(s.repos); any { 461 filtered = append(filtered, s) 462 filteredAll = filteredAll && all 463 } 464 } 465 466 // We don't need to adjust the query since we are returning an empty set 467 // of shards to search. 468 if len(filtered) == 0 { 469 return filtered, and 470 } 471 472 // We can't simplify the query since we are searching shards which contain 473 // repos we aren't supposed to search. 474 if !filteredAll { 475 return filtered, and 476 } 477 478 // We don't want to mutate the original and, so we clone it before 479 // mutating it. 480 and = &query.And{Children: slices.Clone(and.Children)} 481 482 // This optimization allows us to avoid the work done by 483 // indexData.simplify for each shard. 484 // 485 // For example if our query is (and (reposet foo bar) (content baz)) 486 // then at this point filtered is [foo bar] and q is the same. For each 487 // shard indexData.simplify will simplify to (and true (content baz)) -> 488 // (content baz). This work can be done now once, rather than per shard. 489 switch c := c.(type) { 490 case *query.RepoSet, *query.RepoIDs, *query.Repo: 491 and.Children[i] = &query.Const{Value: true} 492 return filtered, query.Simplify(and) 493 494 case *query.BranchesRepos: 495 // We can only replace if all the repos want the same branches. We 496 // simplify and just check that we are requesting 1 branch. The common 497 // case is just asking for HEAD, so this should be effective. 498 if len(c.List) != 1 { 499 return filtered, and 500 } 501 502 // Every repo wants the same branches, so we can replace RepoBranches 503 // with a list of branch queries. 504 and.Children[i] = &query.Branch{Pattern: c.List[0].Branch, Exact: true} 505 return filtered, query.Simplify(and) 506 } 507 508 // Stop after first RepoSet, otherwise we might append duplicate 509 // shards to `filtered` 510 return filtered, and 511 } 512 513 return shards, and 514} 515 516func (ss *shardedSearcher) Search(ctx context.Context, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) { 517 tr, ctx := trace.New(ctx, "shardedSearcher.Search", "") 518 tr.LazyLog(q, true) 519 tr.LazyPrintf("opts: %+v", opts) 520 defer func() { 521 if sr != nil { 522 tr.LazyPrintf("num files: %d", len(sr.Files)) 523 tr.LazyPrintf("stats: %+v", sr.Stats) 524 } 525 if err != nil { 526 tr.LazyPrintf("error: %v", err) 527 tr.SetError(err) 528 } 529 tr.Finish() 530 }() 531 ctx, cancel := context.WithCancel(ctx) 532 defer cancel() 533 534 collectSender := newCollectSender(opts) 535 536 start := time.Now() 537 proc, err := ss.sched.Acquire(ctx) 538 if err != nil { 539 return nil, err 540 } 541 defer proc.Release() 542 tr.LazyPrintf("acquired process") 543 544 wait := time.Since(start) 545 start = time.Now() 546 547 loaded := ss.getLoaded() 548 done, err := streamSearch(ctx, proc, q, opts, loaded.shards, collectSender) 549 defer done() 550 if err != nil { 551 return nil, err 552 } 553 554 aggregate, ok := collectSender.Done() 555 if !ok { 556 aggregate = &zoekt.SearchResult{ 557 RepoURLs: map[string]string{}, 558 LineFragments: map[string]string{}, 559 } 560 } 561 562 copyFiles(aggregate) 563 564 if !loaded.ready { 565 // We may have missed results due to not being fully loaded. 566 aggregate.Stats.Crashes++ 567 } 568 569 aggregate.Stats.Wait = wait 570 aggregate.Stats.Duration = time.Since(start) 571 572 return aggregate, nil 573} 574 575func (ss *shardedSearcher) StreamSearch(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, sender zoekt.Sender) (err error) { 576 tr, ctx := trace.New(ctx, "shardedSearcher.StreamSearch", "") 577 defer func() { 578 if err != nil { 579 tr.LazyPrintf("error: %v", err) 580 tr.SetError(err) 581 } 582 tr.Finish() 583 }() 584 585 start := time.Now() 586 proc, err := ss.sched.Acquire(ctx) 587 if err != nil { 588 return err 589 } 590 defer proc.Release() 591 tr.LazyPrintf("acquired process") 592 593 loaded := ss.getLoaded() 594 shards := loaded.shards 595 596 maxPendingPriority := math.Inf(-1) 597 if len(shards) > 0 { 598 maxPendingPriority = shards[0].priority 599 } 600 601 stillLoadingCrashes := 0 602 if !loaded.ready { 603 // We may have missed results due to not being fully loaded. 604 stillLoadingCrashes++ 605 } 606 607 sender.Send(&zoekt.SearchResult{ 608 Stats: zoekt.Stats{ 609 Crashes: stillLoadingCrashes, 610 Wait: time.Since(start), 611 }, 612 Progress: zoekt.Progress{ 613 MaxPendingPriority: maxPendingPriority, 614 }, 615 }) 616 617 // Matches flow from the shards up the stack in the following order: 618 // 619 // 1. Search shards 620 // 2. flushCollectSender (aggregate) 621 // 3. limitSender (limit) 622 // 4. copyFileSender (copy) 623 // 624 // For streaming, the wrapping has to happen in the inverted order. 625 sender = copyFileSender(sender) 626 627 if truncator, hasLimits := index.NewDisplayTruncator(opts); hasLimits { 628 var cancel context.CancelFunc 629 ctx, cancel = context.WithCancel(ctx) 630 defer cancel() 631 sender = limitSender(cancel, sender, truncator) 632 } 633 634 sender, flush := newFlushCollectSender(opts, sender) 635 636 done, err := streamSearch(ctx, proc, q, opts, shards, sender) 637 638 // Even though streaming is done, we may have results sitting in a buffer we 639 // need to flush. So we need to send those before calling done. 640 flush() 641 done() 642 643 return err 644} 645 646// streamSearch is an internal helper since both Search and StreamSearch are 647// largely similar. 648// 649// done must always be called, even if err is non-nil. The SearchResults sent 650// via sender contain references to the underlying mmap data that the garbage 651// collector can't see. Calling done informs the garbage collector it is free 652// to collect those shards. The caller must call copyFiles on any 653// SearchResults it returns/streams out before calling done. 654func streamSearch(ctx context.Context, proc *process, q query.Q, opts *zoekt.SearchOptions, shards []*rankedShard, sender zoekt.Sender) (done func(), err error) { 655 tr, ctx := trace.New(ctx, "shardedSearcher.streamSearch", "") 656 overallStart := time.Now() 657 metricSearchRunning.Inc() 658 defer func() { 659 metricSearchRunning.Dec() 660 metricSearchDuration.Observe(time.Since(overallStart).Seconds()) 661 if err != nil { 662 metricSearchFailedTotal.Inc() 663 664 tr.LazyPrintf("error: %v", err) 665 tr.SetError(err) 666 } 667 tr.Finish() 668 }() 669 670 // Select the subset of shards that we will search over for the given query. 671 { 672 beforeLen := len(shards) 673 beforeQ := q 674 shards, q = selectRepoSet(shards, q) 675 tr.LazyPrintf("selectRepoSet shards=%d->%d q=%s->%s", beforeLen, len(shards), beforeQ, q) 676 } 677 678 if len(shards) == 0 { 679 return func() {}, nil 680 } 681 682 var cancel context.CancelFunc 683 if opts.MaxWallTime == 0 { 684 ctx, cancel = context.WithCancel(ctx) 685 } else { 686 ctx, cancel = context.WithTimeout(ctx, opts.MaxWallTime) 687 } 688 689 defer cancel() 690 691 // We set the number of workers to GOMAXPROCS, or the number of shards, 692 // whichever is smaller. 693 workers := runtime.GOMAXPROCS(0) 694 if workers > len(shards) { 695 workers = len(shards) 696 } 697 698 type result struct { 699 priority float64 700 *zoekt.SearchResult 701 err error 702 } 703 704 var ( 705 // buffered channels to continue searching when sending back results 706 // takes a while / blocks. The maximum pending result set is workers * 2. 707 results = make(chan *result, workers) 708 search = make(chan *rankedShard, workers) 709 wg sync.WaitGroup 710 ) 711 712 // Start workers that receive shards from the search channel, search them, 713 // and send the results to the results channel. This process is repeated 714 // until the search channel is closed. 715 // 716 // Note: Making "search" a buffered channel has the effect of limiting the number of parallel shard searches. 717 // Since searching is mostly CPU bound, limiting parallel shard searches also reduces the peak working set. 718 wg.Add(workers) 719 for i := 0; i < workers; i++ { 720 go func() { 721 defer wg.Done() 722 for s := range search { 723 sr, err := searchOneShard(ctx, s, q, opts) 724 r := &result{priority: s.priority, SearchResult: sr, err: err} 725 results <- r 726 } 727 }() 728 } 729 730 go func() { 731 wg.Wait() 732 close(results) 733 }() 734 735 var ( 736 pending = make(prioritySlice, 0, workers) 737 shard = 0 738 next = shards[shard] 739 740 // We need a separate nil-able reference to the same channel so we can close(search) for the worker 741 // go-routines to finish but also set work to nil in order for the select statement below to ignore 742 // that case when we want to stop a search. This is needed because sending on a closed channel panics. 743 work = search 744 ) 745 746 stop := func() { 747 if work != nil { 748 close(search) 749 work = nil 750 next = nil 751 } 752 } 753 754 // tracked so we can stop when we hit TotalMaxMatchCount 755 var totalMatchCount int 756 757search: 758 for { 759 // At the top of each iteration, have the proc associated with this search yield its won "timeslice" 760 // to possibly allow other searches to make progress 761 _ = proc.Yield(ctx) // Note: we let searchOneShard handle context errors 762 763 select { 764 case work <- next: // is there a worker available to search the next shard? 765 pending.append(next.priority) 766 767 shard++ 768 if shard == len(shards) { 769 stop() 770 } else { 771 next = shards[shard] 772 } 773 case r, ok := <-results: // is there a result to send back? 774 if !ok { 775 break search 776 } 777 778 // delete this result's priority from pending before computing the new max pending priority 779 pending.remove(r.priority) 780 781 if r.err != nil { 782 // Set final error and stop searching new shards, but consume any pending 783 // search results. 784 stop() 785 err = r.err 786 continue 787 } 788 789 // Update the match count statistics and stop searching new shards if we've 790 // reached the limit set in the options. 791 totalMatchCount += r.SearchResult.Stats.MatchCount 792 if opts.TotalMaxMatchCount > 0 && totalMatchCount > opts.TotalMaxMatchCount { 793 stop() 794 } 795 796 observeMetrics(r.SearchResult) 797 798 r.Priority = r.priority 799 r.MaxPendingPriority = pending.max() 800 801 sendByRepository(r.SearchResult, opts, sender) // send the result back to the client 802 } 803 } 804 805 return func() { runtime.KeepAlive(shards) }, err 806} 807 808// sendByRepository splits a zoekt.SearchResult by repository and calls 809// sender.Send for each batch. Ranking in Sourcegraph expects zoekt.SearchResult 810// to contain results with the same zoekt.SearchResult.priority only. 811// 812// We split by repository instead of by priority because it is easier to set 813// RepoURLs and LineFragments in zoekt.SearchResult. 814func sendByRepository(result *zoekt.SearchResult, opts *zoekt.SearchOptions, sender zoekt.Sender) { 815 if len(result.RepoURLs) <= 1 || len(result.Files) == 0 { 816 index.SortFiles(result.Files) 817 sender.Send(result) 818 return 819 } 820 821 send := func(repoName string, a, b int, stats zoekt.Stats) { 822 index.SortFiles(result.Files[a:b]) 823 sender.Send(&zoekt.SearchResult{ 824 Stats: stats, 825 Progress: zoekt.Progress{ 826 Priority: result.Files[a].RepositoryPriority, 827 MaxPendingPriority: result.MaxPendingPriority, 828 }, 829 Files: result.Files[a:b], 830 RepoURLs: map[string]string{repoName: result.RepoURLs[repoName]}, 831 LineFragments: map[string]string{repoName: result.LineFragments[repoName]}, 832 }) 833 } 834 835 var startIndex, endIndex int 836 curRepoID := result.Files[0].RepositoryID 837 curRepoName := result.Files[0].Repository 838 839 fm := zoekt.FileMatch{} 840 for endIndex, fm = range result.Files { 841 if curRepoID != fm.RepositoryID { 842 // Stats must stay aggregate-able, hence we sent the aggregate stats with the 843 // last event. 844 send(curRepoName, startIndex, endIndex, zoekt.Stats{}) 845 846 startIndex = endIndex 847 curRepoID = fm.RepositoryID 848 curRepoName = fm.Repository 849 } 850 } 851 852 send(curRepoName, startIndex, endIndex+1, result.Stats) 853} 854 855func observeMetrics(sr *zoekt.SearchResult) { 856 metricSearchContentBytesLoadedTotal.Add(float64(sr.Stats.ContentBytesLoaded)) 857 metricSearchIndexBytesLoadedTotal.Add(float64(sr.Stats.IndexBytesLoaded)) 858 metricSearchCrashesTotal.Add(float64(sr.Stats.Crashes)) 859 metricSearchFileCountTotal.Add(float64(sr.Stats.FileCount)) 860 metricSearchShardFilesConsideredTotal.Add(float64(sr.Stats.ShardFilesConsidered)) 861 metricSearchFilesConsideredTotal.Add(float64(sr.Stats.FilesConsidered)) 862 metricSearchFilesLoadedTotal.Add(float64(sr.Stats.FilesLoaded)) 863 metricSearchFilesSkippedTotal.Add(float64(sr.Stats.FilesSkipped)) 864 metricSearchShardsSkippedTotal.Add(float64(sr.Stats.ShardsSkipped)) 865 metricSearchMatchCountTotal.Add(float64(sr.Stats.MatchCount)) 866 metricSearchNgramMatchesTotal.Add(float64(sr.Stats.NgramMatches)) 867 metricSearchNgramLookupsTotal.Add(float64(sr.Stats.NgramLookups)) 868 metricSearchRegexpsConsideredTotal.Add(float64(sr.Stats.RegexpsConsidered)) 869} 870 871func copySlice(src *[]byte) { 872 if *src == nil { 873 return 874 } 875 dst := make([]byte, len(*src)) 876 copy(dst, *src) 877 *src = dst 878} 879 880func copyFiles(sr *zoekt.SearchResult) { 881 for i := range sr.Files { 882 copySlice(&sr.Files[i].Content) 883 copySlice(&sr.Files[i].Checksum) 884 for l := range sr.Files[i].LineMatches { 885 copySlice(&sr.Files[i].LineMatches[l].Line) 886 copySlice(&sr.Files[i].LineMatches[l].Before) 887 copySlice(&sr.Files[i].LineMatches[l].After) 888 } 889 for c := range sr.Files[i].ChunkMatches { 890 copySlice(&sr.Files[i].ChunkMatches[c].Content) 891 } 892 } 893} 894 895func searchOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) { 896 metricSearchShardRunning.Inc() 897 defer func() { 898 metricSearchShardRunning.Dec() 899 if e := recover(); e != nil { 900 log.Printf("[ERROR] crashed shard: %s: %#v, %s", s, e, debug.Stack()) 901 902 if sr == nil { 903 sr = &zoekt.SearchResult{} 904 } 905 sr.Stats.Crashes = 1 906 } 907 }() 908 909 return s.Search(ctx, q, opts) 910} 911 912type shardListResult struct { 913 rl *zoekt.RepoList 914 err error 915} 916 917func listOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.ListOptions, sink chan shardListResult) { 918 metricListShardRunning.Inc() 919 defer func() { 920 metricListShardRunning.Dec() 921 if r := recover(); r != nil { 922 log.Printf("[ERROR] crashed shard: %s: %s, %s", s.String(), r, debug.Stack()) 923 sink <- shardListResult{ 924 &zoekt.RepoList{Crashes: 1}, nil, 925 } 926 } 927 }() 928 929 ms, err := s.List(ctx, q, opts) 930 sink <- shardListResult{ms, err} 931} 932 933func (ss *shardedSearcher) List(ctx context.Context, q query.Q, opts *zoekt.ListOptions) (rl *zoekt.RepoList, err error) { 934 tr, ctx := trace.New(ctx, "shardedSearcher.List", "") 935 metricListRunning.Inc() 936 defer func() { 937 metricListRunning.Dec() 938 if rl != nil { 939 tr.LazyPrintf("repos.size=%d reposmap.size=%d crashes=%d stats=%+v", len(rl.Repos), len(rl.ReposMap), rl.Crashes, rl.Stats) 940 } 941 if err != nil { 942 tr.LazyPrintf("error: %v", err) 943 tr.SetError(err) 944 } 945 tr.Finish() 946 }() 947 948 q = query.Simplify(q) 949 isAll := false 950 if c, ok := q.(*query.Const); ok { 951 isAll = c.Value 952 } 953 954 proc, err := ss.sched.Acquire(ctx) 955 if err != nil { 956 return nil, err 957 } 958 defer proc.Release() 959 tr.LazyPrintf("acquired process") 960 961 loaded := ss.getLoaded() 962 shards := loaded.shards 963 964 // Setup what we return now, since we may short circuit if there are no 965 // shards to search. 966 stillLoadingCrashes := 0 967 if !loaded.ready { 968 // We may have missed results due to not being fully loaded. 969 stillLoadingCrashes++ 970 } 971 agg := zoekt.RepoList{ 972 Crashes: stillLoadingCrashes, 973 ReposMap: zoekt.ReposMap{}, 974 Repos: []*zoekt.RepoListEntry{}, 975 } 976 977 // PERF: Select the subset of shards that we will search over for the given 978 // query. A common List query only asks for a specific repo, so this is an 979 // important optimization. 980 { 981 beforeLen := len(shards) 982 beforeQ := q 983 shards, q = selectRepoSet(shards, q) 984 tr.LazyPrintf("selectRepoSet shards=%d->%d q=%s->%s", beforeLen, len(shards), beforeQ, q) 985 } 986 987 if len(shards) == 0 { 988 return &agg, nil 989 } 990 991 shardCount := len(shards) 992 all := make(chan shardListResult, shardCount) 993 feeder := make(chan zoekt.Searcher, len(shards)) 994 for _, s := range shards { 995 feeder <- s 996 } 997 close(feeder) 998 999 for i := 0; i < runtime.GOMAXPROCS(0); i++ { 1000 go func() { 1001 for s := range feeder { 1002 listOneShard(ctx, s, q, opts, all) 1003 } 1004 }() 1005 } 1006 1007 uniq := map[string]*zoekt.RepoListEntry{} 1008 1009 for range shards { 1010 r := <-all 1011 if r.err != nil { 1012 return nil, r.err 1013 } 1014 1015 agg.Crashes += r.rl.Crashes 1016 agg.Stats.Add(&r.rl.Stats) 1017 1018 for _, r := range r.rl.Repos { 1019 prev, ok := uniq[r.Repository.Name] 1020 if !ok { 1021 cp := *r // We need to copy because we mutate r.Stats when merging duplicates 1022 uniq[r.Repository.Name] = &cp 1023 } else { 1024 prev.Stats.Add(&r.Stats) 1025 } 1026 } 1027 1028 for id, r := range r.rl.ReposMap { 1029 _, ok := agg.ReposMap[id] 1030 if !ok { 1031 agg.ReposMap[id] = r 1032 } 1033 } 1034 } 1035 1036 agg.Repos = make([]*zoekt.RepoListEntry, 0, len(uniq)) 1037 for _, r := range uniq { 1038 agg.Repos = append(agg.Repos, r) 1039 } 1040 1041 // Only one of these fields is populated and in all cases the size of that 1042 // field is the number of Repos. 1043 // 1044 // Note: we don't just add individual Stats.Repos since a repository can 1045 // have multiple shards. 1046 agg.Stats.Repos = len(uniq) + len(agg.ReposMap) 1047 1048 if isAll && len(agg.Repos) > 0 { 1049 reportListAllMetrics(agg.Repos) 1050 } 1051 1052 return &agg, nil 1053} 1054 1055func reportListAllMetrics(repos []*zoekt.RepoListEntry) { 1056 var stats zoekt.RepoStats 1057 for _, r := range repos { 1058 stats.Add(&r.Stats) 1059 } 1060 1061 metricListAllRepos.Set(float64(stats.Repos)) 1062 metricListAllIndexBytes.Set(float64(stats.IndexBytes)) 1063 metricListAllContentBytes.Set(float64(stats.ContentBytes)) 1064 metricListAllDocuments.Set(float64(stats.Documents)) 1065 metricListAllShards.Set(float64(stats.Shards)) 1066 metricListAllNewLinesCount.Set(float64(stats.NewLinesCount)) 1067 metricListAllDefaultBranchNewLinesCount.Set(float64(stats.DefaultBranchNewLinesCount)) 1068 metricListAllOtherBranchesNewLinesCount.Set(float64(stats.OtherBranchesNewLinesCount)) 1069} 1070 1071// getLoaded returns the currently loaded shards. Shared so do not mutate. 1072func (s *shardedSearcher) getLoaded() loaded { 1073 // next commit will store the true value of this, for now we keep the 1074 // backwards compatible behaviour. 1075 ready := s.ready.Load() 1076 // ranked is loaded after ready to avoid a race were ready is true but 1077 // ranked is still not the final set of shards. 1078 ranked, _ := s.ranked.Load().([]*rankedShard) 1079 return loaded{ 1080 shards: ranked, 1081 ready: ready, 1082 } 1083} 1084 1085func mkRankedShard(s zoekt.Searcher) *rankedShard { 1086 q := query.Const{Value: true} 1087 // We need to use WithUnsafeContext here, otherwise we cannot return a proper 1088 // rankedShard. On the user request path we use selectRepoSet which relies on 1089 // rankedShard.repos being set. 1090 result, err := s.List(systemtenant.WithUnsafeContext(context.Background()), &q, nil) 1091 if err != nil { 1092 log.Printf("[ERROR] mkRankedShard(%s): failed to cache repository list: %v", s, err) 1093 return &rankedShard{Searcher: s} 1094 } 1095 1096 var ( 1097 maxPriority float64 1098 repos = make([]*zoekt.Repository, 0, len(result.Repos)) 1099 ) 1100 for i := range result.Repos { 1101 repo := &result.Repos[i].Repository 1102 repos = append(repos, repo) 1103 if repo.RawConfig != nil { 1104 priority, _ := strconv.ParseFloat(repo.RawConfig["priority"], 64) 1105 if priority > maxPriority { 1106 maxPriority = priority 1107 } 1108 } 1109 } 1110 1111 return &rankedShard{ 1112 Searcher: s, 1113 repos: repos, 1114 priority: maxPriority, 1115 } 1116} 1117 1118// markReady should be called once all shards have been passed into replace on 1119// startup. Once s is marked as ready it stops reporting a Crash in the 1120// response Stats. 1121func (s *shardedSearcher) markReady() { 1122 s.ready.CompareAndSwap(false, true) 1123} 1124 1125func (s *shardedSearcher) replace(shards map[string]zoekt.Searcher) { 1126 if len(shards) == 0 { 1127 return 1128 } 1129 1130 defer func(began time.Time) { 1131 metricShardsBatchReplaceDurationSeconds.Observe(time.Since(began).Seconds()) 1132 }(time.Now()) 1133 1134 s.mu.Lock() 1135 defer s.mu.Unlock() 1136 1137 for key, shard := range shards { 1138 var r *rankedShard 1139 if shard != nil { 1140 r = mkRankedShard(shard) 1141 } 1142 1143 old := s.shards[key] 1144 if shard == nil { 1145 delete(s.shards, key) 1146 } else { 1147 s.shards[key] = r 1148 } 1149 1150 if old != nil && old.Searcher != nil { 1151 // _ ___ /^^\ /^\ /^^\_ 1152 // _ _@)@) \ ,,/ '` ~ `'~~ ', `\. 1153 // _/o\_ _ _ _/~`.`...'~\ ./~~..,'`','',.,' ' ~: 1154 // / `,'.~,~.~ . , . , ~|, ,/ .,' , ,. .. ,,. `, ~\_ 1155 // ( ' _' _ '_` _ ' . , `\_/ .' ..' ' ` ` `.. `, \_ 1156 // ~V~ V~ V~ V~ ~\ ` ' . ' , ' .,.,''`.,.''`.,.``. ', \_ 1157 // _/\ /\ /\ /\_/, . ' , `_/~\_ .' .,. ,, , _/~\_ `. `. '., \_ 1158 // < ~ ~ '~`'~'`, ., . `_: ::: \_ ' `_/ ::: \_ `.,' . ', \_ 1159 // \ ' `_ '`_ _ ',/ _::_::_ \ _ _/ _::_::_ \ `.,'.,`., \-,-,-,_,_, 1160 // `'~~ `'~~ `'~~ `'~~ \(_)(_)(_)/ `~~' \(_)(_)(_)/ ~'`\_.._,._,'_;_;_;_;_; 1161 // 1162 // We can't just call Close now, because there may be ongoing searches 1163 // which have old in the shards list. Previously we used an exclusive 1164 // lock to guarantee there were no concurrent searches. However, that 1165 // led to blocking on the read path. 1166 // 1167 // We could introduce granular locking per rankedShard to know when 1168 // there are no more references. However, this becomes tricky in 1169 // practice. Instead we rely on the garbage collector noticing old is no 1170 // longer used. We take care in our searchers to runtime.KeepAlive until 1171 // we have stopped referencing the underling mmap data. 1172 runtime.SetFinalizer(old, func(r *rankedShard) { 1173 r.Close() 1174 }) 1175 } 1176 } 1177 1178 ranked := make([]*rankedShard, 0, len(s.shards)) 1179 for _, r := range s.shards { 1180 ranked = append(ranked, r) 1181 } 1182 1183 sort.Slice(ranked, func(i, j int) bool { 1184 priorityDiff := ranked[i].priority - ranked[j].priority 1185 if priorityDiff != 0 { 1186 return priorityDiff > 0 1187 } 1188 if len(ranked[i].repos) == 0 || len(ranked[j].repos) == 0 { 1189 // Protect against empty names which can happen if we fail to List or 1190 // the shard is full of tombstones. Prefer the shard which has names. 1191 return len(ranked[i].repos) >= len(ranked[j].repos) 1192 } 1193 return ranked[i].repos[0].Name < ranked[j].repos[0].Name 1194 }) 1195 1196 s.ranked.Store(ranked) 1197 1198 metricShardsLoaded.Set(float64(len(ranked))) 1199} 1200 1201func loadShard(fn string) (zoekt.Searcher, error) { 1202 f, err := os.Open(fn) 1203 if err != nil { 1204 return nil, err 1205 } 1206 1207 iFile, err := index.NewIndexFile(f) 1208 if err != nil { 1209 return nil, err 1210 } 1211 s, err := index.NewSearcher(iFile) 1212 if err != nil { 1213 iFile.Close() 1214 return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err) 1215 } 1216 1217 return s, nil 1218} 1219 1220// prioritySlice is a trivial implementation of an array that provides three 1221// things: appending a value, removing a value, and getting the array's max. 1222// Operations take O(n) time, which is acceptable because N is restricted to 1223// GOMAXPROCS (i.e., number of cpu cores) by the shardedSearcher interface. 1224type prioritySlice []float64 1225 1226func (p *prioritySlice) append(pri float64) { 1227 *p = append(*p, pri) 1228} 1229 1230func (p *prioritySlice) remove(pri float64) { 1231 for i, opri := range *p { 1232 if opri == pri { 1233 if i != len(*p)-1 { 1234 // swap to make this element the tail 1235 (*p)[i] = (*p)[len(*p)-1] 1236 } 1237 // pop the end off 1238 *p = (*p)[:len(*p)-1] 1239 break 1240 } 1241 } 1242} 1243 1244func (p *prioritySlice) max() float64 { 1245 // remove() and max() could be combined, but this is easier to read and 1246 // the expected performance difference from the extra lock and loop is 1247 // almost certainly irrelevant. 1248 maxPri := math.Inf(-1) 1249 for _, pri := range *p { 1250 if pri > maxPri { 1251 maxPri = pri 1252 } 1253 } 1254 return maxPri 1255}