fork of https://github.com/sourcegraph/zoekt
1package internalerrs
2
3import (
4 "context"
5 "io"
6 "sync"
7
8 "github.com/prometheus/client_golang/prometheus"
9 "github.com/prometheus/client_golang/prometheus/promauto"
10 "google.golang.org/grpc"
11 "google.golang.org/grpc/codes"
12
13 "github.com/sourcegraph/zoekt/grpc/grpcutil"
14)
15
16var metricGRPCMethodStatus = promauto.NewCounterVec(prometheus.CounterOpts{
17 Name: "grpc_method_status",
18 Help: "Counts the number of gRPC methods that return a given status code, and whether a possible error is an go-grpc internal error.",
19},
20 []string{
21 "grpc_service", // e.g. "gitserver.v1.GitserverService"
22 "grpc_method", // e.g. "Exec"
23 "grpc_code", // e.g. "NotFound"
24 "is_internal_error", // e.g. "true"
25 },
26)
27
28// PrometheusUnaryClientInterceptor returns a grpc.UnaryClientInterceptor that observes the result of
29// the RPC and records it as a Prometheus metric ("src_grpc_method_status").
30func PrometheusUnaryClientInterceptor(ctx context.Context, fullMethod string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
31 serviceName, methodName := grpcutil.SplitMethodName(fullMethod)
32
33 err := invoker(ctx, fullMethod, req, reply, cc, opts...)
34 doObservation(serviceName, methodName, err)
35 return err
36}
37
38// PrometheusStreamClientInterceptor returns a grpc.StreamClientInterceptor that observes the result of
39// the RPC and records it as a Prometheus metric ("src_grpc_method_status").
40//
41// If any errors are encountered during the stream, the first error is recorded. Otherwise, the
42// final status of the stream is recorded.
43func PrometheusStreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, fullMethod string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
44 serviceName, methodName := grpcutil.SplitMethodName(fullMethod)
45
46 s, err := streamer(ctx, desc, cc, fullMethod, opts...)
47 if err != nil {
48 doObservation(serviceName, methodName, err) // method failed to be invoked at all, record it
49 return nil, err
50 }
51
52 return newPrometheusServerStream(s, serviceName, methodName), err
53}
54
55// newPrometheusServerStream wraps a grpc.ClientStream to observe the first error
56// encountered during the stream, if any.
57func newPrometheusServerStream(s grpc.ClientStream, serviceName, methodName string) grpc.ClientStream {
58 // Design note: We only want a single observation for each RPC call: it either succeeds or fails
59 // with a single error. This ensures we do not double-count RPCs in Prometheus metrics.
60 //
61 // For unary calls this is straightforward, but for streaming RPCs we need to make a compromise. We only
62 // observe the first error (either sending or receiving) that occurs during the stream, instead of every
63 // error that occurs during the stream's lifespan. While this approach swallows some errors, it keeps the
64 // Prometheus metric count clean and non-duplicated. The logging interceptor handles surfacing all errors
65 // that are encountered during a stream.
66 var observeOnce sync.Once
67
68 return &callBackClientStream{
69 ClientStream: s,
70 postMessageSend: func(_ any, err error) {
71 if err != nil {
72 observeOnce.Do(func() {
73 doObservation(serviceName, methodName, err)
74 })
75 }
76 },
77 postMessageReceive: func(_ any, err error) {
78 if err != nil {
79 if err == io.EOF {
80 // EOF signals end of stream, not an error. We handle this by setting err to nil, because
81 // we want to treat the stream as successfully completed.
82 err = nil
83 }
84
85 observeOnce.Do(func() {
86 doObservation(serviceName, methodName, err)
87 })
88 }
89 },
90 }
91}
92
93func doObservation(serviceName, methodName string, rpcErr error) {
94 if rpcErr == nil {
95 // No error occurred, so we record a successful call.
96 metricGRPCMethodStatus.WithLabelValues(serviceName, methodName, codes.OK.String(), "false").Inc()
97 return
98 }
99
100 s, ok := massageIntoStatusErr(rpcErr)
101 if !ok {
102 // An error occurred, but it was not an error that has a status.Status implementation. We record this as an unknown error.
103 metricGRPCMethodStatus.WithLabelValues(serviceName, methodName, codes.Unknown.String(), "false").Inc()
104 return
105 }
106
107 if !probablyInternalGRPCError(s, allCheckers) {
108 // An error occurred, but it was not an internal gRPC error. We record this as a non-internal error.
109 metricGRPCMethodStatus.WithLabelValues(serviceName, methodName, s.Code().String(), "false").Inc()
110 return
111 }
112
113 // An error occurred, and it looks like an internal gRPC error. We record this as an internal error.
114 metricGRPCMethodStatus.WithLabelValues(serviceName, methodName, s.Code().String(), "true").Inc()
115}