fork of https://github.com/sourcegraph/zoekt
// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
// repositories on Sourcegraph.
3package main
4
5import (
6 "bytes"
7 "context"
8 _ "embed"
9 "encoding/json"
10 "errors"
11 "flag"
12 "fmt"
13 "html/template"
14 "io"
15 "io/fs"
16 "log"
17 "math"
18 "math/rand"
19 "net"
20 "net/http"
21 "net/url"
22 "os"
23 "os/exec"
24 "os/signal"
25 "path/filepath"
26 "runtime"
27 "sort"
28 "strconv"
29 "strings"
30 "sync"
31 "text/tabwriter"
32 "time"
33
34 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
35 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
36 "github.com/peterbourgon/ff/v3/ffcli"
37 "github.com/prometheus/client_golang/prometheus"
38 "github.com/prometheus/client_golang/prometheus/promauto"
39 sglog "github.com/sourcegraph/log"
40 "github.com/sourcegraph/mountinfo"
41 "go.uber.org/automaxprocs/maxprocs"
42 "golang.org/x/net/trace"
43 "golang.org/x/sys/unix"
44 "google.golang.org/grpc"
45 "google.golang.org/grpc/codes"
46 "google.golang.org/grpc/credentials/insecure"
47 "google.golang.org/grpc/metadata"
48
49 "github.com/sourcegraph/zoekt"
50 "github.com/sourcegraph/zoekt/build"
51 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
52 "github.com/sourcegraph/zoekt/debugserver"
53 "github.com/sourcegraph/zoekt/grpc/internalerrs"
54 "github.com/sourcegraph/zoekt/grpc/messagesize"
55 "github.com/sourcegraph/zoekt/internal/profiler"
56)
57
var (
	// metricResolveRevisionsDuration observes the latency of resolving all
	// repository revisions.
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s, 10s, ..., 100000s (~27.8h)
	})

	// metricResolveRevisionDuration observes the latency of resolving a single
	// repository revision.
	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	// metricIndexDuration is observed by processQueue once per index job,
	// labelled with the resulting indexState and repoNameForMetric(name).
	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	// metricIndexIncrementalIndexState is incremented by Index for incremental
	// jobs with the on-disk build.IndexState it found.
	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is build.IndexState

	// metricNumIndexed is set by listIndexed to the number of repositories that
	// have shards in the index directory.
	// NOTE(review): the Help text says "by code host" but this gauge carries no
	// labels; consider rewording it.
	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	// metricFailingTotal is incremented by Index whenever it returns an error.
	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	// metricIndexingTotal is incremented by Index each time it starts a full
	// (non-noop, non-meta-only) index of a repository.
	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	// metricNumStoppedTrackingTotal is increased by the sync loop in Run by the
	// number of repositories removed from the queue.
	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})

	// clientMetricsOnce guards the one-time initialization of clientMetrics.
	// NOTE(review): both are initialized outside this view; presumably gRPC
	// client metrics — confirm.
	clientMetricsOnce sync.Once
	clientMetrics     *grpcprom.ClientMetrics
)
129
// reposWithSeparateIndexingMetrics is the set of repository names whose
// indexing durations are reported under their own "name" label value.
// All other repositories share the empty label value (see repoNameForMetric).
var reposWithSeparateIndexingMetrics = map[string]struct{}{}
132
// indexState is the outcome of one index attempt. Its string value is used as
// the "state" label of metricIndexDuration.
type indexState string

