fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package shards 2 3import ( 4 "context" 5 "sync" 6 "time" 7 8 "github.com/prometheus/client_golang/prometheus" 9 "github.com/prometheus/client_golang/prometheus/promauto" 10 11 "github.com/sourcegraph/zoekt" 12 "github.com/sourcegraph/zoekt/stream" 13) 14 15var metricFinalAggregateSize = promauto.NewHistogramVec(prometheus.HistogramOpts{ 16 Name: "zoekt_final_aggregate_size", 17 Help: "The number of file matches we aggregated before flushing", 18 Buckets: prometheus.ExponentialBuckets(1, 2, 20), 19}, []string{"reason"}) 20 21// collectSender is a sender that will aggregate results. Once sending is 22// done, you call Done to return the aggregated result which are ranked. 23// 24// Note: It aggregates Progress as well, and expects that the 25// MaxPendingPriority it receives are monotonically decreasing. 26type collectSender struct { 27 opts *zoekt.SearchOptions 28 aggregate *zoekt.SearchResult 29} 30 31func newCollectSender(opts *zoekt.SearchOptions) *collectSender { 32 return &collectSender{opts: opts} 33} 34 35// Send aggregates the new search result by adding it stats and ranking 36// and truncating its files according to the input SearchOptions. 37func (c *collectSender) Send(r *zoekt.SearchResult) { 38 if c.aggregate == nil { 39 c.aggregate = &zoekt.SearchResult{ 40 RepoURLs: map[string]string{}, 41 LineFragments: map[string]string{}, 42 } 43 } 44 45 c.aggregate.Stats.Add(r.Stats) 46 47 if len(r.Files) > 0 { 48 c.aggregate.Files = append(c.aggregate.Files, r.Files...) 49 50 c.aggregate.Files = zoekt.SortAndTruncateFiles(c.aggregate.Files, c.opts) 51 52 for k, v := range r.RepoURLs { 53 c.aggregate.RepoURLs[k] = v 54 } 55 for k, v := range r.LineFragments { 56 c.aggregate.LineFragments[k] = v 57 } 58 } 59 60 // The priority of our aggregate is the largest priority we collect. 61 if c.aggregate.Priority < r.Priority { 62 c.aggregate.Priority = r.Priority 63 } 64 65 // We receive monotonically decreasing values, so we update on every call. 66 c.aggregate.MaxPendingPriority = r.MaxPendingPriority 67} 68 69// Done returns the aggregated result. 70// 71// If no results are aggregated, ok is false and the result is nil. 72func (c *collectSender) Done() (_ *zoekt.SearchResult, ok bool) { 73 if c.aggregate == nil { 74 return nil, false 75 } 76 77 agg := c.aggregate 78 c.aggregate = nil 79 return agg, true 80} 81 82// newFlushCollectSender creates a sender which will collect and rank results 83// until opts.FlushWallTime. After that it will stream each result as it is 84// sent. 85func newFlushCollectSender(opts *zoekt.SearchOptions, sender zoekt.Sender) (zoekt.Sender, func()) { 86 // We don't need to do any collecting, so just pass back the sender to use 87 // directly. 88 if opts.FlushWallTime == 0 { 89 return sender, func() {} 90 } 91 92 // We transition through 3 states 93 // 1. collectSender != nil: collect results via collectSender 94 // 2. timerFired: send collected results and mark collectSender nil 95 // 3. collectSender == nil: directly use sender 96 97 var ( 98 mu sync.Mutex 99 collectSender = newCollectSender(opts) 100 timerCancel = make(chan struct{}) 101 ) 102 103 // stopCollectingAndFlush will send what we have collected and all future 104 // sends will go via sender directly. 105 stopCollectingAndFlush := func(reason zoekt.FlushReason) { 106 mu.Lock() 107 defer mu.Unlock() 108 109 if collectSender == nil { 110 return 111 } 112 113 if agg, ok := collectSender.Done(); ok { 114 metricFinalAggregateSize.WithLabelValues(reason.String()).Observe(float64(len(agg.Files))) 115 agg.FlushReason = reason 116 sender.Send(agg) 117 } 118 119 // From now on use sender directly 120 collectSender = nil 121 122 // Stop timer goroutine if it is still running. 123 close(timerCancel) 124 } 125 126 // Wait FlushWallTime to call stopCollecting. 127 go func() { 128 timer := time.NewTimer(opts.FlushWallTime) 129 select { 130 case <-timerCancel: 131 timer.Stop() 132 case <-timer.C: 133 stopCollectingAndFlush(zoekt.FlushReasonTimerExpired) 134 } 135 }() 136 137 finalFlush := func() { 138 stopCollectingAndFlush(zoekt.FlushReasonFinalFlush) 139 } 140 141 return stream.SenderFunc(func(event *zoekt.SearchResult) { 142 mu.Lock() 143 if collectSender != nil { 144 collectSender.Send(event) 145 } else { 146 sender.Send(event) 147 } 148 mu.Unlock() 149 }), finalFlush 150} 151 152// limitSender wraps a sender and calls cancel once the truncator has finished 153// truncating. 154func limitSender(cancel context.CancelFunc, sender zoekt.Sender, truncator zoekt.DisplayTruncator) zoekt.Sender { 155 return stream.SenderFunc(func(result *zoekt.SearchResult) { 156 var hasMore bool 157 result.Files, hasMore = truncator(result.Files) 158 if !hasMore { 159 cancel() 160 } 161 sender.Send(result) 162 }) 163} 164 165func copyFileSender(sender zoekt.Sender) zoekt.Sender { 166 return stream.SenderFunc(func(result *zoekt.SearchResult) { 167 copyFiles(result) 168 sender.Send(result) 169 }) 170}