// Fork of https://github.com/sourcegraph/zoekt.
1package search
2
3import (
4 "context"
5 "maps"
6 "sync"
7 "time"
8
9 "github.com/prometheus/client_golang/prometheus"
10 "github.com/prometheus/client_golang/prometheus/promauto"
11
12 "github.com/sourcegraph/zoekt"
13 "github.com/sourcegraph/zoekt/index"
14)
15
16var metricFinalAggregateSize = promauto.NewHistogramVec(prometheus.HistogramOpts{
17 Name: "zoekt_final_aggregate_size",
18 Help: "The number of file matches we aggregated before flushing",
19 Buckets: prometheus.ExponentialBuckets(1, 2, 20),
20}, []string{"reason"})
21
22// collectSender is a sender that will aggregate results. Once sending is
23// done, you call Done to return the aggregated result which are ranked.
24//
25// Note: It aggregates Progress as well, and expects that the
26// MaxPendingPriority it receives are monotonically decreasing.
27type collectSender struct {
28 opts *zoekt.SearchOptions
29 aggregate *zoekt.SearchResult
30}
31
32func newCollectSender(opts *zoekt.SearchOptions) *collectSender {
33 return &collectSender{opts: opts}
34}
35
36// Send aggregates the new search result by adding it stats and ranking
37// and truncating its files according to the input SearchOptions.
38func (c *collectSender) Send(r *zoekt.SearchResult) {
39 if c.aggregate == nil {
40 c.aggregate = &zoekt.SearchResult{
41 RepoURLs: map[string]string{},
42 LineFragments: map[string]string{},
43 }
44 }
45
46 c.aggregate.Stats.Add(r.Stats)
47
48 if len(r.Files) > 0 {
49 c.aggregate.Files = append(c.aggregate.Files, r.Files...)
50
51 c.aggregate.Files = index.SortAndTruncateFiles(c.aggregate.Files, c.opts)
52
53 maps.Copy(c.aggregate.RepoURLs, r.RepoURLs)
54 maps.Copy(c.aggregate.LineFragments, r.LineFragments)
55 }
56
57 // The priority of our aggregate is the largest priority we collect.
58 if c.aggregate.Priority < r.Priority {
59 c.aggregate.Priority = r.Priority
60 }
61
62 // We receive monotonically decreasing values, so we update on every call.
63 c.aggregate.MaxPendingPriority = r.MaxPendingPriority
64}
65
66// Done returns the aggregated result.
67//
68// If no results are aggregated, ok is false and the result is nil.
69func (c *collectSender) Done() (_ *zoekt.SearchResult, ok bool) {
70 if c.aggregate == nil {
71 return nil, false
72 }
73
74 agg := c.aggregate
75 c.aggregate = nil
76 return agg, true
77}
78
79// newFlushCollectSender creates a sender which will collect and rank results
80// until opts.FlushWallTime. After that it will stream each result as it is
81// sent.
82func newFlushCollectSender(opts *zoekt.SearchOptions, sender zoekt.Sender) (zoekt.Sender, func()) {
83 // We don't need to do any collecting, so just pass back the sender to use
84 // directly.
85 if opts.FlushWallTime == 0 {
86 return sender, func() {}
87 }
88
89 // We transition through 3 states
90 // 1. collectSender != nil: collect results via collectSender
91 // 2. timerFired: send collected results and mark collectSender nil
92 // 3. collectSender == nil: directly use sender
93
94 var (
95 mu sync.Mutex
96 collectSender = newCollectSender(opts)
97 timerCancel = make(chan struct{})
98 )
99
100 // stopCollectingAndFlush will send what we have collected and all future
101 // sends will go via sender directly.
102 stopCollectingAndFlush := func(reason zoekt.FlushReason) {
103 mu.Lock()
104 defer mu.Unlock()
105
106 if collectSender == nil {
107 return
108 }
109
110 if agg, ok := collectSender.Done(); ok {
111 metricFinalAggregateSize.WithLabelValues(reason.String()).Observe(float64(len(agg.Files)))
112 agg.FlushReason = reason
113 sender.Send(agg)
114 }
115
116 // From now on use sender directly
117 collectSender = nil
118
119 // Stop timer goroutine if it is still running.
120 close(timerCancel)
121 }
122
123 // Wait FlushWallTime to call stopCollecting.
124 go func() {
125 timer := time.NewTimer(opts.FlushWallTime)
126 select {
127 case <-timerCancel:
128 timer.Stop()
129 case <-timer.C:
130 stopCollectingAndFlush(zoekt.FlushReasonTimerExpired)
131 }
132 }()
133
134 finalFlush := func() {
135 stopCollectingAndFlush(zoekt.FlushReasonFinalFlush)
136 }
137
138 return zoekt.SenderFunc(func(event *zoekt.SearchResult) {
139 mu.Lock()
140 if collectSender != nil {
141 collectSender.Send(event)
142 } else {
143 sender.Send(event)
144 }
145 mu.Unlock()
146 }), finalFlush
147}
148
149// limitSender wraps a sender and calls cancel once the truncator has finished
150// truncating.
151func limitSender(cancel context.CancelFunc, sender zoekt.Sender, truncator index.DisplayTruncator) zoekt.Sender {
152 return zoekt.SenderFunc(func(result *zoekt.SearchResult) {
153 var hasMore bool
154 result.Files, hasMore = truncator(result.Files)
155 if !hasMore {
156 cancel()
157 }
158 sender.Send(result)
159 })
160}
161
162func copyFileSender(sender zoekt.Sender) zoekt.Sender {
163 return zoekt.SenderFunc(func(result *zoekt.SearchResult) {
164 copyFiles(result)
165 sender.Send(result)
166 })
167}