fork of https://github.com/sourcegraph/zoekt
1// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
2// repositories on sourcegraph
3package main
4
5import (
6 "bytes"
7 "context"
8 _ "embed"
9 "encoding/json"
10 "errors"
11 "flag"
12 "fmt"
13 "html/template"
14 "io"
15 "io/fs"
16 "log"
17 "math"
18 "math/rand"
19 "net"
20 "net/http"
21 "net/url"
22 "os"
23 "os/exec"
24 "os/signal"
25 "path/filepath"
26 "runtime"
27 "sort"
28 "strconv"
29 "strings"
30 "sync"
31 "text/tabwriter"
32 "time"
33
34 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
35 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
36 "github.com/peterbourgon/ff/v3/ffcli"
37 "github.com/prometheus/client_golang/prometheus"
38 "github.com/prometheus/client_golang/prometheus/promauto"
39 sglog "github.com/sourcegraph/log"
40 "github.com/sourcegraph/mountinfo"
41 "go.uber.org/automaxprocs/maxprocs"
42 "golang.org/x/net/trace"
43 "golang.org/x/sys/unix"
44 "google.golang.org/grpc"
45 "google.golang.org/grpc/codes"
46 "google.golang.org/grpc/credentials/insecure"
47 "google.golang.org/grpc/metadata"
48
49 "github.com/sourcegraph/zoekt"
50 "github.com/sourcegraph/zoekt/build"
51 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
52 "github.com/sourcegraph/zoekt/debugserver"
53 "github.com/sourcegraph/zoekt/grpc/internalerrs"
54 "github.com/sourcegraph/zoekt/grpc/messagesize"
55 "github.com/sourcegraph/zoekt/internal/profiler"
56)
57
// Prometheus metrics exported by indexserver. Everything created through
// promauto is registered with the default registry at package initialisation.
var (
	// metricResolveRevisionsDuration observes how long it takes to resolve the
	// revisions of all repositories.
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s, 10s, ... 100000s (~28h)
	})

	// metricResolveRevisionDuration observes how long it takes to resolve a
	// single repository revision, labelled by outcome.
	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	// metricGetIndexOptions counts every attempt to fetch index options,
	// including the failed ones.
	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	// metricGetIndexOptionsError counts only the failed attempts.
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	// metricIndexDuration observes the time spent in a single index job. It is
	// recorded by processQueue once the index directory lock is held.
	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	// metricIndexingDelay observes the time between a job being added to the
	// queue and the job completing.
	metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_indexing_delay_seconds",
		Help:    "A histogram of durations from when an index job is added to the queue, to the time it completes.",
		Buckets: prometheus.ExponentialBuckets(60, 2, 12), // 1m -> 60s * 2^11 = 122880s (~34h)
	}, []string{
		"state", // state is an indexState
		"name",  // the name of the repository that was indexed
	})

	// metricFetchDuration observes how long fetching a repository takes.
	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	// metricIndexIncrementalIndexState counts, per incremental index attempt,
	// the build.IndexState that was found on disk. It is incremented by Index.
	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is build.IndexState

	// metricNumIndexed is set by listIndexed to the number of repositories
	// that have shards in the index directory.
	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	// metricFailingTotal is incremented by Index whenever an index job
	// returns an error.
	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	// metricIndexingTotal is incremented by Index each time it actually starts
	// a (non no-op, non metadata-only) index run.
	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	// metricNumStoppedTrackingTotal is increased by the sync loop in Run by
	// the number of repositories removed from the queue.
	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})

	// clientMetricsOnce and clientMetrics hold the gRPC client metrics.
	// NOTE(review): presumably clientMetricsOnce guards a one-time
	// initialisation of clientMetrics; that code is not in this part of the
	// file, so confirm there.
	clientMetricsOnce sync.Once
	clientMetrics     *grpcprom.ClientMetrics
)
138
// MaxFileSize is the file size limit, in bytes, that indexArgs passes on as
// FileLimit for every index job.
//
// 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
// NOTE: if you change this, you must also update gitIndex to use the same value when fetching the repo.
const MaxFileSize = 1 << 20

// reposWithSeparateIndexingMetrics is the set of repositories that we want to
// capture separate indexing metrics for. It is consulted by repoNameForMetric.
var reposWithSeparateIndexingMetrics = make(map[string]struct{})

// indexState describes the outcome of a single index job. Its string value is
// used as the "state" label on the indexing metrics.
type indexState string

const (
	indexStateFail        indexState = "fail"         // The index job returned an error
	indexStateSuccess     indexState = "success"      // The index was rebuilt
	indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
	indexStateNoop        indexState = "noop"         // We didn't need to update index
	indexStateEmpty       indexState = "empty"        // index is empty (empty repo)
)
155
// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	// logger is the structured logger used by the server's methods.
	logger sglog.Logger

	// Sourcegraph is the client used to list repositories, fetch their index
	// options and report index status back.
	Sourcegraph Sourcegraph

	// BatchSize is not referenced in this part of the file.
	// NOTE(review): presumably the batch size used when talking to
	// Sourcegraph; confirm against its users.
	BatchSize int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration

	// CPUCount is the number of CPUs to use for indexing shards.
	CPUCount int

	// queue holds the pending index jobs. It is filled by the sync loop in Run
	// and drained by processQueue.
	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is the value reported by the /debug/host endpoint.
	hostname string

	// mergeOpts supplies, among other things, the vacuum and merge intervals
	// used by the background loops started in Run.
	mergeOpts mergeOpts

	// timeout defines how long the index server waits before killing an indexing job.
	timeout time.Duration
}
203
// debug is the logger for verbose diagnostics. It is created writing to
// io.Discard, so its output is dropped unless the logger is pointed at
// another writer (that code is not in this part of the file).
var debug = log.New(io.Discard, "", log.LstdFlags)

