fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Package stream provides a client and a server to consume search results as 2// stream. 3package stream 4 5import ( 6 "encoding/gob" 7 "errors" 8 "math" 9 "net/http" 10 "sync" 11 12 "github.com/sourcegraph/zoekt" 13 "github.com/sourcegraph/zoekt/query" 14 "github.com/sourcegraph/zoekt/rpc" 15) 16 17// DefaultSSEPath is the path used by zoekt-webserver. 18const DefaultSSEPath = "/stream" 19 20type eventType int 21 22const ( 23 eventMatches eventType = iota 24 eventError 25 eventDone 26) 27 28func (e eventType) string() string { 29 return []string{"eventMatches", "eventError", "eventDone"}[e] 30} 31 32// Server returns an http.Handler which is the server side of StreamSearch. 33func Server(searcher zoekt.Streamer) http.Handler { 34 registerGob() 35 return &handler{Searcher: searcher} 36} 37 38type searchArgs struct { 39 Q query.Q 40 Opts *zoekt.SearchOptions 41} 42 43type searchReply struct { 44 Event eventType 45 Data interface{} 46} 47 48type handler struct { 49 Searcher zoekt.Streamer 50} 51 52func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { 53 ctx := r.Context() 54 55 // Decode payload. 56 args := new(searchArgs) 57 err := gob.NewDecoder(r.Body).Decode(args) 58 if err != nil { 59 http.Error(w, err.Error(), http.StatusBadRequest) 60 return 61 } 62 63 args.Q = query.RPCUnwrap(args.Q) 64 65 eventWriter, err := newEventStreamWriter(w) 66 if err != nil { 67 http.Error(w, err.Error(), http.StatusInternalServerError) 68 return 69 } 70 71 // Always send a done event in the end. 72 defer func() { 73 err = eventWriter.event(eventDone, nil) 74 if err != nil { 75 _ = eventWriter.event(eventError, err) 76 } 77 }() 78 79 agg := zoekt.SearchResult{} 80 aggCount := 0 81 82 send := func(zsr *zoekt.SearchResult) { 83 84 err := eventWriter.event(eventMatches, zsr) 85 if err != nil { 86 _ = eventWriter.event(eventError, err) 87 return 88 } 89 } 90 91 err = h.Searcher.StreamSearch(ctx, args.Q, args.Opts, SenderFunc(func(event *zoekt.SearchResult) { 92 // We don't want to send events over the wire if they don't contain file 93 // matches. Hence, in case we didn't find any results, we aggregate the stats 94 // and send them out in regular intervals. 95 if len(event.Files) == 0 { 96 aggCount++ 97 98 agg.Stats.Add(event.Stats) 99 agg.Progress = event.Progress 100 101 if aggCount%100 == 0 && !agg.Stats.Zero() { 102 send(&agg) 103 agg = zoekt.SearchResult{} 104 } 105 106 return 107 } 108 109 // If we have aggregate stats, we merge them with the new event before sending 110 // it. We drop agg.Progress, because we assume that event.Progress reflects the 111 // latest status. 112 if !agg.Stats.Zero() { 113 event.Stats.Add(agg.Stats) 114 agg = zoekt.SearchResult{} 115 } 116 117 send(event) 118 })) 119 120 if err == nil && !agg.Stats.Zero() { 121 send(&zoekt.SearchResult{ 122 Stats: agg.Stats, 123 Progress: zoekt.Progress{ 124 Priority: math.Inf(-1), 125 MaxPendingPriority: math.Inf(-1), 126 }, 127 }) 128 } 129 130 if err != nil { 131 _ = eventWriter.event(eventError, err) 132 return 133 } 134} 135 136type eventStreamWriter struct { 137 enc *gob.Encoder 138 flush func() 139} 140 141func newEventStreamWriter(w http.ResponseWriter) (*eventStreamWriter, error) { 142 flusher, ok := w.(http.Flusher) 143 if !ok { 144 return nil, errors.New("http flushing not supported") 145 } 146 147 w.Header().Set("Content-Type", "application/x-gob-stream") 148 w.Header().Set("Cache-Control", "no-cache") 149 w.Header().Set("Connection", "keep-alive") 150 w.Header().Set("Transfer-Encoding", "chunked") 151 152 // This informs nginx to not buffer. With buffering search responses will 153 // be delayed until buffers get full, leading to worst case latency of the 154 // full time a search takes to complete. 155 w.Header().Set("X-Accel-Buffering", "no") 156 157 return &eventStreamWriter{ 158 enc: gob.NewEncoder(w), 159 flush: flusher.Flush, 160 }, nil 161} 162 163func (e *eventStreamWriter) event(event eventType, data interface{}) error { 164 // Because gob does not support serializing errors, we send error.Error() and 165 // recreate the error on the client-side. 166 if event == eventError { 167 if err, isError := data.(error); isError { 168 data = err.Error() 169 } 170 } 171 err := e.enc.Encode(searchReply{Event: event, Data: data}) 172 if err != nil { 173 return err 174 } 175 e.flush() 176 return nil 177} 178 179var once sync.Once 180 181func registerGob() { 182 once.Do(func() { 183 gob.Register(&zoekt.SearchResult{}) 184 }) 185 rpc.RegisterGob() 186}