fork of https://github.com/sourcegraph/zoekt
1// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
2// repositories on sourcegraph
3package main
4
5import (
6 "bytes"
7 "context"
8 _ "embed"
9 "encoding/json"
10 "errors"
11 "flag"
12 "fmt"
13 "html/template"
14 "io"
15 "io/fs"
16 "log"
17 "math"
18 "math/rand"
19 "net"
20 "net/http"
21 "net/url"
22 "os"
23 "os/exec"
24 "os/signal"
25 "path/filepath"
26 "runtime"
27 "sort"
28 "strconv"
29 "strings"
30 "sync"
31 "text/tabwriter"
32 "time"
33
34 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
35 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
36 "github.com/peterbourgon/ff/v3/ffcli"
37 "github.com/prometheus/client_golang/prometheus"
38 "github.com/prometheus/client_golang/prometheus/promauto"
39 sglog "github.com/sourcegraph/log"
40 "github.com/sourcegraph/mountinfo"
41 "go.uber.org/automaxprocs/maxprocs"
42 "golang.org/x/net/trace"
43 "golang.org/x/sys/unix"
44 "google.golang.org/grpc"
45 "google.golang.org/grpc/codes"
46 "google.golang.org/grpc/credentials/insecure"
47 "google.golang.org/grpc/metadata"
48
49 "github.com/sourcegraph/zoekt"
50 "github.com/sourcegraph/zoekt/build"
51 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
52 "github.com/sourcegraph/zoekt/debugserver"
53 "github.com/sourcegraph/zoekt/grpc/internalerrs"
54 "github.com/sourcegraph/zoekt/grpc/messagesize"
55 "github.com/sourcegraph/zoekt/internal/profiler"
56 "github.com/sourcegraph/zoekt/internal/tenant"
57)
58
// Prometheus metrics exported by indexserver. They are created through
// promauto, except for the gRPC client metrics which are registered lazily by
// clientMetricsOnce.
var (
	// metricResolveRevisionsDuration tracks how long it takes to resolve the
	// revisions of all repositories in one pass.
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 100000s (~27.8h)
	})

	// metricResolveRevisionDuration tracks how long it takes to resolve a
	// single repository revision.
	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	// metricIndexDuration is observed by processQueue once per index job,
	// measured from the moment the index directory lock is held.
	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	// metricIndexingDelay is observed by processQueue once per index job,
	// measured from the time the job was added to the queue.
	metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_indexing_delay_seconds",
		Help:    "A histogram of durations from when an index job is added to the queue, to the time it completes.",
		Buckets: prometheus.ExponentialBuckets(60, 2, 14), // 1 minute -> ~5.7 days
	}, []string{
		"state", // state is an indexState
		"name",  // the name of the repository that was indexed
	})

	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	// metricIndexIncrementalIndexState is incremented by Index for every
	// incremental job, labelled with the state reported by build.Options.IndexState.
	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is build.IndexState

	// metricNumIndexed is set by listIndexed to the number of repositories
	// that currently have shards in the index directory.
	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	// metricFailingTotal is incremented by Index whenever a job ends with an error.
	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	// metricIndexingTotal is incremented by Index right before a full (git)
	// index run is started.
	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	// metricNumStoppedTrackingTotal is increased by Run with the number of
	// repositories removed from the queue on each sync.
	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})

	// clientMetricsOnce returns a singleton instance of the client metrics
	// that are shared across all gRPC clients that this process creates.
	//
	// This function panics if the metrics cannot be registered with the default
	// Prometheus registry.
	clientMetricsOnce = sync.OnceValue(func() *grpcprom.ClientMetrics {
		clientMetrics := grpcprom.NewClientMetrics(
			grpcprom.WithClientCounterOptions(),
			grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
			grpcprom.WithClientStreamRecvHistogram(),   // record how long it takes for a client to receive a message during a streaming RPC
			grpcprom.WithClientStreamSendHistogram(),   // record how long it takes for a client to send a message during a streaming RPC
		)
		prometheus.DefaultRegisterer.MustRegister(clientMetrics)
		return clientMetrics
	})
)
152
// MaxFileSize is the file size limit, in bytes, passed to index jobs as
// indexArgs.FileLimit. It is 1 MiB (1 << 20) to match
// https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
//
// NOTE: if you change this, you must also update gitIndex to use the same value when fetching the repo.
const MaxFileSize = 1 << 20
156
// reposWithSeparateIndexingMetrics is the set of repository names that we
// want to capture separate indexing metrics for. repoNameForMetric consults
// it to decide whether a repository name may be used as a metric label.
var reposWithSeparateIndexingMetrics = make(map[string]struct{})
159
// indexState describes the outcome of a single index job. Its string value is
// used as the "state" label on the indexing metrics.
type indexState string

