fork of https://github.com/sourcegraph/zoekt
1// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
2// repositories on sourcegraph
3package main
4
5import (
6 "bytes"
7 "context"
8 _ "embed"
9 "encoding/json"
10 "errors"
11 "flag"
12 "fmt"
13 "html/template"
14 "io"
15 "io/fs"
16 "log"
17 "math"
18 "math/rand"
19 "net"
20 "net/http"
21 "net/url"
22 "os"
23 "os/exec"
24 "os/signal"
25 "path/filepath"
26 "runtime"
27 "sort"
28 "strconv"
29 "strings"
30 "sync"
31 "text/tabwriter"
32 "time"
33
34 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
35 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
36 "github.com/peterbourgon/ff/v3/ffcli"
37 "github.com/prometheus/client_golang/prometheus"
38 "github.com/prometheus/client_golang/prometheus/promauto"
39 sglog "github.com/sourcegraph/log"
40 "github.com/sourcegraph/mountinfo"
41 "go.uber.org/automaxprocs/maxprocs"
42 "golang.org/x/net/trace"
43 "golang.org/x/sys/unix"
44 "google.golang.org/grpc"
45 "google.golang.org/grpc/codes"
46 "google.golang.org/grpc/credentials/insecure"
47 "google.golang.org/grpc/metadata"
48
49 "github.com/sourcegraph/zoekt"
50 "github.com/sourcegraph/zoekt/build"
51 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
52 "github.com/sourcegraph/zoekt/debugserver"
53 "github.com/sourcegraph/zoekt/grpc/internalerrs"
54 "github.com/sourcegraph/zoekt/grpc/messagesize"
55 "github.com/sourcegraph/zoekt/internal/profiler"
56 "github.com/sourcegraph/zoekt/internal/tenant"
57)
58
// Prometheus metrics exported by the indexserver. All of them are registered
// through promauto at package initialization, except the gRPC client metrics
// which are registered lazily by clientMetricsOnce.
var (
	// metricResolveRevisionsDuration observes the time taken to resolve all
	// repository revisions in one pass.
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name: "resolve_revisions_seconds",
		Help: "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 100000s (~27.8h)
	})

	// metricResolveRevisionDuration observes the time taken to resolve a single
	// repository revision, labelled by whether it succeeded.
	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "resolve_revision_seconds",
		Help: "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	// metricGetIndexOptions counts every attempt to get index options.
	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	// metricGetIndexOptionsError counts the failed attempts to get index options.
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	// metricIndexDuration observes how long a single index job took, measured
	// in processQueue once the per-repository lock is held.
	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name", // name of the repository that was indexed
	})

	// metricIndexingDelay observes the time between a job being added to the
	// queue and the job completing.
	metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_indexing_delay_seconds",
		Help: "A histogram of durations from when an index job is added to the queue, to the time it completes.",
		Buckets: prometheus.ExponentialBuckets(60, 2, 14), // 1 minute -> ~5.7 days
	}, []string{
		"state", // state is an indexState
		"name", // the name of the repository that was indexed
	})

	// metricFetchDuration observes how long fetching a repository took.
	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_fetch_seconds",
		Help: "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name", // the name of the repository that the commits were fetched from
	})

	// metricIndexIncrementalIndexState counts the incremental index states
	// observed by Server.Index before deciding how to build.
	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is build.IndexState

	// metricNumIndexed is set by listIndexed to the number of repositories
	// that currently have shards in the index directory.
	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	// metricFailingTotal is incremented by Server.Index whenever an index job
	// returns an error.
	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	// metricIndexingTotal is incremented by Server.Index each time a full
	// index run is started.
	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	// metricNumStoppedTrackingTotal counts repositories removed from the queue
	// by the sync loop in Server.Run.
	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})

	// clientMetricsOnce returns a singleton instance of the client metrics
	// that are shared across all gRPC clients that this process creates.
	//
	// This function panics if the metrics cannot be registered with the default
	// Prometheus registry.
	clientMetricsOnce = sync.OnceValue(func() *grpcprom.ClientMetrics {
		clientMetrics := grpcprom.NewClientMetrics(
			grpcprom.WithClientCounterOptions(),
			grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
			grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC
			grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC
		)
		prometheus.DefaultRegisterer.MustRegister(clientMetrics)
		return clientMetrics
	})
)
152
// MaxFileSize is the file size limit, in bytes, passed to index jobs as
// indexArgs.FileLimit.
//
// 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
// NOTE: if you change this, you must also update gitIndex to use the same value when fetching the repo.
const MaxFileSize = 1 << 20
156
// reposWithSeparateIndexingMetrics is the set of repositories that we want to
// capture separate indexing metrics for. repoNameForMetric consults it to
// decide whether a repository name may be used as a metric label value.
var reposWithSeparateIndexingMetrics = make(map[string]struct{})
159
// indexState describes the outcome of a single index job. It is returned by
// Server.Index and used as the "state" label on the indexing metrics.
type indexState string

