fork of https://github.com/sourcegraph/zoekt
1// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
2// repositories on sourcegraph
3package main
4
5import (
6 "bytes"
7 "context"
8 _ "embed"
9 "encoding/json"
10 "errors"
11 "flag"
12 "fmt"
13 "html/template"
14 "io"
15 "io/fs"
16 "log"
17 "math"
18 "math/rand"
19 "net"
20 "net/http"
21 "net/url"
22 "os"
23 "os/exec"
24 "os/signal"
25 "path/filepath"
26 "runtime"
27 "sort"
28 "strconv"
29 "strings"
30 "sync"
31 "text/tabwriter"
32 "time"
33
34 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
35 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
36 "github.com/peterbourgon/ff/v3/ffcli"
37 "github.com/prometheus/client_golang/prometheus"
38 "github.com/prometheus/client_golang/prometheus/promauto"
39 sglog "github.com/sourcegraph/log"
40 "github.com/sourcegraph/mountinfo"
41 "go.uber.org/automaxprocs/maxprocs"
42 "golang.org/x/net/trace"
43 "golang.org/x/sys/unix"
44 "google.golang.org/grpc"
45 "google.golang.org/grpc/codes"
46 "google.golang.org/grpc/credentials/insecure"
47 "google.golang.org/grpc/metadata"
48
49 "github.com/sourcegraph/zoekt"
50 "github.com/sourcegraph/zoekt/build"
51 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
52 "github.com/sourcegraph/zoekt/debugserver"
53 "github.com/sourcegraph/zoekt/grpc/internalerrs"
54 "github.com/sourcegraph/zoekt/grpc/messagesize"
55 "github.com/sourcegraph/zoekt/internal/profiler"
56)
57
// Prometheus metrics exported by the indexserver.
var (
	// metricResolveRevisionsDuration tracks the latency of resolving the
	// revisions of all repositories.
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 100000s (~27.8h)
	})

	// metricResolveRevisionDuration tracks the latency of resolving a single
	// repository revision, labelled by whether it succeeded.
	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	// metricIndexDuration tracks how long an index job took. It is observed
	// by processQueue once the index directory lock is held.
	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + indexTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	// metricIndexIncrementalIndexState is incremented by Server.Index for
	// every incremental index job with the state reported by the build
	// options.
	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is build.IndexState

	// metricNumIndexed is set by listIndexed to the number of repositories
	// that have shards in the index directory.
	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	metricNumAssigned = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_assigned",
		Help: "Number of repos assigned to this indexer by code host",
	})

	// metricFailingTotal is incremented by Server.Index whenever an index
	// job returns an error.
	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	// metricIndexingTotal is incremented by Server.Index each time it starts
	// a full index (i.e. the job was not a noop or a metadata-only update).
	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	// metricNumStoppedTrackingTotal is increased by the sync loop in
	// Server.Run by the number of repositories removed from the queue.
	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})

	// clientMetricsOnce and clientMetrics are not used in this part of the
	// file. NOTE(review): presumably clientMetricsOnce guards one-time
	// initialisation of the gRPC client metrics - confirm at the use site.
	clientMetricsOnce sync.Once
	clientMetrics     *grpcprom.ClientMetrics
)
134
// reposWithSeparateIndexingMetrics is the set of repositories that we want to
// capture separate indexing metrics for. It is consulted by
// repoNameForMetric.
var reposWithSeparateIndexingMetrics = make(map[string]struct{})

// indexState describes the outcome of an index job. It is used as the
// "state" label of metricIndexDuration.
type indexState string

const (
	indexStateFail        indexState = "fail"
	indexStateSuccess     indexState = "success"
	indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
	indexStateNoop        indexState = "noop"         // We didn't need to update index
	indexStateEmpty       indexState = "empty"        // index is empty (empty repo)
)
147
// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	// logger is the structured logger used for index job events.
	logger sglog.Logger

	// Sourcegraph is the client used to list repositories, fetch index
	// options and report index status.
	Sourcegraph Sourcegraph
	// BatchSize is not read in this part of the file.
	// NOTE(review): presumably the batch size for Sourcegraph requests - confirm.
	BatchSize int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration
	// CPUCount is the amount of parallelism to use when indexing a
	// repository.
	CPUCount int

	// queue holds the index jobs that processQueue works through.
	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is the value reported by the /debug/host endpoint.
	hostname string

	// mergeOpts carries the shard merging settings; Run reads its
	// vacuumInterval and mergeInterval.
	mergeOpts mergeOpts
}
192
// debug is the logger used for verbose messages. As initialised here it
// discards everything written to it.
var debug = log.New(io.Discard, "", log.LstdFlags)

// noOutputTimeout is how long loggedRun waits for a command to produce new
// output before it tries to stop the command.
//
// our index commands should output something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
const noOutputTimeout = 30 * time.Minute
201
202func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) {
203 out := &synchronizedBuffer{}
204 cmd.Stdout = out
205 cmd.Stderr = out
206
207 tr.LazyPrintf("%s", cmd.Args)
208
209 defer func() {
210 if err != nil {
211 outS := out.String()
212 tr.LazyPrintf("failed: %v", err)
213 tr.LazyPrintf("output: %s", out)
214 tr.SetError()
215 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS)
216 }
217 }()
218
219 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args))
220
221 if err := cmd.Start(); err != nil {
222 return err
223 }
224
225 errC := make(chan error)
226 go func() {
227 errC <- cmd.Wait()
228 }()
229
230 // This channel is set after we have sent sigquit. It allows us to follow up
231 // with a sigkill if the process doesn't quit after sigquit.
232 kill := make(<-chan time.Time)
233
234 lastLen := 0
235 for {
236 select {
237 case <-time.After(noOutputTimeout):
238 // Periodically check if we have had output. If not kill the process.
239 if out.Len() != lastLen {
240 lastLen = out.Len()
241 log.Printf("still running %s", cmd.Args)
242 } else {
243 // Send quit (C-\) first so we get a stack dump.
244 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args)
245 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil {
246 log.Println("quit failed:", err)
247 }
248
249 // send sigkill if still running in 10s
250 kill = time.After(10 * time.Second)
251 }
252
253 case <-kill:
254 log.Printf("still running, killing %s", cmd.Args)
255 if err := cmd.Process.Kill(); err != nil {
256 log.Println("kill failed:", err)
257 }
258
259 case err := <-errC:
260 if err != nil {
261 return err
262 }
263
264 tr.LazyPrintf("success")
265 return nil
266 }
267 }
268}
269
// synchronizedBuffer is a bytes.Buffer guarded by a mutex, so that the buffer
// can be inspected while another goroutine is still writing to it.
type synchronizedBuffer struct {
	mu sync.Mutex
	b  bytes.Buffer
}

