fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Copyright 2016 Google Inc. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package search 16 17import ( 18 "context" 19 "fmt" 20 "log" 21 "math" 22 "os" 23 "runtime" 24 "runtime/debug" 25 "slices" 26 "sort" 27 "strconv" 28 "sync" 29 "time" 30 31 "github.com/prometheus/client_golang/prometheus" 32 "github.com/prometheus/client_golang/prometheus/promauto" 33 "go.uber.org/atomic" 34 "golang.org/x/sync/semaphore" 35 36 "github.com/sourcegraph/zoekt" 37 "github.com/sourcegraph/zoekt/index" 38 "github.com/sourcegraph/zoekt/internal/tenant/systemtenant" 39 "github.com/sourcegraph/zoekt/internal/trace" 40 "github.com/sourcegraph/zoekt/query" 41) 42 43var ( 44 metricShardsLoaded = promauto.NewGauge(prometheus.GaugeOpts{ 45 Name: "zoekt_shards_loaded", 46 Help: "The number of shards currently loaded", 47 }) 48 metricShardsLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 49 Name: "zoekt_shards_loaded_total", 50 Help: "The total number of shards loaded", 51 }) 52 metricShardsLoadFailedTotal = promauto.NewCounter(prometheus.CounterOpts{ 53 Name: "zoekt_shards_load_failed_total", 54 Help: "The total number of shard loads that failed", 55 }) 56 57 metricSearchRunning = promauto.NewGauge(prometheus.GaugeOpts{ 58 Name: "zoekt_search_running", 59 Help: "The number of concurrent search requests running", 60 }) 61 metricSearchShardRunning = promauto.NewGauge(prometheus.GaugeOpts{ 62 Name: "zoekt_search_shard_running", 63 Help: "The 
number of concurrent search requests in a shard running", 64 }) 65 metricSearchFailedTotal = promauto.NewCounter(prometheus.CounterOpts{ 66 Name: "zoekt_search_failed_total", 67 Help: "The total number of search requests that failed", 68 }) 69 metricSearchDuration = promauto.NewHistogram(prometheus.HistogramOpts{ 70 Name: "zoekt_search_duration_seconds", 71 Help: "The duration a search request took in seconds", 72 Buckets: prometheus.DefBuckets, // DefBuckets good for service timings 73 }) 74 75 // A Counter per Stat. Name should match field in zoekt.Stats. 76 metricSearchContentBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 77 Name: "zoekt_search_content_loaded_bytes_total", 78 Help: "Total amount of I/O for reading contents", 79 }) 80 metricSearchIndexBytesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 81 Name: "zoekt_search_index_loaded_bytes_total", 82 Help: "Total amount of I/O for reading from index", 83 }) 84 metricSearchCrashesTotal = promauto.NewCounter(prometheus.CounterOpts{ 85 Name: "zoekt_search_crashes_total", 86 Help: "Total number of search shards that had a crash", 87 }) 88 metricSearchFileCountTotal = promauto.NewCounter(prometheus.CounterOpts{ 89 Name: "zoekt_search_file_count_total", 90 Help: "Total number of files containing a match", 91 }) 92 metricSearchShardFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{ 93 Name: "zoekt_search_shard_files_considered_total", 94 Help: "Total number of files in shards that we considered", 95 }) 96 metricSearchFilesConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{ 97 Name: "zoekt_search_files_considered_total", 98 Help: "Total files that we evaluated. 
Equivalent to files for which all atom matches (including negations) evaluated to true", 99 }) 100 metricSearchFilesLoadedTotal = promauto.NewCounter(prometheus.CounterOpts{ 101 Name: "zoekt_search_files_loaded_total", 102 Help: "Total files for which we loaded file content to verify substring matches", 103 }) 104 metricSearchFilesSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{ 105 Name: "zoekt_search_files_skipped_total", 106 Help: "Total candidate files whose contents weren't examined because we gathered enough matches", 107 }) 108 metricSearchShardsSkippedTotal = promauto.NewCounter(prometheus.CounterOpts{ 109 Name: "zoekt_search_shards_skipped_total", 110 Help: "Total shards that we did not process because a query was canceled", 111 }) 112 metricSearchMatchCountTotal = promauto.NewCounter(prometheus.CounterOpts{ 113 Name: "zoekt_search_match_count_total", 114 Help: "Total number of non-overlapping matches", 115 }) 116 metricSearchNgramMatchesTotal = promauto.NewCounter(prometheus.CounterOpts{ 117 Name: "zoekt_search_ngram_matches_total", 118 Help: "Total number of candidate matches as a result of searching ngrams", 119 }) 120 metricSearchNgramLookupsTotal = promauto.NewCounter(prometheus.CounterOpts{ 121 Name: "zoekt_search_ngram_lookups_total", 122 Help: "Total number of times we accessed an ngram in the index", 123 }) 124 metricSearchRegexpsConsideredTotal = promauto.NewCounter(prometheus.CounterOpts{ 125 Name: "zoekt_search_regexps_considered_total", 126 Help: "Total number of times regexp was called on files that we evaluated", 127 }) 128 129 metricListRunning = promauto.NewGauge(prometheus.GaugeOpts{ 130 Name: "zoekt_list_running", 131 Help: "The number of concurrent list requests running", 132 }) 133 metricListShardRunning = promauto.NewGauge(prometheus.GaugeOpts{ 134 Name: "zoekt_list_shard_running", 135 Help: "The number of concurrent list requests in a shard running", 136 }) 137 metricShardsBatchReplaceDurationSeconds = 
promauto.NewHistogram(prometheus.HistogramOpts{ 138 Name: "zoekt_shards_batch_replace_duration_seconds", 139 Help: "The time it takes to replace a batch of Searchers.", 140 Buckets: []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30}, 141 }) 142 metricListAllRepos = promauto.NewGauge(prometheus.GaugeOpts{ 143 Name: "zoekt_list_all_stats_repos", 144 Help: "The last List(true) value for RepoStats.Repos. Repos is used for aggregrating the number of repositories.", 145 }) 146 metricListAllShards = promauto.NewGauge(prometheus.GaugeOpts{ 147 Name: "zoekt_list_all_stats_shards", 148 Help: "The last List(true) value for RepoStats.Shards. Shards is the total number of search shards.", 149 }) 150 metricListAllDocuments = promauto.NewGauge(prometheus.GaugeOpts{ 151 Name: "zoekt_list_all_stats_documents", 152 Help: "The last List(true) value for RepoStats.Documents. Documents holds the number of documents or files.", 153 }) 154 metricListAllIndexBytes = promauto.NewGauge(prometheus.GaugeOpts{ 155 Name: "zoekt_list_all_stats_index_bytes", 156 Help: "The last List(true) value for RepoStats.IndexBytes. IndexBytes is the amount of RAM used for index overhead.", 157 }) 158 metricListAllContentBytes = promauto.NewGauge(prometheus.GaugeOpts{ 159 Name: "zoekt_list_all_stats_content_bytes", 160 Help: "The last List(true) value for RepoStats.ContentBytes. 
ContentBytes is the amount of RAM used for raw content.", 161 }) 162 metricListAllNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{ 163 Name: "zoekt_list_all_stats_new_lines_count", 164 Help: "The last List(true) value for RepoStats.NewLinesCount.", 165 }) 166 metricListAllDefaultBranchNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{ 167 Name: "zoekt_list_all_stats_default_branch_new_lines_count", 168 Help: "The last List(true) value for RepoStats.DefaultBranchNewLinesCount.", 169 }) 170 metricListAllOtherBranchesNewLinesCount = promauto.NewGauge(prometheus.GaugeOpts{ 171 Name: "zoekt_list_all_stats_other_branches_new_lines_count", 172 Help: "The last List(true) value for RepoStats.OtherBranchesNewLinesCount.", 173 }) 174) 175 176type rankedShard struct { 177 zoekt.Searcher 178 179 priority float64 // maximum priority across all repos in the shard 180 181 // We have out of band ranking on compound shards which can change even if 182 // the shard file does not. So we compute a rank in getShards. We store 183 // repos here to avoid the cost of List in the search request path. 184 // 185 // repos is nil only if that call failed. 186 repos []*zoekt.Repository 187} 188 189// loaded stores the state we compute when updating the state of shards from 190// disk. 191type loaded struct { 192 // shards is the currently loaded shards sorted by decreasing rank and 193 // should not be mutated. 194 shards []*rankedShard 195 196 // ready is true if sharded searcher has finished loading all initial 197 // shards on startup. 198 ready bool 199} 200 201type shardedSearcher struct { 202 // Limit the number of parallel queries. Since searching is 203 // CPU bound, we can't do better than #CPU queries in 204 // parallel. If we do so, we just create more memory 205 // pressure. 
206 sched scheduler 207 208 mu sync.Mutex // protects writes to shards 209 shards map[string]*rankedShard 210 211 ready atomic.Bool 212 ranked atomic.Value 213} 214 215func newShardedSearcher(n int64) *shardedSearcher { 216 ss := &shardedSearcher{ 217 shards: make(map[string]*rankedShard), 218 sched: newScheduler(n), 219 } 220 return ss 221} 222 223// NewDirectorySearcher returns a searcher instance that loads all 224// shards corresponding to a glob into memory. 225func NewDirectorySearcher(dir string) (zoekt.Streamer, error) { 226 return newDirectorySearcher(dir, true) 227} 228 229// NewDirectorySearcherFast is like NewDirectorySearcher, but does not block 230// on the initial loading of shards. 231// 232// This exists since in the case of zoekt-webserver we are happy with having 233// partial availability since that is better than no availability on large 234// instances. 235func NewDirectorySearcherFast(dir string) (zoekt.Streamer, error) { 236 return newDirectorySearcher(dir, false) 237} 238 239func newDirectorySearcher(dir string, waitUntilReady bool) (zoekt.Streamer, error) { 240 ss := newShardedSearcher(int64(runtime.GOMAXPROCS(0))) 241 tl := &loader{ 242 ss: ss, 243 } 244 dw, err := newDirectoryWatcher(dir, tl) 245 if err != nil { 246 return nil, err 247 } 248 249 if waitUntilReady { 250 if err := dw.WaitUntilReady(); err != nil { 251 return nil, err 252 } 253 } 254 255 ds := &directorySearcher{ 256 Streamer: ss, 257 directoryWatcher: dw, 258 } 259 260 return &typeRepoSearcher{Streamer: ds}, nil 261} 262 263type directorySearcher struct { 264 zoekt.Streamer 265 266 directoryWatcher *DirectoryWatcher 267} 268 269func (s *directorySearcher) Close() { 270 // We need to Stop directoryWatcher first since it calls load/unload on 271 // Searcher. 
272 s.directoryWatcher.Stop() 273 s.Streamer.Close() 274} 275 276type loader struct { 277 ss *shardedSearcher 278} 279 280func (tl *loader) load(keys ...string) { 281 // This is called with all keys on startup, so once this function has 282 // finished running shardedSearcher will be ready. 283 defer tl.ss.markReady() 284 285 if len(keys) == 0 { 286 // If there's nothing to load, we exit early here, but we want to mark 287 // ourselves as ready. 288 return 289 } 290 291 var ( 292 mu sync.Mutex // synchronizes writes to the shards map 293 wg sync.WaitGroup // used to wait for all shards to load 294 sem = semaphore.NewWeighted(int64(runtime.GOMAXPROCS(0))) 295 loadedShards = make(map[string]zoekt.Searcher) 296 ) 297 298 publishLoaded := func() { 299 mu.Lock() 300 chunk := loadedShards 301 loadedShards = make(map[string]zoekt.Searcher) 302 mu.Unlock() 303 tl.ss.replace(chunk) 304 } 305 306 log.Printf("[INFO] loading %d shard(s): %s", len(keys), humanTruncateList(keys, 5)) 307 308 lastProgress := time.Now() 309 for i, key := range keys { 310 // If taking a while to start-up occasionally give a progress message 311 if time.Since(lastProgress) > 5*time.Second { 312 log.Printf("[INFO] still need to load %d shards...", len(keys)-i) 313 lastProgress = time.Now() 314 315 publishLoaded() 316 } 317 318 _ = sem.Acquire(context.Background(), 1) 319 wg.Add(1) 320 321 go func(key string) { 322 defer sem.Release(1) 323 defer wg.Done() 324 325 shard, err := loadShard(key) 326 if err != nil { 327 metricShardsLoadFailedTotal.Inc() 328 log.Printf("[ERROR] reloading: %s, err %v ", key, err) 329 return 330 } 331 metricShardsLoadedTotal.Inc() 332 333 mu.Lock() 334 loadedShards[key] = shard 335 mu.Unlock() 336 }(key) 337 } 338 339 wg.Wait() 340 341 publishLoaded() 342} 343 344func (tl *loader) drop(keys ...string) { 345 shards := make(map[string]zoekt.Searcher, len(keys)) 346 for _, key := range keys { 347 shards[key] = nil 348 } 349 tl.ss.replace(shards) 350} 351 352func (ss 
*shardedSearcher) String() string { 353 return "shardedSearcher" 354} 355 356// Close closes references to open files. It may be called only once. 357func (ss *shardedSearcher) Close() { 358 ss.mu.Lock() 359 shards := make(map[string]zoekt.Searcher, len(ss.shards)) 360 for k := range ss.shards { 361 shards[k] = nil 362 } 363 ss.mu.Unlock() 364 365 ss.replace(shards) 366} 367 368func selectRepoSet(shards []*rankedShard, q query.Q) ([]*rankedShard, query.Q) { 369 and, ok := q.(*query.And) 370 if ok { 371 return doSelectRepoSet(shards, and) 372 } 373 374 // We have queries which look like (reposet ...) and we want to do the same 375 // optimizations. To simplify we just always wrap the query in And and then 376 // on the return value call Simplify to unwrap. In particular this is 377 // important for List calls. 378 and = &query.And{Children: []query.Q{q}} 379 shards, q = doSelectRepoSet(shards, and) 380 return shards, query.Simplify(q) 381} 382 383func doSelectRepoSet(shards []*rankedShard, and *query.And) ([]*rankedShard, query.Q) { 384 // (and (reposet ...) (q)) 385 // (and true (q)) with a filtered shards 386 // (and false) // noop 387 388 // (and (repobranches ...) (q)) 389 // (and (repobranches ...) (q)) 390 391 // Note: we also support (and (repo ...) (q)) even though sourcegraph does 392 // not generate those sorts of queries. This is to support manual testing. 
393 394 hasReposForPredicate := func(pred func(repo *zoekt.Repository) bool) func(repos []*zoekt.Repository) (any, all bool) { 395 return func(repos []*zoekt.Repository) (any, all bool) { 396 any = false 397 all = true 398 for _, repo := range repos { 399 b := pred(repo) 400 any = any || b 401 all = all && b 402 } 403 return any, all 404 } 405 } 406 407 for i, c := range and.Children { 408 var setSize int 409 var hasRepos func([]*zoekt.Repository) (bool, bool) 410 switch setQuery := c.(type) { 411 case *query.RepoSet: 412 setSize = len(setQuery.Set) 413 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 414 return setQuery.Set[repo.Name] 415 }) 416 case *query.RepoIDs: 417 setSize = int(setQuery.Repos.GetCardinality()) 418 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 419 return setQuery.Repos.Contains(repo.ID) 420 }) 421 case *query.Repo: 422 setSize = 0 423 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 424 return setQuery.Regexp.MatchString(repo.Name) 425 }) 426 case *query.BranchesRepos: 427 for _, br := range setQuery.List { 428 setSize += int(br.Repos.GetCardinality()) 429 } 430 431 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 432 for _, br := range setQuery.List { 433 if br.Repos.Contains(repo.ID) { 434 return true 435 } 436 } 437 return false 438 }) 439 case *query.Meta: 440 // Meta queries filter repositories based on metadata fields. 441 // By checking this at the shard level, we can skip entire shards 442 // that don't contain any matching repositories, avoiding expensive 443 // I/O operations. 
444 setSize = 0 // Unknown size, we'll filter based on metadata 445 hasRepos = hasReposForPredicate(func(repo *zoekt.Repository) bool { 446 if repo.Metadata == nil { 447 return false 448 } 449 v, ok := repo.Metadata[setQuery.Field] 450 if !ok { 451 return false 452 } 453 return setQuery.Value.MatchString(v) 454 }) 455 default: 456 continue 457 } 458 459 // setSize may be larger than the number of shards we have. The size of 460 // filtered is bounded by min(len(set), len(shards)) 461 if setSize > len(shards) { 462 setSize = len(shards) 463 } 464 465 filtered := make([]*rankedShard, 0, setSize) 466 filteredAll := true 467 468 for _, s := range shards { 469 if s.repos == nil { 470 // repos is nil if we failed to List the shard. This shouldn't 471 // happen, but if it does we don't know what is in it and must search 472 // it without simplifying the query. 473 filtered = append(filtered, s) 474 filteredAll = false 475 } else if any, all := hasRepos(s.repos); any { 476 filtered = append(filtered, s) 477 filteredAll = filteredAll && all 478 } 479 } 480 481 // We don't need to adjust the query since we are returning an empty set 482 // of shards to search. 483 if len(filtered) == 0 { 484 return filtered, and 485 } 486 487 // We can't simplify the query since we are searching shards which contain 488 // repos we aren't supposed to search. 489 if !filteredAll { 490 return filtered, and 491 } 492 493 // We don't want to mutate the original and, so we clone it before 494 // mutating it. 495 and = &query.And{Children: slices.Clone(and.Children)} 496 497 // This optimization allows us to avoid the work done by 498 // indexData.simplify for each shard. 499 // 500 // For example if our query is (and (reposet foo bar) (content baz)) 501 // then at this point filtered is [foo bar] and q is the same. For each 502 // shard indexData.simplify will simplify to (and true (content baz)) -> 503 // (content baz). This work can be done now once, rather than per shard. 
504 switch c := c.(type) { 505 case *query.RepoSet, *query.RepoIDs, *query.Repo, *query.Meta: 506 and.Children[i] = &query.Const{Value: true} 507 return filtered, query.Simplify(and) 508 509 case *query.BranchesRepos: 510 // We can only replace if all the repos want the same branches. We 511 // simplify and just check that we are requesting 1 branch. The common 512 // case is just asking for HEAD, so this should be effective. 513 if len(c.List) != 1 { 514 return filtered, and 515 } 516 517 // Every repo wants the same branches, so we can replace RepoBranches 518 // with a list of branch queries. 519 and.Children[i] = &query.Branch{Pattern: c.List[0].Branch, Exact: true} 520 return filtered, query.Simplify(and) 521 } 522 523 // Stop after first RepoSet, otherwise we might append duplicate 524 // shards to `filtered` 525 return filtered, and 526 } 527 528 return shards, and 529} 530 531func (ss *shardedSearcher) Search(ctx context.Context, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) { 532 tr, ctx := trace.New(ctx, "shardedSearcher.Search", "") 533 tr.LazyLog(q, true) 534 tr.LazyPrintf("opts: %+v", opts) 535 defer func() { 536 if sr != nil { 537 tr.LazyPrintf("num files: %d", len(sr.Files)) 538 tr.LazyPrintf("stats: %+v", sr.Stats) 539 } 540 if err != nil { 541 tr.LazyPrintf("error: %v", err) 542 tr.SetError(err) 543 } 544 tr.Finish() 545 }() 546 ctx, cancel := context.WithCancel(ctx) 547 defer cancel() 548 549 collectSender := newCollectSender(opts) 550 551 start := time.Now() 552 proc, err := ss.sched.Acquire(ctx) 553 if err != nil { 554 return nil, err 555 } 556 defer proc.Release() 557 tr.LazyPrintf("acquired process") 558 559 wait := time.Since(start) 560 start = time.Now() 561 562 loaded := ss.getLoaded() 563 done, err := streamSearch(ctx, proc, q, opts, loaded.shards, collectSender) 564 defer done() 565 if err != nil { 566 return nil, err 567 } 568 569 aggregate, ok := collectSender.Done() 570 if !ok { 571 aggregate = 
&zoekt.SearchResult{ 572 RepoURLs: map[string]string{}, 573 LineFragments: map[string]string{}, 574 } 575 } 576 577 copyFiles(aggregate) 578 579 if !loaded.ready { 580 // We may have missed results due to not being fully loaded. 581 aggregate.Stats.Crashes++ 582 } 583 584 aggregate.Stats.Wait = wait 585 aggregate.Stats.Duration = time.Since(start) 586 587 return aggregate, nil 588} 589 590func (ss *shardedSearcher) StreamSearch(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, sender zoekt.Sender) (err error) { 591 tr, ctx := trace.New(ctx, "shardedSearcher.StreamSearch", "") 592 defer func() { 593 if err != nil { 594 tr.LazyPrintf("error: %v", err) 595 tr.SetError(err) 596 } 597 tr.Finish() 598 }() 599 600 start := time.Now() 601 proc, err := ss.sched.Acquire(ctx) 602 if err != nil { 603 return err 604 } 605 defer proc.Release() 606 tr.LazyPrintf("acquired process") 607 608 loaded := ss.getLoaded() 609 shards := loaded.shards 610 611 maxPendingPriority := math.Inf(-1) 612 if len(shards) > 0 { 613 maxPendingPriority = shards[0].priority 614 } 615 616 stillLoadingCrashes := 0 617 if !loaded.ready { 618 // We may have missed results due to not being fully loaded. 619 stillLoadingCrashes++ 620 } 621 622 sender.Send(&zoekt.SearchResult{ 623 Stats: zoekt.Stats{ 624 Crashes: stillLoadingCrashes, 625 Wait: time.Since(start), 626 }, 627 Progress: zoekt.Progress{ 628 MaxPendingPriority: maxPendingPriority, 629 }, 630 }) 631 632 // Matches flow from the shards up the stack in the following order: 633 // 634 // 1. Search shards 635 // 2. flushCollectSender (aggregate) 636 // 3. limitSender (limit) 637 // 4. copyFileSender (copy) 638 // 639 // For streaming, the wrapping has to happen in the inverted order. 
640 sender = copyFileSender(sender) 641 642 if truncator, hasLimits := index.NewDisplayTruncator(opts); hasLimits { 643 var cancel context.CancelFunc 644 ctx, cancel = context.WithCancel(ctx) 645 defer cancel() 646 sender = limitSender(cancel, sender, truncator) 647 } 648 649 sender, flush := newFlushCollectSender(opts, sender) 650 651 done, err := streamSearch(ctx, proc, q, opts, shards, sender) 652 653 // Even though streaming is done, we may have results sitting in a buffer we 654 // need to flush. So we need to send those before calling done. 655 flush() 656 done() 657 658 return err 659} 660 661// streamSearch is an internal helper since both Search and StreamSearch are 662// largely similar. 663// 664// done must always be called, even if err is non-nil. The SearchResults sent 665// via sender contain references to the underlying mmap data that the garbage 666// collector can't see. Calling done informs the garbage collector it is free 667// to collect those shards. The caller must call copyFiles on any 668// SearchResults it returns/streams out before calling done. 669func streamSearch(ctx context.Context, proc *process, q query.Q, opts *zoekt.SearchOptions, shards []*rankedShard, sender zoekt.Sender) (done func(), err error) { 670 tr, ctx := trace.New(ctx, "shardedSearcher.streamSearch", "") 671 overallStart := time.Now() 672 metricSearchRunning.Inc() 673 defer func() { 674 metricSearchRunning.Dec() 675 metricSearchDuration.Observe(time.Since(overallStart).Seconds()) 676 if err != nil { 677 metricSearchFailedTotal.Inc() 678 679 tr.LazyPrintf("error: %v", err) 680 tr.SetError(err) 681 } 682 tr.Finish() 683 }() 684 685 // Select the subset of shards that we will search over for the given query. 
686 { 687 beforeLen := len(shards) 688 beforeQ := q 689 shards, q = selectRepoSet(shards, q) 690 tr.LazyPrintf("selectRepoSet shards=%d->%d q=%s->%s", beforeLen, len(shards), beforeQ, q) 691 } 692 693 if len(shards) == 0 { 694 return func() {}, nil 695 } 696 697 var cancel context.CancelFunc 698 if opts.MaxWallTime == 0 { 699 ctx, cancel = context.WithCancel(ctx) 700 } else { 701 ctx, cancel = context.WithTimeout(ctx, opts.MaxWallTime) 702 } 703 704 defer cancel() 705 706 // We set the number of workers to GOMAXPROCS, or the number of shards, 707 // whichever is smaller. 708 workers := min(runtime.GOMAXPROCS(0), len(shards)) 709 710 type result struct { 711 priority float64 712 *zoekt.SearchResult 713 err error 714 } 715 716 var ( 717 // buffered channels to continue searching when sending back results 718 // takes a while / blocks. The maximum pending result set is workers * 2. 719 results = make(chan *result, workers) 720 search = make(chan *rankedShard, workers) 721 wg sync.WaitGroup 722 ) 723 724 // Start workers that receive shards from the search channel, search them, 725 // and send the results to the results channel. This process is repeated 726 // until the search channel is closed. 727 // 728 // Note: Making "search" a buffered channel has the effect of limiting the number of parallel shard searches. 729 // Since searching is mostly CPU bound, limiting parallel shard searches also reduces the peak working set. 
730 wg.Add(workers) 731 for range workers { 732 go func() { 733 defer wg.Done() 734 for s := range search { 735 sr, err := searchOneShard(ctx, s, q, opts) 736 r := &result{priority: s.priority, SearchResult: sr, err: err} 737 results <- r 738 } 739 }() 740 } 741 742 go func() { 743 wg.Wait() 744 close(results) 745 }() 746 747 var ( 748 pending = make(prioritySlice, 0, workers) 749 shard = 0 750 next = shards[shard] 751 752 // We need a separate nil-able reference to the same channel so we can close(search) for the worker 753 // go-routines to finish but also set work to nil in order for the select statement below to ignore 754 // that case when we want to stop a search. This is needed because sending on a closed channel panics. 755 work = search 756 ) 757 758 stop := func() { 759 if work != nil { 760 close(search) 761 work = nil 762 next = nil 763 } 764 } 765 766 // tracked so we can stop when we hit TotalMaxMatchCount 767 var totalMatchCount int 768 769search: 770 for { 771 // At the top of each iteration, have the proc associated with this search yield its won "timeslice" 772 // to possibly allow other searches to make progress 773 _ = proc.Yield(ctx) // Note: we let searchOneShard handle context errors 774 775 select { 776 case work <- next: // is there a worker available to search the next shard? 777 pending.append(next.priority) 778 779 shard++ 780 if shard == len(shards) { 781 stop() 782 } else { 783 next = shards[shard] 784 } 785 case r, ok := <-results: // is there a result to send back? 786 if !ok { 787 break search 788 } 789 790 // delete this result's priority from pending before computing the new max pending priority 791 pending.remove(r.priority) 792 793 if r.err != nil { 794 // Set final error and stop searching new shards, but consume any pending 795 // search results. 796 stop() 797 err = r.err 798 continue 799 } 800 801 // Update the match count statistics and stop searching new shards if we've 802 // reached the limit set in the options. 
803 totalMatchCount += r.SearchResult.Stats.MatchCount 804 if opts.TotalMaxMatchCount > 0 && totalMatchCount > opts.TotalMaxMatchCount { 805 stop() 806 } 807 808 observeMetrics(r.SearchResult) 809 810 r.Priority = r.priority 811 r.MaxPendingPriority = pending.max() 812 813 sendByRepository(r.SearchResult, opts, sender) // send the result back to the client 814 } 815 } 816 817 return func() { runtime.KeepAlive(shards) }, err 818} 819 820// sendByRepository splits a zoekt.SearchResult by repository and calls 821// sender.Send for each batch. Ranking in Sourcegraph expects zoekt.SearchResult 822// to contain results with the same zoekt.SearchResult.priority only. 823// 824// We split by repository instead of by priority because it is easier to set 825// RepoURLs and LineFragments in zoekt.SearchResult. 826func sendByRepository(result *zoekt.SearchResult, opts *zoekt.SearchOptions, sender zoekt.Sender) { 827 if len(result.RepoURLs) <= 1 || len(result.Files) == 0 { 828 index.SortFiles(result.Files) 829 sender.Send(result) 830 return 831 } 832 833 send := func(repoName string, a, b int, stats zoekt.Stats) { 834 index.SortFiles(result.Files[a:b]) 835 836 filteredRepoURLs := map[string]string{repoName: result.RepoURLs[repoName]} 837 filteredLineFragments := map[string]string{repoName: result.LineFragments[repoName]} 838 839 // Filter RepoURLs and LineFragments to only those of repoName and its 840 // subRepositories if there are subRepositories 841 for _, file := range result.Files[a:b] { 842 name := file.SubRepositoryName 843 if name == "" { 844 continue 845 } 846 _, repoSet := filteredRepoURLs[name] 847 url, ok := result.RepoURLs[name] 848 if !repoSet && ok { 849 filteredRepoURLs[name] = url 850 } 851 _, fragSet := filteredLineFragments[name] 852 frag, ok := result.LineFragments[name] 853 if !fragSet && ok { 854 filteredLineFragments[name] = frag 855 } 856 } 857 858 sender.Send(&zoekt.SearchResult{ 859 Stats: stats, 860 Progress: zoekt.Progress{ 861 Priority: 
result.Files[a].RepositoryPriority,
				MaxPendingPriority: result.MaxPendingPriority,
			},
			Files:         result.Files[a:b],
			RepoURLs:      filteredRepoURLs,
			LineFragments: filteredLineFragments,
		})
	}

	var startIndex, endIndex int
	curRepoID := result.Files[0].RepositoryID
	curRepoName := result.Files[0].Repository

	fm := zoekt.FileMatch{}
	for endIndex, fm = range result.Files {
		if curRepoID != fm.RepositoryID {
			// Stats must stay aggregate-able, hence we sent the aggregate stats with the
			// last event.
			send(curRepoName, startIndex, endIndex, zoekt.Stats{})

			startIndex = endIndex
			curRepoID = fm.RepositoryID
			curRepoName = fm.Repository
		}
	}

	send(curRepoName, startIndex, endIndex+1, result.Stats)
}

