fork of https://github.com/sourcegraph/zoekt
1// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
2// repositories on sourcegraph
3package main
4
5import (
6 "bytes"
7 "context"
8 _ "embed"
9 "encoding/json"
10 "errors"
11 "flag"
12 "fmt"
13 "html/template"
14 "io"
15 "io/fs"
16 "log"
17 "math"
18 "math/rand"
19 "net"
20 "net/http"
21 "net/url"
22 "os"
23 "os/exec"
24 "os/signal"
25 "path/filepath"
26 "runtime"
27 "sort"
28 "strconv"
29 "strings"
30 "sync"
31 "text/tabwriter"
32 "time"
33
34 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
35 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
36 "github.com/peterbourgon/ff/v3/ffcli"
37 "github.com/prometheus/client_golang/prometheus"
38 "github.com/prometheus/client_golang/prometheus/promauto"
39 sglog "github.com/sourcegraph/log"
40 "github.com/sourcegraph/mountinfo"
41 "github.com/sourcegraph/zoekt"
42 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
43 "github.com/sourcegraph/zoekt/grpc/internalerrs"
44 "github.com/sourcegraph/zoekt/grpc/messagesize"
45 "github.com/sourcegraph/zoekt/index"
46 "github.com/sourcegraph/zoekt/internal/debugserver"
47 "github.com/sourcegraph/zoekt/internal/profiler"
48 "github.com/sourcegraph/zoekt/internal/tenant"
49
50 "go.uber.org/automaxprocs/maxprocs"
51 "golang.org/x/net/trace"
52 "golang.org/x/sys/unix"
53 "google.golang.org/grpc"
54 "google.golang.org/grpc/codes"
55 "google.golang.org/grpc/credentials/insecure"
56 "google.golang.org/grpc/metadata"
57)
58
// Prometheus metrics exported by the indexserver. All of them are registered
// with the default registry: the promauto collectors at package
// initialization, the gRPC client metrics on first use of clientMetricsOnce.
var (
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 100000s (~27.8h)
	})

	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_indexing_delay_seconds",
		Help:    "A histogram of durations from when an index job is added to the queue, to the time it completes.",
		Buckets: prometheus.ExponentialBuckets(60, 2, 14), // 1 minute -> ~5.7 days
	}, []string{
		"state", // state is an indexState
		"name",  // the name of the repository that was indexed
	})

	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is index.IndexState

	// metricNumIndexed is set by listIndexed to the number of repositories
	// that currently have shards on disk.
	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	// metricFailingTotal is incremented every time Server.Index returns an error.
	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	// metricIndexingTotal is incremented every time Server.Index starts an
	// actual index build (i.e. not for noop or metadata-only updates).
	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})

	// clientMetricsOnce returns a singleton instance of the client metrics
	// that are shared across all gRPC clients that this process creates.
	//
	// This function panics if the metrics cannot be registered with the default
	// Prometheus registry.
	clientMetricsOnce = sync.OnceValue(func() *grpcprom.ClientMetrics {
		clientMetrics := grpcprom.NewClientMetrics(
			grpcprom.WithClientCounterOptions(),
			grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
			grpcprom.WithClientStreamRecvHistogram(),   // record how long it takes for a client to receive a message during a streaming RPC
			grpcprom.WithClientStreamSendHistogram(),   // record how long it takes for a client to send a message during a streaming RPC
		)
		prometheus.DefaultRegisterer.MustRegister(clientMetrics)
		return clientMetrics
	})
)
152
// MaxFileSize is the file size limit passed to index jobs as FileLimit.
//
// 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
// NOTE: if you change this, you must also update gitIndex to use the same value when fetching the repo.
const MaxFileSize = 1 << 20

// set of repositories that we want to capture separate indexing metrics for.
// Repositories not in this set are reported with an empty "name" label (see
// repoNameForMetric).
var reposWithSeparateIndexingMetrics = make(map[string]struct{})

// indexState is the outcome of a single index job. It is used as the "state"
// label on the indexing metrics.
type indexState string

const (
	indexStateFail        indexState = "fail"         // The index job returned an error
	indexStateSuccess     indexState = "success"      // We built a new index
	indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
	indexStateNoop        indexState = "noop"         // We didn't need to update index
	indexStateEmpty       indexState = "empty"        // index is empty (empty repo)
)
169
// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	// logger is the structured logger used for index results and debug output.
	logger sglog.Logger

	// Sourcegraph is the client used to list the repositories to index, fetch
	// their index options and report index status back.
	Sourcegraph Sourcegraph

	// NOTE(review): BatchSize is not referenced in this part of the file;
	// presumably the batch size for requests to Sourcegraph — confirm.
	BatchSize int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration

	// CPUCount is the number of CPUs to use for indexing shards.
	CPUCount int

	// queue holds the index jobs. The sync loop in Run adds and bumps entries
	// and processQueue pops them.
	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is the value reported by the /debug/host handler.
	hostname string

	// mergeOpts configures shard merging; its vacuumInterval and mergeInterval
	// drive the vacuum and merge loops started by Run.
	mergeOpts mergeOpts

	// timeout defines how long the index server waits before killing an indexing job.
	timeout time.Duration
}
217
// Package-level loggers. debugLog discards its output as initialized here;
// infoLog and errorLog write to stderr.
var (
	debugLog = log.New(io.Discard, "[DEBUG] ", log.LstdFlags)
	infoLog  = log.New(os.Stderr, "[INFO] ", log.LstdFlags)
	errorLog = log.New(os.Stderr, "[ERROR] ", log.LstdFlags)
)

