fork of https://github.com/sourcegraph/zoekt
1// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
2// repositories on sourcegraph
3package main
4
5import (
6 "bytes"
7 "context"
8 _ "embed"
9 "encoding/json"
10 "errors"
11 "flag"
12 "fmt"
13 "html/template"
14 "io"
15 "io/fs"
16 "log"
17 "math"
18 "math/rand"
19 "net"
20 "net/http"
21 "net/url"
22 "os"
23 "os/exec"
24 "os/signal"
25 "path/filepath"
26 "runtime"
27 "sort"
28 "strconv"
29 "strings"
30 "sync"
31 "text/tabwriter"
32 "time"
33
34 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
35 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
36 "github.com/peterbourgon/ff/v3/ffcli"
37 "github.com/prometheus/client_golang/prometheus"
38 "github.com/prometheus/client_golang/prometheus/promauto"
39 sglog "github.com/sourcegraph/log"
40 "github.com/sourcegraph/mountinfo"
41 "go.uber.org/automaxprocs/maxprocs"
42 "golang.org/x/net/trace"
43 "golang.org/x/sys/unix"
44 "google.golang.org/grpc"
45 "google.golang.org/grpc/codes"
46 "google.golang.org/grpc/credentials/insecure"
47 "google.golang.org/grpc/metadata"
48
49 "github.com/sourcegraph/zoekt"
50 "github.com/sourcegraph/zoekt/build"
51 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
52 "github.com/sourcegraph/zoekt/debugserver"
53 "github.com/sourcegraph/zoekt/grpc/internalerrs"
54 "github.com/sourcegraph/zoekt/grpc/messagesize"
55 "github.com/sourcegraph/zoekt/internal/profiler"
56)
57
var (
	// metricResolveRevisionsDuration is a histogram of latencies for resolving
	// the revisions of all repositories.
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 100000s (~27h)
	})

	// metricResolveRevisionDuration is a histogram of latencies for resolving
	// a single repository revision, labelled by whether it succeeded.
	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	// metricGetIndexOptions counts every attempt to get index options for a
	// repository, including failed ones.
	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	// metricGetIndexOptionsError counts only the failed attempts.
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	// metricIndexDuration is observed by processQueue with the time spent in
	// Server.Index while holding the per-repository lock.
	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	// metricIndexingDelay is observed by processQueue with the time between a
	// job being added to the queue and its index run finishing.
	metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_indexing_delay_seconds",
		Help:    "A histogram of durations from when an index job is added to the queue, to the time it completes.",
		Buckets: prometheus.ExponentialBuckets(60, 2, 12), // 1m -> 122880s (~34h, ~1.4 days)
	}, []string{
		"state", // state is an indexState
		"name",  // the name of the repository that was indexed
	})

	// metricFetchDuration is a histogram of latencies for fetching a
	// repository.
	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	// metricIndexIncrementalIndexState is incremented by Server.Index with
	// the build.IndexState computed for each incremental index job.
	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is build.IndexState

	// metricNumIndexed is set by listIndexed to the number of repositories
	// found on disk. NOTE(review): the help text says "by code host", but the
	// gauge has no labels, so it is a single total.
	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	// metricFailingTotal is incremented by Server.Index whenever an index job
	// returns an error.
	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	// metricIndexingTotal is incremented by Server.Index each time it starts
	// a full (non-noop, non-meta-only) index run.
	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	// metricNumStoppedTrackingTotal is increased by Server.Run by the number
	// of repositories removed from the queue on each sync.
	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})

	// clientMetricsOnce and clientMetrics are not used in this part of the
	// file; presumably the Once guards lazy initialisation of the gRPC client
	// metrics — TODO confirm against the code that uses them.
	clientMetricsOnce sync.Once
	clientMetrics     *grpcprom.ClientMetrics
)
138
// reposWithSeparateIndexingMetrics is the set of repository names that we
// want to capture separate indexing metrics for. repoNameForMetric consults
// it; repositories not in the set are reported under an empty name label.
var reposWithSeparateIndexingMetrics = make(map[string]struct{})

// indexState describes the outcome of an index job. Its string value is used
// as the "state" label on the indexing metrics.
type indexState string

const (
	indexStateFail        indexState = "fail"         // The index job returned an error
	indexStateSuccess     indexState = "success"      // The index was rebuilt
	indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
	indexStateNoop        indexState = "noop"         // We didn't need to update index
	indexStateEmpty       indexState = "empty"        // index is empty (empty repo)
)
151
// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	// logger is the structured logger used for index job events.
	logger sglog.Logger

	// Sourcegraph is the client used to list repositories, fetch their index
	// options and report index status back.
	Sourcegraph Sourcegraph
	// BatchSize is not read in this part of the file.
	// NOTE(review): presumably the batch size for requests to Sourcegraph —
	// confirm against its users.
	BatchSize int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration

	// CPUCount is the number of CPUs to use for indexing shards.
	CPUCount int

	// queue holds the index jobs. Run keeps it up to date and processQueue
	// pops jobs from it.
	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is reported verbatim by the /debug/host handler.
	hostname string

	// mergeOpts configures shard merging; Run reads its vacuumInterval and
	// mergeInterval to schedule the vacuum and merge loops.
	mergeOpts mergeOpts

	// timeout defines how long the index server waits before killing an indexing job.
	timeout time.Duration
}
199
// debug is the logger used for verbose diagnostics. As initialised here it
// discards everything it is given.
var debug = log.New(io.Discard, "", log.LstdFlags)

