fork of https://github.com/sourcegraph/zoekt
1// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
2// repositories on sourcegraph
3package main
4
5import (
6 "bytes"
7 "context"
8 _ "embed"
9 "encoding/json"
10 "errors"
11 "flag"
12 "fmt"
13 "html/template"
14 "io"
15 "io/fs"
16 "log"
17 "math"
18 "math/rand"
19 "net"
20 "net/http"
21 "net/url"
22 "os"
23 "os/exec"
24 "os/signal"
25 "path/filepath"
26 "runtime"
27 "sort"
28 "strconv"
29 "strings"
30 "sync"
31 "text/tabwriter"
32 "time"
33
34 "github.com/keegancsmith/tmpfriend"
35 "github.com/peterbourgon/ff/v3/ffcli"
36 "github.com/prometheus/client_golang/prometheus"
37 "github.com/prometheus/client_golang/prometheus/promauto"
38 sglog "github.com/sourcegraph/log"
39 "go.uber.org/automaxprocs/maxprocs"
40 "golang.org/x/net/trace"
41 "golang.org/x/sys/unix"
42 "google.golang.org/grpc"
43 "google.golang.org/grpc/credentials/insecure"
44 "google.golang.org/grpc/metadata"
45
46 "github.com/sourcegraph/mountinfo"
47
48 "github.com/sourcegraph/zoekt"
49 "github.com/sourcegraph/zoekt/build"
50 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
51 "github.com/sourcegraph/zoekt/debugserver"
52 "github.com/sourcegraph/zoekt/internal/profiler"
53)
54
// Prometheus metrics exported by the indexserver. All of them are created
// through promauto.
var (
	// metricResolveRevisionsDuration observes how long it takes to resolve
	// the revisions of all repositories.
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s, 10s, ..., 100000s (~27.8h)
	})

	// metricResolveRevisionDuration observes how long it takes to resolve a
	// single repository revision, labelled by outcome.
	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	// metricGetIndexOptions counts every attempt to get index options,
	// including failed ones.
	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	// metricGetIndexOptionsError counts only the failed attempts.
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	// metricIndexDuration observes the duration of each index job. The "name"
	// label is filled via repoNameForMetric to keep cardinality bounded.
	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + indexTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	// metricFetchDuration observes how long fetching a repository takes.
	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	// metricIndexIncrementalIndexState counts the incremental state reported
	// by build.Options.IndexState for each incremental index job.
	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is build.IndexState

	// metricNumIndexed is set by listIndexed to the number of entries
	// returned by getShards.
	// NOTE(review): the help text says "by code host" but the gauge has no
	// labels - confirm the intended wording.
	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	// NOTE(review): as above, this gauge has no labels despite the
	// "by code host" help text.
	metricNumAssigned = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_assigned",
		Help: "Number of repos assigned to this indexer by code host",
	})

	// metricFailingTotal is incremented whenever Server.Index returns an
	// error.
	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	// metricIndexingTotal is incremented each time Server.Index proceeds to a
	// full (non-noop, non-meta) index run.
	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	// metricNumStoppedTrackingTotal is increased by the number of
	// repositories removed from the queue on each sync.
	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})
)
128
// reposWithSeparateIndexingMetrics is the set of repository names for which
// we want to capture separate indexing metrics. repoNameForMetric consults it
// to decide whether a repository name may be used as a metric label.
var reposWithSeparateIndexingMetrics = make(map[string]struct{})

// indexState describes the outcome of a single index job. It is used as the
// "state" label of metricIndexDuration.
type indexState string

const (
	indexStateFail        indexState = "fail"         // The index job returned an error
	indexStateSuccess     indexState = "success"      // A full index run completed
	indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
	indexStateNoop        indexState = "noop"         // We didn't need to update index
	indexStateEmpty       indexState = "empty"        // index is empty (empty repo)
)
141
// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	// logger is the structured logger used for index progress and errors.
	logger sglog.Logger

	// Sourcegraph is the client used to list repositories, fetch their
	// IndexOptions and report index status.
	Sourcegraph Sourcegraph
	// NOTE(review): BatchSize is not read anywhere in this part of the file;
	// presumably it bounds batched requests to Sourcegraph - confirm.
	BatchSize int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration
	// CPUCount is the amount of parallelism to use when indexing a
	// repository.
	CPUCount int

	// queue holds the repositories waiting to be indexed. It is filled by the
	// sync loop in Run and drained by processQueue.
	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is the value reported by the /debug/host endpoint.
	hostname string

	// mergeOpts carries the shard merging settings; Run reads its
	// vacuumInterval and mergeInterval.
	mergeOpts mergeOpts
}
186
// debug is the logger for verbose diagnostics. As declared here it writes to
// io.Discard, so its output is dropped unless it is reconfigured elsewhere.
var debug = log.New(io.Discard, "", log.LstdFlags)

