fork of https://github.com/sourcegraph/zoekt
1// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
2// repositories on sourcegraph
3package main
4
5import (
6 "bytes"
7 "context"
8 _ "embed"
9 "encoding/json"
10 "errors"
11 "flag"
12 "fmt"
13 "html/template"
14 "io"
15 "io/fs"
16 "log"
17 "math"
18 "math/rand"
19 "net"
20 "net/http"
21 "net/url"
22 "os"
23 "os/exec"
24 "os/signal"
25 "path/filepath"
26 "runtime"
27 "sort"
28 "strconv"
29 "strings"
30 "sync"
31 "text/tabwriter"
32 "time"
33
34 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
35 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
36 "github.com/peterbourgon/ff/v3/ffcli"
37 "github.com/prometheus/client_golang/prometheus"
38 "github.com/prometheus/client_golang/prometheus/promauto"
39 sglog "github.com/sourcegraph/log"
40 "github.com/sourcegraph/mountinfo"
41 "github.com/sourcegraph/zoekt"
42 "github.com/sourcegraph/zoekt/build"
43 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
44 "github.com/sourcegraph/zoekt/grpc/internalerrs"
45 "github.com/sourcegraph/zoekt/grpc/messagesize"
46 "github.com/sourcegraph/zoekt/internal/debugserver"
47 "github.com/sourcegraph/zoekt/internal/profiler"
48 "github.com/sourcegraph/zoekt/internal/tenant"
49 "go.uber.org/automaxprocs/maxprocs"
50 "golang.org/x/net/trace"
51 "golang.org/x/sys/unix"
52 "google.golang.org/grpc"
53 "google.golang.org/grpc/codes"
54 "google.golang.org/grpc/credentials/insecure"
55 "google.golang.org/grpc/metadata"
56)
57
58var (
59 metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
60 Name: "resolve_revisions_seconds",
61 Help: "A histogram of latencies for resolving all repository revisions.",
62 Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 27min
63 })
64
65 metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
66 Name: "resolve_revision_seconds",
67 Help: "A histogram of latencies for resolving a repository revision.",
68 Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
69 }, []string{"success"}) // success=true|false
70
71 metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
72 Name: "get_index_options_total",
73 Help: "The total number of times we tried to get index options for a repository. Includes errors.",
74 })
75 metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
76 Name: "get_index_options_error_total",
77 Help: "The total number of times we failed to get index options for a repository.",
78 })
79
80 metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
81 Name: "index_repo_seconds",
82 Help: "A histogram of latencies for indexing a repository.",
83 Buckets: prometheus.ExponentialBucketsRange(
84 (100 * time.Millisecond).Seconds(),
85 (40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
86 20),
87 }, []string{
88 "state", // state is an indexState
89 "name", // name of the repository that was indexed
90 })
91
92 metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{
93 Name: "index_indexing_delay_seconds",
94 Help: "A histogram of durations from when an index job is added to the queue, to the time it completes.",
95 Buckets: prometheus.ExponentialBuckets(60, 2, 14), // 1 minute -> 5.5 days
96 }, []string{
97 "state", // state is an indexState
98 "name", // the name of the repository that was indexed
99 })
100
101 metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
102 Name: "index_fetch_seconds",
103 Help: "A histogram of latencies for fetching a repository.",
104 Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
105 }, []string{
106 "success", // true|false
107 "name", // the name of the repository that the commits were fetched from
108 })
109
110 metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
111 Name: "index_incremental_index_state",
112 Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
113 }, []string{"state"}) // state is build.IndexState
114
115 metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
116 Name: "index_num_indexed",
117 Help: "Number of indexed repos by code host",
118 })
119
120 metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
121 Name: "index_failing_total",
122 Help: "Counts failures to index (indexing activity, should be used with rate())",
123 })
124
125 metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
126 Name: "index_indexing_total",
127 Help: "Counts indexings (indexing activity, should be used with rate())",
128 })
129
130 metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
131 Name: "index_num_stopped_tracking_total",
132 Help: "Counts the number of repos we stopped tracking.",
133 })
134
135 // clientMetricsOnce returns a singleton instance of the client metrics
136 // that are shared across all gRPC clients that this process creates.
137 //
138 // This function panics if the metrics cannot be registered with the default
139 // Prometheus registry.
140 clientMetricsOnce = sync.OnceValue(func() *grpcprom.ClientMetrics {
141 clientMetrics := grpcprom.NewClientMetrics(
142 grpcprom.WithClientCounterOptions(),
143 grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
144 grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC
145 grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC
146 )
147 prometheus.DefaultRegisterer.MustRegister(clientMetrics)
148 return clientMetrics
149 })
150)
151
// MaxFileSize is the per-file size limit, in bytes (1 MiB), handed to the
// indexer as indexArgs.FileLimit. It matches
// https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
//
// NOTE: if you change this, you must also update gitIndex to use the same
// value when fetching the repo.
const MaxFileSize = 1024 * 1024

// reposWithSeparateIndexingMetrics is the set of repository names that keep
// their own "name" label on the per-repository indexing metrics. Every other
// repository is reported under the empty name (see repoNameForMetric).
var reposWithSeparateIndexingMetrics = map[string]struct{}{}

// indexState is the outcome of a single index job. It is used as the "state"
// label on the indexing metrics and is recorded on the queue via SetIndexed.
type indexState string

// The possible outcomes of an index job.
const (
	// indexStateFail: the job returned an error.
	indexStateFail = indexState("fail")
	// indexStateSuccess: a full index was built.
	indexStateSuccess = indexState("success")
	// indexStateSuccessMeta: we only updated metadata.
	indexStateSuccessMeta = indexState("success_meta")
	// indexStateNoop: we didn't need to update the index.
	indexStateNoop = indexState("noop")
	// indexStateEmpty: the index is empty (empty repo).
	indexStateEmpty = indexState("empty")
)
168
// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	// logger is the structured logger used for index results and failures.
	logger sglog.Logger

	// Sourcegraph is the client this server uses to list repositories, fetch
	// index options and report index status back to Sourcegraph.
	Sourcegraph Sourcegraph

	// BatchSize is not read in this part of the file; presumably it is the
	// batch size for requests to Sourcegraph — TODO confirm.
	BatchSize int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration

	// CPUCount is the number of CPUs to use for indexing shards. It is the
	// default shard parallelism when an index option does not override it.
	CPUCount int

	// queue holds pending index jobs; processQueue workers pop from it and
	// the sync loop in Run adds, bumps and removes entries.
	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is reported by the /debug/host endpoint.
	hostname string

	// mergeOpts configures shard merging; Run reads its vacuumInterval and
	// mergeInterval.
	mergeOpts mergeOpts

	// timeout defines how long the index server waits before killing an indexing job.
	timeout time.Duration
}
216
// debugLog starts out discarding everything written to it.
var debugLog = log.New(io.Discard, "[DEBUG] ", log.LstdFlags)

