fork of https://github.com/sourcegraph/zoekt
// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
// repositories on Sourcegraph.
3package main
4
5import (
6 "bytes"
7 "context"
8 _ "embed"
9 "encoding/json"
10 "errors"
11 "flag"
12 "fmt"
13 "html/template"
14 "io"
15 "io/fs"
16 "log"
17 "math"
18 "math/rand"
19 "net"
20 "net/http"
21 "net/url"
22 "os"
23 "os/exec"
24 "os/signal"
25 "path/filepath"
26 "runtime"
27 "sort"
28 "strconv"
29 "strings"
30 "sync"
31 "text/tabwriter"
32 "time"
33
34 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
35 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
36 "github.com/peterbourgon/ff/v3/ffcli"
37 "github.com/prometheus/client_golang/prometheus"
38 "github.com/prometheus/client_golang/prometheus/promauto"
39 sglog "github.com/sourcegraph/log"
40 "github.com/sourcegraph/mountinfo"
41 "go.uber.org/automaxprocs/maxprocs"
42 "golang.org/x/net/trace"
43 "golang.org/x/sys/unix"
44 "google.golang.org/grpc"
45 "google.golang.org/grpc/codes"
46 "google.golang.org/grpc/credentials/insecure"
47 "google.golang.org/grpc/metadata"
48
49 "github.com/sourcegraph/zoekt"
50 "github.com/sourcegraph/zoekt/build"
51 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
52 "github.com/sourcegraph/zoekt/debugserver"
53 "github.com/sourcegraph/zoekt/grpc/internalerrs"
54 "github.com/sourcegraph/zoekt/grpc/messagesize"
55 "github.com/sourcegraph/zoekt/internal/profiler"
56)
57
58var (
59 metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
60 Name: "resolve_revisions_seconds",
61 Help: "A histogram of latencies for resolving all repository revisions.",
62 Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 27min
63 })
64
65 metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
66 Name: "resolve_revision_seconds",
67 Help: "A histogram of latencies for resolving a repository revision.",
68 Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
69 }, []string{"success"}) // success=true|false
70
71 metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
72 Name: "get_index_options_total",
73 Help: "The total number of times we tried to get index options for a repository. Includes errors.",
74 })
75 metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
76 Name: "get_index_options_error_total",
77 Help: "The total number of times we failed to get index options for a repository.",
78 })
79
80 metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
81 Name: "index_repo_seconds",
82 Help: "A histogram of latencies for indexing a repository.",
83 Buckets: prometheus.ExponentialBucketsRange(
84 (100 * time.Millisecond).Seconds(),
85 (40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
86 20),
87 }, []string{
88 "state", // state is an indexState
89 "name", // name of the repository that was indexed
90 })
91
92 metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{
93 Name: "index_indexing_delay_seconds",
94 Help: "A histogram of durations from when an index job is added to the queue, to the time it completes.",
95 Buckets: prometheus.ExponentialBuckets(60, 2, 14), // 1 minute -> 5.5 days
96 }, []string{
97 "state", // state is an indexState
98 "name", // the name of the repository that was indexed
99 })
100
101 metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
102 Name: "index_fetch_seconds",
103 Help: "A histogram of latencies for fetching a repository.",
104 Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
105 }, []string{
106 "success", // true|false
107 "name", // the name of the repository that the commits were fetched from
108 })
109
110 metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
111 Name: "index_incremental_index_state",
112 Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
113 }, []string{"state"}) // state is build.IndexState
114
115 metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
116 Name: "index_num_indexed",
117 Help: "Number of indexed repos by code host",
118 })
119
120 metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
121 Name: "index_failing_total",
122 Help: "Counts failures to index (indexing activity, should be used with rate())",
123 })
124
125 metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
126 Name: "index_indexing_total",
127 Help: "Counts indexings (indexing activity, should be used with rate())",
128 })
129
130 metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
131 Name: "index_num_stopped_tracking_total",
132 Help: "Counts the number of repos we stopped tracking.",
133 })
134
135 // clientMetricsOnce returns a singleton instance of the client metrics
136 // that are shared across all gRPC clients that this process creates.
137 //
138 // This function panics if the metrics cannot be registered with the default
139 // Prometheus registry.
140 clientMetricsOnce = sync.OnceValue(func() *grpcprom.ClientMetrics {
141 clientMetrics := grpcprom.NewClientMetrics(
142 grpcprom.WithClientCounterOptions(),
143 grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
144 grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC
145 grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC
146 )
147 prometheus.DefaultRegisterer.MustRegister(clientMetrics)
148 return clientMetrics
149 })
150)
151
// MaxFileSize is the file size limit, in bytes, handed to every index job
// (see indexArgs.FileLimit).
//
// 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
// NOTE: if you change this, you must also update gitIndex to use the same value when fetching the repo.
const MaxFileSize = 1024 * 1024
155
// reposWithSeparateIndexingMetrics is the set of repository names that get
// their own "name" label on the indexing metrics. It starts out empty.
var reposWithSeparateIndexingMetrics = map[string]struct{}{}
158
// indexState is the outcome of a single index job. Its string value is used
// as the "state" label on the indexing metrics.
type indexState string