// Possible outcomes of Server.Index.
const (
	indexStateSuccess     indexState = "success"
	indexStateSuccessMeta indexState = "success_meta" // only the index metadata was updated
	indexStateNoop        indexState = "noop"         // the existing index was already up to date
	indexStateEmpty       indexState = "empty"        // the repository has no branches (empty repo)
	indexStateFail        indexState = "fail"
)
142
// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	// logger is the structured logger used by the server's methods.
	logger sglog.Logger

	// Sourcegraph is the client used to list the repositories to index, fetch
	// their index options and report index status back to Sourcegraph.
	Sourcegraph Sourcegraph
	// BatchSize is not referenced in this part of the file.
	// NOTE(review): presumably the batch size for requests to Sourcegraph —
	// confirm against its users.
	BatchSize int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration

	// CPUCount is the number of CPUs to use for indexing shards. It is used
	// when a repository's index options do not set ShardConcurrency.
	CPUCount int

	// queue holds pending index jobs. The sync loop in Run adds, bumps and
	// removes repositories; processQueue workers pop from it.
	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	// Index jobs take a per-repository lock (With); cleanup takes the global
	// lock (Global).
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is reported by the /debug/host handler.
	hostname string

	// mergeOpts configures shard merging. Run reads its vacuumInterval and
	// mergeInterval.
	mergeOpts mergeOpts

	// timeout defines how long the index server waits before killing an indexing job.
	timeout time.Duration
}
190
// debug is a logger for verbose diagnostics. It writes to io.Discard, so its
// output is dropped unless its writer is replaced (presumably elsewhere when
// debug logging is enabled — confirm).
var debug = log.New(io.Discard, "", log.Ldate|log.Ltime)
192
// noOutputTimeout is how long loggedRun lets an index command run without
// producing any new output before it considers the command stuck. Our index
// commands should output something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
const noOutputTimeout time.Duration = 30 * time.Minute
199
200func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) {
201 out := &synchronizedBuffer{}
202 cmd.Stdout = out
203 cmd.Stderr = out
204
205 tr.LazyPrintf("%s", cmd.Args)
206
207 defer func() {
208 if err != nil {
209 outS := out.String()
210 tr.LazyPrintf("failed: %v", err)
211 tr.LazyPrintf("output: %s", out)
212 tr.SetError()
213 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS)
214 }
215 }()
216
217 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args))
218
219 if err := cmd.Start(); err != nil {
220 return err
221 }
222
223 errC := make(chan error)
224 go func() {
225 errC <- cmd.Wait()
226 }()
227
228 // This channel is set after we have sent sigquit. It allows us to follow up
229 // with a sigkill if the process doesn't quit after sigquit.
230 kill := make(<-chan time.Time)
231
232 lastLen := 0
233 for {
234 select {
235 case <-time.After(noOutputTimeout):
236 // Periodically check if we have had output. If not kill the process.
237 if out.Len() != lastLen {
238 lastLen = out.Len()
239 log.Printf("still running %s", cmd.Args)
240 } else {
241 // Send quit (C-\) first so we get a stack dump.
242 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args)
243 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil {
244 log.Println("quit failed:", err)
245 }
246
247 // send sigkill if still running in 10s
248 kill = time.After(10 * time.Second)
249 }
250
251 case <-kill:
252 log.Printf("still running, killing %s", cmd.Args)
253 if err := cmd.Process.Kill(); err != nil {
254 log.Println("kill failed:", err)
255 }
256
257 case err := <-errC:
258 if err != nil {
259 return err
260 }
261
262 tr.LazyPrintf("success")
263 return nil
264 }
265 }
266}
267
268// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can
269// monitor the buffer while it is being written to.
270type synchronizedBuffer struct {
271 mu sync.Mutex
272 b bytes.Buffer
273}
274
275func (sb *synchronizedBuffer) Write(p []byte) (int, error) {
276 sb.mu.Lock()
277 defer sb.mu.Unlock()
278 return sb.b.Write(p)
279}
280
281func (sb *synchronizedBuffer) Len() int {
282 sb.mu.Lock()
283 defer sb.mu.Unlock()
284 return sb.b.Len()
285}
286
287func (sb *synchronizedBuffer) String() string {
288 sb.mu.Lock()
289 defer sb.mu.Unlock()
290 return sb.b.String()
291}
292
// pauseFileName names a marker file inside IndexDir. While the file exists,
// the sync loop skips refreshing the queue and processQueue starts no new
// index jobs. This makes it possible to experiment with the contents of
// IndexDir without the indexserver writing to it.
const pauseFileName string = "PAUSE"
297
// Run starts the indexserver's background loops and then blocks forever.
//
// It first removes incomplete shards from IndexDir, then starts:
//   - a sync loop (every Interval, jittered, or on SIGUSR1) that refreshes the
//     queue from Sourcegraph and runs cleanup on IndexDir;
//   - a vacuum loop and a merge loop (every mergeOpts.vacuumInterval /
//     mergeOpts.mergeInterval, jittered, or on SIGUSR1) that only do work when
//     shard merging is enabled;
//   - IndexConcurrency processQueue workers.
//
// Note that SIGUSR1 fires all three tickers, not just the sync loop.
func (s *Server) Run() {
	removeIncompleteShards(s.IndexDir)

	// Start a goroutine which updates the queue with commits to index.
	go func() {
		// We update the list of indexed repos every Interval. To speed up manual
		// testing we also listen for SIGUSR1 to trigger updates.
		//
		// "pkill -SIGUSR1 zoekt-sourcegra"
		for range jitterTicker(s.Interval, unix.SIGUSR1) {
			// Skip this round while the PAUSE file exists; its trimmed contents
			// are logged.
			if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
				log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b)))
				continue
			}

			// The IDs of repositories that already have shards on disk are
			// passed to List.
			repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir))
			if err != nil {
				log.Printf("error listing repos: %s", err)
				continue
			}

			debug.Printf("updating index queue with %d repositories", len(repos.IDs))

			// Stop indexing repos we don't need to track anymore
			removed := s.queue.MaybeRemoveMissing(repos.IDs)
			metricNumStoppedTrackingTotal.Add(float64(len(removed)))
			if len(removed) > 0 {
				log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5))
			}

			// cleanup runs under the global index-dir lock, in its own goroutine,
			// concurrently with the queue updates below. It is awaited at the end
			// of this round, so rounds never overlap.
			cleanupDone := make(chan struct{})
			go func() {
				defer close(cleanupDone)
				s.muIndexDir.Global(func() {
					cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging)
				})
			}()

			repos.IterateIndexOptions(s.queue.AddOrUpdate)

			// IterateIndexOptions will only iterate over repositories that have
			// changed since we last called list. However, we want to add all IDs
			// back onto the queue just to check that what is on disk is still
			// correct. This will use the last IndexOptions we stored in the
			// queue. The repositories not on the queue (missing) need a forced
			// fetch of IndexOptions.
			missing := s.queue.Bump(repos.IDs)
			// Per-repository errors are ignored here (no-op error callback).
			// NOTE(review): consider logging them.
			s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...)

			setCompoundShardCounter(s.IndexDir)

			<-cleanupDone
		}
	}()

	// Vacuum loop: only does work when shard merging is enabled.
	go func() {
		for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) {
			if s.shardMerging {
				s.vacuum()
			}
		}
	}()

	// Merge loop: only does work when shard merging is enabled.
	go func() {
		for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) {
			if s.shardMerging {
				s.doMerge()
			}
		}
	}()

	// Index workers.
	for i := 0; i < s.IndexConcurrency; i++ {
		go s.processQueue()
	}

	// block forever
	select {}
}
377
378// formatList returns a comma-separated list of the first min(len(v), m) items.
379func formatListUint32(v []uint32, m int) string {
380 if len(v) < m {
381 m = len(v)
382 }
383
384 sb := strings.Builder{}
385 for i := 0; i < m; i++ {
386 fmt.Fprintf(&sb, "%d, ", v[i])
387 }
388
389 if len(v) > m {
390 sb.WriteString("...")
391 }
392
393 return strings.TrimRight(sb.String(), ", ")
394}
395
// processQueue is an index worker loop; Run starts IndexConcurrency of them.
// Each iteration pops the next repository from the queue and indexes it while
// holding that repository's index-dir lock. It never returns.
func (s *Server) processQueue() {
	for {
		// Start no new work while the PAUSE file is present.
		if _, statErr := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); statErr == nil {
			time.Sleep(time.Second)
			continue
		}

		opts, ok := s.queue.Pop()
		if !ok {
			// Nothing queued; poll again shortly.
			time.Sleep(time.Second)
			continue
		}

		args := s.indexArgs(opts)

		ran := s.muIndexDir.With(opts.Name, func() {
			// Start timing only once the lock is held, so time spent waiting
			// for merging/cleanup is not recorded as indexing time.
			began := time.Now()
			state, indexErr := s.Index(args)
			took := time.Since(began)

			metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(took.Seconds())

			if indexErr != nil {
				log.Printf("error indexing %s: %s", args.String(), indexErr)
			}

			if state == indexStateSuccess {
				var branches []string
				for _, b := range args.Branches {
					branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
				}
				s.logger.Info("updated index",
					sglog.String("repo", args.Name),
					sglog.Uint32("id", args.RepoID),
					sglog.Strings("branches", branches),
					sglog.Duration("duration", took),
				)
			} else if state == indexStateSuccessMeta {
				log.Printf("updated meta %s in %v", args.String(), took)
			}

			s.queue.SetIndexed(opts, state)
		})

		if !ran {
			// Someone else is processing the repository. Skipping this job is
			// fine: the repository will be added back to the queue and we
			// converge to the correct behaviour.
			debug.Printf("index job for repository already running: %s", args)
		}
	}
}
453
454// repoNameForMetric returns a normalized version of the given repository name that is
455// suitable for use with Prometheus metrics.
456func repoNameForMetric(repo string) string {
457 // Check to see if we want to be able to capture separate indexing metrics for this repository.
458 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable.
459 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok {
460 return repo
461 }
462
463 return ""
464}
465
// batched returns a channel that yields consecutive, non-overlapping chunks of
// slice, each with at most size elements. The channel is closed after the last
// chunk. A size below 1 yields the whole slice as a single chunk.
//
// Each chunk's capacity is capped at its length. Appending to a chunk
// therefore allocates a new array instead of overwriting elements that belong
// to later chunks.
//
// The producing goroutine only exits once the channel has been drained, so
// callers must read until the channel is closed.
func batched(slice []uint32, size int) <-chan []uint32 {
	c := make(chan []uint32)
	go func() {
		defer close(c)

		// Guard against size <= 0, which would otherwise loop forever (0) or
		// panic on slicing (negative).
		if size < 1 {
			size = len(slice)
		}

		for len(slice) > 0 {
			n := size
			if n > len(slice) {
				n = len(slice)
			}
			c <- slice[:n:n]
			slice = slice[n:]
		}
	}()
	return c
}
480
// jitterTicker returns a channel that receives a value immediately. After
// each tick is received, it waits a random duration drawn uniformly from
// [d/2, 3d/2) (integer nanoseconds) before sending the next one. d must be
// positive, because rand.Int63n panics otherwise.
//
// Any of the signals in sig also fire the ticker. This is a convenience to
// allow manually triggering whatever work the ticker drives.
//
// The channel is unbuffered, and the periodic goroutine never exits.
func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} {
	ticks := make(chan struct{})

	go func() {
		period := int64(d)
		for {
			ticks <- struct{}{}
			pause := period/2 + rand.Int63n(period)
			time.Sleep(time.Duration(pause))
		}
	}()

	if len(sig) > 0 {
		go func() {
			signals := make(chan os.Signal, 1)
			signal.Notify(signals, sig...)
			for range signals {
				ticks <- struct{}{}
			}
		}()
	}

	return ticks
}
512
// Index runs an index job for the repository described by args and returns
// its outcome as an indexState.
//
// Steps, in order:
//   - a repository without branches gets an empty shard (indexStateEmpty);
//   - delta builds and symbol skipping are enabled from the server's
//     allowlists (args is modified in place);
//   - if args.Incremental is set, the on-disk index state is checked first.
//     An up-to-date index is a no-op (indexStateNoop). A metadata-only
//     difference is handled by mergeMeta (indexStateSuccessMeta), falling back
//     to a full index if that fails;
//   - otherwise the repository is indexed via gitIndex and the new index
//     status is reported to Sourcegraph. A failure to report is only logged.
//
// Whenever a non-nil error is returned, the deferred handler overrides the
// returned state with indexStateFail and increments metricFailingTotal.
// The job is recorded as an x/net/trace trace.
func (s *Server) Index(args *indexArgs) (state indexState, err error) {
	tr := trace.New("index", args.Name)

	defer func() {
		if err != nil {
			tr.SetError()
			tr.LazyPrintf("error: %v", err)
			state = indexStateFail
			metricFailingTotal.Inc()
		}
		tr.LazyPrintf("state: %s", state)
		tr.Finish()
	}()

	tr.LazyPrintf("branches: %v", args.Branches)

	// No branches: write a placeholder shard instead of indexing.
	if len(args.Branches) == 0 {
		return indexStateEmpty, createEmptyShard(args)
	}

	repositoryName := args.Name
	if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
		tr.LazyPrintf("marking this repository for delta build")
		args.UseDelta = true
	}

	args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold

	if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
		tr.LazyPrintf("skipping symbols calculation")
		args.Symbols = false
	}

	// reason is logged with the full index below. For incremental jobs it is
	// the build.IndexState found on disk.
	reason := "forced"

	if args.Incremental {
		bo := args.BuildOptions()
		bo.SetDefaults()
		incrementalState, fn := bo.IndexState()
		reason = string(incrementalState)
		metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()

		switch incrementalState {
		case build.IndexStateEqual:
			debug.Printf("%s index already up to date. Shard=%s", args.String(), fn)
			return indexStateNoop, nil

		case build.IndexStateMeta:
			log.Printf("updating index.meta %s", args.String())

			// Only metadata differs: try to update it in place. On failure we
			// fall through to a full index.
			if err := mergeMeta(bo); err != nil {
				log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
			} else {
				return indexStateSuccessMeta, nil
			}

		case build.IndexStateCorrupt:
			log.Printf("falling back to full update: corrupt index: %s", args.String())
		}
	}

	log.Printf("updating index %s reason=%s", args.String(), reason)

	metricIndexingTotal.Inc()
	c := gitIndexConfig{
		runCmd: func(cmd *exec.Cmd) error {
			return s.loggedRun(tr, cmd)
		},

		findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
			return args.BuildOptions().FindRepositoryMetadata()
		},
		timeout: s.timeout,
	}

	err = gitIndex(c, args, s.Sourcegraph, s.logger)
	if err != nil {
		return indexStateFail, err
	}

	// Indexing succeeded. Failing to report the new status to Sourcegraph is
	// logged but does not fail the job.
	if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil {
		s.logger.Error("failed to update index status",
			sglog.String("repo", args.Name),
			sglog.Uint32("id", args.RepoID),
			sglogBranches("branches", args.Branches),
			sglog.Error(err),
		)
	}

	return indexStateSuccess, nil
}
605
// updateIndexStatusOnSourcegraph reports the newly written index for args back
// to Sourcegraph, so it can update the zoekt_repos table. The index time is
// read back from the metadata on disk.
func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error {
	_, metadata, found, err := c.findRepositoryMetadata(args)
	switch {
	case err != nil:
		return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
	case !found:
		return errors.New("failed to find metadata for new/updated index")
	}

	update := indexStatus{
		RepoID:        args.RepoID,
		Branches:      args.Branches,
		IndexTimeUnix: metadata.IndexTime.Unix(),
	}
	if err := sg.UpdateIndexStatus([]indexStatus{update}); err != nil {
		return fmt.Errorf("failed to update sourcegraph with status: %w", err)
	}
	return nil
}
629
// sglogBranches returns a log field named key that lists each branch as
// "name=version".
func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
	formatted := make([]string, 0, len(branches))
	for _, branch := range branches {
		formatted = append(formatted, fmt.Sprintf("%s=%s", branch.Name, branch.Version))
	}
	return sglog.Strings(key, formatted)
}
637
// indexArgs builds the arguments for an incremental index job for opts in
// s.IndexDir. Shard parallelism comes from s.parallelism given the current
// GOMAXPROCS.
func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
	args := &indexArgs{
		IndexOptions: opts,
		IndexDir:     s.IndexDir,
		Incremental:  true,

		// 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
		FileLimit: 1 << 20,
	}
	args.Parallelism = s.parallelism(opts, runtime.GOMAXPROCS(0))
	return args
}
650
651// parallelism consults both the server flags and index options to determine the number
652// of shards to index in parallel. If the CPUCount index option is provided, it always
653// overrides the server flag.
654func (s *Server) parallelism(opts IndexOptions, maxProcs int) int {
655 var parallelism int
656 if opts.ShardConcurrency > 0 {
657 parallelism = int(opts.ShardConcurrency)
658 } else {
659 parallelism = s.CPUCount
660 }
661
662 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs
663 if parallelism > 4*maxProcs {
664 parallelism = 4 * maxProcs
665 }
666
667 // If index concurrency is set, then divide the parallelism by the number of
668 // repos we're indexing in parallel
669 if s.IndexConcurrency > 1 {
670 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency)))
671 }
672
673 return parallelism
674}
675
676func createEmptyShard(args *indexArgs) error {
677 bo := args.BuildOptions()
678 bo.SetDefaults()
679 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}
680
681 if args.Incremental && bo.IncrementalSkipIndexing() {
682 return nil
683 }
684
685 builder, err := build.NewBuilder(*bo)
686 if err != nil {
687 return err
688 }
689 return builder.Finish()
690}
691
// addDebugHandlers registers the indexserver-specific HTTP handlers on mux.
// Sourcegraph's site admin view requires indexserver to serve its admin view
// on "/", hence the root handler.
func (s *Server) addDebugHandlers(mux *http.ServeMux) {
	routes := []struct {
		pattern string
		handler http.HandlerFunc
	}{
		{"/", s.handleRoot},
		{"/debug/reindex", s.handleReindex},
		{"/debug/indexed", s.handleDebugIndexed},
		{"/debug/list", s.handleDebugList},
		{"/debug/merge", s.handleDebugMerge},
		{"/debug/queue", s.queue.handleDebugQueue},
		{"/debug/host", s.handleHost},
	}

	for _, route := range routes {
		mux.Handle(route.pattern, route.handler)
	}
}
705
// handleHost serves /debug/host. On GET it returns a JSON object
// {"Hostname": ...} with this server's hostname; any other method gets
// 405 Method Not Allowed.
func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		w.Header().Set("Allow", http.MethodGet)
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	payload, err := json.Marshal(struct{ Hostname string }{Hostname: s.hostname})
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json; charset=utf-8")
	_, _ = w.Write(payload)
}
728
// rootTmpl renders the indexserver landing page served by handleRoot.
// IndexMsg is the outcome of a reindex requested via ?id=. Repos, when
// non-empty (?show_repos=true), is listed in a table whose ID column links
// to a forced reindex of that repository.
var rootTmpl = template.Must(template.New("name").Parse(`
<html>
  <body>
    <a href="debug">Debug</a><br />
    <a href="debug/requests">Traces</a><br />
    {{.IndexMsg}}<br />
    <br />
    <h3>Reindex</h3>
    {{if .Repos}}
      <a href="?show_repos=false">hide repos</a><br />
      <table style="margin-top: 20px">
        <tr>
          <th style="text-align:left">Name</th>
          <th style="text-align:left">ID</th>
        </tr>
        {{range .Repos}}
          <tr>
            <td>{{.Name}}</td>
            <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></td>
          </tr>
        {{end}}
      </table>
    {{else}}
      <a href="?show_repos=true">show repos</a><br />
    {{end}}
  </body>
</html>
`))
755
// handleRoot serves the indexserver landing page (rootTmpl). Only GET is
// allowed.
//
// Query parameters:
//   - id: if set, the repository with this ID is reindexed synchronously via
//     forceIndex before the page is rendered, and the outcome is shown.
//     A malformed or out-of-range ID yields 400 Bad Request.
//   - show_repos: if it parses as true, every repository in the queue is
//     listed, sorted by name. An unparsable value counts as false.
func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
	if r.Method != "GET" {
		w.Header().Set("Allow", "GET")
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	values := r.URL.Query()

	// ?id=
	indexMsg := ""
	if v := values.Get("id"); v != "" {
		// Repository IDs are uint32. Parsing with that bit size rejects
		// negative or too-large values instead of wrapping them around to a
		// different repository.
		id, err := strconv.ParseUint(v, 10, 32)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		// forceIndex always returns a descriptive message, including on
		// failure, so the error itself is not needed here.
		indexMsg, _ = s.forceIndex(uint32(id))
	}

	// ?show_repos=
	showRepos := false
	if v := values.Get("show_repos"); v != "" {
		showRepos, _ = strconv.ParseBool(v)
	}

	type Repo struct {
		ID   uint32
		Name string
	}
	var data struct {
		Repos    []Repo
		IndexMsg string
	}

	data.IndexMsg = indexMsg

	if showRepos {
		s.queue.Iterate(func(opts *IndexOptions) {
			data.Repos = append(data.Repos, Repo{
				ID:   opts.RepoID,
				Name: opts.Name,
			})
		})
		sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name })
	}

	// Rendering errors are ignored: part of the page may already have been
	// written, so there is no clean way to report them.
	_ = rootTmpl.Execute(w, data)
}
805
// handleReindex triggers a forced reindex of the repository whose ID is given
// in the "repo" form value. The reindex runs asynchronously: the handler
// returns 202 Accepted right away, and the caller can infer the new state of
// the index by calling List. Only POST is allowed. A missing, malformed or
// out-of-range repository ID yields 400 Bad Request.
func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		w.Header().Set("Allow", http.MethodPost)
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	if err := r.ParseForm(); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// Repository IDs are uint32. Reject negative or too-large values instead
	// of letting them wrap around to a different repository.
	id, err := strconv.ParseUint(r.Form.Get("repo"), 10, 32)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// The outcome is not reported back to this request.
	go func() { _, _ = s.forceIndex(uint32(id)) }()

	// 202 Accepted
	w.WriteHeader(http.StatusAccepted)
}
833
// handleDebugList serves a plain-text table (ID and Name columns) of the
// repositories returned by Sourcegraph.List. Names come from the queue and are
// empty for repositories not in it.
//
// By default the IDs of locally indexed repositories are passed to List;
// ?indexed=false passes none.
func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
	withIndexed := true
	if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil {
		withIndexed = b
	}

	var indexed []uint32
	if withIndexed {
		indexed = listIndexed(s.IndexDir)
	}

	repos, err := s.Sourcegraph.List(r.Context(), indexed)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	var bw bytes.Buffer
	tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)

	if _, err := fmt.Fprintf(tw, "ID\tName\n"); err != nil {
		http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
		return
	}

	// s.queue.get is only called with s.queue.mu held. Stop at the first write
	// error and report it; previously only the last row's error was checked.
	var writeErr error
	s.queue.mu.Lock()
	for _, id := range repos.IDs {
		name := ""
		if item := s.queue.get(id); item != nil {
			name = item.opts.Name
		}
		if _, writeErr = fmt.Fprintf(tw, "%d\t%s\n", id, name); writeErr != nil {
			debug.Printf("handleDebugList: %s\n", writeErr.Error())
			break
		}
	}
	s.queue.mu.Unlock()

	if writeErr != nil {
		http.Error(w, writeErr.Error(), http.StatusInternalServerError)
		return
	}

	if err := tw.Flush(); err != nil {
		http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))

	if _, err := io.Copy(w, &bw); err != nil {
		// The status line (and possibly part of the body) has already been
		// sent, so http.Error could no longer reach the client. Just record it.
		debug.Printf("handleDebugList: copying output to response writer: %s\n", err)
	}
}
894
// handleDebugMerge starts a shard merge in the background, even if shard
// merging is not enabled, and immediately responds with "merging enqueued".
// Users can run this during periods of low usage (evenings, weekends) to
// trigger an initial merge run. In the steady state, merges happen rarely,
// even on busy instances, and users can rely on automatic merging instead.
//
// A merge can take very long, depending on the number of merges and the
// target size of the compound shards, which is why it is not run inline.
// Its status is tracked with metricShardMergingRunning.
func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {
	go s.doMerge()

	_, _ = w.Write([]byte("merging enqueued\n"))
}
910
911func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {
912 indexed := listIndexed(s.IndexDir)
913
914 bw := bytes.Buffer{}
915
916 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
917
918 _, err := fmt.Fprintf(tw, "ID\tName\n")
919 if err != nil {
920 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
921 return
922 }
923
924 s.queue.mu.Lock()
925 name := ""
926 for _, id := range indexed {
927 if item := s.queue.get(id); item != nil {
928 name = item.opts.Name
929 } else {
930 name = ""
931 }
932 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
933 if err != nil {
934 debug.Printf("handleDebugIndexed: %s\n", err.Error())
935 }
936 }
937 s.queue.mu.Unlock()
938
939 if err != nil {
940 http.Error(w, err.Error(), http.StatusInternalServerError)
941 return
942 }
943
944 err = tw.Flush()
945 if err != nil {
946 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
947 return
948 }
949
950 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
951
952 if _, err := io.Copy(w, &bw); err != nil {
953 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
954 return
955 }
956}
957
// forceIndex fetches fresh index options for repository id from Sourcegraph
// and indexes the repository right away, skipping the incremental up-to-date
// check. It always returns a string explaining what it did, even if it failed.
// The error is non-nil when fetching the options or indexing failed. If an
// index job for the repository is already running, nothing is done and the
// error is nil.
func (s *Server) forceIndex(id uint32) (string, error) {
	var (
		opts  IndexOptions
		found bool
		err   error
	)
	s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) {
		opts = o
		found = true
	}, func(_ uint32, e error) {
		err = e
	}, id)
	if err != nil {
		return fmt.Sprintf("Indexing %d failed: %v", id, err), err
	}
	if !found {
		// Neither callback ran. Continuing would index the zero-value
		// IndexOptions (empty name, repository ID 0), so report it instead.
		err = fmt.Errorf("no index options returned for repository %d", id)
		return fmt.Sprintf("Indexing %d failed: %v", id, err), err
	}

	args := s.indexArgs(opts)
	args.Incremental = false // force re-index

	var state indexState
	ran := s.muIndexDir.With(opts.Name, func() {
		state, err = s.Index(args)
	})
	if !ran {
		return fmt.Sprintf("index job for repository already running: %s", args), nil
	}
	if err != nil {
		return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err
	}
	return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil
}
987
// listIndexed returns, in ascending order, the IDs of all repositories that
// getShards finds in indexDir. As a side effect it sets the metricNumIndexed
// gauge to that count.
func listIndexed(indexDir string) []uint32 {
	shards := getShards(indexDir)
	metricNumIndexed.Set(float64(len(shards)))

	ids := make([]uint32, 0, len(shards))
	for repoID := range shards {
		ids = append(ids, repoID)
	}
	sort.Slice(ids, func(a, b int) bool { return ids[a] < ids[b] })

	return ids
}
1000
// setupTmpDir creates a temporary directory inside the index directory, so it
// is on the same volume as the indexes, and points TMPDIR at it.
//
// When main is true (the main daemon), the directory is ".indexserver.tmp" and
// any earlier copy of it is removed first; a removal failure is logged, not
// returned. When main is false (a debug command), ".indexserver.debug.tmp" is
// used and left as is.
func setupTmpDir(logger sglog.Logger, main bool, index string) error {
	tmpRoot := filepath.Join(index, ".indexserver.debug.tmp")

	if main {
		tmpRoot = filepath.Join(index, ".indexserver.tmp")

		logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot))
		if err := os.RemoveAll(tmpRoot); err != nil {
			logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err))
		}
	}

	if err := os.MkdirAll(tmpRoot, 0o755); err != nil {
		return err
	}

	return os.Setenv("TMPDIR", tmpRoot)
}
1030
// printMetaData reads the metadata of shard fn and writes it to stdout as two
// JSON documents, each followed by a newline: first the index metadata, then
// the repository description.
func printMetaData(fn string) error {
	repo, indexMeta, err := zoekt.ReadMetadataPath(fn)
	if err != nil {
		return err
	}

	enc := json.NewEncoder(os.Stdout)
	if err := enc.Encode(indexMeta); err != nil {
		return err
	}
	return enc.Encode(repo)
}
1048
// printShardStats opens the shard file fn and prints its ngram statistics via
// zoekt.PrintNgramStats.
func printShardStats(fn string) error {
	f, err := os.Open(fn)
	if err != nil {
		return err
	}

	iFile, err := zoekt.NewIndexFile(f)
	if err != nil {
		// NOTE(review): NewIndexFile may already have closed f on failure.
		// Closing again is harmless and guarantees the descriptor is released.
		_ = f.Close()
		return err
	}
	// Release the index file once the stats have been printed; it was
	// previously leaked.
	defer iFile.Close()

	return zoekt.PrintNgramStats(iFile)
}
1062
// srcLogLevelIsDebug reports whether the sglog level environment variable
// (sglog.EnvLogLevel) is "dbug" or "debug", compared case-insensitively.
func srcLogLevelIsDebug() bool {
	level := os.Getenv(sglog.EnvLogLevel)
	for _, name := range []string{"dbug", "debug"} {
		if strings.EqualFold(level, name) {
			return true
		}
	}
	return false
}
1067
// getEnvWithDefaultInt64 returns environment variable k parsed as a base-10
// int64, or defaultVal when k is unset or empty. An unparsable value ends the
// process via log.Fatalf.
func getEnvWithDefaultInt64(k string, defaultVal int64) int64 {
	if raw := os.Getenv(k); raw != "" {
		n, err := strconv.ParseInt(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to int64: %s", k, err)
		}
		return n
	}
	return defaultVal
}
1079
// getEnvWithDefaultInt returns the environment variable k parsed as an int,
// or defaultVal when k is unset or empty. An unparsable value terminates the
// process via log.Fatalf.
func getEnvWithDefaultInt(k string, defaultVal int) int {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	// Parse the variable's value, not its name.
	i, err := strconv.Atoi(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to int: %s", k, err)
	}
	return i
}
1091
// getEnvWithDefaultUint64 returns the environment variable k parsed as a
// base-10 uint64, or defaultVal when k is unset or empty. An unparsable value
// terminates the process via log.Fatalf.
func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 {
	raw := os.Getenv(k)
	if len(raw) == 0 {
		return defaultVal
	}

	parsed, parseErr := strconv.ParseUint(raw, 10, 64)
	if parseErr != nil {
		log.Fatalf("error parsing ENV %s to uint64: %s", k, parseErr)
	}
	return parsed
}
1103
// getEnvWithDefaultFloat64 returns the environment variable k parsed as a
// float64, or defaultVal when k is unset or empty. An unparsable value
// terminates the process via log.Fatalf.
func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 {
	switch raw := os.Getenv(k); raw {
	case "":
		return defaultVal
	default:
		val, err := strconv.ParseFloat(raw, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to float64: %s", k, err)
		}
		return val
	}
}
1115
// getEnvWithDefaultString returns the value of the environment variable k,
// or defaultVal when k is unset or set to the empty string.
func getEnvWithDefaultString(k string, defaultVal string) string {
	if val := os.Getenv(k); val != "" {
		return val
	}
	return defaultVal
}
1123
// getEnvWithDefaultDuration returns the environment variable k parsed with
// time.ParseDuration (e.g. "90s", "2h"), or defaultVal when k is unset or
// empty. An unparsable value terminates the process via log.Fatalf.
func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration {
	if raw := os.Getenv(k); raw != "" {
		dur, err := time.ParseDuration(raw)
		if err != nil {
			log.Fatalf("error parsing ENV %s to duration: %s", k, err)
		}
		return dur
	}
	return defaultVal
}
1136
// getEnvWithDefaultEmptySet parses the environment variable k as a
// comma-separated list and returns its distinct, whitespace-trimmed, non-empty
// entries as a set. When k is unset or empty the result is an empty (non-nil)
// map.
func getEnvWithDefaultEmptySet(k string) map[string]struct{} {
	members := make(map[string]struct{})
	raw := os.Getenv(k)
	for _, item := range strings.Split(raw, ",") {
		if trimmed := strings.TrimSpace(item); trimmed != "" {
			members[trimmed] = struct{}{}
		}
	}
	return members
}
1147
// joinStringSet joins the members of set with sep.
//
// Members are sorted in ascending order first, so the result is deterministic
// (Go randomizes map iteration order). A nil or empty set yields "".
func joinStringSet(set map[string]struct{}, sep string) string {
	xs := make([]string, 0, len(set))
	for x := range set {
		xs = append(xs, x)
	}
	sort.Strings(xs)

	return strings.Join(xs, sep)
}
1156
1157func setCompoundShardCounter(indexDir string) {
1158 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt"))
1159 if err != nil {
1160 log.Printf("setCompoundShardCounter: %s\n", err)
1161 return
1162 }
1163 metricNumberCompoundShards.Set(float64(len(fns)))
1164}
1165
1166func rootCmd() *ffcli.Command {
1167 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError)
1168 conf := rootConfig{
1169 Main: true,
1170 }
1171 conf.registerRootFlags(rootFs)
1172
1173 return &ffcli.Command{
1174 FlagSet: rootFs,
1175 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]",
1176 Subcommands: []*ffcli.Command{debugCmd()},
1177 Exec: func(ctx context.Context, args []string) error {
1178 return startServer(conf)
1179 },
1180 }
1181}
1182
// rootConfig holds the settings of the root indexserver command. Apart from
// Main, its fields are populated by registerRootFlags from command-line flags,
// several of which fall back to environment variables.
type rootConfig struct {
	// Main is true if this rootConfig is for our main long running command (the
	// indexserver). Debug commands should not set this value. newServer passes
	// it to setupTmpDir, where it selects the TMPDIR directory name and decides
	// whether temp directories left over from a previous run are removed.
	Main bool

	// general settings (-sourcegraph_url, -interval, -index,
	// -index_concurrency, -listen, -hostname, -cpu_fraction,
	// -block_profile_rate)
	root             string
	interval         time.Duration
	index            string
	indexConcurrency int64
	listen           string
	hostname         string
	cpuFraction      float64
	blockProfileRate int

	// config values related to shard merging; targetSize and minSize are in
	// MiB and minAgeDays is in days
	vacuumInterval time.Duration
	mergeInterval  time.Duration
	targetSize     int64
	minSize        int64
	minAgeDays     int
	maxPriority    float64

	// config values related to backoff indexing repos with one or more consecutive failures
	backoffDuration    time.Duration
	maxBackoffDuration time.Duration
}
1210
1211func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
1212 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
1213 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
1214 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently")
1215 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use")
1216 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
1217 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
1218 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
1219 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate")
1220 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
1221 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")
1222
1223 // flags related to shard merging
1224 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
1225 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
1226 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB")
1227 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB")
1228 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
1229 fs.Float64Var(&rc.maxPriority, "merge_max_priority", getEnvWithDefaultFloat64("SRC_MERGE_MAX_PRIORITY", 100), "the maximum priority a shard can have to be considered for merging.")
1230}
1231
1232func startServer(conf rootConfig) error {
1233 s, err := newServer(conf)
1234 if err != nil {
1235 return err
1236 }
1237
1238 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate)
1239 setCompoundShardCounter(s.IndexDir)
1240
1241 if conf.listen != "" {
1242
1243 mux := http.NewServeMux()
1244 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{
1245 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"},
1246 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"},
1247 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"},
1248 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"},
1249 }...)
1250 s.addDebugHandlers(mux)
1251
1252 go func() {
1253 debug.Printf("serving HTTP on %s", conf.listen)
1254 log.Fatal(http.ListenAndServe(conf.listen, mux))
1255 }()
1256
1257 // Serve mux on a unix domain socket on a best-effort-basis so that
1258 // webserver can call the endpoints via the shared filesystem.
1259 //
1260 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen
1261 // on the socket due to permission errors. See
1262 // https://github.com/docker/for-mac/issues/6239
1263 go func() {
1264 serveHTTPOverSocket := func() error {
1265 socket := filepath.Join(s.IndexDir, "indexserver.sock")
1266 // We cannot bind a socket to an existing pathname.
1267 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) {
1268 return fmt.Errorf("error removing socket file: %s", socket)
1269 }
1270 // The "unix" network corresponds to stream sockets. (cf. unixgram,
1271 // unixpacket).
1272 l, err := net.Listen("unix", socket)
1273 if err != nil {
1274 return fmt.Errorf("failed to listen on socket %s: %w", socket, err)
1275 }
1276 // Indexserver (root) and webserver (Sourcegraph) run with
1277 // different users. Per default, the socket is created with
1278 // permission 755 (root root), which doesn't let webserver write to
1279 // it.
1280 //
1281 // See https://github.com/golang/go/issues/11822 for more context.
1282 if err := os.Chmod(socket, 0o777); err != nil {
1283 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err)
1284 }
1285 debug.Printf("serving HTTP on %s", socket)
1286 return http.Serve(l, mux)
1287 }
1288 debug.Print(serveHTTPOverSocket())
1289 }()
1290 }
1291
1292 oc := &ownerChecker{
1293 Path: filepath.Join(conf.index, "owner.txt"),
1294 Hostname: conf.hostname,
1295 }
1296 go oc.Run()
1297
1298 logger := sglog.Scoped("metricsRegistration")
1299 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"}
1300
1301 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index})
1302 prometheus.DefaultRegisterer.MustRegister(c)
1303
1304 s.Run()
1305 return nil
1306}
1307
1308func newServer(conf rootConfig) (*Server, error) {
1309 logger := sglog.Scoped("server")
1310
1311 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 {
1312 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0")
1313 }
1314 if conf.index == "" {
1315 return nil, fmt.Errorf("must set -index")
1316 }
1317 if conf.root == "" {
1318 return nil, fmt.Errorf("must set -sourcegraph_url")
1319 }
1320 rootURL, err := url.Parse(conf.root)
1321 if err != nil {
1322 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err)
1323 }
1324
1325 rootURL = addDefaultPort(rootURL)
1326
1327 // Tune GOMAXPROCS to match Linux container CPU quota.
1328 _, _ = maxprocs.Set()
1329
1330 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler.
1331 // The block profiler is disabled by default and should be enabled with care in production
1332 runtime.SetBlockProfileRate(conf.blockProfileRate)
1333
1334 // Automatically prepend our own path at the front, to minimize
1335 // required configuration.
1336 if l, err := os.Readlink("/proc/self/exe"); err == nil {
1337 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
1338 }
1339
1340 if _, err := os.Stat(conf.index); err != nil {
1341 if err := os.MkdirAll(conf.index, 0o755); err != nil {
1342 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err)
1343 }
1344 }
1345
1346 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil {
1347 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err)
1348 }
1349
1350 if srcLogLevelIsDebug() {
1351 debug = log.New(os.Stderr, "", log.LstdFlags)
1352 }
1353
1354 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST")
1355 if len(reposWithSeparateIndexingMetrics) > 0 {
1356 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", "))
1357 }
1358
1359 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST")
1360 if len(deltaBuildRepositoriesAllowList) > 0 {
1361 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", "))
1362 }
1363
1364 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150)
1365 if deltaShardNumberFallbackThreshold > 0 {
1366 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold)
1367 } else {
1368 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards")
1369 }
1370
1371 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST")
1372 if len(reposShouldSkipSymbolsCalculation) > 0 {
1373 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", "))
1374 }
1375
1376 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout)
1377 if indexingTimeout != defaultIndexingTimeout {
1378 debug.Printf("using configured indexing timeout: %s", indexingTimeout)
1379 }
1380
1381 var sg Sourcegraph
1382 if rootURL.IsAbs() {
1383 var batchSize int
1384 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" {
1385 batchSize, err = strconv.Atoi(v)
1386 if err != nil {
1387 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int")
1388 }
1389 }
1390
1391 opts := []SourcegraphClientOption{
1392 WithBatchSize(batchSize),
1393 }
1394
1395 logger := sglog.Scoped("zoektConfigurationGRPCClient")
1396 client, err := dialGRPCClient(rootURL.Host, logger)
1397 if err != nil {
1398 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err)
1399 }
1400
1401 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...)
1402
1403 } else {
1404 sg = sourcegraphFake{
1405 RootDir: rootURL.String(),
1406 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags),
1407 }
1408 }
1409
1410 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction)))
1411 if cpuCount < 1 {
1412 cpuCount = 1
1413 }
1414
1415 if conf.indexConcurrency < 1 {
1416 conf.indexConcurrency = 1
1417 } else if conf.indexConcurrency > int64(cpuCount) {
1418 conf.indexConcurrency = int64(cpuCount)
1419 }
1420
1421 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)
1422
1423 return &Server{
1424 logger: logger,
1425 Sourcegraph: sg,
1426 IndexDir: conf.index,
1427 IndexConcurrency: int(conf.indexConcurrency),
1428 Interval: conf.interval,
1429 CPUCount: cpuCount,
1430 queue: *q,
1431 shardMerging: zoekt.ShardMergingEnabled(),
1432 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList,
1433 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold,
1434 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
1435 hostname: conf.hostname,
1436 mergeOpts: mergeOpts{
1437 vacuumInterval: conf.vacuumInterval,
1438 mergeInterval: conf.mergeInterval,
1439 targetSizeBytes: conf.targetSize * 1024 * 1024,
1440 minSizeBytes: conf.minSize * 1024 * 1024,
1441 minAgeDays: conf.minAgeDays,
1442 maxPriority: conf.maxPriority,
1443 },
1444 timeout: indexingTimeout,
1445 }, err
1446}
1447
// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration
// for the indexed-search-configuration gRPC service. Its content is embedded at
// build time from default_grpc_service_configuration.json. dialGRPCClient
// passes it to grpc.WithDefaultServiceConfig.
//
// The default backoff strategy is modeled after the default settings used by
// retryablehttp.DefaultClient.
//
// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html):
//   - Unavailable
//   - Aborted
//
// NOTE(review): the retry policy described above is defined in the embedded
// JSON file, not in Go code. Keep this comment in sync with that file.
// dialGRPCClient additionally installs a retry interceptor for Unavailable.
//
//go:embed default_grpc_service_configuration.json
var defaultGRPCServiceConfigurationJSON string
1460
1461func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor {
1462 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1463 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1464 return invoker(ctx, method, req, reply, cc, opts...)
1465 }
1466}
1467
1468func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
1469 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
1470 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1471 return streamer(ctx, desc, cc, method, opts...)
1472 }
1473}
1474
// defaultGRPCMessageReceiveSizeBytes is the largest message (90 MiB) that
// gRPC clients created by dialGRPCClient accept by default. Dial options
// appended after the defaults (additionalOpts and the messagesize environment
// overrides) can replace it.
const defaultGRPCMessageReceiveSizeBytes = 90 << 20 // 90 MiB
1478
1479func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) {
1480 metrics := mustGetClientMetrics()
1481
1482 // If the service seems to be unavailable, this
1483 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1
1484 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s])
1485 retryOpts := []retry.CallOption{
1486 retry.WithMax(5),
1487 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)),
1488 retry.WithCodes(codes.Unavailable),
1489 }
1490
1491 opts := []grpc.DialOption{
1492 grpc.WithTransportCredentials(insecure.NewCredentials()),
1493 grpc.WithChainStreamInterceptor(
1494 metrics.StreamClientInterceptor(),
1495 messagesize.StreamClientInterceptor,
1496 internalActorStreamInterceptor(),
1497 internalerrs.LoggingStreamClientInterceptor(logger),
1498 internalerrs.PrometheusStreamClientInterceptor,
1499 retry.StreamClientInterceptor(retryOpts...),
1500 ),
1501 grpc.WithChainUnaryInterceptor(
1502 metrics.UnaryClientInterceptor(),
1503 messagesize.UnaryClientInterceptor,
1504 internalActorUnaryInterceptor(),
1505 internalerrs.LoggingUnaryClientInterceptor(logger),
1506 internalerrs.PrometheusUnaryClientInterceptor,
1507 retry.UnaryClientInterceptor(retryOpts...),
1508 ),
1509 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
1510 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)),
1511 }
1512
1513 opts = append(opts, additionalOpts...)
1514
1515 // Ensure that the message size options are set last, so they override any other
1516 // client-specific options that tweak the message size.
1517 //
1518 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
1519 // take precedence over everything else with a uniform size setting that's easy to reason about.
1520 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...)
1521
1522 // This dialer is used to connect via gRPC to the Sourcegraph instance.
1523 // This is done lazily, so we can provide the client to use regardless of
1524 // whether we enabled gRPC or not initially.
1525 cc, err := grpc.Dial(addr, opts...)
1526 if err != nil {
1527 return nil, fmt.Errorf("dialing %q: %w", addr, err)
1528 }
1529
1530 client := proto.NewZoektConfigurationServiceClient(cc)
1531 return client, nil
1532}
1533
1534// mustGetClientMetrics returns a singleton instance of the client metrics
1535// that are shared across all gRPC clients that this process creates.
1536//
1537// This function panics if the metrics cannot be registered with the default
1538// Prometheus registry.
1539func mustGetClientMetrics() *grpcprom.ClientMetrics {
1540 clientMetricsOnce.Do(func() {
1541 clientMetrics = grpcprom.NewClientMetrics(
1542 grpcprom.WithClientCounterOptions(),
1543 grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
1544 grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC
1545 grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC
1546 )
1547
1548 prometheus.DefaultRegisterer.MustRegister(clientMetrics)
1549 })
1550
1551 return clientMetrics
1552}
1553
// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
// If the scheme is "https", "443" is used. The same pointer is returned
// unchanged in these cases:
//   - the URL is nil;
//   - the URL is not absolute;
//   - the URL already carries a port;
//   - the URL uses any other scheme.
//
// The host is rebuilt from u.Hostname(), so an IPv6 literal such as "[::1]"
// becomes "[::1]:80" instead of the malformed "[[::1]]:80". A host with an
// empty port, such as "example.com:", becomes "example.com:80".
//
// The original URL is not mutated. A copy is modified and returned.
func addDefaultPort(original *url.URL) *url.URL {
	if original == nil {
		return nil // don't panic
	}

	// Don't do anything if the URL doesn't look like a remote URL, or if it
	// already names a port.
	if !original.IsAbs() || original.Port() != "" {
		return original
	}

	var port string
	switch original.Scheme {
	case "http":
		port = "80"
	case "https":
		port = "443"
	default:
		return original
	}

	u := cloneURL(original)
	// Hostname strips IPv6 brackets (and an empty ":" port suffix), so
	// JoinHostPort adds brackets back exactly once when needed.
	u.Host = net.JoinHostPort(u.Hostname(), port)
	return u
}

