fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

scheduler: add some clarifying comments (#508)

+30 -12
+13 -5
shards/sched.go
··· 129 129 130 130 // Acquire implements scheduler.Acquire. 131 131 func (s *multiScheduler) Acquire(ctx context.Context) (*process, error) { 132 - // Start in interactive. yieldFunc will switch us to batch. sem can be nil 133 - // if we fail while switching to batch. nil value prevents us releasing 134 - // twice. 132 + // There are two stages, interactive and batch. We first start by acquiring the interactive mode semaphore. 133 + // At some point in the future (if this search request is expensive enough), 134 + // yieldFunc will switch us to the batch mode semaphore. 135 + // 136 + // It's possible for "sem" to be nil if we fail while switching to batch. In this scenario, 137 + // the nil value will prevent us from releasing twice. 138 + 135 139 sem := s.semInteractive 136 140 137 141 if err := sem.Acquire(ctx); err != nil { ··· 227 231 // The only error it will return is a context error if ctx expires. In that 228 232 // case the process should stop running and call Release. 229 233 func (p *process) Yield(ctx context.Context) error { 234 + // Return immediately if we have already yielded or if we haven't used up our full timeslice 235 + // (represented via yieldTimer). 230 236 if p.yieldTimer == nil || !p.yieldTimer.Exceeded() { 231 237 return nil 232 238 } 233 239 234 - // Try to yield. This can return an error if our context expired. 240 + // We've just exceeded our timeslice. 241 + 242 + // First, try to yield. This can return an error if our context expired. 235 243 err := p.yieldFunc(ctx) 236 244 if err != nil { 237 245 return err 238 246 } 239 247 240 - // Successfully yielded. Stop our timer and mark it nil so we don't call 248 + // We've successfully yielded. Second, stop our timer and mark it nil so we don't call 241 249 // yieldFunc again. 242 250 p.yieldTimer.Stop() 243 251 p.yieldTimer = nil
+17 -7
shards/shards.go
··· 618 618 }() 619 619 620 620 tr.LazyPrintf("before selectRepoSet shards:%d", len(shards)) 621 + // Select the subset of shards that we will search over for the given query. 621 622 shards, q = selectRepoSet(shards, q) 622 623 tr.LazyPrintf("after selectRepoSet shards:%d %s", len(shards), q) 623 624 ··· 634 635 635 636 defer cancel() 636 637 638 + // We set the number of workers to GOMAXPROCS, or the number of shards, 639 + // whichever is smaller. 637 640 workers := runtime.GOMAXPROCS(0) 638 641 if workers > len(shards) { 639 642 workers = len(shards) ··· 653 656 wg sync.WaitGroup 654 657 ) 655 658 656 - // Since searching is mostly CPU bound, we limit the number 657 - // of parallel shard searches which also reduces the peak working set 658 - 659 + // Start workers that receive shards from the search channel, search them, 660 + // and send the results to the results channel. This process is repeated 661 + // until the search channel is closed. 662 + // 663 + // Note: Making "search" a buffered channel has the effect of limiting the number of parallel shard searches. 664 + // Since searching is mostly CPU bound, limiting parallel shard searches also reduces the peak working set. 659 665 wg.Add(workers) 660 666 for i := 0; i < workers; i++ { 661 667 go func() { ··· 697 703 698 704 search: 699 705 for { 700 - _ = proc.Yield(ctx) // We let searchOneShard handle context errors. 706 + // At the top of each iteration, have the proc associated with this search yield its won "timeslice" 707 + // to possibly allow other searches to make progress 708 + _ = proc.Yield(ctx) // Note: we let searchOneShard handle context errors 701 709 702 710 select { 703 - case work <- next: 711 + case work <- next: // is there a worker available to search the next shard? 704 712 pending.append(next.priority) 705 713 706 714 shard++ ··· 709 717 } else { 710 718 next = shards[shard] 711 719 } 712 - case r, ok := <-results: 720 + case r, ok := <-results: // is there a result to send back? 713 721 if !ok { 714 722 break search 715 723 } ··· 725 733 continue 726 734 } 727 735 736 + // Update the match count statistics and stop searching new shards if we've 737 + // reached the limit set in the options. 728 738 totalMatchCount += r.SearchResult.Stats.MatchCount 729 739 if opts.TotalMaxMatchCount > 0 && totalMatchCount > opts.TotalMaxMatchCount { 730 740 stop() ··· 735 745 r.Priority = r.priority 736 746 r.MaxPendingPriority = pending.max() 737 747 738 - sendByRepository(r.SearchResult, opts, sender) 748 + sendByRepository(r.SearchResult, opts, sender) // send the result back to the client 739 749 } 740 750 } 741 751