// noOutputTimeout is how long loggedRun lets a command run without producing
// any new output before it tries to stop it.
//
// our index commands should output something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
const noOutputTimeout = 30 * time.Minute
230
231func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) {
232 out := &synchronizedBuffer{}
233 cmd.Stdout = out
234 cmd.Stderr = out
235
236 tr.LazyPrintf("%s", cmd.Args)
237
238 defer func() {
239 if err != nil {
240 outS := out.String()
241 tr.LazyPrintf("failed: %v", err)
242 tr.LazyPrintf("output: %s", out)
243 tr.SetError()
244 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS)
245 }
246 }()
247
248 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args))
249
250 if err := cmd.Start(); err != nil {
251 return err
252 }
253
254 errC := make(chan error)
255 go func() {
256 errC <- cmd.Wait()
257 }()
258
259 // This channel is set after we have sent sigquit. It allows us to follow up
260 // with a sigkill if the process doesn't quit after sigquit.
261 kill := make(<-chan time.Time)
262
263 lastLen := 0
264 for {
265 select {
266 case <-time.After(noOutputTimeout):
267 // Periodically check if we have had output. If not kill the process.
268 if out.Len() != lastLen {
269 lastLen = out.Len()
270 infoLog.Printf("still running %s", cmd.Args)
271 } else {
272 // Send quit (C-\) first so we get a stack dump.
273 infoLog.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args)
274 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil {
275 errorLog.Println("quit failed:", err)
276 }
277
278 // send sigkill if still running in 10s
279 kill = time.After(10 * time.Second)
280 }
281
282 case <-kill:
283 infoLog.Printf("still running, killing %s", cmd.Args)
284 if err := cmd.Process.Kill(); err != nil {
285 errorLog.Println("kill failed:", err)
286 }
287
288 case err := <-errC:
289 if err != nil {
290 return err
291 }
292
293 tr.LazyPrintf("success")
294 return nil
295 }
296 }
297}
298
// synchronizedBuffer is a bytes.Buffer guarded by a mutex, so that one
// goroutine can inspect the buffer (Len, String) while another is still
// writing to it.
type synchronizedBuffer struct {
	mu sync.Mutex
	b  bytes.Buffer
}

// Write appends p to the buffer. It implements io.Writer.
func (buf *synchronizedBuffer) Write(p []byte) (int, error) {
	buf.mu.Lock()
	defer buf.mu.Unlock()

	return buf.b.Write(p)
}

// Len reports the number of unread bytes currently held in the buffer.
func (buf *synchronizedBuffer) Len() int {
	buf.mu.Lock()
	defer buf.mu.Unlock()

	return buf.b.Len()
}