// cloneURL returns a copy of the URL. It is safe to mutate the returned URL.
// The Userinfo is deep-copied so the copy does not alias the original's.
// This is copied from net/http/clone.go
func cloneURL(u *url.URL) *url.URL {
	if u == nil {
		return nil
	}
	u2 := new(url.URL)
	*u2 = *u
	if u.User != nil {
		u2.User = new(url.Userinfo)
		*u2.User = *u.User
	}
	return u2
}
1598
1599func main() {
1600 liblog := sglog.Init(sglog.Resource{
1601 Name: "zoekt-indexserver",
1602 Version: zoekt.Version,
1603 InstanceID: zoekt.HostnameBestEffort(),
1604 })
1605 defer liblog.Sync()
1606
1607 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil {
1608 log.Fatal(err)
1609 }
1610}
1611
// getBoolFromEnvironmentVariables returns the boolean defined by the first
// environment variable in envVarNames that is set to a non-empty value in the
// current process environment, or defaultBool if none are.
//
// Values are parsed with strconv.ParseBool. An error is returned if that first
// non-empty variable fails to parse; later variables are not consulted in that
// case. The returned error wraps the strconv error, so callers can inspect it
// with errors.As.
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	for _, envVar := range envVarNames {
		v := os.Getenv(envVar)
		if v == "" {
			continue
		}

		b, err := strconv.ParseBool(v)
		if err != nil {
			return false, fmt.Errorf("parsing environment variable %q to boolean: %w", envVar, err)
		}

		return b, nil
	}

	return defaultBool, nil
}