fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Package chunk provides a utility for sending sets of protobuf messages in 2// groups of smaller chunks. This is useful for gRPC, which has limitations around the maximum 3// size of a message that you can send. 4// 5// This code is adapted from the gitaly project, which is licensed 6// under the MIT license. A copy of that license text can be found at 7// https://mit-license.org/. 8// 9// The code this file was based off can be found here: https://gitlab.com/gitlab-org/gitaly/-/blob/v16.2.0/internal/helper/chunk/chunker.go 10package chunk 11 12import ( 13 "google.golang.org/protobuf/proto" 14) 15 16// New returns a new Chunker that will use the given sendFunc to send chunks of messages. 17func New[T proto.Message](sendFunc func([]T) error) *Chunker[T] { 18 return &Chunker[T]{sendFunc: sendFunc} 19} 20 21// Chunker lets you spread items you want to send over multiple chunks. 22// This type is not thread-safe. 23type Chunker[T proto.Message] struct { 24 sendFunc func([]T) error // sendFunc is the function that will be invoked when a chunk is ready to be sent. 25 26 buffer []T // buffer stores the items that will be sent when the sendFunc is invoked. 27 sizeBytes int // sizeBytes is the size of the current chunk in bytes. 28} 29 30// maxMessageSize is the maximum size per protobuf message 31const maxMessageSize = 1 * 1024 * 1024 // 1 MiB 32 33// Send will append the provided items to the current chunk, and send the chunk if it is full. 34// 35// Callers should ensure that they call Flush() after the last call to Send(). 36func (c *Chunker[T]) Send(items ...T) error { 37 for _, item := range items { 38 if err := c.sendOne(item); err != nil { 39 return err 40 } 41 } 42 43 return nil 44} 45 46func (c *Chunker[T]) sendOne(item T) error { 47 itemSize := proto.Size(item) 48 49 if itemSize+c.sizeBytes >= maxMessageSize { 50 if err := c.sendResponseMsg(); err != nil { 51 return err 52 } 53 } 54 55 c.buffer = append(c.buffer, item) 56 c.sizeBytes += itemSize 57 58 return nil 59} 60 61func (c *Chunker[T]) sendResponseMsg() error { 62 c.sizeBytes = 0 63 64 err := c.sendFunc(c.buffer) 65 if err != nil { 66 return err 67 } 68 69 c.buffer = c.buffer[:0] 70 return nil 71} 72 73// Flush sends remaining items in the current chunk, if any. 74func (c *Chunker[T]) Flush() error { 75 if len(c.buffer) == 0 { 76 return nil 77 } 78 79 err := c.sendResponseMsg() 80 if err != nil { 81 return err 82 } 83 84 return nil 85} 86 87// SendAll is a convenience function that immediately sends all provided items in smaller chunks using the provided 88// sendFunc. 89// 90// See the documentation for Chunker.Send() for more information. 91func SendAll[T proto.Message](sendFunc func([]T) error, items ...T) error { 92 c := New(sendFunc) 93 94 err := c.Send(items...) 95 if err != nil { 96 return err 97 } 98 99 return c.Flush() 100}