fork of https://github.com/sourcegraph/zoekt
1// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
2// repositories on sourcegraph
3package main
4
5import (
6 "bytes"
7 "context"
8 _ "embed"
9 "encoding/json"
10 "errors"
11 "flag"
12 "fmt"
13 "html/template"
14 "io"
15 "io/fs"
16 "log"
17 "math"
18 "math/rand"
19 "net"
20 "net/http"
21 "net/url"
22 "os"
23 "os/exec"
24 "os/signal"
25 "path/filepath"
26 "runtime"
27 "sort"
28 "strconv"
29 "strings"
30 "sync"
31 "text/tabwriter"
32 "time"
33
34 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
35 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
36 "github.com/peterbourgon/ff/v3/ffcli"
37 "github.com/prometheus/client_golang/prometheus"
38 "github.com/prometheus/client_golang/prometheus/promauto"
39 sglog "github.com/sourcegraph/log"
40 "github.com/sourcegraph/mountinfo"
41 "go.uber.org/automaxprocs/maxprocs"
42 "golang.org/x/net/trace"
43 "golang.org/x/sys/unix"
44 "google.golang.org/grpc"
45 "google.golang.org/grpc/codes"
46 "google.golang.org/grpc/credentials/insecure"
47 "google.golang.org/grpc/metadata"
48
49 "github.com/sourcegraph/zoekt"
50 "github.com/sourcegraph/zoekt/build"
51 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
52 "github.com/sourcegraph/zoekt/debugserver"
53 "github.com/sourcegraph/zoekt/grpc/internalerrs"
54 "github.com/sourcegraph/zoekt/grpc/messagesize"
55 "github.com/sourcegraph/zoekt/internal/profiler"
56)
57
58var (
59 metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
60 Name: "resolve_revisions_seconds",
61 Help: "A histogram of latencies for resolving all repository revisions.",
62 Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 27min
63 })
64
65 metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
66 Name: "resolve_revision_seconds",
67 Help: "A histogram of latencies for resolving a repository revision.",
68 Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
69 }, []string{"success"}) // success=true|false
70
71 metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
72 Name: "get_index_options_total",
73 Help: "The total number of times we tried to get index options for a repository. Includes errors.",
74 })
75 metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
76 Name: "get_index_options_error_total",
77 Help: "The total number of times we failed to get index options for a repository.",
78 })
79
80 metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
81 Name: "index_repo_seconds",
82 Help: "A histogram of latencies for indexing a repository.",
83 Buckets: prometheus.ExponentialBucketsRange(
84 (100 * time.Millisecond).Seconds(),
85 (40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
86 20),
87 }, []string{
88 "state", // state is an indexState
89 "name", // name of the repository that was indexed
90 })
91
92 metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{
93 Name: "index_indexing_delay_seconds",
94 Help: "A histogram of durations from when an index job is added to the queue, to the time it completes.",
95 Buckets: []float64{
96 60, // 1m
97 300, // 5m
98 1200, // 20m
99 2400, // 40m
100 3600, // 1h
101 10800, // 3h
102 18000, // 5h
103 36000, // 10h
104 43200, // 12h
105 54000, // 15h
106 72000, // 20h
107 86400, // 24h
108 108000, // 30h
109 126000, // 35h
110 172800, // 48h
111 }}, []string{
112 "state", // state is an indexState
113 "name", // the name of the repository that was indexed
114 })
115
116 metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
117 Name: "index_fetch_seconds",
118 Help: "A histogram of latencies for fetching a repository.",
119 Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
120 }, []string{
121 "success", // true|false
122 "name", // the name of the repository that the commits were fetched from
123 })
124
125 metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
126 Name: "index_incremental_index_state",
127 Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
128 }, []string{"state"}) // state is build.IndexState
129
130 metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
131 Name: "index_num_indexed",
132 Help: "Number of indexed repos by code host",
133 })
134
135 metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
136 Name: "index_failing_total",
137 Help: "Counts failures to index (indexing activity, should be used with rate())",
138 })
139
140 metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
141 Name: "index_indexing_total",
142 Help: "Counts indexings (indexing activity, should be used with rate())",
143 })
144
145 metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
146 Name: "index_num_stopped_tracking_total",
147 Help: "Counts the number of repos we stopped tracking.",
148 })
149
150 clientMetricsOnce sync.Once
151 clientMetrics *grpcprom.ClientMetrics
152)
153
154// set of repositories that we want to capture separate indexing metrics for
155var reposWithSeparateIndexingMetrics = make(map[string]struct{})
156
157type indexState string
158
159const (
160 indexStateFail indexState = "fail"
161 indexStateSuccess indexState = "success"
162 indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
163 indexStateNoop indexState = "noop" // We didn't need to update index
164 indexStateEmpty indexState = "empty" // index is empty (empty repo)
165)
166
167// Server is the main functionality of zoekt-sourcegraph-indexserver. It
168// exists to conveniently use all the options passed in via func main.
169type Server struct {
170 logger sglog.Logger
171
172 Sourcegraph Sourcegraph
173 BatchSize int
174
175 // IndexDir is the index directory to use.
176 IndexDir string
177
178 // IndexConcurrency is the number of repositories we index at once.
179 IndexConcurrency int
180
181 // Interval is how often we sync with Sourcegraph.
182 Interval time.Duration
183
184 // CPUCount is the number of CPUs to use for indexing shards.
185 CPUCount int
186
187 queue Queue
188
189 // muIndexDir protects the index directory from concurrent access.
190 muIndexDir indexMutex
191
192 // If true, shard merging is enabled.
193 shardMerging bool
194
195 // deltaBuildRepositoriesAllowList is an allowlist for repositories that we
196 // use delta-builds for instead of normal builds
197 deltaBuildRepositoriesAllowList map[string]struct{}
198
199 // deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
200 // before attempting a delta build.
201 deltaShardNumberFallbackThreshold uint64
202
203 // repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
204 // we skip calculating symbols metadata for during builds
205 repositoriesSkipSymbolsCalculationAllowList map[string]struct{}
206
207 hostname string
208
209 mergeOpts mergeOpts
210
211 // timeout defines how long the index server waits before killing an indexing job.
212 timeout time.Duration
213}
214
215var debug = log.New(io.Discard, "", log.LstdFlags)
216
217// our index commands should output something every 100mb they process.
218//
219// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
220// time." famous last words. A client was indexing a monorepo with 42
221// cores... 5m was not enough.
222const noOutputTimeout = 30 * time.Minute
223
224func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) {
225 out := &synchronizedBuffer{}
226 cmd.Stdout = out
227 cmd.Stderr = out
228
229 tr.LazyPrintf("%s", cmd.Args)
230
231 defer func() {
232 if err != nil {
233 outS := out.String()
234 tr.LazyPrintf("failed: %v", err)
235 tr.LazyPrintf("output: %s", out)
236 tr.SetError()
237 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS)
238 }
239 }()
240
241 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args))
242
243 if err := cmd.Start(); err != nil {
244 return err
245 }
246
247 errC := make(chan error)
248 go func() {
249 errC <- cmd.Wait()
250 }()
251
252 // This channel is set after we have sent sigquit. It allows us to follow up
253 // with a sigkill if the process doesn't quit after sigquit.
254 kill := make(<-chan time.Time)
255
256 lastLen := 0
257 for {
258 select {
259 case <-time.After(noOutputTimeout):
260 // Periodically check if we have had output. If not kill the process.
261 if out.Len() != lastLen {
262 lastLen = out.Len()
263 log.Printf("still running %s", cmd.Args)
264 } else {
265 // Send quit (C-\) first so we get a stack dump.
266 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args)
267 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil {
268 log.Println("quit failed:", err)
269 }
270
271 // send sigkill if still running in 10s
272 kill = time.After(10 * time.Second)
273 }
274
275 case <-kill:
276 log.Printf("still running, killing %s", cmd.Args)
277 if err := cmd.Process.Kill(); err != nil {
278 log.Println("kill failed:", err)
279 }
280
281 case err := <-errC:
282 if err != nil {
283 return err
284 }
285
286 tr.LazyPrintf("success")
287 return nil
288 }
289 }
290}
291
292// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can
293// monitor the buffer while it is being written to.
294type synchronizedBuffer struct {
295 mu sync.Mutex
296 b bytes.Buffer
297}
298
299func (sb *synchronizedBuffer) Write(p []byte) (int, error) {
300 sb.mu.Lock()
301 defer sb.mu.Unlock()
302 return sb.b.Write(p)
303}
304
305func (sb *synchronizedBuffer) Len() int {
306 sb.mu.Lock()
307 defer sb.mu.Unlock()
308 return sb.b.Len()
309}
310
311func (sb *synchronizedBuffer) String() string {
312 sb.mu.Lock()
313 defer sb.mu.Unlock()
314 return sb.b.String()
315}
316
317// pauseFileName if present in IndexDir will stop index jobs from
318// running. This is to make it possible to experiment with the content of the
319// IndexDir without the indexserver writing to it.
320const pauseFileName = "PAUSE"
321
322// Run the sync loop. This blocks forever.
323func (s *Server) Run() {
324 removeIncompleteShards(s.IndexDir)
325
326 // Start a goroutine which updates the queue with commits to index.
327 go func() {
328 // We update the list of indexed repos every Interval. To speed up manual
329 // testing we also listen for SIGUSR1 to trigger updates.
330 //
331 // "pkill -SIGUSR1 zoekt-sourcegra"
332 for range jitterTicker(s.Interval, unix.SIGUSR1) {
333 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
334 log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b)))
335 continue
336 }
337
338 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir))
339 if err != nil {
340 log.Printf("error listing repos: %s", err)
341 continue
342 }
343
344 debug.Printf("updating index queue with %d repositories", len(repos.IDs))
345
346 // Stop indexing repos we don't need to track anymore
347 removed := s.queue.MaybeRemoveMissing(repos.IDs)
348 metricNumStoppedTrackingTotal.Add(float64(len(removed)))
349 if len(removed) > 0 {
350 log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5))
351 }
352
353 cleanupDone := make(chan struct{})
354 go func() {
355 defer close(cleanupDone)
356 s.muIndexDir.Global(func() {
357 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging)
358 })
359 }()
360
361 repos.IterateIndexOptions(s.queue.AddOrUpdate)
362
363 // IterateIndexOptions will only iterate over repositories that have
364 // changed since we last called list. However, we want to add all IDs
365 // back onto the queue just to check that what is on disk is still
366 // correct. This will use the last IndexOptions we stored in the
367 // queue. The repositories not on the queue (missing) need a forced
368 // fetch of IndexOptions.
369 missing := s.queue.Bump(repos.IDs)
370 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...)
371
372 setCompoundShardCounter(s.IndexDir)
373
374 <-cleanupDone
375 }
376 }()
377
378 go func() {
379 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) {
380 if s.shardMerging {
381 s.vacuum()
382 }
383 }
384 }()
385
386 go func() {
387 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) {
388 if s.shardMerging {
389 s.doMerge()
390 }
391 }
392 }()
393
394 for i := 0; i < s.IndexConcurrency; i++ {
395 go s.processQueue()
396 }
397
398 // block forever
399 select {}
400}
401
402// formatList returns a comma-separated list of the first min(len(v), m) items.
403func formatListUint32(v []uint32, m int) string {
404 if len(v) < m {
405 m = len(v)
406 }
407
408 sb := strings.Builder{}
409 for i := 0; i < m; i++ {
410 fmt.Fprintf(&sb, "%d, ", v[i])
411 }
412
413 if len(v) > m {
414 sb.WriteString("...")
415 }
416
417 return strings.TrimRight(sb.String(), ", ")
418}
419
420func (s *Server) processQueue() {
421 for {
422 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
423 time.Sleep(time.Second)
424 continue
425 }
426
427 item, ok := s.queue.Pop()
428 if !ok {
429 time.Sleep(time.Second)
430 continue
431 }
432
433 opts := item.Opts
434 args := s.indexArgs(opts)
435
436 ran := s.muIndexDir.With(opts.Name, func() {
437 // only record time taken once we hold the lock. This avoids us
438 // recording time taken while merging/cleanup runs.
439 start := time.Now()
440
441 state, err := s.Index(args)
442
443 elapsed := time.Since(start)
444 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds())
445
446 indexDelay := time.Since(item.DateAddedToQueue)
447 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds())
448
449 if err != nil {
450 log.Printf("error indexing %s: %s", args.String(), err)
451 }
452
453 switch state {
454 case indexStateSuccess:
455 var branches []string
456 for _, b := range args.Branches {
457 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
458 }
459 s.logger.Info("updated index",
460 sglog.String("repo", args.Name),
461 sglog.Uint32("id", args.RepoID),
462 sglog.Strings("branches", branches),
463 sglog.Duration("duration", elapsed),
464 sglog.Duration("index_delay", indexDelay),
465 )
466 case indexStateSuccessMeta:
467 log.Printf("updated meta %s in %v", args.String(), elapsed)
468 }
469 s.queue.SetIndexed(opts, state)
470 })
471
472 if !ran {
473 // Someone else is processing the repository. We can just skip this job
474 // since the repository will be added back to the queue and we will
475 // converge to the correct behaviour.
476 debug.Printf("index job for repository already running: %s", args)
477 continue
478 }
479 }
480}
481
482// repoNameForMetric returns a normalized version of the given repository name that is
483// suitable for use with Prometheus metrics.
484func repoNameForMetric(repo string) string {
485 // Check to see if we want to be able to capture separate indexing metrics for this repository.
486 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable.
487 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok {
488 return repo
489 }
490
491 return ""
492}
493
494func batched(slice []uint32, size int) <-chan []uint32 {
495 c := make(chan []uint32)
496 go func() {
497 for len(slice) > 0 {
498 if size > len(slice) {
499 size = len(slice)
500 }
501 c <- slice[:size]
502 slice = slice[size:]
503 }
504 close(c)
505 }()
506 return c
507}
508
509// jitterTicker returns a ticker which ticks with a jitter. Each tick is
510// uniformly selected from the range (d/2, d + d/2). It will tick on creation.
511//
512// sig is a list of signals which also cause the ticker to fire. This is a
513// convenience to allow manually triggering of the ticker.
514func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} {
515 ticker := make(chan struct{})
516
517 go func() {
518 for {
519 ticker <- struct{}{}
520 ns := int64(d)
521 jitter := rand.Int63n(ns)
522 time.Sleep(time.Duration(ns/2 + jitter))
523 }
524 }()
525
526 go func() {
527 if len(sig) == 0 {
528 return
529 }
530
531 c := make(chan os.Signal, 1)
532 signal.Notify(c, sig...)
533 for range c {
534 ticker <- struct{}{}
535 }
536 }()
537
538 return ticker
539}
540
541// Index starts an index job for repo name at commit.
542func (s *Server) Index(args *indexArgs) (state indexState, err error) {
543 tr := trace.New("index", args.Name)
544
545 defer func() {
546 if err != nil {
547 tr.SetError()
548 tr.LazyPrintf("error: %v", err)
549 state = indexStateFail
550 metricFailingTotal.Inc()
551 }
552 tr.LazyPrintf("state: %s", state)
553 tr.Finish()
554 }()
555
556 tr.LazyPrintf("branches: %v", args.Branches)
557
558 if len(args.Branches) == 0 {
559 return indexStateEmpty, createEmptyShard(args)
560 }
561
562 repositoryName := args.Name
563 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
564 tr.LazyPrintf("marking this repository for delta build")
565 args.UseDelta = true
566 }
567
568 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold
569
570 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
571 tr.LazyPrintf("skipping symbols calculation")
572 args.Symbols = false
573 }
574
575 reason := "forced"
576
577 if args.Incremental {
578 bo := args.BuildOptions()
579 bo.SetDefaults()
580 incrementalState, fn := bo.IndexState()
581 reason = string(incrementalState)
582 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()
583
584 switch incrementalState {
585 case build.IndexStateEqual:
586 debug.Printf("%s index already up to date. Shard=%s", args.String(), fn)
587 return indexStateNoop, nil
588
589 case build.IndexStateMeta:
590 log.Printf("updating index.meta %s", args.String())
591
592 if err := mergeMeta(bo); err != nil {
593 log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
594 } else {
595 return indexStateSuccessMeta, nil
596 }
597
598 case build.IndexStateCorrupt:
599 log.Printf("falling back to full update: corrupt index: %s", args.String())
600 }
601 }
602
603 log.Printf("updating index %s reason=%s", args.String(), reason)
604
605 metricIndexingTotal.Inc()
606 c := gitIndexConfig{
607 runCmd: func(cmd *exec.Cmd) error {
608 return s.loggedRun(tr, cmd)
609 },
610
611 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
612 return args.BuildOptions().FindRepositoryMetadata()
613 },
614 timeout: s.timeout,
615 }
616
617 err = gitIndex(c, args, s.Sourcegraph, s.logger)
618 if err != nil {
619 return indexStateFail, err
620 }
621
622 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil {
623 s.logger.Error("failed to update index status",
624 sglog.String("repo", args.Name),
625 sglog.Uint32("id", args.RepoID),
626 sglogBranches("branches", args.Branches),
627 sglog.Error(err),
628 )
629 }
630
631 return indexStateSuccess, nil
632}
633
634// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so
635// it can update the zoekt_repos table.
636func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error {
637 // We need to read from disk for IndexTime.
638 _, metadata, ok, err := c.findRepositoryMetadata(args)
639 if err != nil {
640 return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
641 }
642 if !ok {
643 return errors.New("failed to find metadata for new/updated index")
644 }
645
646 status := []indexStatus{{
647 RepoID: args.RepoID,
648 Branches: args.Branches,
649 IndexTimeUnix: metadata.IndexTime.Unix(),
650 }}
651 if err := sg.UpdateIndexStatus(status); err != nil {
652 return fmt.Errorf("failed to update sourcegraph with status: %w", err)
653 }
654
655 return nil
656}
657
658func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
659 ss := make([]string, len(branches))
660 for i, b := range branches {
661 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
662 }
663 return sglog.Strings(key, ss)
664}
665
666func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
667 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0))
668 return &indexArgs{
669 IndexOptions: opts,
670 IndexDir: s.IndexDir,
671 Parallelism: parallelism,
672 Incremental: true,
673
674 // 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
675 FileLimit: 1 << 20,
676
677 ShardMerging: s.shardMerging,
678 }
679}
680
681// parallelism consults both the server flags and index options to determine the number
682// of shards to index in parallel. If the CPUCount index option is provided, it always
683// overrides the server flag.
684func (s *Server) parallelism(opts IndexOptions, maxProcs int) int {
685 var parallelism int
686 if opts.ShardConcurrency > 0 {
687 parallelism = int(opts.ShardConcurrency)
688 } else {
689 parallelism = s.CPUCount
690 }
691
692 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs
693 if parallelism > 4*maxProcs {
694 parallelism = 4 * maxProcs
695 }
696
697 // If index concurrency is set, then divide the parallelism by the number of
698 // repos we're indexing in parallel
699 if s.IndexConcurrency > 1 {
700 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency)))
701 }
702
703 return parallelism
704}
705
706func createEmptyShard(args *indexArgs) error {
707 bo := args.BuildOptions()
708 bo.SetDefaults()
709 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}
710
711 if args.Incremental && bo.IncrementalSkipIndexing() {
712 return nil
713 }
714
715 builder, err := build.NewBuilder(*bo)
716 if err != nil {
717 return err
718 }
719 return builder.Finish()
720}
721
722// addDebugHandlers adds handlers specific to indexserver.
723func (s *Server) addDebugHandlers(mux *http.ServeMux) {
724 // Sourcegraph's site admin view requires indexserver to serve it's admin view
725 // on "/".
726 mux.Handle("/", http.HandlerFunc(s.handleRoot))
727
728 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex))
729 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed))
730 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList))
731 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge))
732 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue))
733 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost))
734}
735
736func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) {
737 if r.Method != "GET" {
738 w.Header().Set("Allow", "GET")
739 w.WriteHeader(http.StatusMethodNotAllowed)
740 return
741 }
742
743 response := struct {
744 Hostname string
745 }{
746 Hostname: s.hostname,
747 }
748
749 b, err := json.Marshal(response)
750 if err != nil {
751 http.Error(w, err.Error(), http.StatusInternalServerError)
752 return
753 }
754
755 w.Header().Set("Content-Type", "application/json; charset=utf-8")
756 w.Write(b)
757}
758
759var rootTmpl = template.Must(template.New("name").Parse(`
760<html>
761 <body>
762 <a href="debug">Debug</a><br />
763 <a href="debug/requests">Traces</a><br />
764 {{.IndexMsg}}<br />
765 <br />
766 <h3>Reindex</h3>
767 {{if .Repos}}
768 <a href="?show_repos=false">hide repos</a><br />
769 <table style="margin-top: 20px">
770 <th style="text-align:left">Name</th>
771 <th style="text-align:left">ID</th>
772 {{range .Repos}}
773 <tr>
774 <td>{{.Name}}</td>
775 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id>
776 </tr>
777 {{end}}
778 </table>
779 {{else}}
780 <a href="?show_repos=true">show repos</a><br />
781 {{end}}
782 </body>
783</html>
784`))
785
786func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
787 if r.Method != "GET" {
788 w.Header().Set("Allow", "GET")
789 w.WriteHeader(http.StatusMethodNotAllowed)
790 return
791 }
792
793 values := r.URL.Query()
794
795 // ?id=
796 indexMsg := ""
797 if v := values.Get("id"); v != "" {
798 id, err := strconv.Atoi(v)
799 if err != nil {
800 http.Error(w, err.Error(), http.StatusBadRequest)
801 return
802 }
803 indexMsg, _ = s.forceIndex(uint32(id))
804 }
805
806 // ?show_repos=
807 showRepos := false
808 if v := values.Get("show_repos"); v != "" {
809 showRepos, _ = strconv.ParseBool(v)
810 }
811
812 type Repo struct {
813 ID uint32
814 Name string
815 }
816 var data struct {
817 Repos []Repo
818 IndexMsg string
819 }
820
821 data.IndexMsg = indexMsg
822
823 if showRepos {
824 s.queue.Iterate(func(opts *IndexOptions) {
825 data.Repos = append(data.Repos, Repo{
826 ID: opts.RepoID,
827 Name: opts.Name,
828 })
829 })
830 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name })
831 }
832
833 _ = rootTmpl.Execute(w, data)
834}
835
836// handleReindex triggers a reindex asynocronously. If a reindex was triggered
837// the request returns with status 202. The caller can infer the new state of
838// the index by calling List.
839func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) {
840 if r.Method != http.MethodPost {
841 w.Header().Set("Allow", http.MethodPost)
842 w.WriteHeader(http.StatusMethodNotAllowed)
843 return
844 }
845
846 err := r.ParseForm()
847 if err != nil {
848 http.Error(w, err.Error(), http.StatusBadRequest)
849 return
850 }
851
852 id, err := strconv.Atoi(r.Form.Get("repo"))
853 if err != nil {
854 http.Error(w, err.Error(), http.StatusBadRequest)
855 return
856 }
857
858 go func() { s.forceIndex(uint32(id)) }()
859
860 // 202 Accepted
861 w.WriteHeader(http.StatusAccepted)
862}
863
864func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
865 withIndexed := true
866 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil {
867 withIndexed = b
868 }
869
870 var indexed []uint32
871 if withIndexed {
872 indexed = listIndexed(s.IndexDir)
873 }
874
875 repos, err := s.Sourcegraph.List(r.Context(), indexed)
876 if err != nil {
877 http.Error(w, err.Error(), http.StatusInternalServerError)
878 return
879 }
880
881 bw := bytes.Buffer{}
882
883 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
884
885 _, err = fmt.Fprintf(tw, "ID\tName\n")
886 if err != nil {
887 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
888 return
889 }
890
891 s.queue.mu.Lock()
892 name := ""
893 for _, id := range repos.IDs {
894 if item := s.queue.get(id); item != nil {
895 name = item.opts.Name
896 } else {
897 name = ""
898 }
899 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
900 if err != nil {
901 debug.Printf("handleDebugList: %s\n", err.Error())
902 }
903 }
904 s.queue.mu.Unlock()
905
906 if err != nil {
907 http.Error(w, err.Error(), http.StatusInternalServerError)
908 return
909 }
910
911 err = tw.Flush()
912 if err != nil {
913 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
914 return
915 }
916
917 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
918
919 if _, err := io.Copy(w, &bw); err != nil {
920 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
921 return
922 }
923}
924
925// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
926// can run this command during periods of low usage (evenings, weekends) to
927// trigger an initial merge run. In the steady-state, merges happen rarely, even
928// on busy instances, and users can rely on automatic merging instead.
929func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {
930 // A merge operation can take very long, depending on the number merges and the
931 // target size of the compound shards. We run the merge in the background and
932 // return immediately to the user.
933 //
934 // We track the status of the merge with metricShardMergingRunning.
935 go func() {
936 s.doMerge()
937 }()
938 _, _ = w.Write([]byte("merging enqueued\n"))
939}
940
941func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {
942 indexed := listIndexed(s.IndexDir)
943
944 bw := bytes.Buffer{}
945
946 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
947
948 _, err := fmt.Fprintf(tw, "ID\tName\n")
949 if err != nil {
950 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
951 return
952 }
953
954 s.queue.mu.Lock()
955 name := ""
956 for _, id := range indexed {
957 if item := s.queue.get(id); item != nil {
958 name = item.opts.Name
959 } else {
960 name = ""
961 }
962 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
963 if err != nil {
964 debug.Printf("handleDebugIndexed: %s\n", err.Error())
965 }
966 }
967 s.queue.mu.Unlock()
968
969 if err != nil {
970 http.Error(w, err.Error(), http.StatusInternalServerError)
971 return
972 }
973
974 err = tw.Flush()
975 if err != nil {
976 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
977 return
978 }
979
980 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
981
982 if _, err := io.Copy(w, &bw); err != nil {
983 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
984 return
985 }
986}
987
988// forceIndex will run the index job for repo name now. It will return always
989// return a string explaining what it did, even if it failed.
990func (s *Server) forceIndex(id uint32) (string, error) {
991 var opts IndexOptions
992 var err error
993 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) {
994 opts = o
995 }, func(_ uint32, e error) {
996 err = e
997 }, id)
998 if err != nil {
999 return fmt.Sprintf("Indexing %d failed: %v", id, err), err
1000 }
1001
1002 args := s.indexArgs(opts)
1003 args.Incremental = false // force re-index
1004
1005 var state indexState
1006 ran := s.muIndexDir.With(opts.Name, func() {
1007 state, err = s.Index(args)
1008 })
1009 if !ran {
1010 return fmt.Sprintf("index job for repository already running: %s", args), nil
1011 }
1012 if err != nil {
1013 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err
1014 }
1015 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil
1016}
1017
1018func listIndexed(indexDir string) []uint32 {
1019 index := getShards(indexDir)
1020 metricNumIndexed.Set(float64(len(index)))
1021 repoIDs := make([]uint32, 0, len(index))
1022 for id := range index {
1023 repoIDs = append(repoIDs, id)
1024 }
1025 sort.Slice(repoIDs, func(i, j int) bool {
1026 return repoIDs[i] < repoIDs[j]
1027 })
1028 return repoIDs
1029}
1030
1031// setupTmpDir sets up a temporary directory on the same volume as the
1032// indexes.
1033//
1034// If main is true we will delete older temp directories left around. main is
1035// false when this is a debug command.
1036func setupTmpDir(logger sglog.Logger, main bool, index string) error {
1037 // change the target tmp directory depending on if it's our main daemon or a
1038 // debug sub command.
1039 dir := ".indexserver.debug.tmp"
1040 if main {
1041 dir = ".indexserver.tmp"
1042 }
1043
1044 tmpRoot := filepath.Join(index, dir)
1045
1046 if main {
1047 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot))
1048 err := os.RemoveAll(tmpRoot)
1049 if err != nil {
1050 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err))
1051 }
1052 }
1053
1054 if err := os.MkdirAll(tmpRoot, 0o755); err != nil {
1055 return err
1056 }
1057
1058 return os.Setenv("TMPDIR", tmpRoot)
1059}
1060
1061func printMetaData(fn string) error {
1062 repo, indexMeta, err := zoekt.ReadMetadataPath(fn)
1063 if err != nil {
1064 return err
1065 }
1066
1067 err = json.NewEncoder(os.Stdout).Encode(indexMeta)
1068 if err != nil {
1069 return err
1070 }
1071
1072 err = json.NewEncoder(os.Stdout).Encode(repo)
1073 if err != nil {
1074 return err
1075 }
1076 return nil
1077}
1078
1079func printShardStats(fn string) error {
1080 f, err := os.Open(fn)
1081 if err != nil {
1082 return err
1083 }
1084
1085 iFile, err := zoekt.NewIndexFile(f)
1086 if err != nil {
1087 return err
1088 }
1089
1090 return zoekt.PrintNgramStats(iFile)
1091}
1092
1093func srcLogLevelIsDebug() bool {
1094 lvl := os.Getenv(sglog.EnvLogLevel)
1095 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug")
1096}
1097
// getEnvWithDefaultBool returns the environment variable k parsed with
// strconv.ParseBool, or defaultVal if k is unset or empty. A value that does
// not parse as a boolean terminates the process via log.Fatalf.
func getEnvWithDefaultBool(k string, defaultVal bool) bool {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		// The message previously claimed the target type was int64.
		log.Fatalf("error parsing ENV %s to bool: %s", k, err)
	}
	return b
}
1108}
1109
// getEnvWithDefaultInt64 returns the environment variable k parsed as a
// base-10 int64, or defaultVal when k is unset or empty. An unparsable value
// terminates the process via log.Fatalf.
func getEnvWithDefaultInt64(k string, defaultVal int64) int64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseInt(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to int64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1120}
1121
// getEnvWithDefaultInt returns the environment variable k parsed as an int, or
// defaultVal if k is unset or empty. An unparsable value terminates the
// process via log.Fatalf.
func getEnvWithDefaultInt(k string, defaultVal int) int {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	// Parse the variable's value. The original parsed the key name k, which
	// failed for every set variable.
	i, err := strconv.Atoi(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to int: %s", k, err)
	}
	return i
}
1132}
1133
// getEnvWithDefaultUint64 returns the environment variable k parsed as a
// base-10 uint64, or defaultVal when k is unset or empty. An unparsable value
// terminates the process via log.Fatalf.
func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseUint(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to uint64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1144}
1145
// getEnvWithDefaultFloat64 returns the environment variable k parsed as a
// float64, or defaultVal when k is unset or empty. An unparsable value
// terminates the process via log.Fatalf.
func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseFloat(raw, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to float64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1156}
1157
// getEnvWithDefaultString returns the value of the environment variable k, or
// defaultVal when k is unset or set to the empty string.
func getEnvWithDefaultString(k string, defaultVal string) string {
	if val := os.Getenv(k); val != "" {
		return val
	}
	return defaultVal
}
1164}
1165
// getEnvWithDefaultDuration returns the environment variable k parsed with
// time.ParseDuration (e.g. "90s", "8h"), or defaultVal when k is unset or
// empty. An unparsable value terminates the process via log.Fatalf.
func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := time.ParseDuration(raw)
		if err != nil {
			log.Fatalf("error parsing ENV %s to duration: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1177}
1178
// getEnvWithDefaultEmptySet parses the environment variable k as a
// comma-separated list and returns its distinct entries as a set. Entries are
// trimmed of surrounding whitespace and empty entries are dropped. When k is
// unset the result is an empty, non-nil set.
func getEnvWithDefaultEmptySet(k string) map[string]struct{} {
	result := make(map[string]struct{})
	for _, item := range strings.Split(os.Getenv(k), ",") {
		if trimmed := strings.TrimSpace(item); trimmed != "" {
			result[trimmed] = struct{}{}
		}
	}
	return result
}
1188}
1189
// joinStringSet returns the members of set joined by sep, in ascending
// lexical order. Sorting makes the output deterministic; the original
// followed Go's randomized map iteration order, so identical sets produced
// different log lines. A nil or empty set yields "".
func joinStringSet(set map[string]struct{}, sep string) string {
	xs := make([]string, 0, len(set))
	for x := range set {
		xs = append(xs, x)
	}
	sort.Strings(xs)
	return strings.Join(xs, sep)
}
1197}
1198
1199func setCompoundShardCounter(indexDir string) {
1200 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt"))
1201 if err != nil {
1202 log.Printf("setCompoundShardCounter: %s\n", err)
1203 return
1204 }
1205 metricNumberCompoundShards.Set(float64(len(fns)))
1206}
1207
1208func rootCmd() *ffcli.Command {
1209 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError)
1210 conf := rootConfig{
1211 Main: true,
1212 }
1213 conf.registerRootFlags(rootFs)
1214
1215 return &ffcli.Command{
1216 FlagSet: rootFs,
1217 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]",
1218 Subcommands: []*ffcli.Command{debugCmd()},
1219 Exec: func(ctx context.Context, args []string) error {
1220 return startServer(conf)
1221 },
1222 }
1223}
1224
// rootConfig holds the settings of the root indexserver command. Apart from
// Main, every field is filled in by registerRootFlags from a command-line flag,
// most of which take their default from an environment variable.
type rootConfig struct {
	// Main is true only for the long-running indexserver daemon built by
	// rootCmd. Debug subcommands leave it false. newServer passes it to
	// setupTmpDir, which uses it to choose the temp directory and to decide
	// whether leftovers from earlier runs are deleted.
	Main bool

	root             string        // -sourcegraph_url: Sourcegraph URL, or a local directory to fake the API
	interval         time.Duration // -interval: how often to sync with Sourcegraph
	index            string        // -index: index directory
	indexConcurrency int64         // -index_concurrency: number of repos indexed concurrently
	listen           string        // -listen: address of the debug HTTP server
	hostname         string        // -hostname: name advertised to Sourcegraph
	cpuFraction      float64       // -cpu_fraction: fraction of the cores used for indexing
	blockProfileRate int           // -block_profile_rate: see runtime.SetBlockProfileRate

	// Shard merging.
	disableShardMerging           bool          // -shard_merging (true disables merging despite the name)
	vacuumInterval, mergeInterval time.Duration // -vacuum_interval, -merge_interval
	targetSize, minSize           int64         // -merge_target_size, -merge_min_size, both in MiB
	minAgeDays                    int           // -merge_min_age: shards with newer commits are not merged

	// Backoff for repositories whose previous indexing attempts failed.
	backoffDuration, maxBackoffDuration time.Duration // -backoff_duration, -max_backoff_duration
}
1251}
1252
1253func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
1254 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
1255 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
1256 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently")
1257 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use")
1258 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
1259 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
1260 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
1261 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate")
1262 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
1263 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")
1264
1265 // flags related to shard merging
1266 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging")
1267 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
1268 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
1269 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB")
1270 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB")
1271 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
1272}
1273
1274func startServer(conf rootConfig) error {
1275 s, err := newServer(conf)
1276 if err != nil {
1277 return err
1278 }
1279
1280 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate)
1281 setCompoundShardCounter(s.IndexDir)
1282
1283 if conf.listen != "" {
1284
1285 mux := http.NewServeMux()
1286 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{
1287 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"},
1288 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"},
1289 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"},
1290 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"},
1291 }...)
1292 s.addDebugHandlers(mux)
1293
1294 go func() {
1295 debug.Printf("serving HTTP on %s", conf.listen)
1296 log.Fatal(http.ListenAndServe(conf.listen, mux))
1297 }()
1298
1299 // Serve mux on a unix domain socket on a best-effort-basis so that
1300 // webserver can call the endpoints via the shared filesystem.
1301 //
1302 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen
1303 // on the socket due to permission errors. See
1304 // https://github.com/docker/for-mac/issues/6239
1305 go func() {
1306 serveHTTPOverSocket := func() error {
1307 socket := filepath.Join(s.IndexDir, "indexserver.sock")
1308 // We cannot bind a socket to an existing pathname.
1309 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) {
1310 return fmt.Errorf("error removing socket file: %s", socket)
1311 }
1312 // The "unix" network corresponds to stream sockets. (cf. unixgram,
1313 // unixpacket).
1314 l, err := net.Listen("unix", socket)
1315 if err != nil {
1316 return fmt.Errorf("failed to listen on socket %s: %w", socket, err)
1317 }
1318 // Indexserver (root) and webserver (Sourcegraph) run with
1319 // different users. Per default, the socket is created with
1320 // permission 755 (root root), which doesn't let webserver write to
1321 // it.
1322 //
1323 // See https://github.com/golang/go/issues/11822 for more context.
1324 if err := os.Chmod(socket, 0o777); err != nil {
1325 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err)
1326 }
1327 debug.Printf("serving HTTP on %s", socket)
1328 return http.Serve(l, mux)
1329 }
1330 debug.Print(serveHTTPOverSocket())
1331 }()
1332 }
1333
1334 oc := &ownerChecker{
1335 Path: filepath.Join(conf.index, "owner.txt"),
1336 Hostname: conf.hostname,
1337 }
1338 go oc.Run()
1339
1340 logger := sglog.Scoped("metricsRegistration")
1341 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"}
1342
1343 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index})
1344 prometheus.DefaultRegisterer.MustRegister(c)
1345
1346 s.Run()
1347 return nil
1348}
1349
1350func newServer(conf rootConfig) (*Server, error) {
1351 logger := sglog.Scoped("server")
1352
1353 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 {
1354 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0")
1355 }
1356 if conf.index == "" {
1357 return nil, fmt.Errorf("must set -index")
1358 }
1359 if conf.root == "" {
1360 return nil, fmt.Errorf("must set -sourcegraph_url")
1361 }
1362 rootURL, err := url.Parse(conf.root)
1363 if err != nil {
1364 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err)
1365 }
1366
1367 rootURL = addDefaultPort(rootURL)
1368
1369 // Tune GOMAXPROCS to match Linux container CPU quota.
1370 _, _ = maxprocs.Set()
1371
1372 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler.
1373 // The block profiler is disabled by default and should be enabled with care in production
1374 runtime.SetBlockProfileRate(conf.blockProfileRate)
1375
1376 // Automatically prepend our own path at the front, to minimize
1377 // required configuration.
1378 if l, err := os.Readlink("/proc/self/exe"); err == nil {
1379 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
1380 }
1381
1382 if _, err := os.Stat(conf.index); err != nil {
1383 if err := os.MkdirAll(conf.index, 0o755); err != nil {
1384 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err)
1385 }
1386 }
1387
1388 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil {
1389 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err)
1390 }
1391
1392 if srcLogLevelIsDebug() {
1393 debug = log.New(os.Stderr, "", log.LstdFlags)
1394 }
1395
1396 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST")
1397 if len(reposWithSeparateIndexingMetrics) > 0 {
1398 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", "))
1399 }
1400
1401 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST")
1402 if len(deltaBuildRepositoriesAllowList) > 0 {
1403 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", "))
1404 }
1405
1406 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150)
1407 if deltaShardNumberFallbackThreshold > 0 {
1408 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold)
1409 } else {
1410 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards")
1411 }
1412
1413 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST")
1414 if len(reposShouldSkipSymbolsCalculation) > 0 {
1415 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", "))
1416 }
1417
1418 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout)
1419 if indexingTimeout != defaultIndexingTimeout {
1420 debug.Printf("using configured indexing timeout: %s", indexingTimeout)
1421 }
1422
1423 var sg Sourcegraph
1424 if rootURL.IsAbs() {
1425 var batchSize int
1426 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" {
1427 batchSize, err = strconv.Atoi(v)
1428 if err != nil {
1429 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int")
1430 }
1431 }
1432
1433 opts := []SourcegraphClientOption{
1434 WithBatchSize(batchSize),
1435 }
1436
1437 logger := sglog.Scoped("zoektConfigurationGRPCClient")
1438 client, err := dialGRPCClient(rootURL.Host, logger)
1439 if err != nil {
1440 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err)
1441 }
1442
1443 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...)
1444
1445 } else {
1446 sg = sourcegraphFake{
1447 RootDir: rootURL.String(),
1448 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags),
1449 }
1450 }
1451
1452 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction)))
1453 if cpuCount < 1 {
1454 cpuCount = 1
1455 }
1456
1457 if conf.indexConcurrency < 1 {
1458 conf.indexConcurrency = 1
1459 } else if conf.indexConcurrency > int64(cpuCount) {
1460 conf.indexConcurrency = int64(cpuCount)
1461 }
1462
1463 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)
1464
1465 return &Server{
1466 logger: logger,
1467 Sourcegraph: sg,
1468 IndexDir: conf.index,
1469 IndexConcurrency: int(conf.indexConcurrency),
1470 Interval: conf.interval,
1471 CPUCount: cpuCount,
1472 queue: *q,
1473 shardMerging: !conf.disableShardMerging,
1474 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList,
1475 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold,
1476 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
1477 hostname: conf.hostname,
1478 mergeOpts: mergeOpts{
1479 vacuumInterval: conf.vacuumInterval,
1480 mergeInterval: conf.mergeInterval,
1481 targetSizeBytes: conf.targetSize * 1024 * 1024,
1482 minSizeBytes: conf.minSize * 1024 * 1024,
1483 minAgeDays: conf.minAgeDays,
1484 },
1485 timeout: indexingTimeout,
1486 }, err
1487}
1488
// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration
// for the indexed-search-configuration gRPC service. Its contents are embedded
// at build time from default_grpc_service_configuration.json and passed to
// grpc.WithDefaultServiceConfig in dialGRPCClient.
//
// The default backoff strategy is modeled after the default settings used by
// retryablehttp.DefaultClient.
//
// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html):
// - Unavailable
// - Aborted
//
// NOTE(review): the backoff strategy and retryable codes above describe the
// embedded JSON file, which is not visible here. Confirm they still match it.
// Separately, the retry interceptor installed by dialGRPCClient only retries
// codes.Unavailable.
//
//go:embed default_grpc_service_configuration.json
var defaultGRPCServiceConfigurationJSON string
1501
1502func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor {
1503 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1504 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1505 return invoker(ctx, method, req, reply, cc, opts...)
1506 }
1507}
1508
1509func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
1510 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
1511 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1512 return streamer(ctx, desc, cc, method, opts...)
1513 }
1514}
1515
// defaultGRPCMessageReceiveSizeBytes is the default maximum size, in bytes, of
// a message the gRPC client accepts. dialGRPCClient applies it through
// grpc.MaxCallRecvMsgSize. Extra dial options or the message-size environment
// settings, which are applied after it, can override it.
const defaultGRPCMessageReceiveSizeBytes = 90 << 20 // 90 MiB
1519
1520func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) {
1521 metrics := mustGetClientMetrics()
1522
1523 // If the service seems to be unavailable, this
1524 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1
1525 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s])
1526 retryOpts := []retry.CallOption{
1527 retry.WithMax(5),
1528 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)),
1529 retry.WithCodes(codes.Unavailable),
1530 }
1531
1532 opts := []grpc.DialOption{
1533 grpc.WithTransportCredentials(insecure.NewCredentials()),
1534 grpc.WithChainStreamInterceptor(
1535 metrics.StreamClientInterceptor(),
1536 messagesize.StreamClientInterceptor,
1537 internalActorStreamInterceptor(),
1538 internalerrs.LoggingStreamClientInterceptor(logger),
1539 internalerrs.PrometheusStreamClientInterceptor,
1540 retry.StreamClientInterceptor(retryOpts...),
1541 ),
1542 grpc.WithChainUnaryInterceptor(
1543 metrics.UnaryClientInterceptor(),
1544 messagesize.UnaryClientInterceptor,
1545 internalActorUnaryInterceptor(),
1546 internalerrs.LoggingUnaryClientInterceptor(logger),
1547 internalerrs.PrometheusUnaryClientInterceptor,
1548 retry.UnaryClientInterceptor(retryOpts...),
1549 ),
1550 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
1551 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)),
1552 }
1553
1554 opts = append(opts, additionalOpts...)
1555
1556 // Ensure that the message size options are set last, so they override any other
1557 // client-specific options that tweak the message size.
1558 //
1559 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
1560 // take precedence over everything else with a uniform size setting that's easy to reason about.
1561 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...)
1562
1563 // This dialer is used to connect via gRPC to the Sourcegraph instance.
1564 // This is done lazily, so we can provide the client to use regardless of
1565 // whether we enabled gRPC or not initially.
1566 cc, err := grpc.Dial(addr, opts...)
1567 if err != nil {
1568 return nil, fmt.Errorf("dialing %q: %w", addr, err)
1569 }
1570
1571 client := proto.NewZoektConfigurationServiceClient(cc)
1572 return client, nil
1573}
1574
1575// mustGetClientMetrics returns a singleton instance of the client metrics
1576// that are shared across all gRPC clients that this process creates.
1577//
1578// This function panics if the metrics cannot be registered with the default
1579// Prometheus registry.
1580func mustGetClientMetrics() *grpcprom.ClientMetrics {
1581 clientMetricsOnce.Do(func() {
1582 clientMetrics = grpcprom.NewClientMetrics(
1583 grpcprom.WithClientCounterOptions(),
1584 grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
1585 grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC
1586 grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC
1587 )
1588
1589 prometheus.DefaultRegisterer.MustRegister(clientMetrics)
1590 })
1591
1592 return clientMetrics
1593}
1594
// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
// If the scheme is "https", "443" is used. Nil URLs, URLs without a scheme
// (e.g. local directory paths), URLs that already have a port and other
// schemes are returned unchanged.
//
// The original URL is not mutated. A copy is modified and returned.
//
// The host is taken from Hostname(), which strips IPv6 brackets and any empty
// ":" port suffix. Joining the raw Host instead turned "[::1]" into
// "[[::1]]:80" and "host:" into "[host:]:80".
func addDefaultPort(original *url.URL) *url.URL {
	if original == nil {
		return nil // don't panic
	}

	// Don't do anything if the URL doesn't look like a remote URL or already
	// names a port.
	if !original.IsAbs() || original.Port() != "" {
		return original
	}

	var port string
	switch original.Scheme {
	case "http":
		port = "80"
	case "https":
		port = "443"
	default:
		return original
	}

	u := cloneURL(original)
	u.Host = net.JoinHostPort(u.Hostname(), port)
	return u
}

