fork of https://github.com/sourcegraph/zoekt
1// Copyright 2016 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Command zoekt-webserver starts a server that responds to search queries, using
16// an index generated by another program such as zoekt-indexserver.
17package main
18
19import (
20 "context"
21 "crypto/tls"
22 "errors"
23 "flag"
24 "fmt"
25 "html/template"
26 "io"
27 "log"
28 "net"
29 "net/http"
30 "net/http/httputil"
31 "net/url"
32 "os"
33 "os/signal"
34 "path/filepath"
35 "strconv"
36 "strings"
37 "sync"
38 "time"
39
40 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
41 "github.com/sourcegraph/mountinfo"
42 "github.com/sourcegraph/zoekt/internal/debugserver"
43 "github.com/sourcegraph/zoekt/internal/shards"
44 "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
45 "golang.org/x/net/http2"
46 "golang.org/x/net/http2/h2c"
47 "golang.org/x/sys/unix"
48 "google.golang.org/grpc"
49
50 "github.com/opentracing/opentracing-go"
51 "github.com/prometheus/client_golang/prometheus"
52 "github.com/prometheus/client_golang/prometheus/promauto"
53 "github.com/shirou/gopsutil/v3/disk"
54 sglog "github.com/sourcegraph/log"
55 "github.com/sourcegraph/zoekt"
56 zoektgrpc "github.com/sourcegraph/zoekt/cmd/zoekt-webserver/grpc/server"
57 "github.com/sourcegraph/zoekt/grpc/internalerrs"
58 "github.com/sourcegraph/zoekt/grpc/messagesize"
59 "github.com/sourcegraph/zoekt/grpc/propagator"
60 proto "github.com/sourcegraph/zoekt/grpc/protos/zoekt/webserver/v1"
61 "github.com/sourcegraph/zoekt/index"
62 "github.com/sourcegraph/zoekt/internal/profiler"
63 "github.com/sourcegraph/zoekt/internal/tenant"
64 "github.com/sourcegraph/zoekt/internal/trace"
65 "github.com/sourcegraph/zoekt/internal/tracer"
66 "github.com/sourcegraph/zoekt/query"
67 "github.com/sourcegraph/zoekt/web"
68 "github.com/uber/jaeger-client-go"
69 oteltrace "go.opentelemetry.io/otel/trace"
70 "go.uber.org/automaxprocs/maxprocs"
71)
72
// logFormat is the time layout embedded in the name of every diverted log
// file.
const logFormat = "2006-01-02T15-04-05.999999999Z07"

// divertLogs redirects the standard logger into files under dir, starting a
// fresh file immediately and then once per interval. It never returns, so it
// is meant to run in its own goroutine. If a log file cannot be created the
// process exits with status 2.
func divertLogs(dir string, interval time.Duration) {
	rotate := time.NewTicker(interval)
	var previous *os.File

	// The post statement blocks on the ticker, so each pass of the loop body
	// is one rotation.
	for ; ; <-rotate.C {
		base := fmt.Sprintf("zoekt-webserver.%s.%d.log", time.Now().Format(logFormat), os.Getpid())
		path := filepath.Join(dir, base)
		fmt.Fprintf(os.Stderr, "writing logs to %s\n", path)

		current, err := os.Create(path)
		if err != nil {
			// There is not much we can do now.
			fmt.Fprintf(os.Stderr, "can't create output file %s: %v\n", path, err)
			os.Exit(2)
		}

		// Switch the logger first, then release the file it was using. On the
		// very first pass previous is nil; the result of Close is discarded
		// either way.
		log.SetOutput(current)
		previous.Close()
		previous = current
	}
}
97
// templateExtension is the filename suffix that identifies a custom template
// file; the part of the base name before it is used as the template name.
const templateExtension = ".html.tpl"

