fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Copyright 2016 Google Inc. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package shards 16 17import ( 18 "context" 19 "fmt" 20 "log" 21 "math" 22 "os" 23 "runtime" 24 "runtime/debug" 25 "slices" 26 "sort" 27 "strconv" 28 "sync" 29 "time" 30 31 "golang.org/x/sync/semaphore" 32 33 "github.com/prometheus/client_golang/prometheus" 34 "github.com/prometheus/client_golang/prometheus/promauto" 35 "go.uber.org/atomic" 36 37 "github.com/sourcegraph/zoekt" 38 "github.com/sourcegraph/zoekt/query" 39 "github.com/sourcegraph/zoekt/trace" 40) 41 42var ( 43 metricShardsLoaded = promauto.NewGauge(prometheus.GaugeOpts{ 44 Name: "zoekt_shards_loaded", 45 Help: "The number of shards currently loaded", 46 }) 47 metricShardsLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 48 Name: "zoekt_shards_loaded_total", 49 Help: "The total number of shards loaded", 50 }) 51 metricShardsLoadFailedTotal = promauto.NewCounter(prometheus.CounterOpts{ 52 Name: "zoekt_shards_load_failed_total", 53 Help: "The total number of shard loads that failed", 54 }) 55 56 metricSearchRunning = promauto.NewGauge(prometheus.GaugeOpts{ 57 Name: "zoekt_search_running", 58 Help: "The number of concurrent search requests running", 59 }) 60 metricSearchShardRunning = promauto.NewGauge(prometheus.GaugeOpts{ 61 Name: "zoekt_search_shard_running", 62 Help: "The number of concurrent search requests in a shard running", 63 }) 64 metricSearchFailedTotal = promauto.NewCounter(prometheus.CounterOpts{ 65 Name: "zoekt_search_failed_total", 66 Help: "The total number of search requests that failed", 67 }) 68 metricSearchDuration = promauto.NewHistogram(prometheus.HistogramOpts{ 69 Name: "zoekt_search_duration_seconds", 70 Help: "The duration a search request took in seconds", 71 Buckets: prometheus.DefBuckets, // DefBuckets good for service timings 72 }) 73 74 // A Counter per Stat. Name should match field in zoekt.Stats. 75 metricSearchContentBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 76 Name: "zoekt_search_content_loaded_bytes_total", 77 Help: "Total amount of I/O for reading contents", 78 }) 79 metricSearchIndexBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 80 Name: "zoekt_search_index_loaded_bytes_total", 81 Help: "Total amount of I/O for reading from index", 82 }) 83 metricSearchCrashesTotal = promauto.NewCounter(prometheus.CounterOpts{ 84 Name: "zoekt_search_crashes_total", 85 Help: "Total number of search shards that had a crash", 86 }) 87 metricSearchFileCountTotal = promauto.NewCounter(prometheus.CounterOpts{ 88 Name: "zoekt_search_file_count_total", 89 Help: "Total number of files containing a match", 90 }) 91 metricSearchShardFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{ 92 Name: "zoekt_search_shard_files_considered_total", 93 Help: "Total number of files in shards that we considered", 94 }) 95 metricSearchFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{ 96 Name: "zoekt_search_files_considered_total", 97 Help: "Total files that we evaluated. Equivalent to files for which all atom matches (including negations) evaluated to true", 98 }) 99 metricSearchFilesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 100 Name: "zoekt_search_files_loaded_total", 101 Help: "Total files for which we loaded file content to verify substring matches", 102 }) 103 metricSearchFilesSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{ 104 Name: "zoekt_search_files_skipped_total", 105 Help: "Total candidate files whose contents weren't examined because we gathered enough matches", 106 }) 107 metricSearchShardsSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{ 108 Name: "zoekt_search_shards_skipped_total", 109 Help: "Total shards that we did not process because a query was canceled", 110 }) 111 metricSearchMatchCountTotal = promauto.NewCounter(prometheus.CounterOpts{ 112 Name: "zoekt_search_match_count_total", 113 Help: "Total number of non-overlapping matches", 114 }) 115 metricSearchNgramMatchesTotal = promauto.NewCounter(prometheus.CounterOpts{ 116 Name: "zoekt_search_ngram_matches_total", 117 Help: "Total number of candidate matches as a result of searching ngrams", 118 }) 119 metricSearchNgramLookupsTotal = promauto.NewCounter(prometheus.CounterOpts{ 120 Name: "zoekt_search_ngram_lookups_total", 121 Help: "Total number of times we accessed an ngram in the index", 122 }) 123 metricSearchRegexpsConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{ 124 Name: "zoekt_search_regexps_considered_total", 125 Help: "Total number of times regexp was called on files that we evaluated", 126 }) 127 128 metricListRunning = promauto.NewGauge(prometheus.GaugeOpts{ 129 Name: "zoekt_list_running", 130 Help: "The number of concurrent list requests running", 131 }) 132 metricListShardRunning = promauto.NewGauge(prometheus.GaugeOpts{ 133 Name: "zoekt_list_shard_running", 134 Help: "The number of concurrent list requests in a shard running", 135 }) 136 metricShardsBatchReplaceDurationSeconds = promauto.NewHistogram(prometheus.HistogramOpts{ 137 Name: "zoekt_shards_batch_replace_duration_seconds", 138 Help: "The time it takes to replace a batch of Searchers.", 139 Buckets: []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30}, 140 }) 141 metricListAllRepos = promauto.NewGauge(prometheus.GaugeOpts{ 142 Name: "zoekt_list_all_stats_repos", 143 Help: "The last List(true) value for RepoStats.Repos. Repos is used for aggregrating the number of repositories.", 144 }) 145 metricListAllShards = promauto.NewGauge(prometheus.GaugeOpts{ 146 Name: "zoekt_list_all_stats_shards", 147 Help: "The last List(true) value for RepoStats.Shards. Shards is the total number of search shards.", 148 }) 149 metricListAllDocuments = promauto.NewGauge(prometheus.GaugeOpts{ 150 Name: "zoekt_list_all_stats_documents", 151 Help: "The last List(true) value for RepoStats.Documents. Documents holds the number of documents or files.", 152 }) 153 metricListAllIndexBytes = promauto.NewGauge(prometheus.GaugeOpts{ 154 Name: "zoekt_list_all_stats_index_bytes", 155 Help: "The last List(true) value for RepoStats.IndexBytes. IndexBytes is the amount of RAM used for index overhead.", 156 }) 157 metricListAllContentBytes = promauto.NewGauge(prometheus.GaugeOpts{ 158 Name: "zoekt_list_all_stats_content_bytes", 159 Help: "The last List(true) value for RepoStats.ContentBytes. ContentBytes is the amount of RAM used for raw content.", 160 }) 161 metricListAllNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{ 162 Name: "zoekt_list_all_stats_new_lines_count", 163 Help: "The last List(true) value for RepoStats.NewLinesCount.", 164 }) 165 metricListAllDefaultBranchNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{ 166 Name: "zoekt_list_all_stats_default_branch_new_lines_count", 167 Help: "The last List(true) value for RepoStats.DefaultBranchNewLinesCount.", 168 }) 169 metricListAllOtherBranchesNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{ 170 Name: "zoekt_list_all_stats_other_branches_new_lines_count", 171 Help: "The last List(true) value for RepoStats.OtherBranchesNewLinesCount.", 172 }) 173) 174 175type rankedShard struct { 176 zoekt.Searcher 177 178 priority float64 // maximum priority across all repos in the shard 179 180 // We have out of band ranking on compound shards which can change even if 181 // the shard file does not. So we compute a rank in getShards. We store 182 // repos here to avoid the cost of List in the search request path. 183 repos []*zoekt.Repository 184} 185 186// loaded stores the state we compute when updating the state of shards from 187// disk. 188type loaded struct { 189 // shards is the currently loaded shards sorted by decreasing rank and 190 // should not be mutated. 191 shards []*rankedShard 192 193 // ready is true if sharded searcher has finished loading all initial 194 // shards on startup. 195 ready bool 196} 197 198type shardedSearcher struct { 199 // Limit the number of parallel queries. Since searching is 200 // CPU bound, we can't do better than #CPU queries in 201 // parallel. If we do so, we just create more memory 202 // pressure. 203 sched scheduler 204 205 mu sync.Mutex // protects writes to shards 206 shards map[string]*rankedShard 207 208 ready atomic.Bool 209 ranked atomic.Value 210} 211 212func newShardedSearcher(n int64) *shardedSearcher { 213 ss := &shardedSearcher{ 214 shards: make(map[string]*rankedShard), 215 sched: newScheduler(n), 216 } 217 return ss 218} 219 220// NewDirectorySearcher returns a searcher instance that loads all 221// shards corresponding to a glob into memory. 222func NewDirectorySearcher(dir string) (zoekt.Streamer, error) { 223 return newDirectorySearcher(dir, true) 224} 225 226// NewDirectorySearcherFast is like NewDirectorySearcher, but does not block 227// on the initial loading of shards. 228// 229// This exists since in the case of zoekt-webserver we are happy with having 230// partial availability since that is better than no availability on large 231// instances. 232func NewDirectorySearcherFast(dir string) (zoekt.Streamer, error) { 233 return newDirectorySearcher(dir, false) 234} 235 236func newDirectorySearcher(dir string, waitUntilReady bool) (zoekt.Streamer, error) { 237 ss := newShardedSearcher(int64(runtime.GOMAXPROCS(0))) 238 tl := &loader{ 239 ss: ss, 240 } 241 dw, err := newDirectoryWatcher(dir, tl) 242 if err != nil { 243 return nil, err 244 } 245 246 if waitUntilReady { 247 if err := dw.WaitUntilReady(); err != nil { 248 return nil, err 249 } 250 } 251 252 ds := &directorySearcher{ 253 Streamer: ss, 254 directoryWatcher: dw, 255 } 256 257 return &typeRepoSearcher{Streamer: ds}, nil 258} 259 260type directorySearcher struct { 261 zoekt.Streamer 262 263 directoryWatcher *DirectoryWatcher 264} 265 266func (s *directorySearcher) Close() { 267 // We need to Stop directoryWatcher first since it calls load/unload on 268 // Searcher. 269 s.directoryWatcher.Stop() 270 s.Streamer.Close() 271} 272 273type loader struct { 274 ss *shardedSearcher 275} 276 277func (tl *loader) load(keys ...string) { 278 // This is called with all keys on startup, so once this function has 279 // finished running shardedSearcher will be ready. 280 defer tl.ss.markReady() 281 282 if len(keys) == 0 { 283 // If there's nothing to load, we exit early here, but we want to mark 284 // ourselves as ready. 285 return 286 } 287 288 var ( 289 mu sync.Mutex // synchronizes writes to the shards map 290 wg sync.WaitGroup // used to wait for all shards to load 291 sem = semaphore.NewWeighted(int64(runtime.GOMAXPROCS(0))) 292 loadedShards = make(map[string]zoekt.Searcher) 293 ) 294 295 publishLoaded := func() { 296 mu.Lock() 297 chunk := loadedShards 298 loadedShards = make(map[string]zoekt.Searcher) 299 mu.Unlock() 300 tl.ss.replace(chunk) 301 } 302 303 log.Printf("loading %d shard(s): %s", len(keys), humanTruncateList(keys, 5)) 304 305 lastProgress := time.Now() 306 for i, key := range keys { 307 // If taking a while to start-up occasionally give a progress message 308 if time.Since(lastProgress) > 5*time.Second { 309 log.Printf("still need to load %d shards...", len(keys)-i) 310 lastProgress = time.Now() 311 312 publishLoaded() 313 } 314 315 _ = sem.Acquire(context.Background(), 1) 316 wg.Add(1) 317 318 go func(key string) { 319 defer sem.Release(1) 320 defer wg.Done() 321 322 shard, err := loadShard(key) 323 if err != nil { 324 metricShardsLoadFailedTotal.Inc() 325 log.Printf("reloading: %s, err %v ", key, err) 326 return 327 } 328 metricShardsLoadedTotal.Inc() 329 330 mu.Lock() 331 loadedShards[key] = shard 332 mu.Unlock() 333 }(key) 334 } 335 336 wg.Wait() 337 338 publishLoaded() 339} 340 341func (tl *loader) drop(keys ...string) { 342 shards := make(map[string]zoekt.Searcher, len(keys)) 343 for _, key := range keys { 344 shards[key] = nil 345 } 346 tl.ss.replace(shards) 347} 348 349func (ss *shardedSearcher) String() string { 350 return "shardedSearcher" 351} 352 353// Close closes references to open files. It may be called only once. 354func (ss *shardedSearcher) Close() { 355 ss.mu.Lock() 356 shards := make(map[string]zoekt.Searcher, len(ss.shards)) 357 for k := range ss.shards { 358 shards[k] = nil 359 } 360 ss.mu.Unlock() 361 362 ss.replace(shards) 363} 364 365func selectRepoSet(shards []*rankedShard, q query.Q) ([]*rankedShard, query.Q) { 366 and, ok := q.(*query.And) 367 if ok { 368 return doSelectRepoSet(shards, and) 369 } 370 371 // We have queries which look like (reposet ...) and we want to do the same 372 // optimizations. To simplify we just always wrap the query in And and then 373 // on the return value call Simplify to unwrap. In particular this is 374 // important for List calls. 375 and = &query.And{Children: []query.Q{q}} 376 shards, q = doSelectRepoSet(shards, and) 377 return shards, query.Simplify(q) 378} 379 380func doSelectRepoSet(shards []*rankedShard, and *query.And) ([]*rankedShard, query.Q) { 381 // (and (reposet ...) (q)) 382 // (and true (q)) with a filtered shards 383 // (and false) // noop 384 385 // (and (repobranches ...) (q)) 386 // (and (repobranches ...) (q)) 387 388 // Note: we also support (and (repo ...) (q)) even though sourcegraph does 389 // not generate those sorts of queries. This is to support manual testing. 390 391 hasReposForPredicate := func(pred func(repo *zoekt.Repository) bool) func(repos []*zoekt.Repository) (any, all bool) { 392 return func(repos []*zoekt.Repository) (any, all bool) { 393 any = false 394 all = true 395 for _, repo := range repos { 396 b := pred(repo) 397 any = any || b 398 all = all && b 399 } 400 return any, all 401 } 402 } 403 404 for i, c := range and.Children { 405 var setSize int 406 var hasRepos func([]*zoekt.Repository) (bool, bool) 407 switch setQuery := c.(type) { 408 case *query.RepoSet: 409 setSize = len(setQuery.Set) 410 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 411 return setQuery.Set[repo.Name] 412 }) 413 case *query.RepoIDs: 414 setSize = int(setQuery.Repos.GetCardinality()) 415 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 416 return setQuery.Repos.Contains(repo.ID) 417 }) 418 case *query.Repo: 419 setSize = 0 420 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 421 return setQuery.Regexp.MatchString(repo.Name) 422 }) 423 case *query.BranchesRepos: 424 for _, br := range setQuery.List { 425 setSize += int(br.Repos.GetCardinality()) 426 } 427 428 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 429 for _, br := range setQuery.List { 430 if br.Repos.Contains(repo.ID) { 431 return true 432 } 433 } 434 return false 435 }) 436 default: 437 continue 438 } 439 440 // setSize may be larger than the number of shards we have. The size of 441 // filtered is bounded by min(len(set), len(shards)) 442 if setSize > len(shards) { 443 setSize = len(shards) 444 } 445 446 filtered := make([]*rankedShard, 0, setSize) 447 filteredAll := true 448 449 for _, s := range shards { 450 if any, all := hasRepos(s.repos); any { 451 filtered = append(filtered, s) 452 filteredAll = filteredAll && all 453 } 454 } 455 456 // We don't need to adjust the query since we are returning an empty set 457 // of shards to search. 458 if len(filtered) == 0 { 459 return filtered, and 460 } 461 462 // We can't simplify the query since we are searching shards which contain 463 // repos we aren't supposed to search. 464 if !filteredAll { 465 return filtered, and 466 } 467 468 // We don't want to mutate the original and, so we clone it before 469 // mutating it. 470 and = &query.And{Children: slices.Clone(and.Children)} 471 472 // This optimization allows us to avoid the work done by 473 // indexData.simplify for each shard. 474 // 475 // For example if our query is (and (reposet foo bar) (content baz)) 476 // then at this point filtered is [foo bar] and q is the same. For each 477 // shard indexData.simplify will simplify to (and true (content baz)) -> 478 // (content baz). This work can be done now once, rather than per shard. 479 switch c := c.(type) { 480 case *query.RepoSet, *query.RepoIDs, *query.Repo: 481 and.Children[i] = &query.Const{Value: true} 482 return filtered, query.Simplify(and) 483 484 case *query.BranchesRepos: 485 // We can only replace if all the repos want the same branches. We 486 // simplify and just check that we are requesting 1 branch. The common 487 // case is just asking for HEAD, so this should be effective. 488 if len(c.List) != 1 { 489 return filtered, and 490 } 491 492 // Every repo wants the same branches, so we can replace RepoBranches 493 // with a list of branch queries. 494 and.Children[i] = &query.Branch{Pattern: c.List[0].Branch, Exact: true} 495 return filtered, query.Simplify(and) 496 } 497 498 // Stop after first RepoSet, otherwise we might append duplicate 499 // shards to `filtered` 500 return filtered, and 501 } 502 503 return shards, and 504} 505 506func (ss *shardedSearcher) Search(ctx context.Context, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) { 507 tr, ctx := trace.New(ctx, "shardedSearcher.Search", "") 508 defer func() { 509 tr.Finish() 510 }() 511 ctx, cancel := context.WithCancel(ctx) 512 defer cancel() 513 514 collectSender := newCollectSender(opts) 515 516 start := time.Now() 517 proc, err := ss.sched.Acquire(ctx) 518 if err != nil { 519 return nil, err 520 } 521 defer proc.Release() 522 tr.LazyPrintf("acquired process") 523 524 wait := time.Since(start) 525 start = time.Now() 526 527 loaded := ss.getLoaded() 528 done, err := streamSearch(ctx, proc, q, opts, loaded.shards, collectSender) 529 defer done() 530 if err != nil { 531 return nil, err 532 } 533 534 aggregate, ok := collectSender.Done() 535 if !ok { 536 aggregate = &zoekt.SearchResult{ 537 RepoURLs: map[string]string{}, 538 LineFragments: map[string]string{}, 539 } 540 } 541 542 copyFiles(aggregate) 543 544 if !loaded.ready { 545 // We may have missed results due to not being fully loaded. 546 aggregate.Stats.Crashes++ 547 } 548 549 aggregate.Stats.Wait = wait 550 aggregate.Stats.Duration = time.Since(start) 551 552 return aggregate, nil 553} 554 555func (ss *shardedSearcher) StreamSearch(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, sender zoekt.Sender) (err error) { 556 tr, ctx := trace.New(ctx, "shardedSearcher.StreamSearch", "") 557 defer func() { 558 if err != nil { 559 tr.LazyPrintf("error: %v", err) 560 tr.SetError(err) 561 } 562 tr.Finish() 563 }() 564 565 start := time.Now() 566 proc, err := ss.sched.Acquire(ctx) 567 if err != nil { 568 return err 569 } 570 defer proc.Release() 571 tr.LazyPrintf("acquired process") 572 573 loaded := ss.getLoaded() 574 shards := loaded.shards 575 576 maxPendingPriority := math.Inf(-1) 577 if len(shards) > 0 { 578 maxPendingPriority = shards[0].priority 579 } 580 581 stillLoadingCrashes := 0 582 if !loaded.ready { 583 // We may have missed results due to not being fully loaded. 584 stillLoadingCrashes++ 585 } 586 587 sender.Send(&zoekt.SearchResult{ 588 Stats: zoekt.Stats{ 589 Crashes: stillLoadingCrashes, 590 Wait: time.Since(start), 591 }, 592 Progress: zoekt.Progress{ 593 MaxPendingPriority: maxPendingPriority, 594 }, 595 }) 596 597 // Matches flow from the shards up the stack in the following order: 598 // 599 // 1. Search shards 600 // 2. flushCollectSender (aggregate) 601 // 3. limitSender (limit) 602 // 4. copyFileSender (copy) 603 // 604 // For streaming, the wrapping has to happen in the inverted order. 605 sender = copyFileSender(sender) 606 607 if truncator, hasLimits := zoekt.NewDisplayTruncator(opts); hasLimits { 608 var cancel context.CancelFunc 609 ctx, cancel = context.WithCancel(ctx) 610 defer cancel() 611 sender = limitSender(cancel, sender, truncator) 612 } 613 614 sender, flush := newFlushCollectSender(opts, sender) 615 616 done, err := streamSearch(ctx, proc, q, opts, shards, sender) 617 618 // Even though streaming is done, we may have results sitting in a buffer we 619 // need to flush. So we need to send those before calling done. 620 flush() 621 done() 622 623 return err 624} 625 626// streamSearch is an internal helper since both Search and StreamSearch are 627// largely similar. 628// 629// done must always be called, even if err is non-nil. The SearchResults sent 630// via sender contain references to the underlying mmap data that the garbage 631// collector can't see. Calling done informs the garbage collector it is free 632// to collect those shards. The caller must call copyFiles on any 633// SearchResults it returns/streams out before calling done. 634func streamSearch(ctx context.Context, proc *process, q query.Q, opts *zoekt.SearchOptions, shards []*rankedShard, sender zoekt.Sender) (done func(), err error) { 635 tr, ctx := trace.New(ctx, "shardedSearcher.streamSearch", "") 636 overallStart := time.Now() 637 metricSearchRunning.Inc() 638 defer func() { 639 metricSearchRunning.Dec() 640 metricSearchDuration.Observe(time.Since(overallStart).Seconds()) 641 if err != nil { 642 metricSearchFailedTotal.Inc() 643 644 tr.LazyPrintf("error: %v", err) 645 tr.SetError(err) 646 } 647 tr.Finish() 648 }() 649 650 // Select the subset of shards that we will search over for the given query. 651 { 652 beforeLen := len(shards) 653 beforeQ := q 654 shards, q = selectRepoSet(shards, q) 655 tr.LazyPrintf("selectRepoSet shards=%d->%d q=%s->%s", beforeLen, len(shards), beforeQ, q) 656 } 657 658 if len(shards) == 0 { 659 return func() {}, nil 660 } 661 662 var cancel context.CancelFunc 663 if opts.MaxWallTime == 0 { 664 ctx, cancel = context.WithCancel(ctx) 665 } else { 666 ctx, cancel = context.WithTimeout(ctx, opts.MaxWallTime) 667 } 668 669 defer cancel() 670 671 // We set the number of workers to GOMAXPROCS, or the number of shards, 672 // whichever is smaller. 673 workers := runtime.GOMAXPROCS(0) 674 if workers > len(shards) { 675 workers = len(shards) 676 } 677 678 type result struct { 679 priority float64 680 *zoekt.SearchResult 681 err error 682 } 683 684 var ( 685 // buffered channels to continue searching when sending back results 686 // takes a while / blocks. The maximum pending result set is workers * 2. 687 results = make(chan *result, workers) 688 search = make(chan *rankedShard, workers) 689 wg sync.WaitGroup 690 ) 691 692 // Start workers that receive shards from the search channel, search them, 693 // and send the results to the results channel. This process is repeated 694 // until the search channel is closed. 695 // 696 // Note: Making "search" a buffered channel has the effect of limiting the number of parallel shard searches. 697 // Since searching is mostly CPU bound, limiting parallel shard searches also reduces the peak working set. 698 wg.Add(workers) 699 for i := 0; i < workers; i++ { 700 go func() { 701 defer wg.Done() 702 for s := range search { 703 sr, err := searchOneShard(ctx, s, q, opts) 704 r := &result{priority: s.priority, SearchResult: sr, err: err} 705 results <- r 706 } 707 }() 708 } 709 710 go func() { 711 wg.Wait() 712 close(results) 713 }() 714 715 var ( 716 pending = make(prioritySlice, 0, workers) 717 shard = 0 718 next = shards[shard] 719 720 // We need a separate nil-able reference to the same channel so we can close(search) for the worker 721 // go-routines to finish but also set work to nil in order for the select statement below to ignore 722 // that case when we want to stop a search. This is needed because sending on a closed channel panics. 723 work = search 724 ) 725 726 stop := func() { 727 if work != nil { 728 close(search) 729 work = nil 730 next = nil 731 } 732 } 733 734 // tracked so we can stop when we hit TotalMaxMatchCount 735 var totalMatchCount int 736 737search: 738 for { 739 // At the top of each iteration, have the proc associated with this search yield its won "timeslice" 740 // to possibly allow other searches to make progress 741 _ = proc.Yield(ctx) // Note: we let searchOneShard handle context errors 742 743 select { 744 case work <- next: // is there a worker available to search the next shard? 745 pending.append(next.priority) 746 747 shard++ 748 if shard == len(shards) { 749 stop() 750 } else { 751 next = shards[shard] 752 } 753 case r, ok := <-results: // is there a result to send back? 754 if !ok { 755 break search 756 } 757 758 // delete this result's priority from pending before computing the new max pending priority 759 pending.remove(r.priority) 760 761 if r.err != nil { 762 // Set final error and stop searching new shards, but consume any pending 763 // search results. 764 stop() 765 err = r.err 766 continue 767 } 768 769 // Update the match count statistics and stop searching new shards if we've 770 // reached the limit set in the options. 771 totalMatchCount += r.SearchResult.Stats.MatchCount 772 if opts.TotalMaxMatchCount > 0 && totalMatchCount > opts.TotalMaxMatchCount { 773 stop() 774 } 775 776 observeMetrics(r.SearchResult) 777 778 r.Priority = r.priority 779 r.MaxPendingPriority = pending.max() 780 781 sendByRepository(r.SearchResult, opts, sender) // send the result back to the client 782 } 783 } 784 785 return func() { runtime.KeepAlive(shards) }, err 786} 787 788// sendByRepository splits a zoekt.SearchResult by repository and calls 789// sender.Send for each batch. Ranking in Sourcegraph expects zoekt.SearchResult 790// to contain results with the same zoekt.SearchResult.Priority only. 791// 792// We split by repository instead of by priority because it is easier to set 793// RepoURLs and LineFragments in zoekt.SearchResult. 794func sendByRepository(result *zoekt.SearchResult, opts *zoekt.SearchOptions, sender zoekt.Sender) { 795 if len(result.RepoURLs) <= 1 || len(result.Files) == 0 { 796 zoekt.SortFiles(result.Files) 797 sender.Send(result) 798 return 799 } 800 801 send := func(repoName string, a, b int, stats zoekt.Stats) { 802 zoekt.SortFiles(result.Files[a:b]) 803 sender.Send(&zoekt.SearchResult{ 804 Stats: stats, 805 Progress: zoekt.Progress{ 806 Priority: result.Files[a].RepositoryPriority, 807 MaxPendingPriority: result.MaxPendingPriority, 808 }, 809 Files: result.Files[a:b], 810 RepoURLs: map[string]string{repoName: result.RepoURLs[repoName]}, 811 LineFragments: map[string]string{repoName: result.LineFragments[repoName]}, 812 }) 813 } 814 815 var startIndex, endIndex int 816 curRepoID := result.Files[0].RepositoryID 817 curRepoName := result.Files[0].Repository 818 819 fm := zoekt.FileMatch{} 820 for endIndex, fm = range result.Files { 821 if curRepoID != fm.RepositoryID { 822 // Stats must stay aggregate-able, hence we sent the aggregate stats with the 823 // last event. 824 send(curRepoName, startIndex, endIndex, zoekt.Stats{}) 825 826 startIndex = endIndex 827 curRepoID = fm.RepositoryID 828 curRepoName = fm.Repository 829 } 830 } 831 832 send(curRepoName, startIndex, endIndex+1, result.Stats) 833} 834 835func observeMetrics(sr *zoekt.SearchResult) { 836 metricSearchContentBytesLoadedTotal.Add(float64(sr.Stats.ContentBytesLoaded)) 837 metricSearchIndexBytesLoadedTotal.Add(float64(sr.Stats.IndexBytesLoaded)) 838 metricSearchCrashesTotal.Add(float64(sr.Stats.Crashes)) 839 metricSearchFileCountTotal.Add(float64(sr.Stats.FileCount)) 840 metricSearchShardFilesConsideredTotal.Add(float64(sr.Stats.ShardFilesConsidered)) 841 metricSearchFilesConsideredTotal.Add(float64(sr.Stats.FilesConsidered)) 842 metricSearchFilesLoadedTotal.Add(float64(sr.Stats.FilesLoaded)) 843 metricSearchFilesSkippedTotal.Add(float64(sr.Stats.FilesSkipped)) 844 metricSearchShardsSkippedTotal.Add(float64(sr.Stats.ShardsSkipped)) 845 metricSearchMatchCountTotal.Add(float64(sr.Stats.MatchCount)) 846 metricSearchNgramMatchesTotal.Add(float64(sr.Stats.NgramMatches)) 847 metricSearchNgramLookupsTotal.Add(float64(sr.Stats.NgramLookups)) 848 metricSearchRegexpsConsideredTotal.Add(float64(sr.Stats.RegexpsConsidered)) 849} 850 851func copySlice(src *[]byte) { 852 if *src == nil { 853 return 854 } 855 dst := make([]byte, len(*src)) 856 copy(dst, *src) 857 *src = dst 858} 859 860func copyFiles(sr *zoekt.SearchResult) { 861 for i := range sr.Files { 862 copySlice(&sr.Files[i].Content) 863 copySlice(&sr.Files[i].Checksum) 864 for l := range sr.Files[i].LineMatches { 865 copySlice(&sr.Files[i].LineMatches[l].Line) 866 copySlice(&sr.Files[i].LineMatches[l].Before) 867 copySlice(&sr.Files[i].LineMatches[l].After) 868 } 869 for c := range sr.Files[i].ChunkMatches { 870 copySlice(&sr.Files[i].ChunkMatches[c].Content) 871 } 872 } 873} 874 875func searchOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) { 876 metricSearchShardRunning.Inc() 877 defer func() { 878 metricSearchShardRunning.Dec() 879 if e := recover(); e != nil { 880 log.Printf("crashed shard: %s: %#v, %s", s, e, debug.Stack()) 881 882 if sr == nil { 883 sr = &zoekt.SearchResult{} 884 } 885 sr.Stats.Crashes = 1 886 } 887 }() 888 889 return s.Search(ctx, q, opts) 890} 891 892type shardListResult struct { 893 rl *zoekt.RepoList 894 err error 895} 896 897func listOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.ListOptions, sink chan shardListResult) { 898 metricListShardRunning.Inc() 899 defer func() { 900 metricListShardRunning.Dec() 901 if r := recover(); r != nil { 902 log.Printf("crashed shard: %s: %s, %s", s.String(), r, debug.Stack()) 903 sink <- shardListResult{ 904 &zoekt.RepoList{Crashes: 1}, nil, 905 } 906 } 907 }() 908 909 ms, err := s.List(ctx, q, opts) 910 sink <- shardListResult{ms, err} 911} 912 913func (ss *shardedSearcher) List(ctx context.Context, q query.Q, opts *zoekt.ListOptions) (rl *zoekt.RepoList, err error) { 914 tr, ctx := trace.New(ctx, "shardedSearcher.List", "") 915 metricListRunning.Inc() 916 defer func() { 917 metricListRunning.Dec() 918 if rl != nil { 919 tr.LazyPrintf("repos.size=%d reposmap.size=%d crashes=%d", len(rl.Repos), len(rl.ReposMap), rl.Crashes) 920 } 921 if err != nil { 922 tr.LazyPrintf("error: %v", err) 923 tr.SetError(err) 924 } 925 tr.Finish() 926 }() 927 928 q = query.Simplify(q) 929 isAll := false 930 if c, ok := q.(*query.Const); ok { 931 isAll = c.Value 932 } 933 934 proc, err := ss.sched.Acquire(ctx) 935 if err != nil { 936 return nil, err 937 } 938 defer proc.Release() 939 tr.LazyPrintf("acquired process") 940 941 loaded := ss.getLoaded() 942 shards := loaded.shards 943 944 // Setup what we return now, since we may short circuit if there are no 945 // shards to search. 946 stillLoadingCrashes := 0 947 if !loaded.ready { 948 // We may have missed results due to not being fully loaded. 949 stillLoadingCrashes++ 950 } 951 agg := zoekt.RepoList{ 952 Crashes: stillLoadingCrashes, 953 ReposMap: zoekt.ReposMap{}, 954 Repos: []*zoekt.RepoListEntry{}, 955 } 956 957 // PERF: Select the subset of shards that we will search over for the given 958 // query. A common List query only asks for a specific repo, so this is an 959 // important optimization. 960 { 961 beforeLen := len(shards) 962 beforeQ := q 963 shards, q = selectRepoSet(shards, q) 964 tr.LazyPrintf("selectRepoSet shards=%d->%d q=%s->%s", beforeLen, len(shards), beforeQ, q) 965 } 966 967 if len(shards) == 0 { 968 return &agg, nil 969 } 970 971 shardCount := len(shards) 972 all := make(chan shardListResult, shardCount) 973 feeder := make(chan zoekt.Searcher, len(shards)) 974 for _, s := range shards { 975 feeder <- s 976 } 977 close(feeder) 978 979 for i := 0; i < runtime.GOMAXPROCS(0); i++ { 980 go func() { 981 for s := range feeder { 982 listOneShard(ctx, s, q, opts, all) 983 } 984 }() 985 } 986 987 uniq := map[string]*zoekt.RepoListEntry{} 988 989 for range shards { 990 r := <-all 991 if r.err != nil { 992 return nil, r.err 993 } 994 995 agg.Crashes += r.rl.Crashes 996 agg.Stats.Add(&r.rl.Stats) 997 998 for _, r := range r.rl.Repos { 999 prev, ok := uniq[r.Repository.Name] 1000 if !ok { 1001 cp := *r // We need to copy because we mutate r.Stats when merging duplicates 1002 uniq[r.Repository.Name] = &cp 1003 } else { 1004 prev.Stats.Add(&r.Stats) 1005 } 1006 } 1007 1008 for id, r := range r.rl.ReposMap { 1009 _, ok := agg.ReposMap[id] 1010 if !ok { 1011 agg.ReposMap[id] = r 1012 } 1013 } 1014 } 1015 1016 agg.Repos = make([]*zoekt.RepoListEntry, 0, len(uniq)) 1017 for _, r := range uniq { 1018 agg.Repos = append(agg.Repos, r) 1019 } 1020 1021 // Only one of these fields is populated and in all cases the size of that 1022 // field is the number of Repos. 1023 // 1024 // Note: we don't just add individual Stats.Repos since a repository can 1025 // have multiple shards. 1026 agg.Stats.Repos = len(uniq) + len(agg.ReposMap) 1027 1028 if isAll && len(agg.Repos) > 0 { 1029 reportListAllMetrics(agg.Repos) 1030 } 1031 1032 return &agg, nil 1033} 1034 1035func reportListAllMetrics(repos []*zoekt.RepoListEntry) { 1036 var stats zoekt.RepoStats 1037 for _, r := range repos { 1038 stats.Add(&r.Stats) 1039 } 1040 1041 metricListAllRepos.Set(float64(stats.Repos)) 1042 metricListAllIndexBytes.Set(float64(stats.IndexBytes)) 1043 metricListAllContentBytes.Set(float64(stats.ContentBytes)) 1044 metricListAllDocuments.Set(float64(stats.Documents)) 1045 metricListAllShards.Set(float64(stats.Shards)) 1046 metricListAllNewLinesCount.Set(float64(stats.NewLinesCount)) 1047 metricListAllDefaultBranchNewLinesCount.Set(float64(stats.DefaultBranchNewLinesCount)) 1048 metricListAllOtherBranchesNewLinesCount.Set(float64(stats.OtherBranchesNewLinesCount)) 1049} 1050 1051// getLoaded returns the currently loaded shards. Shared so do not mutate. 1052func (s *shardedSearcher) getLoaded() loaded { 1053 // next commit will store the true value of this, for now we keep the 1054 // backwards compatible behaviour. 1055 ready := s.ready.Load() 1056 // ranked is loaded after ready to avoid a race were ready is true but 1057 // ranked is still not the final set of shards. 1058 ranked, _ := s.ranked.Load().([]*rankedShard) 1059 return loaded{ 1060 shards: ranked, 1061 ready: ready, 1062 } 1063} 1064 1065func mkRankedShard(s zoekt.Searcher) *rankedShard { 1066 q := query.Const{Value: true} 1067 result, err := s.List(context.Background(), &q, nil) 1068 if err != nil { 1069 return &rankedShard{Searcher: s} 1070 } 1071 if len(result.Repos) == 0 { 1072 return &rankedShard{Searcher: s} 1073 } 1074 1075 var ( 1076 maxPriority float64 1077 repos = make([]*zoekt.Repository, 0, len(result.Repos)) 1078 ) 1079 for i := range result.Repos { 1080 repo := &result.Repos[i].Repository 1081 repos = append(repos, repo) 1082 if repo.RawConfig != nil { 1083 priority, _ := strconv.ParseFloat(repo.RawConfig["priority"], 64) 1084 if priority > maxPriority { 1085 maxPriority = priority 1086 } 1087 } 1088 } 1089 1090 return &rankedShard{ 1091 Searcher: s, 1092 repos: repos, 1093 priority: maxPriority, 1094 } 1095} 1096 1097// markReady should be called once all shards have been passed into replace on 1098// startup. Once s is marked as ready it stops reporting a Crash in the 1099// response Stats. 1100func (s *shardedSearcher) markReady() { 1101 s.ready.CompareAndSwap(false, true) 1102} 1103 1104func (s *shardedSearcher) replace(shards map[string]zoekt.Searcher) { 1105 if len(shards) == 0 { 1106 return 1107 } 1108 1109 defer func(began time.Time) { 1110 metricShardsBatchReplaceDurationSeconds.Observe(time.Since(began).Seconds()) 1111 }(time.Now()) 1112 1113 s.mu.Lock() 1114 defer s.mu.Unlock() 1115 1116 for key, shard := range shards { 1117 var r *rankedShard 1118 if shard != nil { 1119 r = mkRankedShard(shard) 1120 } 1121 1122 old := s.shards[key] 1123 if shard == nil { 1124 delete(s.shards, key) 1125 } else { 1126 s.shards[key] = r 1127 } 1128 1129 if old != nil && old.Searcher != nil { 1130 // _ ___ /^^\ /^\ /^^\_ 1131 // _ _@)@) \ ,,/ '` ~ `'~~ ', `\. 1132 // _/o\_ _ _ _/~`.`...'~\ ./~~..,'`','',.,' ' ~: 1133 // / `,'.~,~.~ . , . , ~|, ,/ .,' , ,. .. ,,. `, ~\_ 1134 // ( ' _' _ '_` _ ' . , `\_/ .' ..' ' ` ` `.. `, \_ 1135 // ~V~ V~ V~ V~ ~\ ` ' . ' , ' .,.,''`.,.''`.,.``. ', \_ 1136 // _/\ /\ /\ /\_/, . ' , `_/~\_ .' .,. ,, , _/~\_ `. `. '., \_ 1137 // < ~ ~ '~`'~'`, ., . `_: ::: \_ ' `_/ ::: \_ `.,' . ', \_ 1138 // \ ' `_ '`_ _ ',/ _::_::_ \ _ _/ _::_::_ \ `.,'.,`., \-,-,-,_,_, 1139 // `'~~ `'~~ `'~~ `'~~ \(_)(_)(_)/ `~~' \(_)(_)(_)/ ~'`\_.._,._,'_;_;_;_;_; 1140 // 1141 // We can't just call Close now, because there may be ongoing searches 1142 // which have old in the shards list. Previously we used an exclusive 1143 // lock to guarantee there were no concurrent searches. However, that 1144 // led to blocking on the read path. 1145 // 1146 // We could introduce granular locking per rankedShard to know when 1147 // there are no more references. However, this becomes tricky in 1148 // practice. Instead we rely on the garbage collector noticing old is no 1149 // longer used. We take care in our searchers to runtime.KeepAlive until 1150 // we have stopped referencing the underling mmap data. 1151 runtime.SetFinalizer(old, func(r *rankedShard) { 1152 r.Close() 1153 }) 1154 } 1155 } 1156 1157 ranked := make([]*rankedShard, 0, len(s.shards)) 1158 for _, r := range s.shards { 1159 ranked = append(ranked, r) 1160 } 1161 1162 sort.Slice(ranked, func(i, j int) bool { 1163 priorityDiff := ranked[i].priority - ranked[j].priority 1164 if priorityDiff != 0 { 1165 return priorityDiff > 0 1166 } 1167 if len(ranked[i].repos) == 0 || len(ranked[j].repos) == 0 { 1168 // Protect against empty names which can happen if we fail to List or 1169 // the shard is full of tombstones. Prefer the shard which has names. 1170 return len(ranked[i].repos) >= len(ranked[j].repos) 1171 } 1172 return ranked[i].repos[0].Name < ranked[j].repos[0].Name 1173 }) 1174 1175 s.ranked.Store(ranked) 1176 1177 metricShardsLoaded.Set(float64(len(ranked))) 1178} 1179 1180func loadShard(fn string) (zoekt.Searcher, error) { 1181 f, err := os.Open(fn) 1182 if err != nil { 1183 return nil, err 1184 } 1185 1186 iFile, err := zoekt.NewIndexFile(f) 1187 if err != nil { 1188 return nil, err 1189 } 1190 s, err := zoekt.NewSearcher(iFile) 1191 if err != nil { 1192 iFile.Close() 1193 return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err) 1194 } 1195 1196 return s, nil 1197} 1198 1199// prioritySlice is a trivial implementation of an array that provides three 1200// things: appending a value, removing a value, and getting the array's max. 1201// Operations take O(n) time, which is acceptable because N is restricted to 1202// GOMAXPROCS (i.e., number of cpu cores) by the shardedSearcher interface. 1203type prioritySlice []float64 1204 1205func (p *prioritySlice) append(pri float64) { 1206 *p = append(*p, pri) 1207} 1208 1209func (p *prioritySlice) remove(pri float64) { 1210 for i, opri := range *p { 1211 if opri == pri { 1212 if i != len(*p)-1 { 1213 // swap to make this element the tail 1214 (*p)[i] = (*p)[len(*p)-1] 1215 } 1216 // pop the end off 1217 *p = (*p)[:len(*p)-1] 1218 break 1219 } 1220 } 1221} 1222 1223func (p *prioritySlice) max() float64 { 1224 // remove() and max() could be combined, but this is easier to read and 1225 // the expected performance difference from the extra lock and loop is 1226 // almost certainly irrelevant. 1227 maxPri := math.Inf(-1) 1228 for _, pri := range *p { 1229 if pri > maxPri { 1230 maxPri = pri 1231 } 1232 } 1233 return maxPri 1234}