// observeMetrics adds each counter from sr.Stats to the matching
// metricSearch*Total Prometheus counter.
func observeMetrics(sr *zoekt.SearchResult) {
	metricSearchContentBytesLoadedTotal.Add(float64(sr.Stats.ContentBytesLoaded))
	metricSearchIndexBytesLoadedTotal.Add(float64(sr.Stats.IndexBytesLoaded))
	metricSearchCrashesTotal.Add(float64(sr.Stats.Crashes))
	metricSearchFileCountTotal.Add(float64(sr.Stats.FileCount))
	metricSearchShardFilesConsideredTotal.Add(float64(sr.Stats.ShardFilesConsidered))
	metricSearchFilesConsideredTotal.Add(float64(sr.Stats.FilesConsidered))
	metricSearchFilesLoadedTotal.Add(float64(sr.Stats.FilesLoaded))
	metricSearchFilesSkippedTotal.Add(float64(sr.Stats.FilesSkipped))
	metricSearchShardsSkippedTotal.Add(float64(sr.Stats.ShardsSkipped))
	metricSearchMatchCountTotal.Add(float64(sr.Stats.MatchCount))
	metricSearchNgramMatchesTotal.Add(float64(sr.Stats.NgramMatches))
	metricSearchNgramLookupsTotal.Add(float64(sr.Stats.NgramLookups))
	metricSearchRegexpsConsideredTotal.Add(float64(sr.Stats.RegexpsConsidered))
}

