fork of https://github.com/sourcegraph/zoekt
1// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
2// repositories on sourcegraph
3package main
4
5import (
6 "bytes"
7 "context"
8 _ "embed"
9 "encoding/json"
10 "errors"
11 "flag"
12 "fmt"
13 "html/template"
14 "io"
15 "io/fs"
16 "log"
17 "math"
18 "math/rand"
19 "net"
20 "net/http"
21 "net/url"
22 "os"
23 "os/exec"
24 "os/signal"
25 "path/filepath"
26 "runtime"
27 "sort"
28 "strconv"
29 "strings"
30 "sync"
31 "text/tabwriter"
32 "time"
33
34 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
35 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
36 "github.com/peterbourgon/ff/v3/ffcli"
37 "github.com/prometheus/client_golang/prometheus"
38 "github.com/prometheus/client_golang/prometheus/promauto"
39 sglog "github.com/sourcegraph/log"
40 "github.com/sourcegraph/mountinfo"
41 "go.uber.org/automaxprocs/maxprocs"
42 "golang.org/x/net/trace"
43 "golang.org/x/sys/unix"
44 "google.golang.org/grpc"
45 "google.golang.org/grpc/codes"
46 "google.golang.org/grpc/credentials/insecure"
47 "google.golang.org/grpc/metadata"
48
49 "github.com/sourcegraph/zoekt"
50 "github.com/sourcegraph/zoekt/build"
51 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
52 "github.com/sourcegraph/zoekt/debugserver"
53 "github.com/sourcegraph/zoekt/grpc/internalerrs"
54 "github.com/sourcegraph/zoekt/grpc/messagesize"
55 "github.com/sourcegraph/zoekt/internal/profiler"
56)
57
// Prometheus metrics exported by the indexserver. All of them are registered
// at package initialisation via promauto.
var (
	// metricResolveRevisionsDuration observes a single latency value per
	// resolution of all repository revisions (no labels).
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 100000s (~27.8h)
	})

	// metricResolveRevisionDuration observes the latency of resolving a
	// single revision, partitioned by outcome.
	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	// metricIndexDuration is observed by processQueue once per index job,
	// measured from the moment the index directory lock is held.
	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	// metricIndexIncrementalIndexState is incremented by Server.Index for
	// every incremental job with the state reported by build.Options.IndexState.
	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is build.IndexState

	// metricNumIndexed is set by listIndexed to the number of repositories
	// found on disk.
	// NOTE(review): the Help text says "by code host" but the gauge is
	// declared without labels.
	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	// NOTE(review): as above, the Help text mentions "by code host" but the
	// gauge has no labels.
	metricNumAssigned = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_assigned",
		Help: "Number of repos assigned to this indexer by code host",
	})

	// metricFailingTotal is incremented by Server.Index whenever an index job
	// returns an error.
	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	// metricIndexingTotal is incremented by Server.Index each time a full
	// index run is started (i.e. not for noop or metadata-only updates).
	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	// metricNumStoppedTrackingTotal is increased by Server.Run by the number
	// of repositories removed from the queue on each sync.
	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})

	// clientMetricsOnce and clientMetrics hold the gRPC client metrics.
	// NOTE(review): presumably clientMetricsOnce guards one-time
	// initialisation of clientMetrics; the initialisation is not in this
	// part of the file.
	clientMetricsOnce sync.Once
	clientMetrics     *grpcprom.ClientMetrics
)
134
// reposWithSeparateIndexingMetrics is the set of repositories that we want to
// capture separate indexing metrics for. repoNameForMetric consults it to
// decide whether a repository name may be used as a metric label.
var reposWithSeparateIndexingMetrics = make(map[string]struct{})

// indexState describes the outcome of an index job. Its string value is used
// as the "state" label of metricIndexDuration.
type indexState string

const (
	indexStateFail        indexState = "fail"
	indexStateSuccess     indexState = "success"
	indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
	indexStateNoop        indexState = "noop"         // We didn't need to update index
	indexStateEmpty       indexState = "empty"        // index is empty (empty repo)
)
147
// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	// logger is the structured logger used for index job reporting.
	logger sglog.Logger

	// Sourcegraph is the client used to list the repositories to index,
	// fetch their index options and report index status back.
	Sourcegraph Sourcegraph
	// NOTE(review): BatchSize is not referenced in this part of the file;
	// presumably it bounds the size of batched requests — confirm.
	BatchSize int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration

	// CPUCount is the number of CPUs to use for indexing shards.
	CPUCount int

	// queue holds the index jobs that processQueue pops and runs.
	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is the value reported by the /debug/host endpoint.
	hostname string

	// mergeOpts supplies, among other things, the vacuum and merge intervals
	// used by Run.
	mergeOpts mergeOpts

	// timeout defines how long the index server waits before killing an indexing job.
	timeout time.Duration
}
195
// debug is the logger for verbose diagnostics. As declared here it discards
// everything written to it.
// NOTE(review): presumably its output is redirected elsewhere when debug
// logging is enabled — confirm against func main.
var debug = log.New(io.Discard, "", log.LstdFlags)

