fork of https://github.com/sourcegraph/zoekt
1// Copyright 2016 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Command zoekt-webserver starts a server that responds to search queries, using
16// an index generated by another program such as zoekt-indexserver.
17package main
18
19import (
20 "context"
21 "crypto/tls"
22 "errors"
23 "flag"
24 "fmt"
25 "html/template"
26 "io"
27 "log"
28 "net"
29 "net/http"
30 "net/http/httputil"
31 "net/url"
32 "os"
33 "os/signal"
34 "path/filepath"
35 "strconv"
36 "strings"
37 "time"
38
39 "github.com/opentracing/opentracing-go"
40 "github.com/prometheus/client_golang/prometheus"
41 "github.com/prometheus/client_golang/prometheus/promauto"
42 "github.com/shirou/gopsutil/v3/disk"
43 sglog "github.com/sourcegraph/log"
44 "github.com/sourcegraph/mountinfo"
45 "github.com/uber/jaeger-client-go"
46 oteltrace "go.opentelemetry.io/otel/trace"
47 "go.uber.org/automaxprocs/maxprocs"
48 "golang.org/x/sys/unix"
49 "google.golang.org/grpc"
50
51 "github.com/sourcegraph/zoekt"
52 grpcserver "github.com/sourcegraph/zoekt/cmd/zoekt-webserver/grpc/server"
53 "github.com/sourcegraph/zoekt/grpc/defaults"
54 "github.com/sourcegraph/zoekt/grpc/grpcutil"
55 webserverv1 "github.com/sourcegraph/zoekt/grpc/protos/zoekt/webserver/v1"
56 "github.com/sourcegraph/zoekt/index"
57 "github.com/sourcegraph/zoekt/internal/debugserver"
58 "github.com/sourcegraph/zoekt/internal/profiler"
59 "github.com/sourcegraph/zoekt/internal/trace"
60 "github.com/sourcegraph/zoekt/internal/tracer"
61 "github.com/sourcegraph/zoekt/query"
62 "github.com/sourcegraph/zoekt/search"
63 "github.com/sourcegraph/zoekt/web"
64)
65
66const logFormat = "2006-01-02T15-04-05.999999999Z07"
67
68func divertLogs(dir string, interval time.Duration) {
69 t := time.NewTicker(interval)
70 var last *os.File
71 for {
72 nm := filepath.Join(dir, fmt.Sprintf("zoekt-webserver.%s.%d.log", time.Now().Format(logFormat), os.Getpid()))
73 fmt.Fprintf(os.Stderr, "writing logs to %s\n", nm)
74
75 f, err := os.Create(nm)
76 if err != nil {
77 // There is not much we can do now.
78 fmt.Fprintf(os.Stderr, "can't create output file %s: %v\n", nm, err)
79 os.Exit(2)
80 }
81
82 log.SetOutput(f)
83 last.Close()
84
85 last = f
86
87 <-t.C
88 }
89}
90
91const templateExtension = ".html.tpl"
92
93func loadTemplates(tpl *template.Template, dir string) error {
94 fs, err := filepath.Glob(dir + "/*" + templateExtension)
95 if err != nil {
96 log.Fatalf("Glob: %v", err)
97 }
98
99 log.Printf("loading templates: %v", fs)
100 for _, fn := range fs {
101 content, err := os.ReadFile(fn)
102 if err != nil {
103 return err
104 }
105
106 base := filepath.Base(fn)
107 base = strings.TrimSuffix(base, templateExtension)
108 if _, err := tpl.New(base).Parse(string(content)); err != nil {
109 return fmt.Errorf("template.Parse(%s): %v", fn, err)
110 }
111 }
112 return nil
113}
114
// writeTemplates writes each built-in template from web.TemplateText into dir
// as <name>.html.tpl, replacing any existing file of that name. It fails if
// dir is empty or on the first file that cannot be written.
func writeTemplates(dir string) error {
	if dir == "" {
		return errors.New("must set --template_dir")
	}

	for name, text := range web.TemplateText {
		dest := filepath.Join(dir, name+templateExtension)
		if err := os.WriteFile(dest, []byte(text), 0o644); err != nil {
			return err
		}
	}
	return nil
}
128
// main parses flags, opens the index directory (without waiting for shards
// to load), and serves the following on --listen until SIGINT or SIGTERM
// arrives:
//   - the web handler built by web.NewMux (HTML UI with --html, RPC with --rpc);
//   - debug handlers;
//   - an optional /indexserver/ reverse proxy;
//   - a gRPC search service.
func main() {
	logDir := flag.String("log_dir", "", "log to this directory rather than stderr.")
	logRefresh := flag.Duration("log_refresh", 24*time.Hour, "if using --log_dir, start writing a new file this often.")

	listen := flag.String("listen", ":6070", "listen on this address.")
	indexDir := flag.String("index", index.DefaultDir, "set index directory to use")
	html := flag.Bool("html", true, "enable HTML interface")
	enableRPC := flag.Bool("rpc", false, "enable go/net RPC")
	enableIndexserverProxy := flag.Bool("indexserver_proxy", false, "proxy requests with URLs matching the path /indexserver/ to <index>/indexserver.sock")
	printURLs := flag.Bool("print", false, "enable local result URLs")
	enablePprof := flag.Bool("pprof", false, "set to enable remote profiling.")
	sslCert := flag.String("ssl_cert", "", "set path to SSL .pem holding certificate.")
	sslKey := flag.String("ssl_key", "", "set path to SSL .pem holding key.")
	hostCustomization := flag.String(
		"host_customization", "",
		"specify host customization, as HOST1=QUERY,HOST2=QUERY")

	templateDir := flag.String("template_dir", "", "set directory from which to load custom .html.tpl template files")
	dumpTemplates := flag.Bool("dump_templates", false, "dump templates into --template_dir and exit.")
	corsOrigin := flag.String("cors_origin", "", "allow requests from this origin. If empty, no CORS headers are set.")
	version := flag.Bool("version", false, "Print version number")

	flag.Parse()

	if *version {
		fmt.Printf("zoekt-webserver version %q\n", index.Version)
		os.Exit(0)
	}

	if *dumpTemplates {
		if err := writeTemplates(*templateDir); err != nil {
			log.Fatal(err)
		}
		os.Exit(0)
	}

	resource := sglog.Resource{
		Name:       "zoekt-webserver",
		Version:    index.Version,
		InstanceID: index.HostnameBestEffort(),
	}

	liblog := sglog.Init(resource)
	defer liblog.Sync()
	tracer.Init(resource)
	profiler.Init("zoekt-webserver")

	if *logDir != "" {
		if fi, err := os.Lstat(*logDir); err != nil || !fi.IsDir() {
			log.Fatalf("%s is not a directory", *logDir)
		}
		// We could do fdup acrobatics to also redirect
		// stderr, but it is simpler and more portable for the
		// caller to divert stderr output if necessary.
		go divertLogs(*logDir, *logRefresh)
	}

	// Tune GOMAXPROCS to match Linux container CPU quota.
	_, _ = maxprocs.Set()

	if err := os.MkdirAll(*indexDir, 0o755); err != nil {
		log.Fatal(err)
	}

	mustRegisterDiskMonitor(*indexDir)

	metricsLogger := sglog.Scoped("metricsRegistration")
	mustRegisterMemoryMapMetrics(metricsLogger)

	opts := mountinfo.CollectorOpts{Namespace: "zoekt_webserver"}
	c := mountinfo.NewCollector(metricsLogger, opts, map[string]string{"indexDir": *indexDir})
	prometheus.DefaultRegisterer.MustRegister(c)

	// Do not block on loading shards so we can become partially available
	// sooner. Otherwise on large instances zoekt can be unavailable on the
	// order of minutes.
	searcher, err := search.NewDirectorySearcherFast(*indexDir)
	if err != nil {
		log.Fatal(err)
	}

	searcher = &loggedSearcher{
		Streamer: searcher,
		Logger:   sglog.Scoped("searcher"),
	}

	s := &web.Server{
		Searcher: searcher,
		Top:      web.Top,
		Version:  index.Version,
	}

	if *templateDir != "" {
		if err := loadTemplates(s.Top, *templateDir); err != nil {
			log.Fatalf("loadTemplates: %v", err)
		}
	}

	s.Print = *printURLs
	s.HTML = *html
	s.RPC = *enableRPC

	if *hostCustomization != "" {
		queries, err := parseHostCustomization(*hostCustomization)
		if err != nil {
			log.Fatal(err)
		}
		s.HostCustomQueries = queries
	}

	serveMux, err := web.NewMux(s)
	if err != nil {
		log.Fatal(err)
	}

	debugserver.AddHandlers(serveMux, *enablePprof)

	if *enableIndexserverProxy {
		socket := filepath.Join(*indexDir, "indexserver.sock")
		sglog.Scoped("server").Info("adding reverse proxy", sglog.String("socket", socket))
		addProxyHandler(serveMux, socket)
	}

	handler := trace.Middleware(serveMux)

	// Sourcegraph: We use environment variables to configure watchdog since
	// they are more convenient than flags in containerized environments.
	watchdogTick := durationFromEnv("ZOEKT_WATCHDOG_TICK", 30*time.Second)
	watchdogErrCount := intFromEnv("ZOEKT_WATCHDOG_ERRORS", 3)

	useTLS := *sslCert != "" || *sslKey != ""

	scheme := "http://"
	if useTLS {
		scheme = "https://"
	}
	watchdogAddr := scheme + *listen + "/healthz"

	if watchdogErrCount > 0 && watchdogTick > 0 {
		go watchdog(watchdogTick, watchdogErrCount, watchdogAddr)
	} else {
		log.Println("watchdog disabled")
	}

	logger := sglog.Scoped("ZoektWebserverGRPCServer")

	streamer := web.NewTraceAwareSearcher(s.Searcher)
	grpcServer := newGRPCServer(logger, streamer)

	handler = grpcutil.MultiplexGRPC(grpcServer, handler)
	handler = corsHandler(handler, *corsOrigin)

	srv := &http.Server{
		Addr:    *listen,
		Handler: handler,
	}

	go func() {
		sglog.Scoped("server").Info("starting server", sglog.Stringp("address", listen))
		var err error
		if useTLS {
			err = srv.ListenAndServeTLS(*sslCert, *sslKey)
		} else {
			err = srv.ListenAndServe()
		}

		// ListenAndServe* always returns a non-nil error; ErrServerClosed is
		// the expected result of Close/Shutdown. Anything else must be fatal,
		// otherwise shutdownOnSignal would block forever.
		if !errors.Is(err, http.ErrServerClosed) {
			log.Fatalf("ListenAndServe: %v", err)
		}
	}()

	if s.RPC {
		// Our RPC system does not support shutdown and hijacks the underlying
		// http connection. This means shutdown is ineffective and just waits 10s
		// before calling close. Lets just quit faster in that case.
		if err := closeOnSignal(srv); err != nil {
			log.Fatalf("http.Server.Close: %v", err)
		}
	} else {
		if err := shutdownOnSignal(srv); err != nil {
			log.Fatalf("http.Server.Shutdown: %v", err)
		}
	}
}