// copySlice replaces *src with a newly allocated copy of its bytes, so the
// slice no longer shares the original backing array. A nil slice stays nil.
func copySlice(src *[]byte) {
	if *src == nil {
		return
	}
	dst := make([]byte, len(*src))
	copy(dst, *src)
	*src = dst
}

// copyFiles gives every FileMatch in sr its own copy of Content, Checksum,
// the Line/Before/After bytes of each LineMatch and the Content of each
// ChunkMatch.
//
// NOTE(review): presumably this is needed because those slices can point into
// a shard's memory-mapped data, which may be released after the shard is
// replaced; confirm against the callers.
func copyFiles(sr *zoekt.SearchResult) {
	for i := range sr.Files {
		copySlice(&sr.Files[i].Content)
		copySlice(&sr.Files[i].Checksum)
		for l := range sr.Files[i].LineMatches {
			copySlice(&sr.Files[i].LineMatches[l].Line)
			copySlice(&sr.Files[i].LineMatches[l].Before)
			copySlice(&sr.Files[i].LineMatches[l].After)
		}
		for c := range sr.Files[i].ChunkMatches {
			copySlice(&sr.Files[i].ChunkMatches[c].Content)
		}
	}
}

// searchOneShard runs s.Search. A panic inside the search is logged together
// with its stack and turned into a result with Stats.Crashes = 1 instead of
// propagating to the caller.
func searchOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) {
	metricSearchShardRunning.Inc()
	defer func() {
		metricSearchShardRunning.Dec()
		if e := recover(); e != nil {
			log.Printf("[ERROR] crashed shard: %s: %#v, %s", s, e, debug.Stack())

			if sr == nil {
				sr = &zoekt.SearchResult{}
			}
			sr.Stats.Crashes = 1
		}
	}()

	return s.Search(ctx, q, opts)
}

