fork of https://github.com/sourcegraph/zoekt
1// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
2// repositories on sourcegraph
3package main
4
5import (
6 "bytes"
7 "context"
8 _ "embed"
9 "encoding/json"
10 "errors"
11 "flag"
12 "fmt"
13 "html/template"
14 "io"
15 "io/fs"
16 "log"
17 "math"
18 "math/rand"
19 "net"
20 "net/http"
21 "net/url"
22 "os"
23 "os/exec"
24 "os/signal"
25 "path/filepath"
26 "runtime"
27 "sort"
28 "strconv"
29 "strings"
30 "sync"
31 "text/tabwriter"
32 "time"
33
34 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
35 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
36 "github.com/peterbourgon/ff/v3/ffcli"
37 "github.com/prometheus/client_golang/prometheus"
38 "github.com/prometheus/client_golang/prometheus/promauto"
39 sglog "github.com/sourcegraph/log"
40 "github.com/sourcegraph/mountinfo"
41 "go.uber.org/automaxprocs/maxprocs"
42 "golang.org/x/net/trace"
43 "golang.org/x/sys/unix"
44 "google.golang.org/grpc"
45 "google.golang.org/grpc/codes"
46 "google.golang.org/grpc/credentials/insecure"
47 "google.golang.org/grpc/metadata"
48
49 "github.com/sourcegraph/zoekt"
50 "github.com/sourcegraph/zoekt/build"
51 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
52 "github.com/sourcegraph/zoekt/debugserver"
53 "github.com/sourcegraph/zoekt/grpc/internalerrs"
54 "github.com/sourcegraph/zoekt/grpc/messagesize"
55 "github.com/sourcegraph/zoekt/internal/profiler"
56)
57
// Prometheus metrics exported by the index server. All of them are registered
// at package initialization via promauto.
var (
	// metricResolveRevisionsDuration observes how long it takes to resolve the
	// revisions of all repositories.
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 100000s (~27.8h)
	})

	// metricResolveRevisionDuration observes how long it takes to resolve the
	// revision of a single repository, labelled by outcome.
	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	// metricGetIndexOptions counts every attempt to fetch index options,
	// successful or not.
	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	// metricGetIndexOptionsError counts only the failed attempts to fetch index
	// options.
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	// metricIndexDuration observes how long a single index job took. It is
	// recorded by processQueue once the job holds the index directory lock.
	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	// metricIndexingDelay observes the time between a job being added to the
	// queue and the job completing.
	metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_indexing_delay_seconds",
		Help:    "A histogram of durations from when an index job is added to the queue, to the time it completes.",
		Buckets: prometheus.ExponentialBuckets(60, 2, 12), // 1m -> 122880s (~34h)
	}, []string{
		"state", // state is an indexState
		"name",  // the name of the repository that was indexed
	})

	// metricFetchDuration observes how long fetching a repository took.
	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	// metricIndexIncrementalIndexState counts, per build.IndexState, how the
	// on-disk index compared to the desired one when Index ran incrementally.
	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is build.IndexState

	// metricNumIndexed is set by listIndexed to the number of repositories that
	// have shards on disk.
	//
	// NOTE(review): the Help text says "by code host" but the gauge has no
	// labels, so it only ever reports a single total.
	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	// metricFailingTotal is incremented by Index whenever an index job returns
	// an error.
	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	// metricIndexingTotal is incremented by Index each time it actually starts
	// building an index (i.e. the job was not a noop or metadata-only update).
	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	// metricNumStoppedTrackingTotal counts repositories removed from the queue
	// by the sync loop in Run.
	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})

	// clientMetricsOnce and clientMetrics hold the gRPC client metrics.
	// NOTE(review): presumably clientMetricsOnce guards one-time creation of
	// clientMetrics; the initialization is not in this part of the file.
	clientMetricsOnce sync.Once
	clientMetrics     *grpcprom.ClientMetrics
)
138
// reposWithSeparateIndexingMetrics is the set of repository names whose
// indexing metrics are reported under their own "name" label value (see
// repoNameForMetric). It starts out empty.
var reposWithSeparateIndexingMetrics = map[string]struct{}{}
141
// indexState describes the outcome of a single index job. Its string value is
// used as the "state" label on the indexing metrics.
type indexState string

const (
	indexStateFail        indexState = "fail"         // The index job returned an error
	indexStateSuccess     indexState = "success"      // The index was (re)built
	indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
	indexStateNoop        indexState = "noop"         // We didn't need to update index
	indexStateEmpty       indexState = "empty"        // index is empty (empty repo)
)
151
// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	// logger is the structured logger used for indexing events.
	logger sglog.Logger

	// Sourcegraph is the client used to list repositories, fetch their index
	// options and report index status back.
	Sourcegraph Sourcegraph
	// BatchSize is not read in this part of the file.
	// NOTE(review): presumably the batch size for requests to Sourcegraph — confirm.
	BatchSize int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration

	// CPUCount is the number of CPUs to use for indexing shards.
	CPUCount int

	// queue holds the pending index jobs consumed by processQueue.
	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is the value reported by the /debug/host endpoint.
	hostname string

	// mergeOpts configures shard merging; Run reads its vacuumInterval and
	// mergeInterval to schedule the vacuum and merge loops.
	mergeOpts mergeOpts

	// timeout defines how long the index server waits before killing an indexing job.
	timeout time.Duration
}
199
// debug is the logger for verbose diagnostics. As declared here it writes to
// io.Discard, so its output is dropped unless it is redirected elsewhere.
var debug = log.New(io.Discard, "", log.LstdFlags)
201
// noOutputTimeout is how long loggedRun lets a command run without producing
// any new output before it tries to stop it. Our index commands should output
// something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
const noOutputTimeout = 30 * time.Minute
208
// loggedRun starts cmd and blocks until it exits, capturing its combined
// stdout and stderr in memory and recording progress on tr.
//
// Every noOutputTimeout the amount of captured output is compared with the
// previous check. If it did not grow, the process is sent SIGQUIT and a
// 10 second timer is armed, after which the process is killed.
//
// On failure the returned error contains the command arguments, the
// underlying error and everything the command wrote.
func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) {
	// Both streams share one buffer so the output is interleaved as written.
	out := &synchronizedBuffer{}
	cmd.Stdout = out
	cmd.Stderr = out

	tr.LazyPrintf("%s", cmd.Args)

	// Decorate any error (including a failure to start) with the captured
	// output and mark the trace as failed.
	defer func() {
		if err != nil {
			outS := out.String()
			tr.LazyPrintf("failed: %v", err)
			tr.LazyPrintf("output: %s", out)
			tr.SetError()
			err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS)
		}
	}()

	s.logger.Debug("logged run", sglog.Strings("args", cmd.Args))

	if err := cmd.Start(); err != nil {
		return err
	}

	// Wait in the background so the select loop below can also watch timers.
	errC := make(chan error)
	go func() {
		errC <- cmd.Wait()
	}()

	// This channel is set after we have sent sigquit. It allows us to follow up
	// with a sigkill if the process doesn't quit after sigquit.
	// Until then it is a channel nobody sends on, so its case never fires.
	kill := make(<-chan time.Time)

	lastLen := 0
	for {
		select {
		case <-time.After(noOutputTimeout):
			// Periodically check if we have had output. If not kill the process.
			if out.Len() != lastLen {
				lastLen = out.Len()
				log.Printf("still running %s", cmd.Args)
			} else {
				// Send quit (C-\) first so we get a stack dump.
				log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args)
				if err := cmd.Process.Signal(unix.SIGQUIT); err != nil {
					log.Println("quit failed:", err)
				}

				// send sigkill if still running in 10s
				kill = time.After(10 * time.Second)
			}

		case <-kill:
			log.Printf("still running, killing %s", cmd.Args)
			if err := cmd.Process.Kill(); err != nil {
				log.Println("kill failed:", err)
			}

		case err := <-errC:
			// The process exited; this is the only way out of the loop.
			if err != nil {
				return err
			}

			tr.LazyPrintf("success")
			return nil
		}
	}
}
276
// synchronizedBuffer is a bytes.Buffer guarded by a mutex. It lets one
// goroutine inspect the buffer (length, contents) while another is still
// writing to it.
type synchronizedBuffer struct {
	mu sync.Mutex
	b  bytes.Buffer
}

