fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package shards 2 3import ( 4 "context" 5 "sync" 6 "time" 7 8 "github.com/prometheus/client_golang/prometheus" 9 "github.com/prometheus/client_golang/prometheus/promauto" 10 11 "github.com/sourcegraph/zoekt" 12 "github.com/sourcegraph/zoekt/stream" 13) 14 15var ( 16 metricFinalAggregateSize = promauto.NewHistogramVec(prometheus.HistogramOpts{ 17 Name: "zoekt_final_aggregate_size", 18 Help: "The number of file matches we aggregated before flushing", 19 Buckets: prometheus.ExponentialBuckets(1, 2, 20), 20 }, []string{"reason"}) 21) 22 23// collectSender is a sender that will aggregate results. Once sending is 24// done, you call Done to return the aggregated result which are ranked. 25// 26// Note: It aggregates Progress as well, and expects that the 27// MaxPendingPriority it receives are monotonically decreasing. 28type collectSender struct { 29 opts *zoekt.SearchOptions 30 aggregate *zoekt.SearchResult 31} 32 33func newCollectSender(opts *zoekt.SearchOptions) *collectSender { 34 return &collectSender{opts: opts} 35} 36 37// Send aggregates the new search result by adding it stats and ranking 38// and truncating its files according to the input SearchOptions. 39func (c *collectSender) Send(r *zoekt.SearchResult) { 40 if c.aggregate == nil { 41 c.aggregate = &zoekt.SearchResult{ 42 RepoURLs: map[string]string{}, 43 LineFragments: map[string]string{}, 44 } 45 } 46 47 c.aggregate.Stats.Add(r.Stats) 48 49 if len(r.Files) > 0 { 50 c.aggregate.Files = append(c.aggregate.Files, r.Files...) 51 52 c.aggregate.Files = zoekt.SortAndTruncateFiles(c.aggregate.Files, c.opts) 53 54 for k, v := range r.RepoURLs { 55 c.aggregate.RepoURLs[k] = v 56 } 57 for k, v := range r.LineFragments { 58 c.aggregate.LineFragments[k] = v 59 } 60 } 61 62 // The priority of our aggregate is the largest priority we collect. 63 if c.aggregate.Priority < r.Priority { 64 c.aggregate.Priority = r.Priority 65 } 66 67 // We receive monotonically decreasing values, so we update on every call. 68 c.aggregate.MaxPendingPriority = r.MaxPendingPriority 69} 70 71// Done returns the aggregated result. 72// 73// If no results are aggregated, ok is false and the result is nil. 74func (c *collectSender) Done() (_ *zoekt.SearchResult, ok bool) { 75 if c.aggregate == nil { 76 return nil, false 77 } 78 79 agg := c.aggregate 80 c.aggregate = nil 81 return agg, true 82} 83 84// newFlushCollectSender creates a sender which will collect and rank results 85// until opts.FlushWallTime. After that it will stream each result as it is 86// sent. 87func newFlushCollectSender(opts *zoekt.SearchOptions, sender zoekt.Sender) (zoekt.Sender, func()) { 88 // We don't need to do any collecting, so just pass back the sender to use 89 // directly. 90 if opts.FlushWallTime == 0 { 91 return sender, func() {} 92 } 93 94 // We transition through 3 states 95 // 1. collectSender != nil: collect results via collectSender 96 // 2. timerFired: send collected results and mark collectSender nil 97 // 3. collectSender == nil: directly use sender 98 99 var ( 100 mu sync.Mutex 101 collectSender = newCollectSender(opts) 102 timerCancel = make(chan struct{}) 103 ) 104 105 // stopCollectingAndFlush will send what we have collected and all future 106 // sends will go via sender directly. 107 stopCollectingAndFlush := func(reason zoekt.FlushReason) { 108 mu.Lock() 109 defer mu.Unlock() 110 111 if collectSender == nil { 112 return 113 } 114 115 if agg, ok := collectSender.Done(); ok { 116 metricFinalAggregateSize.WithLabelValues(reason.String()).Observe(float64(len(agg.Files))) 117 agg.FlushReason = reason 118 sender.Send(agg) 119 } 120 121 // From now on use sender directly 122 collectSender = nil 123 124 // Stop timer goroutine if it is still running. 125 close(timerCancel) 126 } 127 128 // Wait FlushWallTime to call stopCollecting. 129 go func() { 130 timer := time.NewTimer(opts.FlushWallTime) 131 select { 132 case <-timerCancel: 133 timer.Stop() 134 case <-timer.C: 135 stopCollectingAndFlush(zoekt.FlushReasonTimerExpired) 136 } 137 }() 138 139 finalFlush := func() { 140 stopCollectingAndFlush(zoekt.FlushReasonFinalFlush) 141 } 142 143 return stream.SenderFunc(func(event *zoekt.SearchResult) { 144 mu.Lock() 145 if collectSender != nil { 146 collectSender.Send(event) 147 } else { 148 sender.Send(event) 149 } 150 mu.Unlock() 151 }), finalFlush 152} 153 154// limitSender wraps a sender and calls cancel once the truncator has finished 155// truncating. 156func limitSender(cancel context.CancelFunc, sender zoekt.Sender, truncator zoekt.DisplayTruncator) zoekt.Sender { 157 return stream.SenderFunc(func(result *zoekt.SearchResult) { 158 var hasMore bool 159 result.Files, hasMore = truncator(result.Files) 160 if !hasMore { 161 cancel() 162 } 163 sender.Send(result) 164 }) 165} 166 167func copyFileSender(sender zoekt.Sender) zoekt.Sender { 168 return stream.SenderFunc(func(result *zoekt.SearchResult) { 169 copyFiles(result) 170 sender.Send(result) 171 }) 172}