// noOutputTimeout is how long loggedRun lets a command run without producing
// any new output before it tries to stop it.
//
// Our index commands should output something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
const noOutputTimeout = 30 * time.Minute
212
213func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) {
214 out := &synchronizedBuffer{}
215 cmd.Stdout = out
216 cmd.Stderr = out
217
218 tr.LazyPrintf("%s", cmd.Args)
219
220 defer func() {
221 if err != nil {
222 outS := out.String()
223 tr.LazyPrintf("failed: %v", err)
224 tr.LazyPrintf("output: %s", out)
225 tr.SetError()
226 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS)
227 }
228 }()
229
230 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args))
231
232 if err := cmd.Start(); err != nil {
233 return err
234 }
235
236 errC := make(chan error)
237 go func() {
238 errC <- cmd.Wait()
239 }()
240
241 // This channel is set after we have sent sigquit. It allows us to follow up
242 // with a sigkill if the process doesn't quit after sigquit.
243 kill := make(<-chan time.Time)
244
245 lastLen := 0
246 for {
247 select {
248 case <-time.After(noOutputTimeout):
249 // Periodically check if we have had output. If not kill the process.
250 if out.Len() != lastLen {
251 lastLen = out.Len()
252 log.Printf("still running %s", cmd.Args)
253 } else {
254 // Send quit (C-\) first so we get a stack dump.
255 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args)
256 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil {
257 log.Println("quit failed:", err)
258 }
259
260 // send sigkill if still running in 10s
261 kill = time.After(10 * time.Second)
262 }
263
264 case <-kill:
265 log.Printf("still running, killing %s", cmd.Args)
266 if err := cmd.Process.Kill(); err != nil {
267 log.Println("kill failed:", err)
268 }
269
270 case err := <-errC:
271 if err != nil {
272 return err
273 }
274
275 tr.LazyPrintf("success")
276 return nil
277 }
278 }
279}
280
// synchronizedBuffer is a bytes.Buffer guarded by a mutex. It exists so that
// one goroutine can inspect the buffer while another is still writing to it.
// The zero value is an empty buffer ready to use.
type synchronizedBuffer struct {
	mu sync.Mutex
	b  bytes.Buffer
}

// Write appends p to the buffer while holding the lock.
func (sb *synchronizedBuffer) Write(p []byte) (n int, err error) {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	n, err = sb.b.Write(p)
	return n, err
}

// Len reports the number of bytes currently held in the buffer.
func (sb *synchronizedBuffer) Len() int {
	sb.mu.Lock()
	size := sb.b.Len()
	sb.mu.Unlock()
	return size
}