const (
	indexStateFail        indexState = "fail"         // The index job returned an error
	indexStateSuccess     indexState = "success"      // A full index run completed
	indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
	indexStateNoop        indexState = "noop"         // We didn't need to update index
	indexStateEmpty       indexState = "empty"        // index is empty (empty repo)
)
169
// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	// logger is the structured logger used for indexing events.
	logger sglog.Logger

	// Sourcegraph is the client used to list repositories, fetch index
	// options and report index status back.
	Sourcegraph Sourcegraph
	// NOTE(review): BatchSize is not read in this part of the file;
	// presumably the batch size for requests to Sourcegraph — confirm.
	BatchSize int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration

	// CPUCount is the number of CPUs to use for indexing shards.
	CPUCount int

	// queue holds the pending index jobs. Run fills it and processQueue
	// pops from it.
	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is reported by the /debug/host endpoint.
	hostname string

	// mergeOpts configures shard merging; Run reads its vacuumInterval and
	// mergeInterval to schedule the background vacuum and merge loops.
	mergeOpts mergeOpts

	// timeout defines how long the index server waits before killing an indexing job.
	timeout time.Duration
}
217
// Package-level loggers. infoLog and errorLog write to stderr, while debugLog
// discards its output as initialised here.
// NOTE(review): debugLog is presumably redirected elsewhere when debug
// logging is enabled — confirm.
var (
	debugLog = log.New(io.Discard, "[DEBUG] ", log.LstdFlags)
	infoLog  = log.New(os.Stderr, "[INFO] ", log.LstdFlags)
	errorLog = log.New(os.Stderr, "[ERROR] ", log.LstdFlags)
)
223
// noOutputTimeout is how long loggedRun lets a command run without producing
// any new output before it sends SIGQUIT (followed by a kill). Our index
// commands should output something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
const noOutputTimeout = 30 * time.Minute
230
231func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) {
232 out := &synchronizedBuffer{}
233 cmd.Stdout = out
234 cmd.Stderr = out
235
236 tr.LazyPrintf("%s", cmd.Args)
237
238 defer func() {
239 if err != nil {
240 outS := out.String()
241 tr.LazyPrintf("failed: %v", err)
242 tr.LazyPrintf("output: %s", out)
243 tr.SetError()
244 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS)
245 }
246 }()
247
248 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args))
249
250 if err := cmd.Start(); err != nil {
251 return err
252 }
253
254 errC := make(chan error)
255 go func() {
256 errC <- cmd.Wait()
257 }()
258
259 // This channel is set after we have sent sigquit. It allows us to follow up
260 // with a sigkill if the process doesn't quit after sigquit.
261 kill := make(<-chan time.Time)
262
263 lastLen := 0
264 for {
265 select {
266 case <-time.After(noOutputTimeout):
267 // Periodically check if we have had output. If not kill the process.
268 if out.Len() != lastLen {
269 lastLen = out.Len()
270 infoLog.Printf("still running %s", cmd.Args)
271 } else {
272 // Send quit (C-\) first so we get a stack dump.
273 infoLog.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args)
274 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil {
275 errorLog.Println("quit failed:", err)
276 }
277
278 // send sigkill if still running in 10s
279 kill = time.After(10 * time.Second)
280 }
281
282 case <-kill:
283 infoLog.Printf("still running, killing %s", cmd.Args)
284 if err := cmd.Process.Kill(); err != nil {
285 errorLog.Println("kill failed:", err)
286 }
287
288 case err := <-errC:
289 if err != nil {
290 return err
291 }
292
293 tr.LazyPrintf("success")
294 return nil
295 }
296 }
297}
298
// synchronizedBuffer is a bytes.Buffer guarded by a mutex. It exists so that
// one goroutine can inspect the buffer (Len, String) while another is still
// writing to it.
type synchronizedBuffer struct {
	mu sync.Mutex
	b  bytes.Buffer
}

// Write appends p to the buffer while holding the lock.
func (sb *synchronizedBuffer) Write(p []byte) (n int, err error) {
	sb.mu.Lock()
	defer sb.mu.Unlock()

	n, err = sb.b.Write(p)
	return n, err
}

// Len reports the number of bytes currently held by the buffer.
func (sb *synchronizedBuffer) Len() int {
	sb.mu.Lock()
	defer sb.mu.Unlock()

	size := sb.b.Len()
	return size
}

