fork of https://github.com/sourcegraph/zoekt
1// Copyright 2016 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Command zoekt-webserver starts a server that responds to search queries, using
16// an index generated by another program such as zoekt-indexserver.
17package main
18
import (
	"context"
	"crypto/tls"
	"errors"
	"flag"
	"fmt"
	"html/template"
	"io"
	"log"
	"math"
	"net"
	"net/http"
	"net/http/httputil"
	"net/url"
	"os"
	"os/signal"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/opentracing/opentracing-go"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/shirou/gopsutil/v3/disk"
	sglog "github.com/sourcegraph/log"
	"github.com/sourcegraph/mountinfo"
	"github.com/uber/jaeger-client-go"
	oteltrace "go.opentelemetry.io/otel/trace"
	"go.uber.org/automaxprocs/maxprocs"
	"golang.org/x/sys/unix"
	"google.golang.org/grpc"

	"github.com/sourcegraph/zoekt"
	grpcserver "github.com/sourcegraph/zoekt/cmd/zoekt-webserver/grpc/server"
	"github.com/sourcegraph/zoekt/grpc/defaults"
	"github.com/sourcegraph/zoekt/grpc/grpcutil"
	webserverv1 "github.com/sourcegraph/zoekt/grpc/protos/zoekt/webserver/v1"
	"github.com/sourcegraph/zoekt/index"
	"github.com/sourcegraph/zoekt/internal/debugserver"
	"github.com/sourcegraph/zoekt/internal/profiler"
	"github.com/sourcegraph/zoekt/internal/trace"
	"github.com/sourcegraph/zoekt/internal/tracer"
	"github.com/sourcegraph/zoekt/query"
	"github.com/sourcegraph/zoekt/search"
	"github.com/sourcegraph/zoekt/web"
)
65
66const logFormat = "2006-01-02T15-04-05.999999999Z07"
67
68func divertLogs(dir string, interval time.Duration) {
69 t := time.NewTicker(interval)
70 var last *os.File
71 for {
72 nm := filepath.Join(dir, fmt.Sprintf("zoekt-webserver.%s.%d.log", time.Now().Format(logFormat), os.Getpid()))
73 fmt.Fprintf(os.Stderr, "writing logs to %s\n", nm)
74
75 f, err := os.Create(nm)
76 if err != nil {
77 // There is not much we can do now.
78 fmt.Fprintf(os.Stderr, "can't create output file %s: %v\n", nm, err)
79 os.Exit(2)
80 }
81
82 log.SetOutput(f)
83 last.Close()
84
85 last = f
86
87 <-t.C
88 }
89}
90
91const templateExtension = ".html.tpl"
92
93func loadTemplates(tpl *template.Template, dir string) error {
94 fs, err := filepath.Glob(dir + "/*" + templateExtension)
95 if err != nil {
96 log.Fatalf("Glob: %v", err)
97 }
98
99 log.Printf("loading templates: %v", fs)
100 for _, fn := range fs {
101 content, err := os.ReadFile(fn)
102 if err != nil {
103 return err
104 }
105
106 base := filepath.Base(fn)
107 base = strings.TrimSuffix(base, templateExtension)
108 if _, err := tpl.New(base).Parse(string(content)); err != nil {
109 return fmt.Errorf("template.Parse(%s): %v", fn, err)
110 }
111 }
112 return nil
113}
114
// writeTemplates dumps every built-in template from web.TemplateText into
// dir as "<name>.html.tpl" (mode 0644), so that the files can be edited and
// later loaded with --template_dir. An empty dir is rejected. The first
// failed write aborts the dump and its error is returned.
func writeTemplates(dir string) error {
	if dir == "" {
		return errors.New("must set --template_dir")
	}

	for name, text := range web.TemplateText {
		path := filepath.Join(dir, name+templateExtension)
		if err := os.WriteFile(path, []byte(text), 0o644); err != nil {
			return err
		}
	}
	return nil
}
128
// main parses flags and sets up logging, tracing, profiling and metrics. It
// creates a searcher over the shards in --index and serves the following on
// --listen: the web UI, the debug handlers, the optional indexserver proxy and
// the gRPC webserver service. TLS is used when --ssl_cert or --ssl_key is set.
// It runs until SIGINT/SIGTERM and then stops the server.
//
// The health watchdog is configured through environment variables:
// ZOEKT_WATCHDOG_TICK is a time.ParseDuration string (default 30s) and
// ZOEKT_WATCHDOG_ERRORS is the number of consecutive failures before exit
// (default 3). A non-positive value disables the watchdog. An unparsable
// value is logged and the default is kept, rather than silently disabling
// the watchdog.
func main() {
	logDir := flag.String("log_dir", "", "log to this directory rather than stderr.")
	logRefresh := flag.Duration("log_refresh", 24*time.Hour, "if using --log_dir, start writing a new file this often.")

	listen := flag.String("listen", ":6070", "listen on this address.")
	indexDir := flag.String("index", index.DefaultDir, "set index directory to use")
	html := flag.Bool("html", true, "enable HTML interface")
	enableRPC := flag.Bool("rpc", false, "enable go/net RPC")
	enableIndexserverProxy := flag.Bool("indexserver_proxy", false, "proxy requests with URLs matching the path /indexserver/ to <index>/indexserver.sock")
	printURLs := flag.Bool("print", false, "enable local result URLs")
	enablePprof := flag.Bool("pprof", false, "set to enable remote profiling.")
	sslCert := flag.String("ssl_cert", "", "set path to SSL .pem holding certificate.")
	sslKey := flag.String("ssl_key", "", "set path to SSL .pem holding key.")
	hostCustomization := flag.String(
		"host_customization", "",
		"specify host customization, as HOST1=QUERY,HOST2=QUERY")

	templateDir := flag.String("template_dir", "", "set directory from which to load custom .html.tpl template files")
	dumpTemplates := flag.Bool("dump_templates", false, "dump templates into --template_dir and exit.")
	version := flag.Bool("version", false, "Print version number")

	flag.Parse()

	if *version {
		fmt.Printf("zoekt-webserver version %q\n", index.Version)
		os.Exit(0)
	}

	if *dumpTemplates {
		if err := writeTemplates(*templateDir); err != nil {
			log.Fatal(err)
		}
		os.Exit(0)
	}

	resource := sglog.Resource{
		Name:       "zoekt-webserver",
		Version:    index.Version,
		InstanceID: index.HostnameBestEffort(),
	}

	liblog := sglog.Init(resource)
	defer liblog.Sync()
	tracer.Init(resource)
	profiler.Init("zoekt-webserver")

	if *logDir != "" {
		if fi, err := os.Lstat(*logDir); err != nil || !fi.IsDir() {
			log.Fatalf("%s is not a directory", *logDir)
		}
		// We could do fdup acrobatics to also redirect
		// stderr, but it is simpler and more portable for the
		// caller to divert stderr output if necessary.
		go divertLogs(*logDir, *logRefresh)
	}

	// Tune GOMAXPROCS to match Linux container CPU quota.
	_, _ = maxprocs.Set()

	if err := os.MkdirAll(*indexDir, 0o755); err != nil {
		log.Fatal(err)
	}

	mustRegisterDiskMonitor(*indexDir)

	metricsLogger := sglog.Scoped("metricsRegistration")

	mustRegisterMemoryMapMetrics(metricsLogger)

	opts := mountinfo.CollectorOpts{Namespace: "zoekt_webserver"}
	c := mountinfo.NewCollector(metricsLogger, opts, map[string]string{"indexDir": *indexDir})

	prometheus.DefaultRegisterer.MustRegister(c)

	// Do not block on loading shards so we can become partially available
	// sooner. Otherwise on large instances zoekt can be unavailable on the
	// order of minutes.
	searcher, err := search.NewDirectorySearcherFast(*indexDir)
	if err != nil {
		log.Fatal(err)
	}

	searcher = &loggedSearcher{
		Streamer: searcher,
		Logger:   sglog.Scoped("searcher"),
	}

	s := &web.Server{
		Searcher: searcher,
		Top:      web.Top,
		Version:  index.Version,
	}

	if *templateDir != "" {
		if err := loadTemplates(s.Top, *templateDir); err != nil {
			log.Fatalf("loadTemplates: %v", err)
		}
	}

	s.Print = *printURLs
	s.HTML = *html
	s.RPC = *enableRPC

	if *hostCustomization != "" {
		s.HostCustomQueries = map[string]string{}
		for _, h := range strings.Split(*hostCustomization, ",") {
			if h == "" {
				continue
			}
			host, q, ok := strings.Cut(h, "=")
			if !ok {
				log.Fatalf("invalid host_customization %q", h)
			}
			s.HostCustomQueries[host] = q
		}
	}

	serveMux, err := web.NewMux(s)
	if err != nil {
		log.Fatal(err)
	}

	debugserver.AddHandlers(serveMux, *enablePprof)

	if *enableIndexserverProxy {
		socket := filepath.Join(*indexDir, "indexserver.sock")
		sglog.Scoped("server").Info("adding reverse proxy", sglog.String("socket", socket))
		addProxyHandler(serveMux, socket)
	}

	handler := trace.Middleware(serveMux)

	// Sourcegraph: We use environment variables to configure watchdog since
	// they are more convenient than flags in containerized environments.
	watchdogTick := 30 * time.Second
	if v := os.Getenv("ZOEKT_WATCHDOG_TICK"); v != "" {
		if d, err := time.ParseDuration(v); err != nil {
			// Previously a parse error silently became 0 and disabled the
			// watchdog; keep the default instead.
			log.Printf("invalid ZOEKT_WATCHDOG_TICK=%q, keeping default %v: %v", v, watchdogTick, err)
		} else {
			watchdogTick = d
			log.Printf("custom ZOEKT_WATCHDOG_TICK=%v", watchdogTick)
		}
	}

	watchdogErrCount := 3
	if v := os.Getenv("ZOEKT_WATCHDOG_ERRORS"); v != "" {
		if n, err := strconv.Atoi(v); err != nil {
			log.Printf("invalid ZOEKT_WATCHDOG_ERRORS=%q, keeping default %d: %v", v, watchdogErrCount, err)
		} else {
			watchdogErrCount = n
			log.Printf("custom ZOEKT_WATCHDOG_ERRORS=%d", watchdogErrCount)
		}
	}

	useTLS := *sslCert != "" || *sslKey != ""

	watchdogAddr := "http://" + *listen
	if useTLS {
		watchdogAddr = "https://" + *listen
	}
	watchdogAddr += "/healthz"

	if watchdogErrCount > 0 && watchdogTick > 0 {
		go watchdog(watchdogTick, watchdogErrCount, watchdogAddr)
	} else {
		log.Println("watchdog disabled")
	}

	logger := sglog.Scoped("ZoektWebserverGRPCServer")

	streamer := web.NewTraceAwareSearcher(s.Searcher)
	grpcServer := newGRPCServer(logger, streamer)

	handler = grpcutil.MultiplexGRPC(grpcServer, handler)

	srv := &http.Server{
		Addr:    *listen,
		Handler: handler,
	}

	go func() {
		sglog.Scoped("server").Info("starting server", sglog.Stringp("address", listen))
		var err error
		if useTLS {
			err = srv.ListenAndServeTLS(*sslCert, *sslKey)
		} else {
			err = srv.ListenAndServe()
		}

		// ErrServerClosed is the expected result of Close/Shutdown. Any other
		// error is fatal; otherwise the signal handlers below would keep
		// waiting with no server running.
		if !errors.Is(err, http.ErrServerClosed) {
			log.Fatalf("ListenAndServe: %v", err)
		}
	}()

	if s.RPC {
		// Our RPC system does not support shutdown and hijacks the underlying
		// http connection. This means shutdown is ineffective and just waits 10s
		// before calling close. Lets just quit faster in that case.
		if err := closeOnSignal(srv); err != nil {
			log.Fatalf("http.Server.Close: %v", err)
		}
	} else {
		if err := shutdownOnSignal(srv); err != nil {
			log.Fatalf("http.Server.Shutdown: %v", err)
		}
	}
}
328
// addProxyHandler registers a reverse proxy on mux for every path under
// "/indexserver/". The prefix is stripped before forwarding, and requests are
// sent as plain HTTP over the unix domain socket at socket.
func addProxyHandler(mux *http.ServeMux, socket string) {
	// The host is only a placeholder: DialContext below ignores the requested
	// address and always dials socket.
	target := &url.URL{Scheme: "http", Host: "socket"}

	proxy := httputil.NewSingleHostReverseProxy(target)
	proxy.Transport = &http.Transport{
		DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
			var dialer net.Dialer
			return dialer.DialContext(ctx, "unix", socket)
		},
	}

	const prefix = "/indexserver/"
	mux.Handle(prefix, http.StripPrefix(prefix, proxy))
}
346
// shutdownSignalChan subscribes a new channel to the signals that request
// shutdown and returns it. os.Interrupt covers terminal Ctrl-C and goreman,
// and SIGTERM is what Kubernetes sends. maxReads should be the largest number
// of times the caller will receive from the channel. It is used as the channel
// buffer so that signals arriving before they are read are not dropped.
func shutdownSignalChan(maxReads int) <-chan os.Signal {
	sigs := make(chan os.Signal, maxReads)
	signal.Notify(sigs,
		os.Interrupt, // terminal C-c and goreman
		unix.SIGTERM, // Kubernetes
	)
	return sigs
}
356
// closeOnSignal blocks until SIGINT or SIGTERM arrives and then closes srv
// immediately with srv.Close, returning its error. It does not wait for
// in-flight requests; shutdownOnSignal is the graceful variant.
func closeOnSignal(srv *http.Server) error {
	<-shutdownSignalChan(1)
	return srv.Close()
}
365
// shutdownOnSignal blocks until SIGINT or SIGTERM arrives and then gracefully
// stops srv with srv.Shutdown, returning its error. In-flight requests get up
// to 10s to finish. A second signal cancels the drain, so Shutdown returns
// right away. Nothing else is torn down here.
//
// The RPC framework cannot drain its hijacked connections, so with RPC
// enabled Shutdown would just wait out the 10s. Callers should use
// closeOnSignal in that mode.
func shutdownOnSignal(srv *http.Server) error {
	sigs := shutdownSignalChan(2)
	<-sigs

	// A second signal aborts the graceful drain.
	ctx, abort := context.WithCancel(context.Background())
	defer abort()
	go func() {
		select {
		case sig := <-sigs:
			log.Printf("received another signal (%v), immediate shutdown", sig)
			abort()
		case <-ctx.Done():
		}
	}()

	// The 10s drain budget fits inside Kubernetes' 30s termination grace
	// period. According to the original note, 15s of that period have already
	// been spent waiting for endpoint removal to propagate. NOTE(review): that
	// wait happens outside this function; confirm where.
	drainCtx, stopDrain := context.WithTimeout(ctx, 10*time.Second)
	defer stopDrain()

	log.Printf("shutting down")
	return srv.Shutdown(drainCtx)
}
398
// watchdogOnce performs a single health probe: a GET on addr using client,
// bounded by a 30s timeout derived from ctx. It returns nil only for a
// 200 OK response; any other status is reported together with the response
// body. Every call, successful or not, increments metricWatchdogTotal.
func watchdogOnce(ctx context.Context, client *http.Client, addr string) error {
	defer metricWatchdogTotal.Inc()

	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, addr, nil)
	if err != nil {
		return err
	}

	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	body, _ := io.ReadAll(resp.Body)
	_ = resp.Body.Close()

	if resp.StatusCode == http.StatusOK {
		return nil
	}
	return fmt.Errorf("watchdog: status=%v body=%q", resp.StatusCode, string(body))
}
424
// watchdog probes addr every dt via watchdogOnce. After maxErrCount
// consecutive failed probes it logs a diagnostic and exits the process with
// status 3. A successful probe resets the consecutive-failure count and the
// metricWatchdogErrors gauge. It never returns normally.
//
// Certificate verification is disabled for the probe client.
// NOTE(review): presumably this is because addr is this server's own listen
// address, which its certificate need not cover; confirm.
func watchdog(dt time.Duration, maxErrCount int, addr string) {
	client := &http.Client{
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
		},
	}
	tick := time.NewTicker(dt)
	defer tick.Stop()

	errCount := 0
	for range tick.C {
		err := watchdogOnce(context.Background(), client, addr)
		if err == nil {
			if errCount > 0 {
				errCount = 0
				metricWatchdogErrors.Set(float64(errCount))
				log.Printf("watchdog: success, resetting error count")
			}
			continue
		}

		errCount++
		metricWatchdogErrors.Set(float64(errCount))
		metricWatchdogErrorsTotal.Inc()
		if errCount < maxErrCount {
			log.Printf("watchdog: failed, will try %d more times: %v", maxErrCount-errCount, err)
			continue
		}

		log.Printf(`watchdog health check has consecutively failed %d times, indicating there is likely an unrecoverable error affecting zoekt. As such this process will exit with code 3.

Final error: %v

Possible remediations:
- If this rarely happens, ignore and let your process manager restart zoekt.
- Possibly under provisioned. Try increasing CPU or disk IO.
- A bug. Reach out with logs and screenshots of metrics when this occurs.`, errCount, err)
		os.Exit(3)
	}
}
461
// mustRegisterDiskMonitor registers two gauges with the default Prometheus
// registry, both labelled with path. They report the free and the total disk
// space in bytes for path, as returned by gopsutil's disk.Usage at scrape
// time. It panics, via prometheus.MustRegister, if registration fails (for
// example when called twice).
//
// If disk.Usage fails (e.g. path became inaccessible), the gauges report NaN.
// The original code dereferenced the nil result instead, which panics inside
// the metrics collection.
func mustRegisterDiskMonitor(path string) {
	// usageFunc returns a GaugeFunc callback that reads one field of the
	// current disk usage of path.
	usageFunc := func(field func(*disk.UsageStat) uint64) func() float64 {
		return func() float64 {
			usage, err := disk.Usage(path)
			if err != nil || usage == nil {
				return math.NaN()
			}
			return float64(field(usage))
		}
	}

	prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Name:        "src_disk_space_available_bytes",
		Help:        "Amount of free space disk space.",
		ConstLabels: prometheus.Labels{"path": path},
	}, usageFunc(func(u *disk.UsageStat) uint64 { return u.Free })))

	prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Name:        "src_disk_space_total_bytes",
		Help:        "Amount of total disk space.",
		ConstLabels: prometheus.Labels{"path": path},
	}, usageFunc(func(u *disk.UsageStat) uint64 { return u.Total })))
}
481
482type loggedSearcher struct {
483 zoekt.Streamer
484 Logger sglog.Logger
485}
486
// Search counts the request and delegates to the wrapped Streamer. The
// outcome is logged in a deferred call, so it also runs while a panic
// unwinds. Stats are passed to the logger only when a result was returned.
func (s *loggedSearcher) Search(
	ctx context.Context,
	q query.Q,
	opts *zoekt.SearchOptions,
) (sr *zoekt.SearchResult, err error) {
	defer func() {
		var resultStats *zoekt.Stats
		if sr != nil {
			resultStats = &sr.Stats
		}
		s.log(ctx, q, opts, resultStats, err)
	}()

	metricSearchRequestsTotal.Inc()
	return s.Streamer.Search(ctx, q, opts)
}
503
// StreamSearch counts the request and delegates to the wrapped Streamer.
// Every event is forwarded to sender, and its Stats are summed along the way.
// When the stream ends, the summed stats and the returned error are logged.
func (s *loggedSearcher) StreamSearch(
	ctx context.Context,
	q query.Q,
	opts *zoekt.SearchOptions,
	sender zoekt.Sender,
) error {
	metricSearchRequestsTotal.Inc()

	var total zoekt.Stats
	forward := zoekt.SenderFunc(func(event *zoekt.SearchResult) {
		total.Add(event.Stats)
		sender.Send(event)
	})

	err := s.Streamer.StreamSearch(ctx, q, opts, forward)
	s.log(ctx, q, opts, &total, err)
	return err
}
522
// log emits at most one entry describing a finished search. The entry carries
// the trace context of ctx, the query and the main search options. Levels are
// chosen as follows:
//   - cancellation and deadline errors are logged at Warn;
//   - any other error is logged at Error;
//   - a successful search with stats is logged at Debug, with every stat;
//   - a successful search without stats (st == nil) is not logged.
func (s *loggedSearcher) log(ctx context.Context, q query.Q, opts *zoekt.SearchOptions, st *zoekt.Stats, err error) {
	tc := traceContext(ctx)
	logger := s.Logger.WithTrace(tc).With(
		sglog.String("query", q.String()),
		sglog.Bool("opts.EstimateDocCount", opts.EstimateDocCount),
		sglog.Bool("opts.Whole", opts.Whole),
		sglog.Int("opts.ShardMaxMatchCount", opts.ShardMaxMatchCount),
		sglog.Int("opts.TotalMaxMatchCount", opts.TotalMaxMatchCount),
		sglog.Duration("opts.MaxWallTime", opts.MaxWallTime),
		sglog.Int("opts.MaxDocDisplayCount", opts.MaxDocDisplayCount),
		sglog.Int("opts.MaxMatchDisplayCount", opts.MaxMatchDisplayCount),
	)

	switch {
	case errors.Is(err, context.Canceled):
		logger.Warn("search canceled", sglog.Error(err))
	case errors.Is(err, context.DeadlineExceeded):
		logger.Warn("search timeout", sglog.Error(err))
	case err != nil:
		logger.Error("search failed", sglog.Error(err))
	case st != nil:
		logger.Debug("search",
			sglog.Int64("stat.ContentBytesLoaded", st.ContentBytesLoaded),
			sglog.Int64("stat.IndexBytesLoaded", st.IndexBytesLoaded),
			sglog.Int("stat.Crashes", st.Crashes),
			sglog.Duration("stat.Duration", st.Duration),
			sglog.Int("stat.FileCount", st.FileCount),
			sglog.Int("stat.ShardFilesConsidered", st.ShardFilesConsidered),
			sglog.Int("stat.FilesConsidered", st.FilesConsidered),
			sglog.Int("stat.FilesLoaded", st.FilesLoaded),
			sglog.Int("stat.FilesSkipped", st.FilesSkipped),
			sglog.Int("stat.ShardsScanned", st.ShardsScanned),
			sglog.Int("stat.ShardsSkipped", st.ShardsSkipped),
			sglog.Int("stat.ShardsSkippedFilter", st.ShardsSkippedFilter),
			sglog.Int("stat.MatchCount", st.MatchCount),
			sglog.Int("stat.NgramMatches", st.NgramMatches),
			sglog.Int("stat.NgramLookups", st.NgramLookups),
			sglog.Duration("stat.Wait", st.Wait),
			sglog.Duration("stat.MatchTreeConstruction", st.MatchTreeConstruction),
			sglog.Duration("stat.MatchTreeSearch", st.MatchTreeSearch),
			sglog.Int("stat.RegexpsConsidered", st.RegexpsConsidered),
			sglog.String("stat.FlushReason", st.FlushReason.String()),
		)
	}
}
576
// traceContext extracts trace and span IDs from ctx for log correlation. An
// OpenTracing span whose context is a Jaeger span context takes precedence.
// Otherwise a valid OpenTelemetry span context is used. If neither is
// present, the zero TraceContext is returned.
func traceContext(ctx context.Context) sglog.TraceContext {
	if span := opentracing.SpanFromContext(ctx); span != nil {
		if jc, ok := span.Context().(jaeger.SpanContext); ok {
			return sglog.TraceContext{
				TraceID: jc.TraceID().String(),
				SpanID:  jc.SpanID().String(),
			}
		}
	}

	sc := oteltrace.SpanFromContext(ctx).SpanContext()
	if !sc.IsValid() {
		return sglog.TraceContext{}
	}
	return sglog.TraceContext{
		TraceID: sc.TraceID().String(),
		SpanID:  sc.SpanID().String(),
	}
}
597
598func newGRPCServer(logger sglog.Logger, streamer zoekt.Streamer, additionalOpts ...grpc.ServerOption) *grpc.Server {
599 s := defaults.NewServer(logger, additionalOpts...)
600 webserverv1.RegisterWebserverServiceServer(s, grpcserver.NewServer(streamer))
601
602 return s
603}
604
605var (
606 metricWatchdogErrors = promauto.NewGauge(prometheus.GaugeOpts{
607 Name: "zoekt_webserver_watchdog_errors",
608 Help: "The current error count for zoekt watchdog.",
609 })
610 metricWatchdogTotal = promauto.NewCounter(prometheus.CounterOpts{
611 Name: "zoekt_webserver_watchdog_total",
612 Help: "The total number of requests done by zoekt watchdog.",
613 })
614 metricWatchdogErrorsTotal = promauto.NewCounter(prometheus.CounterOpts{
615 Name: "zoekt_webserver_watchdog_errors_total",
616 Help: "The total number of errors from zoekt watchdog.",
617 })
618 metricSearchRequestsTotal = promauto.NewCounter(prometheus.CounterOpts{
619 Name: "zoekt_search_requests_total",
620 Help: "The total number of search requests that zoekt received",
621 })
622)