fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

streaming search that aggregates and ranks for FlushWallTime (#438)

When this option is set, streaming search collects, sorts and ranks results
for the duration of FlushWallTime before sending them; afterwards it reverts
to normal streaming. This allows a client to provide best-effort ranking
within a bounded amount of time. We hope this will be a sweet spot between
the benefits of streaming latency and relevance of results.

Additionally, streaming search now respects MaxDocDisplayCount.

Test Plan: go test

Co-authored-by: Stefan Hengl <stefan@sourcegraph.com>

+155 -27
+9
api.go
··· 777 777 // Abort the search after this much time has passed. 778 778 MaxWallTime time.Duration 779 779 780 + // FlushWallTime if non-zero will stop streaming behaviour at first and 781 + // instead will collate and sort results. At FlushWallTime the results will 782 + // be sent and then the behaviour will revert to the normal streaming. 783 + FlushWallTime time.Duration 784 + 780 785 // Trim the number of results after collating and sorting the 781 786 // results 782 787 MaxDocDisplayCount int ··· 790 795 // If true, ChunkMatches will be returned in each FileMatch rather than LineMatches 791 796 // EXPERIMENTAL: the behavior of this flag may be changed in future versions. 792 797 ChunkMatches bool 798 + 799 + // EXPERIMENTAL. If true, document ranks are used as additional input for 800 + // sorting matches. 801 + UseDocumentRanks bool 793 802 794 803 // Trace turns on opentracing for this request if true and if the Jaeger address was provided as 795 804 // a command-line flag
+6 -14
contentprovider.go
··· 720 720 // 721 721 // Rankings derived from match scores and rank vectors are combined based on 722 722 // "Reciprocal Rank Fusion" (RFF). 723 - func SortFiles(ms []FileMatch) { 723 + func SortFiles(ms []FileMatch, useDocumentRanks bool) { 724 724 sort.Sort(fileMatchesByScore(ms)) 725 725 726 - if hasRanks(ms) { 726 + if useDocumentRanks { 727 727 rffScore := make([]float64, len(ms)) 728 728 729 729 for i := 0; i < len(ms); i++ { 730 730 rffScore[i] = 1 / (k + float64(i)) 731 731 } 732 732 733 - sort.Sort(fileMatchesByRank{fileMatches: ms, rffScore: rffScore}) 733 + // We use stable sort in case we don't have ranks. Without stable sort the order 734 + // of file matches would be random which would sully the ranking induces by the 735 + // scores. 736 + sort.Stable(fileMatchesByRank{fileMatches: ms, rffScore: rffScore}) 734 737 735 738 for i := range rffScore { 736 739 rffScore[i] += 1 / (k + float64(i)) ··· 738 741 739 742 sort.Sort(fileMatchesByRFFScore{fileMatches: ms, rffScore: rffScore}) 740 743 } 741 - } 742 - 743 - // hasRanks returns true if any sr.Files has a non-zero rank vector 744 - func hasRanks(fm []FileMatch) bool { 745 - for _, f := range fm { 746 - if len(f.Ranks) > 0 { 747 - return true 748 - } 749 - } 750 - 751 - return false 752 744 } 753 745 754 746 type fileMatchesByRank struct {
+1 -1
contentprovider_test.go
··· 341 341 // d1 1/(60+2) 1/(60+1) 0,0325224748810153 2 342 342 // d4 1/(60+3) 1/(60+2) 0,0320020481310804 3 343 343 344 - SortFiles(in) 344 + SortFiles(in, true) 345 345 346 346 wantOrder := []string{"d3", "d2", "d1", "d4"} 347 347
+109 -2
shards/aggregate.go
··· 1 1 package shards 2 2 3 3 import ( 4 + "context" 5 + "sync" 6 + "time" 7 + 8 + "github.com/prometheus/client_golang/prometheus" 9 + "github.com/prometheus/client_golang/prometheus/promauto" 10 + 4 11 "github.com/sourcegraph/zoekt" 12 + "github.com/sourcegraph/zoekt/stream" 13 + ) 14 + 15 + var ( 16 + metricFinalAggregateSize = promauto.NewHistogramVec(prometheus.HistogramOpts{ 17 + Name: "zoekt_final_aggregate_size", 18 + Help: "The number of file matches we aggregated before flushing", 19 + Buckets: prometheus.ExponentialBuckets(1, 2, 20), 20 + }, []string{"reason"}) 5 21 ) 6 22 7 23 // collectSender is a sender that will aggregate results. Once sending is 8 - // done, you call Done to returned the aggregated result which are ranked. 24 + // done, you call Done to return the aggregated result which are ranked. 9 25 // 10 26 // Note: It aggregates Progress as well, and expects that the 11 27 // MaxPendingPriority it receives are monotonically decreasing. 12 28 type collectSender struct { 13 29 aggregate *zoekt.SearchResult 14 30 maxDocDisplayCount int 31 + useDocumentRanks bool 15 32 } 16 33 17 34 func newCollectSender(opts *zoekt.SearchOptions) *collectSender { 18 35 return &collectSender{ 19 36 maxDocDisplayCount: opts.MaxDocDisplayCount, 37 + useDocumentRanks: opts.UseDocumentRanks, 20 38 } 21 39 } 22 40 ··· 62 80 agg := c.aggregate 63 81 c.aggregate = nil 64 82 65 - zoekt.SortFiles(agg.Files) 83 + zoekt.SortFiles(agg.Files, c.useDocumentRanks) 66 84 67 85 if max := c.maxDocDisplayCount; max > 0 && len(agg.Files) > max { 68 86 agg.Files = agg.Files[:max] ··· 70 88 71 89 return agg, true 72 90 } 91 + 92 + // newFlushCollectSender creates a sender which will collect and rank results 93 + // until opts.FlushWallTime. After that it will stream each result as it is 94 + // sent. 
95 + func newFlushCollectSender(opts *zoekt.SearchOptions, sender zoekt.Sender) (zoekt.Sender, func()) { 96 + // We don't need to do any collecting, so just pass back the sender to use 97 + // directly. 98 + if opts.FlushWallTime == 0 { 99 + return sender, func() {} 100 + } 101 + 102 + // We transition through 3 states 103 + // 1. collectSender != nil: collect results via collectSender 104 + // 2. timerFired: send collected results and mark collectSender nil 105 + // 3. collectSender == nil: directly use sender 106 + 107 + var ( 108 + mu sync.Mutex 109 + collectSender = newCollectSender(opts) 110 + timerCancel = make(chan struct{}) 111 + ) 112 + 113 + // stopCollectingAndFlush will send what we have collected and all future 114 + // sends will go via sender directly. 115 + stopCollectingAndFlush := func(reason string) { 116 + mu.Lock() 117 + defer mu.Unlock() 118 + 119 + if collectSender == nil { 120 + return 121 + } 122 + 123 + if agg, ok := collectSender.Done(); ok { 124 + metricFinalAggregateSize.WithLabelValues(reason).Observe(float64(len(agg.Files))) 125 + sender.Send(agg) 126 + } 127 + 128 + // From now on use sender directly 129 + collectSender = nil 130 + 131 + // Stop timer goroutine if it is still running. 132 + close(timerCancel) 133 + } 134 + 135 + // Wait FlushWallTime to call stopCollecting. 136 + go func() { 137 + timer := time.NewTimer(opts.FlushWallTime) 138 + select { 139 + case <-timerCancel: 140 + timer.Stop() 141 + case <-timer.C: 142 + stopCollectingAndFlush("timer_expired") 143 + } 144 + }() 145 + 146 + finalFlush := func() { 147 + stopCollectingAndFlush("final_flush") 148 + } 149 + 150 + return stream.SenderFunc(func(event *zoekt.SearchResult) { 151 + mu.Lock() 152 + if collectSender != nil { 153 + collectSender.Send(event) 154 + } else { 155 + sender.Send(event) 156 + } 157 + mu.Unlock() 158 + }), finalFlush 159 + } 160 + 161 + // limitSender wraps a sender and calls cancel once it has seen limit file 162 + // matches. 
163 + func limitSender(cancel context.CancelFunc, sender zoekt.Sender, limit int) zoekt.Sender { 164 + return stream.SenderFunc(func(result *zoekt.SearchResult) { 165 + if len(result.Files) >= limit { 166 + result.Files = result.Files[:limit] 167 + cancel() 168 + } 169 + limit -= len(result.Files) 170 + sender.Send(result) 171 + }) 172 + } 173 + 174 + func copyFileSender(sender zoekt.Sender) zoekt.Sender { 175 + return stream.SenderFunc(func(result *zoekt.SearchResult) { 176 + copyFiles(result) 177 + sender.Send(result) 178 + }) 179 + }
+28 -9
shards/shards.go
··· 35 35 36 36 "github.com/sourcegraph/zoekt" 37 37 "github.com/sourcegraph/zoekt/query" 38 - "github.com/sourcegraph/zoekt/stream" 39 38 "github.com/sourcegraph/zoekt/trace" 40 39 ) 41 40 ··· 526 525 }, 527 526 }) 528 527 529 - done, err := streamSearch(ctx, proc, q, opts, shards, stream.SenderFunc(func(event *zoekt.SearchResult) { 530 - copyFiles(event) 531 - sender.Send(event) 532 - })) 528 + // Matches flow from the shards up the stack in the following order: 529 + // 530 + // 1. Search shards 531 + // 2. flushCollectSender (aggregate) 532 + // 3. limitSender (limit) 533 + // 4. copyFileSender (copy) 534 + // 535 + // For streaming, the wrapping has to happen in the inverted order. 536 + sender = copyFileSender(sender) 537 + 538 + if opts.MaxDocDisplayCount > 0 { 539 + var cancel context.CancelFunc 540 + ctx, cancel = context.WithCancel(ctx) 541 + defer cancel() 542 + sender = limitSender(cancel, sender, opts.MaxDocDisplayCount) 543 + } 544 + 545 + sender, flush := newFlushCollectSender(opts, sender) 546 + 547 + done, err := streamSearch(ctx, proc, q, opts, shards, sender) 548 + 549 + // Even though streaming is done, we may have results sitting in a buffer we 550 + // need to flush. So we need to send those before calling done. 551 + flush() 533 552 done() 534 553 535 554 return err ··· 679 698 r.Priority = r.priority 680 699 r.MaxPendingPriority = pending.max() 681 700 682 - sendByRepository(r.SearchResult, sender) 701 + sendByRepository(r.SearchResult, opts, sender) 683 702 } 684 703 } 685 704 ··· 692 711 // 693 712 // We split by repository instead of by priority because it is easier to set 694 713 // RepoURLs and LineFragments in zoekt.SearchResult. 
695 - func sendByRepository(result *zoekt.SearchResult, sender zoekt.Sender) { 714 + func sendByRepository(result *zoekt.SearchResult, opts *zoekt.SearchOptions, sender zoekt.Sender) { 696 715 697 716 if len(result.RepoURLs) <= 1 || len(result.Files) == 0 { 698 - zoekt.SortFiles(result.Files) 717 + zoekt.SortFiles(result.Files, opts.UseDocumentRanks) 699 718 sender.Send(result) 700 719 return 701 720 } 702 721 703 722 send := func(repoName string, a, b int, stats zoekt.Stats) { 704 - zoekt.SortFiles(result.Files[a:b]) 723 + zoekt.SortFiles(result.Files[a:b], opts.UseDocumentRanks) 705 724 sender.Send(&zoekt.SearchResult{ 706 725 Stats: stats, 707 726 Progress: zoekt.Progress{
+2 -1
shards/shards_test.go
··· 34 34 "github.com/google/go-cmp/cmp" 35 35 "github.com/google/go-cmp/cmp/cmpopts" 36 36 "github.com/grafana/regexp" 37 + 37 38 "github.com/sourcegraph/zoekt" 38 39 "github.com/sourcegraph/zoekt/query" 39 40 "github.com/sourcegraph/zoekt/stream" ··· 764 765 sr := createMockSearchResult(n1, n2, n3, wantStats) 765 766 766 767 mock := &mockSender{} 767 - sendByRepository(sr, mock) 768 + sendByRepository(sr, &zoekt.SearchOptions{}, mock) 768 769 769 770 if diff := cmp.Diff(wantStats, mock.stats); diff != "" { 770 771 t.Logf("-want,+got\n%s", diff)