// loadTemplates parses every file in dir whose name ends in templateExtension
// and associates it with tpl under the file's base name minus the extension.
//
// It returns the first error encountered: an invalid glob pattern, an
// unreadable file, or a template that fails to parse. Errors are returned to
// the caller rather than terminating the process, so the caller decides how
// to react.
func loadTemplates(tpl *template.Template, dir string) error {
	fs, err := filepath.Glob(filepath.Join(dir, "*"+templateExtension))
	if err != nil {
		return fmt.Errorf("glob templates in %q: %w", dir, err)
	}

	log.Printf("loading templates: %v", fs)
	for _, fn := range fs {
		content, err := os.ReadFile(fn)
		if err != nil {
			return err
		}

		base := strings.TrimSuffix(filepath.Base(fn), templateExtension)
		if _, err := tpl.New(base).Parse(string(content)); err != nil {
			return fmt.Errorf("template.Parse(%s): %w", fn, err)
		}
	}
	return nil
}
121
122func writeTemplates(dir string) error {
123 if dir == "" {
124 return fmt.Errorf("must set --template_dir")
125 }
126
127 for k, v := range web.TemplateText {
128 nm := filepath.Join(dir, k+templateExtension)
129 if err := os.WriteFile(nm, []byte(v), 0o644); err != nil {
130 return err
131 }
132 }
133 return nil
134}
135
136func main() {
137 logDir := flag.String("log_dir", "", "log to this directory rather than stderr.")
138 logRefresh := flag.Duration("log_refresh", 24*time.Hour, "if using --log_dir, start writing a new file this often.")
139
140 listen := flag.String("listen", ":6070", "listen on this address.")
141 indexDir := flag.String("index", index.DefaultDir, "set index directory to use")
142 html := flag.Bool("html", true, "enable HTML interface")
143 enableRPC := flag.Bool("rpc", false, "enable go/net RPC")
144 enableIndexserverProxy := flag.Bool("indexserver_proxy", false, "proxy requests with URLs matching the path /indexserver/ to <index>/indexserver.sock")
145 print := flag.Bool("print", false, "enable local result URLs")
146 enablePprof := flag.Bool("pprof", false, "set to enable remote profiling.")
147 sslCert := flag.String("ssl_cert", "", "set path to SSL .pem holding certificate.")
148 sslKey := flag.String("ssl_key", "", "set path to SSL .pem holding key.")
149 hostCustomization := flag.String(
150 "host_customization", "",
151 "specify host customization, as HOST1=QUERY,HOST2=QUERY")
152
153 templateDir := flag.String("template_dir", "", "set directory from which to load custom .html.tpl template files")
154 dumpTemplates := flag.Bool("dump_templates", false, "dump templates into --template_dir and exit.")
155 version := flag.Bool("version", false, "Print version number")
156
157 flag.Parse()
158
159 if *version {
160 fmt.Printf("zoekt-webserver version %q\n", index.Version)
161 os.Exit(0)
162 }
163
164 if *dumpTemplates {
165 if err := writeTemplates(*templateDir); err != nil {
166 log.Fatal(err)
167 }
168 os.Exit(0)
169 }
170
171 resource := sglog.Resource{
172 Name: "zoekt-webserver",
173 Version: index.Version,
174 InstanceID: index.HostnameBestEffort(),
175 }
176
177 liblog := sglog.Init(resource)
178 defer liblog.Sync()
179 tracer.Init(resource)
180 profiler.Init("zoekt-webserver")
181
182 if *logDir != "" {
183 if fi, err := os.Lstat(*logDir); err != nil || !fi.IsDir() {
184 log.Fatalf("%s is not a directory", *logDir)
185 }
186 // We could do fdup acrobatics to also redirect
187 // stderr, but it is simpler and more portable for the
188 // caller to divert stderr output if necessary.
189 go divertLogs(*logDir, *logRefresh)
190 }
191
192 // Tune GOMAXPROCS to match Linux container CPU quota.
193 _, _ = maxprocs.Set()
194
195 if err := os.MkdirAll(*indexDir, 0o755); err != nil {
196 log.Fatal(err)
197 }
198
199 mustRegisterDiskMonitor(*indexDir)
200
201 metricsLogger := sglog.Scoped("metricsRegistration")
202
203 mustRegisterMemoryMapMetrics(metricsLogger)
204
205 opts := mountinfo.CollectorOpts{Namespace: "zoekt_webserver"}
206 c := mountinfo.NewCollector(metricsLogger, opts, map[string]string{"indexDir": *indexDir})
207
208 prometheus.DefaultRegisterer.MustRegister(c)
209
210 // Do not block on loading shards so we can become partially available
211 // sooner. Otherwise on large instances zoekt can be unavailable on the
212 // order of minutes.
213 searcher, err := shards.NewDirectorySearcherFast(*indexDir)
214 if err != nil {
215 log.Fatal(err)
216 }
217
218 searcher = &loggedSearcher{
219 Streamer: searcher,
220 Logger: sglog.Scoped("searcher"),
221 }
222
223 s := &web.Server{
224 Searcher: searcher,
225 Top: web.Top,
226 Version: index.Version,
227 }
228
229 if *templateDir != "" {
230 if err := loadTemplates(s.Top, *templateDir); err != nil {
231 log.Fatalf("loadTemplates: %v", err)
232 }
233 }
234
235 s.Print = *print
236 s.HTML = *html
237 s.RPC = *enableRPC
238
239 if *hostCustomization != "" {
240 s.HostCustomQueries = map[string]string{}
241 for _, h := range strings.SplitN(*hostCustomization, ",", -1) {
242 if len(h) == 0 {
243 continue
244 }
245 fields := strings.SplitN(h, "=", 2)
246 if len(fields) < 2 {
247 log.Fatalf("invalid host_customization %q", h)
248 }
249
250 s.HostCustomQueries[fields[0]] = fields[1]
251 }
252 }
253
254 serveMux, err := web.NewMux(s)
255 if err != nil {
256 log.Fatal(err)
257 }
258
259 debugserver.AddHandlers(serveMux, *enablePprof)
260
261 if *enableIndexserverProxy {
262 socket := filepath.Join(*indexDir, "indexserver.sock")
263 sglog.Scoped("server").Info("adding reverse proxy", sglog.String("socket", socket))
264 addProxyHandler(serveMux, socket)
265 }
266
267 handler := trace.Middleware(serveMux)
268
269 // Sourcegraph: We use environment variables to configure watchdog since
270 // they are more convenient than flags in containerized environments.
271 watchdogTick := 30 * time.Second
272 if v := os.Getenv("ZOEKT_WATCHDOG_TICK"); v != "" {
273 watchdogTick, _ = time.ParseDuration(v)
274 log.Printf("custom ZOEKT_WATCHDOG_TICK=%v", watchdogTick)
275 }
276
277 watchdogErrCount := 3
278 if v := os.Getenv("ZOEKT_WATCHDOG_ERRORS"); v != "" {
279 watchdogErrCount, _ = strconv.Atoi(v)
280 log.Printf("custom ZOEKT_WATCHDOG_ERRORS=%d", watchdogErrCount)
281 }
282
283 watchdogAddr := "http://" + *listen
284 if *sslCert != "" || *sslKey != "" {
285 watchdogAddr = "https://" + *listen
286 }
287 watchdogAddr += "/healthz"
288
289 if watchdogErrCount > 0 && watchdogTick > 0 {
290 go watchdog(watchdogTick, watchdogErrCount, watchdogAddr)
291 } else {
292 log.Println("watchdog disabled")
293 }
294
295 logger := sglog.Scoped("ZoektWebserverGRPCServer")
296
297 streamer := web.NewTraceAwareSearcher(s.Searcher)
298 grpcServer := newGRPCServer(logger, streamer)
299
300 handler = multiplexGRPC(grpcServer, handler)
301
302 srv := &http.Server{
303 Addr: *listen,
304 Handler: handler,
305 }
306
307 go func() {
308 sglog.Scoped("server").Info("starting server", sglog.Stringp("address", listen))
309 var err error
310 if *sslCert != "" || *sslKey != "" {
311 err = srv.ListenAndServeTLS(*sslCert, *sslKey)
312 } else {
313 err = srv.ListenAndServe()
314 }
315
316 if err != http.ErrServerClosed {
317 // Fatal otherwise shutdownOnSignal will block
318 log.Fatalf("ListenAndServe: %v", err)
319 }
320 }()
321
322 if s.RPC {
323 // Our RPC system does not support shutdown and hijacks the underlying
324 // http connection. This means shutdown is ineffective and just waits 10s
325 // before calling close. Lets just quit faster in that case.
326 if err := closeOnSignal(srv); err != nil {
327 log.Fatalf("http.Server.Close: %v", err)
328 }
329 } else {
330 if err := shutdownOnSignal(srv); err != nil {
331 log.Fatalf("http.Server.Shutdown: %v", err)
332 }
333 }
334}
335
336// multiplexGRPC takes a gRPC server and a plain HTTP handler and multiplexes the
337// request handling. Any requests that declare themselves as gRPC requests are routed
338// to the gRPC server, all others are routed to the httpHandler.
339func multiplexGRPC(grpcServer *grpc.Server, httpHandler http.Handler) http.Handler {
340 newHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
341 if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
342 grpcServer.ServeHTTP(w, r)
343 } else {
344 httpHandler.ServeHTTP(w, r)
345 }
346 })
347
348 // Until we enable TLS, we need to fall back to the h2c protocol, which is
349 // basically HTTP2 without TLS. The standard library does not implement the
350 // h2s protocol, so this hijacks h2s requests and handles them correctly.
351 return h2c.NewHandler(newHandler, &http2.Server{})
352}
353
// addProxyHandler registers a reverse proxy on mux for every path under
// /indexserver/. The prefix is stripped from the request path and the request
// is forwarded over the unix domain socket at the path given by socket.
func addProxyHandler(mux *http.ServeMux, socket string) {
	const prefix = "/indexserver/"

	// Every connection goes to the unix socket, whatever network and address
	// the transport asks for.
	dialSocket := func(ctx context.Context, _, _ string) (net.Conn, error) {
		return new(net.Dialer).DialContext(ctx, "unix", socket)
	}

	// The value of "Host" is arbitrary, because it is ignored by the
	// DialContext we use for the socket connection.
	target := &url.URL{Scheme: "http", Host: "socket"}

	proxy := httputil.NewSingleHostReverseProxy(target)
	proxy.Transport = &http.Transport{DialContext: dialSocket}

	mux.Handle(prefix, http.StripPrefix(prefix, proxy))
}
371
372// shutdownSignalChan returns a channel which is listening for shutdown
373// signals from the operating system. maxReads is an upper bound on how many
374// times you will read the channel (used as buffer for signal.Notify).
375func shutdownSignalChan(maxReads int) <-chan os.Signal {
376 c := make(chan os.Signal, maxReads)
377 signal.Notify(c, os.Interrupt) // terminal C-c and goreman
378 signal.Notify(c, unix.SIGTERM) // Kubernetes
379 return c
380}
381
382// closeOnSignal will listen for SIGINT or SIGTERM and call srv.Close. This is
383// not a graceful shutdown, see shutdownOnSignal.
384func closeOnSignal(srv *http.Server) error {
385 c := shutdownSignalChan(1)
386 <-c
387
388 return srv.Close()
389}
390
391// shutdownOnSignal will listen for SIGINT or SIGTERM and call srv.Shutdown.
392// Note it doesn't call anything else for shutting down. Notably our RPC
393// framework doesn't allow us to drain connections, so when Shutdown we will
394// wait 10s before closing.
395//
396// Note: the call site for shutdownOnSignal should use closeOnSignal instead
397// if rpc mode is enabled due to the above limitation.
398func shutdownOnSignal(srv *http.Server) error {
399 c := shutdownSignalChan(2)
400 <-c
401
402 // If we receive another signal, immediate shutdown
403 ctx, cancel := context.WithCancel(context.Background())
404 defer cancel()
405 go func() {
406 select {
407 case <-ctx.Done():
408 case sig := <-c:
409 log.Printf("received another signal (%v), immediate shutdown", sig)
410 cancel()
411 }
412 }()
413
414 // Wait for 10s to drain ongoing requests. Kubernetes gives us 30s to
415 // shutdown, we have already used 15s waiting for our endpoint removal to
416 // propagate.
417 ctx, cancel2 := context.WithTimeout(ctx, 10*time.Second)
418 defer cancel2()
419
420 log.Printf("shutting down")
421 return srv.Shutdown(ctx)
422}
423
424func watchdogOnce(ctx context.Context, client *http.Client, addr string) error {
425 defer metricWatchdogTotal.Inc()
426
427 ctx, cancel := context.WithDeadline(ctx, time.Now().Add(30*time.Second))
428 defer cancel()
429
430 req, err := http.NewRequest("GET", addr, nil)
431 if err != nil {
432 return err
433 }
434
435 req = req.WithContext(ctx)
436
437 resp, err := client.Do(req)
438 if err != nil {
439 return err
440 }
441 body, _ := io.ReadAll(resp.Body)
442 _ = resp.Body.Close()
443
444 if resp.StatusCode != http.StatusOK {
445 return fmt.Errorf("watchdog: status=%v body=%q", resp.StatusCode, string(body))
446 }
447 return nil
448}
449
450func watchdog(dt time.Duration, maxErrCount int, addr string) {
451 tr := &http.Transport{
452 TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
453 }
454 client := &http.Client{
455 Transport: tr,
456 }
457 tick := time.NewTicker(dt)
458
459 errCount := 0
460 for range tick.C {
461 err := watchdogOnce(context.Background(), client, addr)
462 if err != nil {
463 errCount++
464 metricWatchdogErrors.Set(float64(errCount))
465 metricWatchdogErrorsTotal.Inc()
466 if errCount >= maxErrCount {
467 log.Printf(`watchdog health check has consecutively failed %d times indicating is likely an unrecoverable error affecting zoekt. As such this process will exit with code 3.
468
469Final error: %v
470
471Possible remediations:
472- If this rarely happens, ignore and let your process manager restart zoekt.
473- Possibly under provisioned. Try increasing CPU or disk IO.
474- A bug. Reach out with logs and screenshots of metrics when this occurs.`, errCount, err)
475 os.Exit(3)
476 } else {
477 log.Printf("watchdog: failed, will try %d more times: %v", maxErrCount-errCount, err)
478 }
479 } else if errCount > 0 {
480 errCount = 0
481 metricWatchdogErrors.Set(float64(errCount))
482 log.Printf("watchdog: success, resetting error count")
483 }
484 }
485}
486
487func mustRegisterDiskMonitor(path string) {
488 prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
489 Name: "src_disk_space_available_bytes",
490 Help: "Amount of free space disk space.",
491 ConstLabels: prometheus.Labels{"path": path},
492 }, func() float64 {
493 usage, _ := disk.Usage(path)
494 return float64(usage.Free)
495 }))
496
497 prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
498 Name: "src_disk_space_total_bytes",
499 Help: "Amount of total disk space.",
500 ConstLabels: prometheus.Labels{"path": path},
501 }, func() float64 {
502 usage, _ := disk.Usage(path)
503 return float64(usage.Total)
504 }))
505}
506
// loggedSearcher wraps a zoekt.Streamer and logs every Search and
// StreamSearch call made through it. Methods it does not override are
// provided by the embedded Streamer.
type loggedSearcher struct {
	// Streamer is the wrapped searcher that performs the actual work.
	zoekt.Streamer
	// Logger receives one entry per search request.
	Logger sglog.Logger
}
511
512func (s *loggedSearcher) Search(
513 ctx context.Context,
514 q query.Q,
515 opts *zoekt.SearchOptions,
516) (sr *zoekt.SearchResult, err error) {
517 defer func() {
518 var stats *zoekt.Stats
519 if sr != nil {
520 stats = &sr.Stats
521 }
522 s.log(ctx, q, opts, stats, err)
523 }()
524
525 metricSearchRequestsTotal.Inc()
526 return s.Streamer.Search(ctx, q, opts)
527}
528
529func (s *loggedSearcher) StreamSearch(
530 ctx context.Context,
531 q query.Q,
532 opts *zoekt.SearchOptions,
533 sender zoekt.Sender,
534) error {
535 var stats zoekt.Stats
536
537 metricSearchRequestsTotal.Inc()
538 err := s.Streamer.StreamSearch(ctx, q, opts, zoekt.SenderFunc(func(event *zoekt.SearchResult) {
539 stats.Add(event.Stats)
540 sender.Send(event)
541 }))
542
543 s.log(ctx, q, opts, &stats, err)
544
545 return err
546}
547
548func (s *loggedSearcher) log(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, st *zoekt.Stats, err error) {
549 logger := s.Logger.
550 WithTrace(traceContext(ctx)).
551 With(
552 sglog.String("query", q.String()),
553 sglog.Bool("opts.EstimateDocCount", opts.EstimateDocCount),
554 sglog.Bool("opts.Whole", opts.Whole),
555 sglog.Int("opts.ShardMaxMatchCount", opts.ShardMaxMatchCount),
556 sglog.Int("opts.TotalMaxMatchCount", opts.TotalMaxMatchCount),
557 sglog.Duration("opts.MaxWallTime", opts.MaxWallTime),
558 sglog.Int("opts.MaxDocDisplayCount", opts.MaxDocDisplayCount),
559 sglog.Int("opts.MaxMatchDisplayCount", opts.MaxMatchDisplayCount),
560 )
561
562 if err != nil {
563 switch {
564 case errors.Is(err, context.Canceled):
565 logger.Warn("search canceled", sglog.Error(err))
566 case errors.Is(err, context.DeadlineExceeded):
567 logger.Warn("search timeout", sglog.Error(err))
568 default:
569 logger.Error("search failed", sglog.Error(err))
570 }
571 return
572 }
573
574 if st == nil {
575 return
576 }
577
578 logger.Debug("search",
579 sglog.Int64("stat.ContentBytesLoaded", st.ContentBytesLoaded),
580 sglog.Int64("stat.IndexBytesLoaded", st.IndexBytesLoaded),
581 sglog.Int("stat.Crashes", st.Crashes),
582 sglog.Duration("stat.Duration", st.Duration),
583 sglog.Int("stat.FileCount", st.FileCount),
584 sglog.Int("stat.ShardFilesConsidered", st.ShardFilesConsidered),
585 sglog.Int("stat.FilesConsidered", st.FilesConsidered),
586 sglog.Int("stat.FilesLoaded", st.FilesLoaded),
587 sglog.Int("stat.FilesSkipped", st.FilesSkipped),
588 sglog.Int("stat.ShardsScanned", st.ShardsScanned),
589 sglog.Int("stat.ShardsSkipped", st.ShardsSkipped),
590 sglog.Int("stat.ShardsSkippedFilter", st.ShardsSkippedFilter),
591 sglog.Int("stat.MatchCount", st.MatchCount),
592 sglog.Int("stat.NgramMatches", st.NgramMatches),
593 sglog.Int("stat.NgramLookups", st.NgramLookups),
594 sglog.Duration("stat.Wait", st.Wait),
595 sglog.Duration("stat.MatchTreeConstruction", st.MatchTreeConstruction),
596 sglog.Duration("stat.MatchTreeSearch", st.MatchTreeSearch),
597 sglog.Int("stat.RegexpsConsidered", st.RegexpsConsidered),
598 sglog.String("stat.FlushReason", st.FlushReason.String()),
599 )
600}
601
602func traceContext(ctx context.Context) sglog.TraceContext {
603 otSpan := opentracing.SpanFromContext(ctx)
604 if otSpan != nil {
605 if jaegerSpan, ok := otSpan.Context().(jaeger.SpanContext); ok {
606 return sglog.TraceContext{
607 TraceID: jaegerSpan.TraceID().String(),
608 SpanID: jaegerSpan.SpanID().String(),
609 }
610 }
611 }
612
613 if otelSpan := oteltrace.SpanFromContext(ctx).SpanContext(); otelSpan.IsValid() {
614 return sglog.TraceContext{
615 TraceID: otelSpan.TraceID().String(),
616 SpanID: otelSpan.SpanID().String(),
617 }
618 }
619
620 return sglog.TraceContext{}
621}
622
623func newGRPCServer(logger sglog.Logger, streamer zoekt.Streamer, additionalOpts ...grpc.ServerOption) *grpc.Server {
624 metrics := serverMetricsOnce()
625
626 opts := []grpc.ServerOption{
627 grpc.ChainStreamInterceptor(
628 propagator.StreamServerPropagator(tenant.Propagator{}),
629 tenant.StreamServerInterceptor,
630 otelgrpc.StreamServerInterceptor(),
631 metrics.StreamServerInterceptor(),
632 messagesize.StreamServerInterceptor,
633 internalerrs.LoggingStreamServerInterceptor(logger),
634 ),
635 grpc.ChainUnaryInterceptor(
636 propagator.UnaryServerPropagator(tenant.Propagator{}),
637 tenant.UnaryServerInterceptor,
638 otelgrpc.UnaryServerInterceptor(),
639 metrics.UnaryServerInterceptor(),
640 messagesize.UnaryServerInterceptor,
641 internalerrs.LoggingUnaryServerInterceptor(logger),
642 ),
643 }
644
645 opts = append(opts, additionalOpts...)
646
647 // Ensure that the message size options are set last, so they override any other
648 // server-specific options that tweak the message size.
649 //
650 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
651 // take precedence over everything else with a uniform size setting that's easy to reason about.
652 opts = append(opts, messagesize.MustGetServerMessageSizeFromEnv()...)
653
654 s := grpc.NewServer(opts...)
655 proto.RegisterWebserverServiceServer(s, zoektgrpc.NewServer(streamer))
656
657 return s
658}
659
// Prometheus metrics exported by the webserver process.
var (
	// metricWatchdogErrors is the watchdog's current run of consecutive
	// failed health checks; it is set back to zero after a success.
	metricWatchdogErrors = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_webserver_watchdog_errors",
		Help: "The current error count for zoekt watchdog.",
	})
	// metricWatchdogTotal counts every health check the watchdog performs.
	metricWatchdogTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_webserver_watchdog_total",
		Help: "The total number of requests done by zoekt watchdog.",
	})
	// metricWatchdogErrorsTotal counts every failed watchdog health check.
	metricWatchdogErrorsTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_webserver_watchdog_errors_total",
		Help: "The total number of errors from zoekt watchdog.",
	})
	// metricSearchRequestsTotal counts the Search and StreamSearch calls that
	// pass through loggedSearcher.
	metricSearchRequestsTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_requests_total",
		Help: "The total number of search requests that zoekt received",
	})

	// serverMetricsOnce returns a singleton instance of the server metrics
	// that are shared across all gRPC servers that this process creates.
	//
	// This function panics if the metrics cannot be registered with the default
	// Prometheus registry.
	serverMetricsOnce = sync.OnceValue(func() *grpcprom.ServerMetrics {
		serverMetrics := grpcprom.NewServerMetrics(
			grpcprom.WithServerCounterOptions(),
			grpcprom.WithServerHandlingTimeHistogram(), // record the overall response latency for a gRPC request
		)
		prometheus.DefaultRegisterer.MustRegister(serverMetrics)
		return serverMetrics
	})
)
691)