fork of https://github.com/sourcegraph/zoekt
1// Copyright 2016 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Command zoekt-webserver responds to search queries, using an index generated
16// by another program such as zoekt-indexserver.
17
18package main
19
20import (
21 "context"
22 "crypto/tls"
23 "errors"
24 "flag"
25 "fmt"
26 "html/template"
27 "io"
28 "log"
29 "net"
30 "net/http"
31 "net/http/httputil"
32 "net/url"
33 "os"
34 "os/signal"
35 "path/filepath"
36 "runtime"
37 "strconv"
38 "strings"
39 "sync"
40 "time"
41
42 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
43 "github.com/sourcegraph/mountinfo"
44 zoektgrpc "github.com/sourcegraph/zoekt/cmd/zoekt-webserver/grpc/server"
45 "github.com/sourcegraph/zoekt/grpc/internalerrs"
46 "github.com/sourcegraph/zoekt/grpc/messagesize"
47 proto "github.com/sourcegraph/zoekt/grpc/protos/zoekt/webserver/v1"
48 "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
49 "golang.org/x/net/http2"
50 "golang.org/x/net/http2/h2c"
51 "google.golang.org/grpc"
52
53 "github.com/sourcegraph/zoekt"
54 "github.com/sourcegraph/zoekt/build"
55 "github.com/sourcegraph/zoekt/debugserver"
56 "github.com/sourcegraph/zoekt/internal/profiler"
57 "github.com/sourcegraph/zoekt/internal/tracer"
58 "github.com/sourcegraph/zoekt/query"
59 "github.com/sourcegraph/zoekt/shards"
60 "github.com/sourcegraph/zoekt/trace"
61 "github.com/sourcegraph/zoekt/web"
62
63 "github.com/opentracing/opentracing-go"
64 "github.com/prometheus/client_golang/prometheus"
65 "github.com/prometheus/client_golang/prometheus/promauto"
66 "github.com/shirou/gopsutil/v3/disk"
67 sglog "github.com/sourcegraph/log"
68 "github.com/uber/jaeger-client-go"
69 oteltrace "go.opentelemetry.io/otel/trace"
70 "go.uber.org/automaxprocs/maxprocs"
71)
72
// logFormat is the time layout embedded in rotated log file names. It
// separates hours, minutes and seconds with '-' rather than ':', presumably
// so the names are valid on filesystems that reject ':'.
const logFormat = "2006-01-02T15-04-05.999999999Z07"

// divertLogs points the standard logger at a new file in dir named
// zoekt-webserver.<timestamp>.<pid>.log and switches to a fresh file every
// interval. Once the logger has been redirected, the previously used file is
// closed. If a file cannot be created the process exits with status 2.
// The function never returns; main runs it in its own goroutine.
func divertLogs(dir string, interval time.Duration) {
	ticker := time.NewTicker(interval)
	var prev *os.File

	// The post statement waits for the next tick, so the first file is
	// opened immediately and every later one after each interval.
	for ; ; <-ticker.C {
		name := filepath.Join(dir, fmt.Sprintf("zoekt-webserver.%s.%d.log", time.Now().Format(logFormat), os.Getpid()))
		fmt.Fprintf(os.Stderr, "writing logs to %s\n", name)

		next, err := os.Create(name)
		if err != nil {
			// There is not much we can do now.
			fmt.Fprintf(os.Stderr, "can't create output file %s: %v\n", name, err)
			os.Exit(2)
		}

		log.SetOutput(next)
		if prev != nil {
			_ = prev.Close()
		}
		prev = next
	}
}
97
// templateExtension is the file-name suffix that identifies template files,
// both when they are loaded from --template_dir and when they are dumped
// there.
const templateExtension = ".html.tpl"

