fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 9.9 kB View raw
1package messagesize 2 3import ( 4 "context" 5 "sync" 6 "sync/atomic" 7 8 "github.com/prometheus/client_golang/prometheus" 9 "github.com/prometheus/client_golang/prometheus/promauto" 10 "google.golang.org/grpc" 11 "google.golang.org/protobuf/proto" 12 13 "github.com/sourcegraph/zoekt/grpc/grpcutil" 14) 15 16var ( 17 metricServerSingleMessageSize = promauto.NewHistogramVec(prometheus.HistogramOpts{ 18 Name: "grpc_server_sent_individual_message_size_bytes_per_rpc", 19 Help: "Size of individual messages sent by the server per RPC.", 20 Buckets: sizeBuckets, 21 }, []string{ 22 "grpc_service", // e.g. "gitserver.v1.GitserverService" 23 "grpc_method", // e.g. "Exec" 24 }) 25 26 metricServerTotalSentPerRPCBytes = promauto.NewHistogramVec(prometheus.HistogramOpts{ 27 Name: "grpc_server_sent_bytes_per_rpc", 28 Help: "Total size of all the messages sent by the server during the course of a single RPC call", 29 Buckets: sizeBuckets, 30 }, []string{ 31 "grpc_service", // e.g. "gitserver.v1.GitserverService" 32 "grpc_method", // e.g. "Exec" 33 }) 34 35 metricClientSingleMessageSize = promauto.NewHistogramVec(prometheus.HistogramOpts{ 36 Name: "grpc_client_sent_individual_message_size_per_rpc_bytes", 37 Help: "Size of individual messages sent by the client per RPC.", 38 Buckets: sizeBuckets, 39 }, []string{ 40 "grpc_service", // e.g. "gitserver.v1.GitserverService" 41 "grpc_method", // e.g. "Exec" 42 }) 43 44 metricClientTotalSentPerRPCBytes = promauto.NewHistogramVec(prometheus.HistogramOpts{ 45 Name: "grpc_client_sent_bytes_per_rpc", 46 Help: "Total size of all the messages sent by the client during the course of a single RPC call", 47 Buckets: sizeBuckets, 48 }, []string{ 49 "grpc_service", // e.g. "gitserver.v1.GitserverService" 50 "grpc_method", // e.g. "Exec" 51 }) 52) 53 54const ( 55 B = 1 56 KB = 1024 * B 57 MB = 1024 * KB 58 GB = 1024 * MB 59) 60 61var sizeBuckets = []float64{ 62 0, 63 1 * KB, 64 10 * KB, 65 50 * KB, 66 100 * KB, 67 500 * KB, 68 1 * MB, 69 5 * MB, 70 10 * MB, 71 50 * MB, 72 100 * MB, 73 500 * MB, 74 1 * GB, 75 5 * GB, 76 10 * GB, 77} 78 79// UnaryServerInterceptor is a grpc.UnaryServerInterceptor that records Prometheus metrics that observe the size of 80// the response message sent back by the server for a single RPC call. 81func UnaryServerInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { 82 observer := newServerMessageSizeObserver(info.FullMethod) 83 84 return unaryServerInterceptor(observer, req, ctx, info, handler) 85} 86 87func unaryServerInterceptor(observer *messageSizeObserver, req any, ctx context.Context, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { 88 defer observer.FinishRPC() 89 90 r, err := handler(ctx, req) 91 if err != nil { 92 return r, err 93 } 94 95 response, ok := r.(proto.Message) 96 if !ok { 97 return r, nil 98 } 99 100 observer.Observe(response) 101 return response, nil 102} 103 104// StreamServerInterceptor is a grpc.StreamServerInterceptor that records Prometheus metrics that observe both the sizes of the 105// individual response messages and the cumulative response size of all the message sent back by the server over the course 106// of a single RPC call. 107func StreamServerInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { 108 observer := newServerMessageSizeObserver(info.FullMethod) 109 110 return streamServerInterceptor(observer, srv, ss, info, handler) 111} 112 113func streamServerInterceptor(observer *messageSizeObserver, srv any, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error { 114 defer observer.FinishRPC() 115 116 wrappedStream := newObservingServerStream(ss, observer) 117 118 return handler(srv, wrappedStream) 119} 120 121// UnaryClientInterceptor is a grpc.UnaryClientInterceptor that records Prometheus metrics that observe the size of 122// the request message sent by client for a single RPC call. 123func UnaryClientInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 124 o := newClientMessageSizeObserver(method) 125 return unaryClientInterceptor(o, ctx, method, req, reply, cc, invoker, opts...) 126} 127 128func unaryClientInterceptor(observer *messageSizeObserver, ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 129 defer observer.FinishRPC() 130 131 err := invoker(ctx, method, req, reply, cc, opts...) 132 if err != nil { 133 // Don't record the size of the message if there was an error sending it, since it may not have been sent. 134 return err 135 } 136 137 // Observe the size of the request message. 138 request, ok := req.(proto.Message) 139 if !ok { 140 return nil 141 } 142 143 observer.Observe(request) 144 return nil 145} 146 147// StreamClientInterceptor is a grpc.StreamClientInterceptor that records Prometheus metrics that observe both the sizes of the 148// individual request messages and the cumulative request size of all the message sent by the client over the course 149// of a single RPC call. 150func StreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 151 observer := newClientMessageSizeObserver(method) 152 153 return streamClientInterceptor(observer, ctx, desc, cc, method, streamer, opts...) 154} 155 156func streamClientInterceptor(observer *messageSizeObserver, ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 157 s, err := streamer(ctx, desc, cc, method, opts...) 158 if err != nil { 159 return nil, err 160 } 161 162 wrappedStream := newObservingClientStream(s, observer) 163 return wrappedStream, nil 164} 165 166type observingServerStream struct { 167 grpc.ServerStream 168 169 observer *messageSizeObserver 170} 171 172func newObservingServerStream(s grpc.ServerStream, observer *messageSizeObserver) grpc.ServerStream { 173 return &observingServerStream{ 174 ServerStream: s, 175 observer: observer, 176 } 177} 178 179func (s *observingServerStream) SendMsg(m any) error { 180 err := s.ServerStream.SendMsg(m) 181 if err != nil { 182 // Don't record the size of the message if there was an error sending it, since it may not have been sent. 183 // 184 // However, the stream aborts on an error, 185 // so we need to record the total size of the messages sent during the course of the RPC call. 186 s.observer.FinishRPC() 187 return err 188 } 189 190 // Observe the size of the sent message. 191 message, ok := m.(proto.Message) 192 if !ok { 193 return nil 194 } 195 196 s.observer.Observe(message) 197 return nil 198} 199 200type observingClientStream struct { 201 grpc.ClientStream 202 203 observer *messageSizeObserver 204} 205 206func newObservingClientStream(s grpc.ClientStream, observer *messageSizeObserver) grpc.ClientStream { 207 return &observingClientStream{ 208 ClientStream: s, 209 observer: observer, 210 } 211} 212 213func (s *observingClientStream) SendMsg(m any) error { 214 err := s.ClientStream.SendMsg(m) 215 if err != nil { 216 // Don't record the size of the message if there was an error sending it, since it may not have been sent. 217 // 218 // However, the stream aborts on an error, 219 // so we need to record the total size of the messages sent during the course of the RPC call. 220 s.observer.FinishRPC() 221 return err 222 } 223 224 // Observe the size of the sent message. 225 message, ok := m.(proto.Message) 226 if !ok { 227 return nil 228 } 229 230 s.observer.Observe(message) 231 return nil 232} 233 234func (s *observingClientStream) CloseSend() error { 235 err := s.ClientStream.CloseSend() 236 237 s.observer.FinishRPC() 238 return err 239} 240 241func (s *observingClientStream) RecvMsg(m any) error { 242 err := s.ClientStream.RecvMsg(m) 243 if err != nil { 244 // Record the total size of the messages sent during the course of the RPC call, even if there was an error. 245 s.observer.FinishRPC() 246 } 247 248 return err 249} 250 251func newServerMessageSizeObserver(fullMethod string) *messageSizeObserver { 252 serviceName, methodName := grpcutil.SplitMethodName(fullMethod) 253 254 onSingle := func(messageSize uint64) { 255 metricServerSingleMessageSize.WithLabelValues(serviceName, methodName).Observe(float64(messageSize)) 256 } 257 258 onFinish := func(messageSize uint64) { 259 metricServerTotalSentPerRPCBytes.WithLabelValues(serviceName, methodName).Observe(float64(messageSize)) 260 } 261 262 return &messageSizeObserver{ 263 onSingleFunc: onSingle, 264 onFinishFunc: onFinish, 265 } 266} 267 268func newClientMessageSizeObserver(fullMethod string) *messageSizeObserver { 269 serviceName, methodName := grpcutil.SplitMethodName(fullMethod) 270 271 onSingle := func(messageSize uint64) { 272 metricClientSingleMessageSize.WithLabelValues(serviceName, methodName).Observe(float64(messageSize)) 273 } 274 275 onFinish := func(messageSize uint64) { 276 metricClientTotalSentPerRPCBytes.WithLabelValues(serviceName, methodName).Observe(float64(messageSize)) 277 } 278 279 return &messageSizeObserver{ 280 onSingleFunc: onSingle, 281 onFinishFunc: onFinish, 282 } 283} 284 285// messageSizeObserver is a utility that records Prometheus metrics that observe the size of each sent message and the 286// cumulative size of all sent messages during the course of a single RPC call. 287type messageSizeObserver struct { 288 onSingleFunc func(messageSizeBytes uint64) 289 290 finishOnce sync.Once 291 onFinishFunc func(totalSizeBytes uint64) 292 293 totalSizeBytes atomic.Uint64 294} 295 296// Observe records the size of a single message. 297func (o *messageSizeObserver) Observe(message proto.Message) { 298 s := uint64(proto.Size(message)) 299 o.onSingleFunc(s) 300 301 o.totalSizeBytes.Add(s) 302} 303 304// FinishRPC records the total size of all sent messages during the course of a single RPC call. 305// This function should only be called once the RPC call has completed. 306func (o *messageSizeObserver) FinishRPC() { 307 o.finishOnce.Do(func() { 308 o.onFinishFunc(o.totalSizeBytes.Load()) 309 }) 310} 311 312var ( 313 _ grpc.ServerStream = &observingServerStream{} 314 _ grpc.ClientStream = &observingClientStream{} 315) 316 317var ( 318 _ grpc.UnaryServerInterceptor = UnaryServerInterceptor 319 _ grpc.StreamServerInterceptor = StreamServerInterceptor 320 _ grpc.UnaryClientInterceptor = UnaryClientInterceptor 321 _ grpc.StreamClientInterceptor = StreamClientInterceptor 322)