const (
	indexStateFail indexState = "fail" // The index job returned an error
	indexStateSuccess indexState = "success" // A full index run completed
	indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
	indexStateNoop indexState = "noop" // We didn't need to update index
	indexStateEmpty indexState = "empty" // index is empty (empty repo)
)
169
// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	// logger is the structured logger used for indexing events.
	logger sglog.Logger

	// Sourcegraph is the client used to list repositories, fetch index
	// options and report index status.
	Sourcegraph Sourcegraph
	// BatchSize is not referenced in this part of the file.
	// NOTE(review): presumably the batch size used when talking to
	// Sourcegraph — confirm against the client code.
	BatchSize int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration

	// CPUCount is the number of CPUs to use for indexing shards.
	CPUCount int

	// queue holds the pending index jobs. It is filled by the sync loop in
	// Run and drained by processQueue.
	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is the value reported by the /debug/host handler.
	hostname string

	// mergeOpts supplies the vacuum and merge intervals used by the
	// background loops started in Run.
	mergeOpts mergeOpts

	// timeout defines how long the index server waits before killing an indexing job.
	timeout time.Duration
}
217
// debug is the logger for verbose diagnostics. As initialized here it
// discards everything written to it.
var debug = log.New(io.Discard, "", log.LstdFlags)
219
// noOutputTimeout is how long loggedRun lets a command run without producing
// any new output before it sends SIGQUIT (followed by a kill).
//
// our index commands should output something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
const noOutputTimeout = 30 * time.Minute
226
227func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) {
228 out := &synchronizedBuffer{}
229 cmd.Stdout = out
230 cmd.Stderr = out
231
232 tr.LazyPrintf("%s", cmd.Args)
233
234 defer func() {
235 if err != nil {
236 outS := out.String()
237 tr.LazyPrintf("failed: %v", err)
238 tr.LazyPrintf("output: %s", out)
239 tr.SetError()
240 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS)
241 }
242 }()
243
244 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args))
245
246 if err := cmd.Start(); err != nil {
247 return err
248 }
249
250 errC := make(chan error)
251 go func() {
252 errC <- cmd.Wait()
253 }()
254
255 // This channel is set after we have sent sigquit. It allows us to follow up
256 // with a sigkill if the process doesn't quit after sigquit.
257 kill := make(<-chan time.Time)
258
259 lastLen := 0
260 for {
261 select {
262 case <-time.After(noOutputTimeout):
263 // Periodically check if we have had output. If not kill the process.
264 if out.Len() != lastLen {
265 lastLen = out.Len()
266 log.Printf("still running %s", cmd.Args)
267 } else {
268 // Send quit (C-\) first so we get a stack dump.
269 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args)
270 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil {
271 log.Println("quit failed:", err)
272 }
273
274 // send sigkill if still running in 10s
275 kill = time.After(10 * time.Second)
276 }
277
278 case <-kill:
279 log.Printf("still running, killing %s", cmd.Args)
280 if err := cmd.Process.Kill(); err != nil {
281 log.Println("kill failed:", err)
282 }
283
284 case err := <-errC:
285 if err != nil {
286 return err
287 }
288
289 tr.LazyPrintf("success")
290 return nil
291 }
292 }
293}
294
// synchronizedBuffer is a bytes.Buffer guarded by a mutex. It lets one
// goroutine inspect the buffer (Len, String) while another is still writing
// to it.
type synchronizedBuffer struct {
	mu sync.Mutex
	b  bytes.Buffer
}

// Write appends p to the buffer while holding the lock.
func (buf *synchronizedBuffer) Write(p []byte) (int, error) {
	buf.mu.Lock()
	n, err := buf.b.Write(p)
	buf.mu.Unlock()
	return n, err
}

// Len reports the number of bytes currently held in the buffer.
func (buf *synchronizedBuffer) Len() int {
	buf.mu.Lock()
	size := buf.b.Len()
	buf.mu.Unlock()
	return size
}

