fork of https://github.com/sourcegraph/zoekt
1// Copyright 2016 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Command zoekt-webserver responds to search queries, using an index generated
16// by another program such as zoekt-indexserver.
17
18package main
19
20import (
21 "context"
22 "crypto/tls"
23 "errors"
24 "flag"
25 "fmt"
26 "html/template"
27 "io"
28 "log"
29 "net"
30 "net/http"
31 "net/http/httputil"
32 "net/url"
33 "os"
34 "os/signal"
35 "path/filepath"
36 "runtime"
37 "strconv"
38 "strings"
39 "sync"
40 "time"
41
42 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
43 "github.com/sourcegraph/mountinfo"
44 "github.com/sourcegraph/zoekt/internal/debugserver"
45 "github.com/sourcegraph/zoekt/internal/shards"
46 "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
47 "golang.org/x/net/http2"
48 "golang.org/x/net/http2/h2c"
49 "google.golang.org/grpc"
50
51 "github.com/opentracing/opentracing-go"
52 "github.com/prometheus/client_golang/prometheus"
53 "github.com/prometheus/client_golang/prometheus/promauto"
54 "github.com/shirou/gopsutil/v3/disk"
55 sglog "github.com/sourcegraph/log"
56 "github.com/sourcegraph/zoekt"
57 zoektgrpc "github.com/sourcegraph/zoekt/cmd/zoekt-webserver/grpc/server"
58 "github.com/sourcegraph/zoekt/grpc/internalerrs"
59 "github.com/sourcegraph/zoekt/grpc/messagesize"
60 "github.com/sourcegraph/zoekt/grpc/propagator"
61 proto "github.com/sourcegraph/zoekt/grpc/protos/zoekt/webserver/v1"
62 "github.com/sourcegraph/zoekt/index"
63 "github.com/sourcegraph/zoekt/internal/profiler"
64 "github.com/sourcegraph/zoekt/internal/tenant"
65 "github.com/sourcegraph/zoekt/internal/trace"
66 "github.com/sourcegraph/zoekt/internal/tracer"
67 "github.com/sourcegraph/zoekt/query"
68 "github.com/sourcegraph/zoekt/web"
69 "github.com/uber/jaeger-client-go"
70 oteltrace "go.opentelemetry.io/otel/trace"
71 "go.uber.org/automaxprocs/maxprocs"
72)
73
74const logFormat = "2006-01-02T15-04-05.999999999Z07"
75
76func divertLogs(dir string, interval time.Duration) {
77 t := time.NewTicker(interval)
78 var last *os.File
79 for {
80 nm := filepath.Join(dir, fmt.Sprintf("zoekt-webserver.%s.%d.log", time.Now().Format(logFormat), os.Getpid()))
81 fmt.Fprintf(os.Stderr, "writing logs to %s\n", nm)
82
83 f, err := os.Create(nm)
84 if err != nil {
85 // There is not much we can do now.
86 fmt.Fprintf(os.Stderr, "can't create output file %s: %v\n", nm, err)
87 os.Exit(2)
88 }
89
90 log.SetOutput(f)
91 last.Close()
92
93 last = f
94
95 <-t.C
96 }
97}
98
99const templateExtension = ".html.tpl"
100
101func loadTemplates(tpl *template.Template, dir string) error {
102 fs, err := filepath.Glob(dir + "/*" + templateExtension)
103 if err != nil {
104 log.Fatalf("Glob: %v", err)
105 }
106
107 log.Printf("loading templates: %v", fs)
108 for _, fn := range fs {
109 content, err := os.ReadFile(fn)
110 if err != nil {
111 return err
112 }
113
114 base := filepath.Base(fn)
115 base = strings.TrimSuffix(base, templateExtension)
116 if _, err := tpl.New(base).Parse(string(content)); err != nil {
117 return fmt.Errorf("template.Parse(%s): %v", fn, err)
118 }
119 }
120 return nil
121}
122
// writeTemplates writes every built-in template from web.TemplateText into dir
// as "<name>.html.tpl" with mode 0644, truncating any existing file of the
// same name. It fails if dir is empty. On a write failure it returns that
// error immediately, leaving any files already written in place.
func writeTemplates(dir string) error {
	if len(dir) == 0 {
		return errors.New("must set --template_dir")
	}

	for name, text := range web.TemplateText {
		target := filepath.Join(dir, name+templateExtension)
		if err := os.WriteFile(target, []byte(text), 0o644); err != nil {
			return err
		}
	}
	return nil
}
136
// main parses flags, then sets up the following:
//   - logging, tracing and profiling
//   - metrics
//   - the shard searcher
//   - the HTTP, indexserver-proxy and gRPC handlers
//   - the optional self health-check watchdog
//
// It then serves on --listen until a shutdown signal arrives.
func main() {
	logDir := flag.String("log_dir", "", "log to this directory rather than stderr.")
	logRefresh := flag.Duration("log_refresh", 24*time.Hour, "if using --log_dir, start writing a new file this often.")

	listen := flag.String("listen", ":6070", "listen on this address.")
	indexDir := flag.String("index", index.DefaultDir, "set index directory to use")
	html := flag.Bool("html", true, "enable HTML interface")
	enableRPC := flag.Bool("rpc", false, "enable go/net RPC")
	enableIndexserverProxy := flag.Bool("indexserver_proxy", false, "proxy requests with URLs matching the path /indexserver/ to <index>/indexserver.sock")
	print := flag.Bool("print", false, "enable local result URLs")
	enablePprof := flag.Bool("pprof", false, "set to enable remote profiling.")
	sslCert := flag.String("ssl_cert", "", "set path to SSL .pem holding certificate.")
	sslKey := flag.String("ssl_key", "", "set path to SSL .pem holding key.")
	hostCustomization := flag.String(
		"host_customization", "",
		"specify host customization, as HOST1=QUERY,HOST2=QUERY")

	templateDir := flag.String("template_dir", "", "set directory from which to load custom .html.tpl template files")
	dumpTemplates := flag.Bool("dump_templates", false, "dump templates into --template_dir and exit.")
	version := flag.Bool("version", false, "Print version number")

	flag.Parse()

	if *version {
		fmt.Printf("zoekt-webserver version %q\n", index.Version)
		os.Exit(0)
	}

	if *dumpTemplates {
		if err := writeTemplates(*templateDir); err != nil {
			log.Fatal(err)
		}
		os.Exit(0)
	}

	resource := sglog.Resource{
		Name:       "zoekt-webserver",
		Version:    index.Version,
		InstanceID: index.HostnameBestEffort(),
	}

	liblog := sglog.Init(resource)
	defer liblog.Sync()
	tracer.Init(resource)
	profiler.Init("zoekt-webserver")

	if *logDir != "" {
		if fi, err := os.Lstat(*logDir); err != nil || !fi.IsDir() {
			log.Fatalf("%s is not a directory", *logDir)
		}
		// We could do fdup acrobatics to also redirect
		// stderr, but it is simpler and more portable for the
		// caller to divert stderr output if necessary.
		go divertLogs(*logDir, *logRefresh)
	}

	// Tune GOMAXPROCS to match Linux container CPU quota.
	_, _ = maxprocs.Set()

	if err := os.MkdirAll(*indexDir, 0o755); err != nil {
		log.Fatal(err)
	}

	mustRegisterDiskMonitor(*indexDir)

	metricsLogger := sglog.Scoped("metricsRegistration")

	mustRegisterMemoryMapMetrics(metricsLogger)

	opts := mountinfo.CollectorOpts{Namespace: "zoekt_webserver"}
	c := mountinfo.NewCollector(metricsLogger, opts, map[string]string{"indexDir": *indexDir})

	prometheus.DefaultRegisterer.MustRegister(c)

	// Do not block on loading shards so we can become partially available
	// sooner. Otherwise on large instances zoekt can be unavailable on the
	// order of minutes.
	searcher, err := shards.NewDirectorySearcherFast(*indexDir)
	if err != nil {
		log.Fatal(err)
	}

	searcher = &loggedSearcher{
		Streamer: searcher,
		Logger:   sglog.Scoped("searcher"),
	}

	s := &web.Server{
		Searcher: searcher,
		Top:      web.Top,
		Version:  index.Version,
	}

	if *templateDir != "" {
		if err := loadTemplates(s.Top, *templateDir); err != nil {
			log.Fatalf("loadTemplates: %v", err)
		}
	}

	s.Print = *print
	s.HTML = *html
	s.RPC = *enableRPC

	if *hostCustomization != "" {
		s.HostCustomQueries = map[string]string{}
		for _, h := range strings.Split(*hostCustomization, ",") {
			if h == "" {
				continue
			}
			host, customQuery, ok := strings.Cut(h, "=")
			if !ok {
				log.Fatalf("invalid host_customization %q", h)
			}

			s.HostCustomQueries[host] = customQuery
		}
	}

	serveMux, err := web.NewMux(s)
	if err != nil {
		log.Fatal(err)
	}

	debugserver.AddHandlers(serveMux, *enablePprof)

	if *enableIndexserverProxy {
		socket := filepath.Join(*indexDir, "indexserver.sock")
		sglog.Scoped("server").Info("adding reverse proxy", sglog.String("socket", socket))
		addProxyHandler(serveMux, socket)
	}

	handler := trace.Middleware(serveMux)

	// Sourcegraph: We use environment variables to configure watchdog since
	// they are more convenient than flags in containerized environments.
	//
	// A malformed value is reported and the default is kept. Previously the
	// parse error was discarded, the value silently became 0, and the
	// watchdog was disabled.
	watchdogTick := 30 * time.Second
	if v := os.Getenv("ZOEKT_WATCHDOG_TICK"); v != "" {
		if d, err := time.ParseDuration(v); err != nil {
			log.Printf("invalid ZOEKT_WATCHDOG_TICK=%q, keeping default %v: %v", v, watchdogTick, err)
		} else {
			watchdogTick = d
			log.Printf("custom ZOEKT_WATCHDOG_TICK=%v", watchdogTick)
		}
	}

	watchdogErrCount := 3
	if v := os.Getenv("ZOEKT_WATCHDOG_ERRORS"); v != "" {
		if n, err := strconv.Atoi(v); err != nil {
			log.Printf("invalid ZOEKT_WATCHDOG_ERRORS=%q, keeping default %d: %v", v, watchdogErrCount, err)
		} else {
			watchdogErrCount = n
			log.Printf("custom ZOEKT_WATCHDOG_ERRORS=%d", watchdogErrCount)
		}
	}

	watchdogAddr := "http://" + *listen
	if *sslCert != "" || *sslKey != "" {
		watchdogAddr = "https://" + *listen
	}
	watchdogAddr += "/healthz"

	// Explicitly configured non-positive values still disable the watchdog.
	if watchdogErrCount > 0 && watchdogTick > 0 {
		go watchdog(watchdogTick, watchdogErrCount, watchdogAddr)
	} else {
		log.Println("watchdog disabled")
	}

	logger := sglog.Scoped("ZoektWebserverGRPCServer")

	streamer := web.NewTraceAwareSearcher(s.Searcher)
	grpcServer := newGRPCServer(logger, streamer)

	handler = multiplexGRPC(grpcServer, handler)

	srv := &http.Server{
		Addr:    *listen,
		Handler: handler,
	}

	go func() {
		sglog.Scoped("server").Info("starting server", sglog.Stringp("address", listen))
		var err error
		if *sslCert != "" || *sslKey != "" {
			err = srv.ListenAndServeTLS(*sslCert, *sslKey)
		} else {
			err = srv.ListenAndServe()
		}

		if !errors.Is(err, http.ErrServerClosed) {
			// Fatal otherwise shutdownOnSignal will block
			log.Fatalf("ListenAndServe: %v", err)
		}
	}()

	if s.RPC {
		// Our RPC system does not support shutdown and hijacks the underlying
		// http connection. This means shutdown is ineffective and just waits 10s
		// before calling close. Lets just quit faster in that case.
		if err := closeOnSignal(srv); err != nil {
			log.Fatalf("http.Server.Close: %v", err)
		}
	} else {
		if err := shutdownOnSignal(srv); err != nil {
			log.Fatalf("http.Server.Shutdown: %v", err)
		}
	}
}
336
// multiplexGRPC returns one handler that serves both gRPC and plain HTTP
// traffic. A request is sent to grpcServer when it arrives over HTTP/2 and its
// Content-Type mentions "application/grpc". Every other request goes to
// httpHandler.
func multiplexGRPC(grpcServer *grpc.Server, httpHandler http.Handler) http.Handler {
	isGRPC := func(r *http.Request) bool {
		return r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc")
	}

	dispatch := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if isGRPC(r) {
			grpcServer.ServeHTTP(w, r)
			return
		}
		httpHandler.ServeHTTP(w, r)
	})

	// Without TLS, gRPC clients speak h2c (HTTP/2 over cleartext), which
	// net/http does not serve on its own. The h2c wrapper upgrades those
	// connections so the dispatcher above sees HTTP/2 requests.
	return h2c.NewHandler(dispatch, &http2.Server{})
}
354
// addProxyHandler registers a reverse proxy on mux for every request under
// "/indexserver/". That prefix is stripped and the request is forwarded over
// the Unix domain socket at the path socket. The proxy's path joining restores
// the leading slash, so "/indexserver/foo" reaches the socket as "/foo".
func addProxyHandler(mux *http.ServeMux, socket string) {
	dialSocket := func(ctx context.Context, _, _ string) (net.Conn, error) {
		var dialer net.Dialer
		return dialer.DialContext(ctx, "unix", socket)
	}

	// Every connection is dialed to the socket regardless of the URL, so the
	// host here is only a placeholder.
	target := &url.URL{Scheme: "http", Host: "socket"}
	proxy := httputil.NewSingleHostReverseProxy(target)
	proxy.Transport = &http.Transport{DialContext: dialSocket}

	const prefix = "/indexserver/"
	mux.Handle(prefix, http.StripPrefix(prefix, proxy))
}
372
// shutdownSignalChan subscribes to the operating system's shutdown signals
// (os.Interrupt and PLATFORM_SIGTERM) and returns the channel they are
// delivered on. maxReads is the channel's buffer size, so it should be at
// least the number of signals the caller intends to receive.
func shutdownSignalChan(maxReads int) <-chan os.Signal {
	sigs := make(chan os.Signal, maxReads)
	signal.Notify(sigs,
		os.Interrupt,     // terminal C-c and goreman
		PLATFORM_SIGTERM, // Kubernetes
	)
	return sigs
}
382
// closeOnSignal blocks until a shutdown signal arrives, then closes srv
// immediately with srv.Close, without draining in-flight requests. It returns
// the error from Close. shutdownOnSignal is the graceful alternative.
func closeOnSignal(srv *http.Server) error {
	<-shutdownSignalChan(1)
	return srv.Close()
}
391
// shutdownOnSignal blocks until a shutdown signal arrives and then calls
// srv.Shutdown, allowing at most 10 seconds for in-flight requests to drain.
// A second signal during the drain cancels the wait immediately. Nothing else
// is torn down here.
//
// The RPC framework cannot drain its hijacked connections, so in RPC mode
// Shutdown would simply wait out the 10 seconds. Callers with RPC enabled
// should use closeOnSignal instead.
func shutdownOnSignal(srv *http.Server) error {
	sigs := shutdownSignalChan(2)
	<-sigs

	base, abort := context.WithCancel(context.Background())
	defer abort()

	// Watch for a second signal; stop watching once this function returns.
	go func() {
		select {
		case sig := <-sigs:
			log.Printf("received another signal (%v), immediate shutdown", sig)
			abort()
		case <-base.Done():
		}
	}()

	// NOTE(review): the 10s budget assumes Kubernetes' 30s termination grace
	// period, with about 15s already spent elsewhere waiting for endpoint
	// removal to propagate. That wait is not visible in this file.
	drainCtx, stopDrainTimer := context.WithTimeout(base, 10*time.Second)
	defer stopDrainTimer()

	log.Printf("shutting down")
	return srv.Shutdown(drainCtx)
}
424
// watchdogOnce performs a single GET health check against addr using client,
// bounded by a 30 second timeout derived from ctx. It returns nil only for a
// 200 response. Any other status is reported as an error that includes the
// response body. metricWatchdogTotal is incremented for every attempt,
// including ones that fail.
func watchdogOnce(ctx context.Context, client *http.Client, addr string) error {
	defer metricWatchdogTotal.Inc()

	reqCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, addr, nil)
	if err != nil {
		return err
	}

	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	// Read errors are ignored: the body is only used for the error message.
	body, _ := io.ReadAll(resp.Body)
	_ = resp.Body.Close()

	if resp.StatusCode == http.StatusOK {
		return nil
	}
	return fmt.Errorf("watchdog: status=%v body=%q", resp.StatusCode, string(body))
}
450
451func watchdog(dt time.Duration, maxErrCount int, addr string) {
452 tr := &http.Transport{
453 TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
454 }
455 client := &http.Client{
456 Transport: tr,
457 }
458 tick := time.NewTicker(dt)
459
460 errCount := 0
461 for range tick.C {
462 err := watchdogOnce(context.Background(), client, addr)
463 if err != nil {
464 errCount++
465 metricWatchdogErrors.Set(float64(errCount))
466 metricWatchdogErrorsTotal.Inc()
467 if errCount >= maxErrCount {
468 log.Printf(`watchdog health check has consecutively failed %d times indicating is likely an unrecoverable error affecting zoekt. As such this process will exit with code 3.
469
470Final error: %v
471
472Possible remediations:
473- If this rarely happens, ignore and let your process manager restart zoekt.
474- Possibly under provisioned. Try increasing CPU or disk IO.
475- A bug. Reach out with logs and screenshots of metrics when this occurs.`, errCount, err)
476 os.Exit(3)
477 } else {
478 log.Printf("watchdog: failed, will try %d more times: %v", maxErrCount-errCount, err)
479 }
480 } else if errCount > 0 {
481 errCount = 0
482 metricWatchdogErrors.Set(float64(errCount))
483 log.Printf("watchdog: success, resetting error count")
484 }
485 }
486}
487
// diskUsage returns usage statistics for the filesystem containing path. On
// Windows only the volume name of path is passed to disk.Usage. Errors are
// wrapped with a "diskUsage:" prefix, and the stats are nil in that case.
func diskUsage(path string) (*disk.UsageStat, error) {
	target := path
	if runtime.GOOS == "windows" {
		target = filepath.VolumeName(path)
	}

	usage, err := disk.Usage(target)
	if err != nil {
		return nil, fmt.Errorf("diskUsage: %w", err)
	}
	return usage, nil
}
499
500func mustRegisterDiskMonitor(path string) {
501 prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
502 Name: "src_disk_space_available_bytes",
503 Help: "Amount of free space disk space.",
504 ConstLabels: prometheus.Labels{"path": path},
505 }, func() float64 {
506 // I know there is no error handling here, and I don't like it
507 // but there was no error handling in the previous version
508 // that used Statfs, either, so I'm assuming there's no need for it
509 usage, _ := diskUsage(path)
510 return float64(usage.Free)
511 }))
512
513 prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
514 Name: "src_disk_space_total_bytes",
515 Help: "Amount of total disk space.",
516 ConstLabels: prometheus.Labels{"path": path},
517 }, func() float64 {
518 // I know there is no error handling here, and I don't like it
519 // but there was no error handling in the previous version
520 // that used Statfs, either, so I'm assuming there's no need for it
521 usage, _ := diskUsage(path)
522 return float64(usage.Total)
523 }))
524}
525
526type loggedSearcher struct {
527 zoekt.Streamer
528 Logger sglog.Logger
529}
530
531func (s *loggedSearcher) Search(
532 ctx context.Context,
533 q query.Q,
534 opts *zoekt.SearchOptions,
535) (sr *zoekt.SearchResult, err error) {
536 defer func() {
537 var stats *zoekt.Stats
538 if sr != nil {
539 stats = &sr.Stats
540 }
541 s.log(ctx, q, opts, stats, err)
542 }()
543
544 metricSearchRequestsTotal.Inc()
545 return s.Streamer.Search(ctx, q, opts)
546}
547
548func (s *loggedSearcher) StreamSearch(
549 ctx context.Context,
550 q query.Q,
551 opts *zoekt.SearchOptions,
552 sender zoekt.Sender,
553) error {
554 var stats zoekt.Stats
555
556 metricSearchRequestsTotal.Inc()
557 err := s.Streamer.StreamSearch(ctx, q, opts, zoekt.SenderFunc(func(event *zoekt.SearchResult) {
558 stats.Add(event.Stats)
559 sender.Send(event)
560 }))
561
562 s.log(ctx, q, opts, &stats, err)
563
564 return err
565}
566
567func (s *loggedSearcher) log(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, st *zoekt.Stats, err error) {
568 logger := s.Logger.
569 WithTrace(traceContext(ctx)).
570 With(
571 sglog.String("query", q.String()),
572 sglog.Bool("opts.EstimateDocCount", opts.EstimateDocCount),
573 sglog.Bool("opts.Whole", opts.Whole),
574 sglog.Int("opts.ShardMaxMatchCount", opts.ShardMaxMatchCount),
575 sglog.Int("opts.TotalMaxMatchCount", opts.TotalMaxMatchCount),
576 sglog.Duration("opts.MaxWallTime", opts.MaxWallTime),
577 sglog.Int("opts.MaxDocDisplayCount", opts.MaxDocDisplayCount),
578 sglog.Int("opts.MaxMatchDisplayCount", opts.MaxMatchDisplayCount),
579 )
580
581 if err != nil {
582 switch {
583 case errors.Is(err, context.Canceled):
584 logger.Warn("search canceled", sglog.Error(err))
585 case errors.Is(err, context.DeadlineExceeded):
586 logger.Warn("search timeout", sglog.Error(err))
587 default:
588 logger.Error("search failed", sglog.Error(err))
589 }
590 return
591 }
592
593 if st == nil {
594 return
595 }
596
597 logger.Debug("search",
598 sglog.Int64("stat.ContentBytesLoaded", st.ContentBytesLoaded),
599 sglog.Int64("stat.IndexBytesLoaded", st.IndexBytesLoaded),
600 sglog.Int("stat.Crashes", st.Crashes),
601 sglog.Duration("stat.Duration", st.Duration),
602 sglog.Int("stat.FileCount", st.FileCount),
603 sglog.Int("stat.ShardFilesConsidered", st.ShardFilesConsidered),
604 sglog.Int("stat.FilesConsidered", st.FilesConsidered),
605 sglog.Int("stat.FilesLoaded", st.FilesLoaded),
606 sglog.Int("stat.FilesSkipped", st.FilesSkipped),
607 sglog.Int("stat.ShardsScanned", st.ShardsScanned),
608 sglog.Int("stat.ShardsSkipped", st.ShardsSkipped),
609 sglog.Int("stat.ShardsSkippedFilter", st.ShardsSkippedFilter),
610 sglog.Int("stat.MatchCount", st.MatchCount),
611 sglog.Int("stat.NgramMatches", st.NgramMatches),
612 sglog.Int("stat.NgramLookups", st.NgramLookups),
613 sglog.Duration("stat.Wait", st.Wait),
614 sglog.Duration("stat.MatchTreeConstruction", st.MatchTreeConstruction),
615 sglog.Duration("stat.MatchTreeSearch", st.MatchTreeSearch),
616 sglog.Int("stat.RegexpsConsidered", st.RegexpsConsidered),
617 sglog.String("stat.FlushReason", st.FlushReason.String()),
618 )
619}
620
// traceContext extracts trace and span IDs from ctx for structured logging.
// An OpenTracing span backed by a Jaeger span context takes precedence.
// Otherwise a valid OpenTelemetry span context is used. If neither is found,
// the zero TraceContext is returned.
func traceContext(ctx context.Context) sglog.TraceContext {
	if otSpan := opentracing.SpanFromContext(ctx); otSpan != nil {
		if jc, ok := otSpan.Context().(jaeger.SpanContext); ok {
			return sglog.TraceContext{
				TraceID: jc.TraceID().String(),
				SpanID:  jc.SpanID().String(),
			}
		}
	}

	otelCtx := oteltrace.SpanFromContext(ctx).SpanContext()
	if !otelCtx.IsValid() {
		return sglog.TraceContext{}
	}
	return sglog.TraceContext{
		TraceID: otelCtx.TraceID().String(),
		SpanID:  otelCtx.SpanID().String(),
	}
}
641
642func newGRPCServer(logger sglog.Logger, streamer zoekt.Streamer, additionalOpts ...grpc.ServerOption) *grpc.Server {
643 metrics := serverMetricsOnce()
644
645 opts := []grpc.ServerOption{
646 grpc.ChainStreamInterceptor(
647 propagator.StreamServerPropagator(tenant.Propagator{}),
648 tenant.StreamServerInterceptor,
649 otelgrpc.StreamServerInterceptor(),
650 metrics.StreamServerInterceptor(),
651 messagesize.StreamServerInterceptor,
652 internalerrs.LoggingStreamServerInterceptor(logger),
653 ),
654 grpc.ChainUnaryInterceptor(
655 propagator.UnaryServerPropagator(tenant.Propagator{}),
656 tenant.UnaryServerInterceptor,
657 otelgrpc.UnaryServerInterceptor(),
658 metrics.UnaryServerInterceptor(),
659 messagesize.UnaryServerInterceptor,
660 internalerrs.LoggingUnaryServerInterceptor(logger),
661 ),
662 }
663
664 opts = append(opts, additionalOpts...)
665
666 // Ensure that the message size options are set last, so they override any other
667 // server-specific options that tweak the message size.
668 //
669 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
670 // take precedence over everything else with a uniform size setting that's easy to reason about.
671 opts = append(opts, messagesize.MustGetServerMessageSizeFromEnv()...)
672
673 s := grpc.NewServer(opts...)
674 proto.RegisterWebserverServiceServer(s, zoektgrpc.NewServer(streamer))
675
676 return s
677}
678
679var (
680 metricWatchdogErrors = promauto.NewGauge(prometheus.GaugeOpts{
681 Name: "zoekt_webserver_watchdog_errors",
682 Help: "The current error count for zoekt watchdog.",
683 })
684 metricWatchdogTotal = promauto.NewCounter(prometheus.CounterOpts{
685 Name: "zoekt_webserver_watchdog_total",
686 Help: "The total number of requests done by zoekt watchdog.",
687 })
688 metricWatchdogErrorsTotal = promauto.NewCounter(prometheus.CounterOpts{
689 Name: "zoekt_webserver_watchdog_errors_total",
690 Help: "The total number of errors from zoekt watchdog.",
691 })
692 metricSearchRequestsTotal = promauto.NewCounter(prometheus.CounterOpts{
693 Name: "zoekt_search_requests_total",
694 Help: "The total number of search requests that zoekt received",
695 })
696
697 // serviceMetricsOnce returns a singleton instance of the server metrics
698 // that are shared across all gRPC servers that this process creates.
699 //
700 // This function panics if the metrics cannot be registered with the default
701 // Prometheus registry.
702 serverMetricsOnce = sync.OnceValue(func() *grpcprom.ServerMetrics {
703 serverMetrics := grpcprom.NewServerMetrics(
704 grpcprom.WithServerCounterOptions(),
705 grpcprom.WithServerHandlingTimeHistogram(), // record the overall response latency for a gRPC request)
706 )
707 prometheus.DefaultRegisterer.MustRegister(serverMetrics)
708 return serverMetrics
709 })
710)