// String returns a copy of the buffered bytes as a string.
func (sb *synchronizedBuffer) String() string {
	sb.mu.Lock()
	defer sb.mu.Unlock()

	contents := sb.b.String()
	return contents
}
323
// pauseFileName is the name of a file which, if present in IndexDir, stops
// index jobs from running: Run skips its sync and processQueue stops popping
// jobs while the file exists. This makes it possible to experiment with the
// content of the IndexDir without the indexserver writing to it.
const pauseFileName = "PAUSE"
328
// Run the sync loop. This blocks forever.
//
// Run first removes incomplete shards from IndexDir and then starts the
// background goroutines: one that periodically syncs the queue with
// Sourcegraph, one each for vacuuming and merging shards (both are no-ops
// unless shard merging is enabled), and IndexConcurrency queue workers.
func (s *Server) Run() {
	removeIncompleteShards(s.IndexDir)

	// Start a goroutine which updates the queue with commits to index.
	go func() {
		// We update the list of indexed repos every Interval. To speed up manual
		// testing we also listen for SIGUSR1 to trigger updates.
		//
		// "pkill -SIGUSR1 zoekt-sourcegra"
		for range jitterTicker(s.Interval, unix.SIGUSR1) {
			// A readable PAUSE file skips this sync; its (trimmed) content is
			// logged as the reason.
			if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
				infoLog.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b)))
				continue
			}

			repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir))
			if err != nil {
				errorLog.Printf("error listing repos: %s", err)
				continue
			}

			debugLog.Printf("updating index queue with %d repositories", len(repos.IDs))

			// Stop indexing repos we don't need to track anymore
			removed := s.queue.MaybeRemoveMissing(repos.IDs)
			metricNumStoppedTrackingTotal.Add(float64(len(removed)))
			if len(removed) > 0 {
				infoLog.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5))
			}

			// Clean up the index directory concurrently with the queue update
			// below; we wait for it at the end of this iteration.
			cleanupDone := make(chan struct{})
			go func() {
				defer close(cleanupDone)
				s.muIndexDir.Global(func() {
					cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging)
				})
			}()

			repos.IterateIndexOptions(s.queue.AddOrUpdate)

			// IterateIndexOptions will only iterate over repositories that have
			// changed since we last called list. However, we want to add all IDs
			// back onto the queue just to check that what is on disk is still
			// correct. This will use the last IndexOptions we stored in the
			// queue. The repositories not on the queue (missing) need a forced
			// fetch of IndexOptions.
			missing := s.queue.Bump(repos.IDs)
			// Errors for individual repositories are deliberately ignored here.
			s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...)

			setCompoundShardCounter(s.IndexDir)

			<-cleanupDone
		}
	}()

	// Periodically vacuum, but only when shard merging is enabled.
	go func() {
		for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) {
			if s.shardMerging {
				s.vacuum()
			}
		}
	}()

	// Periodically merge shards, but only when shard merging is enabled.
	go func() {
		for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) {
			if s.shardMerging {
				s.doMerge()
			}
		}
	}()

	// Start the workers that pop index jobs off the queue.
	for i := 0; i < s.IndexConcurrency; i++ {
		go s.processQueue()
	}

	// block forever
	select {}
}
408
// formatListUint32 renders the first min(len(v), m) items of v as a
// comma-separated list. When v holds more than m items the list is followed
// by "..." to signal the truncation.
func formatListUint32(v []uint32, m int) string {
	shown := m
	if len(v) < shown {
		shown = len(v)
	}

	var parts []string
	for idx := 0; idx < shown; idx++ {
		parts = append(parts, fmt.Sprint(v[idx]))
	}

	// Anything beyond the first m items is represented by an ellipsis.
	if shown < len(v) {
		parts = append(parts, "...")
	}

	return strings.Join(parts, ", ")
}
426
427func (s *Server) processQueue() {
428 for {
429 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
430 time.Sleep(time.Second)
431 continue
432 }
433
434 item, ok := s.queue.Pop()
435 if !ok {
436 time.Sleep(time.Second)
437 continue
438 }
439
440 opts := item.Opts
441 args := s.indexArgs(opts)
442
443 ran := s.muIndexDir.With(opts.Name, func() {
444 // only record time taken once we hold the lock. This avoids us
445 // recording time taken while merging/cleanup runs.
446 start := time.Now()
447
448 state, err := s.Index(args)
449
450 elapsed := time.Since(start)
451 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds())
452
453 indexDelay := time.Since(item.DateAddedToQueue)
454 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds())
455
456 if err != nil {
457 errorLog.Printf("error indexing %s: %s", args.String(), err)
458 }
459
460 switch state {
461 case indexStateSuccess:
462 var branches []string
463 for _, b := range args.Branches {
464 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
465 }
466 s.logger.Info("updated index",
467 sglog.Int("tenant", args.TenantID),
468 sglog.String("repo", args.Name),
469 sglog.Uint32("id", args.RepoID),
470 sglog.Strings("branches", branches),
471 sglog.Duration("duration", elapsed),
472 sglog.Duration("index_delay", indexDelay),
473 )
474 case indexStateSuccessMeta:
475 infoLog.Printf("updated meta %s in %v", args.String(), elapsed)
476 }
477 s.queue.SetIndexed(opts, state)
478 })
479
480 if !ran {
481 // Someone else is processing the repository. We can just skip this job
482 // since the repository will be added back to the queue and we will
483 // converge to the correct behaviour.
484 debugLog.Printf("index job for repository already running: %s", args)
485 continue
486 }
487 }
488}
489
490// repoNameForMetric returns a normalized version of the given repository name that is
491// suitable for use with Prometheus metrics.
492func repoNameForMetric(repo string) string {
493 // Check to see if we want to be able to capture separate indexing metrics for this repository.
494 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable.
495 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok {
496 return repo
497 }
498
499 return ""
500}
501
// batched splits slice into consecutive batches of at most size elements and
// delivers them, in order, on the returned channel. The channel is closed
// after the last batch has been received.
//
// A non-positive size means "no limit": the whole slice is delivered as a
// single batch. (Previously size == 0 produced an endless stream of empty
// batches and a negative size panicked.)
//
// Batches are views into slice rather than copies, but their capacity is
// capped so that appending to one batch cannot overwrite the next. The
// producing goroutine blocks until every batch is received, so callers must
// drain the channel.
func batched(slice []uint32, size int) <-chan []uint32 {
	c := make(chan []uint32)
	go func() {
		defer close(c)

		if size <= 0 {
			size = len(slice)
		}
		for len(slice) > 0 {
			n := size
			if n > len(slice) {
				n = len(slice)
			}
			// Full slice expression: limit capacity to the batch itself.
			c <- slice[:n:n]
			slice = slice[n:]
		}
	}()
	return c
}
516
// jitterTicker returns a ticker which ticks with a jitter. The delay before
// each tick is uniformly selected from the range [d/2, d + d/2). It will tick
// on creation.
//
// If d is not positive there is no periodic ticking: the channel fires once
// on creation and afterwards only in response to signals. (rand.Int63n
// panics for a non-positive argument, which used to crash the process.)
//
// sig is a list of signals which also cause the ticker to fire. This is a
// convenience to allow manually triggering of the ticker.
//
// The returned channel is unbuffered and never closed; the goroutines feeding
// it live for the remainder of the process.
func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} {
	ticker := make(chan struct{})

	go func() {
		// Tick on creation.
		ticker <- struct{}{}

		ns := int64(d)
		if ns <= 0 {
			return
		}
		for {
			jitter := rand.Int63n(ns)
			time.Sleep(time.Duration(ns/2 + jitter))
			ticker <- struct{}{}
		}
	}()

	// signal.Notify with no signals would relay every incoming signal, so only
	// subscribe when the caller asked for specific ones.
	if len(sig) > 0 {
		c := make(chan os.Signal, 1)
		// Register before returning so that a signal sent right after creation
		// is already routed to the ticker.
		signal.Notify(c, sig...)
		go func() {
			for range c {
				ticker <- struct{}{}
			}
		}()
	}

	return ticker
}
548
// Index runs an index job for the repository and branches described by args
// and returns the resulting indexState.
//
// Depending on args and the state on disk the job is either a no-op, a
// metadata-only update, the creation of an empty shard, or a full index run
// via gitIndex. Index mutates args (UseDelta, Symbols and
// DeltaShardNumberFallbackThreshold) based on the server configuration.
//
// Whenever a non-nil error is returned the state is indexStateFail and
// metricFailingTotal is incremented.
func (s *Server) Index(args *indexArgs) (state indexState, err error) {
	tr := trace.New("index", args.Name)
	tr.SetMaxEvents(30) // Ensure we capture all indexing events

	// Record the outcome on the trace. Any error forces the state to fail,
	// regardless of the state the return statement supplied.
	defer func() {
		if err != nil {
			tr.SetError()
			tr.LazyPrintf("error: %v", err)
			state = indexStateFail
			metricFailingTotal.Inc()
		}
		tr.LazyPrintf("state: %s", state)
		tr.Finish()
	}()

	// Sourcegraph should always provide a tenant ID.
	if args.TenantID < 1 {
		return indexStateFail, tenant.ErrMissingTenant
	}

	tr.LazyPrintf("branches: %v", args.Branches)

	// No branches to index: represent the repository with an empty shard.
	if len(args.Branches) == 0 {
		return indexStateEmpty, createEmptyShard(args)
	}

	repositoryName := args.Name
	if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
		tr.LazyPrintf("marking this repository for delta build")
		args.UseDelta = true
	}

	args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold

	if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
		tr.LazyPrintf("skipping symbols calculation")
		args.Symbols = false
	}

	// reason is only used for logging why a full index run was started.
	reason := "forced"

	if args.Incremental {
		bo := args.BuildOptions()
		bo.SetDefaults()
		incrementalState, fn := bo.IndexState()
		reason = string(incrementalState)
		metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()

		switch incrementalState {
		case build.IndexStateEqual:
			debugLog.Printf("%s index already up to date. Shard=%s", args.String(), fn)
			return indexStateNoop, nil

		case build.IndexStateMeta:
			infoLog.Printf("updating index.meta %s", args.String())

			// TODO(stefan) handle mergeMeta for tenant id.
			if err := mergeMeta(bo); err != nil {
				// Fall through to the full index run below.
				errorLog.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
			} else {
				return indexStateSuccessMeta, nil
			}

		case build.IndexStateCorrupt:
			infoLog.Printf("falling back to full update: corrupt index: %s", args.String())
		}
	}

	infoLog.Printf("updating index %s reason=%s", args.String(), reason)

	metricIndexingTotal.Inc()
	c := gitIndexConfig{
		// Commands are run via loggedRun so their output ends up on the trace.
		runCmd: func(cmd *exec.Cmd) error {
			return s.loggedRun(tr, cmd)
		},

		findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
			return args.BuildOptions().FindRepositoryMetadata()
		},
		timeout: s.timeout,
	}

	err = gitIndex(c, args, s.Sourcegraph, s.logger)
	if err != nil {
		return indexStateFail, err
	}

	// Reporting the status is best effort: a failure is logged but does not
	// fail the index job.
	if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil {
		s.logger.Error("failed to update index status",
			sglog.String("repo", args.Name),
			sglog.Uint32("id", args.RepoID),
			sglogBranches("branches", args.Branches),
			sglog.Error(err),
		)
	}

	return indexStateSuccess, nil
}
648
649// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so
650// it can update the zoekt_repos table.
651func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error {
652 // We need to read from disk for IndexTime.
653 _, metadata, ok, err := c.findRepositoryMetadata(args)
654 if err != nil {
655 return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
656 }
657 if !ok {
658 return errors.New("failed to find metadata for new/updated index")
659 }
660
661 status := []indexStatus{{
662 RepoID: args.RepoID,
663 Branches: args.Branches,
664 IndexTimeUnix: metadata.IndexTime.Unix(),
665 }}
666 if err := sg.UpdateIndexStatus(status); err != nil {
667 return fmt.Errorf("failed to update sourcegraph with status: %w", err)
668 }
669
670 return nil
671}
672
673func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
674 ss := make([]string, len(branches))
675 for i, b := range branches {
676 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
677 }
678 return sglog.Strings(key, ss)
679}
680
681func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
682 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0))
683 return &indexArgs{
684 IndexOptions: opts,
685 IndexDir: s.IndexDir,
686 Parallelism: parallelism,
687 Incremental: true,
688 FileLimit: MaxFileSize,
689 ShardMerging: s.shardMerging,
690 }
691}
692
693// parallelism consults both the server flags and index options to determine the number
694// of shards to index in parallel. If the CPUCount index option is provided, it always
695// overrides the server flag.
696func (s *Server) parallelism(opts IndexOptions, maxProcs int) int {
697 var parallelism int
698 if opts.ShardConcurrency > 0 {
699 parallelism = int(opts.ShardConcurrency)
700 } else {
701 parallelism = s.CPUCount
702 }
703
704 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs
705 if parallelism > 4*maxProcs {
706 parallelism = 4 * maxProcs
707 }
708
709 // If index concurrency is set, then divide the parallelism by the number of
710 // repos we're indexing in parallel
711 if s.IndexConcurrency > 1 {
712 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency)))
713 }
714
715 return parallelism
716}
717
718func createEmptyShard(args *indexArgs) error {
719 bo := args.BuildOptions()
720 bo.SetDefaults()
721 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}
722
723 if args.Incremental && bo.IncrementalSkipIndexing() {
724 return nil
725 }
726
727 builder, err := build.NewBuilder(*bo)
728 if err != nil {
729 return err
730 }
731 return builder.Finish()
732}
733
734// addDebugHandlers adds handlers specific to indexserver.
735func (s *Server) addDebugHandlers(mux *http.ServeMux) {
736 // Sourcegraph's site admin view requires indexserver to serve it's admin view
737 // on "/".
738 mux.Handle("/", http.HandlerFunc(s.handleRoot))
739
740 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex))
741 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed))
742 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList))
743 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge))
744 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue))
745 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost))
746}
747
748func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) {
749 if r.Method != "GET" {
750 w.Header().Set("Allow", "GET")
751 w.WriteHeader(http.StatusMethodNotAllowed)
752 return
753 }
754
755 response := struct {
756 Hostname string
757 }{
758 Hostname: s.hostname,
759 }
760
761 b, err := json.Marshal(response)
762 if err != nil {
763 http.Error(w, err.Error(), http.StatusInternalServerError)
764 return
765 }
766
767 w.Header().Set("Content-Type", "application/json; charset=utf-8")
768 w.Write(b)
769}
770
// rootTmpl renders the admin page served on "/". It expects a value with an
// IndexMsg string (the outcome of a reindex triggered by the request, if any)
// and a Repos list whose items have Name and ID fields. The repository table
// is only rendered when Repos is non-empty; each ID links back to the page
// with ?id=<ID> to trigger a reindex.
var rootTmpl = template.Must(template.New("name").Parse(`
<html>
	<body>
		<a href="debug">Debug</a><br />
		<a href="debug/requests">Traces</a><br />
		{{.IndexMsg}}<br />
		<br />
		<h3>Reindex</h3>
		{{if .Repos}}
		<a href="?show_repos=false">hide repos</a><br />
		<table style="margin-top: 20px">
			<th style="text-align:left">Name</th>
			<th style="text-align:left">ID (click to reindex)</th>
			{{range .Repos}}
			<tr>
				<td>{{.Name}}</td>
				<td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></td>
			</tr>
			{{end}}
		</table>
		{{else}}
		<a href="?show_repos=true">show repos</a><br />
		{{end}}
	</body>
</html>
`))
797
798func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
799 if r.Method != "GET" {
800 w.Header().Set("Allow", "GET")
801 w.WriteHeader(http.StatusMethodNotAllowed)
802 return
803 }
804
805 values := r.URL.Query()
806
807 // ?id=
808 indexMsg := ""
809 if v := values.Get("id"); v != "" {
810 id, err := strconv.Atoi(v)
811 if err != nil {
812 http.Error(w, err.Error(), http.StatusBadRequest)
813 return
814 }
815 indexMsg, _ = s.forceIndex(uint32(id))
816 }
817
818 // ?show_repos=
819 showRepos := false
820 if v := values.Get("show_repos"); v != "" {
821 showRepos, _ = strconv.ParseBool(v)
822 }
823
824 type Repo struct {
825 ID uint32
826 Name string
827 }
828 var data struct {
829 Repos []Repo
830 IndexMsg string
831 }
832
833 data.IndexMsg = indexMsg
834
835 if showRepos {
836 s.queue.Iterate(func(opts *IndexOptions) {
837 data.Repos = append(data.Repos, Repo{
838 ID: opts.RepoID,
839 Name: opts.Name,
840 })
841 })
842 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name })
843 }
844
845 _ = rootTmpl.Execute(w, data)
846}
847
848// handleReindex triggers a reindex asynocronously. If a reindex was triggered
849// the request returns with status 202. The caller can infer the new state of
850// the index by calling List.
851func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) {
852 if r.Method != http.MethodPost {
853 w.Header().Set("Allow", http.MethodPost)
854 w.WriteHeader(http.StatusMethodNotAllowed)
855 return
856 }
857
858 err := r.ParseForm()
859 if err != nil {
860 http.Error(w, err.Error(), http.StatusBadRequest)
861 return
862 }
863
864 id, err := strconv.Atoi(r.Form.Get("repo"))
865 if err != nil {
866 http.Error(w, err.Error(), http.StatusBadRequest)
867 return
868 }
869
870 go func() { s.forceIndex(uint32(id)) }()
871
872 // 202 Accepted
873 w.WriteHeader(http.StatusAccepted)
874}
875
876func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
877 withIndexed := true
878 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil {
879 withIndexed = b
880 }
881
882 var indexed []uint32
883 if withIndexed {
884 indexed = listIndexed(s.IndexDir)
885 }
886
887 repos, err := s.Sourcegraph.List(r.Context(), indexed)
888 if err != nil {
889 http.Error(w, err.Error(), http.StatusInternalServerError)
890 return
891 }
892
893 bw := bytes.Buffer{}
894
895 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
896
897 _, err = fmt.Fprintf(tw, "ID\tName\n")
898 if err != nil {
899 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
900 return
901 }
902
903 s.queue.mu.Lock()
904 name := ""
905 for _, id := range repos.IDs {
906 if item := s.queue.get(id); item != nil {
907 name = item.opts.Name
908 } else {
909 name = ""
910 }
911 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
912 if err != nil {
913 debugLog.Printf("handleDebugList: %s\n", err.Error())
914 }
915 }
916 s.queue.mu.Unlock()
917
918 if err != nil {
919 http.Error(w, err.Error(), http.StatusInternalServerError)
920 return
921 }
922
923 err = tw.Flush()
924 if err != nil {
925 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
926 return
927 }
928
929 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
930
931 if _, err := io.Copy(w, &bw); err != nil {
932 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
933 return
934 }
935}
936
937// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
938// can run this command during periods of low usage (evenings, weekends) to
939// trigger an initial merge run. In the steady-state, merges happen rarely, even
940// on busy instances, and users can rely on automatic merging instead.
941func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {
942 // A merge operation can take very long, depending on the number merges and the
943 // target size of the compound shards. We run the merge in the background and
944 // return immediately to the user.
945 //
946 // We track the status of the merge with metricShardMergingRunning.
947 go func() {
948 s.doMerge()
949 }()
950 _, _ = w.Write([]byte("merging enqueued\n"))
951}
952
953func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {
954 indexed := listIndexed(s.IndexDir)
955
956 bw := bytes.Buffer{}
957
958 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
959
960 _, err := fmt.Fprintf(tw, "ID\tName\n")
961 if err != nil {
962 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
963 return
964 }
965
966 s.queue.mu.Lock()
967 name := ""
968 for _, id := range indexed {
969 if item := s.queue.get(id); item != nil {
970 name = item.opts.Name
971 } else {
972 name = ""
973 }
974 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
975 if err != nil {
976 debugLog.Printf("handleDebugIndexed: %s\n", err.Error())
977 }
978 }
979 s.queue.mu.Unlock()
980
981 if err != nil {
982 http.Error(w, err.Error(), http.StatusInternalServerError)
983 return
984 }
985
986 err = tw.Flush()
987 if err != nil {
988 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
989 return
990 }
991
992 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
993
994 if _, err := io.Copy(w, &bw); err != nil {
995 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
996 return
997 }
998}
999
1000// forceIndex will run the index job for repo name now. It will return always
1001// return a string explaining what it did, even if it failed.
1002func (s *Server) forceIndex(id uint32) (string, error) {
1003 var opts IndexOptions
1004 var err error
1005 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) {
1006 opts = o
1007 }, func(_ uint32, e error) {
1008 err = e
1009 }, id)
1010 if err != nil {
1011 return fmt.Sprintf("Indexing %d failed: %v", id, err), err
1012 }
1013
1014 args := s.indexArgs(opts)
1015 args.Incremental = false // force re-index
1016
1017 var state indexState
1018 ran := s.muIndexDir.With(opts.Name, func() {
1019 state, err = s.Index(args)
1020 })
1021 if !ran {
1022 return fmt.Sprintf("index job for repository already running: %s", args), nil
1023 }
1024 if err != nil {
1025 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err
1026 }
1027 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil
1028}
1029
1030func listIndexed(indexDir string) []uint32 {
1031 index := getShards(indexDir)
1032 metricNumIndexed.Set(float64(len(index)))
1033 repoIDs := make([]uint32, 0, len(index))
1034 for id := range index {
1035 repoIDs = append(repoIDs, id)
1036 }
1037 sort.Slice(repoIDs, func(i, j int) bool {
1038 return repoIDs[i] < repoIDs[j]
1039 })
1040 return repoIDs
1041}
1042
1043// setupTmpDir sets up a temporary directory on the same volume as the
1044// indexes.
1045//
1046// If main is true we will delete older temp directories left around. main is
1047// false when this is a debug command.
1048func setupTmpDir(logger sglog.Logger, main bool, index string) error {
1049 // change the target tmp directory depending on if it's our main daemon or a
1050 // debug sub command.
1051 dir := ".indexserver.debug.tmp"
1052 if main {
1053 dir = ".indexserver.tmp"
1054 }
1055
1056 tmpRoot := filepath.Join(index, dir)
1057
1058 if main {
1059 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot))
1060 err := os.RemoveAll(tmpRoot)
1061 if err != nil {
1062 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err))
1063 }
1064 }
1065
1066 if err := os.MkdirAll(tmpRoot, 0o755); err != nil {
1067 return err
1068 }
1069
1070 return os.Setenv("TMPDIR", tmpRoot)
1071}
1072
1073func printMetaData(fn string) error {
1074 repo, indexMeta, err := zoekt.ReadMetadataPath(fn)
1075 if err != nil {
1076 return err
1077 }
1078
1079 err = json.NewEncoder(os.Stdout).Encode(indexMeta)
1080 if err != nil {
1081 return err
1082 }
1083
1084 err = json.NewEncoder(os.Stdout).Encode(repo)
1085 if err != nil {
1086 return err
1087 }
1088 return nil
1089}
1090
1091func printShardStats(fn string) error {
1092 f, err := os.Open(fn)
1093 if err != nil {
1094 return err
1095 }
1096
1097 iFile, err := zoekt.NewIndexFile(f)
1098 if err != nil {
1099 return err
1100 }
1101
1102 return zoekt.PrintNgramStats(iFile)
1103}
1104
1105func srcLogLevelIsDebug() bool {
1106 lvl := os.Getenv(sglog.EnvLogLevel)
1107 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug")
1108}
1109
// getEnvWithDefaultBool returns the boolean value of environment variable k,
// or defaultVal if k is unset or empty. The value is parsed with
// strconv.ParseBool; if it cannot be parsed the process exits via log.Fatalf.
func getEnvWithDefaultBool(k string, defaultVal bool) bool {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to bool: %s", k, err)
	}
	return b
}
1121
// getEnvWithDefaultInt64 returns environment variable k parsed as a base-10
// int64, or defaultVal if k is unset or empty. An unparsable value terminates
// the process via log.Fatalf.
func getEnvWithDefaultInt64(k string, defaultVal int64) int64 {
	raw := os.Getenv(k)
	if raw != "" {
		parsed, parseErr := strconv.ParseInt(raw, 10, 64)
		if parseErr != nil {
			log.Fatalf("error parsing ENV %s to int64: %s", k, parseErr)
		}
		return parsed
	}
	return defaultVal
}
1133
// getEnvWithDefaultInt returns environment variable k parsed as an int, or
// defaultVal if k is unset or empty. An unparsable value terminates the
// process via log.Fatalf.
func getEnvWithDefaultInt(k string, defaultVal int) int {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	// Parse the variable's value, not its name.
	i, err := strconv.Atoi(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to int: %s", k, err)
	}
	return i
}
1145
// getEnvWithDefaultUint64 returns environment variable k parsed as a base-10
// uint64, or defaultVal if k is unset or empty. An unparsable value
// terminates the process via log.Fatalf.
func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 {
	raw := os.Getenv(k)
	if raw != "" {
		parsed, parseErr := strconv.ParseUint(raw, 10, 64)
		if parseErr != nil {
			log.Fatalf("error parsing ENV %s to uint64: %s", k, parseErr)
		}
		return parsed
	}
	return defaultVal
}
1157
// getEnvWithDefaultFloat64 returns environment variable k parsed as a
// float64, or defaultVal if k is unset or empty. An unparsable value
// terminates the process via log.Fatalf.
func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 {
	raw := os.Getenv(k)
	if raw != "" {
		parsed, parseErr := strconv.ParseFloat(raw, 64)
		if parseErr != nil {
			log.Fatalf("error parsing ENV %s to float64: %s", k, parseErr)
		}
		return parsed
	}
	return defaultVal
}
1169
// getEnvWithDefaultString returns the value of environment variable k, or
// defaultVal if k is unset or empty.
func getEnvWithDefaultString(k string, defaultVal string) string {
	if value := os.Getenv(k); value != "" {
		return value
	}
	return defaultVal
}
1177
// getEnvWithDefaultDuration returns environment variable k parsed with
// time.ParseDuration, or defaultVal if k is unset or empty. An unparsable
// value terminates the process via log.Fatalf.
func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration {
	raw := os.Getenv(k)
	if raw != "" {
		parsed, parseErr := time.ParseDuration(raw)
		if parseErr != nil {
			log.Fatalf("error parsing ENV %s to duration: %s", k, parseErr)
		}
		return parsed
	}
	return defaultVal
}
1190
// getEnvWithDefaultEmptySet parses environment variable k as a
// comma-separated list and returns its entries as a set. Entries are trimmed
// of surrounding whitespace and empty entries are dropped. The result is an
// empty, non-nil map when k is unset or contains no entries.
func getEnvWithDefaultEmptySet(k string) map[string]struct{} {
	entries := strings.Split(os.Getenv(k), ",")
	result := make(map[string]struct{}, len(entries))
	for _, entry := range entries {
		trimmed := strings.TrimSpace(entry)
		if trimmed == "" {
			continue
		}
		result[trimmed] = struct{}{}
	}
	return result
}
1201
// joinStringSet returns the elements of set joined by sep. The elements are
// sorted first so that the result is deterministic; map iteration order is
// random, which would otherwise make the output differ from call to call.
// An empty or nil set yields the empty string.
func joinStringSet(set map[string]struct{}, sep string) string {
	xs := make([]string, 0, len(set))
	for x := range set {
		xs = append(xs, x)
	}
	sort.Strings(xs)

	return strings.Join(xs, sep)
}
1210
1211func setCompoundShardCounter(indexDir string) {
1212 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt"))
1213 if err != nil {
1214 errorLog.Printf("setCompoundShardCounter: %s\n", err)
1215 return
1216 }
1217 metricNumberCompoundShards.Set(float64(len(fns)))
1218}
1219
1220func rootCmd() *ffcli.Command {
1221 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError)
1222 conf := rootConfig{
1223 Main: true,
1224 }
1225 conf.registerRootFlags(rootFs)
1226
1227 return &ffcli.Command{
1228 FlagSet: rootFs,
1229 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]",
1230 Subcommands: []*ffcli.Command{debugCmd()},
1231 Exec: func(ctx context.Context, args []string) error {
1232 return startServer(conf)
1233 },
1234 }
1235}
1236
// rootConfig holds the configuration of the root command. Every field other
// than Main is bound to a command-line flag by registerRootFlags.
type rootConfig struct {
	// Main is true if this rootConfig is for our main long running command (the
	// indexserver). Debug commands should not set this value. newServer passes
	// it to setupTmpDir, which picks the temp directory name based on it and
	// only removes a previous temp directory when it is true.
	Main bool

	// root is the -sourcegraph_url flag: the Sourcegraph URL, or a path to a
	// directory of repositories.
	root string
	// interval is the -interval flag: how often to sync with Sourcegraph.
	interval time.Duration
	// index is the -index flag: the index directory.
	index string
	// indexConcurrency is the -index_concurrency flag: the number of repos to
	// index concurrently.
	indexConcurrency int64
	// listen is the -listen flag: the address the HTTP server listens on. An
	// empty value disables the HTTP servers in startServer.
	listen string
	// hostname is the -hostname flag: the name advertised to Sourcegraph.
	hostname string
	// cpuFraction is the -cpu_fraction flag: the fraction of cores used for
	// indexing. newServer rejects values outside (0.0, 1.0].
	cpuFraction float64

	// config values related to shard merging

	// disableShardMerging is bound to the -shard_merging flag; note that a
	// true value DISABLES shard merging.
	disableShardMerging bool
	// vacuumInterval is the -vacuum_interval flag.
	vacuumInterval time.Duration
	// mergeInterval is the -merge_interval flag.
	mergeInterval time.Duration
	// targetSize is the -merge_target_size flag, in MiB.
	targetSize int64
	// minSize is the -merge_min_size flag, in MiB.
	minSize int64
	// minAgeDays is the -merge_min_age flag, in days.
	minAgeDays int

	// config values related to backoff indexing repos with one or more consecutive failures

	// backoffDuration is the -backoff_duration flag.
	backoffDuration time.Duration
	// maxBackoffDuration is the -max_backoff_duration flag.
	maxBackoffDuration time.Duration
}
1263
1264func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
1265 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
1266 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
1267 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently")
1268 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use")
1269 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
1270 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
1271 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
1272 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
1273 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")
1274
1275 // flags related to shard merging
1276 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging")
1277 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
1278 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
1279 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 1000), "the target size of compound shards in MiB")
1280 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 800), "the minimum size of a compound shard in MiB")
1281 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
1282}
1283
1284func startServer(conf rootConfig) error {
1285 s, err := newServer(conf)
1286 if err != nil {
1287 return err
1288 }
1289
1290 profiler.Init("zoekt-sourcegraph-indexserver")
1291 setCompoundShardCounter(s.IndexDir)
1292
1293 if conf.listen != "" {
1294
1295 mux := http.NewServeMux()
1296 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{
1297 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"},
1298 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"},
1299 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"},
1300 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"},
1301 }...)
1302 s.addDebugHandlers(mux)
1303
1304 go func() {
1305 debugLog.Printf("serving HTTP on %s", conf.listen)
1306 log.Fatal(http.ListenAndServe(conf.listen, mux))
1307 }()
1308
1309 // Serve mux on a unix domain socket on a best-effort-basis so that
1310 // webserver can call the endpoints via the shared filesystem.
1311 //
1312 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen
1313 // on the socket due to permission errors. See
1314 // https://github.com/docker/for-mac/issues/6239
1315 go func() {
1316 serveHTTPOverSocket := func() error {
1317 socket := filepath.Join(s.IndexDir, "indexserver.sock")
1318 // We cannot bind a socket to an existing pathname.
1319 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) {
1320 return fmt.Errorf("error removing socket file: %s", socket)
1321 }
1322 // The "unix" network corresponds to stream sockets. (cf. unixgram,
1323 // unixpacket).
1324 l, err := net.Listen("unix", socket)
1325 if err != nil {
1326 return fmt.Errorf("failed to listen on socket %s: %w", socket, err)
1327 }
1328 // Indexserver (root) and webserver (Sourcegraph) run with
1329 // different users. Per default, the socket is created with
1330 // permission 755 (root root), which doesn't let webserver write to
1331 // it.
1332 //
1333 // See https://github.com/golang/go/issues/11822 for more context.
1334 if err := os.Chmod(socket, 0o777); err != nil {
1335 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err)
1336 }
1337 debugLog.Printf("serving HTTP on %s", socket)
1338 return http.Serve(l, mux)
1339 }
1340 debugLog.Print(serveHTTPOverSocket())
1341 }()
1342 }
1343
1344 oc := &ownerChecker{
1345 Path: filepath.Join(conf.index, "owner.txt"),
1346 Hostname: conf.hostname,
1347 }
1348 go oc.Run()
1349
1350 logger := sglog.Scoped("metricsRegistration")
1351 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"}
1352
1353 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index})
1354 prometheus.DefaultRegisterer.MustRegister(c)
1355
1356 s.Run()
1357 return nil
1358}
1359
1360func newServer(conf rootConfig) (*Server, error) {
1361 logger := sglog.Scoped("server")
1362
1363 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 {
1364 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0")
1365 }
1366 if conf.index == "" {
1367 return nil, fmt.Errorf("must set -index")
1368 }
1369 if conf.root == "" {
1370 return nil, fmt.Errorf("must set -sourcegraph_url")
1371 }
1372 rootURL, err := url.Parse(conf.root)
1373 if err != nil {
1374 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err)
1375 }
1376
1377 rootURL = addDefaultPort(rootURL)
1378
1379 // Tune GOMAXPROCS to match Linux container CPU quota.
1380 _, _ = maxprocs.Set()
1381
1382 // Automatically prepend our own path at the front, to minimize
1383 // required configuration.
1384 if l, err := os.Readlink("/proc/self/exe"); err == nil {
1385 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
1386 }
1387
1388 if _, err := os.Stat(conf.index); err != nil {
1389 if err := os.MkdirAll(conf.index, 0o755); err != nil {
1390 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err)
1391 }
1392 }
1393
1394 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil {
1395 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err)
1396 }
1397
1398 if srcLogLevelIsDebug() {
1399 debugLog.SetOutput(os.Stderr)
1400 }
1401
1402 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST")
1403 if len(reposWithSeparateIndexingMetrics) > 0 {
1404 debugLog.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", "))
1405 }
1406
1407 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST")
1408 if len(deltaBuildRepositoriesAllowList) > 0 {
1409 debugLog.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", "))
1410 }
1411
1412 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150)
1413 if deltaShardNumberFallbackThreshold > 0 {
1414 debugLog.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold)
1415 } else {
1416 debugLog.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards")
1417 }
1418
1419 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST")
1420 if len(reposShouldSkipSymbolsCalculation) > 0 {
1421 debugLog.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", "))
1422 }
1423
1424 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout)
1425 if indexingTimeout != defaultIndexingTimeout {
1426 debugLog.Printf("using configured indexing timeout: %s", indexingTimeout)
1427 }
1428
1429 var sg Sourcegraph
1430 if rootURL.IsAbs() {
1431 var batchSize int
1432 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" {
1433 batchSize, err = strconv.Atoi(v)
1434 if err != nil {
1435 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int")
1436 }
1437 }
1438
1439 opts := []SourcegraphClientOption{
1440 WithBatchSize(batchSize),
1441 }
1442
1443 logger := sglog.Scoped("zoektConfigurationGRPCClient")
1444 client, err := dialGRPCClient(rootURL.Host, logger)
1445 if err != nil {
1446 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err)
1447 }
1448
1449 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...)
1450
1451 } else {
1452 sg = sourcegraphFake{
1453 RootDir: rootURL.String(),
1454 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags),
1455 }
1456 }
1457
1458 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction)))
1459 if cpuCount < 1 {
1460 cpuCount = 1
1461 }
1462
1463 if conf.indexConcurrency < 1 {
1464 conf.indexConcurrency = 1
1465 } else if conf.indexConcurrency > int64(cpuCount) {
1466 conf.indexConcurrency = int64(cpuCount)
1467 }
1468
1469 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)
1470
1471 return &Server{
1472 logger: logger,
1473 Sourcegraph: sg,
1474 IndexDir: conf.index,
1475 IndexConcurrency: int(conf.indexConcurrency),
1476 Interval: conf.interval,
1477 CPUCount: cpuCount,
1478 queue: *q,
1479 shardMerging: !conf.disableShardMerging,
1480 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList,
1481 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold,
1482 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
1483 hostname: conf.hostname,
1484 mergeOpts: mergeOpts{
1485 vacuumInterval: conf.vacuumInterval,
1486 mergeInterval: conf.mergeInterval,
1487 targetSizeBytes: conf.targetSize * 1024 * 1024,
1488 minSizeBytes: conf.minSize * 1024 * 1024,
1489 minAgeDays: conf.minAgeDays,
1490 },
1491 timeout: indexingTimeout,
1492 }, err
1493}
1494
// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration
// for the indexed-search-configuration gRPC service. Its contents are embedded
// at build time from default_grpc_service_configuration.json, and
// dialGRPCClient passes it to grpc.WithDefaultServiceConfig.
//
// The default backoff strategy is modeled after the default settings used by
// retryablehttp.DefaultClient.
//
// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html):
//   - Unavailable
//   - Aborted
//
// NOTE(review): the backoff and retry details above describe the embedded
// JSON file, which is not part of this source file - keep them in sync.
//
//go:embed default_grpc_service_configuration.json
var defaultGRPCServiceConfigurationJSON string
1507
1508func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor {
1509 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1510 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1511 return invoker(ctx, method, req, reply, cc, opts...)
1512 }
1513}
1514
1515func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
1516 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
1517 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1518 return streamer(ctx, desc, cc, method, opts...)
1519 }
1520}
1521
// defaultGRPCMessageReceiveSizeBytes is the default maximum message size, in bytes, that gRPC servers and clients are
// allowed to process. dialGRPCClient applies it as the client's maximum receive message size.
// This can be overridden by providing custom Server/Dial options.
const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MiB
1525
1526func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) {
1527 metrics := clientMetricsOnce()
1528
1529 // If the service seems to be unavailable, this
1530 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1
1531 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s])
1532 retryOpts := []retry.CallOption{
1533 retry.WithMax(5),
1534 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)),
1535 retry.WithCodes(codes.Unavailable),
1536 }
1537
1538 opts := []grpc.DialOption{
1539 grpc.WithTransportCredentials(insecure.NewCredentials()),
1540 grpc.WithChainStreamInterceptor(
1541 metrics.StreamClientInterceptor(),
1542 messagesize.StreamClientInterceptor,
1543 internalActorStreamInterceptor(),
1544 internalerrs.LoggingStreamClientInterceptor(logger),
1545 internalerrs.PrometheusStreamClientInterceptor,
1546 retry.StreamClientInterceptor(retryOpts...),
1547 ),
1548 grpc.WithChainUnaryInterceptor(
1549 metrics.UnaryClientInterceptor(),
1550 messagesize.UnaryClientInterceptor,
1551 internalActorUnaryInterceptor(),
1552 internalerrs.LoggingUnaryClientInterceptor(logger),
1553 internalerrs.PrometheusUnaryClientInterceptor,
1554 retry.UnaryClientInterceptor(retryOpts...),
1555 ),
1556 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
1557 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)),
1558 }
1559
1560 opts = append(opts, additionalOpts...)
1561
1562 // Ensure that the message size options are set last, so they override any other
1563 // client-specific options that tweak the message size.
1564 //
1565 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
1566 // take precedence over everything else with a uniform size setting that's easy to reason about.
1567 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...)
1568
1569 // This dialer is used to connect via gRPC to the Sourcegraph instance.
1570 // This is done lazily, so we can provide the client to use regardless of
1571 // whether we enabled gRPC or not initially.
1572 cc, err := grpc.Dial(addr, opts...)
1573 if err != nil {
1574 return nil, fmt.Errorf("dialing %q: %w", addr, err)
1575 }
1576
1577 client := proto.NewZoektConfigurationServiceClient(cc)
1578 return client, nil
1579}
1580
// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
// If the scheme is "https", "443" is used. URLs that are nil, not absolute,
// already carry a port, or use any other scheme are returned unchanged.
//
// The original URL is not mutated. A copy is modified and returned.
func addDefaultPort(original *url.URL) *url.URL {
	if original == nil {
		return nil // don't panic
	}

	// Don't do anything if the URL doesn't look like a remote URL, or if it
	// already names a port.
	if !original.IsAbs() || original.Port() != "" {
		return original
	}

	var port string
	switch original.Scheme {
	case "http":
		port = "80"
	case "https":
		port = "443"
	default:
		return original
	}

	u := cloneURL(original)
	// Use Hostname rather than Host: for an IPv6 literal Host still carries
	// its square brackets, and JoinHostPort would wrap it in a second pair
	// (producing e.g. "[[::1]]:80").
	u.Host = net.JoinHostPort(u.Hostname(), port)
	return u
}