// noOutputTimeout is how long loggedRun waits without seeing new command
// output before it asks the command to quit.
//
// our index commands should output something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
const noOutputTimeout = 30 * time.Minute
204
// loggedRun starts cmd, captures its combined stdout/stderr and waits for it
// to exit, recording progress on tr.
//
// While the command runs its output is checked every noOutputTimeout. If no
// new output was produced in that window the process is sent SIGQUIT and,
// should it still be running 10 seconds later, killed.
//
// On failure the returned error includes the command arguments, the
// underlying error and the captured output.
func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) {
	// Both streams share one buffer so the watchdog below can observe any
	// kind of progress.
	out := &synchronizedBuffer{}
	cmd.Stdout = out
	cmd.Stderr = out

	tr.LazyPrintf("%s", cmd.Args)

	// Decorate any error returned below with the command line and its output.
	defer func() {
		if err != nil {
			outS := out.String()
			tr.LazyPrintf("failed: %v", err)
			tr.LazyPrintf("output: %s", out)
			tr.SetError()
			err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS)
		}
	}()

	s.logger.Debug("logged run", sglog.Strings("args", cmd.Args))

	if err := cmd.Start(); err != nil {
		return err
	}

	// Wait in the background so the select loop can also react to timeouts.
	errC := make(chan error)
	go func() {
		errC <- cmd.Wait()
	}()

	// This channel is set after we have sent sigquit. It allows us to follow up
	// with a sigkill if the process doesn't quit after sigquit.
	// Until then it is a channel nothing sends on, so its case never fires.
	kill := make(<-chan time.Time)

	lastLen := 0
	for {
		select {
		case <-time.After(noOutputTimeout):
			// Periodically check if we have had output. If not kill the process.
			if out.Len() != lastLen {
				lastLen = out.Len()
				log.Printf("still running %s", cmd.Args)
			} else {
				// Send quit (C-\) first so we get a stack dump.
				log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args)
				if err := cmd.Process.Signal(unix.SIGQUIT); err != nil {
					log.Println("quit failed:", err)
				}

				// send sigkill if still running in 10s
				kill = time.After(10 * time.Second)
			}

		case <-kill:
			log.Printf("still running, killing %s", cmd.Args)
			if err := cmd.Process.Kill(); err != nil {
				log.Println("kill failed:", err)
			}

		case err := <-errC:
			// The command exited; its Wait result decides success or failure.
			if err != nil {
				return err
			}

			tr.LazyPrintf("success")
			return nil
		}
	}
}
272
// synchronizedBuffer is a bytes.Buffer guarded by a mutex. It lets one
// goroutine inspect the length and contents of the buffer while another is
// still writing to it.
type synchronizedBuffer struct {
	lock sync.Mutex
	data bytes.Buffer
}

// Write appends p to the buffer while holding the lock.
func (s *synchronizedBuffer) Write(p []byte) (n int, err error) {
	s.lock.Lock()
	defer s.lock.Unlock()

	n, err = s.data.Write(p)
	return n, err
}

// Len reports the number of bytes currently held in the buffer.
func (s *synchronizedBuffer) Len() int {
	s.lock.Lock()
	defer s.lock.Unlock()

	size := s.data.Len()
	return size
}