// String returns the buffer's current contents as a string.
func (sb *synchronizedBuffer) String() string {
	sb.mu.Lock()
	contents := sb.b.String()
	sb.mu.Unlock()
	return contents
}
305
// pauseFileName is the name of a file which, if present in IndexDir, stops
// index jobs from running. This makes it possible to experiment with the
// content of the IndexDir without the indexserver writing to it. Both the
// sync loop in Run and processQueue check for it.
const pauseFileName = "PAUSE"
310
// Run the sync loop. This blocks forever.
//
// After calling removeIncompleteShards on IndexDir it starts, each in its own
// goroutine:
//   - the sync loop, which refreshes the queue from Sourcegraph every Interval
//     (or on SIGUSR1) and cleans up the index directory;
//   - a vacuum loop and a merge loop, which only do work when shard merging is
//     enabled;
//   - IndexConcurrency workers running processQueue.
func (s *Server) Run() {
	removeIncompleteShards(s.IndexDir)

	// Start a goroutine which updates the queue with commits to index.
	go func() {
		// We update the list of indexed repos every Interval. To speed up manual
		// testing we also listen for SIGUSR1 to trigger updates.
		//
		// "pkill -SIGUSR1 zoekt-sourcegra"
		for range jitterTicker(s.Interval, unix.SIGUSR1) {
			// A readable PAUSE file skips this sync round; its content is only
			// used for the log message.
			if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
				log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b)))
				continue
			}

			repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir))
			if err != nil {
				log.Printf("error listing repos: %s", err)
				continue
			}

			debug.Printf("updating index queue with %d repositories", len(repos.IDs))

			// Stop indexing repos we don't need to track anymore
			removed := s.queue.MaybeRemoveMissing(repos.IDs)
			metricNumStoppedTrackingTotal.Add(float64(len(removed)))
			if len(removed) > 0 {
				log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5))
			}

			// Run cleanup concurrently with the queue updates below; the
			// round only ends once cleanup has finished (see <-cleanupDone).
			cleanupDone := make(chan struct{})
			go func() {
				defer close(cleanupDone)
				s.muIndexDir.Global(func() {
					cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging)
				})
			}()

			repos.IterateIndexOptions(s.queue.AddOrUpdate)

			// IterateIndexOptions will only iterate over repositories that have
			// changed since we last called list. However, we want to add all IDs
			// back onto the queue just to check that what is on disk is still
			// correct. This will use the last IndexOptions we stored in the
			// queue. The repositories not on the queue (missing) need a forced
			// fetch of IndexOptions.
			missing := s.queue.Bump(repos.IDs)
			// Errors for individual repositories are deliberately ignored here
			// (the error callback is a no-op).
			s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...)

			setCompoundShardCounter(s.IndexDir)

			<-cleanupDone
		}
	}()

	// Vacuum loop. The ticker always runs; the work is skipped unless shard
	// merging is enabled.
	go func() {
		for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) {
			if s.shardMerging {
				s.vacuum()
			}
		}
	}()

	// Merge loop, gated on shard merging in the same way.
	go func() {
		for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) {
			if s.shardMerging {
				s.doMerge()
			}
		}
	}()

	for i := 0; i < s.IndexConcurrency; i++ {
		go s.processQueue()
	}

	// block forever
	select {}
}
390
// formatListUint32 returns a comma-separated list of the first min(len(v), m)
// items of v. If v has more items than are shown, the list ends with "...".
func formatListUint32(v []uint32, m int) string {
	limit := m
	if limit > len(v) {
		limit = len(v)
	}

	var parts []string
	for i := 0; i < limit; i++ {
		parts = append(parts, fmt.Sprint(v[i]))
	}

	// Mark the list as truncated when some items were left out.
	if limit < len(v) {
		parts = append(parts, "...")
	}

	return strings.Join(parts, ", ")
}
408
409func (s *Server) processQueue() {
410 for {
411 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
412 time.Sleep(time.Second)
413 continue
414 }
415
416 item, ok := s.queue.Pop()
417 if !ok {
418 time.Sleep(time.Second)
419 continue
420 }
421
422 opts := item.Opts
423 args := s.indexArgs(opts)
424
425 ran := s.muIndexDir.With(opts.Name, func() {
426 // only record time taken once we hold the lock. This avoids us
427 // recording time taken while merging/cleanup runs.
428 start := time.Now()
429
430 state, err := s.Index(args)
431
432 elapsed := time.Since(start)
433 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds())
434
435 indexDelay := time.Since(item.DateAddedToQueue)
436 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds())
437
438 if err != nil {
439 log.Printf("error indexing %s: %s", args.String(), err)
440 }
441
442 switch state {
443 case indexStateSuccess:
444 var branches []string
445 for _, b := range args.Branches {
446 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
447 }
448 s.logger.Info("updated index",
449 sglog.String("repo", args.Name),
450 sglog.Uint32("id", args.RepoID),
451 sglog.Strings("branches", branches),
452 sglog.Duration("duration", elapsed),
453 sglog.Duration("index_delay", indexDelay),
454 )
455 case indexStateSuccessMeta:
456 log.Printf("updated meta %s in %v", args.String(), elapsed)
457 }
458 s.queue.SetIndexed(opts, state)
459 })
460
461 if !ran {
462 // Someone else is processing the repository. We can just skip this job
463 // since the repository will be added back to the queue and we will
464 // converge to the correct behaviour.
465 debug.Printf("index job for repository already running: %s", args)
466 continue
467 }
468 }
469}
470
471// repoNameForMetric returns a normalized version of the given repository name that is
472// suitable for use with Prometheus metrics.
473func repoNameForMetric(repo string) string {
474 // Check to see if we want to be able to capture separate indexing metrics for this repository.
475 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable.
476 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok {
477 return repo
478 }
479
480 return ""
481}
482
// batched splits slice into consecutive batches of at most size elements and
// sends them, in order, on the returned channel. The channel is closed after
// the last batch.
//
// A non-positive size disables batching: the whole slice is sent as a single
// batch. (Previously a size of 0 produced an endless stream of empty batches
// and a negative size panicked.)
//
// The batches are sub-slices of slice and share its backing array. The
// channel is unbuffered, so the caller must drain it for the sending
// goroutine to finish.
func batched(slice []uint32, size int) <-chan []uint32 {
	if size <= 0 {
		size = len(slice)
	}

	c := make(chan []uint32)
	go func() {
		defer close(c)
		for len(slice) > 0 {
			// The final batch may be shorter than size.
			n := size
			if n > len(slice) {
				n = len(slice)
			}
			c <- slice[:n]
			slice = slice[n:]
		}
	}()
	return c
}
497
// jitterTicker returns a channel which ticks with a jitter. The pause after
// each tick is picked uniformly from the range [d/2, d/2 + d). The first tick
// is sent on creation.
//
// sig is a list of signals which also cause the ticker to fire. This is a
// convenience to allow manually triggering of the ticker.
//
// The channel is unbuffered and the goroutines feeding it are never stopped.
func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} {
	ticks := make(chan struct{})

	// Timer-driven ticks: send, then sleep for a jittered duration.
	go func() {
		for {
			ticks <- struct{}{}
			span := int64(d)
			extra := rand.Int63n(span)
			time.Sleep(time.Duration(span/2 + extra))
		}
	}()

	// Signal-driven ticks, only needed when signals were requested.
	if len(sig) > 0 {
		go func() {
			sigC := make(chan os.Signal, 1)
			signal.Notify(sigC, sig...)
			for range sigC {
				ticks <- struct{}{}
			}
		}()
	}

	return ticks
}
529
// Index runs an index job for the repository and branches described by args
// and reports the resulting state.
//
// Depending on args and on what is already on disk it creates an empty shard
// (no branches), does nothing (index up to date), only updates the index
// metadata, or runs a full gitIndex. If the returned error is non-nil the
// state is always indexStateFail.
//
// Index modifies args: it may set UseDelta, clear Symbols, and it always sets
// DeltaShardNumberFallbackThreshold.
func (s *Server) Index(args *indexArgs) (state indexState, err error) {
	tr := trace.New("index", args.Name)
	tr.SetMaxEvents(30) // Ensure we capture all indexing events

	// The deferred function sees the named results, so any error path below
	// is normalised to indexStateFail and counted as a failure.
	defer func() {
		if err != nil {
			tr.SetError()
			tr.LazyPrintf("error: %v", err)
			state = indexStateFail
			metricFailingTotal.Inc()
		}
		tr.LazyPrintf("state: %s", state)
		tr.Finish()
	}()

	tr.LazyPrintf("branches: %v", args.Branches)

	// Nothing to index: record the repository with an empty shard instead.
	if len(args.Branches) == 0 {
		return indexStateEmpty, createEmptyShard(args)
	}

	repositoryName := args.Name
	if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
		tr.LazyPrintf("marking this repository for delta build")
		args.UseDelta = true
	}

	args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold

	if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
		tr.LazyPrintf("skipping symbols calculation")
		args.Symbols = false
	}

	// reason is only used for the log line below; it stays "forced" when the
	// job is not incremental.
	reason := "forced"

	if args.Incremental {
		bo := args.BuildOptions()
		bo.SetDefaults()
		incrementalState, fn := bo.IndexState()
		reason = string(incrementalState)
		metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()

		switch incrementalState {
		case build.IndexStateEqual:
			debug.Printf("%s index already up to date. Shard=%s", args.String(), fn)
			return indexStateNoop, nil

		case build.IndexStateMeta:
			log.Printf("updating index.meta %s", args.String())

			// If the metadata-only update fails we fall through to a full
			// index below.
			if err := mergeMeta(bo); err != nil {
				log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
			} else {
				return indexStateSuccessMeta, nil
			}

		case build.IndexStateCorrupt:
			log.Printf("falling back to full update: corrupt index: %s", args.String())
		}
	}

	log.Printf("updating index %s reason=%s", args.String(), reason)

	metricIndexingTotal.Inc()
	c := gitIndexConfig{
		// Commands are run through loggedRun so their output ends up on the
		// trace of this index job.
		runCmd: func(cmd *exec.Cmd) error {
			return s.loggedRun(tr, cmd)
		},

		findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
			return args.BuildOptions().FindRepositoryMetadata()
		},
		timeout: s.timeout,
	}

	err = gitIndex(c, args, s.Sourcegraph, s.logger)
	if err != nil {
		return indexStateFail, err
	}

	// Failing to report the new status is logged but does not fail the index
	// job.
	if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil {
		s.logger.Error("failed to update index status",
			sglog.String("repo", args.Name),
			sglog.Uint32("id", args.RepoID),
			sglogBranches("branches", args.Branches),
			sglog.Error(err),
		)
	}

	return indexStateSuccess, nil
}
623
624// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so
625// it can update the zoekt_repos table.
626func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error {
627 // We need to read from disk for IndexTime.
628 _, metadata, ok, err := c.findRepositoryMetadata(args)
629 if err != nil {
630 return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
631 }
632 if !ok {
633 return errors.New("failed to find metadata for new/updated index")
634 }
635
636 status := []indexStatus{{
637 RepoID: args.RepoID,
638 Branches: args.Branches,
639 IndexTimeUnix: metadata.IndexTime.Unix(),
640 }}
641 if err := sg.UpdateIndexStatus(status); err != nil {
642 return fmt.Errorf("failed to update sourcegraph with status: %w", err)
643 }
644
645 return nil
646}
647
648func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
649 ss := make([]string, len(branches))
650 for i, b := range branches {
651 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
652 }
653 return sglog.Strings(key, ss)
654}
655
656func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
657 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0))
658 return &indexArgs{
659 IndexOptions: opts,
660 IndexDir: s.IndexDir,
661 Parallelism: parallelism,
662 Incremental: true,
663 FileLimit: MaxFileSize,
664 ShardMerging: s.shardMerging,
665 }
666}
667
668// parallelism consults both the server flags and index options to determine the number
669// of shards to index in parallel. If the CPUCount index option is provided, it always
670// overrides the server flag.
671func (s *Server) parallelism(opts IndexOptions, maxProcs int) int {
672 var parallelism int
673 if opts.ShardConcurrency > 0 {
674 parallelism = int(opts.ShardConcurrency)
675 } else {
676 parallelism = s.CPUCount
677 }
678
679 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs
680 if parallelism > 4*maxProcs {
681 parallelism = 4 * maxProcs
682 }
683
684 // If index concurrency is set, then divide the parallelism by the number of
685 // repos we're indexing in parallel
686 if s.IndexConcurrency > 1 {
687 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency)))
688 }
689
690 return parallelism
691}
692
693func createEmptyShard(args *indexArgs) error {
694 bo := args.BuildOptions()
695 bo.SetDefaults()
696 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}
697
698 if args.Incremental && bo.IncrementalSkipIndexing() {
699 return nil
700 }
701
702 builder, err := build.NewBuilder(*bo)
703 if err != nil {
704 return err
705 }
706 return builder.Finish()
707}
708
709// addDebugHandlers adds handlers specific to indexserver.
710func (s *Server) addDebugHandlers(mux *http.ServeMux) {
711 // Sourcegraph's site admin view requires indexserver to serve it's admin view
712 // on "/".
713 mux.Handle("/", http.HandlerFunc(s.handleRoot))
714
715 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex))
716 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed))
717 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList))
718 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge))
719 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue))
720 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost))
721}
722
723func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) {
724 if r.Method != "GET" {
725 w.Header().Set("Allow", "GET")
726 w.WriteHeader(http.StatusMethodNotAllowed)
727 return
728 }
729
730 response := struct {
731 Hostname string
732 }{
733 Hostname: s.hostname,
734 }
735
736 b, err := json.Marshal(response)
737 if err != nil {
738 http.Error(w, err.Error(), http.StatusInternalServerError)
739 return
740 }
741
742 w.Header().Set("Content-Type", "application/json; charset=utf-8")
743 w.Write(b)
744}
745
// rootTmpl is the HTML page served by handleRoot. It expects a value with:
//   - IndexMsg: a message to display (may be empty);
//   - Repos: the repositories to list, each with a Name and an ID. When Repos
//     is empty only a "show repos" link is rendered.
//
// Each listed ID links back to the page with ?id=<ID>, which triggers a
// reindex of that repository.
var rootTmpl = template.Must(template.New("name").Parse(`
<html>
	<body>
		<a href="debug">Debug</a><br />
		<a href="debug/requests">Traces</a><br />
		{{.IndexMsg}}<br />
		<br />
		<h3>Reindex</h3>
		{{if .Repos}}
		<a href="?show_repos=false">hide repos</a><br />
		<table style="margin-top: 20px">
			<th style="text-align:left">Name</th>
			<th style="text-align:left">ID (click to reindex)</th>
			{{range .Repos}}
			<tr>
				<td>{{.Name}}</td>
				<td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></td>
			</tr>
			{{end}}
		</table>
		{{else}}
		<a href="?show_repos=true">show repos</a><br />
		{{end}}
	</body>
</html>
`))
772
773func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
774 if r.Method != "GET" {
775 w.Header().Set("Allow", "GET")
776 w.WriteHeader(http.StatusMethodNotAllowed)
777 return
778 }
779
780 values := r.URL.Query()
781
782 // ?id=
783 indexMsg := ""
784 if v := values.Get("id"); v != "" {
785 id, err := strconv.Atoi(v)
786 if err != nil {
787 http.Error(w, err.Error(), http.StatusBadRequest)
788 return
789 }
790 indexMsg, _ = s.forceIndex(uint32(id))
791 }
792
793 // ?show_repos=
794 showRepos := false
795 if v := values.Get("show_repos"); v != "" {
796 showRepos, _ = strconv.ParseBool(v)
797 }
798
799 type Repo struct {
800 ID uint32
801 Name string
802 }
803 var data struct {
804 Repos []Repo
805 IndexMsg string
806 }
807
808 data.IndexMsg = indexMsg
809
810 if showRepos {
811 s.queue.Iterate(func(opts *IndexOptions) {
812 data.Repos = append(data.Repos, Repo{
813 ID: opts.RepoID,
814 Name: opts.Name,
815 })
816 })
817 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name })
818 }
819
820 _ = rootTmpl.Execute(w, data)
821}
822
823// handleReindex triggers a reindex asynocronously. If a reindex was triggered
824// the request returns with status 202. The caller can infer the new state of
825// the index by calling List.
826func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) {
827 if r.Method != http.MethodPost {
828 w.Header().Set("Allow", http.MethodPost)
829 w.WriteHeader(http.StatusMethodNotAllowed)
830 return
831 }
832
833 err := r.ParseForm()
834 if err != nil {
835 http.Error(w, err.Error(), http.StatusBadRequest)
836 return
837 }
838
839 id, err := strconv.Atoi(r.Form.Get("repo"))
840 if err != nil {
841 http.Error(w, err.Error(), http.StatusBadRequest)
842 return
843 }
844
845 go func() { s.forceIndex(uint32(id)) }()
846
847 // 202 Accepted
848 w.WriteHeader(http.StatusAccepted)
849}
850
851func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
852 withIndexed := true
853 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil {
854 withIndexed = b
855 }
856
857 var indexed []uint32
858 if withIndexed {
859 indexed = listIndexed(s.IndexDir)
860 }
861
862 repos, err := s.Sourcegraph.List(r.Context(), indexed)
863 if err != nil {
864 http.Error(w, err.Error(), http.StatusInternalServerError)
865 return
866 }
867
868 bw := bytes.Buffer{}
869
870 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
871
872 _, err = fmt.Fprintf(tw, "ID\tName\n")
873 if err != nil {
874 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
875 return
876 }
877
878 s.queue.mu.Lock()
879 name := ""
880 for _, id := range repos.IDs {
881 if item := s.queue.get(id); item != nil {
882 name = item.opts.Name
883 } else {
884 name = ""
885 }
886 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
887 if err != nil {
888 debug.Printf("handleDebugList: %s\n", err.Error())
889 }
890 }
891 s.queue.mu.Unlock()
892
893 if err != nil {
894 http.Error(w, err.Error(), http.StatusInternalServerError)
895 return
896 }
897
898 err = tw.Flush()
899 if err != nil {
900 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
901 return
902 }
903
904 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
905
906 if _, err := io.Copy(w, &bw); err != nil {
907 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
908 return
909 }
910}
911
912// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
913// can run this command during periods of low usage (evenings, weekends) to
914// trigger an initial merge run. In the steady-state, merges happen rarely, even
915// on busy instances, and users can rely on automatic merging instead.
916func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {
917 // A merge operation can take very long, depending on the number merges and the
918 // target size of the compound shards. We run the merge in the background and
919 // return immediately to the user.
920 //
921 // We track the status of the merge with metricShardMergingRunning.
922 go func() {
923 s.doMerge()
924 }()
925 _, _ = w.Write([]byte("merging enqueued\n"))
926}
927
928func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {
929 indexed := listIndexed(s.IndexDir)
930
931 bw := bytes.Buffer{}
932
933 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
934
935 _, err := fmt.Fprintf(tw, "ID\tName\n")
936 if err != nil {
937 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
938 return
939 }
940
941 s.queue.mu.Lock()
942 name := ""
943 for _, id := range indexed {
944 if item := s.queue.get(id); item != nil {
945 name = item.opts.Name
946 } else {
947 name = ""
948 }
949 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
950 if err != nil {
951 debug.Printf("handleDebugIndexed: %s\n", err.Error())
952 }
953 }
954 s.queue.mu.Unlock()
955
956 if err != nil {
957 http.Error(w, err.Error(), http.StatusInternalServerError)
958 return
959 }
960
961 err = tw.Flush()
962 if err != nil {
963 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
964 return
965 }
966
967 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
968
969 if _, err := io.Copy(w, &bw); err != nil {
970 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
971 return
972 }
973}
974
975// forceIndex will run the index job for repo name now. It will return always
976// return a string explaining what it did, even if it failed.
977func (s *Server) forceIndex(id uint32) (string, error) {
978 var opts IndexOptions
979 var err error
980 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) {
981 opts = o
982 }, func(_ uint32, e error) {
983 err = e
984 }, id)
985 if err != nil {
986 return fmt.Sprintf("Indexing %d failed: %v", id, err), err
987 }
988
989 args := s.indexArgs(opts)
990 args.Incremental = false // force re-index
991
992 var state indexState
993 ran := s.muIndexDir.With(opts.Name, func() {
994 state, err = s.Index(args)
995 })
996 if !ran {
997 return fmt.Sprintf("index job for repository already running: %s", args), nil
998 }
999 if err != nil {
1000 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err
1001 }
1002 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil
1003}
1004
1005func listIndexed(indexDir string) []uint32 {
1006 index := getShards(indexDir)
1007 metricNumIndexed.Set(float64(len(index)))
1008 repoIDs := make([]uint32, 0, len(index))
1009 for id := range index {
1010 repoIDs = append(repoIDs, id)
1011 }
1012 sort.Slice(repoIDs, func(i, j int) bool {
1013 return repoIDs[i] < repoIDs[j]
1014 })
1015 return repoIDs
1016}
1017
1018// setupTmpDir sets up a temporary directory on the same volume as the
1019// indexes.
1020//
1021// If main is true we will delete older temp directories left around. main is
1022// false when this is a debug command.
1023func setupTmpDir(logger sglog.Logger, main bool, index string) error {
1024 // change the target tmp directory depending on if it's our main daemon or a
1025 // debug sub command.
1026 dir := ".indexserver.debug.tmp"
1027 if main {
1028 dir = ".indexserver.tmp"
1029 }
1030
1031 tmpRoot := filepath.Join(index, dir)
1032
1033 if main {
1034 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot))
1035 err := os.RemoveAll(tmpRoot)
1036 if err != nil {
1037 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err))
1038 }
1039 }
1040
1041 if err := os.MkdirAll(tmpRoot, 0o755); err != nil {
1042 return err
1043 }
1044
1045 return os.Setenv("TMPDIR", tmpRoot)
1046}
1047
1048func printMetaData(fn string) error {
1049 repo, indexMeta, err := zoekt.ReadMetadataPath(fn)
1050 if err != nil {
1051 return err
1052 }
1053
1054 err = json.NewEncoder(os.Stdout).Encode(indexMeta)
1055 if err != nil {
1056 return err
1057 }
1058
1059 err = json.NewEncoder(os.Stdout).Encode(repo)
1060 if err != nil {
1061 return err
1062 }
1063 return nil
1064}
1065
1066func printShardStats(fn string) error {
1067 f, err := os.Open(fn)
1068 if err != nil {
1069 return err
1070 }
1071
1072 iFile, err := zoekt.NewIndexFile(f)
1073 if err != nil {
1074 return err
1075 }
1076
1077 return zoekt.PrintNgramStats(iFile)
1078}
1079
1080func srcLogLevelIsDebug() bool {
1081 lvl := os.Getenv(sglog.EnvLogLevel)
1082 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug")
1083}
1084
// getEnvWithDefaultBool returns the boolean value of the environment variable
// k, or defaultVal if the variable is unset or empty.
//
// The value is parsed with strconv.ParseBool. If it cannot be parsed the
// process is terminated via log.Fatalf.
func getEnvWithDefaultBool(k string, defaultVal bool) bool {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to bool: %s", k, err)
	}
	return b
}
1096
// getEnvWithDefaultInt64 returns the environment variable k parsed as a
// base-10 int64, or defaultVal if the variable is unset or empty.
//
// An unparsable value terminates the process via log.Fatalf.
func getEnvWithDefaultInt64(k string, defaultVal int64) int64 {
	raw := os.Getenv(k)
	if raw != "" {
		parsed, parseErr := strconv.ParseInt(raw, 10, 64)
		if parseErr != nil {
			log.Fatalf("error parsing ENV %s to int64: %s", k, parseErr)
		}
		return parsed
	}
	return defaultVal
}
1108
// getEnvWithDefaultInt returns the environment variable k parsed as an int,
// or defaultVal if the variable is unset or empty.
//
// An unparsable value terminates the process via log.Fatalf.
func getEnvWithDefaultInt(k string, defaultVal int) int {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	// Parse the variable's value, not its name.
	i, err := strconv.Atoi(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to int: %s", k, err)
	}
	return i
}
1120
// getEnvWithDefaultUint64 returns the environment variable k parsed as a
// base-10 uint64, or defaultVal if the variable is unset or empty.
//
// An unparsable value terminates the process via log.Fatalf.
func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseUint(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to uint64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1132
// getEnvWithDefaultFloat64 returns the environment variable k parsed as a
// float64, or defaultVal if the variable is unset or empty.
//
// An unparsable value terminates the process via log.Fatalf.
func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 {
	raw := os.Getenv(k)
	if len(raw) == 0 {
		return defaultVal
	}

	parsed, parseErr := strconv.ParseFloat(raw, 64)
	if parseErr != nil {
		log.Fatalf("error parsing ENV %s to float64: %s", k, parseErr)
	}
	return parsed
}
1144
// getEnvWithDefaultString returns the value of the environment variable k, or
// defaultVal if the variable is unset or empty.
func getEnvWithDefaultString(k string, defaultVal string) string {
	if value := os.Getenv(k); value != "" {
		return value
	}
	return defaultVal
}
1152
// getEnvWithDefaultDuration returns the environment variable k parsed with
// time.ParseDuration, or defaultVal if the variable is unset or empty.
//
// An unparsable value terminates the process via log.Fatalf.
func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration {
	raw := os.Getenv(k)
	if raw != "" {
		parsed, parseErr := time.ParseDuration(raw)
		if parseErr != nil {
			log.Fatalf("error parsing ENV %s to duration: %s", k, parseErr)
		}
		return parsed
	}
	return defaultVal
}
1165
// getEnvWithDefaultEmptySet interprets the environment variable k as a
// comma-separated list and returns its entries as a set. Surrounding
// whitespace is trimmed from each entry and empty entries are dropped. The
// returned map is never nil; it is empty if the variable is unset or contains
// no entries.
func getEnvWithDefaultEmptySet(k string) map[string]struct{} {
	entries := make(map[string]struct{})
	for _, field := range strings.Split(os.Getenv(k), ",") {
		name := strings.TrimSpace(field)
		if name == "" {
			continue
		}
		entries[name] = struct{}{}
	}
	return entries
}
1176
// joinStringSet concatenates the members of set, separated by sep.
//
// Members are emitted in ascending lexical order so that the result is
// deterministic; map iteration order alone would make the output differ from
// call to call. An empty or nil set yields the empty string.
func joinStringSet(set map[string]struct{}, sep string) string {
	xs := make([]string, 0, len(set))
	for x := range set {
		xs = append(xs, x)
	}
	sort.Strings(xs)

	return strings.Join(xs, sep)
}
1185
1186func setCompoundShardCounter(indexDir string) {
1187 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt"))
1188 if err != nil {
1189 log.Printf("setCompoundShardCounter: %s\n", err)
1190 return
1191 }
1192 metricNumberCompoundShards.Set(float64(len(fns)))
1193}
1194
1195func rootCmd() *ffcli.Command {
1196 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError)
1197 conf := rootConfig{
1198 Main: true,
1199 }
1200 conf.registerRootFlags(rootFs)
1201
1202 return &ffcli.Command{
1203 FlagSet: rootFs,
1204 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]",
1205 Subcommands: []*ffcli.Command{debugCmd()},
1206 Exec: func(ctx context.Context, args []string) error {
1207 return startServer(conf)
1208 },
1209 }
1210}
1211
// rootConfig holds the settings of the indexserver command. Apart from Main,
// the fields are populated from command line flags (see registerRootFlags).
type rootConfig struct {
	// Main is true if this rootConfig is for our main long running command
	// (the indexserver). Debug commands should not set this value. newServer
	// forwards it to setupTmpDir, which only removes a leftover temp
	// directory for the main command.
	Main bool

	root             string        // -sourcegraph_url
	interval         time.Duration // -interval
	index            string        // -index
	indexConcurrency int64         // -index_concurrency
	listen, hostname string        // -listen, -hostname
	cpuFraction      float64       // -cpu_fraction
	blockProfileRate int           // -block_profile_rate

	// config values related to shard merging
	disableShardMerging           bool
	vacuumInterval, mergeInterval time.Duration
	targetSize, minSize           int64 // in MiB
	minAgeDays                    int

	// config values related to backoff indexing repos with one or more
	// consecutive failures
	backoffDuration, maxBackoffDuration time.Duration
}
1239
1240func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
1241 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
1242 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
1243 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently")
1244 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use")
1245 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
1246 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
1247 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
1248 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate")
1249 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
1250 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")
1251
1252 // flags related to shard merging
1253 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging")
1254 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
1255 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
1256 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB")
1257 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB")
1258 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
1259}
1260
1261func startServer(conf rootConfig) error {
1262 s, err := newServer(conf)
1263 if err != nil {
1264 return err
1265 }
1266
1267 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate)
1268 setCompoundShardCounter(s.IndexDir)
1269
1270 if conf.listen != "" {
1271
1272 mux := http.NewServeMux()
1273 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{
1274 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"},
1275 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"},
1276 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"},
1277 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"},
1278 }...)
1279 s.addDebugHandlers(mux)
1280
1281 go func() {
1282 debug.Printf("serving HTTP on %s", conf.listen)
1283 log.Fatal(http.ListenAndServe(conf.listen, mux))
1284 }()
1285
1286 // Serve mux on a unix domain socket on a best-effort-basis so that
1287 // webserver can call the endpoints via the shared filesystem.
1288 //
1289 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen
1290 // on the socket due to permission errors. See
1291 // https://github.com/docker/for-mac/issues/6239
1292 go func() {
1293 serveHTTPOverSocket := func() error {
1294 socket := filepath.Join(s.IndexDir, "indexserver.sock")
1295 // We cannot bind a socket to an existing pathname.
1296 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) {
1297 return fmt.Errorf("error removing socket file: %s", socket)
1298 }
1299 // The "unix" network corresponds to stream sockets. (cf. unixgram,
1300 // unixpacket).
1301 l, err := net.Listen("unix", socket)
1302 if err != nil {
1303 return fmt.Errorf("failed to listen on socket %s: %w", socket, err)
1304 }
1305 // Indexserver (root) and webserver (Sourcegraph) run with
1306 // different users. Per default, the socket is created with
1307 // permission 755 (root root), which doesn't let webserver write to
1308 // it.
1309 //
1310 // See https://github.com/golang/go/issues/11822 for more context.
1311 if err := os.Chmod(socket, 0o777); err != nil {
1312 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err)
1313 }
1314 debug.Printf("serving HTTP on %s", socket)
1315 return http.Serve(l, mux)
1316 }
1317 debug.Print(serveHTTPOverSocket())
1318 }()
1319 }
1320
1321 oc := &ownerChecker{
1322 Path: filepath.Join(conf.index, "owner.txt"),
1323 Hostname: conf.hostname,
1324 }
1325 go oc.Run()
1326
1327 logger := sglog.Scoped("metricsRegistration")
1328 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"}
1329
1330 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index})
1331 prometheus.DefaultRegisterer.MustRegister(c)
1332
1333 s.Run()
1334 return nil
1335}
1336
1337func newServer(conf rootConfig) (*Server, error) {
1338 logger := sglog.Scoped("server")
1339
1340 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 {
1341 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0")
1342 }
1343 if conf.index == "" {
1344 return nil, fmt.Errorf("must set -index")
1345 }
1346 if conf.root == "" {
1347 return nil, fmt.Errorf("must set -sourcegraph_url")
1348 }
1349 rootURL, err := url.Parse(conf.root)
1350 if err != nil {
1351 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err)
1352 }
1353
1354 rootURL = addDefaultPort(rootURL)
1355
1356 // Tune GOMAXPROCS to match Linux container CPU quota.
1357 _, _ = maxprocs.Set()
1358
1359 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler.
1360 // The block profiler is disabled by default and should be enabled with care in production
1361 runtime.SetBlockProfileRate(conf.blockProfileRate)
1362
1363 // Automatically prepend our own path at the front, to minimize
1364 // required configuration.
1365 if l, err := os.Readlink("/proc/self/exe"); err == nil {
1366 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
1367 }
1368
1369 if _, err := os.Stat(conf.index); err != nil {
1370 if err := os.MkdirAll(conf.index, 0o755); err != nil {
1371 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err)
1372 }
1373 }
1374
1375 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil {
1376 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err)
1377 }
1378
1379 if srcLogLevelIsDebug() {
1380 debug = log.New(os.Stderr, "", log.LstdFlags)
1381 }
1382
1383 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST")
1384 if len(reposWithSeparateIndexingMetrics) > 0 {
1385 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", "))
1386 }
1387
1388 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST")
1389 if len(deltaBuildRepositoriesAllowList) > 0 {
1390 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", "))
1391 }
1392
1393 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150)
1394 if deltaShardNumberFallbackThreshold > 0 {
1395 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold)
1396 } else {
1397 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards")
1398 }
1399
1400 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST")
1401 if len(reposShouldSkipSymbolsCalculation) > 0 {
1402 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", "))
1403 }
1404
1405 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout)
1406 if indexingTimeout != defaultIndexingTimeout {
1407 debug.Printf("using configured indexing timeout: %s", indexingTimeout)
1408 }
1409
1410 var sg Sourcegraph
1411 if rootURL.IsAbs() {
1412 var batchSize int
1413 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" {
1414 batchSize, err = strconv.Atoi(v)
1415 if err != nil {
1416 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int")
1417 }
1418 }
1419
1420 opts := []SourcegraphClientOption{
1421 WithBatchSize(batchSize),
1422 }
1423
1424 logger := sglog.Scoped("zoektConfigurationGRPCClient")
1425 client, err := dialGRPCClient(rootURL.Host, logger)
1426 if err != nil {
1427 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err)
1428 }
1429
1430 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...)
1431
1432 } else {
1433 sg = sourcegraphFake{
1434 RootDir: rootURL.String(),
1435 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags),
1436 }
1437 }
1438
1439 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction)))
1440 if cpuCount < 1 {
1441 cpuCount = 1
1442 }
1443
1444 if conf.indexConcurrency < 1 {
1445 conf.indexConcurrency = 1
1446 } else if conf.indexConcurrency > int64(cpuCount) {
1447 conf.indexConcurrency = int64(cpuCount)
1448 }
1449
1450 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)
1451
1452 return &Server{
1453 logger: logger,
1454 Sourcegraph: sg,
1455 IndexDir: conf.index,
1456 IndexConcurrency: int(conf.indexConcurrency),
1457 Interval: conf.interval,
1458 CPUCount: cpuCount,
1459 queue: *q,
1460 shardMerging: !conf.disableShardMerging,
1461 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList,
1462 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold,
1463 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
1464 hostname: conf.hostname,
1465 mergeOpts: mergeOpts{
1466 vacuumInterval: conf.vacuumInterval,
1467 mergeInterval: conf.mergeInterval,
1468 targetSizeBytes: conf.targetSize * 1024 * 1024,
1469 minSizeBytes: conf.minSize * 1024 * 1024,
1470 minAgeDays: conf.minAgeDays,
1471 },
1472 timeout: indexingTimeout,
1473 }, err
1474}
1475
// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration
// for the indexed-search-configuration gRPC service. Its contents are embedded
// at build time from default_grpc_service_configuration.json, and
// dialGRPCClient passes them to grpc.WithDefaultServiceConfig.
//
// The default backoff strategy is modeled after the default settings used by
// retryablehttp.DefaultClient.
//
// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html):
//   - Unavailable
//   - Aborted
//
// NOTE(review): the backoff and retry settings described above live in the
// JSON file, which is not visible from this file — confirm they still match.
//
//go:embed default_grpc_service_configuration.json
var defaultGRPCServiceConfigurationJSON string
1488
1489func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor {
1490 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1491 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1492 return invoker(ctx, method, req, reply, cc, opts...)
1493 }
1494}
1495
1496func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
1497 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
1498 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1499 return streamer(ctx, desc, cc, method, opts...)
1500 }
1501}
1502
// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPC
// servers and clients are allowed to process. dialGRPCClient applies it via
// grpc.MaxCallRecvMsgSize.
// This can be overridden by providing custom Server/Dial options.
const defaultGRPCMessageReceiveSizeBytes = 90 << 20 // 90 MiB (90 * 1024 * 1024 bytes)
1506
1507func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) {
1508 metrics := mustGetClientMetrics()
1509
1510 // If the service seems to be unavailable, this
1511 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1
1512 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s])
1513 retryOpts := []retry.CallOption{
1514 retry.WithMax(5),
1515 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)),
1516 retry.WithCodes(codes.Unavailable),
1517 }
1518
1519 opts := []grpc.DialOption{
1520 grpc.WithTransportCredentials(insecure.NewCredentials()),
1521 grpc.WithChainStreamInterceptor(
1522 metrics.StreamClientInterceptor(),
1523 messagesize.StreamClientInterceptor,
1524 internalActorStreamInterceptor(),
1525 internalerrs.LoggingStreamClientInterceptor(logger),
1526 internalerrs.PrometheusStreamClientInterceptor,
1527 retry.StreamClientInterceptor(retryOpts...),
1528 ),
1529 grpc.WithChainUnaryInterceptor(
1530 metrics.UnaryClientInterceptor(),
1531 messagesize.UnaryClientInterceptor,
1532 internalActorUnaryInterceptor(),
1533 internalerrs.LoggingUnaryClientInterceptor(logger),
1534 internalerrs.PrometheusUnaryClientInterceptor,
1535 retry.UnaryClientInterceptor(retryOpts...),
1536 ),
1537 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
1538 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)),
1539 }
1540
1541 opts = append(opts, additionalOpts...)
1542
1543 // Ensure that the message size options are set last, so they override any other
1544 // client-specific options that tweak the message size.
1545 //
1546 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
1547 // take precedence over everything else with a uniform size setting that's easy to reason about.
1548 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...)
1549
1550 // This dialer is used to connect via gRPC to the Sourcegraph instance.
1551 // This is done lazily, so we can provide the client to use regardless of
1552 // whether we enabled gRPC or not initially.
1553 cc, err := grpc.Dial(addr, opts...)
1554 if err != nil {
1555 return nil, fmt.Errorf("dialing %q: %w", addr, err)
1556 }
1557
1558 client := proto.NewZoektConfigurationServiceClient(cc)
1559 return client, nil
1560}
1561
1562// mustGetClientMetrics returns a singleton instance of the client metrics
1563// that are shared across all gRPC clients that this process creates.
1564//
1565// This function panics if the metrics cannot be registered with the default
1566// Prometheus registry.
1567func mustGetClientMetrics() *grpcprom.ClientMetrics {
1568 clientMetricsOnce.Do(func() {
1569 clientMetrics = grpcprom.NewClientMetrics(
1570 grpcprom.WithClientCounterOptions(),
1571 grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
1572 grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC
1573 grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC
1574 )
1575
1576 prometheus.DefaultRegisterer.MustRegister(clientMetrics)
1577 })
1578
1579 return clientMetrics
1580}
1581
// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
// If the scheme is "https", "443" is used. URLs that are nil, not absolute,
// already carry a port, or use any other scheme are returned as is.
//
// The host is rebuilt from the URL's hostname rather than its raw Host field,
// so IPv6 literals ("[::1]") and hosts with an empty port ("example.com:")
// produce a well-formed host:port pair.
//
// The original URL is not mutated. A copy is modified and returned.
func addDefaultPort(original *url.URL) *url.URL {
	if original == nil {
		return nil // don't panic
	}

	if !original.IsAbs() {
		return original // don't do anything if the URL doesn't look like a remote URL
	}

	if original.Port() != "" {
		return original // an explicit port always wins
	}

	var port string
	switch original.Scheme {
	case "http":
		port = "80"
	case "https":
		port = "443"
	default:
		return original
	}

	u := cloneURL(original)
	// Hostname strips the brackets of IPv6 literals and any trailing ":";
	// JoinHostPort adds the brackets back where they are needed.
	u.Host = net.JoinHostPort(original.Hostname(), port)
	return u
}