// The possible outcomes of an index job.
const (
	// indexStateFail means the job returned an error.
	indexStateFail indexState = "fail"
	// indexStateSuccess means the index was rebuilt.
	indexStateSuccess indexState = "success"
	// indexStateSuccessMeta means only the metadata was updated.
	indexStateSuccessMeta indexState = "success_meta"
	// indexStateNoop means the index did not need updating.
	indexStateNoop indexState = "noop"
	// indexStateEmpty means the index is empty (empty repo).
	indexStateEmpty indexState = "empty"
)
168
169// Server is the main functionality of zoekt-sourcegraph-indexserver. It
170// exists to conveniently use all the options passed in via func main.
171type Server struct {
172 logger sglog.Logger
173
174 Sourcegraph Sourcegraph
175 BatchSize int
176
177 // IndexDir is the index directory to use.
178 IndexDir string
179
180 // IndexConcurrency is the number of repositories we index at once.
181 IndexConcurrency int
182
183 // Interval is how often we sync with Sourcegraph.
184 Interval time.Duration
185
186 // CPUCount is the number of CPUs to use for indexing shards.
187 CPUCount int
188
189 queue Queue
190
191 // muIndexDir protects the index directory from concurrent access.
192 muIndexDir indexMutex
193
194 // If true, shard merging is enabled.
195 shardMerging bool
196
197 // deltaBuildRepositoriesAllowList is an allowlist for repositories that we
198 // use delta-builds for instead of normal builds
199 deltaBuildRepositoriesAllowList map[string]struct{}
200
201 // deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
202 // before attempting a delta build.
203 deltaShardNumberFallbackThreshold uint64
204
205 // repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
206 // we skip calculating symbols metadata for during builds
207 repositoriesSkipSymbolsCalculationAllowList map[string]struct{}
208
209 hostname string
210
211 mergeOpts mergeOpts
212
213 // timeout defines how long the index server waits before killing an indexing job.
214 timeout time.Duration
215}
216
// debug is the logger for verbose diagnostics. As initialized here it
// discards everything written to it.
// NOTE(review): presumably re-pointed at a real writer when debug logging is
// enabled elsewhere in the file — confirm.
var debug = log.New(io.Discard, "", log.LstdFlags)
218
// noOutputTimeout is how long a command started by loggedRun may stay silent
// before it is considered stuck. Our index commands should output something
// every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
const noOutputTimeout = 30 * time.Minute
225
226func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) {
227 out := &synchronizedBuffer{}
228 cmd.Stdout = out
229 cmd.Stderr = out
230
231 tr.LazyPrintf("%s", cmd.Args)
232
233 defer func() {
234 if err != nil {
235 outS := out.String()
236 tr.LazyPrintf("failed: %v", err)
237 tr.LazyPrintf("output: %s", out)
238 tr.SetError()
239 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS)
240 }
241 }()
242
243 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args))
244
245 if err := cmd.Start(); err != nil {
246 return err
247 }
248
249 errC := make(chan error)
250 go func() {
251 errC <- cmd.Wait()
252 }()
253
254 // This channel is set after we have sent sigquit. It allows us to follow up
255 // with a sigkill if the process doesn't quit after sigquit.
256 kill := make(<-chan time.Time)
257
258 lastLen := 0
259 for {
260 select {
261 case <-time.After(noOutputTimeout):
262 // Periodically check if we have had output. If not kill the process.
263 if out.Len() != lastLen {
264 lastLen = out.Len()
265 log.Printf("still running %s", cmd.Args)
266 } else {
267 // Send quit (C-\) first so we get a stack dump.
268 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args)
269 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil {
270 log.Println("quit failed:", err)
271 }
272
273 // send sigkill if still running in 10s
274 kill = time.After(10 * time.Second)
275 }
276
277 case <-kill:
278 log.Printf("still running, killing %s", cmd.Args)
279 if err := cmd.Process.Kill(); err != nil {
280 log.Println("kill failed:", err)
281 }
282
283 case err := <-errC:
284 if err != nil {
285 return err
286 }
287
288 tr.LazyPrintf("success")
289 return nil
290 }
291 }
292}
293
// synchronizedBuffer is a bytes.Buffer guarded by a mutex. It lets one
// goroutine monitor the buffer while another is still writing to it.
type synchronizedBuffer struct {
	mu sync.Mutex
	b  bytes.Buffer
}

// Write appends p to the buffer while holding the lock.
func (buf *synchronizedBuffer) Write(p []byte) (int, error) {
	buf.mu.Lock()
	defer buf.mu.Unlock()

	n, err := buf.b.Write(p)
	return n, err
}

// Len reports the number of bytes currently held in the buffer.
func (buf *synchronizedBuffer) Len() int {
	buf.mu.Lock()
	defer buf.mu.Unlock()

	size := buf.b.Len()
	return size
}