// noOutputTimeout is how long loggedRun waits for a command to produce new
// output before it sends the command SIGQUIT (followed by a kill).
//
// our index commands should output something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
const noOutputTimeout = 30 * time.Minute
208
209func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) {
210 out := &synchronizedBuffer{}
211 cmd.Stdout = out
212 cmd.Stderr = out
213
214 tr.LazyPrintf("%s", cmd.Args)
215
216 defer func() {
217 if err != nil {
218 outS := out.String()
219 tr.LazyPrintf("failed: %v", err)
220 tr.LazyPrintf("output: %s", out)
221 tr.SetError()
222 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS)
223 }
224 }()
225
226 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args))
227
228 if err := cmd.Start(); err != nil {
229 return err
230 }
231
232 errC := make(chan error)
233 go func() {
234 errC <- cmd.Wait()
235 }()
236
237 // This channel is set after we have sent sigquit. It allows us to follow up
238 // with a sigkill if the process doesn't quit after sigquit.
239 kill := make(<-chan time.Time)
240
241 lastLen := 0
242 for {
243 select {
244 case <-time.After(noOutputTimeout):
245 // Periodically check if we have had output. If not kill the process.
246 if out.Len() != lastLen {
247 lastLen = out.Len()
248 log.Printf("still running %s", cmd.Args)
249 } else {
250 // Send quit (C-\) first so we get a stack dump.
251 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args)
252 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil {
253 log.Println("quit failed:", err)
254 }
255
256 // send sigkill if still running in 10s
257 kill = time.After(10 * time.Second)
258 }
259
260 case <-kill:
261 log.Printf("still running, killing %s", cmd.Args)
262 if err := cmd.Process.Kill(); err != nil {
263 log.Println("kill failed:", err)
264 }
265
266 case err := <-errC:
267 if err != nil {
268 return err
269 }
270
271 tr.LazyPrintf("success")
272 return nil
273 }
274 }
275}
276
// synchronizedBuffer is a bytes.Buffer guarded by a mutex, so that one
// goroutine can inspect the buffer while another is still writing to it.
type synchronizedBuffer struct {
	mu sync.Mutex
	b  bytes.Buffer
}

// Write appends p to the buffer while holding the lock.
func (buf *synchronizedBuffer) Write(p []byte) (n int, err error) {
	buf.mu.Lock()
	defer buf.mu.Unlock()

	n, err = buf.b.Write(p)
	return n, err
}

// Len reports the number of bytes currently held in the buffer.
func (buf *synchronizedBuffer) Len() (size int) {
	buf.mu.Lock()
	defer buf.mu.Unlock()

	size = buf.b.Len()
	return size
}