// shardListResult carries the outcome of listing a single shard back to
// shardedSearcher.List.
type shardListResult struct {
	rl  *zoekt.RepoList
	err error
}

// listOneShard lists the repositories of s that match q and sends exactly one
// shardListResult to sink. A panic inside s.List is logged and reported as a
// RepoList with Crashes = 1 instead of propagating.
func listOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.ListOptions, sink chan shardListResult) {
	metricListShardRunning.Inc()
	defer func() {
		metricListShardRunning.Dec()
		if r := recover(); r != nil {
			// The recovered value may be of any type; %v formats errors,
			// Stringers and plain values alike, whereas %s prints %!s(...)
			// for non-string values.
			log.Printf("[ERROR] crashed shard: %s: %v, %s", s.String(), r, debug.Stack())
			sink <- shardListResult{
				&zoekt.RepoList{Crashes: 1}, nil,
			}
		}
	}()

	ms, err := s.List(ctx, q, opts)
	sink <- shardListResult{ms, err}
}

// List returns the repositories matching q across all loaded shards.
//
// The shard set is first narrowed with selectRepoSet, then the remaining
// shards are listed concurrently and merged:
//   - Repos is de-duplicated by repository name; the Stats of duplicates
//     (e.g. a repository split over several shards) are summed.
//   - ReposMap keeps the first entry received for each ID.
//   - Crashes and Stats are summed. Stats.Repos is set to the number of
//     distinct repositories.
//
// The order of the returned Repos is unspecified. If the searcher is not yet
// marked ready, Crashes is incremented by one because results may be missing.
// The first per-shard error aborts the call and is returned. When q simplifies
// to a constant true query, the aggregate list-all metrics are updated as
// well.
func (ss *shardedSearcher) List(ctx context.Context, q query.Q, opts *zoekt.ListOptions) (rl *zoekt.RepoList, err error) {
	tr, ctx := trace.New(ctx, "shardedSearcher.List", "")
	metricListRunning.Inc()
	defer func() {
		metricListRunning.Dec()
		if rl != nil {
			tr.LazyPrintf("repos.size=%d reposmap.size=%d crashes=%d stats=%+v", len(rl.Repos), len(rl.ReposMap), rl.Crashes, rl.Stats)
		}
		if err != nil {
			tr.LazyPrintf("error: %v", err)
			tr.SetError(err)
		}
		tr.Finish()
	}()

	q = query.Simplify(q)
	isAll := false
	if c, ok := q.(*query.Const); ok {
		isAll = c.Value
	}

	proc, err := ss.sched.Acquire(ctx)
	if err != nil {
		return nil, err
	}
	defer proc.Release()
	tr.LazyPrintf("acquired process")

	loaded := ss.getLoaded()
	shards := loaded.shards

	// Setup what we return now, since we may short circuit if there are no
	// shards to search.
	stillLoadingCrashes := 0
	if !loaded.ready {
		// We may have missed results due to not being fully loaded.
		stillLoadingCrashes++
	}
	agg := zoekt.RepoList{
		Crashes:  stillLoadingCrashes,
		ReposMap: zoekt.ReposMap{},
		Repos:    []*zoekt.RepoListEntry{},
	}

	// PERF: Select the subset of shards that we will search over for the given
	// query. A common List query only asks for a specific repo, so this is an
	// important optimization.
	{
		beforeLen := len(shards)
		beforeQ := q
		shards, q = selectRepoSet(shards, q)
		tr.LazyPrintf("selectRepoSet shards=%d->%d q=%s->%s", beforeLen, len(shards), beforeQ, q)
	}

	if len(shards) == 0 {
		return &agg, nil
	}

	// Cancel the workers' context once we return. On the error path below we
	// stop reading results early. Cancelling lets the listing of the remaining
	// shards stop early too, rather than doing work nobody reads (assuming the
	// shard's List honours ctx — NOTE(review): confirm).
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	shardCount := len(shards)
	// Buffered to shardCount so workers never block on send, even after we
	// stopped reading because of an error.
	all := make(chan shardListResult, shardCount)
	feeder := make(chan zoekt.Searcher, shardCount)
	for _, s := range shards {
		feeder <- s
	}
	close(feeder)

	// There is no point in starting more workers than there are shards.
	for range min(runtime.GOMAXPROCS(0), shardCount) {
		go func() {
			for s := range feeder {
				listOneShard(ctx, s, q, opts, all)
			}
		}()
	}

	uniq := map[string]*zoekt.RepoListEntry{}

	for range shards {
		res := <-all
		if res.err != nil {
			return nil, res.err
		}

		agg.Crashes += res.rl.Crashes
		agg.Stats.Add(&res.rl.Stats)

		for _, entry := range res.rl.Repos {
			prev, ok := uniq[entry.Repository.Name]
			if !ok {
				cp := *entry // We need to copy because we mutate Stats when merging duplicates
				uniq[entry.Repository.Name] = &cp
			} else {
				prev.Stats.Add(&entry.Stats)
			}
		}

		for id, repo := range res.rl.ReposMap {
			if _, ok := agg.ReposMap[id]; !ok {
				agg.ReposMap[id] = repo
			}
		}
	}

	agg.Repos = make([]*zoekt.RepoListEntry, 0, len(uniq))
	for _, entry := range uniq {
		agg.Repos = append(agg.Repos, entry)
	}

	// Only one of these fields is populated and in all cases the size of that
	// field is the number of Repos.
	//
	// Note: we don't just add individual Stats.Repos since a repository can
	// have multiple shards.
	agg.Stats.Repos = len(uniq) + len(agg.ReposMap)

	if isAll && len(agg.Repos) > 0 {
		reportListAllMetrics(agg.Repos)
	}

	return &agg, nil
}

