// Fork of https://github.com/sourcegraph/zoekt.
1package shards
2
3import (
4 "context"
5 "sync"
6 "time"
7
8 "github.com/prometheus/client_golang/prometheus"
9 "github.com/prometheus/client_golang/prometheus/promauto"
10
11 "github.com/sourcegraph/zoekt"
12 "github.com/sourcegraph/zoekt/stream"
13)
14
15var (
16 metricFinalAggregateSize = promauto.NewHistogramVec(prometheus.HistogramOpts{
17 Name: "zoekt_final_aggregate_size",
18 Help: "The number of file matches we aggregated before flushing",
19 Buckets: prometheus.ExponentialBuckets(1, 2, 20),
20 }, []string{"reason"})
21)
22
23// collectSender is a sender that will aggregate results. Once sending is
24// done, you call Done to return the aggregated result which are ranked.
25//
26// Note: It aggregates Progress as well, and expects that the
27// MaxPendingPriority it receives are monotonically decreasing.
28type collectSender struct {
29 opts *zoekt.SearchOptions
30 aggregate *zoekt.SearchResult
31}
32
33func newCollectSender(opts *zoekt.SearchOptions) *collectSender {
34 return &collectSender{opts: opts}
35}
36
37// Send aggregates the new search result by adding it stats and ranking
38// and truncating its files according to the input SearchOptions.
39func (c *collectSender) Send(r *zoekt.SearchResult) {
40 if c.aggregate == nil {
41 c.aggregate = &zoekt.SearchResult{
42 RepoURLs: map[string]string{},
43 LineFragments: map[string]string{},
44 }
45 }
46
47 c.aggregate.Stats.Add(r.Stats)
48
49 if len(r.Files) > 0 {
50 c.aggregate.Files = append(c.aggregate.Files, r.Files...)
51
52 zoekt.SortFiles(c.aggregate.Files)
53 if max := c.opts.MaxDocDisplayCount; max > 0 && max < len(c.aggregate.Files) {
54 c.aggregate.Files = c.aggregate.Files[:max]
55 }
56
57 for k, v := range r.RepoURLs {
58 c.aggregate.RepoURLs[k] = v
59 }
60 for k, v := range r.LineFragments {
61 c.aggregate.LineFragments[k] = v
62 }
63 }
64
65 // The priority of our aggregate is the largest priority we collect.
66 if c.aggregate.Priority < r.Priority {
67 c.aggregate.Priority = r.Priority
68 }
69
70 // We receive monotonically decreasing values, so we update on every call.
71 c.aggregate.MaxPendingPriority = r.MaxPendingPriority
72}
73
74// Done returns the aggregated result.
75//
76// If no results are aggregated, ok is false and the result is nil.
77func (c *collectSender) Done() (_ *zoekt.SearchResult, ok bool) {
78 if c.aggregate == nil {
79 return nil, false
80 }
81
82 agg := c.aggregate
83 c.aggregate = nil
84 return agg, true
85}
86
87// newFlushCollectSender creates a sender which will collect and rank results
88// until opts.FlushWallTime. After that it will stream each result as it is
89// sent.
90func newFlushCollectSender(opts *zoekt.SearchOptions, sender zoekt.Sender) (zoekt.Sender, func()) {
91 // We don't need to do any collecting, so just pass back the sender to use
92 // directly.
93 if opts.FlushWallTime == 0 {
94 return sender, func() {}
95 }
96
97 // We transition through 3 states
98 // 1. collectSender != nil: collect results via collectSender
99 // 2. timerFired: send collected results and mark collectSender nil
100 // 3. collectSender == nil: directly use sender
101
102 var (
103 mu sync.Mutex
104 collectSender = newCollectSender(opts)
105 timerCancel = make(chan struct{})
106 )
107
108 // stopCollectingAndFlush will send what we have collected and all future
109 // sends will go via sender directly.
110 stopCollectingAndFlush := func(reason zoekt.FlushReason) {
111 mu.Lock()
112 defer mu.Unlock()
113
114 if collectSender == nil {
115 return
116 }
117
118 if agg, ok := collectSender.Done(); ok {
119 metricFinalAggregateSize.WithLabelValues(reason.String()).Observe(float64(len(agg.Files)))
120 agg.FlushReason = reason
121 sender.Send(agg)
122 }
123
124 // From now on use sender directly
125 collectSender = nil
126
127 // Stop timer goroutine if it is still running.
128 close(timerCancel)
129 }
130
131 // Wait FlushWallTime to call stopCollecting.
132 go func() {
133 timer := time.NewTimer(opts.FlushWallTime)
134 select {
135 case <-timerCancel:
136 timer.Stop()
137 case <-timer.C:
138 stopCollectingAndFlush(zoekt.FlushReasonTimerExpired)
139 }
140 }()
141
142 finalFlush := func() {
143 stopCollectingAndFlush(zoekt.FlushReasonFinalFlush)
144 }
145
146 return stream.SenderFunc(func(event *zoekt.SearchResult) {
147 mu.Lock()
148 if collectSender != nil {
149 collectSender.Send(event)
150 } else {
151 sender.Send(event)
152 }
153 mu.Unlock()
154 }), finalFlush
155}
156
157// limitSender wraps a sender and calls cancel once it has seen limit file
158// matches.
159func limitSender(cancel context.CancelFunc, sender zoekt.Sender, limit int) zoekt.Sender {
160 return stream.SenderFunc(func(result *zoekt.SearchResult) {
161 if len(result.Files) >= limit {
162 result.Files = result.Files[:limit]
163 cancel()
164 }
165 limit -= len(result.Files)
166 sender.Send(result)
167 })
168}
169
170func copyFileSender(sender zoekt.Sender) zoekt.Sender {
171 return stream.SenderFunc(func(result *zoekt.SearchResult) {
172 copyFiles(result)
173 sender.Send(result)
174 })
175}