fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 3.4 kB View raw
1package server 2 3import ( 4 "context" 5 "math" 6 7 "google.golang.org/grpc/codes" 8 "google.golang.org/grpc/status" 9 10 "github.com/sourcegraph/zoekt" 11 "github.com/sourcegraph/zoekt/grpc/chunk" 12 webserverv1 "github.com/sourcegraph/zoekt/grpc/protos/zoekt/webserver/v1" 13 "github.com/sourcegraph/zoekt/query" 14) 15 16func NewServer(s zoekt.Streamer) *Server { 17 return &Server{ 18 streamer: s, 19 } 20} 21 22type Server struct { 23 webserverv1.UnimplementedWebserverServiceServer 24 streamer zoekt.Streamer 25} 26 27func (s *Server) Search(ctx context.Context, req *webserverv1.SearchRequest) (*webserverv1.SearchResponse, error) { 28 q, err := query.QFromProto(req.GetQuery()) 29 if err != nil { 30 return nil, status.Error(codes.InvalidArgument, err.Error()) 31 } 32 33 res, err := s.streamer.Search(ctx, q, zoekt.SearchOptionsFromProto(req.GetOpts())) 34 if err != nil { 35 return nil, err 36 } 37 38 return res.ToProto(), nil 39} 40 41func (s *Server) StreamSearch(req *webserverv1.StreamSearchRequest, ss webserverv1.WebserverService_StreamSearchServer) error { 42 request := req.GetRequest() 43 44 q, err := query.QFromProto(request.GetQuery()) 45 if err != nil { 46 return status.Error(codes.InvalidArgument, err.Error()) 47 } 48 49 sender := gRPCChunkSender(ss) 50 sampler := newSamplingSender(sender) 51 52 err = s.streamer.StreamSearch(ss.Context(), q, zoekt.SearchOptionsFromProto(request.GetOpts()), sampler) 53 if err == nil { 54 sampler.Flush() 55 } 56 return err 57} 58 59func (s *Server) List(ctx context.Context, req *webserverv1.ListRequest) (*webserverv1.ListResponse, error) { 60 q, err := query.QFromProto(req.GetQuery()) 61 if err != nil { 62 return nil, status.Error(codes.InvalidArgument, err.Error()) 63 } 64 65 repoList, err := s.streamer.List(ctx, q, zoekt.ListOptionsFromProto(req.GetOpts())) 66 if err != nil { 67 return nil, err 68 } 69 70 return repoList.ToProto(), nil 71} 72 73// gRPCChunkSender is a zoekt.Sender that sends small chunks of FileMatches to the provided gRPC stream. 74func gRPCChunkSender(ss webserverv1.WebserverService_StreamSearchServer) zoekt.Sender { 75 f := func(r *zoekt.SearchResult) { 76 result := r.ToStreamProto().GetResponseChunk() 77 78 if len(result.GetFiles()) == 0 { // stats-only result, send it immediately 79 _ = ss.Send(&webserverv1.StreamSearchResponse{ 80 ResponseChunk: result, 81 }) 82 return 83 } 84 85 // Otherwise, chunk the file matches into multiple responses 86 87 statsSent := false 88 numFilesSent := 0 89 90 sendFunc := func(filesChunk []*webserverv1.FileMatch) error { 91 numFilesSent += len(filesChunk) 92 93 var stats *webserverv1.Stats 94 if !statsSent { // We only send stats back on the first chunk 95 statsSent = true 96 stats = result.GetStats() 97 } 98 99 progress := result.GetProgress() 100 101 if numFilesSent < len(result.GetFiles()) { // more chunks to come 102 progress = &webserverv1.Progress{ 103 Priority: result.GetProgress().GetPriority(), 104 105 // We want the client to consume the entire set of chunks - so we manually 106 // patch the MaxPendingPriority to be >= overall priority. 107 MaxPendingPriority: math.Max( 108 result.GetProgress().GetPriority(), 109 result.GetProgress().GetMaxPendingPriority(), 110 ), 111 } 112 } 113 114 return ss.Send(&webserverv1.StreamSearchResponse{ 115 ResponseChunk: &webserverv1.SearchResponse{ 116 Files: filesChunk, 117 118 Stats: stats, 119 Progress: progress, 120 }, 121 }) 122 } 123 124 _ = chunk.SendAll(sendFunc, result.GetFiles()...) 125 } 126 127 return zoekt.SenderFunc(f) 128}