fork of https://github.com/sourcegraph/zoekt
1package shards
2
3import (
4 "context"
5 "sync"
6 "time"
7
8 "github.com/prometheus/client_golang/prometheus"
9 "github.com/prometheus/client_golang/prometheus/promauto"
10
11 "github.com/sourcegraph/zoekt"
12 "github.com/sourcegraph/zoekt/stream"
13)
14
15var (
16 metricFinalAggregateSize = promauto.NewHistogramVec(prometheus.HistogramOpts{
17 Name: "zoekt_final_aggregate_size",
18 Help: "The number of file matches we aggregated before flushing",
19 Buckets: prometheus.ExponentialBuckets(1, 2, 20),
20 }, []string{"reason"})
21)
22
23// collectSender is a sender that will aggregate results. Once sending is
24// done, you call Done to return the aggregated result which are ranked.
25//
26// Note: It aggregates Progress as well, and expects that the
27// MaxPendingPriority it receives are monotonically decreasing.
28type collectSender struct {
29 opts *zoekt.SearchOptions
30 aggregate *zoekt.SearchResult
31}
32
33func newCollectSender(opts *zoekt.SearchOptions) *collectSender {
34 return &collectSender{opts: opts}
35}
36
37// Send aggregates the new search result by adding it stats and ranking
38// and truncating its files according to the input SearchOptions.
39func (c *collectSender) Send(r *zoekt.SearchResult) {
40 if c.aggregate == nil {
41 c.aggregate = &zoekt.SearchResult{
42 RepoURLs: map[string]string{},
43 LineFragments: map[string]string{},
44 }
45 }
46
47 c.aggregate.Stats.Add(r.Stats)
48
49 if len(r.Files) > 0 {
50 c.aggregate.Files = append(c.aggregate.Files, r.Files...)
51
52 c.aggregate.Files = zoekt.SortAndTruncateFiles(c.aggregate.Files, c.opts)
53
54 for k, v := range r.RepoURLs {
55 c.aggregate.RepoURLs[k] = v
56 }
57 for k, v := range r.LineFragments {
58 c.aggregate.LineFragments[k] = v
59 }
60 }
61
62 // The priority of our aggregate is the largest priority we collect.
63 if c.aggregate.Priority < r.Priority {
64 c.aggregate.Priority = r.Priority
65 }
66
67 // We receive monotonically decreasing values, so we update on every call.
68 c.aggregate.MaxPendingPriority = r.MaxPendingPriority
69}
70
71// Done returns the aggregated result.
72//
73// If no results are aggregated, ok is false and the result is nil.
74func (c *collectSender) Done() (_ *zoekt.SearchResult, ok bool) {
75 if c.aggregate == nil {
76 return nil, false
77 }
78
79 agg := c.aggregate
80 c.aggregate = nil
81 return agg, true
82}
83
84// newFlushCollectSender creates a sender which will collect and rank results
85// until opts.FlushWallTime. After that it will stream each result as it is
86// sent.
87func newFlushCollectSender(opts *zoekt.SearchOptions, sender zoekt.Sender) (zoekt.Sender, func()) {
88 // We don't need to do any collecting, so just pass back the sender to use
89 // directly.
90 if opts.FlushWallTime == 0 {
91 return sender, func() {}
92 }
93
94 // We transition through 3 states
95 // 1. collectSender != nil: collect results via collectSender
96 // 2. timerFired: send collected results and mark collectSender nil
97 // 3. collectSender == nil: directly use sender
98
99 var (
100 mu sync.Mutex
101 collectSender = newCollectSender(opts)
102 timerCancel = make(chan struct{})
103 )
104
105 // stopCollectingAndFlush will send what we have collected and all future
106 // sends will go via sender directly.
107 stopCollectingAndFlush := func(reason zoekt.FlushReason) {
108 mu.Lock()
109 defer mu.Unlock()
110
111 if collectSender == nil {
112 return
113 }
114
115 if agg, ok := collectSender.Done(); ok {
116 metricFinalAggregateSize.WithLabelValues(reason.String()).Observe(float64(len(agg.Files)))
117 agg.FlushReason = reason
118 sender.Send(agg)
119 }
120
121 // From now on use sender directly
122 collectSender = nil
123
124 // Stop timer goroutine if it is still running.
125 close(timerCancel)
126 }
127
128 // Wait FlushWallTime to call stopCollecting.
129 go func() {
130 timer := time.NewTimer(opts.FlushWallTime)
131 select {
132 case <-timerCancel:
133 timer.Stop()
134 case <-timer.C:
135 stopCollectingAndFlush(zoekt.FlushReasonTimerExpired)
136 }
137 }()
138
139 finalFlush := func() {
140 stopCollectingAndFlush(zoekt.FlushReasonFinalFlush)
141 }
142
143 return stream.SenderFunc(func(event *zoekt.SearchResult) {
144 mu.Lock()
145 if collectSender != nil {
146 collectSender.Send(event)
147 } else {
148 sender.Send(event)
149 }
150 mu.Unlock()
151 }), finalFlush
152}
153
154// limitSender wraps a sender and calls cancel once the truncator has finished
155// truncating.
156func limitSender(cancel context.CancelFunc, sender zoekt.Sender, truncator zoekt.DisplayTruncator) zoekt.Sender {
157 return stream.SenderFunc(func(result *zoekt.SearchResult) {
158 var hasMore bool
159 result.Files, hasMore = truncator(result.Files)
160 if !hasMore {
161 cancel()
162 }
163 sender.Send(result)
164 })
165}
166
167func copyFileSender(sender zoekt.Sender) zoekt.Sender {
168 return stream.SenderFunc(func(result *zoekt.SearchResult) {
169 copyFiles(result)
170 sender.Send(result)
171 })
172}