// cloneURL returns a copy of the URL. It is safe to mutate the returned URL.
// The Userinfo is copied too, so the copy shares no pointers with u.
// This is copied from net/http/clone.go
func cloneURL(u *url.URL) *url.URL {
	if u == nil {
		return nil
	}
	u2 := new(url.URL)
	*u2 = *u
	if u.User != nil {
		u2.User = new(url.Userinfo)
		*u2.User = *u.User
	}
	return u2
}
1638}
1639
1640func main() {
1641 liblog := sglog.Init(sglog.Resource{
1642 Name: "zoekt-indexserver",
1643 Version: zoekt.Version,
1644 InstanceID: zoekt.HostnameBestEffort(),
1645 })
1646 defer liblog.Sync()
1647
1648 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil {
1649 log.Fatal(err)
1650 }
1651}
1652
// getBoolFromEnvironmentVariables returns the boolean defined by the first
// environment variable in envVarNames that is set to a non-empty value in the
// current process environment, or defaultBool if none are set.
//
// An error is returned if that first set variable fails to parse with
// strconv.ParseBool. Later variables are not consulted. The parse error is
// wrapped, so callers can match it with errors.Is (e.g. strconv.ErrSyntax).
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	for _, envVar := range envVarNames {
		v := os.Getenv(envVar)
		if v == "" {
			continue
		}

		b, err := strconv.ParseBool(v)
		if err != nil {
			return false, fmt.Errorf("parsing environment variable %q to boolean: %w", envVar, err)
		}

		return b, nil
	}

	return defaultBool, nil
}
1673}