// infoLog writes informational messages to stderr.
var infoLog = log.New(os.Stderr, "[INFO] ", log.LstdFlags)

// errorLog writes error messages to stderr.
var errorLog = log.New(os.Stderr, "[ERROR] ", log.LstdFlags)

// noOutputTimeout is how long an index command may go without producing any
// output before loggedRun considers it stuck. Our index commands should
// output something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
const noOutputTimeout = time.Minute * 30
229
230func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) {
231 out := &synchronizedBuffer{}
232 cmd.Stdout = out
233 cmd.Stderr = out
234
235 tr.LazyPrintf("%s", cmd.Args)
236
237 defer func() {
238 if err != nil {
239 outS := out.String()
240 tr.LazyPrintf("failed: %v", err)
241 tr.LazyPrintf("output: %s", out)
242 tr.SetError()
243 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS)
244 }
245 }()
246
247 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args))
248
249 if err := cmd.Start(); err != nil {
250 return err
251 }
252
253 errC := make(chan error)
254 go func() {
255 errC <- cmd.Wait()
256 }()
257
258 // This channel is set after we have sent sigquit. It allows us to follow up
259 // with a sigkill if the process doesn't quit after sigquit.
260 kill := make(<-chan time.Time)
261
262 lastLen := 0
263 for {
264 select {
265 case <-time.After(noOutputTimeout):
266 // Periodically check if we have had output. If not kill the process.
267 if out.Len() != lastLen {
268 lastLen = out.Len()
269 infoLog.Printf("still running %s", cmd.Args)
270 } else {
271 // Send quit (C-\) first so we get a stack dump.
272 infoLog.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args)
273 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil {
274 errorLog.Println("quit failed:", err)
275 }
276
277 // send sigkill if still running in 10s
278 kill = time.After(10 * time.Second)
279 }
280
281 case <-kill:
282 infoLog.Printf("still running, killing %s", cmd.Args)
283 if err := cmd.Process.Kill(); err != nil {
284 errorLog.Println("kill failed:", err)
285 }
286
287 case err := <-errC:
288 if err != nil {
289 return err
290 }
291
292 tr.LazyPrintf("success")
293 return nil
294 }
295 }
296}
297
// synchronizedBuffer wraps a bytes.Buffer with a mutex. Used so we can
// monitor the buffer (Len, String) while a running command is still writing
// to it. The zero value is an empty buffer ready for use.
type synchronizedBuffer struct {
	mu sync.Mutex
	b  bytes.Buffer
}

// Write appends p to the buffer. It implements io.Writer and is safe for
// concurrent use.
func (sb *synchronizedBuffer) Write(p []byte) (int, error) {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	return sb.b.Write(p)
}

// Len returns the number of bytes written so far.
func (sb *synchronizedBuffer) Len() int {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	return sb.b.Len()
}

