fork of https://github.com/sourcegraph/zoekt
1package internalerrs
2
3import (
4 "context"
5 "io"
6 "sync"
7
8 "github.com/prometheus/client_golang/prometheus"
9 "github.com/prometheus/client_golang/prometheus/promauto"
10 "github.com/sourcegraph/zoekt/grpc/grpcutil"
11 "google.golang.org/grpc"
12 "google.golang.org/grpc/codes"
13)
14
15var metricGRPCMethodStatus = promauto.NewCounterVec(prometheus.CounterOpts{
16 Name: "grpc_method_status",
17 Help: "Counts the number of gRPC methods that return a given status code, and whether a possible error is an go-grpc internal error.",
18},
19 []string{
20 "grpc_service", // e.g. "gitserver.v1.GitserverService"
21 "grpc_method", // e.g. "Exec"
22 "grpc_code", // e.g. "NotFound"
23 "is_internal_error", // e.g. "true"
24 },
25)
26
27// PrometheusUnaryClientInterceptor returns a grpc.UnaryClientInterceptor that observes the result of
28// the RPC and records it as a Prometheus metric ("src_grpc_method_status").
29func PrometheusUnaryClientInterceptor(ctx context.Context, fullMethod string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
30 serviceName, methodName := grpcutil.SplitMethodName(fullMethod)
31
32 err := invoker(ctx, fullMethod, req, reply, cc, opts...)
33 doObservation(serviceName, methodName, err)
34 return err
35}
36
37// PrometheusStreamClientInterceptor returns a grpc.StreamClientInterceptor that observes the result of
38// the RPC and records it as a Prometheus metric ("src_grpc_method_status").
39//
40// If any errors are encountered during the stream, the first error is recorded. Otherwise, the
41// final status of the stream is recorded.
42func PrometheusStreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, fullMethod string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
43 serviceName, methodName := grpcutil.SplitMethodName(fullMethod)
44
45 s, err := streamer(ctx, desc, cc, fullMethod, opts...)
46 if err != nil {
47 doObservation(serviceName, methodName, err) // method failed to be invoked at all, record it
48 return nil, err
49 }
50
51 return newPrometheusServerStream(s, serviceName, methodName), err
52}
53
54// newPrometheusServerStream wraps a grpc.ClientStream to observe the first error
55// encountered during the stream, if any.
56func newPrometheusServerStream(s grpc.ClientStream, serviceName, methodName string) grpc.ClientStream {
57 // Design note: We only want a single observation for each RPC call: it either succeeds or fails
58 // with a single error. This ensures we do not double-count RPCs in Prometheus metrics.
59 //
60 // For unary calls this is straightforward, but for streaming RPCs we need to make a compromise. We only
61 // observe the first error (either sending or receiving) that occurs during the stream, instead of every
62 // error that occurs during the stream's lifespan. While this approach swallows some errors, it keeps the
63 // Prometheus metric count clean and non-duplicated. The logging interceptor handles surfacing all errors
64 // that are encountered during a stream.
65 var observeOnce sync.Once
66
67 return &callBackClientStream{
68 ClientStream: s,
69 postMessageSend: func(_ any, err error) {
70 if err != nil {
71 observeOnce.Do(func() {
72 doObservation(serviceName, methodName, err)
73 })
74 }
75 },
76 postMessageReceive: func(_ any, err error) {
77 if err != nil {
78 if err == io.EOF {
79 // EOF signals end of stream, not an error. We handle this by setting err to nil, because
80 // we want to treat the stream as successfully completed.
81 err = nil
82 }
83
84 observeOnce.Do(func() {
85 doObservation(serviceName, methodName, err)
86 })
87 }
88 },
89 }
90}
91
92func doObservation(serviceName, methodName string, rpcErr error) {
93 if rpcErr == nil {
94 // No error occurred, so we record a successful call.
95 metricGRPCMethodStatus.WithLabelValues(serviceName, methodName, codes.OK.String(), "false").Inc()
96 return
97 }
98
99 s, ok := massageIntoStatusErr(rpcErr)
100 if !ok {
101 // An error occurred, but it was not an error that has a status.Status implementation. We record this as an unknown error.
102 metricGRPCMethodStatus.WithLabelValues(serviceName, methodName, codes.Unknown.String(), "false").Inc()
103 return
104 }
105
106 if !probablyInternalGRPCError(s, allCheckers) {
107 // An error occurred, but it was not an internal gRPC error. We record this as a non-internal error.
108 metricGRPCMethodStatus.WithLabelValues(serviceName, methodName, s.Code().String(), "false").Inc()
109 return
110 }
111
112 // An error occurred, and it looks like an internal gRPC error. We record this as an internal error.
113 metricGRPCMethodStatus.WithLabelValues(serviceName, methodName, s.Code().String(), "true").Inc()
114}