// String returns a copy of the buffered bytes as a string.
func (s *synchronizedBuffer) String() string {
	s.lock.Lock()
	defer s.lock.Unlock()

	contents := s.data.String()
	return contents
}
297
// pauseFileName is the name of a file which, if present in IndexDir, stops
// index jobs from running. This makes it possible to experiment with the
// content of the IndexDir without the indexserver writing to it.
const pauseFileName = "PAUSE"
302
// Run starts the sync loop and the background workers. This blocks forever.
//
// It first removes incomplete shards from IndexDir and then starts:
//   - a goroutine that periodically lists repositories from Sourcegraph,
//     updates the queue and cleans up the index directory,
//   - a vacuum goroutine and a merge goroutine (both only act when shard
//     merging is enabled),
//   - IndexConcurrency goroutines running processQueue.
func (s *Server) Run() {
	removeIncompleteShards(s.IndexDir)

	// Start a goroutine which updates the queue with commits to index.
	go func() {
		// We update the list of indexed repos every Interval. To speed up manual
		// testing we also listen for SIGUSR1 to trigger updates.
		//
		// "pkill -SIGUSR1 zoekt-sourcegra"
		for range jitterTicker(s.Interval, unix.SIGUSR1) {
			// A readable PAUSE file skips this sync; its contents are logged.
			if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
				log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b)))
				continue
			}

			repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir))
			if err != nil {
				log.Printf("error listing repos: %s", err)
				continue
			}

			debug.Printf("updating index queue with %d repositories", len(repos.IDs))

			// Stop indexing repos we don't need to track anymore
			removed := s.queue.MaybeRemoveMissing(repos.IDs)
			metricNumStoppedTrackingTotal.Add(float64(len(removed)))
			if len(removed) > 0 {
				log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5))
			}

			// Run cleanup concurrently with the queue update below; the
			// iteration waits for it before finishing.
			cleanupDone := make(chan struct{})
			go func() {
				defer close(cleanupDone)
				s.muIndexDir.Global(func() {
					cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging)
				})
			}()

			repos.IterateIndexOptions(s.queue.AddOrUpdate)

			// IterateIndexOptions will only iterate over repositories that have
			// changed since we last called list. However, we want to add all IDs
			// back onto the queue just to check that what is on disk is still
			// correct. This will use the last IndexOptions we stored in the
			// queue. The repositories not on the queue (missing) need a forced
			// fetch of IndexOptions.
			missing := s.queue.Bump(repos.IDs)
			// Errors for individual repositories are deliberately ignored here.
			s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...)

			setCompoundShardCounter(s.IndexDir)

			<-cleanupDone
		}
	}()

	// Periodically vacuum when shard merging is enabled.
	go func() {
		for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) {
			if s.shardMerging {
				s.vacuum()
			}
		}
	}()

	// Periodically merge shards when shard merging is enabled.
	go func() {
		for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) {
			if s.shardMerging {
				s.doMerge()
			}
		}
	}()

	// Start the workers that pop jobs off the queue and index them.
	for i := 0; i < s.IndexConcurrency; i++ {
		go s.processQueue()
	}

	// block forever
	select {}
}
382
// formatListUint32 renders the first min(len(v), m) items of v as a
// comma-separated list. When v holds more items than were rendered, "..." is
// appended as a final element.
func formatListUint32(v []uint32, m int) string {
	limit := m
	if len(v) < limit {
		limit = len(v)
	}

	var parts []string
	for i := 0; i < limit; i++ {
		parts = append(parts, fmt.Sprint(v[i]))
	}

	// Signal that some items were left out.
	if len(v) > limit {
		parts = append(parts, "...")
	}

	return strings.Join(parts, ", ")
}
400
401func (s *Server) processQueue() {
402 for {
403 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
404 time.Sleep(time.Second)
405 continue
406 }
407
408 opts, ok := s.queue.Pop()
409 if !ok {
410 time.Sleep(time.Second)
411 continue
412 }
413
414 args := s.indexArgs(opts)
415
416 ran := s.muIndexDir.With(opts.Name, func() {
417 // only record time taken once we hold the lock. This avoids us
418 // recording time taken while merging/cleanup runs.
419 start := time.Now()
420
421 state, err := s.Index(args)
422
423 elapsed := time.Since(start)
424
425 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds())
426
427 if err != nil {
428 log.Printf("error indexing %s: %s", args.String(), err)
429 }
430
431 switch state {
432 case indexStateSuccess:
433 var branches []string
434 for _, b := range args.Branches {
435 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
436 }
437 s.logger.Info("updated index",
438 sglog.String("repo", args.Name),
439 sglog.Uint32("id", args.RepoID),
440 sglog.Strings("branches", branches),
441 sglog.Duration("duration", elapsed),
442 )
443 case indexStateSuccessMeta:
444 log.Printf("updated meta %s in %v", args.String(), elapsed)
445 }
446 s.queue.SetIndexed(opts, state)
447 })
448
449 if !ran {
450 // Someone else is processing the repository. We can just skip this job
451 // since the repository will be added back to the queue and we will
452 // converge to the correct behaviour.
453 debug.Printf("index job for repository already running: %s", args)
454 continue
455 }
456 }
457}
458
459// repoNameForMetric returns a normalized version of the given repository name that is
460// suitable for use with Prometheus metrics.
461func repoNameForMetric(repo string) string {
462 // Check to see if we want to be able to capture separate indexing metrics for this repository.
463 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable.
464 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok {
465 return repo
466 }
467
468 return ""
469}
470
// batched splits slice into consecutive chunks of at most size elements and
// delivers them, in order, on the returned channel. The channel is closed
// once the last chunk has been sent. The chunks alias slice's backing array.
//
// A non-positive size is treated as "no limit": the whole slice is delivered
// as a single chunk. (Previously a size of 0 produced an endless stream of
// empty chunks and a negative size panicked.)
//
// The sending goroutine blocks until each chunk is received, so callers
// should drain the channel.
func batched(slice []uint32, size int) <-chan []uint32 {
	c := make(chan []uint32)
	go func() {
		defer close(c)

		if size <= 0 {
			size = len(slice)
		}

		for len(slice) > 0 {
			n := size
			if n > len(slice) {
				n = len(slice)
			}
			c <- slice[:n]
			slice = slice[n:]
		}
	}()
	return c
}
485
// jitterTicker returns a channel that ticks with a jitter. It ticks once on
// creation; the pause before each following tick is drawn uniformly from
// [d/2, d + d/2).
//
// sig is a list of signals which also cause the ticker to fire. This is a
// convenience to allow manually triggering of the ticker.
//
// The goroutines started here run for the lifetime of the process. d must be
// positive, since rand.Int63n panics for a non-positive argument.
func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} {
	ticks := make(chan struct{})

	// Timer-driven ticks.
	go func() {
		for {
			ticks <- struct{}{}
			pause := d/2 + time.Duration(rand.Int63n(int64(d)))
			time.Sleep(pause)
		}
	}()

	// Signal-driven ticks, only when signals were requested.
	if len(sig) > 0 {
		go func() {
			signals := make(chan os.Signal, 1)
			signal.Notify(signals, sig...)
			for {
				<-signals
				ticks <- struct{}{}
			}
		}()
	}

	return ticks
}
517
// Index runs an index job for the repository and branches described by args
// and returns the resulting state.
//
// Outcomes:
//   - no branches: an empty shard is created (indexStateEmpty).
//   - incremental and already up to date: indexStateNoop.
//   - incremental and only metadata changed: index.meta is updated in place
//     (indexStateSuccessMeta); if that fails a full index is done instead.
//   - otherwise the repository is fully indexed via gitIndex and the new
//     status is pushed to Sourcegraph (indexStateSuccess).
//
// Whenever a non-nil error is returned the state is indexStateFail and
// metricFailingTotal is incremented. args is modified in place (UseDelta,
// DeltaShardNumberFallbackThreshold, Symbols).
func (s *Server) Index(args *indexArgs) (state indexState, err error) {
	tr := trace.New("index", args.Name)

	// Record the final outcome on the trace. Any error forces the state to
	// indexStateFail regardless of what the return statement supplied.
	defer func() {
		if err != nil {
			tr.SetError()
			tr.LazyPrintf("error: %v", err)
			state = indexStateFail
			metricFailingTotal.Inc()
		}
		tr.LazyPrintf("state: %s", state)
		tr.Finish()
	}()

	tr.LazyPrintf("branches: %v", args.Branches)

	if len(args.Branches) == 0 {
		return indexStateEmpty, createEmptyShard(args)
	}

	repositoryName := args.Name
	if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
		tr.LazyPrintf("marking this repository for delta build")
		args.UseDelta = true
	}

	args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold

	if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
		tr.LazyPrintf("skipping symbols calculation")
		args.Symbols = false
	}

	// reason is logged below; it stays "forced" for non-incremental jobs.
	reason := "forced"

	if args.Incremental {
		bo := args.BuildOptions()
		bo.SetDefaults()
		incrementalState, fn := bo.IndexState()
		reason = string(incrementalState)
		metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()

		// Every case that does not return falls through to the full index
		// below, as do states not listed here.
		switch incrementalState {
		case build.IndexStateEqual:
			debug.Printf("%s index already up to date. Shard=%s", args.String(), fn)
			return indexStateNoop, nil

		case build.IndexStateMeta:
			log.Printf("updating index.meta %s", args.String())

			if err := mergeMeta(bo); err != nil {
				log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
			} else {
				return indexStateSuccessMeta, nil
			}

		case build.IndexStateCorrupt:
			log.Printf("falling back to full update: corrupt index: %s", args.String())
		}
	}

	log.Printf("updating index %s reason=%s", args.String(), reason)

	metricIndexingTotal.Inc()
	c := gitIndexConfig{
		// Run commands through loggedRun so output is captured and stalled
		// commands are terminated.
		runCmd: func(cmd *exec.Cmd) error {
			return s.loggedRun(tr, cmd)
		},

		findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
			return args.BuildOptions().FindRepositoryMetadata()
		},
		timeout: s.timeout,
	}

	err = gitIndex(c, args, s.Sourcegraph, s.logger)
	if err != nil {
		return indexStateFail, err
	}

	// Failing to report the status is logged but does not fail the job.
	if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil {
		s.logger.Error("failed to update index status",
			sglog.String("repo", args.Name),
			sglog.Uint32("id", args.RepoID),
			sglogBranches("branches", args.Branches),
			sglog.Error(err),
		)
	}

	return indexStateSuccess, nil
}
610
611// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so
612// it can update the zoekt_repos table.
613func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error {
614 // We need to read from disk for IndexTime.
615 _, metadata, ok, err := c.findRepositoryMetadata(args)
616 if err != nil {
617 return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
618 }
619 if !ok {
620 return errors.New("failed to find metadata for new/updated index")
621 }
622
623 status := []indexStatus{{
624 RepoID: args.RepoID,
625 Branches: args.Branches,
626 IndexTimeUnix: metadata.IndexTime.Unix(),
627 }}
628 if err := sg.UpdateIndexStatus(status); err != nil {
629 return fmt.Errorf("failed to update sourcegraph with status: %w", err)
630 }
631
632 return nil
633}
634
635func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
636 ss := make([]string, len(branches))
637 for i, b := range branches {
638 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
639 }
640 return sglog.Strings(key, ss)
641}
642
643func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
644 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0))
645 return &indexArgs{
646 IndexOptions: opts,
647 IndexDir: s.IndexDir,
648 Parallelism: parallelism,
649 Incremental: true,
650
651 // 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
652 FileLimit: 1 << 20,
653 }
654}
655
656// parallelism consults both the server flags and index options to determine the number
657// of shards to index in parallel. If the CPUCount index option is provided, it always
658// overrides the server flag.
659func (s *Server) parallelism(opts IndexOptions, maxProcs int) int {
660 var parallelism int
661 if opts.ShardConcurrency > 0 {
662 parallelism = int(opts.ShardConcurrency)
663 } else {
664 parallelism = s.CPUCount
665 }
666
667 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs
668 if parallelism > 4*maxProcs {
669 parallelism = 4 * maxProcs
670 }
671
672 // If index concurrency is set, then divide the parallelism by the number of
673 // repos we're indexing in parallel
674 if s.IndexConcurrency > 1 {
675 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency)))
676 }
677
678 return parallelism
679}
680
681func createEmptyShard(args *indexArgs) error {
682 bo := args.BuildOptions()
683 bo.SetDefaults()
684 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}
685
686 if args.Incremental && bo.IncrementalSkipIndexing() {
687 return nil
688 }
689
690 builder, err := build.NewBuilder(*bo)
691 if err != nil {
692 return err
693 }
694 return builder.Finish()
695}
696
697// addDebugHandlers adds handlers specific to indexserver.
698func (s *Server) addDebugHandlers(mux *http.ServeMux) {
699 // Sourcegraph's site admin view requires indexserver to serve it's admin view
700 // on "/".
701 mux.Handle("/", http.HandlerFunc(s.handleRoot))
702
703 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex))
704 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed))
705 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList))
706 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge))
707 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue))
708 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost))
709}
710
711func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) {
712 if r.Method != "GET" {
713 w.Header().Set("Allow", "GET")
714 w.WriteHeader(http.StatusMethodNotAllowed)
715 return
716 }
717
718 response := struct {
719 Hostname string
720 }{
721 Hostname: s.hostname,
722 }
723
724 b, err := json.Marshal(response)
725 if err != nil {
726 http.Error(w, err.Error(), http.StatusInternalServerError)
727 return
728 }
729
730 w.Header().Set("Content-Type", "application/json; charset=utf-8")
731 w.Write(b)
732}
733
// rootTmpl renders the admin page served on "/". It expects data with an
// IndexMsg string and a Repos list whose elements have Name and ID fields.
// When Repos is empty only a "show repos" link is rendered; otherwise each
// repository is listed with a link that triggers a reindex of its ID.
var rootTmpl = template.Must(template.New("name").Parse(`
<html>
	<body>
		<a href="debug">Debug</a><br />
		<a href="debug/requests">Traces</a><br />
		{{.IndexMsg}}<br />
		<br />
		<h3>Reindex</h3>
		{{if .Repos}}
		<a href="?show_repos=false">hide repos</a><br />
		<table style="margin-top: 20px">
			<th style="text-align:left">Name</th>
			<th style="text-align:left">ID</th>
			{{range .Repos}}
			<tr>
				<td>{{.Name}}</td>
				<td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></td>
			</tr>
			{{end}}
		</table>
		{{else}}
		<a href="?show_repos=true">show repos</a><br />
		{{end}}
	</body>
</html>
`))
760
761func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
762 if r.Method != "GET" {
763 w.Header().Set("Allow", "GET")
764 w.WriteHeader(http.StatusMethodNotAllowed)
765 return
766 }
767
768 values := r.URL.Query()
769
770 // ?id=
771 indexMsg := ""
772 if v := values.Get("id"); v != "" {
773 id, err := strconv.Atoi(v)
774 if err != nil {
775 http.Error(w, err.Error(), http.StatusBadRequest)
776 return
777 }
778 indexMsg, _ = s.forceIndex(uint32(id))
779 }
780
781 // ?show_repos=
782 showRepos := false
783 if v := values.Get("show_repos"); v != "" {
784 showRepos, _ = strconv.ParseBool(v)
785 }
786
787 type Repo struct {
788 ID uint32
789 Name string
790 }
791 var data struct {
792 Repos []Repo
793 IndexMsg string
794 }
795
796 data.IndexMsg = indexMsg
797
798 if showRepos {
799 s.queue.Iterate(func(opts *IndexOptions) {
800 data.Repos = append(data.Repos, Repo{
801 ID: opts.RepoID,
802 Name: opts.Name,
803 })
804 })
805 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name })
806 }
807
808 _ = rootTmpl.Execute(w, data)
809}
810
811// handleReindex triggers a reindex asynocronously. If a reindex was triggered
812// the request returns with status 202. The caller can infer the new state of
813// the index by calling List.
814func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) {
815 if r.Method != http.MethodPost {
816 w.Header().Set("Allow", http.MethodPost)
817 w.WriteHeader(http.StatusMethodNotAllowed)
818 return
819 }
820
821 err := r.ParseForm()
822 if err != nil {
823 http.Error(w, err.Error(), http.StatusBadRequest)
824 return
825 }
826
827 id, err := strconv.Atoi(r.Form.Get("repo"))
828 if err != nil {
829 http.Error(w, err.Error(), http.StatusBadRequest)
830 return
831 }
832
833 go func() { s.forceIndex(uint32(id)) }()
834
835 // 202 Accepted
836 w.WriteHeader(http.StatusAccepted)
837}
838
839func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
840 withIndexed := true
841 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil {
842 withIndexed = b
843 }
844
845 var indexed []uint32
846 if withIndexed {
847 indexed = listIndexed(s.IndexDir)
848 }
849
850 repos, err := s.Sourcegraph.List(r.Context(), indexed)
851 if err != nil {
852 http.Error(w, err.Error(), http.StatusInternalServerError)
853 return
854 }
855
856 bw := bytes.Buffer{}
857
858 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
859
860 _, err = fmt.Fprintf(tw, "ID\tName\n")
861 if err != nil {
862 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
863 return
864 }
865
866 s.queue.mu.Lock()
867 name := ""
868 for _, id := range repos.IDs {
869 if item := s.queue.get(id); item != nil {
870 name = item.opts.Name
871 } else {
872 name = ""
873 }
874 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
875 if err != nil {
876 debug.Printf("handleDebugList: %s\n", err.Error())
877 }
878 }
879 s.queue.mu.Unlock()
880
881 if err != nil {
882 http.Error(w, err.Error(), http.StatusInternalServerError)
883 return
884 }
885
886 err = tw.Flush()
887 if err != nil {
888 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
889 return
890 }
891
892 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
893
894 if _, err := io.Copy(w, &bw); err != nil {
895 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
896 return
897 }
898}
899
900// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
901// can run this command during periods of low usage (evenings, weekends) to
902// trigger an initial merge run. In the steady-state, merges happen rarely, even
903// on busy instances, and users can rely on automatic merging instead.
904func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {
905 // A merge operation can take very long, depending on the number merges and the
906 // target size of the compound shards. We run the merge in the background and
907 // return immediately to the user.
908 //
909 // We track the status of the merge with metricShardMergingRunning.
910 go func() {
911 s.doMerge()
912 }()
913 _, _ = w.Write([]byte("merging enqueued\n"))
914}
915
916func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {
917 indexed := listIndexed(s.IndexDir)
918
919 bw := bytes.Buffer{}
920
921 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
922
923 _, err := fmt.Fprintf(tw, "ID\tName\n")
924 if err != nil {
925 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
926 return
927 }
928
929 s.queue.mu.Lock()
930 name := ""
931 for _, id := range indexed {
932 if item := s.queue.get(id); item != nil {
933 name = item.opts.Name
934 } else {
935 name = ""
936 }
937 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
938 if err != nil {
939 debug.Printf("handleDebugIndexed: %s\n", err.Error())
940 }
941 }
942 s.queue.mu.Unlock()
943
944 if err != nil {
945 http.Error(w, err.Error(), http.StatusInternalServerError)
946 return
947 }
948
949 err = tw.Flush()
950 if err != nil {
951 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
952 return
953 }
954
955 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
956
957 if _, err := io.Copy(w, &bw); err != nil {
958 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
959 return
960 }
961}
962
963// forceIndex will run the index job for repo name now. It will return always
964// return a string explaining what it did, even if it failed.
965func (s *Server) forceIndex(id uint32) (string, error) {
966 var opts IndexOptions
967 var err error
968 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) {
969 opts = o
970 }, func(_ uint32, e error) {
971 err = e
972 }, id)
973 if err != nil {
974 return fmt.Sprintf("Indexing %d failed: %v", id, err), err
975 }
976
977 args := s.indexArgs(opts)
978 args.Incremental = false // force re-index
979
980 var state indexState
981 ran := s.muIndexDir.With(opts.Name, func() {
982 state, err = s.Index(args)
983 })
984 if !ran {
985 return fmt.Sprintf("index job for repository already running: %s", args), nil
986 }
987 if err != nil {
988 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err
989 }
990 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil
991}
992
993func listIndexed(indexDir string) []uint32 {
994 index := getShards(indexDir)
995 metricNumIndexed.Set(float64(len(index)))
996 repoIDs := make([]uint32, 0, len(index))
997 for id := range index {
998 repoIDs = append(repoIDs, id)
999 }
1000 sort.Slice(repoIDs, func(i, j int) bool {
1001 return repoIDs[i] < repoIDs[j]
1002 })
1003 return repoIDs
1004}
1005
1006// setupTmpDir sets up a temporary directory on the same volume as the
1007// indexes.
1008//
1009// If main is true we will delete older temp directories left around. main is
1010// false when this is a debug command.
1011func setupTmpDir(logger sglog.Logger, main bool, index string) error {
1012 // change the target tmp directory depending on if it's our main daemon or a
1013 // debug sub command.
1014 dir := ".indexserver.debug.tmp"
1015 if main {
1016 dir = ".indexserver.tmp"
1017 }
1018
1019 tmpRoot := filepath.Join(index, dir)
1020
1021 if main {
1022 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot))
1023 err := os.RemoveAll(tmpRoot)
1024 if err != nil {
1025 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err))
1026 }
1027 }
1028
1029 if err := os.MkdirAll(tmpRoot, 0o755); err != nil {
1030 return err
1031 }
1032
1033 return os.Setenv("TMPDIR", tmpRoot)
1034}
1035
1036func printMetaData(fn string) error {
1037 repo, indexMeta, err := zoekt.ReadMetadataPath(fn)
1038 if err != nil {
1039 return err
1040 }
1041
1042 err = json.NewEncoder(os.Stdout).Encode(indexMeta)
1043 if err != nil {
1044 return err
1045 }
1046
1047 err = json.NewEncoder(os.Stdout).Encode(repo)
1048 if err != nil {
1049 return err
1050 }
1051 return nil
1052}
1053
1054func printShardStats(fn string) error {
1055 f, err := os.Open(fn)
1056 if err != nil {
1057 return err
1058 }
1059
1060 iFile, err := zoekt.NewIndexFile(f)
1061 if err != nil {
1062 return err
1063 }
1064
1065 return zoekt.PrintNgramStats(iFile)
1066}
1067
1068func srcLogLevelIsDebug() bool {
1069 lvl := os.Getenv(sglog.EnvLogLevel)
1070 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug")
1071}
1072
// getEnvWithDefaultInt64 returns the environment variable k parsed as a base
// 10 int64. If k is unset or empty defaultVal is returned. A value that does
// not parse terminates the process via log.Fatalf.
func getEnvWithDefaultInt64(k string, defaultVal int64) int64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseInt(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to int64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1084
// getEnvWithDefaultInt returns the environment variable k parsed as a base 10
// int. If k is unset or empty defaultVal is returned. A value that does not
// parse terminates the process via log.Fatalf.
func getEnvWithDefaultInt(k string, defaultVal int) int {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	// Parse the value, not the variable name: parsing k would fail for
	// practically every variable name and abort the process whenever the
	// variable is set.
	i, err := strconv.Atoi(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to int: %s", k, err)
	}
	return i
}
1096
// getEnvWithDefaultUint64 returns the environment variable k parsed as a base
// 10 uint64. If k is unset or empty defaultVal is returned. A value that does
// not parse terminates the process via log.Fatalf.
func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseUint(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to uint64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1108
// getEnvWithDefaultFloat64 returns the environment variable k parsed as a
// float64. If k is unset or empty defaultVal is returned. A value that does
// not parse terminates the process via log.Fatalf.
func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseFloat(raw, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to float64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1120
// getEnvWithDefaultString returns the value of the environment variable k, or
// defaultVal when k is unset or empty.
func getEnvWithDefaultString(k string, defaultVal string) string {
	if raw := os.Getenv(k); raw != "" {
		return raw
	}
	return defaultVal
}
1128
// getEnvWithDefaultDuration returns the environment variable k parsed with
// time.ParseDuration. If k is unset or empty defaultVal is returned. A value
// that does not parse terminates the process via log.Fatalf.
func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := time.ParseDuration(raw)
		if err != nil {
			log.Fatalf("error parsing ENV %s to duration: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1141
// getEnvWithDefaultBool returns the environment variable k parsed with
// strconv.ParseBool. If k is unset or empty defaultVal is returned. A value
// that does not parse terminates the process via log.Fatalf.
func getEnvWithDefaultBool(k string, defaultVal bool) bool {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseBool(raw)
		if err != nil {
			log.Fatalf("error parsing ENV %s to bool: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1154
// getEnvWithDefaultEmptySet interprets the environment variable k as a comma
// separated list and returns its entries as a set. Entries are trimmed of
// surrounding whitespace and empty entries are dropped, so an unset or empty
// variable yields an empty (non-nil) set.
func getEnvWithDefaultEmptySet(k string) map[string]struct{} {
	members := make(map[string]struct{})
	fields := strings.Split(os.Getenv(k), ",")
	for i := range fields {
		entry := strings.TrimSpace(fields[i])
		if entry == "" {
			continue
		}
		members[entry] = struct{}{}
	}
	return members
}
1165
// joinStringSet returns the members of set joined by sep.
//
// Members are emitted in sorted order. Go randomizes map iteration order, so
// without sorting the same set would produce a different string (and thus a
// different log line) from one call to the next.
func joinStringSet(set map[string]struct{}, sep string) string {
	xs := make([]string, 0, len(set))
	for x := range set {
		xs = append(xs, x)
	}
	sort.Strings(xs)

	return strings.Join(xs, sep)
}
1174
1175func setCompoundShardCounter(indexDir string) {
1176 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt"))
1177 if err != nil {
1178 log.Printf("setCompoundShardCounter: %s\n", err)
1179 return
1180 }
1181 metricNumberCompoundShards.Set(float64(len(fns)))
1182}
1183
1184func rootCmd() *ffcli.Command {
1185 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError)
1186 conf := rootConfig{
1187 Main: true,
1188 }
1189 conf.registerRootFlags(rootFs)
1190
1191 return &ffcli.Command{
1192 FlagSet: rootFs,
1193 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]",
1194 Subcommands: []*ffcli.Command{debugCmd()},
1195 Exec: func(ctx context.Context, args []string) error {
1196 return startServer(conf)
1197 },
1198 }
1199}
1200
// rootConfig holds the settings of the indexserver root command. Apart from
// Main, the fields are bound to command line flags by registerRootFlags.
type rootConfig struct {
	// Main is true if this rootConfig is for our main long running command (the
	// indexserver). Debug commands should not set this value. newServer passes
	// it to setupTmpDir, which only removes a previous tmp directory when it is
	// true.
	Main bool

	root             string        // -sourcegraph_url: Sourcegraph URL, or a directory path to fake the API
	interval         time.Duration // -interval: how often to sync with Sourcegraph
	index            string        // -index: index directory
	indexConcurrency int64         // -index_concurrency: newServer clamps it to [1, CPU count]
	listen           string        // -listen: debug HTTP address; empty disables the listeners in startServer
	hostname         string        // -hostname: the name advertised to Sourcegraph
	cpuFraction      float64       // -cpu_fraction: newServer requires 0 < cpuFraction <= 1
	blockProfileRate int           // -block_profile_rate: passed to runtime.SetBlockProfileRate

	// config values related to shard merging
	vacuumInterval time.Duration // -vacuum_interval
	mergeInterval  time.Duration // -merge_interval
	targetSize     int64         // -merge_target_size in MiB; newServer converts it to bytes
	minSize        int64         // -merge_min_size in MiB; newServer converts it to bytes
	minAgeDays     int           // -merge_min_age in days
	maxPriority    float64       // -merge_max_priority

	// config values related to backoff indexing repos with one or more consecutive failures
	backoffDuration    time.Duration // -backoff_duration
	maxBackoffDuration time.Duration // -max_backoff_duration

	// useGRPC is true if we should use the gRPC API to talk to Sourcegraph.
	useGRPC bool
}
1231
1232func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
1233 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
1234 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
1235 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently")
1236 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use")
1237 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
1238 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
1239 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
1240 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate")
1241 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
1242 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")
1243 fs.BoolVar(&rc.useGRPC, "use_grpc", mustGetBoolFromEnvironmentVariables([]string{"GRPC_ENABLED", "SG_FEATURE_FLAG_GRPC"}, true), "use the gRPC API to talk to Sourcegraph")
1244
1245 // flags related to shard merging
1246 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
1247 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
1248 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB")
1249 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB")
1250 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
1251 fs.Float64Var(&rc.maxPriority, "merge_max_priority", getEnvWithDefaultFloat64("SRC_MERGE_MAX_PRIORITY", 100), "the maximum priority a shard can have to be considered for merging.")
1252}
1253
1254func startServer(conf rootConfig) error {
1255 s, err := newServer(conf)
1256 if err != nil {
1257 return err
1258 }
1259
1260 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate)
1261 setCompoundShardCounter(s.IndexDir)
1262
1263 if conf.listen != "" {
1264
1265 mux := http.NewServeMux()
1266 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{
1267 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"},
1268 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"},
1269 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"},
1270 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"},
1271 }...)
1272 s.addDebugHandlers(mux)
1273
1274 go func() {
1275 debug.Printf("serving HTTP on %s", conf.listen)
1276 log.Fatal(http.ListenAndServe(conf.listen, mux))
1277 }()
1278
1279 // Serve mux on a unix domain socket on a best-effort-basis so that
1280 // webserver can call the endpoints via the shared filesystem.
1281 //
1282 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen
1283 // on the socket due to permission errors. See
1284 // https://github.com/docker/for-mac/issues/6239
1285 go func() {
1286 serveHTTPOverSocket := func() error {
1287 socket := filepath.Join(s.IndexDir, "indexserver.sock")
1288 // We cannot bind a socket to an existing pathname.
1289 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) {
1290 return fmt.Errorf("error removing socket file: %s", socket)
1291 }
1292 // The "unix" network corresponds to stream sockets. (cf. unixgram,
1293 // unixpacket).
1294 l, err := net.Listen("unix", socket)
1295 if err != nil {
1296 return fmt.Errorf("failed to listen on socket %s: %w", socket, err)
1297 }
1298 // Indexserver (root) and webserver (Sourcegraph) run with
1299 // different users. Per default, the socket is created with
1300 // permission 755 (root root), which doesn't let webserver write to
1301 // it.
1302 //
1303 // See https://github.com/golang/go/issues/11822 for more context.
1304 if err := os.Chmod(socket, 0o777); err != nil {
1305 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err)
1306 }
1307 debug.Printf("serving HTTP on %s", socket)
1308 return http.Serve(l, mux)
1309 }
1310 debug.Print(serveHTTPOverSocket())
1311 }()
1312 }
1313
1314 oc := &ownerChecker{
1315 Path: filepath.Join(conf.index, "owner.txt"),
1316 Hostname: conf.hostname,
1317 }
1318 go oc.Run()
1319
1320 logger := sglog.Scoped("metricsRegistration")
1321 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"}
1322
1323 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index})
1324 prometheus.DefaultRegisterer.MustRegister(c)
1325
1326 s.Run()
1327 return nil
1328}
1329
1330func newServer(conf rootConfig) (*Server, error) {
1331 logger := sglog.Scoped("server")
1332
1333 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 {
1334 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0")
1335 }
1336 if conf.index == "" {
1337 return nil, fmt.Errorf("must set -index")
1338 }
1339 if conf.root == "" {
1340 return nil, fmt.Errorf("must set -sourcegraph_url")
1341 }
1342 rootURL, err := url.Parse(conf.root)
1343 if err != nil {
1344 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err)
1345 }
1346
1347 rootURL = addDefaultPort(rootURL)
1348
1349 // Tune GOMAXPROCS to match Linux container CPU quota.
1350 _, _ = maxprocs.Set()
1351
1352 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler.
1353 // The block profiler is disabled by default and should be enabled with care in production
1354 runtime.SetBlockProfileRate(conf.blockProfileRate)
1355
1356 // Automatically prepend our own path at the front, to minimize
1357 // required configuration.
1358 if l, err := os.Readlink("/proc/self/exe"); err == nil {
1359 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
1360 }
1361
1362 if _, err := os.Stat(conf.index); err != nil {
1363 if err := os.MkdirAll(conf.index, 0o755); err != nil {
1364 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err)
1365 }
1366 }
1367
1368 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil {
1369 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err)
1370 }
1371
1372 if srcLogLevelIsDebug() {
1373 debug = log.New(os.Stderr, "", log.LstdFlags)
1374 }
1375
1376 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST")
1377 if len(reposWithSeparateIndexingMetrics) > 0 {
1378 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", "))
1379 }
1380
1381 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST")
1382 if len(deltaBuildRepositoriesAllowList) > 0 {
1383 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", "))
1384 }
1385
1386 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150)
1387 if deltaShardNumberFallbackThreshold > 0 {
1388 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold)
1389 } else {
1390 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards")
1391 }
1392
1393 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST")
1394 if len(reposShouldSkipSymbolsCalculation) > 0 {
1395 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", "))
1396 }
1397
1398 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout)
1399 if indexingTimeout != defaultIndexingTimeout {
1400 debug.Printf("using configured indexing timeout: %s", indexingTimeout)
1401 }
1402
1403 var sg Sourcegraph
1404 if rootURL.IsAbs() {
1405 var batchSize int
1406 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" {
1407 batchSize, err = strconv.Atoi(v)
1408 if err != nil {
1409 return nil, fmt.Errorf("Invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int")
1410 }
1411 }
1412
1413 opts := []SourcegraphClientOption{
1414 WithBatchSize(batchSize),
1415 WithShouldUseGRPC(conf.useGRPC),
1416 }
1417
1418 logger := sglog.Scoped("zoektConfigurationGRPCClient")
1419 client, err := dialGRPCClient(rootURL.Host, logger)
1420 if err != nil {
1421 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err)
1422 }
1423
1424 opts = append(opts, WithGRPCClient(client))
1425 sg = newSourcegraphClient(rootURL, conf.hostname, opts...)
1426
1427 } else {
1428 sg = sourcegraphFake{
1429 RootDir: rootURL.String(),
1430 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags),
1431 }
1432 }
1433
1434 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction)))
1435 if cpuCount < 1 {
1436 cpuCount = 1
1437 }
1438
1439 if conf.indexConcurrency < 1 {
1440 conf.indexConcurrency = 1
1441 } else if conf.indexConcurrency > int64(cpuCount) {
1442 conf.indexConcurrency = int64(cpuCount)
1443 }
1444
1445 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)
1446
1447 return &Server{
1448 logger: logger,
1449 Sourcegraph: sg,
1450 IndexDir: conf.index,
1451 IndexConcurrency: int(conf.indexConcurrency),
1452 Interval: conf.interval,
1453 CPUCount: cpuCount,
1454 queue: *q,
1455 shardMerging: zoekt.ShardMergingEnabled(),
1456 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList,
1457 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold,
1458 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
1459 hostname: conf.hostname,
1460 mergeOpts: mergeOpts{
1461 vacuumInterval: conf.vacuumInterval,
1462 mergeInterval: conf.mergeInterval,
1463 targetSizeBytes: conf.targetSize * 1024 * 1024,
1464 minSizeBytes: conf.minSize * 1024 * 1024,
1465 minAgeDays: conf.minAgeDays,
1466 maxPriority: conf.maxPriority,
1467 },
1468 timeout: indexingTimeout,
1469 }, err
1470}
1471
// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration
// for the indexed-search-configuration gRPC service. Its contents are embedded
// at build time from default_grpc_service_configuration.json and passed to
// grpc.WithDefaultServiceConfig by dialGRPCClient.
//
// The default backoff strategy is modeled after the default settings used by
// retryablehttp.DefaultClient.
//
// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html):
//   - Unavailable
//   - Aborted
//
//go:embed default_grpc_service_configuration.json
var defaultGRPCServiceConfigurationJSON string
1484
1485func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor {
1486 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1487 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1488 return invoker(ctx, method, req, reply, cc, opts...)
1489 }
1490}
1491
1492func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
1493 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
1494 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1495 return streamer(ctx, desc, cc, method, opts...)
1496 }
1497}
1498
// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPC servers and clients are allowed to process.
// This can be overridden by providing custom Server/Dial options.
// dialGRPCClient applies it as the client's default MaxCallRecvMsgSize.
const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MiB
1502
1503func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) {
1504 metrics := mustGetClientMetrics()
1505
1506 // If the service seems to be unavailable, this
1507 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1
1508 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s])
1509 retryOpts := []retry.CallOption{
1510 retry.WithMax(5),
1511 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)),
1512 retry.WithCodes(codes.Unavailable),
1513 }
1514
1515 opts := []grpc.DialOption{
1516 grpc.WithTransportCredentials(insecure.NewCredentials()),
1517 grpc.WithChainStreamInterceptor(
1518 metrics.StreamClientInterceptor(),
1519 messagesize.StreamClientInterceptor,
1520 internalActorStreamInterceptor(),
1521 internalerrs.LoggingStreamClientInterceptor(logger),
1522 internalerrs.PrometheusStreamClientInterceptor,
1523 retry.StreamClientInterceptor(retryOpts...),
1524 ),
1525 grpc.WithChainUnaryInterceptor(
1526 metrics.UnaryClientInterceptor(),
1527 messagesize.UnaryClientInterceptor,
1528 internalActorUnaryInterceptor(),
1529 internalerrs.LoggingUnaryClientInterceptor(logger),
1530 internalerrs.PrometheusUnaryClientInterceptor,
1531 retry.UnaryClientInterceptor(retryOpts...),
1532 ),
1533 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
1534 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)),
1535 }
1536
1537 opts = append(opts, additionalOpts...)
1538
1539 // Ensure that the message size options are set last, so they override any other
1540 // client-specific options that tweak the message size.
1541 //
1542 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
1543 // take precedence over everything else with a uniform size setting that's easy to reason about.
1544 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...)
1545
1546 // This dialer is used to connect via gRPC to the Sourcegraph instance.
1547 // This is done lazily, so we can provide the client to use regardless of
1548 // whether we enabled gRPC or not initially.
1549 cc, err := grpc.Dial(addr, opts...)
1550 if err != nil {
1551 return nil, fmt.Errorf("dialing %q: %w", addr, err)
1552 }
1553
1554 client := proto.NewZoektConfigurationServiceClient(cc)
1555 return client, nil
1556}
1557
1558// mustGetClientMetrics returns a singleton instance of the client metrics
1559// that are shared across all gRPC clients that this process creates.
1560//
1561// This function panics if the metrics cannot be registered with the default
1562// Prometheus registry.
1563func mustGetClientMetrics() *grpcprom.ClientMetrics {
1564 clientMetricsOnce.Do(func() {
1565 clientMetrics = grpcprom.NewClientMetrics(
1566 grpcprom.WithClientCounterOptions(),
1567 grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
1568 grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC
1569 grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC
1570 )
1571
1572 prometheus.DefaultRegisterer.MustRegister(clientMetrics)
1573 })
1574
1575 return clientMetrics
1576}
1577
// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
// If the scheme is "https", "443" is used. URLs with any other scheme, URLs
// that already carry a port, relative URLs and nil are returned as they are.
//
// The original URL is not mutated. A copy is modified and returned.
func addDefaultPort(original *url.URL) *url.URL {
	if original == nil {
		return nil // don't panic
	}

	// Don't do anything if the URL doesn't look like a remote URL, or if it
	// already names a port.
	if !original.IsAbs() || original.Port() != "" {
		return original
	}

	var port string
	switch original.Scheme {
	case "http":
		port = "80"
	case "https":
		port = "443"
	default:
		return original
	}

	u := cloneURL(original)
	// Join with Hostname(), not Host: Host keeps the brackets of an IPv6
	// literal (and a trailing ":" of an empty port), which JoinHostPort would
	// wrap in another pair of brackets, e.g. "[[::1]]:80".
	u.Host = net.JoinHostPort(u.Hostname(), port)
	return u
}