// noOutputTimeout is how long loggedRun waits for new command output before
// it starts terminating the command.
//
// our index commands should output something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
const noOutputTimeout = 30 * time.Minute
195
196func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) {
197 out := &synchronizedBuffer{}
198 cmd.Stdout = out
199 cmd.Stderr = out
200
201 tr.LazyPrintf("%s", cmd.Args)
202
203 defer func() {
204 if err != nil {
205 outS := out.String()
206 tr.LazyPrintf("failed: %v", err)
207 tr.LazyPrintf("output: %s", out)
208 tr.SetError()
209 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS)
210 }
211 }()
212
213 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args))
214
215 if err := cmd.Start(); err != nil {
216 return err
217 }
218
219 errC := make(chan error)
220 go func() {
221 errC <- cmd.Wait()
222 }()
223
224 // This channel is set after we have sent sigquit. It allows us to follow up
225 // with a sigkill if the process doesn't quit after sigquit.
226 kill := make(<-chan time.Time)
227
228 lastLen := 0
229 for {
230 select {
231 case <-time.After(noOutputTimeout):
232 // Periodically check if we have had output. If not kill the process.
233 if out.Len() != lastLen {
234 lastLen = out.Len()
235 log.Printf("still running %s", cmd.Args)
236 } else {
237 // Send quit (C-\) first so we get a stack dump.
238 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args)
239 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil {
240 log.Println("quit failed:", err)
241 }
242
243 // send sigkill if still running in 10s
244 kill = time.After(10 * time.Second)
245 }
246
247 case <-kill:
248 log.Printf("still running, killing %s", cmd.Args)
249 if err := cmd.Process.Kill(); err != nil {
250 log.Println("kill failed:", err)
251 }
252
253 case err := <-errC:
254 if err != nil {
255 return err
256 }
257
258 tr.LazyPrintf("success")
259 return nil
260 }
261 }
262}
263
// synchronizedBuffer is a bytes.Buffer guarded by a mutex, so that one
// goroutine can inspect the buffer while another is still writing to it.
type synchronizedBuffer struct {
	mu sync.Mutex
	b  bytes.Buffer
}

// Write appends p to the buffer while holding the lock.
func (sb *synchronizedBuffer) Write(p []byte) (n int, err error) {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	n, err = sb.b.Write(p)
	return n, err
}

// Len reports the number of bytes currently held in the buffer.
func (sb *synchronizedBuffer) Len() int {
	sb.mu.Lock()
	size := sb.b.Len()
	sb.mu.Unlock()
	return size
}

