fork of https://github.com/sourcegraph/zoekt
1// Package chunk provides a utility for sending sets of protobuf messages in
2// groups of smaller chunks. This is useful for gRPC, which has limitations around the maximum
3// size of a message that you can send.
4//
5// This code is adapted from the gitaly project, which is licensed
6// under the MIT license. A copy of that license text can be found at
7// https://mit-license.org/.
8//
9// The code this file was based off can be found here: https://gitlab.com/gitlab-org/gitaly/-/blob/v16.2.0/internal/helper/chunk/chunker.go
10package chunk
11
12import (
13 "google.golang.org/protobuf/proto"
14)
15
16// New returns a new Chunker that will use the given sendFunc to send chunks of messages.
17func New[T proto.Message](sendFunc func([]T) error) *Chunker[T] {
18 return &Chunker[T]{sendFunc: sendFunc}
19}
20
21// Chunker lets you spread items you want to send over multiple chunks.
22// This type is not thread-safe.
23type Chunker[T proto.Message] struct {
24 sendFunc func([]T) error // sendFunc is the function that will be invoked when a chunk is ready to be sent.
25
26 buffer []T // buffer stores the items that will be sent when the sendFunc is invoked.
27 sizeBytes int // sizeBytes is the size of the current chunk in bytes.
28}
29
30// maxMessageSize is the maximum size per protobuf message
31const maxMessageSize = 1 * 1024 * 1024 // 1 MiB
32
33// Send will append the provided items to the current chunk, and send the chunk if it is full.
34//
35// Callers should ensure that they call Flush() after the last call to Send().
36func (c *Chunker[T]) Send(items ...T) error {
37 for _, item := range items {
38 if err := c.sendOne(item); err != nil {
39 return err
40 }
41 }
42
43 return nil
44}
45
46func (c *Chunker[T]) sendOne(item T) error {
47 itemSize := proto.Size(item)
48
49 if itemSize+c.sizeBytes >= maxMessageSize {
50 if err := c.sendResponseMsg(); err != nil {
51 return err
52 }
53 }
54
55 c.buffer = append(c.buffer, item)
56 c.sizeBytes += itemSize
57
58 return nil
59}
60
61func (c *Chunker[T]) sendResponseMsg() error {
62 c.sizeBytes = 0
63
64 err := c.sendFunc(c.buffer)
65 if err != nil {
66 return err
67 }
68
69 c.buffer = c.buffer[:0]
70 return nil
71}
72
73// Flush sends remaining items in the current chunk, if any.
74func (c *Chunker[T]) Flush() error {
75 if len(c.buffer) == 0 {
76 return nil
77 }
78
79 err := c.sendResponseMsg()
80 if err != nil {
81 return err
82 }
83
84 return nil
85}
86
87// SendAll is a convenience function that immediately sends all provided items in smaller chunks using the provided
88// sendFunc.
89//
90// See the documentation for Chunker.Send() for more information.
91func SendAll[T proto.Message](sendFunc func([]T) error, items ...T) error {
92 c := New(sendFunc)
93
94 err := c.Send(items...)
95 if err != nil {
96 return err
97 }
98
99 return c.Flush()
100}