fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Copyright 2016 Google Inc. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Command zoekt-webserver starts a server that responds to search queries, using 16// an index generated by another program such as zoekt-indexserver. 17package main 18 19import ( 20 "context" 21 "crypto/tls" 22 "errors" 23 "flag" 24 "fmt" 25 "html/template" 26 "io" 27 "log" 28 "net" 29 "net/http" 30 "net/http/httputil" 31 "net/url" 32 "os" 33 "os/signal" 34 "path/filepath" 35 "strconv" 36 "strings" 37 "time" 38 39 "github.com/opentracing/opentracing-go" 40 "github.com/prometheus/client_golang/prometheus" 41 "github.com/prometheus/client_golang/prometheus/promauto" 42 "github.com/shirou/gopsutil/v3/disk" 43 sglog "github.com/sourcegraph/log" 44 "github.com/sourcegraph/mountinfo" 45 "github.com/uber/jaeger-client-go" 46 oteltrace "go.opentelemetry.io/otel/trace" 47 "go.uber.org/automaxprocs/maxprocs" 48 "golang.org/x/sys/unix" 49 "google.golang.org/grpc" 50 51 "github.com/sourcegraph/zoekt" 52 grpcserver "github.com/sourcegraph/zoekt/cmd/zoekt-webserver/grpc/server" 53 "github.com/sourcegraph/zoekt/grpc/defaults" 54 "github.com/sourcegraph/zoekt/grpc/grpcutil" 55 webserverv1 "github.com/sourcegraph/zoekt/grpc/protos/zoekt/webserver/v1" 56 "github.com/sourcegraph/zoekt/index" 57 "github.com/sourcegraph/zoekt/internal/debugserver" 58 "github.com/sourcegraph/zoekt/internal/profiler" 59 "github.com/sourcegraph/zoekt/internal/trace" 60 "github.com/sourcegraph/zoekt/internal/tracer" 61 "github.com/sourcegraph/zoekt/query" 62 "github.com/sourcegraph/zoekt/search" 63 "github.com/sourcegraph/zoekt/web" 64) 65 66const logFormat = "2006-01-02T15-04-05.999999999Z07" 67 68func divertLogs(dir string, interval time.Duration) { 69 t := time.NewTicker(interval) 70 var last *os.File 71 for { 72 nm := filepath.Join(dir, fmt.Sprintf("zoekt-webserver.%s.%d.log", time.Now().Format(logFormat), os.Getpid())) 73 fmt.Fprintf(os.Stderr, "writing logs to %s\n", nm) 74 75 f, err := os.Create(nm) 76 if err != nil { 77 // There is not much we can do now. 78 fmt.Fprintf(os.Stderr, "can't create output file %s: %v\n", nm, err) 79 os.Exit(2) 80 } 81 82 log.SetOutput(f) 83 last.Close() 84 85 last = f 86 87 <-t.C 88 } 89} 90 91const templateExtension = ".html.tpl" 92 93func loadTemplates(tpl *template.Template, dir string) error { 94 fs, err := filepath.Glob(dir + "/*" + templateExtension) 95 if err != nil { 96 log.Fatalf("Glob: %v", err) 97 } 98 99 log.Printf("loading templates: %v", fs) 100 for _, fn := range fs { 101 content, err := os.ReadFile(fn) 102 if err != nil { 103 return err 104 } 105 106 base := filepath.Base(fn) 107 base = strings.TrimSuffix(base, templateExtension) 108 if _, err := tpl.New(base).Parse(string(content)); err != nil { 109 return fmt.Errorf("template.Parse(%s): %v", fn, err) 110 } 111 } 112 return nil 113} 114 115func writeTemplates(dir string) error { 116 if dir == "" { 117 return fmt.Errorf("must set --template_dir") 118 } 119 120 for k, v := range web.TemplateText { 121 nm := filepath.Join(dir, k+templateExtension) 122 if err := os.WriteFile(nm, []byte(v), 0o644); err != nil { 123 return err 124 } 125 } 126 return nil 127} 128 129func main() { 130 logDir := flag.String("log_dir", "", "log to this directory rather than stderr.") 131 logRefresh := flag.Duration("log_refresh", 24*time.Hour, "if using --log_dir, start writing a new file this often.") 132 133 listen := flag.String("listen", ":6070", "listen on this address.") 134 indexDir := flag.String("index", index.DefaultDir, "set index directory to use") 135 html := flag.Bool("html", true, "enable HTML interface") 136 enableRPC := flag.Bool("rpc", false, "enable go/net RPC") 137 enableIndexserverProxy := flag.Bool("indexserver_proxy", false, "proxy requests with URLs matching the path /indexserver/ to <index>/indexserver.sock") 138 print := flag.Bool("print", false, "enable local result URLs") 139 enablePprof := flag.Bool("pprof", false, "set to enable remote profiling.") 140 sslCert := flag.String("ssl_cert", "", "set path to SSL .pem holding certificate.") 141 sslKey := flag.String("ssl_key", "", "set path to SSL .pem holding key.") 142 hostCustomization := flag.String( 143 "host_customization", "", 144 "specify host customization, as HOST1=QUERY,HOST2=QUERY") 145 146 templateDir := flag.String("template_dir", "", "set directory from which to load custom .html.tpl template files") 147 dumpTemplates := flag.Bool("dump_templates", false, "dump templates into --template_dir and exit.") 148 version := flag.Bool("version", false, "Print version number") 149 150 flag.Parse() 151 152 if *version { 153 fmt.Printf("zoekt-webserver version %q\n", index.Version) 154 os.Exit(0) 155 } 156 157 if *dumpTemplates { 158 if err := writeTemplates(*templateDir); err != nil { 159 log.Fatal(err) 160 } 161 os.Exit(0) 162 } 163 164 resource := sglog.Resource{ 165 Name: "zoekt-webserver", 166 Version: index.Version, 167 InstanceID: index.HostnameBestEffort(), 168 } 169 170 liblog := sglog.Init(resource) 171 defer liblog.Sync() 172 tracer.Init(resource) 173 profiler.Init("zoekt-webserver") 174 175 if *logDir != "" { 176 if fi, err := os.Lstat(*logDir); err != nil || !fi.IsDir() { 177 log.Fatalf("%s is not a directory", *logDir) 178 } 179 // We could do fdup acrobatics to also redirect 180 // stderr, but it is simpler and more portable for the 181 // caller to divert stderr output if necessary. 182 go divertLogs(*logDir, *logRefresh) 183 } 184 185 // Tune GOMAXPROCS to match Linux container CPU quota. 186 _, _ = maxprocs.Set() 187 188 if err := os.MkdirAll(*indexDir, 0o755); err != nil { 189 log.Fatal(err) 190 } 191 192 mustRegisterDiskMonitor(*indexDir) 193 194 metricsLogger := sglog.Scoped("metricsRegistration") 195 196 mustRegisterMemoryMapMetrics(metricsLogger) 197 198 opts := mountinfo.CollectorOpts{Namespace: "zoekt_webserver"} 199 c := mountinfo.NewCollector(metricsLogger, opts, map[string]string{"indexDir": *indexDir}) 200 201 prometheus.DefaultRegisterer.MustRegister(c) 202 203 // Do not block on loading shards so we can become partially available 204 // sooner. Otherwise on large instances zoekt can be unavailable on the 205 // order of minutes. 206 searcher, err := search.NewDirectorySearcherFast(*indexDir) 207 if err != nil { 208 log.Fatal(err) 209 } 210 211 searcher = &loggedSearcher{ 212 Streamer: searcher, 213 Logger: sglog.Scoped("searcher"), 214 } 215 216 s := &web.Server{ 217 Searcher: searcher, 218 Top: web.Top, 219 Version: index.Version, 220 } 221 222 if *templateDir != "" { 223 if err := loadTemplates(s.Top, *templateDir); err != nil { 224 log.Fatalf("loadTemplates: %v", err) 225 } 226 } 227 228 s.Print = *print 229 s.HTML = *html 230 s.RPC = *enableRPC 231 232 if *hostCustomization != "" { 233 s.HostCustomQueries = map[string]string{} 234 for _, h := range strings.SplitN(*hostCustomization, ",", -1) { 235 if len(h) == 0 { 236 continue 237 } 238 fields := strings.SplitN(h, "=", 2) 239 if len(fields) < 2 { 240 log.Fatalf("invalid host_customization %q", h) 241 } 242 243 s.HostCustomQueries[fields[0]] = fields[1] 244 } 245 } 246 247 serveMux, err := web.NewMux(s) 248 if err != nil { 249 log.Fatal(err) 250 } 251 252 debugserver.AddHandlers(serveMux, *enablePprof) 253 254 if *enableIndexserverProxy { 255 socket := filepath.Join(*indexDir, "indexserver.sock") 256 sglog.Scoped("server").Info("adding reverse proxy", sglog.String("socket", socket)) 257 addProxyHandler(serveMux, socket) 258 } 259 260 handler := trace.Middleware(serveMux) 261 262 // Sourcegraph: We use environment variables to configure watchdog since 263 // they are more convenient than flags in containerized environments. 264 watchdogTick := 30 * time.Second 265 if v := os.Getenv("ZOEKT_WATCHDOG_TICK"); v != "" { 266 watchdogTick, _ = time.ParseDuration(v) 267 log.Printf("custom ZOEKT_WATCHDOG_TICK=%v", watchdogTick) 268 } 269 270 watchdogErrCount := 3 271 if v := os.Getenv("ZOEKT_WATCHDOG_ERRORS"); v != "" { 272 watchdogErrCount, _ = strconv.Atoi(v) 273 log.Printf("custom ZOEKT_WATCHDOG_ERRORS=%d", watchdogErrCount) 274 } 275 276 watchdogAddr := "http://" + *listen 277 if *sslCert != "" || *sslKey != "" { 278 watchdogAddr = "https://" + *listen 279 } 280 watchdogAddr += "/healthz" 281 282 if watchdogErrCount > 0 && watchdogTick > 0 { 283 go watchdog(watchdogTick, watchdogErrCount, watchdogAddr) 284 } else { 285 log.Println("watchdog disabled") 286 } 287 288 logger := sglog.Scoped("ZoektWebserverGRPCServer") 289 290 streamer := web.NewTraceAwareSearcher(s.Searcher) 291 grpcServer := newGRPCServer(logger, streamer) 292 293 handler = grpcutil.MultiplexGRPC(grpcServer, handler) 294 295 srv := &http.Server{ 296 Addr: *listen, 297 Handler: handler, 298 } 299 300 go func() { 301 sglog.Scoped("server").Info("starting server", sglog.Stringp("address", listen)) 302 var err error 303 if *sslCert != "" || *sslKey != "" { 304 err = srv.ListenAndServeTLS(*sslCert, *sslKey) 305 } else { 306 err = srv.ListenAndServe() 307 } 308 309 if err != http.ErrServerClosed { 310 // Fatal otherwise shutdownOnSignal will block 311 log.Fatalf("ListenAndServe: %v", err) 312 } 313 }() 314 315 if s.RPC { 316 // Our RPC system does not support shutdown and hijacks the underlying 317 // http connection. This means shutdown is ineffective and just waits 10s 318 // before calling close. Lets just quit faster in that case. 319 if err := closeOnSignal(srv); err != nil { 320 log.Fatalf("http.Server.Close: %v", err) 321 } 322 } else { 323 if err := shutdownOnSignal(srv); err != nil { 324 log.Fatalf("http.Server.Shutdown: %v", err) 325 } 326 } 327} 328 329// addProxyHandler adds a handler to "mux" that proxies all requests with base 330// /indexserver to "socket". 331func addProxyHandler(mux *http.ServeMux, socket string) { 332 proxy := httputil.NewSingleHostReverseProxy(&url.URL{ 333 Scheme: "http", 334 // The value of "Host" is arbitrary, because it is ignored by the 335 // DialContext we use for the socket connection. 336 Host: "socket", 337 }) 338 proxy.Transport = &http.Transport{ 339 DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) { 340 var d net.Dialer 341 return d.DialContext(ctx, "unix", socket) 342 }, 343 } 344 mux.Handle("/indexserver/", http.StripPrefix("/indexserver/", http.HandlerFunc(proxy.ServeHTTP))) 345} 346 347// shutdownSignalChan returns a channel which is listening for shutdown 348// signals from the operating system. maxReads is an upper bound on how many 349// times you will read the channel (used as buffer for signal.Notify). 350func shutdownSignalChan(maxReads int) <-chan os.Signal { 351 c := make(chan os.Signal, maxReads) 352 signal.Notify(c, os.Interrupt) // terminal C-c and goreman 353 signal.Notify(c, unix.SIGTERM) // Kubernetes 354 return c 355} 356 357// closeOnSignal will listen for SIGINT or SIGTERM and call srv.Close. This is 358// not a graceful shutdown, see shutdownOnSignal. 359func closeOnSignal(srv *http.Server) error { 360 c := shutdownSignalChan(1) 361 <-c 362 363 return srv.Close() 364} 365 366// shutdownOnSignal will listen for SIGINT or SIGTERM and call srv.Shutdown. 367// Note it doesn't call anything else for shutting down. Notably our RPC 368// framework doesn't allow us to drain connections, so when Shutdown we will 369// wait 10s before closing. 370// 371// Note: the call site for shutdownOnSignal should use closeOnSignal instead 372// if rpc mode is enabled due to the above limitation. 373func shutdownOnSignal(srv *http.Server) error { 374 c := shutdownSignalChan(2) 375 <-c 376 377 // If we receive another signal, immediate shutdown 378 ctx, cancel := context.WithCancel(context.Background()) 379 defer cancel() 380 go func() { 381 select { 382 case <-ctx.Done(): 383 case sig := <-c: 384 log.Printf("received another signal (%v), immediate shutdown", sig) 385 cancel() 386 } 387 }() 388 389 // Wait for 10s to drain ongoing requests. Kubernetes gives us 30s to 390 // shutdown, we have already used 15s waiting for our endpoint removal to 391 // propagate. 392 ctx, cancel2 := context.WithTimeout(ctx, 10*time.Second) 393 defer cancel2() 394 395 log.Printf("shutting down") 396 return srv.Shutdown(ctx) 397} 398 399func watchdogOnce(ctx context.Context, client *http.Client, addr string) error { 400 defer metricWatchdogTotal.Inc() 401 402 ctx, cancel := context.WithDeadline(ctx, time.Now().Add(30*time.Second)) 403 defer cancel() 404 405 req, err := http.NewRequest("GET", addr, nil) 406 if err != nil { 407 return err 408 } 409 410 req = req.WithContext(ctx) 411 412 resp, err := client.Do(req) 413 if err != nil { 414 return err 415 } 416 body, _ := io.ReadAll(resp.Body) 417 _ = resp.Body.Close() 418 419 if resp.StatusCode != http.StatusOK { 420 return fmt.Errorf("watchdog: status=%v body=%q", resp.StatusCode, string(body)) 421 } 422 return nil 423} 424 425func watchdog(dt time.Duration, maxErrCount int, addr string) { 426 tr := &http.Transport{ 427 TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, 428 } 429 client := &http.Client{ 430 Transport: tr, 431 } 432 tick := time.NewTicker(dt) 433 434 errCount := 0 435 for range tick.C { 436 err := watchdogOnce(context.Background(), client, addr) 437 if err != nil { 438 errCount++ 439 metricWatchdogErrors.Set(float64(errCount)) 440 metricWatchdogErrorsTotal.Inc() 441 if errCount >= maxErrCount { 442 log.Printf(`watchdog health check has consecutively failed %d times indicating is likely an unrecoverable error affecting zoekt. As such this process will exit with code 3. 443 444Final error: %v 445 446Possible remediations: 447- If this rarely happens, ignore and let your process manager restart zoekt. 448- Possibly under provisioned. Try increasing CPU or disk IO. 449- A bug. Reach out with logs and screenshots of metrics when this occurs.`, errCount, err) 450 os.Exit(3) 451 } else { 452 log.Printf("watchdog: failed, will try %d more times: %v", maxErrCount-errCount, err) 453 } 454 } else if errCount > 0 { 455 errCount = 0 456 metricWatchdogErrors.Set(float64(errCount)) 457 log.Printf("watchdog: success, resetting error count") 458 } 459 } 460} 461 462func mustRegisterDiskMonitor(path string) { 463 prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ 464 Name: "src_disk_space_available_bytes", 465 Help: "Amount of free space disk space.", 466 ConstLabels: prometheus.Labels{"path": path}, 467 }, func() float64 { 468 usage, _ := disk.Usage(path) 469 return float64(usage.Free) 470 })) 471 472 prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ 473 Name: "src_disk_space_total_bytes", 474 Help: "Amount of total disk space.", 475 ConstLabels: prometheus.Labels{"path": path}, 476 }, func() float64 { 477 usage, _ := disk.Usage(path) 478 return float64(usage.Total) 479 })) 480} 481 482type loggedSearcher struct { 483 zoekt.Streamer 484 Logger sglog.Logger 485} 486 487func (s *loggedSearcher) Search( 488 ctx context.Context, 489 q query.Q, 490 opts *zoekt.SearchOptions, 491) (sr *zoekt.SearchResult, err error) { 492 defer func() { 493 var stats *zoekt.Stats 494 if sr != nil { 495 stats = &sr.Stats 496 } 497 s.log(ctx, q, opts, stats, err) 498 }() 499 500 metricSearchRequestsTotal.Inc() 501 return s.Streamer.Search(ctx, q, opts) 502} 503 504func (s *loggedSearcher) StreamSearch( 505 ctx context.Context, 506 q query.Q, 507 opts *zoekt.SearchOptions, 508 sender zoekt.Sender, 509) error { 510 var stats zoekt.Stats 511 512 metricSearchRequestsTotal.Inc() 513 err := s.Streamer.StreamSearch(ctx, q, opts, zoekt.SenderFunc(func(event *zoekt.SearchResult) { 514 stats.Add(event.Stats) 515 sender.Send(event) 516 })) 517 518 s.log(ctx, q, opts, &stats, err) 519 520 return err 521} 522 523func (s *loggedSearcher) log(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, st *zoekt.Stats, err error) { 524 logger := s.Logger. 525 WithTrace(traceContext(ctx)). 526 With( 527 sglog.String("query", q.String()), 528 sglog.Bool("opts.EstimateDocCount", opts.EstimateDocCount), 529 sglog.Bool("opts.Whole", opts.Whole), 530 sglog.Int("opts.ShardMaxMatchCount", opts.ShardMaxMatchCount), 531 sglog.Int("opts.TotalMaxMatchCount", opts.TotalMaxMatchCount), 532 sglog.Duration("opts.MaxWallTime", opts.MaxWallTime), 533 sglog.Int("opts.MaxDocDisplayCount", opts.MaxDocDisplayCount), 534 sglog.Int("opts.MaxMatchDisplayCount", opts.MaxMatchDisplayCount), 535 ) 536 537 if err != nil { 538 switch { 539 case errors.Is(err, context.Canceled): 540 logger.Warn("search canceled", sglog.Error(err)) 541 case errors.Is(err, context.DeadlineExceeded): 542 logger.Warn("search timeout", sglog.Error(err)) 543 default: 544 logger.Error("search failed", sglog.Error(err)) 545 } 546 return 547 } 548 549 if st == nil { 550 return 551 } 552 553 logger.Debug("search", 554 sglog.Int64("stat.ContentBytesLoaded", st.ContentBytesLoaded), 555 sglog.Int64("stat.IndexBytesLoaded", st.IndexBytesLoaded), 556 sglog.Int("stat.Crashes", st.Crashes), 557 sglog.Duration("stat.Duration", st.Duration), 558 sglog.Int("stat.FileCount", st.FileCount), 559 sglog.Int("stat.ShardFilesConsidered", st.ShardFilesConsidered), 560 sglog.Int("stat.FilesConsidered", st.FilesConsidered), 561 sglog.Int("stat.FilesLoaded", st.FilesLoaded), 562 sglog.Int("stat.FilesSkipped", st.FilesSkipped), 563 sglog.Int("stat.ShardsScanned", st.ShardsScanned), 564 sglog.Int("stat.ShardsSkipped", st.ShardsSkipped), 565 sglog.Int("stat.ShardsSkippedFilter", st.ShardsSkippedFilter), 566 sglog.Int("stat.MatchCount", st.MatchCount), 567 sglog.Int("stat.NgramMatches", st.NgramMatches), 568 sglog.Int("stat.NgramLookups", st.NgramLookups), 569 sglog.Duration("stat.Wait", st.Wait), 570 sglog.Duration("stat.MatchTreeConstruction", st.MatchTreeConstruction), 571 sglog.Duration("stat.MatchTreeSearch", st.MatchTreeSearch), 572 sglog.Int("stat.RegexpsConsidered", st.RegexpsConsidered), 573 sglog.String("stat.FlushReason", st.FlushReason.String()), 574 ) 575} 576 577func traceContext(ctx context.Context) sglog.TraceContext { 578 otSpan := opentracing.SpanFromContext(ctx) 579 if otSpan != nil { 580 if jaegerSpan, ok := otSpan.Context().(jaeger.SpanContext); ok { 581 return sglog.TraceContext{ 582 TraceID: jaegerSpan.TraceID().String(), 583 SpanID: jaegerSpan.SpanID().String(), 584 } 585 } 586 } 587 588 if otelSpan := oteltrace.SpanFromContext(ctx).SpanContext(); otelSpan.IsValid() { 589 return sglog.TraceContext{ 590 TraceID: otelSpan.TraceID().String(), 591 SpanID: otelSpan.SpanID().String(), 592 } 593 } 594 595 return sglog.TraceContext{} 596} 597 598func newGRPCServer(logger sglog.Logger, streamer zoekt.Streamer, additionalOpts ...grpc.ServerOption) *grpc.Server { 599 s := defaults.NewServer(logger, additionalOpts...) 600 webserverv1.RegisterWebserverServiceServer(s, grpcserver.NewServer(streamer)) 601 602 return s 603} 604 605var ( 606 metricWatchdogErrors = promauto.NewGauge(prometheus.GaugeOpts{ 607 Name: "zoekt_webserver_watchdog_errors", 608 Help: "The current error count for zoekt watchdog.", 609 }) 610 metricWatchdogTotal = promauto.NewCounter(prometheus.CounterOpts{ 611 Name: "zoekt_webserver_watchdog_total", 612 Help: "The total number of requests done by zoekt watchdog.", 613 }) 614 metricWatchdogErrorsTotal = promauto.NewCounter(prometheus.CounterOpts{ 615 Name: "zoekt_webserver_watchdog_errors_total", 616 Help: "The total number of errors from zoekt watchdog.", 617 }) 618 metricSearchRequestsTotal = promauto.NewCounter(prometheus.CounterOpts{ 619 Name: "zoekt_search_requests_total", 620 Help: "The total number of search requests that zoekt received", 621 }) 622)