// Fork of https://github.com/sourcegraph/zoekt.

1package shards
2
3import (
4 "context"
5 "sync"
6 "time"
7
8 "github.com/prometheus/client_golang/prometheus"
9 "github.com/prometheus/client_golang/prometheus/promauto"
10
11 "github.com/sourcegraph/zoekt"
12)
13
14var metricFinalAggregateSize = promauto.NewHistogramVec(prometheus.HistogramOpts{
15 Name: "zoekt_final_aggregate_size",
16 Help: "The number of file matches we aggregated before flushing",
17 Buckets: prometheus.ExponentialBuckets(1, 2, 20),
18}, []string{"reason"})
19
// collectSender is a sender that aggregates results. Once sending is done,
// call Done to retrieve the aggregated result, whose files have been ranked
// and truncated according to the SearchOptions it was created with.
//
// Note: it aggregates progress information as well (Priority and
// MaxPendingPriority), and expects the MaxPendingPriority values it receives
// to be monotonically decreasing.
//
// collectSender has no internal locking; callers that share one across
// goroutines must synchronize access themselves.
type collectSender struct {
	// opts controls ranking and truncation of the aggregated files.
	opts *zoekt.SearchOptions
	// aggregate is the running result; nil until the first Send and reset
	// to nil by Done.
	aggregate *zoekt.SearchResult
}
29
30func newCollectSender(opts *zoekt.SearchOptions) *collectSender {
31 return &collectSender{opts: opts}
32}
33
34// Send aggregates the new search result by adding it stats and ranking
35// and truncating its files according to the input SearchOptions.
36func (c *collectSender) Send(r *zoekt.SearchResult) {
37 if c.aggregate == nil {
38 c.aggregate = &zoekt.SearchResult{
39 RepoURLs: map[string]string{},
40 LineFragments: map[string]string{},
41 }
42 }
43
44 c.aggregate.Stats.Add(r.Stats)
45
46 if len(r.Files) > 0 {
47 c.aggregate.Files = append(c.aggregate.Files, r.Files...)
48
49 c.aggregate.Files = zoekt.SortAndTruncateFiles(c.aggregate.Files, c.opts)
50
51 for k, v := range r.RepoURLs {
52 c.aggregate.RepoURLs[k] = v
53 }
54 for k, v := range r.LineFragments {
55 c.aggregate.LineFragments[k] = v
56 }
57 }
58
59 // The priority of our aggregate is the largest priority we collect.
60 if c.aggregate.Priority < r.Priority {
61 c.aggregate.Priority = r.Priority
62 }
63
64 // We receive monotonically decreasing values, so we update on every call.
65 c.aggregate.MaxPendingPriority = r.MaxPendingPriority
66}
67
68// Done returns the aggregated result.
69//
70// If no results are aggregated, ok is false and the result is nil.
71func (c *collectSender) Done() (_ *zoekt.SearchResult, ok bool) {
72 if c.aggregate == nil {
73 return nil, false
74 }
75
76 agg := c.aggregate
77 c.aggregate = nil
78 return agg, true
79}
80
81// newFlushCollectSender creates a sender which will collect and rank results
82// until opts.FlushWallTime. After that it will stream each result as it is
83// sent.
84func newFlushCollectSender(opts *zoekt.SearchOptions, sender zoekt.Sender) (zoekt.Sender, func()) {
85 // We don't need to do any collecting, so just pass back the sender to use
86 // directly.
87 if opts.FlushWallTime == 0 {
88 return sender, func() {}
89 }
90
91 // We transition through 3 states
92 // 1. collectSender != nil: collect results via collectSender
93 // 2. timerFired: send collected results and mark collectSender nil
94 // 3. collectSender == nil: directly use sender
95
96 var (
97 mu sync.Mutex
98 collectSender = newCollectSender(opts)
99 timerCancel = make(chan struct{})
100 )
101
102 // stopCollectingAndFlush will send what we have collected and all future
103 // sends will go via sender directly.
104 stopCollectingAndFlush := func(reason zoekt.FlushReason) {
105 mu.Lock()
106 defer mu.Unlock()
107
108 if collectSender == nil {
109 return
110 }
111
112 if agg, ok := collectSender.Done(); ok {
113 metricFinalAggregateSize.WithLabelValues(reason.String()).Observe(float64(len(agg.Files)))
114 agg.FlushReason = reason
115 sender.Send(agg)
116 }
117
118 // From now on use sender directly
119 collectSender = nil
120
121 // Stop timer goroutine if it is still running.
122 close(timerCancel)
123 }
124
125 // Wait FlushWallTime to call stopCollecting.
126 go func() {
127 timer := time.NewTimer(opts.FlushWallTime)
128 select {
129 case <-timerCancel:
130 timer.Stop()
131 case <-timer.C:
132 stopCollectingAndFlush(zoekt.FlushReasonTimerExpired)
133 }
134 }()
135
136 finalFlush := func() {
137 stopCollectingAndFlush(zoekt.FlushReasonFinalFlush)
138 }
139
140 return zoekt.SenderFunc(func(event *zoekt.SearchResult) {
141 mu.Lock()
142 if collectSender != nil {
143 collectSender.Send(event)
144 } else {
145 sender.Send(event)
146 }
147 mu.Unlock()
148 }), finalFlush
149}
150
151// limitSender wraps a sender and calls cancel once the truncator has finished
152// truncating.
153func limitSender(cancel context.CancelFunc, sender zoekt.Sender, truncator zoekt.DisplayTruncator) zoekt.Sender {
154 return zoekt.SenderFunc(func(result *zoekt.SearchResult) {
155 var hasMore bool
156 result.Files, hasMore = truncator(result.Files)
157 if !hasMore {
158 cancel()
159 }
160 sender.Send(result)
161 })
162}
163
164func copyFileSender(sender zoekt.Sender) zoekt.Sender {
165 return zoekt.SenderFunc(func(result *zoekt.SearchResult) {
166 copyFiles(result)
167 sender.Send(result)
168 })
169}