// lock acquires the mutex and returns the function that releases it. It is
// meant to be used as "defer sb.lock()()".
func (sb *synchronizedBuffer) lock() (unlock func()) {
	sb.mu.Lock()
	return sb.mu.Unlock
}

// Write appends p to the buffer.
func (sb *synchronizedBuffer) Write(p []byte) (int, error) {
	defer sb.lock()()
	return sb.b.Write(p)
}

// Len reports the number of bytes currently held in the buffer.
func (sb *synchronizedBuffer) Len() int {
	defer sb.lock()()
	return sb.b.Len()
}

// String returns the current contents of the buffer.
func (sb *synchronizedBuffer) String() string {
	defer sb.lock()()
	return sb.b.String()
}
294
// pauseFileName is the name of a file which, if present in IndexDir, stops
// index jobs from running. This makes it possible to experiment with the
// content of the IndexDir without the indexserver writing to it.
//
// Both the sync loop in Run and processQueue check for this file.
const pauseFileName = "PAUSE"
299
// Run the sync loop. This blocks forever.
//
// It removes incomplete shards from IndexDir and then starts the background
// goroutines: one that keeps the queue in sync with Sourcegraph, one that
// vacuums, one that merges shards, and IndexConcurrency queue workers.
func (s *Server) Run() {
	removeIncompleteShards(s.IndexDir)

	// Start a goroutine which updates the queue with commits to index.
	go func() {
		// We update the list of indexed repos every Interval. To speed up manual
		// testing we also listen for SIGUSR1 to trigger updates.
		//
		// "pkill -SIGUSR1 zoekt-sourcegra"
		for range jitterTicker(s.Interval, unix.SIGUSR1) {
			// Skip this round entirely while the PAUSE file exists.
			if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
				log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b)))
				continue
			}

			repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir))
			if err != nil {
				log.Printf("error listing repos: %s", err)
				continue
			}

			debug.Printf("updating index queue with %d repositories", len(repos.IDs))

			// Stop indexing repos we don't need to track anymore
			removed := s.queue.MaybeRemoveMissing(repos.IDs)
			metricNumStoppedTrackingTotal.Add(float64(len(removed)))
			if len(removed) > 0 {
				log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5))
			}

			// Run cleanup concurrently with the queue update below. It takes
			// the global index directory lock; we wait for it to finish
			// before starting the next round.
			cleanupDone := make(chan struct{})
			go func() {
				defer close(cleanupDone)
				s.muIndexDir.Global(func() {
					cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging)
				})
			}()

			repos.IterateIndexOptions(s.queue.AddOrUpdate)

			// IterateIndexOptions will only iterate over repositories that have
			// changed since we last called list. However, we want to add all IDs
			// back onto the queue just to check that what is on disk is still
			// correct. This will use the last IndexOptions we stored in the
			// queue. The repositories not on the queue (missing) need a forced
			// fetch of IndexOptions.
			missing := s.queue.Bump(repos.IDs)
			// Errors for individual repositories are deliberately ignored here.
			s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...)

			setCompoundShardCounter(s.IndexDir)

			<-cleanupDone
		}
	}()

	// Periodically vacuum, but only when shard merging is enabled.
	go func() {
		for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) {
			if s.shardMerging {
				s.vacuum()
			}
		}
	}()

	// Periodically merge shards, but only when shard merging is enabled.
	go func() {
		for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) {
			if s.shardMerging {
				s.doMerge()
			}
		}
	}()

	// Start the workers that take index jobs off the queue.
	for i := 0; i < s.IndexConcurrency; i++ {
		go s.processQueue()
	}

	// block forever
	select {}
}
379
// formatListUint32 renders the first min(len(v), m) items of v as a
// comma-separated list. If v has more items than were rendered, the list is
// followed by "...".
func formatListUint32(v []uint32, m int) string {
	shown := m
	if len(v) < shown {
		shown = len(v)
	}

	var parts []string
	for _, id := range v[:maxZero(shown)] {
		parts = append(parts, fmt.Sprintf("%d", id))
	}

	// Mark the list as truncated when not every item was rendered.
	if len(v) > shown {
		parts = append(parts, "...")
	}

	return strings.Join(parts, ", ")
}