// String returns the buffered contents as a string.
func (buf *synchronizedBuffer) String() string {
	buf.mu.Lock()
	defer buf.mu.Unlock()

	return buf.b.String()
}
323
// pauseFileName is the name of a file which, if present in IndexDir, will
// stop index jobs from running. This is to make it possible to experiment
// with the content of the IndexDir without the indexserver writing to it.
const pauseFileName = "PAUSE"
328
329// Run the sync loop. This blocks forever.
330func (s *Server) Run() {
331 removeIncompleteShards(s.IndexDir)
332
333 // Start a goroutine which updates the queue with commits to index.
334 go func() {
335 // We update the list of indexed repos every Interval. To speed up manual
336 // testing we also listen for SIGUSR1 to trigger updates.
337 //
338 // "pkill -SIGUSR1 zoekt-sourcegra"
339 for range jitterTicker(s.Interval, unix.SIGUSR1) {
340 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
341 infoLog.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b)))
342 continue
343 }
344
345 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir))
346 if err != nil {
347 errorLog.Printf("error listing repos: %s", err)
348 continue
349 }
350
351 debugLog.Printf("updating index queue with %d repositories", len(repos.IDs))
352
353 // Stop indexing repos we don't need to track anymore
354 removed := s.queue.MaybeRemoveMissing(repos.IDs)
355 metricNumStoppedTrackingTotal.Add(float64(len(removed)))
356 if len(removed) > 0 {
357 infoLog.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5))
358 }
359
360 cleanupDone := make(chan struct{})
361 go func() {
362 defer close(cleanupDone)
363 s.muIndexDir.Global(func() {
364 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging)
365 })
366 }()
367
368 repos.IterateIndexOptions(s.queue.AddOrUpdate)
369
370 // IterateIndexOptions will only iterate over repositories that have
371 // changed since we last called list. However, we want to add all IDs
372 // back onto the queue just to check that what is on disk is still
373 // correct. This will use the last IndexOptions we stored in the
374 // queue. The repositories not on the queue (missing) need a forced
375 // fetch of IndexOptions.
376 missing := s.queue.Bump(repos.IDs)
377 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...)
378
379 setShardsCounter(s.IndexDir)
380
381 <-cleanupDone
382 }
383 }()
384
385 go func() {
386 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) {
387 if s.shardMerging {
388 s.vacuum()
389 }
390 }
391 }()
392
393 go func() {
394 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) {
395 if s.shardMerging {
396 s.doMerge()
397 }
398 }
399 }()
400
401 for i := 0; i < s.IndexConcurrency; i++ {
402 go s.processQueue()
403 }
404
405 // block forever
406 select {}
407}
408
// formatListUint32 returns a comma-separated list of the first min(len(v), m)
// items of v. If v has more items than are shown, "..." is appended as a
// final list element.
func formatListUint32(v []uint32, m int) string {
	shown := m
	if len(v) < shown {
		shown = len(v)
	}

	var parts []string
	for i := 0; i < shown; i++ {
		parts = append(parts, fmt.Sprintf("%d", v[i]))
	}

	// Mark the list as truncated.
	if len(v) > shown {
		parts = append(parts, "...")
	}

	return strings.Join(parts, ", ")
}
426
427func (s *Server) processQueue() {
428 for {
429 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
430 time.Sleep(time.Second)
431 continue
432 }
433
434 item, ok := s.queue.Pop()
435 if !ok {
436 time.Sleep(time.Second)
437 continue
438 }
439
440 opts := item.Opts
441 args := s.indexArgs(opts)
442
443 ran := s.muIndexDir.With(opts.Name, func() {
444 // only record time taken once we hold the lock. This avoids us
445 // recording time taken while merging/cleanup runs.
446 start := time.Now()
447
448 state, err := s.Index(args)
449
450 elapsed := time.Since(start)
451 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds())
452
453 indexDelay := time.Since(item.DateAddedToQueue)
454 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds())
455
456 if err != nil {
457 errorLog.Printf("error indexing %s: %s", args.String(), err)
458 }
459
460 switch state {
461 case indexStateSuccess:
462 var branches []string
463 for _, b := range args.Branches {
464 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
465 }
466 s.logger.Info("updated index",
467 sglog.Int("tenant", args.TenantID),
468 sglog.String("repo", args.Name),
469 sglog.Uint32("id", args.RepoID),
470 sglog.Strings("branches", branches),
471 sglog.Duration("duration", elapsed),
472 sglog.Duration("index_delay", indexDelay),
473 )
474 case indexStateSuccessMeta:
475 infoLog.Printf("updated meta %s in %v", args.String(), elapsed)
476 }
477 s.queue.SetIndexed(opts, state)
478 })
479
480 if !ran {
481 // Someone else is processing the repository. We can just skip this job
482 // since the repository will be added back to the queue and we will
483 // converge to the correct behaviour.
484 debugLog.Printf("index job for repository already running: %s", args)
485 continue
486 }
487 }
488}
489
490// repoNameForMetric returns a normalized version of the given repository name that is
491// suitable for use with Prometheus metrics.
492func repoNameForMetric(repo string) string {
493 // Check to see if we want to be able to capture separate indexing metrics for this repository.
494 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable.
495 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok {
496 return repo
497 }
498
499 return ""
500}
501
// batched splits slice into consecutive batches of at most size elements and
// sends them, in order, on the returned channel. The channel is closed once
// every element has been sent; an empty slice yields no batches.
//
// A non-positive size disables batching: the whole slice is sent as a single
// batch.
//
// Each batch aliases the memory of slice, but its capacity is limited to its
// length so that appending to a batch never overwrites the elements of a
// later batch.
//
// The caller must drain the channel, otherwise the sending goroutine blocks
// forever.
func batched(slice []uint32, size int) <-chan []uint32 {
	if size <= 0 {
		// Guard against an endless stream of empty batches (size == 0) and
		// an out-of-range slice expression (size < 0).
		size = len(slice)
	}

	c := make(chan []uint32)
	go func() {
		defer close(c)

		for len(slice) > 0 {
			n := size
			if n > len(slice) {
				n = len(slice)
			}
			c <- slice[:n:n]
			slice = slice[n:]
		}
	}()
	return c
}
516
// jitterTicker returns a channel which is sent a value once on creation and
// then repeatedly after a jittered delay. Each delay is selected uniformly
// from the range [d/2, d + d/2).
//
// sig is an optional list of signals which also cause the ticker to fire.
// This is a convenience to allow manually triggering of the ticker.
//
// The goroutines backing the ticker run for the lifetime of the process.
func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} {
	ticks := make(chan struct{})

	// Time based ticks: fire, then sleep for a jittered duration.
	go func() {
		for {
			ticks <- struct{}{}
			wait := d/2 + time.Duration(rand.Int63n(int64(d)))
			time.Sleep(wait)
		}
	}()

	// Signal based ticks, only when signals were requested.
	if len(sig) > 0 {
		go func() {
			received := make(chan os.Signal, 1)
			signal.Notify(received, sig...)
			for range received {
				ticks <- struct{}{}
			}
		}()
	}

	return ticks
}
548
549// Index starts an index job for repo name at commit.
550func (s *Server) Index(args *indexArgs) (state indexState, err error) {
551 tr := trace.New("index", args.Name)
552 tr.SetMaxEvents(30) // Ensure we capture all indexing events
553
554 defer func() {
555 if err != nil {
556 tr.SetError()
557 tr.LazyPrintf("error: %v", err)
558 state = indexStateFail
559 metricFailingTotal.Inc()
560 }
561 tr.LazyPrintf("state: %s", state)
562 tr.Finish()
563 }()
564
565 // Sourcegraph should always provide a tenant ID.
566 if args.TenantID < 1 {
567 return indexStateFail, tenant.ErrMissingTenant
568 }
569
570 tr.LazyPrintf("branches: %v", args.Branches)
571
572 if len(args.Branches) == 0 {
573 return indexStateEmpty, createEmptyShard(args)
574 }
575
576 repositoryName := args.Name
577 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
578 tr.LazyPrintf("marking this repository for delta build")
579 args.UseDelta = true
580 }
581
582 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold
583
584 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
585 tr.LazyPrintf("skipping symbols calculation")
586 args.Symbols = false
587 }
588
589 reason := "forced"
590
591 if args.Incremental {
592 bo := args.BuildOptions()
593 bo.SetDefaults()
594 incrementalState, fn := bo.IndexState()
595 reason = string(incrementalState)
596 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()
597
598 switch incrementalState {
599 case index.IndexStateEqual:
600 debugLog.Printf("%s index already up to date. Shard=%s", args.String(), fn)
601 return indexStateNoop, nil
602
603 case index.IndexStateMeta:
604 infoLog.Printf("updating index.meta %s", args.String())
605
606 // TODO(stefan) handle mergeMeta for tenant id.
607 if err := mergeMeta(bo); err != nil {
608 errorLog.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
609 } else {
610 return indexStateSuccessMeta, nil
611 }
612
613 case index.IndexStateCorrupt:
614 infoLog.Printf("falling back to full update: corrupt index: %s", args.String())
615 }
616 }
617
618 infoLog.Printf("updating index %s reason=%s", args.String(), reason)
619
620 metricIndexingTotal.Inc()
621 c := gitIndexConfig{
622 runCmd: func(cmd *exec.Cmd) error {
623 return s.loggedRun(tr, cmd)
624 },
625
626 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
627 return args.BuildOptions().FindRepositoryMetadata()
628 },
629 timeout: s.timeout,
630 }
631
632 err = gitIndex(c, args, s.Sourcegraph, s.logger)
633 if err != nil {
634 return indexStateFail, err
635 }
636
637 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil {
638 s.logger.Error("failed to update index status",
639 sglog.String("repo", args.Name),
640 sglog.Uint32("id", args.RepoID),
641 sglogBranches("branches", args.Branches),
642 sglog.Error(err),
643 )
644 }
645
646 return indexStateSuccess, nil
647}
648
649// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so
650// it can update the zoekt_repos table.
651func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error {
652 // We need to read from disk for IndexTime.
653 _, metadata, ok, err := c.findRepositoryMetadata(args)
654 if err != nil {
655 return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
656 }
657 if !ok {
658 return errors.New("failed to find metadata for new/updated index")
659 }
660
661 status := []indexStatus{{
662 RepoID: args.RepoID,
663 Branches: args.Branches,
664 IndexTimeUnix: metadata.IndexTime.Unix(),
665 }}
666 if err := sg.UpdateIndexStatus(status); err != nil {
667 return fmt.Errorf("failed to update sourcegraph with status: %w", err)
668 }
669
670 return nil
671}
672
673func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
674 ss := make([]string, len(branches))
675 for i, b := range branches {
676 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
677 }
678 return sglog.Strings(key, ss)
679}
680
681func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
682 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0))
683 return &indexArgs{
684 IndexOptions: opts,
685 IndexDir: s.IndexDir,
686 Parallelism: parallelism,
687 Incremental: true,
688 FileLimit: MaxFileSize,
689 ShardMerging: s.shardMerging,
690 }
691}
692
693// parallelism consults both the server flags and index options to determine the number
694// of shards to index in parallel. If the CPUCount index option is provided, it always
695// overrides the server flag.
696func (s *Server) parallelism(opts IndexOptions, maxProcs int) int {
697 var parallelism int
698 if opts.ShardConcurrency > 0 {
699 parallelism = int(opts.ShardConcurrency)
700 } else {
701 parallelism = s.CPUCount
702 }
703
704 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs
705 if parallelism > 4*maxProcs {
706 parallelism = 4 * maxProcs
707 }
708
709 // If index concurrency is set, then divide the parallelism by the number of
710 // repos we're indexing in parallel
711 if s.IndexConcurrency > 1 {
712 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency)))
713 }
714
715 return parallelism
716}
717
718func createEmptyShard(args *indexArgs) error {
719 bo := args.BuildOptions()
720 bo.SetDefaults()
721 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}
722
723 if args.Incremental && bo.IncrementalSkipIndexing() {
724 return nil
725 }
726
727 builder, err := index.NewBuilder(*bo)
728 if err != nil {
729 return err
730 }
731 return builder.Finish()
732}
733
734// addDebugHandlers adds handlers specific to indexserver.
735func (s *Server) addDebugHandlers(mux *http.ServeMux) {
736 // Sourcegraph's site admin view requires indexserver to serve it's admin view
737 // on "/".
738 mux.Handle("/", http.HandlerFunc(s.handleRoot))
739
740 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex))
741 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed))
742 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList))
743 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge))
744 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue))
745 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost))
746}
747
748func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) {
749 if r.Method != "GET" {
750 w.Header().Set("Allow", "GET")
751 w.WriteHeader(http.StatusMethodNotAllowed)
752 return
753 }
754
755 response := struct {
756 Hostname string
757 }{
758 Hostname: s.hostname,
759 }
760
761 b, err := json.Marshal(response)
762 if err != nil {
763 http.Error(w, err.Error(), http.StatusInternalServerError)
764 return
765 }
766
767 w.Header().Set("Content-Type", "application/json; charset=utf-8")
768 w.Write(b)
769}
770
// rootTmpl renders the admin page served on "/".
//
// It is executed with a value that has an IndexMsg string (the result of a
// reindex triggered via ?id=) and a Repos list whose elements have ID and
// Name fields. The list of repositories is only rendered when Repos is
// non-empty; every ID links to a reindex of that repository.
var rootTmpl = template.Must(template.New("name").Parse(`
<html>
  <body>
    <a href="debug">Debug</a><br />
    <a href="debug/requests">Traces</a><br />
    {{.IndexMsg}}<br />
    <br />
    <h3>Reindex</h3>
    {{if .Repos}}
    <a href="?show_repos=false">hide repos</a><br />
    <table style="margin-top: 20px">
      <th style="text-align:left">Name</th>
      <th style="text-align:left">ID (click to reindex)</th>
      {{range .Repos}}
      <tr>
        <td>{{.Name}}</td>
        <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></td>
      </tr>
      {{end}}
    </table>
    {{else}}
    <a href="?show_repos=true">show repos</a><br />
    {{end}}
  </body>
</html>
`))
797
798func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
799 if r.Method != "GET" {
800 w.Header().Set("Allow", "GET")
801 w.WriteHeader(http.StatusMethodNotAllowed)
802 return
803 }
804
805 values := r.URL.Query()
806
807 // ?id=
808 indexMsg := ""
809 if v := values.Get("id"); v != "" {
810 id, err := strconv.Atoi(v)
811 if err != nil {
812 http.Error(w, err.Error(), http.StatusBadRequest)
813 return
814 }
815 indexMsg, _ = s.forceIndex(uint32(id))
816 }
817
818 // ?show_repos=
819 showRepos := false
820 if v := values.Get("show_repos"); v != "" {
821 showRepos, _ = strconv.ParseBool(v)
822 }
823
824 type Repo struct {
825 ID uint32
826 Name string
827 }
828 var data struct {
829 Repos []Repo
830 IndexMsg string
831 }
832
833 data.IndexMsg = indexMsg
834
835 if showRepos {
836 s.queue.Iterate(func(opts *IndexOptions) {
837 data.Repos = append(data.Repos, Repo{
838 ID: opts.RepoID,
839 Name: opts.Name,
840 })
841 })
842 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name })
843 }
844
845 _ = rootTmpl.Execute(w, data)
846}
847
848// handleReindex triggers a reindex asynocronously. If a reindex was triggered
849// the request returns with status 202. The caller can infer the new state of
850// the index by calling List.
851func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) {
852 if r.Method != http.MethodPost {
853 w.Header().Set("Allow", http.MethodPost)
854 w.WriteHeader(http.StatusMethodNotAllowed)
855 return
856 }
857
858 err := r.ParseForm()
859 if err != nil {
860 http.Error(w, err.Error(), http.StatusBadRequest)
861 return
862 }
863
864 id, err := strconv.Atoi(r.Form.Get("repo"))
865 if err != nil {
866 http.Error(w, err.Error(), http.StatusBadRequest)
867 return
868 }
869
870 go func() { s.forceIndex(uint32(id)) }()
871
872 // 202 Accepted
873 w.WriteHeader(http.StatusAccepted)
874}
875
876func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
877 withIndexed := true
878 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil {
879 withIndexed = b
880 }
881
882 var indexed []uint32
883 if withIndexed {
884 indexed = listIndexed(s.IndexDir)
885 }
886
887 repos, err := s.Sourcegraph.List(r.Context(), indexed)
888 if err != nil {
889 http.Error(w, err.Error(), http.StatusInternalServerError)
890 return
891 }
892
893 bw := bytes.Buffer{}
894
895 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
896
897 _, err = fmt.Fprintf(tw, "ID\tName\n")
898 if err != nil {
899 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
900 return
901 }
902
903 s.queue.mu.Lock()
904 name := ""
905 for _, id := range repos.IDs {
906 if item := s.queue.get(id); item != nil {
907 name = item.opts.Name
908 } else {
909 name = ""
910 }
911 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
912 if err != nil {
913 debugLog.Printf("handleDebugList: %s\n", err.Error())
914 }
915 }
916 s.queue.mu.Unlock()
917
918 if err != nil {
919 http.Error(w, err.Error(), http.StatusInternalServerError)
920 return
921 }
922
923 err = tw.Flush()
924 if err != nil {
925 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
926 return
927 }
928
929 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
930
931 if _, err := io.Copy(w, &bw); err != nil {
932 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
933 return
934 }
935}
936
937// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
938// can run this command during periods of low usage (evenings, weekends) to
939// trigger an initial merge run. In the steady-state, merges happen rarely, even
940// on busy instances, and users can rely on automatic merging instead.
941func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {
942 // A merge operation can take very long, depending on the number merges and the
943 // target size of the compound shards. We run the merge in the background and
944 // return immediately to the user.
945 //
946 // We track the status of the merge with metricShardMergingRunning.
947 go func() {
948 s.doMerge()
949 }()
950 _, _ = w.Write([]byte("merging enqueued\n"))
951}
952
953func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {
954 indexed := listIndexed(s.IndexDir)
955
956 bw := bytes.Buffer{}
957
958 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
959
960 _, err := fmt.Fprintf(tw, "ID\tName\n")
961 if err != nil {
962 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
963 return
964 }
965
966 s.queue.mu.Lock()
967 name := ""
968 for _, id := range indexed {
969 if item := s.queue.get(id); item != nil {
970 name = item.opts.Name
971 } else {
972 name = ""
973 }
974 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
975 if err != nil {
976 debugLog.Printf("handleDebugIndexed: %s\n", err.Error())
977 }
978 }
979 s.queue.mu.Unlock()
980
981 if err != nil {
982 http.Error(w, err.Error(), http.StatusInternalServerError)
983 return
984 }
985
986 err = tw.Flush()
987 if err != nil {
988 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
989 return
990 }
991
992 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
993
994 if _, err := io.Copy(w, &bw); err != nil {
995 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
996 return
997 }
998}
999
1000// forceIndex will run the index job for repo name now. It will return always
1001// return a string explaining what it did, even if it failed.
1002func (s *Server) forceIndex(id uint32) (string, error) {
1003 var opts IndexOptions
1004 var err error
1005 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) {
1006 opts = o
1007 }, func(_ uint32, e error) {
1008 err = e
1009 }, id)
1010 if err != nil {
1011 return fmt.Sprintf("Indexing %d failed: %v", id, err), err
1012 }
1013
1014 args := s.indexArgs(opts)
1015 args.Incremental = false // force re-index
1016
1017 var state indexState
1018 ran := s.muIndexDir.With(opts.Name, func() {
1019 state, err = s.Index(args)
1020 })
1021 if !ran {
1022 return fmt.Sprintf("index job for repository already running: %s", args), nil
1023 }
1024 if err != nil {
1025 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err
1026 }
1027 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil
1028}
1029
1030func listIndexed(indexDir string) []uint32 {
1031 index := getShards(indexDir)
1032 metricNumIndexed.Set(float64(len(index)))
1033 repoIDs := make([]uint32, 0, len(index))
1034 for id := range index {
1035 repoIDs = append(repoIDs, id)
1036 }
1037 sort.Slice(repoIDs, func(i, j int) bool {
1038 return repoIDs[i] < repoIDs[j]
1039 })
1040 return repoIDs
1041}
1042
1043// setupTmpDir sets up a temporary directory on the same volume as the
1044// indexes.
1045//
1046// If main is true we will delete older temp directories left around. main is
1047// false when this is a debug command.
1048func setupTmpDir(logger sglog.Logger, main bool, index string) error {
1049 // change the target tmp directory depending on if it's our main daemon or a
1050 // debug sub command.
1051 dir := ".indexserver.debug.tmp"
1052 if main {
1053 dir = ".indexserver.tmp"
1054 }
1055
1056 tmpRoot := filepath.Join(index, dir)
1057
1058 if main {
1059 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot))
1060 err := os.RemoveAll(tmpRoot)
1061 if err != nil {
1062 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err))
1063 }
1064 }
1065
1066 if err := os.MkdirAll(tmpRoot, 0o755); err != nil {
1067 return err
1068 }
1069
1070 return os.Setenv("TMPDIR", tmpRoot)
1071}
1072
1073func printMetaData(fn string) error {
1074 repo, indexMeta, err := index.ReadMetadataPath(fn)
1075 if err != nil {
1076 return err
1077 }
1078
1079 err = json.NewEncoder(os.Stdout).Encode(indexMeta)
1080 if err != nil {
1081 return err
1082 }
1083
1084 err = json.NewEncoder(os.Stdout).Encode(repo)
1085 if err != nil {
1086 return err
1087 }
1088 return nil
1089}
1090
1091func printShardStats(fn string) error {
1092 f, err := os.Open(fn)
1093 if err != nil {
1094 return err
1095 }
1096
1097 iFile, err := index.NewIndexFile(f)
1098 if err != nil {
1099 return err
1100 }
1101
1102 return index.PrintNgramStats(iFile)
1103}
1104
1105func srcLogLevelIsDebug() bool {
1106 lvl := os.Getenv(sglog.EnvLogLevel)
1107 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug")
1108}
1109
// getEnvWithDefaultBool returns the boolean value of the environment variable
// k, or defaultVal if k is unset or empty. The value is parsed with
// strconv.ParseBool; a value that does not parse terminates the process via
// log.Fatalf.
func getEnvWithDefaultBool(k string, defaultVal bool) bool {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		// The message previously claimed "int64", which was misleading for
		// a boolean setting.
		log.Fatalf("error parsing ENV %s to bool: %s", k, err)
	}
	return b
}
1121
// getEnvWithDefaultInt64 returns the base-10 int64 stored in the environment
// variable k. An unset or empty variable yields defaultVal; a value that does
// not parse terminates the process via log.Fatalf.
func getEnvWithDefaultInt64(k string, defaultVal int64) int64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, parseErr := strconv.ParseInt(raw, 10, 64)
		if parseErr != nil {
			log.Fatalf("error parsing ENV %s to int64: %s", k, parseErr)
		}
		return parsed
	}
	return defaultVal
}
1133
// getEnvWithDefaultInt returns the integer stored in the environment variable
// k, or defaultVal if k is unset or empty. A value that does not parse as a
// base-10 int terminates the process via log.Fatalf.
func getEnvWithDefaultInt(k string, defaultVal int) int {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	// Parse the variable's value, not its name: parsing k made every
	// non-empty setting fatal.
	i, err := strconv.Atoi(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to int: %s", k, err)
	}
	return i
}
1145
// getEnvWithDefaultUint64 returns the base-10 uint64 stored in the
// environment variable k. An unset or empty variable yields defaultVal; a
// value that does not parse terminates the process via log.Fatalf.
func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, parseErr := strconv.ParseUint(raw, 10, 64)
		if parseErr != nil {
			log.Fatalf("error parsing ENV %s to uint64: %s", k, parseErr)
		}
		return parsed
	}
	return defaultVal
}
1157
// getEnvWithDefaultFloat64 returns the float64 stored in the environment
// variable k. An unset or empty variable yields defaultVal; a value that does
// not parse terminates the process via log.Fatalf.
func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, parseErr := strconv.ParseFloat(raw, 64)
		if parseErr != nil {
			log.Fatalf("error parsing ENV %s to float64: %s", k, parseErr)
		}
		return parsed
	}
	return defaultVal
}
1169
// getEnvWithDefaultString returns the value of the environment variable k,
// falling back to defaultVal when k is unset or empty.
func getEnvWithDefaultString(k string, defaultVal string) string {
	if fromEnv := os.Getenv(k); fromEnv != "" {
		return fromEnv
	}
	return defaultVal
}
1177
// getEnvWithDefaultDuration returns the duration stored in the environment
// variable k, parsed with time.ParseDuration (e.g. "90s", "2h"). An unset or
// empty variable yields defaultVal; a value that does not parse terminates
// the process via log.Fatalf.
func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration {
	if raw := os.Getenv(k); raw != "" {
		parsed, parseErr := time.ParseDuration(raw)
		if parseErr != nil {
			log.Fatalf("error parsing ENV %s to duration: %s", k, parseErr)
		}
		return parsed
	}
	return defaultVal
}
1190
// getEnvWithDefaultEmptySet interprets the environment variable k as a
// comma-separated list and returns its entries as a set. Surrounding
// whitespace is trimmed from each entry and empty entries are dropped, so an
// unset variable produces an empty (non-nil) set.
func getEnvWithDefaultEmptySet(k string) map[string]struct{} {
	entries := strings.Split(os.Getenv(k), ",")
	result := make(map[string]struct{})
	for i := range entries {
		entry := strings.TrimSpace(entries[i])
		if entry == "" {
			continue
		}
		result[entry] = struct{}{}
	}
	return result
}
1201
// joinStringSet concatenates the members of set, separated by sep.
//
// Members are emitted in ascending lexical order so the result is
// deterministic (map iteration order is random), which keeps log lines built
// from it stable between runs. An empty or nil set yields "".
func joinStringSet(set map[string]struct{}, sep string) string {
	xs := make([]string, 0, len(set))
	for x := range set {
		xs = append(xs, x)
	}
	sort.Strings(xs)

	return strings.Join(xs, sep)
}
1210
1211func setShardsCounter(indexDir string) {
1212 fns, err := filepath.Glob(filepath.Join(indexDir, "*.zoekt"))
1213 if err != nil {
1214 errorLog.Printf("setShardsCounter: %s\n", err)
1215 return
1216 }
1217 metricNumberShards.Set(float64(len(fns)))
1218
1219 compoundFns := make([]string, 0, len(fns))
1220 for _, fn := range fns {
1221 if strings.HasPrefix(filepath.Base(fn), "compound-") {
1222 compoundFns = append(compoundFns, fn)
1223 }
1224 }
1225 metricNumberCompoundShards.Set(float64(len(fns)))
1226}
1227
1228func rootCmd() *ffcli.Command {
1229 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError)
1230 conf := rootConfig{
1231 Main: true,
1232 }
1233 conf.registerRootFlags(rootFs)
1234
1235 return &ffcli.Command{
1236 FlagSet: rootFs,
1237 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]",
1238 Subcommands: []*ffcli.Command{debugCmd()},
1239 Exec: func(ctx context.Context, args []string) error {
1240 return startServer(conf)
1241 },
1242 }
1243}
1244
// rootConfig holds the settings of the root command. The unexported fields
// are bound to command-line flags by registerRootFlags.
type rootConfig struct {
	// Main marks the configuration of the long running indexserver command.
	// Debug commands leave it false. It is passed to setupTmpDir, which uses
	// it to pick the temporary directory and to decide whether leftovers from
	// a previous run are removed.
	Main bool

	root             string        // -sourcegraph_url
	interval         time.Duration // -interval
	index            string        // -index
	indexConcurrency int64         // -index_concurrency
	listen           string        // -listen
	hostname         string        // -hostname
	cpuFraction      float64       // -cpu_fraction

	// Shard merging settings.
	disableShardMerging           bool
	vacuumInterval, mergeInterval time.Duration
	targetSize, minSize           int64 // in MiB
	minAgeDays                    int

	// Backoff settings for repositories whose indexing failed one or more
	// consecutive times.
	backoffDuration, maxBackoffDuration time.Duration
}
1271
1272func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
1273 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
1274 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
1275 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently")
1276 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", index.DefaultDir), "set index directory to use")
1277 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
1278 fs.StringVar(&rc.hostname, "hostname", index.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
1279 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
1280 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
1281 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")
1282
1283 // flags related to shard merging
1284 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging")
1285 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
1286 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
1287 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 1000), "the target size of compound shards in MiB")
1288 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 800), "the minimum size of a compound shard in MiB")
1289 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
1290}
1291
1292func startServer(conf rootConfig) error {
1293 s, err := newServer(conf)
1294 if err != nil {
1295 return err
1296 }
1297
1298 profiler.Init("zoekt-sourcegraph-indexserver")
1299 setShardsCounter(s.IndexDir)
1300
1301 if conf.listen != "" {
1302
1303 mux := http.NewServeMux()
1304 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{
1305 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"},
1306 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"},
1307 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"},
1308 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"},
1309 }...)
1310 s.addDebugHandlers(mux)
1311
1312 go func() {
1313 debugLog.Printf("serving HTTP on %s", conf.listen)
1314 log.Fatal(http.ListenAndServe(conf.listen, mux))
1315 }()
1316
1317 // Serve mux on a unix domain socket on a best-effort-basis so that
1318 // webserver can call the endpoints via the shared filesystem.
1319 //
1320 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen
1321 // on the socket due to permission errors. See
1322 // https://github.com/docker/for-mac/issues/6239
1323 go func() {
1324 serveHTTPOverSocket := func() error {
1325 socket := filepath.Join(s.IndexDir, "indexserver.sock")
1326 // We cannot bind a socket to an existing pathname.
1327 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) {
1328 return fmt.Errorf("error removing socket file: %s", socket)
1329 }
1330 // The "unix" network corresponds to stream sockets. (cf. unixgram,
1331 // unixpacket).
1332 l, err := net.Listen("unix", socket)
1333 if err != nil {
1334 return fmt.Errorf("failed to listen on socket %s: %w", socket, err)
1335 }
1336 // Indexserver (root) and webserver (Sourcegraph) run with
1337 // different users. Per default, the socket is created with
1338 // permission 755 (root root), which doesn't let webserver write to
1339 // it.
1340 //
1341 // See https://github.com/golang/go/issues/11822 for more context.
1342 if err := os.Chmod(socket, 0o777); err != nil {
1343 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err)
1344 }
1345 debugLog.Printf("serving HTTP on %s", socket)
1346 return http.Serve(l, mux)
1347 }
1348 debugLog.Print(serveHTTPOverSocket())
1349 }()
1350 }
1351
1352 oc := &ownerChecker{
1353 Path: filepath.Join(conf.index, "owner.txt"),
1354 Hostname: conf.hostname,
1355 }
1356 go oc.Run()
1357
1358 logger := sglog.Scoped("metricsRegistration")
1359 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"}
1360
1361 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index})
1362 prometheus.DefaultRegisterer.MustRegister(c)
1363
1364 s.Run()
1365 return nil
1366}
1367
1368func newServer(conf rootConfig) (*Server, error) {
1369 logger := sglog.Scoped("server")
1370
1371 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 {
1372 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0")
1373 }
1374 if conf.index == "" {
1375 return nil, fmt.Errorf("must set -index")
1376 }
1377 if conf.root == "" {
1378 return nil, fmt.Errorf("must set -sourcegraph_url")
1379 }
1380 rootURL, err := url.Parse(conf.root)
1381 if err != nil {
1382 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err)
1383 }
1384
1385 rootURL = addDefaultPort(rootURL)
1386
1387 // Tune GOMAXPROCS to match Linux container CPU quota.
1388 _, _ = maxprocs.Set()
1389
1390 // Automatically prepend our own path at the front, to minimize
1391 // required configuration.
1392 if l, err := os.Readlink("/proc/self/exe"); err == nil {
1393 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
1394 }
1395
1396 if _, err := os.Stat(conf.index); err != nil {
1397 if err := os.MkdirAll(conf.index, 0o755); err != nil {
1398 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err)
1399 }
1400 }
1401
1402 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil {
1403 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err)
1404 }
1405
1406 if srcLogLevelIsDebug() {
1407 debugLog.SetOutput(os.Stderr)
1408 }
1409
1410 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST")
1411 if len(reposWithSeparateIndexingMetrics) > 0 {
1412 debugLog.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", "))
1413 }
1414
1415 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST")
1416 if len(deltaBuildRepositoriesAllowList) > 0 {
1417 debugLog.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", "))
1418 }
1419
1420 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150)
1421 if deltaShardNumberFallbackThreshold > 0 {
1422 debugLog.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold)
1423 } else {
1424 debugLog.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards")
1425 }
1426
1427 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST")
1428 if len(reposShouldSkipSymbolsCalculation) > 0 {
1429 debugLog.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", "))
1430 }
1431
1432 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout)
1433 if indexingTimeout != defaultIndexingTimeout {
1434 debugLog.Printf("using configured indexing timeout: %s", indexingTimeout)
1435 }
1436
1437 var sg Sourcegraph
1438 if rootURL.IsAbs() {
1439 var batchSize int
1440 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" {
1441 batchSize, err = strconv.Atoi(v)
1442 if err != nil {
1443 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int")
1444 }
1445 }
1446
1447 opts := []SourcegraphClientOption{
1448 WithBatchSize(batchSize),
1449 }
1450
1451 logger := sglog.Scoped("zoektConfigurationGRPCClient")
1452 client, err := dialGRPCClient(rootURL.Host, logger)
1453 if err != nil {
1454 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err)
1455 }
1456
1457 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...)
1458
1459 } else {
1460 sg = sourcegraphFake{
1461 RootDir: rootURL.String(),
1462 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags),
1463 }
1464 }
1465
1466 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction)))
1467 if cpuCount < 1 {
1468 cpuCount = 1
1469 }
1470
1471 if conf.indexConcurrency < 1 {
1472 conf.indexConcurrency = 1
1473 } else if conf.indexConcurrency > int64(cpuCount) {
1474 conf.indexConcurrency = int64(cpuCount)
1475 }
1476
1477 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)
1478
1479 return &Server{
1480 logger: logger,
1481 Sourcegraph: sg,
1482 IndexDir: conf.index,
1483 IndexConcurrency: int(conf.indexConcurrency),
1484 Interval: conf.interval,
1485 CPUCount: cpuCount,
1486 queue: *q,
1487 shardMerging: !conf.disableShardMerging,
1488 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList,
1489 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold,
1490 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
1491 hostname: conf.hostname,
1492 mergeOpts: mergeOpts{
1493 vacuumInterval: conf.vacuumInterval,
1494 mergeInterval: conf.mergeInterval,
1495 targetSizeBytes: conf.targetSize * 1024 * 1024,
1496 minSizeBytes: conf.minSize * 1024 * 1024,
1497 minAgeDays: conf.minAgeDays,
1498 },
1499 timeout: indexingTimeout,
1500 }, err
1501}
1502
// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration
// for the indexed-search-configuration gRPC service.
//
// The contents of default_grpc_service_configuration.json are embedded into
// this variable at build time by the go:embed directive below; dialGRPCClient
// hands the string to grpc.WithDefaultServiceConfig.
//
// The default backoff strategy is modeled after the default settings used by
// retryablehttp.DefaultClient.
//
// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html):
//   - Unavailable
//   - Aborted
//
//go:embed default_grpc_service_configuration.json
var defaultGRPCServiceConfigurationJSON string
1515
1516func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor {
1517 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1518 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1519 return invoker(ctx, method, req, reply, cc, opts...)
1520 }
1521}
1522
1523func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
1524 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
1525 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1526 return streamer(ctx, desc, cc, method, opts...)
1527 }
1528}
1529
// defaultGRPCMessageReceiveSizeBytes is the default maximum size, in bytes, of
// a message that gRPC servers and clients are allowed to receive (90 MiB).
// Custom Server/Dial options can override it.
const defaultGRPCMessageReceiveSizeBytes = 90 << 20
1533
1534func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) {
1535 metrics := clientMetricsOnce()
1536
1537 // If the service seems to be unavailable, this
1538 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1
1539 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s])
1540 retryOpts := []retry.CallOption{
1541 retry.WithMax(5),
1542 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)),
1543 retry.WithCodes(codes.Unavailable),
1544 }
1545
1546 opts := []grpc.DialOption{
1547 grpc.WithTransportCredentials(insecure.NewCredentials()),
1548 grpc.WithChainStreamInterceptor(
1549 metrics.StreamClientInterceptor(),
1550 messagesize.StreamClientInterceptor,
1551 internalActorStreamInterceptor(),
1552 internalerrs.LoggingStreamClientInterceptor(logger),
1553 internalerrs.PrometheusStreamClientInterceptor,
1554 retry.StreamClientInterceptor(retryOpts...),
1555 ),
1556 grpc.WithChainUnaryInterceptor(
1557 metrics.UnaryClientInterceptor(),
1558 messagesize.UnaryClientInterceptor,
1559 internalActorUnaryInterceptor(),
1560 internalerrs.LoggingUnaryClientInterceptor(logger),
1561 internalerrs.PrometheusUnaryClientInterceptor,
1562 retry.UnaryClientInterceptor(retryOpts...),
1563 ),
1564 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
1565 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)),
1566 }
1567
1568 opts = append(opts, additionalOpts...)
1569
1570 // Ensure that the message size options are set last, so they override any other
1571 // client-specific options that tweak the message size.
1572 //
1573 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
1574 // take precedence over everything else with a uniform size setting that's easy to reason about.
1575 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...)
1576
1577 // This dialer is used to connect via gRPC to the Sourcegraph instance.
1578 // This is done lazily, so we can provide the client to use regardless of
1579 // whether we enabled gRPC or not initially.
1580 cc, err := grpc.Dial(addr, opts...)
1581 if err != nil {
1582 return nil, fmt.Errorf("dialing %q: %w", addr, err)
1583 }
1584
1585 client := proto.NewZoektConfigurationServiceClient(cc)
1586 return client, nil
1587}
1588
// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
// If the scheme is "https", "443" is used. URLs that are nil, not absolute,
// already carry a port, or use any other scheme are returned as-is.
//
// The original URL is not mutated. A copy is modified and returned.
func addDefaultPort(original *url.URL) *url.URL {
	if original == nil {
		return nil // don't panic
	}

	// Don't do anything if the URL doesn't look like a remote URL or already
	// names a port.
	if !original.IsAbs() || original.Port() != "" {
		return original
	}

	var port string
	switch original.Scheme {
	case "http":
		port = "80"
	case "https":
		port = "443"
	default:
		return original
	}

	u := cloneURL(original)
	// Hostname strips the square brackets of an IPv6 literal (and a dangling
	// ":"), which net.JoinHostPort then adds back correctly. Joining the raw
	// Host would produce e.g. "[[::1]]:80".
	u.Host = net.JoinHostPort(original.Hostname(), port)
	return u
}