// String returns a copy of the buffered bytes as a string.
func (buf *synchronizedBuffer) String() string {
	buf.mu.Lock()
	contents := buf.b.String()
	buf.mu.Unlock()
	return contents
}
319
// pauseFileName if present in IndexDir will stop index jobs from
// running. This is to make it possible to experiment with the content of the
// IndexDir without the indexserver writing to it.
//
// Both the sync loop in Run and the workers in processQueue check for it;
// Run also logs the (trimmed) contents of the file.
const pauseFileName = "PAUSE"
324
// Run the sync loop. This blocks forever.
//
// It first removes incomplete shards from IndexDir, then starts the
// background goroutines: one that periodically syncs the queue with
// Sourcegraph, one that vacuums and one that merges shards (both only act
// when shard merging is enabled), plus IndexConcurrency queue workers.
func (s *Server) Run() {
	removeIncompleteShards(s.IndexDir)

	// Start a goroutine which updates the queue with commits to index.
	go func() {
		// We update the list of indexed repos every Interval. To speed up manual
		// testing we also listen for SIGUSR1 to trigger updates.
		//
		// "pkill -SIGUSR1 zoekt-sourcegra"
		for range jitterTicker(s.Interval, unix.SIGUSR1) {
			// A readable PAUSE file skips this sync round entirely.
			if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
				log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b)))
				continue
			}

			repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir))
			if err != nil {
				log.Printf("error listing repos: %s", err)
				continue
			}

			debug.Printf("updating index queue with %d repositories", len(repos.IDs))

			// Stop indexing repos we don't need to track anymore
			removed := s.queue.MaybeRemoveMissing(repos.IDs)
			metricNumStoppedTrackingTotal.Add(float64(len(removed)))
			if len(removed) > 0 {
				log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5))
			}

			// Cleanup runs concurrently with the queue updates below, under the
			// global index directory lock. We wait for it at the end of the round.
			cleanupDone := make(chan struct{})
			go func() {
				defer close(cleanupDone)
				s.muIndexDir.Global(func() {
					cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging)
				})
			}()

			repos.IterateIndexOptions(s.queue.AddOrUpdate)

			// IterateIndexOptions will only iterate over repositories that have
			// changed since we last called list. However, we want to add all IDs
			// back onto the queue just to check that what is on disk is still
			// correct. This will use the last IndexOptions we stored in the
			// queue. The repositories not on the queue (missing) need a forced
			// fetch of IndexOptions.
			missing := s.queue.Bump(repos.IDs)
			// Errors for individual repositories are deliberately ignored here.
			s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...)

			setCompoundShardCounter(s.IndexDir)

			<-cleanupDone
		}
	}()

	// Periodically vacuum, but only when shard merging is enabled.
	go func() {
		for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) {
			if s.shardMerging {
				s.vacuum()
			}
		}
	}()

	// Periodically merge shards, but only when shard merging is enabled.
	go func() {
		for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) {
			if s.shardMerging {
				s.doMerge()
			}
		}
	}()

	// Start the workers that pop jobs off the queue and index them.
	for i := 0; i < s.IndexConcurrency; i++ {
		go s.processQueue()
	}

	// block forever
	select {}
}
404
// formatListUint32 renders the first min(len(v), m) items of v as a
// comma-separated list. When items were left out, a trailing "..." entry
// marks the truncation.
func formatListUint32(v []uint32, m int) string {
	shown := m
	if len(v) < shown {
		shown = len(v)
	}

	var parts []string
	for i := 0; i < shown; i++ {
		parts = append(parts, fmt.Sprintf("%d", v[i]))
	}
	if len(v) > shown {
		parts = append(parts, "...")
	}

	return strings.Join(parts, ", ")
}
422
423func (s *Server) processQueue() {
424 for {
425 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
426 time.Sleep(time.Second)
427 continue
428 }
429
430 item, ok := s.queue.Pop()
431 if !ok {
432 time.Sleep(time.Second)
433 continue
434 }
435
436 opts := item.Opts
437 args := s.indexArgs(opts)
438
439 ran := s.muIndexDir.With(opts.Name, func() {
440 // only record time taken once we hold the lock. This avoids us
441 // recording time taken while merging/cleanup runs.
442 start := time.Now()
443
444 state, err := s.Index(args)
445
446 elapsed := time.Since(start)
447 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds())
448
449 indexDelay := time.Since(item.DateAddedToQueue)
450 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds())
451
452 if err != nil {
453 log.Printf("error indexing %s: %s", args.String(), err)
454 }
455
456 switch state {
457 case indexStateSuccess:
458 var branches []string
459 for _, b := range args.Branches {
460 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
461 }
462 s.logger.Info("updated index",
463 sglog.Int("tenant", args.TenantID),
464 sglog.String("repo", args.Name),
465 sglog.Uint32("id", args.RepoID),
466 sglog.Strings("branches", branches),
467 sglog.Duration("duration", elapsed),
468 sglog.Duration("index_delay", indexDelay),
469 )
470 case indexStateSuccessMeta:
471 log.Printf("updated meta %s in %v", args.String(), elapsed)
472 }
473 s.queue.SetIndexed(opts, state)
474 })
475
476 if !ran {
477 // Someone else is processing the repository. We can just skip this job
478 // since the repository will be added back to the queue and we will
479 // converge to the correct behaviour.
480 debug.Printf("index job for repository already running: %s", args)
481 continue
482 }
483 }
484}
485
486// repoNameForMetric returns a normalized version of the given repository name that is
487// suitable for use with Prometheus metrics.
488func repoNameForMetric(repo string) string {
489 // Check to see if we want to be able to capture separate indexing metrics for this repository.
490 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable.
491 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok {
492 return repo
493 }
494
495 return ""
496}
497
// batched splits slice into consecutive batches of at most size elements and
// delivers them, in order, on the returned channel. The channel is closed
// once every element has been sent.
//
// A non-positive size cannot make progress (zero would emit empty batches
// forever, a negative value would panic when slicing), so it is treated as
// "no limit" and the whole slice is delivered as a single batch.
//
// The batches alias the backing array of slice. The channel is unbuffered, so
// the caller must drain it or the sending goroutine stays blocked.
func batched(slice []uint32, size int) <-chan []uint32 {
	c := make(chan []uint32)
	go func() {
		defer close(c)

		if size <= 0 {
			size = len(slice)
		}

		for len(slice) > 0 {
			n := size
			if n > len(slice) {
				n = len(slice)
			}
			c <- slice[:n]
			slice = slice[n:]
		}
	}()
	return c
}
512
// jitterTicker returns a channel that ticks once on creation and then keeps
// ticking with a random delay between ticks, drawn uniformly from
// [d/2, d + d/2).
//
// Any signal listed in sig also fires the ticker, which is a convenient way
// to trigger a tick manually.
//
// d must be positive: rand.Int63n panics for a non-positive argument. The
// goroutines started here never exit.
func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} {
	ticks := make(chan struct{})

	// Timer-driven ticks. The sleep only starts once a tick has been received.
	go func() {
		base := int64(d)
		for {
			ticks <- struct{}{}
			time.Sleep(time.Duration(base/2 + rand.Int63n(base)))
		}
	}()

	// Signal-driven ticks, only when there is something to listen for.
	if len(sig) > 0 {
		go func() {
			sigC := make(chan os.Signal, 1)
			signal.Notify(sigC, sig...)
			for range sigC {
				ticks <- struct{}{}
			}
		}()
	}

	return ticks
}
544
// Index starts an index job for repo name at commit.
//
// It returns the resulting indexState. Whenever the returned error is
// non-nil, the deferred function forces the state to indexStateFail and
// increments metricFailingTotal. The job is recorded as an "index" trace
// keyed by the repository name.
//
// Index mutates args: it may set UseDelta, DeltaShardNumberFallbackThreshold
// and Symbols based on the server configuration.
func (s *Server) Index(args *indexArgs) (state indexState, err error) {
	tr := trace.New("index", args.Name)
	tr.SetMaxEvents(30) // Ensure we capture all indexing events

	defer func() {
		if err != nil {
			tr.SetError()
			tr.LazyPrintf("error: %v", err)
			state = indexStateFail
			metricFailingTotal.Inc()
		}
		tr.LazyPrintf("state: %s", state)
		tr.Finish()
	}()

	// Sourcegraph should always provide a tenant ID.
	if args.TenantID < 1 {
		return indexStateFail, tenant.ErrMissingTenant
	}

	tr.LazyPrintf("branches: %v", args.Branches)

	// No branches to index: write an empty shard instead of running gitIndex.
	if len(args.Branches) == 0 {
		return indexStateEmpty, createEmptyShard(args)
	}

	repositoryName := args.Name
	if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
		tr.LazyPrintf("marking this repository for delta build")
		args.UseDelta = true
	}

	args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold

	if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
		tr.LazyPrintf("skipping symbols calculation")
		args.Symbols = false
	}

	// reason is only used in the log line below. It stays "forced" unless an
	// incremental check replaces it with the state found on disk.
	reason := "forced"

	if args.Incremental {
		bo := args.BuildOptions()
		bo.SetDefaults()
		incrementalState, fn := bo.IndexState()
		reason = string(incrementalState)
		metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()

		switch incrementalState {
		case build.IndexStateEqual:
			debug.Printf("%s index already up to date. Shard=%s", args.String(), fn)
			return indexStateNoop, nil

		case build.IndexStateMeta:
			log.Printf("updating index.meta %s", args.String())

			// TODO(stefan) handle mergeMeta for tenant id.
			if err := mergeMeta(bo); err != nil {
				// Not fatal: fall through to the full index below.
				log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
			} else {
				return indexStateSuccessMeta, nil
			}

		case build.IndexStateCorrupt:
			log.Printf("falling back to full update: corrupt index: %s", args.String())
		}
	}

	log.Printf("updating index %s reason=%s", args.String(), reason)

	metricIndexingTotal.Inc()
	c := gitIndexConfig{
		// Commands run through loggedRun so they are traced and watched for
		// missing output.
		runCmd: func(cmd *exec.Cmd) error {
			return s.loggedRun(tr, cmd)
		},

		findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
			return args.BuildOptions().FindRepositoryMetadata()
		},
		timeout: s.timeout,
	}

	err = gitIndex(c, args, s.Sourcegraph, s.logger)
	if err != nil {
		return indexStateFail, err
	}

	// Failing to report the status is logged but does not fail the index job.
	if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil {
		s.logger.Error("failed to update index status",
			sglog.String("repo", args.Name),
			sglog.Uint32("id", args.RepoID),
			sglogBranches("branches", args.Branches),
			sglog.Error(err),
		)
	}

	return indexStateSuccess, nil
}
644
645// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so
646// it can update the zoekt_repos table.
647func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error {
648 // We need to read from disk for IndexTime.
649 _, metadata, ok, err := c.findRepositoryMetadata(args)
650 if err != nil {
651 return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
652 }
653 if !ok {
654 return errors.New("failed to find metadata for new/updated index")
655 }
656
657 status := []indexStatus{{
658 RepoID: args.RepoID,
659 Branches: args.Branches,
660 IndexTimeUnix: metadata.IndexTime.Unix(),
661 }}
662 if err := sg.UpdateIndexStatus(status); err != nil {
663 return fmt.Errorf("failed to update sourcegraph with status: %w", err)
664 }
665
666 return nil
667}
668
669func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
670 ss := make([]string, len(branches))
671 for i, b := range branches {
672 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
673 }
674 return sglog.Strings(key, ss)
675}
676
677func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
678 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0))
679 return &indexArgs{
680 IndexOptions: opts,
681 IndexDir: s.IndexDir,
682 Parallelism: parallelism,
683 Incremental: true,
684 FileLimit: MaxFileSize,
685 ShardMerging: s.shardMerging,
686 }
687}
688
689// parallelism consults both the server flags and index options to determine the number
690// of shards to index in parallel. If the CPUCount index option is provided, it always
691// overrides the server flag.
692func (s *Server) parallelism(opts IndexOptions, maxProcs int) int {
693 var parallelism int
694 if opts.ShardConcurrency > 0 {
695 parallelism = int(opts.ShardConcurrency)
696 } else {
697 parallelism = s.CPUCount
698 }
699
700 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs
701 if parallelism > 4*maxProcs {
702 parallelism = 4 * maxProcs
703 }
704
705 // If index concurrency is set, then divide the parallelism by the number of
706 // repos we're indexing in parallel
707 if s.IndexConcurrency > 1 {
708 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency)))
709 }
710
711 return parallelism
712}
713
714func createEmptyShard(args *indexArgs) error {
715 bo := args.BuildOptions()
716 bo.SetDefaults()
717 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}
718
719 if args.Incremental && bo.IncrementalSkipIndexing() {
720 return nil
721 }
722
723 builder, err := build.NewBuilder(*bo)
724 if err != nil {
725 return err
726 }
727 return builder.Finish()
728}
729
730// addDebugHandlers adds handlers specific to indexserver.
731func (s *Server) addDebugHandlers(mux *http.ServeMux) {
732 // Sourcegraph's site admin view requires indexserver to serve it's admin view
733 // on "/".
734 mux.Handle("/", http.HandlerFunc(s.handleRoot))
735
736 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex))
737 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed))
738 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList))
739 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge))
740 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue))
741 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost))
742}
743
744func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) {
745 if r.Method != "GET" {
746 w.Header().Set("Allow", "GET")
747 w.WriteHeader(http.StatusMethodNotAllowed)
748 return
749 }
750
751 response := struct {
752 Hostname string
753 }{
754 Hostname: s.hostname,
755 }
756
757 b, err := json.Marshal(response)
758 if err != nil {
759 http.Error(w, err.Error(), http.StatusInternalServerError)
760 return
761 }
762
763 w.Header().Set("Content-Type", "application/json; charset=utf-8")
764 w.Write(b)
765}
766
// rootTmpl renders the admin page served on "/". It expects a value with an
// IndexMsg string and a Repos list whose items have Name and ID fields. When
// Repos is empty only a "show repos" link is rendered; otherwise each
// repository is listed with a link that triggers a reindex of that ID.
var rootTmpl = template.Must(template.New("name").Parse(`
<html>
	<body>
		<a href="debug">Debug</a><br />
		<a href="debug/requests">Traces</a><br />
		{{.IndexMsg}}<br />
		<br />
		<h3>Reindex</h3>
		{{if .Repos}}
			<a href="?show_repos=false">hide repos</a><br />
			<table style="margin-top: 20px">
				<th style="text-align:left">Name</th>
				<th style="text-align:left">ID (click to reindex)</th>
				{{range .Repos}}
					<tr>
						<td>{{.Name}}</td>
						<td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></td>
					</tr>
				{{end}}
			</table>
		{{else}}
			<a href="?show_repos=true">show repos</a><br />
		{{end}}
	</body>
</html>
`))
793
794func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
795 if r.Method != "GET" {
796 w.Header().Set("Allow", "GET")
797 w.WriteHeader(http.StatusMethodNotAllowed)
798 return
799 }
800
801 values := r.URL.Query()
802
803 // ?id=
804 indexMsg := ""
805 if v := values.Get("id"); v != "" {
806 id, err := strconv.Atoi(v)
807 if err != nil {
808 http.Error(w, err.Error(), http.StatusBadRequest)
809 return
810 }
811 indexMsg, _ = s.forceIndex(uint32(id))
812 }
813
814 // ?show_repos=
815 showRepos := false
816 if v := values.Get("show_repos"); v != "" {
817 showRepos, _ = strconv.ParseBool(v)
818 }
819
820 type Repo struct {
821 ID uint32
822 Name string
823 }
824 var data struct {
825 Repos []Repo
826 IndexMsg string
827 }
828
829 data.IndexMsg = indexMsg
830
831 if showRepos {
832 s.queue.Iterate(func(opts *IndexOptions) {
833 data.Repos = append(data.Repos, Repo{
834 ID: opts.RepoID,
835 Name: opts.Name,
836 })
837 })
838 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name })
839 }
840
841 _ = rootTmpl.Execute(w, data)
842}
843
844// handleReindex triggers a reindex asynocronously. If a reindex was triggered
845// the request returns with status 202. The caller can infer the new state of
846// the index by calling List.
847func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) {
848 if r.Method != http.MethodPost {
849 w.Header().Set("Allow", http.MethodPost)
850 w.WriteHeader(http.StatusMethodNotAllowed)
851 return
852 }
853
854 err := r.ParseForm()
855 if err != nil {
856 http.Error(w, err.Error(), http.StatusBadRequest)
857 return
858 }
859
860 id, err := strconv.Atoi(r.Form.Get("repo"))
861 if err != nil {
862 http.Error(w, err.Error(), http.StatusBadRequest)
863 return
864 }
865
866 go func() { s.forceIndex(uint32(id)) }()
867
868 // 202 Accepted
869 w.WriteHeader(http.StatusAccepted)
870}
871
872func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
873 withIndexed := true
874 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil {
875 withIndexed = b
876 }
877
878 var indexed []uint32
879 if withIndexed {
880 indexed = listIndexed(s.IndexDir)
881 }
882
883 repos, err := s.Sourcegraph.List(r.Context(), indexed)
884 if err != nil {
885 http.Error(w, err.Error(), http.StatusInternalServerError)
886 return
887 }
888
889 bw := bytes.Buffer{}
890
891 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
892
893 _, err = fmt.Fprintf(tw, "ID\tName\n")
894 if err != nil {
895 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
896 return
897 }
898
899 s.queue.mu.Lock()
900 name := ""
901 for _, id := range repos.IDs {
902 if item := s.queue.get(id); item != nil {
903 name = item.opts.Name
904 } else {
905 name = ""
906 }
907 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
908 if err != nil {
909 debug.Printf("handleDebugList: %s\n", err.Error())
910 }
911 }
912 s.queue.mu.Unlock()
913
914 if err != nil {
915 http.Error(w, err.Error(), http.StatusInternalServerError)
916 return
917 }
918
919 err = tw.Flush()
920 if err != nil {
921 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
922 return
923 }
924
925 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
926
927 if _, err := io.Copy(w, &bw); err != nil {
928 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
929 return
930 }
931}
932
933// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
934// can run this command during periods of low usage (evenings, weekends) to
935// trigger an initial merge run. In the steady-state, merges happen rarely, even
936// on busy instances, and users can rely on automatic merging instead.
937func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {
938 // A merge operation can take very long, depending on the number merges and the
939 // target size of the compound shards. We run the merge in the background and
940 // return immediately to the user.
941 //
942 // We track the status of the merge with metricShardMergingRunning.
943 go func() {
944 s.doMerge()
945 }()
946 _, _ = w.Write([]byte("merging enqueued\n"))
947}
948
949func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {
950 indexed := listIndexed(s.IndexDir)
951
952 bw := bytes.Buffer{}
953
954 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
955
956 _, err := fmt.Fprintf(tw, "ID\tName\n")
957 if err != nil {
958 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
959 return
960 }
961
962 s.queue.mu.Lock()
963 name := ""
964 for _, id := range indexed {
965 if item := s.queue.get(id); item != nil {
966 name = item.opts.Name
967 } else {
968 name = ""
969 }
970 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
971 if err != nil {
972 debug.Printf("handleDebugIndexed: %s\n", err.Error())
973 }
974 }
975 s.queue.mu.Unlock()
976
977 if err != nil {
978 http.Error(w, err.Error(), http.StatusInternalServerError)
979 return
980 }
981
982 err = tw.Flush()
983 if err != nil {
984 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
985 return
986 }
987
988 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
989
990 if _, err := io.Copy(w, &bw); err != nil {
991 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
992 return
993 }
994}
995
996// forceIndex will run the index job for repo name now. It will return always
997// return a string explaining what it did, even if it failed.
998func (s *Server) forceIndex(id uint32) (string, error) {
999 var opts IndexOptions
1000 var err error
1001 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) {
1002 opts = o
1003 }, func(_ uint32, e error) {
1004 err = e
1005 }, id)
1006 if err != nil {
1007 return fmt.Sprintf("Indexing %d failed: %v", id, err), err
1008 }
1009
1010 args := s.indexArgs(opts)
1011 args.Incremental = false // force re-index
1012
1013 var state indexState
1014 ran := s.muIndexDir.With(opts.Name, func() {
1015 state, err = s.Index(args)
1016 })
1017 if !ran {
1018 return fmt.Sprintf("index job for repository already running: %s", args), nil
1019 }
1020 if err != nil {
1021 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err
1022 }
1023 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil
1024}
1025
1026func listIndexed(indexDir string) []uint32 {
1027 index := getShards(indexDir)
1028 metricNumIndexed.Set(float64(len(index)))
1029 repoIDs := make([]uint32, 0, len(index))
1030 for id := range index {
1031 repoIDs = append(repoIDs, id)
1032 }
1033 sort.Slice(repoIDs, func(i, j int) bool {
1034 return repoIDs[i] < repoIDs[j]
1035 })
1036 return repoIDs
1037}
1038
1039// setupTmpDir sets up a temporary directory on the same volume as the
1040// indexes.
1041//
1042// If main is true we will delete older temp directories left around. main is
1043// false when this is a debug command.
1044func setupTmpDir(logger sglog.Logger, main bool, index string) error {
1045 // change the target tmp directory depending on if it's our main daemon or a
1046 // debug sub command.
1047 dir := ".indexserver.debug.tmp"
1048 if main {
1049 dir = ".indexserver.tmp"
1050 }
1051
1052 tmpRoot := filepath.Join(index, dir)
1053
1054 if main {
1055 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot))
1056 err := os.RemoveAll(tmpRoot)
1057 if err != nil {
1058 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err))
1059 }
1060 }
1061
1062 if err := os.MkdirAll(tmpRoot, 0o755); err != nil {
1063 return err
1064 }
1065
1066 return os.Setenv("TMPDIR", tmpRoot)
1067}
1068
1069func printMetaData(fn string) error {
1070 repo, indexMeta, err := zoekt.ReadMetadataPath(fn)
1071 if err != nil {
1072 return err
1073 }
1074
1075 err = json.NewEncoder(os.Stdout).Encode(indexMeta)
1076 if err != nil {
1077 return err
1078 }
1079
1080 err = json.NewEncoder(os.Stdout).Encode(repo)
1081 if err != nil {
1082 return err
1083 }
1084 return nil
1085}
1086
1087func printShardStats(fn string) error {
1088 f, err := os.Open(fn)
1089 if err != nil {
1090 return err
1091 }
1092
1093 iFile, err := zoekt.NewIndexFile(f)
1094 if err != nil {
1095 return err
1096 }
1097
1098 return zoekt.PrintNgramStats(iFile)
1099}
1100
1101func srcLogLevelIsDebug() bool {
1102 lvl := os.Getenv(sglog.EnvLogLevel)
1103 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug")
1104}
1105
// getEnvWithDefaultBool returns the value of environment variable k parsed as
// a bool (using strconv.ParseBool). If k is unset or empty, defaultVal is
// returned. If the value cannot be parsed the process exits via log.Fatalf.
func getEnvWithDefaultBool(k string, defaultVal bool) bool {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		// The message names the type actually being parsed.
		log.Fatalf("error parsing ENV %s to bool: %s", k, err)
	}
	return b
}
1117
// getEnvWithDefaultInt64 returns the value of environment variable k parsed
// as a base-10 int64. If k is unset or empty, defaultVal is returned. If the
// value cannot be parsed the process exits via log.Fatalf.
func getEnvWithDefaultInt64(k string, defaultVal int64) int64 {
	raw := os.Getenv(k)
	if raw != "" {
		parsed, parseErr := strconv.ParseInt(raw, 10, 64)
		if parseErr != nil {
			log.Fatalf("error parsing ENV %s to int64: %s", k, parseErr)
		}
		return parsed
	}
	return defaultVal
}
1129
// getEnvWithDefaultInt returns the value of environment variable k parsed as
// a base-10 int. If k is unset or empty, defaultVal is returned. If the value
// cannot be parsed the process exits via log.Fatalf.
func getEnvWithDefaultInt(k string, defaultVal int) int {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	// Parse the variable's value, not its name.
	i, err := strconv.Atoi(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to int: %s", k, err)
	}
	return i
}
1141
// getEnvWithDefaultUint64 returns the value of environment variable k parsed
// as a base-10 uint64. If k is unset or empty, defaultVal is returned. If the
// value cannot be parsed the process exits via log.Fatalf.
func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 {
	raw := os.Getenv(k)
	if len(raw) == 0 {
		return defaultVal
	}

	parsed, parseErr := strconv.ParseUint(raw, 10, 64)
	if parseErr != nil {
		log.Fatalf("error parsing ENV %s to uint64: %s", k, parseErr)
	}
	return parsed
}
1153
// getEnvWithDefaultFloat64 returns the value of environment variable k parsed
// as a float64. If k is unset or empty, defaultVal is returned. If the value
// cannot be parsed the process exits via log.Fatalf.
func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, parseErr := strconv.ParseFloat(raw, 64)
		if parseErr != nil {
			log.Fatalf("error parsing ENV %s to float64: %s", k, parseErr)
		}
		return parsed
	}
	return defaultVal
}
1165
// getEnvWithDefaultString returns the value of environment variable k, or
// defaultVal if k is unset or empty.
func getEnvWithDefaultString(k string, defaultVal string) string {
	if value := os.Getenv(k); value != "" {
		return value
	}
	return defaultVal
}
1173
// getEnvWithDefaultDuration returns the value of environment variable k
// parsed with time.ParseDuration. If k is unset or empty, defaultVal is
// returned. If the value cannot be parsed the process exits via log.Fatalf.
func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration {
	raw := os.Getenv(k)
	if len(raw) == 0 {
		return defaultVal
	}

	parsed, parseErr := time.ParseDuration(raw)
	if parseErr != nil {
		log.Fatalf("error parsing ENV %s to duration: %s", k, parseErr)
	}
	return parsed
}
1186
// getEnvWithDefaultEmptySet splits the value of environment variable k on
// commas and returns the whitespace-trimmed, non-empty elements as a set. If
// k is unset or contains no elements the returned set is empty (never nil).
func getEnvWithDefaultEmptySet(k string) map[string]struct{} {
	members := make(map[string]struct{})
	for _, field := range strings.Split(os.Getenv(k), ",") {
		if name := strings.TrimSpace(field); name != "" {
			members[name] = struct{}{}
		}
	}
	return members
}
1197
// joinStringSet returns the elements of set joined by sep.
//
// The elements are sorted before joining so that the result is deterministic;
// Go's map iteration order is randomized, which would otherwise make the
// output differ between calls. An empty or nil set yields "".
func joinStringSet(set map[string]struct{}, sep string) string {
	xs := make([]string, 0, len(set))
	for x := range set {
		xs = append(xs, x)
	}
	sort.Strings(xs)

	return strings.Join(xs, sep)
}
1206
1207func setCompoundShardCounter(indexDir string) {
1208 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt"))
1209 if err != nil {
1210 log.Printf("setCompoundShardCounter: %s\n", err)
1211 return
1212 }
1213 metricNumberCompoundShards.Set(float64(len(fns)))
1214}
1215
1216func rootCmd() *ffcli.Command {
1217 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError)
1218 conf := rootConfig{
1219 Main: true,
1220 }
1221 conf.registerRootFlags(rootFs)
1222
1223 return &ffcli.Command{
1224 FlagSet: rootFs,
1225 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]",
1226 Subcommands: []*ffcli.Command{debugCmd()},
1227 Exec: func(ctx context.Context, args []string) error {
1228 return startServer(conf)
1229 },
1230 }
1231}
1232
// rootConfig holds the configuration of the indexserver. Apart from Main, the
// fields are bound to command line flags by registerRootFlags.
type rootConfig struct {
	// Main is true if this rootConfig is for our main long running command (the
	// indexserver). Debug commands should not set this value. This is used to
	// determine if we need to run tmpfriend.
	Main bool

	// root is the -sourcegraph_url flag: the Sourcegraph URL, or a path to a
	// directory of repositories.
	root string
	// interval is the -interval flag: how often to sync with Sourcegraph.
	interval time.Duration
	// index is the -index flag: the index directory.
	index string
	// indexConcurrency is the -index_concurrency flag: the number of repos to
	// index concurrently.
	indexConcurrency int64
	// listen is the -listen flag: the address to serve HTTP on.
	listen string
	// hostname is the -hostname flag: the name advertised to Sourcegraph.
	hostname string
	// cpuFraction is the -cpu_fraction flag: the fraction of the cores to use
	// for indexing.
	cpuFraction float64

	// config values related to shard merging

	// disableShardMerging is bound to the -shard_merging flag; true disables
	// shard merging.
	disableShardMerging bool
	// vacuumInterval is the -vacuum_interval flag: how often to run vacuum.
	vacuumInterval time.Duration
	// mergeInterval is the -merge_interval flag: how often to run merge.
	mergeInterval time.Duration
	// targetSize is the -merge_target_size flag: the target size of compound
	// shards in MiB.
	targetSize int64
	// minSize is the -merge_min_size flag: the minimum size of a compound
	// shard in MiB.
	minSize int64
	// minAgeDays is the -merge_min_age flag: the time since the last commit
	// in days; shards with newer commits are excluded from merging.
	minAgeDays int

	// config values related to backoff indexing repos with one or more consecutive failures

	// backoffDuration is the -backoff_duration flag.
	backoffDuration time.Duration
	// maxBackoffDuration is the -max_backoff_duration flag.
	maxBackoffDuration time.Duration
}
1259
// registerRootFlags binds the fields of rc to command line flags on fs.
//
// Several defaults are read from environment variables at registration time
// via the getEnvWithDefault* helpers, which terminate the process if a
// variable is set to a value that cannot be parsed.
//
// NOTE(review): the flag named "shard_merging" is bound to
// rc.disableShardMerging, so passing -shard_merging=true DISABLES shard
// merging. The name is kept as is because renaming it would change the
// command line interface.
func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
	fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
	fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
	fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently")
	fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use")
	fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
	fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
	fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
	fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
	fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")

	// flags related to shard merging
	fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging")
	fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
	fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
	fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 1000), "the target size of compound shards in MiB")
	fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 800), "the minimum size of a compound shard in MiB")
	fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
}
1279
1280func startServer(conf rootConfig) error {
1281 s, err := newServer(conf)
1282 if err != nil {
1283 return err
1284 }
1285
1286 profiler.Init("zoekt-sourcegraph-indexserver")
1287 setCompoundShardCounter(s.IndexDir)
1288
1289 if conf.listen != "" {
1290
1291 mux := http.NewServeMux()
1292 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{
1293 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"},
1294 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"},
1295 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"},
1296 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"},
1297 }...)
1298 s.addDebugHandlers(mux)
1299
1300 go func() {
1301 debug.Printf("serving HTTP on %s", conf.listen)
1302 log.Fatal(http.ListenAndServe(conf.listen, mux))
1303 }()
1304
1305 // Serve mux on a unix domain socket on a best-effort-basis so that
1306 // webserver can call the endpoints via the shared filesystem.
1307 //
1308 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen
1309 // on the socket due to permission errors. See
1310 // https://github.com/docker/for-mac/issues/6239
1311 go func() {
1312 serveHTTPOverSocket := func() error {
1313 socket := filepath.Join(s.IndexDir, "indexserver.sock")
1314 // We cannot bind a socket to an existing pathname.
1315 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) {
1316 return fmt.Errorf("error removing socket file: %s", socket)
1317 }
1318 // The "unix" network corresponds to stream sockets. (cf. unixgram,
1319 // unixpacket).
1320 l, err := net.Listen("unix", socket)
1321 if err != nil {
1322 return fmt.Errorf("failed to listen on socket %s: %w", socket, err)
1323 }
1324 // Indexserver (root) and webserver (Sourcegraph) run with
1325 // different users. Per default, the socket is created with
1326 // permission 755 (root root), which doesn't let webserver write to
1327 // it.
1328 //
1329 // See https://github.com/golang/go/issues/11822 for more context.
1330 if err := os.Chmod(socket, 0o777); err != nil {
1331 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err)
1332 }
1333 debug.Printf("serving HTTP on %s", socket)
1334 return http.Serve(l, mux)
1335 }
1336 debug.Print(serveHTTPOverSocket())
1337 }()
1338 }
1339
1340 oc := &ownerChecker{
1341 Path: filepath.Join(conf.index, "owner.txt"),
1342 Hostname: conf.hostname,
1343 }
1344 go oc.Run()
1345
1346 logger := sglog.Scoped("metricsRegistration")
1347 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"}
1348
1349 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index})
1350 prometheus.DefaultRegisterer.MustRegister(c)
1351
1352 s.Run()
1353 return nil
1354}
1355
1356func newServer(conf rootConfig) (*Server, error) {
1357 logger := sglog.Scoped("server")
1358
1359 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 {
1360 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0")
1361 }
1362 if conf.index == "" {
1363 return nil, fmt.Errorf("must set -index")
1364 }
1365 if conf.root == "" {
1366 return nil, fmt.Errorf("must set -sourcegraph_url")
1367 }
1368 rootURL, err := url.Parse(conf.root)
1369 if err != nil {
1370 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err)
1371 }
1372
1373 rootURL = addDefaultPort(rootURL)
1374
1375 // Tune GOMAXPROCS to match Linux container CPU quota.
1376 _, _ = maxprocs.Set()
1377
1378 // Automatically prepend our own path at the front, to minimize
1379 // required configuration.
1380 if l, err := os.Readlink("/proc/self/exe"); err == nil {
1381 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
1382 }
1383
1384 if _, err := os.Stat(conf.index); err != nil {
1385 if err := os.MkdirAll(conf.index, 0o755); err != nil {
1386 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err)
1387 }
1388 }
1389
1390 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil {
1391 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err)
1392 }
1393
1394 if srcLogLevelIsDebug() {
1395 debug = log.New(os.Stderr, "", log.LstdFlags)
1396 }
1397
1398 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST")
1399 if len(reposWithSeparateIndexingMetrics) > 0 {
1400 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", "))
1401 }
1402
1403 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST")
1404 if len(deltaBuildRepositoriesAllowList) > 0 {
1405 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", "))
1406 }
1407
1408 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150)
1409 if deltaShardNumberFallbackThreshold > 0 {
1410 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold)
1411 } else {
1412 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards")
1413 }
1414
1415 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST")
1416 if len(reposShouldSkipSymbolsCalculation) > 0 {
1417 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", "))
1418 }
1419
1420 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout)
1421 if indexingTimeout != defaultIndexingTimeout {
1422 debug.Printf("using configured indexing timeout: %s", indexingTimeout)
1423 }
1424
1425 var sg Sourcegraph
1426 if rootURL.IsAbs() {
1427 var batchSize int
1428 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" {
1429 batchSize, err = strconv.Atoi(v)
1430 if err != nil {
1431 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int")
1432 }
1433 }
1434
1435 opts := []SourcegraphClientOption{
1436 WithBatchSize(batchSize),
1437 }
1438
1439 logger := sglog.Scoped("zoektConfigurationGRPCClient")
1440 client, err := dialGRPCClient(rootURL.Host, logger)
1441 if err != nil {
1442 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err)
1443 }
1444
1445 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...)
1446
1447 } else {
1448 sg = sourcegraphFake{
1449 RootDir: rootURL.String(),
1450 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags),
1451 }
1452 }
1453
1454 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction)))
1455 if cpuCount < 1 {
1456 cpuCount = 1
1457 }
1458
1459 if conf.indexConcurrency < 1 {
1460 conf.indexConcurrency = 1
1461 } else if conf.indexConcurrency > int64(cpuCount) {
1462 conf.indexConcurrency = int64(cpuCount)
1463 }
1464
1465 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)
1466
1467 return &Server{
1468 logger: logger,
1469 Sourcegraph: sg,
1470 IndexDir: conf.index,
1471 IndexConcurrency: int(conf.indexConcurrency),
1472 Interval: conf.interval,
1473 CPUCount: cpuCount,
1474 queue: *q,
1475 shardMerging: !conf.disableShardMerging,
1476 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList,
1477 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold,
1478 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
1479 hostname: conf.hostname,
1480 mergeOpts: mergeOpts{
1481 vacuumInterval: conf.vacuumInterval,
1482 mergeInterval: conf.mergeInterval,
1483 targetSizeBytes: conf.targetSize * 1024 * 1024,
1484 minSizeBytes: conf.minSize * 1024 * 1024,
1485 minAgeDays: conf.minAgeDays,
1486 },
1487 timeout: indexingTimeout,
1488 }, err
1489}
1490
// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration
// for the indexed-search-configuration gRPC service. Its contents are embedded
// at build time from default_grpc_service_configuration.json and are passed to
// grpc.WithDefaultServiceConfig by dialGRPCClient.
//
// The default backoff strategy is modeled after the default settings used by
// retryablehttp.DefaultClient.
//
// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html):
//   - Unavailable
//   - Aborted
//
//go:embed default_grpc_service_configuration.json
var defaultGRPCServiceConfigurationJSON string
1503
1504func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor {
1505 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1506 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1507 return invoker(ctx, method, req, reply, cc, opts...)
1508 }
1509}
1510
1511func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
1512 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
1513 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1514 return streamer(ctx, desc, cc, method, opts...)
1515 }
1516}
1517
// defaultGRPCMessageReceiveSizeBytes is the default maximum size of a message
// that gRPC servers and clients are allowed to process.
// This can be overridden by providing custom Server/Dial options.
const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MiB
1521
1522func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) {
1523 metrics := clientMetricsOnce()
1524
1525 // If the service seems to be unavailable, this
1526 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1
1527 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s])
1528 retryOpts := []retry.CallOption{
1529 retry.WithMax(5),
1530 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)),
1531 retry.WithCodes(codes.Unavailable),
1532 }
1533
1534 opts := []grpc.DialOption{
1535 grpc.WithTransportCredentials(insecure.NewCredentials()),
1536 grpc.WithChainStreamInterceptor(
1537 metrics.StreamClientInterceptor(),
1538 messagesize.StreamClientInterceptor,
1539 internalActorStreamInterceptor(),
1540 internalerrs.LoggingStreamClientInterceptor(logger),
1541 internalerrs.PrometheusStreamClientInterceptor,
1542 retry.StreamClientInterceptor(retryOpts...),
1543 ),
1544 grpc.WithChainUnaryInterceptor(
1545 metrics.UnaryClientInterceptor(),
1546 messagesize.UnaryClientInterceptor,
1547 internalActorUnaryInterceptor(),
1548 internalerrs.LoggingUnaryClientInterceptor(logger),
1549 internalerrs.PrometheusUnaryClientInterceptor,
1550 retry.UnaryClientInterceptor(retryOpts...),
1551 ),
1552 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
1553 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)),
1554 }
1555
1556 opts = append(opts, additionalOpts...)
1557
1558 // Ensure that the message size options are set last, so they override any other
1559 // client-specific options that tweak the message size.
1560 //
1561 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
1562 // take precedence over everything else with a uniform size setting that's easy to reason about.
1563 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...)
1564
1565 // This dialer is used to connect via gRPC to the Sourcegraph instance.
1566 // This is done lazily, so we can provide the client to use regardless of
1567 // whether we enabled gRPC or not initially.
1568 cc, err := grpc.Dial(addr, opts...)
1569 if err != nil {
1570 return nil, fmt.Errorf("dialing %q: %w", addr, err)
1571 }
1572
1573 client := proto.NewZoektConfigurationServiceClient(cc)
1574 return client, nil
1575}
1576
// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
// If the scheme is "https", "443" is used. URLs that are nil, not absolute,
// already carry a port, or use any other scheme are returned unchanged.
//
// The original URL is not mutated. A copy is modified and returned.
func addDefaultPort(original *url.URL) *url.URL {
	if original == nil {
		return nil // don't panic
	}

	if !original.IsAbs() {
		return original // don't do anything if the URL doesn't look like a remote URL
	}

	if original.Port() != "" {
		return original
	}

	var port string
	switch original.Scheme {
	case "http":
		port = "80"
	case "https":
		port = "443"
	default:
		return original
	}

	u := cloneURL(original)
	// Join with Hostname() rather than Host: Host keeps the square brackets
	// of IPv6 literals (and a trailing ":" for an empty port), which
	// net.JoinHostPort would wrap in a second pair of brackets.
	u.Host = net.JoinHostPort(original.Hostname(), port)
	return u
}

