fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package shards 2 3import ( 4 "context" 5 "sync" 6 "time" 7 8 "github.com/prometheus/client_golang/prometheus" 9 "github.com/prometheus/client_golang/prometheus/promauto" 10 11 "github.com/sourcegraph/zoekt" 12) 13 14var metricFinalAggregateSize = promauto.NewHistogramVec(prometheus.HistogramOpts{ 15 Name: "zoekt_final_aggregate_size", 16 Help: "The number of file matches we aggregated before flushing", 17 Buckets: prometheus.ExponentialBuckets(1, 2, 20), 18}, []string{"reason"}) 19 20// collectSender is a sender that will aggregate results. Once sending is 21// done, you call Done to return the aggregated result which are ranked. 22// 23// Note: It aggregates Progress as well, and expects that the 24// MaxPendingPriority it receives are monotonically decreasing. 25type collectSender struct { 26 opts *zoekt.SearchOptions 27 aggregate *zoekt.SearchResult 28} 29 30func newCollectSender(opts *zoekt.SearchOptions) *collectSender { 31 return &collectSender{opts: opts} 32} 33 34// Send aggregates the new search result by adding it stats and ranking 35// and truncating its files according to the input SearchOptions. 36func (c *collectSender) Send(r *zoekt.SearchResult) { 37 if c.aggregate == nil { 38 c.aggregate = &zoekt.SearchResult{ 39 RepoURLs: map[string]string{}, 40 LineFragments: map[string]string{}, 41 } 42 } 43 44 c.aggregate.Stats.Add(r.Stats) 45 46 if len(r.Files) > 0 { 47 c.aggregate.Files = append(c.aggregate.Files, r.Files...) 48 49 c.aggregate.Files = zoekt.SortAndTruncateFiles(c.aggregate.Files, c.opts) 50 51 for k, v := range r.RepoURLs { 52 c.aggregate.RepoURLs[k] = v 53 } 54 for k, v := range r.LineFragments { 55 c.aggregate.LineFragments[k] = v 56 } 57 } 58 59 // The priority of our aggregate is the largest priority we collect. 60 if c.aggregate.Priority < r.Priority { 61 c.aggregate.Priority = r.Priority 62 } 63 64 // We receive monotonically decreasing values, so we update on every call. 65 c.aggregate.MaxPendingPriority = r.MaxPendingPriority 66} 67 68// Done returns the aggregated result. 69// 70// If no results are aggregated, ok is false and the result is nil. 71func (c *collectSender) Done() (_ *zoekt.SearchResult, ok bool) { 72 if c.aggregate == nil { 73 return nil, false 74 } 75 76 agg := c.aggregate 77 c.aggregate = nil 78 return agg, true 79} 80 81// newFlushCollectSender creates a sender which will collect and rank results 82// until opts.FlushWallTime. After that it will stream each result as it is 83// sent. 84func newFlushCollectSender(opts *zoekt.SearchOptions, sender zoekt.Sender) (zoekt.Sender, func()) { 85 // We don't need to do any collecting, so just pass back the sender to use 86 // directly. 87 if opts.FlushWallTime == 0 { 88 return sender, func() {} 89 } 90 91 // We transition through 3 states 92 // 1. collectSender != nil: collect results via collectSender 93 // 2. timerFired: send collected results and mark collectSender nil 94 // 3. collectSender == nil: directly use sender 95 96 var ( 97 mu sync.Mutex 98 collectSender = newCollectSender(opts) 99 timerCancel = make(chan struct{}) 100 ) 101 102 // stopCollectingAndFlush will send what we have collected and all future 103 // sends will go via sender directly. 104 stopCollectingAndFlush := func(reason zoekt.FlushReason) { 105 mu.Lock() 106 defer mu.Unlock() 107 108 if collectSender == nil { 109 return 110 } 111 112 if agg, ok := collectSender.Done(); ok { 113 metricFinalAggregateSize.WithLabelValues(reason.String()).Observe(float64(len(agg.Files))) 114 agg.FlushReason = reason 115 sender.Send(agg) 116 } 117 118 // From now on use sender directly 119 collectSender = nil 120 121 // Stop timer goroutine if it is still running. 122 close(timerCancel) 123 } 124 125 // Wait FlushWallTime to call stopCollecting. 126 go func() { 127 timer := time.NewTimer(opts.FlushWallTime) 128 select { 129 case <-timerCancel: 130 timer.Stop() 131 case <-timer.C: 132 stopCollectingAndFlush(zoekt.FlushReasonTimerExpired) 133 } 134 }() 135 136 finalFlush := func() { 137 stopCollectingAndFlush(zoekt.FlushReasonFinalFlush) 138 } 139 140 return zoekt.SenderFunc(func(event *zoekt.SearchResult) { 141 mu.Lock() 142 if collectSender != nil { 143 collectSender.Send(event) 144 } else { 145 sender.Send(event) 146 } 147 mu.Unlock() 148 }), finalFlush 149} 150 151// limitSender wraps a sender and calls cancel once the truncator has finished 152// truncating. 153func limitSender(cancel context.CancelFunc, sender zoekt.Sender, truncator zoekt.DisplayTruncator) zoekt.Sender { 154 return zoekt.SenderFunc(func(result *zoekt.SearchResult) { 155 var hasMore bool 156 result.Files, hasMore = truncator(result.Files) 157 if !hasMore { 158 cancel() 159 } 160 sender.Send(result) 161 }) 162} 163 164func copyFileSender(sender zoekt.Sender) zoekt.Sender { 165 return zoekt.SenderFunc(func(result *zoekt.SearchResult) { 166 copyFiles(result) 167 sender.Send(result) 168 }) 169}