fork of https://github.com/sourcegraph/zoekt
// Package stream provides a client and a server to consume search results as
// a stream.
3package stream
4
5import (
6 "encoding/gob"
7 "errors"
8 "math"
9 "net/http"
10 "sync"
11
12 "github.com/sourcegraph/zoekt"
13 "github.com/sourcegraph/zoekt/query"
14 "github.com/sourcegraph/zoekt/rpc"
15)
16
// DefaultSSEPath is the path used by zoekt-webserver.
//
// NOTE(review): despite the "SSE" in the name, the handler in this file
// replies with Content-Type "application/x-gob-stream" (a stream of
// gob-encoded messages), not text/event-stream.
const DefaultSSEPath = "/stream"
19
// eventType identifies the kind of message carried by a searchReply on the
// stream.
type eventType int

const (
	// eventMatches marks a reply whose Data is a search result.
	eventMatches eventType = iota
	// eventError marks a reply whose Data is an error message string.
	eventError
	// eventDone marks the reply the handler sends last.
	eventDone
)

// string returns the name of the event type.
//
// Values outside the declared constants yield "eventUnknown" rather than
// panicking with an index-out-of-range error, so the method is safe to use
// when logging or formatting an unexpected value.
func (e eventType) string() string {
	switch e {
	case eventMatches:
		return "eventMatches"
	case eventError:
		return "eventError"
	case eventDone:
		return "eventDone"
	default:
		return "eventUnknown"
	}
}
31
32// Server returns an http.Handler which is the server side of StreamSearch.
33func Server(searcher zoekt.Streamer) http.Handler {
34 registerGob()
35 return &handler{Searcher: searcher}
36}
37
// searchArgs is the request payload that ServeHTTP gob-decodes from the
// request body.
type searchArgs struct {
	// Q is the query to run. ServeHTTP passes it through query.RPCUnwrap
	// before searching.
	Q query.Q
	// Opts is handed to StreamSearch as received.
	Opts *zoekt.SearchOptions
}
42
// searchReply is the message that is gob-encoded onto the response for every
// event.
type searchReply struct {
	// Event is the kind of event being sent.
	Event eventType
	// Data is the event payload. For eventError an error value is replaced by
	// its message string before encoding; for eventDone the handler sends nil.
	Data interface{}
}
47
// handler is the http.Handler returned by Server.
type handler struct {
	// Searcher runs the streaming search for each request.
	Searcher zoekt.Streamer
}
51
52func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
53 ctx := r.Context()
54
55 // Decode payload.
56 args := new(searchArgs)
57 err := gob.NewDecoder(r.Body).Decode(args)
58 if err != nil {
59 http.Error(w, err.Error(), http.StatusBadRequest)
60 return
61 }
62
63 args.Q = query.RPCUnwrap(args.Q)
64
65 eventWriter, err := newEventStreamWriter(w)
66 if err != nil {
67 http.Error(w, err.Error(), http.StatusInternalServerError)
68 return
69 }
70
71 // Always send a done event in the end.
72 defer func() {
73 err = eventWriter.event(eventDone, nil)
74 if err != nil {
75 _ = eventWriter.event(eventError, err)
76 }
77 }()
78
79 send := func(zsr *zoekt.SearchResult) {
80 err := eventWriter.event(eventMatches, zsr)
81 if err != nil {
82 _ = eventWriter.event(eventError, err)
83 return
84 }
85 }
86
87 sampler := NewSamplingSender(SenderFunc(send))
88
89 err = h.Searcher.StreamSearch(ctx, args.Q, args.Opts, sampler)
90
91 if err == nil {
92 sampler.Flush()
93 }
94
95 if err != nil {
96 _ = eventWriter.event(eventError, err)
97 return
98 }
99}
100
// eventStreamWriter writes gob-encoded events to an HTTP response and flushes
// after each one.
type eventStreamWriter struct {
	// enc encodes each event onto the response.
	enc *gob.Encoder
	// flush is called after every successfully encoded event.
	flush func()
}
105
106func newEventStreamWriter(w http.ResponseWriter) (*eventStreamWriter, error) {
107 flusher, ok := w.(http.Flusher)
108 if !ok {
109 return nil, errors.New("http flushing not supported")
110 }
111
112 w.Header().Set("Content-Type", "application/x-gob-stream")
113 w.Header().Set("Cache-Control", "no-cache")
114 w.Header().Set("Connection", "keep-alive")
115 w.Header().Set("Transfer-Encoding", "chunked")
116
117 // This informs nginx to not buffer. With buffering search responses will
118 // be delayed until buffers get full, leading to worst case latency of the
119 // full time a search takes to complete.
120 w.Header().Set("X-Accel-Buffering", "no")
121
122 return &eventStreamWriter{
123 enc: gob.NewEncoder(w),
124 flush: flusher.Flush,
125 }, nil
126}
127
128func (e *eventStreamWriter) event(event eventType, data interface{}) error {
129 // Because gob does not support serializing errors, we send error.Error() and
130 // recreate the error on the client-side.
131 if event == eventError {
132 if err, isError := data.(error); isError {
133 data = err.Error()
134 }
135 }
136 err := e.enc.Encode(searchReply{Event: event, Data: data})
137 if err != nil {
138 return err
139 }
140 e.flush()
141 return nil
142}
143
144var once sync.Once
145
146func registerGob() {
147 once.Do(func() {
148 gob.Register(&zoekt.SearchResult{})
149 })
150 rpc.RegisterGob()
151}
152
153// NewSamplingSender is a zoekt.Sender that samples stats events
154// to avoid sending many empty stats events over the wire.
155func NewSamplingSender(next zoekt.Sender) *samplingSender {
156 return &samplingSender{
157 next: next,
158 agg: zoekt.SearchResult{},
159 aggCount: 0,
160 }
161}
162
// samplingSender forwards search results to next, holding back results that
// contain no file matches and accumulating their stats instead.
type samplingSender struct {
	// next receives the events that are forwarded.
	next zoekt.Sender
	// agg accumulates the Stats, and keeps the most recent Progress, of the
	// events that have been held back.
	agg zoekt.SearchResult
	// aggCount counts the events without file matches seen so far.
	aggCount int
}
168
169func (s *samplingSender) Send(event *zoekt.SearchResult) {
170 // We don't want to send events over the wire if they don't contain file
171 // matches. Hence, in case we didn't find any results, we aggregate the stats
172 // and send them out in regular intervals.
173 if len(event.Files) == 0 {
174 s.aggCount++
175
176 s.agg.Stats.Add(event.Stats)
177 s.agg.Progress = event.Progress
178
179 if s.aggCount%100 == 0 && !s.agg.Stats.Zero() {
180 s.next.Send(&s.agg)
181 s.agg = zoekt.SearchResult{}
182 }
183
184 return
185 }
186
187 // If we have aggregate stats, we merge them with the new event before sending
188 // it. We drop agg.Progress, because we assume that event.Progress reflects the
189 // latest status.
190 if !s.agg.Stats.Zero() {
191 event.Stats.Add(s.agg.Stats)
192 s.agg = zoekt.SearchResult{}
193 }
194
195 s.next.Send(event)
196}
197
198// Flush sends any aggregated stats that we haven't sent yet
199func (s *samplingSender) Flush() {
200 if !s.agg.Stats.Zero() {
201 s.next.Send(&zoekt.SearchResult{
202 Stats: s.agg.Stats,
203 Progress: zoekt.Progress{
204 Priority: math.Inf(-1),
205 MaxPendingPriority: math.Inf(-1),
206 },
207 })
208 }
209}