fork of https://github.com/sourcegraph/zoekt
// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
// repositories on Sourcegraph.
3package main
4
5import (
6 "bytes"
7 "context"
8 _ "embed"
9 "encoding/json"
10 "errors"
11 "flag"
12 "fmt"
13 "html/template"
14 "io"
15 "io/fs"
16 "log"
17 "math"
18 "math/rand"
19 "net"
20 "net/http"
21 "net/url"
22 "os"
23 "os/exec"
24 "os/signal"
25 "path/filepath"
26 "runtime"
27 "sort"
28 "strconv"
29 "strings"
30 "sync"
31 "text/tabwriter"
32 "time"
33
34 "github.com/keegancsmith/tmpfriend"
35 "github.com/peterbourgon/ff/v3/ffcli"
36 "github.com/prometheus/client_golang/prometheus"
37 "github.com/prometheus/client_golang/prometheus/promauto"
38 sglog "github.com/sourcegraph/log"
39 "go.uber.org/automaxprocs/maxprocs"
40 "golang.org/x/net/trace"
41 "golang.org/x/sys/unix"
42 "google.golang.org/grpc"
43 "google.golang.org/grpc/credentials/insecure"
44 "google.golang.org/grpc/metadata"
45
46 "github.com/sourcegraph/mountinfo"
47
48 "github.com/sourcegraph/zoekt"
49 "github.com/sourcegraph/zoekt/build"
50 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
51 "github.com/sourcegraph/zoekt/debugserver"
52 "github.com/sourcegraph/zoekt/internal/profiler"
53)
54
// Prometheus metrics exported by the indexserver. All are registered with the
// default registry via promauto at package initialisation.
var (
	// metricResolveRevisionsDuration observes how long resolving the revisions
	// of all repositories takes.
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 100000s (~27.8h)
	})

	// metricResolveRevisionDuration observes how long resolving a single
	// repository revision takes, split by outcome.
	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	// metricGetIndexOptions counts attempts to fetch index options (errors included).
	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	// metricGetIndexOptionsError counts failed attempts to fetch index options.
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	// metricIndexDuration observes the wall time of an index job. The "name"
	// label is normalised through repoNameForMetric to bound cardinality.
	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + indexTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	// metricFetchDuration observes how long fetching a repository takes.
	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	// metricIndexIncrementalIndexState counts, per incremental index job, how
	// the on-disk shard compares to what we want to build.
	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is build.IndexState

	// metricNumIndexed is the number of repositories with shards on disk.
	// NOTE(review): the help text says "by code host" but this gauge has no
	// labels.
	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	// metricNumAssigned is the number of repositories assigned to this indexer.
	// NOTE(review): the help text says "by code host" but this gauge has no
	// labels.
	metricNumAssigned = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_assigned",
		Help: "Number of repos assigned to this indexer by code host",
	})

	// metricFailingTotal counts index jobs that ended in an error.
	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	// metricIndexingTotal counts index jobs that went on to do a full (git) index.
	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	// metricNumStoppedTrackingTotal counts repositories dropped from the queue
	// because Sourcegraph no longer assigns them to this indexer.
	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})
)
128
// reposWithSeparateIndexingMetrics is the set of repository names that get
// their own "name" label value on index_repo_seconds. All other repositories
// are reported under the empty name (see repoNameForMetric) to keep the
// metric's cardinality manageable.
var reposWithSeparateIndexingMetrics = make(map[string]struct{})

// indexState is the outcome of an index job. Its string value is used as the
// "state" label of index_repo_seconds and is passed to the queue when a job
// finishes.
type indexState string

const (
	indexStateFail        indexState = "fail"
	indexStateSuccess     indexState = "success"
	indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
	indexStateNoop        indexState = "noop"         // We didn't need to update index
	indexStateEmpty       indexState = "empty"        // index is empty (empty repo)
)
141
// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	logger sglog.Logger

	// Sourcegraph is the client used to list the repositories assigned to this
	// indexer, fetch their index options and report index status back.
	Sourcegraph Sourcegraph
	// BatchSize is not used in this part of the file; presumably it bounds the
	// size of batched requests to Sourcegraph — TODO confirm.
	BatchSize int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once (the
	// number of processQueue workers started by Run).
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration
	// CPUCount is the amount of parallelism to use when indexing a
	// repository.
	CPUCount int

	// queue holds the repositories waiting to be (re)indexed.
	queue Queue

	// muIndexDir protects the index directory from concurrent access. Index
	// jobs lock per repository name; cleanup takes the global lock.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is reported by the /debug/host endpoint.
	hostname string

	// mergeOpts configures shard merging, including the vacuum and merge
	// intervals used by Run.
	mergeOpts mergeOpts
}
186
// debug is a logger for verbose diagnostics. It writes to io.Discard, so its
// output is dropped unless debug is reassigned (presumably elsewhere when
// debug logging is enabled — confirm).
var debug = log.New(io.Discard, "", log.LstdFlags)

