// Fork of https://github.com/sourcegraph/zoekt

// Package stream provides a client and a server to consume search results as
// a stream.
3package stream
4
5import (
6 "encoding/gob"
7 "errors"
8 "math"
9 "net/http"
10 "sync"
11
12 "github.com/sourcegraph/zoekt"
13 "github.com/sourcegraph/zoekt/query"
14 "github.com/sourcegraph/zoekt/rpc"
15)
16
// DefaultSSEPath is the default URL path of the streaming search endpoint, as
// used by zoekt-webserver.
const DefaultSSEPath = "/stream"
19
// eventType identifies the kind of payload carried by a searchReply.
type eventType int

const (
	// eventMatches marks a reply whose Data is a search result.
	eventMatches eventType = iota
	// eventError marks a reply whose Data is the text of an error.
	eventError
	// eventDone marks the final reply of a stream; it carries no data.
	eventDone
)

// string returns the symbolic name of e.
//
// Values outside the declared constants yield "eventUnknown" rather than
// panicking with an out-of-range index, so a corrupted or unexpected value
// can still be reported safely.
func (e eventType) string() string {
	switch e {
	case eventMatches:
		return "eventMatches"
	case eventError:
		return "eventError"
	case eventDone:
		return "eventDone"
	default:
		return "eventUnknown"
	}
}
31
32// Server returns an http.Handler which is the server side of StreamSearch.
33func Server(searcher zoekt.Streamer) http.Handler {
34 registerGob()
35 return &handler{Searcher: searcher}
36}
37
38type searchArgs struct {
39 Q query.Q
40 Opts *zoekt.SearchOptions
41}
42
43type searchReply struct {
44 Event eventType
45 Data interface{}
46}
47
48type handler struct {
49 Searcher zoekt.Streamer
50}
51
52func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
53 ctx := r.Context()
54
55 // Decode payload.
56 args := new(searchArgs)
57 err := gob.NewDecoder(r.Body).Decode(args)
58 if err != nil {
59 http.Error(w, err.Error(), http.StatusBadRequest)
60 return
61 }
62
63 args.Q = query.RPCUnwrap(args.Q)
64
65 eventWriter, err := newEventStreamWriter(w)
66 if err != nil {
67 http.Error(w, err.Error(), http.StatusInternalServerError)
68 return
69 }
70
71 // Always send a done event in the end.
72 defer func() {
73 err = eventWriter.event(eventDone, nil)
74 if err != nil {
75 _ = eventWriter.event(eventError, err)
76 }
77 }()
78
79 agg := zoekt.SearchResult{}
80 aggCount := 0
81
82 send := func(zsr *zoekt.SearchResult) {
83
84 err := eventWriter.event(eventMatches, zsr)
85 if err != nil {
86 _ = eventWriter.event(eventError, err)
87 return
88 }
89 }
90
91 err = h.Searcher.StreamSearch(ctx, args.Q, args.Opts, SenderFunc(func(event *zoekt.SearchResult) {
92 // We don't want to send events over the wire if they don't contain file
93 // matches. Hence, in case we didn't find any results, we aggregate the stats
94 // and send them out in regular intervals.
95 if len(event.Files) == 0 {
96 aggCount++
97
98 agg.Stats.Add(event.Stats)
99 agg.Progress = event.Progress
100
101 if aggCount%100 == 0 && !agg.Stats.Zero() {
102 send(&agg)
103 agg = zoekt.SearchResult{}
104 }
105
106 return
107 }
108
109 // If we have aggregate stats, we merge them with the new event before sending
110 // it. We drop agg.Progress, because we assume that event.Progress reflects the
111 // latest status.
112 if !agg.Stats.Zero() {
113 event.Stats.Add(agg.Stats)
114 agg = zoekt.SearchResult{}
115 }
116
117 send(event)
118 }))
119
120 if err == nil && !agg.Stats.Zero() {
121 send(&zoekt.SearchResult{
122 Stats: agg.Stats,
123 Progress: zoekt.Progress{
124 Priority: math.Inf(-1),
125 MaxPendingPriority: math.Inf(-1),
126 },
127 })
128 }
129
130 if err != nil {
131 _ = eventWriter.event(eventError, err)
132 return
133 }
134}
135
// eventStreamWriter encodes events onto an HTTP response and flushes after
// each one.
type eventStreamWriter struct {
	// enc is the gob encoder that writes to the response.
	enc *gob.Encoder
	// flush is called after every successfully encoded event.
	flush func()
}
140
141func newEventStreamWriter(w http.ResponseWriter) (*eventStreamWriter, error) {
142 flusher, ok := w.(http.Flusher)
143 if !ok {
144 return nil, errors.New("http flushing not supported")
145 }
146
147 w.Header().Set("Content-Type", "application/x-gob-stream")
148 w.Header().Set("Cache-Control", "no-cache")
149 w.Header().Set("Connection", "keep-alive")
150 w.Header().Set("Transfer-Encoding", "chunked")
151
152 // This informs nginx to not buffer. With buffering search responses will
153 // be delayed until buffers get full, leading to worst case latency of the
154 // full time a search takes to complete.
155 w.Header().Set("X-Accel-Buffering", "no")
156
157 return &eventStreamWriter{
158 enc: gob.NewEncoder(w),
159 flush: flusher.Flush,
160 }, nil
161}
162
163func (e *eventStreamWriter) event(event eventType, data interface{}) error {
164 // Because gob does not support serializing errors, we send error.Error() and
165 // recreate the error on the client-side.
166 if event == eventError {
167 if err, isError := data.(error); isError {
168 data = err.Error()
169 }
170 }
171 err := e.enc.Encode(searchReply{Event: event, Data: data})
172 if err != nil {
173 return err
174 }
175 e.flush()
176 return nil
177}
178
179var once sync.Once
180
181func registerGob() {
182 once.Do(func() {
183 gob.Register(&zoekt.SearchResult{})
184 })
185 rpc.RegisterGob()
186}