fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Copyright 2016 Google Inc. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package shards 16 17import ( 18 "context" 19 "fmt" 20 "log" 21 "math" 22 "os" 23 "runtime" 24 "runtime/debug" 25 "sort" 26 "strconv" 27 "sync" 28 "time" 29 30 "golang.org/x/sync/semaphore" 31 32 "github.com/prometheus/client_golang/prometheus" 33 "github.com/prometheus/client_golang/prometheus/promauto" 34 "go.uber.org/atomic" 35 36 "github.com/sourcegraph/zoekt" 37 "github.com/sourcegraph/zoekt/query" 38 "github.com/sourcegraph/zoekt/trace" 39) 40 41var ( 42 metricShardsLoaded = promauto.NewGauge(prometheus.GaugeOpts{ 43 Name: "zoekt_shards_loaded", 44 Help: "The number of shards currently loaded", 45 }) 46 metricShardsLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 47 Name: "zoekt_shards_loaded_total", 48 Help: "The total number of shards loaded", 49 }) 50 metricShardsLoadFailedTotal = promauto.NewCounter(prometheus.CounterOpts{ 51 Name: "zoekt_shards_load_failed_total", 52 Help: "The total number of shard loads that failed", 53 }) 54 55 metricSearchRunning = promauto.NewGauge(prometheus.GaugeOpts{ 56 Name: "zoekt_search_running", 57 Help: "The number of concurrent search requests running", 58 }) 59 metricSearchShardRunning = promauto.NewGauge(prometheus.GaugeOpts{ 60 Name: "zoekt_search_shard_running", 61 Help: "The number of concurrent search requests in a shard running", 62 }) 63 metricSearchFailedTotal = 
promauto.NewCounter(prometheus.CounterOpts{ 64 Name: "zoekt_search_failed_total", 65 Help: "The total number of search requests that failed", 66 }) 67 metricSearchDuration = promauto.NewHistogram(prometheus.HistogramOpts{ 68 Name: "zoekt_search_duration_seconds", 69 Help: "The duration a search request took in seconds", 70 Buckets: prometheus.DefBuckets, // DefBuckets good for service timings 71 }) 72 73 // A Counter per Stat. Name should match field in zoekt.Stats. 74 metricSearchContentBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 75 Name: "zoekt_search_content_loaded_bytes_total", 76 Help: "Total amount of I/O for reading contents", 77 }) 78 metricSearchIndexBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 79 Name: "zoekt_search_index_loaded_bytes_total", 80 Help: "Total amount of I/O for reading from index", 81 }) 82 metricSearchCrashesTotal = promauto.NewCounter(prometheus.CounterOpts{ 83 Name: "zoekt_search_crashes_total", 84 Help: "Total number of search shards that had a crash", 85 }) 86 metricSearchFileCountTotal = promauto.NewCounter(prometheus.CounterOpts{ 87 Name: "zoekt_search_file_count_total", 88 Help: "Total number of files containing a match", 89 }) 90 metricSearchShardFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{ 91 Name: "zoekt_search_shard_files_considered_total", 92 Help: "Total number of files in shards that we considered", 93 }) 94 metricSearchFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{ 95 Name: "zoekt_search_files_considered_total", 96 Help: "Total files that we evaluated. 
Equivalent to files for which all atom matches (including negations) evaluated to true", 97 }) 98 metricSearchFilesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 99 Name: "zoekt_search_files_loaded_total", 100 Help: "Total files for which we loaded file content to verify substring matches", 101 }) 102 metricSearchFilesSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{ 103 Name: "zoekt_search_files_skipped_total", 104 Help: "Total candidate files whose contents weren't examined because we gathered enough matches", 105 }) 106 metricSearchShardsSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{ 107 Name: "zoekt_search_shards_skipped_total", 108 Help: "Total shards that we did not process because a query was canceled", 109 }) 110 metricSearchMatchCountTotal = promauto.NewCounter(prometheus.CounterOpts{ 111 Name: "zoekt_search_match_count_total", 112 Help: "Total number of non-overlapping matches", 113 }) 114 metricSearchNgramMatchesTotal = promauto.NewCounter(prometheus.CounterOpts{ 115 Name: "zoekt_search_ngram_matches_total", 116 Help: "Total number of candidate matches as a result of searching ngrams", 117 }) 118 119 metricListRunning = promauto.NewGauge(prometheus.GaugeOpts{ 120 Name: "zoekt_list_running", 121 Help: "The number of concurrent list requests running", 122 }) 123 metricListShardRunning = promauto.NewGauge(prometheus.GaugeOpts{ 124 Name: "zoekt_list_shard_running", 125 Help: "The number of concurrent list requests in a shard running", 126 }) 127 metricShardsBatchReplaceDurationSeconds = promauto.NewHistogram(prometheus.HistogramOpts{ 128 Name: "zoekt_shards_batch_replace_duration_seconds", 129 Help: "The time it takes to replace a batch of Searchers.", 130 Buckets: []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30}, 131 }) 132 metricListAllRepos = promauto.NewGauge(prometheus.GaugeOpts{ 133 Name: "zoekt_list_all_stats_repos", 134 Help: "The last List(true) value for RepoStats.Repos. 
Repos is used for aggregrating the number of repositories.", 135 }) 136 metricListAllShards = promauto.NewGauge(prometheus.GaugeOpts{ 137 Name: "zoekt_list_all_stats_shards", 138 Help: "The last List(true) value for RepoStats.Shards. Shards is the total number of search shards.", 139 }) 140 metricListAllDocuments = promauto.NewGauge(prometheus.GaugeOpts{ 141 Name: "zoekt_list_all_stats_documents", 142 Help: "The last List(true) value for RepoStats.Documents. Documents holds the number of documents or files.", 143 }) 144 metricListAllIndexBytes = promauto.NewGauge(prometheus.GaugeOpts{ 145 Name: "zoekt_list_all_stats_index_bytes", 146 Help: "The last List(true) value for RepoStats.IndexBytes. IndexBytes is the amount of RAM used for index overhead.", 147 }) 148 metricListAllContentBytes = promauto.NewGauge(prometheus.GaugeOpts{ 149 Name: "zoekt_list_all_stats_content_bytes", 150 Help: "The last List(true) value for RepoStats.ContentBytes. ContentBytes is the amount of RAM used for raw content.", 151 }) 152 metricListAllNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{ 153 Name: "zoekt_list_all_stats_new_lines_count", 154 Help: "The last List(true) value for RepoStats.NewLinesCount.", 155 }) 156 metricListAllDefaultBranchNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{ 157 Name: "zoekt_list_all_stats_default_branch_new_lines_count", 158 Help: "The last List(true) value for RepoStats.DefaultBranchNewLinesCount.", 159 }) 160 metricListAllOtherBranchesNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{ 161 Name: "zoekt_list_all_stats_other_branches_new_lines_count", 162 Help: "The last List(true) value for RepoStats.OtherBranchesNewLinesCount.", 163 }) 164) 165 166type rankedShard struct { 167 zoekt.Searcher 168 169 priority float64 // maximum priority across all repos in the shard 170 171 // We have out of band ranking on compound shards which can change even if 172 // the shard file does not. So we compute a rank in getShards. 
We store 173 // repos here to avoid the cost of List in the search request path. 174 repos []*zoekt.Repository 175} 176 177// loaded stores the state we compute when updating the state of shards from 178// disk. 179type loaded struct { 180 // shards is the currently loaded shards sorted by decreasing rank and 181 // should not be mutated. 182 shards []*rankedShard 183 184 // ready is true if sharded searcher has finished loading all initial 185 // shards on startup. 186 ready bool 187} 188 189type shardedSearcher struct { 190 // Limit the number of parallel queries. Since searching is 191 // CPU bound, we can't do better than #CPU queries in 192 // parallel. If we do so, we just create more memory 193 // pressure. 194 sched scheduler 195 196 mu sync.Mutex // protects writes to shards 197 shards map[string]*rankedShard 198 199 ready atomic.Bool 200 ranked atomic.Value 201} 202 203func newShardedSearcher(n int64) *shardedSearcher { 204 ss := &shardedSearcher{ 205 shards: make(map[string]*rankedShard), 206 sched: newScheduler(n), 207 } 208 return ss 209} 210 211// NewDirectorySearcher returns a searcher instance that loads all 212// shards corresponding to a glob into memory. 213func NewDirectorySearcher(dir string) (zoekt.Streamer, error) { 214 return newDirectorySearcher(dir, true) 215} 216 217// NewDirectorySearcherFast is like NewDirectorySearcher, but does not block 218// on the initial loading of shards. 219// 220// This exists since in the case of zoekt-webserver we are happy with having 221// partial availability since that is better than no availability on large 222// instances. 
223func NewDirectorySearcherFast(dir string) (zoekt.Streamer, error) { 224 return newDirectorySearcher(dir, false) 225} 226 227func newDirectorySearcher(dir string, waitUntilReady bool) (zoekt.Streamer, error) { 228 ss := newShardedSearcher(int64(runtime.GOMAXPROCS(0))) 229 tl := &loader{ 230 ss: ss, 231 } 232 dw, err := newDirectoryWatcher(dir, tl) 233 if err != nil { 234 return nil, err 235 } 236 237 if waitUntilReady { 238 if err := dw.WaitUntilReady(); err != nil { 239 return nil, err 240 } 241 } 242 243 ds := &directorySearcher{ 244 Streamer: ss, 245 directoryWatcher: dw, 246 } 247 248 return &typeRepoSearcher{Streamer: ds}, nil 249} 250 251type directorySearcher struct { 252 zoekt.Streamer 253 254 directoryWatcher *DirectoryWatcher 255} 256 257func (s *directorySearcher) Close() { 258 // We need to Stop directoryWatcher first since it calls load/unload on 259 // Searcher. 260 s.directoryWatcher.Stop() 261 s.Streamer.Close() 262} 263 264type loader struct { 265 ss *shardedSearcher 266} 267 268func (tl *loader) load(keys ...string) { 269 // This is called with all keys on startup, so once this function has 270 // finished running shardedSearcher will be ready. 271 defer tl.ss.markReady() 272 273 if len(keys) == 0 { 274 // If there's nothing to load, we exit early here, but we want to mark 275 // ourselves as ready. 
276 return 277 } 278 279 var ( 280 mu sync.Mutex // synchronizes writes to the shards map 281 wg sync.WaitGroup // used to wait for all shards to load 282 sem = semaphore.NewWeighted(int64(runtime.GOMAXPROCS(0))) 283 loadedShards = make(map[string]zoekt.Searcher) 284 ) 285 286 publishLoaded := func() { 287 mu.Lock() 288 chunk := loadedShards 289 loadedShards = make(map[string]zoekt.Searcher) 290 mu.Unlock() 291 tl.ss.replace(chunk) 292 } 293 294 log.Printf("loading %d shard(s): %s", len(keys), humanTruncateList(keys, 5)) 295 296 lastProgress := time.Now() 297 for i, key := range keys { 298 // If taking a while to start-up occasionally give a progress message 299 if time.Since(lastProgress) > 5*time.Second { 300 log.Printf("still need to load %d shards...", len(keys)-i) 301 lastProgress = time.Now() 302 303 publishLoaded() 304 } 305 306 _ = sem.Acquire(context.Background(), 1) 307 wg.Add(1) 308 309 go func(key string) { 310 defer sem.Release(1) 311 defer wg.Done() 312 313 shard, err := loadShard(key) 314 if err != nil { 315 metricShardsLoadFailedTotal.Inc() 316 log.Printf("reloading: %s, err %v ", key, err) 317 return 318 } 319 metricShardsLoadedTotal.Inc() 320 321 mu.Lock() 322 loadedShards[key] = shard 323 mu.Unlock() 324 }(key) 325 } 326 327 wg.Wait() 328 329 publishLoaded() 330} 331 332func (tl *loader) drop(keys ...string) { 333 shards := make(map[string]zoekt.Searcher, len(keys)) 334 for _, key := range keys { 335 shards[key] = nil 336 } 337 tl.ss.replace(shards) 338} 339 340func (ss *shardedSearcher) String() string { 341 return "shardedSearcher" 342} 343 344// Close closes references to open files. It may be called only once. 
345func (ss *shardedSearcher) Close() { 346 ss.mu.Lock() 347 shards := make(map[string]zoekt.Searcher, len(ss.shards)) 348 for k := range ss.shards { 349 shards[k] = nil 350 } 351 ss.mu.Unlock() 352 353 ss.replace(shards) 354} 355 356func selectRepoSet(shards []*rankedShard, q query.Q) ([]*rankedShard, query.Q) { 357 and, ok := q.(*query.And) 358 if !ok { 359 return shards, q 360 } 361 362 // (and (reposet ...) (q)) 363 // (and true (q)) with a filtered shards 364 // (and false) // noop 365 366 // (and (repobranches ...) (q)) 367 // (and (repobranches ...) (q)) 368 369 hasReposForPredicate := func(pred func(repo *zoekt.Repository) bool) func(repos []*zoekt.Repository) (any, all bool) { 370 return func(repos []*zoekt.Repository) (any, all bool) { 371 any = false 372 all = true 373 for _, repo := range repos { 374 b := pred(repo) 375 any = any || b 376 all = all && b 377 } 378 return any, all 379 } 380 } 381 382 for i, c := range and.Children { 383 var setSize int 384 var hasRepos func([]*zoekt.Repository) (bool, bool) 385 switch setQuery := c.(type) { 386 case *query.RepoSet: 387 setSize = len(setQuery.Set) 388 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 389 return setQuery.Set[repo.Name] 390 }) 391 case *query.RepoIDs: 392 setSize = int(setQuery.Repos.GetCardinality()) 393 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 394 return setQuery.Repos.Contains(repo.ID) 395 }) 396 case *query.BranchesRepos: 397 for _, br := range setQuery.List { 398 setSize += int(br.Repos.GetCardinality()) 399 } 400 401 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 402 for _, br := range setQuery.List { 403 if br.Repos.Contains(repo.ID) { 404 return true 405 } 406 } 407 return false 408 }) 409 default: 410 continue 411 } 412 413 // setSize may be larger than the number of shards we have. 
The size of 414 // filtered is bounded by min(len(set), len(shards)) 415 if setSize > len(shards) { 416 setSize = len(shards) 417 } 418 419 filtered := make([]*rankedShard, 0, setSize) 420 filteredAll := true 421 422 for _, s := range shards { 423 if any, all := hasRepos(s.repos); any { 424 filtered = append(filtered, s) 425 filteredAll = filteredAll && all 426 } 427 } 428 429 // We don't need to adjust the query since we are returning an empty set 430 // of shards to search. 431 if len(filtered) == 0 { 432 return filtered, and 433 } 434 435 // We can't simplify the query since we are searching shards which contain 436 // repos we aren't supposed to search. 437 if !filteredAll { 438 return filtered, and 439 } 440 441 // This optimization allows us to avoid the work done by 442 // indexData.simplify for each shard. 443 // 444 // For example if our query is (and (reposet foo bar) (content baz)) 445 // then at this point filtered is [foo bar] and q is the same. For each 446 // shard indexData.simplify will simplify to (and true (content baz)) -> 447 // (content baz). This work can be done now once, rather than per shard. 448 switch c := c.(type) { 449 case *query.RepoSet: 450 and.Children[i] = &query.Const{Value: true} 451 return filtered, query.Simplify(and) 452 453 case *query.RepoIDs: 454 and.Children[i] = &query.Const{Value: true} 455 return filtered, query.Simplify(and) 456 457 case *query.BranchesRepos: 458 // We can only replace if all the repos want the same branches. We 459 // simplify and just check that we are requesting 1 branch. The common 460 // case is just asking for HEAD, so this should be effective. 461 if len(c.List) != 1 { 462 return filtered, and 463 } 464 465 // Every repo wants the same branches, so we can replace RepoBranches 466 // with a list of branch queries. 
467 and.Children[i] = &query.Branch{Pattern: c.List[0].Branch, Exact: true} 468 return filtered, query.Simplify(and) 469 } 470 471 // Stop after first RepoSet, otherwise we might append duplicate 472 // shards to `filtered` 473 return filtered, and 474 } 475 476 return shards, and 477} 478 479func (ss *shardedSearcher) Search(ctx context.Context, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) { 480 tr, ctx := trace.New(ctx, "shardedSearcher.Search", "") 481 defer func() { 482 if sr != nil { 483 tr.LazyPrintf("num files: %d", len(sr.Files)) 484 tr.LazyPrintf("stats: %+v", sr.Stats) 485 } 486 tr.Finish() 487 }() 488 ctx, cancel := context.WithCancel(ctx) 489 defer cancel() 490 491 collectSender := newCollectSender(opts) 492 493 start := time.Now() 494 proc, err := ss.sched.Acquire(ctx) 495 if err != nil { 496 return nil, err 497 } 498 defer proc.Release() 499 tr.LazyPrintf("acquired process") 500 501 wait := time.Since(start) 502 start = time.Now() 503 504 loaded := ss.getLoaded() 505 done, err := streamSearch(ctx, proc, q, opts, loaded.shards, collectSender) 506 defer done() 507 if err != nil { 508 return nil, err 509 } 510 511 aggregate, ok := collectSender.Done() 512 if !ok { 513 aggregate = &zoekt.SearchResult{ 514 RepoURLs: map[string]string{}, 515 LineFragments: map[string]string{}, 516 } 517 } 518 519 copyFiles(aggregate) 520 521 if !loaded.ready { 522 // We may have missed results due to not being fully loaded. 
523 aggregate.Stats.Crashes++ 524 } 525 526 aggregate.Stats.Wait = wait 527 aggregate.Stats.Duration = time.Since(start) 528 529 return aggregate, nil 530} 531 532func (ss *shardedSearcher) StreamSearch(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, sender zoekt.Sender) (err error) { 533 tr, ctx := trace.New(ctx, "shardedSearcher.StreamSearch", "") 534 defer func() { 535 if err != nil { 536 tr.LazyPrintf("error: %v", err) 537 tr.SetError(err) 538 } 539 tr.Finish() 540 }() 541 542 start := time.Now() 543 proc, err := ss.sched.Acquire(ctx) 544 if err != nil { 545 return err 546 } 547 defer proc.Release() 548 tr.LazyPrintf("acquired process") 549 550 loaded := ss.getLoaded() 551 shards := loaded.shards 552 553 maxPendingPriority := math.Inf(-1) 554 if len(shards) > 0 { 555 maxPendingPriority = shards[0].priority 556 } 557 558 stillLoadingCrashes := 0 559 if !loaded.ready { 560 // We may have missed results due to not being fully loaded. 561 stillLoadingCrashes++ 562 } 563 564 sender.Send(&zoekt.SearchResult{ 565 Stats: zoekt.Stats{ 566 Crashes: stillLoadingCrashes, 567 Wait: time.Since(start), 568 }, 569 Progress: zoekt.Progress{ 570 MaxPendingPriority: maxPendingPriority, 571 }, 572 }) 573 574 // Matches flow from the shards up the stack in the following order: 575 // 576 // 1. Search shards 577 // 2. flushCollectSender (aggregate) 578 // 3. limitSender (limit) 579 // 4. copyFileSender (copy) 580 // 581 // For streaming, the wrapping has to happen in the inverted order. 582 sender = copyFileSender(sender) 583 584 if opts.MaxDocDisplayCount > 0 { 585 var cancel context.CancelFunc 586 ctx, cancel = context.WithCancel(ctx) 587 defer cancel() 588 sender = limitSender(cancel, sender, opts.MaxDocDisplayCount) 589 } 590 591 sender, flush := newFlushCollectSender(opts, sender) 592 593 done, err := streamSearch(ctx, proc, q, opts, shards, sender) 594 595 // Even though streaming is done, we may have results sitting in a buffer we 596 // need to flush. 
So we need to send those before calling done. 597 flush() 598 done() 599 600 return err 601} 602 603// streamSearch is an internal helper since both Search and StreamSearch are 604// largely similar. 605// 606// done must always be called, even if err is non-nil. The SearchResults sent 607// via sender contain references to the underlying mmap data that the garbage 608// collector can't see. Calling done informs the garbage collector it is free 609// to collect those shards. The caller must call copyFiles on any 610// SearchResults it returns/streams out before calling done. 611func streamSearch(ctx context.Context, proc *process, q query.Q, opts *zoekt.SearchOptions, shards []*rankedShard, sender zoekt.Sender) (done func(), err error) { 612 tr, ctx := trace.New(ctx, "shardedSearcher.streamSearch", "") 613 tr.LazyLog(q, true) 614 tr.LazyPrintf("opts: %+v", opts) 615 overallStart := time.Now() 616 metricSearchRunning.Inc() 617 defer func() { 618 metricSearchRunning.Dec() 619 metricSearchDuration.Observe(time.Since(overallStart).Seconds()) 620 if err != nil { 621 metricSearchFailedTotal.Inc() 622 623 tr.LazyPrintf("error: %v", err) 624 tr.SetError(err) 625 } 626 tr.Finish() 627 }() 628 629 tr.LazyPrintf("before selectRepoSet shards:%d", len(shards)) 630 // Select the subset of shards that we will search over for the given query. 631 shards, q = selectRepoSet(shards, q) 632 tr.LazyPrintf("after selectRepoSet shards:%d %s", len(shards), q) 633 634 if len(shards) == 0 { 635 return func() {}, nil 636 } 637 638 var cancel context.CancelFunc 639 if opts.MaxWallTime == 0 { 640 ctx, cancel = context.WithCancel(ctx) 641 } else { 642 ctx, cancel = context.WithTimeout(ctx, opts.MaxWallTime) 643 } 644 645 defer cancel() 646 647 // We set the number of workers to GOMAXPROCS, or the number of shards, 648 // whichever is smaller. 
649 workers := runtime.GOMAXPROCS(0) 650 if workers > len(shards) { 651 workers = len(shards) 652 } 653 654 type result struct { 655 priority float64 656 *zoekt.SearchResult 657 err error 658 } 659 660 var ( 661 // buffered channels to continue searching when sending back results 662 // takes a while / blocks. The maximum pending result set is workers * 2. 663 results = make(chan *result, workers) 664 search = make(chan *rankedShard, workers) 665 wg sync.WaitGroup 666 ) 667 668 // Start workers that receive shards from the search channel, search them, 669 // and send the results to the results channel. This process is repeated 670 // until the search channel is closed. 671 // 672 // Note: Making "search" a buffered channel has the effect of limiting the number of parallel shard searches. 673 // Since searching is mostly CPU bound, limiting parallel shard searches also reduces the peak working set. 674 wg.Add(workers) 675 for i := 0; i < workers; i++ { 676 go func() { 677 defer wg.Done() 678 for s := range search { 679 sr, err := searchOneShard(ctx, s, q, opts) 680 r := &result{priority: s.priority, SearchResult: sr, err: err} 681 results <- r 682 } 683 }() 684 } 685 686 go func() { 687 wg.Wait() 688 close(results) 689 }() 690 691 var ( 692 pending = make(prioritySlice, 0, workers) 693 shard = 0 694 next = shards[shard] 695 696 // We need a separate nil-able reference to the same channel so we can close(search) for the worker 697 // go-routines to finish but also set work to nil in order for the select statement below to ignore 698 // that case when we want to stop a search. This is needed because sending on a closed channel panics. 
699 work = search 700 ) 701 702 stop := func() { 703 if work != nil { 704 close(search) 705 work = nil 706 next = nil 707 } 708 } 709 710 // tracked so we can stop when we hit TotalMaxMatchCount 711 var totalMatchCount int 712 713search: 714 for { 715 // At the top of each iteration, have the proc associated with this search yield its won "timeslice" 716 // to possibly allow other searches to make progress 717 _ = proc.Yield(ctx) // Note: we let searchOneShard handle context errors 718 719 select { 720 case work <- next: // is there a worker available to search the next shard? 721 pending.append(next.priority) 722 723 shard++ 724 if shard == len(shards) { 725 stop() 726 } else { 727 next = shards[shard] 728 } 729 case r, ok := <-results: // is there a result to send back? 730 if !ok { 731 break search 732 } 733 734 // delete this result's priority from pending before computing the new max pending priority 735 pending.remove(r.priority) 736 737 if r.err != nil { 738 // Set final error and stop searching new shards, but consume any pending 739 // search results. 740 stop() 741 err = r.err 742 continue 743 } 744 745 // Update the match count statistics and stop searching new shards if we've 746 // reached the limit set in the options. 747 totalMatchCount += r.SearchResult.Stats.MatchCount 748 if opts.TotalMaxMatchCount > 0 && totalMatchCount > opts.TotalMaxMatchCount { 749 stop() 750 } 751 752 observeMetrics(r.SearchResult) 753 754 r.Priority = r.priority 755 r.MaxPendingPriority = pending.max() 756 757 sendByRepository(r.SearchResult, opts, sender) // send the result back to the client 758 } 759 } 760 761 return func() { runtime.KeepAlive(shards) }, err 762} 763 764// sendByRepository splits a zoekt.SearchResult by repository and calls 765// sender.Send for each batch. Ranking in Sourcegraph expects zoekt.SearchResult 766// to contain results with the same zoekt.SearchResult.Priority only. 
767// 768// We split by repository instead of by priority because it is easier to set 769// RepoURLs and LineFragments in zoekt.SearchResult. 770func sendByRepository(result *zoekt.SearchResult, opts *zoekt.SearchOptions, sender zoekt.Sender) { 771 772 if len(result.RepoURLs) <= 1 || len(result.Files) == 0 { 773 zoekt.SortFiles(result.Files) 774 sender.Send(result) 775 return 776 } 777 778 send := func(repoName string, a, b int, stats zoekt.Stats) { 779 zoekt.SortFiles(result.Files[a:b]) 780 sender.Send(&zoekt.SearchResult{ 781 Stats: stats, 782 Progress: zoekt.Progress{ 783 Priority: result.Files[a].RepositoryPriority, 784 MaxPendingPriority: result.MaxPendingPriority, 785 }, 786 Files: result.Files[a:b], 787 RepoURLs: map[string]string{repoName: result.RepoURLs[repoName]}, 788 LineFragments: map[string]string{repoName: result.LineFragments[repoName]}, 789 }) 790 } 791 792 var startIndex, endIndex int 793 curRepoID := result.Files[0].RepositoryID 794 curRepoName := result.Files[0].Repository 795 796 fm := zoekt.FileMatch{} 797 for endIndex, fm = range result.Files { 798 if curRepoID != fm.RepositoryID { 799 // Stats must stay aggregate-able, hence we sent the aggregate stats with the 800 // last event. 
801 send(curRepoName, startIndex, endIndex, zoekt.Stats{}) 802 803 startIndex = endIndex 804 curRepoID = fm.RepositoryID 805 curRepoName = fm.Repository 806 } 807 } 808 809 send(curRepoName, startIndex, endIndex+1, result.Stats) 810} 811 812func observeMetrics(sr *zoekt.SearchResult) { 813 metricSearchContentBytesLoadedTotal.Add(float64(sr.Stats.ContentBytesLoaded)) 814 metricSearchIndexBytesLoadedTotal.Add(float64(sr.Stats.IndexBytesLoaded)) 815 metricSearchCrashesTotal.Add(float64(sr.Stats.Crashes)) 816 metricSearchFileCountTotal.Add(float64(sr.Stats.FileCount)) 817 metricSearchShardFilesConsideredTotal.Add(float64(sr.Stats.ShardFilesConsidered)) 818 metricSearchFilesConsideredTotal.Add(float64(sr.Stats.FilesConsidered)) 819 metricSearchFilesLoadedTotal.Add(float64(sr.Stats.FilesLoaded)) 820 metricSearchFilesSkippedTotal.Add(float64(sr.Stats.FilesSkipped)) 821 metricSearchShardsSkippedTotal.Add(float64(sr.Stats.ShardsSkipped)) 822 metricSearchMatchCountTotal.Add(float64(sr.Stats.MatchCount)) 823 metricSearchNgramMatchesTotal.Add(float64(sr.Stats.NgramMatches)) 824} 825 826func copySlice(src *[]byte) { 827 if *src == nil { 828 return 829 } 830 dst := make([]byte, len(*src)) 831 copy(dst, *src) 832 *src = dst 833} 834 835func copyFiles(sr *zoekt.SearchResult) { 836 for i := range sr.Files { 837 copySlice(&sr.Files[i].Content) 838 copySlice(&sr.Files[i].Checksum) 839 for l := range sr.Files[i].LineMatches { 840 copySlice(&sr.Files[i].LineMatches[l].Line) 841 copySlice(&sr.Files[i].LineMatches[l].Before) 842 copySlice(&sr.Files[i].LineMatches[l].After) 843 } 844 for c := range sr.Files[i].ChunkMatches { 845 copySlice(&sr.Files[i].ChunkMatches[c].Content) 846 } 847 } 848} 849 850func searchOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) { 851 metricSearchShardRunning.Inc() 852 defer func() { 853 metricSearchShardRunning.Dec() 854 if e := recover(); e != nil { 855 log.Printf("crashed shard: %s: 
%#v, %s", s, e, debug.Stack()) 856 857 if sr == nil { 858 sr = &zoekt.SearchResult{} 859 } 860 sr.Stats.Crashes = 1 861 } 862 }() 863 864 return s.Search(ctx, q, opts) 865} 866 867type shardListResult struct { 868 rl *zoekt.RepoList 869 err error 870} 871 872func listOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.ListOptions, sink chan shardListResult) { 873 metricListShardRunning.Inc() 874 defer func() { 875 metricListShardRunning.Dec() 876 if r := recover(); r != nil { 877 log.Printf("crashed shard: %s: %s, %s", s.String(), r, debug.Stack()) 878 sink <- shardListResult{ 879 &zoekt.RepoList{Crashes: 1}, nil, 880 } 881 } 882 }() 883 884 ms, err := s.List(ctx, q, opts) 885 sink <- shardListResult{ms, err} 886} 887 888func (ss *shardedSearcher) List(ctx context.Context, r query.Q, opts *zoekt.ListOptions) (rl *zoekt.RepoList, err error) { 889 tr, ctx := trace.New(ctx, "shardedSearcher.List", "") 890 tr.LazyLog(r, true) 891 tr.LazyPrintf("opts: %s", opts) 892 metricListRunning.Inc() 893 defer func() { 894 metricListRunning.Dec() 895 if rl != nil { 896 tr.LazyPrintf("repos size: %d", len(rl.Repos)) 897 tr.LazyPrintf("crashes: %d", rl.Crashes) 898 tr.LazyPrintf("minimal size: %d", len(rl.Minimal)) 899 } 900 if err != nil { 901 tr.LazyPrintf("error: %v", err) 902 tr.SetError(err) 903 } 904 tr.Finish() 905 }() 906 907 r = query.Simplify(r) 908 isAll := false 909 if c, ok := r.(*query.Const); ok { 910 isAll = c.Value 911 } 912 913 proc, err := ss.sched.Acquire(ctx) 914 if err != nil { 915 return nil, err 916 } 917 defer proc.Release() 918 tr.LazyPrintf("acquired process") 919 920 loaded := ss.getLoaded() 921 shards := loaded.shards 922 shardCount := len(shards) 923 all := make(chan shardListResult, shardCount) 924 tr.LazyPrintf("shardCount: %d", len(shards)) 925 926 feeder := make(chan zoekt.Searcher, len(shards)) 927 for _, s := range shards { 928 feeder <- s 929 } 930 close(feeder) 931 932 for i := 0; i < runtime.GOMAXPROCS(0); i++ { 933 go func() 
{ 934 for s := range feeder { 935 listOneShard(ctx, s, r, opts, all) 936 } 937 }() 938 } 939 940 stillLoadingCrashes := 0 941 if !loaded.ready { 942 // We may have missed results due to not being fully loaded. 943 stillLoadingCrashes++ 944 } 945 946 agg := zoekt.RepoList{ 947 Crashes: stillLoadingCrashes, 948 Minimal: map[uint32]*zoekt.MinimalRepoListEntry{}, 949 ReposMap: zoekt.ReposMap{}, 950 } 951 952 uniq := map[string]*zoekt.RepoListEntry{} 953 954 for range shards { 955 r := <-all 956 if r.err != nil { 957 return nil, r.err 958 } 959 960 agg.Crashes += r.rl.Crashes 961 agg.Stats.Add(&r.rl.Stats) 962 963 for _, r := range r.rl.Repos { 964 prev, ok := uniq[r.Repository.Name] 965 if !ok { 966 cp := *r // We need to copy because we mutate r.Stats when merging duplicates 967 uniq[r.Repository.Name] = &cp 968 } else { 969 prev.Stats.Add(&r.Stats) 970 } 971 } 972 973 for id, r := range r.rl.Minimal { 974 _, ok := agg.Minimal[id] 975 if !ok { 976 agg.Minimal[id] = r 977 } 978 } 979 980 for id, r := range r.rl.ReposMap { 981 agg.ReposMap[id] = r 982 } 983 } 984 985 agg.Repos = make([]*zoekt.RepoListEntry, 0, len(uniq)) 986 for _, r := range uniq { 987 agg.Repos = append(agg.Repos, r) 988 } 989 990 if isAll && len(agg.Repos) > 0 { 991 reportListAllMetrics(agg.Repos) 992 } 993 994 return &agg, nil 995} 996 997func reportListAllMetrics(repos []*zoekt.RepoListEntry) { 998 var stats zoekt.RepoStats 999 for _, r := range repos { 1000 stats.Add(&r.Stats) 1001 } 1002 1003 metricListAllRepos.Set(float64(stats.Repos)) 1004 metricListAllIndexBytes.Set(float64(stats.IndexBytes)) 1005 metricListAllContentBytes.Set(float64(stats.ContentBytes)) 1006 metricListAllDocuments.Set(float64(stats.Documents)) 1007 metricListAllShards.Set(float64(stats.Shards)) 1008 metricListAllNewLinesCount.Set(float64(stats.NewLinesCount)) 1009 metricListAllDefaultBranchNewLinesCount.Set(float64(stats.DefaultBranchNewLinesCount)) 1010 
metricListAllOtherBranchesNewLinesCount.Set(float64(stats.OtherBranchesNewLinesCount)) 1011} 1012 1013// getLoaded returns the currently loaded shards. Shared so do not mutate. 1014func (s *shardedSearcher) getLoaded() loaded { 1015 // next commit will store the true value of this, for now we keep the 1016 // backwards compatible behaviour. 1017 ready := s.ready.Load() 1018 // ranked is loaded after ready to avoid a race were ready is true but 1019 // ranked is still not the final set of shards. 1020 ranked, _ := s.ranked.Load().([]*rankedShard) 1021 return loaded{ 1022 shards: ranked, 1023 ready: ready, 1024 } 1025} 1026 1027func mkRankedShard(s zoekt.Searcher) *rankedShard { 1028 q := query.Const{Value: true} 1029 result, err := s.List(context.Background(), &q, nil) 1030 if err != nil { 1031 return &rankedShard{Searcher: s} 1032 } 1033 if len(result.Repos) == 0 { 1034 return &rankedShard{Searcher: s} 1035 } 1036 1037 var ( 1038 maxPriority float64 1039 repos = make([]*zoekt.Repository, 0, len(result.Repos)) 1040 ) 1041 for i := range result.Repos { 1042 repo := &result.Repos[i].Repository 1043 repos = append(repos, repo) 1044 if repo.RawConfig != nil { 1045 priority, _ := strconv.ParseFloat(repo.RawConfig["priority"], 64) 1046 if priority > maxPriority { 1047 maxPriority = priority 1048 } 1049 } 1050 } 1051 1052 return &rankedShard{ 1053 Searcher: s, 1054 repos: repos, 1055 priority: maxPriority, 1056 } 1057} 1058 1059// markReady should be called once all shards have been passed into replace on 1060// startup. Once s is marked as ready it stops reporting a Crash in the 1061// response Stats. 
1062func (s *shardedSearcher) markReady() { 1063 s.ready.CompareAndSwap(false, true) 1064} 1065 1066func (s *shardedSearcher) replace(shards map[string]zoekt.Searcher) { 1067 if len(shards) == 0 { 1068 return 1069 } 1070 1071 defer func(began time.Time) { 1072 metricShardsBatchReplaceDurationSeconds.Observe(time.Since(began).Seconds()) 1073 }(time.Now()) 1074 1075 s.mu.Lock() 1076 defer s.mu.Unlock() 1077 1078 for key, shard := range shards { 1079 var r *rankedShard 1080 if shard != nil { 1081 r = mkRankedShard(shard) 1082 } 1083 1084 old := s.shards[key] 1085 if shard == nil { 1086 delete(s.shards, key) 1087 } else { 1088 s.shards[key] = r 1089 } 1090 1091 if old != nil && old.Searcher != nil { 1092 // _ ___ /^^\ /^\ /^^\_ 1093 // _ _@)@) \ ,,/ '` ~ `'~~ ', `\. 1094 // _/o\_ _ _ _/~`.`...'~\ ./~~..,'`','',.,' ' ~: 1095 // / `,'.~,~.~ . , . , ~|, ,/ .,' , ,. .. ,,. `, ~\_ 1096 // ( ' _' _ '_` _ ' . , `\_/ .' ..' ' ` ` `.. `, \_ 1097 // ~V~ V~ V~ V~ ~\ ` ' . ' , ' .,.,''`.,.''`.,.``. ', \_ 1098 // _/\ /\ /\ /\_/, . ' , `_/~\_ .' .,. ,, , _/~\_ `. `. '., \_ 1099 // < ~ ~ '~`'~'`, ., . `_: ::: \_ ' `_/ ::: \_ `.,' . ', \_ 1100 // \ ' `_ '`_ _ ',/ _::_::_ \ _ _/ _::_::_ \ `.,'.,`., \-,-,-,_,_, 1101 // `'~~ `'~~ `'~~ `'~~ \(_)(_)(_)/ `~~' \(_)(_)(_)/ ~'`\_.._,._,'_;_;_;_;_; 1102 // 1103 // We can't just call Close now, because there may be ongoing searches 1104 // which have old in the shards list. Previously we used an exclusive 1105 // lock to guarantee there were no concurrent searches. However, that 1106 // led to blocking on the read path. 1107 // 1108 // We could introduce granular locking per rankedShard to know when 1109 // there are no more references. However, this becomes tricky in 1110 // practice. Instead we rely on the garbage collector noticing old is no 1111 // longer used. We take care in our searchers to runtime.KeepAlive until 1112 // we have stopped referencing the underling mmap data. 
1113 runtime.SetFinalizer(old, func(r *rankedShard) { 1114 r.Close() 1115 }) 1116 } 1117 } 1118 1119 ranked := make([]*rankedShard, 0, len(s.shards)) 1120 for _, r := range s.shards { 1121 ranked = append(ranked, r) 1122 } 1123 1124 sort.Slice(ranked, func(i, j int) bool { 1125 priorityDiff := ranked[i].priority - ranked[j].priority 1126 if priorityDiff != 0 { 1127 return priorityDiff > 0 1128 } 1129 if len(ranked[i].repos) == 0 || len(ranked[j].repos) == 0 { 1130 // Protect against empty names which can happen if we fail to List or 1131 // the shard is full of tombstones. Prefer the shard which has names. 1132 return len(ranked[i].repos) >= len(ranked[j].repos) 1133 } 1134 return ranked[i].repos[0].Name < ranked[j].repos[0].Name 1135 }) 1136 1137 s.ranked.Store(ranked) 1138 1139 metricShardsLoaded.Set(float64(len(ranked))) 1140} 1141 1142func loadShard(fn string) (zoekt.Searcher, error) { 1143 f, err := os.Open(fn) 1144 if err != nil { 1145 return nil, err 1146 } 1147 1148 iFile, err := zoekt.NewIndexFile(f) 1149 if err != nil { 1150 return nil, err 1151 } 1152 s, err := zoekt.NewSearcher(iFile) 1153 if err != nil { 1154 iFile.Close() 1155 return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err) 1156 } 1157 1158 return s, nil 1159} 1160 1161// prioritySlice is a trivial implementation of an array that provides three 1162// things: appending a value, removing a value, and getting the array's max. 1163// Operations take O(n) time, which is acceptable because N is restricted to 1164// GOMAXPROCS (i.e., number of cpu cores) by the shardedSearcher interface. 
type prioritySlice []float64

// append adds pri to the end of the slice.
func (p *prioritySlice) append(pri float64) {
	*p = append(*p, pri)
}

// remove deletes the first element equal to pri, if any. The last element is
// moved into the vacated position, so element order is not preserved.
func (p *prioritySlice) remove(pri float64) {
	vals := *p
	last := len(vals) - 1
	for idx := 0; idx <= last; idx++ {
		if vals[idx] != pri {
			continue
		}
		// Overwrite the match with the tail element (a no-op when the match is
		// already the tail), then drop the tail.
		vals[idx] = vals[last]
		*p = vals[:last]
		return
	}
}

// max returns the largest value in the slice, or negative infinity when the
// slice is empty.
func (p *prioritySlice) max() float64 {
	// remove() and max() could be combined, but this is easier to read and
	// the expected performance difference from the extra lock and loop is
	// almost certainly irrelevant.
	best := math.Inf(-1)
	for idx := range *p {
		if v := (*p)[idx]; v > best {
			best = v
		}
	}
	return best
}