// reportListAllMetrics sums the Stats of all repos and publishes the totals
// through the metricListAll* metrics.
func reportListAllMetrics(repos []*zoekt.RepoListEntry) {
	var stats zoekt.RepoStats
	for _, r := range repos {
		stats.Add(&r.Stats)
	}

	metricListAllRepos.Set(float64(stats.Repos))
	metricListAllIndexBytes.Set(float64(stats.IndexBytes))
	metricListAllContentBytes.Set(float64(stats.ContentBytes))
	metricListAllDocuments.Set(float64(stats.Documents))
	metricListAllShards.Set(float64(stats.Shards))
	metricListAllNewLinesCount.Set(float64(stats.NewLinesCount))
	metricListAllDefaultBranchNewLinesCount.Set(float64(stats.DefaultBranchNewLinesCount))
	metricListAllOtherBranchesNewLinesCount.Set(float64(stats.OtherBranchesNewLinesCount))
}

// getLoaded returns the currently ranked shards together with the ready flag.
// The shards slice is shared, so callers must not mutate it.
func (s *shardedSearcher) getLoaded() loaded {
	// ready is loaded before ranked on purpose: this avoids a race where ready
	// is observed as true but ranked is still not the final set of shards.
	ready := s.ready.Load()
	// The comma-ok assertion yields a nil slice if ranked does not hold a
	// []*rankedShard (e.g. nothing has been stored yet).
	ranked, _ := s.ranked.Load().([]*rankedShard)
	return loaded{
		shards: ranked,
		ready:  ready,
	}
}

