fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package shards 2 3import ( 4 "context" 5 "sync" 6 "time" 7 8 "github.com/prometheus/client_golang/prometheus" 9 "github.com/prometheus/client_golang/prometheus/promauto" 10 11 "github.com/sourcegraph/zoekt" 12 "github.com/sourcegraph/zoekt/stream" 13) 14 15var ( 16 metricFinalAggregateSize = promauto.NewHistogramVec(prometheus.HistogramOpts{ 17 Name: "zoekt_final_aggregate_size", 18 Help: "The number of file matches we aggregated before flushing", 19 Buckets: prometheus.ExponentialBuckets(1, 2, 20), 20 }, []string{"reason"}) 21) 22 23// collectSender is a sender that will aggregate results. Once sending is 24// done, you call Done to return the aggregated result which are ranked. 25// 26// Note: It aggregates Progress as well, and expects that the 27// MaxPendingPriority it receives are monotonically decreasing. 28type collectSender struct { 29 opts *zoekt.SearchOptions 30 aggregate *zoekt.SearchResult 31} 32 33func newCollectSender(opts *zoekt.SearchOptions) *collectSender { 34 return &collectSender{opts: opts} 35} 36 37// Send aggregates the new search result by adding it stats and ranking 38// and truncating its files according to the input SearchOptions. 39func (c *collectSender) Send(r *zoekt.SearchResult) { 40 if c.aggregate == nil { 41 c.aggregate = &zoekt.SearchResult{ 42 RepoURLs: map[string]string{}, 43 LineFragments: map[string]string{}, 44 } 45 } 46 47 c.aggregate.Stats.Add(r.Stats) 48 49 if len(r.Files) > 0 { 50 c.aggregate.Files = append(c.aggregate.Files, r.Files...) 51 52 zoekt.SortFiles(c.aggregate.Files) 53 if max := c.opts.MaxDocDisplayCount; max > 0 && max < len(c.aggregate.Files) { 54 c.aggregate.Files = c.aggregate.Files[:max] 55 } 56 57 for k, v := range r.RepoURLs { 58 c.aggregate.RepoURLs[k] = v 59 } 60 for k, v := range r.LineFragments { 61 c.aggregate.LineFragments[k] = v 62 } 63 } 64 65 // The priority of our aggregate is the largest priority we collect. 66 if c.aggregate.Priority < r.Priority { 67 c.aggregate.Priority = r.Priority 68 } 69 70 // We receive monotonically decreasing values, so we update on every call. 71 c.aggregate.MaxPendingPriority = r.MaxPendingPriority 72} 73 74// Done returns the aggregated result. 75// 76// If no results are aggregated, ok is false and the result is nil. 77func (c *collectSender) Done() (_ *zoekt.SearchResult, ok bool) { 78 if c.aggregate == nil { 79 return nil, false 80 } 81 82 agg := c.aggregate 83 c.aggregate = nil 84 return agg, true 85} 86 87// newFlushCollectSender creates a sender which will collect and rank results 88// until opts.FlushWallTime. After that it will stream each result as it is 89// sent. 90func newFlushCollectSender(opts *zoekt.SearchOptions, sender zoekt.Sender) (zoekt.Sender, func()) { 91 // We don't need to do any collecting, so just pass back the sender to use 92 // directly. 93 if opts.FlushWallTime == 0 { 94 return sender, func() {} 95 } 96 97 // We transition through 3 states 98 // 1. collectSender != nil: collect results via collectSender 99 // 2. timerFired: send collected results and mark collectSender nil 100 // 3. collectSender == nil: directly use sender 101 102 var ( 103 mu sync.Mutex 104 collectSender = newCollectSender(opts) 105 timerCancel = make(chan struct{}) 106 ) 107 108 // stopCollectingAndFlush will send what we have collected and all future 109 // sends will go via sender directly. 110 stopCollectingAndFlush := func(reason zoekt.FlushReason) { 111 mu.Lock() 112 defer mu.Unlock() 113 114 if collectSender == nil { 115 return 116 } 117 118 if agg, ok := collectSender.Done(); ok { 119 metricFinalAggregateSize.WithLabelValues(reason.String()).Observe(float64(len(agg.Files))) 120 agg.FlushReason = reason 121 sender.Send(agg) 122 } 123 124 // From now on use sender directly 125 collectSender = nil 126 127 // Stop timer goroutine if it is still running. 128 close(timerCancel) 129 } 130 131 // Wait FlushWallTime to call stopCollecting. 132 go func() { 133 timer := time.NewTimer(opts.FlushWallTime) 134 select { 135 case <-timerCancel: 136 timer.Stop() 137 case <-timer.C: 138 stopCollectingAndFlush(zoekt.FlushReasonTimerExpired) 139 } 140 }() 141 142 finalFlush := func() { 143 stopCollectingAndFlush(zoekt.FlushReasonFinalFlush) 144 } 145 146 return stream.SenderFunc(func(event *zoekt.SearchResult) { 147 mu.Lock() 148 if collectSender != nil { 149 collectSender.Send(event) 150 } else { 151 sender.Send(event) 152 } 153 mu.Unlock() 154 }), finalFlush 155} 156 157// limitSender wraps a sender and calls cancel once it has seen limit file 158// matches. 159func limitSender(cancel context.CancelFunc, sender zoekt.Sender, limit int) zoekt.Sender { 160 return stream.SenderFunc(func(result *zoekt.SearchResult) { 161 if len(result.Files) >= limit { 162 result.Files = result.Files[:limit] 163 cancel() 164 } 165 limit -= len(result.Files) 166 sender.Send(result) 167 }) 168} 169 170func copyFileSender(sender zoekt.Sender) zoekt.Sender { 171 return stream.SenderFunc(func(result *zoekt.SearchResult) { 172 copyFiles(result) 173 sender.Send(result) 174 }) 175}