// cloneURL returns a copy of the URL. It is safe to mutate the returned URL.
// A nil URL yields nil. The Userinfo, if present, is copied as well.
// This is copied from net/http/clone.go
func cloneURL(u *url.URL) *url.URL {
	if u == nil {
		return nil
	}
	u2 := new(url.URL)
	*u2 = *u
	if u.User != nil {
		u2.User = new(url.Userinfo)
		*u2.User = *u.User
	}
	return u2
}
1626
1627func main() {
1628 liblog := sglog.Init(sglog.Resource{
1629 Name: "zoekt-indexserver",
1630 Version: zoekt.Version,
1631 InstanceID: zoekt.HostnameBestEffort(),
1632 })
1633 defer liblog.Sync()
1634
1635 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil {
1636 log.Fatal(err)
1637 }
1638}
1639
// getBoolFromEnvironmentVariables returns the boolean defined by the first
// environment variable listed in envVarNames that is set to a non-empty value
// in the current process environment, or defaultBool if none are set.
//
// Only that first non-empty variable is considered: if it fails to parse as a
// boolean, false and an error are returned without looking at later names.
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	for _, name := range envVarNames {
		raw := os.Getenv(name)
		if raw != "" {
			parsed, err := strconv.ParseBool(raw)
			if err != nil {
				return false, fmt.Errorf("parsing environment variable %q to boolean: %v", name, err)
			}
			return parsed, nil
		}
	}

	return defaultBool, nil
}