fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Package stream provides a client and a server to consume search results as 2// stream. 3package stream 4 5import ( 6 "encoding/gob" 7 "errors" 8 "math" 9 "net/http" 10 "sync" 11 12 "github.com/sourcegraph/zoekt" 13 "github.com/sourcegraph/zoekt/query" 14 "github.com/sourcegraph/zoekt/rpc" 15) 16 17// DefaultSSEPath is the path used by zoekt-webserver. 18const DefaultSSEPath = "/stream" 19 20type eventType int 21 22const ( 23 eventMatches eventType = iota 24 eventError 25 eventDone 26) 27 28func (e eventType) string() string { 29 return []string{"eventMatches", "eventError", "eventDone"}[e] 30} 31 32// Server returns an http.Handler which is the server side of StreamSearch. 33func Server(searcher zoekt.Streamer) http.Handler { 34 registerGob() 35 return &handler{Searcher: searcher} 36} 37 38type searchArgs struct { 39 Q query.Q 40 Opts *zoekt.SearchOptions 41} 42 43type searchReply struct { 44 Event eventType 45 Data interface{} 46} 47 48type handler struct { 49 Searcher zoekt.Streamer 50} 51 52func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { 53 ctx := r.Context() 54 55 // Decode payload. 56 args := new(searchArgs) 57 err := gob.NewDecoder(r.Body).Decode(args) 58 if err != nil { 59 http.Error(w, err.Error(), http.StatusBadRequest) 60 return 61 } 62 63 args.Q = query.RPCUnwrap(args.Q) 64 65 eventWriter, err := newEventStreamWriter(w) 66 if err != nil { 67 http.Error(w, err.Error(), http.StatusInternalServerError) 68 return 69 } 70 71 // Always send a done event in the end. 72 defer func() { 73 err = eventWriter.event(eventDone, nil) 74 if err != nil { 75 _ = eventWriter.event(eventError, err) 76 } 77 }() 78 79 send := func(zsr *zoekt.SearchResult) { 80 err := eventWriter.event(eventMatches, zsr) 81 if err != nil { 82 _ = eventWriter.event(eventError, err) 83 return 84 } 85 } 86 87 sampler := NewSamplingSender(SenderFunc(send)) 88 89 err = h.Searcher.StreamSearch(ctx, args.Q, args.Opts, sampler) 90 91 if err == nil { 92 sampler.Flush() 93 } 94 95 if err != nil { 96 _ = eventWriter.event(eventError, err) 97 return 98 } 99} 100 101type eventStreamWriter struct { 102 enc *gob.Encoder 103 flush func() 104} 105 106func newEventStreamWriter(w http.ResponseWriter) (*eventStreamWriter, error) { 107 flusher, ok := w.(http.Flusher) 108 if !ok { 109 return nil, errors.New("http flushing not supported") 110 } 111 112 w.Header().Set("Content-Type", "application/x-gob-stream") 113 w.Header().Set("Cache-Control", "no-cache") 114 w.Header().Set("Connection", "keep-alive") 115 w.Header().Set("Transfer-Encoding", "chunked") 116 117 // This informs nginx to not buffer. With buffering search responses will 118 // be delayed until buffers get full, leading to worst case latency of the 119 // full time a search takes to complete. 120 w.Header().Set("X-Accel-Buffering", "no") 121 122 return &eventStreamWriter{ 123 enc: gob.NewEncoder(w), 124 flush: flusher.Flush, 125 }, nil 126} 127 128func (e *eventStreamWriter) event(event eventType, data interface{}) error { 129 // Because gob does not support serializing errors, we send error.Error() and 130 // recreate the error on the client-side. 131 if event == eventError { 132 if err, isError := data.(error); isError { 133 data = err.Error() 134 } 135 } 136 err := e.enc.Encode(searchReply{Event: event, Data: data}) 137 if err != nil { 138 return err 139 } 140 e.flush() 141 return nil 142} 143 144var once sync.Once 145 146func registerGob() { 147 once.Do(func() { 148 gob.Register(&zoekt.SearchResult{}) 149 }) 150 rpc.RegisterGob() 151} 152 153// NewSamplingSender is a zoekt.Sender that samples stats events 154// to avoid sending many empty stats events over the wire. 155func NewSamplingSender(next zoekt.Sender) *samplingSender { 156 return &samplingSender{ 157 next: next, 158 agg: zoekt.SearchResult{}, 159 aggCount: 0, 160 } 161} 162 163type samplingSender struct { 164 next zoekt.Sender 165 agg zoekt.SearchResult 166 aggCount int 167} 168 169func (s *samplingSender) Send(event *zoekt.SearchResult) { 170 // We don't want to send events over the wire if they don't contain file 171 // matches. Hence, in case we didn't find any results, we aggregate the stats 172 // and send them out in regular intervals. 173 if len(event.Files) == 0 { 174 s.aggCount++ 175 176 s.agg.Stats.Add(event.Stats) 177 s.agg.Progress = event.Progress 178 179 if s.aggCount%100 == 0 && !s.agg.Stats.Zero() { 180 s.next.Send(&s.agg) 181 s.agg = zoekt.SearchResult{} 182 } 183 184 return 185 } 186 187 // If we have aggregate stats, we merge them with the new event before sending 188 // it. We drop agg.Progress, because we assume that event.Progress reflects the 189 // latest status. 190 if !s.agg.Stats.Zero() { 191 event.Stats.Add(s.agg.Stats) 192 s.agg = zoekt.SearchResult{} 193 } 194 195 s.next.Send(event) 196} 197 198// Flush sends any aggregated stats that we haven't sent yet 199func (s *samplingSender) Flush() { 200 if !s.agg.Stats.Zero() { 201 s.next.Send(&zoekt.SearchResult{ 202 Stats: s.agg.Stats, 203 Progress: zoekt.Progress{ 204 Priority: math.Inf(-1), 205 MaxPendingPriority: math.Inf(-1), 206 }, 207 }) 208 } 209}