// String returns everything written so far as a string.
func (sb *synchronizedBuffer) String() string {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	return sb.b.String()
}
322
// pauseFileName is the name of a marker file. While a file with this name
// exists in IndexDir, index jobs do not run and the sync loop skips its
// iterations. This makes it possible to experiment with the contents of
// IndexDir without the indexserver writing to it.
const (
	pauseFileName = "PAUSE"
)
327
// Run the sync loop. This blocks forever.
//
// Run first removes incomplete shards from IndexDir, then starts:
//   - a sync goroutine that, on every jittered Interval tick (or SIGUSR1),
//     lists repositories from Sourcegraph, stops tracking repositories that
//     are no longer listed, runs cleanup under the global index-dir lock and
//     (re)enqueues index options for every listed repository;
//   - a goroutine that calls s.vacuum on every vacuumInterval tick while
//     shard merging is enabled;
//   - a goroutine that calls s.doMerge on every mergeInterval tick while
//     shard merging is enabled;
//   - IndexConcurrency processQueue workers.
//
// Sync iterations are skipped while a PAUSE file exists in IndexDir.
func (s *Server) Run() {
	removeIncompleteShards(s.IndexDir)

	// Start a goroutine which updates the queue with commits to index.
	go func() {
		// We update the list of indexed repos every Interval. To speed up manual
		// testing we also listen for SIGUSR1 to trigger updates.
		//
		// "pkill -SIGUSR1 zoekt-sourcegra"
		for range jitterTicker(s.Interval, unix.SIGUSR1) {
			if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
				infoLog.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b)))
				continue
			}

			// Tell Sourcegraph which repositories are currently on disk.
			repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir))
			if err != nil {
				errorLog.Printf("error listing repos: %s", err)
				continue
			}

			debugLog.Printf("updating index queue with %d repositories", len(repos.IDs))

			// Stop indexing repos we don't need to track anymore
			removed := s.queue.MaybeRemoveMissing(repos.IDs)
			metricNumStoppedTrackingTotal.Add(float64(len(removed)))
			if len(removed) > 0 {
				infoLog.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5))
			}

			// cleanup runs under muIndexDir.Global in parallel with the
			// enqueueing below; we wait for it (cleanupDone) before handling
			// the next tick.
			cleanupDone := make(chan struct{})
			go func() {
				defer close(cleanupDone)
				s.muIndexDir.Global(func() {
					cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging)
				})
			}()

			repos.IterateIndexOptions(s.queue.AddOrUpdate)

			// IterateIndexOptions will only iterate over repositories that have
			// changed since we last called list. However, we want to add all IDs
			// back onto the queue just to check that what is on disk is still
			// correct. This will use the last IndexOptions we stored in the
			// queue. The repositories not on the queue (missing) need a forced
			// fetch of IndexOptions. Errors from that forced fetch are ignored
			// here (the error callback is a no-op).
			missing := s.queue.Bump(repos.IDs)
			s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...)

			setShardsCounter(s.IndexDir)

			<-cleanupDone
		}
	}()

	// Vacuum loop. The ticker keeps running when merging is disabled; ticks
	// are simply ignored.
	go func() {
		for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) {
			if s.shardMerging {
				s.vacuum()
			}
		}
	}()

	// Merge loop. Like the vacuum loop, SIGUSR1 also triggers it.
	go func() {
		for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) {
			if s.shardMerging {
				s.doMerge()
			}
		}
	}()

	// Index workers.
	for i := 0; i < s.IndexConcurrency; i++ {
		go s.processQueue()
	}

	// block forever
	select {}
}
407
// formatListUint32 returns a comma-separated list of the first
// min(len(v), m) items of v. If items were omitted, the list ends with
// ", ..." (or is just "..." when m is 0). A negative m is treated as 0, so
// an empty v always yields "".
func formatListUint32(v []uint32, m int) string {
	m = max(0, min(m, len(v)))

	parts := make([]string, 0, m+1)
	for _, id := range v[:m] {
		parts = append(parts, strconv.FormatUint(uint64(id), 10))
	}
	if len(v) > m {
		parts = append(parts, "...")
	}
	return strings.Join(parts, ", ")
}
425
426func (s *Server) processQueue() {
427 for {
428 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
429 time.Sleep(time.Second)
430 continue
431 }
432
433 item, ok := s.queue.Pop()
434 if !ok {
435 time.Sleep(time.Second)
436 continue
437 }
438
439 opts := item.Opts
440 args := s.indexArgs(opts)
441
442 ran := s.muIndexDir.With(opts.Name, func() {
443 // only record time taken once we hold the lock. This avoids us
444 // recording time taken while merging/cleanup runs.
445 start := time.Now()
446
447 state, err := s.Index(args)
448
449 elapsed := time.Since(start)
450 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds())
451
452 indexDelay := time.Since(item.DateAddedToQueue)
453 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds())
454
455 if err != nil {
456 errorLog.Printf("error indexing %s: %s", args.String(), err)
457 }
458
459 switch state {
460 case indexStateSuccess:
461 var branches []string
462 for _, b := range args.Branches {
463 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
464 }
465 s.logger.Info("updated index",
466 sglog.Int("tenant", args.TenantID),
467 sglog.String("repo", args.Name),
468 sglog.Uint32("id", args.RepoID),
469 sglog.Strings("branches", branches),
470 sglog.Duration("duration", elapsed),
471 sglog.Duration("index_delay", indexDelay),
472 )
473 case indexStateSuccessMeta:
474 infoLog.Printf("updated meta %s in %v", args.String(), elapsed)
475 }
476 s.queue.SetIndexed(opts, state)
477 })
478
479 if !ran {
480 // Someone else is processing the repository. We can just skip this job
481 // since the repository will be added back to the queue and we will
482 // converge to the correct behaviour.
483 debugLog.Printf("index job for repository already running: %s", args)
484 continue
485 }
486 }
487}
488
489// repoNameForMetric returns a normalized version of the given repository name that is
490// suitable for use with Prometheus metrics.
491func repoNameForMetric(repo string) string {
492 // Check to see if we want to be able to capture separate indexing metrics for this repository.
493 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable.
494 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok {
495 return repo
496 }
497
498 return ""
499}
500
// batched sends slice on the returned channel as consecutive sub-slices of
// at most size elements, then closes the channel. A non-positive size yields
// the whole slice as a single batch. Previously such a size either looped
// forever on empty batches (0) or panicked (negative).
//
// Batches share slice's backing array but are capacity-limited, so
// appending to one batch cannot overwrite the next. The caller must drain
// the channel; otherwise the producing goroutine blocks forever.
func batched(slice []uint32, size int) <-chan []uint32 {
	if size <= 0 {
		size = len(slice)
	}

	c := make(chan []uint32)
	go func() {
		defer close(c)
		for len(slice) > 0 {
			n := min(size, len(slice))
			c <- slice[:n:n]
			slice = slice[n:]
		}
	}()
	return c
}
515
// jitterTicker returns a channel that ticks immediately on creation and then
// with a jitter: each wait between ticks is chosen uniformly from
// [d/2, d+d/2).
//
// If d is not positive, there are no periodic ticks. The channel delivers
// the initial tick and afterwards only signal-triggered ticks. Previously
// rand.Int63n panicked on a non-positive argument, which crashed the process
// from a background goroutine.
//
// sig is a list of signals which also cause the ticker to fire. This is a
// convenience to allow manually triggering of the ticker.
//
// The channel is unbuffered and never closed, and the goroutines feeding it
// run for the lifetime of the process. Callers are expected to keep
// consuming it.
func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} {
	ticker := make(chan struct{})

	go func() {
		for {
			ticker <- struct{}{}
			if d <= 0 {
				return
			}
			time.Sleep(d/2 + time.Duration(rand.Int63n(int64(d))))
		}
	}()

	if len(sig) > 0 {
		go func() {
			c := make(chan os.Signal, 1)
			signal.Notify(c, sig...)
			for range c {
				ticker <- struct{}{}
			}
		}()
	}

	return ticker
}
547
// Index runs an index job for args synchronously and reports the outcome.
//
// The returned state is:
//   - indexStateFail whenever err != nil. The deferred handler forces this
//     state for every error, e.g. a missing tenant ID or a gitIndex failure,
//     and increments metricFailingTotal.
//   - indexStateEmpty when args has no branches (createEmptyShard is called).
//   - indexStateNoop when an incremental run finds the index up to date.
//   - indexStateSuccessMeta when only index metadata needed updating and
//     mergeMeta succeeded.
//   - indexStateSuccess after a full gitIndex run.
//
// Index mutates args: UseDelta, DeltaShardNumberFallbackThreshold and
// Symbols may be overridden from the server's allowlists and settings.
// Progress is recorded on a golang.org/x/net/trace trace of family "index".
func (s *Server) Index(args *indexArgs) (state indexState, err error) {
	tr := trace.New("index", args.Name)
	tr.SetMaxEvents(30) // Ensure we capture all indexing events

	// Any error overrides the returned state with indexStateFail.
	defer func() {
		if err != nil {
			tr.SetError()
			tr.LazyPrintf("error: %v", err)
			state = indexStateFail
			metricFailingTotal.Inc()
		}
		tr.LazyPrintf("state: %s", state)
		tr.Finish()
	}()

	// Sourcegraph should always provide a tenant ID.
	if args.TenantID < 1 {
		return indexStateFail, tenant.ErrMissingTenant
	}

	tr.LazyPrintf("branches: %v", args.Branches)

	// Nothing to index: produce an empty shard instead.
	if len(args.Branches) == 0 {
		return indexStateEmpty, createEmptyShard(args)
	}

	repositoryName := args.Name
	if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
		tr.LazyPrintf("marking this repository for delta build")
		args.UseDelta = true
	}

	args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold

	if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
		tr.LazyPrintf("skipping symbols calculation")
		args.Symbols = false
	}

	// reason is only used for logging; it stays "forced" for non-incremental
	// runs.
	reason := "forced"

	if args.Incremental {
		// Compare the state on disk with what we want to build.
		bo := args.BuildOptions()
		bo.SetDefaults()
		incrementalState, fn := bo.IndexState()
		reason = string(incrementalState)
		metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()

		switch incrementalState {
		case build.IndexStateEqual:
			debugLog.Printf("%s index already up to date. Shard=%s", args.String(), fn)
			return indexStateNoop, nil

		case build.IndexStateMeta:
			infoLog.Printf("updating index.meta %s", args.String())

			// TODO(stefan) handle mergeMeta for tenant id.
			// This err shadows the named result, so a mergeMeta failure does
			// not fail the job; we fall through to a full index instead.
			if err := mergeMeta(bo); err != nil {
				errorLog.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
			} else {
				return indexStateSuccessMeta, nil
			}

		case build.IndexStateCorrupt:
			infoLog.Printf("falling back to full update: corrupt index: %s", args.String())
		}
		// Any state not returned above falls through to a full index.
	}

	infoLog.Printf("updating index %s reason=%s", args.String(), reason)

	metricIndexingTotal.Inc()
	c := gitIndexConfig{
		// Commands run with output capture and the no-output watchdog of
		// loggedRun, traced on tr.
		runCmd: func(cmd *exec.Cmd) error {
			return s.loggedRun(tr, cmd)
		},

		findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
			return args.BuildOptions().FindRepositoryMetadata()
		},
		timeout: s.timeout,
	}

	err = gitIndex(c, args, s.Sourcegraph, s.logger)
	if err != nil {
		return indexStateFail, err
	}

	// Failing to report the new status is logged but does not fail the job.
	if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil {
		s.logger.Error("failed to update index status",
			sglog.String("repo", args.Name),
			sglog.Uint32("id", args.RepoID),
			sglogBranches("branches", args.Branches),
			sglog.Error(err),
		)
	}

	return indexStateSuccess, nil
}
647
648// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so
649// it can update the zoekt_repos table.
650func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error {
651 // We need to read from disk for IndexTime.
652 _, metadata, ok, err := c.findRepositoryMetadata(args)
653 if err != nil {
654 return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
655 }
656 if !ok {
657 return errors.New("failed to find metadata for new/updated index")
658 }
659
660 status := []indexStatus{{
661 RepoID: args.RepoID,
662 Branches: args.Branches,
663 IndexTimeUnix: metadata.IndexTime.Unix(),
664 }}
665 if err := sg.UpdateIndexStatus(status); err != nil {
666 return fmt.Errorf("failed to update sourcegraph with status: %w", err)
667 }
668
669 return nil
670}
671
672func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
673 ss := make([]string, len(branches))
674 for i, b := range branches {
675 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
676 }
677 return sglog.Strings(key, ss)
678}
679
680func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
681 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0))
682 return &indexArgs{
683 IndexOptions: opts,
684 IndexDir: s.IndexDir,
685 Parallelism: parallelism,
686 Incremental: true,
687 FileLimit: MaxFileSize,
688 ShardMerging: s.shardMerging,
689 }
690}
691
692// parallelism consults both the server flags and index options to determine the number
693// of shards to index in parallel. If the CPUCount index option is provided, it always
694// overrides the server flag.
695func (s *Server) parallelism(opts IndexOptions, maxProcs int) int {
696 var parallelism int
697 if opts.ShardConcurrency > 0 {
698 parallelism = int(opts.ShardConcurrency)
699 } else {
700 parallelism = s.CPUCount
701 }
702
703 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs
704 if parallelism > 4*maxProcs {
705 parallelism = 4 * maxProcs
706 }
707
708 // If index concurrency is set, then divide the parallelism by the number of
709 // repos we're indexing in parallel
710 if s.IndexConcurrency > 1 {
711 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency)))
712 }
713
714 return parallelism
715}
716
717func createEmptyShard(args *indexArgs) error {
718 bo := args.BuildOptions()
719 bo.SetDefaults()
720 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}
721
722 if args.Incremental && bo.IncrementalSkipIndexing() {
723 return nil
724 }
725
726 builder, err := build.NewBuilder(*bo)
727 if err != nil {
728 return err
729 }
730 return builder.Finish()
731}
732
733// addDebugHandlers adds handlers specific to indexserver.
734func (s *Server) addDebugHandlers(mux *http.ServeMux) {
735 // Sourcegraph's site admin view requires indexserver to serve it's admin view
736 // on "/".
737 mux.Handle("/", http.HandlerFunc(s.handleRoot))
738
739 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex))
740 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed))
741 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList))
742 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge))
743 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue))
744 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost))
745}
746
747func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) {
748 if r.Method != "GET" {
749 w.Header().Set("Allow", "GET")
750 w.WriteHeader(http.StatusMethodNotAllowed)
751 return
752 }
753
754 response := struct {
755 Hostname string
756 }{
757 Hostname: s.hostname,
758 }
759
760 b, err := json.Marshal(response)
761 if err != nil {
762 http.Error(w, err.Error(), http.StatusInternalServerError)
763 return
764 }
765
766 w.Header().Set("Content-Type", "application/json; charset=utf-8")
767 w.Write(b)
768}
769
// rootTmpl renders the indexserver admin page served on "/". It expects a
// value with an IndexMsg string and a Repos slice whose elements have ID and
// Name fields. Each ID links back to "/" with ?id= to force a reindex.
var rootTmpl = template.Must(template.New("name").Parse(`
<html>
  <body>
    <a href="debug">Debug</a><br />
    <a href="debug/requests">Traces</a><br />
    {{.IndexMsg}}<br />
    <br />
    <h3>Reindex</h3>
    {{if .Repos}}
    <a href="?show_repos=false">hide repos</a><br />
    <table style="margin-top: 20px">
      <tr>
        <th style="text-align:left">Name</th>
        <th style="text-align:left">ID (click to reindex)</th>
      </tr>
      {{range .Repos}}
      <tr>
        <td>{{.Name}}</td>
        <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></td>
      </tr>
      {{end}}
    </table>
    {{else}}
    <a href="?show_repos=true">show repos</a><br />
    {{end}}
  </body>
</html>
`))
796
797func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
798 if r.Method != "GET" {
799 w.Header().Set("Allow", "GET")
800 w.WriteHeader(http.StatusMethodNotAllowed)
801 return
802 }
803
804 values := r.URL.Query()
805
806 // ?id=
807 indexMsg := ""
808 if v := values.Get("id"); v != "" {
809 id, err := strconv.Atoi(v)
810 if err != nil {
811 http.Error(w, err.Error(), http.StatusBadRequest)
812 return
813 }
814 indexMsg, _ = s.forceIndex(uint32(id))
815 }
816
817 // ?show_repos=
818 showRepos := false
819 if v := values.Get("show_repos"); v != "" {
820 showRepos, _ = strconv.ParseBool(v)
821 }
822
823 type Repo struct {
824 ID uint32
825 Name string
826 }
827 var data struct {
828 Repos []Repo
829 IndexMsg string
830 }
831
832 data.IndexMsg = indexMsg
833
834 if showRepos {
835 s.queue.Iterate(func(opts *IndexOptions) {
836 data.Repos = append(data.Repos, Repo{
837 ID: opts.RepoID,
838 Name: opts.Name,
839 })
840 })
841 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name })
842 }
843
844 _ = rootTmpl.Execute(w, data)
845}
846
847// handleReindex triggers a reindex asynocronously. If a reindex was triggered
848// the request returns with status 202. The caller can infer the new state of
849// the index by calling List.
850func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) {
851 if r.Method != http.MethodPost {
852 w.Header().Set("Allow", http.MethodPost)
853 w.WriteHeader(http.StatusMethodNotAllowed)
854 return
855 }
856
857 err := r.ParseForm()
858 if err != nil {
859 http.Error(w, err.Error(), http.StatusBadRequest)
860 return
861 }
862
863 id, err := strconv.Atoi(r.Form.Get("repo"))
864 if err != nil {
865 http.Error(w, err.Error(), http.StatusBadRequest)
866 return
867 }
868
869 go func() { s.forceIndex(uint32(id)) }()
870
871 // 202 Accepted
872 w.WriteHeader(http.StatusAccepted)
873}
874
875func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
876 withIndexed := true
877 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil {
878 withIndexed = b
879 }
880
881 var indexed []uint32
882 if withIndexed {
883 indexed = listIndexed(s.IndexDir)
884 }
885
886 repos, err := s.Sourcegraph.List(r.Context(), indexed)
887 if err != nil {
888 http.Error(w, err.Error(), http.StatusInternalServerError)
889 return
890 }
891
892 bw := bytes.Buffer{}
893
894 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
895
896 _, err = fmt.Fprintf(tw, "ID\tName\n")
897 if err != nil {
898 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
899 return
900 }
901
902 s.queue.mu.Lock()
903 name := ""
904 for _, id := range repos.IDs {
905 if item := s.queue.get(id); item != nil {
906 name = item.opts.Name
907 } else {
908 name = ""
909 }
910 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
911 if err != nil {
912 debugLog.Printf("handleDebugList: %s\n", err.Error())
913 }
914 }
915 s.queue.mu.Unlock()
916
917 if err != nil {
918 http.Error(w, err.Error(), http.StatusInternalServerError)
919 return
920 }
921
922 err = tw.Flush()
923 if err != nil {
924 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
925 return
926 }
927
928 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
929
930 if _, err := io.Copy(w, &bw); err != nil {
931 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
932 return
933 }
934}
935
936// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
937// can run this command during periods of low usage (evenings, weekends) to
938// trigger an initial merge run. In the steady-state, merges happen rarely, even
939// on busy instances, and users can rely on automatic merging instead.
940func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {
941 // A merge operation can take very long, depending on the number merges and the
942 // target size of the compound shards. We run the merge in the background and
943 // return immediately to the user.
944 //
945 // We track the status of the merge with metricShardMergingRunning.
946 go func() {
947 s.doMerge()
948 }()
949 _, _ = w.Write([]byte("merging enqueued\n"))
950}
951
952func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {
953 indexed := listIndexed(s.IndexDir)
954
955 bw := bytes.Buffer{}
956
957 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
958
959 _, err := fmt.Fprintf(tw, "ID\tName\n")
960 if err != nil {
961 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
962 return
963 }
964
965 s.queue.mu.Lock()
966 name := ""
967 for _, id := range indexed {
968 if item := s.queue.get(id); item != nil {
969 name = item.opts.Name
970 } else {
971 name = ""
972 }
973 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
974 if err != nil {
975 debugLog.Printf("handleDebugIndexed: %s\n", err.Error())
976 }
977 }
978 s.queue.mu.Unlock()
979
980 if err != nil {
981 http.Error(w, err.Error(), http.StatusInternalServerError)
982 return
983 }
984
985 err = tw.Flush()
986 if err != nil {
987 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
988 return
989 }
990
991 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
992
993 if _, err := io.Copy(w, &bw); err != nil {
994 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
995 return
996 }
997}
998
999// forceIndex will run the index job for repo name now. It will return always
1000// return a string explaining what it did, even if it failed.
1001func (s *Server) forceIndex(id uint32) (string, error) {
1002 var opts IndexOptions
1003 var err error
1004 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) {
1005 opts = o
1006 }, func(_ uint32, e error) {
1007 err = e
1008 }, id)
1009 if err != nil {
1010 return fmt.Sprintf("Indexing %d failed: %v", id, err), err
1011 }
1012
1013 args := s.indexArgs(opts)
1014 args.Incremental = false // force re-index
1015
1016 var state indexState
1017 ran := s.muIndexDir.With(opts.Name, func() {
1018 state, err = s.Index(args)
1019 })
1020 if !ran {
1021 return fmt.Sprintf("index job for repository already running: %s", args), nil
1022 }
1023 if err != nil {
1024 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err
1025 }
1026 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil
1027}
1028
1029func listIndexed(indexDir string) []uint32 {
1030 index := getShards(indexDir)
1031 metricNumIndexed.Set(float64(len(index)))
1032 repoIDs := make([]uint32, 0, len(index))
1033 for id := range index {
1034 repoIDs = append(repoIDs, id)
1035 }
1036 sort.Slice(repoIDs, func(i, j int) bool {
1037 return repoIDs[i] < repoIDs[j]
1038 })
1039 return repoIDs
1040}
1041
1042// setupTmpDir sets up a temporary directory on the same volume as the
1043// indexes.
1044//
1045// If main is true we will delete older temp directories left around. main is
1046// false when this is a debug command.
1047func setupTmpDir(logger sglog.Logger, main bool, index string) error {
1048 // change the target tmp directory depending on if it's our main daemon or a
1049 // debug sub command.
1050 dir := ".indexserver.debug.tmp"
1051 if main {
1052 dir = ".indexserver.tmp"
1053 }
1054
1055 tmpRoot := filepath.Join(index, dir)
1056
1057 if main {
1058 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot))
1059 err := os.RemoveAll(tmpRoot)
1060 if err != nil {
1061 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err))
1062 }
1063 }
1064
1065 if err := os.MkdirAll(tmpRoot, 0o755); err != nil {
1066 return err
1067 }
1068
1069 return os.Setenv("TMPDIR", tmpRoot)
1070}
1071
1072func printMetaData(fn string) error {
1073 repo, indexMeta, err := zoekt.ReadMetadataPath(fn)
1074 if err != nil {
1075 return err
1076 }
1077
1078 err = json.NewEncoder(os.Stdout).Encode(indexMeta)
1079 if err != nil {
1080 return err
1081 }
1082
1083 err = json.NewEncoder(os.Stdout).Encode(repo)
1084 if err != nil {
1085 return err
1086 }
1087 return nil
1088}
1089
1090func printShardStats(fn string) error {
1091 f, err := os.Open(fn)
1092 if err != nil {
1093 return err
1094 }
1095
1096 iFile, err := zoekt.NewIndexFile(f)
1097 if err != nil {
1098 return err
1099 }
1100
1101 return zoekt.PrintNgramStats(iFile)
1102}
1103
1104func srcLogLevelIsDebug() bool {
1105 lvl := os.Getenv(sglog.EnvLogLevel)
1106 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug")
1107}
1108
1109func getEnvWithDefaultBool(k string, defaultVal bool) bool {
1110 v := os.Getenv(k)
1111 if v == "" {
1112 return defaultVal
1113 }
1114 b, err := strconv.ParseBool(v)
1115 if err != nil {
1116 log.Fatalf("error parsing ENV %s to int64: %s", k, err)
1117 }
1118 return b
1119}
1120
1121func getEnvWithDefaultInt64(k string, defaultVal int64) int64 {
1122 v := os.Getenv(k)
1123 if v == "" {
1124 return defaultVal
1125 }
1126 i, err := strconv.ParseInt(v, 10, 64)
1127 if err != nil {
1128 log.Fatalf("error parsing ENV %s to int64: %s", k, err)
1129 }
1130 return i
1131}
1132
1133func getEnvWithDefaultInt(k string, defaultVal int) int {
1134 v := os.Getenv(k)
1135 if v == "" {
1136 return defaultVal
1137 }
1138 i, err := strconv.Atoi(k)
1139 if err != nil {
1140 log.Fatalf("error parsing ENV %s to int: %s", k, err)
1141 }
1142 return i
1143}
1144
1145func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 {
1146 v := os.Getenv(k)
1147 if v == "" {
1148 return defaultVal
1149 }
1150 i, err := strconv.ParseUint(v, 10, 64)
1151 if err != nil {
1152 log.Fatalf("error parsing ENV %s to uint64: %s", k, err)
1153 }
1154 return i
1155}
1156
1157func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 {
1158 v := os.Getenv(k)
1159 if v == "" {
1160 return defaultVal
1161 }
1162 f, err := strconv.ParseFloat(v, 64)
1163 if err != nil {
1164 log.Fatalf("error parsing ENV %s to float64: %s", k, err)
1165 }
1166 return f
1167}
1168
1169func getEnvWithDefaultString(k string, defaultVal string) string {
1170 v := os.Getenv(k)
1171 if v == "" {
1172 return defaultVal
1173 }
1174 return v
1175}
1176
1177func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration {
1178 v := os.Getenv(k)
1179 if v == "" {
1180 return defaultVal
1181 }
1182
1183 d, err := time.ParseDuration(v)
1184 if err != nil {
1185 log.Fatalf("error parsing ENV %s to duration: %s", k, err)
1186 }
1187 return d
1188}
1189
1190func getEnvWithDefaultEmptySet(k string) map[string]struct{} {
1191 set := map[string]struct{}{}
1192 for _, v := range strings.Split(os.Getenv(k), ",") {
1193 v = strings.TrimSpace(v)
1194 if v != "" {
1195 set[v] = struct{}{}
1196 }
1197 }
1198 return set
1199}
1200
1201func joinStringSet(set map[string]struct{}, sep string) string {
1202 var xs []string
1203 for x := range set {
1204 xs = append(xs, x)
1205 }
1206
1207 return strings.Join(xs, sep)
1208}
1209
1210func setShardsCounter(indexDir string) {
1211 fns, err := filepath.Glob(filepath.Join(indexDir, "*.zoekt"))
1212 if err != nil {
1213 errorLog.Printf("setShardsCounter: %s\n", err)
1214 return
1215 }
1216 metricNumberShards.Set(float64(len(fns)))
1217
1218 compoundFns := make([]string, 0, len(fns))
1219 for _, fn := range fns {
1220 if strings.HasPrefix(filepath.Base(fn), "compound-") {
1221 compoundFns = append(compoundFns, fn)
1222 }
1223 }
1224 metricNumberCompoundShards.Set(float64(len(fns)))
1225}
1226
1227func rootCmd() *ffcli.Command {
1228 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError)
1229 conf := rootConfig{
1230 Main: true,
1231 }
1232 conf.registerRootFlags(rootFs)
1233
1234 return &ffcli.Command{
1235 FlagSet: rootFs,
1236 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]",
1237 Subcommands: []*ffcli.Command{debugCmd()},
1238 Exec: func(ctx context.Context, args []string) error {
1239 return startServer(conf)
1240 },
1241 }
1242}
1243
1244type rootConfig struct {
1245 // Main is true if this rootConfig is for our main long running command (the
1246 // indexserver). Debug commands should not set this value. This is used to
1247 // determine if we need to run tmpfriend.
1248 Main bool
1249
1250 root string
1251 interval time.Duration
1252 index string
1253 indexConcurrency int64
1254 listen string
1255 hostname string
1256 cpuFraction float64
1257
1258 // config values related to shard merging
1259 disableShardMerging bool
1260 vacuumInterval time.Duration
1261 mergeInterval time.Duration
1262 targetSize int64
1263 minSize int64
1264 minAgeDays int
1265
1266 // config values related to backoff indexing repos with one or more consecutive failures
1267 backoffDuration time.Duration
1268 maxBackoffDuration time.Duration
1269}
1270
1271func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
1272 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
1273 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
1274 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently")
1275 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use")
1276 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
1277 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
1278 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
1279 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
1280 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")
1281
1282 // flags related to shard merging
1283 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging")
1284 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
1285 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
1286 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 1000), "the target size of compound shards in MiB")
1287 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 800), "the minimum size of a compound shard in MiB")
1288 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
1289}
1290
1291func startServer(conf rootConfig) error {
1292 s, err := newServer(conf)
1293 if err != nil {
1294 return err
1295 }
1296
1297 profiler.Init("zoekt-sourcegraph-indexserver")
1298 setShardsCounter(s.IndexDir)
1299
1300 if conf.listen != "" {
1301
1302 mux := http.NewServeMux()
1303 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{
1304 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"},
1305 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"},
1306 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"},
1307 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"},
1308 }...)
1309 s.addDebugHandlers(mux)
1310
1311 go func() {
1312 debugLog.Printf("serving HTTP on %s", conf.listen)
1313 log.Fatal(http.ListenAndServe(conf.listen, mux))
1314 }()
1315
1316 // Serve mux on a unix domain socket on a best-effort-basis so that
1317 // webserver can call the endpoints via the shared filesystem.
1318 //
1319 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen
1320 // on the socket due to permission errors. See
1321 // https://github.com/docker/for-mac/issues/6239
1322 go func() {
1323 serveHTTPOverSocket := func() error {
1324 socket := filepath.Join(s.IndexDir, "indexserver.sock")
1325 // We cannot bind a socket to an existing pathname.
1326 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) {
1327 return fmt.Errorf("error removing socket file: %s", socket)
1328 }
1329 // The "unix" network corresponds to stream sockets. (cf. unixgram,
1330 // unixpacket).
1331 l, err := net.Listen("unix", socket)
1332 if err != nil {
1333 return fmt.Errorf("failed to listen on socket %s: %w", socket, err)
1334 }
1335 // Indexserver (root) and webserver (Sourcegraph) run with
1336 // different users. Per default, the socket is created with
1337 // permission 755 (root root), which doesn't let webserver write to
1338 // it.
1339 //
1340 // See https://github.com/golang/go/issues/11822 for more context.
1341 if err := os.Chmod(socket, 0o777); err != nil {
1342 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err)
1343 }
1344 debugLog.Printf("serving HTTP on %s", socket)
1345 return http.Serve(l, mux)
1346 }
1347 debugLog.Print(serveHTTPOverSocket())
1348 }()
1349 }
1350
1351 oc := &ownerChecker{
1352 Path: filepath.Join(conf.index, "owner.txt"),
1353 Hostname: conf.hostname,
1354 }
1355 go oc.Run()
1356
1357 logger := sglog.Scoped("metricsRegistration")
1358 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"}
1359
1360 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index})
1361 prometheus.DefaultRegisterer.MustRegister(c)
1362
1363 s.Run()
1364 return nil
1365}
1366
1367func newServer(conf rootConfig) (*Server, error) {
1368 logger := sglog.Scoped("server")
1369
1370 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 {
1371 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0")
1372 }
1373 if conf.index == "" {
1374 return nil, fmt.Errorf("must set -index")
1375 }
1376 if conf.root == "" {
1377 return nil, fmt.Errorf("must set -sourcegraph_url")
1378 }
1379 rootURL, err := url.Parse(conf.root)
1380 if err != nil {
1381 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err)
1382 }
1383
1384 rootURL = addDefaultPort(rootURL)
1385
1386 // Tune GOMAXPROCS to match Linux container CPU quota.
1387 _, _ = maxprocs.Set()
1388
1389 // Automatically prepend our own path at the front, to minimize
1390 // required configuration.
1391 if l, err := os.Readlink("/proc/self/exe"); err == nil {
1392 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
1393 }
1394
1395 if _, err := os.Stat(conf.index); err != nil {
1396 if err := os.MkdirAll(conf.index, 0o755); err != nil {
1397 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err)
1398 }
1399 }
1400
1401 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil {
1402 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err)
1403 }
1404
1405 if srcLogLevelIsDebug() {
1406 debugLog.SetOutput(os.Stderr)
1407 }
1408
1409 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST")
1410 if len(reposWithSeparateIndexingMetrics) > 0 {
1411 debugLog.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", "))
1412 }
1413
1414 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST")
1415 if len(deltaBuildRepositoriesAllowList) > 0 {
1416 debugLog.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", "))
1417 }
1418
1419 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150)
1420 if deltaShardNumberFallbackThreshold > 0 {
1421 debugLog.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold)
1422 } else {
1423 debugLog.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards")
1424 }
1425
1426 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST")
1427 if len(reposShouldSkipSymbolsCalculation) > 0 {
1428 debugLog.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", "))
1429 }
1430
1431 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout)
1432 if indexingTimeout != defaultIndexingTimeout {
1433 debugLog.Printf("using configured indexing timeout: %s", indexingTimeout)
1434 }
1435
1436 var sg Sourcegraph
1437 if rootURL.IsAbs() {
1438 var batchSize int
1439 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" {
1440 batchSize, err = strconv.Atoi(v)
1441 if err != nil {
1442 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int")
1443 }
1444 }
1445
1446 opts := []SourcegraphClientOption{
1447 WithBatchSize(batchSize),
1448 }
1449
1450 logger := sglog.Scoped("zoektConfigurationGRPCClient")
1451 client, err := dialGRPCClient(rootURL.Host, logger)
1452 if err != nil {
1453 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err)
1454 }
1455
1456 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...)
1457
1458 } else {
1459 sg = sourcegraphFake{
1460 RootDir: rootURL.String(),
1461 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags),
1462 }
1463 }
1464
1465 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction)))
1466 if cpuCount < 1 {
1467 cpuCount = 1
1468 }
1469
1470 if conf.indexConcurrency < 1 {
1471 conf.indexConcurrency = 1
1472 } else if conf.indexConcurrency > int64(cpuCount) {
1473 conf.indexConcurrency = int64(cpuCount)
1474 }
1475
1476 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)
1477
1478 return &Server{
1479 logger: logger,
1480 Sourcegraph: sg,
1481 IndexDir: conf.index,
1482 IndexConcurrency: int(conf.indexConcurrency),
1483 Interval: conf.interval,
1484 CPUCount: cpuCount,
1485 queue: *q,
1486 shardMerging: !conf.disableShardMerging,
1487 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList,
1488 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold,
1489 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
1490 hostname: conf.hostname,
1491 mergeOpts: mergeOpts{
1492 vacuumInterval: conf.vacuumInterval,
1493 mergeInterval: conf.mergeInterval,
1494 targetSizeBytes: conf.targetSize * 1024 * 1024,
1495 minSizeBytes: conf.minSize * 1024 * 1024,
1496 minAgeDays: conf.minAgeDays,
1497 },
1498 timeout: indexingTimeout,
1499 }, err
1500}
1501
1502// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration
1503// for the indexed-search-configuration gRPC service.
1504//
1505// The default backoff strategy is modeled after the default settings used by
1506// retryablehttp.DefaultClient.
1507//
1508// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html):
1509// - Unavailable
1510// - Aborted
1511//
1512//go:embed default_grpc_service_configuration.json
1513var defaultGRPCServiceConfigurationJSON string
1514
1515func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor {
1516 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1517 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1518 return invoker(ctx, method, req, reply, cc, opts...)
1519 }
1520}
1521
1522func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
1523 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
1524 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1525 return streamer(ctx, desc, cc, method, opts...)
1526 }
1527}
1528
1529// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPCs servers and clients are allowed to process.
1530// This can be overridden by providing custom Server/Dial options.
1531const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB
1532
1533func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) {
1534 metrics := clientMetricsOnce()
1535
1536 // If the service seems to be unavailable, this
1537 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1
1538 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s])
1539 retryOpts := []retry.CallOption{
1540 retry.WithMax(5),
1541 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)),
1542 retry.WithCodes(codes.Unavailable),
1543 }
1544
1545 opts := []grpc.DialOption{
1546 grpc.WithTransportCredentials(insecure.NewCredentials()),
1547 grpc.WithChainStreamInterceptor(
1548 metrics.StreamClientInterceptor(),
1549 messagesize.StreamClientInterceptor,
1550 internalActorStreamInterceptor(),
1551 internalerrs.LoggingStreamClientInterceptor(logger),
1552 internalerrs.PrometheusStreamClientInterceptor,
1553 retry.StreamClientInterceptor(retryOpts...),
1554 ),
1555 grpc.WithChainUnaryInterceptor(
1556 metrics.UnaryClientInterceptor(),
1557 messagesize.UnaryClientInterceptor,
1558 internalActorUnaryInterceptor(),
1559 internalerrs.LoggingUnaryClientInterceptor(logger),
1560 internalerrs.PrometheusUnaryClientInterceptor,
1561 retry.UnaryClientInterceptor(retryOpts...),
1562 ),
1563 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
1564 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)),
1565 }
1566
1567 opts = append(opts, additionalOpts...)
1568
1569 // Ensure that the message size options are set last, so they override any other
1570 // client-specific options that tweak the message size.
1571 //
1572 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
1573 // take precedence over everything else with a uniform size setting that's easy to reason about.
1574 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...)
1575
1576 // This dialer is used to connect via gRPC to the Sourcegraph instance.
1577 // This is done lazily, so we can provide the client to use regardless of
1578 // whether we enabled gRPC or not initially.
1579 cc, err := grpc.Dial(addr, opts...)
1580 if err != nil {
1581 return nil, fmt.Errorf("dialing %q: %w", addr, err)
1582 }
1583
1584 client := proto.NewZoektConfigurationServiceClient(cc)
1585 return client, nil
1586}
1587
1588// addDefaultPort adds a default port to a URL if one is not specified.
1589//
1590// If the URL scheme is "http" and no port is specified, "80" is used.
1591// If the scheme is "https", "443" is used.
1592//
1593// The original URL is not mutated. A copy is modified and returned.
1594func addDefaultPort(original *url.URL) *url.URL {
1595 if original == nil {
1596 return nil // don't panic
1597 }
1598
1599 if !original.IsAbs() {
1600 return original // don't do anything if the URL doesn't look like a remote URL
1601 }
1602
1603 if original.Scheme == "http" && original.Port() == "" {
1604 u := cloneURL(original)
1605 u.Host = net.JoinHostPort(u.Host, "80")
1606 return u
1607 }
1608
1609 if original.Scheme == "https" && original.Port() == "" {
1610 u := cloneURL(original)
1611 u.Host = net.JoinHostPort(u.Host, "443")
1612 return u
1613 }
1614
1615 return original
1616}
1617
1618// cloneURL returns a copy of the URL. It is safe to mutate the returned URL.
1619// This is copied from net/http/clone.go
1620func cloneURL(u *url.URL) *url.URL {
1621 if u == nil {
1622 return nil
1623 }
1624 u2 := new(url.URL)
1625 *u2 = *u
1626 if u.User != nil {
1627 u2.User = new(url.Userinfo)
1628 *u2.User = *u.User
1629 }
1630 return u2
1631}
1632
1633func main() {
1634 liblog := sglog.Init(sglog.Resource{
1635 Name: "zoekt-indexserver",
1636 Version: zoekt.Version,
1637 InstanceID: zoekt.HostnameBestEffort(),
1638 })
1639 defer liblog.Sync()
1640
1641 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil {
1642 log.Fatal(err)
1643 }
1644}
1645
1646// getBoolFromEnvironmentVariables returns the boolean defined by the first environment
1647// variable listed in envVarNames that is set in the current process environment, or the defaultBool if none are set.
1648//
1649// An error is returned of the provided environment variables fails to parse as a boolean.
1650func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
1651 for _, envVar := range envVarNames {
1652 v := os.Getenv(envVar)
1653 if v == "" {
1654 continue
1655 }
1656
1657 b, err := strconv.ParseBool(v)
1658 if err != nil {
1659 return false, fmt.Errorf("parsing environment variable %q to boolean: %v", envVar, err)
1660 }
1661
1662 return b, nil
1663 }
1664
1665 return defaultBool, nil
1666}