// cloneURL returns a copy of the URL. It is safe to mutate the returned URL.
// This is copied from net/http/clone.go
func cloneURL(u *url.URL) *url.URL {
	if u == nil {
		return nil
	}
	u2 := new(url.URL)
	*u2 = *u
	if u.User != nil {
		// Userinfo is held by pointer, so it needs its own copy.
		u2.User = new(url.Userinfo)
		*u2.User = *u.User
	}
	return u2
}
1633
1634func main() {
1635 liblog := sglog.Init(sglog.Resource{
1636 Name: "zoekt-indexserver",
1637 Version: index.Version,
1638 InstanceID: index.HostnameBestEffort(),
1639 })
1640 defer liblog.Sync()
1641
1642 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil {
1643 log.Fatal(err)
1644 }
1645}
1646
// getBoolFromEnvironmentVariables looks at the environment variables named in
// envVarNames, in order, and returns the boolean value of the first one that
// is set to a non-empty string. If none of them is set, defaultBool is
// returned.
//
// An error is returned if that first non-empty variable fails to parse as a
// boolean; variables listed after it are not consulted.
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	for i := range envVarNames {
		name := envVarNames[i]

		raw := os.Getenv(name)
		if raw != "" {
			parsed, parseErr := strconv.ParseBool(raw)
			if parseErr != nil {
				return false, fmt.Errorf("parsing environment variable %q to boolean: %v", name, parseErr)
			}
			return parsed, nil
		}
	}

	return defaultBool, nil
}