// String returns a copy of the buffered bytes as a string.
func (buf *synchronizedBuffer) String() (contents string) {
	buf.mu.Lock()
	defer buf.mu.Unlock()

	contents = buf.b.String()
	return contents
}
301
// pauseFileName is the name of a file which, if present in IndexDir, stops
// index jobs from running. This is to make it possible to experiment with the
// content of the IndexDir without the indexserver writing to it. The sync
// loop logs the (trimmed) content of the file each time it skips an update.
const pauseFileName = "PAUSE"
306
// Run the sync loop. This blocks forever.
//
// It first removes incomplete shards from IndexDir and then starts the
// background goroutines: one that keeps the queue in sync with Sourcegraph,
// one that vacuums and one that merges shards (both only act when shard
// merging is enabled), and IndexConcurrency workers running processQueue.
func (s *Server) Run() {
	removeIncompleteShards(s.IndexDir)

	// Start a goroutine which updates the queue with commits to index.
	go func() {
		// We update the list of indexed repos every Interval. To speed up manual
		// testing we also listen for SIGUSR1 to trigger updates.
		//
		// "pkill -SIGUSR1 zoekt-sourcegra"
		for range jitterTicker(s.Interval, unix.SIGUSR1) {
			// A readable PAUSE file means an operator paused us; skip this tick.
			if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
				log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b)))
				continue
			}

			repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir))
			if err != nil {
				log.Printf("error listing repos: %s", err)
				continue
			}

			debug.Printf("updating index queue with %d repositories", len(repos.IDs))

			// Stop indexing repos we don't need to track anymore
			removed := s.queue.MaybeRemoveMissing(repos.IDs)
			metricNumStoppedTrackingTotal.Add(float64(len(removed)))
			if len(removed) > 0 {
				log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5))
			}

			// Clean up the index directory in the background while the queue is
			// being updated. cleanupDone is closed once cleanup has finished.
			cleanupDone := make(chan struct{})
			go func() {
				defer close(cleanupDone)
				s.muIndexDir.Global(func() {
					cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging)
				})
			}()

			repos.IterateIndexOptions(s.queue.AddOrUpdate)

			// IterateIndexOptions will only iterate over repositories that have
			// changed since we last called list. However, we want to add all IDs
			// back onto the queue just to check that what is on disk is still
			// correct. This will use the last IndexOptions we stored in the
			// queue. The repositories not on the queue (missing) need a forced
			// fetch of IndexOptions.
			missing := s.queue.Bump(repos.IDs)
			// Errors for individual repositories are deliberately ignored here.
			s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...)

			setCompoundShardCounter(s.IndexDir)

			// Don't start the next sync before this round's cleanup is done.
			<-cleanupDone
		}
	}()

	// Periodically vacuum, but only when shard merging is enabled.
	go func() {
		for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) {
			if s.shardMerging {
				s.vacuum()
			}
		}
	}()

	// Periodically merge shards, but only when shard merging is enabled.
	go func() {
		for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) {
			if s.shardMerging {
				s.doMerge()
			}
		}
	}()

	// Start the workers which take index jobs off the queue.
	for i := 0; i < s.IndexConcurrency; i++ {
		go s.processQueue()
	}

	// block forever
	select {}
}
386
// formatListUint32 renders the first min(len(v), m) items of v as a
// comma-separated list. If v holds more items than were rendered, the list
// ends with "...".
func formatListUint32(v []uint32, m int) string {
	limit := m
	if len(v) < limit {
		limit = len(v)
	}

	var out strings.Builder
	for idx, id := range v {
		if idx >= limit {
			break
		}
		fmt.Fprintf(&out, "%d", id)
		out.WriteString(", ")
	}

	// Truncated lists keep the separator in front of the ellipsis.
	if limit < len(v) {
		out.WriteString("...")
		return out.String()
	}

	// Complete lists drop the separator that follows the last item.
	return strings.TrimSuffix(out.String(), ", ")
}
404
405func (s *Server) processQueue() {
406 for {
407 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
408 time.Sleep(time.Second)
409 continue
410 }
411
412 item, ok := s.queue.Pop()
413 if !ok {
414 time.Sleep(time.Second)
415 continue
416 }
417
418 opts := item.Opts
419 args := s.indexArgs(opts)
420
421 ran := s.muIndexDir.With(opts.Name, func() {
422 // only record time taken once we hold the lock. This avoids us
423 // recording time taken while merging/cleanup runs.
424 start := time.Now()
425
426 state, err := s.Index(args)
427
428 elapsed := time.Since(start)
429 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds())
430
431 indexDelay := time.Since(item.DateAddedToQueue)
432 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds())
433
434 if err != nil {
435 log.Printf("error indexing %s: %s", args.String(), err)
436 }
437
438 switch state {
439 case indexStateSuccess:
440 var branches []string
441 for _, b := range args.Branches {
442 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
443 }
444 s.logger.Info("updated index",
445 sglog.String("repo", args.Name),
446 sglog.Uint32("id", args.RepoID),
447 sglog.Strings("branches", branches),
448 sglog.Duration("duration", elapsed),
449 sglog.Duration("index_delay", indexDelay),
450 )
451 case indexStateSuccessMeta:
452 log.Printf("updated meta %s in %v", args.String(), elapsed)
453 }
454 s.queue.SetIndexed(opts, state)
455 })
456
457 if !ran {
458 // Someone else is processing the repository. We can just skip this job
459 // since the repository will be added back to the queue and we will
460 // converge to the correct behaviour.
461 debug.Printf("index job for repository already running: %s", args)
462 continue
463 }
464 }
465}
466
467// repoNameForMetric returns a normalized version of the given repository name that is
468// suitable for use with Prometheus metrics.
469func repoNameForMetric(repo string) string {
470 // Check to see if we want to be able to capture separate indexing metrics for this repository.
471 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable.
472 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok {
473 return repo
474 }
475
476 return ""
477}
478
// batched splits slice into consecutive batches of at most size elements and
// sends them, in order, on the returned channel. The channel is closed once
// all batches have been sent; an empty slice yields no batches.
//
// A non-positive size cannot make progress, so it is treated as "no limit":
// the whole slice is sent as a single batch.
//
// Each batch shares its backing array with slice, but its capacity is
// clipped to its length so that appending to one batch cannot overwrite the
// elements of the next one.
func batched(slice []uint32, size int) <-chan []uint32 {
	c := make(chan []uint32)
	go func() {
		defer close(c)

		if size <= 0 {
			size = len(slice)
		}

		for len(slice) > 0 {
			n := size
			if n > len(slice) {
				n = len(slice)
			}
			c <- slice[:n:n]
			slice = slice[n:]
		}
	}()
	return c
}
493
// jitterTicker returns a channel that ticks with a jitter: after each tick
// has been received, the next one is sent after a pause picked uniformly
// from [d/2, d/2 + d). The first tick is sent immediately.
//
// sig is an optional list of signals which also cause a tick. This is a
// convenience to allow triggering the ticker manually.
//
// d must be positive: the jitter is drawn with rand.Int63n, which panics for
// a non-positive argument. The goroutines started here run for the lifetime
// of the process.
func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} {
	ticks := make(chan struct{})

	// Timer-driven ticks.
	go func() {
		for {
			ticks <- struct{}{}

			span := int64(d)
			pause := span/2 + rand.Int63n(span)
			time.Sleep(time.Duration(pause))
		}
	}()

	// Signal-driven ticks, only needed when signals were requested.
	if len(sig) > 0 {
		go func() {
			sigC := make(chan os.Signal, 1)
			signal.Notify(sigC, sig...)
			for range sigC {
				ticks <- struct{}{}
			}
		}()
	}

	return ticks
}
525
// Index starts an index job for repo name at commit.
//
// It returns the resulting indexState and any error. Whenever a non-nil
// error is returned the state is forced to indexStateFail and
// metricFailingTotal is incremented. Note that args is modified in place
// (UseDelta, DeltaShardNumberFallbackThreshold and Symbols).
func (s *Server) Index(args *indexArgs) (state indexState, err error) {
	tr := trace.New("index", args.Name)

	// Runs on every return path: records the outcome on the trace and
	// normalises the state on error.
	defer func() {
		if err != nil {
			tr.SetError()
			tr.LazyPrintf("error: %v", err)
			state = indexStateFail
			metricFailingTotal.Inc()
		}
		tr.LazyPrintf("state: %s", state)
		tr.Finish()
	}()

	tr.LazyPrintf("branches: %v", args.Branches)

	// Nothing to index: write an empty shard for the repository instead.
	if len(args.Branches) == 0 {
		return indexStateEmpty, createEmptyShard(args)
	}

	repositoryName := args.Name
	if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
		tr.LazyPrintf("marking this repository for delta build")
		args.UseDelta = true
	}

	args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold

	if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
		tr.LazyPrintf("skipping symbols calculation")
		args.Symbols = false
	}

	// reason is only used for logging why a full index run was started.
	reason := "forced"

	if args.Incremental {
		bo := args.BuildOptions()
		bo.SetDefaults()
		incrementalState, fn := bo.IndexState()
		reason = string(incrementalState)
		metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()

		// Every case that does not return falls through to the full index run
		// below.
		switch incrementalState {
		case build.IndexStateEqual:
			debug.Printf("%s index already up to date. Shard=%s", args.String(), fn)
			return indexStateNoop, nil

		case build.IndexStateMeta:
			log.Printf("updating index.meta %s", args.String())

			if err := mergeMeta(bo); err != nil {
				log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
			} else {
				return indexStateSuccessMeta, nil
			}

		case build.IndexStateCorrupt:
			log.Printf("falling back to full update: corrupt index: %s", args.String())
		}
	}

	log.Printf("updating index %s reason=%s", args.String(), reason)

	metricIndexingTotal.Inc()
	c := gitIndexConfig{
		// Commands are run through loggedRun so their output is captured and
		// recorded on this job's trace.
		runCmd: func(cmd *exec.Cmd) error {
			return s.loggedRun(tr, cmd)
		},

		findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
			return args.BuildOptions().FindRepositoryMetadata()
		},
		timeout: s.timeout,
	}

	err = gitIndex(c, args, s.Sourcegraph, s.logger)
	if err != nil {
		return indexStateFail, err
	}

	// Failing to report the new status is only logged; the index job itself
	// succeeded.
	if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil {
		s.logger.Error("failed to update index status",
			sglog.String("repo", args.Name),
			sglog.Uint32("id", args.RepoID),
			sglogBranches("branches", args.Branches),
			sglog.Error(err),
		)
	}

	return indexStateSuccess, nil
}
618
619// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so
620// it can update the zoekt_repos table.
621func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error {
622 // We need to read from disk for IndexTime.
623 _, metadata, ok, err := c.findRepositoryMetadata(args)
624 if err != nil {
625 return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
626 }
627 if !ok {
628 return errors.New("failed to find metadata for new/updated index")
629 }
630
631 status := []indexStatus{{
632 RepoID: args.RepoID,
633 Branches: args.Branches,
634 IndexTimeUnix: metadata.IndexTime.Unix(),
635 }}
636 if err := sg.UpdateIndexStatus(status); err != nil {
637 return fmt.Errorf("failed to update sourcegraph with status: %w", err)
638 }
639
640 return nil
641}
642
643func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
644 ss := make([]string, len(branches))
645 for i, b := range branches {
646 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
647 }
648 return sglog.Strings(key, ss)
649}
650
651func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
652 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0))
653 return &indexArgs{
654 IndexOptions: opts,
655 IndexDir: s.IndexDir,
656 Parallelism: parallelism,
657 Incremental: true,
658
659 // 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
660 FileLimit: 1 << 20,
661
662 ShardMerging: s.shardMerging,
663 }
664}
665
666// parallelism consults both the server flags and index options to determine the number
667// of shards to index in parallel. If the CPUCount index option is provided, it always
668// overrides the server flag.
669func (s *Server) parallelism(opts IndexOptions, maxProcs int) int {
670 var parallelism int
671 if opts.ShardConcurrency > 0 {
672 parallelism = int(opts.ShardConcurrency)
673 } else {
674 parallelism = s.CPUCount
675 }
676
677 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs
678 if parallelism > 4*maxProcs {
679 parallelism = 4 * maxProcs
680 }
681
682 // If index concurrency is set, then divide the parallelism by the number of
683 // repos we're indexing in parallel
684 if s.IndexConcurrency > 1 {
685 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency)))
686 }
687
688 return parallelism
689}
690
691func createEmptyShard(args *indexArgs) error {
692 bo := args.BuildOptions()
693 bo.SetDefaults()
694 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}
695
696 if args.Incremental && bo.IncrementalSkipIndexing() {
697 return nil
698 }
699
700 builder, err := build.NewBuilder(*bo)
701 if err != nil {
702 return err
703 }
704 return builder.Finish()
705}
706
707// addDebugHandlers adds handlers specific to indexserver.
708func (s *Server) addDebugHandlers(mux *http.ServeMux) {
709 // Sourcegraph's site admin view requires indexserver to serve it's admin view
710 // on "/".
711 mux.Handle("/", http.HandlerFunc(s.handleRoot))
712
713 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex))
714 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed))
715 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList))
716 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge))
717 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue))
718 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost))
719}
720
721func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) {
722 if r.Method != "GET" {
723 w.Header().Set("Allow", "GET")
724 w.WriteHeader(http.StatusMethodNotAllowed)
725 return
726 }
727
728 response := struct {
729 Hostname string
730 }{
731 Hostname: s.hostname,
732 }
733
734 b, err := json.Marshal(response)
735 if err != nil {
736 http.Error(w, err.Error(), http.StatusInternalServerError)
737 return
738 }
739
740 w.Header().Set("Content-Type", "application/json; charset=utf-8")
741 w.Write(b)
742}
743
// rootTmpl renders the admin page served on "/". It expects a value with an
// IndexMsg string and a Repos list whose items have Name and ID fields. When
// Repos is empty only a "show repos" link is rendered instead of the table.
var rootTmpl = template.Must(template.New("name").Parse(`
<html>
	<body>
		<a href="debug">Debug</a><br />
		<a href="debug/requests">Traces</a><br />
		{{.IndexMsg}}<br />
		<br />
		<h3>Reindex</h3>
		{{if .Repos}}
		<a href="?show_repos=false">hide repos</a><br />
		<table style="margin-top: 20px">
			<th style="text-align:left">Name</th>
			<th style="text-align:left">ID</th>
			{{range .Repos}}
			<tr>
				<td>{{.Name}}</td>
				<td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></td>
			</tr>
			{{end}}
		</table>
		{{else}}
		<a href="?show_repos=true">show repos</a><br />
		{{end}}
	</body>
</html>
`))
770
771func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
772 if r.Method != "GET" {
773 w.Header().Set("Allow", "GET")
774 w.WriteHeader(http.StatusMethodNotAllowed)
775 return
776 }
777
778 values := r.URL.Query()
779
780 // ?id=
781 indexMsg := ""
782 if v := values.Get("id"); v != "" {
783 id, err := strconv.Atoi(v)
784 if err != nil {
785 http.Error(w, err.Error(), http.StatusBadRequest)
786 return
787 }
788 indexMsg, _ = s.forceIndex(uint32(id))
789 }
790
791 // ?show_repos=
792 showRepos := false
793 if v := values.Get("show_repos"); v != "" {
794 showRepos, _ = strconv.ParseBool(v)
795 }
796
797 type Repo struct {
798 ID uint32
799 Name string
800 }
801 var data struct {
802 Repos []Repo
803 IndexMsg string
804 }
805
806 data.IndexMsg = indexMsg
807
808 if showRepos {
809 s.queue.Iterate(func(opts *IndexOptions) {
810 data.Repos = append(data.Repos, Repo{
811 ID: opts.RepoID,
812 Name: opts.Name,
813 })
814 })
815 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name })
816 }
817
818 _ = rootTmpl.Execute(w, data)
819}
820
821// handleReindex triggers a reindex asynocronously. If a reindex was triggered
822// the request returns with status 202. The caller can infer the new state of
823// the index by calling List.
824func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) {
825 if r.Method != http.MethodPost {
826 w.Header().Set("Allow", http.MethodPost)
827 w.WriteHeader(http.StatusMethodNotAllowed)
828 return
829 }
830
831 err := r.ParseForm()
832 if err != nil {
833 http.Error(w, err.Error(), http.StatusBadRequest)
834 return
835 }
836
837 id, err := strconv.Atoi(r.Form.Get("repo"))
838 if err != nil {
839 http.Error(w, err.Error(), http.StatusBadRequest)
840 return
841 }
842
843 go func() { s.forceIndex(uint32(id)) }()
844
845 // 202 Accepted
846 w.WriteHeader(http.StatusAccepted)
847}
848
849func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
850 withIndexed := true
851 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil {
852 withIndexed = b
853 }
854
855 var indexed []uint32
856 if withIndexed {
857 indexed = listIndexed(s.IndexDir)
858 }
859
860 repos, err := s.Sourcegraph.List(r.Context(), indexed)
861 if err != nil {
862 http.Error(w, err.Error(), http.StatusInternalServerError)
863 return
864 }
865
866 bw := bytes.Buffer{}
867
868 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
869
870 _, err = fmt.Fprintf(tw, "ID\tName\n")
871 if err != nil {
872 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
873 return
874 }
875
876 s.queue.mu.Lock()
877 name := ""
878 for _, id := range repos.IDs {
879 if item := s.queue.get(id); item != nil {
880 name = item.opts.Name
881 } else {
882 name = ""
883 }
884 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
885 if err != nil {
886 debug.Printf("handleDebugList: %s\n", err.Error())
887 }
888 }
889 s.queue.mu.Unlock()
890
891 if err != nil {
892 http.Error(w, err.Error(), http.StatusInternalServerError)
893 return
894 }
895
896 err = tw.Flush()
897 if err != nil {
898 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
899 return
900 }
901
902 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
903
904 if _, err := io.Copy(w, &bw); err != nil {
905 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
906 return
907 }
908}
909
910// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
911// can run this command during periods of low usage (evenings, weekends) to
912// trigger an initial merge run. In the steady-state, merges happen rarely, even
913// on busy instances, and users can rely on automatic merging instead.
914func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {
915 // A merge operation can take very long, depending on the number merges and the
916 // target size of the compound shards. We run the merge in the background and
917 // return immediately to the user.
918 //
919 // We track the status of the merge with metricShardMergingRunning.
920 go func() {
921 s.doMerge()
922 }()
923 _, _ = w.Write([]byte("merging enqueued\n"))
924}
925
926func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {
927 indexed := listIndexed(s.IndexDir)
928
929 bw := bytes.Buffer{}
930
931 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
932
933 _, err := fmt.Fprintf(tw, "ID\tName\n")
934 if err != nil {
935 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
936 return
937 }
938
939 s.queue.mu.Lock()
940 name := ""
941 for _, id := range indexed {
942 if item := s.queue.get(id); item != nil {
943 name = item.opts.Name
944 } else {
945 name = ""
946 }
947 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
948 if err != nil {
949 debug.Printf("handleDebugIndexed: %s\n", err.Error())
950 }
951 }
952 s.queue.mu.Unlock()
953
954 if err != nil {
955 http.Error(w, err.Error(), http.StatusInternalServerError)
956 return
957 }
958
959 err = tw.Flush()
960 if err != nil {
961 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
962 return
963 }
964
965 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
966
967 if _, err := io.Copy(w, &bw); err != nil {
968 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
969 return
970 }
971}
972
973// forceIndex will run the index job for repo name now. It will return always
974// return a string explaining what it did, even if it failed.
975func (s *Server) forceIndex(id uint32) (string, error) {
976 var opts IndexOptions
977 var err error
978 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) {
979 opts = o
980 }, func(_ uint32, e error) {
981 err = e
982 }, id)
983 if err != nil {
984 return fmt.Sprintf("Indexing %d failed: %v", id, err), err
985 }
986
987 args := s.indexArgs(opts)
988 args.Incremental = false // force re-index
989
990 var state indexState
991 ran := s.muIndexDir.With(opts.Name, func() {
992 state, err = s.Index(args)
993 })
994 if !ran {
995 return fmt.Sprintf("index job for repository already running: %s", args), nil
996 }
997 if err != nil {
998 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err
999 }
1000 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil
1001}
1002
1003func listIndexed(indexDir string) []uint32 {
1004 index := getShards(indexDir)
1005 metricNumIndexed.Set(float64(len(index)))
1006 repoIDs := make([]uint32, 0, len(index))
1007 for id := range index {
1008 repoIDs = append(repoIDs, id)
1009 }
1010 sort.Slice(repoIDs, func(i, j int) bool {
1011 return repoIDs[i] < repoIDs[j]
1012 })
1013 return repoIDs
1014}
1015
1016// setupTmpDir sets up a temporary directory on the same volume as the
1017// indexes.
1018//
1019// If main is true we will delete older temp directories left around. main is
1020// false when this is a debug command.
1021func setupTmpDir(logger sglog.Logger, main bool, index string) error {
1022 // change the target tmp directory depending on if it's our main daemon or a
1023 // debug sub command.
1024 dir := ".indexserver.debug.tmp"
1025 if main {
1026 dir = ".indexserver.tmp"
1027 }
1028
1029 tmpRoot := filepath.Join(index, dir)
1030
1031 if main {
1032 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot))
1033 err := os.RemoveAll(tmpRoot)
1034 if err != nil {
1035 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err))
1036 }
1037 }
1038
1039 if err := os.MkdirAll(tmpRoot, 0o755); err != nil {
1040 return err
1041 }
1042
1043 return os.Setenv("TMPDIR", tmpRoot)
1044}
1045
1046func printMetaData(fn string) error {
1047 repo, indexMeta, err := zoekt.ReadMetadataPath(fn)
1048 if err != nil {
1049 return err
1050 }
1051
1052 err = json.NewEncoder(os.Stdout).Encode(indexMeta)
1053 if err != nil {
1054 return err
1055 }
1056
1057 err = json.NewEncoder(os.Stdout).Encode(repo)
1058 if err != nil {
1059 return err
1060 }
1061 return nil
1062}
1063
1064func printShardStats(fn string) error {
1065 f, err := os.Open(fn)
1066 if err != nil {
1067 return err
1068 }
1069
1070 iFile, err := zoekt.NewIndexFile(f)
1071 if err != nil {
1072 return err
1073 }
1074
1075 return zoekt.PrintNgramStats(iFile)
1076}
1077
1078func srcLogLevelIsDebug() bool {
1079 lvl := os.Getenv(sglog.EnvLogLevel)
1080 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug")
1081}
1082
// getEnvWithDefaultBool returns the boolean value of the environment variable
// k, or defaultVal if k is unset or empty.
//
// The value is parsed with strconv.ParseBool. If it cannot be parsed the
// process is terminated via log.Fatalf.
func getEnvWithDefaultBool(k string, defaultVal bool) bool {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to bool: %s", k, err)
	}
	return b
}
1094
// getEnvWithDefaultInt64 reads environment variable k as a base-10 int64.
//
// An unset or empty variable yields defaultVal. A value that does not parse
// terminates the process via log.Fatalf.
func getEnvWithDefaultInt64(k string, defaultVal int64) int64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, parseErr := strconv.ParseInt(raw, 10, 64)
		if parseErr != nil {
			log.Fatalf("error parsing ENV %s to int64: %s", k, parseErr)
		}
		return parsed
	}
	return defaultVal
}
1106
// getEnvWithDefaultInt returns the integer value of the environment variable
// k, or defaultVal if k is unset or empty.
//
// The value is parsed with strconv.Atoi. If it cannot be parsed the process
// is terminated via log.Fatalf.
func getEnvWithDefaultInt(k string, defaultVal int) int {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	// Parse the variable's value, not its name.
	i, err := strconv.Atoi(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to int: %s", k, err)
	}
	return i
}
1118
// getEnvWithDefaultUint64 reads environment variable k as a base-10 uint64.
//
// An unset or empty variable yields defaultVal. A value that does not parse
// terminates the process via log.Fatalf.
func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 {
	raw := os.Getenv(k)
	if len(raw) == 0 {
		return defaultVal
	}

	parsed, parseErr := strconv.ParseUint(raw, 10, 64)
	if parseErr != nil {
		log.Fatalf("error parsing ENV %s to uint64: %s", k, parseErr)
	}
	return parsed
}
1130
// getEnvWithDefaultFloat64 reads environment variable k as a float64.
//
// An unset or empty variable yields defaultVal. A value that does not parse
// terminates the process via log.Fatalf.
func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, parseErr := strconv.ParseFloat(raw, 64)
		if parseErr != nil {
			log.Fatalf("error parsing ENV %s to float64: %s", k, parseErr)
		}
		return parsed
	}
	return defaultVal
}
1142
// getEnvWithDefaultString returns the value of environment variable k, or
// defaultVal when k is unset or empty.
func getEnvWithDefaultString(k string, defaultVal string) string {
	if value := os.Getenv(k); value != "" {
		return value
	}
	return defaultVal
}
1150
// getEnvWithDefaultDuration reads environment variable k as a time.Duration
// in the syntax accepted by time.ParseDuration (for example "90s" or "2h").
//
// An unset or empty variable yields defaultVal. A value that does not parse
// terminates the process via log.Fatalf.
func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration {
	raw := os.Getenv(k)
	if len(raw) == 0 {
		return defaultVal
	}

	parsed, parseErr := time.ParseDuration(raw)
	if parseErr != nil {
		log.Fatalf("error parsing ENV %s to duration: %s", k, parseErr)
	}
	return parsed
}
1163
// getEnvWithDefaultEmptySet parses environment variable k as a
// comma-separated list and returns its entries as a set.
//
// Entries are trimmed of surrounding whitespace and empty entries are
// dropped. The returned map is never nil; it is empty when k is unset or
// contains no usable entries.
func getEnvWithDefaultEmptySet(k string) map[string]struct{} {
	members := make(map[string]struct{})
	for _, field := range strings.Split(os.Getenv(k), ",") {
		if name := strings.TrimSpace(field); name != "" {
			members[name] = struct{}{}
		}
	}
	return members
}
1174
// joinStringSet concatenates the members of set, separated by sep.
//
// Members are emitted in ascending lexical order so that the result is
// deterministic; Go's map iteration order is otherwise random. An empty or
// nil set yields the empty string.
func joinStringSet(set map[string]struct{}, sep string) string {
	xs := make([]string, 0, len(set))
	for x := range set {
		xs = append(xs, x)
	}
	sort.Strings(xs)

	return strings.Join(xs, sep)
}
1183
1184func setCompoundShardCounter(indexDir string) {
1185 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt"))
1186 if err != nil {
1187 log.Printf("setCompoundShardCounter: %s\n", err)
1188 return
1189 }
1190 metricNumberCompoundShards.Set(float64(len(fns)))
1191}
1192
1193func rootCmd() *ffcli.Command {
1194 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError)
1195 conf := rootConfig{
1196 Main: true,
1197 }
1198 conf.registerRootFlags(rootFs)
1199
1200 return &ffcli.Command{
1201 FlagSet: rootFs,
1202 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]",
1203 Subcommands: []*ffcli.Command{debugCmd()},
1204 Exec: func(ctx context.Context, args []string) error {
1205 return startServer(conf)
1206 },
1207 }
1208}
1209
// rootConfig holds the configuration of the indexserver. Apart from Main, the
// fields are populated from command line flags (several of which take their
// default from an environment variable) by registerRootFlags.
type rootConfig struct {
	// Main is true if this rootConfig is for our main long running command (the
	// indexserver). Debug commands should not set this value. newServer passes
	// it to setupTmpDir, which only removes a previous temp directory when it
	// is true.
	Main bool

	// root is the -sourcegraph_url flag: a Sourcegraph URL, or a path to a
	// directory of repositories.
	root string
	// interval is the -interval flag: how often to sync with Sourcegraph.
	interval time.Duration
	// index is the -index flag: the index directory.
	index string
	// indexConcurrency is the -index_concurrency flag: the number of repos to
	// index concurrently.
	indexConcurrency int64
	// listen is the -listen flag: the address the HTTP server listens on.
	listen string
	// hostname is the -hostname flag: the name advertised to Sourcegraph.
	hostname string
	// cpuFraction is the -cpu_fraction flag: the fraction of cores used for
	// indexing. newServer requires it to be in (0.0, 1.0].
	cpuFraction float64
	// blockProfileRate is the -block_profile_rate flag, passed to
	// runtime.SetBlockProfileRate by newServer.
	blockProfileRate int

	// config values related to shard merging

	// disableShardMerging is set by the -shard_merging flag.
	disableShardMerging bool
	// vacuumInterval is the -vacuum_interval flag: how often vacuum runs.
	vacuumInterval time.Duration
	// mergeInterval is the -merge_interval flag: how often merge runs.
	mergeInterval time.Duration
	// targetSize is the -merge_target_size flag: the target size of compound
	// shards in MiB.
	targetSize int64
	// minSize is the -merge_min_size flag: the minimum size of a compound
	// shard in MiB.
	minSize int64
	// minAgeDays is the -merge_min_age flag: the time since the last commit
	// in days.
	minAgeDays int

	// config values related to backoff indexing repos with one or more consecutive failures

	// backoffDuration is the -backoff_duration flag.
	backoffDuration time.Duration
	// maxBackoffDuration is the -max_backoff_duration flag.
	maxBackoffDuration time.Duration
}
1237
1238func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
1239 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
1240 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
1241 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently")
1242 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use")
1243 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
1244 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
1245 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
1246 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate")
1247 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
1248 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")
1249
1250 // flags related to shard merging
1251 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging")
1252 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
1253 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
1254 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB")
1255 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB")
1256 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
1257}
1258
1259func startServer(conf rootConfig) error {
1260 s, err := newServer(conf)
1261 if err != nil {
1262 return err
1263 }
1264
1265 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate)
1266 setCompoundShardCounter(s.IndexDir)
1267
1268 if conf.listen != "" {
1269
1270 mux := http.NewServeMux()
1271 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{
1272 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"},
1273 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"},
1274 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"},
1275 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"},
1276 }...)
1277 s.addDebugHandlers(mux)
1278
1279 go func() {
1280 debug.Printf("serving HTTP on %s", conf.listen)
1281 log.Fatal(http.ListenAndServe(conf.listen, mux))
1282 }()
1283
1284 // Serve mux on a unix domain socket on a best-effort-basis so that
1285 // webserver can call the endpoints via the shared filesystem.
1286 //
1287 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen
1288 // on the socket due to permission errors. See
1289 // https://github.com/docker/for-mac/issues/6239
1290 go func() {
1291 serveHTTPOverSocket := func() error {
1292 socket := filepath.Join(s.IndexDir, "indexserver.sock")
1293 // We cannot bind a socket to an existing pathname.
1294 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) {
1295 return fmt.Errorf("error removing socket file: %s", socket)
1296 }
1297 // The "unix" network corresponds to stream sockets. (cf. unixgram,
1298 // unixpacket).
1299 l, err := net.Listen("unix", socket)
1300 if err != nil {
1301 return fmt.Errorf("failed to listen on socket %s: %w", socket, err)
1302 }
1303 // Indexserver (root) and webserver (Sourcegraph) run with
1304 // different users. Per default, the socket is created with
1305 // permission 755 (root root), which doesn't let webserver write to
1306 // it.
1307 //
1308 // See https://github.com/golang/go/issues/11822 for more context.
1309 if err := os.Chmod(socket, 0o777); err != nil {
1310 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err)
1311 }
1312 debug.Printf("serving HTTP on %s", socket)
1313 return http.Serve(l, mux)
1314 }
1315 debug.Print(serveHTTPOverSocket())
1316 }()
1317 }
1318
1319 oc := &ownerChecker{
1320 Path: filepath.Join(conf.index, "owner.txt"),
1321 Hostname: conf.hostname,
1322 }
1323 go oc.Run()
1324
1325 logger := sglog.Scoped("metricsRegistration")
1326 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"}
1327
1328 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index})
1329 prometheus.DefaultRegisterer.MustRegister(c)
1330
1331 s.Run()
1332 return nil
1333}
1334
1335func newServer(conf rootConfig) (*Server, error) {
1336 logger := sglog.Scoped("server")
1337
1338 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 {
1339 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0")
1340 }
1341 if conf.index == "" {
1342 return nil, fmt.Errorf("must set -index")
1343 }
1344 if conf.root == "" {
1345 return nil, fmt.Errorf("must set -sourcegraph_url")
1346 }
1347 rootURL, err := url.Parse(conf.root)
1348 if err != nil {
1349 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err)
1350 }
1351
1352 rootURL = addDefaultPort(rootURL)
1353
1354 // Tune GOMAXPROCS to match Linux container CPU quota.
1355 _, _ = maxprocs.Set()
1356
1357 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler.
1358 // The block profiler is disabled by default and should be enabled with care in production
1359 runtime.SetBlockProfileRate(conf.blockProfileRate)
1360
1361 // Automatically prepend our own path at the front, to minimize
1362 // required configuration.
1363 if l, err := os.Readlink("/proc/self/exe"); err == nil {
1364 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
1365 }
1366
1367 if _, err := os.Stat(conf.index); err != nil {
1368 if err := os.MkdirAll(conf.index, 0o755); err != nil {
1369 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err)
1370 }
1371 }
1372
1373 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil {
1374 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err)
1375 }
1376
1377 if srcLogLevelIsDebug() {
1378 debug = log.New(os.Stderr, "", log.LstdFlags)
1379 }
1380
1381 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST")
1382 if len(reposWithSeparateIndexingMetrics) > 0 {
1383 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", "))
1384 }
1385
1386 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST")
1387 if len(deltaBuildRepositoriesAllowList) > 0 {
1388 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", "))
1389 }
1390
1391 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150)
1392 if deltaShardNumberFallbackThreshold > 0 {
1393 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold)
1394 } else {
1395 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards")
1396 }
1397
1398 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST")
1399 if len(reposShouldSkipSymbolsCalculation) > 0 {
1400 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", "))
1401 }
1402
1403 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout)
1404 if indexingTimeout != defaultIndexingTimeout {
1405 debug.Printf("using configured indexing timeout: %s", indexingTimeout)
1406 }
1407
1408 var sg Sourcegraph
1409 if rootURL.IsAbs() {
1410 var batchSize int
1411 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" {
1412 batchSize, err = strconv.Atoi(v)
1413 if err != nil {
1414 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int")
1415 }
1416 }
1417
1418 opts := []SourcegraphClientOption{
1419 WithBatchSize(batchSize),
1420 }
1421
1422 logger := sglog.Scoped("zoektConfigurationGRPCClient")
1423 client, err := dialGRPCClient(rootURL.Host, logger)
1424 if err != nil {
1425 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err)
1426 }
1427
1428 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...)
1429
1430 } else {
1431 sg = sourcegraphFake{
1432 RootDir: rootURL.String(),
1433 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags),
1434 }
1435 }
1436
1437 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction)))
1438 if cpuCount < 1 {
1439 cpuCount = 1
1440 }
1441
1442 if conf.indexConcurrency < 1 {
1443 conf.indexConcurrency = 1
1444 } else if conf.indexConcurrency > int64(cpuCount) {
1445 conf.indexConcurrency = int64(cpuCount)
1446 }
1447
1448 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)
1449
1450 return &Server{
1451 logger: logger,
1452 Sourcegraph: sg,
1453 IndexDir: conf.index,
1454 IndexConcurrency: int(conf.indexConcurrency),
1455 Interval: conf.interval,
1456 CPUCount: cpuCount,
1457 queue: *q,
1458 shardMerging: !conf.disableShardMerging,
1459 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList,
1460 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold,
1461 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
1462 hostname: conf.hostname,
1463 mergeOpts: mergeOpts{
1464 vacuumInterval: conf.vacuumInterval,
1465 mergeInterval: conf.mergeInterval,
1466 targetSizeBytes: conf.targetSize * 1024 * 1024,
1467 minSizeBytes: conf.minSize * 1024 * 1024,
1468 minAgeDays: conf.minAgeDays,
1469 },
1470 timeout: indexingTimeout,
1471 }, err
1472}
1473
// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration
// for the indexed-search-configuration gRPC service. Its contents are embedded
// from default_grpc_service_configuration.json at build time, and
// dialGRPCClient passes it to grpc.WithDefaultServiceConfig.
//
// The default backoff strategy is modeled after the default settings used by
// retryablehttp.DefaultClient.
//
// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html):
//   - Unavailable
//   - Aborted
//
// NOTE(review): the backoff and retry details above describe the embedded
// JSON file, which is not visible here — confirm against that file.
//
//go:embed default_grpc_service_configuration.json
var defaultGRPCServiceConfigurationJSON string
1486
1487func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor {
1488 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1489 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1490 return invoker(ctx, method, req, reply, cc, opts...)
1491 }
1492}
1493
1494func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
1495 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
1496 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1497 return streamer(ctx, desc, cc, method, opts...)
1498 }
1499}
1500
// defaultGRPCMessageReceiveSizeBytes is the default maximum size of a message
// that gRPC servers and clients are allowed to process. dialGRPCClient applies
// it as the client's maximum receive message size.
// This can be overridden by providing custom Server/Dial options.
const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MiB
1504
1505func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) {
1506 metrics := mustGetClientMetrics()
1507
1508 // If the service seems to be unavailable, this
1509 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1
1510 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s])
1511 retryOpts := []retry.CallOption{
1512 retry.WithMax(5),
1513 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)),
1514 retry.WithCodes(codes.Unavailable),
1515 }
1516
1517 opts := []grpc.DialOption{
1518 grpc.WithTransportCredentials(insecure.NewCredentials()),
1519 grpc.WithChainStreamInterceptor(
1520 metrics.StreamClientInterceptor(),
1521 messagesize.StreamClientInterceptor,
1522 internalActorStreamInterceptor(),
1523 internalerrs.LoggingStreamClientInterceptor(logger),
1524 internalerrs.PrometheusStreamClientInterceptor,
1525 retry.StreamClientInterceptor(retryOpts...),
1526 ),
1527 grpc.WithChainUnaryInterceptor(
1528 metrics.UnaryClientInterceptor(),
1529 messagesize.UnaryClientInterceptor,
1530 internalActorUnaryInterceptor(),
1531 internalerrs.LoggingUnaryClientInterceptor(logger),
1532 internalerrs.PrometheusUnaryClientInterceptor,
1533 retry.UnaryClientInterceptor(retryOpts...),
1534 ),
1535 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
1536 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)),
1537 }
1538
1539 opts = append(opts, additionalOpts...)
1540
1541 // Ensure that the message size options are set last, so they override any other
1542 // client-specific options that tweak the message size.
1543 //
1544 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
1545 // take precedence over everything else with a uniform size setting that's easy to reason about.
1546 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...)
1547
1548 // This dialer is used to connect via gRPC to the Sourcegraph instance.
1549 // This is done lazily, so we can provide the client to use regardless of
1550 // whether we enabled gRPC or not initially.
1551 cc, err := grpc.Dial(addr, opts...)
1552 if err != nil {
1553 return nil, fmt.Errorf("dialing %q: %w", addr, err)
1554 }
1555
1556 client := proto.NewZoektConfigurationServiceClient(cc)
1557 return client, nil
1558}
1559
1560// mustGetClientMetrics returns a singleton instance of the client metrics
1561// that are shared across all gRPC clients that this process creates.
1562//
1563// This function panics if the metrics cannot be registered with the default
1564// Prometheus registry.
1565func mustGetClientMetrics() *grpcprom.ClientMetrics {
1566 clientMetricsOnce.Do(func() {
1567 clientMetrics = grpcprom.NewClientMetrics(
1568 grpcprom.WithClientCounterOptions(),
1569 grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
1570 grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC
1571 grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC
1572 )
1573
1574 prometheus.DefaultRegisterer.MustRegister(clientMetrics)
1575 })
1576
1577 return clientMetrics
1578}
1579
// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
// If the scheme is "https", "443" is used. URLs with any other scheme, with
// an explicit port, or that are not absolute are returned as is. A nil URL
// yields nil.
//
// The port is joined to the URL's hostname rather than its raw Host field, so
// IPv6 literals ("http://[::1]") keep a single pair of brackets and a
// dangling colon ("http://example.com:") is not duplicated.
//
// The original URL is not mutated. A copy is modified and returned.
func addDefaultPort(original *url.URL) *url.URL {
	if original == nil {
		return nil // don't panic
	}

	// Leave the URL alone if it doesn't look like a remote URL or already
	// names a port.
	if !original.IsAbs() || original.Port() != "" {
		return original
	}

	var port string
	switch original.Scheme {
	case "http":
		port = "80"
	case "https":
		port = "443"
	default:
		return original
	}

	u := cloneURL(original)
	// Hostname strips the brackets of IPv6 literals; JoinHostPort adds them
	// back exactly once.
	u.Host = net.JoinHostPort(original.Hostname(), port)
	return u
}

