fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Copyright 2016 Google Inc. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package shards 16 17import ( 18 "context" 19 "fmt" 20 "log" 21 "math" 22 "os" 23 "runtime" 24 "runtime/debug" 25 "sort" 26 "strconv" 27 "sync" 28 "time" 29 30 "golang.org/x/sync/semaphore" 31 32 "github.com/prometheus/client_golang/prometheus" 33 "github.com/prometheus/client_golang/prometheus/promauto" 34 "go.uber.org/atomic" 35 36 "github.com/sourcegraph/zoekt" 37 "github.com/sourcegraph/zoekt/query" 38 "github.com/sourcegraph/zoekt/trace" 39) 40 41var ( 42 metricShardsLoaded = promauto.NewGauge(prometheus.GaugeOpts{ 43 Name: "zoekt_shards_loaded", 44 Help: "The number of shards currently loaded", 45 }) 46 metricShardsLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 47 Name: "zoekt_shards_loaded_total", 48 Help: "The total number of shards loaded", 49 }) 50 metricShardsLoadFailedTotal = promauto.NewCounter(prometheus.CounterOpts{ 51 Name: "zoekt_shards_load_failed_total", 52 Help: "The total number of shard loads that failed", 53 }) 54 55 metricSearchRunning = promauto.NewGauge(prometheus.GaugeOpts{ 56 Name: "zoekt_search_running", 57 Help: "The number of concurrent search requests running", 58 }) 59 metricSearchShardRunning = promauto.NewGauge(prometheus.GaugeOpts{ 60 Name: "zoekt_search_shard_running", 61 Help: "The number of concurrent search requests in a shard running", 62 }) 63 metricSearchFailedTotal = promauto.NewCounter(prometheus.CounterOpts{ 64 Name: "zoekt_search_failed_total", 65 Help: "The total number of search requests that failed", 66 }) 67 metricSearchDuration = promauto.NewHistogram(prometheus.HistogramOpts{ 68 Name: "zoekt_search_duration_seconds", 69 Help: "The duration a search request took in seconds", 70 Buckets: prometheus.DefBuckets, // DefBuckets good for service timings 71 }) 72 73 // A Counter per Stat. Name should match field in zoekt.Stats. 74 metricSearchContentBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 75 Name: "zoekt_search_content_loaded_bytes_total", 76 Help: "Total amount of I/O for reading contents", 77 }) 78 metricSearchIndexBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 79 Name: "zoekt_search_index_loaded_bytes_total", 80 Help: "Total amount of I/O for reading from index", 81 }) 82 metricSearchCrashesTotal = promauto.NewCounter(prometheus.CounterOpts{ 83 Name: "zoekt_search_crashes_total", 84 Help: "Total number of search shards that had a crash", 85 }) 86 metricSearchFileCountTotal = promauto.NewCounter(prometheus.CounterOpts{ 87 Name: "zoekt_search_file_count_total", 88 Help: "Total number of files containing a match", 89 }) 90 metricSearchShardFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{ 91 Name: "zoekt_search_shard_files_considered_total", 92 Help: "Total number of files in shards that we considered", 93 }) 94 metricSearchFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{ 95 Name: "zoekt_search_files_considered_total", 96 Help: "Total files that we evaluated. Equivalent to files for which all atom matches (including negations) evaluated to true", 97 }) 98 metricSearchFilesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 99 Name: "zoekt_search_files_loaded_total", 100 Help: "Total files for which we loaded file content to verify substring matches", 101 }) 102 metricSearchFilesSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{ 103 Name: "zoekt_search_files_skipped_total", 104 Help: "Total candidate files whose contents weren't examined because we gathered enough matches", 105 }) 106 metricSearchShardsSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{ 107 Name: "zoekt_search_shards_skipped_total", 108 Help: "Total shards that we did not process because a query was canceled", 109 }) 110 metricSearchMatchCountTotal = promauto.NewCounter(prometheus.CounterOpts{ 111 Name: "zoekt_search_match_count_total", 112 Help: "Total number of non-overlapping matches", 113 }) 114 metricSearchNgramMatchesTotal = promauto.NewCounter(prometheus.CounterOpts{ 115 Name: "zoekt_search_ngram_matches_total", 116 Help: "Total number of candidate matches as a result of searching ngrams", 117 }) 118 metricSearchNgramLookupsTotal = promauto.NewCounter(prometheus.CounterOpts{ 119 Name: "zoekt_search_ngram_lookups_total", 120 Help: "Total number of times we accessed an ngram in the index", 121 }) 122 metricSearchRegexpsConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{ 123 Name: "zoekt_search_regexps_considered_total", 124 Help: "Total number of times regexp was called on files that we evaluated", 125 }) 126 127 metricListRunning = promauto.NewGauge(prometheus.GaugeOpts{ 128 Name: "zoekt_list_running", 129 Help: "The number of concurrent list requests running", 130 }) 131 metricListShardRunning = promauto.NewGauge(prometheus.GaugeOpts{ 132 Name: "zoekt_list_shard_running", 133 Help: "The number of concurrent list requests in a shard running", 134 }) 135 metricShardsBatchReplaceDurationSeconds = promauto.NewHistogram(prometheus.HistogramOpts{ 136 Name: "zoekt_shards_batch_replace_duration_seconds", 137 Help: "The time it takes to replace a batch of Searchers.", 138 Buckets: []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30}, 139 }) 140 metricListAllRepos = promauto.NewGauge(prometheus.GaugeOpts{ 141 Name: "zoekt_list_all_stats_repos", 142 Help: "The last List(true) value for RepoStats.Repos. Repos is used for aggregrating the number of repositories.", 143 }) 144 metricListAllShards = promauto.NewGauge(prometheus.GaugeOpts{ 145 Name: "zoekt_list_all_stats_shards", 146 Help: "The last List(true) value for RepoStats.Shards. Shards is the total number of search shards.", 147 }) 148 metricListAllDocuments = promauto.NewGauge(prometheus.GaugeOpts{ 149 Name: "zoekt_list_all_stats_documents", 150 Help: "The last List(true) value for RepoStats.Documents. Documents holds the number of documents or files.", 151 }) 152 metricListAllIndexBytes = promauto.NewGauge(prometheus.GaugeOpts{ 153 Name: "zoekt_list_all_stats_index_bytes", 154 Help: "The last List(true) value for RepoStats.IndexBytes. IndexBytes is the amount of RAM used for index overhead.", 155 }) 156 metricListAllContentBytes = promauto.NewGauge(prometheus.GaugeOpts{ 157 Name: "zoekt_list_all_stats_content_bytes", 158 Help: "The last List(true) value for RepoStats.ContentBytes. ContentBytes is the amount of RAM used for raw content.", 159 }) 160 metricListAllNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{ 161 Name: "zoekt_list_all_stats_new_lines_count", 162 Help: "The last List(true) value for RepoStats.NewLinesCount.", 163 }) 164 metricListAllDefaultBranchNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{ 165 Name: "zoekt_list_all_stats_default_branch_new_lines_count", 166 Help: "The last List(true) value for RepoStats.DefaultBranchNewLinesCount.", 167 }) 168 metricListAllOtherBranchesNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{ 169 Name: "zoekt_list_all_stats_other_branches_new_lines_count", 170 Help: "The last List(true) value for RepoStats.OtherBranchesNewLinesCount.", 171 }) 172) 173 174type rankedShard struct { 175 zoekt.Searcher 176 177 priority float64 // maximum priority across all repos in the shard 178 179 // We have out of band ranking on compound shards which can change even if 180 // the shard file does not. So we compute a rank in getShards. We store 181 // repos here to avoid the cost of List in the search request path. 182 repos []*zoekt.Repository 183} 184 185// loaded stores the state we compute when updating the state of shards from 186// disk. 187type loaded struct { 188 // shards is the currently loaded shards sorted by decreasing rank and 189 // should not be mutated. 190 shards []*rankedShard 191 192 // ready is true if sharded searcher has finished loading all initial 193 // shards on startup. 194 ready bool 195} 196 197type shardedSearcher struct { 198 // Limit the number of parallel queries. Since searching is 199 // CPU bound, we can't do better than #CPU queries in 200 // parallel. If we do so, we just create more memory 201 // pressure. 202 sched scheduler 203 204 mu sync.Mutex // protects writes to shards 205 shards map[string]*rankedShard 206 207 ready atomic.Bool 208 ranked atomic.Value 209} 210 211func newShardedSearcher(n int64) *shardedSearcher { 212 ss := &shardedSearcher{ 213 shards: make(map[string]*rankedShard), 214 sched: newScheduler(n), 215 } 216 return ss 217} 218 219// NewDirectorySearcher returns a searcher instance that loads all 220// shards corresponding to a glob into memory. 221func NewDirectorySearcher(dir string) (zoekt.Streamer, error) { 222 return newDirectorySearcher(dir, true) 223} 224 225// NewDirectorySearcherFast is like NewDirectorySearcher, but does not block 226// on the initial loading of shards. 227// 228// This exists since in the case of zoekt-webserver we are happy with having 229// partial availability since that is better than no availability on large 230// instances. 231func NewDirectorySearcherFast(dir string) (zoekt.Streamer, error) { 232 return newDirectorySearcher(dir, false) 233} 234 235func newDirectorySearcher(dir string, waitUntilReady bool) (zoekt.Streamer, error) { 236 ss := newShardedSearcher(int64(runtime.GOMAXPROCS(0))) 237 tl := &loader{ 238 ss: ss, 239 } 240 dw, err := newDirectoryWatcher(dir, tl) 241 if err != nil { 242 return nil, err 243 } 244 245 if waitUntilReady { 246 if err := dw.WaitUntilReady(); err != nil { 247 return nil, err 248 } 249 } 250 251 ds := &directorySearcher{ 252 Streamer: ss, 253 directoryWatcher: dw, 254 } 255 256 return &typeRepoSearcher{Streamer: ds}, nil 257} 258 259type directorySearcher struct { 260 zoekt.Streamer 261 262 directoryWatcher *DirectoryWatcher 263} 264 265func (s *directorySearcher) Close() { 266 // We need to Stop directoryWatcher first since it calls load/unload on 267 // Searcher. 268 s.directoryWatcher.Stop() 269 s.Streamer.Close() 270} 271 272type loader struct { 273 ss *shardedSearcher 274} 275 276func (tl *loader) load(keys ...string) { 277 // This is called with all keys on startup, so once this function has 278 // finished running shardedSearcher will be ready. 279 defer tl.ss.markReady() 280 281 if len(keys) == 0 { 282 // If there's nothing to load, we exit early here, but we want to mark 283 // ourselves as ready. 284 return 285 } 286 287 var ( 288 mu sync.Mutex // synchronizes writes to the shards map 289 wg sync.WaitGroup // used to wait for all shards to load 290 sem = semaphore.NewWeighted(int64(runtime.GOMAXPROCS(0))) 291 loadedShards = make(map[string]zoekt.Searcher) 292 ) 293 294 publishLoaded := func() { 295 mu.Lock() 296 chunk := loadedShards 297 loadedShards = make(map[string]zoekt.Searcher) 298 mu.Unlock() 299 tl.ss.replace(chunk) 300 } 301 302 log.Printf("loading %d shard(s): %s", len(keys), humanTruncateList(keys, 5)) 303 304 lastProgress := time.Now() 305 for i, key := range keys { 306 // If taking a while to start-up occasionally give a progress message 307 if time.Since(lastProgress) > 5*time.Second { 308 log.Printf("still need to load %d shards...", len(keys)-i) 309 lastProgress = time.Now() 310 311 publishLoaded() 312 } 313 314 _ = sem.Acquire(context.Background(), 1) 315 wg.Add(1) 316 317 go func(key string) { 318 defer sem.Release(1) 319 defer wg.Done() 320 321 shard, err := loadShard(key) 322 if err != nil { 323 metricShardsLoadFailedTotal.Inc() 324 log.Printf("reloading: %s, err %v ", key, err) 325 return 326 } 327 metricShardsLoadedTotal.Inc() 328 329 mu.Lock() 330 loadedShards[key] = shard 331 mu.Unlock() 332 }(key) 333 } 334 335 wg.Wait() 336 337 publishLoaded() 338} 339 340func (tl *loader) drop(keys ...string) { 341 shards := make(map[string]zoekt.Searcher, len(keys)) 342 for _, key := range keys { 343 shards[key] = nil 344 } 345 tl.ss.replace(shards) 346} 347 348func (ss *shardedSearcher) String() string { 349 return "shardedSearcher" 350} 351 352// Close closes references to open files. It may be called only once. 353func (ss *shardedSearcher) Close() { 354 ss.mu.Lock() 355 shards := make(map[string]zoekt.Searcher, len(ss.shards)) 356 for k := range ss.shards { 357 shards[k] = nil 358 } 359 ss.mu.Unlock() 360 361 ss.replace(shards) 362} 363 364func selectRepoSet(shards []*rankedShard, q query.Q) ([]*rankedShard, query.Q) { 365 and, ok := q.(*query.And) 366 if !ok { 367 return shards, q 368 } 369 370 // (and (reposet ...) (q)) 371 // (and true (q)) with a filtered shards 372 // (and false) // noop 373 374 // (and (repobranches ...) (q)) 375 // (and (repobranches ...) (q)) 376 377 hasReposForPredicate := func(pred func(repo *zoekt.Repository) bool) func(repos []*zoekt.Repository) (any, all bool) { 378 return func(repos []*zoekt.Repository) (any, all bool) { 379 any = false 380 all = true 381 for _, repo := range repos { 382 b := pred(repo) 383 any = any || b 384 all = all && b 385 } 386 return any, all 387 } 388 } 389 390 for i, c := range and.Children { 391 var setSize int 392 var hasRepos func([]*zoekt.Repository) (bool, bool) 393 switch setQuery := c.(type) { 394 case *query.RepoSet: 395 setSize = len(setQuery.Set) 396 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 397 return setQuery.Set[repo.Name] 398 }) 399 case *query.RepoIDs: 400 setSize = int(setQuery.Repos.GetCardinality()) 401 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 402 return setQuery.Repos.Contains(repo.ID) 403 }) 404 case *query.BranchesRepos: 405 for _, br := range setQuery.List { 406 setSize += int(br.Repos.GetCardinality()) 407 } 408 409 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 410 for _, br := range setQuery.List { 411 if br.Repos.Contains(repo.ID) { 412 return true 413 } 414 } 415 return false 416 }) 417 default: 418 continue 419 } 420 421 // setSize may be larger than the number of shards we have. The size of 422 // filtered is bounded by min(len(set), len(shards)) 423 if setSize > len(shards) { 424 setSize = len(shards) 425 } 426 427 filtered := make([]*rankedShard, 0, setSize) 428 filteredAll := true 429 430 for _, s := range shards { 431 if any, all := hasRepos(s.repos); any { 432 filtered = append(filtered, s) 433 filteredAll = filteredAll && all 434 } 435 } 436 437 // We don't need to adjust the query since we are returning an empty set 438 // of shards to search. 439 if len(filtered) == 0 { 440 return filtered, and 441 } 442 443 // We can't simplify the query since we are searching shards which contain 444 // repos we aren't supposed to search. 445 if !filteredAll { 446 return filtered, and 447 } 448 449 // This optimization allows us to avoid the work done by 450 // indexData.simplify for each shard. 451 // 452 // For example if our query is (and (reposet foo bar) (content baz)) 453 // then at this point filtered is [foo bar] and q is the same. For each 454 // shard indexData.simplify will simplify to (and true (content baz)) -> 455 // (content baz). This work can be done now once, rather than per shard. 456 switch c := c.(type) { 457 case *query.RepoSet: 458 and.Children[i] = &query.Const{Value: true} 459 return filtered, query.Simplify(and) 460 461 case *query.RepoIDs: 462 and.Children[i] = &query.Const{Value: true} 463 return filtered, query.Simplify(and) 464 465 case *query.BranchesRepos: 466 // We can only replace if all the repos want the same branches. We 467 // simplify and just check that we are requesting 1 branch. The common 468 // case is just asking for HEAD, so this should be effective. 469 if len(c.List) != 1 { 470 return filtered, and 471 } 472 473 // Every repo wants the same branches, so we can replace RepoBranches 474 // with a list of branch queries. 475 and.Children[i] = &query.Branch{Pattern: c.List[0].Branch, Exact: true} 476 return filtered, query.Simplify(and) 477 } 478 479 // Stop after first RepoSet, otherwise we might append duplicate 480 // shards to `filtered` 481 return filtered, and 482 } 483 484 return shards, and 485} 486 487func (ss *shardedSearcher) Search(ctx context.Context, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) { 488 tr, ctx := trace.New(ctx, "shardedSearcher.Search", "") 489 defer func() { 490 if sr != nil { 491 tr.LazyPrintf("num files: %d", len(sr.Files)) 492 tr.LazyPrintf("stats: %+v", sr.Stats) 493 } 494 tr.Finish() 495 }() 496 ctx, cancel := context.WithCancel(ctx) 497 defer cancel() 498 499 collectSender := newCollectSender(opts) 500 501 start := time.Now() 502 proc, err := ss.sched.Acquire(ctx) 503 if err != nil { 504 return nil, err 505 } 506 defer proc.Release() 507 tr.LazyPrintf("acquired process") 508 509 wait := time.Since(start) 510 start = time.Now() 511 512 loaded := ss.getLoaded() 513 done, err := streamSearch(ctx, proc, q, opts, loaded.shards, collectSender) 514 defer done() 515 if err != nil { 516 return nil, err 517 } 518 519 aggregate, ok := collectSender.Done() 520 if !ok { 521 aggregate = &zoekt.SearchResult{ 522 RepoURLs: map[string]string{}, 523 LineFragments: map[string]string{}, 524 } 525 } 526 527 copyFiles(aggregate) 528 529 if !loaded.ready { 530 // We may have missed results due to not being fully loaded. 531 aggregate.Stats.Crashes++ 532 } 533 534 aggregate.Stats.Wait = wait 535 aggregate.Stats.Duration = time.Since(start) 536 537 return aggregate, nil 538} 539 540func (ss *shardedSearcher) StreamSearch(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, sender zoekt.Sender) (err error) { 541 tr, ctx := trace.New(ctx, "shardedSearcher.StreamSearch", "") 542 defer func() { 543 if err != nil { 544 tr.LazyPrintf("error: %v", err) 545 tr.SetError(err) 546 } 547 tr.Finish() 548 }() 549 550 start := time.Now() 551 proc, err := ss.sched.Acquire(ctx) 552 if err != nil { 553 return err 554 } 555 defer proc.Release() 556 tr.LazyPrintf("acquired process") 557 558 loaded := ss.getLoaded() 559 shards := loaded.shards 560 561 maxPendingPriority := math.Inf(-1) 562 if len(shards) > 0 { 563 maxPendingPriority = shards[0].priority 564 } 565 566 stillLoadingCrashes := 0 567 if !loaded.ready { 568 // We may have missed results due to not being fully loaded. 569 stillLoadingCrashes++ 570 } 571 572 sender.Send(&zoekt.SearchResult{ 573 Stats: zoekt.Stats{ 574 Crashes: stillLoadingCrashes, 575 Wait: time.Since(start), 576 }, 577 Progress: zoekt.Progress{ 578 MaxPendingPriority: maxPendingPriority, 579 }, 580 }) 581 582 // Matches flow from the shards up the stack in the following order: 583 // 584 // 1. Search shards 585 // 2. flushCollectSender (aggregate) 586 // 3. limitSender (limit) 587 // 4. copyFileSender (copy) 588 // 589 // For streaming, the wrapping has to happen in the inverted order. 590 sender = copyFileSender(sender) 591 592 if opts.MaxDocDisplayCount > 0 { 593 var cancel context.CancelFunc 594 ctx, cancel = context.WithCancel(ctx) 595 defer cancel() 596 sender = limitSender(cancel, sender, opts.MaxDocDisplayCount) 597 } 598 599 sender, flush := newFlushCollectSender(opts, sender) 600 601 done, err := streamSearch(ctx, proc, q, opts, shards, sender) 602 603 // Even though streaming is done, we may have results sitting in a buffer we 604 // need to flush. So we need to send those before calling done. 605 flush() 606 done() 607 608 return err 609} 610 611// streamSearch is an internal helper since both Search and StreamSearch are 612// largely similar. 613// 614// done must always be called, even if err is non-nil. The SearchResults sent 615// via sender contain references to the underlying mmap data that the garbage 616// collector can't see. Calling done informs the garbage collector it is free 617// to collect those shards. The caller must call copyFiles on any 618// SearchResults it returns/streams out before calling done. 619func streamSearch(ctx context.Context, proc *process, q query.Q, opts *zoekt.SearchOptions, shards []*rankedShard, sender zoekt.Sender) (done func(), err error) { 620 tr, ctx := trace.New(ctx, "shardedSearcher.streamSearch", "") 621 tr.LazyLog(q, true) 622 tr.LazyPrintf("opts: %+v", opts) 623 overallStart := time.Now() 624 metricSearchRunning.Inc() 625 defer func() { 626 metricSearchRunning.Dec() 627 metricSearchDuration.Observe(time.Since(overallStart).Seconds()) 628 if err != nil { 629 metricSearchFailedTotal.Inc() 630 631 tr.LazyPrintf("error: %v", err) 632 tr.SetError(err) 633 } 634 tr.Finish() 635 }() 636 637 tr.LazyPrintf("before selectRepoSet shards:%d", len(shards)) 638 // Select the subset of shards that we will search over for the given query. 639 shards, q = selectRepoSet(shards, q) 640 tr.LazyPrintf("after selectRepoSet shards:%d %s", len(shards), q) 641 642 if len(shards) == 0 { 643 return func() {}, nil 644 } 645 646 var cancel context.CancelFunc 647 if opts.MaxWallTime == 0 { 648 ctx, cancel = context.WithCancel(ctx) 649 } else { 650 ctx, cancel = context.WithTimeout(ctx, opts.MaxWallTime) 651 } 652 653 defer cancel() 654 655 // We set the number of workers to GOMAXPROCS, or the number of shards, 656 // whichever is smaller. 657 workers := runtime.GOMAXPROCS(0) 658 if workers > len(shards) { 659 workers = len(shards) 660 } 661 662 type result struct { 663 priority float64 664 *zoekt.SearchResult 665 err error 666 } 667 668 var ( 669 // buffered channels to continue searching when sending back results 670 // takes a while / blocks. The maximum pending result set is workers * 2. 671 results = make(chan *result, workers) 672 search = make(chan *rankedShard, workers) 673 wg sync.WaitGroup 674 ) 675 676 // Start workers that receive shards from the search channel, search them, 677 // and send the results to the results channel. This process is repeated 678 // until the search channel is closed. 679 // 680 // Note: Making "search" a buffered channel has the effect of limiting the number of parallel shard searches. 681 // Since searching is mostly CPU bound, limiting parallel shard searches also reduces the peak working set. 682 wg.Add(workers) 683 for i := 0; i < workers; i++ { 684 go func() { 685 defer wg.Done() 686 for s := range search { 687 sr, err := searchOneShard(ctx, s, q, opts) 688 r := &result{priority: s.priority, SearchResult: sr, err: err} 689 results <- r 690 } 691 }() 692 } 693 694 go func() { 695 wg.Wait() 696 close(results) 697 }() 698 699 var ( 700 pending = make(prioritySlice, 0, workers) 701 shard = 0 702 next = shards[shard] 703 704 // We need a separate nil-able reference to the same channel so we can close(search) for the worker 705 // go-routines to finish but also set work to nil in order for the select statement below to ignore 706 // that case when we want to stop a search. This is needed because sending on a closed channel panics. 707 work = search 708 ) 709 710 stop := func() { 711 if work != nil { 712 close(search) 713 work = nil 714 next = nil 715 } 716 } 717 718 // tracked so we can stop when we hit TotalMaxMatchCount 719 var totalMatchCount int 720 721search: 722 for { 723 // At the top of each iteration, have the proc associated with this search yield its won "timeslice" 724 // to possibly allow other searches to make progress 725 _ = proc.Yield(ctx) // Note: we let searchOneShard handle context errors 726 727 select { 728 case work <- next: // is there a worker available to search the next shard? 729 pending.append(next.priority) 730 731 shard++ 732 if shard == len(shards) { 733 stop() 734 } else { 735 next = shards[shard] 736 } 737 case r, ok := <-results: // is there a result to send back? 738 if !ok { 739 break search 740 } 741 742 // delete this result's priority from pending before computing the new max pending priority 743 pending.remove(r.priority) 744 745 if r.err != nil { 746 // Set final error and stop searching new shards, but consume any pending 747 // search results. 748 stop() 749 err = r.err 750 continue 751 } 752 753 // Update the match count statistics and stop searching new shards if we've 754 // reached the limit set in the options. 755 totalMatchCount += r.SearchResult.Stats.MatchCount 756 if opts.TotalMaxMatchCount > 0 && totalMatchCount > opts.TotalMaxMatchCount { 757 stop() 758 } 759 760 observeMetrics(r.SearchResult) 761 762 r.Priority = r.priority 763 r.MaxPendingPriority = pending.max() 764 765 sendByRepository(r.SearchResult, opts, sender) // send the result back to the client 766 } 767 } 768 769 return func() { runtime.KeepAlive(shards) }, err 770} 771 772// sendByRepository splits a zoekt.SearchResult by repository and calls 773// sender.Send for each batch. Ranking in Sourcegraph expects zoekt.SearchResult 774// to contain results with the same zoekt.SearchResult.Priority only. 775// 776// We split by repository instead of by priority because it is easier to set 777// RepoURLs and LineFragments in zoekt.SearchResult. 778func sendByRepository(result *zoekt.SearchResult, opts *zoekt.SearchOptions, sender zoekt.Sender) { 779 780 if len(result.RepoURLs) <= 1 || len(result.Files) == 0 { 781 zoekt.SortFiles(result.Files) 782 sender.Send(result) 783 return 784 } 785 786 send := func(repoName string, a, b int, stats zoekt.Stats) { 787 zoekt.SortFiles(result.Files[a:b]) 788 sender.Send(&zoekt.SearchResult{ 789 Stats: stats, 790 Progress: zoekt.Progress{ 791 Priority: result.Files[a].RepositoryPriority, 792 MaxPendingPriority: result.MaxPendingPriority, 793 }, 794 Files: result.Files[a:b], 795 RepoURLs: map[string]string{repoName: result.RepoURLs[repoName]}, 796 LineFragments: map[string]string{repoName: result.LineFragments[repoName]}, 797 }) 798 } 799 800 var startIndex, endIndex int 801 curRepoID := result.Files[0].RepositoryID 802 curRepoName := result.Files[0].Repository 803 804 fm := zoekt.FileMatch{} 805 for endIndex, fm = range result.Files { 806 if curRepoID != fm.RepositoryID { 807 // Stats must stay aggregate-able, hence we sent the aggregate stats with the 808 // last event. 809 send(curRepoName, startIndex, endIndex, zoekt.Stats{}) 810 811 startIndex = endIndex 812 curRepoID = fm.RepositoryID 813 curRepoName = fm.Repository 814 } 815 } 816 817 send(curRepoName, startIndex, endIndex+1, result.Stats) 818} 819 820func observeMetrics(sr *zoekt.SearchResult) { 821 metricSearchContentBytesLoadedTotal.Add(float64(sr.Stats.ContentBytesLoaded)) 822 metricSearchIndexBytesLoadedTotal.Add(float64(sr.Stats.IndexBytesLoaded)) 823 metricSearchCrashesTotal.Add(float64(sr.Stats.Crashes)) 824 metricSearchFileCountTotal.Add(float64(sr.Stats.FileCount)) 825 metricSearchShardFilesConsideredTotal.Add(float64(sr.Stats.ShardFilesConsidered)) 826 metricSearchFilesConsideredTotal.Add(float64(sr.Stats.FilesConsidered)) 827 metricSearchFilesLoadedTotal.Add(float64(sr.Stats.FilesLoaded)) 828 metricSearchFilesSkippedTotal.Add(float64(sr.Stats.FilesSkipped)) 829 metricSearchShardsSkippedTotal.Add(float64(sr.Stats.ShardsSkipped)) 830 metricSearchMatchCountTotal.Add(float64(sr.Stats.MatchCount)) 831 metricSearchNgramMatchesTotal.Add(float64(sr.Stats.NgramMatches)) 832 metricSearchNgramLookupsTotal.Add(float64(sr.Stats.NgramLookups)) 833 metricSearchRegexpsConsideredTotal.Add(float64(sr.Stats.RegexpsConsidered)) 834} 835 836func copySlice(src *[]byte) { 837 if *src == nil { 838 return 839 } 840 dst := make([]byte, len(*src)) 841 copy(dst, *src) 842 *src = dst 843} 844 845func copyFiles(sr *zoekt.SearchResult) { 846 for i := range sr.Files { 847 copySlice(&sr.Files[i].Content) 848 copySlice(&sr.Files[i].Checksum) 849 for l := range sr.Files[i].LineMatches { 850 copySlice(&sr.Files[i].LineMatches[l].Line) 851 copySlice(&sr.Files[i].LineMatches[l].Before) 852 copySlice(&sr.Files[i].LineMatches[l].After) 853 } 854 for c := range sr.Files[i].ChunkMatches { 855 copySlice(&sr.Files[i].ChunkMatches[c].Content) 856 } 857 } 858} 859 860func searchOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) { 861 metricSearchShardRunning.Inc() 862 defer func() { 863 metricSearchShardRunning.Dec() 864 if e := recover(); e != nil { 865 log.Printf("crashed shard: %s: %#v, %s", s, e, debug.Stack()) 866 867 if sr == nil { 868 sr = &zoekt.SearchResult{} 869 } 870 sr.Stats.Crashes = 1 871 } 872 }() 873 874 return s.Search(ctx, q, opts) 875} 876 877type shardListResult struct { 878 rl *zoekt.RepoList 879 err error 880} 881 882func listOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.ListOptions, sink chan shardListResult) { 883 metricListShardRunning.Inc() 884 defer func() { 885 metricListShardRunning.Dec() 886 if r := recover(); r != nil { 887 log.Printf("crashed shard: %s: %s, %s", s.String(), r, debug.Stack()) 888 sink <- shardListResult{ 889 &zoekt.RepoList{Crashes: 1}, nil, 890 } 891 } 892 }() 893 894 ms, err := s.List(ctx, q, opts) 895 sink <- shardListResult{ms, err} 896} 897 898func (ss *shardedSearcher) List(ctx context.Context, r query.Q, opts *zoekt.ListOptions) (rl *zoekt.RepoList, err error) { 899 tr, ctx := trace.New(ctx, "shardedSearcher.List", "") 900 tr.LazyLog(r, true) 901 tr.LazyPrintf("opts: %s", opts) 902 metricListRunning.Inc() 903 defer func() { 904 metricListRunning.Dec() 905 if rl != nil { 906 tr.LazyPrintf("repos size: %d", len(rl.Repos)) 907 tr.LazyPrintf("crashes: %d", rl.Crashes) 908 tr.LazyPrintf("minimal size: %d", len(rl.Minimal)) 909 } 910 if err != nil { 911 tr.LazyPrintf("error: %v", err) 912 tr.SetError(err) 913 } 914 tr.Finish() 915 }() 916 917 r = query.Simplify(r) 918 isAll := false 919 if c, ok := r.(*query.Const); ok { 920 isAll = c.Value 921 } 922 923 proc, err := ss.sched.Acquire(ctx) 924 if err != nil { 925 return nil, err 926 } 927 defer proc.Release() 928 tr.LazyPrintf("acquired process") 929 930 loaded := ss.getLoaded() 931 shards := loaded.shards 932 shardCount := len(shards) 933 all := make(chan shardListResult, shardCount) 934 tr.LazyPrintf("shardCount: %d", len(shards)) 935 936 feeder := make(chan zoekt.Searcher, len(shards)) 937 for _, s := range shards { 938 feeder <- s 939 } 940 close(feeder) 941 942 for i := 0; i < runtime.GOMAXPROCS(0); i++ { 943 go func() { 944 for s := range feeder { 945 listOneShard(ctx, s, r, opts, all) 946 } 947 }() 948 } 949 950 stillLoadingCrashes := 0 951 if !loaded.ready { 952 // We may have missed results due to not being fully loaded. 953 stillLoadingCrashes++ 954 } 955 956 agg := zoekt.RepoList{ 957 Crashes: stillLoadingCrashes, 958 Minimal: map[uint32]*zoekt.MinimalRepoListEntry{}, 959 ReposMap: zoekt.ReposMap{}, 960 } 961 962 uniq := map[string]*zoekt.RepoListEntry{} 963 964 for range shards { 965 r := <-all 966 if r.err != nil { 967 return nil, r.err 968 } 969 970 agg.Crashes += r.rl.Crashes 971 agg.Stats.Add(&r.rl.Stats) 972 973 for _, r := range r.rl.Repos { 974 prev, ok := uniq[r.Repository.Name] 975 if !ok { 976 cp := *r // We need to copy because we mutate r.Stats when merging duplicates 977 uniq[r.Repository.Name] = &cp 978 } else { 979 prev.Stats.Add(&r.Stats) 980 } 981 } 982 983 for id, r := range r.rl.Minimal { 984 _, ok := agg.Minimal[id] 985 if !ok { 986 agg.Minimal[id] = r 987 } 988 } 989 990 for id, r := range r.rl.ReposMap { 991 agg.ReposMap[id] = r 992 } 993 } 994 995 agg.Repos = make([]*zoekt.RepoListEntry, 0, len(uniq)) 996 for _, r := range uniq { 997 agg.Repos = append(agg.Repos, r) 998 } 999 1000 // Only one of these fields is populated and in all cases the size of that 1001 // field is the number of Repos. 1002 // 1003 // Note: we don't just add individual Stats.Repos since a repository can 1004 // have multiple shards. 1005 agg.Stats.Repos = len(uniq) + len(agg.Minimal) + len(agg.ReposMap) 1006 1007 if isAll && len(agg.Repos) > 0 { 1008 reportListAllMetrics(agg.Repos) 1009 } 1010 1011 return &agg, nil 1012} 1013 1014func reportListAllMetrics(repos []*zoekt.RepoListEntry) { 1015 var stats zoekt.RepoStats 1016 for _, r := range repos { 1017 stats.Add(&r.Stats) 1018 } 1019 1020 metricListAllRepos.Set(float64(stats.Repos)) 1021 metricListAllIndexBytes.Set(float64(stats.IndexBytes)) 1022 metricListAllContentBytes.Set(float64(stats.ContentBytes)) 1023 metricListAllDocuments.Set(float64(stats.Documents)) 1024 metricListAllShards.Set(float64(stats.Shards)) 1025 metricListAllNewLinesCount.Set(float64(stats.NewLinesCount)) 1026 metricListAllDefaultBranchNewLinesCount.Set(float64(stats.DefaultBranchNewLinesCount)) 1027 metricListAllOtherBranchesNewLinesCount.Set(float64(stats.OtherBranchesNewLinesCount)) 1028} 1029 1030// getLoaded returns the currently loaded shards. Shared so do not mutate. 1031func (s *shardedSearcher) getLoaded() loaded { 1032 // next commit will store the true value of this, for now we keep the 1033 // backwards compatible behaviour. 1034 ready := s.ready.Load() 1035 // ranked is loaded after ready to avoid a race were ready is true but 1036 // ranked is still not the final set of shards. 1037 ranked, _ := s.ranked.Load().([]*rankedShard) 1038 return loaded{ 1039 shards: ranked, 1040 ready: ready, 1041 } 1042} 1043 1044func mkRankedShard(s zoekt.Searcher) *rankedShard { 1045 q := query.Const{Value: true} 1046 result, err := s.List(context.Background(), &q, nil) 1047 if err != nil { 1048 return &rankedShard{Searcher: s} 1049 } 1050 if len(result.Repos) == 0 { 1051 return &rankedShard{Searcher: s} 1052 } 1053 1054 var ( 1055 maxPriority float64 1056 repos = make([]*zoekt.Repository, 0, len(result.Repos)) 1057 ) 1058 for i := range result.Repos { 1059 repo := &result.Repos[i].Repository 1060 repos = append(repos, repo) 1061 if repo.RawConfig != nil { 1062 priority, _ := strconv.ParseFloat(repo.RawConfig["priority"], 64) 1063 if priority > maxPriority { 1064 maxPriority = priority 1065 } 1066 } 1067 } 1068 1069 return &rankedShard{ 1070 Searcher: s, 1071 repos: repos, 1072 priority: maxPriority, 1073 } 1074} 1075 1076// markReady should be called once all shards have been passed into replace on 1077// startup. Once s is marked as ready it stops reporting a Crash in the 1078// response Stats. 1079func (s *shardedSearcher) markReady() { 1080 s.ready.CompareAndSwap(false, true) 1081} 1082 1083func (s *shardedSearcher) replace(shards map[string]zoekt.Searcher) { 1084 if len(shards) == 0 { 1085 return 1086 } 1087 1088 defer func(began time.Time) { 1089 metricShardsBatchReplaceDurationSeconds.Observe(time.Since(began).Seconds()) 1090 }(time.Now()) 1091 1092 s.mu.Lock() 1093 defer s.mu.Unlock() 1094 1095 for key, shard := range shards { 1096 var r *rankedShard 1097 if shard != nil { 1098 r = mkRankedShard(shard) 1099 } 1100 1101 old := s.shards[key] 1102 if shard == nil { 1103 delete(s.shards, key) 1104 } else { 1105 s.shards[key] = r 1106 } 1107 1108 if old != nil && old.Searcher != nil { 1109 // _ ___ /^^\ /^\ /^^\_ 1110 // _ _@)@) \ ,,/ '` ~ `'~~ ', `\. 1111 // _/o\_ _ _ _/~`.`...'~\ ./~~..,'`','',.,' ' ~: 1112 // / `,'.~,~.~ . , . , ~|, ,/ .,' , ,. .. ,,. `, ~\_ 1113 // ( ' _' _ '_` _ ' . , `\_/ .' ..' ' ` ` `.. `, \_ 1114 // ~V~ V~ V~ V~ ~\ ` ' . ' , ' .,.,''`.,.''`.,.``. ', \_ 1115 // _/\ /\ /\ /\_/, . ' , `_/~\_ .' .,. ,, , _/~\_ `. `. '., \_ 1116 // < ~ ~ '~`'~'`, ., . `_: ::: \_ ' `_/ ::: \_ `.,' . ', \_ 1117 // \ ' `_ '`_ _ ',/ _::_::_ \ _ _/ _::_::_ \ `.,'.,`., \-,-,-,_,_, 1118 // `'~~ `'~~ `'~~ `'~~ \(_)(_)(_)/ `~~' \(_)(_)(_)/ ~'`\_.._,._,'_;_;_;_;_; 1119 // 1120 // We can't just call Close now, because there may be ongoing searches 1121 // which have old in the shards list. Previously we used an exclusive 1122 // lock to guarantee there were no concurrent searches. However, that 1123 // led to blocking on the read path. 1124 // 1125 // We could introduce granular locking per rankedShard to know when 1126 // there are no more references. However, this becomes tricky in 1127 // practice. Instead we rely on the garbage collector noticing old is no 1128 // longer used. We take care in our searchers to runtime.KeepAlive until 1129 // we have stopped referencing the underling mmap data. 1130 runtime.SetFinalizer(old, func(r *rankedShard) { 1131 r.Close() 1132 }) 1133 } 1134 } 1135 1136 ranked := make([]*rankedShard, 0, len(s.shards)) 1137 for _, r := range s.shards { 1138 ranked = append(ranked, r) 1139 } 1140 1141 sort.Slice(ranked, func(i, j int) bool { 1142 priorityDiff := ranked[i].priority - ranked[j].priority 1143 if priorityDiff != 0 { 1144 return priorityDiff > 0 1145 } 1146 if len(ranked[i].repos) == 0 || len(ranked[j].repos) == 0 { 1147 // Protect against empty names which can happen if we fail to List or 1148 // the shard is full of tombstones. Prefer the shard which has names. 1149 return len(ranked[i].repos) >= len(ranked[j].repos) 1150 } 1151 return ranked[i].repos[0].Name < ranked[j].repos[0].Name 1152 }) 1153 1154 s.ranked.Store(ranked) 1155 1156 metricShardsLoaded.Set(float64(len(ranked))) 1157} 1158 1159func loadShard(fn string) (zoekt.Searcher, error) { 1160 f, err := os.Open(fn) 1161 if err != nil { 1162 return nil, err 1163 } 1164 1165 iFile, err := zoekt.NewIndexFile(f) 1166 if err != nil { 1167 return nil, err 1168 } 1169 s, err := zoekt.NewSearcher(iFile) 1170 if err != nil { 1171 iFile.Close() 1172 return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err) 1173 } 1174 1175 return s, nil 1176} 1177 1178// prioritySlice is a trivial implementation of an array that provides three 1179// things: appending a value, removing a value, and getting the array's max. 1180// Operations take O(n) time, which is acceptable because N is restricted to 1181// GOMAXPROCS (i.e., number of cpu cores) by the shardedSearcher interface. 1182type prioritySlice []float64 1183 1184func (p *prioritySlice) append(pri float64) { 1185 *p = append(*p, pri) 1186} 1187 1188func (p *prioritySlice) remove(pri float64) { 1189 for i, opri := range *p { 1190 if opri == pri { 1191 if i != len(*p)-1 { 1192 // swap to make this element the tail 1193 (*p)[i] = (*p)[len(*p)-1] 1194 } 1195 // pop the end off 1196 *p = (*p)[:len(*p)-1] 1197 break 1198 } 1199 } 1200} 1201 1202func (p *prioritySlice) max() float64 { 1203 // remove() and max() could be combined, but this is easier to read and 1204 // the expected performance difference from the extra lock and loop is 1205 // almost certainly irrelevant. 1206 maxPri := math.Inf(-1) 1207 for _, pri := range *p { 1208 if pri > maxPri { 1209 maxPri = pri 1210 } 1211 } 1212 return maxPri 1213}