// noOutputTimeout is how long an index command may go without producing any
// output before loggedRun sends it SIGQUIT (followed by SIGKILL).
//
// Our index commands should output something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
const noOutputTimeout = 30 * time.Minute
195
196func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) {
197 out := &synchronizedBuffer{}
198 cmd.Stdout = out
199 cmd.Stderr = out
200
201 tr.LazyPrintf("%s", cmd.Args)
202
203 defer func() {
204 if err != nil {
205 outS := out.String()
206 tr.LazyPrintf("failed: %v", err)
207 tr.LazyPrintf("output: %s", out)
208 tr.SetError()
209 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS)
210 }
211 }()
212
213 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args))
214
215 if err := cmd.Start(); err != nil {
216 return err
217 }
218
219 errC := make(chan error)
220 go func() {
221 errC <- cmd.Wait()
222 }()
223
224 // This channel is set after we have sent sigquit. It allows us to follow up
225 // with a sigkill if the process doesn't quit after sigquit.
226 kill := make(<-chan time.Time)
227
228 lastLen := 0
229 for {
230 select {
231 case <-time.After(noOutputTimeout):
232 // Periodically check if we have had output. If not kill the process.
233 if out.Len() != lastLen {
234 lastLen = out.Len()
235 log.Printf("still running %s", cmd.Args)
236 } else {
237 // Send quit (C-\) first so we get a stack dump.
238 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args)
239 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil {
240 log.Println("quit failed:", err)
241 }
242
243 // send sigkill if still running in 10s
244 kill = time.After(10 * time.Second)
245 }
246
247 case <-kill:
248 log.Printf("still running, killing %s", cmd.Args)
249 if err := cmd.Process.Kill(); err != nil {
250 log.Println("kill failed:", err)
251 }
252
253 case err := <-errC:
254 if err != nil {
255 return err
256 }
257
258 tr.LazyPrintf("success")
259 return nil
260 }
261 }
262}
263
// synchronizedBuffer is a bytes.Buffer guarded by a mutex. It lets a running
// command write its output while another goroutine polls Len and String
// (loggedRun does this to detect commands that stopped producing output).
type synchronizedBuffer struct {
	mu sync.Mutex
	b  bytes.Buffer
}

// Write appends p to the buffer while holding the lock.
func (buf *synchronizedBuffer) Write(p []byte) (n int, err error) {
	buf.mu.Lock()
	defer buf.mu.Unlock()
	n, err = buf.b.Write(p)
	return n, err
}

// Len reports the number of bytes written so far.
func (buf *synchronizedBuffer) Len() (n int) {
	buf.mu.Lock()
	defer buf.mu.Unlock()
	n = buf.b.Len()
	return n
}

