fork of https://github.com/sourcegraph/zoekt
// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
// repositories on Sourcegraph.
3package main
4
5import (
6 "bytes"
7 "context"
8 _ "embed"
9 "encoding/json"
10 "errors"
11 "flag"
12 "fmt"
13 "html/template"
14 "io"
15 "io/fs"
16 "log"
17 "math"
18 "math/rand"
19 "net"
20 "net/http"
21 "net/url"
22 "os"
23 "os/exec"
24 "os/signal"
25 "path/filepath"
26 "runtime"
27 "sort"
28 "strconv"
29 "strings"
30 "sync"
31 "text/tabwriter"
32 "time"
33
34 grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/openmetrics/v2"
35 "github.com/keegancsmith/tmpfriend"
36 "github.com/peterbourgon/ff/v3/ffcli"
37 "github.com/prometheus/client_golang/prometheus"
38 "github.com/prometheus/client_golang/prometheus/promauto"
39 sglog "github.com/sourcegraph/log"
40 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
41 "github.com/sourcegraph/zoekt/grpc/internalerrs"
42 "github.com/sourcegraph/zoekt/grpc/messagesize"
43 "go.uber.org/automaxprocs/maxprocs"
44 "golang.org/x/net/trace"
45 "golang.org/x/sys/unix"
46 "google.golang.org/grpc"
47 "google.golang.org/grpc/credentials/insecure"
48 "google.golang.org/grpc/metadata"
49
50 "github.com/sourcegraph/mountinfo"
51
52 "github.com/sourcegraph/zoekt"
53 "github.com/sourcegraph/zoekt/build"
54 "github.com/sourcegraph/zoekt/debugserver"
55 "github.com/sourcegraph/zoekt/internal/profiler"
56)
57
58var (
59 metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
60 Name: "resolve_revisions_seconds",
61 Help: "A histogram of latencies for resolving all repository revisions.",
62 Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 27min
63 })
64
65 metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
66 Name: "resolve_revision_seconds",
67 Help: "A histogram of latencies for resolving a repository revision.",
68 Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
69 }, []string{"success"}) // success=true|false
70
71 metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
72 Name: "get_index_options_total",
73 Help: "The total number of times we tried to get index options for a repository. Includes errors.",
74 })
75 metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
76 Name: "get_index_options_error_total",
77 Help: "The total number of times we failed to get index options for a repository.",
78 })
79
80 metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
81 Name: "index_repo_seconds",
82 Help: "A histogram of latencies for indexing a repository.",
83 Buckets: prometheus.ExponentialBucketsRange(
84 (100 * time.Millisecond).Seconds(),
85 (40*time.Minute + indexTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
86 20),
87 }, []string{
88 "state", // state is an indexState
89 "name", // name of the repository that was indexed
90 })
91
92 metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
93 Name: "index_fetch_seconds",
94 Help: "A histogram of latencies for fetching a repository.",
95 Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
96 }, []string{
97 "success", // true|false
98 "name", // the name of the repository that the commits were fetched from
99 })
100
101 metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
102 Name: "index_incremental_index_state",
103 Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
104 }, []string{"state"}) // state is build.IndexState
105
106 metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
107 Name: "index_num_indexed",
108 Help: "Number of indexed repos by code host",
109 })
110
111 metricNumAssigned = promauto.NewGauge(prometheus.GaugeOpts{
112 Name: "index_num_assigned",
113 Help: "Number of repos assigned to this indexer by code host",
114 })
115
116 metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
117 Name: "index_failing_total",
118 Help: "Counts failures to index (indexing activity, should be used with rate())",
119 })
120
121 metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
122 Name: "index_indexing_total",
123 Help: "Counts indexings (indexing activity, should be used with rate())",
124 })
125
126 metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
127 Name: "index_num_stopped_tracking_total",
128 Help: "Counts the number of repos we stopped tracking.",
129 })
130
131 clientMetricsOnce sync.Once
132 clientMetrics *grpc_prometheus.ClientMetrics
133)
134
// reposWithSeparateIndexingMetrics holds the repository names that keep their
// own "name" label value on metricIndexDuration (see repoNameForMetric). Every
// other repository is reported under the empty name so the metric's label
// cardinality stays bounded.
var reposWithSeparateIndexingMetrics = map[string]struct{}{}

// indexState is the outcome of a single index job. Its string form is used as
// the "state" label of metricIndexDuration.
type indexState string

const (
	indexStateFail        indexState = "fail"         // the index job returned an error
	indexStateSuccess     indexState = "success"      // a full (re)index completed
	indexStateSuccessMeta indexState = "success_meta" // only the index metadata was updated
	indexStateNoop        indexState = "noop"         // the index on disk was already up to date
	indexStateEmpty       indexState = "empty"        // the repository has no branches (empty repo)
)
147
// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	logger sglog.Logger

	// Sourcegraph is the client this server uses to list the repositories it
	// should index, fetch their index options, and report index status back.
	Sourcegraph Sourcegraph
	// NOTE(review): BatchSize is not read in this part of the file; presumably
	// it sizes batched requests to Sourcegraph — confirm.
	BatchSize int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once. Run
	// starts this many processQueue workers.
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph. Each sync is jittered
	// by jitterTicker.
	Interval time.Duration
	// CPUCount is the amount of parallelism to use when indexing a
	// repository. It becomes indexArgs.Parallelism.
	CPUCount int

	// queue holds the repositories waiting to be indexed. The sync loop in Run
	// fills it and processQueue workers pop from it.
	queue Queue

	// muIndexDir protects the index directory from concurrent access. Index
	// jobs take a per-repository lock (With); cleanup takes the global lock
	// (Global).
	muIndexDir indexMutex

	// If true, shard merging is enabled. It gates the periodic vacuum and
	// merge loops started by Run.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is reported by the /debug/host endpoint.
	hostname string

	// mergeOpts supplies the vacuum and merge intervals used by Run.
	mergeOpts mergeOpts
}
192
// debug is a logger for verbose diagnostics. It writes to io.Discard, so its
// output is dropped unless the logger is replaced elsewhere.
var debug = log.New(io.Discard, "" /* no prefix */, log.LstdFlags)