// mkRankedShard wraps s in a rankedShard. It caches the repositories the shard
// contains and the largest "priority" value found in their RawConfig. The
// priority is never below 0; a missing or malformed value parses as 0. If
// listing s fails, the error is logged and a rankedShard without repos or
// priority is returned.
func mkRankedShard(s zoekt.Searcher) *rankedShard {
	q := query.Const{Value: true}
	// We need to use WithUnsafeContext here, otherwise we cannot return a proper
	// rankedShard. On the user request path we use selectRepoSet which relies on
	// rankedShard.repos being set.
	result, err := s.List(systemtenant.WithUnsafeContext(context.Background()), &q, nil)
	if err != nil {
		log.Printf("[ERROR] mkRankedShard(%s): failed to cache repository list: %v", s, err)
		return &rankedShard{Searcher: s}
	}

	var (
		maxPriority float64
		repos       = make([]*zoekt.Repository, 0, len(result.Repos))
	)
	for i := range result.Repos {
		repo := &result.Repos[i].Repository
		repos = append(repos, repo)
		if repo.RawConfig != nil {
			// Best effort: a missing or malformed priority parses as 0.
			priority, _ := strconv.ParseFloat(repo.RawConfig["priority"], 64)
			if priority > maxPriority {
				maxPriority = priority
			}
		}
	}

	return &rankedShard{
		Searcher: s,
		repos:    repos,
		priority: maxPriority,
	}
}

