fork of https://github.com/sourcegraph/zoekt
1package shards
2
3import (
4 "context"
5 "sync"
6 "time"
7
8 "github.com/prometheus/client_golang/prometheus"
9 "github.com/prometheus/client_golang/prometheus/promauto"
10
11 "github.com/sourcegraph/zoekt"
12 "github.com/sourcegraph/zoekt/stream"
13)
14
15var metricFinalAggregateSize = promauto.NewHistogramVec(prometheus.HistogramOpts{
16 Name: "zoekt_final_aggregate_size",
17 Help: "The number of file matches we aggregated before flushing",
18 Buckets: prometheus.ExponentialBuckets(1, 2, 20),
19}, []string{"reason"})
20
// collectSender is a sender that aggregates every result passed to Send into
// a single SearchResult. Once sending is done, call Done to obtain the
// aggregated result, whose files have been ranked.
//
// Note: It aggregates Progress as well, and expects that the
// MaxPendingPriority values it receives are monotonically decreasing.
type collectSender struct {
	// opts is passed to zoekt.SortAndTruncateFiles each time files are merged.
	opts *zoekt.SearchOptions
	// aggregate holds everything collected so far. It is nil until the first
	// Send and is reset to nil by Done.
	aggregate *zoekt.SearchResult
}
30
31func newCollectSender(opts *zoekt.SearchOptions) *collectSender {
32 return &collectSender{opts: opts}
33}
34
35// Send aggregates the new search result by adding it stats and ranking
36// and truncating its files according to the input SearchOptions.
37func (c *collectSender) Send(r *zoekt.SearchResult) {
38 if c.aggregate == nil {
39 c.aggregate = &zoekt.SearchResult{
40 RepoURLs: map[string]string{},
41 LineFragments: map[string]string{},
42 }
43 }
44
45 c.aggregate.Stats.Add(r.Stats)
46
47 if len(r.Files) > 0 {
48 c.aggregate.Files = append(c.aggregate.Files, r.Files...)
49
50 c.aggregate.Files = zoekt.SortAndTruncateFiles(c.aggregate.Files, c.opts)
51
52 for k, v := range r.RepoURLs {
53 c.aggregate.RepoURLs[k] = v
54 }
55 for k, v := range r.LineFragments {
56 c.aggregate.LineFragments[k] = v
57 }
58 }
59
60 // The priority of our aggregate is the largest priority we collect.
61 if c.aggregate.Priority < r.Priority {
62 c.aggregate.Priority = r.Priority
63 }
64
65 // We receive monotonically decreasing values, so we update on every call.
66 c.aggregate.MaxPendingPriority = r.MaxPendingPriority
67}
68
69// Done returns the aggregated result.
70//
71// If no results are aggregated, ok is false and the result is nil.
72func (c *collectSender) Done() (_ *zoekt.SearchResult, ok bool) {
73 if c.aggregate == nil {
74 return nil, false
75 }
76
77 agg := c.aggregate
78 c.aggregate = nil
79 return agg, true
80}
81
82// newFlushCollectSender creates a sender which will collect and rank results
83// until opts.FlushWallTime. After that it will stream each result as it is
84// sent.
85func newFlushCollectSender(opts *zoekt.SearchOptions, sender zoekt.Sender) (zoekt.Sender, func()) {
86 // We don't need to do any collecting, so just pass back the sender to use
87 // directly.
88 if opts.FlushWallTime == 0 {
89 return sender, func() {}
90 }
91
92 // We transition through 3 states
93 // 1. collectSender != nil: collect results via collectSender
94 // 2. timerFired: send collected results and mark collectSender nil
95 // 3. collectSender == nil: directly use sender
96
97 var (
98 mu sync.Mutex
99 collectSender = newCollectSender(opts)
100 timerCancel = make(chan struct{})
101 )
102
103 // stopCollectingAndFlush will send what we have collected and all future
104 // sends will go via sender directly.
105 stopCollectingAndFlush := func(reason zoekt.FlushReason) {
106 mu.Lock()
107 defer mu.Unlock()
108
109 if collectSender == nil {
110 return
111 }
112
113 if agg, ok := collectSender.Done(); ok {
114 metricFinalAggregateSize.WithLabelValues(reason.String()).Observe(float64(len(agg.Files)))
115 agg.FlushReason = reason
116 sender.Send(agg)
117 }
118
119 // From now on use sender directly
120 collectSender = nil
121
122 // Stop timer goroutine if it is still running.
123 close(timerCancel)
124 }
125
126 // Wait FlushWallTime to call stopCollecting.
127 go func() {
128 timer := time.NewTimer(opts.FlushWallTime)
129 select {
130 case <-timerCancel:
131 timer.Stop()
132 case <-timer.C:
133 stopCollectingAndFlush(zoekt.FlushReasonTimerExpired)
134 }
135 }()
136
137 finalFlush := func() {
138 stopCollectingAndFlush(zoekt.FlushReasonFinalFlush)
139 }
140
141 return stream.SenderFunc(func(event *zoekt.SearchResult) {
142 mu.Lock()
143 if collectSender != nil {
144 collectSender.Send(event)
145 } else {
146 sender.Send(event)
147 }
148 mu.Unlock()
149 }), finalFlush
150}
151
152// limitSender wraps a sender and calls cancel once the truncator has finished
153// truncating.
154func limitSender(cancel context.CancelFunc, sender zoekt.Sender, truncator zoekt.DisplayTruncator) zoekt.Sender {
155 return stream.SenderFunc(func(result *zoekt.SearchResult) {
156 var hasMore bool
157 result.Files, hasMore = truncator(result.Files)
158 if !hasMore {
159 cancel()
160 }
161 sender.Send(result)
162 })
163}
164
165func copyFileSender(sender zoekt.Sender) zoekt.Sender {
166 return stream.SenderFunc(func(result *zoekt.SearchResult) {
167 copyFiles(result)
168 sender.Send(result)
169 })
170}