fork of https://github.com/sourcegraph/zoekt
1// Copyright 2016 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Command zoekt-webserver responds to search queries, using an index generated
16// by another program such as zoekt-indexserver.
17
18package main
19
20import (
21 "context"
22 "crypto/tls"
23 "errors"
24 "flag"
25 "fmt"
26 "html/template"
27 "io"
28 "log"
29 "net"
30 "net/http"
31 "net/http/httputil"
32 "net/url"
33 "os"
34 "os/signal"
35 "path/filepath"
36 "runtime"
37 "strconv"
38 "strings"
39 "sync"
40 "time"
41
42 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
43 "github.com/sourcegraph/mountinfo"
44 "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
45 "golang.org/x/net/http2"
46 "golang.org/x/net/http2/h2c"
47 "google.golang.org/grpc"
48
49 "github.com/sourcegraph/zoekt"
50 "github.com/sourcegraph/zoekt/build"
51 zoektgrpc "github.com/sourcegraph/zoekt/cmd/zoekt-webserver/grpc/server"
52 "github.com/sourcegraph/zoekt/debugserver"
53 "github.com/sourcegraph/zoekt/grpc/internalerrs"
54 "github.com/sourcegraph/zoekt/grpc/messagesize"
55 "github.com/sourcegraph/zoekt/grpc/propagator"
56 proto "github.com/sourcegraph/zoekt/grpc/protos/zoekt/webserver/v1"
57 "github.com/sourcegraph/zoekt/internal/profiler"
58 "github.com/sourcegraph/zoekt/internal/tenant"
59 "github.com/sourcegraph/zoekt/internal/tracer"
60 "github.com/sourcegraph/zoekt/query"
61 "github.com/sourcegraph/zoekt/shards"
62 "github.com/sourcegraph/zoekt/trace"
63 "github.com/sourcegraph/zoekt/web"
64
65 "github.com/opentracing/opentracing-go"
66 "github.com/prometheus/client_golang/prometheus"
67 "github.com/prometheus/client_golang/prometheus/promauto"
68 "github.com/shirou/gopsutil/v3/disk"
69 sglog "github.com/sourcegraph/log"
70 "github.com/uber/jaeger-client-go"
71 oteltrace "go.opentelemetry.io/otel/trace"
72 "go.uber.org/automaxprocs/maxprocs"
73)
74
// logFormat is the time layout embedded in diverted log file names. It uses
// '-' instead of ':' between clock fields, presumably so the names are valid
// on filesystems that reject ':' — TODO confirm.
const logFormat = "2006-01-02T15-04-05.999999999Z07"

// divertLogs points the standard library logger at a new file in dir named
// zoekt-webserver.<timestamp>.<pid>.log.
//
// If interval is positive, it opens a fresh file every interval and closes the
// previous one; in that mode it never returns, so callers run it in its own
// goroutine. If interval is zero or negative, it creates a single file and
// returns instead of panicking inside time.NewTicker.
//
// If a log file cannot be created, the error is printed to stderr and the
// process exits with status 2.
func divertLogs(dir string, interval time.Duration) {
	// A nil channel means "never rotate".
	var rotate <-chan time.Time
	if interval > 0 {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		rotate = ticker.C
	}

	var last *os.File
	for {
		nm := filepath.Join(dir, fmt.Sprintf("zoekt-webserver.%s.%d.log", time.Now().Format(logFormat), os.Getpid()))
		fmt.Fprintf(os.Stderr, "writing logs to %s\n", nm)

		f, err := os.Create(nm)
		if err != nil {
			// There is not much we can do now.
			fmt.Fprintf(os.Stderr, "can't create output file %s: %v\n", nm, err)
			os.Exit(2)
		}

		// Switch the logger to the new file before closing the old one.
		log.SetOutput(f)
		if last != nil {
			_ = last.Close()
		}
		last = f

		if rotate == nil {
			return
		}
		<-rotate
	}
}
98}
99
// templateExtension is the file suffix of template files read by
// loadTemplates and written by writeTemplates.
const templateExtension = ".html.tpl"

