fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package stream 2 3import ( 4 "bytes" 5 "context" 6 "encoding/gob" 7 "fmt" 8 "net/http" 9 10 "github.com/sourcegraph/zoekt" 11 "github.com/sourcegraph/zoekt/query" 12) 13 14// Doer implements the minimal surface of *http.Client and http.RoundTripper needed 15// by Client. 16type Doer interface { 17 Do(*http.Request) (*http.Response, error) 18} 19 20// NewClient returns a client which implements StreamSearch. If httpClient is 21// nil, http.DefaultClient is used. 22func NewClient(address string, httpClient Doer) *Client { 23 registerGob() 24 if httpClient == nil { 25 httpClient = http.DefaultClient 26 } 27 return &Client{ 28 address: address, 29 httpClient: httpClient, 30 } 31} 32 33// Client is an HTTP client for StreamSearch. Do not create directly, call 34// NewClient. 35type Client struct { 36 // HTTP address of zoekt-webserver. Will query against address + "/stream". 37 address string 38 39 // httpClient when set is used instead of http.DefaultClient 40 httpClient Doer 41} 42 43// SenderFunc is an adapter to allow the use of ordinary functions as Sender. 44// If f is a function with the appropriate signature, SenderFunc(f) is a Sender 45// that calls f. 46type SenderFunc func(result *zoekt.SearchResult) 47 48func (f SenderFunc) Send(result *zoekt.SearchResult) { 49 f(result) 50} 51 52// StreamSearch returns search results as stream by calling streamer.Send(event) 53// for each event returned by the server. 54// 55// Error events returned by the server are returned as error. Context errors are 56// recreated and returned on a best-efforts basis. 57func (c *Client) StreamSearch(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, streamer zoekt.Sender) error { 58 // Encode query and opts. 59 buf := new(bytes.Buffer) 60 args := &searchArgs{ 61 q, opts, 62 } 63 enc := gob.NewEncoder(buf) 64 err := enc.Encode(args) 65 if err != nil { 66 return fmt.Errorf("error during encoding: %w", err) 67 } 68 69 // Send request. 70 req, err := http.NewRequestWithContext(ctx, "POST", c.address+DefaultSSEPath, buf) 71 if err != nil { 72 return err 73 } 74 req.Header.Set("Accept", "application/x-gob-stream") 75 req.Header.Set("Cache-Control", "no-cache") 76 req.Header.Set("Connection", "keep-alive") 77 req.Header.Set("Transfer-Encoding", "chunked") 78 79 resp, err := c.httpClient.Do(req) 80 if err != nil { 81 return err 82 } 83 defer resp.Body.Close() 84 85 dec := gob.NewDecoder(resp.Body) 86 for { 87 reply := &searchReply{} 88 err := dec.Decode(reply) 89 if err != nil { 90 return fmt.Errorf("error during decoding: %w", err) 91 } 92 switch reply.Event { 93 case eventMatches: 94 if res, ok := reply.Data.(*zoekt.SearchResult); ok { 95 streamer.Send(res) 96 } else { 97 return fmt.Errorf("event of type %s could not be converted to *zoekt.SearchResult", eventMatches.string()) 98 } 99 case eventError: 100 if errString, ok := reply.Data.(string); ok { 101 return fmt.Errorf("error received from zoekt: %s", errString) 102 } else { 103 return fmt.Errorf("data for event of type %s could not be converted to string", eventError.string()) 104 } 105 case eventDone: 106 return nil 107 default: 108 return fmt.Errorf("unknown event type") 109 } 110 } 111} 112 113// WithSearcher returns Streamer composed of s and the streaming client. All 114// non-streaming calls will go via s, while streaming calls will go via the 115// streaming client. 116func (c *Client) WithSearcher(s zoekt.Searcher) zoekt.Streamer { 117 return &streamer{ 118 Searcher: s, 119 Client: c, 120 } 121} 122 123type streamer struct { 124 zoekt.Searcher 125 *Client 126}