// cloneURL returns a copy of the URL. It is safe to mutate the returned URL.
// This is copied from net/http/clone.go
func cloneURL(u *url.URL) *url.URL {
	if u == nil {
		return nil
	}
	u2 := new(url.URL)
	*u2 = *u
	if u.User != nil {
		// Copy the user info as well so the clone shares no pointers with u.
		u2.User = new(url.Userinfo)
		*u2.User = *u.User
	}
	return u2
}
1621
1622func main() {
1623 liblog := sglog.Init(sglog.Resource{
1624 Name: "zoekt-indexserver",
1625 Version: zoekt.Version,
1626 InstanceID: zoekt.HostnameBestEffort(),
1627 })
1628 defer liblog.Sync()
1629
1630 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil {
1631 log.Fatal(err)
1632 }
1633}
1634
// getBoolFromEnvironmentVariables returns the boolean defined by the first environment
// variable listed in envVarNames that is set to a non-empty value in the current
// process environment, or defaultBool if none are set.
//
// An error is returned if that environment variable fails to parse as a
// boolean. The error wraps the underlying strconv error, so callers can
// inspect it with errors.Is / errors.As.
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	for _, envVar := range envVarNames {
		v := os.Getenv(envVar)
		if v == "" {
			continue
		}

		b, err := strconv.ParseBool(v)
		if err != nil {
			return false, fmt.Errorf("parsing environment variable %q to boolean: %w", envVar, err)
		}

		return b, nil
	}

	return defaultBool, nil
}