// loadTemplates parses every file matching dir/*.html.tpl and adds each one to
// tpl as an associated template. The template is named after the file's base
// name with the extension removed, so "search.html.tpl" becomes "search".
//
// dir is used as part of a filepath.Glob pattern, so glob metacharacters in it
// (such as '[') are interpreted. A malformed pattern is returned as an error
// rather than terminating the process. A missing or empty directory matches
// nothing and loads no templates.
//
// It returns an error if the pattern is malformed, a file cannot be read, or a
// template fails to parse.
func loadTemplates(tpl *template.Template, dir string) error {
	pattern := filepath.Join(dir, "*"+templateExtension)
	fs, err := filepath.Glob(pattern)
	if err != nil {
		return fmt.Errorf("glob %q: %w", pattern, err)
	}

	log.Printf("loading templates: %v", fs)
	for _, fn := range fs {
		content, err := os.ReadFile(fn)
		if err != nil {
			return err
		}

		name := strings.TrimSuffix(filepath.Base(fn), templateExtension)
		if _, err := tpl.New(name).Parse(string(content)); err != nil {
			return fmt.Errorf("template.Parse(%s): %w", fn, err)
		}
	}
	return nil
}
123
124func writeTemplates(dir string) error {
125 if dir == "" {
126 return fmt.Errorf("must set --template_dir")
127 }
128
129 for k, v := range web.TemplateText {
130 nm := filepath.Join(dir, k+templateExtension)
131 if err := os.WriteFile(nm, []byte(v), 0o644); err != nil {
132 return err
133 }
134 }
135 return nil
136}
137
138func main() {
139 logDir := flag.String("log_dir", "", "log to this directory rather than stderr.")
140 logRefresh := flag.Duration("log_refresh", 24*time.Hour, "if using --log_dir, start writing a new file this often.")
141
142 listen := flag.String("listen", ":6070", "listen on this address.")
143 index := flag.String("index", build.DefaultDir, "set index directory to use")
144 html := flag.Bool("html", true, "enable HTML interface")
145 enableRPC := flag.Bool("rpc", false, "enable go/net RPC")
146 enableIndexserverProxy := flag.Bool("indexserver_proxy", false, "proxy requests with URLs matching the path /indexserver/ to <index>/indexserver.sock")
147 print := flag.Bool("print", false, "enable local result URLs")
148 enablePprof := flag.Bool("pprof", false, "set to enable remote profiling.")
149 sslCert := flag.String("ssl_cert", "", "set path to SSL .pem holding certificate.")
150 sslKey := flag.String("ssl_key", "", "set path to SSL .pem holding key.")
151 hostCustomization := flag.String(
152 "host_customization", "",
153 "specify host customization, as HOST1=QUERY,HOST2=QUERY")
154
155 templateDir := flag.String("template_dir", "", "set directory from which to load custom .html.tpl template files")
156 dumpTemplates := flag.Bool("dump_templates", false, "dump templates into --template_dir and exit.")
157 version := flag.Bool("version", false, "Print version number")
158
159 flag.Parse()
160
161 if *version {
162 fmt.Printf("zoekt-webserver version %q\n", zoekt.Version)
163 os.Exit(0)
164 }
165
166 if *dumpTemplates {
167 if err := writeTemplates(*templateDir); err != nil {
168 log.Fatal(err)
169 }
170 os.Exit(0)
171 }
172
173 resource := sglog.Resource{
174 Name: "zoekt-webserver",
175 Version: zoekt.Version,
176 InstanceID: zoekt.HostnameBestEffort(),
177 }
178
179 liblog := sglog.Init(resource)
180 defer liblog.Sync()
181 tracer.Init(resource)
182 profiler.Init("zoekt-webserver")
183
184 if *logDir != "" {
185 if fi, err := os.Lstat(*logDir); err != nil || !fi.IsDir() {
186 log.Fatalf("%s is not a directory", *logDir)
187 }
188 // We could do fdup acrobatics to also redirect
189 // stderr, but it is simpler and more portable for the
190 // caller to divert stderr output if necessary.
191 go divertLogs(*logDir, *logRefresh)
192 }
193
194 // Tune GOMAXPROCS to match Linux container CPU quota.
195 _, _ = maxprocs.Set()
196
197 if err := os.MkdirAll(*index, 0o755); err != nil {
198 log.Fatal(err)
199 }
200
201 mustRegisterDiskMonitor(*index)
202
203 metricsLogger := sglog.Scoped("metricsRegistration")
204
205 mustRegisterMemoryMapMetrics(metricsLogger)
206
207 opts := mountinfo.CollectorOpts{Namespace: "zoekt_webserver"}
208 c := mountinfo.NewCollector(metricsLogger, opts, map[string]string{"indexDir": *index})
209
210 prometheus.DefaultRegisterer.MustRegister(c)
211
212 // Do not block on loading shards so we can become partially available
213 // sooner. Otherwise on large instances zoekt can be unavailable on the
214 // order of minutes.
215 searcher, err := shards.NewDirectorySearcherFast(*index)
216 if err != nil {
217 log.Fatal(err)
218 }
219
220 searcher = &loggedSearcher{
221 Streamer: searcher,
222 Logger: sglog.Scoped("searcher"),
223 }
224
225 s := &web.Server{
226 Searcher: searcher,
227 Top: web.Top,
228 Version: zoekt.Version,
229 }
230
231 if *templateDir != "" {
232 if err := loadTemplates(s.Top, *templateDir); err != nil {
233 log.Fatalf("loadTemplates: %v", err)
234 }
235 }
236
237 s.Print = *print
238 s.HTML = *html
239 s.RPC = *enableRPC
240
241 if *hostCustomization != "" {
242 s.HostCustomQueries = map[string]string{}
243 for _, h := range strings.SplitN(*hostCustomization, ",", -1) {
244 if len(h) == 0 {
245 continue
246 }
247 fields := strings.SplitN(h, "=", 2)
248 if len(fields) < 2 {
249 log.Fatalf("invalid host_customization %q", h)
250 }
251
252 s.HostCustomQueries[fields[0]] = fields[1]
253 }
254 }
255
256 serveMux, err := web.NewMux(s)
257 if err != nil {
258 log.Fatal(err)
259 }
260
261 debugserver.AddHandlers(serveMux, *enablePprof)
262
263 if *enableIndexserverProxy {
264 socket := filepath.Join(*index, "indexserver.sock")
265 sglog.Scoped("server").Info("adding reverse proxy", sglog.String("socket", socket))
266 addProxyHandler(serveMux, socket)
267 }
268
269 handler := trace.Middleware(serveMux)
270
271 // Sourcegraph: We use environment variables to configure watchdog since
272 // they are more convenient than flags in containerized environments.
273 watchdogTick := 30 * time.Second
274 if v := os.Getenv("ZOEKT_WATCHDOG_TICK"); v != "" {
275 watchdogTick, _ = time.ParseDuration(v)
276 log.Printf("custom ZOEKT_WATCHDOG_TICK=%v", watchdogTick)
277 }
278
279 watchdogErrCount := 3
280 if v := os.Getenv("ZOEKT_WATCHDOG_ERRORS"); v != "" {
281 watchdogErrCount, _ = strconv.Atoi(v)
282 log.Printf("custom ZOEKT_WATCHDOG_ERRORS=%d", watchdogErrCount)
283 }
284
285 watchdogAddr := "http://" + *listen
286 if *sslCert != "" || *sslKey != "" {
287 watchdogAddr = "https://" + *listen
288 }
289 watchdogAddr += "/healthz"
290
291 if watchdogErrCount > 0 && watchdogTick > 0 {
292 go watchdog(watchdogTick, watchdogErrCount, watchdogAddr)
293 } else {
294 log.Println("watchdog disabled")
295 }
296
297 logger := sglog.Scoped("ZoektWebserverGRPCServer")
298
299 streamer := web.NewTraceAwareSearcher(s.Searcher)
300 grpcServer := newGRPCServer(logger, streamer)
301
302 handler = multiplexGRPC(grpcServer, handler)
303
304 srv := &http.Server{
305 Addr: *listen,
306 Handler: handler,
307 }
308
309 go func() {
310 sglog.Scoped("server").Info("starting server", sglog.Stringp("address", listen))
311 var err error
312 if *sslCert != "" || *sslKey != "" {
313 err = srv.ListenAndServeTLS(*sslCert, *sslKey)
314 } else {
315 err = srv.ListenAndServe()
316 }
317
318 if err != http.ErrServerClosed {
319 // Fatal otherwise shutdownOnSignal will block
320 log.Fatalf("ListenAndServe: %v", err)
321 }
322 }()
323
324 if s.RPC {
325 // Our RPC system does not support shutdown and hijacks the underlying
326 // http connection. This means shutdown is ineffective and just waits 10s
327 // before calling close. Lets just quit faster in that case.
328 if err := closeOnSignal(srv); err != nil {
329 log.Fatalf("http.Server.Close: %v", err)
330 }
331 } else {
332 if err := shutdownOnSignal(srv); err != nil {
333 log.Fatalf("http.Server.Shutdown: %v", err)
334 }
335 }
336}
337
338// multiplexGRPC takes a gRPC server and a plain HTTP handler and multiplexes the
339// request handling. Any requests that declare themselves as gRPC requests are routed
340// to the gRPC server, all others are routed to the httpHandler.
341func multiplexGRPC(grpcServer *grpc.Server, httpHandler http.Handler) http.Handler {
342 newHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
343 if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
344 grpcServer.ServeHTTP(w, r)
345 } else {
346 httpHandler.ServeHTTP(w, r)
347 }
348 })
349
350 // Until we enable TLS, we need to fall back to the h2c protocol, which is
351 // basically HTTP2 without TLS. The standard library does not implement the
352 // h2s protocol, so this hijacks h2s requests and handles them correctly.
353 return h2c.NewHandler(newHandler, &http2.Server{})
354}
355
// addProxyHandler registers a reverse proxy on mux for every path under
// /indexserver/. The prefix is stripped, so /indexserver/foo reaches the
// backend as /foo. All upstream connections are made to the unix domain
// socket at path socket.
func addProxyHandler(mux *http.ServeMux, socket string) {
	// The host part of the target is a placeholder: the custom dialer below
	// ignores the network/address it is given and always dials the socket.
	target := &url.URL{Scheme: "http", Host: "socket"}

	dialSocket := func(ctx context.Context, _, _ string) (net.Conn, error) {
		return (&net.Dialer{}).DialContext(ctx, "unix", socket)
	}

	proxy := httputil.NewSingleHostReverseProxy(target)
	proxy.Transport = &http.Transport{DialContext: dialSocket}

	mux.Handle("/indexserver/", http.StripPrefix("/indexserver/", proxy))
}
373
374// shutdownSignalChan returns a channel which is listening for shutdown
375// signals from the operating system. maxReads is an upper bound on how many
376// times you will read the channel (used as buffer for signal.Notify).
377func shutdownSignalChan(maxReads int) <-chan os.Signal {
378 c := make(chan os.Signal, maxReads)
379 signal.Notify(c, os.Interrupt) // terminal C-c and goreman
380 signal.Notify(c, PLATFORM_SIGTERM) // Kubernetes
381 return c
382}
383
384// closeOnSignal will listen for SIGINT or SIGTERM and call srv.Close. This is
385// not a graceful shutdown, see shutdownOnSignal.
386func closeOnSignal(srv *http.Server) error {
387 c := shutdownSignalChan(1)
388 <-c
389
390 return srv.Close()
391}
392
393// shutdownOnSignal will listen for SIGINT or SIGTERM and call srv.Shutdown.
394// Note it doesn't call anything else for shutting down. Notably our RPC
395// framework doesn't allow us to drain connections, so when Shutdown we will
396// wait 10s before closing.
397//
398// Note: the call site for shutdownOnSignal should use closeOnSignal instead
399// if rpc mode is enabled due to the above limitation.
400func shutdownOnSignal(srv *http.Server) error {
401 c := shutdownSignalChan(2)
402 <-c
403
404 // If we receive another signal, immediate shutdown
405 ctx, cancel := context.WithCancel(context.Background())
406 defer cancel()
407 go func() {
408 select {
409 case <-ctx.Done():
410 case sig := <-c:
411 log.Printf("received another signal (%v), immediate shutdown", sig)
412 cancel()
413 }
414 }()
415
416 // Wait for 10s to drain ongoing requests. Kubernetes gives us 30s to
417 // shutdown, we have already used 15s waiting for our endpoint removal to
418 // propagate.
419 ctx, cancel2 := context.WithTimeout(ctx, 10*time.Second)
420 defer cancel2()
421
422 log.Printf("shutting down")
423 return srv.Shutdown(ctx)
424}
425
426func watchdogOnce(ctx context.Context, client *http.Client, addr string) error {
427 defer metricWatchdogTotal.Inc()
428
429 ctx, cancel := context.WithDeadline(ctx, time.Now().Add(30*time.Second))
430 defer cancel()
431
432 req, err := http.NewRequest("GET", addr, nil)
433 if err != nil {
434 return err
435 }
436
437 req = req.WithContext(ctx)
438
439 resp, err := client.Do(req)
440 if err != nil {
441 return err
442 }
443 body, _ := io.ReadAll(resp.Body)
444 _ = resp.Body.Close()
445
446 if resp.StatusCode != http.StatusOK {
447 return fmt.Errorf("watchdog: status=%v body=%q", resp.StatusCode, string(body))
448 }
449 return nil
450}
451
452func watchdog(dt time.Duration, maxErrCount int, addr string) {
453 tr := &http.Transport{
454 TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
455 }
456 client := &http.Client{
457 Transport: tr,
458 }
459 tick := time.NewTicker(dt)
460
461 errCount := 0
462 for range tick.C {
463 err := watchdogOnce(context.Background(), client, addr)
464 if err != nil {
465 errCount++
466 metricWatchdogErrors.Set(float64(errCount))
467 metricWatchdogErrorsTotal.Inc()
468 if errCount >= maxErrCount {
469 log.Printf(`watchdog health check has consecutively failed %d times indicating is likely an unrecoverable error affecting zoekt. As such this process will exit with code 3.
470
471Final error: %v
472
473Possible remediations:
474- If this rarely happens, ignore and let your process manager restart zoekt.
475- Possibly under provisioned. Try increasing CPU or disk IO.
476- A bug. Reach out with logs and screenshots of metrics when this occurs.`, errCount, err)
477 os.Exit(3)
478 } else {
479 log.Printf("watchdog: failed, will try %d more times: %v", maxErrCount-errCount, err)
480 }
481 } else if errCount > 0 {
482 errCount = 0
483 metricWatchdogErrors.Set(float64(errCount))
484 log.Printf("watchdog: success, resetting error count")
485 }
486 }
487}
488
489func diskUsage(path string) (*disk.UsageStat, error) {
490 duPath := path
491 if runtime.GOOS == "windows" {
492 duPath = filepath.VolumeName(duPath)
493 }
494 usage, err := disk.Usage(duPath)
495 if err != nil {
496 return nil, fmt.Errorf("diskUsage: %w", err)
497 }
498 return usage, err
499}
500
501func mustRegisterDiskMonitor(path string) {
502 prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
503 Name: "src_disk_space_available_bytes",
504 Help: "Amount of free space disk space.",
505 ConstLabels: prometheus.Labels{"path": path},
506 }, func() float64 {
507 // I know there is no error handling here, and I don't like it
508 // but there was no error handling in the previous version
509 // that used Statfs, either, so I'm assuming there's no need for it
510 usage, _ := diskUsage(path)
511 return float64(usage.Free)
512 }))
513
514 prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
515 Name: "src_disk_space_total_bytes",
516 Help: "Amount of total disk space.",
517 ConstLabels: prometheus.Labels{"path": path},
518 }, func() float64 {
519 // I know there is no error handling here, and I don't like it
520 // but there was no error handling in the previous version
521 // that used Statfs, either, so I'm assuming there's no need for it
522 usage, _ := diskUsage(path)
523 return float64(usage.Total)
524 }))
525}
526
// loggedSearcher wraps a zoekt.Streamer. Its Search and StreamSearch methods
// increment metricSearchRequestsTotal and log the query, the search options and
// the outcome through Logger. Every other Streamer method is forwarded
// unchanged through the embedded field.
type loggedSearcher struct {
	zoekt.Streamer
	// Logger receives the per-search log entries.
	Logger sglog.Logger
}
531
532func (s *loggedSearcher) Search(
533 ctx context.Context,
534 q query.Q,
535 opts *zoekt.SearchOptions,
536) (sr *zoekt.SearchResult, err error) {
537 defer func() {
538 var stats *zoekt.Stats
539 if sr != nil {
540 stats = &sr.Stats
541 }
542 s.log(ctx, q, opts, stats, err)
543 }()
544
545 metricSearchRequestsTotal.Inc()
546 return s.Streamer.Search(ctx, q, opts)
547}
548
549func (s *loggedSearcher) StreamSearch(
550 ctx context.Context,
551 q query.Q,
552 opts *zoekt.SearchOptions,
553 sender zoekt.Sender,
554) error {
555 var stats zoekt.Stats
556
557 metricSearchRequestsTotal.Inc()
558 err := s.Streamer.StreamSearch(ctx, q, opts, zoekt.SenderFunc(func(event *zoekt.SearchResult) {
559 stats.Add(event.Stats)
560 sender.Send(event)
561 }))
562
563 s.log(ctx, q, opts, &stats, err)
564
565 return err
566}
567
568func (s *loggedSearcher) log(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, st *zoekt.Stats, err error) {
569 logger := s.Logger.
570 WithTrace(traceContext(ctx)).
571 With(
572 sglog.String("query", q.String()),
573 sglog.Bool("opts.EstimateDocCount", opts.EstimateDocCount),
574 sglog.Bool("opts.Whole", opts.Whole),
575 sglog.Int("opts.ShardMaxMatchCount", opts.ShardMaxMatchCount),
576 sglog.Int("opts.TotalMaxMatchCount", opts.TotalMaxMatchCount),
577 sglog.Duration("opts.MaxWallTime", opts.MaxWallTime),
578 sglog.Int("opts.MaxDocDisplayCount", opts.MaxDocDisplayCount),
579 sglog.Int("opts.MaxMatchDisplayCount", opts.MaxMatchDisplayCount),
580 )
581
582 if err != nil {
583 switch {
584 case errors.Is(err, context.Canceled):
585 logger.Warn("search canceled", sglog.Error(err))
586 case errors.Is(err, context.DeadlineExceeded):
587 logger.Warn("search timeout", sglog.Error(err))
588 default:
589 logger.Error("search failed", sglog.Error(err))
590 }
591 return
592 }
593
594 if st == nil {
595 return
596 }
597
598 logger.Debug("search",
599 sglog.Int64("stat.ContentBytesLoaded", st.ContentBytesLoaded),
600 sglog.Int64("stat.IndexBytesLoaded", st.IndexBytesLoaded),
601 sglog.Int("stat.Crashes", st.Crashes),
602 sglog.Duration("stat.Duration", st.Duration),
603 sglog.Int("stat.FileCount", st.FileCount),
604 sglog.Int("stat.ShardFilesConsidered", st.ShardFilesConsidered),
605 sglog.Int("stat.FilesConsidered", st.FilesConsidered),
606 sglog.Int("stat.FilesLoaded", st.FilesLoaded),
607 sglog.Int("stat.FilesSkipped", st.FilesSkipped),
608 sglog.Int("stat.ShardsScanned", st.ShardsScanned),
609 sglog.Int("stat.ShardsSkipped", st.ShardsSkipped),
610 sglog.Int("stat.ShardsSkippedFilter", st.ShardsSkippedFilter),
611 sglog.Int("stat.MatchCount", st.MatchCount),
612 sglog.Int("stat.NgramMatches", st.NgramMatches),
613 sglog.Int("stat.NgramLookups", st.NgramLookups),
614 sglog.Duration("stat.Wait", st.Wait),
615 sglog.Duration("stat.MatchTreeConstruction", st.MatchTreeConstruction),
616 sglog.Duration("stat.MatchTreeSearch", st.MatchTreeSearch),
617 sglog.Int("stat.RegexpsConsidered", st.RegexpsConsidered),
618 sglog.String("stat.FlushReason", st.FlushReason.String()),
619 )
620}
621
622func traceContext(ctx context.Context) sglog.TraceContext {
623 otSpan := opentracing.SpanFromContext(ctx)
624 if otSpan != nil {
625 if jaegerSpan, ok := otSpan.Context().(jaeger.SpanContext); ok {
626 return sglog.TraceContext{
627 TraceID: jaegerSpan.TraceID().String(),
628 SpanID: jaegerSpan.SpanID().String(),
629 }
630 }
631 }
632
633 if otelSpan := oteltrace.SpanFromContext(ctx).SpanContext(); otelSpan.IsValid() {
634 return sglog.TraceContext{
635 TraceID: otelSpan.TraceID().String(),
636 SpanID: otelSpan.SpanID().String(),
637 }
638 }
639
640 return sglog.TraceContext{}
641}
642
643func newGRPCServer(logger sglog.Logger, streamer zoekt.Streamer, additionalOpts ...grpc.ServerOption) *grpc.Server {
644 metrics := serverMetricsOnce()
645
646 opts := []grpc.ServerOption{
647 grpc.ChainStreamInterceptor(
648 propagator.StreamServerPropagator(tenant.Propagator{}),
649 tenant.StreamServerInterceptor,
650 otelgrpc.StreamServerInterceptor(),
651 metrics.StreamServerInterceptor(),
652 messagesize.StreamServerInterceptor,
653 internalerrs.LoggingStreamServerInterceptor(logger),
654 ),
655 grpc.ChainUnaryInterceptor(
656 propagator.UnaryServerPropagator(tenant.Propagator{}),
657 tenant.UnaryServerInterceptor,
658 otelgrpc.UnaryServerInterceptor(),
659 metrics.UnaryServerInterceptor(),
660 messagesize.UnaryServerInterceptor,
661 internalerrs.LoggingUnaryServerInterceptor(logger),
662 ),
663 }
664
665 opts = append(opts, additionalOpts...)
666
667 // Ensure that the message size options are set last, so they override any other
668 // server-specific options that tweak the message size.
669 //
670 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
671 // take precedence over everything else with a uniform size setting that's easy to reason about.
672 opts = append(opts, messagesize.MustGetServerMessageSizeFromEnv()...)
673
674 s := grpc.NewServer(opts...)
675 proto.RegisterWebserverServiceServer(s, zoektgrpc.NewServer(streamer))
676
677 return s
678}
679
var (
	// metricWatchdogErrors holds the watchdog's current count of consecutive
	// failed health checks (reset to 0 after a success).
	metricWatchdogErrors = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "zoekt_webserver_watchdog_errors",
		Help: "The current error count for zoekt watchdog.",
	})
	// metricWatchdogTotal counts every health check the watchdog performs.
	metricWatchdogTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_webserver_watchdog_total",
		Help: "The total number of requests done by zoekt watchdog.",
	})
	// metricWatchdogErrorsTotal counts every failed watchdog health check.
	metricWatchdogErrorsTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_webserver_watchdog_errors_total",
		Help: "The total number of errors from zoekt watchdog.",
	})
	// metricSearchRequestsTotal counts Search and StreamSearch calls made
	// through loggedSearcher.
	metricSearchRequestsTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "zoekt_search_requests_total",
		Help: "The total number of search requests that zoekt received",
	})

	// serverMetricsOnce returns a singleton instance of the server metrics
	// that are shared across all gRPC servers that this process creates.
	//
	// This function panics if the metrics cannot be registered with the default
	// Prometheus registry.
	serverMetricsOnce = sync.OnceValue(func() *grpcprom.ServerMetrics {
		serverMetrics := grpcprom.NewServerMetrics(
			grpcprom.WithServerCounterOptions(),
			grpcprom.WithServerHandlingTimeHistogram(), // record the overall response latency for a gRPC request
		)
		prometheus.DefaultRegisterer.MustRegister(serverMetrics)
		return serverMetrics
	})
)