fork of https://github.com/sourcegraph/zoekt
1package messagesize
2
3import (
4 "context"
5 "sync"
6 "sync/atomic"
7
8 "github.com/prometheus/client_golang/prometheus"
9 "github.com/prometheus/client_golang/prometheus/promauto"
10 "google.golang.org/grpc"
11 "google.golang.org/protobuf/proto"
12
13 "github.com/sourcegraph/zoekt/grpc/grpcutil"
14)
15
// Prometheus histograms describing the size of the gRPC messages sent by servers and clients.
// Every histogram uses sizeBuckets and is labelled with the gRPC service and method name.
var (
	// metricServerSingleMessageSize observes the size of each individual message sent by the server.
	metricServerSingleMessageSize = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "grpc_server_sent_individual_message_size_bytes_per_rpc",
		Help:    "Size of individual messages sent by the server per RPC.",
		Buckets: sizeBuckets,
	}, []string{
		"grpc_service", // e.g. "gitserver.v1.GitserverService"
		"grpc_method",  // e.g. "Exec"
	})

	// metricServerTotalSentPerRPCBytes observes the combined size of all messages sent by the server
	// during one RPC.
	metricServerTotalSentPerRPCBytes = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "grpc_server_sent_bytes_per_rpc",
		Help:    "Total size of all the messages sent by the server during the course of a single RPC call",
		Buckets: sizeBuckets,
	}, []string{
		"grpc_service", // e.g. "gitserver.v1.GitserverService"
		"grpc_method",  // e.g. "Exec"
	})

	// metricClientSingleMessageSize observes the size of each individual message sent by the client.
	metricClientSingleMessageSize = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "grpc_client_sent_individual_message_size_per_rpc_bytes",
		Help:    "Size of individual messages sent by the client per RPC.",
		Buckets: sizeBuckets,
	}, []string{
		"grpc_service", // e.g. "gitserver.v1.GitserverService"
		"grpc_method",  // e.g. "Exec"
	})

	// metricClientTotalSentPerRPCBytes observes the combined size of all messages sent by the client
	// during one RPC.
	metricClientTotalSentPerRPCBytes = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "grpc_client_sent_bytes_per_rpc",
		Help:    "Total size of all the messages sent by the client during the course of a single RPC call",
		Buckets: sizeBuckets,
	}, []string{
		"grpc_service", // e.g. "gitserver.v1.GitserverService"
		"grpc_method",  // e.g. "Exec"
	})
)
53
// Byte-size units expressed as powers of 1024. Each successive constant is
// 1024 times the previous one: B = 1, KB = 1<<10, MB = 1<<20, GB = 1<<30.
const (
	B = 1 << (10 * iota)
	KB
	MB
	GB
)
60
61var sizeBuckets = []float64{
62 0,
63 1 * KB,
64 10 * KB,
65 50 * KB,
66 100 * KB,
67 500 * KB,
68 1 * MB,
69 5 * MB,
70 10 * MB,
71 50 * MB,
72 100 * MB,
73 500 * MB,
74 1 * GB,
75 5 * GB,
76 10 * GB,
77}
78
79// UnaryServerInterceptor is a grpc.UnaryServerInterceptor that records Prometheus metrics that observe the size of
80// the response message sent back by the server for a single RPC call.
81func UnaryServerInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
82 observer := newServerMessageSizeObserver(info.FullMethod)
83
84 return unaryServerInterceptor(observer, req, ctx, info, handler)
85}
86
87func unaryServerInterceptor(observer *messageSizeObserver, req any, ctx context.Context, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
88 defer observer.FinishRPC()
89
90 r, err := handler(ctx, req)
91 if err != nil {
92 return r, err
93 }
94
95 response, ok := r.(proto.Message)
96 if !ok {
97 return r, nil
98 }
99
100 observer.Observe(response)
101 return response, nil
102}
103
104// StreamServerInterceptor is a grpc.StreamServerInterceptor that records Prometheus metrics that observe both the sizes of the
105// individual response messages and the cumulative response size of all the message sent back by the server over the course
106// of a single RPC call.
107func StreamServerInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
108 observer := newServerMessageSizeObserver(info.FullMethod)
109
110 return streamServerInterceptor(observer, srv, ss, info, handler)
111}
112
113func streamServerInterceptor(observer *messageSizeObserver, srv any, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
114 defer observer.FinishRPC()
115
116 wrappedStream := newObservingServerStream(ss, observer)
117
118 return handler(srv, wrappedStream)
119}
120
121// UnaryClientInterceptor is a grpc.UnaryClientInterceptor that records Prometheus metrics that observe the size of
122// the request message sent by client for a single RPC call.
123func UnaryClientInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
124 o := newClientMessageSizeObserver(method)
125 return unaryClientInterceptor(o, ctx, method, req, reply, cc, invoker, opts...)
126}
127
128func unaryClientInterceptor(observer *messageSizeObserver, ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
129 defer observer.FinishRPC()
130
131 err := invoker(ctx, method, req, reply, cc, opts...)
132 if err != nil {
133 // Don't record the size of the message if there was an error sending it, since it may not have been sent.
134 return err
135 }
136
137 // Observe the size of the request message.
138 request, ok := req.(proto.Message)
139 if !ok {
140 return nil
141 }
142
143 observer.Observe(request)
144 return nil
145}
146
147// StreamClientInterceptor is a grpc.StreamClientInterceptor that records Prometheus metrics that observe both the sizes of the
148// individual request messages and the cumulative request size of all the message sent by the client over the course
149// of a single RPC call.
150func StreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
151 observer := newClientMessageSizeObserver(method)
152
153 return streamClientInterceptor(observer, ctx, desc, cc, method, streamer, opts...)
154}
155
156func streamClientInterceptor(observer *messageSizeObserver, ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
157 s, err := streamer(ctx, desc, cc, method, opts...)
158 if err != nil {
159 return nil, err
160 }
161
162 wrappedStream := newObservingClientStream(s, observer)
163 return wrappedStream, nil
164}
165
// observingServerStream wraps a grpc.ServerStream and reports the messages sent
// through it to a messageSizeObserver. Methods it does not define are promoted
// from the embedded stream.
type observingServerStream struct {
	// ServerStream is the underlying stream that calls are forwarded to.
	grpc.ServerStream

	// observer receives the size of each sent message and the end-of-RPC signal.
	observer *messageSizeObserver
}
171
172func newObservingServerStream(s grpc.ServerStream, observer *messageSizeObserver) grpc.ServerStream {
173 return &observingServerStream{
174 ServerStream: s,
175 observer: observer,
176 }
177}
178
179func (s *observingServerStream) SendMsg(m any) error {
180 err := s.ServerStream.SendMsg(m)
181 if err != nil {
182 // Don't record the size of the message if there was an error sending it, since it may not have been sent.
183 //
184 // However, the stream aborts on an error,
185 // so we need to record the total size of the messages sent during the course of the RPC call.
186 s.observer.FinishRPC()
187 return err
188 }
189
190 // Observe the size of the sent message.
191 message, ok := m.(proto.Message)
192 if !ok {
193 return nil
194 }
195
196 s.observer.Observe(message)
197 return nil
198}
199
// observingClientStream wraps a grpc.ClientStream and reports the messages sent
// through it to a messageSizeObserver. Methods it does not define are promoted
// from the embedded stream.
type observingClientStream struct {
	// ClientStream is the underlying stream that calls are forwarded to.
	grpc.ClientStream

	// observer receives the size of each sent message and the end-of-RPC signal.
	observer *messageSizeObserver
}
205
206func newObservingClientStream(s grpc.ClientStream, observer *messageSizeObserver) grpc.ClientStream {
207 return &observingClientStream{
208 ClientStream: s,
209 observer: observer,
210 }
211}
212
213func (s *observingClientStream) SendMsg(m any) error {
214 err := s.ClientStream.SendMsg(m)
215 if err != nil {
216 // Don't record the size of the message if there was an error sending it, since it may not have been sent.
217 //
218 // However, the stream aborts on an error,
219 // so we need to record the total size of the messages sent during the course of the RPC call.
220 s.observer.FinishRPC()
221 return err
222 }
223
224 // Observe the size of the sent message.
225 message, ok := m.(proto.Message)
226 if !ok {
227 return nil
228 }
229
230 s.observer.Observe(message)
231 return nil
232}
233
234func (s *observingClientStream) CloseSend() error {
235 err := s.ClientStream.CloseSend()
236
237 s.observer.FinishRPC()
238 return err
239}
240
241func (s *observingClientStream) RecvMsg(m any) error {
242 err := s.ClientStream.RecvMsg(m)
243 if err != nil {
244 // Record the total size of the messages sent during the course of the RPC call, even if there was an error.
245 s.observer.FinishRPC()
246 }
247
248 return err
249}
250
251func newServerMessageSizeObserver(fullMethod string) *messageSizeObserver {
252 serviceName, methodName := grpcutil.SplitMethodName(fullMethod)
253
254 onSingle := func(messageSize uint64) {
255 metricServerSingleMessageSize.WithLabelValues(serviceName, methodName).Observe(float64(messageSize))
256 }
257
258 onFinish := func(messageSize uint64) {
259 metricServerTotalSentPerRPCBytes.WithLabelValues(serviceName, methodName).Observe(float64(messageSize))
260 }
261
262 return &messageSizeObserver{
263 onSingleFunc: onSingle,
264 onFinishFunc: onFinish,
265 }
266}
267
268func newClientMessageSizeObserver(fullMethod string) *messageSizeObserver {
269 serviceName, methodName := grpcutil.SplitMethodName(fullMethod)
270
271 onSingle := func(messageSize uint64) {
272 metricClientSingleMessageSize.WithLabelValues(serviceName, methodName).Observe(float64(messageSize))
273 }
274
275 onFinish := func(messageSize uint64) {
276 metricClientTotalSentPerRPCBytes.WithLabelValues(serviceName, methodName).Observe(float64(messageSize))
277 }
278
279 return &messageSizeObserver{
280 onSingleFunc: onSingle,
281 onFinishFunc: onFinish,
282 }
283}
284
// messageSizeObserver is a utility that records Prometheus metrics that observe the size of each sent message and the
// cumulative size of all sent messages during the course of a single RPC call.
//
// It contains a sync.Once and an atomic counter, so it must be used through a pointer and not copied.
type messageSizeObserver struct {
	// onSingleFunc is invoked by Observe with the size of each observed message.
	onSingleFunc func(messageSizeBytes uint64)

	// finishOnce ensures onFinishFunc runs at most once, however many times FinishRPC is called.
	finishOnce sync.Once
	// onFinishFunc is invoked by FinishRPC with the accumulated total size.
	onFinishFunc func(totalSizeBytes uint64)

	// totalSizeBytes is the running sum of the sizes passed through Observe.
	totalSizeBytes atomic.Uint64
}
295
296// Observe records the size of a single message.
297func (o *messageSizeObserver) Observe(message proto.Message) {
298 s := uint64(proto.Size(message))
299 o.onSingleFunc(s)
300
301 o.totalSizeBytes.Add(s)
302}
303
304// FinishRPC records the total size of all sent messages during the course of a single RPC call.
305// This function should only be called once the RPC call has completed.
306func (o *messageSizeObserver) FinishRPC() {
307 o.finishOnce.Do(func() {
308 o.onFinishFunc(o.totalSizeBytes.Load())
309 })
310}
311
312var (
313 _ grpc.ServerStream = &observingServerStream{}
314 _ grpc.ClientStream = &observingClientStream{}
315)
316
317var (
318 _ grpc.UnaryServerInterceptor = UnaryServerInterceptor
319 _ grpc.StreamServerInterceptor = StreamServerInterceptor
320 _ grpc.UnaryClientInterceptor = UnaryClientInterceptor
321 _ grpc.StreamClientInterceptor = StreamClientInterceptor
322)