fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 20 kB View raw
1// Copyright 2016 Google Inc. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Command zoekt-webserver starts a server that responds to search queries, using 16// an index generated by another program such as zoekt-indexserver. 17package main 18 19import ( 20 "context" 21 "crypto/tls" 22 "errors" 23 "flag" 24 "fmt" 25 "html/template" 26 "io" 27 "log" 28 "net" 29 "net/http" 30 "net/http/httputil" 31 "net/url" 32 "os" 33 "os/signal" 34 "path/filepath" 35 "strconv" 36 "strings" 37 "time" 38 39 "github.com/opentracing/opentracing-go" 40 "github.com/prometheus/client_golang/prometheus" 41 "github.com/prometheus/client_golang/prometheus/promauto" 42 "github.com/shirou/gopsutil/v3/disk" 43 sglog "github.com/sourcegraph/log" 44 "github.com/sourcegraph/mountinfo" 45 "github.com/uber/jaeger-client-go" 46 oteltrace "go.opentelemetry.io/otel/trace" 47 "go.uber.org/automaxprocs/maxprocs" 48 "golang.org/x/sys/unix" 49 "google.golang.org/grpc" 50 51 "github.com/sourcegraph/zoekt" 52 grpcserver "github.com/sourcegraph/zoekt/cmd/zoekt-webserver/grpc/server" 53 "github.com/sourcegraph/zoekt/grpc/defaults" 54 "github.com/sourcegraph/zoekt/grpc/grpcutil" 55 webserverv1 "github.com/sourcegraph/zoekt/grpc/protos/zoekt/webserver/v1" 56 "github.com/sourcegraph/zoekt/index" 57 "github.com/sourcegraph/zoekt/internal/debugserver" 58 "github.com/sourcegraph/zoekt/internal/profiler" 59 "github.com/sourcegraph/zoekt/internal/trace" 60 "github.com/sourcegraph/zoekt/internal/tracer" 61 "github.com/sourcegraph/zoekt/query" 62 "github.com/sourcegraph/zoekt/search" 63 "github.com/sourcegraph/zoekt/web" 64) 65 66const logFormat = "2006-01-02T15-04-05.999999999Z07" 67 68func divertLogs(dir string, interval time.Duration) { 69 t := time.NewTicker(interval) 70 var last *os.File 71 for { 72 nm := filepath.Join(dir, fmt.Sprintf("zoekt-webserver.%s.%d.log", time.Now().Format(logFormat), os.Getpid())) 73 fmt.Fprintf(os.Stderr, "writing logs to %s\n", nm) 74 75 f, err := os.Create(nm) 76 if err != nil { 77 // There is not much we can do now. 78 fmt.Fprintf(os.Stderr, "can't create output file %s: %v\n", nm, err) 79 os.Exit(2) 80 } 81 82 log.SetOutput(f) 83 last.Close() 84 85 last = f 86 87 <-t.C 88 } 89} 90 91const templateExtension = ".html.tpl" 92 93func loadTemplates(tpl *template.Template, dir string) error { 94 fs, err := filepath.Glob(dir + "/*" + templateExtension) 95 if err != nil { 96 log.Fatalf("Glob: %v", err) 97 } 98 99 log.Printf("loading templates: %v", fs) 100 for _, fn := range fs { 101 content, err := os.ReadFile(fn) 102 if err != nil { 103 return err 104 } 105 106 base := filepath.Base(fn) 107 base = strings.TrimSuffix(base, templateExtension) 108 if _, err := tpl.New(base).Parse(string(content)); err != nil { 109 return fmt.Errorf("template.Parse(%s): %v", fn, err) 110 } 111 } 112 return nil 113} 114 115func writeTemplates(dir string) error { 116 if dir == "" { 117 return fmt.Errorf("must set --template_dir") 118 } 119 120 for k, v := range web.TemplateText { 121 nm := filepath.Join(dir, k+templateExtension) 122 if err := os.WriteFile(nm, []byte(v), 0o644); err != nil { 123 return err 124 } 125 } 126 return nil 127} 128 129func main() { 130 logDir := flag.String("log_dir", "", "log to this directory rather than stderr.") 131 logRefresh := flag.Duration("log_refresh", 24*time.Hour, "if using --log_dir, start writing a new file this often.") 132 133 listen := flag.String("listen", ":6070", "listen on this address.") 134 indexDir := flag.String("index", index.DefaultDir, "set index directory to use") 135 html := flag.Bool("html", true, "enable HTML interface") 136 enableRPC := flag.Bool("rpc", false, "enable go/net RPC") 137 enableIndexserverProxy := flag.Bool("indexserver_proxy", false, "proxy requests with URLs matching the path /indexserver/ to <index>/indexserver.sock") 138 print := flag.Bool("print", false, "enable local result URLs") 139 enablePprof := flag.Bool("pprof", false, "set to enable remote profiling.") 140 sslCert := flag.String("ssl_cert", "", "set path to SSL .pem holding certificate.") 141 sslKey := flag.String("ssl_key", "", "set path to SSL .pem holding key.") 142 hostCustomization := flag.String( 143 "host_customization", "", 144 "specify host customization, as HOST1=QUERY,HOST2=QUERY") 145 146 templateDir := flag.String("template_dir", "", "set directory from which to load custom .html.tpl template files") 147 dumpTemplates := flag.Bool("dump_templates", false, "dump templates into --template_dir and exit.") 148 corsOrigin := flag.String("cors_origin", "", "allow requests from this origin. If empty, no CORS headers are set.") 149 version := flag.Bool("version", false, "Print version number") 150 151 flag.Parse() 152 153 if *version { 154 fmt.Printf("zoekt-webserver version %q\n", index.Version) 155 os.Exit(0) 156 } 157 158 if *dumpTemplates { 159 if err := writeTemplates(*templateDir); err != nil { 160 log.Fatal(err) 161 } 162 os.Exit(0) 163 } 164 165 resource := sglog.Resource{ 166 Name: "zoekt-webserver", 167 Version: index.Version, 168 InstanceID: index.HostnameBestEffort(), 169 } 170 171 liblog := sglog.Init(resource) 172 defer liblog.Sync() 173 tracer.Init(resource) 174 profiler.Init("zoekt-webserver") 175 176 if *logDir != "" { 177 if fi, err := os.Lstat(*logDir); err != nil || !fi.IsDir() { 178 log.Fatalf("%s is not a directory", *logDir) 179 } 180 // We could do fdup acrobatics to also redirect 181 // stderr, but it is simpler and more portable for the 182 // caller to divert stderr output if necessary. 183 go divertLogs(*logDir, *logRefresh) 184 } 185 186 // Tune GOMAXPROCS to match Linux container CPU quota. 187 _, _ = maxprocs.Set() 188 189 if err := os.MkdirAll(*indexDir, 0o755); err != nil { 190 log.Fatal(err) 191 } 192 193 mustRegisterDiskMonitor(*indexDir) 194 195 metricsLogger := sglog.Scoped("metricsRegistration") 196 197 mustRegisterMemoryMapMetrics(metricsLogger) 198 199 opts := mountinfo.CollectorOpts{Namespace: "zoekt_webserver"} 200 c := mountinfo.NewCollector(metricsLogger, opts, map[string]string{"indexDir": *indexDir}) 201 202 prometheus.DefaultRegisterer.MustRegister(c) 203 204 // Do not block on loading shards so we can become partially available 205 // sooner. Otherwise on large instances zoekt can be unavailable on the 206 // order of minutes. 207 searcher, err := search.NewDirectorySearcherFast(*indexDir) 208 if err != nil { 209 log.Fatal(err) 210 } 211 212 searcher = &loggedSearcher{ 213 Streamer: searcher, 214 Logger: sglog.Scoped("searcher"), 215 } 216 217 s := &web.Server{ 218 Searcher: searcher, 219 Top: web.Top, 220 Version: index.Version, 221 } 222 223 if *templateDir != "" { 224 if err := loadTemplates(s.Top, *templateDir); err != nil { 225 log.Fatalf("loadTemplates: %v", err) 226 } 227 } 228 229 s.Print = *print 230 s.HTML = *html 231 s.RPC = *enableRPC 232 233 if *hostCustomization != "" { 234 s.HostCustomQueries = map[string]string{} 235 for _, h := range strings.SplitN(*hostCustomization, ",", -1) { 236 if len(h) == 0 { 237 continue 238 } 239 fields := strings.SplitN(h, "=", 2) 240 if len(fields) < 2 { 241 log.Fatalf("invalid host_customization %q", h) 242 } 243 244 s.HostCustomQueries[fields[0]] = fields[1] 245 } 246 } 247 248 serveMux, err := web.NewMux(s) 249 if err != nil { 250 log.Fatal(err) 251 } 252 253 debugserver.AddHandlers(serveMux, *enablePprof) 254 255 if *enableIndexserverProxy { 256 socket := filepath.Join(*indexDir, "indexserver.sock") 257 sglog.Scoped("server").Info("adding reverse proxy", sglog.String("socket", socket)) 258 addProxyHandler(serveMux, socket) 259 } 260 261 handler := trace.Middleware(serveMux) 262 263 // Sourcegraph: We use environment variables to configure watchdog since 264 // they are more convenient than flags in containerized environments. 265 watchdogTick := 30 * time.Second 266 if v := os.Getenv("ZOEKT_WATCHDOG_TICK"); v != "" { 267 watchdogTick, _ = time.ParseDuration(v) 268 log.Printf("custom ZOEKT_WATCHDOG_TICK=%v", watchdogTick) 269 } 270 271 watchdogErrCount := 3 272 if v := os.Getenv("ZOEKT_WATCHDOG_ERRORS"); v != "" { 273 watchdogErrCount, _ = strconv.Atoi(v) 274 log.Printf("custom ZOEKT_WATCHDOG_ERRORS=%d", watchdogErrCount) 275 } 276 277 watchdogAddr := "http://" + *listen 278 if *sslCert != "" || *sslKey != "" { 279 watchdogAddr = "https://" + *listen 280 } 281 watchdogAddr += "/healthz" 282 283 if watchdogErrCount > 0 && watchdogTick > 0 { 284 go watchdog(watchdogTick, watchdogErrCount, watchdogAddr) 285 } else { 286 log.Println("watchdog disabled") 287 } 288 289 logger := sglog.Scoped("ZoektWebserverGRPCServer") 290 291 streamer := web.NewTraceAwareSearcher(s.Searcher) 292 grpcServer := newGRPCServer(logger, streamer) 293 294 handler = grpcutil.MultiplexGRPC(grpcServer, handler) 295 handler = corsHandler(handler, *corsOrigin) 296 297 srv := &http.Server{ 298 Addr: *listen, 299 Handler: handler, 300 } 301 302 go func() { 303 sglog.Scoped("server").Info("starting server", sglog.Stringp("address", listen)) 304 var err error 305 if *sslCert != "" || *sslKey != "" { 306 err = srv.ListenAndServeTLS(*sslCert, *sslKey) 307 } else { 308 err = srv.ListenAndServe() 309 } 310 311 if err != http.ErrServerClosed { 312 // Fatal otherwise shutdownOnSignal will block 313 log.Fatalf("ListenAndServe: %v", err) 314 } 315 }() 316 317 if s.RPC { 318 // Our RPC system does not support shutdown and hijacks the underlying 319 // http connection. This means shutdown is ineffective and just waits 10s 320 // before calling close. Lets just quit faster in that case. 321 if err := closeOnSignal(srv); err != nil { 322 log.Fatalf("http.Server.Close: %v", err) 323 } 324 } else { 325 if err := shutdownOnSignal(srv); err != nil { 326 log.Fatalf("http.Server.Shutdown: %v", err) 327 } 328 } 329} 330 331// addProxyHandler adds a handler to "mux" that proxies all requests with base 332// /indexserver to "socket". 333func addProxyHandler(mux *http.ServeMux, socket string) { 334 proxy := httputil.NewSingleHostReverseProxy(&url.URL{ 335 Scheme: "http", 336 // The value of "Host" is arbitrary, because it is ignored by the 337 // DialContext we use for the socket connection. 338 Host: "socket", 339 }) 340 proxy.Transport = &http.Transport{ 341 DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) { 342 var d net.Dialer 343 return d.DialContext(ctx, "unix", socket) 344 }, 345 } 346 mux.Handle("/indexserver/", http.StripPrefix("/indexserver/", http.HandlerFunc(proxy.ServeHTTP))) 347} 348 349// shutdownSignalChan returns a channel which is listening for shutdown 350// signals from the operating system. maxReads is an upper bound on how many 351// times you will read the channel (used as buffer for signal.Notify). 352func shutdownSignalChan(maxReads int) <-chan os.Signal { 353 c := make(chan os.Signal, maxReads) 354 signal.Notify(c, os.Interrupt) // terminal C-c and goreman 355 signal.Notify(c, unix.SIGTERM) // Kubernetes 356 return c 357} 358 359// closeOnSignal will listen for SIGINT or SIGTERM and call srv.Close. This is 360// not a graceful shutdown, see shutdownOnSignal. 361func closeOnSignal(srv *http.Server) error { 362 c := shutdownSignalChan(1) 363 <-c 364 365 return srv.Close() 366} 367 368// shutdownOnSignal will listen for SIGINT or SIGTERM and call srv.Shutdown. 369// Note it doesn't call anything else for shutting down. Notably our RPC 370// framework doesn't allow us to drain connections, so when Shutdown we will 371// wait 10s before closing. 372// 373// Note: the call site for shutdownOnSignal should use closeOnSignal instead 374// if rpc mode is enabled due to the above limitation. 375func shutdownOnSignal(srv *http.Server) error { 376 c := shutdownSignalChan(2) 377 <-c 378 379 // If we receive another signal, immediate shutdown 380 ctx, cancel := context.WithCancel(context.Background()) 381 defer cancel() 382 go func() { 383 select { 384 case <-ctx.Done(): 385 case sig := <-c: 386 log.Printf("received another signal (%v), immediate shutdown", sig) 387 cancel() 388 } 389 }() 390 391 // Wait for 10s to drain ongoing requests. Kubernetes gives us 30s to 392 // shutdown, we have already used 15s waiting for our endpoint removal to 393 // propagate. 394 ctx, cancel2 := context.WithTimeout(ctx, 10*time.Second) 395 defer cancel2() 396 397 log.Printf("shutting down") 398 return srv.Shutdown(ctx) 399} 400 401func watchdogOnce(ctx context.Context, client *http.Client, addr string) error { 402 defer metricWatchdogTotal.Inc() 403 404 ctx, cancel := context.WithDeadline(ctx, time.Now().Add(30*time.Second)) 405 defer cancel() 406 407 req, err := http.NewRequest("GET", addr, nil) 408 if err != nil { 409 return err 410 } 411 412 req = req.WithContext(ctx) 413 414 resp, err := client.Do(req) 415 if err != nil { 416 return err 417 } 418 body, _ := io.ReadAll(resp.Body) 419 _ = resp.Body.Close() 420 421 if resp.StatusCode != http.StatusOK { 422 return fmt.Errorf("watchdog: status=%v body=%q", resp.StatusCode, string(body)) 423 } 424 return nil 425} 426 427func watchdog(dt time.Duration, maxErrCount int, addr string) { 428 tr := &http.Transport{ 429 TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, 430 } 431 client := &http.Client{ 432 Transport: tr, 433 } 434 tick := time.NewTicker(dt) 435 436 errCount := 0 437 for range tick.C { 438 err := watchdogOnce(context.Background(), client, addr) 439 if err != nil { 440 errCount++ 441 metricWatchdogErrors.Set(float64(errCount)) 442 metricWatchdogErrorsTotal.Inc() 443 if errCount >= maxErrCount { 444 log.Printf(`watchdog health check has consecutively failed %d times indicating is likely an unrecoverable error affecting zoekt. As such this process will exit with code 3. 445 446Final error: %v 447 448Possible remediations: 449- If this rarely happens, ignore and let your process manager restart zoekt. 450- Possibly under provisioned. Try increasing CPU or disk IO. 451- A bug. Reach out with logs and screenshots of metrics when this occurs.`, errCount, err) 452 os.Exit(3) 453 } else { 454 log.Printf("watchdog: failed, will try %d more times: %v", maxErrCount-errCount, err) 455 } 456 } else if errCount > 0 { 457 errCount = 0 458 metricWatchdogErrors.Set(float64(errCount)) 459 log.Printf("watchdog: success, resetting error count") 460 } 461 } 462} 463 464func mustRegisterDiskMonitor(path string) { 465 prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ 466 Name: "src_disk_space_available_bytes", 467 Help: "Amount of free space disk space.", 468 ConstLabels: prometheus.Labels{"path": path}, 469 }, func() float64 { 470 usage, _ := disk.Usage(path) 471 return float64(usage.Free) 472 })) 473 474 prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ 475 Name: "src_disk_space_total_bytes", 476 Help: "Amount of total disk space.", 477 ConstLabels: prometheus.Labels{"path": path}, 478 }, func() float64 { 479 usage, _ := disk.Usage(path) 480 return float64(usage.Total) 481 })) 482} 483 484type loggedSearcher struct { 485 zoekt.Streamer 486 Logger sglog.Logger 487} 488 489func (s *loggedSearcher) Search( 490 ctx context.Context, 491 q query.Q, 492 opts *zoekt.SearchOptions, 493) (sr *zoekt.SearchResult, err error) { 494 defer func() { 495 var stats *zoekt.Stats 496 if sr != nil { 497 stats = &sr.Stats 498 } 499 s.log(ctx, q, opts, stats, err) 500 }() 501 502 metricSearchRequestsTotal.Inc() 503 return s.Streamer.Search(ctx, q, opts) 504} 505 506func (s *loggedSearcher) StreamSearch( 507 ctx context.Context, 508 q query.Q, 509 opts *zoekt.SearchOptions, 510 sender zoekt.Sender, 511) error { 512 var stats zoekt.Stats 513 514 metricSearchRequestsTotal.Inc() 515 err := s.Streamer.StreamSearch(ctx, q, opts, zoekt.SenderFunc(func(event *zoekt.SearchResult) { 516 stats.Add(event.Stats) 517 sender.Send(event) 518 })) 519 520 s.log(ctx, q, opts, &stats, err) 521 522 return err 523} 524 525func (s *loggedSearcher) log(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, st *zoekt.Stats, err error) { 526 logger := s.Logger. 527 WithTrace(traceContext(ctx)). 528 With( 529 sglog.String("query", q.String()), 530 sglog.Bool("opts.EstimateDocCount", opts.EstimateDocCount), 531 sglog.Bool("opts.Whole", opts.Whole), 532 sglog.Int("opts.ShardMaxMatchCount", opts.ShardMaxMatchCount), 533 sglog.Int("opts.TotalMaxMatchCount", opts.TotalMaxMatchCount), 534 sglog.Duration("opts.MaxWallTime", opts.MaxWallTime), 535 sglog.Int("opts.MaxDocDisplayCount", opts.MaxDocDisplayCount), 536 sglog.Int("opts.MaxMatchDisplayCount", opts.MaxMatchDisplayCount), 537 ) 538 539 if err != nil { 540 switch { 541 case errors.Is(err, context.Canceled): 542 logger.Warn("search canceled", sglog.Error(err)) 543 case errors.Is(err, context.DeadlineExceeded): 544 logger.Warn("search timeout", sglog.Error(err)) 545 default: 546 logger.Error("search failed", sglog.Error(err)) 547 } 548 return 549 } 550 551 if st == nil { 552 return 553 } 554 555 logger.Debug("search", 556 sglog.Int64("stat.ContentBytesLoaded", st.ContentBytesLoaded), 557 sglog.Int64("stat.IndexBytesLoaded", st.IndexBytesLoaded), 558 sglog.Int("stat.Crashes", st.Crashes), 559 sglog.Duration("stat.Duration", st.Duration), 560 sglog.Int("stat.FileCount", st.FileCount), 561 sglog.Int("stat.ShardFilesConsidered", st.ShardFilesConsidered), 562 sglog.Int("stat.FilesConsidered", st.FilesConsidered), 563 sglog.Int("stat.FilesLoaded", st.FilesLoaded), 564 sglog.Int("stat.FilesSkipped", st.FilesSkipped), 565 sglog.Int("stat.ShardsScanned", st.ShardsScanned), 566 sglog.Int("stat.ShardsSkipped", st.ShardsSkipped), 567 sglog.Int("stat.ShardsSkippedFilter", st.ShardsSkippedFilter), 568 sglog.Int("stat.MatchCount", st.MatchCount), 569 sglog.Int("stat.NgramMatches", st.NgramMatches), 570 sglog.Int("stat.NgramLookups", st.NgramLookups), 571 sglog.Duration("stat.Wait", st.Wait), 572 sglog.Duration("stat.MatchTreeConstruction", st.MatchTreeConstruction), 573 sglog.Duration("stat.MatchTreeSearch", st.MatchTreeSearch), 574 sglog.Int("stat.RegexpsConsidered", st.RegexpsConsidered), 575 sglog.String("stat.FlushReason", st.FlushReason.String()), 576 ) 577} 578 579func traceContext(ctx context.Context) sglog.TraceContext { 580 otSpan := opentracing.SpanFromContext(ctx) 581 if otSpan != nil { 582 if jaegerSpan, ok := otSpan.Context().(jaeger.SpanContext); ok { 583 return sglog.TraceContext{ 584 TraceID: jaegerSpan.TraceID().String(), 585 SpanID: jaegerSpan.SpanID().String(), 586 } 587 } 588 } 589 590 if otelSpan := oteltrace.SpanFromContext(ctx).SpanContext(); otelSpan.IsValid() { 591 return sglog.TraceContext{ 592 TraceID: otelSpan.TraceID().String(), 593 SpanID: otelSpan.SpanID().String(), 594 } 595 } 596 597 return sglog.TraceContext{} 598} 599 600func newGRPCServer(logger sglog.Logger, streamer zoekt.Streamer, additionalOpts ...grpc.ServerOption) *grpc.Server { 601 s := defaults.NewServer(logger, additionalOpts...) 602 webserverv1.RegisterWebserverServiceServer(s, grpcserver.NewServer(streamer)) 603 604 return s 605} 606 607func corsHandler(h http.Handler, origin string) http.Handler { 608 if origin == "" { 609 return h 610 } 611 612 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 613 if r.Header.Get("Origin") == "" { 614 h.ServeHTTP(w, r) 615 return 616 } 617 618 w.Header().Set("Access-Control-Allow-Origin", origin) 619 w.Header().Set("Access-Control-Max-Age", "86400") 620 w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS") 621 w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization") 622 w.Header().Add("Vary", "Origin") 623 624 if r.Method == "OPTIONS" { 625 w.Header().Set("Access-Control-Max-Age", "86400") 626 w.WriteHeader(http.StatusOK) 627 return 628 } 629 630 h.ServeHTTP(w, r) 631 }) 632} 633 634var ( 635 metricWatchdogErrors = promauto.NewGauge(prometheus.GaugeOpts{ 636 Name: "zoekt_webserver_watchdog_errors", 637 Help: "The current error count for zoekt watchdog.", 638 }) 639 metricWatchdogTotal = promauto.NewCounter(prometheus.CounterOpts{ 640 Name: "zoekt_webserver_watchdog_total", 641 Help: "The total number of requests done by zoekt watchdog.", 642 }) 643 metricWatchdogErrorsTotal = promauto.NewCounter(prometheus.CounterOpts{ 644 Name: "zoekt_webserver_watchdog_errors_total", 645 Help: "The total number of errors from zoekt watchdog.", 646 }) 647 metricSearchRequestsTotal = promauto.NewCounter(prometheus.CounterOpts{ 648 Name: "zoekt_search_requests_total", 649 Help: "The total number of search requests that zoekt received", 650 }) 651)