fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Copyright 2016 Google Inc. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Command zoekt-webserver responds to search queries, using an index generated 16// by another program such as zoekt-indexserver. 17 18package main 19 20import ( 21 "context" 22 "crypto/tls" 23 "errors" 24 "flag" 25 "fmt" 26 "html/template" 27 "io" 28 "log" 29 "net" 30 "net/http" 31 "net/http/httputil" 32 "net/url" 33 "os" 34 "os/signal" 35 "path/filepath" 36 "runtime" 37 "strconv" 38 "strings" 39 "sync" 40 "time" 41 42 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" 43 "github.com/sourcegraph/mountinfo" 44 zoektgrpc "github.com/sourcegraph/zoekt/cmd/zoekt-webserver/grpc/server" 45 "github.com/sourcegraph/zoekt/grpc/internalerrs" 46 "github.com/sourcegraph/zoekt/grpc/messagesize" 47 proto "github.com/sourcegraph/zoekt/grpc/protos/zoekt/webserver/v1" 48 "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" 49 "golang.org/x/net/http2" 50 "golang.org/x/net/http2/h2c" 51 "google.golang.org/grpc" 52 53 "github.com/sourcegraph/zoekt" 54 "github.com/sourcegraph/zoekt/build" 55 "github.com/sourcegraph/zoekt/debugserver" 56 "github.com/sourcegraph/zoekt/internal/profiler" 57 "github.com/sourcegraph/zoekt/internal/tracer" 58 "github.com/sourcegraph/zoekt/query" 59 "github.com/sourcegraph/zoekt/shards" 60 "github.com/sourcegraph/zoekt/trace" 61 "github.com/sourcegraph/zoekt/web" 62 63 "github.com/opentracing/opentracing-go" 64 "github.com/prometheus/client_golang/prometheus" 65 "github.com/prometheus/client_golang/prometheus/promauto" 66 "github.com/shirou/gopsutil/v3/disk" 67 sglog "github.com/sourcegraph/log" 68 "github.com/uber/jaeger-client-go" 69 oteltrace "go.opentelemetry.io/otel/trace" 70 "go.uber.org/automaxprocs/maxprocs" 71) 72 73const logFormat = "2006-01-02T15-04-05.999999999Z07" 74 75func divertLogs(dir string, interval time.Duration) { 76 t := time.NewTicker(interval) 77 var last *os.File 78 for { 79 nm := filepath.Join(dir, fmt.Sprintf("zoekt-webserver.%s.%d.log", time.Now().Format(logFormat), os.Getpid())) 80 fmt.Fprintf(os.Stderr, "writing logs to %s\n", nm) 81 82 f, err := os.Create(nm) 83 if err != nil { 84 // There is not much we can do now. 85 fmt.Fprintf(os.Stderr, "can't create output file %s: %v\n", nm, err) 86 os.Exit(2) 87 } 88 89 log.SetOutput(f) 90 last.Close() 91 92 last = f 93 94 <-t.C 95 } 96} 97 98const templateExtension = ".html.tpl" 99 100func loadTemplates(tpl *template.Template, dir string) error { 101 fs, err := filepath.Glob(dir + "/*" + templateExtension) 102 if err != nil { 103 log.Fatalf("Glob: %v", err) 104 } 105 106 log.Printf("loading templates: %v", fs) 107 for _, fn := range fs { 108 content, err := os.ReadFile(fn) 109 if err != nil { 110 return err 111 } 112 113 base := filepath.Base(fn) 114 base = strings.TrimSuffix(base, templateExtension) 115 if _, err := tpl.New(base).Parse(string(content)); err != nil { 116 return fmt.Errorf("template.Parse(%s): %v", fn, err) 117 } 118 } 119 return nil 120} 121 122func writeTemplates(dir string) error { 123 if dir == "" { 124 return fmt.Errorf("must set --template_dir") 125 } 126 127 for k, v := range web.TemplateText { 128 nm := filepath.Join(dir, k+templateExtension) 129 if err := os.WriteFile(nm, []byte(v), 0o644); err != nil { 130 return err 131 } 132 } 133 return nil 134} 135 136func main() { 137 logDir := flag.String("log_dir", "", "log to this directory rather than stderr.") 138 logRefresh := flag.Duration("log_refresh", 24*time.Hour, "if using --log_dir, start writing a new file this often.") 139 140 listen := flag.String("listen", ":6070", "listen on this address.") 141 index := flag.String("index", build.DefaultDir, "set index directory to use") 142 html := flag.Bool("html", true, "enable HTML interface") 143 enableRPC := flag.Bool("rpc", false, "enable go/net RPC") 144 enableIndexserverProxy := flag.Bool("indexserver_proxy", false, "proxy requests with URLs matching the path /indexserver/ to <index>/indexserver.sock") 145 print := flag.Bool("print", false, "enable local result URLs") 146 enablePprof := flag.Bool("pprof", false, "set to enable remote profiling.") 147 sslCert := flag.String("ssl_cert", "", "set path to SSL .pem holding certificate.") 148 sslKey := flag.String("ssl_key", "", "set path to SSL .pem holding key.") 149 hostCustomization := flag.String( 150 "host_customization", "", 151 "specify host customization, as HOST1=QUERY,HOST2=QUERY") 152 153 templateDir := flag.String("template_dir", "", "set directory from which to load custom .html.tpl template files") 154 dumpTemplates := flag.Bool("dump_templates", false, "dump templates into --template_dir and exit.") 155 version := flag.Bool("version", false, "Print version number") 156 157 flag.Parse() 158 159 if *version { 160 fmt.Printf("zoekt-webserver version %q\n", zoekt.Version) 161 os.Exit(0) 162 } 163 164 if *dumpTemplates { 165 if err := writeTemplates(*templateDir); err != nil { 166 log.Fatal(err) 167 } 168 os.Exit(0) 169 } 170 171 resource := sglog.Resource{ 172 Name: "zoekt-webserver", 173 Version: zoekt.Version, 174 InstanceID: zoekt.HostnameBestEffort(), 175 } 176 177 liblog := sglog.Init(resource) 178 defer liblog.Sync() 179 tracer.Init(resource) 180 profiler.Init("zoekt-webserver", zoekt.Version, -1) 181 182 if *logDir != "" { 183 if fi, err := os.Lstat(*logDir); err != nil || !fi.IsDir() { 184 log.Fatalf("%s is not a directory", *logDir) 185 } 186 // We could do fdup acrobatics to also redirect 187 // stderr, but it is simpler and more portable for the 188 // caller to divert stderr output if necessary. 189 go divertLogs(*logDir, *logRefresh) 190 } 191 192 // Tune GOMAXPROCS to match Linux container CPU quota. 193 _, _ = maxprocs.Set() 194 195 if err := os.MkdirAll(*index, 0o755); err != nil { 196 log.Fatal(err) 197 } 198 199 mustRegisterDiskMonitor(*index) 200 201 metricsLogger := sglog.Scoped("metricsRegistration") 202 203 mustRegisterMemoryMapMetrics(metricsLogger) 204 205 opts := mountinfo.CollectorOpts{Namespace: "zoekt_webserver"} 206 c := mountinfo.NewCollector(metricsLogger, opts, map[string]string{"indexDir": *index}) 207 208 prometheus.DefaultRegisterer.MustRegister(c) 209 210 // Do not block on loading shards so we can become partially available 211 // sooner. Otherwise on large instances zoekt can be unavailable on the 212 // order of minutes. 213 searcher, err := shards.NewDirectorySearcherFast(*index) 214 if err != nil { 215 log.Fatal(err) 216 } 217 218 searcher = &loggedSearcher{ 219 Streamer: searcher, 220 Logger: sglog.Scoped("searcher"), 221 } 222 223 s := &web.Server{ 224 Searcher: searcher, 225 Top: web.Top, 226 Version: zoekt.Version, 227 } 228 229 if *templateDir != "" { 230 if err := loadTemplates(s.Top, *templateDir); err != nil { 231 log.Fatalf("loadTemplates: %v", err) 232 } 233 } 234 235 s.Print = *print 236 s.HTML = *html 237 s.RPC = *enableRPC 238 239 if *hostCustomization != "" { 240 s.HostCustomQueries = map[string]string{} 241 for _, h := range strings.SplitN(*hostCustomization, ",", -1) { 242 if len(h) == 0 { 243 continue 244 } 245 fields := strings.SplitN(h, "=", 2) 246 if len(fields) < 2 { 247 log.Fatalf("invalid host_customization %q", h) 248 } 249 250 s.HostCustomQueries[fields[0]] = fields[1] 251 } 252 } 253 254 serveMux, err := web.NewMux(s) 255 if err != nil { 256 log.Fatal(err) 257 } 258 259 debugserver.AddHandlers(serveMux, *enablePprof) 260 261 if *enableIndexserverProxy { 262 socket := filepath.Join(*index, "indexserver.sock") 263 sglog.Scoped("server").Info("adding reverse proxy", sglog.String("socket", socket)) 264 addProxyHandler(serveMux, socket) 265 } 266 267 handler := trace.Middleware(serveMux) 268 269 // Sourcegraph: We use environment variables to configure watchdog since 270 // they are more convenient than flags in containerized environments. 271 watchdogTick := 30 * time.Second 272 if v := os.Getenv("ZOEKT_WATCHDOG_TICK"); v != "" { 273 watchdogTick, _ = time.ParseDuration(v) 274 log.Printf("custom ZOEKT_WATCHDOG_TICK=%v", watchdogTick) 275 } 276 277 watchdogErrCount := 3 278 if v := os.Getenv("ZOEKT_WATCHDOG_ERRORS"); v != "" { 279 watchdogErrCount, _ = strconv.Atoi(v) 280 log.Printf("custom ZOEKT_WATCHDOG_ERRORS=%d", watchdogErrCount) 281 } 282 283 watchdogAddr := "http://" + *listen 284 if *sslCert != "" || *sslKey != "" { 285 watchdogAddr = "https://" + *listen 286 } 287 watchdogAddr += "/healthz" 288 289 if watchdogErrCount > 0 && watchdogTick > 0 { 290 go watchdog(watchdogTick, watchdogErrCount, watchdogAddr) 291 } else { 292 log.Println("watchdog disabled") 293 } 294 295 logger := sglog.Scoped("ZoektWebserverGRPCServer") 296 297 streamer := web.NewTraceAwareSearcher(s.Searcher) 298 grpcServer := newGRPCServer(logger, streamer) 299 300 handler = multiplexGRPC(grpcServer, handler) 301 302 srv := &http.Server{ 303 Addr: *listen, 304 Handler: handler, 305 } 306 307 go func() { 308 sglog.Scoped("server").Info("starting server", sglog.Stringp("address", listen)) 309 var err error 310 if *sslCert != "" || *sslKey != "" { 311 err = srv.ListenAndServeTLS(*sslCert, *sslKey) 312 } else { 313 err = srv.ListenAndServe() 314 } 315 316 if err != http.ErrServerClosed { 317 // Fatal otherwise shutdownOnSignal will block 318 log.Fatalf("ListenAndServe: %v", err) 319 } 320 }() 321 322 if s.RPC { 323 // Our RPC system does not support shutdown and hijacks the underlying 324 // http connection. This means shutdown is ineffective and just waits 10s 325 // before calling close. Lets just quit faster in that case. 326 if err := closeOnSignal(srv); err != nil { 327 log.Fatalf("http.Server.Close: %v", err) 328 } 329 } else { 330 if err := shutdownOnSignal(srv); err != nil { 331 log.Fatalf("http.Server.Shutdown: %v", err) 332 } 333 } 334} 335 336// multiplexGRPC takes a gRPC server and a plain HTTP handler and multiplexes the 337// request handling. Any requests that declare themselves as gRPC requests are routed 338// to the gRPC server, all others are routed to the httpHandler. 339func multiplexGRPC(grpcServer *grpc.Server, httpHandler http.Handler) http.Handler { 340 newHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 341 if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") { 342 grpcServer.ServeHTTP(w, r) 343 } else { 344 httpHandler.ServeHTTP(w, r) 345 } 346 }) 347 348 // Until we enable TLS, we need to fall back to the h2c protocol, which is 349 // basically HTTP2 without TLS. The standard library does not implement the 350 // h2s protocol, so this hijacks h2s requests and handles them correctly. 351 return h2c.NewHandler(newHandler, &http2.Server{}) 352} 353 354// addProxyHandler adds a handler to "mux" that proxies all requests with base 355// /indexserver to "socket". 356func addProxyHandler(mux *http.ServeMux, socket string) { 357 proxy := httputil.NewSingleHostReverseProxy(&url.URL{ 358 Scheme: "http", 359 // The value of "Host" is arbitrary, because it is ignored by the 360 // DialContext we use for the socket connection. 361 Host: "socket", 362 }) 363 proxy.Transport = &http.Transport{ 364 DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) { 365 var d net.Dialer 366 return d.DialContext(ctx, "unix", socket) 367 }, 368 } 369 mux.Handle("/indexserver/", http.StripPrefix("/indexserver/", http.HandlerFunc(proxy.ServeHTTP))) 370} 371 372// shutdownSignalChan returns a channel which is listening for shutdown 373// signals from the operating system. maxReads is an upper bound on how many 374// times you will read the channel (used as buffer for signal.Notify). 375func shutdownSignalChan(maxReads int) <-chan os.Signal { 376 c := make(chan os.Signal, maxReads) 377 signal.Notify(c, os.Interrupt) // terminal C-c and goreman 378 signal.Notify(c, PLATFORM_SIGTERM) // Kubernetes 379 return c 380} 381 382// closeOnSignal will listen for SIGINT or SIGTERM and call srv.Close. This is 383// not a graceful shutdown, see shutdownOnSignal. 384func closeOnSignal(srv *http.Server) error { 385 c := shutdownSignalChan(1) 386 <-c 387 388 return srv.Close() 389} 390 391// shutdownOnSignal will listen for SIGINT or SIGTERM and call srv.Shutdown. 392// Note it doesn't call anything else for shutting down. Notably our RPC 393// framework doesn't allow us to drain connections, so when Shutdown we will 394// wait 10s before closing. 395// 396// Note: the call site for shutdownOnSignal should use closeOnSignal instead 397// if rpc mode is enabled due to the above limitation. 398func shutdownOnSignal(srv *http.Server) error { 399 c := shutdownSignalChan(2) 400 <-c 401 402 // If we receive another signal, immediate shutdown 403 ctx, cancel := context.WithCancel(context.Background()) 404 defer cancel() 405 go func() { 406 select { 407 case <-ctx.Done(): 408 case sig := <-c: 409 log.Printf("received another signal (%v), immediate shutdown", sig) 410 cancel() 411 } 412 }() 413 414 // Wait for 10s to drain ongoing requests. Kubernetes gives us 30s to 415 // shutdown, we have already used 15s waiting for our endpoint removal to 416 // propagate. 417 ctx, cancel2 := context.WithTimeout(ctx, 10*time.Second) 418 defer cancel2() 419 420 log.Printf("shutting down") 421 return srv.Shutdown(ctx) 422} 423 424func watchdogOnce(ctx context.Context, client *http.Client, addr string) error { 425 defer metricWatchdogTotal.Inc() 426 427 ctx, cancel := context.WithDeadline(ctx, time.Now().Add(30*time.Second)) 428 defer cancel() 429 430 req, err := http.NewRequest("GET", addr, nil) 431 if err != nil { 432 return err 433 } 434 435 req = req.WithContext(ctx) 436 437 resp, err := client.Do(req) 438 if err != nil { 439 return err 440 } 441 body, _ := io.ReadAll(resp.Body) 442 _ = resp.Body.Close() 443 444 if resp.StatusCode != http.StatusOK { 445 return fmt.Errorf("watchdog: status=%v body=%q", resp.StatusCode, string(body)) 446 } 447 return nil 448} 449 450func watchdog(dt time.Duration, maxErrCount int, addr string) { 451 tr := &http.Transport{ 452 TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, 453 } 454 client := &http.Client{ 455 Transport: tr, 456 } 457 tick := time.NewTicker(dt) 458 459 errCount := 0 460 for range tick.C { 461 err := watchdogOnce(context.Background(), client, addr) 462 if err != nil { 463 errCount++ 464 metricWatchdogErrors.Set(float64(errCount)) 465 metricWatchdogErrorsTotal.Inc() 466 if errCount >= maxErrCount { 467 log.Printf(`watchdog health check has consecutively failed %d times indicating is likely an unrecoverable error affecting zoekt. As such this process will exit with code 3. 468 469Final error: %v 470 471Possible remediations: 472- If this rarely happens, ignore and let your process manager restart zoekt. 473- Possibly under provisioned. Try increasing CPU or disk IO. 474- A bug. Reach out with logs and screenshots of metrics when this occurs.`, errCount, err) 475 os.Exit(3) 476 } else { 477 log.Printf("watchdog: failed, will try %d more times: %v", maxErrCount-errCount, err) 478 } 479 } else if errCount > 0 { 480 errCount = 0 481 metricWatchdogErrors.Set(float64(errCount)) 482 log.Printf("watchdog: success, resetting error count") 483 } 484 } 485} 486 487func diskUsage(path string) (*disk.UsageStat, error) { 488 duPath := path 489 if runtime.GOOS == "windows" { 490 duPath = filepath.VolumeName(duPath) 491 } 492 usage, err := disk.Usage(duPath) 493 if err != nil { 494 return nil, fmt.Errorf("diskUsage: %w", err) 495 } 496 return usage, err 497} 498 499func mustRegisterDiskMonitor(path string) { 500 prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ 501 Name: "src_disk_space_available_bytes", 502 Help: "Amount of free space disk space.", 503 ConstLabels: prometheus.Labels{"path": path}, 504 }, func() float64 { 505 // I know there is no error handling here, and I don't like it 506 // but there was no error handling in the previous version 507 // that used Statfs, either, so I'm assuming there's no need for it 508 usage, _ := diskUsage(path) 509 return float64(usage.Free) 510 })) 511 512 prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ 513 Name: "src_disk_space_total_bytes", 514 Help: "Amount of total disk space.", 515 ConstLabels: prometheus.Labels{"path": path}, 516 }, func() float64 { 517 // I know there is no error handling here, and I don't like it 518 // but there was no error handling in the previous version 519 // that used Statfs, either, so I'm assuming there's no need for it 520 usage, _ := diskUsage(path) 521 return float64(usage.Total) 522 })) 523} 524 525type loggedSearcher struct { 526 zoekt.Streamer 527 Logger sglog.Logger 528} 529 530func (s *loggedSearcher) Search( 531 ctx context.Context, 532 q query.Q, 533 opts *zoekt.SearchOptions, 534) (sr *zoekt.SearchResult, err error) { 535 defer func() { 536 var stats *zoekt.Stats 537 if sr != nil { 538 stats = &sr.Stats 539 } 540 s.log(ctx, q, opts, stats, err) 541 }() 542 543 metricSearchRequestsTotal.Inc() 544 return s.Streamer.Search(ctx, q, opts) 545} 546 547func (s *loggedSearcher) StreamSearch( 548 ctx context.Context, 549 q query.Q, 550 opts *zoekt.SearchOptions, 551 sender zoekt.Sender, 552) error { 553 var stats zoekt.Stats 554 555 metricSearchRequestsTotal.Inc() 556 err := s.Streamer.StreamSearch(ctx, q, opts, zoekt.SenderFunc(func(event *zoekt.SearchResult) { 557 stats.Add(event.Stats) 558 sender.Send(event) 559 })) 560 561 s.log(ctx, q, opts, &stats, err) 562 563 return err 564} 565 566func (s *loggedSearcher) log(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, st *zoekt.Stats, err error) { 567 logger := s.Logger. 568 WithTrace(traceContext(ctx)). 569 With( 570 sglog.String("query", q.String()), 571 sglog.Bool("opts.EstimateDocCount", opts.EstimateDocCount), 572 sglog.Bool("opts.Whole", opts.Whole), 573 sglog.Int("opts.ShardMaxMatchCount", opts.ShardMaxMatchCount), 574 sglog.Int("opts.TotalMaxMatchCount", opts.TotalMaxMatchCount), 575 sglog.Duration("opts.MaxWallTime", opts.MaxWallTime), 576 sglog.Int("opts.MaxDocDisplayCount", opts.MaxDocDisplayCount), 577 sglog.Int("opts.MaxMatchDisplayCount", opts.MaxMatchDisplayCount), 578 ) 579 580 if err != nil { 581 switch { 582 case errors.Is(err, context.Canceled): 583 logger.Warn("search canceled", sglog.Error(err)) 584 case errors.Is(err, context.DeadlineExceeded): 585 logger.Warn("search timeout", sglog.Error(err)) 586 default: 587 logger.Error("search failed", sglog.Error(err)) 588 } 589 return 590 } 591 592 if st == nil { 593 return 594 } 595 596 logger.Debug("search", 597 sglog.Int64("stat.ContentBytesLoaded", st.ContentBytesLoaded), 598 sglog.Int64("stat.IndexBytesLoaded", st.IndexBytesLoaded), 599 sglog.Int("stat.Crashes", st.Crashes), 600 sglog.Duration("stat.Duration", st.Duration), 601 sglog.Int("stat.FileCount", st.FileCount), 602 sglog.Int("stat.ShardFilesConsidered", st.ShardFilesConsidered), 603 sglog.Int("stat.FilesConsidered", st.FilesConsidered), 604 sglog.Int("stat.FilesLoaded", st.FilesLoaded), 605 sglog.Int("stat.FilesSkipped", st.FilesSkipped), 606 sglog.Int("stat.ShardsScanned", st.ShardsScanned), 607 sglog.Int("stat.ShardsSkipped", st.ShardsSkipped), 608 sglog.Int("stat.ShardsSkippedFilter", st.ShardsSkippedFilter), 609 sglog.Int("stat.MatchCount", st.MatchCount), 610 sglog.Int("stat.NgramMatches", st.NgramMatches), 611 sglog.Int("stat.NgramLookups", st.NgramLookups), 612 sglog.Duration("stat.Wait", st.Wait), 613 sglog.Duration("stat.MatchTreeConstruction", st.MatchTreeConstruction), 614 sglog.Duration("stat.MatchTreeSearch", st.MatchTreeSearch), 615 sglog.Int("stat.RegexpsConsidered", st.RegexpsConsidered), 616 sglog.String("stat.FlushReason", st.FlushReason.String()), 617 ) 618} 619 620func traceContext(ctx context.Context) sglog.TraceContext { 621 otSpan := opentracing.SpanFromContext(ctx) 622 if otSpan != nil { 623 if jaegerSpan, ok := otSpan.Context().(jaeger.SpanContext); ok { 624 return sglog.TraceContext{ 625 TraceID: jaegerSpan.TraceID().String(), 626 SpanID: jaegerSpan.SpanID().String(), 627 } 628 } 629 } 630 631 if otelSpan := oteltrace.SpanFromContext(ctx).SpanContext(); otelSpan.IsValid() { 632 return sglog.TraceContext{ 633 TraceID: otelSpan.TraceID().String(), 634 SpanID: otelSpan.SpanID().String(), 635 } 636 } 637 638 return sglog.TraceContext{} 639} 640 641func newGRPCServer(logger sglog.Logger, streamer zoekt.Streamer, additionalOpts ...grpc.ServerOption) *grpc.Server { 642 metrics := mustGetServerMetrics() 643 644 opts := []grpc.ServerOption{ 645 grpc.ChainStreamInterceptor( 646 otelgrpc.StreamServerInterceptor(), 647 metrics.StreamServerInterceptor(), 648 messagesize.StreamServerInterceptor, 649 internalerrs.LoggingStreamServerInterceptor(logger), 650 ), 651 grpc.ChainUnaryInterceptor( 652 otelgrpc.UnaryServerInterceptor(), 653 metrics.UnaryServerInterceptor(), 654 messagesize.UnaryServerInterceptor, 655 internalerrs.LoggingUnaryServerInterceptor(logger), 656 ), 657 } 658 659 opts = append(opts, additionalOpts...) 660 661 // Ensure that the message size options are set last, so they override any other 662 // server-specific options that tweak the message size. 663 // 664 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they 665 // take precedence over everything else with a uniform size setting that's easy to reason about. 666 opts = append(opts, messagesize.MustGetServerMessageSizeFromEnv()...) 667 668 s := grpc.NewServer(opts...) 669 proto.RegisterWebserverServiceServer(s, zoektgrpc.NewServer(streamer)) 670 671 return s 672} 673 674var ( 675 metricWatchdogErrors = promauto.NewGauge(prometheus.GaugeOpts{ 676 Name: "zoekt_webserver_watchdog_errors", 677 Help: "The current error count for zoekt watchdog.", 678 }) 679 metricWatchdogTotal = promauto.NewCounter(prometheus.CounterOpts{ 680 Name: "zoekt_webserver_watchdog_total", 681 Help: "The total number of requests done by zoekt watchdog.", 682 }) 683 metricWatchdogErrorsTotal = promauto.NewCounter(prometheus.CounterOpts{ 684 Name: "zoekt_webserver_watchdog_errors_total", 685 Help: "The total number of errors from zoekt watchdog.", 686 }) 687 metricSearchRequestsTotal = promauto.NewCounter(prometheus.CounterOpts{ 688 Name: "zoekt_search_requests_total", 689 Help: "The total number of search requests that zoekt received", 690 }) 691 692 serverMetricsOnce sync.Once 693 serverMetrics *grpcprom.ServerMetrics 694) 695 696// mustGetServerMetrics returns a singleton instance of the server metrics 697// that are shared across all gRPC servers that this process creates. 698// 699// This function panics if the metrics cannot be registered with the default 700// Prometheus registry. 701func mustGetServerMetrics() *grpcprom.ServerMetrics { 702 serverMetricsOnce.Do(func() { 703 serverMetrics = grpcprom.NewServerMetrics( 704 grpcprom.WithServerCounterOptions(), 705 grpcprom.WithServerHandlingTimeHistogram(), // record the overall response latency for a gRPC request) 706 ) 707 708 prometheus.DefaultRegisterer.MustRegister(serverMetrics) 709 }) 710 711 return serverMetrics 712}