// noOutputTimeout is how long a command run by loggedRun may go without
// producing output before it is sent SIGQUIT. Index commands are expected to
// print something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
const noOutputTimeout = time.Minute * 30
201
202func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) {
203 out := &synchronizedBuffer{}
204 cmd.Stdout = out
205 cmd.Stderr = out
206
207 tr.LazyPrintf("%s", cmd.Args)
208
209 defer func() {
210 if err != nil {
211 outS := out.String()
212 tr.LazyPrintf("failed: %v", err)
213 tr.LazyPrintf("output: %s", out)
214 tr.SetError()
215 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS)
216 }
217 }()
218
219 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args))
220
221 if err := cmd.Start(); err != nil {
222 return err
223 }
224
225 errC := make(chan error)
226 go func() {
227 errC <- cmd.Wait()
228 }()
229
230 // This channel is set after we have sent sigquit. It allows us to follow up
231 // with a sigkill if the process doesn't quit after sigquit.
232 kill := make(<-chan time.Time)
233
234 lastLen := 0
235 for {
236 select {
237 case <-time.After(noOutputTimeout):
238 // Periodically check if we have had output. If not kill the process.
239 if out.Len() != lastLen {
240 lastLen = out.Len()
241 log.Printf("still running %s", cmd.Args)
242 } else {
243 // Send quit (C-\) first so we get a stack dump.
244 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args)
245 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil {
246 log.Println("quit failed:", err)
247 }
248
249 // send sigkill if still running in 10s
250 kill = time.After(10 * time.Second)
251 }
252
253 case <-kill:
254 log.Printf("still running, killing %s", cmd.Args)
255 if err := cmd.Process.Kill(); err != nil {
256 log.Println("kill failed:", err)
257 }
258
259 case err := <-errC:
260 if err != nil {
261 return err
262 }
263
264 tr.LazyPrintf("success")
265 return nil
266 }
267 }
268}
269
// synchronizedBuffer is a bytes.Buffer guarded by a mutex. A running command
// can write its output into it while another goroutine monitors it (Len) or
// reads it (String). The zero value is ready to use.
type synchronizedBuffer struct {
	mu sync.Mutex
	b  bytes.Buffer
}

// Write appends p to the buffer. It implements io.Writer and is safe for
// concurrent use.
func (sb *synchronizedBuffer) Write(p []byte) (int, error) {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	return sb.b.Write(p)
}

// Len returns the number of bytes written so far.
func (sb *synchronizedBuffer) Len() int {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	return sb.b.Len()
}