// String returns the buffered bytes as a string.
func (buf *synchronizedBuffer) String() string {
	buf.mu.Lock()
	defer buf.mu.Unlock()

	contents := buf.b.String()
	return contents
}
318
// pauseFileName is the name of a marker file. While a file with this name
// exists in IndexDir no index jobs are run, which makes it possible to
// experiment with the content of IndexDir without the indexserver writing to
// it.
const pauseFileName = "PAUSE"
323
324// Run the sync loop. This blocks forever.
325func (s *Server) Run() {
326 removeIncompleteShards(s.IndexDir)
327
328 // Start a goroutine which updates the queue with commits to index.
329 go func() {
330 // We update the list of indexed repos every Interval. To speed up manual
331 // testing we also listen for SIGUSR1 to trigger updates.
332 //
333 // "pkill -SIGUSR1 zoekt-sourcegra"
334 for range jitterTicker(s.Interval, unix.SIGUSR1) {
335 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
336 log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b)))
337 continue
338 }
339
340 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir))
341 if err != nil {
342 log.Printf("error listing repos: %s", err)
343 continue
344 }
345
346 debug.Printf("updating index queue with %d repositories", len(repos.IDs))
347
348 // Stop indexing repos we don't need to track anymore
349 removed := s.queue.MaybeRemoveMissing(repos.IDs)
350 metricNumStoppedTrackingTotal.Add(float64(len(removed)))
351 if len(removed) > 0 {
352 log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5))
353 }
354
355 cleanupDone := make(chan struct{})
356 go func() {
357 defer close(cleanupDone)
358 s.muIndexDir.Global(func() {
359 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging)
360 })
361 }()
362
363 repos.IterateIndexOptions(s.queue.AddOrUpdate)
364
365 // IterateIndexOptions will only iterate over repositories that have
366 // changed since we last called list. However, we want to add all IDs
367 // back onto the queue just to check that what is on disk is still
368 // correct. This will use the last IndexOptions we stored in the
369 // queue. The repositories not on the queue (missing) need a forced
370 // fetch of IndexOptions.
371 missing := s.queue.Bump(repos.IDs)
372 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...)
373
374 setCompoundShardCounter(s.IndexDir)
375
376 <-cleanupDone
377 }
378 }()
379
380 go func() {
381 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) {
382 if s.shardMerging {
383 s.vacuum()
384 }
385 }
386 }()
387
388 go func() {
389 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) {
390 if s.shardMerging {
391 s.doMerge()
392 }
393 }
394 }()
395
396 for i := 0; i < s.IndexConcurrency; i++ {
397 go s.processQueue()
398 }
399
400 // block forever
401 select {}
402}
403
// formatListUint32 returns a comma-separated list of the first min(len(v), m)
// items of v, followed by "..." when items were left out.
func formatListUint32(v []uint32, m int) string {
	limit := m
	if len(v) < limit {
		limit = len(v)
	}

	// A negative m shows no items at all.
	shown := limit
	if shown < 0 {
		shown = 0
	}

	parts := make([]string, 0, shown+1)
	for _, id := range v[:shown] {
		parts = append(parts, fmt.Sprint(id))
	}
	if len(v) > limit {
		parts = append(parts, "...")
	}

	return strings.Join(parts, ", ")
}
421
422func (s *Server) processQueue() {
423 for {
424 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
425 time.Sleep(time.Second)
426 continue
427 }
428
429 item, ok := s.queue.Pop()
430 if !ok {
431 time.Sleep(time.Second)
432 continue
433 }
434
435 opts := item.Opts
436 args := s.indexArgs(opts)
437
438 ran := s.muIndexDir.With(opts.Name, func() {
439 // only record time taken once we hold the lock. This avoids us
440 // recording time taken while merging/cleanup runs.
441 start := time.Now()
442
443 state, err := s.Index(args)
444
445 elapsed := time.Since(start)
446 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds())
447
448 indexDelay := time.Since(item.DateAddedToQueue)
449 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds())
450
451 if err != nil {
452 log.Printf("error indexing %s: %s", args.String(), err)
453 }
454
455 switch state {
456 case indexStateSuccess:
457 var branches []string
458 for _, b := range args.Branches {
459 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
460 }
461 s.logger.Info("updated index",
462 sglog.String("repo", args.Name),
463 sglog.Uint32("id", args.RepoID),
464 sglog.Strings("branches", branches),
465 sglog.Duration("duration", elapsed),
466 sglog.Duration("index_delay", indexDelay),
467 )
468 case indexStateSuccessMeta:
469 log.Printf("updated meta %s in %v", args.String(), elapsed)
470 }
471 s.queue.SetIndexed(opts, state)
472 })
473
474 if !ran {
475 // Someone else is processing the repository. We can just skip this job
476 // since the repository will be added back to the queue and we will
477 // converge to the correct behaviour.
478 debug.Printf("index job for repository already running: %s", args)
479 continue
480 }
481 }
482}
483
484// repoNameForMetric returns a normalized version of the given repository name that is
485// suitable for use with Prometheus metrics.
486func repoNameForMetric(repo string) string {
487 // Check to see if we want to be able to capture separate indexing metrics for this repository.
488 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable.
489 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok {
490 return repo
491 }
492
493 return ""
494}
495
// batched splits slice into consecutive batches of at most size elements and
// delivers them, in order, on the returned channel. The channel is closed
// once every element has been sent; an empty slice yields no batches.
//
// A non-positive size is treated as "no limit": the whole slice is sent as a
// single batch.
//
// Batches alias the memory of slice, but their capacity is clipped to their
// length, so appending to a batch never overwrites the elements of a later
// one.
func batched(slice []uint32, size int) <-chan []uint32 {
	c := make(chan []uint32)
	go func() {
		defer close(c)

		// size == 0 would never make progress and a negative size would
		// panic when slicing, so fall back to one batch.
		if size <= 0 {
			size = len(slice)
		}

		for len(slice) > 0 {
			n := size
			if n > len(slice) {
				n = len(slice)
			}
			c <- slice[:n:n]
			slice = slice[n:]
		}
	}()
	return c
}
510
// jitterTicker returns a ticker which ticks with a jitter. It ticks once on
// creation; the pause before each following tick is uniformly selected from
// the range [d/2, d/2 + d).
//
// If d is not positive no interval can be drawn, so after the initial tick
// the periodic ticks are disabled (this is logged) and only signals make the
// ticker fire.
//
// sig is a list of signals which also cause the ticker to fire. This is a
// convenience to allow manually triggering of the ticker.
func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} {
	ticker := make(chan struct{})

	go func() {
		ns := int64(d)
		for {
			ticker <- struct{}{}

			// rand.Int63n panics for n <= 0, which would take the whole
			// process down from this background goroutine.
			if ns <= 0 {
				log.Printf("jitterTicker: non-positive interval %s, periodic ticks disabled", d)
				return
			}

			jitter := rand.Int63n(ns)
			time.Sleep(time.Duration(ns/2 + jitter))
		}
	}()

	// Only listen for signals when the caller asked for some.
	if len(sig) > 0 {
		go func() {
			c := make(chan os.Signal, 1)
			signal.Notify(c, sig...)
			for range c {
				ticker <- struct{}{}
			}
		}()
	}

	return ticker
}
542
543// Index starts an index job for repo name at commit.
544func (s *Server) Index(args *indexArgs) (state indexState, err error) {
545 tr := trace.New("index", args.Name)
546 tr.SetMaxEvents(30) // Ensure we capture all indexing events
547
548 defer func() {
549 if err != nil {
550 tr.SetError()
551 tr.LazyPrintf("error: %v", err)
552 state = indexStateFail
553 metricFailingTotal.Inc()
554 }
555 tr.LazyPrintf("state: %s", state)
556 tr.Finish()
557 }()
558
559 tr.LazyPrintf("branches: %v", args.Branches)
560
561 if len(args.Branches) == 0 {
562 return indexStateEmpty, createEmptyShard(args)
563 }
564
565 repositoryName := args.Name
566 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
567 tr.LazyPrintf("marking this repository for delta build")
568 args.UseDelta = true
569 }
570
571 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold
572
573 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
574 tr.LazyPrintf("skipping symbols calculation")
575 args.Symbols = false
576 }
577
578 reason := "forced"
579
580 if args.Incremental {
581 bo := args.BuildOptions()
582 bo.SetDefaults()
583 incrementalState, fn := bo.IndexState()
584 reason = string(incrementalState)
585 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()
586
587 switch incrementalState {
588 case build.IndexStateEqual:
589 debug.Printf("%s index already up to date. Shard=%s", args.String(), fn)
590 return indexStateNoop, nil
591
592 case build.IndexStateMeta:
593 log.Printf("updating index.meta %s", args.String())
594
595 if err := mergeMeta(bo); err != nil {
596 log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
597 } else {
598 return indexStateSuccessMeta, nil
599 }
600
601 case build.IndexStateCorrupt:
602 log.Printf("falling back to full update: corrupt index: %s", args.String())
603 }
604 }
605
606 log.Printf("updating index %s reason=%s", args.String(), reason)
607
608 metricIndexingTotal.Inc()
609 c := gitIndexConfig{
610 runCmd: func(cmd *exec.Cmd) error {
611 return s.loggedRun(tr, cmd)
612 },
613
614 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
615 return args.BuildOptions().FindRepositoryMetadata()
616 },
617 timeout: s.timeout,
618 }
619
620 err = gitIndex(c, args, s.Sourcegraph, s.logger)
621 if err != nil {
622 return indexStateFail, err
623 }
624
625 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil {
626 s.logger.Error("failed to update index status",
627 sglog.String("repo", args.Name),
628 sglog.Uint32("id", args.RepoID),
629 sglogBranches("branches", args.Branches),
630 sglog.Error(err),
631 )
632 }
633
634 return indexStateSuccess, nil
635}
636
637// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so
638// it can update the zoekt_repos table.
639func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error {
640 // We need to read from disk for IndexTime.
641 _, metadata, ok, err := c.findRepositoryMetadata(args)
642 if err != nil {
643 return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
644 }
645 if !ok {
646 return errors.New("failed to find metadata for new/updated index")
647 }
648
649 status := []indexStatus{{
650 RepoID: args.RepoID,
651 Branches: args.Branches,
652 IndexTimeUnix: metadata.IndexTime.Unix(),
653 }}
654 if err := sg.UpdateIndexStatus(status); err != nil {
655 return fmt.Errorf("failed to update sourcegraph with status: %w", err)
656 }
657
658 return nil
659}
660
661func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
662 ss := make([]string, len(branches))
663 for i, b := range branches {
664 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
665 }
666 return sglog.Strings(key, ss)
667}
668
669func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
670 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0))
671 return &indexArgs{
672 IndexOptions: opts,
673 IndexDir: s.IndexDir,
674 Parallelism: parallelism,
675 Incremental: true,
676 FileLimit: MaxFileSize,
677 ShardMerging: s.shardMerging,
678 }
679}
680
681// parallelism consults both the server flags and index options to determine the number
682// of shards to index in parallel. If the CPUCount index option is provided, it always
683// overrides the server flag.
684func (s *Server) parallelism(opts IndexOptions, maxProcs int) int {
685 var parallelism int
686 if opts.ShardConcurrency > 0 {
687 parallelism = int(opts.ShardConcurrency)
688 } else {
689 parallelism = s.CPUCount
690 }
691
692 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs
693 if parallelism > 4*maxProcs {
694 parallelism = 4 * maxProcs
695 }
696
697 // If index concurrency is set, then divide the parallelism by the number of
698 // repos we're indexing in parallel
699 if s.IndexConcurrency > 1 {
700 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency)))
701 }
702
703 return parallelism
704}
705
706func createEmptyShard(args *indexArgs) error {
707 bo := args.BuildOptions()
708 bo.SetDefaults()
709 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}
710
711 if args.Incremental && bo.IncrementalSkipIndexing() {
712 return nil
713 }
714
715 builder, err := build.NewBuilder(*bo)
716 if err != nil {
717 return err
718 }
719 return builder.Finish()
720}
721
722// addDebugHandlers adds handlers specific to indexserver.
723func (s *Server) addDebugHandlers(mux *http.ServeMux) {
724 // Sourcegraph's site admin view requires indexserver to serve it's admin view
725 // on "/".
726 mux.Handle("/", http.HandlerFunc(s.handleRoot))
727
728 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex))
729 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed))
730 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList))
731 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge))
732 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue))
733 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost))
734}
735
736func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) {
737 if r.Method != "GET" {
738 w.Header().Set("Allow", "GET")
739 w.WriteHeader(http.StatusMethodNotAllowed)
740 return
741 }
742
743 response := struct {
744 Hostname string
745 }{
746 Hostname: s.hostname,
747 }
748
749 b, err := json.Marshal(response)
750 if err != nil {
751 http.Error(w, err.Error(), http.StatusInternalServerError)
752 return
753 }
754
755 w.Header().Set("Content-Type", "application/json; charset=utf-8")
756 w.Write(b)
757}
758
// rootTmpl renders the admin page served on "/". It expects a value with an
// IndexMsg string and a Repos slice whose elements have ID and Name fields.
// When Repos is empty only a "show repos" link is rendered; otherwise each
// repository is listed with a link that reindexes it.
var rootTmpl = template.Must(template.New("name").Parse(`
<html>
	<body>
		<a href="debug">Debug</a><br />
		<a href="debug/requests">Traces</a><br />
		{{.IndexMsg}}<br />
		<br />
		<h3>Reindex</h3>
		{{if .Repos}}
		<a href="?show_repos=false">hide repos</a><br />
		<table style="margin-top: 20px">
			<th style="text-align:left">Name</th>
			<th style="text-align:left">ID (click to reindex)</th>
			{{range .Repos}}
			<tr>
				<td>{{.Name}}</td>
				<td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></td>
			</tr>
			{{end}}
		</table>
		{{else}}
		<a href="?show_repos=true">show repos</a><br />
		{{end}}
	</body>
</html>
`))
785
786func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
787 if r.Method != "GET" {
788 w.Header().Set("Allow", "GET")
789 w.WriteHeader(http.StatusMethodNotAllowed)
790 return
791 }
792
793 values := r.URL.Query()
794
795 // ?id=
796 indexMsg := ""
797 if v := values.Get("id"); v != "" {
798 id, err := strconv.Atoi(v)
799 if err != nil {
800 http.Error(w, err.Error(), http.StatusBadRequest)
801 return
802 }
803 indexMsg, _ = s.forceIndex(uint32(id))
804 }
805
806 // ?show_repos=
807 showRepos := false
808 if v := values.Get("show_repos"); v != "" {
809 showRepos, _ = strconv.ParseBool(v)
810 }
811
812 type Repo struct {
813 ID uint32
814 Name string
815 }
816 var data struct {
817 Repos []Repo
818 IndexMsg string
819 }
820
821 data.IndexMsg = indexMsg
822
823 if showRepos {
824 s.queue.Iterate(func(opts *IndexOptions) {
825 data.Repos = append(data.Repos, Repo{
826 ID: opts.RepoID,
827 Name: opts.Name,
828 })
829 })
830 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name })
831 }
832
833 _ = rootTmpl.Execute(w, data)
834}
835
836// handleReindex triggers a reindex asynocronously. If a reindex was triggered
837// the request returns with status 202. The caller can infer the new state of
838// the index by calling List.
839func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) {
840 if r.Method != http.MethodPost {
841 w.Header().Set("Allow", http.MethodPost)
842 w.WriteHeader(http.StatusMethodNotAllowed)
843 return
844 }
845
846 err := r.ParseForm()
847 if err != nil {
848 http.Error(w, err.Error(), http.StatusBadRequest)
849 return
850 }
851
852 id, err := strconv.Atoi(r.Form.Get("repo"))
853 if err != nil {
854 http.Error(w, err.Error(), http.StatusBadRequest)
855 return
856 }
857
858 go func() { s.forceIndex(uint32(id)) }()
859
860 // 202 Accepted
861 w.WriteHeader(http.StatusAccepted)
862}
863
864func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
865 withIndexed := true
866 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil {
867 withIndexed = b
868 }
869
870 var indexed []uint32
871 if withIndexed {
872 indexed = listIndexed(s.IndexDir)
873 }
874
875 repos, err := s.Sourcegraph.List(r.Context(), indexed)
876 if err != nil {
877 http.Error(w, err.Error(), http.StatusInternalServerError)
878 return
879 }
880
881 bw := bytes.Buffer{}
882
883 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
884
885 _, err = fmt.Fprintf(tw, "ID\tName\n")
886 if err != nil {
887 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
888 return
889 }
890
891 s.queue.mu.Lock()
892 name := ""
893 for _, id := range repos.IDs {
894 if item := s.queue.get(id); item != nil {
895 name = item.opts.Name
896 } else {
897 name = ""
898 }
899 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
900 if err != nil {
901 debug.Printf("handleDebugList: %s\n", err.Error())
902 }
903 }
904 s.queue.mu.Unlock()
905
906 if err != nil {
907 http.Error(w, err.Error(), http.StatusInternalServerError)
908 return
909 }
910
911 err = tw.Flush()
912 if err != nil {
913 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
914 return
915 }
916
917 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
918
919 if _, err := io.Copy(w, &bw); err != nil {
920 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
921 return
922 }
923}
924
925// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
926// can run this command during periods of low usage (evenings, weekends) to
927// trigger an initial merge run. In the steady-state, merges happen rarely, even
928// on busy instances, and users can rely on automatic merging instead.
929func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {
930 // A merge operation can take very long, depending on the number merges and the
931 // target size of the compound shards. We run the merge in the background and
932 // return immediately to the user.
933 //
934 // We track the status of the merge with metricShardMergingRunning.
935 go func() {
936 s.doMerge()
937 }()
938 _, _ = w.Write([]byte("merging enqueued\n"))
939}
940
941func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {
942 indexed := listIndexed(s.IndexDir)
943
944 bw := bytes.Buffer{}
945
946 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
947
948 _, err := fmt.Fprintf(tw, "ID\tName\n")
949 if err != nil {
950 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
951 return
952 }
953
954 s.queue.mu.Lock()
955 name := ""
956 for _, id := range indexed {
957 if item := s.queue.get(id); item != nil {
958 name = item.opts.Name
959 } else {
960 name = ""
961 }
962 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
963 if err != nil {
964 debug.Printf("handleDebugIndexed: %s\n", err.Error())
965 }
966 }
967 s.queue.mu.Unlock()
968
969 if err != nil {
970 http.Error(w, err.Error(), http.StatusInternalServerError)
971 return
972 }
973
974 err = tw.Flush()
975 if err != nil {
976 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
977 return
978 }
979
980 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
981
982 if _, err := io.Copy(w, &bw); err != nil {
983 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
984 return
985 }
986}
987
988// forceIndex will run the index job for repo name now. It will return always
989// return a string explaining what it did, even if it failed.
990func (s *Server) forceIndex(id uint32) (string, error) {
991 var opts IndexOptions
992 var err error
993 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) {
994 opts = o
995 }, func(_ uint32, e error) {
996 err = e
997 }, id)
998 if err != nil {
999 return fmt.Sprintf("Indexing %d failed: %v", id, err), err
1000 }
1001
1002 args := s.indexArgs(opts)
1003 args.Incremental = false // force re-index
1004
1005 var state indexState
1006 ran := s.muIndexDir.With(opts.Name, func() {
1007 state, err = s.Index(args)
1008 })
1009 if !ran {
1010 return fmt.Sprintf("index job for repository already running: %s", args), nil
1011 }
1012 if err != nil {
1013 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err
1014 }
1015 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil
1016}
1017
1018func listIndexed(indexDir string) []uint32 {
1019 index := getShards(indexDir)
1020 metricNumIndexed.Set(float64(len(index)))
1021 repoIDs := make([]uint32, 0, len(index))
1022 for id := range index {
1023 repoIDs = append(repoIDs, id)
1024 }
1025 sort.Slice(repoIDs, func(i, j int) bool {
1026 return repoIDs[i] < repoIDs[j]
1027 })
1028 return repoIDs
1029}
1030
1031// setupTmpDir sets up a temporary directory on the same volume as the
1032// indexes.
1033//
1034// If main is true we will delete older temp directories left around. main is
1035// false when this is a debug command.
1036func setupTmpDir(logger sglog.Logger, main bool, index string) error {
1037 // change the target tmp directory depending on if it's our main daemon or a
1038 // debug sub command.
1039 dir := ".indexserver.debug.tmp"
1040 if main {
1041 dir = ".indexserver.tmp"
1042 }
1043
1044 tmpRoot := filepath.Join(index, dir)
1045
1046 if main {
1047 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot))
1048 err := os.RemoveAll(tmpRoot)
1049 if err != nil {
1050 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err))
1051 }
1052 }
1053
1054 if err := os.MkdirAll(tmpRoot, 0o755); err != nil {
1055 return err
1056 }
1057
1058 return os.Setenv("TMPDIR", tmpRoot)
1059}
1060
1061func printMetaData(fn string) error {
1062 repo, indexMeta, err := zoekt.ReadMetadataPath(fn)
1063 if err != nil {
1064 return err
1065 }
1066
1067 err = json.NewEncoder(os.Stdout).Encode(indexMeta)
1068 if err != nil {
1069 return err
1070 }
1071
1072 err = json.NewEncoder(os.Stdout).Encode(repo)
1073 if err != nil {
1074 return err
1075 }
1076 return nil
1077}
1078
1079func printShardStats(fn string) error {
1080 f, err := os.Open(fn)
1081 if err != nil {
1082 return err
1083 }
1084
1085 iFile, err := zoekt.NewIndexFile(f)
1086 if err != nil {
1087 return err
1088 }
1089
1090 return zoekt.PrintNgramStats(iFile)
1091}
1092
1093func srcLogLevelIsDebug() bool {
1094 lvl := os.Getenv(sglog.EnvLogLevel)
1095 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug")
1096}
1097
// getEnvWithDefaultBool returns the boolean value of environment variable k.
// If k is unset or empty, defaultVal is returned. The value is parsed with
// strconv.ParseBool; if it cannot be parsed the process is terminated via
// log.Fatalf.
func getEnvWithDefaultBool(k string, defaultVal bool) bool {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		// The message previously claimed an int64 parse failure.
		log.Fatalf("error parsing ENV %s to bool: %s", k, err)
	}
	return b
}
1109
// getEnvWithDefaultInt64 returns environment variable k parsed as a base-10
// int64, or defaultVal when k is unset or empty. An unparsable value
// terminates the process via log.Fatalf.
func getEnvWithDefaultInt64(k string, defaultVal int64) int64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, parseErr := strconv.ParseInt(raw, 10, 64)
		if parseErr != nil {
			log.Fatalf("error parsing ENV %s to int64: %s", k, parseErr)
		}
		return parsed
	}
	return defaultVal
}
1121
// getEnvWithDefaultInt returns environment variable k parsed as an int, or
// defaultVal when k is unset or empty. An unparsable value terminates the
// process via log.Fatalf.
func getEnvWithDefaultInt(k string, defaultVal int) int {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	// Parse the value, not the variable name: parsing k made every non-empty
	// setting fatal.
	i, err := strconv.Atoi(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to int: %s", k, err)
	}
	return i
}
1133
// getEnvWithDefaultUint64 returns environment variable k parsed as a base-10
// uint64, or defaultVal when k is unset or empty. An unparsable value
// terminates the process via log.Fatalf.
func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, parseErr := strconv.ParseUint(raw, 10, 64)
		if parseErr != nil {
			log.Fatalf("error parsing ENV %s to uint64: %s", k, parseErr)
		}
		return parsed
	}
	return defaultVal
}
1145
// getEnvWithDefaultFloat64 returns environment variable k parsed as a
// float64, or defaultVal when k is unset or empty. An unparsable value
// terminates the process via log.Fatalf.
func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, parseErr := strconv.ParseFloat(raw, 64)
		if parseErr != nil {
			log.Fatalf("error parsing ENV %s to float64: %s", k, parseErr)
		}
		return parsed
	}
	return defaultVal
}
1157
// getEnvWithDefaultString returns the value of environment variable k, or
// defaultVal when k is unset or empty.
func getEnvWithDefaultString(k string, defaultVal string) string {
	if value := os.Getenv(k); value != "" {
		return value
	}
	return defaultVal
}
1165
// getEnvWithDefaultDuration returns environment variable k parsed with
// time.ParseDuration, or defaultVal when k is unset or empty. An unparsable
// value terminates the process via log.Fatalf.
func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration {
	if raw := os.Getenv(k); raw != "" {
		parsed, parseErr := time.ParseDuration(raw)
		if parseErr != nil {
			log.Fatalf("error parsing ENV %s to duration: %s", k, parseErr)
		}
		return parsed
	}
	return defaultVal
}
1178
// getEnvWithDefaultEmptySet interprets environment variable k as a
// comma-separated list and returns its entries as a set. Entries are trimmed
// of surrounding whitespace and empty entries are dropped. The result is
// never nil; it is an empty set when k is unset or has no usable entries.
func getEnvWithDefaultEmptySet(k string) map[string]struct{} {
	fields := strings.Split(os.Getenv(k), ",")
	result := make(map[string]struct{})
	for i := range fields {
		entry := strings.TrimSpace(fields[i])
		if entry == "" {
			continue
		}
		result[entry] = struct{}{}
	}
	return result
}
1189
// joinStringSet joins the members of set with sep. Members are emitted in
// sorted order so the result is deterministic (map iteration order is
// random). A nil or empty set yields the empty string.
func joinStringSet(set map[string]struct{}, sep string) string {
	xs := make([]string, 0, len(set))
	for x := range set {
		xs = append(xs, x)
	}
	sort.Strings(xs)

	return strings.Join(xs, sep)
}
1198
1199func setCompoundShardCounter(indexDir string) {
1200 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt"))
1201 if err != nil {
1202 log.Printf("setCompoundShardCounter: %s\n", err)
1203 return
1204 }
1205 metricNumberCompoundShards.Set(float64(len(fns)))
1206}
1207
1208func rootCmd() *ffcli.Command {
1209 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError)
1210 conf := rootConfig{
1211 Main: true,
1212 }
1213 conf.registerRootFlags(rootFs)
1214
1215 return &ffcli.Command{
1216 FlagSet: rootFs,
1217 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]",
1218 Subcommands: []*ffcli.Command{debugCmd()},
1219 Exec: func(ctx context.Context, args []string) error {
1220 return startServer(conf)
1221 },
1222 }
1223}
1224
// rootConfig holds the settings of the indexserver root command. Apart from
// Main, the fields are bound to command line flags by registerRootFlags; some
// of those flags take their default from an environment variable.
type rootConfig struct {
	// Main is true if this rootConfig is for our main long running command (the
	// indexserver). Debug commands should not set this value. newServer passes
	// it to setupTmpDir, which uses a separate temporary directory for debug
	// commands and only removes a stale directory for the main command.
	Main bool

	// root is the Sourcegraph URL, or a path to a directory of repositories
	// (flag -sourcegraph_url).
	root string
	// interval is how often we sync with Sourcegraph (flag -interval).
	interval time.Duration
	// index is the index directory (flag -index).
	index string
	// indexConcurrency is the number of repos to index concurrently (flag
	// -index_concurrency). newServer clamps it to [1, CPU count].
	indexConcurrency int64
	// listen is the address the HTTP server listens on (flag -listen). An
	// empty value disables the HTTP endpoints in startServer.
	listen string
	// hostname is the name advertised to Sourcegraph (flag -hostname).
	hostname string
	// cpuFraction is the fraction of cores used for indexing (flag
	// -cpu_fraction). newServer requires 0.0 < cpuFraction <= 1.0.
	cpuFraction float64

	// config values related to shard merging

	// disableShardMerging turns shard merging off. NOTE(review): it is bound
	// to the flag named -shard_merging, so setting that flag disables merging.
	disableShardMerging bool
	// vacuumInterval is how often vacuum runs (flag -vacuum_interval).
	vacuumInterval time.Duration
	// mergeInterval is how often merge runs (flag -merge_interval).
	mergeInterval time.Duration
	// targetSize is the target size of compound shards in MiB (flag
	// -merge_target_size).
	targetSize int64
	// minSize is the minimum size of a compound shard in MiB (flag
	// -merge_min_size).
	minSize int64
	// minAgeDays is the time since the last commit in days; shards with newer
	// commits are excluded from merging (flag -merge_min_age).
	minAgeDays int

	// config values related to backoff indexing repos with one or more consecutive failures

	// backoffDuration is the base delay before re-enqueueing a repository
	// whose previous indexing attempt failed (flag -backoff_duration).
	backoffDuration time.Duration
	// maxBackoffDuration caps the backoff delay (flag -max_backoff_duration).
	maxBackoffDuration time.Duration
}
1251
1252func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
1253 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
1254 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
1255 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently")
1256 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use")
1257 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
1258 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
1259 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
1260 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
1261 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")
1262
1263 // flags related to shard merging
1264 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging")
1265 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
1266 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
1267 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB")
1268 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB")
1269 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
1270}
1271
1272func startServer(conf rootConfig) error {
1273 s, err := newServer(conf)
1274 if err != nil {
1275 return err
1276 }
1277
1278 profiler.Init("zoekt-sourcegraph-indexserver")
1279 setCompoundShardCounter(s.IndexDir)
1280
1281 if conf.listen != "" {
1282
1283 mux := http.NewServeMux()
1284 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{
1285 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"},
1286 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"},
1287 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"},
1288 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"},
1289 }...)
1290 s.addDebugHandlers(mux)
1291
1292 go func() {
1293 debug.Printf("serving HTTP on %s", conf.listen)
1294 log.Fatal(http.ListenAndServe(conf.listen, mux))
1295 }()
1296
1297 // Serve mux on a unix domain socket on a best-effort-basis so that
1298 // webserver can call the endpoints via the shared filesystem.
1299 //
1300 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen
1301 // on the socket due to permission errors. See
1302 // https://github.com/docker/for-mac/issues/6239
1303 go func() {
1304 serveHTTPOverSocket := func() error {
1305 socket := filepath.Join(s.IndexDir, "indexserver.sock")
1306 // We cannot bind a socket to an existing pathname.
1307 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) {
1308 return fmt.Errorf("error removing socket file: %s", socket)
1309 }
1310 // The "unix" network corresponds to stream sockets. (cf. unixgram,
1311 // unixpacket).
1312 l, err := net.Listen("unix", socket)
1313 if err != nil {
1314 return fmt.Errorf("failed to listen on socket %s: %w", socket, err)
1315 }
1316 // Indexserver (root) and webserver (Sourcegraph) run with
1317 // different users. Per default, the socket is created with
1318 // permission 755 (root root), which doesn't let webserver write to
1319 // it.
1320 //
1321 // See https://github.com/golang/go/issues/11822 for more context.
1322 if err := os.Chmod(socket, 0o777); err != nil {
1323 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err)
1324 }
1325 debug.Printf("serving HTTP on %s", socket)
1326 return http.Serve(l, mux)
1327 }
1328 debug.Print(serveHTTPOverSocket())
1329 }()
1330 }
1331
1332 oc := &ownerChecker{
1333 Path: filepath.Join(conf.index, "owner.txt"),
1334 Hostname: conf.hostname,
1335 }
1336 go oc.Run()
1337
1338 logger := sglog.Scoped("metricsRegistration")
1339 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"}
1340
1341 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index})
1342 prometheus.DefaultRegisterer.MustRegister(c)
1343
1344 s.Run()
1345 return nil
1346}
1347
1348func newServer(conf rootConfig) (*Server, error) {
1349 logger := sglog.Scoped("server")
1350
1351 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 {
1352 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0")
1353 }
1354 if conf.index == "" {
1355 return nil, fmt.Errorf("must set -index")
1356 }
1357 if conf.root == "" {
1358 return nil, fmt.Errorf("must set -sourcegraph_url")
1359 }
1360 rootURL, err := url.Parse(conf.root)
1361 if err != nil {
1362 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err)
1363 }
1364
1365 rootURL = addDefaultPort(rootURL)
1366
1367 // Tune GOMAXPROCS to match Linux container CPU quota.
1368 _, _ = maxprocs.Set()
1369
1370 // Automatically prepend our own path at the front, to minimize
1371 // required configuration.
1372 if l, err := os.Readlink("/proc/self/exe"); err == nil {
1373 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
1374 }
1375
1376 if _, err := os.Stat(conf.index); err != nil {
1377 if err := os.MkdirAll(conf.index, 0o755); err != nil {
1378 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err)
1379 }
1380 }
1381
1382 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil {
1383 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err)
1384 }
1385
1386 if srcLogLevelIsDebug() {
1387 debug = log.New(os.Stderr, "", log.LstdFlags)
1388 }
1389
1390 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST")
1391 if len(reposWithSeparateIndexingMetrics) > 0 {
1392 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", "))
1393 }
1394
1395 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST")
1396 if len(deltaBuildRepositoriesAllowList) > 0 {
1397 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", "))
1398 }
1399
1400 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150)
1401 if deltaShardNumberFallbackThreshold > 0 {
1402 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold)
1403 } else {
1404 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards")
1405 }
1406
1407 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST")
1408 if len(reposShouldSkipSymbolsCalculation) > 0 {
1409 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", "))
1410 }
1411
1412 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout)
1413 if indexingTimeout != defaultIndexingTimeout {
1414 debug.Printf("using configured indexing timeout: %s", indexingTimeout)
1415 }
1416
1417 var sg Sourcegraph
1418 if rootURL.IsAbs() {
1419 var batchSize int
1420 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" {
1421 batchSize, err = strconv.Atoi(v)
1422 if err != nil {
1423 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int")
1424 }
1425 }
1426
1427 opts := []SourcegraphClientOption{
1428 WithBatchSize(batchSize),
1429 }
1430
1431 logger := sglog.Scoped("zoektConfigurationGRPCClient")
1432 client, err := dialGRPCClient(rootURL.Host, logger)
1433 if err != nil {
1434 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err)
1435 }
1436
1437 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...)
1438
1439 } else {
1440 sg = sourcegraphFake{
1441 RootDir: rootURL.String(),
1442 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags),
1443 }
1444 }
1445
1446 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction)))
1447 if cpuCount < 1 {
1448 cpuCount = 1
1449 }
1450
1451 if conf.indexConcurrency < 1 {
1452 conf.indexConcurrency = 1
1453 } else if conf.indexConcurrency > int64(cpuCount) {
1454 conf.indexConcurrency = int64(cpuCount)
1455 }
1456
1457 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)
1458
1459 return &Server{
1460 logger: logger,
1461 Sourcegraph: sg,
1462 IndexDir: conf.index,
1463 IndexConcurrency: int(conf.indexConcurrency),
1464 Interval: conf.interval,
1465 CPUCount: cpuCount,
1466 queue: *q,
1467 shardMerging: !conf.disableShardMerging,
1468 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList,
1469 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold,
1470 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
1471 hostname: conf.hostname,
1472 mergeOpts: mergeOpts{
1473 vacuumInterval: conf.vacuumInterval,
1474 mergeInterval: conf.mergeInterval,
1475 targetSizeBytes: conf.targetSize * 1024 * 1024,
1476 minSizeBytes: conf.minSize * 1024 * 1024,
1477 minAgeDays: conf.minAgeDays,
1478 },
1479 timeout: indexingTimeout,
1480 }, err
1481}
1482
// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration
// for the indexed-search-configuration gRPC service. Its contents are embedded
// from default_grpc_service_configuration.json and handed to
// grpc.WithDefaultServiceConfig in dialGRPCClient.
//
// The default backoff strategy is modeled after the default settings used by
// retryablehttp.DefaultClient.
//
// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html):
//   - Unavailable
//   - Aborted
//
//go:embed default_grpc_service_configuration.json
var defaultGRPCServiceConfigurationJSON string
1495
1496func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor {
1497 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1498 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1499 return invoker(ctx, method, req, reply, cc, opts...)
1500 }
1501}
1502
1503func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
1504 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
1505 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1506 return streamer(ctx, desc, cc, method, opts...)
1507 }
1508}
1509
// defaultGRPCMessageReceiveSizeBytes is the default maximum size, in bytes, of
// a message that gRPC servers and clients are allowed to process. In this file
// it is applied as the client's maximum receive size in dialGRPCClient.
// This can be overridden by providing custom Server/Dial options.
const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MiB
1513
1514func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) {
1515 metrics := clientMetricsOnce()
1516
1517 // If the service seems to be unavailable, this
1518 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1
1519 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s])
1520 retryOpts := []retry.CallOption{
1521 retry.WithMax(5),
1522 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)),
1523 retry.WithCodes(codes.Unavailable),
1524 }
1525
1526 opts := []grpc.DialOption{
1527 grpc.WithTransportCredentials(insecure.NewCredentials()),
1528 grpc.WithChainStreamInterceptor(
1529 metrics.StreamClientInterceptor(),
1530 messagesize.StreamClientInterceptor,
1531 internalActorStreamInterceptor(),
1532 internalerrs.LoggingStreamClientInterceptor(logger),
1533 internalerrs.PrometheusStreamClientInterceptor,
1534 retry.StreamClientInterceptor(retryOpts...),
1535 ),
1536 grpc.WithChainUnaryInterceptor(
1537 metrics.UnaryClientInterceptor(),
1538 messagesize.UnaryClientInterceptor,
1539 internalActorUnaryInterceptor(),
1540 internalerrs.LoggingUnaryClientInterceptor(logger),
1541 internalerrs.PrometheusUnaryClientInterceptor,
1542 retry.UnaryClientInterceptor(retryOpts...),
1543 ),
1544 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
1545 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)),
1546 }
1547
1548 opts = append(opts, additionalOpts...)
1549
1550 // Ensure that the message size options are set last, so they override any other
1551 // client-specific options that tweak the message size.
1552 //
1553 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
1554 // take precedence over everything else with a uniform size setting that's easy to reason about.
1555 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...)
1556
1557 // This dialer is used to connect via gRPC to the Sourcegraph instance.
1558 // This is done lazily, so we can provide the client to use regardless of
1559 // whether we enabled gRPC or not initially.
1560 cc, err := grpc.Dial(addr, opts...)
1561 if err != nil {
1562 return nil, fmt.Errorf("dialing %q: %w", addr, err)
1563 }
1564
1565 client := proto.NewZoektConfigurationServiceClient(cc)
1566 return client, nil
1567}
1568
// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
// If the scheme is "https", "443" is used. URLs that are nil, not absolute,
// already carry a port, or use another scheme are returned as-is.
//
// IPv6 literal hosts are handled correctly: "http://[::1]/" becomes host
// "[::1]:80".
//
// The original URL is not mutated. A copy is modified and returned.
func addDefaultPort(original *url.URL) *url.URL {
	if original == nil {
		return nil // don't panic
	}

	if !original.IsAbs() {
		return original // don't do anything if the URL doesn't look like a remote URL
	}

	if original.Port() != "" {
		return original // an explicit port always wins
	}

	var port string
	switch original.Scheme {
	case "http":
		port = "80"
	case "https":
		port = "443"
	default:
		return original
	}

	u := cloneURL(original)
	// Use Hostname rather than Host: Host keeps the square brackets of IPv6
	// literals (and a trailing ":" for an empty port), which JoinHostPort
	// would wrap a second time, producing e.g. "[[::1]]:80".
	u.Host = net.JoinHostPort(u.Hostname(), port)
	return u
}

