fork of https://github.com/sourcegraph/zoekt
1package search
2
3import (
4 "context"
5 "maps"
6 "sync"
7 "time"
8
9 "github.com/prometheus/client_golang/prometheus"
10 "github.com/prometheus/client_golang/prometheus/promauto"
11 "github.com/sourcegraph/zoekt"
12 "github.com/sourcegraph/zoekt/index"
13)
14
15var metricFinalAggregateSize = promauto.NewHistogramVec(prometheus.HistogramOpts{
16 Name: "zoekt_final_aggregate_size",
17 Help: "The number of file matches we aggregated before flushing",
18 Buckets: prometheus.ExponentialBuckets(1, 2, 20),
19}, []string{"reason"})
20
// collectSender is a sender that aggregates results. Once sending is done,
// call Done to obtain the aggregated result, whose files are ranked and
// truncated according to opts.
//
// Note: it aggregates Progress as well, and expects that the
// MaxPendingPriority values it receives are monotonically decreasing.
//
// NOTE(review): collectSender has no internal locking. Callers that share it
// between goroutines must synchronize access themselves (newFlushCollectSender
// guards it with a mutex).
type collectSender struct {
	// opts is passed to index.SortAndTruncateFiles on every Send to rank and
	// truncate the aggregated files.
	opts *zoekt.SearchOptions
	// aggregate is the result collected so far; nil before the first Send
	// and again after each Done.
	aggregate *zoekt.SearchResult
}
30
31func newCollectSender(opts *zoekt.SearchOptions) *collectSender {
32 return &collectSender{opts: opts}
33}
34
35// Send aggregates the new search result by adding it stats and ranking
36// and truncating its files according to the input SearchOptions.
37func (c *collectSender) Send(r *zoekt.SearchResult) {
38 if c.aggregate == nil {
39 c.aggregate = &zoekt.SearchResult{
40 RepoURLs: map[string]string{},
41 LineFragments: map[string]string{},
42 }
43 }
44
45 c.aggregate.Stats.Add(r.Stats)
46
47 if len(r.Files) > 0 {
48 c.aggregate.Files = append(c.aggregate.Files, r.Files...)
49
50 c.aggregate.Files = index.SortAndTruncateFiles(c.aggregate.Files, c.opts)
51
52 maps.Copy(c.aggregate.RepoURLs, r.RepoURLs)
53 maps.Copy(c.aggregate.LineFragments, r.LineFragments)
54 }
55
56 // The priority of our aggregate is the largest priority we collect.
57 if c.aggregate.Priority < r.Priority {
58 c.aggregate.Priority = r.Priority
59 }
60
61 // We receive monotonically decreasing values, so we update on every call.
62 c.aggregate.MaxPendingPriority = r.MaxPendingPriority
63}
64
65// Done returns the aggregated result.
66//
67// If no results are aggregated, ok is false and the result is nil.
68func (c *collectSender) Done() (_ *zoekt.SearchResult, ok bool) {
69 if c.aggregate == nil {
70 return nil, false
71 }
72
73 agg := c.aggregate
74 c.aggregate = nil
75 return agg, true
76}
77
78// newFlushCollectSender creates a sender which will collect and rank results
79// until opts.FlushWallTime. After that it will stream each result as it is
80// sent.
81func newFlushCollectSender(opts *zoekt.SearchOptions, sender zoekt.Sender) (zoekt.Sender, func()) {
82 // We don't need to do any collecting, so just pass back the sender to use
83 // directly.
84 if opts.FlushWallTime == 0 {
85 return sender, func() {}
86 }
87
88 // We transition through 3 states
89 // 1. collectSender != nil: collect results via collectSender
90 // 2. timerFired: send collected results and mark collectSender nil
91 // 3. collectSender == nil: directly use sender
92
93 var (
94 mu sync.Mutex
95 collectSender = newCollectSender(opts)
96 timerCancel = make(chan struct{})
97 )
98
99 // stopCollectingAndFlush will send what we have collected and all future
100 // sends will go via sender directly.
101 stopCollectingAndFlush := func(reason zoekt.FlushReason) {
102 mu.Lock()
103 defer mu.Unlock()
104
105 if collectSender == nil {
106 return
107 }
108
109 if agg, ok := collectSender.Done(); ok {
110 metricFinalAggregateSize.WithLabelValues(reason.String()).Observe(float64(len(agg.Files)))
111 agg.FlushReason = reason
112 sender.Send(agg)
113 }
114
115 // From now on use sender directly
116 collectSender = nil
117
118 // Stop timer goroutine if it is still running.
119 close(timerCancel)
120 }
121
122 // Wait FlushWallTime to call stopCollecting.
123 go func() {
124 timer := time.NewTimer(opts.FlushWallTime)
125 select {
126 case <-timerCancel:
127 timer.Stop()
128 case <-timer.C:
129 stopCollectingAndFlush(zoekt.FlushReasonTimerExpired)
130 }
131 }()
132
133 finalFlush := func() {
134 stopCollectingAndFlush(zoekt.FlushReasonFinalFlush)
135 }
136
137 return zoekt.SenderFunc(func(event *zoekt.SearchResult) {
138 mu.Lock()
139 if collectSender != nil {
140 collectSender.Send(event)
141 } else {
142 sender.Send(event)
143 }
144 mu.Unlock()
145 }), finalFlush
146}
147
148// limitSender wraps a sender and calls cancel once the truncator has finished
149// truncating.
150func limitSender(cancel context.CancelFunc, sender zoekt.Sender, truncator index.DisplayTruncator) zoekt.Sender {
151 return zoekt.SenderFunc(func(result *zoekt.SearchResult) {
152 var hasMore bool
153 result.Files, hasMore = truncator(result.Files)
154 if !hasMore {
155 cancel()
156 }
157 sender.Send(result)
158 })
159}
160
161func copyFileSender(sender zoekt.Sender) zoekt.Sender {
162 return zoekt.SenderFunc(func(result *zoekt.SearchResult) {
163 copyFiles(result)
164 sender.Send(result)
165 })
166}