// cloneURL returns a copy of the URL. It is safe to mutate the returned URL.
// The User field, if set, is copied as well. A nil URL yields nil.
// This is copied from net/http/clone.go
func cloneURL(u *url.URL) *url.URL {
	if u == nil {
		return nil
	}
	u2 := new(url.URL)
	*u2 = *u
	if u.User != nil {
		u2.User = new(url.Userinfo)
		*u2.User = *u.User
	}
	return u2
}
1624
1625func main() {
1626 liblog := sglog.Init(sglog.Resource{
1627 Name: "zoekt-indexserver",
1628 Version: zoekt.Version,
1629 InstanceID: zoekt.HostnameBestEffort(),
1630 })
1631 defer liblog.Sync()
1632
1633 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil {
1634 log.Fatal(err)
1635 }
1636}
1637
// getBoolFromEnvironmentVariables looks up the names in envVarNames in order
// and returns the boolean value of the first one that is set to a non-empty
// value. If none of them is set, defaultBool is returned.
//
// An error is returned if the value of that first variable fails to parse as
// a boolean; later variables are not consulted in that case.
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	for i := range envVarNames {
		name := envVarNames[i]

		raw := os.Getenv(name)
		if raw != "" {
			parsed, parseErr := strconv.ParseBool(raw)
			if parseErr == nil {
				return parsed, nil
			}
			return false, fmt.Errorf("parsing environment variable %q to boolean: %v", name, parseErr)
		}
	}

	return defaultBool, nil
}