// cloneURL returns a copy of the URL. It is safe to mutate the returned URL.
// This is copied from net/http/clone.go
func cloneURL(u *url.URL) *url.URL {
	if u == nil {
		return nil
	}
	u2 := new(url.URL)
	*u2 = *u
	if u.User != nil {
		// Userinfo is held by pointer, so it needs a copy of its own.
		u2.User = new(url.Userinfo)
		*u2.User = *u.User
	}
	return u2
}
1622
1623func main() {
1624 liblog := sglog.Init(sglog.Resource{
1625 Name: "zoekt-indexserver",
1626 Version: zoekt.Version,
1627 InstanceID: zoekt.HostnameBestEffort(),
1628 })
1629 defer liblog.Sync()
1630
1631 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil {
1632 log.Fatal(err)
1633 }
1634}
1635
1636// mustGetBoolFromEnvironmentVariables is like getBoolFromEnvironmentVariables, but it panics
1637// if any of the provided environment variables fails to parse as a boolean.
1638func mustGetBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) bool {
1639 value, err := getBoolFromEnvironmentVariables(envVarNames, defaultBool)
1640 if err != nil {
1641 panic(err)
1642 }
1643
1644 return value
1645}
1646
// getBoolFromEnvironmentVariables returns the boolean held by the first
// environment variable in envVarNames that is set to a non-empty value, or
// defaultBool if none of them is.
//
// Only that first non-empty variable is looked at: if it fails to parse as a
// boolean, false and an error are returned, and later names are not consulted.
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	for i := range envVarNames {
		name := envVarNames[i]
		if raw := os.Getenv(name); raw != "" {
			parsed, parseErr := strconv.ParseBool(raw)
			if parseErr != nil {
				return false, fmt.Errorf("parsing environment variable %q to boolean: %v", name, parseErr)
			}
			return parsed, nil
		}
	}

	return defaultBool, nil
}