fork of https://github.com/sourcegraph/zoekt
1// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
2// repositories on sourcegraph
3package main
4
5import (
6 "bytes"
7 "context"
8 _ "embed"
9 "encoding/json"
10 "errors"
11 "flag"
12 "fmt"
13 "html/template"
14 "io"
15 "io/fs"
16 "log"
17 "math"
18 "math/rand"
19 "net"
20 "net/http"
21 "net/url"
22 "os"
23 "os/exec"
24 "os/signal"
25 "path/filepath"
26 "runtime"
27 "sort"
28 "strconv"
29 "strings"
30 "sync"
31 "text/tabwriter"
32 "time"
33
34 "github.com/keegancsmith/tmpfriend"
35 "github.com/peterbourgon/ff/v3/ffcli"
36 "github.com/prometheus/client_golang/prometheus"
37 "github.com/prometheus/client_golang/prometheus/promauto"
38 sglog "github.com/sourcegraph/log"
39 "github.com/sourcegraph/zoekt/grpc/internalerrs"
40 "github.com/sourcegraph/zoekt/grpc/messagesize"
41 "go.uber.org/automaxprocs/maxprocs"
42 "golang.org/x/net/trace"
43 "golang.org/x/sys/unix"
44 "google.golang.org/grpc"
45 "google.golang.org/grpc/credentials/insecure"
46 "google.golang.org/grpc/metadata"
47
48 "github.com/sourcegraph/mountinfo"
49
50 "github.com/sourcegraph/zoekt"
51 "github.com/sourcegraph/zoekt/build"
52 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
53 "github.com/sourcegraph/zoekt/debugserver"
54 "github.com/sourcegraph/zoekt/internal/profiler"
55)
56
// Prometheus metrics exported by the indexserver. They are all registered
// through promauto when the package is initialised.
57var (
58 metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
59 Name: "resolve_revisions_seconds",
60 Help: "A histogram of latencies for resolving all repository revisions.",
61 Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 6 buckets: 1s, 10s, ... 100000s (~27.8h)
62 })
63
64 metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
65 Name: "resolve_revision_seconds",
66 Help: "A histogram of latencies for resolving a repository revision.",
67 Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
68 }, []string{"success"}) // success=true|false
69
70 metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
71 Name: "get_index_options_total",
72 Help: "The total number of times we tried to get index options for a repository. Includes errors.",
73 })
74 metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
75 Name: "get_index_options_error_total",
76 Help: "The total number of times we failed to get index options for a repository.",
77 })
78
// metricIndexDuration is observed by processQueue once per index job with
// the time spent while holding the per-repository lock.
79 metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
80 Name: "index_repo_seconds",
81 Help: "A histogram of latencies for indexing a repository.",
82 Buckets: prometheus.ExponentialBucketsRange(
83 (100 * time.Millisecond).Seconds(),
84 (40*time.Minute + indexTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
85 20),
86 }, []string{
87 "state", // state is an indexState
88 "name", // name of the repository that was indexed (see repoNameForMetric)
89 })
90
91 metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
92 Name: "index_fetch_seconds",
93 Help: "A histogram of latencies for fetching a repository.",
94 Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
95 }, []string{
96 "success", // true|false
97 "name", // the name of the repository that the commits were fetched from
98 })
99
100 metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
101 Name: "index_incremental_index_state",
102 Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
103 }, []string{"state"}) // state is build.IndexState
104
// metricNumIndexed is set by listIndexed to the number of entries returned by
// getShards. NOTE(review): the help text says "by code host" but the gauge has
// no labels.
105 metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
106 Name: "index_num_indexed",
107 Help: "Number of indexed repos by code host",
108 })
109
110 metricNumAssigned = promauto.NewGauge(prometheus.GaugeOpts{
111 Name: "index_num_assigned",
112 Help: "Number of repos assigned to this indexer by code host",
113 })
114
// metricFailingTotal is incremented by Server.Index whenever it returns an error.
115 metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
116 Name: "index_failing_total",
117 Help: "Counts failures to index (indexing activity, should be used with rate())",
118 })
119
// metricIndexingTotal is incremented by Server.Index right before it runs a
// full (non-noop, non-meta-only) index.
120 metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
121 Name: "index_indexing_total",
122 Help: "Counts indexings (indexing activity, should be used with rate())",
123 })
124
// metricNumStoppedTrackingTotal is increased by Run with the number of
// repositories removed from the queue on each sync pass.
125 metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
126 Name: "index_num_stopped_tracking_total",
127 Help: "Counts the number of repos we stopped tracking.",
128 })
129)
130
131// set of repositories that we want to capture separate indexing metrics for
// repoNameForMetric reports every repository that is not in this set under the
// empty name, which keeps the cardinality of the "name" label manageable.
132var reposWithSeparateIndexingMetrics = make(map[string]struct{})
133
// indexState is the outcome of a single index job as returned by
// Server.Index. processQueue uses it as the "state" label of
// metricIndexDuration.
134type indexState string
135
136const (
137 indexStateFail indexState = "fail" // Index returned an error
138 indexStateSuccess indexState = "success" // A full index run completed
139 indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
140 indexStateNoop indexState = "noop" // We didn't need to update index
141 indexStateEmpty indexState = "empty" // index is empty (empty repo)
142)
143
144// Server is the main functionality of zoekt-sourcegraph-indexserver. It
145// exists to conveniently use all the options passed in via func main.
146type Server struct {
// logger is the structured logger used for debug, info and error messages.
147 logger sglog.Logger
148
// Sourcegraph is the client used to list the repositories to index, to
// fetch their index options and to report index status back.
149 Sourcegraph Sourcegraph
// BatchSize is not referenced in this part of the file.
// NOTE(review): presumably a batch size for requests to Sourcegraph — confirm.
150 BatchSize int
151
152 // IndexDir is the index directory to use.
153 IndexDir string
154
155 // IndexConcurrency is the number of repositories we index at once.
// Run starts this many processQueue goroutines.
156 IndexConcurrency int
157
158 // Interval is how often we sync with Sourcegraph.
159 Interval time.Duration
160 // CPUCount is the amount of parallelism to use when indexing a
161 // repository.
162 CPUCount int
163
// queue holds the index jobs. Run fills it and processQueue pops from it.
164 queue Queue
165
166 // muIndexDir protects the index directory from concurrent access.
167 muIndexDir indexMutex
168
169 // If true, shard merging is enabled.
170 shardMerging bool
171
172 // deltaBuildRepositoriesAllowList is an allowlist for repositories that we
173 // use delta-builds for instead of normal builds
174 deltaBuildRepositoriesAllowList map[string]struct{}
175
176 // deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
177 // before attempting a delta build.
178 deltaShardNumberFallbackThreshold uint64
179
180 // repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
181 // we skip calculating symbols metadata for during builds
182 repositoriesSkipSymbolsCalculationAllowList map[string]struct{}
183
// hostname is the value served as JSON by the /debug/host handler.
184 hostname string
185
// mergeOpts carries the shard merging settings; Run reads its
// vacuumInterval and mergeInterval to schedule vacuum and merge runs.
186 mergeOpts mergeOpts
187}
188
// debug is the logger for verbose diagnostics. It is created writing to
// io.Discard, so its messages are dropped unless its output is redirected.
189var debug = log.New(io.Discard, "", log.LstdFlags)
190
// noOutputTimeout is how long loggedRun lets a command run without producing
// new output before it sends SIGQUIT (followed by a kill 10s later).
//
191// our index commands should output something every 100mb they process.
192//
193// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
194// time." famous last words. A client was indexing a monorepo with 42
195// cores... 5m was not enough.
196const noOutputTimeout = 30 * time.Minute
197
198func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) {
199 out := &synchronizedBuffer{}
200 cmd.Stdout = out
201 cmd.Stderr = out
202
203 tr.LazyPrintf("%s", cmd.Args)
204
205 defer func() {
206 if err != nil {
207 outS := out.String()
208 tr.LazyPrintf("failed: %v", err)
209 tr.LazyPrintf("output: %s", out)
210 tr.SetError()
211 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS)
212 }
213 }()
214
215 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args))
216
217 if err := cmd.Start(); err != nil {
218 return err
219 }
220
221 errC := make(chan error)
222 go func() {
223 errC <- cmd.Wait()
224 }()
225
226 // This channel is set after we have sent sigquit. It allows us to follow up
227 // with a sigkill if the process doesn't quit after sigquit.
228 kill := make(<-chan time.Time)
229
230 lastLen := 0
231 for {
232 select {
233 case <-time.After(noOutputTimeout):
234 // Periodically check if we have had output. If not kill the process.
235 if out.Len() != lastLen {
236 lastLen = out.Len()
237 log.Printf("still running %s", cmd.Args)
238 } else {
239 // Send quit (C-\) first so we get a stack dump.
240 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args)
241 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil {
242 log.Println("quit failed:", err)
243 }
244
245 // send sigkill if still running in 10s
246 kill = time.After(10 * time.Second)
247 }
248
249 case <-kill:
250 log.Printf("still running, killing %s", cmd.Args)
251 if err := cmd.Process.Kill(); err != nil {
252 log.Println("kill failed:", err)
253 }
254
255 case err := <-errC:
256 if err != nil {
257 return err
258 }
259
260 tr.LazyPrintf("success")
261 return nil
262 }
263 }
264}
265
// synchronizedBuffer is a bytes.Buffer guarded by a mutex. It exists so the
// buffer can be inspected (Len, String) while another goroutine is still
// writing to it.
type synchronizedBuffer struct {
	mu sync.Mutex
	b  bytes.Buffer
}

