fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Copyright 2016 Google Inc. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Command zoekt-webserver responds to search queries, using an index generated 16// by another program such as zoekt-indexserver. 17 18package main 19 20import ( 21 "context" 22 "crypto/tls" 23 "errors" 24 "flag" 25 "fmt" 26 "html/template" 27 "io" 28 "log" 29 "net" 30 "net/http" 31 "net/http/httputil" 32 "net/url" 33 "os" 34 "os/signal" 35 "path/filepath" 36 "runtime" 37 "strconv" 38 "strings" 39 "sync" 40 "time" 41 42 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" 43 "github.com/sourcegraph/mountinfo" 44 zoektgrpc "github.com/sourcegraph/zoekt/cmd/zoekt-webserver/grpc/server" 45 "github.com/sourcegraph/zoekt/grpc/internalerrs" 46 "github.com/sourcegraph/zoekt/grpc/messagesize" 47 proto "github.com/sourcegraph/zoekt/grpc/protos/zoekt/webserver/v1" 48 "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" 49 "golang.org/x/net/http2" 50 "golang.org/x/net/http2/h2c" 51 "google.golang.org/grpc" 52 53 "github.com/sourcegraph/zoekt" 54 "github.com/sourcegraph/zoekt/build" 55 "github.com/sourcegraph/zoekt/debugserver" 56 "github.com/sourcegraph/zoekt/internal/profiler" 57 "github.com/sourcegraph/zoekt/internal/tracer" 58 "github.com/sourcegraph/zoekt/query" 59 "github.com/sourcegraph/zoekt/shards" 60 "github.com/sourcegraph/zoekt/stream" 61 "github.com/sourcegraph/zoekt/trace" 62 "github.com/sourcegraph/zoekt/web" 63 64 "github.com/opentracing/opentracing-go" 65 "github.com/prometheus/client_golang/prometheus" 66 "github.com/prometheus/client_golang/prometheus/promauto" 67 "github.com/shirou/gopsutil/v3/disk" 68 sglog "github.com/sourcegraph/log" 69 "github.com/uber/jaeger-client-go" 70 oteltrace "go.opentelemetry.io/otel/trace" 71 "go.uber.org/automaxprocs/maxprocs" 72) 73 74const logFormat = "2006-01-02T15-04-05.999999999Z07" 75 76func divertLogs(dir string, interval time.Duration) { 77 t := time.NewTicker(interval) 78 var last *os.File 79 for { 80 nm := filepath.Join(dir, fmt.Sprintf("zoekt-webserver.%s.%d.log", time.Now().Format(logFormat), os.Getpid())) 81 fmt.Fprintf(os.Stderr, "writing logs to %s\n", nm) 82 83 f, err := os.Create(nm) 84 if err != nil { 85 // There is not much we can do now. 86 fmt.Fprintf(os.Stderr, "can't create output file %s: %v\n", nm, err) 87 os.Exit(2) 88 } 89 90 log.SetOutput(f) 91 last.Close() 92 93 last = f 94 95 <-t.C 96 } 97} 98 99const templateExtension = ".html.tpl" 100 101func loadTemplates(tpl *template.Template, dir string) error { 102 fs, err := filepath.Glob(dir + "/*" + templateExtension) 103 if err != nil { 104 log.Fatalf("Glob: %v", err) 105 } 106 107 log.Printf("loading templates: %v", fs) 108 for _, fn := range fs { 109 content, err := os.ReadFile(fn) 110 if err != nil { 111 return err 112 } 113 114 base := filepath.Base(fn) 115 base = strings.TrimSuffix(base, templateExtension) 116 if _, err := tpl.New(base).Parse(string(content)); err != nil { 117 return fmt.Errorf("template.Parse(%s): %v", fn, err) 118 } 119 } 120 return nil 121} 122 123func writeTemplates(dir string) error { 124 if dir == "" { 125 return fmt.Errorf("must set --template_dir") 126 } 127 128 for k, v := range web.TemplateText { 129 nm := filepath.Join(dir, k+templateExtension) 130 if err := os.WriteFile(nm, []byte(v), 0o644); err != nil { 131 return err 132 } 133 } 134 return nil 135} 136 137func main() { 138 logDir := flag.String("log_dir", "", "log to this directory rather than stderr.") 139 logRefresh := flag.Duration("log_refresh", 24*time.Hour, "if using --log_dir, start writing a new file this often.") 140 141 listen := flag.String("listen", ":6070", "listen on this address.") 142 index := flag.String("index", build.DefaultDir, "set index directory to use") 143 html := flag.Bool("html", true, "enable HTML interface") 144 enableRPC := flag.Bool("rpc", false, "enable go/net RPC") 145 enableIndexserverProxy := flag.Bool("indexserver_proxy", false, "proxy requests with URLs matching the path /indexserver/ to <index>/indexserver.sock") 146 print := flag.Bool("print", false, "enable local result URLs") 147 enablePprof := flag.Bool("pprof", false, "set to enable remote profiling.") 148 sslCert := flag.String("ssl_cert", "", "set path to SSL .pem holding certificate.") 149 sslKey := flag.String("ssl_key", "", "set path to SSL .pem holding key.") 150 hostCustomization := flag.String( 151 "host_customization", "", 152 "specify host customization, as HOST1=QUERY,HOST2=QUERY") 153 154 templateDir := flag.String("template_dir", "", "set directory from which to load custom .html.tpl template files") 155 dumpTemplates := flag.Bool("dump_templates", false, "dump templates into --template_dir and exit.") 156 version := flag.Bool("version", false, "Print version number") 157 158 flag.Parse() 159 160 if *version { 161 fmt.Printf("zoekt-webserver version %q\n", zoekt.Version) 162 os.Exit(0) 163 } 164 165 if *dumpTemplates { 166 if err := writeTemplates(*templateDir); err != nil { 167 log.Fatal(err) 168 } 169 os.Exit(0) 170 } 171 172 resource := sglog.Resource{ 173 Name: "zoekt-webserver", 174 Version: zoekt.Version, 175 InstanceID: zoekt.HostnameBestEffort(), 176 } 177 178 liblog := sglog.Init(resource) 179 defer liblog.Sync() 180 tracer.Init(resource) 181 profiler.Init("zoekt-webserver", zoekt.Version, -1) 182 183 if *logDir != "" { 184 if fi, err := os.Lstat(*logDir); err != nil || !fi.IsDir() { 185 log.Fatalf("%s is not a directory", *logDir) 186 } 187 // We could do fdup acrobatics to also redirect 188 // stderr, but it is simpler and more portable for the 189 // caller to divert stderr output if necessary. 190 go divertLogs(*logDir, *logRefresh) 191 } 192 193 // Tune GOMAXPROCS to match Linux container CPU quota. 194 _, _ = maxprocs.Set() 195 196 if err := os.MkdirAll(*index, 0o755); err != nil { 197 log.Fatal(err) 198 } 199 200 mustRegisterDiskMonitor(*index) 201 202 metricsLogger := sglog.Scoped("metricsRegistration") 203 204 mustRegisterMemoryMapMetrics(metricsLogger) 205 206 opts := mountinfo.CollectorOpts{Namespace: "zoekt_webserver"} 207 c := mountinfo.NewCollector(metricsLogger, opts, map[string]string{"indexDir": *index}) 208 209 prometheus.DefaultRegisterer.MustRegister(c) 210 211 // Do not block on loading shards so we can become partially available 212 // sooner. Otherwise on large instances zoekt can be unavailable on the 213 // order of minutes. 214 searcher, err := shards.NewDirectorySearcherFast(*index) 215 if err != nil { 216 log.Fatal(err) 217 } 218 219 searcher = &loggedSearcher{ 220 Streamer: searcher, 221 Logger: sglog.Scoped("searcher"), 222 } 223 224 s := &web.Server{ 225 Searcher: searcher, 226 Top: web.Top, 227 Version: zoekt.Version, 228 } 229 230 if *templateDir != "" { 231 if err := loadTemplates(s.Top, *templateDir); err != nil { 232 log.Fatalf("loadTemplates: %v", err) 233 } 234 } 235 236 s.Print = *print 237 s.HTML = *html 238 s.RPC = *enableRPC 239 240 if *hostCustomization != "" { 241 s.HostCustomQueries = map[string]string{} 242 for _, h := range strings.SplitN(*hostCustomization, ",", -1) { 243 if len(h) == 0 { 244 continue 245 } 246 fields := strings.SplitN(h, "=", 2) 247 if len(fields) < 2 { 248 log.Fatalf("invalid host_customization %q", h) 249 } 250 251 s.HostCustomQueries[fields[0]] = fields[1] 252 } 253 } 254 255 serveMux, err := web.NewMux(s) 256 if err != nil { 257 log.Fatal(err) 258 } 259 260 debugserver.AddHandlers(serveMux, *enablePprof) 261 262 if *enableIndexserverProxy { 263 socket := filepath.Join(*index, "indexserver.sock") 264 sglog.Scoped("server").Info("adding reverse proxy", sglog.String("socket", socket)) 265 addProxyHandler(serveMux, socket) 266 } 267 268 handler := trace.Middleware(serveMux) 269 270 // Sourcegraph: We use environment variables to configure watchdog since 271 // they are more convenient than flags in containerized environments. 272 watchdogTick := 30 * time.Second 273 if v := os.Getenv("ZOEKT_WATCHDOG_TICK"); v != "" { 274 watchdogTick, _ = time.ParseDuration(v) 275 log.Printf("custom ZOEKT_WATCHDOG_TICK=%v", watchdogTick) 276 } 277 278 watchdogErrCount := 3 279 if v := os.Getenv("ZOEKT_WATCHDOG_ERRORS"); v != "" { 280 watchdogErrCount, _ = strconv.Atoi(v) 281 log.Printf("custom ZOEKT_WATCHDOG_ERRORS=%d", watchdogErrCount) 282 } 283 284 watchdogAddr := "http://" + *listen 285 if *sslCert != "" || *sslKey != "" { 286 watchdogAddr = "https://" + *listen 287 } 288 watchdogAddr += "/healthz" 289 290 if watchdogErrCount > 0 && watchdogTick > 0 { 291 go watchdog(watchdogTick, watchdogErrCount, watchdogAddr) 292 } else { 293 log.Println("watchdog disabled") 294 } 295 296 logger := sglog.Scoped("ZoektWebserverGRPCServer") 297 298 streamer := web.NewTraceAwareSearcher(s.Searcher) 299 grpcServer := newGRPCServer(logger, streamer) 300 301 handler = multiplexGRPC(grpcServer, handler) 302 303 srv := &http.Server{ 304 Addr: *listen, 305 Handler: handler, 306 } 307 308 go func() { 309 sglog.Scoped("server").Info("starting server", sglog.Stringp("address", listen)) 310 var err error 311 if *sslCert != "" || *sslKey != "" { 312 err = srv.ListenAndServeTLS(*sslCert, *sslKey) 313 } else { 314 err = srv.ListenAndServe() 315 } 316 317 if err != http.ErrServerClosed { 318 // Fatal otherwise shutdownOnSignal will block 319 log.Fatalf("ListenAndServe: %v", err) 320 } 321 }() 322 323 if s.RPC { 324 // Our RPC system does not support shutdown and hijacks the underlying 325 // http connection. This means shutdown is ineffective and just waits 10s 326 // before calling close. Lets just quit faster in that case. 327 if err := closeOnSignal(srv); err != nil { 328 log.Fatalf("http.Server.Close: %v", err) 329 } 330 } else { 331 if err := shutdownOnSignal(srv); err != nil { 332 log.Fatalf("http.Server.Shutdown: %v", err) 333 } 334 } 335} 336 337// multiplexGRPC takes a gRPC server and a plain HTTP handler and multiplexes the 338// request handling. Any requests that declare themselves as gRPC requests are routed 339// to the gRPC server, all others are routed to the httpHandler. 340func multiplexGRPC(grpcServer *grpc.Server, httpHandler http.Handler) http.Handler { 341 newHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 342 if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") { 343 grpcServer.ServeHTTP(w, r) 344 } else { 345 httpHandler.ServeHTTP(w, r) 346 } 347 }) 348 349 // Until we enable TLS, we need to fall back to the h2c protocol, which is 350 // basically HTTP2 without TLS. The standard library does not implement the 351 // h2s protocol, so this hijacks h2s requests and handles them correctly. 352 return h2c.NewHandler(newHandler, &http2.Server{}) 353} 354 355// addProxyHandler adds a handler to "mux" that proxies all requests with base 356// /indexserver to "socket". 357func addProxyHandler(mux *http.ServeMux, socket string) { 358 proxy := httputil.NewSingleHostReverseProxy(&url.URL{ 359 Scheme: "http", 360 // The value of "Host" is arbitrary, because it is ignored by the 361 // DialContext we use for the socket connection. 362 Host: "socket", 363 }) 364 proxy.Transport = &http.Transport{ 365 DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) { 366 var d net.Dialer 367 return d.DialContext(ctx, "unix", socket) 368 }, 369 } 370 mux.Handle("/indexserver/", http.StripPrefix("/indexserver/", http.HandlerFunc(proxy.ServeHTTP))) 371} 372 373// shutdownSignalChan returns a channel which is listening for shutdown 374// signals from the operating system. maxReads is an upper bound on how many 375// times you will read the channel (used as buffer for signal.Notify). 376func shutdownSignalChan(maxReads int) <-chan os.Signal { 377 c := make(chan os.Signal, maxReads) 378 signal.Notify(c, os.Interrupt) // terminal C-c and goreman 379 signal.Notify(c, PLATFORM_SIGTERM) // Kubernetes 380 return c 381} 382 383// closeOnSignal will listen for SIGINT or SIGTERM and call srv.Close. This is 384// not a graceful shutdown, see shutdownOnSignal. 385func closeOnSignal(srv *http.Server) error { 386 c := shutdownSignalChan(1) 387 <-c 388 389 return srv.Close() 390} 391 392// shutdownOnSignal will listen for SIGINT or SIGTERM and call srv.Shutdown. 393// Note it doesn't call anything else for shutting down. Notably our RPC 394// framework doesn't allow us to drain connections, so when Shutdown we will 395// wait 10s before closing. 396// 397// Note: the call site for shutdownOnSignal should use closeOnSignal instead 398// if rpc mode is enabled due to the above limitation. 399func shutdownOnSignal(srv *http.Server) error { 400 c := shutdownSignalChan(2) 401 <-c 402 403 // If we receive another signal, immediate shutdown 404 ctx, cancel := context.WithCancel(context.Background()) 405 defer cancel() 406 go func() { 407 select { 408 case <-ctx.Done(): 409 case sig := <-c: 410 log.Printf("received another signal (%v), immediate shutdown", sig) 411 cancel() 412 } 413 }() 414 415 // Wait for 10s to drain ongoing requests. Kubernetes gives us 30s to 416 // shutdown, we have already used 15s waiting for our endpoint removal to 417 // propagate. 418 ctx, cancel2 := context.WithTimeout(ctx, 10*time.Second) 419 defer cancel2() 420 421 log.Printf("shutting down") 422 return srv.Shutdown(ctx) 423} 424 425func watchdogOnce(ctx context.Context, client *http.Client, addr string) error { 426 defer metricWatchdogTotal.Inc() 427 428 ctx, cancel := context.WithDeadline(ctx, time.Now().Add(30*time.Second)) 429 defer cancel() 430 431 req, err := http.NewRequest("GET", addr, nil) 432 if err != nil { 433 return err 434 } 435 436 req = req.WithContext(ctx) 437 438 resp, err := client.Do(req) 439 if err != nil { 440 return err 441 } 442 body, _ := io.ReadAll(resp.Body) 443 _ = resp.Body.Close() 444 445 if resp.StatusCode != http.StatusOK { 446 return fmt.Errorf("watchdog: status=%v body=%q", resp.StatusCode, string(body)) 447 } 448 return nil 449} 450 451func watchdog(dt time.Duration, maxErrCount int, addr string) { 452 tr := &http.Transport{ 453 TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, 454 } 455 client := &http.Client{ 456 Transport: tr, 457 } 458 tick := time.NewTicker(dt) 459 460 errCount := 0 461 for range tick.C { 462 err := watchdogOnce(context.Background(), client, addr) 463 if err != nil { 464 errCount++ 465 metricWatchdogErrors.Set(float64(errCount)) 466 metricWatchdogErrorsTotal.Inc() 467 if errCount >= maxErrCount { 468 log.Printf(`watchdog health check has consecutively failed %d times indicating is likely an unrecoverable error affecting zoekt. As such this process will exit with code 3. 469 470Final error: %v 471 472Possible remediations: 473- If this rarely happens, ignore and let your process manager restart zoekt. 474- Possibly under provisioned. Try increasing CPU or disk IO. 475- A bug. Reach out with logs and screenshots of metrics when this occurs.`, errCount, err) 476 os.Exit(3) 477 } else { 478 log.Printf("watchdog: failed, will try %d more times: %v", maxErrCount-errCount, err) 479 } 480 } else if errCount > 0 { 481 errCount = 0 482 metricWatchdogErrors.Set(float64(errCount)) 483 log.Printf("watchdog: success, resetting error count") 484 } 485 } 486} 487 488func diskUsage(path string) (*disk.UsageStat, error) { 489 duPath := path 490 if runtime.GOOS == "windows" { 491 duPath = filepath.VolumeName(duPath) 492 } 493 usage, err := disk.Usage(duPath) 494 if err != nil { 495 return nil, fmt.Errorf("diskUsage: %w", err) 496 } 497 return usage, err 498} 499 500func mustRegisterDiskMonitor(path string) { 501 prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ 502 Name: "src_disk_space_available_bytes", 503 Help: "Amount of free space disk space.", 504 ConstLabels: prometheus.Labels{"path": path}, 505 }, func() float64 { 506 // I know there is no error handling here, and I don't like it 507 // but there was no error handling in the previous version 508 // that used Statfs, either, so I'm assuming there's no need for it 509 usage, _ := diskUsage(path) 510 return float64(usage.Free) 511 })) 512 513 prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ 514 Name: "src_disk_space_total_bytes", 515 Help: "Amount of total disk space.", 516 ConstLabels: prometheus.Labels{"path": path}, 517 }, func() float64 { 518 // I know there is no error handling here, and I don't like it 519 // but there was no error handling in the previous version 520 // that used Statfs, either, so I'm assuming there's no need for it 521 usage, _ := diskUsage(path) 522 return float64(usage.Total) 523 })) 524} 525 526type loggedSearcher struct { 527 zoekt.Streamer 528 Logger sglog.Logger 529} 530 531func (s *loggedSearcher) Search( 532 ctx context.Context, 533 q query.Q, 534 opts *zoekt.SearchOptions, 535) (sr *zoekt.SearchResult, err error) { 536 defer func() { 537 var stats *zoekt.Stats 538 if sr != nil { 539 stats = &sr.Stats 540 } 541 s.log(ctx, q, opts, stats, err) 542 }() 543 544 metricSearchRequestsTotal.Inc() 545 return s.Streamer.Search(ctx, q, opts) 546} 547 548func (s *loggedSearcher) StreamSearch( 549 ctx context.Context, 550 q query.Q, 551 opts *zoekt.SearchOptions, 552 sender zoekt.Sender, 553) error { 554 var stats zoekt.Stats 555 556 metricSearchRequestsTotal.Inc() 557 err := s.Streamer.StreamSearch(ctx, q, opts, stream.SenderFunc(func(event *zoekt.SearchResult) { 558 stats.Add(event.Stats) 559 sender.Send(event) 560 })) 561 562 s.log(ctx, q, opts, &stats, err) 563 564 return err 565} 566 567func (s *loggedSearcher) log(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, st *zoekt.Stats, err error) { 568 logger := s.Logger. 569 WithTrace(traceContext(ctx)). 570 With( 571 sglog.String("query", q.String()), 572 sglog.Bool("opts.EstimateDocCount", opts.EstimateDocCount), 573 sglog.Bool("opts.Whole", opts.Whole), 574 sglog.Int("opts.ShardMaxMatchCount", opts.ShardMaxMatchCount), 575 sglog.Int("opts.TotalMaxMatchCount", opts.TotalMaxMatchCount), 576 sglog.Duration("opts.MaxWallTime", opts.MaxWallTime), 577 sglog.Int("opts.MaxDocDisplayCount", opts.MaxDocDisplayCount), 578 sglog.Int("opts.MaxMatchDisplayCount", opts.MaxMatchDisplayCount), 579 ) 580 581 if err != nil { 582 switch { 583 case errors.Is(err, context.Canceled): 584 logger.Warn("search canceled", sglog.Error(err)) 585 case errors.Is(err, context.DeadlineExceeded): 586 logger.Warn("search timeout", sglog.Error(err)) 587 default: 588 logger.Error("search failed", sglog.Error(err)) 589 } 590 return 591 } 592 593 if st == nil { 594 return 595 } 596 597 logger.Debug("search", 598 sglog.Int64("stat.ContentBytesLoaded", st.ContentBytesLoaded), 599 sglog.Int64("stat.IndexBytesLoaded", st.IndexBytesLoaded), 600 sglog.Int("stat.Crashes", st.Crashes), 601 sglog.Duration("stat.Duration", st.Duration), 602 sglog.Int("stat.FileCount", st.FileCount), 603 sglog.Int("stat.ShardFilesConsidered", st.ShardFilesConsidered), 604 sglog.Int("stat.FilesConsidered", st.FilesConsidered), 605 sglog.Int("stat.FilesLoaded", st.FilesLoaded), 606 sglog.Int("stat.FilesSkipped", st.FilesSkipped), 607 sglog.Int("stat.ShardsScanned", st.ShardsScanned), 608 sglog.Int("stat.ShardsSkipped", st.ShardsSkipped), 609 sglog.Int("stat.ShardsSkippedFilter", st.ShardsSkippedFilter), 610 sglog.Int("stat.MatchCount", st.MatchCount), 611 sglog.Int("stat.NgramMatches", st.NgramMatches), 612 sglog.Int("stat.NgramLookups", st.NgramLookups), 613 sglog.Duration("stat.Wait", st.Wait), 614 sglog.Duration("stat.MatchTreeConstruction", st.MatchTreeConstruction), 615 sglog.Duration("stat.MatchTreeSearch", st.MatchTreeSearch), 616 sglog.Int("stat.RegexpsConsidered", st.RegexpsConsidered), 617 sglog.String("stat.FlushReason", st.FlushReason.String()), 618 ) 619} 620 621func traceContext(ctx context.Context) sglog.TraceContext { 622 otSpan := opentracing.SpanFromContext(ctx) 623 if otSpan != nil { 624 if jaegerSpan, ok := otSpan.Context().(jaeger.SpanContext); ok { 625 return sglog.TraceContext{ 626 TraceID: jaegerSpan.TraceID().String(), 627 SpanID: jaegerSpan.SpanID().String(), 628 } 629 } 630 } 631 632 if otelSpan := oteltrace.SpanFromContext(ctx).SpanContext(); otelSpan.IsValid() { 633 return sglog.TraceContext{ 634 TraceID: otelSpan.TraceID().String(), 635 SpanID: otelSpan.SpanID().String(), 636 } 637 } 638 639 return sglog.TraceContext{} 640} 641 642func newGRPCServer(logger sglog.Logger, streamer zoekt.Streamer, additionalOpts ...grpc.ServerOption) *grpc.Server { 643 metrics := mustGetServerMetrics() 644 645 opts := []grpc.ServerOption{ 646 grpc.ChainStreamInterceptor( 647 otelgrpc.StreamServerInterceptor(), 648 metrics.StreamServerInterceptor(), 649 messagesize.StreamServerInterceptor, 650 internalerrs.LoggingStreamServerInterceptor(logger), 651 ), 652 grpc.ChainUnaryInterceptor( 653 otelgrpc.UnaryServerInterceptor(), 654 metrics.UnaryServerInterceptor(), 655 messagesize.UnaryServerInterceptor, 656 internalerrs.LoggingUnaryServerInterceptor(logger), 657 ), 658 } 659 660 opts = append(opts, additionalOpts...) 661 662 // Ensure that the message size options are set last, so they override any other 663 // server-specific options that tweak the message size. 664 // 665 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they 666 // take precedence over everything else with a uniform size setting that's easy to reason about. 667 opts = append(opts, messagesize.MustGetServerMessageSizeFromEnv()...) 668 669 s := grpc.NewServer(opts...) 670 proto.RegisterWebserverServiceServer(s, zoektgrpc.NewServer(streamer)) 671 672 return s 673} 674 675var ( 676 metricWatchdogErrors = promauto.NewGauge(prometheus.GaugeOpts{ 677 Name: "zoekt_webserver_watchdog_errors", 678 Help: "The current error count for zoekt watchdog.", 679 }) 680 metricWatchdogTotal = promauto.NewCounter(prometheus.CounterOpts{ 681 Name: "zoekt_webserver_watchdog_total", 682 Help: "The total number of requests done by zoekt watchdog.", 683 }) 684 metricWatchdogErrorsTotal = promauto.NewCounter(prometheus.CounterOpts{ 685 Name: "zoekt_webserver_watchdog_errors_total", 686 Help: "The total number of errors from zoekt watchdog.", 687 }) 688 metricSearchRequestsTotal = promauto.NewCounter(prometheus.CounterOpts{ 689 Name: "zoekt_search_requests_total", 690 Help: "The total number of search requests that zoekt received", 691 }) 692 693 serverMetricsOnce sync.Once 694 serverMetrics *grpcprom.ServerMetrics 695) 696 697// mustGetServerMetrics returns a singleton instance of the server metrics 698// that are shared across all gRPC servers that this process creates. 699// 700// This function panics if the metrics cannot be registered with the default 701// Prometheus registry. 702func mustGetServerMetrics() *grpcprom.ServerMetrics { 703 serverMetricsOnce.Do(func() { 704 serverMetrics = grpcprom.NewServerMetrics( 705 grpcprom.WithServerCounterOptions(), 706 grpcprom.WithServerHandlingTimeHistogram(), // record the overall response latency for a gRPC request) 707 ) 708 709 prometheus.DefaultRegisterer.MustRegister(serverMetrics) 710 }) 711 712 return serverMetrics 713}