fork of https://github.com/sourcegraph/zoekt
1package server
2
3import (
4 "context"
5 "math"
6
7 "google.golang.org/grpc/codes"
8 "google.golang.org/grpc/status"
9
10 "github.com/sourcegraph/zoekt"
11 "github.com/sourcegraph/zoekt/grpc/chunk"
12 webserverv1 "github.com/sourcegraph/zoekt/grpc/protos/zoekt/webserver/v1"
13 "github.com/sourcegraph/zoekt/query"
14)
15
16func NewServer(s zoekt.Streamer) *Server {
17 return &Server{
18 streamer: s,
19 }
20}
21
22type Server struct {
23 webserverv1.UnimplementedWebserverServiceServer
24 streamer zoekt.Streamer
25}
26
27func (s *Server) Search(ctx context.Context, req *webserverv1.SearchRequest) (*webserverv1.SearchResponse, error) {
28 q, err := query.QFromProto(req.GetQuery())
29 if err != nil {
30 return nil, status.Error(codes.InvalidArgument, err.Error())
31 }
32
33 res, err := s.streamer.Search(ctx, q, zoekt.SearchOptionsFromProto(req.GetOpts()))
34 if err != nil {
35 return nil, err
36 }
37
38 return res.ToProto(), nil
39}
40
41func (s *Server) StreamSearch(req *webserverv1.StreamSearchRequest, ss webserverv1.WebserverService_StreamSearchServer) error {
42 request := req.GetRequest()
43
44 q, err := query.QFromProto(request.GetQuery())
45 if err != nil {
46 return status.Error(codes.InvalidArgument, err.Error())
47 }
48
49 sender := gRPCChunkSender(ss)
50 sampler := newSamplingSender(sender)
51
52 err = s.streamer.StreamSearch(ss.Context(), q, zoekt.SearchOptionsFromProto(request.GetOpts()), sampler)
53 if err == nil {
54 sampler.Flush()
55 }
56 return err
57}
58
59func (s *Server) List(ctx context.Context, req *webserverv1.ListRequest) (*webserverv1.ListResponse, error) {
60 q, err := query.QFromProto(req.GetQuery())
61 if err != nil {
62 return nil, status.Error(codes.InvalidArgument, err.Error())
63 }
64
65 repoList, err := s.streamer.List(ctx, q, zoekt.ListOptionsFromProto(req.GetOpts()))
66 if err != nil {
67 return nil, err
68 }
69
70 return repoList.ToProto(), nil
71}
72
73// gRPCChunkSender is a zoekt.Sender that sends small chunks of FileMatches to the provided gRPC stream.
74func gRPCChunkSender(ss webserverv1.WebserverService_StreamSearchServer) zoekt.Sender {
75 f := func(r *zoekt.SearchResult) {
76 result := r.ToStreamProto().GetResponseChunk()
77
78 if len(result.GetFiles()) == 0 { // stats-only result, send it immediately
79 _ = ss.Send(&webserverv1.StreamSearchResponse{
80 ResponseChunk: result,
81 })
82 return
83 }
84
85 // Otherwise, chunk the file matches into multiple responses
86
87 statsSent := false
88 numFilesSent := 0
89
90 sendFunc := func(filesChunk []*webserverv1.FileMatch) error {
91 numFilesSent += len(filesChunk)
92
93 var stats *webserverv1.Stats
94 if !statsSent { // We only send stats back on the first chunk
95 statsSent = true
96 stats = result.GetStats()
97 }
98
99 progress := result.GetProgress()
100
101 if numFilesSent < len(result.GetFiles()) { // more chunks to come
102 progress = &webserverv1.Progress{
103 Priority: result.GetProgress().GetPriority(),
104
105 // We want the client to consume the entire set of chunks - so we manually
106 // patch the MaxPendingPriority to be >= overall priority.
107 MaxPendingPriority: math.Max(
108 result.GetProgress().GetPriority(),
109 result.GetProgress().GetMaxPendingPriority(),
110 ),
111 }
112 }
113
114 return ss.Send(&webserverv1.StreamSearchResponse{
115 ResponseChunk: &webserverv1.SearchResponse{
116 Files: filesChunk,
117
118 Stats: stats,
119 Progress: progress,
120 },
121 })
122 }
123
124 _ = chunk.SendAll(sendFunc, result.GetFiles()...)
125 }
126
127 return zoekt.SenderFunc(f)
128}