fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Copyright 2016 Google Inc. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Command zoekt-webserver responds to search queries, using an index generated 16// by another program such as zoekt-indexserver. 17 18package main 19 20import ( 21 "context" 22 "crypto/tls" 23 "errors" 24 "flag" 25 "fmt" 26 "html/template" 27 "io" 28 "log" 29 "net" 30 "net/http" 31 "net/http/httputil" 32 "net/url" 33 "os" 34 "os/signal" 35 "path/filepath" 36 "runtime" 37 "strconv" 38 "strings" 39 "sync" 40 "time" 41 42 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" 43 "github.com/sourcegraph/mountinfo" 44 "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" 45 "golang.org/x/net/http2" 46 "golang.org/x/net/http2/h2c" 47 "google.golang.org/grpc" 48 49 "github.com/sourcegraph/zoekt" 50 "github.com/sourcegraph/zoekt/build" 51 zoektgrpc "github.com/sourcegraph/zoekt/cmd/zoekt-webserver/grpc/server" 52 "github.com/sourcegraph/zoekt/debugserver" 53 "github.com/sourcegraph/zoekt/grpc/internalerrs" 54 "github.com/sourcegraph/zoekt/grpc/messagesize" 55 "github.com/sourcegraph/zoekt/grpc/propagator" 56 proto "github.com/sourcegraph/zoekt/grpc/protos/zoekt/webserver/v1" 57 "github.com/sourcegraph/zoekt/internal/profiler" 58 "github.com/sourcegraph/zoekt/internal/tenant" 59 "github.com/sourcegraph/zoekt/internal/tracer" 60 "github.com/sourcegraph/zoekt/query" 61 "github.com/sourcegraph/zoekt/shards" 62 "github.com/sourcegraph/zoekt/trace" 63 "github.com/sourcegraph/zoekt/web" 64 65 "github.com/opentracing/opentracing-go" 66 "github.com/prometheus/client_golang/prometheus" 67 "github.com/prometheus/client_golang/prometheus/promauto" 68 "github.com/shirou/gopsutil/v3/disk" 69 sglog "github.com/sourcegraph/log" 70 "github.com/uber/jaeger-client-go" 71 oteltrace "go.opentelemetry.io/otel/trace" 72 "go.uber.org/automaxprocs/maxprocs" 73) 74 75const logFormat = "2006-01-02T15-04-05.999999999Z07" 76 77func divertLogs(dir string, interval time.Duration) { 78 t := time.NewTicker(interval) 79 var last *os.File 80 for { 81 nm := filepath.Join(dir, fmt.Sprintf("zoekt-webserver.%s.%d.log", time.Now().Format(logFormat), os.Getpid())) 82 fmt.Fprintf(os.Stderr, "writing logs to %s\n", nm) 83 84 f, err := os.Create(nm) 85 if err != nil { 86 // There is not much we can do now. 87 fmt.Fprintf(os.Stderr, "can't create output file %s: %v\n", nm, err) 88 os.Exit(2) 89 } 90 91 log.SetOutput(f) 92 last.Close() 93 94 last = f 95 96 <-t.C 97 } 98} 99 100const templateExtension = ".html.tpl" 101 102func loadTemplates(tpl *template.Template, dir string) error { 103 fs, err := filepath.Glob(dir + "/*" + templateExtension) 104 if err != nil { 105 log.Fatalf("Glob: %v", err) 106 } 107 108 log.Printf("loading templates: %v", fs) 109 for _, fn := range fs { 110 content, err := os.ReadFile(fn) 111 if err != nil { 112 return err 113 } 114 115 base := filepath.Base(fn) 116 base = strings.TrimSuffix(base, templateExtension) 117 if _, err := tpl.New(base).Parse(string(content)); err != nil { 118 return fmt.Errorf("template.Parse(%s): %v", fn, err) 119 } 120 } 121 return nil 122} 123 124func writeTemplates(dir string) error { 125 if dir == "" { 126 return fmt.Errorf("must set --template_dir") 127 } 128 129 for k, v := range web.TemplateText { 130 nm := filepath.Join(dir, k+templateExtension) 131 if err := os.WriteFile(nm, []byte(v), 0o644); err != nil { 132 return err 133 } 134 } 135 return nil 136} 137 138func main() { 139 logDir := flag.String("log_dir", "", "log to this directory rather than stderr.") 140 logRefresh := flag.Duration("log_refresh", 24*time.Hour, "if using --log_dir, start writing a new file this often.") 141 142 listen := flag.String("listen", ":6070", "listen on this address.") 143 index := flag.String("index", build.DefaultDir, "set index directory to use") 144 html := flag.Bool("html", true, "enable HTML interface") 145 enableRPC := flag.Bool("rpc", false, "enable go/net RPC") 146 enableIndexserverProxy := flag.Bool("indexserver_proxy", false, "proxy requests with URLs matching the path /indexserver/ to <index>/indexserver.sock") 147 print := flag.Bool("print", false, "enable local result URLs") 148 enablePprof := flag.Bool("pprof", false, "set to enable remote profiling.") 149 sslCert := flag.String("ssl_cert", "", "set path to SSL .pem holding certificate.") 150 sslKey := flag.String("ssl_key", "", "set path to SSL .pem holding key.") 151 hostCustomization := flag.String( 152 "host_customization", "", 153 "specify host customization, as HOST1=QUERY,HOST2=QUERY") 154 155 templateDir := flag.String("template_dir", "", "set directory from which to load custom .html.tpl template files") 156 dumpTemplates := flag.Bool("dump_templates", false, "dump templates into --template_dir and exit.") 157 version := flag.Bool("version", false, "Print version number") 158 159 flag.Parse() 160 161 if *version { 162 fmt.Printf("zoekt-webserver version %q\n", zoekt.Version) 163 os.Exit(0) 164 } 165 166 if *dumpTemplates { 167 if err := writeTemplates(*templateDir); err != nil { 168 log.Fatal(err) 169 } 170 os.Exit(0) 171 } 172 173 resource := sglog.Resource{ 174 Name: "zoekt-webserver", 175 Version: zoekt.Version, 176 InstanceID: zoekt.HostnameBestEffort(), 177 } 178 179 liblog := sglog.Init(resource) 180 defer liblog.Sync() 181 tracer.Init(resource) 182 profiler.Init("zoekt-webserver") 183 184 if *logDir != "" { 185 if fi, err := os.Lstat(*logDir); err != nil || !fi.IsDir() { 186 log.Fatalf("%s is not a directory", *logDir) 187 } 188 // We could do fdup acrobatics to also redirect 189 // stderr, but it is simpler and more portable for the 190 // caller to divert stderr output if necessary. 191 go divertLogs(*logDir, *logRefresh) 192 } 193 194 // Tune GOMAXPROCS to match Linux container CPU quota. 195 _, _ = maxprocs.Set() 196 197 if err := os.MkdirAll(*index, 0o755); err != nil { 198 log.Fatal(err) 199 } 200 201 mustRegisterDiskMonitor(*index) 202 203 metricsLogger := sglog.Scoped("metricsRegistration") 204 205 mustRegisterMemoryMapMetrics(metricsLogger) 206 207 opts := mountinfo.CollectorOpts{Namespace: "zoekt_webserver"} 208 c := mountinfo.NewCollector(metricsLogger, opts, map[string]string{"indexDir": *index}) 209 210 prometheus.DefaultRegisterer.MustRegister(c) 211 212 // Do not block on loading shards so we can become partially available 213 // sooner. Otherwise on large instances zoekt can be unavailable on the 214 // order of minutes. 215 searcher, err := shards.NewDirectorySearcherFast(*index) 216 if err != nil { 217 log.Fatal(err) 218 } 219 220 searcher = &loggedSearcher{ 221 Streamer: searcher, 222 Logger: sglog.Scoped("searcher"), 223 } 224 225 s := &web.Server{ 226 Searcher: searcher, 227 Top: web.Top, 228 Version: zoekt.Version, 229 } 230 231 if *templateDir != "" { 232 if err := loadTemplates(s.Top, *templateDir); err != nil { 233 log.Fatalf("loadTemplates: %v", err) 234 } 235 } 236 237 s.Print = *print 238 s.HTML = *html 239 s.RPC = *enableRPC 240 241 if *hostCustomization != "" { 242 s.HostCustomQueries = map[string]string{} 243 for _, h := range strings.SplitN(*hostCustomization, ",", -1) { 244 if len(h) == 0 { 245 continue 246 } 247 fields := strings.SplitN(h, "=", 2) 248 if len(fields) < 2 { 249 log.Fatalf("invalid host_customization %q", h) 250 } 251 252 s.HostCustomQueries[fields[0]] = fields[1] 253 } 254 } 255 256 serveMux, err := web.NewMux(s) 257 if err != nil { 258 log.Fatal(err) 259 } 260 261 debugserver.AddHandlers(serveMux, *enablePprof) 262 263 if *enableIndexserverProxy { 264 socket := filepath.Join(*index, "indexserver.sock") 265 sglog.Scoped("server").Info("adding reverse proxy", sglog.String("socket", socket)) 266 addProxyHandler(serveMux, socket) 267 } 268 269 handler := trace.Middleware(serveMux) 270 271 // Sourcegraph: We use environment variables to configure watchdog since 272 // they are more convenient than flags in containerized environments. 273 watchdogTick := 30 * time.Second 274 if v := os.Getenv("ZOEKT_WATCHDOG_TICK"); v != "" { 275 watchdogTick, _ = time.ParseDuration(v) 276 log.Printf("custom ZOEKT_WATCHDOG_TICK=%v", watchdogTick) 277 } 278 279 watchdogErrCount := 3 280 if v := os.Getenv("ZOEKT_WATCHDOG_ERRORS"); v != "" { 281 watchdogErrCount, _ = strconv.Atoi(v) 282 log.Printf("custom ZOEKT_WATCHDOG_ERRORS=%d", watchdogErrCount) 283 } 284 285 watchdogAddr := "http://" + *listen 286 if *sslCert != "" || *sslKey != "" { 287 watchdogAddr = "https://" + *listen 288 } 289 watchdogAddr += "/healthz" 290 291 if watchdogErrCount > 0 && watchdogTick > 0 { 292 go watchdog(watchdogTick, watchdogErrCount, watchdogAddr) 293 } else { 294 log.Println("watchdog disabled") 295 } 296 297 logger := sglog.Scoped("ZoektWebserverGRPCServer") 298 299 streamer := web.NewTraceAwareSearcher(s.Searcher) 300 grpcServer := newGRPCServer(logger, streamer) 301 302 handler = multiplexGRPC(grpcServer, handler) 303 304 srv := &http.Server{ 305 Addr: *listen, 306 Handler: handler, 307 } 308 309 go func() { 310 sglog.Scoped("server").Info("starting server", sglog.Stringp("address", listen)) 311 var err error 312 if *sslCert != "" || *sslKey != "" { 313 err = srv.ListenAndServeTLS(*sslCert, *sslKey) 314 } else { 315 err = srv.ListenAndServe() 316 } 317 318 if err != http.ErrServerClosed { 319 // Fatal otherwise shutdownOnSignal will block 320 log.Fatalf("ListenAndServe: %v", err) 321 } 322 }() 323 324 if s.RPC { 325 // Our RPC system does not support shutdown and hijacks the underlying 326 // http connection. This means shutdown is ineffective and just waits 10s 327 // before calling close. Lets just quit faster in that case. 328 if err := closeOnSignal(srv); err != nil { 329 log.Fatalf("http.Server.Close: %v", err) 330 } 331 } else { 332 if err := shutdownOnSignal(srv); err != nil { 333 log.Fatalf("http.Server.Shutdown: %v", err) 334 } 335 } 336} 337 338// multiplexGRPC takes a gRPC server and a plain HTTP handler and multiplexes the 339// request handling. Any requests that declare themselves as gRPC requests are routed 340// to the gRPC server, all others are routed to the httpHandler. 341func multiplexGRPC(grpcServer *grpc.Server, httpHandler http.Handler) http.Handler { 342 newHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 343 if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") { 344 grpcServer.ServeHTTP(w, r) 345 } else { 346 httpHandler.ServeHTTP(w, r) 347 } 348 }) 349 350 // Until we enable TLS, we need to fall back to the h2c protocol, which is 351 // basically HTTP2 without TLS. The standard library does not implement the 352 // h2s protocol, so this hijacks h2s requests and handles them correctly. 353 return h2c.NewHandler(newHandler, &http2.Server{}) 354} 355 356// addProxyHandler adds a handler to "mux" that proxies all requests with base 357// /indexserver to "socket". 358func addProxyHandler(mux *http.ServeMux, socket string) { 359 proxy := httputil.NewSingleHostReverseProxy(&url.URL{ 360 Scheme: "http", 361 // The value of "Host" is arbitrary, because it is ignored by the 362 // DialContext we use for the socket connection. 363 Host: "socket", 364 }) 365 proxy.Transport = &http.Transport{ 366 DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) { 367 var d net.Dialer 368 return d.DialContext(ctx, "unix", socket) 369 }, 370 } 371 mux.Handle("/indexserver/", http.StripPrefix("/indexserver/", http.HandlerFunc(proxy.ServeHTTP))) 372} 373 374// shutdownSignalChan returns a channel which is listening for shutdown 375// signals from the operating system. maxReads is an upper bound on how many 376// times you will read the channel (used as buffer for signal.Notify). 377func shutdownSignalChan(maxReads int) <-chan os.Signal { 378 c := make(chan os.Signal, maxReads) 379 signal.Notify(c, os.Interrupt) // terminal C-c and goreman 380 signal.Notify(c, PLATFORM_SIGTERM) // Kubernetes 381 return c 382} 383 384// closeOnSignal will listen for SIGINT or SIGTERM and call srv.Close. This is 385// not a graceful shutdown, see shutdownOnSignal. 386func closeOnSignal(srv *http.Server) error { 387 c := shutdownSignalChan(1) 388 <-c 389 390 return srv.Close() 391} 392 393// shutdownOnSignal will listen for SIGINT or SIGTERM and call srv.Shutdown. 394// Note it doesn't call anything else for shutting down. Notably our RPC 395// framework doesn't allow us to drain connections, so when Shutdown we will 396// wait 10s before closing. 397// 398// Note: the call site for shutdownOnSignal should use closeOnSignal instead 399// if rpc mode is enabled due to the above limitation. 400func shutdownOnSignal(srv *http.Server) error { 401 c := shutdownSignalChan(2) 402 <-c 403 404 // If we receive another signal, immediate shutdown 405 ctx, cancel := context.WithCancel(context.Background()) 406 defer cancel() 407 go func() { 408 select { 409 case <-ctx.Done(): 410 case sig := <-c: 411 log.Printf("received another signal (%v), immediate shutdown", sig) 412 cancel() 413 } 414 }() 415 416 // Wait for 10s to drain ongoing requests. Kubernetes gives us 30s to 417 // shutdown, we have already used 15s waiting for our endpoint removal to 418 // propagate. 419 ctx, cancel2 := context.WithTimeout(ctx, 10*time.Second) 420 defer cancel2() 421 422 log.Printf("shutting down") 423 return srv.Shutdown(ctx) 424} 425 426func watchdogOnce(ctx context.Context, client *http.Client, addr string) error { 427 defer metricWatchdogTotal.Inc() 428 429 ctx, cancel := context.WithDeadline(ctx, time.Now().Add(30*time.Second)) 430 defer cancel() 431 432 req, err := http.NewRequest("GET", addr, nil) 433 if err != nil { 434 return err 435 } 436 437 req = req.WithContext(ctx) 438 439 resp, err := client.Do(req) 440 if err != nil { 441 return err 442 } 443 body, _ := io.ReadAll(resp.Body) 444 _ = resp.Body.Close() 445 446 if resp.StatusCode != http.StatusOK { 447 return fmt.Errorf("watchdog: status=%v body=%q", resp.StatusCode, string(body)) 448 } 449 return nil 450} 451 452func watchdog(dt time.Duration, maxErrCount int, addr string) { 453 tr := &http.Transport{ 454 TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, 455 } 456 client := &http.Client{ 457 Transport: tr, 458 } 459 tick := time.NewTicker(dt) 460 461 errCount := 0 462 for range tick.C { 463 err := watchdogOnce(context.Background(), client, addr) 464 if err != nil { 465 errCount++ 466 metricWatchdogErrors.Set(float64(errCount)) 467 metricWatchdogErrorsTotal.Inc() 468 if errCount >= maxErrCount { 469 log.Printf(`watchdog health check has consecutively failed %d times indicating is likely an unrecoverable error affecting zoekt. As such this process will exit with code 3. 470 471Final error: %v 472 473Possible remediations: 474- If this rarely happens, ignore and let your process manager restart zoekt. 475- Possibly under provisioned. Try increasing CPU or disk IO. 476- A bug. Reach out with logs and screenshots of metrics when this occurs.`, errCount, err) 477 os.Exit(3) 478 } else { 479 log.Printf("watchdog: failed, will try %d more times: %v", maxErrCount-errCount, err) 480 } 481 } else if errCount > 0 { 482 errCount = 0 483 metricWatchdogErrors.Set(float64(errCount)) 484 log.Printf("watchdog: success, resetting error count") 485 } 486 } 487} 488 489func diskUsage(path string) (*disk.UsageStat, error) { 490 duPath := path 491 if runtime.GOOS == "windows" { 492 duPath = filepath.VolumeName(duPath) 493 } 494 usage, err := disk.Usage(duPath) 495 if err != nil { 496 return nil, fmt.Errorf("diskUsage: %w", err) 497 } 498 return usage, err 499} 500 501func mustRegisterDiskMonitor(path string) { 502 prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ 503 Name: "src_disk_space_available_bytes", 504 Help: "Amount of free space disk space.", 505 ConstLabels: prometheus.Labels{"path": path}, 506 }, func() float64 { 507 // I know there is no error handling here, and I don't like it 508 // but there was no error handling in the previous version 509 // that used Statfs, either, so I'm assuming there's no need for it 510 usage, _ := diskUsage(path) 511 return float64(usage.Free) 512 })) 513 514 prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ 515 Name: "src_disk_space_total_bytes", 516 Help: "Amount of total disk space.", 517 ConstLabels: prometheus.Labels{"path": path}, 518 }, func() float64 { 519 // I know there is no error handling here, and I don't like it 520 // but there was no error handling in the previous version 521 // that used Statfs, either, so I'm assuming there's no need for it 522 usage, _ := diskUsage(path) 523 return float64(usage.Total) 524 })) 525} 526 527type loggedSearcher struct { 528 zoekt.Streamer 529 Logger sglog.Logger 530} 531 532func (s *loggedSearcher) Search( 533 ctx context.Context, 534 q query.Q, 535 opts *zoekt.SearchOptions, 536) (sr *zoekt.SearchResult, err error) { 537 defer func() { 538 var stats *zoekt.Stats 539 if sr != nil { 540 stats = &sr.Stats 541 } 542 s.log(ctx, q, opts, stats, err) 543 }() 544 545 metricSearchRequestsTotal.Inc() 546 return s.Streamer.Search(ctx, q, opts) 547} 548 549func (s *loggedSearcher) StreamSearch( 550 ctx context.Context, 551 q query.Q, 552 opts *zoekt.SearchOptions, 553 sender zoekt.Sender, 554) error { 555 var stats zoekt.Stats 556 557 metricSearchRequestsTotal.Inc() 558 err := s.Streamer.StreamSearch(ctx, q, opts, zoekt.SenderFunc(func(event *zoekt.SearchResult) { 559 stats.Add(event.Stats) 560 sender.Send(event) 561 })) 562 563 s.log(ctx, q, opts, &stats, err) 564 565 return err 566} 567 568func (s *loggedSearcher) log(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, st *zoekt.Stats, err error) { 569 logger := s.Logger. 570 WithTrace(traceContext(ctx)). 571 With( 572 sglog.String("query", q.String()), 573 sglog.Bool("opts.EstimateDocCount", opts.EstimateDocCount), 574 sglog.Bool("opts.Whole", opts.Whole), 575 sglog.Int("opts.ShardMaxMatchCount", opts.ShardMaxMatchCount), 576 sglog.Int("opts.TotalMaxMatchCount", opts.TotalMaxMatchCount), 577 sglog.Duration("opts.MaxWallTime", opts.MaxWallTime), 578 sglog.Int("opts.MaxDocDisplayCount", opts.MaxDocDisplayCount), 579 sglog.Int("opts.MaxMatchDisplayCount", opts.MaxMatchDisplayCount), 580 ) 581 582 if err != nil { 583 switch { 584 case errors.Is(err, context.Canceled): 585 logger.Warn("search canceled", sglog.Error(err)) 586 case errors.Is(err, context.DeadlineExceeded): 587 logger.Warn("search timeout", sglog.Error(err)) 588 default: 589 logger.Error("search failed", sglog.Error(err)) 590 } 591 return 592 } 593 594 if st == nil { 595 return 596 } 597 598 logger.Debug("search", 599 sglog.Int64("stat.ContentBytesLoaded", st.ContentBytesLoaded), 600 sglog.Int64("stat.IndexBytesLoaded", st.IndexBytesLoaded), 601 sglog.Int("stat.Crashes", st.Crashes), 602 sglog.Duration("stat.Duration", st.Duration), 603 sglog.Int("stat.FileCount", st.FileCount), 604 sglog.Int("stat.ShardFilesConsidered", st.ShardFilesConsidered), 605 sglog.Int("stat.FilesConsidered", st.FilesConsidered), 606 sglog.Int("stat.FilesLoaded", st.FilesLoaded), 607 sglog.Int("stat.FilesSkipped", st.FilesSkipped), 608 sglog.Int("stat.ShardsScanned", st.ShardsScanned), 609 sglog.Int("stat.ShardsSkipped", st.ShardsSkipped), 610 sglog.Int("stat.ShardsSkippedFilter", st.ShardsSkippedFilter), 611 sglog.Int("stat.MatchCount", st.MatchCount), 612 sglog.Int("stat.NgramMatches", st.NgramMatches), 613 sglog.Int("stat.NgramLookups", st.NgramLookups), 614 sglog.Duration("stat.Wait", st.Wait), 615 sglog.Duration("stat.MatchTreeConstruction", st.MatchTreeConstruction), 616 sglog.Duration("stat.MatchTreeSearch", st.MatchTreeSearch), 617 sglog.Int("stat.RegexpsConsidered", st.RegexpsConsidered), 618 sglog.String("stat.FlushReason", st.FlushReason.String()), 619 ) 620} 621 622func traceContext(ctx context.Context) sglog.TraceContext { 623 otSpan := opentracing.SpanFromContext(ctx) 624 if otSpan != nil { 625 if jaegerSpan, ok := otSpan.Context().(jaeger.SpanContext); ok { 626 return sglog.TraceContext{ 627 TraceID: jaegerSpan.TraceID().String(), 628 SpanID: jaegerSpan.SpanID().String(), 629 } 630 } 631 } 632 633 if otelSpan := oteltrace.SpanFromContext(ctx).SpanContext(); otelSpan.IsValid() { 634 return sglog.TraceContext{ 635 TraceID: otelSpan.TraceID().String(), 636 SpanID: otelSpan.SpanID().String(), 637 } 638 } 639 640 return sglog.TraceContext{} 641} 642 643func newGRPCServer(logger sglog.Logger, streamer zoekt.Streamer, additionalOpts ...grpc.ServerOption) *grpc.Server { 644 metrics := serverMetricsOnce() 645 646 opts := []grpc.ServerOption{ 647 grpc.ChainStreamInterceptor( 648 propagator.StreamServerPropagator(tenant.Propagator{}), 649 tenant.StreamServerInterceptor, 650 otelgrpc.StreamServerInterceptor(), 651 metrics.StreamServerInterceptor(), 652 messagesize.StreamServerInterceptor, 653 internalerrs.LoggingStreamServerInterceptor(logger), 654 ), 655 grpc.ChainUnaryInterceptor( 656 propagator.UnaryServerPropagator(tenant.Propagator{}), 657 tenant.UnaryServerInterceptor, 658 otelgrpc.UnaryServerInterceptor(), 659 metrics.UnaryServerInterceptor(), 660 messagesize.UnaryServerInterceptor, 661 internalerrs.LoggingUnaryServerInterceptor(logger), 662 ), 663 } 664 665 opts = append(opts, additionalOpts...) 666 667 // Ensure that the message size options are set last, so they override any other 668 // server-specific options that tweak the message size. 669 // 670 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they 671 // take precedence over everything else with a uniform size setting that's easy to reason about. 672 opts = append(opts, messagesize.MustGetServerMessageSizeFromEnv()...) 673 674 s := grpc.NewServer(opts...) 675 proto.RegisterWebserverServiceServer(s, zoektgrpc.NewServer(streamer)) 676 677 return s 678} 679 680var ( 681 metricWatchdogErrors = promauto.NewGauge(prometheus.GaugeOpts{ 682 Name: "zoekt_webserver_watchdog_errors", 683 Help: "The current error count for zoekt watchdog.", 684 }) 685 metricWatchdogTotal = promauto.NewCounter(prometheus.CounterOpts{ 686 Name: "zoekt_webserver_watchdog_total", 687 Help: "The total number of requests done by zoekt watchdog.", 688 }) 689 metricWatchdogErrorsTotal = promauto.NewCounter(prometheus.CounterOpts{ 690 Name: "zoekt_webserver_watchdog_errors_total", 691 Help: "The total number of errors from zoekt watchdog.", 692 }) 693 metricSearchRequestsTotal = promauto.NewCounter(prometheus.CounterOpts{ 694 Name: "zoekt_search_requests_total", 695 Help: "The total number of search requests that zoekt received", 696 }) 697 698 // serviceMetricsOnce returns a singleton instance of the server metrics 699 // that are shared across all gRPC servers that this process creates. 700 // 701 // This function panics if the metrics cannot be registered with the default 702 // Prometheus registry. 703 serverMetricsOnce = sync.OnceValue(func() *grpcprom.ServerMetrics { 704 serverMetrics := grpcprom.NewServerMetrics( 705 grpcprom.WithServerCounterOptions(), 706 grpcprom.WithServerHandlingTimeHistogram(), // record the overall response latency for a gRPC request) 707 ) 708 prometheus.DefaultRegisterer.MustRegister(serverMetrics) 709 return serverMetrics 710 }) 711)