// maxZero returns n, or 0 if n is negative.
func maxZero(n int) int {
	if n < 0 {
		return 0
	}
	return n
}
397
// processQueue is the worker loop for index jobs. It never returns.
//
// Each iteration pops the next job off the queue and runs it while holding
// the index directory lock for that repository. While the PAUSE file exists
// or the queue is empty it sleeps for a second and tries again.
func (s *Server) processQueue() {
	for {
		// Do not take jobs off the queue while indexing is manually paused.
		if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
			time.Sleep(time.Second)
			continue
		}

		opts, ok := s.queue.Pop()
		if !ok {
			time.Sleep(time.Second)
			continue
		}

		args := s.indexArgs(opts)

		ran := s.muIndexDir.With(opts.Name, func() {
			// only record time taken once we hold the lock. This avoids us
			// recording time taken while merging/cleanup runs.
			start := time.Now()

			state, err := s.Index(args)

			elapsed := time.Since(start)

			metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds())

			if err != nil {
				log.Printf("error indexing %s: %s", args.String(), err)
			}

			// Only the two successful outcomes are logged; the other states
			// produce no log line here.
			switch state {
			case indexStateSuccess:
				var branches []string
				for _, b := range args.Branches {
					branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
				}
				s.logger.Info("updated index",
					sglog.String("repo", args.Name),
					sglog.Uint32("id", args.RepoID),
					sglog.Strings("branches", branches),
					sglog.Duration("duration", elapsed),
				)
			case indexStateSuccessMeta:
				log.Printf("updated meta %s in %v", args.String(), elapsed)
			}
			// Record the outcome on the queue regardless of the state.
			s.queue.SetIndexed(opts, state)
		})

		if !ran {
			// Someone else is processing the repository. We can just skip this job
			// since the repository will be added back to the queue and we will
			// converge to the correct behaviour.
			debug.Printf("index job for repository already running: %s", args)
			continue
		}
	}
}
455
456// repoNameForMetric returns a normalized version of the given repository name that is
457// suitable for use with Prometheus metrics.
458func repoNameForMetric(repo string) string {
459 // Check to see if we want to be able to capture separate indexing metrics for this repository.
460 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable.
461 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok {
462 return repo
463 }
464
465 return ""
466}
467
// batched splits slice into consecutive batches of at most size items and
// sends them, in order, on the returned channel. The channel is closed once
// every item has been sent. An empty slice yields no batches.
//
// A non-positive size disables batching: the whole slice is sent as a single
// batch. (Previously a size of 0 sent empty batches forever and a negative
// size panicked.)
//
// The batches alias the backing array of slice. The caller must drain the
// channel, otherwise the sending goroutine blocks forever.
func batched(slice []uint32, size int) <-chan []uint32 {
	if size <= 0 {
		size = len(slice)
	}

	c := make(chan []uint32)
	go func() {
		defer close(c)
		for len(slice) > 0 {
			n := size
			if n > len(slice) {
				n = len(slice)
			}
			c <- slice[:n]
			slice = slice[n:]
		}
	}()
	return c
}
482
// jitterTicker returns a channel which ticks with a jitter. The pause after
// each tick is uniformly selected from the range [d/2, d + d/2). The first
// tick is available as soon as the ticker is created.
//
// sig is a list of signals which also cause the ticker to fire. This is a
// convenience to allow manually triggering of the ticker.
//
// The channel is unbuffered and the goroutines behind it run for the lifetime
// of the process.
func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} {
	ticks := make(chan struct{})

	// Time driven ticks.
	go func() {
		for {
			ticks <- struct{}{}

			base := int64(d)
			pause := base/2 + rand.Int63n(base)
			time.Sleep(time.Duration(pause))
		}
	}()

	// Signal driven ticks.
	go func() {
		if len(sig) > 0 {
			received := make(chan os.Signal, 1)
			signal.Notify(received, sig...)
			for range received {
				ticks <- struct{}{}
			}
		}
	}()

	return ticks
}
514
// Index runs the index job described by args and reports the resulting
// state.
//
// A repository without branches gets an empty shard. For incremental jobs
// the state on disk is consulted first, which may turn the job into a noop
// or a metadata-only update. Otherwise a full index is built via gitIndex
// and the new status is pushed to Sourcegraph.
//
// Whenever a non-nil error is returned, the returned state is indexStateFail
// and metricFailingTotal is incremented.
func (s *Server) Index(args *indexArgs) (state indexState, err error) {
	tr := trace.New("index", args.Name)

	defer func() {
		if err != nil {
			tr.SetError()
			tr.LazyPrintf("error: %v", err)
			// Override whatever state was returned alongside the error.
			state = indexStateFail
			metricFailingTotal.Inc()
		}
		tr.LazyPrintf("state: %s", state)
		tr.Finish()
	}()

	tr.LazyPrintf("branches: %v", args.Branches)

	if len(args.Branches) == 0 {
		return indexStateEmpty, createEmptyShard(args)
	}

	repositoryName := args.Name
	if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
		tr.LazyPrintf("marking this repository for delta build")
		args.UseDelta = true
	}

	args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold

	if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
		tr.LazyPrintf("skipping symbols calculation")
		args.Symbols = false
	}

	// reason is only used for the log line below. It stays "forced" for
	// non-incremental jobs.
	reason := "forced"

	if args.Incremental {
		bo := args.BuildOptions()
		bo.SetDefaults()
		incrementalState, fn := bo.IndexState()
		reason = string(incrementalState)
		metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()

		// Every state that does not return here falls through to the full
		// index below.
		switch incrementalState {
		case build.IndexStateEqual:
			debug.Printf("%s index already up to date. Shard=%s", args.String(), fn)
			return indexStateNoop, nil

		case build.IndexStateMeta:
			log.Printf("updating index.meta %s", args.String())

			if err := mergeMeta(bo); err != nil {
				log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
			} else {
				return indexStateSuccessMeta, nil
			}

		case build.IndexStateCorrupt:
			log.Printf("falling back to full update: corrupt index: %s", args.String())
		}
	}

	log.Printf("updating index %s reason=%s", args.String(), reason)

	metricIndexingTotal.Inc()
	c := gitIndexConfig{
		// Commands run through loggedRun so that their output ends up on the
		// trace of this index job.
		runCmd: func(cmd *exec.Cmd) error {
			return s.loggedRun(tr, cmd)
		},

		findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
			return args.BuildOptions().FindRepositoryMetadata()
		},
	}

	err = gitIndex(c, args, s.Sourcegraph, s.logger)
	if err != nil {
		return indexStateFail, err
	}

	// A failure to report the status is logged but does not fail the index
	// job: the index itself was built successfully.
	if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil {
		s.logger.Error("failed to update index status",
			sglog.String("repo", args.Name),
			sglog.Uint32("id", args.RepoID),
			sglogBranches("branches", args.Branches),
			sglog.Error(err),
		)
	}

	return indexStateSuccess, nil
}
606
607// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so
608// it can update the zoekt_repos table.
609func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error {
610 // We need to read from disk for IndexTime.
611 _, metadata, ok, err := c.findRepositoryMetadata(args)
612 if err != nil {
613 return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
614 }
615 if !ok {
616 return errors.New("failed to find metadata for new/updated index")
617 }
618
619 status := []indexStatus{{
620 RepoID: args.RepoID,
621 Branches: args.Branches,
622 IndexTimeUnix: metadata.IndexTime.Unix(),
623 }}
624 if err := sg.UpdateIndexStatus(status); err != nil {
625 return fmt.Errorf("failed to update sourcegraph with status: %w", err)
626 }
627
628 return nil
629}
630
631func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
632 ss := make([]string, len(branches))
633 for i, b := range branches {
634 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
635 }
636 return sglog.Strings(key, ss)
637}
638
639func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
640 return &indexArgs{
641 IndexOptions: opts,
642
643 IndexDir: s.IndexDir,
644 Parallelism: s.CPUCount,
645
646 Incremental: true,
647
648 // 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
649 FileLimit: 1 << 20,
650 }
651}
652
653func createEmptyShard(args *indexArgs) error {
654 bo := args.BuildOptions()
655 bo.SetDefaults()
656 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}
657
658 if args.Incremental && bo.IncrementalSkipIndexing() {
659 return nil
660 }
661
662 builder, err := build.NewBuilder(*bo)
663 if err != nil {
664 return err
665 }
666 return builder.Finish()
667}
668
669// addDebugHandlers adds handlers specific to indexserver.
670func (s *Server) addDebugHandlers(mux *http.ServeMux) {
671 // Sourcegraph's site admin view requires indexserver to serve it's admin view
672 // on "/".
673 mux.Handle("/", http.HandlerFunc(s.handleRoot))
674
675 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex))
676 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed))
677 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList))
678 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge))
679 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue))
680 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost))
681}
682
683func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) {
684 if r.Method != "GET" {
685 w.Header().Set("Allow", "GET")
686 w.WriteHeader(http.StatusMethodNotAllowed)
687 return
688 }
689
690 response := struct {
691 Hostname string
692 }{
693 Hostname: s.hostname,
694 }
695
696 b, err := json.Marshal(response)
697 if err != nil {
698 http.Error(w, err.Error(), http.StatusInternalServerError)
699 return
700 }
701
702 w.Header().Set("Content-Type", "application/json; charset=utf-8")
703 w.Write(b)
704}
705
// rootTmpl renders the admin page served on "/". It expects a value with the
// fields IndexMsg (a message shown near the top of the page) and Repos (a
// list of items with Name and ID fields). When Repos is empty a "show repos"
// link is rendered instead of the table.
var rootTmpl = template.Must(template.New("name").Parse(`
<html>
  <body>
    <a href="debug">Debug</a><br />
    <a href="debug/requests">Traces</a><br />
    {{.IndexMsg}}<br />
    <br />
    <h3>Reindex</h3>
    {{if .Repos}}
    <a href="?show_repos=false">hide repos</a><br />
    <table style="margin-top: 20px">
      <th style="text-align:left">Name</th>
      <th style="text-align:left">ID</th>
      {{range .Repos}}
      <tr>
        <td>{{.Name}}</td>
        <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></td>
      </tr>
      {{end}}
    </table>
    {{else}}
    <a href="?show_repos=true">show repos</a><br />
    {{end}}
  </body>
</html>
`))
732
733func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
734 if r.Method != "GET" {
735 w.Header().Set("Allow", "GET")
736 w.WriteHeader(http.StatusMethodNotAllowed)
737 return
738 }
739
740 values := r.URL.Query()
741
742 // ?id=
743 indexMsg := ""
744 if v := values.Get("id"); v != "" {
745 id, err := strconv.Atoi(v)
746 if err != nil {
747 http.Error(w, err.Error(), http.StatusBadRequest)
748 return
749 }
750 indexMsg, _ = s.forceIndex(uint32(id))
751 }
752
753 // ?show_repos=
754 showRepos := false
755 if v := values.Get("show_repos"); v != "" {
756 showRepos, _ = strconv.ParseBool(v)
757 }
758
759 type Repo struct {
760 ID uint32
761 Name string
762 }
763 var data struct {
764 Repos []Repo
765 IndexMsg string
766 }
767
768 data.IndexMsg = indexMsg
769
770 if showRepos {
771 s.queue.Iterate(func(opts *IndexOptions) {
772 data.Repos = append(data.Repos, Repo{
773 ID: opts.RepoID,
774 Name: opts.Name,
775 })
776 })
777 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name })
778 }
779
780 _ = rootTmpl.Execute(w, data)
781}
782
783// handleReindex triggers a reindex asynocronously. If a reindex was triggered
784// the request returns with status 202. The caller can infer the new state of
785// the index by calling List.
786func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) {
787 if r.Method != http.MethodPost {
788 w.Header().Set("Allow", http.MethodPost)
789 w.WriteHeader(http.StatusMethodNotAllowed)
790 return
791 }
792
793 err := r.ParseForm()
794 if err != nil {
795 http.Error(w, err.Error(), http.StatusBadRequest)
796 return
797 }
798
799 id, err := strconv.Atoi(r.Form.Get("repo"))
800 if err != nil {
801 http.Error(w, err.Error(), http.StatusBadRequest)
802 return
803 }
804
805 go func() { s.forceIndex(uint32(id)) }()
806
807 // 202 Accepted
808 w.WriteHeader(http.StatusAccepted)
809}
810
811func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
812 withIndexed := true
813 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil {
814 withIndexed = b
815 }
816
817 var indexed []uint32
818 if withIndexed {
819 indexed = listIndexed(s.IndexDir)
820 }
821
822 repos, err := s.Sourcegraph.List(r.Context(), indexed)
823 if err != nil {
824 http.Error(w, err.Error(), http.StatusInternalServerError)
825 return
826 }
827
828 bw := bytes.Buffer{}
829
830 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
831
832 _, err = fmt.Fprintf(tw, "ID\tName\n")
833 if err != nil {
834 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
835 return
836 }
837
838 s.queue.mu.Lock()
839 name := ""
840 for _, id := range repos.IDs {
841 if item := s.queue.get(id); item != nil {
842 name = item.opts.Name
843 } else {
844 name = ""
845 }
846 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
847 if err != nil {
848 debug.Printf("handleDebugList: %s\n", err.Error())
849 }
850 }
851 s.queue.mu.Unlock()
852
853 if err != nil {
854 http.Error(w, err.Error(), http.StatusInternalServerError)
855 return
856 }
857
858 err = tw.Flush()
859 if err != nil {
860 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
861 return
862 }
863
864 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
865
866 if _, err := io.Copy(w, &bw); err != nil {
867 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
868 return
869 }
870}
871
872// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
873// can run this command during periods of low usage (evenings, weekends) to
874// trigger an initial merge run. In the steady-state, merges happen rarely, even
875// on busy instances, and users can rely on automatic merging instead.
876func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {
877
878 // A merge operation can take very long, depending on the number merges and the
879 // target size of the compound shards. We run the merge in the background and
880 // return immediately to the user.
881 //
882 // We track the status of the merge with metricShardMergingRunning.
883 go func() {
884 s.doMerge()
885 }()
886 _, _ = w.Write([]byte("merging enqueued\n"))
887}
888
889func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {
890 indexed := listIndexed(s.IndexDir)
891
892 bw := bytes.Buffer{}
893
894 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
895
896 _, err := fmt.Fprintf(tw, "ID\tName\n")
897 if err != nil {
898 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
899 return
900 }
901
902 s.queue.mu.Lock()
903 name := ""
904 for _, id := range indexed {
905 if item := s.queue.get(id); item != nil {
906 name = item.opts.Name
907 } else {
908 name = ""
909 }
910 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
911 if err != nil {
912 debug.Printf("handleDebugIndexed: %s\n", err.Error())
913 }
914 }
915 s.queue.mu.Unlock()
916
917 if err != nil {
918 http.Error(w, err.Error(), http.StatusInternalServerError)
919 return
920 }
921
922 err = tw.Flush()
923 if err != nil {
924 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
925 return
926 }
927
928 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
929
930 if _, err := io.Copy(w, &bw); err != nil {
931 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
932 return
933 }
934}
935
// forceIndex runs a non-incremental index job for the repository with the
// given ID now. It always returns a string explaining what it did, even if
// it failed.
//
// The returned error is nil when another index job for the repository was
// already running; the message says so in that case.
func (s *Server) forceIndex(id uint32) (string, error) {
	var opts IndexOptions
	var err error
	// Fetch fresh index options for just this repository. The callbacks
	// store the options on success and the error on failure.
	s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) {
		opts = o
	}, func(_ uint32, e error) {
		err = e
	}, id)
	if err != nil {
		return fmt.Sprintf("Indexing %d failed: %v", id, err), err
	}

	args := s.indexArgs(opts)
	args.Incremental = false // force re-index

	var state indexState
	ran := s.muIndexDir.With(opts.Name, func() {
		state, err = s.Index(args)
	})
	if !ran {
		return fmt.Sprintf("index job for repository already running: %s", args), nil
	}
	if err != nil {
		return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err
	}
	return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil
}
965
966func listIndexed(indexDir string) []uint32 {
967 index := getShards(indexDir)
968 metricNumIndexed.Set(float64(len(index)))
969 repoIDs := make([]uint32, 0, len(index))
970 for id := range index {
971 repoIDs = append(repoIDs, id)
972 }
973 sort.Slice(repoIDs, func(i, j int) bool {
974 return repoIDs[i] < repoIDs[j]
975 })
976 return repoIDs
977}
978
979// setupTmpDir sets up a temporary directory on the same volume as the
980// indexes.
981//
982// If main is true we will delete older temp directories left around. main is
983// false when this is a debug command.
984func setupTmpDir(logger sglog.Logger, main bool, index string) error {
985 // change the target tmp directory depending on if it's our main daemon or a
986 // debug sub command.
987 dir := ".indexserver.debug.tmp"
988 if main {
989 dir = ".indexserver.tmp"
990 }
991
992 tmpRoot := filepath.Join(index, dir)
993
994 if main {
995 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot))
996 err := os.RemoveAll(tmpRoot)
997 if err != nil {
998 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err))
999 }
1000 }
1001
1002 if err := os.MkdirAll(tmpRoot, 0755); err != nil {
1003 return err
1004 }
1005
1006 return os.Setenv("TMPDIR", tmpRoot)
1007}
1008
1009func printMetaData(fn string) error {
1010 repo, indexMeta, err := zoekt.ReadMetadataPath(fn)
1011 if err != nil {
1012 return err
1013 }
1014
1015 err = json.NewEncoder(os.Stdout).Encode(indexMeta)
1016 if err != nil {
1017 return err
1018 }
1019
1020 err = json.NewEncoder(os.Stdout).Encode(repo)
1021 if err != nil {
1022 return err
1023 }
1024 return nil
1025}
1026
1027func printShardStats(fn string) error {
1028 f, err := os.Open(fn)
1029 if err != nil {
1030 return err
1031 }
1032
1033 iFile, err := zoekt.NewIndexFile(f)
1034 if err != nil {
1035 return err
1036 }
1037
1038 return zoekt.PrintNgramStats(iFile)
1039}
1040
1041func srcLogLevelIsDebug() bool {
1042 lvl := os.Getenv(sglog.EnvLogLevel)
1043 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug")
1044}
1045
// getEnvWithDefaultInt64 returns the value of the environment variable k
// parsed as a base 10 int64. It returns defaultVal if the variable is unset
// or empty, and exits the process if the value cannot be parsed.
func getEnvWithDefaultInt64(k string, defaultVal int64) int64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseInt(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to int64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1057
// getEnvWithDefaultInt returns the value of the environment variable k
// parsed as a base 10 int. It returns defaultVal if the variable is unset or
// empty, and exits the process if the value cannot be parsed.
func getEnvWithDefaultInt(k string, defaultVal int) int {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	// Parse the value of the variable, not its name.
	i, err := strconv.Atoi(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to int: %s", k, err)
	}
	return i
}
1069
// getEnvWithDefaultUint64 returns the value of the environment variable k
// parsed as a base 10 uint64. It returns defaultVal if the variable is unset
// or empty, and exits the process if the value cannot be parsed.
func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseUint(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to uint64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1081
// getEnvWithDefaultFloat64 returns the value of the environment variable k
// parsed as a float64. It returns defaultVal if the variable is unset or
// empty, and exits the process if the value cannot be parsed.
func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseFloat(raw, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to float64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1093
// getEnvWithDefaultString returns the value of the environment variable k, or
// defaultVal when k is unset or empty.
func getEnvWithDefaultString(k string, defaultVal string) string {
	if value := os.Getenv(k); value != "" {
		return value
	}
	return defaultVal
}
1101
// getEnvWithDefaultDuration returns the environment variable k parsed with
// time.ParseDuration. If k is unset or empty, defaultVal is returned. A value
// that fails to parse terminates the process via log.Fatalf.
func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration {
	raw := os.Getenv(k)
	if raw != "" {
		parsed, err := time.ParseDuration(raw)
		if err != nil {
			log.Fatalf("error parsing ENV %s to duration: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1114
// getEnvWithDefaultBool returns the environment variable k parsed with
// strconv.ParseBool. If k is unset or empty, defaultVal is returned. A value
// that fails to parse terminates the process via log.Fatalf.
func getEnvWithDefaultBool(k string, defaultVal bool) bool {
	raw := os.Getenv(k)
	if raw != "" {
		parsed, err := strconv.ParseBool(raw)
		if err != nil {
			log.Fatalf("error parsing ENV %s to bool: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1127
// getEnvWithDefaultEmptySet interprets the environment variable k as a
// comma-separated list and returns its entries as a set. Entries are trimmed
// of surrounding whitespace and blank entries are dropped, so an unset or
// empty variable yields an empty (non-nil) set.
func getEnvWithDefaultEmptySet(k string) map[string]struct{} {
	fields := strings.Split(os.Getenv(k), ",")
	result := map[string]struct{}{}
	for i := range fields {
		if entry := strings.TrimSpace(fields[i]); entry != "" {
			result[entry] = struct{}{}
		}
	}
	return result
}
1138
// joinStringSet concatenates the elements of set, separated by sep.
//
// The elements are sorted before joining: map iteration order is random, so
// without sorting the same set would render differently from call to call.
// An empty or nil set yields the empty string.
func joinStringSet(set map[string]struct{}, sep string) string {
	xs := make([]string, 0, len(set))
	for x := range set {
		xs = append(xs, x)
	}
	sort.Strings(xs)

	return strings.Join(xs, sep)
}
1147
1148func setCompoundShardCounter(indexDir string) {
1149 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt"))
1150 if err != nil {
1151 log.Printf("setCompoundShardCounter: %s\n", err)
1152 return
1153 }
1154 metricNumberCompoundShards.Set(float64(len(fns)))
1155}
1156
1157func rootCmd() *ffcli.Command {
1158 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError)
1159 conf := rootConfig{
1160 Main: true,
1161 }
1162 conf.registerRootFlags(rootFs)
1163
1164 return &ffcli.Command{
1165 FlagSet: rootFs,
1166 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]",
1167 Subcommands: []*ffcli.Command{debugCmd()},
1168 Exec: func(ctx context.Context, args []string) error {
1169 return startServer(conf)
1170 },
1171 }
1172}
1173
// rootConfig holds the configuration of the indexserver. The unexported
// fields are populated from command-line flags (see registerRootFlags), many
// of which take their defaults from environment variables.
type rootConfig struct {
	// Main is true if this rootConfig is for our main long running command (the
	// indexserver). Debug commands should not set this value. This is used to
	// determine if we need to run tmpfriend.
	Main bool

	// root is the Sourcegraph URL (-sourcegraph_url). If it is a path to a
	// directory, the Sourcegraph API is faked and all repos under it are indexed.
	root string
	// interval is how often we sync with Sourcegraph (-interval).
	interval time.Duration
	// index is the index directory (-index).
	index string
	// indexConcurrency is the number of concurrent index jobs (-index_concurrency).
	indexConcurrency int64
	// listen is the address the HTTP server listens on (-listen).
	listen string
	// hostname is the name we advertise to Sourcegraph when asking for the list
	// of repositories to index (-hostname).
	hostname string
	// cpuFraction is the fraction of the cores used for indexing (-cpu_fraction).
	cpuFraction float64
	// blockProfileRate is the sampling rate of Go's block profiler
	// (-block_profile_rate).
	blockProfileRate int

	// config values related to shard merging

	// vacuumInterval is how often vacuum runs (-vacuum_interval).
	vacuumInterval time.Duration
	// mergeInterval is how often merge runs (-merge_interval).
	mergeInterval time.Duration
	// targetSize is the target size of compound shards in MiB (-merge_target_size).
	targetSize int64
	// minSize is the minimum size of a compound shard in MiB (-merge_min_size).
	minSize int64
	// minAgeDays is the time since the last commit in days; shards with newer
	// commits are excluded from merging (-merge_min_age).
	minAgeDays int
	// maxPriority is the maximum priority a shard can have to be considered for
	// merging (-merge_max_priority).
	maxPriority float64

	// config values related to backoff indexing repos with one or more consecutive failures

	// backoffDuration is the base delay before a repository that failed its
	// previous indexing attempt is enqueued again (-backoff_duration).
	backoffDuration time.Duration
	// maxBackoffDuration is the upper bound for that delay (-max_backoff_duration).
	maxBackoffDuration time.Duration

	// useGRPC is true if we should use the gRPC API to talk to Sourcegraph.
	useGRPC bool
}
1204
1205func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
1206 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
1207 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
1208 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of concurrent index jobs to run.")
1209 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use")
1210 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
1211 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
1212 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
1213 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate")
1214 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
1215 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")
1216 fs.BoolVar(&rc.useGRPC, "use_grpc", mustGetBoolFromEnvironmentVariables([]string{"GRPC_ENABLED", "SG_FEATURE_FLAG_GRPC"}, true), "use the gRPC API to talk to Sourcegraph")
1217
1218 // flags related to shard merging
1219 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
1220 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
1221 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB")
1222 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB")
1223 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
1224 fs.Float64Var(&rc.maxPriority, "merge_max_priority", getEnvWithDefaultFloat64("SRC_MERGE_MAX_PRIORITY", 100), "the maximum priority a shard can have to be considered for merging.")
1225}
1226
1227func startServer(conf rootConfig) error {
1228 s, err := newServer(conf)
1229 if err != nil {
1230 return err
1231 }
1232
1233 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate)
1234 setCompoundShardCounter(s.IndexDir)
1235
1236 if conf.listen != "" {
1237
1238 mux := http.NewServeMux()
1239 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{
1240 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"},
1241 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"},
1242 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"},
1243 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"},
1244 }...)
1245 s.addDebugHandlers(mux)
1246
1247 go func() {
1248 debug.Printf("serving HTTP on %s", conf.listen)
1249 log.Fatal(http.ListenAndServe(conf.listen, mux))
1250
1251 }()
1252
1253 // Serve mux on a unix domain socket on a best-effort-basis so that
1254 // webserver can call the endpoints via the shared filesystem.
1255 //
1256 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen
1257 // on the socket due to permission errors. See
1258 // https://github.com/docker/for-mac/issues/6239
1259 go func() {
1260 serveHTTPOverSocket := func() error {
1261 socket := filepath.Join(s.IndexDir, "indexserver.sock")
1262 // We cannot bind a socket to an existing pathname.
1263 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) {
1264 return fmt.Errorf("error removing socket file: %s", socket)
1265 }
1266 // The "unix" network corresponds to stream sockets. (cf. unixgram,
1267 // unixpacket).
1268 l, err := net.Listen("unix", socket)
1269 if err != nil {
1270 return fmt.Errorf("failed to listen on socket %s: %w", socket, err)
1271 }
1272 // Indexserver (root) and webserver (Sourcegraph) run with
1273 // different users. Per default, the socket is created with
1274 // permission 755 (root root), which doesn't let webserver write to
1275 // it.
1276 //
1277 // See https://github.com/golang/go/issues/11822 for more context.
1278 if err := os.Chmod(socket, 0777); err != nil {
1279 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err)
1280 }
1281 debug.Printf("serving HTTP on %s", socket)
1282 return http.Serve(l, mux)
1283 }
1284 debug.Print(serveHTTPOverSocket())
1285 }()
1286 }
1287
1288 oc := &ownerChecker{
1289 Path: filepath.Join(conf.index, "owner.txt"),
1290 Hostname: conf.hostname,
1291 }
1292 go oc.Run()
1293
1294 logger := sglog.Scoped("metricsRegistration", "")
1295 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"}
1296
1297 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index})
1298 prometheus.DefaultRegisterer.MustRegister(c)
1299
1300 s.Run()
1301 return nil
1302}
1303
1304func newServer(conf rootConfig) (*Server, error) {
1305 logger := sglog.Scoped("server", "periodically reindexes enabled repositories on sourcegraph")
1306
1307 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 {
1308 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0")
1309 }
1310 if conf.index == "" {
1311 return nil, fmt.Errorf("must set -index")
1312 }
1313 if conf.root == "" {
1314 return nil, fmt.Errorf("must set -sourcegraph_url")
1315 }
1316 rootURL, err := url.Parse(conf.root)
1317 if err != nil {
1318 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err)
1319 }
1320
1321 rootURL = addDefaultPort(rootURL)
1322
1323 // Tune GOMAXPROCS to match Linux container CPU quota.
1324 _, _ = maxprocs.Set()
1325
1326 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler.
1327 // The block profiler is disabled by default and should be enabled with care in production
1328 runtime.SetBlockProfileRate(conf.blockProfileRate)
1329
1330 // Automatically prepend our own path at the front, to minimize
1331 // required configuration.
1332 if l, err := os.Readlink("/proc/self/exe"); err == nil {
1333 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
1334 }
1335
1336 if _, err := os.Stat(conf.index); err != nil {
1337 if err := os.MkdirAll(conf.index, 0755); err != nil {
1338 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err)
1339 }
1340 }
1341
1342 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil {
1343 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err)
1344 }
1345
1346 if srcLogLevelIsDebug() {
1347 debug = log.New(os.Stderr, "", log.LstdFlags)
1348 }
1349
1350 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST")
1351 if len(reposWithSeparateIndexingMetrics) > 0 {
1352 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", "))
1353 }
1354
1355 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST")
1356 if len(deltaBuildRepositoriesAllowList) > 0 {
1357 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", "))
1358 }
1359
1360 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150)
1361 if deltaShardNumberFallbackThreshold > 0 {
1362 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold)
1363 } else {
1364 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards")
1365 }
1366
1367 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST")
1368 if len(reposShouldSkipSymbolsCalculation) > 0 {
1369 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", "))
1370 }
1371
1372 var sg Sourcegraph
1373 if rootURL.IsAbs() {
1374 var batchSize int
1375 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" {
1376 batchSize, err = strconv.Atoi(v)
1377 if err != nil {
1378 return nil, fmt.Errorf("Invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int")
1379 }
1380 }
1381
1382 opts := []SourcegraphClientOption{
1383 WithBatchSize(batchSize),
1384 WithShouldUseGRPC(conf.useGRPC),
1385 }
1386
1387 logger := sglog.Scoped("zoektConfigurationGRPCClient", "")
1388 client, err := dialGRPCClient(rootURL.Host, logger)
1389 if err != nil {
1390 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err)
1391 }
1392
1393 opts = append(opts, WithGRPCClient(client))
1394 sg = newSourcegraphClient(rootURL, conf.hostname, opts...)
1395
1396 } else {
1397 sg = sourcegraphFake{
1398 RootDir: rootURL.String(),
1399 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags),
1400 }
1401 }
1402
1403 if conf.indexConcurrency < 1 {
1404 conf.indexConcurrency = 1
1405 }
1406
1407 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction)))
1408 if cpuCount < 1 {
1409 cpuCount = 1
1410 }
1411
1412 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)
1413
1414 return &Server{
1415 logger: logger,
1416 Sourcegraph: sg,
1417 IndexDir: conf.index,
1418 IndexConcurrency: int(conf.indexConcurrency),
1419 Interval: conf.interval,
1420 CPUCount: cpuCount,
1421 queue: *q,
1422 shardMerging: zoekt.ShardMergingEnabled(),
1423 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList,
1424 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold,
1425 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
1426 hostname: conf.hostname,
1427 mergeOpts: mergeOpts{
1428 vacuumInterval: conf.vacuumInterval,
1429 mergeInterval: conf.mergeInterval,
1430 targetSizeBytes: conf.targetSize * 1024 * 1024,
1431 minSizeBytes: conf.minSize * 1024 * 1024,
1432 minAgeDays: conf.minAgeDays,
1433 maxPriority: conf.maxPriority,
1434 },
1435 }, err
1436}
1437
// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration
// for the indexed-search-configuration gRPC service. Its contents are embedded
// at build time from default_grpc_service_configuration.json and are passed to
// grpc.WithDefaultServiceConfig when dialing.
//
// The default backoff strategy is modeled after the default settings used by
// retryablehttp.DefaultClient.
//
// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html):
//   - Unavailable
//   - Aborted
//
//go:embed default_grpc_service_configuration.json
var defaultGRPCServiceConfigurationJSON string
1450
1451func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor {
1452 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1453 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1454 return invoker(ctx, method, req, reply, cc, opts...)
1455 }
1456}
1457
1458func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
1459 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
1460 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1461 return streamer(ctx, desc, cc, method, opts...)
1462 }
1463}
1464
// defaultGRPCMessageReceiveSizeBytes is the default maximum message size, in
// bytes, that gRPC servers and clients are allowed to process.
// This can be overridden by providing custom Server/Dial options.
const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MiB
1468
1469func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) {
1470 metrics := mustGetClientMetrics()
1471
1472 // If the service seems to be unavailable, this
1473 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1
1474 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s])
1475 retryOpts := []retry.CallOption{
1476 retry.WithMax(5),
1477 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)),
1478 retry.WithCodes(codes.Unavailable),
1479 }
1480
1481 opts := []grpc.DialOption{
1482 grpc.WithTransportCredentials(insecure.NewCredentials()),
1483 grpc.WithChainStreamInterceptor(
1484 metrics.StreamClientInterceptor(),
1485 messagesize.StreamClientInterceptor,
1486 internalActorStreamInterceptor(),
1487 internalerrs.LoggingStreamClientInterceptor(logger),
1488 internalerrs.PrometheusStreamClientInterceptor,
1489 retry.StreamClientInterceptor(retryOpts...),
1490 ),
1491 grpc.WithChainUnaryInterceptor(
1492 metrics.UnaryClientInterceptor(),
1493 messagesize.UnaryClientInterceptor,
1494 internalActorUnaryInterceptor(),
1495 internalerrs.LoggingUnaryClientInterceptor(logger),
1496 internalerrs.PrometheusUnaryClientInterceptor,
1497 retry.UnaryClientInterceptor(retryOpts...),
1498 ),
1499 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
1500 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)),
1501 }
1502
1503 opts = append(opts, additionalOpts...)
1504
1505 // Ensure that the message size options are set last, so they override any other
1506 // client-specific options that tweak the message size.
1507 //
1508 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
1509 // take precedence over everything else with a uniform size setting that's easy to reason about.
1510 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...)
1511
1512 // This dialer is used to connect via gRPC to the Sourcegraph instance.
1513 // This is done lazily, so we can provide the client to use regardless of
1514 // whether we enabled gRPC or not initially.
1515 cc, err := grpc.Dial(addr, opts...)
1516 if err != nil {
1517 return nil, fmt.Errorf("dialing %q: %w", addr, err)
1518 }
1519
1520 client := proto.NewZoektConfigurationServiceClient(cc)
1521 return client, nil
1522}
1523
1524// mustGetClientMetrics returns a singleton instance of the client metrics
1525// that are shared across all gRPC clients that this process creates.
1526//
1527// This function panics if the metrics cannot be registered with the default
1528// Prometheus registry.
1529func mustGetClientMetrics() *grpcprom.ClientMetrics {
1530 clientMetricsOnce.Do(func() {
1531 clientMetrics = grpcprom.NewClientMetrics(
1532 grpcprom.WithClientCounterOptions(),
1533 grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
1534 grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC
1535 grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC
1536 )
1537
1538 prometheus.DefaultRegisterer.MustRegister(clientMetrics)
1539 })
1540
1541 return clientMetrics
1542}
1543
// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
// If the scheme is "https", "443" is used. URLs with any other scheme, URLs
// that already carry a port, non-absolute URLs and nil are returned as-is.
//
// The host is rebuilt from URL.Hostname rather than URL.Host, so bracketed
// IPv6 literals ("[::1]") are not wrapped in a second pair of brackets and a
// dangling ":" without a port is dropped.
//
// The original URL is not mutated. A copy is modified and returned.
func addDefaultPort(original *url.URL) *url.URL {
	if original == nil {
		return nil // don't panic
	}

	if !original.IsAbs() {
		return original // don't do anything if the URL doesn't look like a remote URL
	}

	if original.Port() != "" {
		return original // an explicit port always wins
	}

	var port string
	switch original.Scheme {
	case "http":
		port = "80"
	case "https":
		port = "443"
	default:
		return original // no known default port for this scheme
	}

	u := cloneURL(original)
	// Hostname strips the brackets of IPv6 literals; JoinHostPort adds them
	// back exactly once.
	u.Host = net.JoinHostPort(u.Hostname(), port)
	return u
}