// parseHostCustomization parses a --host_customization value of the form
// "HOST1=QUERY,HOST2=QUERY" into a map from host to query. Empty entries (for
// example from a trailing comma) are skipped. An entry without "=" is an error.
func parseHostCustomization(spec string) (map[string]string, error) {
	queries := map[string]string{}
	for _, entry := range strings.Split(spec, ",") {
		if entry == "" {
			continue
		}
		host, q, ok := strings.Cut(entry, "=")
		if !ok {
			return nil, fmt.Errorf("invalid host_customization %q", entry)
		}
		queries[host] = q
	}
	return queries, nil
}

// durationFromEnv returns the duration stored in environment variable name,
// or def if the variable is unset or empty. An unparsable value is logged and
// def is kept, instead of silently becoming zero (which would disable the
// watchdog).
func durationFromEnv(name string, def time.Duration) time.Duration {
	v := os.Getenv(name)
	if v == "" {
		return def
	}
	d, err := time.ParseDuration(v)
	if err != nil {
		log.Printf("ignoring invalid %s=%q, using default %v: %v", name, v, def, err)
		return def
	}
	log.Printf("custom %s=%v", name, d)
	return d
}

// intFromEnv returns the integer stored in environment variable name, or def
// if the variable is unset or empty. An unparsable value is logged and def is
// kept.
func intFromEnv(name string, def int) int {
	v := os.Getenv(name)
	if v == "" {
		return def
	}
	n, err := strconv.Atoi(v)
	if err != nil {
		log.Printf("ignoring invalid %s=%q, using default %d: %v", name, v, def, err)
		return def
	}
	log.Printf("custom %s=%d", name, n)
	return n
}
330
// addProxyHandler registers on mux a reverse proxy for every request under
// /indexserver/, forwarding it to the HTTP server listening on the unix
// socket at path socket. The "/indexserver" prefix is removed before
// forwarding, so /indexserver/foo reaches the upstream server as /foo.
func addProxyHandler(mux *http.ServeMux, socket string) {
	dialSocket := func(ctx context.Context, _, _ string) (net.Conn, error) {
		var dialer net.Dialer
		return dialer.DialContext(ctx, "unix", socket)
	}

	// The proxy needs an http URL to build outgoing requests, but the host is
	// arbitrary: every connection is dialed to the unix socket above.
	target := &url.URL{Scheme: "http", Host: "socket"}
	rp := httputil.NewSingleHostReverseProxy(target)
	rp.Transport = &http.Transport{DialContext: dialSocket}

	const prefix = "/indexserver/"
	mux.Handle(prefix, http.StripPrefix(prefix, rp))
}
348
// shutdownSignalChan subscribes to the signals that should stop the server
// and returns the channel they are delivered on:
//   - os.Interrupt (terminal C-c, goreman);
//   - SIGTERM (Kubernetes).
//
// maxReads sizes the channel buffer and should be at least the number of
// times the caller will receive from it. signal.Notify never blocks when
// sending, so a signal that finds the buffer full is dropped.
func shutdownSignalChan(maxReads int) <-chan os.Signal {
	sigs := make(chan os.Signal, maxReads)
	signal.Notify(sigs, os.Interrupt, unix.SIGTERM)
	return sigs
}
358
// closeOnSignal blocks until the process receives SIGINT or SIGTERM and then
// closes srv immediately with srv.Close, returning its error. Open
// connections are not drained; see shutdownOnSignal for the graceful variant.
func closeOnSignal(srv *http.Server) error {
	<-shutdownSignalChan(1)
	return srv.Close()
}
367
// shutdownOnSignal blocks until the process receives SIGINT or SIGTERM, then
// gracefully shuts srv down with srv.Shutdown. In-flight requests get up to
// 10s to finish. A second signal during that window cancels the drain so
// Shutdown returns right away. The result of Shutdown is returned; that is
// the context's error if the drain was cut short or timed out. Nothing other
// than srv is shut down here.
//
// Our RPC framework cannot drain its connections, so with RPC enabled
// Shutdown would just wait out the full 10s. Callers should use
// closeOnSignal in that mode instead.
func shutdownOnSignal(srv *http.Server) error {
	sigs := shutdownSignalChan(2)
	<-sigs

	// A second signal aborts the graceful drain.
	base, abort := context.WithCancel(context.Background())
	defer abort()
	go func() {
		select {
		case sig := <-sigs:
			log.Printf("received another signal (%v), immediate shutdown", sig)
			abort()
		case <-base.Done():
		}
	}()

	// Drain for at most 10s. Per the original deployment note, Kubernetes
	// allows 30s for shutdown, and about 15s of that has already been spent
	// waiting for endpoint removal to propagate (presumably outside this
	// process, e.g. a preStop hook; confirm against deployment config).
	drainCtx, stopDrain := context.WithTimeout(base, 10*time.Second)
	defer stopDrain()

	log.Printf("shutting down")
	return srv.Shutdown(drainCtx)
}
400
401func watchdogOnce(ctx context.Context, client *http.Client, addr string) error {
402 defer metricWatchdogTotal.Inc()
403
404 ctx, cancel := context.WithDeadline(ctx, time.Now().Add(30*time.Second))
405 defer cancel()
406
407 req, err := http.NewRequest("GET", addr, nil)
408 if err != nil {
409 return err
410 }
411
412 req = req.WithContext(ctx)
413
414 resp, err := client.Do(req)
415 if err != nil {
416 return err
417 }
418 body, _ := io.ReadAll(resp.Body)
419 _ = resp.Body.Close()
420
421 if resp.StatusCode != http.StatusOK {
422 return fmt.Errorf("watchdog: status=%v body=%q", resp.StatusCode, string(body))
423 }
424 return nil
425}
426
// watchdog runs watchdogOnce against addr every dt and terminates the process
// with exit code 3 once maxErrCount consecutive checks have failed. A
// successful check resets the consecutive-failure count. The current count
// is exported via metricWatchdogErrors, and every failure also increments
// metricWatchdogErrorsTotal. The loop never ends on its own, so run it in its
// own goroutine.
func watchdog(dt time.Duration, maxErrCount int, addr string) {
	// main points the watchdog at this process's own --listen address. TLS
	// verification is skipped, presumably because the serving certificate
	// need not be valid for that address.
	client := &http.Client{
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
		},
	}

	failures := 0
	ticker := time.NewTicker(dt)
	for range ticker.C {
		err := watchdogOnce(context.Background(), client, addr)
		if err == nil {
			if failures > 0 {
				failures = 0
				metricWatchdogErrors.Set(float64(failures))
				log.Printf("watchdog: success, resetting error count")
			}
			continue
		}

		failures++
		metricWatchdogErrors.Set(float64(failures))
		metricWatchdogErrorsTotal.Inc()

		if failures < maxErrCount {
			log.Printf("watchdog: failed, will try %d more times: %v", maxErrCount-failures, err)
			continue
		}

		log.Printf(`watchdog health check has consecutively failed %d times indicating is likely an unrecoverable error affecting zoekt. As such this process will exit with code 3.

Final error: %v

Possible remediations:
- If this rarely happens, ignore and let your process manager restart zoekt.
- Possibly under provisioned. Try increasing CPU or disk IO.
- A bug. Reach out with logs and screenshots of metrics when this occurs.`, failures, err)
		os.Exit(3)
	}
}
463
464func mustRegisterDiskMonitor(path string) {
465 prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
466 Name: "src_disk_space_available_bytes",
467 Help: "Amount of free space disk space.",
468 ConstLabels: prometheus.Labels{"path": path},
469 }, func() float64 {
470 usage, _ := disk.Usage(path)
471 return float64(usage.Free)
472 }))
473
474 prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
475 Name: "src_disk_space_total_bytes",
476 Help: "Amount of total disk space.",
477 ConstLabels: prometheus.Labels{"path": path},
478 }, func() float64 {
479 usage, _ := disk.Usage(path)
480 return float64(usage.Total)
481 }))
482}
483
484type loggedSearcher struct {
485 zoekt.Streamer
486 Logger sglog.Logger
487}
488
489func (s *loggedSearcher) Search(
490 ctx context.Context,
491 q query.Q,
492 opts *zoekt.SearchOptions,
493) (sr *zoekt.SearchResult, err error) {
494 defer func() {
495 var stats *zoekt.Stats
496 if sr != nil {
497 stats = &sr.Stats
498 }
499 s.log(ctx, q, opts, stats, err)
500 }()
501
502 metricSearchRequestsTotal.Inc()
503 return s.Streamer.Search(ctx, q, opts)
504}
505
506func (s *loggedSearcher) StreamSearch(
507 ctx context.Context,
508 q query.Q,
509 opts *zoekt.SearchOptions,
510 sender zoekt.Sender,
511) error {
512 var stats zoekt.Stats
513
514 metricSearchRequestsTotal.Inc()
515 err := s.Streamer.StreamSearch(ctx, q, opts, zoekt.SenderFunc(func(event *zoekt.SearchResult) {
516 stats.Add(event.Stats)
517 sender.Send(event)
518 }))
519
520 s.log(ctx, q, opts, &stats, err)
521
522 return err
523}
524
525func (s *loggedSearcher) log(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, st *zoekt.Stats, err error) {
526 logger := s.Logger.
527 WithTrace(traceContext(ctx)).
528 With(
529 sglog.String("query", q.String()),
530 sglog.Bool("opts.EstimateDocCount", opts.EstimateDocCount),
531 sglog.Bool("opts.Whole", opts.Whole),
532 sglog.Int("opts.ShardMaxMatchCount", opts.ShardMaxMatchCount),
533 sglog.Int("opts.TotalMaxMatchCount", opts.TotalMaxMatchCount),
534 sglog.Duration("opts.MaxWallTime", opts.MaxWallTime),
535 sglog.Int("opts.MaxDocDisplayCount", opts.MaxDocDisplayCount),
536 sglog.Int("opts.MaxMatchDisplayCount", opts.MaxMatchDisplayCount),
537 )
538
539 if err != nil {
540 switch {
541 case errors.Is(err, context.Canceled):
542 logger.Warn("search canceled", sglog.Error(err))
543 case errors.Is(err, context.DeadlineExceeded):
544 logger.Warn("search timeout", sglog.Error(err))
545 default:
546 logger.Error("search failed", sglog.Error(err))
547 }
548 return
549 }
550
551 if st == nil {
552 return
553 }
554
555 logger.Debug("search",
556 sglog.Int64("stat.ContentBytesLoaded", st.ContentBytesLoaded),
557 sglog.Int64("stat.IndexBytesLoaded", st.IndexBytesLoaded),
558 sglog.Int("stat.Crashes", st.Crashes),
559 sglog.Duration("stat.Duration", st.Duration),
560 sglog.Int("stat.FileCount", st.FileCount),
561 sglog.Int("stat.ShardFilesConsidered", st.ShardFilesConsidered),
562 sglog.Int("stat.FilesConsidered", st.FilesConsidered),
563 sglog.Int("stat.FilesLoaded", st.FilesLoaded),
564 sglog.Int("stat.FilesSkipped", st.FilesSkipped),
565 sglog.Int("stat.ShardsScanned", st.ShardsScanned),
566 sglog.Int("stat.ShardsSkipped", st.ShardsSkipped),
567 sglog.Int("stat.ShardsSkippedFilter", st.ShardsSkippedFilter),
568 sglog.Int("stat.MatchCount", st.MatchCount),
569 sglog.Int("stat.NgramMatches", st.NgramMatches),
570 sglog.Int("stat.NgramLookups", st.NgramLookups),
571 sglog.Duration("stat.Wait", st.Wait),
572 sglog.Duration("stat.MatchTreeConstruction", st.MatchTreeConstruction),
573 sglog.Duration("stat.MatchTreeSearch", st.MatchTreeSearch),
574 sglog.Int("stat.RegexpsConsidered", st.RegexpsConsidered),
575 sglog.String("stat.FlushReason", st.FlushReason.String()),
576 )
577}
578
// traceContext returns the trace and span IDs of the span active in ctx, for
// correlating log entries with traces. An OpenTracing span backed by Jaeger
// takes precedence. Otherwise a valid OpenTelemetry span context is used. If
// neither is present, the zero sglog.TraceContext is returned.
func traceContext(ctx context.Context) sglog.TraceContext {
	if span := opentracing.SpanFromContext(ctx); span != nil {
		if jc, ok := span.Context().(jaeger.SpanContext); ok {
			return sglog.TraceContext{
				TraceID: jc.TraceID().String(),
				SpanID:  jc.SpanID().String(),
			}
		}
	}

	sc := oteltrace.SpanFromContext(ctx).SpanContext()
	if !sc.IsValid() {
		return sglog.TraceContext{}
	}
	return sglog.TraceContext{
		TraceID: sc.TraceID().String(),
		SpanID:  sc.SpanID().String(),
	}
}
599
600func newGRPCServer(logger sglog.Logger, streamer zoekt.Streamer, additionalOpts ...grpc.ServerOption) *grpc.Server {
601 s := defaults.NewServer(logger, additionalOpts...)
602 webserverv1.RegisterWebserverServiceServer(s, grpcserver.NewServer(streamer))
603
604 return s
605}
606
// corsHandler wraps h so that browsers allow cross-origin requests from
// origin. If origin is empty, h is returned unchanged and no CORS headers are
// ever added.
//
// Otherwise, each request is handled as follows:
//   - Requests carrying an Origin header receive the CORS response headers.
//     The configured origin is sent as Access-Control-Allow-Origin regardless
//     of the request's Origin value.
//   - OPTIONS preflight requests with an Origin header are answered directly
//     with 200 and are not passed to h.
//   - Every response carries "Vary: Origin". Whether the CORS headers are
//     present depends on the request's Origin header, so shared caches must
//     not reuse a response across requests with and without one.
func corsHandler(h http.Handler, origin string) http.Handler {
	if origin == "" {
		return h
	}

	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Add("Vary", "Origin")

		if r.Header.Get("Origin") == "" {
			h.ServeHTTP(w, r)
			return
		}

		hdr := w.Header()
		hdr.Set("Access-Control-Allow-Origin", origin)
		hdr.Set("Access-Control-Max-Age", "86400")
		hdr.Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS")
		hdr.Set("Access-Control-Allow-Headers", "Content-Type, Authorization")

		if r.Method == http.MethodOptions {
			w.WriteHeader(http.StatusOK)
			return
		}

		h.ServeHTTP(w, r)
	})
}
633
634var (
635 metricWatchdogErrors = promauto.NewGauge(prometheus.GaugeOpts{
636 Name: "zoekt_webserver_watchdog_errors",
637 Help: "The current error count for zoekt watchdog.",
638 })
639 metricWatchdogTotal = promauto.NewCounter(prometheus.CounterOpts{
640 Name: "zoekt_webserver_watchdog_total",
641 Help: "The total number of requests done by zoekt watchdog.",
642 })
643 metricWatchdogErrorsTotal = promauto.NewCounter(prometheus.CounterOpts{
644 Name: "zoekt_webserver_watchdog_errors_total",
645 Help: "The total number of errors from zoekt watchdog.",
646 })
647 metricSearchRequestsTotal = promauto.NewCounter(prometheus.CounterOpts{
648 Name: "zoekt_search_requests_total",
649 Help: "The total number of search requests that zoekt received",
650 })
651)