// String returns the buffered bytes as a string.
func (sb *synchronizedBuffer) String() string {
	sb.mu.Lock()
	contents := sb.b.String()
	sb.mu.Unlock()
	return contents
}
288
// pauseFileName is the name of a file which, if present in IndexDir, stops
// index jobs from running. This makes it possible to experiment with the
// content of the IndexDir without the indexserver writing to it. Both the
// sync loop in Run and processQueue check for the file.
const pauseFileName = "PAUSE"
293
// Run the sync loop. This blocks forever.
//
// After calling removeIncompleteShards on the index directory, Run starts:
//   - a goroutine which periodically lists repositories from Sourcegraph and
//     updates the queue, running cleanup of the index directory concurrently;
//   - a goroutine which periodically calls vacuum when shard merging is
//     enabled;
//   - a goroutine which periodically calls doMerge when shard merging is
//     enabled;
//   - IndexConcurrency goroutines running processQueue.
func (s *Server) Run() {
	removeIncompleteShards(s.IndexDir)

	// Start a goroutine which updates the queue with commits to index.
	go func() {
		// We update the list of indexed repos every Interval. To speed up manual
		// testing we also listen for SIGUSR1 to trigger updates.
		//
		// "pkill -SIGUSR1 zoekt-sourcegra"
		for range jitterTicker(s.Interval, unix.SIGUSR1) {
			// A readable PAUSE file skips this sync; its trimmed content is
			// logged as the reason.
			if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
				log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b)))
				continue
			}

			repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir))
			if err != nil {
				log.Printf("error listing repos: %s", err)
				continue
			}

			debug.Printf("updating index queue with %d repositories", len(repos.IDs))

			// Stop indexing repos we don't need to track anymore
			removed := s.queue.MaybeRemoveMissing(repos.IDs)
			metricNumStoppedTrackingTotal.Add(float64(len(removed)))
			if len(removed) > 0 {
				log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5))
			}

			// Run cleanup in the background under the global index directory
			// lock; we wait for it at the end of this iteration.
			cleanupDone := make(chan struct{})
			go func() {
				defer close(cleanupDone)
				s.muIndexDir.Global(func() {
					cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging)
				})
			}()

			repos.IterateIndexOptions(s.queue.AddOrUpdate)

			// IterateIndexOptions will only iterate over repositories that have
			// changed since we last called list. However, we want to add all IDs
			// back onto the queue just to check that what is on disk is still
			// correct. This will use the last IndexOptions we stored in the
			// queue. The repositories not on the queue (missing) need a forced
			// fetch of IndexOptions.
			missing := s.queue.Bump(repos.IDs)
			// Errors from the forced fetch are deliberately ignored here.
			s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...)

			setCompoundShardCounter(s.IndexDir)

			// Do not start the next sync before cleanup has finished.
			<-cleanupDone
		}
	}()

	// Periodic vacuum, only acted upon when shard merging is enabled.
	go func() {
		for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) {
			if s.shardMerging {
				s.vacuum()
			}
		}
	}()

	// Periodic merge, only acted upon when shard merging is enabled.
	go func() {
		for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) {
			if s.shardMerging {
				s.doMerge()
			}
		}
	}()

	// Start the index workers.
	for i := 0; i < s.IndexConcurrency; i++ {
		go s.processQueue()
	}

	// block forever
	select {}
}
373
374// formatList returns a comma-separated list of the first min(len(v), m) items.
375func formatListUint32(v []uint32, m int) string {
376 if len(v) < m {
377 m = len(v)
378 }
379
380 sb := strings.Builder{}
381 for i := 0; i < m; i++ {
382 fmt.Fprintf(&sb, "%d, ", v[i])
383 }
384
385 if len(v) > m {
386 sb.WriteString("...")
387 }
388
389 return strings.TrimRight(sb.String(), ", ")
390}
391
392func (s *Server) processQueue() {
393 for {
394 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
395 time.Sleep(time.Second)
396 continue
397 }
398
399 opts, ok := s.queue.Pop()
400 if !ok {
401 time.Sleep(time.Second)
402 continue
403 }
404
405 args := s.indexArgs(opts)
406
407 ran := s.muIndexDir.With(opts.Name, func() {
408 // only record time taken once we hold the lock. This avoids us
409 // recording time taken while merging/cleanup runs.
410 start := time.Now()
411
412 state, err := s.Index(args)
413
414 elapsed := time.Since(start)
415
416 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds())
417
418 if err != nil {
419 log.Printf("error indexing %s: %s", args.String(), err)
420 }
421
422 switch state {
423 case indexStateSuccess:
424 var branches []string
425 for _, b := range args.Branches {
426 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
427 }
428 s.logger.Info("updated index",
429 sglog.String("repo", args.Name),
430 sglog.Uint32("id", args.RepoID),
431 sglog.Strings("branches", branches),
432 sglog.Duration("duration", elapsed),
433 )
434 case indexStateSuccessMeta:
435 log.Printf("updated meta %s in %v", args.String(), elapsed)
436 }
437 s.queue.SetIndexed(opts, state)
438 })
439
440 if !ran {
441 // Someone else is processing the repository. We can just skip this job
442 // since the repository will be added back to the queue and we will
443 // converge to the correct behaviour.
444 debug.Printf("index job for repository already running: %s", args)
445 continue
446 }
447 }
448}
449
450// repoNameForMetric returns a normalized version of the given repository name that is
451// suitable for use with Prometheus metrics.
452func repoNameForMetric(repo string) string {
453 // Check to see if we want to be able to capture separate indexing metrics for this repository.
454 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable.
455 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok {
456 return repo
457 }
458
459 return ""
460}
461
// batched splits slice into consecutive batches of at most size elements and
// delivers them, in order, on the returned channel. The channel is closed
// once every element has been sent; an empty slice yields no batches.
//
// A non-positive size is treated as "no limit": the whole slice is delivered
// as a single batch. (Previously a size of 0 sent empty batches forever and a
// negative size panicked.)
//
// The caller must drain the channel, otherwise the sending goroutine blocks
// forever. Batches alias the backing array of slice, but each batch has its
// capacity clamped so that appending to one cannot overwrite the next.
func batched(slice []uint32, size int) <-chan []uint32 {
	c := make(chan []uint32)
	go func() {
		defer close(c)

		if size <= 0 {
			size = len(slice)
		}

		for len(slice) > 0 {
			n := size
			if n > len(slice) {
				n = len(slice)
			}
			c <- slice[:n:n]
			slice = slice[n:]
		}
	}()
	return c
}
476
// jitterTicker returns a channel that ticks immediately on creation and then
// keeps ticking with jitter: the pause between two ticks is d/2 plus a random
// amount below d.
//
// sig is an optional list of signals which also cause a tick. This is a
// convenience for triggering the ticker manually.
func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} {
	ticks := make(chan struct{})

	// Timer-driven ticks.
	go func() {
		for {
			ticks <- struct{}{}
			base := int64(d)
			time.Sleep(time.Duration(base/2 + rand.Int63n(base)))
		}
	}()

	// Signal-driven ticks; only needed when signals were requested.
	if len(sig) > 0 {
		go func() {
			signals := make(chan os.Signal, 1)
			signal.Notify(signals, sig...)
			for range signals {
				ticks <- struct{}{}
			}
		}()
	}

	return ticks
}
508
// Index runs an index job for the repository and branches described by args
// and reports the outcome as an indexState.
//
// The returned state is:
//   - indexStateEmpty when args has no branches (an empty shard is created);
//   - indexStateNoop when args.Incremental is set and the index on disk is
//     already up to date;
//   - indexStateSuccessMeta when only index.meta needed updating and that
//     update succeeded;
//   - indexStateSuccess after a full index run;
//   - indexStateFail whenever a non-nil error is returned.
//
// Index mutates args: it may set UseDelta, DeltaShardNumberFallbackThreshold
// and Symbols based on the Server configuration.
func (s *Server) Index(args *indexArgs) (state indexState, err error) {
	tr := trace.New("index", args.Name)

	// Record the final outcome on the trace. Any error forces the state to
	// indexStateFail, regardless of what the return statement supplied.
	defer func() {
		if err != nil {
			tr.SetError()
			tr.LazyPrintf("error: %v", err)
			state = indexStateFail
			metricFailingTotal.Inc()
		}
		tr.LazyPrintf("state: %s", state)
		tr.Finish()
	}()

	tr.LazyPrintf("branches: %v", args.Branches)

	// No branches: write an empty shard instead of running a full index.
	if len(args.Branches) == 0 {
		return indexStateEmpty, createEmptyShard(args)
	}

	repositoryName := args.Name
	if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
		tr.LazyPrintf("marking this repository for delta build")
		args.UseDelta = true
	}

	args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold

	if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
		tr.LazyPrintf("skipping symbols calculation")
		args.Symbols = false
	}

	// reason is logged below; it stays "forced" for non-incremental runs.
	reason := "forced"

	if args.Incremental {
		bo := args.BuildOptions()
		bo.SetDefaults()
		incrementalState, fn := bo.IndexState()
		reason = string(incrementalState)
		metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()

		// Cases which do not return fall through to the full index below.
		switch incrementalState {
		case build.IndexStateEqual:
			debug.Printf("%s index already up to date. Shard=%s", args.String(), fn)
			return indexStateNoop, nil

		case build.IndexStateMeta:
			log.Printf("updating index.meta %s", args.String())

			if err := mergeMeta(bo); err != nil {
				log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
			} else {
				return indexStateSuccessMeta, nil
			}

		case build.IndexStateCorrupt:
			log.Printf("falling back to full update: corrupt index: %s", args.String())
		}
	}

	log.Printf("updating index %s reason=%s", args.String(), reason)

	metricIndexingTotal.Inc()
	c := gitIndexConfig{
		// Commands run by gitIndex go through loggedRun so their output is
		// captured and stalled commands are terminated.
		runCmd: func(cmd *exec.Cmd) error {
			return s.loggedRun(tr, cmd)
		},

		findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
			return args.BuildOptions().FindRepositoryMetadata()
		},
	}

	err = gitIndex(c, args, s.Sourcegraph, s.logger)
	if err != nil {
		return indexStateFail, err
	}

	// Failing to report the status is logged but does not fail the index job.
	if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil {
		s.logger.Error("failed to update index status",
			sglog.String("repo", args.Name),
			sglog.Uint32("id", args.RepoID),
			sglogBranches("branches", args.Branches),
			sglog.Error(err),
		)
	}

	return indexStateSuccess, nil
}
600
601// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so
602// it can update the zoekt_repos table.
603func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error {
604 // We need to read from disk for IndexTime.
605 _, metadata, ok, err := c.findRepositoryMetadata(args)
606 if err != nil {
607 return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
608 }
609 if !ok {
610 return errors.New("failed to find metadata for new/updated index")
611 }
612
613 status := []indexStatus{{
614 RepoID: args.RepoID,
615 Branches: args.Branches,
616 IndexTimeUnix: metadata.IndexTime.Unix(),
617 }}
618 if err := sg.UpdateIndexStatus(status); err != nil {
619 return fmt.Errorf("failed to update sourcegraph with status: %w", err)
620 }
621
622 return nil
623}
624
625func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
626 ss := make([]string, len(branches))
627 for i, b := range branches {
628 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
629 }
630 return sglog.Strings(key, ss)
631}
632
633func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
634 return &indexArgs{
635 IndexOptions: opts,
636
637 IndexDir: s.IndexDir,
638 Parallelism: s.CPUCount,
639
640 Incremental: true,
641
642 // 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
643 FileLimit: 1 << 20,
644 }
645}
646
647func createEmptyShard(args *indexArgs) error {
648 bo := args.BuildOptions()
649 bo.SetDefaults()
650 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}
651
652 if args.Incremental && bo.IncrementalSkipIndexing() {
653 return nil
654 }
655
656 builder, err := build.NewBuilder(*bo)
657 if err != nil {
658 return err
659 }
660 return builder.Finish()
661}
662
663// addDebugHandlers adds handlers specific to indexserver.
664func (s *Server) addDebugHandlers(mux *http.ServeMux) {
665 // Sourcegraph's site admin view requires indexserver to serve it's admin view
666 // on "/".
667 mux.Handle("/", http.HandlerFunc(s.handleRoot))
668
669 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex))
670 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed))
671 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList))
672 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge))
673 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue))
674 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost))
675}
676
677func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) {
678 if r.Method != "GET" {
679 w.Header().Set("Allow", "GET")
680 w.WriteHeader(http.StatusMethodNotAllowed)
681 return
682 }
683
684 response := struct {
685 Hostname string
686 }{
687 Hostname: s.hostname,
688 }
689
690 b, err := json.Marshal(response)
691 if err != nil {
692 http.Error(w, err.Error(), http.StatusInternalServerError)
693 return
694 }
695
696 w.Header().Set("Content-Type", "application/json; charset=utf-8")
697 w.Write(b)
698}
699
// rootTmpl renders the admin page served on "/". It expects data with an
// IndexMsg string and a Repos list whose items have Name and ID fields. When
// Repos is empty only a "show repos" link is rendered; otherwise every repo
// is listed with a link that triggers a reindex of its ID.
var rootTmpl = template.Must(template.New("name").Parse(`
<html>
    <body>
        <a href="debug">Debug</a><br />
        <a href="debug/requests">Traces</a><br />
        {{.IndexMsg}}<br />
        <br />
        <h3>Reindex</h3>
        {{if .Repos}}
            <a href="?show_repos=false">hide repos</a><br />
            <table style="margin-top: 20px">
                <th style="text-align:left">Name</th>
                <th style="text-align:left">ID</th>
                {{range .Repos}}
                    <tr>
                        <td>{{.Name}}</td>
                        <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></td>
                    </tr>
                {{end}}
            </table>
        {{else}}
            <a href="?show_repos=true">show repos</a><br />
        {{end}}
    </body>
</html>
`))
726
727func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
728 if r.Method != "GET" {
729 w.Header().Set("Allow", "GET")
730 w.WriteHeader(http.StatusMethodNotAllowed)
731 return
732 }
733
734 values := r.URL.Query()
735
736 // ?id=
737 indexMsg := ""
738 if v := values.Get("id"); v != "" {
739 id, err := strconv.Atoi(v)
740 if err != nil {
741 http.Error(w, err.Error(), http.StatusBadRequest)
742 return
743 }
744 indexMsg, _ = s.forceIndex(uint32(id))
745 }
746
747 // ?show_repos=
748 showRepos := false
749 if v := values.Get("show_repos"); v != "" {
750 showRepos, _ = strconv.ParseBool(v)
751 }
752
753 type Repo struct {
754 ID uint32
755 Name string
756 }
757 var data struct {
758 Repos []Repo
759 IndexMsg string
760 }
761
762 data.IndexMsg = indexMsg
763
764 if showRepos {
765 s.queue.Iterate(func(opts *IndexOptions) {
766 data.Repos = append(data.Repos, Repo{
767 ID: opts.RepoID,
768 Name: opts.Name,
769 })
770 })
771 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name })
772 }
773
774 _ = rootTmpl.Execute(w, data)
775}
776
777// handleReindex triggers a reindex asynocronously. If a reindex was triggered
778// the request returns with status 202. The caller can infer the new state of
779// the index by calling List.
780func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) {
781 if r.Method != http.MethodPost {
782 w.Header().Set("Allow", http.MethodPost)
783 w.WriteHeader(http.StatusMethodNotAllowed)
784 return
785 }
786
787 err := r.ParseForm()
788 if err != nil {
789 http.Error(w, err.Error(), http.StatusBadRequest)
790 return
791 }
792
793 id, err := strconv.Atoi(r.Form.Get("repo"))
794 if err != nil {
795 http.Error(w, err.Error(), http.StatusBadRequest)
796 return
797 }
798
799 go func() { s.forceIndex(uint32(id)) }()
800
801 // 202 Accepted
802 w.WriteHeader(http.StatusAccepted)
803}
804
805func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
806 withIndexed := true
807 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil {
808 withIndexed = b
809 }
810
811 var indexed []uint32
812 if withIndexed {
813 indexed = listIndexed(s.IndexDir)
814 }
815
816 repos, err := s.Sourcegraph.List(r.Context(), indexed)
817 if err != nil {
818 http.Error(w, err.Error(), http.StatusInternalServerError)
819 return
820 }
821
822 bw := bytes.Buffer{}
823
824 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
825
826 _, err = fmt.Fprintf(tw, "ID\tName\n")
827 if err != nil {
828 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
829 return
830 }
831
832 s.queue.mu.Lock()
833 name := ""
834 for _, id := range repos.IDs {
835 if item := s.queue.get(id); item != nil {
836 name = item.opts.Name
837 } else {
838 name = ""
839 }
840 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
841 if err != nil {
842 debug.Printf("handleDebugList: %s\n", err.Error())
843 }
844 }
845 s.queue.mu.Unlock()
846
847 if err != nil {
848 http.Error(w, err.Error(), http.StatusInternalServerError)
849 return
850 }
851
852 err = tw.Flush()
853 if err != nil {
854 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
855 return
856 }
857
858 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
859
860 if _, err := io.Copy(w, &bw); err != nil {
861 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
862 return
863 }
864}
865
866// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
867// can run this command during periods of low usage (evenings, weekends) to
868// trigger an initial merge run. In the steady-state, merges happen rarely, even
869// on busy instances, and users can rely on automatic merging instead.
870func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {
871
872 // A merge operation can take very long, depending on the number merges and the
873 // target size of the compound shards. We run the merge in the background and
874 // return immediately to the user.
875 //
876 // We track the status of the merge with metricShardMergingRunning.
877 go func() {
878 s.doMerge()
879 }()
880 _, _ = w.Write([]byte("merging enqueued\n"))
881}
882
883func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {
884 indexed := listIndexed(s.IndexDir)
885
886 bw := bytes.Buffer{}
887
888 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
889
890 _, err := fmt.Fprintf(tw, "ID\tName\n")
891 if err != nil {
892 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
893 return
894 }
895
896 s.queue.mu.Lock()
897 name := ""
898 for _, id := range indexed {
899 if item := s.queue.get(id); item != nil {
900 name = item.opts.Name
901 } else {
902 name = ""
903 }
904 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
905 if err != nil {
906 debug.Printf("handleDebugIndexed: %s\n", err.Error())
907 }
908 }
909 s.queue.mu.Unlock()
910
911 if err != nil {
912 http.Error(w, err.Error(), http.StatusInternalServerError)
913 return
914 }
915
916 err = tw.Flush()
917 if err != nil {
918 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
919 return
920 }
921
922 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
923
924 if _, err := io.Copy(w, &bw); err != nil {
925 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
926 return
927 }
928}
929
930// forceIndex will run the index job for repo name now. It will return always
931// return a string explaining what it did, even if it failed.
932func (s *Server) forceIndex(id uint32) (string, error) {
933 var opts IndexOptions
934 var err error
935 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) {
936 opts = o
937 }, func(_ uint32, e error) {
938 err = e
939 }, id)
940 if err != nil {
941 return fmt.Sprintf("Indexing %d failed: %v", id, err), err
942 }
943
944 args := s.indexArgs(opts)
945 args.Incremental = false // force re-index
946
947 var state indexState
948 ran := s.muIndexDir.With(opts.Name, func() {
949 state, err = s.Index(args)
950 })
951 if !ran {
952 return fmt.Sprintf("index job for repository already running: %s", args), nil
953 }
954 if err != nil {
955 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err
956 }
957 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil
958}
959
960func listIndexed(indexDir string) []uint32 {
961 index := getShards(indexDir)
962 metricNumIndexed.Set(float64(len(index)))
963 repoIDs := make([]uint32, 0, len(index))
964 for id := range index {
965 repoIDs = append(repoIDs, id)
966 }
967 sort.Slice(repoIDs, func(i, j int) bool {
968 return repoIDs[i] < repoIDs[j]
969 })
970 return repoIDs
971}
972
973// setupTmpDir sets up a temporary directory on the same volume as the
974// indexes.
975//
976// If main is true we will delete older temp directories left around. main is
977// false when this is a debug command.
978func setupTmpDir(main bool, index string) error {
979 // change the target tmp directory depending on if its our main daemon or a
980 // debug sub command.
981 dir := ".indexserver.debug.tmp"
982 if main {
983 dir = ".indexserver.tmp"
984 }
985
986 tmpRoot := filepath.Join(index, dir)
987 if err := os.MkdirAll(tmpRoot, 0755); err != nil {
988 return err
989 }
990 if !tmpfriend.IsTmpFriendDir(tmpRoot) {
991 _, err := tmpfriend.RootTempDir(tmpRoot)
992 return err
993 }
994 return nil
995}
996
997func printMetaData(fn string) error {
998 repo, indexMeta, err := zoekt.ReadMetadataPath(fn)
999 if err != nil {
1000 return err
1001 }
1002
1003 err = json.NewEncoder(os.Stdout).Encode(indexMeta)
1004 if err != nil {
1005 return err
1006 }
1007
1008 err = json.NewEncoder(os.Stdout).Encode(repo)
1009 if err != nil {
1010 return err
1011 }
1012 return nil
1013}
1014
1015func printShardStats(fn string) error {
1016 f, err := os.Open(fn)
1017 if err != nil {
1018 return err
1019 }
1020
1021 iFile, err := zoekt.NewIndexFile(f)
1022 if err != nil {
1023 return err
1024 }
1025
1026 return zoekt.PrintNgramStats(iFile)
1027}
1028
1029func srcLogLevelIsDebug() bool {
1030 lvl := os.Getenv(sglog.EnvLogLevel)
1031 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug")
1032}
1033
// getEnvWithDefaultInt64 returns the environment variable k parsed as a base
// 10 int64, or defaultVal if k is unset or empty. The process exits via
// log.Fatalf if the value cannot be parsed.
func getEnvWithDefaultInt64(k string, defaultVal int64) int64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseInt(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to int64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1045
// getEnvWithDefaultInt returns the environment variable k parsed as a base 10
// int, or defaultVal if k is unset or empty. The process exits via log.Fatalf
// if the value cannot be parsed.
func getEnvWithDefaultInt(k string, defaultVal int) int {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	// Parse the value, not the variable name: parsing k made every non-empty
	// setting fatal.
	i, err := strconv.Atoi(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to int: %s", k, err)
	}
	return i
}
1057
// getEnvWithDefaultUint64 returns the environment variable k parsed as a base
// 10 uint64, or defaultVal if k is unset or empty. The process exits via
// log.Fatalf if the value cannot be parsed.
func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseUint(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to uint64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1069
// getEnvWithDefaultFloat64 returns the environment variable k parsed as a
// float64, or defaultVal if k is unset or empty. The process exits via
// log.Fatalf if the value cannot be parsed.
func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseFloat(raw, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to float64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1081
// getEnvWithDefaultString returns the environment variable k, or defaultVal
// if k is unset or empty.
func getEnvWithDefaultString(k string, defaultVal string) string {
	if value := os.Getenv(k); value != "" {
		return value
	}
	return defaultVal
}
1089
// getEnvWithDefaultDuration returns the environment variable k parsed with
// time.ParseDuration, or defaultVal if k is unset or empty. The process exits
// via log.Fatalf if the value cannot be parsed.
func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := time.ParseDuration(raw)
		if err != nil {
			log.Fatalf("error parsing ENV %s to duration: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1102
// getEnvWithDefaultBool reads the environment variable k as a boolean in any
// spelling accepted by strconv.ParseBool.
//
// An unset or empty variable yields defaultVal. A value that cannot be parsed
// terminates the process through log.Fatalf.
func getEnvWithDefaultBool(k string, defaultVal bool) bool {
	if raw := os.Getenv(k); raw != "" {
		parsed, parseErr := strconv.ParseBool(raw)
		if parseErr != nil {
			log.Fatalf("error parsing ENV %s to bool: %s", k, parseErr)
		}
		return parsed
	}
	return defaultVal
}
1115
// getEnvWithDefaultEmptySet interprets the environment variable k as a
// comma-separated list and returns its entries as a set. Entries are trimmed
// of surrounding whitespace and blank entries are dropped, so an unset or
// empty variable produces an empty (non-nil) set.
func getEnvWithDefaultEmptySet(k string) map[string]struct{} {
	entries := make(map[string]struct{})
	for _, field := range strings.Split(os.Getenv(k), ",") {
		name := strings.TrimSpace(field)
		if name == "" {
			continue
		}
		entries[name] = struct{}{}
	}
	return entries
}
1126
// joinStringSet concatenates the members of set, separated by sep.
//
// Members are emitted in sorted order so the result is deterministic; map
// iteration order is randomized and would otherwise make the output differ
// between calls. A nil or empty set yields the empty string.
func joinStringSet(set map[string]struct{}, sep string) string {
	xs := make([]string, 0, len(set))
	for x := range set {
		xs = append(xs, x)
	}
	sort.Strings(xs)

	return strings.Join(xs, sep)
}
1135
1136func setCompoundShardCounter(indexDir string) {
1137 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt"))
1138 if err != nil {
1139 log.Printf("setCompoundShardCounter: %s\n", err)
1140 return
1141 }
1142 metricNumberCompoundShards.Set(float64(len(fns)))
1143}
1144
1145func rootCmd() *ffcli.Command {
1146 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError)
1147 conf := rootConfig{
1148 Main: true,
1149 }
1150 conf.registerRootFlags(rootFs)
1151
1152 return &ffcli.Command{
1153 FlagSet: rootFs,
1154 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]",
1155 Subcommands: []*ffcli.Command{debugCmd()},
1156 Exec: func(ctx context.Context, args []string) error {
1157 return startServer(conf)
1158 },
1159 }
1160}
1161
// rootConfig holds the settings of the root command. Apart from Main, every
// field is bound to a command line flag by registerRootFlags.
type rootConfig struct {
	// Main marks the configuration of the long running indexserver command.
	// Debug commands leave it false. It decides whether tmpfriend needs to
	// run.
	Main bool

	// root is the -sourcegraph_url value: a Sourcegraph URL, or a directory
	// path under which repositories are indexed with a faked Sourcegraph API.
	root string
	// interval is how often we sync with Sourcegraph.
	interval time.Duration
	// index is the index directory.
	index string
	// indexConcurrency is the number of concurrent index jobs.
	indexConcurrency int64
	// listen is the address the HTTP server listens on.
	listen string
	// hostname is the name advertised to Sourcegraph when asking for the list
	// of repositories to index.
	hostname string
	// cpuFraction is the fraction of cores used for indexing.
	cpuFraction float64
	// blockProfileRate is the sampling rate of Go's block profiler in
	// nanoseconds.
	blockProfileRate int

	// Settings for shard merging.

	// vacuumInterval is how often vacuum runs.
	vacuumInterval time.Duration
	// mergeInterval is how often merge runs.
	mergeInterval time.Duration
	// targetSize is the target size of compound shards in MiB.
	targetSize int64
	// minSize is the minimum size of a compound shard in MiB.
	minSize int64
	// minAgeDays is the time since the last commit, in days, below which a
	// shard is excluded from merging.
	minAgeDays int
	// maxPriority is the maximum priority a shard can have to be considered
	// for merging.
	maxPriority float64

	// Settings for backing off from indexing repositories with one or more
	// consecutive failures.

	// backoffDuration is the base delay applied after a failed indexing
	// attempt.
	backoffDuration time.Duration
	// maxBackoffDuration caps the backoff delay.
	maxBackoffDuration time.Duration

	// useGRPC selects the gRPC API for talking to Sourcegraph.
	useGRPC bool
}
1192
1193func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
1194 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
1195 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
1196 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of concurrent index jobs to run.")
1197 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use")
1198 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
1199 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
1200 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
1201 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate")
1202 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
1203 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")
1204 fs.BoolVar(&rc.useGRPC, "use_grpc", getEnvWithDefaultBool("GRPC_ENABLED", false), "use the gRPC API to talk to Sourcegraph")
1205
1206 // flags related to shard merging
1207 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
1208 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
1209 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB")
1210 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB")
1211 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
1212 fs.Float64Var(&rc.maxPriority, "merge_max_priority", getEnvWithDefaultFloat64("SRC_MERGE_MAX_PRIORITY", 100), "the maximum priority a shard can have to be considered for merging.")
1213}
1214
1215func startServer(conf rootConfig) error {
1216 s, err := newServer(conf)
1217 if err != nil {
1218 return err
1219 }
1220
1221 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate)
1222 setCompoundShardCounter(s.IndexDir)
1223
1224 if conf.listen != "" {
1225
1226 mux := http.NewServeMux()
1227 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{
1228 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"},
1229 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"},
1230 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"},
1231 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"},
1232 }...)
1233 s.addDebugHandlers(mux)
1234
1235 go func() {
1236 debug.Printf("serving HTTP on %s", conf.listen)
1237 log.Fatal(http.ListenAndServe(conf.listen, mux))
1238
1239 }()
1240
1241 // Serve mux on a unix domain socket on a best-effort-basis so that
1242 // webserver can call the endpoints via the shared filesystem.
1243 //
1244 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen
1245 // on the socket due to permission errors. See
1246 // https://github.com/docker/for-mac/issues/6239
1247 go func() {
1248 serveHTTPOverSocket := func() error {
1249 socket := filepath.Join(s.IndexDir, "indexserver.sock")
1250 // We cannot bind a socket to an existing pathname.
1251 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) {
1252 return fmt.Errorf("error removing socket file: %s", socket)
1253 }
1254 // The "unix" network corresponds to stream sockets. (cf. unixgram,
1255 // unixpacket).
1256 l, err := net.Listen("unix", socket)
1257 if err != nil {
1258 return fmt.Errorf("failed to listen on socket %s: %w", socket, err)
1259 }
1260 // Indexserver (root) and webserver (Sourcegraph) run with
1261 // different users. Per default, the socket is created with
1262 // permission 755 (root root), which doesn't let webserver write to
1263 // it.
1264 //
1265 // See https://github.com/golang/go/issues/11822 for more context.
1266 if err := os.Chmod(socket, 0777); err != nil {
1267 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err)
1268 }
1269 debug.Printf("serving HTTP on %s", socket)
1270 return http.Serve(l, mux)
1271 }
1272 debug.Print(serveHTTPOverSocket())
1273 }()
1274 }
1275
1276 oc := &ownerChecker{
1277 Path: filepath.Join(conf.index, "owner.txt"),
1278 Hostname: conf.hostname,
1279 }
1280 go oc.Run()
1281
1282 logger := sglog.Scoped("metricsRegistration", "")
1283 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"}
1284
1285 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index})
1286 prometheus.DefaultRegisterer.MustRegister(c)
1287
1288 s.Run()
1289 return nil
1290}
1291
1292func newServer(conf rootConfig) (*Server, error) {
1293 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 {
1294 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0")
1295 }
1296 if conf.index == "" {
1297 return nil, fmt.Errorf("must set -index")
1298 }
1299 if conf.root == "" {
1300 return nil, fmt.Errorf("must set -sourcegraph_url")
1301 }
1302 rootURL, err := url.Parse(conf.root)
1303 if err != nil {
1304 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err)
1305 }
1306
1307 rootURL = addDefaultPort(rootURL)
1308
1309 // Tune GOMAXPROCS to match Linux container CPU quota.
1310 _, _ = maxprocs.Set()
1311
1312 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler.
1313 // The block profiler is disabled by default and should be enabled with care in production
1314 runtime.SetBlockProfileRate(conf.blockProfileRate)
1315
1316 // Automatically prepend our own path at the front, to minimize
1317 // required configuration.
1318 if l, err := os.Readlink("/proc/self/exe"); err == nil {
1319 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
1320 }
1321
1322 if _, err := os.Stat(conf.index); err != nil {
1323 if err := os.MkdirAll(conf.index, 0755); err != nil {
1324 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err)
1325 }
1326 }
1327
1328 if err := setupTmpDir(conf.Main, conf.index); err != nil {
1329 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err)
1330 }
1331
1332 if srcLogLevelIsDebug() {
1333 debug = log.New(os.Stderr, "", log.LstdFlags)
1334 }
1335
1336 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST")
1337 if len(reposWithSeparateIndexingMetrics) > 0 {
1338 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", "))
1339 }
1340
1341 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST")
1342 if len(deltaBuildRepositoriesAllowList) > 0 {
1343 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", "))
1344 }
1345
1346 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150)
1347 if deltaShardNumberFallbackThreshold > 0 {
1348 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold)
1349 } else {
1350 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards")
1351 }
1352
1353 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST")
1354 if len(reposShouldSkipSymbolsCalculation) > 0 {
1355 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", "))
1356 }
1357
1358 var sg Sourcegraph
1359 if rootURL.IsAbs() {
1360 var batchSize int
1361 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" {
1362 batchSize, err = strconv.Atoi(v)
1363 if err != nil {
1364 return nil, fmt.Errorf("Invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int")
1365 }
1366 }
1367
1368 opts := []SourcegraphClientOption{
1369 WithBatchSize(batchSize),
1370 WithShouldUseGRPC(conf.useGRPC),
1371 }
1372
1373 gRPCConnectionOptions := []grpc.DialOption{
1374 grpc.WithTransportCredentials(insecure.NewCredentials()),
1375 grpc.WithChainStreamInterceptor(internalActorStreamInterceptor()),
1376 grpc.WithChainUnaryInterceptor(internalActorUnaryInterceptor()),
1377 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
1378 }
1379
1380 // This dialer is used to connect via gRPC to the Sourcegraph instance.
1381 // This is done lazily, so we can provide the client to use regardless of
1382 // whether we enabled gRPC or not initially.
1383 cc, err := grpc.Dial(rootURL.Host, gRPCConnectionOptions...)
1384 if err != nil {
1385 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err)
1386 }
1387
1388 client := proto.NewZoektConfigurationServiceClient(cc)
1389 opts = append(opts, WithGRPCClient(client))
1390
1391 sg = newSourcegraphClient(rootURL, conf.hostname, opts...)
1392
1393 } else {
1394 sg = sourcegraphFake{
1395 RootDir: rootURL.String(),
1396 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags),
1397 }
1398 }
1399
1400 if conf.indexConcurrency < 1 {
1401 conf.indexConcurrency = 1
1402 }
1403
1404 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction)))
1405 if cpuCount < 1 {
1406 cpuCount = 1
1407 }
1408
1409 logger := sglog.Scoped("server", "periodically reindexes enabled repositories on sourcegraph")
1410
1411 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)
1412
1413 return &Server{
1414 logger: logger,
1415 Sourcegraph: sg,
1416 IndexDir: conf.index,
1417 IndexConcurrency: int(conf.indexConcurrency),
1418 Interval: conf.interval,
1419 CPUCount: cpuCount,
1420 queue: *q,
1421 shardMerging: zoekt.ShardMergingEnabled(),
1422 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList,
1423 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold,
1424 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
1425 hostname: conf.hostname,
1426 mergeOpts: mergeOpts{
1427 vacuumInterval: conf.vacuumInterval,
1428 mergeInterval: conf.mergeInterval,
1429 targetSizeBytes: conf.targetSize * 1024 * 1024,
1430 minSizeBytes: conf.minSize * 1024 * 1024,
1431 minAgeDays: conf.minAgeDays,
1432 maxPriority: conf.maxPriority,
1433 },
1434 }, err
1435}
1436
1437// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration
1438// for the indexed-search-configuration gRPC service.
1439//
1440// The default backoff strategy is modeled after the default settings used by
1441// retryablehttp.DefaultClient.
1442//
1443// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html):
1444// - Unavailable
1445// - Aborted
1446//
1447//go:embed default_grpc_service_configuration.json
1448var defaultGRPCServiceConfigurationJSON string
1449
1450func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor {
1451 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1452 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1453 return invoker(ctx, method, req, reply, cc, opts...)
1454 }
1455}
1456
1457func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
1458 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
1459 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1460 return streamer(ctx, desc, cc, method, opts...)
1461 }
1462}
1463
// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
// If the scheme is "https", "443" is used. URLs with any other scheme, URLs
// that already carry a port and non-absolute URLs are returned as is. A nil
// URL yields nil.
//
// The original URL is not mutated. A copy is modified and returned.
func addDefaultPort(original *url.URL) *url.URL {
	if original == nil {
		return nil // don't panic
	}

	// Don't do anything if the URL doesn't look like a remote URL, or if it
	// already names a port.
	if !original.IsAbs() || original.Port() != "" {
		return original
	}

	var port string
	switch original.Scheme {
	case "http":
		port = "80"
	case "https":
		port = "443"
	default:
		return original
	}

	u := cloneURL(original)
	// Join with Hostname() rather than Host: Host keeps the square brackets
	// of an IPv6 literal (and a trailing ":" of an empty port), which
	// net.JoinHostPort would bracket a second time, e.g. "[[::1]]:80".
	u.Host = net.JoinHostPort(original.Hostname(), port)
	return u
}

// cloneURL returns a copy of the URL. It is safe to mutate the returned URL.
// This is copied from net/http/clone.go
func cloneURL(u *url.URL) *url.URL {
	if u == nil {
		return nil
	}
	u2 := new(url.URL)
	*u2 = *u
	if u.User != nil {
		// Copy the user info as well so the clone shares no pointers with u.
		u2.User = new(url.Userinfo)
		*u2.User = *u.User
	}
	return u2
}
1508
1509func main() {
1510 liblog := sglog.Init(sglog.Resource{
1511 Name: "zoekt-indexserver",
1512 Version: zoekt.Version,
1513 InstanceID: zoekt.HostnameBestEffort(),
1514 })
1515 defer liblog.Sync()
1516
1517 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil {
1518 log.Fatal(err)
1519 }
1520}