fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package search 2 3import ( 4 "context" 5 "maps" 6 "sync" 7 "time" 8 9 "github.com/prometheus/client_golang/prometheus" 10 "github.com/prometheus/client_golang/prometheus/promauto" 11 12 "github.com/sourcegraph/zoekt" 13 "github.com/sourcegraph/zoekt/index" 14) 15 16var metricFinalAggregateSize = promauto.NewHistogramVec(prometheus.HistogramOpts{ 17 Name: "zoekt_final_aggregate_size", 18 Help: "The number of file matches we aggregated before flushing", 19 Buckets: prometheus.ExponentialBuckets(1, 2, 20), 20}, []string{"reason"}) 21 22// collectSender is a sender that will aggregate results. Once sending is 23// done, you call Done to return the aggregated result which are ranked. 24// 25// Note: It aggregates Progress as well, and expects that the 26// MaxPendingPriority it receives are monotonically decreasing. 27type collectSender struct { 28 opts *zoekt.SearchOptions 29 aggregate *zoekt.SearchResult 30} 31 32func newCollectSender(opts *zoekt.SearchOptions) *collectSender { 33 return &collectSender{opts: opts} 34} 35 36// Send aggregates the new search result by adding it stats and ranking 37// and truncating its files according to the input SearchOptions. 38func (c *collectSender) Send(r *zoekt.SearchResult) { 39 if c.aggregate == nil { 40 c.aggregate = &zoekt.SearchResult{ 41 RepoURLs: map[string]string{}, 42 LineFragments: map[string]string{}, 43 } 44 } 45 46 c.aggregate.Stats.Add(r.Stats) 47 48 if len(r.Files) > 0 { 49 c.aggregate.Files = append(c.aggregate.Files, r.Files...) 50 51 c.aggregate.Files = index.SortAndTruncateFiles(c.aggregate.Files, c.opts) 52 53 maps.Copy(c.aggregate.RepoURLs, r.RepoURLs) 54 maps.Copy(c.aggregate.LineFragments, r.LineFragments) 55 } 56 57 // The priority of our aggregate is the largest priority we collect. 58 if c.aggregate.Priority < r.Priority { 59 c.aggregate.Priority = r.Priority 60 } 61 62 // We receive monotonically decreasing values, so we update on every call. 63 c.aggregate.MaxPendingPriority = r.MaxPendingPriority 64} 65 66// Done returns the aggregated result. 67// 68// If no results are aggregated, ok is false and the result is nil. 69func (c *collectSender) Done() (_ *zoekt.SearchResult, ok bool) { 70 if c.aggregate == nil { 71 return nil, false 72 } 73 74 agg := c.aggregate 75 c.aggregate = nil 76 return agg, true 77} 78 79// newFlushCollectSender creates a sender which will collect and rank results 80// until opts.FlushWallTime. After that it will stream each result as it is 81// sent. 82func newFlushCollectSender(opts *zoekt.SearchOptions, sender zoekt.Sender) (zoekt.Sender, func()) { 83 // We don't need to do any collecting, so just pass back the sender to use 84 // directly. 85 if opts.FlushWallTime == 0 { 86 return sender, func() {} 87 } 88 89 // We transition through 3 states 90 // 1. collectSender != nil: collect results via collectSender 91 // 2. timerFired: send collected results and mark collectSender nil 92 // 3. collectSender == nil: directly use sender 93 94 var ( 95 mu sync.Mutex 96 collectSender = newCollectSender(opts) 97 timerCancel = make(chan struct{}) 98 ) 99 100 // stopCollectingAndFlush will send what we have collected and all future 101 // sends will go via sender directly. 102 stopCollectingAndFlush := func(reason zoekt.FlushReason) { 103 mu.Lock() 104 defer mu.Unlock() 105 106 if collectSender == nil { 107 return 108 } 109 110 if agg, ok := collectSender.Done(); ok { 111 metricFinalAggregateSize.WithLabelValues(reason.String()).Observe(float64(len(agg.Files))) 112 agg.FlushReason = reason 113 sender.Send(agg) 114 } 115 116 // From now on use sender directly 117 collectSender = nil 118 119 // Stop timer goroutine if it is still running. 120 close(timerCancel) 121 } 122 123 // Wait FlushWallTime to call stopCollecting. 124 go func() { 125 timer := time.NewTimer(opts.FlushWallTime) 126 select { 127 case <-timerCancel: 128 timer.Stop() 129 case <-timer.C: 130 stopCollectingAndFlush(zoekt.FlushReasonTimerExpired) 131 } 132 }() 133 134 finalFlush := func() { 135 stopCollectingAndFlush(zoekt.FlushReasonFinalFlush) 136 } 137 138 return zoekt.SenderFunc(func(event *zoekt.SearchResult) { 139 mu.Lock() 140 if collectSender != nil { 141 collectSender.Send(event) 142 } else { 143 sender.Send(event) 144 } 145 mu.Unlock() 146 }), finalFlush 147} 148 149// limitSender wraps a sender and calls cancel once the truncator has finished 150// truncating. 151func limitSender(cancel context.CancelFunc, sender zoekt.Sender, truncator index.DisplayTruncator) zoekt.Sender { 152 return zoekt.SenderFunc(func(result *zoekt.SearchResult) { 153 var hasMore bool 154 result.Files, hasMore = truncator(result.Files) 155 if !hasMore { 156 cancel() 157 } 158 sender.Send(result) 159 }) 160} 161 162func copyFileSender(sender zoekt.Sender) zoekt.Sender { 163 return zoekt.SenderFunc(func(result *zoekt.SearchResult) { 164 copyFiles(result) 165 sender.Send(result) 166 }) 167}