fork of https://github.com/sourcegraph/zoekt
1// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
2// repositories on sourcegraph
3package main
4
5import (
6 "bytes"
7 "context"
8 _ "embed"
9 "encoding/json"
10 "errors"
11 "flag"
12 "fmt"
13 "html/template"
14 "io"
15 "io/fs"
16 "log"
17 "math"
18 "math/rand"
19 "net"
20 "net/http"
21 "net/url"
22 "os"
23 "os/exec"
24 "os/signal"
25 "path/filepath"
26 "runtime"
27 "sort"
28 "strconv"
29 "strings"
30 "sync"
31 "text/tabwriter"
32 "time"
33
34 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
35 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
36 "github.com/peterbourgon/ff/v3/ffcli"
37 "github.com/prometheus/client_golang/prometheus"
38 "github.com/prometheus/client_golang/prometheus/promauto"
39 sglog "github.com/sourcegraph/log"
40 "github.com/sourcegraph/mountinfo"
41 "go.uber.org/automaxprocs/maxprocs"
42 "golang.org/x/net/trace"
43 "golang.org/x/sys/unix"
44 "google.golang.org/grpc"
45 "google.golang.org/grpc/codes"
46 "google.golang.org/grpc/credentials/insecure"
47 "google.golang.org/grpc/metadata"
48
49 "github.com/sourcegraph/zoekt"
50 "github.com/sourcegraph/zoekt/build"
51 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
52 "github.com/sourcegraph/zoekt/debugserver"
53 "github.com/sourcegraph/zoekt/grpc/internalerrs"
54 "github.com/sourcegraph/zoekt/grpc/messagesize"
55 "github.com/sourcegraph/zoekt/internal/profiler"
56)
57
58var (
59 metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
60 Name: "resolve_revisions_seconds",
61 Help: "A histogram of latencies for resolving all repository revisions.",
62 Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 27min
63 })
64
65 metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
66 Name: "resolve_revision_seconds",
67 Help: "A histogram of latencies for resolving a repository revision.",
68 Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
69 }, []string{"success"}) // success=true|false
70
71 metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
72 Name: "get_index_options_total",
73 Help: "The total number of times we tried to get index options for a repository. Includes errors.",
74 })
75 metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
76 Name: "get_index_options_error_total",
77 Help: "The total number of times we failed to get index options for a repository.",
78 })
79
80 metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
81 Name: "index_repo_seconds",
82 Help: "A histogram of latencies for indexing a repository.",
83 Buckets: prometheus.ExponentialBucketsRange(
84 (100 * time.Millisecond).Seconds(),
85 (40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
86 20),
87 }, []string{
88 "state", // state is an indexState
89 "name", // name of the repository that was indexed
90 })
91
92 metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
93 Name: "index_fetch_seconds",
94 Help: "A histogram of latencies for fetching a repository.",
95 Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
96 }, []string{
97 "success", // true|false
98 "name", // the name of the repository that the commits were fetched from
99 })
100
101 metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
102 Name: "index_incremental_index_state",
103 Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
104 }, []string{"state"}) // state is build.IndexState
105
106 metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
107 Name: "index_num_indexed",
108 Help: "Number of indexed repos by code host",
109 })
110
111 metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
112 Name: "index_failing_total",
113 Help: "Counts failures to index (indexing activity, should be used with rate())",
114 })
115
116 metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
117 Name: "index_indexing_total",
118 Help: "Counts indexings (indexing activity, should be used with rate())",
119 })
120
121 metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
122 Name: "index_num_stopped_tracking_total",
123 Help: "Counts the number of repos we stopped tracking.",
124 })
125
126 clientMetricsOnce sync.Once
127 clientMetrics *grpcprom.ClientMetrics
128)
129
// reposWithSeparateIndexingMetrics is the set of repository names that keep
// their own "name" label value on the indexing metrics. Every other
// repository is reported under the empty name to bound metric cardinality
// (see repoNameForMetric).
var reposWithSeparateIndexingMetrics = map[string]struct{}{}
132
// indexState is the outcome of a single index job. Its string form is used as
// the "state" label of the index_repo_seconds histogram.
type indexState string

// The possible outcomes of an index job.
const (
	// indexStateFail means the job returned an error.
	indexStateFail indexState = "fail"

	// indexStateSuccess means the shards were (re)built.
	indexStateSuccess indexState = "success"

	// indexStateSuccessMeta means we only updated metadata.
	indexStateSuccessMeta indexState = "success_meta"

	// indexStateNoop means we didn't need to update the index.
	indexStateNoop indexState = "noop"

	// indexStateEmpty means the index is empty (empty repo).
	indexStateEmpty indexState = "empty"
)
142
// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	// logger is used for structured logging (e.g. the "updated index" entries).
	logger sglog.Logger

	// Sourcegraph is the client used to list the repositories to index, to
	// fetch their index options and to report index status.
	Sourcegraph Sourcegraph

	// BatchSize is not read in this part of the file.
	// NOTE(review): presumably the chunk size used with batched() for
	// requests to Sourcegraph — confirm.
	BatchSize int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration

	// CPUCount is the number of CPUs to use for indexing shards. It is the
	// default parallelism when the index options do not set ShardConcurrency.
	CPUCount int

	// queue holds the index jobs. The sync loop in Run fills it and
	// processQueue drains it.
	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is reported by the /debug/host endpoint.
	hostname string

	// mergeOpts holds the shard merging settings, including the vacuum and
	// merge intervals used by Run.
	mergeOpts mergeOpts

	// timeout defines how long the index server waits before killing an indexing job.
	timeout time.Duration
}
190
// debug is a logger for verbose diagnostics. It writes to io.Discard, so its
// output is dropped unless the logger is replaced elsewhere.
// NOTE(review): presumably swapped out when debug logging is enabled — confirm.
var debug = log.New(io.Discard, "", log.LstdFlags)
192
// noOutputTimeout is how long an index command may go without writing any
// output before loggedRun sends it SIGQUIT (followed by SIGKILL if it keeps
// running). Our index commands should output something every 100MB they
// process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
const noOutputTimeout time.Duration = 30 * time.Minute
199
200func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) {
201 out := &synchronizedBuffer{}
202 cmd.Stdout = out
203 cmd.Stderr = out
204
205 tr.LazyPrintf("%s", cmd.Args)
206
207 defer func() {
208 if err != nil {
209 outS := out.String()
210 tr.LazyPrintf("failed: %v", err)
211 tr.LazyPrintf("output: %s", out)
212 tr.SetError()
213 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS)
214 }
215 }()
216
217 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args))
218
219 if err := cmd.Start(); err != nil {
220 return err
221 }
222
223 errC := make(chan error)
224 go func() {
225 errC <- cmd.Wait()
226 }()
227
228 // This channel is set after we have sent sigquit. It allows us to follow up
229 // with a sigkill if the process doesn't quit after sigquit.
230 kill := make(<-chan time.Time)
231
232 lastLen := 0
233 for {
234 select {
235 case <-time.After(noOutputTimeout):
236 // Periodically check if we have had output. If not kill the process.
237 if out.Len() != lastLen {
238 lastLen = out.Len()
239 log.Printf("still running %s", cmd.Args)
240 } else {
241 // Send quit (C-\) first so we get a stack dump.
242 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args)
243 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil {
244 log.Println("quit failed:", err)
245 }
246
247 // send sigkill if still running in 10s
248 kill = time.After(10 * time.Second)
249 }
250
251 case <-kill:
252 log.Printf("still running, killing %s", cmd.Args)
253 if err := cmd.Process.Kill(); err != nil {
254 log.Println("kill failed:", err)
255 }
256
257 case err := <-errC:
258 if err != nil {
259 return err
260 }
261
262 tr.LazyPrintf("success")
263 return nil
264 }
265 }
266}
267
// synchronizedBuffer is a bytes.Buffer guarded by a mutex. It is used as the
// stdout/stderr of a command so that the output can be inspected (Len,
// String) while the command is still writing to it.
type synchronizedBuffer struct {
	mu sync.Mutex
	b  bytes.Buffer
}

