fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package internalerrs 2 3import ( 4 "context" 5 "io" 6 "sync" 7 8 "github.com/prometheus/client_golang/prometheus" 9 "github.com/prometheus/client_golang/prometheus/promauto" 10 "google.golang.org/grpc" 11 "google.golang.org/grpc/codes" 12 13 "github.com/sourcegraph/zoekt/grpc/grpcutil" 14) 15 16var metricGRPCMethodStatus = promauto.NewCounterVec(prometheus.CounterOpts{ 17 Name: "grpc_method_status", 18 Help: "Counts the number of gRPC methods that return a given status code, and whether a possible error is an go-grpc internal error.", 19}, 20 []string{ 21 "grpc_service", // e.g. "gitserver.v1.GitserverService" 22 "grpc_method", // e.g. "Exec" 23 "grpc_code", // e.g. "NotFound" 24 "is_internal_error", // e.g. "true" 25 }, 26) 27 28// PrometheusUnaryClientInterceptor returns a grpc.UnaryClientInterceptor that observes the result of 29// the RPC and records it as a Prometheus metric ("src_grpc_method_status"). 30func PrometheusUnaryClientInterceptor(ctx context.Context, fullMethod string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 31 serviceName, methodName := grpcutil.SplitMethodName(fullMethod) 32 33 err := invoker(ctx, fullMethod, req, reply, cc, opts...) 34 doObservation(serviceName, methodName, err) 35 return err 36} 37 38// PrometheusStreamClientInterceptor returns a grpc.StreamClientInterceptor that observes the result of 39// the RPC and records it as a Prometheus metric ("src_grpc_method_status"). 40// 41// If any errors are encountered during the stream, the first error is recorded. Otherwise, the 42// final status of the stream is recorded. 43func PrometheusStreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, fullMethod string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 44 serviceName, methodName := grpcutil.SplitMethodName(fullMethod) 45 46 s, err := streamer(ctx, desc, cc, fullMethod, opts...) 47 if err != nil { 48 doObservation(serviceName, methodName, err) // method failed to be invoked at all, record it 49 return nil, err 50 } 51 52 return newPrometheusServerStream(s, serviceName, methodName), err 53} 54 55// newPrometheusServerStream wraps a grpc.ClientStream to observe the first error 56// encountered during the stream, if any. 57func newPrometheusServerStream(s grpc.ClientStream, serviceName, methodName string) grpc.ClientStream { 58 // Design note: We only want a single observation for each RPC call: it either succeeds or fails 59 // with a single error. This ensures we do not double-count RPCs in Prometheus metrics. 60 // 61 // For unary calls this is straightforward, but for streaming RPCs we need to make a compromise. We only 62 // observe the first error (either sending or receiving) that occurs during the stream, instead of every 63 // error that occurs during the stream's lifespan. While this approach swallows some errors, it keeps the 64 // Prometheus metric count clean and non-duplicated. The logging interceptor handles surfacing all errors 65 // that are encountered during a stream. 66 var observeOnce sync.Once 67 68 return &callBackClientStream{ 69 ClientStream: s, 70 postMessageSend: func(_ any, err error) { 71 if err != nil { 72 observeOnce.Do(func() { 73 doObservation(serviceName, methodName, err) 74 }) 75 } 76 }, 77 postMessageReceive: func(_ any, err error) { 78 if err != nil { 79 if err == io.EOF { 80 // EOF signals end of stream, not an error. We handle this by setting err to nil, because 81 // we want to treat the stream as successfully completed. 82 err = nil 83 } 84 85 observeOnce.Do(func() { 86 doObservation(serviceName, methodName, err) 87 }) 88 } 89 }, 90 } 91} 92 93func doObservation(serviceName, methodName string, rpcErr error) { 94 if rpcErr == nil { 95 // No error occurred, so we record a successful call. 96 metricGRPCMethodStatus.WithLabelValues(serviceName, methodName, codes.OK.String(), "false").Inc() 97 return 98 } 99 100 s, ok := massageIntoStatusErr(rpcErr) 101 if !ok { 102 // An error occurred, but it was not an error that has a status.Status implementation. We record this as an unknown error. 103 metricGRPCMethodStatus.WithLabelValues(serviceName, methodName, codes.Unknown.String(), "false").Inc() 104 return 105 } 106 107 if !probablyInternalGRPCError(s, allCheckers) { 108 // An error occurred, but it was not an internal gRPC error. We record this as a non-internal error. 109 metricGRPCMethodStatus.WithLabelValues(serviceName, methodName, s.Code().String(), "false").Inc() 110 return 111 } 112 113 // An error occurred, and it looks like an internal gRPC error. We record this as an internal error. 114 metricGRPCMethodStatus.WithLabelValues(serviceName, methodName, s.Code().String(), "true").Inc() 115}