fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package internalerrs 2 3import ( 4 "context" 5 "io" 6 "sync" 7 8 "github.com/prometheus/client_golang/prometheus" 9 "github.com/prometheus/client_golang/prometheus/promauto" 10 "github.com/sourcegraph/zoekt/grpc/grpcutil" 11 "google.golang.org/grpc" 12 "google.golang.org/grpc/codes" 13) 14 15var metricGRPCMethodStatus = promauto.NewCounterVec(prometheus.CounterOpts{ 16 Name: "grpc_method_status", 17 Help: "Counts the number of gRPC methods that return a given status code, and whether a possible error is an go-grpc internal error.", 18}, 19 []string{ 20 "grpc_service", // e.g. "gitserver.v1.GitserverService" 21 "grpc_method", // e.g. "Exec" 22 "grpc_code", // e.g. "NotFound" 23 "is_internal_error", // e.g. "true" 24 }, 25) 26 27// PrometheusUnaryClientInterceptor returns a grpc.UnaryClientInterceptor that observes the result of 28// the RPC and records it as a Prometheus metric ("src_grpc_method_status"). 29func PrometheusUnaryClientInterceptor(ctx context.Context, fullMethod string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 30 serviceName, methodName := grpcutil.SplitMethodName(fullMethod) 31 32 err := invoker(ctx, fullMethod, req, reply, cc, opts...) 33 doObservation(serviceName, methodName, err) 34 return err 35} 36 37// PrometheusStreamClientInterceptor returns a grpc.StreamClientInterceptor that observes the result of 38// the RPC and records it as a Prometheus metric ("src_grpc_method_status"). 39// 40// If any errors are encountered during the stream, the first error is recorded. Otherwise, the 41// final status of the stream is recorded. 42func PrometheusStreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, fullMethod string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 43 serviceName, methodName := grpcutil.SplitMethodName(fullMethod) 44 45 s, err := streamer(ctx, desc, cc, fullMethod, opts...) 46 if err != nil { 47 doObservation(serviceName, methodName, err) // method failed to be invoked at all, record it 48 return nil, err 49 } 50 51 return newPrometheusServerStream(s, serviceName, methodName), err 52} 53 54// newPrometheusServerStream wraps a grpc.ClientStream to observe the first error 55// encountered during the stream, if any. 56func newPrometheusServerStream(s grpc.ClientStream, serviceName, methodName string) grpc.ClientStream { 57 // Design note: We only want a single observation for each RPC call: it either succeeds or fails 58 // with a single error. This ensures we do not double-count RPCs in Prometheus metrics. 59 // 60 // For unary calls this is straightforward, but for streaming RPCs we need to make a compromise. We only 61 // observe the first error (either sending or receiving) that occurs during the stream, instead of every 62 // error that occurs during the stream's lifespan. While this approach swallows some errors, it keeps the 63 // Prometheus metric count clean and non-duplicated. The logging interceptor handles surfacing all errors 64 // that are encountered during a stream. 65 var observeOnce sync.Once 66 67 return &callBackClientStream{ 68 ClientStream: s, 69 postMessageSend: func(_ any, err error) { 70 if err != nil { 71 observeOnce.Do(func() { 72 doObservation(serviceName, methodName, err) 73 }) 74 } 75 }, 76 postMessageReceive: func(_ any, err error) { 77 if err != nil { 78 if err == io.EOF { 79 // EOF signals end of stream, not an error. We handle this by setting err to nil, because 80 // we want to treat the stream as successfully completed. 81 err = nil 82 } 83 84 observeOnce.Do(func() { 85 doObservation(serviceName, methodName, err) 86 }) 87 } 88 }, 89 } 90} 91 92func doObservation(serviceName, methodName string, rpcErr error) { 93 if rpcErr == nil { 94 // No error occurred, so we record a successful call. 95 metricGRPCMethodStatus.WithLabelValues(serviceName, methodName, codes.OK.String(), "false").Inc() 96 return 97 } 98 99 s, ok := massageIntoStatusErr(rpcErr) 100 if !ok { 101 // An error occurred, but it was not an error that has a status.Status implementation. We record this as an unknown error. 102 metricGRPCMethodStatus.WithLabelValues(serviceName, methodName, codes.Unknown.String(), "false").Inc() 103 return 104 } 105 106 if !probablyInternalGRPCError(s, allCheckers) { 107 // An error occurred, but it was not an internal gRPC error. We record this as a non-internal error. 108 metricGRPCMethodStatus.WithLabelValues(serviceName, methodName, s.Code().String(), "false").Inc() 109 return 110 } 111 112 // An error occurred, and it looks like an internal gRPC error. We record this as an internal error. 113 metricGRPCMethodStatus.WithLabelValues(serviceName, methodName, s.Code().String(), "true").Inc() 114}