// Compile-time checks that synchronizedBuffer can serve as a command's output
// writer and be formatted with %s.
var (
	_ io.Writer    = (*synchronizedBuffer)(nil)
	_ fmt.Stringer = (*synchronizedBuffer)(nil)
)

// Write appends p to the buffer. It is safe for concurrent use.
func (sb *synchronizedBuffer) Write(p []byte) (int, error) {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	return sb.b.Write(p)
}

// Len returns the number of bytes written so far.
func (sb *synchronizedBuffer) Len() int {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	return sb.b.Len()
}

// String returns a copy of everything written so far.
func (sb *synchronizedBuffer) String() string {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	return sb.b.String()
}
292
// pauseFileName is the name of a file which, if present in IndexDir, stops
// index jobs from running. This is to make it possible to experiment with the
// content of the IndexDir without the indexserver writing to it. While the
// file exists, the periodic queue update is skipped and the file's trimmed
// contents are logged as the reason. Indexing workers also sleep instead of
// popping jobs.
const pauseFileName = "PAUSE"
297
298// Run the sync loop. This blocks forever.
299func (s *Server) Run() {
300 removeIncompleteShards(s.IndexDir)
301
302 // Start a goroutine which updates the queue with commits to index.
303 go func() {
304 // We update the list of indexed repos every Interval. To speed up manual
305 // testing we also listen for SIGUSR1 to trigger updates.
306 //
307 // "pkill -SIGUSR1 zoekt-sourcegra"
308 for range jitterTicker(s.Interval, unix.SIGUSR1) {
309 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
310 log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b)))
311 continue
312 }
313
314 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir))
315 if err != nil {
316 log.Printf("error listing repos: %s", err)
317 continue
318 }
319
320 debug.Printf("updating index queue with %d repositories", len(repos.IDs))
321
322 // Stop indexing repos we don't need to track anymore
323 removed := s.queue.MaybeRemoveMissing(repos.IDs)
324 metricNumStoppedTrackingTotal.Add(float64(len(removed)))
325 if len(removed) > 0 {
326 log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5))
327 }
328
329 cleanupDone := make(chan struct{})
330 go func() {
331 defer close(cleanupDone)
332 s.muIndexDir.Global(func() {
333 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging)
334 })
335 }()
336
337 repos.IterateIndexOptions(s.queue.AddOrUpdate)
338
339 // IterateIndexOptions will only iterate over repositories that have
340 // changed since we last called list. However, we want to add all IDs
341 // back onto the queue just to check that what is on disk is still
342 // correct. This will use the last IndexOptions we stored in the
343 // queue. The repositories not on the queue (missing) need a forced
344 // fetch of IndexOptions.
345 missing := s.queue.Bump(repos.IDs)
346 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...)
347
348 setCompoundShardCounter(s.IndexDir)
349
350 <-cleanupDone
351 }
352 }()
353
354 go func() {
355 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) {
356 if s.shardMerging {
357 s.vacuum()
358 }
359 }
360 }()
361
362 go func() {
363 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) {
364 if s.shardMerging {
365 s.doMerge()
366 }
367 }
368 }()
369
370 for i := 0; i < s.IndexConcurrency; i++ {
371 go s.processQueue()
372 }
373
374 // block forever
375 select {}
376}
377
// formatListUint32 returns the first min(len(v), m) elements of v as a
// comma-separated list. If v holds more than m elements, the list ends with
// "..." (the result is just "..." when no element is shown).
func formatListUint32(v []uint32, m int) string {
	var parts []string
	for i := 0; i < len(v) && i < m; i++ {
		parts = append(parts, fmt.Sprint(v[i]))
	}
	if len(v) > m {
		parts = append(parts, "...")
	}
	return strings.Join(parts, ", ")
}
395
396func (s *Server) processQueue() {
397 for {
398 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
399 time.Sleep(time.Second)
400 continue
401 }
402
403 opts, ok := s.queue.Pop()
404 if !ok {
405 time.Sleep(time.Second)
406 continue
407 }
408
409 args := s.indexArgs(opts)
410
411 ran := s.muIndexDir.With(opts.Name, func() {
412 // only record time taken once we hold the lock. This avoids us
413 // recording time taken while merging/cleanup runs.
414 start := time.Now()
415
416 state, err := s.Index(args)
417
418 elapsed := time.Since(start)
419
420 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds())
421
422 if err != nil {
423 log.Printf("error indexing %s: %s", args.String(), err)
424 }
425
426 switch state {
427 case indexStateSuccess:
428 var branches []string
429 for _, b := range args.Branches {
430 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
431 }
432 s.logger.Info("updated index",
433 sglog.String("repo", args.Name),
434 sglog.Uint32("id", args.RepoID),
435 sglog.Strings("branches", branches),
436 sglog.Duration("duration", elapsed),
437 )
438 case indexStateSuccessMeta:
439 log.Printf("updated meta %s in %v", args.String(), elapsed)
440 }
441 s.queue.SetIndexed(opts, state)
442 })
443
444 if !ran {
445 // Someone else is processing the repository. We can just skip this job
446 // since the repository will be added back to the queue and we will
447 // converge to the correct behaviour.
448 debug.Printf("index job for repository already running: %s", args)
449 continue
450 }
451 }
452}
453
454// repoNameForMetric returns a normalized version of the given repository name that is
455// suitable for use with Prometheus metrics.
456func repoNameForMetric(repo string) string {
457 // Check to see if we want to be able to capture separate indexing metrics for this repository.
458 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable.
459 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok {
460 return repo
461 }
462
463 return ""
464}
465
// batched splits slice into consecutive chunks of at most size elements and
// sends them, in order, on the returned channel. The channel is closed after
// the last chunk. Chunks share slice's backing array.
//
// A non-positive size yields the whole slice as a single chunk. An empty
// slice yields no chunks.
//
// The channel is unbuffered and the sending goroutine only exits once every
// chunk has been received, so callers should drain the channel.
func batched(slice []uint32, size int) <-chan []uint32 {
	if size <= 0 {
		// Without this, size == 0 would loop forever sending empty chunks and
		// a negative size would panic on slicing.
		size = len(slice)
	}

	c := make(chan []uint32)
	go func() {
		defer close(c)
		for len(slice) > 0 {
			n := size
			if n > len(slice) {
				n = len(slice)
			}
			c <- slice[:n]
			slice = slice[n:]
		}
	}()
	return c
}
480
// jitterTicker returns a channel that receives a value immediately and then
// again after each randomized pause. Each pause is drawn uniformly from
// [d/2, 3d/2). d must be positive, because rand.Int63n panics otherwise.
//
// Receiving any of the signals in sig also produces a tick. This is a
// convenience to allow manually triggering the ticker.
//
// Ticks are sent on an unbuffered channel, so a tick is delivered only once
// the receiver is ready. The background goroutines never exit.
func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} {
	ticks := make(chan struct{})

	go func() {
		half := int64(d) / 2
		for {
			ticks <- struct{}{}
			pause := half + rand.Int63n(int64(d))
			time.Sleep(time.Duration(pause))
		}
	}()

	if len(sig) > 0 {
		go func() {
			received := make(chan os.Signal, 1)
			signal.Notify(received, sig...)
			for range received {
				ticks <- struct{}{}
			}
		}()
	}

	return ticks
}
512
// Index starts an index job for repo name at commit, and reports what it did
// as an indexState.
//
// Outline:
//   - A repository without branches gets an empty shard (indexStateEmpty).
//   - args is adjusted in place from the server configuration: delta builds
//     and skipping symbols via the allowlists, and the delta shard fallback
//     threshold.
//   - For incremental jobs, the state on disk is compared with what we want
//     to build. An equal index is a no-op. A metadata-only difference is
//     handled by mergeMeta. Anything else, including a failed mergeMeta, falls
//     back to a full build.
//   - A full build runs gitIndex and then reports the new index status to
//     Sourcegraph. Failures to report are only logged.
//
// Whenever a non-nil error is returned, the deferred handler forces the
// returned state to indexStateFail and increments metricFailingTotal.
func (s *Server) Index(args *indexArgs) (state indexState, err error) {
	tr := trace.New("index", args.Name)

	// Relies on the named results: any error path is normalized to
	// indexStateFail here, regardless of the state the return statement used.
	defer func() {
		if err != nil {
			tr.SetError()
			tr.LazyPrintf("error: %v", err)
			state = indexStateFail
			metricFailingTotal.Inc()
		}
		tr.LazyPrintf("state: %s", state)
		tr.Finish()
	}()

	tr.LazyPrintf("branches: %v", args.Branches)

	if len(args.Branches) == 0 {
		return indexStateEmpty, createEmptyShard(args)
	}

	repositoryName := args.Name
	if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
		tr.LazyPrintf("marking this repository for delta build")
		args.UseDelta = true
	}

	args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold

	if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
		tr.LazyPrintf("skipping symbols calculation")
		args.Symbols = false
	}

	// reason is only used for the "updating index" log line below.
	reason := "forced"

	if args.Incremental {
		bo := args.BuildOptions()
		bo.SetDefaults()
		incrementalState, fn := bo.IndexState()
		reason = string(incrementalState)
		metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()

		switch incrementalState {
		case build.IndexStateEqual:
			debug.Printf("%s index already up to date. Shard=%s", args.String(), fn)
			return indexStateNoop, nil

		case build.IndexStateMeta:
			log.Printf("updating index.meta %s", args.String())

			if err := mergeMeta(bo); err != nil {
				log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
			} else {
				return indexStateSuccessMeta, nil
			}

		case build.IndexStateCorrupt:
			log.Printf("falling back to full update: corrupt index: %s", args.String())
		}
	}

	log.Printf("updating index %s reason=%s", args.String(), reason)

	metricIndexingTotal.Inc()
	c := gitIndexConfig{
		// Commands are run through loggedRun so their output ends up in this
		// job's trace and stalled commands get killed.
		runCmd: func(cmd *exec.Cmd) error {
			return s.loggedRun(tr, cmd)
		},

		findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
			return args.BuildOptions().FindRepositoryMetadata()
		},
		timeout: s.timeout,
	}

	err = gitIndex(c, args, s.Sourcegraph, s.logger)
	if err != nil {
		return indexStateFail, err
	}

	// Best effort: the index on disk is already updated, so a failure to
	// report it is logged but does not fail the job.
	if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil {
		s.logger.Error("failed to update index status",
			sglog.String("repo", args.Name),
			sglog.Uint32("id", args.RepoID),
			sglogBranches("branches", args.Branches),
			sglog.Error(err),
		)
	}

	return indexStateSuccess, nil
}
605
606// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so
607// it can update the zoekt_repos table.
608func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error {
609 // We need to read from disk for IndexTime.
610 _, metadata, ok, err := c.findRepositoryMetadata(args)
611 if err != nil {
612 return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
613 }
614 if !ok {
615 return errors.New("failed to find metadata for new/updated index")
616 }
617
618 status := []indexStatus{{
619 RepoID: args.RepoID,
620 Branches: args.Branches,
621 IndexTimeUnix: metadata.IndexTime.Unix(),
622 }}
623 if err := sg.UpdateIndexStatus(status); err != nil {
624 return fmt.Errorf("failed to update sourcegraph with status: %w", err)
625 }
626
627 return nil
628}
629
630func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
631 ss := make([]string, len(branches))
632 for i, b := range branches {
633 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
634 }
635 return sglog.Strings(key, ss)
636}
637
638func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
639 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0))
640 return &indexArgs{
641 IndexOptions: opts,
642 IndexDir: s.IndexDir,
643 Parallelism: parallelism,
644 Incremental: true,
645
646 // 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
647 FileLimit: 1 << 20,
648
649 ShardMerging: s.shardMerging,
650 }
651}
652
653// parallelism consults both the server flags and index options to determine the number
654// of shards to index in parallel. If the CPUCount index option is provided, it always
655// overrides the server flag.
656func (s *Server) parallelism(opts IndexOptions, maxProcs int) int {
657 var parallelism int
658 if opts.ShardConcurrency > 0 {
659 parallelism = int(opts.ShardConcurrency)
660 } else {
661 parallelism = s.CPUCount
662 }
663
664 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs
665 if parallelism > 4*maxProcs {
666 parallelism = 4 * maxProcs
667 }
668
669 // If index concurrency is set, then divide the parallelism by the number of
670 // repos we're indexing in parallel
671 if s.IndexConcurrency > 1 {
672 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency)))
673 }
674
675 return parallelism
676}
677
678func createEmptyShard(args *indexArgs) error {
679 bo := args.BuildOptions()
680 bo.SetDefaults()
681 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}
682
683 if args.Incremental && bo.IncrementalSkipIndexing() {
684 return nil
685 }
686
687 builder, err := build.NewBuilder(*bo)
688 if err != nil {
689 return err
690 }
691 return builder.Finish()
692}
693
694// addDebugHandlers adds handlers specific to indexserver.
695func (s *Server) addDebugHandlers(mux *http.ServeMux) {
696 // Sourcegraph's site admin view requires indexserver to serve it's admin view
697 // on "/".
698 mux.Handle("/", http.HandlerFunc(s.handleRoot))
699
700 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex))
701 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed))
702 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList))
703 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge))
704 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue))
705 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost))
706}
707
708func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) {
709 if r.Method != "GET" {
710 w.Header().Set("Allow", "GET")
711 w.WriteHeader(http.StatusMethodNotAllowed)
712 return
713 }
714
715 response := struct {
716 Hostname string
717 }{
718 Hostname: s.hostname,
719 }
720
721 b, err := json.Marshal(response)
722 if err != nil {
723 http.Error(w, err.Error(), http.StatusInternalServerError)
724 return
725 }
726
727 w.Header().Set("Content-Type", "application/json; charset=utf-8")
728 w.Write(b)
729}
730
// rootTmpl renders the indexserver landing page served on "/" by handleRoot.
// It expects data with an IndexMsg string field and a Repos slice whose
// elements have Name and ID fields. Each repository ID links back to the page
// with ?id=<ID> to trigger a reindex.
var rootTmpl = template.Must(template.New("name").Parse(`
<html>
  <body>
    <a href="debug">Debug</a><br />
    <a href="debug/requests">Traces</a><br />
    {{.IndexMsg}}<br />
    <br />
    <h3>Reindex</h3>
    {{if .Repos}}
      <a href="?show_repos=false">hide repos</a><br />
      <table style="margin-top: 20px">
        <tr>
          <th style="text-align:left">Name</th>
          <th style="text-align:left">ID</th>
        </tr>
        {{range .Repos}}
          <tr>
            <td>{{.Name}}</td>
            <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></td>
          </tr>
        {{end}}
      </table>
    {{else}}
      <a href="?show_repos=true">show repos</a><br />
    {{end}}
  </body>
</html>
`))
757
758func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
759 if r.Method != "GET" {
760 w.Header().Set("Allow", "GET")
761 w.WriteHeader(http.StatusMethodNotAllowed)
762 return
763 }
764
765 values := r.URL.Query()
766
767 // ?id=
768 indexMsg := ""
769 if v := values.Get("id"); v != "" {
770 id, err := strconv.Atoi(v)
771 if err != nil {
772 http.Error(w, err.Error(), http.StatusBadRequest)
773 return
774 }
775 indexMsg, _ = s.forceIndex(uint32(id))
776 }
777
778 // ?show_repos=
779 showRepos := false
780 if v := values.Get("show_repos"); v != "" {
781 showRepos, _ = strconv.ParseBool(v)
782 }
783
784 type Repo struct {
785 ID uint32
786 Name string
787 }
788 var data struct {
789 Repos []Repo
790 IndexMsg string
791 }
792
793 data.IndexMsg = indexMsg
794
795 if showRepos {
796 s.queue.Iterate(func(opts *IndexOptions) {
797 data.Repos = append(data.Repos, Repo{
798 ID: opts.RepoID,
799 Name: opts.Name,
800 })
801 })
802 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name })
803 }
804
805 _ = rootTmpl.Execute(w, data)
806}
807
808// handleReindex triggers a reindex asynocronously. If a reindex was triggered
809// the request returns with status 202. The caller can infer the new state of
810// the index by calling List.
811func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) {
812 if r.Method != http.MethodPost {
813 w.Header().Set("Allow", http.MethodPost)
814 w.WriteHeader(http.StatusMethodNotAllowed)
815 return
816 }
817
818 err := r.ParseForm()
819 if err != nil {
820 http.Error(w, err.Error(), http.StatusBadRequest)
821 return
822 }
823
824 id, err := strconv.Atoi(r.Form.Get("repo"))
825 if err != nil {
826 http.Error(w, err.Error(), http.StatusBadRequest)
827 return
828 }
829
830 go func() { s.forceIndex(uint32(id)) }()
831
832 // 202 Accepted
833 w.WriteHeader(http.StatusAccepted)
834}
835
836func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
837 withIndexed := true
838 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil {
839 withIndexed = b
840 }
841
842 var indexed []uint32
843 if withIndexed {
844 indexed = listIndexed(s.IndexDir)
845 }
846
847 repos, err := s.Sourcegraph.List(r.Context(), indexed)
848 if err != nil {
849 http.Error(w, err.Error(), http.StatusInternalServerError)
850 return
851 }
852
853 bw := bytes.Buffer{}
854
855 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
856
857 _, err = fmt.Fprintf(tw, "ID\tName\n")
858 if err != nil {
859 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
860 return
861 }
862
863 s.queue.mu.Lock()
864 name := ""
865 for _, id := range repos.IDs {
866 if item := s.queue.get(id); item != nil {
867 name = item.opts.Name
868 } else {
869 name = ""
870 }
871 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
872 if err != nil {
873 debug.Printf("handleDebugList: %s\n", err.Error())
874 }
875 }
876 s.queue.mu.Unlock()
877
878 if err != nil {
879 http.Error(w, err.Error(), http.StatusInternalServerError)
880 return
881 }
882
883 err = tw.Flush()
884 if err != nil {
885 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
886 return
887 }
888
889 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
890
891 if _, err := io.Copy(w, &bw); err != nil {
892 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
893 return
894 }
895}
896
897// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
898// can run this command during periods of low usage (evenings, weekends) to
899// trigger an initial merge run. In the steady-state, merges happen rarely, even
900// on busy instances, and users can rely on automatic merging instead.
901func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {
902 // A merge operation can take very long, depending on the number merges and the
903 // target size of the compound shards. We run the merge in the background and
904 // return immediately to the user.
905 //
906 // We track the status of the merge with metricShardMergingRunning.
907 go func() {
908 s.doMerge()
909 }()
910 _, _ = w.Write([]byte("merging enqueued\n"))
911}
912
913func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {
914 indexed := listIndexed(s.IndexDir)
915
916 bw := bytes.Buffer{}
917
918 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
919
920 _, err := fmt.Fprintf(tw, "ID\tName\n")
921 if err != nil {
922 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
923 return
924 }
925
926 s.queue.mu.Lock()
927 name := ""
928 for _, id := range indexed {
929 if item := s.queue.get(id); item != nil {
930 name = item.opts.Name
931 } else {
932 name = ""
933 }
934 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
935 if err != nil {
936 debug.Printf("handleDebugIndexed: %s\n", err.Error())
937 }
938 }
939 s.queue.mu.Unlock()
940
941 if err != nil {
942 http.Error(w, err.Error(), http.StatusInternalServerError)
943 return
944 }
945
946 err = tw.Flush()
947 if err != nil {
948 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
949 return
950 }
951
952 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
953
954 if _, err := io.Copy(w, &bw); err != nil {
955 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
956 return
957 }
958}
959
960// forceIndex will run the index job for repo name now. It will return always
961// return a string explaining what it did, even if it failed.
962func (s *Server) forceIndex(id uint32) (string, error) {
963 var opts IndexOptions
964 var err error
965 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) {
966 opts = o
967 }, func(_ uint32, e error) {
968 err = e
969 }, id)
970 if err != nil {
971 return fmt.Sprintf("Indexing %d failed: %v", id, err), err
972 }
973
974 args := s.indexArgs(opts)
975 args.Incremental = false // force re-index
976
977 var state indexState
978 ran := s.muIndexDir.With(opts.Name, func() {
979 state, err = s.Index(args)
980 })
981 if !ran {
982 return fmt.Sprintf("index job for repository already running: %s", args), nil
983 }
984 if err != nil {
985 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err
986 }
987 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil
988}
989
990func listIndexed(indexDir string) []uint32 {
991 index := getShards(indexDir)
992 metricNumIndexed.Set(float64(len(index)))
993 repoIDs := make([]uint32, 0, len(index))
994 for id := range index {
995 repoIDs = append(repoIDs, id)
996 }
997 sort.Slice(repoIDs, func(i, j int) bool {
998 return repoIDs[i] < repoIDs[j]
999 })
1000 return repoIDs
1001}
1002
1003// setupTmpDir sets up a temporary directory on the same volume as the
1004// indexes.
1005//
1006// If main is true we will delete older temp directories left around. main is
1007// false when this is a debug command.
1008func setupTmpDir(logger sglog.Logger, main bool, index string) error {
1009 // change the target tmp directory depending on if it's our main daemon or a
1010 // debug sub command.
1011 dir := ".indexserver.debug.tmp"
1012 if main {
1013 dir = ".indexserver.tmp"
1014 }
1015
1016 tmpRoot := filepath.Join(index, dir)
1017
1018 if main {
1019 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot))
1020 err := os.RemoveAll(tmpRoot)
1021 if err != nil {
1022 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err))
1023 }
1024 }
1025
1026 if err := os.MkdirAll(tmpRoot, 0o755); err != nil {
1027 return err
1028 }
1029
1030 return os.Setenv("TMPDIR", tmpRoot)
1031}
1032
1033func printMetaData(fn string) error {
1034 repo, indexMeta, err := zoekt.ReadMetadataPath(fn)
1035 if err != nil {
1036 return err
1037 }
1038
1039 err = json.NewEncoder(os.Stdout).Encode(indexMeta)
1040 if err != nil {
1041 return err
1042 }
1043
1044 err = json.NewEncoder(os.Stdout).Encode(repo)
1045 if err != nil {
1046 return err
1047 }
1048 return nil
1049}
1050
1051func printShardStats(fn string) error {
1052 f, err := os.Open(fn)
1053 if err != nil {
1054 return err
1055 }
1056
1057 iFile, err := zoekt.NewIndexFile(f)
1058 if err != nil {
1059 return err
1060 }
1061
1062 return zoekt.PrintNgramStats(iFile)
1063}
1064
1065func srcLogLevelIsDebug() bool {
1066 lvl := os.Getenv(sglog.EnvLogLevel)
1067 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug")
1068}
1069
// getEnvWithDefaultBool returns environment variable k parsed with
// strconv.ParseBool, or defaultVal if k is unset or empty. It terminates the
// process via log.Fatalf if the value is not a valid boolean.
func getEnvWithDefaultBool(k string, defaultVal bool) bool {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to bool: %s", k, err)
	}
	return b
}
1081
// getEnvWithDefaultInt64 returns environment variable k as a base-10 int64,
// falling back to defaultVal when k is unset or empty. A malformed value
// terminates the process via log.Fatalf.
func getEnvWithDefaultInt64(k string, defaultVal int64) int64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseInt(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to int64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1093
// getEnvWithDefaultInt returns environment variable k parsed as an int, or
// defaultVal when k is unset or empty.
//
// An unparsable value is a fatal configuration error: the process exits via
// log.Fatalf.
func getEnvWithDefaultInt(k string, defaultVal int) int {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	// Parse the variable's value (v), not its name (k).
	i, err := strconv.Atoi(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to int: %s", k, err)
	}
	return i
}
1105
// getEnvWithDefaultUint64 returns environment variable k as a base-10
// uint64, falling back to defaultVal when k is unset or empty. A malformed
// value terminates the process via log.Fatalf.
func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseUint(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to uint64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1117
// getEnvWithDefaultFloat64 returns environment variable k as a float64,
// falling back to defaultVal when k is unset or empty. A malformed value
// terminates the process via log.Fatalf.
func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseFloat(raw, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to float64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1129
// getEnvWithDefaultString returns environment variable k, or defaultVal when
// k is unset or set to the empty string.
func getEnvWithDefaultString(k string, defaultVal string) string {
	if value := os.Getenv(k); value != "" {
		return value
	}
	return defaultVal
}
1137
// getEnvWithDefaultDuration returns environment variable k parsed with
// time.ParseDuration (e.g. "90s", "2h"), falling back to defaultVal when k is
// unset or empty. A malformed value terminates the process via log.Fatalf.
func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := time.ParseDuration(raw)
		if err != nil {
			log.Fatalf("error parsing ENV %s to duration: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1150
// getEnvWithDefaultEmptySet treats environment variable k as a
// comma-separated list and returns its entries as a set. Entries are trimmed
// of surrounding whitespace and blank entries are skipped. The result is a
// non-nil (possibly empty) map, so an unset variable yields an empty set.
func getEnvWithDefaultEmptySet(k string) map[string]struct{} {
	members := make(map[string]struct{})
	for _, item := range strings.Split(os.Getenv(k), ",") {
		if item = strings.TrimSpace(item); item == "" {
			continue
		}
		members[item] = struct{}{}
	}
	return members
}
1161
// joinStringSet joins the members of set with sep.
//
// Members are sorted first, so the output is deterministic and does not depend
// on Go's randomized map iteration order. This keeps the log lines that print
// these sets stable across runs. A nil or empty set yields "".
func joinStringSet(set map[string]struct{}, sep string) string {
	xs := make([]string, 0, len(set))
	for x := range set {
		xs = append(xs, x)
	}
	sort.Strings(xs)

	return strings.Join(xs, sep)
}
1170
1171func setCompoundShardCounter(indexDir string) {
1172 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt"))
1173 if err != nil {
1174 log.Printf("setCompoundShardCounter: %s\n", err)
1175 return
1176 }
1177 metricNumberCompoundShards.Set(float64(len(fns)))
1178}
1179
1180func rootCmd() *ffcli.Command {
1181 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError)
1182 conf := rootConfig{
1183 Main: true,
1184 }
1185 conf.registerRootFlags(rootFs)
1186
1187 return &ffcli.Command{
1188 FlagSet: rootFs,
1189 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]",
1190 Subcommands: []*ffcli.Command{debugCmd()},
1191 Exec: func(ctx context.Context, args []string) error {
1192 return startServer(conf)
1193 },
1194 }
1195}
1196
// rootConfig holds the command-line configuration of the indexserver. Its
// fields are populated by registerRootFlags.
type rootConfig struct {
	// Main is true if this rootConfig is for our main long running command (the
	// indexserver). Debug commands should not set this value. newServer passes
	// it to setupTmpDir, which uses it to pick the tmp directory name and to
	// decide whether the previous run's tmp directory is removed.
	Main bool

	// root is the -sourcegraph_url value: a Sourcegraph URL, or a local
	// directory path to fake the Sourcegraph API.
	root string
	// interval is how often to sync with Sourcegraph.
	interval time.Duration
	// index is the index directory.
	index string
	// indexConcurrency is the number of repos to index concurrently; newServer
	// clamps it to [1, cpuCount].
	indexConcurrency int64
	// listen is the HTTP debug address; empty disables the debug servers.
	listen string
	// hostname is advertised to Sourcegraph when listing repos to index.
	hostname string
	// cpuFraction is the fraction of cores used for indexing, in (0, 1].
	cpuFraction float64
	// blockProfileRate is passed to runtime.SetBlockProfileRate and profiler.Init.
	blockProfileRate int

	// config values related to shard merging
	disableShardMerging bool
	vacuumInterval time.Duration
	mergeInterval time.Duration
	// targetSize and minSize are in MiB; newServer converts them to bytes.
	targetSize int64
	minSize int64
	// minAgeDays: shards with commits newer than this many days are not merged.
	minAgeDays int

	// config values related to backoff indexing repos with one or more consecutive failures
	backoffDuration time.Duration
	maxBackoffDuration time.Duration
}
1224
1225func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
1226 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
1227 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
1228 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently")
1229 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use")
1230 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
1231 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
1232 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
1233 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate")
1234 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
1235 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")
1236
1237 // flags related to shard merging
1238 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging")
1239 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
1240 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
1241 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB")
1242 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB")
1243 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
1244}
1245
1246func startServer(conf rootConfig) error {
1247 s, err := newServer(conf)
1248 if err != nil {
1249 return err
1250 }
1251
1252 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate)
1253 setCompoundShardCounter(s.IndexDir)
1254
1255 if conf.listen != "" {
1256
1257 mux := http.NewServeMux()
1258 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{
1259 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"},
1260 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"},
1261 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"},
1262 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"},
1263 }...)
1264 s.addDebugHandlers(mux)
1265
1266 go func() {
1267 debug.Printf("serving HTTP on %s", conf.listen)
1268 log.Fatal(http.ListenAndServe(conf.listen, mux))
1269 }()
1270
1271 // Serve mux on a unix domain socket on a best-effort-basis so that
1272 // webserver can call the endpoints via the shared filesystem.
1273 //
1274 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen
1275 // on the socket due to permission errors. See
1276 // https://github.com/docker/for-mac/issues/6239
1277 go func() {
1278 serveHTTPOverSocket := func() error {
1279 socket := filepath.Join(s.IndexDir, "indexserver.sock")
1280 // We cannot bind a socket to an existing pathname.
1281 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) {
1282 return fmt.Errorf("error removing socket file: %s", socket)
1283 }
1284 // The "unix" network corresponds to stream sockets. (cf. unixgram,
1285 // unixpacket).
1286 l, err := net.Listen("unix", socket)
1287 if err != nil {
1288 return fmt.Errorf("failed to listen on socket %s: %w", socket, err)
1289 }
1290 // Indexserver (root) and webserver (Sourcegraph) run with
1291 // different users. Per default, the socket is created with
1292 // permission 755 (root root), which doesn't let webserver write to
1293 // it.
1294 //
1295 // See https://github.com/golang/go/issues/11822 for more context.
1296 if err := os.Chmod(socket, 0o777); err != nil {
1297 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err)
1298 }
1299 debug.Printf("serving HTTP on %s", socket)
1300 return http.Serve(l, mux)
1301 }
1302 debug.Print(serveHTTPOverSocket())
1303 }()
1304 }
1305
1306 oc := &ownerChecker{
1307 Path: filepath.Join(conf.index, "owner.txt"),
1308 Hostname: conf.hostname,
1309 }
1310 go oc.Run()
1311
1312 logger := sglog.Scoped("metricsRegistration")
1313 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"}
1314
1315 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index})
1316 prometheus.DefaultRegisterer.MustRegister(c)
1317
1318 s.Run()
1319 return nil
1320}
1321
1322func newServer(conf rootConfig) (*Server, error) {
1323 logger := sglog.Scoped("server")
1324
1325 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 {
1326 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0")
1327 }
1328 if conf.index == "" {
1329 return nil, fmt.Errorf("must set -index")
1330 }
1331 if conf.root == "" {
1332 return nil, fmt.Errorf("must set -sourcegraph_url")
1333 }
1334 rootURL, err := url.Parse(conf.root)
1335 if err != nil {
1336 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err)
1337 }
1338
1339 rootURL = addDefaultPort(rootURL)
1340
1341 // Tune GOMAXPROCS to match Linux container CPU quota.
1342 _, _ = maxprocs.Set()
1343
1344 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler.
1345 // The block profiler is disabled by default and should be enabled with care in production
1346 runtime.SetBlockProfileRate(conf.blockProfileRate)
1347
1348 // Automatically prepend our own path at the front, to minimize
1349 // required configuration.
1350 if l, err := os.Readlink("/proc/self/exe"); err == nil {
1351 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
1352 }
1353
1354 if _, err := os.Stat(conf.index); err != nil {
1355 if err := os.MkdirAll(conf.index, 0o755); err != nil {
1356 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err)
1357 }
1358 }
1359
1360 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil {
1361 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err)
1362 }
1363
1364 if srcLogLevelIsDebug() {
1365 debug = log.New(os.Stderr, "", log.LstdFlags)
1366 }
1367
1368 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST")
1369 if len(reposWithSeparateIndexingMetrics) > 0 {
1370 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", "))
1371 }
1372
1373 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST")
1374 if len(deltaBuildRepositoriesAllowList) > 0 {
1375 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", "))
1376 }
1377
1378 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150)
1379 if deltaShardNumberFallbackThreshold > 0 {
1380 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold)
1381 } else {
1382 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards")
1383 }
1384
1385 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST")
1386 if len(reposShouldSkipSymbolsCalculation) > 0 {
1387 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", "))
1388 }
1389
1390 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout)
1391 if indexingTimeout != defaultIndexingTimeout {
1392 debug.Printf("using configured indexing timeout: %s", indexingTimeout)
1393 }
1394
1395 var sg Sourcegraph
1396 if rootURL.IsAbs() {
1397 var batchSize int
1398 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" {
1399 batchSize, err = strconv.Atoi(v)
1400 if err != nil {
1401 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int")
1402 }
1403 }
1404
1405 opts := []SourcegraphClientOption{
1406 WithBatchSize(batchSize),
1407 }
1408
1409 logger := sglog.Scoped("zoektConfigurationGRPCClient")
1410 client, err := dialGRPCClient(rootURL.Host, logger)
1411 if err != nil {
1412 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err)
1413 }
1414
1415 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...)
1416
1417 } else {
1418 sg = sourcegraphFake{
1419 RootDir: rootURL.String(),
1420 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags),
1421 }
1422 }
1423
1424 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction)))
1425 if cpuCount < 1 {
1426 cpuCount = 1
1427 }
1428
1429 if conf.indexConcurrency < 1 {
1430 conf.indexConcurrency = 1
1431 } else if conf.indexConcurrency > int64(cpuCount) {
1432 conf.indexConcurrency = int64(cpuCount)
1433 }
1434
1435 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)
1436
1437 return &Server{
1438 logger: logger,
1439 Sourcegraph: sg,
1440 IndexDir: conf.index,
1441 IndexConcurrency: int(conf.indexConcurrency),
1442 Interval: conf.interval,
1443 CPUCount: cpuCount,
1444 queue: *q,
1445 shardMerging: !conf.disableShardMerging,
1446 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList,
1447 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold,
1448 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
1449 hostname: conf.hostname,
1450 mergeOpts: mergeOpts{
1451 vacuumInterval: conf.vacuumInterval,
1452 mergeInterval: conf.mergeInterval,
1453 targetSizeBytes: conf.targetSize * 1024 * 1024,
1454 minSizeBytes: conf.minSize * 1024 * 1024,
1455 minAgeDays: conf.minAgeDays,
1456 },
1457 timeout: indexingTimeout,
1458 }, err
1459}
1460
// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration
// for the indexed-search-configuration gRPC service. dialGRPCClient installs it
// via grpc.WithDefaultServiceConfig.
//
// The default backoff strategy is modeled after the default settings used by
// retryablehttp.DefaultClient.
//
// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html):
//   - Unavailable
//   - Aborted
//
// The JSON is embedded into the binary at build time from the file named in
// the directive below, so editing that file requires a rebuild.
//
//go:embed default_grpc_service_configuration.json
var defaultGRPCServiceConfigurationJSON string
1473
1474func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor {
1475 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1476 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1477 return invoker(ctx, method, req, reply, cc, opts...)
1478 }
1479}
1480
1481func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
1482 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
1483 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1484 return streamer(ctx, desc, cc, method, opts...)
1485 }
1486}
1487
// defaultGRPCMessageReceiveSizeBytes caps the size of a single message the
// gRPC client will receive. Custom dial/call options may override it.
const defaultGRPCMessageReceiveSizeBytes = 90 << 20 // 90 MiB
1491
// dialGRPCClient creates a ZoektConfigurationService client for addr.
//
// The connection uses insecure (plaintext) transport credentials. For both
// unary and streaming calls it chains the following interceptors, in this
// order (the order is deliberate; keep it when editing):
//  1. client metrics
//  2. message-size
//  3. internal-actor
//  4. error logging
//  5. error metrics
//  6. retry
//
// additionalOpts are appended after the defaults. Message-size options from
// the environment are appended last so they take precedence. logger is used
// by the error-logging interceptors.
func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) {
	metrics := mustGetClientMetrics()

	// If the service seems to be unavailable, this
	// will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1
	// Ex: (on the first retry attempt, we will wait between [.9s and 1.1s])
	retryOpts := []retry.CallOption{
		retry.WithMax(5),
		retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)),
		retry.WithCodes(codes.Unavailable),
	}

	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithChainStreamInterceptor(
			metrics.StreamClientInterceptor(),
			messagesize.StreamClientInterceptor,
			internalActorStreamInterceptor(),
			internalerrs.LoggingStreamClientInterceptor(logger),
			internalerrs.PrometheusStreamClientInterceptor,
			retry.StreamClientInterceptor(retryOpts...),
		),
		grpc.WithChainUnaryInterceptor(
			metrics.UnaryClientInterceptor(),
			messagesize.UnaryClientInterceptor,
			internalActorUnaryInterceptor(),
			internalerrs.LoggingUnaryClientInterceptor(logger),
			internalerrs.PrometheusUnaryClientInterceptor,
			retry.UnaryClientInterceptor(retryOpts...),
		),
		grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)),
	}

	opts = append(opts, additionalOpts...)

	// Ensure that the message size options are set last, so they override any other
	// client-specific options that tweak the message size.
	//
	// The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
	// take precedence over everything else with a uniform size setting that's easy to reason about.
	opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...)

	// This dialer is used to connect via gRPC to the Sourcegraph instance.
	// This is done lazily, so we can provide the client to use regardless of
	// whether we enabled gRPC or not initially.
	cc, err := grpc.Dial(addr, opts...)
	if err != nil {
		return nil, fmt.Errorf("dialing %q: %w", addr, err)
	}

	client := proto.NewZoektConfigurationServiceClient(cc)
	return client, nil
}
1546
1547// mustGetClientMetrics returns a singleton instance of the client metrics
1548// that are shared across all gRPC clients that this process creates.
1549//
1550// This function panics if the metrics cannot be registered with the default
1551// Prometheus registry.
1552func mustGetClientMetrics() *grpcprom.ClientMetrics {
1553 clientMetricsOnce.Do(func() {
1554 clientMetrics = grpcprom.NewClientMetrics(
1555 grpcprom.WithClientCounterOptions(),
1556 grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
1557 grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC
1558 grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC
1559 )
1560
1561 prometheus.DefaultRegisterer.MustRegister(clientMetrics)
1562 })
1563
1564 return clientMetrics
1565}
1566
1567// addDefaultPort adds a default port to a URL if one is not specified.
1568//
1569// If the URL scheme is "http" and no port is specified, "80" is used.
1570// If the scheme is "https", "443" is used.
1571//
1572// The original URL is not mutated. A copy is modified and returned.
1573func addDefaultPort(original *url.URL) *url.URL {
1574 if original == nil {
1575 return nil // don't panic
1576 }
1577
1578 if !original.IsAbs() {
1579 return original // don't do anything if the URL doesn't look like a remote URL
1580 }
1581
1582 if original.Scheme == "http" && original.Port() == "" {
1583 u := cloneURL(original)
1584 u.Host = net.JoinHostPort(u.Host, "80")
1585 return u
1586 }
1587
1588 if original.Scheme == "https" && original.Port() == "" {
1589 u := cloneURL(original)
1590 u.Host = net.JoinHostPort(u.Host, "443")
1591 return u
1592 }
1593
1594 return original
1595}
1596
// cloneURL returns an independent copy of u, or nil if u is nil. The User
// info is copied too, so mutating the result never affects u.
// Adapted from net/http/clone.go.
func cloneURL(u *url.URL) *url.URL {
	if u == nil {
		return nil
	}
	dup := *u
	if u.User != nil {
		userCopy := *u.User
		dup.User = &userCopy
	}
	return &dup
}
1611
1612func main() {
1613 liblog := sglog.Init(sglog.Resource{
1614 Name: "zoekt-indexserver",
1615 Version: zoekt.Version,
1616 InstanceID: zoekt.HostnameBestEffort(),
1617 })
1618 defer liblog.Sync()
1619
1620 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil {
1621 log.Fatal(err)
1622 }
1623}
1624
// getBoolFromEnvironmentVariables returns the boolean defined by the first
// environment variable in envVarNames that is set to a non-empty value in the
// current process environment, or defaultBool if none are.
//
// If that first non-empty variable fails to parse as a boolean (per
// strconv.ParseBool), an error wrapping the parse error is returned. Later
// variables are not consulted.
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	for _, envVar := range envVarNames {
		v := os.Getenv(envVar)
		if v == "" {
			continue
		}

		b, err := strconv.ParseBool(v)
		if err != nil {
			return false, fmt.Errorf("parsing environment variable %q to boolean: %w", envVar, err)
		}

		return b, nil
	}

	return defaultBool, nil
}