···11+// Package chunk provides a utility for sending sets of protobuf messages in
22+// groups of smaller chunks. This is useful for gRPC, which has limitations around the maximum
33+// size of a message that you can send.
44+//
55+// This code is adapted from the gitaly project, which is licensed
66+// under the MIT license. A copy of that license text can be found at
77+// https://mit-license.org/.
88+//
99+// The code this file was based off can be found here: https://gitlab.com/gitlab-org/gitaly/-/blob/v16.2.0/internal/helper/chunk/chunker.go
1010+package chunk
1111+1212+import (
1313+ "google.golang.org/protobuf/proto"
1414+)
1515+1616+// New returns a new Chunker that will use the given sendFunc to send chunks of messages.
1717+func New[T proto.Message](sendFunc func([]T) error) *Chunker[T] {
1818+ return &Chunker[T]{sendFunc: sendFunc}
1919+}
2020+2121+// Chunker lets you spread items you want to send over multiple chunks.
2222+// This type is not thread-safe.
2323+type Chunker[T proto.Message] struct {
2424+ sendFunc func([]T) error // sendFunc is the function that will be invoked when a chunk is ready to be sent.
2525+2626+ buffer []T // buffer stores the items that will be sent when the sendFunc is invoked.
2727+ sizeBytes int // sizeBytes is the size of the current chunk in bytes.
2828+}
2929+3030+// maxMessageSize is the maximum size per protobuf message
3131+const maxMessageSize = 1 * 1024 * 1024 // 1 MiB
3232+3333+// Send will append the provided items to the current chunk, and send the chunk if it is full.
3434+//
3535+// Callers should ensure that they call Flush() after the last call to Send().
3636+func (c *Chunker[T]) Send(items ...T) error {
3737+ for _, item := range items {
3838+ if err := c.sendOne(item); err != nil {
3939+ return err
4040+ }
4141+ }
4242+4343+ return nil
4444+}
4545+4646+func (c *Chunker[T]) sendOne(item T) error {
4747+ itemSize := proto.Size(item)
4848+4949+ if itemSize+c.sizeBytes >= maxMessageSize {
5050+ if err := c.sendResponseMsg(); err != nil {
5151+ return err
5252+ }
5353+ }
5454+5555+ c.buffer = append(c.buffer, item)
5656+ c.sizeBytes += itemSize
5757+5858+ return nil
5959+}
6060+6161+func (c *Chunker[T]) sendResponseMsg() error {
6262+ c.sizeBytes = 0
6363+6464+ err := c.sendFunc(c.buffer)
6565+ if err != nil {
6666+ return err
6767+ }
6868+6969+ c.buffer = c.buffer[:0]
7070+ return nil
7171+}
7272+7373+// Flush sends remaining items in the current chunk, if any.
7474+func (c *Chunker[T]) Flush() error {
7575+ if len(c.buffer) == 0 {
7676+ return nil
7777+ }
7878+7979+ err := c.sendResponseMsg()
8080+ if err != nil {
8181+ return err
8282+ }
8383+8484+ return nil
8585+}
8686+8787+// SendAll is a convenience function that immediately sends all provided items in smaller chunks using the provided
8888+// sendFunc.
8989+//
9090+// See the documentation for Chunker.Send() for more information.
9191+func SendAll[T proto.Message](sendFunc func([]T) error, items ...T) error {
9292+ c := New(sendFunc)
9393+9494+ err := c.Send(items...)
9595+ if err != nil {
9696+ return err
9797+ }
9898+9999+ return c.Flush()
100100+}
+278
grpc/chunk/chunker_test.go
···11+// Package chunk provides a utility for sending sets of protobuf messages in
22+// groups of smaller chunks. This is useful for gRPC, which has limitations around the maximum
33+// size of a message that you can send.
44+//
55+// This code is adapted from the gitaly project, which is licensed
66+// under the MIT license. A copy of that license text can be found at
77+// https://mit-license.org/.
88+//
99+// The code this file was based off can be found here: https://gitlab.com/gitlab-org/gitaly/-/blob/v16.2.0/internal/helper/chunk/chunker_test.go
1010+package chunk
1111+1212+import (
1313+ "bytes"
1414+ "context"
1515+ "errors"
1616+ "fmt"
1717+ "io"
1818+ "net"
1919+ "strconv"
2020+ "testing"
2121+ "testing/quick"
2222+2323+ "github.com/dustin/go-humanize"
2424+ "github.com/google/go-cmp/cmp"
2525+ "github.com/stretchr/testify/require"
2626+ "google.golang.org/grpc"
2727+ "google.golang.org/grpc/credentials/insecure"
2828+ "google.golang.org/grpc/interop/grpc_testing"
2929+ "google.golang.org/protobuf/proto"
3030+)
3131+3232+func TestChunker_DeliverAllMessages(t *testing.T) {
3333+ runTest := func(inputPayloads [][]byte) error {
3434+ expectedPayloadSizeBytes := 0
3535+ for _, payload := range inputPayloads {
3636+ expectedPayloadSizeBytes += len(payload)
3737+ }
3838+3939+ var receivedPayloads []*grpc_testing.Payload
4040+4141+ // Tell the chunker to just gather all the payloads for later inspection.
4242+ sendFunc := func(payloads []*grpc_testing.Payload) error {
4343+ receivedPayloads = append(receivedPayloads, payloads...)
4444+ return nil
4545+ }
4646+4747+ c := New(sendFunc)
4848+4949+ // send all the payloads
5050+ for _, payload := range inputPayloads {
5151+ if err := c.Send(&grpc_testing.Payload{Body: payload}); err != nil {
5252+ return fmt.Errorf("error sending payload: %s", err)
5353+ }
5454+ }
5555+5656+ if err := c.Flush(); err != nil {
5757+ return fmt.Errorf("error flushing chunker: %s", err)
5858+ }
5959+6060+ // Confirm that we received the same number of payloads as we sent.
6161+ if diff := cmp.Diff(len(inputPayloads), len(receivedPayloads)); diff != "" {
6262+ return fmt.Errorf("unexpected number of payloads (-want +got):\n%s", diff)
6363+ }
6464+6565+ // Confirm that each received payload is the same as the original.
6666+ for i, payload := range receivedPayloads {
6767+ expectedPayload := inputPayloads[i]
6868+ if diff := cmp.Diff(expectedPayload, payload.GetBody()); diff != "" {
6969+ return fmt.Errorf("for payload #%d (-want +got):\n%s", i, diff)
7070+ }
7171+ }
7272+7373+ receivedPayloadSizeBytes := 0
7474+ for _, payload := range receivedPayloads {
7575+ receivedPayloadSizeBytes += len(payload.GetBody())
7676+ }
7777+7878+ // Confirm that the total size of the payloads we received is the same as the total size of the payloads we sent.
7979+ if diff := cmp.Diff(expectedPayloadSizeBytes, receivedPayloadSizeBytes); diff != "" {
8080+ return fmt.Errorf("unexpected payload size (-want +got):\n%s", diff)
8181+ }
8282+8383+ return nil
8484+ }
8585+8686+ t.Run("normal", func(t *testing.T) {
8787+ t.Parallel()
8888+8989+ inputPayloads := [][]byte{
9090+ {1, 2, 3},
9191+ bytes.Repeat([]byte("a"), int(3.5*maxMessageSize)),
9292+ {4, 5, 6},
9393+ }
9494+9595+ if err := runTest(inputPayloads); err != nil {
9696+ t.Fatal(err)
9797+ }
9898+ })
9999+100100+ t.Run("some empty", func(t *testing.T) {
101101+ t.Parallel()
102102+103103+ inputPayloads := [][]byte{
104104+ {},
105105+ []byte("foo, bar, baz"),
106106+ bytes.Repeat([]byte("a"), int(3.5*maxMessageSize)),
107107+ {},
108108+ }
109109+110110+ if err := runTest(inputPayloads); err != nil {
111111+ t.Fatal(err)
112112+ }
113113+ })
114114+115115+ t.Run("fuzz", func(t *testing.T) {
116116+ t.Parallel()
117117+118118+ var lastErr error
119119+120120+ if err := quick.Check(func(payloads [][]byte) bool {
121121+ lastErr = runTest(payloads)
122122+ if lastErr != nil {
123123+ return false
124124+ }
125125+126126+ return true
127127+ }, nil); err != nil {
128128+ t.Fatal(lastErr)
129129+ }
130130+ })
131131+}
132132+133133+func TestChunkerE2E(t *testing.T) {
134134+ for _, test := range []struct {
135135+ name string
136136+137137+ inputSizeBytes int
138138+ expectedMessageCount int
139139+ }{
140140+ {
141141+ name: "normal",
142142+143143+ inputSizeBytes: int(3.5 * maxMessageSize),
144144+ expectedMessageCount: 4,
145145+ },
146146+ {
147147+ name: "empty payload",
148148+ inputSizeBytes: 0,
149149+ expectedMessageCount: 1,
150150+ },
151151+ } {
152152+ t.Run(test.name, func(t *testing.T) {
153153+ s := &server{}
154154+ srv, serverSocketPath := runServer(t, s)
155155+ t.Cleanup(func() {
156156+ srv.Stop()
157157+ })
158158+159159+ client, conn := newClient(t, serverSocketPath)
160160+ t.Cleanup(func() {
161161+ _ = conn.Close()
162162+ })
163163+164164+ ctx := context.Background()
165165+166166+ stream, err := client.StreamingOutputCall(ctx, &grpc_testing.StreamingOutputCallRequest{
167167+ Payload: &grpc_testing.Payload{
168168+ Body: []byte(strconv.FormatInt(int64(test.inputSizeBytes), 10)),
169169+ },
170170+ })
171171+172172+ require.NoError(t, err)
173173+174174+ messageCount := 0
175175+ var receivedPayload []byte
176176+ for {
177177+ resp, err := stream.Recv()
178178+ if errors.Is(err, io.EOF) {
179179+ break
180180+ }
181181+182182+ if err != nil {
183183+ t.Fatal(err)
184184+ }
185185+186186+ messageCount++
187187+ receivedPayload = append(receivedPayload, resp.GetPayload().GetBody()...)
188188+189189+ require.Less(t, proto.Size(resp), maxMessageSize)
190190+ }
191191+192192+ require.Equal(t, test.expectedMessageCount, messageCount)
193193+194194+ receivedPayloadSizeBytes := len(receivedPayload)
195195+196196+ expectedSizeBytes := test.inputSizeBytes
197197+198198+ if receivedPayloadSizeBytes != expectedSizeBytes {
199199+ t.Fatalf("input payload size is not %d bytes (~ %q), got size: %d (~ %q)",
200200+ expectedSizeBytes, humanize.Bytes(uint64(expectedSizeBytes)),
201201+ receivedPayloadSizeBytes, humanize.Bytes(uint64(receivedPayloadSizeBytes)),
202202+ )
203203+ }
204204+205205+ })
206206+ }
207207+}
208208+209209+type server struct {
210210+ grpc_testing.UnimplementedTestServiceServer
211211+}
212212+213213+func (s *server) StreamingOutputCall(req *grpc_testing.StreamingOutputCallRequest, stream grpc_testing.TestService_StreamingOutputCallServer) error {
214214+ const kilobyte = 1024
215215+216216+ c := New[*grpc_testing.Payload](func(payloads []*grpc_testing.Payload) error {
217217+ var body []byte
218218+ for _, p := range payloads {
219219+ body = append(body, p.GetBody()...)
220220+ }
221221+222222+ return stream.Send(&grpc_testing.StreamingOutputCallResponse{Payload: &grpc_testing.Payload{Body: body}})
223223+ })
224224+225225+ bytesToSend, err := strconv.ParseInt(string(req.GetPayload().GetBody()), 10, 64)
226226+ if err != nil {
227227+ return err
228228+ }
229229+230230+ if bytesToSend == 0 {
231231+ if err := c.Send(&grpc_testing.Payload{}); err != nil {
232232+ return err
233233+ }
234234+235235+ return c.Flush()
236236+ }
237237+238238+ for numBytes := int64(0); numBytes < bytesToSend; numBytes += kilobyte {
239239+ if err := c.Send(&grpc_testing.Payload{Body: make([]byte, kilobyte)}); err != nil {
240240+ return err
241241+ }
242242+ }
243243+244244+ return c.Flush()
245245+}
246246+247247+func runServer(t *testing.T, s *server, opt ...grpc.ServerOption) (*grpc.Server, string) {
248248+ grpcServer := grpc.NewServer(opt...)
249249+ grpc_testing.RegisterTestServiceServer(grpcServer, s)
250250+251251+ lis, err := net.Listen("tcp", ":0")
252252+ require.NoError(t, err)
253253+254254+ go func() {
255255+ err := grpcServer.Serve(lis)
256256+ require.NoError(t, err)
257257+ }()
258258+259259+ t.Cleanup(func() {
260260+ grpcServer.Stop()
261261+ lis.Close()
262262+ })
263263+264264+ return grpcServer, lis.Addr().String()
265265+}
266266+267267+func newClient(t *testing.T, serverSocketPath string) (grpc_testing.TestServiceClient, *grpc.ClientConn) {
268268+ connOpts := []grpc.DialOption{
269269+ grpc.WithTransportCredentials(insecure.NewCredentials()),
270270+ }
271271+272272+ conn, err := grpc.Dial(serverSocketPath, connOpts...)
273273+ if err != nil {
274274+ t.Fatal(err)
275275+ }
276276+277277+ return grpc_testing.NewTestServiceClient(conn), conn
278278+}
+59-4
grpc/server.go
···2233import (
44 "context"
55+ "math"
5677+ "github.com/sourcegraph/zoekt/grpc/chunk"
68 "google.golang.org/grpc/codes"
79 "google.golang.org/grpc/status"
810···4345 return status.Error(codes.InvalidArgument, err.Error())
4446 }
45474646- onMatch := stream.SenderFunc(func(res *zoekt.SearchResult) {
4747- ss.Send(res.ToProto())
4848- })
4949- sampler := stream.NewSamplingSender(onMatch)
4848+ sender := gRPCChunkSender(ss)
4949+ sampler := stream.NewSamplingSender(sender)
50505151 err = s.streamer.StreamSearch(ss.Context(), q, zoekt.SearchOptionsFromProto(req.GetOpts()), sampler)
5252 if err == nil {
···68686969 return repoList.ToProto(), nil
7070}
7171+7272+// gRPCChunkSender is a zoekt.Sender that sends small chunks of FileMatches to the provided gRPC stream.
7373+func gRPCChunkSender(ss v1.WebserverService_StreamSearchServer) zoekt.Sender {
7474+ f := func(r *zoekt.SearchResult) {
7575+ result := r.ToProto()
7676+7777+ if len(result.GetFiles()) == 0 { // stats-only result, send it immediately
7878+ _ = ss.Send(result)
7979+ return
8080+ }
8181+8282+ // Otherwise, chunk the file matches into multiple responses
8383+8484+ statsSent := false
8585+ numFilesSent := 0
8686+8787+ sendFunc := func(filesChunk []*v1.FileMatch) error {
8888+ numFilesSent += len(filesChunk)
8989+9090+ var stats *v1.Stats
9191+ if !statsSent { // We only send stats back on the first chunk
9292+ statsSent = true
9393+ stats = result.GetStats()
9494+ }
9595+9696+ progress := result.GetProgress()
9797+9898+ if numFilesSent < len(result.GetFiles()) { // more chunks to come
9999+ progress = &v1.Progress{
100100+ Priority: result.GetProgress().GetPriority(),
101101+102102+ // We want the client to consume the entire set of chunks - so we manually
103103+ // patch the MaxPendingPriority to be >= overall priority.
104104+ MaxPendingPriority: math.Max(
105105+ result.GetProgress().GetPriority(),
106106+ result.GetProgress().GetMaxPendingPriority(),
107107+ ),
108108+ }
109109+ }
110110+111111+ response := &v1.SearchResponse{
112112+ Files: filesChunk,
113113+114114+ Stats: stats,
115115+ Progress: progress,
116116+ }
117117+118118+ return ss.Send(response)
119119+ }
120120+121121+ _ = chunk.SendAll(sendFunc, result.GetFiles()...)
122122+ }
123123+124124+ return stream.SenderFunc(f)
125125+}
+195
grpc/server_test.go
···2233import (
44 "context"
55+ "errors"
66+ "fmt"
77+ "io"
58 "net/http/httptest"
69 "net/url"
710 "testing"
1111+ "testing/quick"
8121313+ "github.com/google/go-cmp/cmp"
1414+ "github.com/google/go-cmp/cmp/cmpopts"
1515+ "go.uber.org/atomic"
916 "golang.org/x/net/http2"
1017 "golang.org/x/net/http2/h2c"
1118 "google.golang.org/grpc"
1219 "google.golang.org/grpc/credentials/insecure"
1320 "google.golang.org/protobuf/proto"
2121+ "google.golang.org/protobuf/testing/protocmp"
14221523 "github.com/sourcegraph/zoekt"
1624 v1 "github.com/sourcegraph/zoekt/grpc/v1"
···2432 SearchResult: &zoekt.SearchResult{
2533 Files: []zoekt.FileMatch{
2634 {FileName: "bin.go"},
3535+ {FileName: "foo.go"},
2736 },
2837 },
2938···4150 }
42514352 gs := grpc.NewServer()
5353+ defer gs.Stop()
5454+4455 v1.RegisterWebserverServiceServer(gs, NewServer(adapter{mock}))
4556 ts := httptest.NewServer(h2c.NewHandler(gs, &http2.Server{}))
5757+ defer ts.Close()
46584759 u, err := url.Parse(ts.URL)
4860 if err != nil {
···5264 if err != nil {
5365 t.Fatal(err)
5466 }
6767+ defer cc.Close()
6868+5569 client := v1.NewWebserverServiceClient(cc)
56705771 r, err := client.Search(context.Background(), &v1.SearchRequest{Query: query.QToProto(mock.WantSearch)})
···7084 if !proto.Equal(l, mock.RepoList.ToProto()) {
7185 t.Fatalf("got %+v, want %+v", l, mock.RepoList.ToProto())
7286 }
8787+8888+ cs, err := client.StreamSearch(context.Background(), &v1.SearchRequest{Query: query.QToProto(mock.WantSearch)})
8989+ if err != nil {
9090+ t.Fatal(err)
9191+ }
9292+9393+ allResponses := readAllStream(t, cs)
9494+9595+ // check to make sure that we get the same set of file matches back
9696+ var receivedFileMatches []*v1.FileMatch
9797+ for _, r := range allResponses {
9898+ receivedFileMatches = append(receivedFileMatches, r.GetFiles()...)
9999+ }
100100+101101+ if diff := cmp.Diff(receivedFileMatches, mock.SearchResult.ToProto().GetFiles(), protocmp.Transform()); diff != "" {
102102+ t.Fatalf("unexpected difference in file matches (-want +got):\n%s", diff)
103103+ }
104104+}
105105+106106+func TestFuzzGRPCChunkSender(t *testing.T) {
107107+ validateResult := func(input zoekt.SearchResult) error {
108108+ clientStream, serverStream := newPairedSearchStream(t)
109109+ sender := gRPCChunkSender(serverStream)
110110+111111+ sender.Send(&input)
112112+113113+ allResponses := readAllStream(t, clientStream)
114114+ if len(allResponses) == 0 {
115115+ return errors.New("received no responses")
116116+ }
117117+118118+ expectedResult := input.ToProto()
119119+120120+ for i, receivedResponse := range allResponses {
121121+ // First, check some invariants about the progress field
122122+123123+ if i == len(allResponses)-1 {
124124+ // The last response should have the same progress as the original search result
125125+ if diff := cmp.Diff(expectedResult.GetProgress(), receivedResponse.GetProgress(), protocmp.Transform()); diff != "" {
126126+ return fmt.Errorf("unexpected difference in progress (-want +got):\n%s", diff)
127127+ }
128128+129129+ } else {
130130+ // All other responses should ensure that the progress' priority is less than the max-pending priority, to
131131+ // ensure that the client consumes the entire set of chunks
132132+133133+ if receivedResponse.GetProgress().GetPriority() > receivedResponse.GetProgress().GetMaxPendingPriority() {
134134+ return fmt.Errorf(
135135+ "received response %d (%s) has priority %.6f, which is greater than the max pending priority %.6f",
136136+ i, receivedResponse,
137137+ receivedResponse.GetProgress().GetPriority(), receivedResponse.GetProgress().GetMaxPendingPriority(),
138138+ )
139139+140140+ }
141141+ }
142142+143143+ // Safety, ensure that all other fields are echoed back correctly if the schema ever changes
144144+ opts := []cmp.Option{
145145+ protocmp.Transform(),
146146+ protocmp.IgnoreFields(&v1.SearchResponse{},
147147+ "progress", // progress is tested above
148148+ "stats", // aggregated stats are tested below
149149+ "files", // files are tested separately
150150+ ),
151151+ }
152152+153153+ if diff := cmp.Diff(expectedResult, receivedResponse, opts...); diff != "" {
154154+ return fmt.Errorf("unexpected difference in response fields (-want +got):\n%s", diff)
155155+ }
156156+ }
157157+158158+ receivedStats := &zoekt.Stats{}
159159+160160+ var receivedFileMatches []*v1.FileMatch
161161+ for _, r := range allResponses {
162162+ receivedStats.Add(zoekt.StatsFromProto(r.GetStats()))
163163+ receivedFileMatches = append(receivedFileMatches, r.GetFiles()...)
164164+ }
165165+166166+ // Check to make sure that we get one set of stats back
167167+ if diff := cmp.Diff(expectedResult.GetStats(), receivedStats.ToProto(),
168168+ protocmp.Transform(),
169169+ protocmp.IgnoreFields(&v1.Stats{},
170170+ "duration", // for whatever the duration field isn't updated when zoekt.Stats.Add is called
171171+ ),
172172+ ); diff != "" {
173173+ return fmt.Errorf("unexpected difference in stats (-want +got):\n%s", diff)
174174+ }
175175+176176+ // Check to make sure that we get the same set of file matches back
177177+ if diff := cmp.Diff(expectedResult.GetFiles(), receivedFileMatches,
178178+ protocmp.Transform(), cmpopts.EquateEmpty()); diff != "" {
179179+ return fmt.Errorf("unexpected difference in file matches (-want +got):\n%s", diff)
180180+ }
181181+182182+ return nil
183183+ }
184184+185185+ var lastErr error
186186+ if err := quick.Check(func(r zoekt.SearchResult) bool {
187187+ lastErr = validateResult(r)
188188+189189+ return lastErr == nil
190190+ }, nil); err != nil {
191191+ t.Fatal(lastErr.Error())
192192+ }
193193+}
194194+195195+// newPairedSearchStream returns a pair of client and server search streams that are connected to each other.
196196+func newPairedSearchStream(t *testing.T) (v1.WebserverService_StreamSearchClient, v1.WebserverService_StreamSearchServer) {
197197+ client := &mockSearchStreamClient{t: t}
198198+ server := &mockSearchStreamServer{t: t, pairedClient: client}
199199+200200+ return client, server
201201+}
202202+203203+type mockSearchStreamClient struct {
204204+ t *testing.T
205205+206206+ storedResponses []*v1.SearchResponse
207207+ index int
208208+209209+ startedReading atomic.Bool
210210+211211+ grpc.ClientStream
212212+}
213213+214214+func (m *mockSearchStreamClient) Recv() (*v1.SearchResponse, error) {
215215+ m.startedReading.Store(true)
216216+217217+ if m.index >= len(m.storedResponses) {
218218+ return nil, io.EOF
219219+ }
220220+221221+ r := m.storedResponses[m.index]
222222+ m.index++
223223+ return r, nil
224224+}
225225+226226+func (m *mockSearchStreamClient) storeResponse(r *v1.SearchResponse) {
227227+ if m.startedReading.Load() {
228228+ m.t.Fatalf("cannot store additional responses after starting to read from stream")
229229+ }
230230+231231+ m.storedResponses = append(m.storedResponses, r)
232232+}
233233+234234+type mockSearchStreamServer struct {
235235+ t *testing.T
236236+237237+ pairedClient *mockSearchStreamClient
238238+239239+ grpc.ServerStream
240240+}
241241+242242+func (m *mockSearchStreamServer) Send(r *v1.SearchResponse) error {
243243+ m.pairedClient.storeResponse(r)
244244+ return nil
245245+}
246246+247247+var (
248248+ _ v1.WebserverService_StreamSearchServer = &mockSearchStreamServer{}
249249+ _ v1.WebserverService_StreamSearchClient = &mockSearchStreamClient{}
250250+)
251251+252252+func readAllStream(t *testing.T, cs v1.WebserverService_StreamSearchClient) []*v1.SearchResponse {
253253+ var got []*v1.SearchResponse
254254+ for { // collect all responses from the stream
255255+ r, err := cs.Recv()
256256+ if errors.Is(err, io.EOF) {
257257+ break
258258+ }
259259+260260+ if err != nil {
261261+ t.Fatal(err)
262262+ }
263263+264264+ got = append(got, r)
265265+ }
266266+267267+ return got
73268}
7426975270func mustParse(s string) query.Q {