// cloneURL returns a copy of the URL. It is safe to mutate the returned URL.
// The Userinfo, if present, is copied as well. A nil URL yields nil.
// This is copied from net/http/clone.go
func cloneURL(u *url.URL) *url.URL {
	if u == nil {
		return nil
	}
	u2 := new(url.URL)
	*u2 = *u
	if u.User != nil {
		u2.User = new(url.Userinfo)
		*u2.User = *u.User
	}
	return u2
}
1588
1589func main() {
1590 liblog := sglog.Init(sglog.Resource{
1591 Name: "zoekt-indexserver",
1592 Version: zoekt.Version,
1593 InstanceID: zoekt.HostnameBestEffort(),
1594 })
1595 defer liblog.Sync()
1596
1597 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil {
1598 log.Fatal(err)
1599 }
1600}
1601
1602// mustGetBoolFromEnvironmentVariables is like getBoolFromEnvironmentVariables, but it panics
1603// if any of the provided environment variables fails to parse as a boolean.
1604func mustGetBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) bool {
1605 value, err := getBoolFromEnvironmentVariables(envVarNames, defaultBool)
1606 if err != nil {
1607 panic(err)
1608 }
1609
1610 return value
1611}
1612
// getBoolFromEnvironmentVariables returns the boolean defined by the first environment
// variable listed in envVarNames that is set (non-empty) in the current process
// environment, or the defaultBool if none are set.
//
// Only the first non-empty variable is consulted. If its value fails to parse
// as a boolean, an error wrapping the strconv.ParseBool error is returned and
// later variables are not inspected.
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	for _, envVar := range envVarNames {
		v := os.Getenv(envVar)
		if v == "" {
			continue
		}

		b, err := strconv.ParseBool(v)
		if err != nil {
			return false, fmt.Errorf("parsing environment variable %q to boolean: %w", envVar, err)
		}

		return b, nil
	}

	return defaultBool, nil
}