fork of https://github.com/sourcegraph/zoekt
1package server
2
3import (
4 "math"
5
6 "github.com/sourcegraph/zoekt"
7)
8
9// newSamplingSender is a zoekt.Sender that samples stats events to avoid
10// sending many empty stats events over the wire.
11func newSamplingSender(next zoekt.Sender) *samplingSender {
12 return &samplingSender{next: next}
13}
14
15type samplingSender struct {
16 next zoekt.Sender
17 agg zoekt.SearchResult
18 aggCount int
19}
20
21func (s *samplingSender) Send(event *zoekt.SearchResult) {
22 // We don't want to send events over the wire if they don't contain file
23 // matches. Hence, in case we didn't find any results, we aggregate the stats
24 // and send them out in regular intervals.
25 if len(event.Files) == 0 {
26 s.aggCount++
27
28 s.agg.Stats.Add(event.Stats)
29 s.agg.Progress = event.Progress
30
31 if s.aggCount%100 == 0 && !s.agg.Stats.Zero() {
32 s.next.Send(&s.agg)
33 s.agg = zoekt.SearchResult{}
34 }
35
36 return
37 }
38
39 // If we have aggregate stats, we merge them with the new event before sending
40 // it. We drop agg.Progress, because we assume that event.Progress reflects the
41 // latest status.
42 if !s.agg.Stats.Zero() {
43 event.Stats.Add(s.agg.Stats)
44 s.agg = zoekt.SearchResult{}
45 }
46
47 s.next.Send(event)
48}
49
50// Flush sends any aggregated stats that we haven't sent yet
51func (s *samplingSender) Flush() {
52 if !s.agg.Stats.Zero() {
53 s.next.Send(&zoekt.SearchResult{
54 Stats: s.agg.Stats,
55 Progress: zoekt.Progress{
56 Priority: math.Inf(-1),
57 MaxPendingPriority: math.Inf(-1),
58 },
59 })
60 }
61}