// String returns a copy of everything written so far.
func (buf *synchronizedBuffer) String() (s string) {
	buf.mu.Lock()
	defer buf.mu.Unlock()
	s = buf.b.String()
	return s
}
288
// pauseFileName, if present in IndexDir, stops index jobs from running. This
// makes it possible to experiment with the content of IndexDir without the
// indexserver writing to it. While it exists, the sync loop also skips its
// iterations and logs the file's (trimmed) contents as the reason.
const pauseFileName = "PAUSE"
293
294// Run the sync loop. This blocks forever.
295func (s *Server) Run() {
296 removeIncompleteShards(s.IndexDir)
297
298 // Start a goroutine which updates the queue with commits to index.
299 go func() {
300 // We update the list of indexed repos every Interval. To speed up manual
301 // testing we also listen for SIGUSR1 to trigger updates.
302 //
303 // "pkill -SIGUSR1 zoekt-sourcegra"
304 for range jitterTicker(s.Interval, unix.SIGUSR1) {
305 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
306 log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b)))
307 continue
308 }
309
310 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir))
311 if err != nil {
312 log.Printf("error listing repos: %s", err)
313 continue
314 }
315
316 debug.Printf("updating index queue with %d repositories", len(repos.IDs))
317
318 // Stop indexing repos we don't need to track anymore
319 removed := s.queue.MaybeRemoveMissing(repos.IDs)
320 metricNumStoppedTrackingTotal.Add(float64(len(removed)))
321 if len(removed) > 0 {
322 log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5))
323 }
324
325 cleanupDone := make(chan struct{})
326 go func() {
327 defer close(cleanupDone)
328 s.muIndexDir.Global(func() {
329 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging)
330 })
331 }()
332
333 repos.IterateIndexOptions(s.queue.AddOrUpdate)
334
335 // IterateIndexOptions will only iterate over repositories that have
336 // changed since we last called list. However, we want to add all IDs
337 // back onto the queue just to check that what is on disk is still
338 // correct. This will use the last IndexOptions we stored in the
339 // queue. The repositories not on the queue (missing) need a forced
340 // fetch of IndexOptions.
341 missing := s.queue.Bump(repos.IDs)
342 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...)
343
344 setCompoundShardCounter(s.IndexDir)
345
346 <-cleanupDone
347 }
348 }()
349
350 go func() {
351 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) {
352 if s.shardMerging {
353 s.vacuum()
354 }
355 }
356 }()
357
358 go func() {
359 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) {
360 if s.shardMerging {
361 s.doMerge()
362 }
363 }
364 }()
365
366 for i := 0; i < s.IndexConcurrency; i++ {
367 go s.processQueue()
368 }
369
370 // block forever
371 select {}
372}
373
// formatListUint32 renders the first min(len(v), m) elements of v as a
// comma-separated list. If v has more than m elements, "..." is appended as a
// final item, e.g. formatListUint32([]uint32{1, 2, 3}, 2) == "1, 2, ...".
func formatListUint32(v []uint32, m int) string {
	shown := m
	if len(v) < shown {
		shown = len(v)
	}

	var parts []string
	for i := 0; i < shown; i++ {
		parts = append(parts, fmt.Sprint(v[i]))
	}
	if len(v) > shown {
		parts = append(parts, "...")
	}

	return strings.Join(parts, ", ")
}
391
392func (s *Server) processQueue() {
393 for {
394 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
395 time.Sleep(time.Second)
396 continue
397 }
398
399 opts, ok := s.queue.Pop()
400 if !ok {
401 time.Sleep(time.Second)
402 continue
403 }
404
405 args := s.indexArgs(opts)
406
407 ran := s.muIndexDir.With(opts.Name, func() {
408 // only record time taken once we hold the lock. This avoids us
409 // recording time taken while merging/cleanup runs.
410 start := time.Now()
411
412 state, err := s.Index(args)
413
414 elapsed := time.Since(start)
415
416 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds())
417
418 if err != nil {
419 log.Printf("error indexing %s: %s", args.String(), err)
420 }
421
422 switch state {
423 case indexStateSuccess:
424 var branches []string
425 for _, b := range args.Branches {
426 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
427 }
428 s.logger.Info("updated index",
429 sglog.String("repo", args.Name),
430 sglog.Uint32("id", args.RepoID),
431 sglog.Strings("branches", branches),
432 sglog.Duration("duration", elapsed),
433 )
434 case indexStateSuccessMeta:
435 log.Printf("updated meta %s in %v", args.String(), elapsed)
436 }
437 s.queue.SetIndexed(opts, state)
438 })
439
440 if !ran {
441 // Someone else is processing the repository. We can just skip this job
442 // since the repository will be added back to the queue and we will
443 // converge to the correct behaviour.
444 debug.Printf("index job for repository already running: %s", args)
445 continue
446 }
447 }
448}
449
450// repoNameForMetric returns a normalized version of the given repository name that is
451// suitable for use with Prometheus metrics.
452func repoNameForMetric(repo string) string {
453 // Check to see if we want to be able to capture separate indexing metrics for this repository.
454 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable.
455 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok {
456 return repo
457 }
458
459 return ""
460}
461
// batched splits slice into consecutive chunks of at most size elements. It
// sends them, in order, on the returned channel, and closes the channel once
// every chunk has been sent. The chunks share slice's backing array.
//
// A non-positive size means "no limit": the whole slice is sent as a single
// chunk.
func batched(slice []uint32, size int) <-chan []uint32 {
	c := make(chan []uint32)
	go func() {
		defer close(c)

		// Without this guard size == 0 would send empty chunks forever and a
		// negative size would panic on slice[:size].
		if size <= 0 {
			size = len(slice)
		}

		for len(slice) > 0 {
			n := size
			if n > len(slice) {
				n = len(slice)
			}
			c <- slice[:n]
			slice = slice[n:]
		}
	}()
	return c
}
476
// jitterTicker returns a channel that ticks once straight away and then
// repeatedly after a random sleep drawn uniformly from [d/2, d+d/2).
//
// Any signal in sig also causes a tick. This is a convenience that lets an
// operator trigger the ticker by hand. d must be positive.
func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} {
	ticks := make(chan struct{})

	// Periodic source: tick, then sleep half the period plus random jitter.
	go func() {
		half := int64(d) / 2
		for {
			ticks <- struct{}{}
			time.Sleep(time.Duration(half + rand.Int63n(int64(d))))
		}
	}()

	// Manual source: each delivered signal produces one tick.
	if len(sig) > 0 {
		go func() {
			received := make(chan os.Signal, 1)
			signal.Notify(received, sig...)
			for range received {
				ticks <- struct{}{}
			}
		}()
	}

	return ticks
}
508
509// Index starts an index job for repo name at commit.
510func (s *Server) Index(args *indexArgs) (state indexState, err error) {
511 tr := trace.New("index", args.Name)
512
513 defer func() {
514 if err != nil {
515 tr.SetError()
516 tr.LazyPrintf("error: %v", err)
517 state = indexStateFail
518 metricFailingTotal.Inc()
519 }
520 tr.LazyPrintf("state: %s", state)
521 tr.Finish()
522 }()
523
524 tr.LazyPrintf("branches: %v", args.Branches)
525
526 if len(args.Branches) == 0 {
527 return indexStateEmpty, createEmptyShard(args)
528 }
529
530 repositoryName := args.Name
531 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
532 tr.LazyPrintf("marking this repository for delta build")
533 args.UseDelta = true
534 }
535
536 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold
537
538 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
539 tr.LazyPrintf("skipping symbols calculation")
540 args.Symbols = false
541 }
542
543 reason := "forced"
544
545 if args.Incremental {
546 bo := args.BuildOptions()
547 bo.SetDefaults()
548 incrementalState, fn := bo.IndexState()
549 reason = string(incrementalState)
550 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()
551
552 switch incrementalState {
553 case build.IndexStateEqual:
554 debug.Printf("%s index already up to date. Shard=%s", args.String(), fn)
555 return indexStateNoop, nil
556
557 case build.IndexStateMeta:
558 log.Printf("updating index.meta %s", args.String())
559
560 if err := mergeMeta(bo); err != nil {
561 log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
562 } else {
563 return indexStateSuccessMeta, nil
564 }
565
566 case build.IndexStateCorrupt:
567 log.Printf("falling back to full update: corrupt index: %s", args.String())
568 }
569 }
570
571 log.Printf("updating index %s reason=%s", args.String(), reason)
572
573 metricIndexingTotal.Inc()
574 c := gitIndexConfig{
575 runCmd: func(cmd *exec.Cmd) error {
576 return s.loggedRun(tr, cmd)
577 },
578
579 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
580 return args.BuildOptions().FindRepositoryMetadata()
581 },
582 }
583
584 err = gitIndex(c, args, s.Sourcegraph, s.logger)
585 if err != nil {
586 return indexStateFail, err
587 }
588
589 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil {
590 s.logger.Error("failed to update index status",
591 sglog.String("repo", args.Name),
592 sglog.Uint32("id", args.RepoID),
593 sglogBranches("branches", args.Branches),
594 sglog.Error(err),
595 )
596 }
597
598 return indexStateSuccess, nil
599}
600
601// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so
602// it can update the zoekt_repos table.
603func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error {
604 // We need to read from disk for IndexTime.
605 _, metadata, ok, err := c.findRepositoryMetadata(args)
606 if err != nil {
607 return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
608 }
609 if !ok {
610 return errors.New("failed to find metadata for new/updated index")
611 }
612
613 status := []indexStatus{{
614 RepoID: args.RepoID,
615 Branches: args.Branches,
616 IndexTimeUnix: metadata.IndexTime.Unix(),
617 }}
618 if err := sg.UpdateIndexStatus(status); err != nil {
619 return fmt.Errorf("failed to update sourcegraph with status: %w", err)
620 }
621
622 return nil
623}
624
625func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
626 ss := make([]string, len(branches))
627 for i, b := range branches {
628 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
629 }
630 return sglog.Strings(key, ss)
631}
632
633func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
634 return &indexArgs{
635 IndexOptions: opts,
636
637 IndexDir: s.IndexDir,
638 Parallelism: s.CPUCount,
639
640 Incremental: true,
641
642 // 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
643 FileLimit: 1 << 20,
644 }
645}
646
647func createEmptyShard(args *indexArgs) error {
648 bo := args.BuildOptions()
649 bo.SetDefaults()
650 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}
651
652 if args.Incremental && bo.IncrementalSkipIndexing() {
653 return nil
654 }
655
656 builder, err := build.NewBuilder(*bo)
657 if err != nil {
658 return err
659 }
660 return builder.Finish()
661}
662
663// addDebugHandlers adds handlers specific to indexserver.
664func (s *Server) addDebugHandlers(mux *http.ServeMux) {
665 // Sourcegraph's site admin view requires indexserver to serve it's admin view
666 // on "/".
667 mux.Handle("/", http.HandlerFunc(s.handleRoot))
668
669 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex))
670 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed))
671 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList))
672 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge))
673 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue))
674 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost))
675}
676
677func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) {
678 if r.Method != "GET" {
679 w.Header().Set("Allow", "GET")
680 w.WriteHeader(http.StatusMethodNotAllowed)
681 return
682 }
683
684 response := struct {
685 Hostname string
686 }{
687 Hostname: s.hostname,
688 }
689
690 b, err := json.Marshal(response)
691 if err != nil {
692 http.Error(w, err.Error(), http.StatusInternalServerError)
693 return
694 }
695
696 w.Header().Set("Content-Type", "application/json; charset=utf-8")
697 w.Write(b)
698}
699
// rootTmpl renders the indexserver landing page ("/"). It shows links to the
// debug pages, the message from a forced reindex (IndexMsg), and optionally a
// table of queued repositories (Repos). Each repository ID links to
// "?id=<ID>" to force a reindex.
//
// The header cells are wrapped in a row, and every data cell closes with
// </td> (the ID cell previously closed with a stray </id>).
var rootTmpl = template.Must(template.New("name").Parse(`
<html>
  <body>
    <a href="debug">Debug</a><br />
    <a href="debug/requests">Traces</a><br />
    {{.IndexMsg}}<br />
    <br />
    <h3>Reindex</h3>
    {{if .Repos}}
    <a href="?show_repos=false">hide repos</a><br />
    <table style="margin-top: 20px">
      <tr>
        <th style="text-align:left">Name</th>
        <th style="text-align:left">ID</th>
      </tr>
      {{range .Repos}}
      <tr>
        <td>{{.Name}}</td>
        <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></td>
      </tr>
      {{end}}
    </table>
    {{else}}
    <a href="?show_repos=true">show repos</a><br />
    {{end}}
  </body>
</html>
`))
726
727func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
728 if r.Method != "GET" {
729 w.Header().Set("Allow", "GET")
730 w.WriteHeader(http.StatusMethodNotAllowed)
731 return
732 }
733
734 values := r.URL.Query()
735
736 // ?id=
737 indexMsg := ""
738 if v := values.Get("id"); v != "" {
739 id, err := strconv.Atoi(v)
740 if err != nil {
741 http.Error(w, err.Error(), http.StatusBadRequest)
742 return
743 }
744 indexMsg, _ = s.forceIndex(uint32(id))
745 }
746
747 // ?show_repos=
748 showRepos := false
749 if v := values.Get("show_repos"); v != "" {
750 showRepos, _ = strconv.ParseBool(v)
751 }
752
753 type Repo struct {
754 ID uint32
755 Name string
756 }
757 var data struct {
758 Repos []Repo
759 IndexMsg string
760 }
761
762 data.IndexMsg = indexMsg
763
764 if showRepos {
765 s.queue.Iterate(func(opts *IndexOptions) {
766 data.Repos = append(data.Repos, Repo{
767 ID: opts.RepoID,
768 Name: opts.Name,
769 })
770 })
771 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name })
772 }
773
774 _ = rootTmpl.Execute(w, data)
775}
776
777// handleReindex triggers a reindex asynocronously. If a reindex was triggered
778// the request returns with status 202. The caller can infer the new state of
779// the index by calling List.
780func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) {
781 if r.Method != http.MethodPost {
782 w.Header().Set("Allow", http.MethodPost)
783 w.WriteHeader(http.StatusMethodNotAllowed)
784 return
785 }
786
787 err := r.ParseForm()
788 if err != nil {
789 http.Error(w, err.Error(), http.StatusBadRequest)
790 return
791 }
792
793 id, err := strconv.Atoi(r.Form.Get("repo"))
794 if err != nil {
795 http.Error(w, err.Error(), http.StatusBadRequest)
796 return
797 }
798
799 go func() { s.forceIndex(uint32(id)) }()
800
801 // 202 Accepted
802 w.WriteHeader(http.StatusAccepted)
803}
804
805func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
806 withIndexed := true
807 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil {
808 withIndexed = b
809 }
810
811 var indexed []uint32
812 if withIndexed {
813 indexed = listIndexed(s.IndexDir)
814 }
815
816 repos, err := s.Sourcegraph.List(r.Context(), indexed)
817 if err != nil {
818 http.Error(w, err.Error(), http.StatusInternalServerError)
819 return
820 }
821
822 bw := bytes.Buffer{}
823
824 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
825
826 _, err = fmt.Fprintf(tw, "ID\tName\n")
827 if err != nil {
828 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
829 return
830 }
831
832 s.queue.mu.Lock()
833 name := ""
834 for _, id := range repos.IDs {
835 if item := s.queue.get(id); item != nil {
836 name = item.opts.Name
837 } else {
838 name = ""
839 }
840 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
841 if err != nil {
842 debug.Printf("handleDebugList: %s\n", err.Error())
843 }
844 }
845 s.queue.mu.Unlock()
846
847 if err != nil {
848 http.Error(w, err.Error(), http.StatusInternalServerError)
849 return
850 }
851
852 err = tw.Flush()
853 if err != nil {
854 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
855 return
856 }
857
858 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
859
860 if _, err := io.Copy(w, &bw); err != nil {
861 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
862 return
863 }
864}
865
866// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
867// can run this command during periods of low usage (evenings, weekends) to
868// trigger an initial merge run. In the steady-state, merges happen rarely, even
869// on busy instances, and users can rely on automatic merging instead.
870func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {
871
872 // A merge operation can take very long, depending on the number merges and the
873 // target size of the compound shards. We run the merge in the background and
874 // return immediately to the user.
875 //
876 // We track the status of the merge with metricShardMergingRunning.
877 go func() {
878 s.doMerge()
879 }()
880 _, _ = w.Write([]byte("merging enqueued\n"))
881}
882
883func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {
884 indexed := listIndexed(s.IndexDir)
885
886 bw := bytes.Buffer{}
887
888 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
889
890 _, err := fmt.Fprintf(tw, "ID\tName\n")
891 if err != nil {
892 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
893 return
894 }
895
896 s.queue.mu.Lock()
897 name := ""
898 for _, id := range indexed {
899 if item := s.queue.get(id); item != nil {
900 name = item.opts.Name
901 } else {
902 name = ""
903 }
904 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
905 if err != nil {
906 debug.Printf("handleDebugIndexed: %s\n", err.Error())
907 }
908 }
909 s.queue.mu.Unlock()
910
911 if err != nil {
912 http.Error(w, err.Error(), http.StatusInternalServerError)
913 return
914 }
915
916 err = tw.Flush()
917 if err != nil {
918 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
919 return
920 }
921
922 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
923
924 if _, err := io.Copy(w, &bw); err != nil {
925 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
926 return
927 }
928}
929
930// forceIndex will run the index job for repo name now. It will return always
931// return a string explaining what it did, even if it failed.
932func (s *Server) forceIndex(id uint32) (string, error) {
933 var opts IndexOptions
934 var err error
935 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) {
936 opts = o
937 }, func(_ uint32, e error) {
938 err = e
939 }, id)
940 if err != nil {
941 return fmt.Sprintf("Indexing %d failed: %v", id, err), err
942 }
943
944 args := s.indexArgs(opts)
945 args.Incremental = false // force re-index
946
947 var state indexState
948 ran := s.muIndexDir.With(opts.Name, func() {
949 state, err = s.Index(args)
950 })
951 if !ran {
952 return fmt.Sprintf("index job for repository already running: %s", args), nil
953 }
954 if err != nil {
955 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err
956 }
957 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil
958}
959
960func listIndexed(indexDir string) []uint32 {
961 index := getShards(indexDir)
962 metricNumIndexed.Set(float64(len(index)))
963 repoIDs := make([]uint32, 0, len(index))
964 for id := range index {
965 repoIDs = append(repoIDs, id)
966 }
967 sort.Slice(repoIDs, func(i, j int) bool {
968 return repoIDs[i] < repoIDs[j]
969 })
970 return repoIDs
971}
972
// hostnameBestEffort returns the first non-empty value of NODE_NAME and
// HOSTNAME. If neither is set, it returns the kernel hostname, or "" if that
// also fails.
func hostnameBestEffort() string {
	for _, key := range []string{"NODE_NAME", "HOSTNAME"} {
		if value := os.Getenv(key); value != "" {
			return value
		}
	}
	name, _ := os.Hostname()
	return name
}
983
984// setupTmpDir sets up a temporary directory on the same volume as the
985// indexes.
986//
987// If main is true we will delete older temp directories left around. main is
988// false when this is a debug command.
989func setupTmpDir(main bool, index string) error {
990 // change the target tmp directory depending on if its our main daemon or a
991 // debug sub command.
992 dir := ".indexserver.debug.tmp"
993 if main {
994 dir = ".indexserver.tmp"
995 }
996
997 tmpRoot := filepath.Join(index, dir)
998 if err := os.MkdirAll(tmpRoot, 0755); err != nil {
999 return err
1000 }
1001 if !tmpfriend.IsTmpFriendDir(tmpRoot) {
1002 _, err := tmpfriend.RootTempDir(tmpRoot)
1003 return err
1004 }
1005 return nil
1006}
1007
1008func printMetaData(fn string) error {
1009 repo, indexMeta, err := zoekt.ReadMetadataPath(fn)
1010 if err != nil {
1011 return err
1012 }
1013
1014 err = json.NewEncoder(os.Stdout).Encode(indexMeta)
1015 if err != nil {
1016 return err
1017 }
1018
1019 err = json.NewEncoder(os.Stdout).Encode(repo)
1020 if err != nil {
1021 return err
1022 }
1023 return nil
1024}
1025
1026func printShardStats(fn string) error {
1027 f, err := os.Open(fn)
1028 if err != nil {
1029 return err
1030 }
1031
1032 iFile, err := zoekt.NewIndexFile(f)
1033 if err != nil {
1034 return err
1035 }
1036
1037 return zoekt.PrintNgramStats(iFile)
1038}
1039
1040func srcLogLevelIsDebug() bool {
1041 lvl := os.Getenv(sglog.EnvLogLevel)
1042 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug")
1043}
1044
// getEnvWithDefaultInt64 returns environment variable k parsed as a base-10
// int64, or defaultVal when k is unset or empty. An unparsable value is fatal.
func getEnvWithDefaultInt64(k string, defaultVal int64) int64 {
	if raw := os.Getenv(k); raw != "" {
		n, err := strconv.ParseInt(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to int64: %s", k, err)
		}
		return n
	}
	return defaultVal
}
1056
// getEnvWithDefaultInt returns environment variable k parsed as a base-10 int,
// or defaultVal when k is unset or empty. An unparsable value is fatal.
//
// The value v is what gets parsed; parsing the key name k instead made every
// non-empty setting fail.
func getEnvWithDefaultInt(k string, defaultVal int) int {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	i, err := strconv.Atoi(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to int: %s", k, err)
	}
	return i
}
1068
// getEnvWithDefaultUint64 returns environment variable k parsed as a base-10
// uint64, or defaultVal when k is unset or empty. An unparsable value is fatal.
func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 {
	if raw := os.Getenv(k); raw != "" {
		n, err := strconv.ParseUint(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to uint64: %s", k, err)
		}
		return n
	}
	return defaultVal
}
1080
// getEnvWithDefaultFloat64 returns environment variable k parsed as a
// float64, or defaultVal when k is unset or empty. An unparsable value is
// fatal.
func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 {
	if raw := os.Getenv(k); raw != "" {
		x, err := strconv.ParseFloat(raw, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to float64: %s", k, err)
		}
		return x
	}
	return defaultVal
}
1092
// getEnvWithDefaultString returns environment variable k, or defaultVal when
// k is unset or empty.
func getEnvWithDefaultString(k string, defaultVal string) string {
	if value := os.Getenv(k); value != "" {
		return value
	}
	return defaultVal
}
1100
// getEnvWithDefaultDuration returns environment variable k parsed with
// time.ParseDuration (e.g. "90s", "1h30m"), or defaultVal when k is unset or
// empty. An unparsable value is fatal.
func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration {
	if raw := os.Getenv(k); raw != "" {
		d, err := time.ParseDuration(raw)
		if err != nil {
			log.Fatalf("error parsing ENV %s to duration: %s", k, err)
		}
		return d
	}
	return defaultVal
}
1113
// getEnvWithDefaultBool reads environment variable k using strconv.ParseBool
// syntax ("1", "t", "true", "0", "f", "false", ...). An unset or empty
// variable yields defaultVal; a value that does not parse terminates the
// process via log.Fatalf.
func getEnvWithDefaultBool(k string, defaultVal bool) bool {
	switch raw := os.Getenv(k); raw {
	case "":
		return defaultVal
	default:
		enabled, parseErr := strconv.ParseBool(raw)
		if parseErr != nil {
			log.Fatalf("error parsing ENV %s to bool: %s", k, parseErr)
		}
		return enabled
	}
}
1126
// getEnvWithDefaultEmptySet parses environment variable k as a
// comma-separated list and returns its distinct, whitespace-trimmed,
// non-empty entries as a set. The result is never nil; it is empty when the
// variable is unset or contains no entries.
func getEnvWithDefaultEmptySet(k string) map[string]struct{} {
	set := make(map[string]struct{})
	for _, item := range strings.Split(os.Getenv(k), ",") {
		if item = strings.TrimSpace(item); item == "" {
			continue
		}
		set[item] = struct{}{}
	}
	return set
}
1137
// joinStringSet joins the members of set with sep. The members are sorted
// first, so the output is deterministic (Go map iteration order is random)
// and log lines built from it are stable and comparable. An empty or nil set
// yields "".
func joinStringSet(set map[string]struct{}, sep string) string {
	xs := make([]string, 0, len(set))
	for x := range set {
		xs = append(xs, x)
	}
	sort.Strings(xs)

	return strings.Join(xs, sep)
}
1146
1147func setCompoundShardCounter(indexDir string) {
1148 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt"))
1149 if err != nil {
1150 log.Printf("setCompoundShardCounter: %s\n", err)
1151 return
1152 }
1153 metricNumberCompoundShards.Set(float64(len(fns)))
1154}
1155
1156func rootCmd() *ffcli.Command {
1157 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError)
1158 conf := rootConfig{
1159 Main: true,
1160 }
1161 conf.registerRootFlags(rootFs)
1162
1163 return &ffcli.Command{
1164 FlagSet: rootFs,
1165 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]",
1166 Subcommands: []*ffcli.Command{debugCmd()},
1167 Exec: func(ctx context.Context, args []string) error {
1168 return startServer(conf)
1169 },
1170 }
1171}
1172
// rootConfig holds the settings of the root command. Its fields are
// populated from command-line flags (most of which take their defaults from
// environment variables) by registerRootFlags.
type rootConfig struct {
	// Main is true if this rootConfig is for our main long running command (the
	// indexserver). Debug commands should not set this value. This is used to
	// determine if we need to run tmpfriend.
	Main bool

	// root is the -sourcegraph_url value: either the Sourcegraph frontend URL
	// or a path to a local directory, in which case the Sourcegraph API is
	// faked and the repos rooted under that path are indexed.
	root string
	// interval is how often to sync with Sourcegraph (-interval).
	interval time.Duration
	// index is the index directory (-index); newServer creates it if missing.
	index string
	// indexConcurrency is the number of concurrent index jobs
	// (-index_concurrency); newServer raises values below 1 to 1.
	indexConcurrency int64
	// listen is the TCP address of the debug HTTP server (-listen). When it is
	// empty, startServer does not serve the debug endpoints.
	listen string
	// hostname is the name advertised to Sourcegraph when asking for the list
	// of repositories to index (-hostname).
	hostname string
	// cpuFraction is the fraction of cores to use for indexing
	// (-cpu_fraction); newServer rejects values outside (0, 1].
	cpuFraction float64
	// blockProfileRate is passed to runtime.SetBlockProfileRate
	// (-block_profile_rate); values <= 0 disable the block profiler.
	blockProfileRate int

	// config values related to shard merging
	// vacuumInterval is how often vacuum runs (-vacuum_interval).
	vacuumInterval time.Duration
	// mergeInterval is how often merging runs (-merge_interval).
	mergeInterval time.Duration
	// targetSize is the target size of compound shards, in MiB.
	targetSize int64
	// minSize is the minimum size of a compound shard, in MiB.
	minSize int64
	// minAgeDays excludes shards whose last commit is newer than this many
	// days from merging.
	minAgeDays int
	// maxPriority is the highest priority a shard may have and still be
	// considered for merging.
	maxPriority float64

	// config values related to backoff indexing repos with one or more consecutive failures
	// backoffDuration is the base delay before re-enqueueing a repo whose
	// previous indexing attempt failed; a negative value disables backoff.
	backoffDuration time.Duration
	// maxBackoffDuration caps the backoff delay; a negative value disables
	// backoff.
	maxBackoffDuration time.Duration

	// useGRPC is true if we should use the gRPC API to talk to Sourcegraph.
	useGRPC bool
}
1203
1204func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
1205 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
1206 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
1207 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of concurrent index jobs to run.")
1208 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use")
1209 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
1210 fs.StringVar(&rc.hostname, "hostname", hostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
1211 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
1212 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate")
1213 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
1214 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")
1215 fs.BoolVar(&rc.useGRPC, "use_grpc", getEnvWithDefaultBool("GRPC_ENABLED", false), "use the gRPC API to talk to Sourcegraph")
1216
1217 // flags related to shard merging
1218 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
1219 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
1220 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB")
1221 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB")
1222 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
1223 fs.Float64Var(&rc.maxPriority, "merge_max_priority", getEnvWithDefaultFloat64("SRC_MERGE_MAX_PRIORITY", 100), "the maximum priority a shard can have to be considered for merging.")
1224}
1225
1226func startServer(conf rootConfig) error {
1227 s, err := newServer(conf)
1228 if err != nil {
1229 return err
1230 }
1231
1232 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate)
1233 setCompoundShardCounter(s.IndexDir)
1234
1235 if conf.listen != "" {
1236
1237 mux := http.NewServeMux()
1238 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{
1239 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"},
1240 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"},
1241 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"},
1242 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"},
1243 }...)
1244 s.addDebugHandlers(mux)
1245
1246 go func() {
1247 debug.Printf("serving HTTP on %s", conf.listen)
1248 log.Fatal(http.ListenAndServe(conf.listen, mux))
1249
1250 }()
1251
1252 // Serve mux on a unix domain socket on a best-effort-basis so that
1253 // webserver can call the endpoints via the shared filesystem.
1254 //
1255 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen
1256 // on the socket due to permission errors. See
1257 // https://github.com/docker/for-mac/issues/6239
1258 go func() {
1259 serveHTTPOverSocket := func() error {
1260 socket := filepath.Join(s.IndexDir, "indexserver.sock")
1261 // We cannot bind a socket to an existing pathname.
1262 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) {
1263 return fmt.Errorf("error removing socket file: %s", socket)
1264 }
1265 // The "unix" network corresponds to stream sockets. (cf. unixgram,
1266 // unixpacket).
1267 l, err := net.Listen("unix", socket)
1268 if err != nil {
1269 return fmt.Errorf("failed to listen on socket %s: %w", socket, err)
1270 }
1271 // Indexserver (root) and webserver (Sourcegraph) run with
1272 // different users. Per default, the socket is created with
1273 // permission 755 (root root), which doesn't let webserver write to
1274 // it.
1275 //
1276 // See https://github.com/golang/go/issues/11822 for more context.
1277 if err := os.Chmod(socket, 0777); err != nil {
1278 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err)
1279 }
1280 debug.Printf("serving HTTP on %s", socket)
1281 return http.Serve(l, mux)
1282 }
1283 debug.Print(serveHTTPOverSocket())
1284 }()
1285 }
1286
1287 oc := &ownerChecker{
1288 Path: filepath.Join(conf.index, "owner.txt"),
1289 Hostname: conf.hostname,
1290 }
1291 go oc.Run()
1292
1293 logger := sglog.Scoped("metricsRegistration", "")
1294 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"}
1295
1296 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index})
1297 prometheus.DefaultRegisterer.MustRegister(c)
1298
1299 s.Run()
1300 return nil
1301}
1302
1303func newServer(conf rootConfig) (*Server, error) {
1304 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 {
1305 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0")
1306 }
1307 if conf.index == "" {
1308 return nil, fmt.Errorf("must set -index")
1309 }
1310 if conf.root == "" {
1311 return nil, fmt.Errorf("must set -sourcegraph_url")
1312 }
1313 rootURL, err := url.Parse(conf.root)
1314 if err != nil {
1315 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err)
1316 }
1317
1318 rootURL = addDefaultPort(rootURL)
1319
1320 // Tune GOMAXPROCS to match Linux container CPU quota.
1321 _, _ = maxprocs.Set()
1322
1323 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler.
1324 // The block profiler is disabled by default and should be enabled with care in production
1325 runtime.SetBlockProfileRate(conf.blockProfileRate)
1326
1327 // Automatically prepend our own path at the front, to minimize
1328 // required configuration.
1329 if l, err := os.Readlink("/proc/self/exe"); err == nil {
1330 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
1331 }
1332
1333 if _, err := os.Stat(conf.index); err != nil {
1334 if err := os.MkdirAll(conf.index, 0755); err != nil {
1335 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err)
1336 }
1337 }
1338
1339 if err := setupTmpDir(conf.Main, conf.index); err != nil {
1340 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err)
1341 }
1342
1343 if srcLogLevelIsDebug() {
1344 debug = log.New(os.Stderr, "", log.LstdFlags)
1345 }
1346
1347 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST")
1348 if len(reposWithSeparateIndexingMetrics) > 0 {
1349 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", "))
1350 }
1351
1352 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST")
1353 if len(deltaBuildRepositoriesAllowList) > 0 {
1354 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", "))
1355 }
1356
1357 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150)
1358 if deltaShardNumberFallbackThreshold > 0 {
1359 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold)
1360 } else {
1361 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards")
1362 }
1363
1364 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST")
1365 if len(reposShouldSkipSymbolsCalculation) > 0 {
1366 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", "))
1367 }
1368
1369 var sg Sourcegraph
1370 if rootURL.IsAbs() {
1371 var batchSize int
1372 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" {
1373 batchSize, err = strconv.Atoi(v)
1374 if err != nil {
1375 return nil, fmt.Errorf("Invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int")
1376 }
1377 }
1378
1379 opts := []SourcegraphClientOption{
1380 WithBatchSize(batchSize),
1381 WithShouldUseGRPC(conf.useGRPC),
1382 }
1383
1384 gRPCConnectionOptions := []grpc.DialOption{
1385 grpc.WithTransportCredentials(insecure.NewCredentials()),
1386 grpc.WithChainStreamInterceptor(internalActorStreamInterceptor()),
1387 grpc.WithChainUnaryInterceptor(internalActorUnaryInterceptor()),
1388 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
1389 }
1390
1391 // This dialer is used to connect via gRPC to the Sourcegraph instance.
1392 // This is done lazily, so we can provide the client to use regardless of
1393 // whether we enabled gRPC or not initially.
1394 cc, err := grpc.Dial(rootURL.Host, gRPCConnectionOptions...)
1395 if err != nil {
1396 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err)
1397 }
1398
1399 client := proto.NewZoektConfigurationServiceClient(cc)
1400 opts = append(opts, WithGRPCClient(client))
1401
1402 sg = newSourcegraphClient(rootURL, conf.hostname, opts...)
1403
1404 } else {
1405 sg = sourcegraphFake{
1406 RootDir: rootURL.String(),
1407 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags),
1408 }
1409 }
1410
1411 if conf.indexConcurrency < 1 {
1412 conf.indexConcurrency = 1
1413 }
1414
1415 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction)))
1416 if cpuCount < 1 {
1417 cpuCount = 1
1418 }
1419
1420 logger := sglog.Scoped("server", "periodically reindexes enabled repositories on sourcegraph")
1421
1422 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)
1423
1424 return &Server{
1425 logger: logger,
1426 Sourcegraph: sg,
1427 IndexDir: conf.index,
1428 IndexConcurrency: int(conf.indexConcurrency),
1429 Interval: conf.interval,
1430 CPUCount: cpuCount,
1431 queue: *q,
1432 shardMerging: zoekt.ShardMergingEnabled(),
1433 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList,
1434 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold,
1435 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
1436 hostname: conf.hostname,
1437 mergeOpts: mergeOpts{
1438 vacuumInterval: conf.vacuumInterval,
1439 mergeInterval: conf.mergeInterval,
1440 targetSizeBytes: conf.targetSize * 1024 * 1024,
1441 minSizeBytes: conf.minSize * 1024 * 1024,
1442 minAgeDays: conf.minAgeDays,
1443 maxPriority: conf.maxPriority,
1444 },
1445 }, err
1446}
1447
// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration
// for the indexed-search-configuration gRPC service.
//
// Its contents are embedded at build time from
// default_grpc_service_configuration.json (see the go:embed directive below),
// and newServer passes it to grpc.WithDefaultServiceConfig when dialing
// Sourcegraph. Edit the JSON file, not this variable, to change retry
// behavior.
//
// The default backoff strategy is modeled after the default settings used by
// retryablehttp.DefaultClient.
//
// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html):
// - Unavailable
// - Aborted
//
//go:embed default_grpc_service_configuration.json
var defaultGRPCServiceConfigurationJSON string
1460
1461func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor {
1462 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1463 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1464 return invoker(ctx, method, req, reply, cc, opts...)
1465 }
1466}
1467
1468func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
1469 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
1470 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1471 return streamer(ctx, desc, cc, method, opts...)
1472 }
1473}
1474
// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
// If the scheme is "https", "443" is used.
//
// A nil URL yields nil. A URL that is not absolute (for example a local
// directory path), has another scheme, or already has a port is returned
// as-is (same pointer). The original URL is never mutated; when a port is
// added, a modified copy is returned.
//
// The host is taken from Hostname() rather than the raw Host field, so IPv6
// literals such as "[::1]" become "[::1]:80" instead of the malformed
// "[[::1]]:80". A trailing colon with an empty port ("example.com:") is
// replaced by the default port.
func addDefaultPort(original *url.URL) *url.URL {
	if original == nil {
		return nil // don't panic
	}

	// Only remote URLs without an explicit port are candidates.
	if !original.IsAbs() || original.Port() != "" {
		return original
	}

	var port string
	switch original.Scheme {
	case "http":
		port = "80"
	case "https":
		port = "443"
	default:
		return original
	}

	u := cloneURL(original)
	// Hostname strips IPv6 brackets and any empty ":port" suffix; JoinHostPort
	// adds the brackets back when needed.
	u.Host = net.JoinHostPort(u.Hostname(), port)
	return u
}

// cloneURL returns a copy of the URL. It is safe to mutate the returned URL.
// The Userinfo, if any, is copied as well so the clone shares no pointers
// with u. A nil URL yields nil.
// This is copied from net/http/clone.go
func cloneURL(u *url.URL) *url.URL {
	if u == nil {
		return nil
	}
	u2 := new(url.URL)
	*u2 = *u
	if u.User != nil {
		u2.User = new(url.Userinfo)
		*u2.User = *u.User
	}
	return u2
}
1519
1520func main() {
1521 liblog := sglog.Init(sglog.Resource{
1522 Name: "zoekt-indexserver",
1523 Version: zoekt.Version,
1524 InstanceID: hostnameBestEffort(),
1525 })
1526 defer liblog.Sync()
1527
1528 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil {
1529 log.Fatal(err)
1530 }
1531}