// Write appends p to the buffer while holding the lock. It implements
// io.Writer.
func (buf *synchronizedBuffer) Write(p []byte) (int, error) {
	buf.mu.Lock()
	defer buf.mu.Unlock()

	written, err := buf.b.Write(p)
	return written, err
}

// Len reports the number of bytes currently held in the buffer.
func (buf *synchronizedBuffer) Len() int {
	buf.mu.Lock()
	defer buf.mu.Unlock()

	size := buf.b.Len()
	return size
}

// String returns a copy of the buffered bytes as a string.
func (buf *synchronizedBuffer) String() string {
	buf.mu.Lock()
	defer buf.mu.Unlock()

	contents := buf.b.String()
	return contents
}
301
// pauseFileName is the name of a file which, if present in IndexDir, stops
// index jobs from running: both the sync loop in Run and processQueue check for
// it before doing work. This makes it possible to experiment with the content
// of the IndexDir without the indexserver writing to it.
const pauseFileName = "PAUSE"
306
// Run starts the background work of the index server and then blocks forever.
//
// It first removes incomplete shards from IndexDir, then starts:
//   - a sync loop that periodically lists repositories from Sourcegraph, updates
//     the queue and cleans up the index directory,
//   - a vacuum loop and a merge loop (both no-ops unless shard merging is
//     enabled),
//   - IndexConcurrency workers running processQueue.
func (s *Server) Run() {
	removeIncompleteShards(s.IndexDir)

	// Start a goroutine which updates the queue with commits to index.
	go func() {
		// We update the list of indexed repos every Interval. To speed up manual
		// testing we also listen for SIGUSR1 to trigger updates.
		//
		// "pkill -SIGUSR1 zoekt-sourcegra"
		for range jitterTicker(s.Interval, unix.SIGUSR1) {
			// A readable PAUSE file skips this sync round; its content is logged
			// as the reason.
			if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
				log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b)))
				continue
			}

			repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir))
			if err != nil {
				log.Printf("error listing repos: %s", err)
				continue
			}

			debug.Printf("updating index queue with %d repositories", len(repos.IDs))

			// Stop indexing repos we don't need to track anymore
			removed := s.queue.MaybeRemoveMissing(repos.IDs)
			metricNumStoppedTrackingTotal.Add(float64(len(removed)))
			if len(removed) > 0 {
				log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5))
			}

			// Run cleanup concurrently with the queue update below; we wait for
			// it at the end of the round.
			cleanupDone := make(chan struct{})
			go func() {
				defer close(cleanupDone)
				s.muIndexDir.Global(func() {
					cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging)
				})
			}()

			repos.IterateIndexOptions(s.queue.AddOrUpdate)

			// IterateIndexOptions will only iterate over repositories that have
			// changed since we last called list. However, we want to add all IDs
			// back onto the queue just to check that what is on disk is still
			// correct. This will use the last IndexOptions we stored in the
			// queue. The repositories not on the queue (missing) need a forced
			// fetch of IndexOptions.
			missing := s.queue.Bump(repos.IDs)
			// Errors for individual repositories are deliberately ignored here
			// (the error callback is empty).
			s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...)

			setCompoundShardCounter(s.IndexDir)

			<-cleanupDone
		}
	}()

	// Vacuum loop. SIGUSR1 also triggers it.
	go func() {
		for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) {
			if s.shardMerging {
				s.vacuum()
			}
		}
	}()

	// Merge loop. SIGUSR1 also triggers it.
	go func() {
		for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) {
			if s.shardMerging {
				s.doMerge()
			}
		}
	}()

	// Index workers.
	for i := 0; i < s.IndexConcurrency; i++ {
		go s.processQueue()
	}

	// block forever
	select {}
}
386
// formatListUint32 renders the first min(len(v), m) items of v as a
// comma-separated list. If v has more than m items, "..." is appended as a
// final element to signal the truncation.
func formatListUint32(v []uint32, m int) string {
	shown := m
	if len(v) < shown {
		shown = len(v)
	}

	var parts []string
	for i := 0; i < shown; i++ {
		parts = append(parts, fmt.Sprintf("%d", v[i]))
	}
	if len(v) > shown {
		parts = append(parts, "...")
	}

	return strings.Join(parts, ", ")
}
404
// processQueue is the worker loop for index jobs. It never returns.
//
// Each iteration pops one job from the queue and indexes it while holding the
// per-repository lock of muIndexDir. While a PAUSE file exists in IndexDir, or
// while the queue is empty, it sleeps for a second and tries again.
func (s *Server) processQueue() {
	for {
		// Do not touch the index directory while it is manually paused.
		if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
			time.Sleep(time.Second)
			continue
		}

		item, ok := s.queue.Pop()
		if !ok {
			// Nothing to do right now.
			time.Sleep(time.Second)
			continue
		}

		opts := item.Opts
		args := s.indexArgs(opts)

		ran := s.muIndexDir.With(opts.Name, func() {
			// only record time taken once we hold the lock. This avoids us
			// recording time taken while merging/cleanup runs.
			start := time.Now()

			state, err := s.Index(args)

			elapsed := time.Since(start)
			metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds())

			// Unlike elapsed, the delay also covers the time the job spent
			// waiting in the queue.
			indexDelay := time.Since(item.DateAddedToQueue)
			metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds())

			if err != nil {
				log.Printf("error indexing %s: %s", args.String(), err)
			}

			switch state {
			case indexStateSuccess:
				var branches []string
				for _, b := range args.Branches {
					branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
				}
				s.logger.Info("updated index",
					sglog.String("repo", args.Name),
					sglog.Uint32("id", args.RepoID),
					sglog.Strings("branches", branches),
					sglog.Duration("duration", elapsed),
					sglog.Duration("index_delay", indexDelay),
				)
			case indexStateSuccessMeta:
				log.Printf("updated meta %s in %v", args.String(), elapsed)
			}
			// Record the outcome on the queue regardless of the state.
			s.queue.SetIndexed(opts, state)
		})

		if !ran {
			// Someone else is processing the repository. We can just skip this job
			// since the repository will be added back to the queue and we will
			// converge to the correct behaviour.
			debug.Printf("index job for repository already running: %s", args)
			continue
		}
	}
}
466
467// repoNameForMetric returns a normalized version of the given repository name that is
468// suitable for use with Prometheus metrics.
469func repoNameForMetric(repo string) string {
470 // Check to see if we want to be able to capture separate indexing metrics for this repository.
471 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable.
472 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok {
473 return repo
474 }
475
476 return ""
477}
478
// batched splits slice into consecutive batches of at most size elements and
// sends them, in order, on the returned channel. The channel is closed after
// the last batch has been sent; an empty slice yields a channel that is closed
// without any batch.
//
// A size smaller than 1 is treated as 1. Previously a size of 0 produced an
// endless stream of empty batches and a negative size panicked inside the
// sending goroutine.
//
// The batches are sub-slices of slice and share its backing array. The channel
// is unbuffered, so the sending goroutine only finishes once the caller has
// received every batch.
func batched(slice []uint32, size int) <-chan []uint32 {
	if size < 1 {
		size = 1
	}

	c := make(chan []uint32)
	go func() {
		defer close(c)
		for len(slice) > 0 {
			// The final batch may be shorter than size.
			n := size
			if n > len(slice) {
				n = len(slice)
			}
			c <- slice[:n]
			slice = slice[n:]
		}
	}()
	return c
}
493
// jitterTicker returns a channel which receives a tick immediately and then
// repeatedly after a jittered pause. Each pause is picked uniformly from
// [d/2, d + d/2) and only starts once the previous tick has been received.
//
// sig is an optional list of signals that also cause a tick, which is a
// convenient way to trigger the ticker manually.
//
// d must be positive: rand.Int63n panics otherwise. The ticker can not be
// stopped; its goroutines run for the lifetime of the process.
func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} {
	ticks := make(chan struct{})

	// Time based ticks.
	go func() {
		for {
			ticks <- struct{}{}
			pause := d/2 + time.Duration(rand.Int63n(int64(d)))
			time.Sleep(pause)
		}
	}()

	// Signal based ticks, only needed when signals were requested.
	if len(sig) > 0 {
		go func() {
			received := make(chan os.Signal, 1)
			signal.Notify(received, sig...)
			for range received {
				ticks <- struct{}{}
			}
		}()
	}

	return ticks
}
525
// Index runs the index job described by args and reports what happened as an
// indexState.
//
// For incremental jobs the state on disk is consulted first: an up-to-date
// index is a noop and a metadata-only difference is handled by rewriting the
// metadata. Otherwise (or if that fails, or the index is corrupt) a full index
// is built via gitIndex and the new status is pushed to Sourcegraph.
//
// Whenever a non-nil error is returned, state is indexStateFail.
func (s *Server) Index(args *indexArgs) (state indexState, err error) {
	tr := trace.New("index", args.Name)
	tr.SetMaxEvents(30) // Ensure we capture all indexing events

	defer func() {
		if err != nil {
			tr.SetError()
			tr.LazyPrintf("error: %v", err)
			// Force the failure state no matter what the return statement said
			// (e.g. indexStateEmpty with an error from createEmptyShard).
			state = indexStateFail
			metricFailingTotal.Inc()
		}
		tr.LazyPrintf("state: %s", state)
		tr.Finish()
	}()

	tr.LazyPrintf("branches: %v", args.Branches)

	// Without branches there is nothing to index; write an empty shard instead.
	if len(args.Branches) == 0 {
		return indexStateEmpty, createEmptyShard(args)
	}

	repositoryName := args.Name
	if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
		tr.LazyPrintf("marking this repository for delta build")
		args.UseDelta = true
	}

	args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold

	if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
		tr.LazyPrintf("skipping symbols calculation")
		args.Symbols = false
	}

	// reason is only used for logging why a full index is being built.
	reason := "forced"

	if args.Incremental {
		bo := args.BuildOptions()
		bo.SetDefaults()
		incrementalState, fn := bo.IndexState()
		reason = string(incrementalState)
		metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()

		switch incrementalState {
		case build.IndexStateEqual:
			debug.Printf("%s index already up to date. Shard=%s", args.String(), fn)
			return indexStateNoop, nil

		case build.IndexStateMeta:
			log.Printf("updating index.meta %s", args.String())

			// On failure we fall out of the switch and do a full index below.
			if err := mergeMeta(bo); err != nil {
				log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
			} else {
				return indexStateSuccessMeta, nil
			}

		case build.IndexStateCorrupt:
			log.Printf("falling back to full update: corrupt index: %s", args.String())
		}
	}

	log.Printf("updating index %s reason=%s", args.String(), reason)

	metricIndexingTotal.Inc()
	c := gitIndexConfig{
		// Commands are run through loggedRun so they show up in the trace and
		// are stopped when they produce no output.
		runCmd: func(cmd *exec.Cmd) error {
			return s.loggedRun(tr, cmd)
		},

		findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
			return args.BuildOptions().FindRepositoryMetadata()
		},
		timeout: s.timeout,
	}

	err = gitIndex(c, args, s.Sourcegraph, s.logger)
	if err != nil {
		return indexStateFail, err
	}

	// Failing to report the status is logged but does not fail the index job.
	if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil {
		s.logger.Error("failed to update index status",
			sglog.String("repo", args.Name),
			sglog.Uint32("id", args.RepoID),
			sglogBranches("branches", args.Branches),
			sglog.Error(err),
		)
	}

	return indexStateSuccess, nil
}
619
620// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so
621// it can update the zoekt_repos table.
622func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error {
623 // We need to read from disk for IndexTime.
624 _, metadata, ok, err := c.findRepositoryMetadata(args)
625 if err != nil {
626 return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
627 }
628 if !ok {
629 return errors.New("failed to find metadata for new/updated index")
630 }
631
632 status := []indexStatus{{
633 RepoID: args.RepoID,
634 Branches: args.Branches,
635 IndexTimeUnix: metadata.IndexTime.Unix(),
636 }}
637 if err := sg.UpdateIndexStatus(status); err != nil {
638 return fmt.Errorf("failed to update sourcegraph with status: %w", err)
639 }
640
641 return nil
642}
643
644func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
645 ss := make([]string, len(branches))
646 for i, b := range branches {
647 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
648 }
649 return sglog.Strings(key, ss)
650}
651
652func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
653 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0))
654 return &indexArgs{
655 IndexOptions: opts,
656 IndexDir: s.IndexDir,
657 Parallelism: parallelism,
658 Incremental: true,
659
660 // 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
661 FileLimit: 1 << 20,
662
663 ShardMerging: s.shardMerging,
664 }
665}
666
667// parallelism consults both the server flags and index options to determine the number
668// of shards to index in parallel. If the CPUCount index option is provided, it always
669// overrides the server flag.
670func (s *Server) parallelism(opts IndexOptions, maxProcs int) int {
671 var parallelism int
672 if opts.ShardConcurrency > 0 {
673 parallelism = int(opts.ShardConcurrency)
674 } else {
675 parallelism = s.CPUCount
676 }
677
678 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs
679 if parallelism > 4*maxProcs {
680 parallelism = 4 * maxProcs
681 }
682
683 // If index concurrency is set, then divide the parallelism by the number of
684 // repos we're indexing in parallel
685 if s.IndexConcurrency > 1 {
686 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency)))
687 }
688
689 return parallelism
690}
691
// createEmptyShard writes a shard without any documents for the repository
// described by args. Index uses it for repositories that have no branches.
//
// For incremental jobs nothing is written if the build options report that
// indexing can be skipped.
func createEmptyShard(args *indexArgs) error {
	bo := args.BuildOptions()
	bo.SetDefaults()
	// NOTE(review): the version looks like a placeholder rather than a real
	// commit hash — confirm how readers of the shard interpret it.
	bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}

	if args.Incremental && bo.IncrementalSkipIndexing() {
		return nil
	}

	// Finishing a builder that was given no documents produces the empty shard.
	builder, err := build.NewBuilder(*bo)
	if err != nil {
		return err
	}
	return builder.Finish()
}
707
708// addDebugHandlers adds handlers specific to indexserver.
709func (s *Server) addDebugHandlers(mux *http.ServeMux) {
710 // Sourcegraph's site admin view requires indexserver to serve it's admin view
711 // on "/".
712 mux.Handle("/", http.HandlerFunc(s.handleRoot))
713
714 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex))
715 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed))
716 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList))
717 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge))
718 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue))
719 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost))
720}
721
722func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) {
723 if r.Method != "GET" {
724 w.Header().Set("Allow", "GET")
725 w.WriteHeader(http.StatusMethodNotAllowed)
726 return
727 }
728
729 response := struct {
730 Hostname string
731 }{
732 Hostname: s.hostname,
733 }
734
735 b, err := json.Marshal(response)
736 if err != nil {
737 http.Error(w, err.Error(), http.StatusInternalServerError)
738 return
739 }
740
741 w.Header().Set("Content-Type", "application/json; charset=utf-8")
742 w.Write(b)
743}
744
// rootTmpl renders the admin page served by handleRoot.
//
// It expects a value with the fields IndexMsg (a message about a reindex that
// was just triggered, may be empty) and Repos (a list of values with ID and
// Name). When Repos is non-empty a table with one row per repository is shown,
// where each ID links to "?id=<ID>" to trigger a reindex; otherwise only a
// link to show the repositories is rendered.
var rootTmpl = template.Must(template.New("name").Parse(`
<html>
  <body>
    <a href="debug">Debug</a><br />
    <a href="debug/requests">Traces</a><br />
    {{.IndexMsg}}<br />
    <br />
    <h3>Reindex</h3>
    {{if .Repos}}
    <a href="?show_repos=false">hide repos</a><br />
    <table style="margin-top: 20px">
      <th style="text-align:left">Name</th>
      <th style="text-align:left">ID</th>
      {{range .Repos}}
      <tr>
        <td>{{.Name}}</td>
        <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></td>
      </tr>
      {{end}}
    </table>
    {{else}}
    <a href="?show_repos=true">show repos</a><br />
    {{end}}
  </body>
</html>
`))
771
772func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
773 if r.Method != "GET" {
774 w.Header().Set("Allow", "GET")
775 w.WriteHeader(http.StatusMethodNotAllowed)
776 return
777 }
778
779 values := r.URL.Query()
780
781 // ?id=
782 indexMsg := ""
783 if v := values.Get("id"); v != "" {
784 id, err := strconv.Atoi(v)
785 if err != nil {
786 http.Error(w, err.Error(), http.StatusBadRequest)
787 return
788 }
789 indexMsg, _ = s.forceIndex(uint32(id))
790 }
791
792 // ?show_repos=
793 showRepos := false
794 if v := values.Get("show_repos"); v != "" {
795 showRepos, _ = strconv.ParseBool(v)
796 }
797
798 type Repo struct {
799 ID uint32
800 Name string
801 }
802 var data struct {
803 Repos []Repo
804 IndexMsg string
805 }
806
807 data.IndexMsg = indexMsg
808
809 if showRepos {
810 s.queue.Iterate(func(opts *IndexOptions) {
811 data.Repos = append(data.Repos, Repo{
812 ID: opts.RepoID,
813 Name: opts.Name,
814 })
815 })
816 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name })
817 }
818
819 _ = rootTmpl.Execute(w, data)
820}
821
822// handleReindex triggers a reindex asynocronously. If a reindex was triggered
823// the request returns with status 202. The caller can infer the new state of
824// the index by calling List.
825func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) {
826 if r.Method != http.MethodPost {
827 w.Header().Set("Allow", http.MethodPost)
828 w.WriteHeader(http.StatusMethodNotAllowed)
829 return
830 }
831
832 err := r.ParseForm()
833 if err != nil {
834 http.Error(w, err.Error(), http.StatusBadRequest)
835 return
836 }
837
838 id, err := strconv.Atoi(r.Form.Get("repo"))
839 if err != nil {
840 http.Error(w, err.Error(), http.StatusBadRequest)
841 return
842 }
843
844 go func() { s.forceIndex(uint32(id)) }()
845
846 // 202 Accepted
847 w.WriteHeader(http.StatusAccepted)
848}
849
// handleDebugList writes a table of the repository IDs that Sourcegraph
// assigns to this indexserver, together with the repository name if the
// repository is on the queue.
//
// The query parameter "indexed" (default true) controls whether the IDs of the
// repositories already indexed on disk are sent along with the list request.
func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
	// An absent or unparsable value keeps the default.
	withIndexed := true
	if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil {
		withIndexed = b
	}

	var indexed []uint32
	if withIndexed {
		indexed = listIndexed(s.IndexDir)
	}

	repos, err := s.Sourcegraph.List(r.Context(), indexed)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// The table is rendered into memory first so Content-Length can be set.
	bw := bytes.Buffer{}

	tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)

	_, err = fmt.Fprintf(tw, "ID\tName\n")
	if err != nil {
		http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
		return
	}

	// The queue is locked for the whole loop because get is called on it
	// directly.
	s.queue.mu.Lock()
	name := ""
	for _, id := range repos.IDs {
		// Repositories that are not on the queue are listed without a name.
		if item := s.queue.get(id); item != nil {
			name = item.opts.Name
		} else {
			name = ""
		}
		// NOTE: err is overwritten on every iteration, so the check after the
		// loop only sees the result of the last write.
		_, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
		if err != nil {
			debug.Printf("handleDebugList: %s\n", err.Error())
		}
	}
	s.queue.mu.Unlock()

	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	err = tw.Flush()
	if err != nil {
		http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))

	if _, err := io.Copy(w, &bw); err != nil {
		http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
		return
	}
}
910
911// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
912// can run this command during periods of low usage (evenings, weekends) to
913// trigger an initial merge run. In the steady-state, merges happen rarely, even
914// on busy instances, and users can rely on automatic merging instead.
915func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {
916 // A merge operation can take very long, depending on the number merges and the
917 // target size of the compound shards. We run the merge in the background and
918 // return immediately to the user.
919 //
920 // We track the status of the merge with metricShardMergingRunning.
921 go func() {
922 s.doMerge()
923 }()
924 _, _ = w.Write([]byte("merging enqueued\n"))
925}
926
// handleDebugIndexed writes a table of the repository IDs that are indexed on
// disk (see listIndexed), together with the repository name if the repository
// is on the queue.
func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {
	indexed := listIndexed(s.IndexDir)

	// The table is rendered into memory first so Content-Length can be set.
	bw := bytes.Buffer{}

	tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)

	_, err := fmt.Fprintf(tw, "ID\tName\n")
	if err != nil {
		http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
		return
	}

	// The queue is locked for the whole loop because get is called on it
	// directly.
	s.queue.mu.Lock()
	name := ""
	for _, id := range indexed {
		// Repositories that are not on the queue are listed without a name.
		if item := s.queue.get(id); item != nil {
			name = item.opts.Name
		} else {
			name = ""
		}
		// NOTE: err is overwritten on every iteration, so the check after the
		// loop only sees the result of the last write.
		_, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
		if err != nil {
			debug.Printf("handleDebugIndexed: %s\n", err.Error())
		}
	}
	s.queue.mu.Unlock()

	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	err = tw.Flush()
	if err != nil {
		http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))

	if _, err := io.Copy(w, &bw); err != nil {
		http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
		return
	}
}
973
// forceIndex runs a full (non-incremental) index job for the repository with
// the given ID right now, bypassing the queue. It always returns a string
// explaining what it did, even if it failed.
//
// If another job for the same repository is already running, nothing is
// indexed and a nil error is returned together with a message saying so.
func (s *Server) forceIndex(id uint32) (string, error) {
	// Fetch fresh index options for just this repository.
	var opts IndexOptions
	var err error
	s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) {
		opts = o
	}, func(_ uint32, e error) {
		err = e
	}, id)
	if err != nil {
		return fmt.Sprintf("Indexing %d failed: %v", id, err), err
	}

	args := s.indexArgs(opts)
	args.Incremental = false // force re-index

	var state indexState
	ran := s.muIndexDir.With(opts.Name, func() {
		state, err = s.Index(args)
	})
	if !ran {
		return fmt.Sprintf("index job for repository already running: %s", args), nil
	}
	if err != nil {
		return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err
	}
	return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil
}
1003
1004func listIndexed(indexDir string) []uint32 {
1005 index := getShards(indexDir)
1006 metricNumIndexed.Set(float64(len(index)))
1007 repoIDs := make([]uint32, 0, len(index))
1008 for id := range index {
1009 repoIDs = append(repoIDs, id)
1010 }
1011 sort.Slice(repoIDs, func(i, j int) bool {
1012 return repoIDs[i] < repoIDs[j]
1013 })
1014 return repoIDs
1015}
1016
1017// setupTmpDir sets up a temporary directory on the same volume as the
1018// indexes.
1019//
1020// If main is true we will delete older temp directories left around. main is
1021// false when this is a debug command.
1022func setupTmpDir(logger sglog.Logger, main bool, index string) error {
1023 // change the target tmp directory depending on if it's our main daemon or a
1024 // debug sub command.
1025 dir := ".indexserver.debug.tmp"
1026 if main {
1027 dir = ".indexserver.tmp"
1028 }
1029
1030 tmpRoot := filepath.Join(index, dir)
1031
1032 if main {
1033 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot))
1034 err := os.RemoveAll(tmpRoot)
1035 if err != nil {
1036 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err))
1037 }
1038 }
1039
1040 if err := os.MkdirAll(tmpRoot, 0o755); err != nil {
1041 return err
1042 }
1043
1044 return os.Setenv("TMPDIR", tmpRoot)
1045}
1046
1047func printMetaData(fn string) error {
1048 repo, indexMeta, err := zoekt.ReadMetadataPath(fn)
1049 if err != nil {
1050 return err
1051 }
1052
1053 err = json.NewEncoder(os.Stdout).Encode(indexMeta)
1054 if err != nil {
1055 return err
1056 }
1057
1058 err = json.NewEncoder(os.Stdout).Encode(repo)
1059 if err != nil {
1060 return err
1061 }
1062 return nil
1063}
1064
1065func printShardStats(fn string) error {
1066 f, err := os.Open(fn)
1067 if err != nil {
1068 return err
1069 }
1070
1071 iFile, err := zoekt.NewIndexFile(f)
1072 if err != nil {
1073 return err
1074 }
1075
1076 return zoekt.PrintNgramStats(iFile)
1077}
1078
1079func srcLogLevelIsDebug() bool {
1080 lvl := os.Getenv(sglog.EnvLogLevel)
1081 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug")
1082}
1083
// getEnvWithDefaultBool returns the environment variable k parsed with
// strconv.ParseBool. If k is unset or empty, defaultVal is returned. If the
// value cannot be parsed the process is terminated via log.Fatalf.
func getEnvWithDefaultBool(k string, defaultVal bool) bool {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		// The message previously claimed an int64 was being parsed.
		log.Fatalf("error parsing ENV %s to bool: %s", k, err)
	}
	return b
}
1095
// getEnvWithDefaultInt64 returns the environment variable k parsed as a
// base-10 int64. If k is unset or empty, defaultVal is returned. If the value
// cannot be parsed the process is terminated via log.Fatalf.
func getEnvWithDefaultInt64(k string, defaultVal int64) int64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, parseErr := strconv.ParseInt(raw, 10, 64)
		if parseErr != nil {
			log.Fatalf("error parsing ENV %s to int64: %s", k, parseErr)
		}
		return parsed
	}
	return defaultVal
}
1107
// getEnvWithDefaultInt returns the environment variable k parsed as a base-10
// int. If k is unset or empty, defaultVal is returned. If the value cannot be
// parsed the process is terminated via log.Fatalf.
func getEnvWithDefaultInt(k string, defaultVal int) int {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	// Parse the value, not the variable name: passing k here made every
	// non-empty setting fail to parse and abort the process.
	i, err := strconv.Atoi(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to int: %s", k, err)
	}
	return i
}
1119
// getEnvWithDefaultUint64 returns the environment variable k parsed as a
// base-10 uint64. If k is unset or empty, defaultVal is returned. If the value
// cannot be parsed the process is terminated via log.Fatalf.
func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, parseErr := strconv.ParseUint(raw, 10, 64)
		if parseErr != nil {
			log.Fatalf("error parsing ENV %s to uint64: %s", k, parseErr)
		}
		return parsed
	}
	return defaultVal
}
1131
// getEnvWithDefaultFloat64 returns the environment variable k parsed as a
// float64. If k is unset or empty, defaultVal is returned. If the value cannot
// be parsed the process is terminated via log.Fatalf.
func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, parseErr := strconv.ParseFloat(raw, 64)
		if parseErr != nil {
			log.Fatalf("error parsing ENV %s to float64: %s", k, parseErr)
		}
		return parsed
	}
	return defaultVal
}
1143
// getEnvWithDefaultString returns the value of the environment variable k, or
// defaultVal when k is unset or empty.
func getEnvWithDefaultString(k string, defaultVal string) string {
	if value := os.Getenv(k); value != "" {
		return value
	}
	return defaultVal
}
1151
// getEnvWithDefaultDuration returns the environment variable k parsed with
// time.ParseDuration. If k is unset or empty, defaultVal is returned. If the
// value cannot be parsed the process is terminated via log.Fatalf.
func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration {
	if raw := os.Getenv(k); raw != "" {
		parsed, parseErr := time.ParseDuration(raw)
		if parseErr != nil {
			log.Fatalf("error parsing ENV %s to duration: %s", k, parseErr)
		}
		return parsed
	}
	return defaultVal
}
1164
// getEnvWithDefaultEmptySet interprets the environment variable k as a
// comma-separated list and returns its entries as a set. Surrounding
// whitespace is trimmed from each entry and empty entries are dropped. The
// returned map is never nil; it is empty when k is unset or holds no entries.
func getEnvWithDefaultEmptySet(k string) map[string]struct{} {
	entries := strings.Split(os.Getenv(k), ",")
	result := make(map[string]struct{})
	for i := range entries {
		entry := strings.TrimSpace(entries[i])
		if entry == "" {
			continue
		}
		result[entry] = struct{}{}
	}
	return result
}
1175
// joinStringSet joins the members of set with sep. Members are sorted
// lexicographically first so that the result is deterministic; ranging over a
// map yields keys in an unspecified order. An empty or nil set yields "".
func joinStringSet(set map[string]struct{}, sep string) string {
	xs := make([]string, 0, len(set))
	for x := range set {
		xs = append(xs, x)
	}
	sort.Strings(xs)

	return strings.Join(xs, sep)
}
1184
1185func setCompoundShardCounter(indexDir string) {
1186 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt"))
1187 if err != nil {
1188 log.Printf("setCompoundShardCounter: %s\n", err)
1189 return
1190 }
1191 metricNumberCompoundShards.Set(float64(len(fns)))
1192}
1193
1194func rootCmd() *ffcli.Command {
1195 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError)
1196 conf := rootConfig{
1197 Main: true,
1198 }
1199 conf.registerRootFlags(rootFs)
1200
1201 return &ffcli.Command{
1202 FlagSet: rootFs,
1203 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]",
1204 Subcommands: []*ffcli.Command{debugCmd()},
1205 Exec: func(ctx context.Context, args []string) error {
1206 return startServer(conf)
1207 },
1208 }
1209}
1210
// rootConfig collects the settings of the root command. The unexported fields
// are bound to command line flags by registerRootFlags.
type rootConfig struct {
	// Main is true if this rootConfig is for our main long running command (the
	// indexserver). Debug commands should not set this value. newServer passes
	// it to setupTmpDir, which only cleans up a leftover temp directory for
	// the main command.
	Main bool

	root             string        // -sourcegraph_url
	interval         time.Duration // -interval
	index            string        // -index
	indexConcurrency int64         // -index_concurrency
	listen           string        // -listen
	hostname         string        // -hostname
	cpuFraction      float64       // -cpu_fraction
	blockProfileRate int           // -block_profile_rate

	// Shard merging. targetSize and minSize are given in MiB.
	disableShardMerging           bool
	vacuumInterval, mergeInterval time.Duration
	targetSize, minSize           int64
	minAgeDays                    int

	// Backoff applied to repos with one or more consecutive indexing failures.
	backoffDuration, maxBackoffDuration time.Duration
}
1238
1239func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
1240 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
1241 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
1242 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently")
1243 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use")
1244 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
1245 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
1246 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
1247 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate")
1248 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
1249 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")
1250
1251 // flags related to shard merging
1252 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging")
1253 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
1254 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
1255 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB")
1256 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB")
1257 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
1258}
1259
1260func startServer(conf rootConfig) error {
1261 s, err := newServer(conf)
1262 if err != nil {
1263 return err
1264 }
1265
1266 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate)
1267 setCompoundShardCounter(s.IndexDir)
1268
1269 if conf.listen != "" {
1270
1271 mux := http.NewServeMux()
1272 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{
1273 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"},
1274 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"},
1275 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"},
1276 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"},
1277 }...)
1278 s.addDebugHandlers(mux)
1279
1280 go func() {
1281 debug.Printf("serving HTTP on %s", conf.listen)
1282 log.Fatal(http.ListenAndServe(conf.listen, mux))
1283 }()
1284
1285 // Serve mux on a unix domain socket on a best-effort-basis so that
1286 // webserver can call the endpoints via the shared filesystem.
1287 //
1288 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen
1289 // on the socket due to permission errors. See
1290 // https://github.com/docker/for-mac/issues/6239
1291 go func() {
1292 serveHTTPOverSocket := func() error {
1293 socket := filepath.Join(s.IndexDir, "indexserver.sock")
1294 // We cannot bind a socket to an existing pathname.
1295 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) {
1296 return fmt.Errorf("error removing socket file: %s", socket)
1297 }
1298 // The "unix" network corresponds to stream sockets. (cf. unixgram,
1299 // unixpacket).
1300 l, err := net.Listen("unix", socket)
1301 if err != nil {
1302 return fmt.Errorf("failed to listen on socket %s: %w", socket, err)
1303 }
1304 // Indexserver (root) and webserver (Sourcegraph) run with
1305 // different users. Per default, the socket is created with
1306 // permission 755 (root root), which doesn't let webserver write to
1307 // it.
1308 //
1309 // See https://github.com/golang/go/issues/11822 for more context.
1310 if err := os.Chmod(socket, 0o777); err != nil {
1311 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err)
1312 }
1313 debug.Printf("serving HTTP on %s", socket)
1314 return http.Serve(l, mux)
1315 }
1316 debug.Print(serveHTTPOverSocket())
1317 }()
1318 }
1319
1320 oc := &ownerChecker{
1321 Path: filepath.Join(conf.index, "owner.txt"),
1322 Hostname: conf.hostname,
1323 }
1324 go oc.Run()
1325
1326 logger := sglog.Scoped("metricsRegistration")
1327 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"}
1328
1329 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index})
1330 prometheus.DefaultRegisterer.MustRegister(c)
1331
1332 s.Run()
1333 return nil
1334}
1335
1336func newServer(conf rootConfig) (*Server, error) {
1337 logger := sglog.Scoped("server")
1338
1339 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 {
1340 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0")
1341 }
1342 if conf.index == "" {
1343 return nil, fmt.Errorf("must set -index")
1344 }
1345 if conf.root == "" {
1346 return nil, fmt.Errorf("must set -sourcegraph_url")
1347 }
1348 rootURL, err := url.Parse(conf.root)
1349 if err != nil {
1350 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err)
1351 }
1352
1353 rootURL = addDefaultPort(rootURL)
1354
1355 // Tune GOMAXPROCS to match Linux container CPU quota.
1356 _, _ = maxprocs.Set()
1357
1358 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler.
1359 // The block profiler is disabled by default and should be enabled with care in production
1360 runtime.SetBlockProfileRate(conf.blockProfileRate)
1361
1362 // Automatically prepend our own path at the front, to minimize
1363 // required configuration.
1364 if l, err := os.Readlink("/proc/self/exe"); err == nil {
1365 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
1366 }
1367
1368 if _, err := os.Stat(conf.index); err != nil {
1369 if err := os.MkdirAll(conf.index, 0o755); err != nil {
1370 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err)
1371 }
1372 }
1373
1374 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil {
1375 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err)
1376 }
1377
1378 if srcLogLevelIsDebug() {
1379 debug = log.New(os.Stderr, "", log.LstdFlags)
1380 }
1381
1382 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST")
1383 if len(reposWithSeparateIndexingMetrics) > 0 {
1384 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", "))
1385 }
1386
1387 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST")
1388 if len(deltaBuildRepositoriesAllowList) > 0 {
1389 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", "))
1390 }
1391
1392 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150)
1393 if deltaShardNumberFallbackThreshold > 0 {
1394 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold)
1395 } else {
1396 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards")
1397 }
1398
1399 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST")
1400 if len(reposShouldSkipSymbolsCalculation) > 0 {
1401 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", "))
1402 }
1403
1404 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout)
1405 if indexingTimeout != defaultIndexingTimeout {
1406 debug.Printf("using configured indexing timeout: %s", indexingTimeout)
1407 }
1408
1409 var sg Sourcegraph
1410 if rootURL.IsAbs() {
1411 var batchSize int
1412 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" {
1413 batchSize, err = strconv.Atoi(v)
1414 if err != nil {
1415 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int")
1416 }
1417 }
1418
1419 opts := []SourcegraphClientOption{
1420 WithBatchSize(batchSize),
1421 }
1422
1423 logger := sglog.Scoped("zoektConfigurationGRPCClient")
1424 client, err := dialGRPCClient(rootURL.Host, logger)
1425 if err != nil {
1426 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err)
1427 }
1428
1429 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...)
1430
1431 } else {
1432 sg = sourcegraphFake{
1433 RootDir: rootURL.String(),
1434 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags),
1435 }
1436 }
1437
1438 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction)))
1439 if cpuCount < 1 {
1440 cpuCount = 1
1441 }
1442
1443 if conf.indexConcurrency < 1 {
1444 conf.indexConcurrency = 1
1445 } else if conf.indexConcurrency > int64(cpuCount) {
1446 conf.indexConcurrency = int64(cpuCount)
1447 }
1448
1449 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)
1450
1451 return &Server{
1452 logger: logger,
1453 Sourcegraph: sg,
1454 IndexDir: conf.index,
1455 IndexConcurrency: int(conf.indexConcurrency),
1456 Interval: conf.interval,
1457 CPUCount: cpuCount,
1458 queue: *q,
1459 shardMerging: !conf.disableShardMerging,
1460 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList,
1461 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold,
1462 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
1463 hostname: conf.hostname,
1464 mergeOpts: mergeOpts{
1465 vacuumInterval: conf.vacuumInterval,
1466 mergeInterval: conf.mergeInterval,
1467 targetSizeBytes: conf.targetSize * 1024 * 1024,
1468 minSizeBytes: conf.minSize * 1024 * 1024,
1469 minAgeDays: conf.minAgeDays,
1470 },
1471 timeout: indexingTimeout,
1472 }, err
1473}
1474
// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration
// for the indexed-search-configuration gRPC service. Its contents are embedded
// at build time from default_grpc_service_configuration.json, and
// dialGRPCClient passes it to grpc.WithDefaultServiceConfig.
//
// The default backoff strategy is modeled after the default settings used by
// retryablehttp.DefaultClient.
//
// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html):
//   - Unavailable
//   - Aborted
//
// NOTE(review): the retry policy itself lives in the embedded JSON file, which
// is not part of this source file — confirm the list above against it.
//
//go:embed default_grpc_service_configuration.json
var defaultGRPCServiceConfigurationJSON string
1488func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor {
1489 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1490 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1491 return invoker(ctx, method, req, reply, cc, opts...)
1492 }
1493}
1494
1495func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
1496 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
1497 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1498 return streamer(ctx, desc, cc, method, opts...)
1499 }
1500}
1501
// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPC servers and clients are allowed to process.
// This can be overridden by providing custom Server/Dial options.
const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MiB
1505
1506func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) {
1507 metrics := mustGetClientMetrics()
1508
1509 // If the service seems to be unavailable, this
1510 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1
1511 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s])
1512 retryOpts := []retry.CallOption{
1513 retry.WithMax(5),
1514 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)),
1515 retry.WithCodes(codes.Unavailable),
1516 }
1517
1518 opts := []grpc.DialOption{
1519 grpc.WithTransportCredentials(insecure.NewCredentials()),
1520 grpc.WithChainStreamInterceptor(
1521 metrics.StreamClientInterceptor(),
1522 messagesize.StreamClientInterceptor,
1523 internalActorStreamInterceptor(),
1524 internalerrs.LoggingStreamClientInterceptor(logger),
1525 internalerrs.PrometheusStreamClientInterceptor,
1526 retry.StreamClientInterceptor(retryOpts...),
1527 ),
1528 grpc.WithChainUnaryInterceptor(
1529 metrics.UnaryClientInterceptor(),
1530 messagesize.UnaryClientInterceptor,
1531 internalActorUnaryInterceptor(),
1532 internalerrs.LoggingUnaryClientInterceptor(logger),
1533 internalerrs.PrometheusUnaryClientInterceptor,
1534 retry.UnaryClientInterceptor(retryOpts...),
1535 ),
1536 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
1537 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)),
1538 }
1539
1540 opts = append(opts, additionalOpts...)
1541
1542 // Ensure that the message size options are set last, so they override any other
1543 // client-specific options that tweak the message size.
1544 //
1545 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
1546 // take precedence over everything else with a uniform size setting that's easy to reason about.
1547 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...)
1548
1549 // This dialer is used to connect via gRPC to the Sourcegraph instance.
1550 // This is done lazily, so we can provide the client to use regardless of
1551 // whether we enabled gRPC or not initially.
1552 cc, err := grpc.Dial(addr, opts...)
1553 if err != nil {
1554 return nil, fmt.Errorf("dialing %q: %w", addr, err)
1555 }
1556
1557 client := proto.NewZoektConfigurationServiceClient(cc)
1558 return client, nil
1559}
1560
1561// mustGetClientMetrics returns a singleton instance of the client metrics
1562// that are shared across all gRPC clients that this process creates.
1563//
1564// This function panics if the metrics cannot be registered with the default
1565// Prometheus registry.
1566func mustGetClientMetrics() *grpcprom.ClientMetrics {
1567 clientMetricsOnce.Do(func() {
1568 clientMetrics = grpcprom.NewClientMetrics(
1569 grpcprom.WithClientCounterOptions(),
1570 grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
1571 grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC
1572 grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC
1573 )
1574
1575 prometheus.DefaultRegisterer.MustRegister(clientMetrics)
1576 })
1577
1578 return clientMetrics
1579}
1580
// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
// If the scheme is "https", "443" is used. URLs that are nil, not absolute,
// already carry a port, or use any other scheme are returned as-is.
//
// The original URL is not mutated. A copy is modified and returned.
func addDefaultPort(original *url.URL) *url.URL {
	if original == nil {
		return nil // don't panic
	}

	// Leave anything that doesn't look like a remote URL, or that already has
	// a port, untouched.
	if !original.IsAbs() || original.Port() != "" {
		return original
	}

	var port string
	switch original.Scheme {
	case "http":
		port = "80"
	case "https":
		port = "443"
	default:
		return original
	}

	u := cloneURL(original)
	// Join on Hostname() rather than Host: Host keeps the square brackets of
	// an IPv6 literal (and a trailing ":" from an empty port), which
	// net.JoinHostPort would wrap a second time, e.g. "[[::1]]:80".
	u.Host = net.JoinHostPort(original.Hostname(), port)
	return u
}

