fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

all: remove gob and SSE rpc endpoints (#758)

We now only consume zoekt via gRPC at Sourcegraph and I doubt anyone
uses the old endpoints.

This will have one required update in sourcegraph, and that is to use
SenderFunc from the main zoekt package rather than from the now deleted
stream package.

+151 -1099
+9
api.go
··· 1032 1032 Send(*SearchResult) 1033 1033 } 1034 1034 1035 + // SenderFunc is an adapter to allow the use of ordinary functions as Sender. 1036 + // If f is a function with the appropriate signature, SenderFunc(f) is a Sender 1037 + // that calls f. 1038 + type SenderFunc func(result *SearchResult) 1039 + 1040 + func (f SenderFunc) Send(result *SearchResult) { 1041 + f(result) 1042 + } 1043 + 1035 1044 // Streamer adds the method StreamSearch to the Searcher interface. 1036 1045 type Streamer interface { 1037 1046 Searcher
+61
cmd/zoekt-webserver/grpc/server/sampling.go
··· 1 + package server 2 + 3 + import ( 4 + "math" 5 + 6 + "github.com/sourcegraph/zoekt" 7 + ) 8 + 9 + // newSamplingSender is a zoekt.Sender that samples stats events to avoid 10 + // sending many empty stats events over the wire. 11 + func newSamplingSender(next zoekt.Sender) *samplingSender { 12 + return &samplingSender{next: next} 13 + } 14 + 15 + type samplingSender struct { 16 + next zoekt.Sender 17 + agg zoekt.SearchResult 18 + aggCount int 19 + } 20 + 21 + func (s *samplingSender) Send(event *zoekt.SearchResult) { 22 + // We don't want to send events over the wire if they don't contain file 23 + // matches. Hence, in case we didn't find any results, we aggregate the stats 24 + // and send them out in regular intervals. 25 + if len(event.Files) == 0 { 26 + s.aggCount++ 27 + 28 + s.agg.Stats.Add(event.Stats) 29 + s.agg.Progress = event.Progress 30 + 31 + if s.aggCount%100 == 0 && !s.agg.Stats.Zero() { 32 + s.next.Send(&s.agg) 33 + s.agg = zoekt.SearchResult{} 34 + } 35 + 36 + return 37 + } 38 + 39 + // If we have aggregate stats, we merge them with the new event before sending 40 + // it. We drop agg.Progress, because we assume that event.Progress reflects the 41 + // latest status. 42 + if !s.agg.Stats.Zero() { 43 + event.Stats.Add(s.agg.Stats) 44 + s.agg = zoekt.SearchResult{} 45 + } 46 + 47 + s.next.Send(event) 48 + } 49 + 50 + // Flush sends any aggregated stats that we haven't sent yet 51 + func (s *samplingSender) Flush() { 52 + if !s.agg.Stats.Zero() { 53 + s.next.Send(&zoekt.SearchResult{ 54 + Stats: s.agg.Stats, 55 + Progress: zoekt.Progress{ 56 + Priority: math.Inf(-1), 57 + MaxPendingPriority: math.Inf(-1), 58 + }, 59 + }) 60 + } 61 + }
+72
cmd/zoekt-webserver/grpc/server/sampling_test.go
··· 1 + package server 2 + 3 + import ( 4 + "testing" 5 + 6 + "github.com/sourcegraph/zoekt" 7 + ) 8 + 9 + func TestSamplingStream(t *testing.T) { 10 + nonZeroStats := zoekt.Stats{ 11 + ContentBytesLoaded: 10, 12 + } 13 + filesEvent := &zoekt.SearchResult{ 14 + Files: make([]zoekt.FileMatch, 10), 15 + Stats: nonZeroStats, 16 + } 17 + fileEvents := func(n int) []*zoekt.SearchResult { 18 + res := make([]*zoekt.SearchResult, n) 19 + for i := 0; i < n; i++ { 20 + res[i] = filesEvent 21 + } 22 + return res 23 + } 24 + statsEvent := &zoekt.SearchResult{ 25 + Stats: nonZeroStats, 26 + } 27 + statsEvents := func(n int) []*zoekt.SearchResult { 28 + res := make([]*zoekt.SearchResult, n) 29 + for i := 0; i < n; i++ { 30 + res[i] = statsEvent 31 + } 32 + return res 33 + } 34 + cases := []struct { 35 + events []*zoekt.SearchResult 36 + beforeFlushCount int 37 + afterFlushCount int 38 + }{ 39 + // These test cases assume that the sampler only forwards 40 + // every 100 stats-only event. In case the sampling logic 41 + // changes, these tests are not valuable. 42 + {nil, 0, 0}, 43 + {fileEvents(1), 1, 1}, 44 + {fileEvents(2), 2, 2}, 45 + {fileEvents(200), 200, 200}, 46 + {append(fileEvents(1), statsEvents(1)...), 1, 2}, 47 + {append(fileEvents(1), statsEvents(2)...), 1, 2}, 48 + {append(fileEvents(1), statsEvents(99)...), 1, 2}, 49 + {append(fileEvents(1), statsEvents(100)...), 2, 2}, 50 + {statsEvents(500), 5, 5}, 51 + {statsEvents(501), 5, 6}, 52 + } 53 + 54 + for _, tc := range cases { 55 + count := 0 56 + ss := newSamplingSender(zoekt.SenderFunc(func(*zoekt.SearchResult) { 57 + count += 1 58 + })) 59 + 60 + for _, event := range tc.events { 61 + ss.Send(event) 62 + } 63 + if count != tc.beforeFlushCount { 64 + t.Fatalf("expected %d events, got %d", tc.beforeFlushCount, count) 65 + } 66 + ss.Flush() 67 + 68 + if count != tc.afterFlushCount { 69 + t.Fatalf("expected %d events, got %d", tc.afterFlushCount, count) 70 + } 71 + } 72 + }
+2 -3
cmd/zoekt-webserver/grpc/server/server.go
··· 11 11 12 12 "github.com/sourcegraph/zoekt" 13 13 "github.com/sourcegraph/zoekt/query" 14 - "github.com/sourcegraph/zoekt/stream" 15 14 ) 16 15 17 16 func NewServer(s zoekt.Streamer) *Server { ··· 48 47 } 49 48 50 49 sender := gRPCChunkSender(ss) 51 - sampler := stream.NewSamplingSender(sender) 50 + sampler := newSamplingSender(sender) 52 51 53 52 err = s.streamer.StreamSearch(ss.Context(), q, zoekt.SearchOptionsFromProto(request.GetOpts()), sampler) 54 53 if err == nil { ··· 125 124 _ = chunk.SendAll(sendFunc, result.GetFiles()...) 126 125 } 127 126 128 - return stream.SenderFunc(f) 127 + return zoekt.SenderFunc(f) 129 128 }
+1 -2
cmd/zoekt-webserver/main.go
··· 57 57 "github.com/sourcegraph/zoekt/internal/tracer" 58 58 "github.com/sourcegraph/zoekt/query" 59 59 "github.com/sourcegraph/zoekt/shards" 60 - "github.com/sourcegraph/zoekt/stream" 61 60 "github.com/sourcegraph/zoekt/trace" 62 61 "github.com/sourcegraph/zoekt/web" 63 62 ··· 554 553 var stats zoekt.Stats 555 554 556 555 metricSearchRequestsTotal.Inc() 557 - err := s.Streamer.StreamSearch(ctx, q, opts, stream.SenderFunc(func(event *zoekt.SearchResult) { 556 + err := s.Streamer.StreamSearch(ctx, q, opts, zoekt.SenderFunc(func(event *zoekt.SearchResult) { 558 557 stats.Add(event.Stats) 559 558 sender.Send(event) 560 559 }))
-64
query/query.go
··· 15 15 package query 16 16 17 17 import ( 18 - "bytes" 19 - "encoding/gob" 20 18 "encoding/json" 21 19 "fmt" 22 20 "log" ··· 25 23 "sort" 26 24 "strconv" 27 25 "strings" 28 - "sync" 29 26 30 27 "github.com/RoaringBitmap/roaring" 31 28 "github.com/grafana/regexp" ··· 37 34 // Q is a representation for a possibly hierarchical search query. 38 35 type Q interface { 39 36 String() string 40 - } 41 - 42 - // RPCUnwrap processes q to remove RPC specific elements from q. This is 43 - // needed because gob isn't flexible enough for us. This should be called by 44 - // RPC servers at the client/server boundary so that q works with the rest of 45 - // zoekt. 46 - func RPCUnwrap(q Q) Q { 47 - if cache, ok := q.(*GobCache); ok { 48 - return cache.Q 49 - } 50 - return q 51 37 } 52 38 53 39 // RawConfig filters repositories based on their encoded RawConfig map. ··· 460 446 case "auto": 461 447 q.CaseSensitive = !q.Regexp.Equal(LowerRegexp(q.Regexp)) 462 448 } 463 - } 464 - 465 - // GobCache exists so we only pay the cost of marshalling a query once when we 466 - // aggregate it out over all the replicas. 467 - // 468 - // Our query and eval layer do not support GobCache. Instead, at the gob 469 - // boundaries (RPC and Streaming) we check if the Q is a GobCache and unwrap 470 - // it. 471 - // 472 - // "I wish we could get rid of this code soon enough" - tomas 473 - type GobCache struct { 474 - Q 475 - 476 - once sync.Once 477 - data []byte 478 - err error 479 - } 480 - 481 - // GobEncode implements gob.Encoder. 482 - func (q *GobCache) GobEncode() ([]byte, error) { 483 - q.once.Do(func() { 484 - var buf bytes.Buffer 485 - enc := gob.NewEncoder(&buf) 486 - q.err = enc.Encode(&gobWrapper{ 487 - WrappedQ: q.Q, 488 - }) 489 - q.data = buf.Bytes() 490 - }) 491 - return q.data, q.err 492 - } 493 - 494 - // GobDecode implements gob.Decoder. 495 - func (q *GobCache) GobDecode(data []byte) error { 496 - dec := gob.NewDecoder(bytes.NewBuffer(data)) 497 - var w gobWrapper 498 - err := dec.Decode(&w) 499 - if err != nil { 500 - return err 501 - } 502 - q.Q = w.WrappedQ 503 - return nil 504 - } 505 - 506 - // gobWrapper is needed so the gob decoder works. 507 - type gobWrapper struct { 508 - WrappedQ Q 509 - } 510 - 511 - func (q *GobCache) String() string { 512 - return fmt.Sprintf("GobCache(%s)", q.Q) 513 449 } 514 450 515 451 // Or is matched when any of its children is matched.
-1
query/query_proto.go
··· 50 50 return &proto.Q{Query: &proto.Q_Boost{Boost: v.ToProto()}} 51 51 default: 52 52 // The following nodes do not have a proto representation: 53 - // - GobCache: only needed for Gob encoding 54 53 // - caseQ: only used internally, not by the RPC layer 55 54 panic(fmt.Sprintf("unknown query node %T", v)) 56 55 }
-71
rpc/internal/srv/srv.go
··· 1 - package srv 2 - 3 - import ( 4 - "context" 5 - "time" 6 - 7 - "github.com/sourcegraph/zoekt" 8 - "github.com/sourcegraph/zoekt/query" 9 - ) 10 - 11 - // defaultTimeout is the maximum amount of time a search request should 12 - // take. This is the same default used by Sourcegraph. 13 - const defaultTimeout = 20 * time.Second 14 - 15 - type SearchArgs struct { 16 - Q query.Q 17 - Opts *zoekt.SearchOptions 18 - } 19 - 20 - type SearchReply struct { 21 - Result *zoekt.SearchResult 22 - } 23 - 24 - type ListArgs struct { 25 - Q query.Q 26 - Opts *zoekt.ListOptions 27 - } 28 - 29 - type ListReply struct { 30 - List *zoekt.RepoList 31 - } 32 - 33 - type Searcher struct { 34 - Searcher zoekt.Searcher 35 - } 36 - 37 - func (s *Searcher) Search(ctx context.Context, args *SearchArgs, reply *SearchReply) error { 38 - // Set a timeout if the user hasn't specified one. 39 - if args.Opts != nil && args.Opts.MaxWallTime == 0 { 40 - var cancel context.CancelFunc 41 - ctx, cancel = context.WithTimeout(ctx, defaultTimeout) 42 - defer cancel() 43 - } 44 - 45 - if args.Q != nil { 46 - args.Q = query.RPCUnwrap(args.Q) 47 - } 48 - 49 - r, err := s.Searcher.Search(ctx, args.Q, args.Opts) 50 - if err != nil { 51 - return err 52 - } 53 - reply.Result = r 54 - return nil 55 - } 56 - 57 - func (s *Searcher) List(ctx context.Context, args *ListArgs, reply *ListReply) error { 58 - ctx, cancel := context.WithTimeout(ctx, defaultTimeout) 59 - defer cancel() 60 - 61 - if args.Q != nil { 62 - args.Q = query.RPCUnwrap(args.Q) 63 - } 64 - 65 - r, err := s.Searcher.List(ctx, args.Q, args.Opts) 66 - if err != nil { 67 - return err 68 - } 69 - reply.List = r 70 - return nil 71 - }
-210
rpc/rpc.go
··· 1 - // Package rpc provides a zoekt.Searcher over RPC. 2 - package rpc 3 - 4 - import ( 5 - "context" 6 - "encoding/gob" 7 - "fmt" 8 - "net/http" 9 - "reflect" 10 - "strings" 11 - "sync" 12 - "time" 13 - 14 - "github.com/keegancsmith/rpc" 15 - "github.com/sourcegraph/zoekt" 16 - "github.com/sourcegraph/zoekt/query" 17 - "github.com/sourcegraph/zoekt/rpc/internal/srv" 18 - ) 19 - 20 - // DefaultRPCPath is the rpc path used by zoekt-webserver 21 - const DefaultRPCPath = "/rpc" 22 - 23 - // Server returns an http.Handler for searcher which is the server side of the 24 - // RPC calls. 25 - func Server(searcher zoekt.Searcher) http.Handler { 26 - RegisterGob() 27 - server := rpc.NewServer() 28 - if err := server.Register(&srv.Searcher{Searcher: searcher}); err != nil { 29 - // this should never fail, so we panic. 30 - panic("unexpected error registering rpc server: " + err.Error()) 31 - } 32 - return server 33 - } 34 - 35 - // Client connects to a Searcher HTTP RPC server at address (host:port) using 36 - // DefaultRPCPath path. 37 - func Client(address string) zoekt.Searcher { 38 - return ClientAtPath(address, DefaultRPCPath) 39 - } 40 - 41 - // ClientAtPath connects to a Searcher HTTP RPC server at address and path 42 - // (http://host:port/path). 43 - func ClientAtPath(address, path string) zoekt.Searcher { 44 - RegisterGob() 45 - return &client{addr: address, path: path} 46 - } 47 - 48 - type client struct { 49 - addr, path string 50 - 51 - mu sync.Mutex // protects client and gen 52 - cl *rpc.Client 53 - gen int // incremented each time we dial 54 - } 55 - 56 - func (c *client) Search(ctx context.Context, q query.Q, opts *zoekt.SearchOptions) (*zoekt.SearchResult, error) { 57 - var reply srv.SearchReply 58 - err := c.call(ctx, "Searcher.Search", &srv.SearchArgs{Q: q, Opts: opts}, &reply) 59 - return reply.Result, err 60 - } 61 - 62 - func (c *client) List(ctx context.Context, q query.Q, opts *zoekt.ListOptions) (*zoekt.RepoList, error) { 63 - var reply srv.ListReply 64 - err := c.call(ctx, "Searcher.List", &srv.ListArgs{Q: q, Opts: opts}, &reply) 65 - return reply.List, err 66 - } 67 - 68 - func (c *client) call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error { 69 - // We try twice. If we fail to dial or fail to call the function we try 70 - // again after 100ms. Unrolled to make logic clear 71 - cl, gen, err := c.getRPCClient(ctx, 0) 72 - if err == nil { 73 - err = cl.Call(ctx, serviceMethod, args, reply) 74 - if err != rpc.ErrShutdown { 75 - return err 76 - } 77 - } 78 - 79 - select { 80 - case <-ctx.Done(): 81 - return ctx.Err() 82 - case <-time.After(100 * time.Millisecond): 83 - } 84 - 85 - cl, _, err = c.getRPCClient(ctx, gen) 86 - if err != nil { 87 - return err 88 - } 89 - return cl.Call(ctx, serviceMethod, args, reply) 90 - } 91 - 92 - // getRPCClient gets the rpc client. If gen matches the current generation, we 93 - // redial and increment the generation. This is used to prevent concurrent 94 - // redialing on network failure. 95 - func (c *client) getRPCClient(ctx context.Context, gen int) (*rpc.Client, int, error) { 96 - // coarse lock so we only dial once 97 - c.mu.Lock() 98 - defer c.mu.Unlock() 99 - if gen != c.gen { 100 - return c.cl, c.gen, nil 101 - } 102 - var timeout time.Duration 103 - if deadline, ok := ctx.Deadline(); ok { 104 - timeout = time.Until(deadline) 105 - } 106 - cl, err := rpc.DialHTTPPathTimeout("tcp", c.addr, c.path, timeout) 107 - if err != nil { 108 - return nil, c.gen, err 109 - } 110 - c.cl = cl 111 - c.gen++ 112 - return c.cl, c.gen, nil 113 - } 114 - 115 - func (c *client) Close() { 116 - c.mu.Lock() 117 - defer c.mu.Unlock() 118 - if c.cl != nil { 119 - c.cl.Close() 120 - } 121 - } 122 - 123 - func (c *client) String() string { 124 - return fmt.Sprintf("rpcSearcher(%s/%s)", c.addr, c.path) 125 - } 126 - 127 - var once sync.Once 128 - 129 - // RegisterGob registers various query types with gob. It can be called more than 130 - // once, because calls to gob.Register are protected by a sync.Once. 131 - func RegisterGob() { 132 - once.Do(func() { 133 - gobRegister(&query.And{}) 134 - gobRegister(&query.BranchRepos{}) 135 - gobRegister(&query.BranchesRepos{}) 136 - gobRegister(&query.Branch{}) 137 - gobRegister(&query.Const{}) 138 - gobRegister(&query.FileNameSet{}) 139 - gobRegister(&query.GobCache{}) 140 - gobRegister(&query.Language{}) 141 - gobRegister(&query.Not{}) 142 - gobRegister(&query.Or{}) 143 - gobRegister(&query.Regexp{}) 144 - gobRegister(&query.RepoRegexp{}) 145 - gobRegister(&query.RepoSet{}) 146 - gobRegister(&query.RepoIDs{}) 147 - gobRegister(&query.Repo{}) 148 - gobRegister(&query.Substring{}) 149 - gobRegister(&query.Symbol{}) 150 - gobRegister(&query.Type{}) 151 - gobRegister(query.RawConfig(41)) 152 - }) 153 - } 154 - 155 - // gobRegister exists to keep backwards compatibility around renames of the go 156 - // module. This is to avoid breaking the wire protocol due to refactors. In 157 - // particular in August 2022 we renamed the go module from 158 - // github.com/google/zoekt to github.com/sourcegraph/zoekt which breaks the 159 - // wire protocol. So this function will replace those names so we keep using 160 - // google/zoekt. 161 - func gobRegister(value any) { 162 - name := gobRegister_name(value) 163 - 164 - name = strings.Replace(name, "github.com/sourcegraph/", "github.com/google/", 1) 165 - 166 - gob.RegisterName(name, value) 167 - } 168 - 169 - // gobRegister_name is copy-pasta from the stdlib gob.Register, returning the 170 - // name it picks for gob.RegisterName. 171 - func gobRegister_name(value any) string { 172 - // Default to printed representation for unnamed types 173 - rt := reflect.TypeOf(value) 174 - name := rt.String() 175 - 176 - // But for named types (or pointers to them), qualify with import path (but see inner comment). 177 - // Dereference one pointer looking for a named type. 178 - star := "" 179 - if rt.Name() == "" { 180 - if pt := rt; pt.Kind() == reflect.Pointer { 181 - star = "*" 182 - // NOTE: The following line should be rt = pt.Elem() to implement 183 - // what the comment above claims, but fixing it would break compatibility 184 - // with existing gobs. 185 - // 186 - // Given package p imported as "full/p" with these definitions: 187 - // package p 188 - // type T1 struct { ... } 189 - // this table shows the intended and actual strings used by gob to 190 - // name the types: 191 - // 192 - // Type Correct string Actual string 193 - // 194 - // T1 full/p.T1 full/p.T1 195 - // *T1 *full/p.T1 *p.T1 196 - // 197 - // The missing full path cannot be fixed without breaking existing gob decoders. 198 - rt = pt 199 - } 200 - } 201 - if rt.Name() != "" { 202 - if rt.PkgPath() == "" { 203 - name = star + rt.Name() 204 - } else { 205 - name = star + rt.PkgPath() + "." + rt.Name() 206 - } 207 - } 208 - 209 - return name 210 - }
-81
rpc/rpc_test.go
··· 1 - package rpc_test 2 - 3 - import ( 4 - "context" 5 - "net/http/httptest" 6 - "net/url" 7 - "reflect" 8 - "testing" 9 - 10 - "github.com/google/go-cmp/cmp" 11 - "github.com/google/go-cmp/cmp/cmpopts" 12 - "github.com/sourcegraph/zoekt" 13 - "github.com/sourcegraph/zoekt/internal/mockSearcher" 14 - "github.com/sourcegraph/zoekt/query" 15 - "github.com/sourcegraph/zoekt/rpc" 16 - ) 17 - 18 - func TestClientServer(t *testing.T) { 19 - mock := &mockSearcher.MockSearcher{ 20 - WantSearch: query.NewAnd(mustParse("hello world|universe"), query.NewSingleBranchesRepos("HEAD", 1, 2)), 21 - SearchResult: &zoekt.SearchResult{ 22 - Files: []zoekt.FileMatch{ 23 - {FileName: "bin.go"}, 24 - }, 25 - }, 26 - 27 - WantList: &query.Const{Value: true}, 28 - RepoList: &zoekt.RepoList{ 29 - Repos: []*zoekt.RepoListEntry{ 30 - { 31 - Repository: zoekt.Repository{ 32 - ID: 2, 33 - Name: "foo/bar", 34 - }, 35 - }, 36 - }, 37 - }, 38 - } 39 - 40 - ts := httptest.NewServer(rpc.Server(mock)) 41 - defer ts.Close() 42 - 43 - u, err := url.Parse(ts.URL) 44 - if err != nil { 45 - t.Fatal(err) 46 - } 47 - client := rpc.Client(u.Host) 48 - defer client.Close() 49 - 50 - var cached query.Q = &query.GobCache{ 51 - Q: mock.WantSearch, 52 - } 53 - 54 - r, err := client.Search(context.Background(), cached, &zoekt.SearchOptions{}) 55 - if err != nil { 56 - t.Fatal(err) 57 - } 58 - if !reflect.DeepEqual(r, mock.SearchResult) { 59 - t.Fatalf("got %+v, want %+v", r, mock.SearchResult) 60 - } 61 - 62 - l, err := client.List(context.Background(), mock.WantList, nil) 63 - if err != nil { 64 - t.Fatal(err) 65 - } 66 - if d := cmp.Diff(mock.RepoList, l, cmpopts.IgnoreUnexported(zoekt.Repository{})); d != "" { 67 - t.Fatalf("unexpected RepoList (-want, +got):\n%s", d) 68 - } 69 - 70 - // Test closing a client we never dial. 71 - noopClient := rpc.Client(u.Host) 72 - noopClient.Close() 73 - } 74 - 75 - func mustParse(s string) query.Q { 76 - q, err := query.Parse(s) 77 - if err != nil { 78 - panic(err) 79 - } 80 - return q 81 - }
+3 -4
shards/aggregate.go
··· 9 9 "github.com/prometheus/client_golang/prometheus/promauto" 10 10 11 11 "github.com/sourcegraph/zoekt" 12 - "github.com/sourcegraph/zoekt/stream" 13 12 ) 14 13 15 14 var metricFinalAggregateSize = promauto.NewHistogramVec(prometheus.HistogramOpts{ ··· 138 137 stopCollectingAndFlush(zoekt.FlushReasonFinalFlush) 139 138 } 140 139 141 - return stream.SenderFunc(func(event *zoekt.SearchResult) { 140 + return zoekt.SenderFunc(func(event *zoekt.SearchResult) { 142 141 mu.Lock() 143 142 if collectSender != nil { 144 143 collectSender.Send(event) ··· 152 151 // limitSender wraps a sender and calls cancel once the truncator has finished 153 152 // truncating. 154 153 func limitSender(cancel context.CancelFunc, sender zoekt.Sender, truncator zoekt.DisplayTruncator) zoekt.Sender { 155 - return stream.SenderFunc(func(result *zoekt.SearchResult) { 154 + return zoekt.SenderFunc(func(result *zoekt.SearchResult) { 156 155 var hasMore bool 157 156 result.Files, hasMore = truncator(result.Files) 158 157 if !hasMore { ··· 163 162 } 164 163 165 164 func copyFileSender(sender zoekt.Sender) zoekt.Sender { 166 - return stream.SenderFunc(func(result *zoekt.SearchResult) { 165 + return zoekt.SenderFunc(func(result *zoekt.SearchResult) { 167 166 copyFiles(result) 168 167 sender.Send(result) 169 168 })
+1 -2
shards/eval.go
··· 5 5 6 6 "github.com/sourcegraph/zoekt" 7 7 "github.com/sourcegraph/zoekt/query" 8 - "github.com/sourcegraph/zoekt/stream" 9 8 "github.com/sourcegraph/zoekt/trace" 10 9 ) 11 10 ··· 59 58 return err 60 59 } 61 60 62 - return s.Streamer.StreamSearch(ctx, q, opts, stream.SenderFunc(func(event *zoekt.SearchResult) { 61 + return s.Streamer.StreamSearch(ctx, q, opts, zoekt.SenderFunc(func(event *zoekt.SearchResult) { 63 62 stats.Add(event.Stats) 64 63 sender.Send(event) 65 64 }))
+2 -3
shards/shards_test.go
··· 37 37 38 38 "github.com/sourcegraph/zoekt" 39 39 "github.com/sourcegraph/zoekt/query" 40 - "github.com/sourcegraph/zoekt/stream" 41 40 ) 42 41 43 42 type crashSearcher struct{} ··· 258 257 } 259 258 260 259 err := ss.StreamSearch(context.Background(), &query.Substring{Pattern: "foo"}, opts, 261 - stream.SenderFunc(func(event *zoekt.SearchResult) { 260 + zoekt.SenderFunc(func(event *zoekt.SearchResult) { 262 261 results = append(results, event) 263 262 })) 264 263 if err != nil { ··· 1129 1128 ss.replace(map[string]zoekt.Searcher{"r1": searcher}) 1130 1129 1131 1130 var files []zoekt.FileMatch 1132 - sender := stream.SenderFunc(func(result *zoekt.SearchResult) { 1131 + sender := zoekt.SenderFunc(func(result *zoekt.SearchResult) { 1133 1132 files = append(files, result.Files...) 1134 1133 }) 1135 1134
-126
stream/client.go
··· 1 - package stream 2 - 3 - import ( 4 - "bytes" 5 - "context" 6 - "encoding/gob" 7 - "fmt" 8 - "net/http" 9 - 10 - "github.com/sourcegraph/zoekt" 11 - "github.com/sourcegraph/zoekt/query" 12 - ) 13 - 14 - // Doer implements the minimal surface of *http.Client and http.RoundTripper needed 15 - // by Client. 16 - type Doer interface { 17 - Do(*http.Request) (*http.Response, error) 18 - } 19 - 20 - // NewClient returns a client which implements StreamSearch. If httpClient is 21 - // nil, http.DefaultClient is used. 22 - func NewClient(address string, httpClient Doer) *Client { 23 - registerGob() 24 - if httpClient == nil { 25 - httpClient = http.DefaultClient 26 - } 27 - return &Client{ 28 - address: address, 29 - httpClient: httpClient, 30 - } 31 - } 32 - 33 - // Client is an HTTP client for StreamSearch. Do not create directly, call 34 - // NewClient. 35 - type Client struct { 36 - // HTTP address of zoekt-webserver. Will query against address + "/stream". 37 - address string 38 - 39 - // httpClient when set is used instead of http.DefaultClient 40 - httpClient Doer 41 - } 42 - 43 - // SenderFunc is an adapter to allow the use of ordinary functions as Sender. 44 - // If f is a function with the appropriate signature, SenderFunc(f) is a Sender 45 - // that calls f. 46 - type SenderFunc func(result *zoekt.SearchResult) 47 - 48 - func (f SenderFunc) Send(result *zoekt.SearchResult) { 49 - f(result) 50 - } 51 - 52 - // StreamSearch returns search results as stream by calling streamer.Send(event) 53 - // for each event returned by the server. 54 - // 55 - // Error events returned by the server are returned as error. Context errors are 56 - // recreated and returned on a best-efforts basis. 57 - func (c *Client) StreamSearch(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, streamer zoekt.Sender) error { 58 - // Encode query and opts. 59 - buf := new(bytes.Buffer) 60 - args := &searchArgs{ 61 - q, opts, 62 - } 63 - enc := gob.NewEncoder(buf) 64 - err := enc.Encode(args) 65 - if err != nil { 66 - return fmt.Errorf("error during encoding: %w", err) 67 - } 68 - 69 - // Send request. 70 - req, err := http.NewRequestWithContext(ctx, "POST", c.address+DefaultSSEPath, buf) 71 - if err != nil { 72 - return err 73 - } 74 - req.Header.Set("Accept", "application/x-gob-stream") 75 - req.Header.Set("Cache-Control", "no-cache") 76 - req.Header.Set("Connection", "keep-alive") 77 - req.Header.Set("Transfer-Encoding", "chunked") 78 - 79 - resp, err := c.httpClient.Do(req) 80 - if err != nil { 81 - return err 82 - } 83 - defer resp.Body.Close() 84 - 85 - dec := gob.NewDecoder(resp.Body) 86 - for { 87 - reply := &searchReply{} 88 - err := dec.Decode(reply) 89 - if err != nil { 90 - return fmt.Errorf("error during decoding: %w", err) 91 - } 92 - switch reply.Event { 93 - case eventMatches: 94 - if res, ok := reply.Data.(*zoekt.SearchResult); ok { 95 - streamer.Send(res) 96 - } else { 97 - return fmt.Errorf("event of type %s could not be converted to *zoekt.SearchResult", eventMatches.string()) 98 - } 99 - case eventError: 100 - if errString, ok := reply.Data.(string); ok { 101 - return fmt.Errorf("error received from zoekt: %s", errString) 102 - } else { 103 - return fmt.Errorf("data for event of type %s could not be converted to string", eventError.string()) 104 - } 105 - case eventDone: 106 - return nil 107 - default: 108 - return fmt.Errorf("unknown event type") 109 - } 110 - } 111 - } 112 - 113 - // WithSearcher returns Streamer composed of s and the streaming client. All 114 - // non-streaming calls will go via s, while streaming calls will go via the 115 - // streaming client. 116 - func (c *Client) WithSearcher(s zoekt.Searcher) zoekt.Streamer { 117 - return &streamer{ 118 - Searcher: s, 119 - Client: c, 120 - } 121 - } 122 - 123 - type streamer struct { 124 - zoekt.Searcher 125 - *Client 126 - }
-209
stream/stream.go
··· 1 - // Package stream provides a client and a server to consume search results as 2 - // stream. 3 - package stream 4 - 5 - import ( 6 - "encoding/gob" 7 - "errors" 8 - "math" 9 - "net/http" 10 - "sync" 11 - 12 - "github.com/sourcegraph/zoekt" 13 - "github.com/sourcegraph/zoekt/query" 14 - "github.com/sourcegraph/zoekt/rpc" 15 - ) 16 - 17 - // DefaultSSEPath is the path used by zoekt-webserver. 18 - const DefaultSSEPath = "/stream" 19 - 20 - type eventType int 21 - 22 - const ( 23 - eventMatches eventType = iota 24 - eventError 25 - eventDone 26 - ) 27 - 28 - func (e eventType) string() string { 29 - return []string{"eventMatches", "eventError", "eventDone"}[e] 30 - } 31 - 32 - // Server returns an http.Handler which is the server side of StreamSearch. 33 - func Server(searcher zoekt.Streamer) http.Handler { 34 - registerGob() 35 - return &handler{Searcher: searcher} 36 - } 37 - 38 - type searchArgs struct { 39 - Q query.Q 40 - Opts *zoekt.SearchOptions 41 - } 42 - 43 - type searchReply struct { 44 - Event eventType 45 - Data interface{} 46 - } 47 - 48 - type handler struct { 49 - Searcher zoekt.Streamer 50 - } 51 - 52 - func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { 53 - ctx := r.Context() 54 - 55 - // Decode payload. 56 - args := new(searchArgs) 57 - err := gob.NewDecoder(r.Body).Decode(args) 58 - if err != nil { 59 - http.Error(w, err.Error(), http.StatusBadRequest) 60 - return 61 - } 62 - 63 - args.Q = query.RPCUnwrap(args.Q) 64 - 65 - eventWriter, err := newEventStreamWriter(w) 66 - if err != nil { 67 - http.Error(w, err.Error(), http.StatusInternalServerError) 68 - return 69 - } 70 - 71 - // Always send a done event in the end. 72 - defer func() { 73 - err = eventWriter.event(eventDone, nil) 74 - if err != nil { 75 - _ = eventWriter.event(eventError, err) 76 - } 77 - }() 78 - 79 - send := func(zsr *zoekt.SearchResult) { 80 - err := eventWriter.event(eventMatches, zsr) 81 - if err != nil { 82 - _ = eventWriter.event(eventError, err) 83 - return 84 - } 85 - } 86 - 87 - sampler := NewSamplingSender(SenderFunc(send)) 88 - 89 - err = h.Searcher.StreamSearch(ctx, args.Q, args.Opts, sampler) 90 - 91 - if err == nil { 92 - sampler.Flush() 93 - } 94 - 95 - if err != nil { 96 - _ = eventWriter.event(eventError, err) 97 - return 98 - } 99 - } 100 - 101 - type eventStreamWriter struct { 102 - enc *gob.Encoder 103 - flush func() 104 - } 105 - 106 - func newEventStreamWriter(w http.ResponseWriter) (*eventStreamWriter, error) { 107 - flusher, ok := w.(http.Flusher) 108 - if !ok { 109 - return nil, errors.New("http flushing not supported") 110 - } 111 - 112 - w.Header().Set("Content-Type", "application/x-gob-stream") 113 - w.Header().Set("Cache-Control", "no-cache") 114 - w.Header().Set("Connection", "keep-alive") 115 - w.Header().Set("Transfer-Encoding", "chunked") 116 - 117 - // This informs nginx to not buffer. With buffering search responses will 118 - // be delayed until buffers get full, leading to worst case latency of the 119 - // full time a search takes to complete. 120 - w.Header().Set("X-Accel-Buffering", "no") 121 - 122 - return &eventStreamWriter{ 123 - enc: gob.NewEncoder(w), 124 - flush: flusher.Flush, 125 - }, nil 126 - } 127 - 128 - func (e *eventStreamWriter) event(event eventType, data interface{}) error { 129 - // Because gob does not support serializing errors, we send error.Error() and 130 - // recreate the error on the client-side. 131 - if event == eventError { 132 - if err, isError := data.(error); isError { 133 - data = err.Error() 134 - } 135 - } 136 - err := e.enc.Encode(searchReply{Event: event, Data: data}) 137 - if err != nil { 138 - return err 139 - } 140 - e.flush() 141 - return nil 142 - } 143 - 144 - var once sync.Once 145 - 146 - func registerGob() { 147 - once.Do(func() { 148 - gob.Register(&zoekt.SearchResult{}) 149 - }) 150 - rpc.RegisterGob() 151 - } 152 - 153 - // NewSamplingSender is a zoekt.Sender that samples stats events 154 - // to avoid sending many empty stats events over the wire. 155 - func NewSamplingSender(next zoekt.Sender) *samplingSender { 156 - return &samplingSender{ 157 - next: next, 158 - agg: zoekt.SearchResult{}, 159 - aggCount: 0, 160 - } 161 - } 162 - 163 - type samplingSender struct { 164 - next zoekt.Sender 165 - agg zoekt.SearchResult 166 - aggCount int 167 - } 168 - 169 - func (s *samplingSender) Send(event *zoekt.SearchResult) { 170 - // We don't want to send events over the wire if they don't contain file 171 - // matches. Hence, in case we didn't find any results, we aggregate the stats 172 - // and send them out in regular intervals. 173 - if len(event.Files) == 0 { 174 - s.aggCount++ 175 - 176 - s.agg.Stats.Add(event.Stats) 177 - s.agg.Progress = event.Progress 178 - 179 - if s.aggCount%100 == 0 && !s.agg.Stats.Zero() { 180 - s.next.Send(&s.agg) 181 - s.agg = zoekt.SearchResult{} 182 - } 183 - 184 - return 185 - } 186 - 187 - // If we have aggregate stats, we merge them with the new event before sending 188 - // it. We drop agg.Progress, because we assume that event.Progress reflects the 189 - // latest status. 190 - if !s.agg.Stats.Zero() { 191 - event.Stats.Add(s.agg.Stats) 192 - s.agg = zoekt.SearchResult{} 193 - } 194 - 195 - s.next.Send(event) 196 - } 197 - 198 - // Flush sends any aggregated stats that we haven't sent yet 199 - func (s *samplingSender) Flush() { 200 - if !s.agg.Stats.Zero() { 201 - s.next.Send(&zoekt.SearchResult{ 202 - Stats: s.agg.Stats, 203 - Progress: zoekt.Progress{ 204 - Priority: math.Inf(-1), 205 - MaxPendingPriority: math.Inf(-1), 206 - }, 207 - }) 208 - } 209 - }
-262
stream/stream_test.go
··· 1 - package stream 2 - 3 - import ( 4 - "bytes" 5 - "context" 6 - "encoding/gob" 7 - "fmt" 8 - "net/http" 9 - "net/http/httptest" 10 - "testing" 11 - 12 - "github.com/google/go-cmp/cmp" 13 - 14 - "github.com/sourcegraph/zoekt" 15 - "github.com/sourcegraph/zoekt/internal/mockSearcher" 16 - "github.com/sourcegraph/zoekt/query" 17 - ) 18 - 19 - func TestStreamSearch(t *testing.T) { 20 - q := query.NewAnd(mustParse("hello world|universe"), query.NewRepoSet("foo/bar", "baz/bam")) 21 - searcher := &mockSearcher.MockSearcher{ 22 - WantSearch: q, 23 - SearchResult: &zoekt.SearchResult{ 24 - Files: []zoekt.FileMatch{ 25 - {FileName: "bin.go"}, 26 - }, 27 - }, 28 - } 29 - 30 - h := &handler{Searcher: adapter{searcher}} 31 - 32 - s := httptest.NewServer(h) 33 - defer s.Close() 34 - 35 - cl := NewClient(s.URL, nil) 36 - 37 - c := make(chan *zoekt.SearchResult, 100) 38 - 39 - err := cl.StreamSearch(context.Background(), q, nil, streamerChan(c)) 40 - if err != nil { 41 - t.Fatal(err) 42 - } 43 - close(c) 44 - 45 - for res := range c { 46 - if res.Files == nil { 47 - continue 48 - } 49 - if res.Files[0].FileName != "bin.go" { 50 - t.Errorf("got %s, wanted %s", res.Files[0].FileName, "bin.go") 51 - } 52 - } 53 - } 54 - 55 - func TestStreamSearchJustStats(t *testing.T) { 56 - wantStats := zoekt.Stats{ 57 - Crashes: 1, 58 - } 59 - q := query.NewAnd(mustParse("hello world|universe"), query.NewRepoSet("foo/bar", "baz/bam")) 60 - searcher := &mockSearcher.MockSearcher{ 61 - WantSearch: q, 62 - SearchResult: &zoekt.SearchResult{ 63 - Files: []zoekt.FileMatch{}, 64 - Stats: wantStats, 65 - }, 66 - } 67 - 68 - h := &handler{Searcher: adapter{searcher}} 69 - 70 - s := httptest.NewServer(h) 71 - defer s.Close() 72 - 73 - cl := NewClient(s.URL, nil) 74 - 75 - c := make(chan *zoekt.SearchResult, 100) 76 - 77 - err := cl.StreamSearch(context.Background(), q, nil, streamerChan(c)) 78 - if err != nil { 79 - t.Fatal(err) 80 - } 81 - close(c) 82 - 83 - count := 0 84 - for res := range c { 85 - count += 1 86 - if count > 1 { 87 - t.Fatal("expected exactly 1 result, got at least 2") 88 - } 89 - if d := cmp.Diff(wantStats, res.Stats); d != "" { 90 - t.Fatalf("zoekt.Stats mismatch (-want +got): %s\n", d) 91 - } 92 - } 93 - if count != 1 { 94 - t.Fatal("expected exactly 1 result, got 0") 95 - } 96 - } 97 - 98 - func TestEventStreamWriter(t *testing.T) { 99 - registerGob() 100 - network := new(bytes.Buffer) 101 - enc := gob.NewEncoder(network) 102 - dec := gob.NewDecoder(network) 103 - 104 - esw := eventStreamWriter{ 105 - enc: enc, 106 - flush: func() {}, 107 - } 108 - 109 - tests := []struct { 110 - event eventType 111 - data interface{} 112 - }{ 113 - { 114 - eventDone, 115 - nil, 116 - }, 117 - { 118 - eventMatches, 119 - &zoekt.SearchResult{ 120 - Files: []zoekt.FileMatch{ 121 - {FileName: "bin.go"}, 122 - }, 123 - }, 124 - }, 125 - { 126 - eventError, 127 - "test error", 128 - }, 129 - } 130 - 131 - for _, tt := range tests { 132 - t.Run(tt.event.string(), func(t *testing.T) { 133 - err := esw.event(tt.event, tt.data) 134 - if err != nil { 135 - t.Fatal(err) 136 - } 137 - reply := new(searchReply) 138 - err = dec.Decode(reply) 139 - if err != nil { 140 - t.Fatal(err) 141 - } 142 - if reply.Event != tt.event { 143 - t.Fatalf("got %s, want %s", reply.Event.string(), tt.event.string()) 144 - } 145 - if d := cmp.Diff(tt.data, reply.Data); d != "" { 146 - t.Fatalf("mismatch for event type %s (-want +got):\n%s", tt.event.string(), d) 147 - } 148 - }) 149 - } 150 - } 151 - 152 - func TestServerError(t *testing.T) { 153 - serverError := fmt.Errorf("zoekt server error") 154 - h := func(w http.ResponseWriter, r *http.Request) { 155 - esw, err := newEventStreamWriter(w) 156 - if err != nil { 157 - t.Fatal(err) 158 - } 159 - err = esw.event(eventError, serverError) 160 - if err != nil { 161 - t.Fatal(err) 162 - } 163 - } 164 - s := httptest.NewServer(http.HandlerFunc(h)) 165 - cl := NewClient(s.URL, nil) 166 - err := cl.StreamSearch(context.Background(), nil, nil, streamerChan(make(chan *zoekt.SearchResult))) 167 - if err == nil { 168 - t.Fatalf("got nil, want %s", serverError) 169 - } 170 - } 171 - 172 - func mustParse(s string) query.Q { 173 - q, err := query.Parse(s) 174 - if err != nil { 175 - panic(err) 176 - } 177 - return q 178 - } 179 - 180 - type streamerChan chan<- *zoekt.SearchResult 181 - 182 - func (c streamerChan) Send(result *zoekt.SearchResult) { 183 - c <- result 184 - } 185 - 186 - type adapter struct { 187 - zoekt.Searcher 188 - } 189 - 190 - func (a adapter) StreamSearch(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, sender zoekt.Sender) (err error) { 191 - sr, err := a.Searcher.Search(ctx, q, opts) 192 - if err != nil { 193 - return err 194 - } 195 - sender.Send(sr) 196 - return nil 197 - } 198 - 199 - func TestSamplingStream(t *testing.T) { 200 - nonZeroStats := zoekt.Stats{ 201 - ContentBytesLoaded: 10, 202 - } 203 - filesEvent := &zoekt.SearchResult{ 204 - Files: make([]zoekt.FileMatch, 10), 205 - Stats: nonZeroStats, 206 - } 207 - fileEvents := func(n int) []*zoekt.SearchResult { 208 - res := make([]*zoekt.SearchResult, n) 209 - for i := 0; i < n; i++ { 210 - res[i] = filesEvent 211 - } 212 - return res 213 - } 214 - statsEvent := &zoekt.SearchResult{ 215 - Stats: nonZeroStats, 216 - } 217 - statsEvents := func(n int) []*zoekt.SearchResult { 218 - res := make([]*zoekt.SearchResult, n) 219 - for i := 0; i < n; i++ { 220 - res[i] = statsEvent 221 - } 222 - return res 223 - } 224 - cases := []struct { 225 - events []*zoekt.SearchResult 226 - beforeFlushCount int 227 - afterFlushCount int 228 - }{ 229 - // These test cases assume that the sampler only forwards 230 - // every 100 stats-only event. In case the sampling logic 231 - // changes, these tests are not valuable. 232 - {nil, 0, 0}, 233 - {fileEvents(1), 1, 1}, 234 - {fileEvents(2), 2, 2}, 235 - {fileEvents(200), 200, 200}, 236 - {append(fileEvents(1), statsEvents(1)...), 1, 2}, 237 - {append(fileEvents(1), statsEvents(2)...), 1, 2}, 238 - {append(fileEvents(1), statsEvents(99)...), 1, 2}, 239 - {append(fileEvents(1), statsEvents(100)...), 2, 2}, 240 - {statsEvents(500), 5, 5}, 241 - {statsEvents(501), 5, 6}, 242 - } 243 - 244 - for _, tc := range cases { 245 - count := 0 246 - ss := NewSamplingSender(SenderFunc(func(*zoekt.SearchResult) { 247 - count += 1 248 - })) 249 - 250 - for _, event := range tc.events { 251 - ss.Send(event) 252 - } 253 - if count != tc.beforeFlushCount { 254 - t.Fatalf("expected %d events, got %d", tc.beforeFlushCount, count) 255 - } 256 - ss.Flush() 257 - 258 - if count != tc.afterFlushCount { 259 - t.Fatalf("expected %d events, got %d", tc.afterFlushCount, count) 260 - } 261 - } 262 - }
-57
web/e2e_test.go
··· 33 33 34 34 "github.com/sourcegraph/zoekt" 35 35 "github.com/sourcegraph/zoekt/query" 36 - "github.com/sourcegraph/zoekt/rpc" 37 - "github.com/sourcegraph/zoekt/stream" 38 36 ) 39 37 40 38 // TODO(hanwen): cut & paste from ../ . Should create internal test ··· 961 959 if reflect.DeepEqual(result, zoekt.SearchResult{}) { 962 960 t.Fatal("empty result in response") 963 961 } 964 - } 965 - 966 - func TestRPC(t *testing.T) { 967 - b, err := zoekt.NewIndexBuilder(&zoekt.Repository{ 968 - Name: "name", 969 - URL: "repo-url", 970 - CommitURLTemplate: "{{.Version}}", 971 - FileURLTemplate: "file-url", 972 - LineFragmentTemplate: "#line", 973 - Branches: []zoekt.RepositoryBranch{{Name: "master", Version: "1234"}}, 974 - }) 975 - if err != nil { 976 - t.Fatalf("NewIndexBuilder: %v", err) 977 - } 978 - if err := b.Add(zoekt.Document{ 979 - Name: "f2", 980 - Content: []byte("to carry water in the no later bla"), 981 - // --------------0123456789012345678901234567890123 982 - // --------------0 1 2 3 983 - Branches: []string{"master"}, 984 - }); err != nil { 985 - t.Fatalf("Add: %v", err) 986 - } 987 - 988 - s := searcherForTest(t, b) 989 - srv := Server{ 990 - Searcher: s, 991 - RPC: true, 992 - Top: Top, 993 - } 994 - 995 - mux, err := NewMux(&srv) 996 - if err != nil { 997 - t.Fatalf("NewMux: %v", err) 998 - } 999 - 1000 - ts := httptest.NewServer(mux) 1001 - defer ts.Close() 1002 - 1003 - endpoint := ts.Listener.Addr().String() 1004 - 1005 - client := stream.NewClient("http://"+endpoint, nil).WithSearcher(rpc.Client(endpoint)) 1006 - 1007 - ctx := context.Background() 1008 - q := &query.Substring{Pattern: "water"} 1009 - opts := &zoekt.SearchOptions{ChunkMatches: true} 1010 - opts.SetDefaults() 1011 - results, err := client.Search(ctx, q, opts) 1012 - if err != nil { 1013 - t.Fatal(err) 1014 - } 1015 - 1016 - assertResults(t, results.Files, "f2: to carry water in the no later bla") 1017 - 1018 - // TODO grpc, List, StreamSearch 1019 962 } 1020 963 1021 964 func assertResults(t *testing.T, files []zoekt.FileMatch, want string) {
-4
web/server.go
··· 34 34 "github.com/sourcegraph/zoekt" 35 35 zjson "github.com/sourcegraph/zoekt/json" 36 36 "github.com/sourcegraph/zoekt/query" 37 - "github.com/sourcegraph/zoekt/rpc" 38 - "github.com/sourcegraph/zoekt/stream" 39 37 ) 40 38 41 39 var Funcmap = template.FuncMap{ ··· 176 174 mux.HandleFunc("/print", s.servePrint) 177 175 } 178 176 if s.RPC { 179 - mux.Handle(rpc.DefaultRPCPath, rpc.Server(traceAwareSearcher{s.Searcher})) // /rpc 180 177 mux.Handle("/api/", http.StripPrefix("/api", zjson.JSONServer(traceAwareSearcher{s.Searcher}))) 181 - mux.Handle(stream.DefaultSSEPath, stream.Server(traceAwareSearcher{s.Searcher})) // /stream 182 178 } 183 179 184 180 mux.HandleFunc("/healthz", s.serveHealthz)