// String returns a copy of everything written so far.
func (sb *synchronizedBuffer) String() string {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	return sb.b.String()
}
294
// pauseFileName if present in IndexDir will stop index jobs from
// running. This is to make it possible to experiment with the content of the
// IndexDir without the indexserver writing to it.
//
// While the file exists, the sync loop in Run skips syncing with Sourcegraph
// and logs the file's trimmed contents as the reason. The processQueue
// workers sleep instead of popping jobs.
const pauseFileName = "PAUSE"
299
300// Run the sync loop. This blocks forever.
301func (s *Server) Run() {
302 removeIncompleteShards(s.IndexDir)
303
304 // Start a goroutine which updates the queue with commits to index.
305 go func() {
306 // We update the list of indexed repos every Interval. To speed up manual
307 // testing we also listen for SIGUSR1 to trigger updates.
308 //
309 // "pkill -SIGUSR1 zoekt-sourcegra"
310 for range jitterTicker(s.Interval, unix.SIGUSR1) {
311 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
312 log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b)))
313 continue
314 }
315
316 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir))
317 if err != nil {
318 log.Printf("error listing repos: %s", err)
319 continue
320 }
321
322 debug.Printf("updating index queue with %d repositories", len(repos.IDs))
323
324 // Stop indexing repos we don't need to track anymore
325 removed := s.queue.MaybeRemoveMissing(repos.IDs)
326 metricNumStoppedTrackingTotal.Add(float64(len(removed)))
327 if len(removed) > 0 {
328 log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5))
329 }
330
331 cleanupDone := make(chan struct{})
332 go func() {
333 defer close(cleanupDone)
334 s.muIndexDir.Global(func() {
335 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging)
336 })
337 }()
338
339 repos.IterateIndexOptions(s.queue.AddOrUpdate)
340
341 // IterateIndexOptions will only iterate over repositories that have
342 // changed since we last called list. However, we want to add all IDs
343 // back onto the queue just to check that what is on disk is still
344 // correct. This will use the last IndexOptions we stored in the
345 // queue. The repositories not on the queue (missing) need a forced
346 // fetch of IndexOptions.
347 missing := s.queue.Bump(repos.IDs)
348 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...)
349
350 setCompoundShardCounter(s.IndexDir)
351
352 <-cleanupDone
353 }
354 }()
355
356 go func() {
357 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) {
358 if s.shardMerging {
359 s.vacuum()
360 }
361 }
362 }()
363
364 go func() {
365 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) {
366 if s.shardMerging {
367 s.doMerge()
368 }
369 }
370 }()
371
372 for i := 0; i < s.IndexConcurrency; i++ {
373 go s.processQueue()
374 }
375
376 // block forever
377 select {}
378}
379
// formatListUint32 returns a comma-separated list of the first min(len(v), m)
// items of v. If items were left out, it appends "...". A negative m is
// treated as 0.
//
// Examples: ([1 2 3], 5) -> "1, 2, 3" and ([1 2 3], 2) -> "1, 2, ...".
func formatListUint32(v []uint32, m int) string {
	if m < 0 {
		m = 0
	}
	if m > len(v) {
		m = len(v)
	}

	items := make([]string, 0, m+1)
	for _, x := range v[:m] {
		items = append(items, strconv.FormatUint(uint64(x), 10))
	}
	if len(v) > m {
		items = append(items, "...")
	}

	return strings.Join(items, ", ")
}
397
398func (s *Server) processQueue() {
399 for {
400 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
401 time.Sleep(time.Second)
402 continue
403 }
404
405 opts, ok := s.queue.Pop()
406 if !ok {
407 time.Sleep(time.Second)
408 continue
409 }
410
411 args := s.indexArgs(opts)
412
413 ran := s.muIndexDir.With(opts.Name, func() {
414 // only record time taken once we hold the lock. This avoids us
415 // recording time taken while merging/cleanup runs.
416 start := time.Now()
417
418 state, err := s.Index(args)
419
420 elapsed := time.Since(start)
421
422 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds())
423
424 if err != nil {
425 log.Printf("error indexing %s: %s", args.String(), err)
426 }
427
428 switch state {
429 case indexStateSuccess:
430 var branches []string
431 for _, b := range args.Branches {
432 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
433 }
434 s.logger.Info("updated index",
435 sglog.String("repo", args.Name),
436 sglog.Uint32("id", args.RepoID),
437 sglog.Strings("branches", branches),
438 sglog.Duration("duration", elapsed),
439 )
440 case indexStateSuccessMeta:
441 log.Printf("updated meta %s in %v", args.String(), elapsed)
442 }
443 s.queue.SetIndexed(opts, state)
444 })
445
446 if !ran {
447 // Someone else is processing the repository. We can just skip this job
448 // since the repository will be added back to the queue and we will
449 // converge to the correct behaviour.
450 debug.Printf("index job for repository already running: %s", args)
451 continue
452 }
453 }
454}
455
456// repoNameForMetric returns a normalized version of the given repository name that is
457// suitable for use with Prometheus metrics.
458func repoNameForMetric(repo string) string {
459 // Check to see if we want to be able to capture separate indexing metrics for this repository.
460 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable.
461 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok {
462 return repo
463 }
464
465 return ""
466}
467
// batched sends slice in consecutive chunks of at most size elements on the
// returned channel, then closes it. The chunks share slice's backing array.
// An empty slice produces no chunks. A non-positive size produces the whole
// slice as a single chunk.
//
// The sending goroutine exits only after every chunk has been received, so
// callers must drain the channel.
func batched(slice []uint32, size int) <-chan []uint32 {
	if size <= 0 {
		// Guard against an endless stream of empty chunks (size == 0) or a
		// slice-bounds panic (size < 0).
		size = len(slice)
	}

	c := make(chan []uint32)
	go func() {
		defer close(c)
		for len(slice) > 0 {
			n := size
			if n > len(slice) {
				n = len(slice)
			}
			c <- slice[:n]
			slice = slice[n:]
		}
	}()
	return c
}
482
// jitterTicker returns a channel that ticks once on creation. After that it
// ticks repeatedly, sleeping a duration chosen uniformly from [d/2, 3d/2)
// between ticks. If d is not positive, there are no periodic ticks after the
// initial one. Without this guard, rand.Int63n would panic and crash the
// process.
//
// sig is a list of signals which also cause the ticker to fire. This is a
// convenience to allow manually triggering of the ticker.
//
// The returned channel is unbuffered and never closed. Ticks block until they
// are received.
func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} {
	ticker := make(chan struct{})

	go func() {
		ticker <- struct{}{}

		ns := int64(d)
		if ns <= 0 {
			// No period: only the initial tick and signal-triggered ticks.
			return
		}
		for {
			time.Sleep(time.Duration(ns/2 + rand.Int63n(ns)))
			ticker <- struct{}{}
		}
	}()

	go func() {
		if len(sig) == 0 {
			return
		}

		c := make(chan os.Signal, 1)
		signal.Notify(c, sig...)
		for range c {
			ticker <- struct{}{}
		}
	}()

	return ticker
}
514
// Index starts an index job for repo name at commit, and reports the
// resulting indexState.
//
// The job is recorded in a golang.org/x/net/trace trace in family "index". In
// order:
//   - A repository with no branches is handed to createEmptyShard
//     (indexStateEmpty).
//   - Repositories in deltaBuildRepositoriesAllowList are marked for delta
//     builds, and deltaShardNumberFallbackThreshold is copied onto args.
//   - Repositories in repositoriesSkipSymbolsCalculationAllowList have symbol
//     calculation turned off.
//   - If args.Incremental is set, the on-disk index state is compared to the
//     desired one:
//     an up-to-date index is a no-op (indexStateNoop);
//     a metadata-only difference is handled by mergeMeta (indexStateSuccessMeta);
//     a corrupt index, or a failed mergeMeta, falls through to a full rebuild.
//   - Otherwise the repository is indexed with gitIndex, whose commands run
//     through loggedRun. On success the new status is pushed to Sourcegraph;
//     a failure to push is only logged. The result is indexStateSuccess.
//
// Index mutates args (UseDelta, DeltaShardNumberFallbackThreshold, Symbols).
// Whenever err is non-nil, the deferred handler forces state to
// indexStateFail and increments metricFailingTotal.
func (s *Server) Index(args *indexArgs) (state indexState, err error) {
	tr := trace.New("index", args.Name)

	defer func() {
		if err != nil {
			tr.SetError()
			tr.LazyPrintf("error: %v", err)
			state = indexStateFail
			metricFailingTotal.Inc()
		}
		tr.LazyPrintf("state: %s", state)
		tr.Finish()
	}()

	tr.LazyPrintf("branches: %v", args.Branches)

	// Empty repository: write a placeholder shard instead of indexing.
	if len(args.Branches) == 0 {
		return indexStateEmpty, createEmptyShard(args)
	}

	repositoryName := args.Name
	if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
		tr.LazyPrintf("marking this repository for delta build")
		args.UseDelta = true
	}

	args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold

	if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
		tr.LazyPrintf("skipping symbols calculation")
		args.Symbols = false
	}

	// reason is logged with the full update below. It stays "forced" for
	// non-incremental (e.g. manually triggered) jobs.
	reason := "forced"

	if args.Incremental {
		bo := args.BuildOptions()
		bo.SetDefaults()
		incrementalState, fn := bo.IndexState()
		reason = string(incrementalState)
		metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()

		switch incrementalState {
		case build.IndexStateEqual:
			debug.Printf("%s index already up to date. Shard=%s", args.String(), fn)
			return indexStateNoop, nil

		case build.IndexStateMeta:
			log.Printf("updating index.meta %s", args.String())

			// On failure, fall out of the switch into the full rebuild below.
			if err := mergeMeta(bo); err != nil {
				log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
			} else {
				return indexStateSuccessMeta, nil
			}

		case build.IndexStateCorrupt:
			log.Printf("falling back to full update: corrupt index: %s", args.String())
		}
	}

	log.Printf("updating index %s reason=%s", args.String(), reason)

	metricIndexingTotal.Inc()
	c := gitIndexConfig{
		// Run external commands through loggedRun so they are traced and
		// watched for stalls.
		runCmd: func(cmd *exec.Cmd) error {
			return s.loggedRun(tr, cmd)
		},

		findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
			return args.BuildOptions().FindRepositoryMetadata()
		},
	}

	err = gitIndex(c, args, s.Sourcegraph, s.logger)
	if err != nil {
		return indexStateFail, err
	}

	// Reporting status is best-effort: the index itself was built successfully.
	if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil {
		s.logger.Error("failed to update index status",
			sglog.String("repo", args.Name),
			sglog.Uint32("id", args.RepoID),
			sglogBranches("branches", args.Branches),
			sglog.Error(err),
		)
	}

	return indexStateSuccess, nil
}
606
607// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so
608// it can update the zoekt_repos table.
609func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error {
610 // We need to read from disk for IndexTime.
611 _, metadata, ok, err := c.findRepositoryMetadata(args)
612 if err != nil {
613 return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
614 }
615 if !ok {
616 return errors.New("failed to find metadata for new/updated index")
617 }
618
619 status := []indexStatus{{
620 RepoID: args.RepoID,
621 Branches: args.Branches,
622 IndexTimeUnix: metadata.IndexTime.Unix(),
623 }}
624 if err := sg.UpdateIndexStatus(status); err != nil {
625 return fmt.Errorf("failed to update sourcegraph with status: %w", err)
626 }
627
628 return nil
629}
630
631func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
632 ss := make([]string, len(branches))
633 for i, b := range branches {
634 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
635 }
636 return sglog.Strings(key, ss)
637}
638
639func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
640 return &indexArgs{
641 IndexOptions: opts,
642
643 IndexDir: s.IndexDir,
644 Parallelism: s.CPUCount,
645
646 Incremental: true,
647
648 // 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
649 FileLimit: 1 << 20,
650 }
651}
652
653func createEmptyShard(args *indexArgs) error {
654 bo := args.BuildOptions()
655 bo.SetDefaults()
656 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}
657
658 if args.Incremental && bo.IncrementalSkipIndexing() {
659 return nil
660 }
661
662 builder, err := build.NewBuilder(*bo)
663 if err != nil {
664 return err
665 }
666 return builder.Finish()
667}
668
669// addDebugHandlers adds handlers specific to indexserver.
670func (s *Server) addDebugHandlers(mux *http.ServeMux) {
671 // Sourcegraph's site admin view requires indexserver to serve it's admin view
672 // on "/".
673 mux.Handle("/", http.HandlerFunc(s.handleRoot))
674
675 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex))
676 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed))
677 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList))
678 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge))
679 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue))
680 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost))
681}
682
683func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) {
684 if r.Method != "GET" {
685 w.Header().Set("Allow", "GET")
686 w.WriteHeader(http.StatusMethodNotAllowed)
687 return
688 }
689
690 response := struct {
691 Hostname string
692 }{
693 Hostname: s.hostname,
694 }
695
696 b, err := json.Marshal(response)
697 if err != nil {
698 http.Error(w, err.Error(), http.StatusInternalServerError)
699 return
700 }
701
702 w.Header().Set("Content-Type", "application/json; charset=utf-8")
703 w.Write(b)
704}
705
// rootTmpl renders the indexserver admin page served on "/" (see handleRoot).
// The page:
//   - links to the debug pages;
//   - shows .IndexMsg, the result of a reindex requested via ?id=;
//   - if .Repos is non-empty, lists each repository's .Name and .ID, where
//     the ID links to a reindex of that repository.
var rootTmpl = template.Must(template.New("name").Parse(`
<html>
  <body>
    <a href="debug">Debug</a><br />
    <a href="debug/requests">Traces</a><br />
    {{.IndexMsg}}<br />
    <br />
    <h3>Reindex</h3>
    {{if .Repos}}
    <a href="?show_repos=false">hide repos</a><br />
    <table style="margin-top: 20px">
      <tr>
        <th style="text-align:left">Name</th>
        <th style="text-align:left">ID</th>
      </tr>
      {{range .Repos}}
      <tr>
        <td>{{.Name}}</td>
        <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></td>
      </tr>
      {{end}}
    </table>
    {{else}}
    <a href="?show_repos=true">show repos</a><br />
    {{end}}
  </body>
</html>
`))
732
733func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
734 if r.Method != "GET" {
735 w.Header().Set("Allow", "GET")
736 w.WriteHeader(http.StatusMethodNotAllowed)
737 return
738 }
739
740 values := r.URL.Query()
741
742 // ?id=
743 indexMsg := ""
744 if v := values.Get("id"); v != "" {
745 id, err := strconv.Atoi(v)
746 if err != nil {
747 http.Error(w, err.Error(), http.StatusBadRequest)
748 return
749 }
750 indexMsg, _ = s.forceIndex(uint32(id))
751 }
752
753 // ?show_repos=
754 showRepos := false
755 if v := values.Get("show_repos"); v != "" {
756 showRepos, _ = strconv.ParseBool(v)
757 }
758
759 type Repo struct {
760 ID uint32
761 Name string
762 }
763 var data struct {
764 Repos []Repo
765 IndexMsg string
766 }
767
768 data.IndexMsg = indexMsg
769
770 if showRepos {
771 s.queue.Iterate(func(opts *IndexOptions) {
772 data.Repos = append(data.Repos, Repo{
773 ID: opts.RepoID,
774 Name: opts.Name,
775 })
776 })
777 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name })
778 }
779
780 _ = rootTmpl.Execute(w, data)
781}
782
783// handleReindex triggers a reindex asynocronously. If a reindex was triggered
784// the request returns with status 202. The caller can infer the new state of
785// the index by calling List.
786func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) {
787 if r.Method != http.MethodPost {
788 w.Header().Set("Allow", http.MethodPost)
789 w.WriteHeader(http.StatusMethodNotAllowed)
790 return
791 }
792
793 err := r.ParseForm()
794 if err != nil {
795 http.Error(w, err.Error(), http.StatusBadRequest)
796 return
797 }
798
799 id, err := strconv.Atoi(r.Form.Get("repo"))
800 if err != nil {
801 http.Error(w, err.Error(), http.StatusBadRequest)
802 return
803 }
804
805 go func() { s.forceIndex(uint32(id)) }()
806
807 // 202 Accepted
808 w.WriteHeader(http.StatusAccepted)
809}
810
811func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
812 withIndexed := true
813 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil {
814 withIndexed = b
815 }
816
817 var indexed []uint32
818 if withIndexed {
819 indexed = listIndexed(s.IndexDir)
820 }
821
822 repos, err := s.Sourcegraph.List(r.Context(), indexed)
823 if err != nil {
824 http.Error(w, err.Error(), http.StatusInternalServerError)
825 return
826 }
827
828 bw := bytes.Buffer{}
829
830 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
831
832 _, err = fmt.Fprintf(tw, "ID\tName\n")
833 if err != nil {
834 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
835 return
836 }
837
838 s.queue.mu.Lock()
839 name := ""
840 for _, id := range repos.IDs {
841 if item := s.queue.get(id); item != nil {
842 name = item.opts.Name
843 } else {
844 name = ""
845 }
846 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
847 if err != nil {
848 debug.Printf("handleDebugList: %s\n", err.Error())
849 }
850 }
851 s.queue.mu.Unlock()
852
853 if err != nil {
854 http.Error(w, err.Error(), http.StatusInternalServerError)
855 return
856 }
857
858 err = tw.Flush()
859 if err != nil {
860 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
861 return
862 }
863
864 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
865
866 if _, err := io.Copy(w, &bw); err != nil {
867 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
868 return
869 }
870}
871
872// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
873// can run this command during periods of low usage (evenings, weekends) to
874// trigger an initial merge run. In the steady-state, merges happen rarely, even
875// on busy instances, and users can rely on automatic merging instead.
876func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {
877
878 // A merge operation can take very long, depending on the number merges and the
879 // target size of the compound shards. We run the merge in the background and
880 // return immediately to the user.
881 //
882 // We track the status of the merge with metricShardMergingRunning.
883 go func() {
884 s.doMerge()
885 }()
886 _, _ = w.Write([]byte("merging enqueued\n"))
887}
888
889func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {
890 indexed := listIndexed(s.IndexDir)
891
892 bw := bytes.Buffer{}
893
894 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
895
896 _, err := fmt.Fprintf(tw, "ID\tName\n")
897 if err != nil {
898 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
899 return
900 }
901
902 s.queue.mu.Lock()
903 name := ""
904 for _, id := range indexed {
905 if item := s.queue.get(id); item != nil {
906 name = item.opts.Name
907 } else {
908 name = ""
909 }
910 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
911 if err != nil {
912 debug.Printf("handleDebugIndexed: %s\n", err.Error())
913 }
914 }
915 s.queue.mu.Unlock()
916
917 if err != nil {
918 http.Error(w, err.Error(), http.StatusInternalServerError)
919 return
920 }
921
922 err = tw.Flush()
923 if err != nil {
924 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
925 return
926 }
927
928 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
929
930 if _, err := io.Copy(w, &bw); err != nil {
931 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
932 return
933 }
934}
935
936// forceIndex will run the index job for repo name now. It will return always
937// return a string explaining what it did, even if it failed.
938func (s *Server) forceIndex(id uint32) (string, error) {
939 var opts IndexOptions
940 var err error
941 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) {
942 opts = o
943 }, func(_ uint32, e error) {
944 err = e
945 }, id)
946 if err != nil {
947 return fmt.Sprintf("Indexing %d failed: %v", id, err), err
948 }
949
950 args := s.indexArgs(opts)
951 args.Incremental = false // force re-index
952
953 var state indexState
954 ran := s.muIndexDir.With(opts.Name, func() {
955 state, err = s.Index(args)
956 })
957 if !ran {
958 return fmt.Sprintf("index job for repository already running: %s", args), nil
959 }
960 if err != nil {
961 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err
962 }
963 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil
964}
965
966func listIndexed(indexDir string) []uint32 {
967 index := getShards(indexDir)
968 metricNumIndexed.Set(float64(len(index)))
969 repoIDs := make([]uint32, 0, len(index))
970 for id := range index {
971 repoIDs = append(repoIDs, id)
972 }
973 sort.Slice(repoIDs, func(i, j int) bool {
974 return repoIDs[i] < repoIDs[j]
975 })
976 return repoIDs
977}
978
979// setupTmpDir sets up a temporary directory on the same volume as the
980// indexes.
981//
982// If main is true we will delete older temp directories left around. main is
983// false when this is a debug command.
984func setupTmpDir(main bool, index string) error {
985 // change the target tmp directory depending on if its our main daemon or a
986 // debug sub command.
987 dir := ".indexserver.debug.tmp"
988 if main {
989 dir = ".indexserver.tmp"
990 }
991
992 tmpRoot := filepath.Join(index, dir)
993 if err := os.MkdirAll(tmpRoot, 0755); err != nil {
994 return err
995 }
996 if !tmpfriend.IsTmpFriendDir(tmpRoot) {
997 _, err := tmpfriend.RootTempDir(tmpRoot)
998 return err
999 }
1000 return nil
1001}
1002
1003func printMetaData(fn string) error {
1004 repo, indexMeta, err := zoekt.ReadMetadataPath(fn)
1005 if err != nil {
1006 return err
1007 }
1008
1009 err = json.NewEncoder(os.Stdout).Encode(indexMeta)
1010 if err != nil {
1011 return err
1012 }
1013
1014 err = json.NewEncoder(os.Stdout).Encode(repo)
1015 if err != nil {
1016 return err
1017 }
1018 return nil
1019}
1020
1021func printShardStats(fn string) error {
1022 f, err := os.Open(fn)
1023 if err != nil {
1024 return err
1025 }
1026
1027 iFile, err := zoekt.NewIndexFile(f)
1028 if err != nil {
1029 return err
1030 }
1031
1032 return zoekt.PrintNgramStats(iFile)
1033}
1034
1035func srcLogLevelIsDebug() bool {
1036 lvl := os.Getenv(sglog.EnvLogLevel)
1037 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug")
1038}
1039
// getEnvWithDefaultInt64 returns environment variable k parsed as a base-10
// int64, or defaultVal if k is unset or empty. An unparsable value is fatal
// (log.Fatalf).
func getEnvWithDefaultInt64(k string, defaultVal int64) int64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseInt(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to int64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1051
// getEnvWithDefaultInt returns environment variable k parsed as a base-10 int,
// or defaultVal if k is unset or empty. An unparsable value is fatal
// (log.Fatalf).
func getEnvWithDefaultInt(k string, defaultVal int) int {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	// Parse the variable's value, not its name.
	i, err := strconv.Atoi(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to int: %s", k, err)
	}
	return i
}
1063
1064func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 {
1065 v := os.Getenv(k)
1066 if v == "" {
1067 return defaultVal
1068 }
1069 i, err := strconv.ParseUint(v, 10, 64)
1070 if err != nil {
1071 log.Fatalf("error parsing ENV %s to uint64: %s", k, err)
1072 }
1073 return i
1074}
1075
1076func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 {
1077 v := os.Getenv(k)
1078 if v == "" {
1079 return defaultVal
1080 }
1081 f, err := strconv.ParseFloat(v, 64)
1082 if err != nil {
1083 log.Fatalf("error parsing ENV %s to float64: %s", k, err)
1084 }
1085 return f
1086}
1087
1088func getEnvWithDefaultString(k string, defaultVal string) string {
1089 v := os.Getenv(k)
1090 if v == "" {
1091 return defaultVal
1092 }
1093 return v
1094}
1095
// getEnvWithDefaultDuration interprets environment variable k with
// time.ParseDuration (e.g. "90s", "8h"). When k is unset or empty, defaultVal
// is returned; a value that does not parse is fatal (log.Fatalf).
func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration {
	raw := os.Getenv(k)
	if len(raw) == 0 {
		return defaultVal
	}

	parsed, err := time.ParseDuration(raw)
	if err != nil {
		log.Fatalf("error parsing ENV %s to duration: %s", k, err)
	}
	return parsed
}
1108
// getEnvWithDefaultBool interprets environment variable k with
// strconv.ParseBool ("1", "t", "true", "0", "f", "false", ...). When k is
// unset or empty, defaultVal is returned; a value that does not parse is
// fatal (log.Fatalf).
func getEnvWithDefaultBool(k string, defaultVal bool) bool {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseBool(raw)
		if err != nil {
			log.Fatalf("error parsing ENV %s to bool: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1121
// getEnvWithDefaultEmptySet parses environment variable k as a
// comma-separated list and returns its distinct, whitespace-trimmed,
// non-empty entries as a set. An unset or empty variable yields an empty
// (non-nil) set.
func getEnvWithDefaultEmptySet(k string) map[string]struct{} {
	members := make(map[string]struct{})
	for _, item := range strings.Split(os.Getenv(k), ",") {
		if trimmed := strings.TrimSpace(item); trimmed != "" {
			members[trimmed] = struct{}{}
		}
	}
	return members
}
1132
// joinStringSet joins the members of set with sep, in ascending lexical
// order. A nil or empty set yields "".
func joinStringSet(set map[string]struct{}, sep string) string {
	xs := make([]string, 0, len(set))
	for x := range set {
		xs = append(xs, x)
	}

	// Map iteration order is randomized; sort so the same set always renders
	// the same way (e.g. in log lines).
	sort.Strings(xs)
	return strings.Join(xs, sep)
}
1141
1142func setCompoundShardCounter(indexDir string) {
1143 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt"))
1144 if err != nil {
1145 log.Printf("setCompoundShardCounter: %s\n", err)
1146 return
1147 }
1148 metricNumberCompoundShards.Set(float64(len(fns)))
1149}
1150
1151func rootCmd() *ffcli.Command {
1152 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError)
1153 conf := rootConfig{
1154 Main: true,
1155 }
1156 conf.registerRootFlags(rootFs)
1157
1158 return &ffcli.Command{
1159 FlagSet: rootFs,
1160 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]",
1161 Subcommands: []*ffcli.Command{debugCmd()},
1162 Exec: func(ctx context.Context, args []string) error {
1163 return startServer(conf)
1164 },
1165 }
1166}
1167
// rootConfig gathers the indexserver settings. registerRootFlags binds each
// field to a command-line flag (named in the comments below), several of
// which take their default from an environment variable.
type rootConfig struct {
	// Main is true only for the long-running indexserver command; debug
	// commands leave it unset. It decides whether tmpfriend needs to run.
	Main bool

	root             string        // -sourcegraph_url: Sourcegraph URL, or a directory to fake the API from
	interval         time.Duration // -interval: how often to sync with Sourcegraph
	index            string        // -index: index directory
	indexConcurrency int64         // -index_concurrency: number of concurrent index jobs
	listen           string        // -listen: HTTP listen address
	hostname         string        // -hostname: name advertised to Sourcegraph
	cpuFraction      float64       // -cpu_fraction: fraction of the cores used for indexing
	blockProfileRate int           // -block_profile_rate: Go block profiler sampling rate

	// Shard merging.
	vacuumInterval, mergeInterval time.Duration // -vacuum_interval, -merge_interval
	targetSize, minSize           int64         // MiB; -merge_target_size, -merge_min_size
	minAgeDays                    int           // days since the last commit; -merge_min_age
	maxPriority                   float64       // -merge_max_priority

	// Backoff for repositories with one or more consecutive indexing failures
	// (-backoff_duration, -max_backoff_duration).
	backoffDuration, maxBackoffDuration time.Duration

	// useGRPC selects the gRPC API for talking to Sourcegraph (-use_grpc).
	useGRPC bool
}
1198
1199func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
1200 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
1201 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
1202 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of concurrent index jobs to run.")
1203 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use")
1204 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
1205 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
1206 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
1207 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate")
1208 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
1209 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")
1210 fs.BoolVar(&rc.useGRPC, "use_grpc", getEnvWithDefaultBool("GRPC_ENABLED", false), "use the gRPC API to talk to Sourcegraph")
1211
1212 // flags related to shard merging
1213 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
1214 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
1215 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB")
1216 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB")
1217 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
1218 fs.Float64Var(&rc.maxPriority, "merge_max_priority", getEnvWithDefaultFloat64("SRC_MERGE_MAX_PRIORITY", 100), "the maximum priority a shard can have to be considered for merging.")
1219}
1220
1221func startServer(conf rootConfig) error {
1222 s, err := newServer(conf)
1223 if err != nil {
1224 return err
1225 }
1226
1227 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate)
1228 setCompoundShardCounter(s.IndexDir)
1229
1230 if conf.listen != "" {
1231
1232 mux := http.NewServeMux()
1233 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{
1234 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"},
1235 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"},
1236 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"},
1237 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"},
1238 }...)
1239 s.addDebugHandlers(mux)
1240
1241 go func() {
1242 debug.Printf("serving HTTP on %s", conf.listen)
1243 log.Fatal(http.ListenAndServe(conf.listen, mux))
1244
1245 }()
1246
1247 // Serve mux on a unix domain socket on a best-effort-basis so that
1248 // webserver can call the endpoints via the shared filesystem.
1249 //
1250 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen
1251 // on the socket due to permission errors. See
1252 // https://github.com/docker/for-mac/issues/6239
1253 go func() {
1254 serveHTTPOverSocket := func() error {
1255 socket := filepath.Join(s.IndexDir, "indexserver.sock")
1256 // We cannot bind a socket to an existing pathname.
1257 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) {
1258 return fmt.Errorf("error removing socket file: %s", socket)
1259 }
1260 // The "unix" network corresponds to stream sockets. (cf. unixgram,
1261 // unixpacket).
1262 l, err := net.Listen("unix", socket)
1263 if err != nil {
1264 return fmt.Errorf("failed to listen on socket %s: %w", socket, err)
1265 }
1266 // Indexserver (root) and webserver (Sourcegraph) run with
1267 // different users. Per default, the socket is created with
1268 // permission 755 (root root), which doesn't let webserver write to
1269 // it.
1270 //
1271 // See https://github.com/golang/go/issues/11822 for more context.
1272 if err := os.Chmod(socket, 0777); err != nil {
1273 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err)
1274 }
1275 debug.Printf("serving HTTP on %s", socket)
1276 return http.Serve(l, mux)
1277 }
1278 debug.Print(serveHTTPOverSocket())
1279 }()
1280 }
1281
1282 oc := &ownerChecker{
1283 Path: filepath.Join(conf.index, "owner.txt"),
1284 Hostname: conf.hostname,
1285 }
1286 go oc.Run()
1287
1288 logger := sglog.Scoped("metricsRegistration", "")
1289 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"}
1290
1291 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index})
1292 prometheus.DefaultRegisterer.MustRegister(c)
1293
1294 s.Run()
1295 return nil
1296}
1297
1298func newServer(conf rootConfig) (*Server, error) {
1299 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 {
1300 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0")
1301 }
1302 if conf.index == "" {
1303 return nil, fmt.Errorf("must set -index")
1304 }
1305 if conf.root == "" {
1306 return nil, fmt.Errorf("must set -sourcegraph_url")
1307 }
1308 rootURL, err := url.Parse(conf.root)
1309 if err != nil {
1310 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err)
1311 }
1312
1313 rootURL = addDefaultPort(rootURL)
1314
1315 // Tune GOMAXPROCS to match Linux container CPU quota.
1316 _, _ = maxprocs.Set()
1317
1318 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler.
1319 // The block profiler is disabled by default and should be enabled with care in production
1320 runtime.SetBlockProfileRate(conf.blockProfileRate)
1321
1322 // Automatically prepend our own path at the front, to minimize
1323 // required configuration.
1324 if l, err := os.Readlink("/proc/self/exe"); err == nil {
1325 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
1326 }
1327
1328 if _, err := os.Stat(conf.index); err != nil {
1329 if err := os.MkdirAll(conf.index, 0755); err != nil {
1330 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err)
1331 }
1332 }
1333
1334 if err := setupTmpDir(conf.Main, conf.index); err != nil {
1335 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err)
1336 }
1337
1338 if srcLogLevelIsDebug() {
1339 debug = log.New(os.Stderr, "", log.LstdFlags)
1340 }
1341
1342 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST")
1343 if len(reposWithSeparateIndexingMetrics) > 0 {
1344 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", "))
1345 }
1346
1347 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST")
1348 if len(deltaBuildRepositoriesAllowList) > 0 {
1349 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", "))
1350 }
1351
1352 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150)
1353 if deltaShardNumberFallbackThreshold > 0 {
1354 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold)
1355 } else {
1356 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards")
1357 }
1358
1359 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST")
1360 if len(reposShouldSkipSymbolsCalculation) > 0 {
1361 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", "))
1362 }
1363
1364 var sg Sourcegraph
1365 if rootURL.IsAbs() {
1366 var batchSize int
1367 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" {
1368 batchSize, err = strconv.Atoi(v)
1369 if err != nil {
1370 return nil, fmt.Errorf("Invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int")
1371 }
1372 }
1373
1374 opts := []SourcegraphClientOption{
1375 WithBatchSize(batchSize),
1376 WithShouldUseGRPC(conf.useGRPC),
1377 }
1378
1379 logger := sglog.Scoped("zoektConfigurationGRPCClient", "")
1380 client, err := dialGRPCClient(rootURL.Host, logger)
1381 if err != nil {
1382 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err)
1383 }
1384
1385 opts = append(opts, WithGRPCClient(client))
1386 sg = newSourcegraphClient(rootURL, conf.hostname, opts...)
1387
1388 } else {
1389 sg = sourcegraphFake{
1390 RootDir: rootURL.String(),
1391 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags),
1392 }
1393 }
1394
1395 if conf.indexConcurrency < 1 {
1396 conf.indexConcurrency = 1
1397 }
1398
1399 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction)))
1400 if cpuCount < 1 {
1401 cpuCount = 1
1402 }
1403
1404 logger := sglog.Scoped("server", "periodically reindexes enabled repositories on sourcegraph")
1405
1406 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)
1407
1408 return &Server{
1409 logger: logger,
1410 Sourcegraph: sg,
1411 IndexDir: conf.index,
1412 IndexConcurrency: int(conf.indexConcurrency),
1413 Interval: conf.interval,
1414 CPUCount: cpuCount,
1415 queue: *q,
1416 shardMerging: zoekt.ShardMergingEnabled(),
1417 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList,
1418 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold,
1419 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
1420 hostname: conf.hostname,
1421 mergeOpts: mergeOpts{
1422 vacuumInterval: conf.vacuumInterval,
1423 mergeInterval: conf.mergeInterval,
1424 targetSizeBytes: conf.targetSize * 1024 * 1024,
1425 minSizeBytes: conf.minSize * 1024 * 1024,
1426 minAgeDays: conf.minAgeDays,
1427 maxPriority: conf.maxPriority,
1428 },
1429 }, err
1430}
1431
// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration
// for the indexed-search-configuration gRPC service. dialGRPCClient passes it
// to grpc.WithDefaultServiceConfig.
//
// The default backoff strategy is modeled after the default settings used by
// retryablehttp.DefaultClient.
//
// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html):
// - Unavailable
// - Aborted
//
// NOTE(review): the retry/backoff policy described above is defined in the
// embedded JSON file, not in Go code; keep this comment in sync with it.
//
//go:embed default_grpc_service_configuration.json
var defaultGRPCServiceConfigurationJSON string
1444
1445func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor {
1446 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1447 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1448 return invoker(ctx, method, req, reply, cc, opts...)
1449 }
1450}
1451
1452func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
1453 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
1454 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1455 return streamer(ctx, desc, cc, method, opts...)
1456 }
1457}
1458
// defaultGRPCMessageReceiveSizeBytes is the default maximum size, in bytes,
// of a message gRPC is allowed to receive (90 MiB). Custom dial options can
// override it.
const defaultGRPCMessageReceiveSizeBytes = 90 << 20 // 90 MiB
1462
1463func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) {
1464 metrics := mustGetClientMetrics()
1465
1466 opts := []grpc.DialOption{
1467 grpc.WithTransportCredentials(insecure.NewCredentials()),
1468 grpc.WithChainStreamInterceptor(
1469 grpc_prometheus.StreamClientInterceptor(metrics),
1470 internalActorStreamInterceptor(),
1471 internalerrs.LoggingStreamClientInterceptor(logger),
1472 internalerrs.PrometheusStreamClientInterceptor,
1473 ),
1474 grpc.WithChainUnaryInterceptor(
1475 grpc_prometheus.UnaryClientInterceptor(metrics),
1476 internalActorUnaryInterceptor(),
1477 internalerrs.LoggingUnaryClientInterceptor(logger),
1478 internalerrs.PrometheusUnaryClientInterceptor,
1479 ),
1480 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
1481 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)),
1482 }
1483
1484 opts = append(opts, additionalOpts...)
1485
1486 // Ensure that the message size options are set last, so they override any other
1487 // client-specific options that tweak the message size.
1488 //
1489 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
1490 // take precedence over everything else with a uniform size setting that's easy to reason about.
1491 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...)
1492
1493 // This dialer is used to connect via gRPC to the Sourcegraph instance.
1494 // This is done lazily, so we can provide the client to use regardless of
1495 // whether we enabled gRPC or not initially.
1496 cc, err := grpc.Dial(addr, opts...)
1497 if err != nil {
1498 return nil, fmt.Errorf("dialing %q: %w", addr, err)
1499 }
1500
1501 client := proto.NewZoektConfigurationServiceClient(cc)
1502 return client, nil
1503}
1504
1505// mustGetClientMetrics returns a singleton instance of the client metrics
1506// that are shared across all gRPC clients that this process creates.
1507//
1508// This function panics if the metrics cannot be registered with the default
1509// Prometheus registry.
1510func mustGetClientMetrics() *grpc_prometheus.ClientMetrics {
1511 clientMetricsOnce.Do(func() {
1512 clientMetrics = grpc_prometheus.NewRegisteredClientMetrics(prometheus.DefaultRegisterer,
1513 grpc_prometheus.WithClientCounterOptions(),
1514 grpc_prometheus.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
1515 grpc_prometheus.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC
1516 grpc_prometheus.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC
1517 )
1518 })
1519
1520 return clientMetrics
1521}
1522
// addDefaultPort adds a default port to a URL if one is not specified:
// "80" for the "http" scheme and "443" for "https". Other schemes, URLs that
// already carry a port, non-absolute URLs (e.g. local directory paths) and
// nil are returned unchanged.
//
// The host is rebuilt from original.Hostname() rather than original.Host.
// This keeps IPv6 literals correctly bracketed: "http://[::1]/" yields host
// "[::1]:80" rather than "[[::1]]:80". A bare trailing colon
// ("http://example.com:/") also becomes "example.com:80".
//
// The original URL is not mutated. When a port is added, a modified copy is
// returned; otherwise the original pointer is returned.
func addDefaultPort(original *url.URL) *url.URL {
	if original == nil {
		return nil // don't panic
	}

	// Leave non-remote URLs and URLs with an explicit port alone.
	if !original.IsAbs() || original.Port() != "" {
		return original
	}

	var port string
	switch original.Scheme {
	case "http":
		port = "80"
	case "https":
		port = "443"
	default:
		return original
	}

	u := cloneURL(original)
	u.Host = net.JoinHostPort(u.Hostname(), port)
	return u
}

// cloneURL returns a copy of u that is safe to mutate, including a separate
// copy of its Userinfo. A nil u yields nil.
// This is copied from net/http/clone.go
func cloneURL(u *url.URL) *url.URL {
	if u == nil {
		return nil
	}
	u2 := new(url.URL)
	*u2 = *u
	if u.User != nil {
		u2.User = new(url.Userinfo)
		*u2.User = *u.User
	}
	return u2
}
1567
1568func main() {
1569 liblog := sglog.Init(sglog.Resource{
1570 Name: "zoekt-indexserver",
1571 Version: zoekt.Version,
1572 InstanceID: zoekt.HostnameBestEffort(),
1573 })
1574 defer liblog.Sync()
1575
1576 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil {
1577 log.Fatal(err)
1578 }
1579}