fork of https://github.com/sourcegraph/zoekt
1package stream
2
3import (
4 "bytes"
5 "context"
6 "encoding/gob"
7 "fmt"
8 "net/http"
9
10 "github.com/sourcegraph/zoekt"
11 "github.com/sourcegraph/zoekt/query"
12)
13
14// NewClient returns a client which implements StreamSearch. If httpClient is
15// nil, http.DefaultClient is used.
16func NewClient(address string, httpClient *http.Client) *Client {
17 registerGob()
18 if httpClient == nil {
19 httpClient = http.DefaultClient
20 }
21 return &Client{
22 address: address,
23 httpClient: httpClient,
24 }
25}
26
27// Client is an HTTP client for StreamSearch. Do not create directly, call
28// NewClient.
29type Client struct {
30 // HTTP address of zoekt-webserver. Will query against address + "/stream".
31 address string
32
33 // httpClient when set is used instead of http.DefaultClient
34 httpClient *http.Client
35}
36
37// SenderFunc is an adapter to allow the use of ordinary functions as Sender.
38// If f is a function with the appropriate signature, SenderFunc(f) is a Sender
39// that calls f.
40type SenderFunc func(result *zoekt.SearchResult)
41
42func (f SenderFunc) Send(result *zoekt.SearchResult) {
43 f(result)
44}
45
46// StreamSearch returns search results as stream by calling streamer.Send(event)
47// for each event returned by the server.
48//
49// Error events returned by the server are returned as error. Context errors are
50// recreated and returned on a best-efforts basis.
51func (c *Client) StreamSearch(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, streamer zoekt.Sender) error {
52 // Encode query and opts.
53 buf := new(bytes.Buffer)
54 args := &searchArgs{
55 q, opts,
56 }
57 enc := gob.NewEncoder(buf)
58 err := enc.Encode(args)
59 if err != nil {
60 return fmt.Errorf("error during encoding: %w", err)
61 }
62
63 // Send request.
64 req, err := http.NewRequestWithContext(ctx, "POST", c.address+DefaultSSEPath, buf)
65 if err != nil {
66 return err
67 }
68 req.Header.Set("Accept", "application/x-gob-stream")
69 req.Header.Set("Cache-Control", "no-cache")
70 req.Header.Set("Connection", "keep-alive")
71 req.Header.Set("Transfer-Encoding", "chunked")
72
73 resp, err := c.httpClient.Do(req)
74 if err != nil {
75 return err
76 }
77 defer resp.Body.Close()
78
79 dec := gob.NewDecoder(resp.Body)
80 for {
81 reply := &searchReply{}
82 err := dec.Decode(reply)
83 if err != nil {
84 return fmt.Errorf("error during decoding: %w", err)
85 }
86 switch reply.Event {
87 case eventMatches:
88 if res, ok := reply.Data.(*zoekt.SearchResult); ok {
89 streamer.Send(res)
90 } else {
91 return fmt.Errorf("event of type %s could not be converted to *zoekt.SearchResult", eventMatches.string())
92 }
93 case eventError:
94 if errString, ok := reply.Data.(string); ok {
95 return fmt.Errorf("error received from zoekt: %s", errString)
96 } else {
97 return fmt.Errorf("data for event of type %s could not be converted to string", eventError.string())
98 }
99 case eventDone:
100 return nil
101 default:
102 return fmt.Errorf("unknown event type")
103 }
104 }
105}