// cloneURL returns a copy of the URL. It is safe to mutate the returned URL.
// This is copied from net/http/clone.go
func cloneURL(u *url.URL) *url.URL {
	if u == nil {
		return nil
	}
	u2 := new(url.URL)
	*u2 = *u
	if u.User != nil {
		// Copy the user info as well so the clone does not share the pointer.
		u2.User = new(url.Userinfo)
		*u2.User = *u.User
	}
	return u2
}
1625
1626func main() {
1627 liblog := sglog.Init(sglog.Resource{
1628 Name: "zoekt-indexserver",
1629 Version: zoekt.Version,
1630 InstanceID: zoekt.HostnameBestEffort(),
1631 })
1632 defer liblog.Sync()
1633
1634 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil {
1635 log.Fatal(err)
1636 }
1637}
1638
// getBoolFromEnvironmentVariables walks envVarNames in order and returns the
// boolean value of the first variable that is set to a non-empty value in the
// current process environment. If none is set, defaultBool is returned.
//
// An error (and false) is returned if that first non-empty variable fails to
// parse as a boolean; later variables are not consulted in that case.
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	for i := range envVarNames {
		name := envVarNames[i]
		raw := os.Getenv(name)
		if raw != "" {
			parsed, parseErr := strconv.ParseBool(raw)
			if parseErr == nil {
				return parsed, nil
			}
			return false, fmt.Errorf("parsing environment variable %q to boolean: %v", name, parseErr)
		}
	}

	return defaultBool, nil
}