fork of https://github.com/sourcegraph/zoekt
1package stream
2
3import (
4 "bytes"
5 "context"
6 "encoding/gob"
7 "fmt"
8 "net/http"
9
10 "github.com/sourcegraph/zoekt"
11 "github.com/sourcegraph/zoekt/query"
12)
13
// Doer is the single-method HTTP abstraction Client depends on: anything
// that can execute an *http.Request and hand back an *http.Response.
// *http.Client satisfies it, which is why NewClient can fall back to
// http.DefaultClient.
type Doer interface {
	// Do sends req and returns the response or an error.
	Do(req *http.Request) (*http.Response, error)
}
19
20// NewClient returns a client which implements StreamSearch. If httpClient is
21// nil, http.DefaultClient is used.
22func NewClient(address string, httpClient Doer) *Client {
23 registerGob()
24 if httpClient == nil {
25 httpClient = http.DefaultClient
26 }
27 return &Client{
28 address: address,
29 httpClient: httpClient,
30 }
31}
32
33// Client is an HTTP client for StreamSearch. Do not create directly, call
34// NewClient.
35type Client struct {
36 // HTTP address of zoekt-webserver. Will query against address + "/stream".
37 address string
38
39 // httpClient when set is used instead of http.DefaultClient
40 httpClient Doer
41}
42
43// SenderFunc is an adapter to allow the use of ordinary functions as Sender.
44// If f is a function with the appropriate signature, SenderFunc(f) is a Sender
45// that calls f.
46type SenderFunc func(result *zoekt.SearchResult)
47
48func (f SenderFunc) Send(result *zoekt.SearchResult) {
49 f(result)
50}
51
52// StreamSearch returns search results as stream by calling streamer.Send(event)
53// for each event returned by the server.
54//
55// Error events returned by the server are returned as error. Context errors are
56// recreated and returned on a best-efforts basis.
57func (c *Client) StreamSearch(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, streamer zoekt.Sender) error {
58 // Encode query and opts.
59 buf := new(bytes.Buffer)
60 args := &searchArgs{
61 q, opts,
62 }
63 enc := gob.NewEncoder(buf)
64 err := enc.Encode(args)
65 if err != nil {
66 return fmt.Errorf("error during encoding: %w", err)
67 }
68
69 // Send request.
70 req, err := http.NewRequestWithContext(ctx, "POST", c.address+DefaultSSEPath, buf)
71 if err != nil {
72 return err
73 }
74 req.Header.Set("Accept", "application/x-gob-stream")
75 req.Header.Set("Cache-Control", "no-cache")
76 req.Header.Set("Connection", "keep-alive")
77 req.Header.Set("Transfer-Encoding", "chunked")
78
79 resp, err := c.httpClient.Do(req)
80 if err != nil {
81 return err
82 }
83 defer resp.Body.Close()
84
85 dec := gob.NewDecoder(resp.Body)
86 for {
87 reply := &searchReply{}
88 err := dec.Decode(reply)
89 if err != nil {
90 return fmt.Errorf("error during decoding: %w", err)
91 }
92 switch reply.Event {
93 case eventMatches:
94 if res, ok := reply.Data.(*zoekt.SearchResult); ok {
95 streamer.Send(res)
96 } else {
97 return fmt.Errorf("event of type %s could not be converted to *zoekt.SearchResult", eventMatches.string())
98 }
99 case eventError:
100 if errString, ok := reply.Data.(string); ok {
101 return fmt.Errorf("error received from zoekt: %s", errString)
102 } else {
103 return fmt.Errorf("data for event of type %s could not be converted to string", eventError.string())
104 }
105 case eventDone:
106 return nil
107 default:
108 return fmt.Errorf("unknown event type")
109 }
110 }
111}
112
113// WithSearcher returns Streamer composed of s and the streaming client. All
114// non-streaming calls will go via s, while streaming calls will go via the
115// streaming client.
116func (c *Client) WithSearcher(s zoekt.Searcher) zoekt.Streamer {
117 return &streamer{
118 Searcher: s,
119 Client: c,
120 }
121}
122
123type streamer struct {
124 zoekt.Searcher
125 *Client
126}