fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package search 2 3import ( 4 "context" 5 "maps" 6 "sync" 7 "time" 8 9 "github.com/prometheus/client_golang/prometheus" 10 "github.com/prometheus/client_golang/prometheus/promauto" 11 "github.com/sourcegraph/zoekt" 12 "github.com/sourcegraph/zoekt/index" 13) 14 15var metricFinalAggregateSize = promauto.NewHistogramVec(prometheus.HistogramOpts{ 16 Name: "zoekt_final_aggregate_size", 17 Help: "The number of file matches we aggregated before flushing", 18 Buckets: prometheus.ExponentialBuckets(1, 2, 20), 19}, []string{"reason"}) 20 21// collectSender is a sender that will aggregate results. Once sending is 22// done, you call Done to return the aggregated result which are ranked. 23// 24// Note: It aggregates Progress as well, and expects that the 25// MaxPendingPriority it receives are monotonically decreasing. 26type collectSender struct { 27 opts *zoekt.SearchOptions 28 aggregate *zoekt.SearchResult 29} 30 31func newCollectSender(opts *zoekt.SearchOptions) *collectSender { 32 return &collectSender{opts: opts} 33} 34 35// Send aggregates the new search result by adding it stats and ranking 36// and truncating its files according to the input SearchOptions. 37func (c *collectSender) Send(r *zoekt.SearchResult) { 38 if c.aggregate == nil { 39 c.aggregate = &zoekt.SearchResult{ 40 RepoURLs: map[string]string{}, 41 LineFragments: map[string]string{}, 42 } 43 } 44 45 c.aggregate.Stats.Add(r.Stats) 46 47 if len(r.Files) > 0 { 48 c.aggregate.Files = append(c.aggregate.Files, r.Files...) 49 50 c.aggregate.Files = index.SortAndTruncateFiles(c.aggregate.Files, c.opts) 51 52 maps.Copy(c.aggregate.RepoURLs, r.RepoURLs) 53 maps.Copy(c.aggregate.LineFragments, r.LineFragments) 54 } 55 56 // The priority of our aggregate is the largest priority we collect. 57 if c.aggregate.Priority < r.Priority { 58 c.aggregate.Priority = r.Priority 59 } 60 61 // We receive monotonically decreasing values, so we update on every call. 62 c.aggregate.MaxPendingPriority = r.MaxPendingPriority 63} 64 65// Done returns the aggregated result. 66// 67// If no results are aggregated, ok is false and the result is nil. 68func (c *collectSender) Done() (_ *zoekt.SearchResult, ok bool) { 69 if c.aggregate == nil { 70 return nil, false 71 } 72 73 agg := c.aggregate 74 c.aggregate = nil 75 return agg, true 76} 77 78// newFlushCollectSender creates a sender which will collect and rank results 79// until opts.FlushWallTime. After that it will stream each result as it is 80// sent. 81func newFlushCollectSender(opts *zoekt.SearchOptions, sender zoekt.Sender) (zoekt.Sender, func()) { 82 // We don't need to do any collecting, so just pass back the sender to use 83 // directly. 84 if opts.FlushWallTime == 0 { 85 return sender, func() {} 86 } 87 88 // We transition through 3 states 89 // 1. collectSender != nil: collect results via collectSender 90 // 2. timerFired: send collected results and mark collectSender nil 91 // 3. collectSender == nil: directly use sender 92 93 var ( 94 mu sync.Mutex 95 collectSender = newCollectSender(opts) 96 timerCancel = make(chan struct{}) 97 ) 98 99 // stopCollectingAndFlush will send what we have collected and all future 100 // sends will go via sender directly. 101 stopCollectingAndFlush := func(reason zoekt.FlushReason) { 102 mu.Lock() 103 defer mu.Unlock() 104 105 if collectSender == nil { 106 return 107 } 108 109 if agg, ok := collectSender.Done(); ok { 110 metricFinalAggregateSize.WithLabelValues(reason.String()).Observe(float64(len(agg.Files))) 111 agg.FlushReason = reason 112 sender.Send(agg) 113 } 114 115 // From now on use sender directly 116 collectSender = nil 117 118 // Stop timer goroutine if it is still running. 119 close(timerCancel) 120 } 121 122 // Wait FlushWallTime to call stopCollecting. 123 go func() { 124 timer := time.NewTimer(opts.FlushWallTime) 125 select { 126 case <-timerCancel: 127 timer.Stop() 128 case <-timer.C: 129 stopCollectingAndFlush(zoekt.FlushReasonTimerExpired) 130 } 131 }() 132 133 finalFlush := func() { 134 stopCollectingAndFlush(zoekt.FlushReasonFinalFlush) 135 } 136 137 return zoekt.SenderFunc(func(event *zoekt.SearchResult) { 138 mu.Lock() 139 if collectSender != nil { 140 collectSender.Send(event) 141 } else { 142 sender.Send(event) 143 } 144 mu.Unlock() 145 }), finalFlush 146} 147 148// limitSender wraps a sender and calls cancel once the truncator has finished 149// truncating. 150func limitSender(cancel context.CancelFunc, sender zoekt.Sender, truncator index.DisplayTruncator) zoekt.Sender { 151 return zoekt.SenderFunc(func(result *zoekt.SearchResult) { 152 var hasMore bool 153 result.Files, hasMore = truncator(result.Files) 154 if !hasMore { 155 cancel() 156 } 157 sender.Send(result) 158 }) 159} 160 161func copyFileSender(sender zoekt.Sender) zoekt.Sender { 162 return zoekt.SenderFunc(func(result *zoekt.SearchResult) { 163 copyFiles(result) 164 sender.Send(result) 165 }) 166}