// markReady should be called once all shards have been passed into replace on
// startup. Once s is marked as ready it stops reporting a Crash in the
// response Stats.
1156func (s *shardedSearcher) markReady() { 1157 s.ready.CompareAndSwap(false, true) 1158} 1159 1160func (s *shardedSearcher) replace(shards map[string]zoekt.Searcher) { 1161 if len(shards) == 0 { 1162 return 1163 } 1164 1165 defer func(began time.Time) { 1166 metricShardsBatchReplaceDurationSeconds.Observe(time.Since(began).Seconds()) 1167 }(time.Now()) 1168 1169 s.mu.Lock() 1170 defer s.mu.Unlock() 1171 1172 for key, shard := range shards { 1173 var r *rankedShard 1174 if shard != nil { 1175 r = mkRankedShard(shard) 1176 } 1177 1178 old := s.shards[key] 1179 if shard == nil { 1180 delete(s.shards, key) 1181 } else { 1182 s.shards[key] = r 1183 } 1184 1185 if old != nil && old.Searcher != nil { 1186 // _ ___ /^^\ /^\ /^^\_ 1187 // _ _@)@) \ ,,/ '` ~ `'~~ ', `\. 1188 // _/o\_ _ _ _/~`.`...'~\ ./~~..,'`','',.,' ' ~: 1189 // / `,'.~,~.~ . , . , ~|, ,/ .,' , ,. .. ,,. `, ~\_ 1190 // ( ' _' _ '_` _ ' . , `\_/ .' ..' ' ` ` `.. `, \_ 1191 // ~V~ V~ V~ V~ ~\ ` ' . ' , ' .,.,''`.,.''`.,.``. ', \_ 1192 // _/\ /\ /\ /\_/, . ' , `_/~\_ .' .,. ,, , _/~\_ `. `. '., \_ 1193 // < ~ ~ '~`'~'`, ., . `_: ::: \_ ' `_/ ::: \_ `.,' . ', \_ 1194 // \ ' `_ '`_ _ ',/ _::_::_ \ _ _/ _::_::_ \ `.,'.,`., \-,-,-,_,_, 1195 // `'~~ `'~~ `'~~ `'~~ \(_)(_)(_)/ `~~' \(_)(_)(_)/ ~'`\_.._,._,'_;_;_;_;_; 1196 // 1197 // We can't just call Close now, because there may be ongoing searches 1198 // which have old in the shards list. Previously we used an exclusive 1199 // lock to guarantee there were no concurrent searches. However, that 1200 // led to blocking on the read path. 1201 // 1202 // We could introduce granular locking per rankedShard to know when 1203 // there are no more references. However, this becomes tricky in 1204 // practice. Instead we rely on the garbage collector noticing old is no 1205 // longer used. We take care in our searchers to runtime.KeepAlive until 1206 // we have stopped referencing the underling mmap data. 
1207 runtime.SetFinalizer(old, func(r *rankedShard) { 1208 r.Close() 1209 }) 1210 } 1211 } 1212 1213 ranked := make([]*rankedShard, 0, len(s.shards)) 1214 for _, r := range s.shards { 1215 ranked = append(ranked, r) 1216 } 1217 1218 sort.Slice(ranked, func(i, j int) bool { 1219 priorityDiff := ranked[i].priority - ranked[j].priority 1220 if priorityDiff != 0 { 1221 return priorityDiff > 0 1222 } 1223 if len(ranked[i].repos) == 0 || len(ranked[j].repos) == 0 { 1224 // Protect against empty names which can happen if we fail to List or 1225 // the shard is full of tombstones. Prefer the shard which has names. 1226 return len(ranked[i].repos) >= len(ranked[j].repos) 1227 } 1228 return ranked[i].repos[0].Name < ranked[j].repos[0].Name 1229 }) 1230 1231 s.ranked.Store(ranked) 1232 1233 metricShardsLoaded.Set(float64(len(ranked))) 1234} 1235 1236func loadShard(fn string) (zoekt.Searcher, error) { 1237 f, err := os.Open(fn) 1238 if err != nil { 1239 return nil, err 1240 } 1241 1242 iFile, err := index.NewIndexFile(f) 1243 if err != nil { 1244 return nil, err 1245 } 1246 s, err := index.NewSearcher(iFile) 1247 if err != nil { 1248 iFile.Close() 1249 return nil, fmt.Errorf("NewSearcher(%s): %v", fn, err) 1250 } 1251 1252 return s, nil 1253} 1254 1255// prioritySlice is a trivial implementation of an array that provides three 1256// things: appending a value, removing a value, and getting the array's max. 1257// Operations take O(n) time, which is acceptable because N is restricted to 1258// GOMAXPROCS (i.e., number of cpu cores) by the shardedSearcher interface. 
type prioritySlice []float64

// append pushes pri onto the end of p.
func (p *prioritySlice) append(pri float64) {
	*p = append(*p, pri)
}

// remove deletes the first element equal to pri by moving the last element
// into its slot and shrinking p by one. Element order is therefore not
// preserved. If no element equals pri, p is left untouched.
func (p *prioritySlice) remove(pri float64) {
	vals := *p
	last := len(vals) - 1
	for idx := 0; idx <= last; idx++ {
		if vals[idx] != pri {
			continue
		}
		// Overwriting the hit with the tail is a harmless no-op when idx is
		// already the last position.
		vals[idx] = vals[last]
		*p = vals[:last]
		return
	}
}

// max reports the largest element of p, or -Inf when p is empty. Because the
// comparison is a strict ">", NaN elements never replace the running maximum.
func (p *prioritySlice) max() float64 {
	// Kept as a separate pass from remove for readability; the slice is
	// expected to stay small, so the extra loop is negligible.
	result := math.Inf(-1)
	for i := range *p {
		if v := (*p)[i]; v > result {
			result = v
		}
	}
	return result
}