// cloneURL returns a copy of the URL. It is safe to mutate the returned URL.
// The User field is copied as well so the clone does not share it with u.
// This is copied from net/http/clone.go
func cloneURL(u *url.URL) *url.URL {
	if u == nil {
		return nil
	}
	u2 := new(url.URL)
	*u2 = *u
	if u.User != nil {
		u2.User = new(url.Userinfo)
		*u2.User = *u.User
	}
	return u2
}
1613
1614func main() {
1615 liblog := sglog.Init(sglog.Resource{
1616 Name: "zoekt-indexserver",
1617 Version: zoekt.Version,
1618 InstanceID: zoekt.HostnameBestEffort(),
1619 })
1620 defer liblog.Sync()
1621
1622 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil {
1623 log.Fatal(err)
1624 }
1625}
1626
// getBoolFromEnvironmentVariables returns the boolean held by the first
// environment variable in envVarNames that has a non-empty value in the
// current process environment, or defaultBool if none of them has one.
//
// An error is returned if that first non-empty variable fails to parse as a
// boolean; later variables are not consulted in that case.
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	for i := range envVarNames {
		name := envVarNames[i]
		raw := os.Getenv(name)
		if raw != "" {
			parsed, parseErr := strconv.ParseBool(raw)
			if parseErr == nil {
				return parsed, nil
			}
			return false, fmt.Errorf("parsing environment variable %q to boolean: %v", name, parseErr)
		}
	}

	return defaultBool, nil
}