fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package stream 2 3import ( 4 "bytes" 5 "context" 6 "encoding/gob" 7 "fmt" 8 "net/http" 9 10 "github.com/sourcegraph/zoekt" 11 "github.com/sourcegraph/zoekt/query" 12) 13 14// NewClient returns a client which implements StreamSearch. If httpClient is 15// nil, http.DefaultClient is used. 16func NewClient(address string, httpClient *http.Client) *Client { 17 registerGob() 18 if httpClient == nil { 19 httpClient = http.DefaultClient 20 } 21 return &Client{ 22 address: address, 23 httpClient: httpClient, 24 } 25} 26 27// Client is an HTTP client for StreamSearch. Do not create directly, call 28// NewClient. 29type Client struct { 30 // HTTP address of zoekt-webserver. Will query against address + "/stream". 31 address string 32 33 // httpClient when set is used instead of http.DefaultClient 34 httpClient *http.Client 35} 36 37// SenderFunc is an adapter to allow the use of ordinary functions as Sender. 38// If f is a function with the appropriate signature, SenderFunc(f) is a Sender 39// that calls f. 40type SenderFunc func(result *zoekt.SearchResult) 41 42func (f SenderFunc) Send(result *zoekt.SearchResult) { 43 f(result) 44} 45 46// StreamSearch returns search results as stream by calling streamer.Send(event) 47// for each event returned by the server. 48// 49// Error events returned by the server are returned as error. Context errors are 50// recreated and returned on a best-efforts basis. 51func (c *Client) StreamSearch(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, streamer zoekt.Sender) error { 52 // Encode query and opts. 53 buf := new(bytes.Buffer) 54 args := &searchArgs{ 55 q, opts, 56 } 57 enc := gob.NewEncoder(buf) 58 err := enc.Encode(args) 59 if err != nil { 60 return fmt.Errorf("error during encoding: %w", err) 61 } 62 63 // Send request. 64 req, err := http.NewRequestWithContext(ctx, "POST", c.address+DefaultSSEPath, buf) 65 if err != nil { 66 return err 67 } 68 req.Header.Set("Accept", "application/x-gob-stream") 69 req.Header.Set("Cache-Control", "no-cache") 70 req.Header.Set("Connection", "keep-alive") 71 req.Header.Set("Transfer-Encoding", "chunked") 72 73 resp, err := c.httpClient.Do(req) 74 if err != nil { 75 return err 76 } 77 defer resp.Body.Close() 78 79 dec := gob.NewDecoder(resp.Body) 80 for { 81 reply := &searchReply{} 82 err := dec.Decode(reply) 83 if err != nil { 84 return fmt.Errorf("error during decoding: %w", err) 85 } 86 switch reply.Event { 87 case eventMatches: 88 if res, ok := reply.Data.(*zoekt.SearchResult); ok { 89 streamer.Send(res) 90 } else { 91 return fmt.Errorf("event of type %s could not be converted to *zoekt.SearchResult", eventMatches.string()) 92 } 93 case eventError: 94 if errString, ok := reply.Data.(string); ok { 95 return fmt.Errorf("error received from zoekt: %s", errString) 96 } else { 97 return fmt.Errorf("data for event of type %s could not be converted to string", eventError.string()) 98 } 99 case eventDone: 100 return nil 101 default: 102 return fmt.Errorf("unknown event type") 103 } 104 } 105} 106 107// WithSearcher returns Streamer composed of s and the streaming client. All 108// non-streaming calls will go via s, while streaming calls will go via the 109// streaming client. 110func (c *Client) WithSearcher(s zoekt.Searcher) zoekt.Streamer { 111 return &streamer{ 112 Searcher: s, 113 Client: c, 114 } 115} 116 117type streamer struct { 118 zoekt.Searcher 119 *Client 120}