// cloneURL returns a copy of the URL. It is safe to mutate the returned URL.
// This is copied from net/http/clone.go
func cloneURL(u *url.URL) *url.URL {
	if u == nil {
		return nil
	}
	u2 := new(url.URL)
	*u2 = *u
	// User is a pointer, so it needs its own copy to keep the clone independent.
	if u.User != nil {
		u2.User = new(url.Userinfo)
		*u2.User = *u.User
	}
	return u2
}
1625
1626func main() {
1627 liblog := sglog.Init(sglog.Resource{
1628 Name: "zoekt-indexserver",
1629 Version: zoekt.Version,
1630 InstanceID: zoekt.HostnameBestEffort(),
1631 })
1632 defer liblog.Sync()
1633
1634 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil {
1635 log.Fatal(err)
1636 }
1637}
1638
// getBoolFromEnvironmentVariables returns the boolean defined by the first environment
// variable listed in envVarNames that is set to a non-empty value in the current process
// environment, or defaultBool if none are set.
//
// An error is returned if that environment variable fails to parse as a boolean. The
// error wraps the underlying strconv error, so callers can inspect it with
// errors.Is / errors.As.
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	for _, envVar := range envVarNames {
		v := os.Getenv(envVar)
		if v == "" {
			continue
		}

		b, err := strconv.ParseBool(v)
		if err != nil {
			return false, fmt.Errorf("parsing environment variable %q to boolean: %w", envVar, err)
		}

		return b, nil
	}

	return defaultBool, nil
}