// locked runs fn while holding the buffer's mutex.
func (sb *synchronizedBuffer) locked(fn func()) {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	fn()
}

// Write appends p to the buffer and returns the result of the underlying
// bytes.Buffer.Write.
func (sb *synchronizedBuffer) Write(p []byte) (n int, err error) {
	sb.locked(func() { n, err = sb.b.Write(p) })
	return n, err
}

// Len returns the number of bytes currently held by the buffer.
func (sb *synchronizedBuffer) Len() (n int) {
	sb.locked(func() { n = sb.b.Len() })
	return n
}

// String returns the current content of the buffer as a string.
func (sb *synchronizedBuffer) String() (s string) {
	sb.locked(func() { s = sb.b.String() })
	return s
}
290
291// pauseFileName if present in IndexDir will stop index jobs from
292// running. This is to make it possible to experiment with the content of the
293// IndexDir without the indexserver writing to it.
//
// While the file exists Run skips its sync pass (logging the trimmed content
// of the file) and processQueue sleeps instead of popping jobs.
294const pauseFileName = "PAUSE"
295
296// Run the sync loop. This blocks forever.
297func (s *Server) Run() {
298 removeIncompleteShards(s.IndexDir)
299
300 // Start a goroutine which updates the queue with commits to index.
301 go func() {
302 // We update the list of indexed repos every Interval. To speed up manual
303 // testing we also listen for SIGUSR1 to trigger updates.
304 //
305 // "pkill -SIGUSR1 zoekt-sourcegra"
306 for range jitterTicker(s.Interval, unix.SIGUSR1) {
307 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
308 log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b)))
309 continue
310 }
311
312 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir))
313 if err != nil {
314 log.Printf("error listing repos: %s", err)
315 continue
316 }
317
318 debug.Printf("updating index queue with %d repositories", len(repos.IDs))
319
320 // Stop indexing repos we don't need to track anymore
321 removed := s.queue.MaybeRemoveMissing(repos.IDs)
322 metricNumStoppedTrackingTotal.Add(float64(len(removed)))
323 if len(removed) > 0 {
324 log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5))
325 }
326
327 cleanupDone := make(chan struct{})
328 go func() {
329 defer close(cleanupDone)
330 s.muIndexDir.Global(func() {
331 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging)
332 })
333 }()
334
335 repos.IterateIndexOptions(s.queue.AddOrUpdate)
336
337 // IterateIndexOptions will only iterate over repositories that have
338 // changed since we last called list. However, we want to add all IDs
339 // back onto the queue just to check that what is on disk is still
340 // correct. This will use the last IndexOptions we stored in the
341 // queue. The repositories not on the queue (missing) need a forced
342 // fetch of IndexOptions.
343 missing := s.queue.Bump(repos.IDs)
344 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...)
345
346 setCompoundShardCounter(s.IndexDir)
347
348 <-cleanupDone
349 }
350 }()
351
352 go func() {
353 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) {
354 if s.shardMerging {
355 s.vacuum()
356 }
357 }
358 }()
359
360 go func() {
361 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) {
362 if s.shardMerging {
363 s.doMerge()
364 }
365 }
366 }()
367
368 for i := 0; i < s.IndexConcurrency; i++ {
369 go s.processQueue()
370 }
371
372 // block forever
373 select {}
374}
375
// formatListUint32 returns a comma-separated list of the first min(len(v), m)
// items of v. If v has more than m items the list is followed by "...".
func formatListUint32(v []uint32, m int) string {
	// Number of items that are actually printed, clamped to [0, len(v)].
	shown := len(v)
	if m < shown {
		shown = m
	}
	if shown < 0 {
		shown = 0
	}

	parts := make([]string, 0, shown+1)
	for _, item := range v[:shown] {
		parts = append(parts, fmt.Sprint(item))
	}
	if len(v) > m {
		parts = append(parts, "...")
	}

	return strings.Join(parts, ", ")
}
393
394func (s *Server) processQueue() {
395 for {
396 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
397 time.Sleep(time.Second)
398 continue
399 }
400
401 opts, ok := s.queue.Pop()
402 if !ok {
403 time.Sleep(time.Second)
404 continue
405 }
406
407 args := s.indexArgs(opts)
408
409 ran := s.muIndexDir.With(opts.Name, func() {
410 // only record time taken once we hold the lock. This avoids us
411 // recording time taken while merging/cleanup runs.
412 start := time.Now()
413
414 state, err := s.Index(args)
415
416 elapsed := time.Since(start)
417
418 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds())
419
420 if err != nil {
421 log.Printf("error indexing %s: %s", args.String(), err)
422 }
423
424 switch state {
425 case indexStateSuccess:
426 var branches []string
427 for _, b := range args.Branches {
428 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
429 }
430 s.logger.Info("updated index",
431 sglog.String("repo", args.Name),
432 sglog.Uint32("id", args.RepoID),
433 sglog.Strings("branches", branches),
434 sglog.Duration("duration", elapsed),
435 )
436 case indexStateSuccessMeta:
437 log.Printf("updated meta %s in %v", args.String(), elapsed)
438 }
439 s.queue.SetIndexed(opts, state)
440 })
441
442 if !ran {
443 // Someone else is processing the repository. We can just skip this job
444 // since the repository will be added back to the queue and we will
445 // converge to the correct behaviour.
446 debug.Printf("index job for repository already running: %s", args)
447 continue
448 }
449 }
450}
451
452// repoNameForMetric returns a normalized version of the given repository name that is
453// suitable for use with Prometheus metrics.
454func repoNameForMetric(repo string) string {
455 // Check to see if we want to be able to capture separate indexing metrics for this repository.
456 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable.
457 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok {
458 return repo
459 }
460
461 return ""
462}
463
// batched splits slice into consecutive batches of at most size elements and
// delivers them, in order, on the returned channel. The channel is closed
// after the last batch.
//
// A non-positive size delivers the whole slice as one batch (previously a
// size of 0 produced empty batches forever and a negative size panicked in
// the background goroutine).
//
// Batches alias the memory of slice, but each one has its capacity limited to
// its length so that an append by the receiver cannot overwrite the elements
// of the following batch.
//
// The sending goroutine blocks until every batch has been received, so
// callers must drain the channel.
func batched(slice []uint32, size int) <-chan []uint32 {
	c := make(chan []uint32)
	go func() {
		defer close(c)

		if size <= 0 {
			size = len(slice)
		}
		for len(slice) > 0 {
			n := size
			if n > len(slice) {
				n = len(slice)
			}
			c <- slice[:n:n]
			slice = slice[n:]
		}
	}()
	return c
}
478
// jitterTicker returns a channel that ticks once on creation and then keeps
// ticking with a jittered delay: the pause after each tick is picked
// uniformly from d/2 up to (but excluding) d/2 + d.
//
// sig is an optional list of signals which also make the ticker fire. This
// is a convenience to allow triggering the ticker manually.
//
// The channel is unbuffered and the goroutines feeding it never stop.
func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} {
	ticks := make(chan struct{})

	// Timer driven ticks.
	go func() {
		base := int64(d)
		for {
			ticks <- struct{}{}
			time.Sleep(time.Duration(base/2 + rand.Int63n(base)))
		}
	}()

	// Signal driven ticks, only when the caller asked for any.
	if len(sig) > 0 {
		go func() {
			received := make(chan os.Signal, 1)
			signal.Notify(received, sig...)
			for range received {
				ticks <- struct{}{}
			}
		}()
	}

	return ticks
}
510
511// Index starts an index job for repo name at commit.
512func (s *Server) Index(args *indexArgs) (state indexState, err error) {
513 tr := trace.New("index", args.Name)
514
515 defer func() {
516 if err != nil {
517 tr.SetError()
518 tr.LazyPrintf("error: %v", err)
519 state = indexStateFail
520 metricFailingTotal.Inc()
521 }
522 tr.LazyPrintf("state: %s", state)
523 tr.Finish()
524 }()
525
526 tr.LazyPrintf("branches: %v", args.Branches)
527
528 if len(args.Branches) == 0 {
529 return indexStateEmpty, createEmptyShard(args)
530 }
531
532 repositoryName := args.Name
533 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
534 tr.LazyPrintf("marking this repository for delta build")
535 args.UseDelta = true
536 }
537
538 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold
539
540 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
541 tr.LazyPrintf("skipping symbols calculation")
542 args.Symbols = false
543 }
544
545 reason := "forced"
546
547 if args.Incremental {
548 bo := args.BuildOptions()
549 bo.SetDefaults()
550 incrementalState, fn := bo.IndexState()
551 reason = string(incrementalState)
552 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()
553
554 switch incrementalState {
555 case build.IndexStateEqual:
556 debug.Printf("%s index already up to date. Shard=%s", args.String(), fn)
557 return indexStateNoop, nil
558
559 case build.IndexStateMeta:
560 log.Printf("updating index.meta %s", args.String())
561
562 if err := mergeMeta(bo); err != nil {
563 log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
564 } else {
565 return indexStateSuccessMeta, nil
566 }
567
568 case build.IndexStateCorrupt:
569 log.Printf("falling back to full update: corrupt index: %s", args.String())
570 }
571 }
572
573 log.Printf("updating index %s reason=%s", args.String(), reason)
574
575 metricIndexingTotal.Inc()
576 c := gitIndexConfig{
577 runCmd: func(cmd *exec.Cmd) error {
578 return s.loggedRun(tr, cmd)
579 },
580
581 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
582 return args.BuildOptions().FindRepositoryMetadata()
583 },
584 }
585
586 err = gitIndex(c, args, s.Sourcegraph, s.logger)
587 if err != nil {
588 return indexStateFail, err
589 }
590
591 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil {
592 s.logger.Error("failed to update index status",
593 sglog.String("repo", args.Name),
594 sglog.Uint32("id", args.RepoID),
595 sglogBranches("branches", args.Branches),
596 sglog.Error(err),
597 )
598 }
599
600 return indexStateSuccess, nil
601}
602
603// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so
604// it can update the zoekt_repos table.
605func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error {
606 // We need to read from disk for IndexTime.
607 _, metadata, ok, err := c.findRepositoryMetadata(args)
608 if err != nil {
609 return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
610 }
611 if !ok {
612 return errors.New("failed to find metadata for new/updated index")
613 }
614
615 status := []indexStatus{{
616 RepoID: args.RepoID,
617 Branches: args.Branches,
618 IndexTimeUnix: metadata.IndexTime.Unix(),
619 }}
620 if err := sg.UpdateIndexStatus(status); err != nil {
621 return fmt.Errorf("failed to update sourcegraph with status: %w", err)
622 }
623
624 return nil
625}
626
627func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
628 ss := make([]string, len(branches))
629 for i, b := range branches {
630 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
631 }
632 return sglog.Strings(key, ss)
633}
634
635func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
636 return &indexArgs{
637 IndexOptions: opts,
638
639 IndexDir: s.IndexDir,
640 Parallelism: s.CPUCount,
641
642 Incremental: true,
643
644 // 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
645 FileLimit: 1 << 20,
646 }
647}
648
649func createEmptyShard(args *indexArgs) error {
650 bo := args.BuildOptions()
651 bo.SetDefaults()
652 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}
653
654 if args.Incremental && bo.IncrementalSkipIndexing() {
655 return nil
656 }
657
658 builder, err := build.NewBuilder(*bo)
659 if err != nil {
660 return err
661 }
662 return builder.Finish()
663}
664
665// addDebugHandlers adds handlers specific to indexserver.
666func (s *Server) addDebugHandlers(mux *http.ServeMux) {
667 // Sourcegraph's site admin view requires indexserver to serve it's admin view
668 // on "/".
669 mux.Handle("/", http.HandlerFunc(s.handleRoot))
670
671 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex))
672 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed))
673 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList))
674 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge))
675 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue))
676 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost))
677}
678
679func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) {
680 if r.Method != "GET" {
681 w.Header().Set("Allow", "GET")
682 w.WriteHeader(http.StatusMethodNotAllowed)
683 return
684 }
685
686 response := struct {
687 Hostname string
688 }{
689 Hostname: s.hostname,
690 }
691
692 b, err := json.Marshal(response)
693 if err != nil {
694 http.Error(w, err.Error(), http.StatusInternalServerError)
695 return
696 }
697
698 w.Header().Set("Content-Type", "application/json; charset=utf-8")
699 w.Write(b)
700}
701
// rootTmpl renders the admin page served on "/" by handleRoot.
//
// It expects a value with the fields IndexMsg (message about a reindex that
// was just triggered, may be empty) and Repos (a list of values with the
// fields ID and Name). The repository table is only rendered when Repos is
// non-empty; otherwise a "show repos" link is shown.
//
// The ID cell is closed with </td>; it used to be closed with the
// non-existent tag </id>, which produced invalid HTML.
var rootTmpl = template.Must(template.New("name").Parse(`
<html>
	<body>
		<a href="debug">Debug</a><br />
		<a href="debug/requests">Traces</a><br />
		{{.IndexMsg}}<br />
		<br />
		<h3>Reindex</h3>
		{{if .Repos}}
		<a href="?show_repos=false">hide repos</a><br />
		<table style="margin-top: 20px">
			<th style="text-align:left">Name</th>
			<th style="text-align:left">ID</th>
			{{range .Repos}}
			<tr>
				<td>{{.Name}}</td>
				<td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></td>
			</tr>
			{{end}}
		</table>
		{{else}}
		<a href="?show_repos=true">show repos</a><br />
		{{end}}
	</body>
</html>
`))
728
729func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
730 if r.Method != "GET" {
731 w.Header().Set("Allow", "GET")
732 w.WriteHeader(http.StatusMethodNotAllowed)
733 return
734 }
735
736 values := r.URL.Query()
737
738 // ?id=
739 indexMsg := ""
740 if v := values.Get("id"); v != "" {
741 id, err := strconv.Atoi(v)
742 if err != nil {
743 http.Error(w, err.Error(), http.StatusBadRequest)
744 return
745 }
746 indexMsg, _ = s.forceIndex(uint32(id))
747 }
748
749 // ?show_repos=
750 showRepos := false
751 if v := values.Get("show_repos"); v != "" {
752 showRepos, _ = strconv.ParseBool(v)
753 }
754
755 type Repo struct {
756 ID uint32
757 Name string
758 }
759 var data struct {
760 Repos []Repo
761 IndexMsg string
762 }
763
764 data.IndexMsg = indexMsg
765
766 if showRepos {
767 s.queue.Iterate(func(opts *IndexOptions) {
768 data.Repos = append(data.Repos, Repo{
769 ID: opts.RepoID,
770 Name: opts.Name,
771 })
772 })
773 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name })
774 }
775
776 _ = rootTmpl.Execute(w, data)
777}
778
779// handleReindex triggers a reindex asynocronously. If a reindex was triggered
780// the request returns with status 202. The caller can infer the new state of
781// the index by calling List.
782func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) {
783 if r.Method != http.MethodPost {
784 w.Header().Set("Allow", http.MethodPost)
785 w.WriteHeader(http.StatusMethodNotAllowed)
786 return
787 }
788
789 err := r.ParseForm()
790 if err != nil {
791 http.Error(w, err.Error(), http.StatusBadRequest)
792 return
793 }
794
795 id, err := strconv.Atoi(r.Form.Get("repo"))
796 if err != nil {
797 http.Error(w, err.Error(), http.StatusBadRequest)
798 return
799 }
800
801 go func() { s.forceIndex(uint32(id)) }()
802
803 // 202 Accepted
804 w.WriteHeader(http.StatusAccepted)
805}
806
807func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
808 withIndexed := true
809 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil {
810 withIndexed = b
811 }
812
813 var indexed []uint32
814 if withIndexed {
815 indexed = listIndexed(s.IndexDir)
816 }
817
818 repos, err := s.Sourcegraph.List(r.Context(), indexed)
819 if err != nil {
820 http.Error(w, err.Error(), http.StatusInternalServerError)
821 return
822 }
823
824 bw := bytes.Buffer{}
825
826 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
827
828 _, err = fmt.Fprintf(tw, "ID\tName\n")
829 if err != nil {
830 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
831 return
832 }
833
834 s.queue.mu.Lock()
835 name := ""
836 for _, id := range repos.IDs {
837 if item := s.queue.get(id); item != nil {
838 name = item.opts.Name
839 } else {
840 name = ""
841 }
842 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
843 if err != nil {
844 debug.Printf("handleDebugList: %s\n", err.Error())
845 }
846 }
847 s.queue.mu.Unlock()
848
849 if err != nil {
850 http.Error(w, err.Error(), http.StatusInternalServerError)
851 return
852 }
853
854 err = tw.Flush()
855 if err != nil {
856 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
857 return
858 }
859
860 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
861
862 if _, err := io.Copy(w, &bw); err != nil {
863 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
864 return
865 }
866}
867
868// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
869// can run this command during periods of low usage (evenings, weekends) to
870// trigger an initial merge run. In the steady-state, merges happen rarely, even
871// on busy instances, and users can rely on automatic merging instead.
872func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {
873
874 // A merge operation can take very long, depending on the number merges and the
875 // target size of the compound shards. We run the merge in the background and
876 // return immediately to the user.
877 //
878 // We track the status of the merge with metricShardMergingRunning.
879 go func() {
880 s.doMerge()
881 }()
882 _, _ = w.Write([]byte("merging enqueued\n"))
883}
884
885func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {
886 indexed := listIndexed(s.IndexDir)
887
888 bw := bytes.Buffer{}
889
890 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
891
892 _, err := fmt.Fprintf(tw, "ID\tName\n")
893 if err != nil {
894 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
895 return
896 }
897
898 s.queue.mu.Lock()
899 name := ""
900 for _, id := range indexed {
901 if item := s.queue.get(id); item != nil {
902 name = item.opts.Name
903 } else {
904 name = ""
905 }
906 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
907 if err != nil {
908 debug.Printf("handleDebugIndexed: %s\n", err.Error())
909 }
910 }
911 s.queue.mu.Unlock()
912
913 if err != nil {
914 http.Error(w, err.Error(), http.StatusInternalServerError)
915 return
916 }
917
918 err = tw.Flush()
919 if err != nil {
920 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
921 return
922 }
923
924 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
925
926 if _, err := io.Copy(w, &bw); err != nil {
927 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
928 return
929 }
930}
931
932// forceIndex will run the index job for repo name now. It will return always
933// return a string explaining what it did, even if it failed.
934func (s *Server) forceIndex(id uint32) (string, error) {
935 var opts IndexOptions
936 var err error
937 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) {
938 opts = o
939 }, func(_ uint32, e error) {
940 err = e
941 }, id)
942 if err != nil {
943 return fmt.Sprintf("Indexing %d failed: %v", id, err), err
944 }
945
946 args := s.indexArgs(opts)
947 args.Incremental = false // force re-index
948
949 var state indexState
950 ran := s.muIndexDir.With(opts.Name, func() {
951 state, err = s.Index(args)
952 })
953 if !ran {
954 return fmt.Sprintf("index job for repository already running: %s", args), nil
955 }
956 if err != nil {
957 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err
958 }
959 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil
960}
961
962func listIndexed(indexDir string) []uint32 {
963 index := getShards(indexDir)
964 metricNumIndexed.Set(float64(len(index)))
965 repoIDs := make([]uint32, 0, len(index))
966 for id := range index {
967 repoIDs = append(repoIDs, id)
968 }
969 sort.Slice(repoIDs, func(i, j int) bool {
970 return repoIDs[i] < repoIDs[j]
971 })
972 return repoIDs
973}
974
975// setupTmpDir sets up a temporary directory on the same volume as the
976// indexes.
977//
978// If main is true we will delete older temp directories left around. main is
979// false when this is a debug command.
980func setupTmpDir(main bool, index string) error {
981 // change the target tmp directory depending on if its our main daemon or a
982 // debug sub command.
983 dir := ".indexserver.debug.tmp"
984 if main {
985 dir = ".indexserver.tmp"
986 }
987
988 tmpRoot := filepath.Join(index, dir)
989 if err := os.MkdirAll(tmpRoot, 0755); err != nil {
990 return err
991 }
992 if !tmpfriend.IsTmpFriendDir(tmpRoot) {
993 _, err := tmpfriend.RootTempDir(tmpRoot)
994 return err
995 }
996 return nil
997}
998
999func printMetaData(fn string) error {
1000 repo, indexMeta, err := zoekt.ReadMetadataPath(fn)
1001 if err != nil {
1002 return err
1003 }
1004
1005 err = json.NewEncoder(os.Stdout).Encode(indexMeta)
1006 if err != nil {
1007 return err
1008 }
1009
1010 err = json.NewEncoder(os.Stdout).Encode(repo)
1011 if err != nil {
1012 return err
1013 }
1014 return nil
1015}
1016
1017func printShardStats(fn string) error {
1018 f, err := os.Open(fn)
1019 if err != nil {
1020 return err
1021 }
1022
1023 iFile, err := zoekt.NewIndexFile(f)
1024 if err != nil {
1025 return err
1026 }
1027
1028 return zoekt.PrintNgramStats(iFile)
1029}
1030
1031func srcLogLevelIsDebug() bool {
1032 lvl := os.Getenv(sglog.EnvLogLevel)
1033 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug")
1034}
1035
// getEnvWithDefaultInt64 returns the environment variable k parsed as a base
// 10 int64, or defaultVal if the variable is unset or empty. A value that
// cannot be parsed terminates the program via log.Fatalf.
func getEnvWithDefaultInt64(k string, defaultVal int64) int64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseInt(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to int64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1047
// getEnvWithDefaultInt returns the environment variable k parsed as a base 10
// int, or defaultVal if the variable is unset or empty. A value that cannot
// be parsed terminates the program via log.Fatalf.
func getEnvWithDefaultInt(k string, defaultVal int) int {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	// Parse the value of the variable. This used to parse the variable's
	// name (k), which made every non-empty setting fatal.
	i, err := strconv.Atoi(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to int: %s", k, err)
	}
	return i
}
1059
// getEnvWithDefaultUint64 returns the environment variable k parsed as a base
// 10 uint64, or defaultVal if the variable is unset or empty. A value that
// cannot be parsed terminates the program via log.Fatalf.
func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseUint(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to uint64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1071
// getEnvWithDefaultFloat64 returns the environment variable k parsed as a
// float64, or defaultVal if the variable is unset or empty. A value that
// cannot be parsed terminates the program via log.Fatalf.
func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseFloat(raw, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to float64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1083
// getEnvWithDefaultString returns the value of the environment variable k, or
// defaultVal if the variable is unset or empty.
func getEnvWithDefaultString(k string, defaultVal string) string {
	if v := os.Getenv(k); v != "" {
		return v
	}
	return defaultVal
}
1091
// getEnvWithDefaultDuration returns the environment variable k parsed with
// time.ParseDuration (e.g. "1m30s"), or defaultVal if the variable is unset
// or empty. A value that cannot be parsed terminates the program via
// log.Fatalf.
func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := time.ParseDuration(raw)
		if err != nil {
			log.Fatalf("error parsing ENV %s to duration: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1104
// getEnvWithDefaultBool reads environment variable k as a boolean in any
// form accepted by strconv.ParseBool.
//
// It returns defaultVal when the variable is unset or empty. A value that
// cannot be parsed terminates the process via log.Fatalf.
func getEnvWithDefaultBool(k string, defaultVal bool) bool {
	raw := os.Getenv(k)
	if len(raw) == 0 {
		return defaultVal
	}

	parsed, parseErr := strconv.ParseBool(raw)
	if parseErr != nil {
		log.Fatalf("error parsing ENV %s to bool: %s", k, parseErr)
	}
	return parsed
}
1117
// getEnvWithDefaultEmptySet parses environment variable k as a
// comma-separated list and returns its entries as a set.
//
// Each entry is trimmed of surrounding whitespace and empty entries are
// dropped. The returned map is never nil; it is empty when the variable is
// unset or holds no usable entries.
func getEnvWithDefaultEmptySet(k string) map[string]struct{} {
	entries := make(map[string]struct{})
	for _, part := range strings.Split(os.Getenv(k), ",") {
		name := strings.TrimSpace(part)
		if name == "" {
			continue
		}
		entries[name] = struct{}{}
	}
	return entries
}
1128
// joinStringSet joins the members of set into a single string separated by
// sep.
//
// Members are emitted in sorted order so the result is deterministic
// regardless of map iteration order. An empty or nil set yields "".
func joinStringSet(set map[string]struct{}, sep string) string {
	xs := make([]string, 0, len(set))
	for x := range set {
		xs = append(xs, x)
	}
	sort.Strings(xs)

	return strings.Join(xs, sep)
}
1137
1138func setCompoundShardCounter(indexDir string) {
1139 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt"))
1140 if err != nil {
1141 log.Printf("setCompoundShardCounter: %s\n", err)
1142 return
1143 }
1144 metricNumberCompoundShards.Set(float64(len(fns)))
1145}
1146
1147func rootCmd() *ffcli.Command {
1148 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError)
1149 conf := rootConfig{
1150 Main: true,
1151 }
1152 conf.registerRootFlags(rootFs)
1153
1154 return &ffcli.Command{
1155 FlagSet: rootFs,
1156 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]",
1157 Subcommands: []*ffcli.Command{debugCmd()},
1158 Exec: func(ctx context.Context, args []string) error {
1159 return startServer(conf)
1160 },
1161 }
1162}
1163
// rootConfig holds the configuration of the indexserver. Apart from Main, its
// fields are populated from command-line flags (see registerRootFlags), some
// of which take their defaults from environment variables.
type rootConfig struct {
	// Main is true if this rootConfig is for our main long running command (the
	// indexserver). Debug commands should not set this value. This is used to
	// determine if we need to run tmpfriend.
	Main bool

	// root is the -sourcegraph_url flag: the Sourcegraph URL, or a path to a
	// directory of repositories to index.
	root string
	// interval is how often to sync with Sourcegraph (-interval).
	interval time.Duration
	// index is the index directory (-index).
	index string
	// indexConcurrency is the number of concurrent index jobs to run
	// (-index_concurrency).
	indexConcurrency int64
	// listen is the address to listen on (-listen). startServer serves no HTTP
	// when it is empty.
	listen string
	// hostname is the name advertised to Sourcegraph when asking for the list
	// of repositories to index (-hostname).
	hostname string
	// cpuFraction is the fraction of the cores to use for indexing
	// (-cpu_fraction). newServer rejects values outside (0.0, 1.0].
	cpuFraction float64
	// blockProfileRate is the sampling rate of Go's block profiler in
	// nanoseconds (-block_profile_rate).
	blockProfileRate int

	// config values related to shard merging

	// vacuumInterval is how often vacuum runs (-vacuum_interval).
	vacuumInterval time.Duration
	// mergeInterval is how often merge runs (-merge_interval).
	mergeInterval time.Duration
	// targetSize is the target size of compound shards in MiB
	// (-merge_target_size).
	targetSize int64
	// minSize is the minimum size of a compound shard in MiB
	// (-merge_min_size).
	minSize int64
	// minAgeDays is the time since the last commit in days; shards with newer
	// commits are excluded from merging (-merge_min_age).
	minAgeDays int
	// maxPriority is the maximum priority a shard can have to be considered
	// for merging (-merge_max_priority).
	maxPriority float64

	// config values related to backoff indexing repos with one or more consecutive failures

	// backoffDuration is how long enqueue operations are held back for a
	// repository whose previous indexing attempt failed (-backoff_duration).
	backoffDuration time.Duration
	// maxBackoffDuration is the maximum duration to back off from enqueueing a
	// repository for indexing (-max_backoff_duration).
	maxBackoffDuration time.Duration

	// useGRPC is true if we should use the gRPC API to talk to Sourcegraph.
	useGRPC bool
}
1194
1195func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
1196 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
1197 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
1198 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of concurrent index jobs to run.")
1199 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use")
1200 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
1201 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
1202 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
1203 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate")
1204 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
1205 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")
1206 fs.BoolVar(&rc.useGRPC, "use_grpc", getEnvWithDefaultBool("GRPC_ENABLED", false), "use the gRPC API to talk to Sourcegraph")
1207
1208 // flags related to shard merging
1209 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
1210 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
1211 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB")
1212 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB")
1213 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
1214 fs.Float64Var(&rc.maxPriority, "merge_max_priority", getEnvWithDefaultFloat64("SRC_MERGE_MAX_PRIORITY", 100), "the maximum priority a shard can have to be considered for merging.")
1215}
1216
1217func startServer(conf rootConfig) error {
1218 s, err := newServer(conf)
1219 if err != nil {
1220 return err
1221 }
1222
1223 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate)
1224 setCompoundShardCounter(s.IndexDir)
1225
1226 if conf.listen != "" {
1227
1228 mux := http.NewServeMux()
1229 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{
1230 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"},
1231 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"},
1232 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"},
1233 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"},
1234 }...)
1235 s.addDebugHandlers(mux)
1236
1237 go func() {
1238 debug.Printf("serving HTTP on %s", conf.listen)
1239 log.Fatal(http.ListenAndServe(conf.listen, mux))
1240
1241 }()
1242
1243 // Serve mux on a unix domain socket on a best-effort-basis so that
1244 // webserver can call the endpoints via the shared filesystem.
1245 //
1246 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen
1247 // on the socket due to permission errors. See
1248 // https://github.com/docker/for-mac/issues/6239
1249 go func() {
1250 serveHTTPOverSocket := func() error {
1251 socket := filepath.Join(s.IndexDir, "indexserver.sock")
1252 // We cannot bind a socket to an existing pathname.
1253 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) {
1254 return fmt.Errorf("error removing socket file: %s", socket)
1255 }
1256 // The "unix" network corresponds to stream sockets. (cf. unixgram,
1257 // unixpacket).
1258 l, err := net.Listen("unix", socket)
1259 if err != nil {
1260 return fmt.Errorf("failed to listen on socket %s: %w", socket, err)
1261 }
1262 // Indexserver (root) and webserver (Sourcegraph) run with
1263 // different users. Per default, the socket is created with
1264 // permission 755 (root root), which doesn't let webserver write to
1265 // it.
1266 //
1267 // See https://github.com/golang/go/issues/11822 for more context.
1268 if err := os.Chmod(socket, 0777); err != nil {
1269 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err)
1270 }
1271 debug.Printf("serving HTTP on %s", socket)
1272 return http.Serve(l, mux)
1273 }
1274 debug.Print(serveHTTPOverSocket())
1275 }()
1276 }
1277
1278 oc := &ownerChecker{
1279 Path: filepath.Join(conf.index, "owner.txt"),
1280 Hostname: conf.hostname,
1281 }
1282 go oc.Run()
1283
1284 logger := sglog.Scoped("metricsRegistration", "")
1285 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"}
1286
1287 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index})
1288 prometheus.DefaultRegisterer.MustRegister(c)
1289
1290 s.Run()
1291 return nil
1292}
1293
1294func newServer(conf rootConfig) (*Server, error) {
1295 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 {
1296 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0")
1297 }
1298 if conf.index == "" {
1299 return nil, fmt.Errorf("must set -index")
1300 }
1301 if conf.root == "" {
1302 return nil, fmt.Errorf("must set -sourcegraph_url")
1303 }
1304 rootURL, err := url.Parse(conf.root)
1305 if err != nil {
1306 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err)
1307 }
1308
1309 rootURL = addDefaultPort(rootURL)
1310
1311 // Tune GOMAXPROCS to match Linux container CPU quota.
1312 _, _ = maxprocs.Set()
1313
1314 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler.
1315 // The block profiler is disabled by default and should be enabled with care in production
1316 runtime.SetBlockProfileRate(conf.blockProfileRate)
1317
1318 // Automatically prepend our own path at the front, to minimize
1319 // required configuration.
1320 if l, err := os.Readlink("/proc/self/exe"); err == nil {
1321 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
1322 }
1323
1324 if _, err := os.Stat(conf.index); err != nil {
1325 if err := os.MkdirAll(conf.index, 0755); err != nil {
1326 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err)
1327 }
1328 }
1329
1330 if err := setupTmpDir(conf.Main, conf.index); err != nil {
1331 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err)
1332 }
1333
1334 if srcLogLevelIsDebug() {
1335 debug = log.New(os.Stderr, "", log.LstdFlags)
1336 }
1337
1338 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST")
1339 if len(reposWithSeparateIndexingMetrics) > 0 {
1340 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", "))
1341 }
1342
1343 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST")
1344 if len(deltaBuildRepositoriesAllowList) > 0 {
1345 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", "))
1346 }
1347
1348 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150)
1349 if deltaShardNumberFallbackThreshold > 0 {
1350 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold)
1351 } else {
1352 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards")
1353 }
1354
1355 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST")
1356 if len(reposShouldSkipSymbolsCalculation) > 0 {
1357 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", "))
1358 }
1359
1360 var sg Sourcegraph
1361 if rootURL.IsAbs() {
1362 var batchSize int
1363 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" {
1364 batchSize, err = strconv.Atoi(v)
1365 if err != nil {
1366 return nil, fmt.Errorf("Invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int")
1367 }
1368 }
1369
1370 opts := []SourcegraphClientOption{
1371 WithBatchSize(batchSize),
1372 WithShouldUseGRPC(conf.useGRPC),
1373 }
1374
1375 logger := sglog.Scoped("zoektConfigurationGRPCClient", "")
1376 client, err := dialGRPCClient(rootURL.Host, logger)
1377 if err != nil {
1378 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err)
1379 }
1380
1381 opts = append(opts, WithGRPCClient(client))
1382 sg = newSourcegraphClient(rootURL, conf.hostname, opts...)
1383
1384 } else {
1385 sg = sourcegraphFake{
1386 RootDir: rootURL.String(),
1387 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags),
1388 }
1389 }
1390
1391 if conf.indexConcurrency < 1 {
1392 conf.indexConcurrency = 1
1393 }
1394
1395 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction)))
1396 if cpuCount < 1 {
1397 cpuCount = 1
1398 }
1399
1400 logger := sglog.Scoped("server", "periodically reindexes enabled repositories on sourcegraph")
1401
1402 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)
1403
1404 return &Server{
1405 logger: logger,
1406 Sourcegraph: sg,
1407 IndexDir: conf.index,
1408 IndexConcurrency: int(conf.indexConcurrency),
1409 Interval: conf.interval,
1410 CPUCount: cpuCount,
1411 queue: *q,
1412 shardMerging: zoekt.ShardMergingEnabled(),
1413 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList,
1414 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold,
1415 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
1416 hostname: conf.hostname,
1417 mergeOpts: mergeOpts{
1418 vacuumInterval: conf.vacuumInterval,
1419 mergeInterval: conf.mergeInterval,
1420 targetSizeBytes: conf.targetSize * 1024 * 1024,
1421 minSizeBytes: conf.minSize * 1024 * 1024,
1422 minAgeDays: conf.minAgeDays,
1423 maxPriority: conf.maxPriority,
1424 },
1425 }, err
1426}
1427
// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration
// for the indexed-search-configuration gRPC service.
//
// Its contents are embedded at build time from
// default_grpc_service_configuration.json, and dialGRPCClient passes them to
// grpc.WithDefaultServiceConfig.
//
// The default backoff strategy is modeled after the default settings used by
// retryablehttp.DefaultClient.
//
// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html):
//   - Unavailable
//   - Aborted
//
//go:embed default_grpc_service_configuration.json
var defaultGRPCServiceConfigurationJSON string
1440
1441func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor {
1442 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1443 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1444 return invoker(ctx, method, req, reply, cc, opts...)
1445 }
1446}
1447
1448func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
1449 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
1450 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1451 return streamer(ctx, desc, cc, method, opts...)
1452 }
1453}
1454
// defaultGRPCMessageReceiveSizeBytes is the default maximum message size that
// gRPC servers and clients are allowed to process. This can be overridden by
// providing custom Server/Dial options.
const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MiB (90 * 1024 * 1024 bytes)
1458
1459func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) {
1460 opts := []grpc.DialOption{
1461 grpc.WithTransportCredentials(insecure.NewCredentials()),
1462 grpc.WithChainStreamInterceptor(
1463 internalActorStreamInterceptor(),
1464 internalerrs.LoggingStreamClientInterceptor(logger),
1465 internalerrs.PrometheusStreamClientInterceptor,
1466 ),
1467 grpc.WithChainUnaryInterceptor(
1468 internalActorUnaryInterceptor(),
1469 internalerrs.LoggingUnaryClientInterceptor(logger),
1470 internalerrs.PrometheusUnaryClientInterceptor,
1471 ),
1472 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
1473 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)),
1474 }
1475
1476 opts = append(opts, additionalOpts...)
1477
1478 // Ensure that the message size options are set last, so they override any other
1479 // client-specific options that tweak the message size.
1480 //
1481 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
1482 // take precedence over everything else with a uniform size setting that's easy to reason about.
1483 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...)
1484
1485 // This dialer is used to connect via gRPC to the Sourcegraph instance.
1486 // This is done lazily, so we can provide the client to use regardless of
1487 // whether we enabled gRPC or not initially.
1488 cc, err := grpc.Dial(addr, opts...)
1489 if err != nil {
1490 return nil, fmt.Errorf("dialing %q: %w", addr, err)
1491 }
1492
1493 client := proto.NewZoektConfigurationServiceClient(cc)
1494 return client, nil
1495}
1496
// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
// If the scheme is "https", "443" is used.
//
// A nil URL, a URL that is not absolute, a URL with any other scheme, and a
// URL that already has a port are returned as-is.
//
// The original URL is not mutated. A copy is modified and returned.
func addDefaultPort(original *url.URL) *url.URL {
	if original == nil {
		return nil // don't panic
	}

	// Leave the URL alone if it doesn't look like a remote URL or if it
	// already names a port.
	if !original.IsAbs() || original.Port() != "" {
		return original
	}

	var port string
	switch original.Scheme {
	case "http":
		port = "80"
	case "https":
		port = "443"
	default:
		return original
	}

	u := cloneURL(original)
	// Hostname strips the square brackets of an IPv6 literal (and a dangling
	// ":"), which JoinHostPort adds back as needed. Joining u.Host directly
	// would bracket an IPv6 literal twice.
	u.Host = net.JoinHostPort(u.Hostname(), port)
	return u
}

// cloneURL returns a copy of the URL. It is safe to mutate the returned URL.
// This is copied from net/http/clone.go
func cloneURL(u *url.URL) *url.URL {
	if u == nil {
		return nil
	}
	u2 := new(url.URL)
	*u2 = *u
	// Userinfo is held by pointer, so it needs its own copy.
	if u.User != nil {
		u2.User = new(url.Userinfo)
		*u2.User = *u.User
	}
	return u2
}
1541
1542func main() {
1543 liblog := sglog.Init(sglog.Resource{
1544 Name: "zoekt-indexserver",
1545 Version: zoekt.Version,
1546 InstanceID: zoekt.HostnameBestEffort(),
1547 })
1548 defer liblog.Sync()
1549
1550 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil {
1551 log.Fatal(err)
1552 }
1553}