// loadTemplates parses every file in dir whose name ends in templateExtension
// and adds it to tpl as an associated template. Each template is named after
// its file with the extension removed (for example "search.html.tpl" becomes
// "search"). Files without the extension are ignored.
//
// It returns an error if the glob pattern built from dir is malformed, if a
// matched file cannot be read, or if a file fails to parse. The parse error
// is wrapped and names the offending file.
func loadTemplates(tpl *template.Template, dir string) error {
	files, err := filepath.Glob(filepath.Join(dir, "*"+templateExtension))
	if err != nil {
		// Report to the caller instead of exiting the process; main already
		// turns a loadTemplates error into a fatal log line.
		return fmt.Errorf("glob templates in %s: %w", dir, err)
	}

	log.Printf("loading templates: %v", files)
	for _, fn := range files {
		content, err := os.ReadFile(fn)
		if err != nil {
			return err
		}

		name := strings.TrimSuffix(filepath.Base(fn), templateExtension)
		if _, err := tpl.New(name).Parse(string(content)); err != nil {
			return fmt.Errorf("template.Parse(%s): %w", fn, err)
		}
	}
	return nil
}
121
122func writeTemplates(dir string) error {
123 if dir == "" {
124 return fmt.Errorf("must set --template_dir")
125 }
126
127 for k, v := range web.TemplateText {
128 nm := filepath.Join(dir, k+templateExtension)
129 if err := os.WriteFile(nm, []byte(v), 0o644); err != nil {
130 return err
131 }
132 }
133 return nil
134}
135
136func main() {
137 logDir := flag.String("log_dir", "", "log to this directory rather than stderr.")
138 logRefresh := flag.Duration("log_refresh", 24*time.Hour, "if using --log_dir, start writing a new file this often.")
139
140 listen := flag.String("listen", ":6070", "listen on this address.")
141 index := flag.String("index", build.DefaultDir, "set index directory to use")
142 html := flag.Bool("html", true, "enable HTML interface")
143 enableRPC := flag.Bool("rpc", false, "enable go/net RPC")
144 enableIndexserverProxy := flag.Bool("indexserver_proxy", false, "proxy requests with URLs matching the path /indexserver/ to <index>/indexserver.sock")
145 print := flag.Bool("print", false, "enable local result URLs")
146 enablePprof := flag.Bool("pprof", false, "set to enable remote profiling.")
147 sslCert := flag.String("ssl_cert", "", "set path to SSL .pem holding certificate.")
148 sslKey := flag.String("ssl_key", "", "set path to SSL .pem holding key.")
149 hostCustomization := flag.String(
150 "host_customization", "",
151 "specify host customization, as HOST1=QUERY,HOST2=QUERY")
152
153 templateDir := flag.String("template_dir", "", "set directory from which to load custom .html.tpl template files")
154 dumpTemplates := flag.Bool("dump_templates", false, "dump templates into --template_dir and exit.")
155 version := flag.Bool("version", false, "Print version number")
156
157 flag.Parse()
158
159 if *version {
160 fmt.Printf("zoekt-webserver version %q\n", zoekt.Version)
161 os.Exit(0)
162 }
163
164 if *dumpTemplates {
165 if err := writeTemplates(*templateDir); err != nil {
166 log.Fatal(err)
167 }
168 os.Exit(0)
169 }
170
171 resource := sglog.Resource{
172 Name: "zoekt-webserver",
173 Version: zoekt.Version,
174 InstanceID: zoekt.HostnameBestEffort(),
175 }
176
177 liblog := sglog.Init(resource)
178 defer liblog.Sync()
179 tracer.Init(resource)
180 profiler.Init("zoekt-webserver", zoekt.Version, -1)
181
182 if *logDir != "" {
183 if fi, err := os.Lstat(*logDir); err != nil || !fi.IsDir() {
184 log.Fatalf("%s is not a directory", *logDir)
185 }
186 // We could do fdup acrobatics to also redirect
187 // stderr, but it is simpler and more portable for the
188 // caller to divert stderr output if necessary.
189 go divertLogs(*logDir, *logRefresh)
190 }
191
192 // Tune GOMAXPROCS to match Linux container CPU quota.
193 _, _ = maxprocs.Set()
194
195 if err := os.MkdirAll(*index, 0o755); err != nil {
196 log.Fatal(err)
197 }
198
199 mustRegisterDiskMonitor(*index)
200
201 metricsLogger := sglog.Scoped("metricsRegistration")
202
203 mustRegisterMemoryMapMetrics(metricsLogger)
204
205 opts := mountinfo.CollectorOpts{Namespace: "zoekt_webserver"}
206 c := mountinfo.NewCollector(metricsLogger, opts, map[string]string{"indexDir": *index})
207
208 prometheus.DefaultRegisterer.MustRegister(c)
209
210 // Do not block on loading shards so we can become partially available
211 // sooner. Otherwise on large instances zoekt can be unavailable on the
212 // order of minutes.
213 searcher, err := shards.NewDirectorySearcherFast(*index)
214 if err != nil {
215 log.Fatal(err)
216 }
217
218 searcher = &loggedSearcher{
219 Streamer: searcher,
220 Logger: sglog.Scoped("searcher"),
221 }
222
223 s := &web.Server{
224 Searcher: searcher,
225 Top: web.Top,
226 Version: zoekt.Version,
227 }
228
229 if *templateDir != "" {
230 if err := loadTemplates(s.Top, *templateDir); err != nil {
231 log.Fatalf("loadTemplates: %v", err)
232 }
233 }
234
235 s.Print = *print
236 s.HTML = *html
237 s.RPC = *enableRPC
238
239 if *hostCustomization != "" {
240 s.HostCustomQueries = map[string]string{}
241 for _, h := range strings.SplitN(*hostCustomization, ",", -1) {
242 if len(h) == 0 {
243 continue
244 }
245 fields := strings.SplitN(h, "=", 2)
246 if len(fields) < 2 {
247 log.Fatalf("invalid host_customization %q", h)
248 }
249
250 s.HostCustomQueries[fields[0]] = fields[1]
251 }
252 }
253
254 serveMux, err := web.NewMux(s)
255 if err != nil {
256 log.Fatal(err)
257 }
258
259 debugserver.AddHandlers(serveMux, *enablePprof)
260
261 if *enableIndexserverProxy {
262 socket := filepath.Join(*index, "indexserver.sock")
263 sglog.Scoped("server").Info("adding reverse proxy", sglog.String("socket", socket))
264 addProxyHandler(serveMux, socket)
265 }
266
267 handler := trace.Middleware(serveMux)
268
269 // Sourcegraph: We use environment variables to configure watchdog since
270 // they are more convenient than flags in containerized environments.
271 watchdogTick := 30 * time.Second
272 if v := os.Getenv("ZOEKT_WATCHDOG_TICK"); v != "" {
273 watchdogTick, _ = time.ParseDuration(v)
274 log.Printf("custom ZOEKT_WATCHDOG_TICK=%v", watchdogTick)
275 }
276
277 watchdogErrCount := 3
278 if v := os.Getenv("ZOEKT_WATCHDOG_ERRORS"); v != "" {
279 watchdogErrCount, _ = strconv.Atoi(v)
280 log.Printf("custom ZOEKT_WATCHDOG_ERRORS=%d", watchdogErrCount)
281 }
282
283 watchdogAddr := "http://" + *listen
284 if *sslCert != "" || *sslKey != "" {
285 watchdogAddr = "https://" + *listen
286 }
287 watchdogAddr += "/healthz"
288
289 if watchdogErrCount > 0 && watchdogTick > 0 {
290 go watchdog(watchdogTick, watchdogErrCount, watchdogAddr)
291 } else {
292 log.Println("watchdog disabled")
293 }
294
295 logger := sglog.Scoped("ZoektWebserverGRPCServer")
296
297 streamer := web.NewTraceAwareSearcher(s.Searcher)
298 grpcServer := newGRPCServer(logger, streamer)
299
300 handler = multiplexGRPC(grpcServer, handler)
301
302 srv := &http.Server{
303 Addr: *listen,
304 Handler: handler,
305 }
306
307 go func() {
308 sglog.Scoped("server").Info("starting server", sglog.Stringp("address", listen))
309 var err error
310 if *sslCert != "" || *sslKey != "" {
311 err = srv.ListenAndServeTLS(*sslCert, *sslKey)
312 } else {
313 err = srv.ListenAndServe()
314 }
315
316 if err != http.ErrServerClosed {
317 // Fatal otherwise shutdownOnSignal will block
318 log.Fatalf("ListenAndServe: %v", err)
319 }
320 }()
321
322 if s.RPC {
323 // Our RPC system does not support shutdown and hijacks the underlying
324 // http connection. This means shutdown is ineffective and just waits 10s
325 // before calling close. Lets just quit faster in that case.
326 if err := closeOnSignal(srv); err != nil {
327 log.Fatalf("http.Server.Close: %v", err)
328 }
329 } else {
330 if err := shutdownOnSignal(srv); err != nil {
331 log.Fatalf("http.Server.Shutdown: %v", err)
332 }
333 }
334}
335
336// multiplexGRPC takes a gRPC server and a plain HTTP handler and multiplexes the
337// request handling. Any requests that declare themselves as gRPC requests are routed
338// to the gRPC server, all others are routed to the httpHandler.
339func multiplexGRPC(grpcServer *grpc.Server, httpHandler http.Handler) http.Handler {
340 newHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
341 if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
342 grpcServer.ServeHTTP(w, r)
343 } else {
344 httpHandler.ServeHTTP(w, r)
345 }
346 })
347
348 // Until we enable TLS, we need to fall back to the h2c protocol, which is
349 // basically HTTP2 without TLS. The standard library does not implement the
350 // h2s protocol, so this hijacks h2s requests and handles them correctly.
351 return h2c.NewHandler(newHandler, &http2.Server{})
352}
353
// addProxyHandler registers a handler on mux that forwards every request
// whose path starts with "/indexserver/" to the HTTP server listening on the
// unix domain socket at socket. The "/indexserver" prefix is removed before
// forwarding, so "/indexserver/foo" reaches the socket server as "/foo".
func addProxyHandler(mux *http.ServeMux, socket string) {
	// The host is only a placeholder: the dialer below always connects to
	// the unix socket and ignores the network and address it is asked for.
	target := &url.URL{Scheme: "http", Host: "socket"}

	dialSocket := func(ctx context.Context, _, _ string) (net.Conn, error) {
		return (&net.Dialer{}).DialContext(ctx, "unix", socket)
	}

	proxy := httputil.NewSingleHostReverseProxy(target)
	proxy.Transport = &http.Transport{DialContext: dialSocket}

	mux.Handle("/indexserver/", http.StripPrefix("/indexserver/", proxy))
}
371
372// shutdownSignalChan returns a channel which is listening for shutdown
373// signals from the operating system. maxReads is an upper bound on how many
374// times you will read the channel (used as buffer for signal.Notify).
375func shutdownSignalChan(maxReads int) <-chan os.Signal {
376 c := make(chan os.Signal, maxReads)
377 signal.Notify(c, os.Interrupt) // terminal C-c and goreman
378 signal.Notify(c, PLATFORM_SIGTERM) // Kubernetes
379 return c
380}
381
382// closeOnSignal will listen for SIGINT or SIGTERM and call srv.Close. This is
383// not a graceful shutdown, see shutdownOnSignal.
384func closeOnSignal(srv *http.Server) error {
385 c := shutdownSignalChan(1)
386 <-c
387
388 return srv.Close()
389}
390
391// shutdownOnSignal will listen for SIGINT or SIGTERM and call srv.Shutdown.
392// Note it doesn't call anything else for shutting down. Notably our RPC
393// framework doesn't allow us to drain connections, so when Shutdown we will
394// wait 10s before closing.
395//
396// Note: the call site for shutdownOnSignal should use closeOnSignal instead
397// if rpc mode is enabled due to the above limitation.
398func shutdownOnSignal(srv *http.Server) error {
399 c := shutdownSignalChan(2)
400 <-c
401
402 // If we receive another signal, immediate shutdown
403 ctx, cancel := context.WithCancel(context.Background())
404 defer cancel()
405 go func() {
406 select {
407 case <-ctx.Done():
408 case sig := <-c:
409 log.Printf("received another signal (%v), immediate shutdown", sig)
410 cancel()
411 }
412 }()
413
414 // Wait for 10s to drain ongoing requests. Kubernetes gives us 30s to
415 // shutdown, we have already used 15s waiting for our endpoint removal to
416 // propagate.
417 ctx, cancel2 := context.WithTimeout(ctx, 10*time.Second)
418 defer cancel2()
419
420 log.Printf("shutting down")
421 return srv.Shutdown(ctx)
422}
423
424func watchdogOnce(ctx context.Context, client *http.Client, addr string) error {
425 defer metricWatchdogTotal.Inc()
426
427 ctx, cancel := context.WithDeadline(ctx, time.Now().Add(30*time.Second))
428 defer cancel()
429
430 req, err := http.NewRequest("GET", addr, nil)
431 if err != nil {
432 return err
433 }
434
435 req = req.WithContext(ctx)
436
437 resp, err := client.Do(req)
438 if err != nil {
439 return err
440 }
441 body, _ := io.ReadAll(resp.Body)
442 _ = resp.Body.Close()
443
444 if resp.StatusCode != http.StatusOK {
445 return fmt.Errorf("watchdog: status=%v body=%q", resp.StatusCode, string(body))
446 }
447 return nil
448}
449
450func watchdog(dt time.Duration, maxErrCount int, addr string) {
451 tr := &http.Transport{
452 TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
453 }
454 client := &http.Client{
455 Transport: tr,
456 }
457 tick := time.NewTicker(dt)
458
459 errCount := 0
460 for range tick.C {
461 err := watchdogOnce(context.Background(), client, addr)
462 if err != nil {
463 errCount++
464 metricWatchdogErrors.Set(float64(errCount))
465 metricWatchdogErrorsTotal.Inc()
466 if errCount >= maxErrCount {
467 log.Printf(`watchdog health check has consecutively failed %d times indicating is likely an unrecoverable error affecting zoekt. As such this process will exit with code 3.
468
469Final error: %v
470
471Possible remediations:
472- If this rarely happens, ignore and let your process manager restart zoekt.
473- Possibly under provisioned. Try increasing CPU or disk IO.
474- A bug. Reach out with logs and screenshots of metrics when this occurs.`, errCount, err)
475 os.Exit(3)
476 } else {
477 log.Printf("watchdog: failed, will try %d more times: %v", maxErrCount-errCount, err)
478 }
479 } else if errCount > 0 {
480 errCount = 0
481 metricWatchdogErrors.Set(float64(errCount))
482 log.Printf("watchdog: success, resetting error count")
483 }
484 }
485}
486
487func diskUsage(path string) (*disk.UsageStat, error) {
488 duPath := path
489 if runtime.GOOS == "windows" {
490 duPath = filepath.VolumeName(duPath)
491 }
492 usage, err := disk.Usage(duPath)
493 if err != nil {
494 return nil, fmt.Errorf("diskUsage: %w", err)
495 }
496 return usage, err
497}
498
499func mustRegisterDiskMonitor(path string) {
500 prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
501 Name: "src_disk_space_available_bytes",
502 Help: "Amount of free space disk space.",
503 ConstLabels: prometheus.Labels{"path": path},
504 }, func() float64 {
505 // I know there is no error handling here, and I don't like it
506 // but there was no error handling in the previous version
507 // that used Statfs, either, so I'm assuming there's no need for it
508 usage, _ := diskUsage(path)
509 return float64(usage.Free)
510 }))
511
512 prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
513 Name: "src_disk_space_total_bytes",
514 Help: "Amount of total disk space.",
515 ConstLabels: prometheus.Labels{"path": path},
516 }, func() float64 {
517 // I know there is no error handling here, and I don't like it
518 // but there was no error handling in the previous version
519 // that used Statfs, either, so I'm assuming there's no need for it
520 usage, _ := diskUsage(path)
521 return float64(usage.Total)
522 }))
523}
524
525type loggedSearcher struct {
526 zoekt.Streamer
527 Logger sglog.Logger
528}
529
530func (s *loggedSearcher) Search(
531 ctx context.Context,
532 q query.Q,
533 opts *zoekt.SearchOptions,
534) (sr *zoekt.SearchResult, err error) {
535 defer func() {
536 var stats *zoekt.Stats
537 if sr != nil {
538 stats = &sr.Stats
539 }
540 s.log(ctx, q, opts, stats, err)
541 }()
542
543 metricSearchRequestsTotal.Inc()
544 return s.Streamer.Search(ctx, q, opts)
545}
546
547func (s *loggedSearcher) StreamSearch(
548 ctx context.Context,
549 q query.Q,
550 opts *zoekt.SearchOptions,
551 sender zoekt.Sender,
552) error {
553 var stats zoekt.Stats
554
555 metricSearchRequestsTotal.Inc()
556 err := s.Streamer.StreamSearch(ctx, q, opts, zoekt.SenderFunc(func(event *zoekt.SearchResult) {
557 stats.Add(event.Stats)
558 sender.Send(event)
559 }))
560
561 s.log(ctx, q, opts, &stats, err)
562
563 return err
564}
565
566func (s *loggedSearcher) log(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, st *zoekt.Stats, err error) {
567 logger := s.Logger.
568 WithTrace(traceContext(ctx)).
569 With(
570 sglog.String("query", q.String()),
571 sglog.Bool("opts.EstimateDocCount", opts.EstimateDocCount),
572 sglog.Bool("opts.Whole", opts.Whole),
573 sglog.Int("opts.ShardMaxMatchCount", opts.ShardMaxMatchCount),
574 sglog.Int("opts.TotalMaxMatchCount", opts.TotalMaxMatchCount),
575 sglog.Duration("opts.MaxWallTime", opts.MaxWallTime),
576 sglog.Int("opts.MaxDocDisplayCount", opts.MaxDocDisplayCount),
577 sglog.Int("opts.MaxMatchDisplayCount", opts.MaxMatchDisplayCount),
578 )
579
580 if err != nil {
581 switch {
582 case errors.Is(err, context.Canceled):
583 logger.Warn("search canceled", sglog.Error(err))
584 case errors.Is(err, context.DeadlineExceeded):
585 logger.Warn("search timeout", sglog.Error(err))
586 default:
587 logger.Error("search failed", sglog.Error(err))
588 }
589 return
590 }
591
592 if st == nil {
593 return
594 }
595
596 logger.Debug("search",
597 sglog.Int64("stat.ContentBytesLoaded", st.ContentBytesLoaded),
598 sglog.Int64("stat.IndexBytesLoaded", st.IndexBytesLoaded),
599 sglog.Int("stat.Crashes", st.Crashes),
600 sglog.Duration("stat.Duration", st.Duration),
601 sglog.Int("stat.FileCount", st.FileCount),
602 sglog.Int("stat.ShardFilesConsidered", st.ShardFilesConsidered),
603 sglog.Int("stat.FilesConsidered", st.FilesConsidered),
604 sglog.Int("stat.FilesLoaded", st.FilesLoaded),
605 sglog.Int("stat.FilesSkipped", st.FilesSkipped),
606 sglog.Int("stat.ShardsScanned", st.ShardsScanned),
607 sglog.Int("stat.ShardsSkipped", st.ShardsSkipped),
608 sglog.Int("stat.ShardsSkippedFilter", st.ShardsSkippedFilter),
609 sglog.Int("stat.MatchCount", st.MatchCount),
610 sglog.Int("stat.NgramMatches", st.NgramMatches),
611 sglog.Int("stat.NgramLookups", st.NgramLookups),
612 sglog.Duration("stat.Wait", st.Wait),
613 sglog.Duration("stat.MatchTreeConstruction", st.MatchTreeConstruction),
614 sglog.Duration("stat.MatchTreeSearch", st.MatchTreeSearch),
615 sglog.Int("stat.RegexpsConsidered", st.RegexpsConsidered),
616 sglog.String("stat.FlushReason", st.FlushReason.String()),
617 )
618}
619
620func traceContext(ctx context.Context) sglog.TraceContext {
621 otSpan := opentracing.SpanFromContext(ctx)
622 if otSpan != nil {
623 if jaegerSpan, ok := otSpan.Context().(jaeger.SpanContext); ok {
624 return sglog.TraceContext{
625 TraceID: jaegerSpan.TraceID().String(),
626 SpanID: jaegerSpan.SpanID().String(),
627 }
628 }
629 }
630
631 if otelSpan := oteltrace.SpanFromContext(ctx).SpanContext(); otelSpan.IsValid() {
632 return sglog.TraceContext{
633 TraceID: otelSpan.TraceID().String(),
634 SpanID: otelSpan.SpanID().String(),
635 }
636 }
637
638 return sglog.TraceContext{}
639}
640
641func newGRPCServer(logger sglog.Logger, streamer zoekt.Streamer, additionalOpts ...grpc.ServerOption) *grpc.Server {
642 metrics := mustGetServerMetrics()
643
644 opts := []grpc.ServerOption{
645 grpc.ChainStreamInterceptor(
646 otelgrpc.StreamServerInterceptor(),
647 metrics.StreamServerInterceptor(),
648 messagesize.StreamServerInterceptor,
649 internalerrs.LoggingStreamServerInterceptor(logger),
650 ),
651 grpc.ChainUnaryInterceptor(
652 otelgrpc.UnaryServerInterceptor(),
653 metrics.UnaryServerInterceptor(),
654 messagesize.UnaryServerInterceptor,
655 internalerrs.LoggingUnaryServerInterceptor(logger),
656 ),
657 }
658
659 opts = append(opts, additionalOpts...)
660
661 // Ensure that the message size options are set last, so they override any other
662 // server-specific options that tweak the message size.
663 //
664 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
665 // take precedence over everything else with a uniform size setting that's easy to reason about.
666 opts = append(opts, messagesize.MustGetServerMessageSizeFromEnv()...)
667
668 s := grpc.NewServer(opts...)
669 proto.RegisterWebserverServiceServer(s, zoektgrpc.NewServer(streamer))
670
671 return s
672}
673
674var (
675 metricWatchdogErrors = promauto.NewGauge(prometheus.GaugeOpts{
676 Name: "zoekt_webserver_watchdog_errors",
677 Help: "The current error count for zoekt watchdog.",
678 })
679 metricWatchdogTotal = promauto.NewCounter(prometheus.CounterOpts{
680 Name: "zoekt_webserver_watchdog_total",
681 Help: "The total number of requests done by zoekt watchdog.",
682 })
683 metricWatchdogErrorsTotal = promauto.NewCounter(prometheus.CounterOpts{
684 Name: "zoekt_webserver_watchdog_errors_total",
685 Help: "The total number of errors from zoekt watchdog.",
686 })
687 metricSearchRequestsTotal = promauto.NewCounter(prometheus.CounterOpts{
688 Name: "zoekt_search_requests_total",
689 Help: "The total number of search requests that zoekt received",
690 })
691
692 serverMetricsOnce sync.Once
693 serverMetrics *grpcprom.ServerMetrics
694)
695
696// mustGetServerMetrics returns a singleton instance of the server metrics
697// that are shared across all gRPC servers that this process creates.
698//
699// This function panics if the metrics cannot be registered with the default
700// Prometheus registry.
701func mustGetServerMetrics() *grpcprom.ServerMetrics {
702 serverMetricsOnce.Do(func() {
703 serverMetrics = grpcprom.NewServerMetrics(
704 grpcprom.WithServerCounterOptions(),
705 grpcprom.WithServerHandlingTimeHistogram(), // record the overall response latency for a gRPC request)
706 )
707
708 prometheus.DefaultRegisterer.MustRegister(serverMetrics)
709 })
710
711 return serverMetrics
712}