// Fork of https://github.com/sourcegraph/zoekt.

// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
// repositories on Sourcegraph.
3package main
4
5import (
6 "bytes"
7 "context"
8 _ "embed"
9 "encoding/json"
10 "errors"
11 "flag"
12 "fmt"
13 "html/template"
14 "io"
15 "io/fs"
16 "log"
17 "math"
18 "math/rand"
19 "net"
20 "net/http"
21 "net/url"
22 "os"
23 "os/exec"
24 "os/signal"
25 "path/filepath"
26 "runtime"
27 "sort"
28 "strconv"
29 "strings"
30 "sync"
31 "text/tabwriter"
32 "time"
33
34 "github.com/keegancsmith/tmpfriend"
35 "github.com/peterbourgon/ff/v3/ffcli"
36 "github.com/prometheus/client_golang/prometheus"
37 "github.com/prometheus/client_golang/prometheus/promauto"
38 sglog "github.com/sourcegraph/log"
39 "go.uber.org/automaxprocs/maxprocs"
40 "golang.org/x/net/trace"
41 "golang.org/x/sys/unix"
42 "google.golang.org/grpc"
43 "google.golang.org/grpc/credentials/insecure"
44 "google.golang.org/grpc/metadata"
45
46 "github.com/sourcegraph/mountinfo"
47
48 "github.com/sourcegraph/zoekt"
49 "github.com/sourcegraph/zoekt/build"
50 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
51 "github.com/sourcegraph/zoekt/debugserver"
52 "github.com/sourcegraph/zoekt/internal/profiler"
53)
54
55var (
56 metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
57 Name: "resolve_revisions_seconds",
58 Help: "A histogram of latencies for resolving all repository revisions.",
59 Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 27min
60 })
61
62 metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
63 Name: "resolve_revision_seconds",
64 Help: "A histogram of latencies for resolving a repository revision.",
65 Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
66 }, []string{"success"}) // success=true|false
67
68 metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
69 Name: "get_index_options_total",
70 Help: "The total number of times we tried to get index options for a repository. Includes errors.",
71 })
72 metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
73 Name: "get_index_options_error_total",
74 Help: "The total number of times we failed to get index options for a repository.",
75 })
76
77 metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
78 Name: "index_repo_seconds",
79 Help: "A histogram of latencies for indexing a repository.",
80 Buckets: prometheus.ExponentialBucketsRange(
81 (100 * time.Millisecond).Seconds(),
82 (40*time.Minute + indexTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
83 20),
84 }, []string{
85 "state", // state is an indexState
86 "name", // name of the repository that was indexed
87 })
88
89 metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
90 Name: "index_fetch_seconds",
91 Help: "A histogram of latencies for fetching a repository.",
92 Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
93 }, []string{
94 "success", // true|false
95 "name", // the name of the repository that the commits were fetched from
96 })
97
98 metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
99 Name: "index_incremental_index_state",
100 Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
101 }, []string{"state"}) // state is build.IndexState
102
103 metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
104 Name: "index_num_indexed",
105 Help: "Number of indexed repos by code host",
106 })
107
108 metricNumAssigned = promauto.NewGauge(prometheus.GaugeOpts{
109 Name: "index_num_assigned",
110 Help: "Number of repos assigned to this indexer by code host",
111 })
112
113 metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
114 Name: "index_failing_total",
115 Help: "Counts failures to index (indexing activity, should be used with rate())",
116 })
117
118 metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
119 Name: "index_indexing_total",
120 Help: "Counts indexings (indexing activity, should be used with rate())",
121 })
122
123 metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
124 Name: "index_num_stopped_tracking_total",
125 Help: "Counts the number of repos we stopped tracking.",
126 })
127)
128
129// set of repositories that we want to capture separate indexing metrics for
130var reposWithSeparateIndexingMetrics = make(map[string]struct{})
131
132type indexState string
133
134const (
135 indexStateFail indexState = "fail"
136 indexStateSuccess indexState = "success"
137 indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
138 indexStateNoop indexState = "noop" // We didn't need to update index
139 indexStateEmpty indexState = "empty" // index is empty (empty repo)
140)
141
142// Server is the main functionality of zoekt-sourcegraph-indexserver. It
143// exists to conveniently use all the options passed in via func main.
144type Server struct {
145 logger sglog.Logger
146
147 Sourcegraph Sourcegraph
148 BatchSize int
149
150 // IndexDir is the index directory to use.
151 IndexDir string
152
153 // IndexConcurrency is the number of repositories we index at once.
154 IndexConcurrency int
155
156 // Interval is how often we sync with Sourcegraph.
157 Interval time.Duration
158 // CPUCount is the amount of parallelism to use when indexing a
159 // repository.
160 CPUCount int
161
162 queue Queue
163
164 // muIndexDir protects the index directory from concurrent access.
165 muIndexDir indexMutex
166
167 // If true, shard merging is enabled.
168 shardMerging bool
169
170 // deltaBuildRepositoriesAllowList is an allowlist for repositories that we
171 // use delta-builds for instead of normal builds
172 deltaBuildRepositoriesAllowList map[string]struct{}
173
174 // deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
175 // before attempting a delta build.
176 deltaShardNumberFallbackThreshold uint64
177
178 // repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
179 // we skip calculating symbols metadata for during builds
180 repositoriesSkipSymbolsCalculationAllowList map[string]struct{}
181
182 hostname string
183
184 mergeOpts mergeOpts
185}
186
187var debug = log.New(io.Discard, "", log.LstdFlags)
188
189// our index commands should output something every 100mb they process.
190//
191// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
192// time." famous last words. A client was indexing a monorepo with 42
193// cores... 5m was not enough.
194const noOutputTimeout = 30 * time.Minute
195
196func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) {
197 out := &synchronizedBuffer{}
198 cmd.Stdout = out
199 cmd.Stderr = out
200
201 tr.LazyPrintf("%s", cmd.Args)
202
203 defer func() {
204 if err != nil {
205 outS := out.String()
206 tr.LazyPrintf("failed: %v", err)
207 tr.LazyPrintf("output: %s", out)
208 tr.SetError()
209 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS)
210 }
211 }()
212
213 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args))
214
215 if err := cmd.Start(); err != nil {
216 return err
217 }
218
219 errC := make(chan error)
220 go func() {
221 errC <- cmd.Wait()
222 }()
223
224 // This channel is set after we have sent sigquit. It allows us to follow up
225 // with a sigkill if the process doesn't quit after sigquit.
226 kill := make(<-chan time.Time)
227
228 lastLen := 0
229 for {
230 select {
231 case <-time.After(noOutputTimeout):
232 // Periodically check if we have had output. If not kill the process.
233 if out.Len() != lastLen {
234 lastLen = out.Len()
235 log.Printf("still running %s", cmd.Args)
236 } else {
237 // Send quit (C-\) first so we get a stack dump.
238 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args)
239 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil {
240 log.Println("quit failed:", err)
241 }
242
243 // send sigkill if still running in 10s
244 kill = time.After(10 * time.Second)
245 }
246
247 case <-kill:
248 log.Printf("still running, killing %s", cmd.Args)
249 if err := cmd.Process.Kill(); err != nil {
250 log.Println("kill failed:", err)
251 }
252
253 case err := <-errC:
254 if err != nil {
255 return err
256 }
257
258 tr.LazyPrintf("success")
259 return nil
260 }
261 }
262}
263
264// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can
265// monitor the buffer while it is being written to.
266type synchronizedBuffer struct {
267 mu sync.Mutex
268 b bytes.Buffer
269}
270
271func (sb *synchronizedBuffer) Write(p []byte) (int, error) {
272 sb.mu.Lock()
273 defer sb.mu.Unlock()
274 return sb.b.Write(p)
275}
276
277func (sb *synchronizedBuffer) Len() int {
278 sb.mu.Lock()
279 defer sb.mu.Unlock()
280 return sb.b.Len()
281}
282
283func (sb *synchronizedBuffer) String() string {
284 sb.mu.Lock()
285 defer sb.mu.Unlock()
286 return sb.b.String()
287}
288
289// pauseFileName if present in IndexDir will stop index jobs from
290// running. This is to make it possible to experiment with the content of the
291// IndexDir without the indexserver writing to it.
292const pauseFileName = "PAUSE"
293
294// Run the sync loop. This blocks forever.
295func (s *Server) Run() {
296 removeIncompleteShards(s.IndexDir)
297
298 // Start a goroutine which updates the queue with commits to index.
299 go func() {
300 // We update the list of indexed repos every Interval. To speed up manual
301 // testing we also listen for SIGUSR1 to trigger updates.
302 //
303 // "pkill -SIGUSR1 zoekt-sourcegra"
304 for range jitterTicker(s.Interval, unix.SIGUSR1) {
305 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
306 log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b)))
307 continue
308 }
309
310 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir))
311 if err != nil {
312 log.Printf("error listing repos: %s", err)
313 continue
314 }
315
316 debug.Printf("updating index queue with %d repositories", len(repos.IDs))
317
318 // Stop indexing repos we don't need to track anymore
319 removed := s.queue.MaybeRemoveMissing(repos.IDs)
320 metricNumStoppedTrackingTotal.Add(float64(len(removed)))
321 if len(removed) > 0 {
322 log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5))
323 }
324
325 cleanupDone := make(chan struct{})
326 go func() {
327 defer close(cleanupDone)
328 s.muIndexDir.Global(func() {
329 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging)
330 })
331 }()
332
333 repos.IterateIndexOptions(s.queue.AddOrUpdate)
334
335 // IterateIndexOptions will only iterate over repositories that have
336 // changed since we last called list. However, we want to add all IDs
337 // back onto the queue just to check that what is on disk is still
338 // correct. This will use the last IndexOptions we stored in the
339 // queue. The repositories not on the queue (missing) need a forced
340 // fetch of IndexOptions.
341 missing := s.queue.Bump(repos.IDs)
342 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...)
343
344 setCompoundShardCounter(s.IndexDir)
345
346 <-cleanupDone
347 }
348 }()
349
350 go func() {
351 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) {
352 if s.shardMerging {
353 s.vacuum()
354 }
355 }
356 }()
357
358 go func() {
359 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) {
360 if s.shardMerging {
361 s.doMerge()
362 }
363 }
364 }()
365
366 for i := 0; i < s.IndexConcurrency; i++ {
367 go s.processQueue()
368 }
369
370 // block forever
371 select {}
372}
373
374// formatList returns a comma-separated list of the first min(len(v), m) items.
375func formatListUint32(v []uint32, m int) string {
376 if len(v) < m {
377 m = len(v)
378 }
379
380 sb := strings.Builder{}
381 for i := 0; i < m; i++ {
382 fmt.Fprintf(&sb, "%d, ", v[i])
383 }
384
385 if len(v) > m {
386 sb.WriteString("...")
387 }
388
389 return strings.TrimRight(sb.String(), ", ")
390}
391
392func (s *Server) processQueue() {
393 for {
394 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
395 time.Sleep(time.Second)
396 continue
397 }
398
399 opts, ok := s.queue.Pop()
400 if !ok {
401 time.Sleep(time.Second)
402 continue
403 }
404
405 args := s.indexArgs(opts)
406
407 ran := s.muIndexDir.With(opts.Name, func() {
408 // only record time taken once we hold the lock. This avoids us
409 // recording time taken while merging/cleanup runs.
410 start := time.Now()
411
412 state, err := s.Index(args)
413
414 elapsed := time.Since(start)
415
416 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds())
417
418 if err != nil {
419 log.Printf("error indexing %s: %s", args.String(), err)
420 }
421
422 switch state {
423 case indexStateSuccess:
424 var branches []string
425 for _, b := range args.Branches {
426 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
427 }
428 s.logger.Info("updated index",
429 sglog.String("repo", args.Name),
430 sglog.Uint32("id", args.RepoID),
431 sglog.Strings("branches", branches),
432 sglog.Duration("duration", elapsed),
433 )
434 case indexStateSuccessMeta:
435 log.Printf("updated meta %s in %v", args.String(), elapsed)
436 }
437 s.queue.SetIndexed(opts, state)
438 })
439
440 if !ran {
441 // Someone else is processing the repository. We can just skip this job
442 // since the repository will be added back to the queue and we will
443 // converge to the correct behaviour.
444 debug.Printf("index job for repository already running: %s", args)
445 continue
446 }
447 }
448}
449
450// repoNameForMetric returns a normalized version of the given repository name that is
451// suitable for use with Prometheus metrics.
452func repoNameForMetric(repo string) string {
453 // Check to see if we want to be able to capture separate indexing metrics for this repository.
454 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable.
455 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok {
456 return repo
457 }
458
459 return ""
460}
461
462func batched(slice []uint32, size int) <-chan []uint32 {
463 c := make(chan []uint32)
464 go func() {
465 for len(slice) > 0 {
466 if size > len(slice) {
467 size = len(slice)
468 }
469 c <- slice[:size]
470 slice = slice[size:]
471 }
472 close(c)
473 }()
474 return c
475}
476
477// jitterTicker returns a ticker which ticks with a jitter. Each tick is
478// uniformly selected from the range (d/2, d + d/2). It will tick on creation.
479//
480// sig is a list of signals which also cause the ticker to fire. This is a
481// convenience to allow manually triggering of the ticker.
482func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} {
483 ticker := make(chan struct{})
484
485 go func() {
486 for {
487 ticker <- struct{}{}
488 ns := int64(d)
489 jitter := rand.Int63n(ns)
490 time.Sleep(time.Duration(ns/2 + jitter))
491 }
492 }()
493
494 go func() {
495 if len(sig) == 0 {
496 return
497 }
498
499 c := make(chan os.Signal, 1)
500 signal.Notify(c, sig...)
501 for range c {
502 ticker <- struct{}{}
503 }
504 }()
505
506 return ticker
507}
508
509// Index starts an index job for repo name at commit.
510func (s *Server) Index(args *indexArgs) (state indexState, err error) {
511 tr := trace.New("index", args.Name)
512
513 defer func() {
514 if err != nil {
515 tr.SetError()
516 tr.LazyPrintf("error: %v", err)
517 state = indexStateFail
518 metricFailingTotal.Inc()
519 }
520 tr.LazyPrintf("state: %s", state)
521 tr.Finish()
522 }()
523
524 tr.LazyPrintf("branches: %v", args.Branches)
525
526 if len(args.Branches) == 0 {
527 return indexStateEmpty, createEmptyShard(args)
528 }
529
530 repositoryName := args.Name
531 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
532 tr.LazyPrintf("marking this repository for delta build")
533 args.UseDelta = true
534 }
535
536 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold
537
538 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
539 tr.LazyPrintf("skipping symbols calculation")
540 args.Symbols = false
541 }
542
543 reason := "forced"
544
545 if args.Incremental {
546 bo := args.BuildOptions()
547 bo.SetDefaults()
548 incrementalState, fn := bo.IndexState()
549 reason = string(incrementalState)
550 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()
551
552 switch incrementalState {
553 case build.IndexStateEqual:
554 debug.Printf("%s index already up to date. Shard=%s", args.String(), fn)
555 return indexStateNoop, nil
556
557 case build.IndexStateMeta:
558 log.Printf("updating index.meta %s", args.String())
559
560 if err := mergeMeta(bo); err != nil {
561 log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
562 } else {
563 return indexStateSuccessMeta, nil
564 }
565
566 case build.IndexStateCorrupt:
567 log.Printf("falling back to full update: corrupt index: %s", args.String())
568 }
569 }
570
571 log.Printf("updating index %s reason=%s", args.String(), reason)
572
573 metricIndexingTotal.Inc()
574 c := gitIndexConfig{
575 runCmd: func(cmd *exec.Cmd) error {
576 return s.loggedRun(tr, cmd)
577 },
578
579 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, ok bool, err error) {
580 return args.BuildOptions().FindRepositoryMetadata()
581 },
582 }
583
584 err = gitIndex(c, args, s.Sourcegraph, s.logger)
585 if err != nil {
586 return indexStateFail, err
587 }
588
589 status := []indexStatus{{RepoID: args.RepoID, Branches: args.Branches}}
590 if err := s.Sourcegraph.UpdateIndexStatus(status); err != nil {
591 branches := make([]string, len(args.Branches))
592 for i, b := range args.Branches {
593 branches[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
594 }
595
596 s.logger.Error("failed to update index status",
597 sglog.String("repo", args.Name),
598 sglog.Uint32("id", args.RepoID),
599 sglog.Strings("branches", branches),
600 sglog.Error(err),
601 )
602 }
603
604 return indexStateSuccess, nil
605}
606
607func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
608 return &indexArgs{
609 IndexOptions: opts,
610
611 IndexDir: s.IndexDir,
612 Parallelism: s.CPUCount,
613
614 Incremental: true,
615
616 // 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
617 FileLimit: 1 << 20,
618 }
619}
620
621func createEmptyShard(args *indexArgs) error {
622 bo := args.BuildOptions()
623 bo.SetDefaults()
624 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}
625
626 if args.Incremental && bo.IncrementalSkipIndexing() {
627 return nil
628 }
629
630 builder, err := build.NewBuilder(*bo)
631 if err != nil {
632 return err
633 }
634 return builder.Finish()
635}
636
637// addDebugHandlers adds handlers specific to indexserver.
638func (s *Server) addDebugHandlers(mux *http.ServeMux) {
639 // Sourcegraph's site admin view requires indexserver to serve it's admin view
640 // on "/".
641 mux.Handle("/", http.HandlerFunc(s.handleRoot))
642
643 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex))
644 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed))
645 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList))
646 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge))
647 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue))
648 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost))
649}
650
651func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) {
652 if r.Method != "GET" {
653 w.Header().Set("Allow", "GET")
654 w.WriteHeader(http.StatusMethodNotAllowed)
655 return
656 }
657
658 response := struct {
659 Hostname string
660 }{
661 Hostname: s.hostname,
662 }
663
664 b, err := json.Marshal(response)
665 if err != nil {
666 http.Error(w, err.Error(), http.StatusInternalServerError)
667 return
668 }
669
670 w.Header().Set("Content-Type", "application/json; charset=utf-8")
671 w.Write(b)
672}
673
674var rootTmpl = template.Must(template.New("name").Parse(`
675<html>
676 <body>
677 <a href="debug">Debug</a><br />
678 <a href="debug/requests">Traces</a><br />
679 {{.IndexMsg}}<br />
680 <br />
681 <h3>Reindex</h3>
682 {{if .Repos}}
683 <a href="?show_repos=false">hide repos</a><br />
684 <table style="margin-top: 20px">
685 <th style="text-align:left">Name</th>
686 <th style="text-align:left">ID</th>
687 {{range .Repos}}
688 <tr>
689 <td>{{.Name}}</td>
690 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id>
691 </tr>
692 {{end}}
693 </table>
694 {{else}}
695 <a href="?show_repos=true">show repos</a><br />
696 {{end}}
697 </body>
698</html>
699`))
700
701func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
702 if r.Method != "GET" {
703 w.Header().Set("Allow", "GET")
704 w.WriteHeader(http.StatusMethodNotAllowed)
705 return
706 }
707
708 values := r.URL.Query()
709
710 // ?id=
711 indexMsg := ""
712 if v := values.Get("id"); v != "" {
713 id, err := strconv.Atoi(v)
714 if err != nil {
715 http.Error(w, err.Error(), http.StatusBadRequest)
716 return
717 }
718 indexMsg, _ = s.forceIndex(uint32(id))
719 }
720
721 // ?show_repos=
722 showRepos := false
723 if v := values.Get("show_repos"); v != "" {
724 showRepos, _ = strconv.ParseBool(v)
725 }
726
727 type Repo struct {
728 ID uint32
729 Name string
730 }
731 var data struct {
732 Repos []Repo
733 IndexMsg string
734 }
735
736 data.IndexMsg = indexMsg
737
738 if showRepos {
739 s.queue.Iterate(func(opts *IndexOptions) {
740 data.Repos = append(data.Repos, Repo{
741 ID: opts.RepoID,
742 Name: opts.Name,
743 })
744 })
745 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name })
746 }
747
748 _ = rootTmpl.Execute(w, data)
749}
750
751// handleReindex triggers a reindex asynocronously. If a reindex was triggered
752// the request returns with status 202. The caller can infer the new state of
753// the index by calling List.
754func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) {
755 if r.Method != http.MethodPost {
756 w.Header().Set("Allow", http.MethodPost)
757 w.WriteHeader(http.StatusMethodNotAllowed)
758 return
759 }
760
761 err := r.ParseForm()
762 if err != nil {
763 http.Error(w, err.Error(), http.StatusBadRequest)
764 return
765 }
766
767 id, err := strconv.Atoi(r.Form.Get("repo"))
768 if err != nil {
769 http.Error(w, err.Error(), http.StatusBadRequest)
770 return
771 }
772
773 go func() { s.forceIndex(uint32(id)) }()
774
775 // 202 Accepted
776 w.WriteHeader(http.StatusAccepted)
777}
778
779func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
780 withIndexed := true
781 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil {
782 withIndexed = b
783 }
784
785 var indexed []uint32
786 if withIndexed {
787 indexed = listIndexed(s.IndexDir)
788 }
789
790 repos, err := s.Sourcegraph.List(r.Context(), indexed)
791 if err != nil {
792 http.Error(w, err.Error(), http.StatusInternalServerError)
793 return
794 }
795
796 bw := bytes.Buffer{}
797
798 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
799
800 _, err = fmt.Fprintf(tw, "ID\tName\n")
801 if err != nil {
802 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
803 return
804 }
805
806 s.queue.mu.Lock()
807 name := ""
808 for _, id := range repos.IDs {
809 if item := s.queue.get(id); item != nil {
810 name = item.opts.Name
811 } else {
812 name = ""
813 }
814 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
815 if err != nil {
816 debug.Printf("handleDebugList: %s\n", err.Error())
817 }
818 }
819 s.queue.mu.Unlock()
820
821 if err != nil {
822 http.Error(w, err.Error(), http.StatusInternalServerError)
823 return
824 }
825
826 err = tw.Flush()
827 if err != nil {
828 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
829 return
830 }
831
832 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
833
834 if _, err := io.Copy(w, &bw); err != nil {
835 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
836 return
837 }
838}
839
840// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
841// can run this command during periods of low usage (evenings, weekends) to
842// trigger an initial merge run. In the steady-state, merges happen rarely, even
843// on busy instances, and users can rely on automatic merging instead.
844func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {
845
846 // A merge operation can take very long, depending on the number merges and the
847 // target size of the compound shards. We run the merge in the background and
848 // return immediately to the user.
849 //
850 // We track the status of the merge with metricShardMergingRunning.
851 go func() {
852 s.doMerge()
853 }()
854 _, _ = w.Write([]byte("merging enqueued\n"))
855}
856
857func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {
858 indexed := listIndexed(s.IndexDir)
859
860 bw := bytes.Buffer{}
861
862 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
863
864 _, err := fmt.Fprintf(tw, "ID\tName\n")
865 if err != nil {
866 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
867 return
868 }
869
870 s.queue.mu.Lock()
871 name := ""
872 for _, id := range indexed {
873 if item := s.queue.get(id); item != nil {
874 name = item.opts.Name
875 } else {
876 name = ""
877 }
878 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
879 if err != nil {
880 debug.Printf("handleDebugIndexed: %s\n", err.Error())
881 }
882 }
883 s.queue.mu.Unlock()
884
885 if err != nil {
886 http.Error(w, err.Error(), http.StatusInternalServerError)
887 return
888 }
889
890 err = tw.Flush()
891 if err != nil {
892 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
893 return
894 }
895
896 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
897
898 if _, err := io.Copy(w, &bw); err != nil {
899 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
900 return
901 }
902}
903
904// forceIndex will run the index job for repo name now. It will return always
905// return a string explaining what it did, even if it failed.
906func (s *Server) forceIndex(id uint32) (string, error) {
907 var opts IndexOptions
908 var err error
909 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) {
910 opts = o
911 }, func(_ uint32, e error) {
912 err = e
913 }, id)
914 if err != nil {
915 return fmt.Sprintf("Indexing %d failed: %v", id, err), err
916 }
917
918 args := s.indexArgs(opts)
919 args.Incremental = false // force re-index
920
921 var state indexState
922 ran := s.muIndexDir.With(opts.Name, func() {
923 state, err = s.Index(args)
924 })
925 if !ran {
926 return fmt.Sprintf("index job for repository already running: %s", args), nil
927 }
928 if err != nil {
929 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err
930 }
931 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil
932}
933
934func listIndexed(indexDir string) []uint32 {
935 index := getShards(indexDir)
936 metricNumIndexed.Set(float64(len(index)))
937 repoIDs := make([]uint32, 0, len(index))
938 for id := range index {
939 repoIDs = append(repoIDs, id)
940 }
941 sort.Slice(repoIDs, func(i, j int) bool {
942 return repoIDs[i] < repoIDs[j]
943 })
944 return repoIDs
945}
946
947func hostnameBestEffort() string {
948 if h := os.Getenv("NODE_NAME"); h != "" {
949 return h
950 }
951 if h := os.Getenv("HOSTNAME"); h != "" {
952 return h
953 }
954 hostname, _ := os.Hostname()
955 return hostname
956}
957
958// setupTmpDir sets up a temporary directory on the same volume as the
959// indexes.
960//
961// If main is true we will delete older temp directories left around. main is
962// false when this is a debug command.
963func setupTmpDir(main bool, index string) error {
964 // change the target tmp directory depending on if its our main daemon or a
965 // debug sub command.
966 dir := ".indexserver.debug.tmp"
967 if main {
968 dir = ".indexserver.tmp"
969 }
970
971 tmpRoot := filepath.Join(index, dir)
972 if err := os.MkdirAll(tmpRoot, 0755); err != nil {
973 return err
974 }
975 if !tmpfriend.IsTmpFriendDir(tmpRoot) {
976 _, err := tmpfriend.RootTempDir(tmpRoot)
977 return err
978 }
979 return nil
980}
981
982func printMetaData(fn string) error {
983 repo, indexMeta, err := zoekt.ReadMetadataPath(fn)
984 if err != nil {
985 return err
986 }
987
988 err = json.NewEncoder(os.Stdout).Encode(indexMeta)
989 if err != nil {
990 return err
991 }
992
993 err = json.NewEncoder(os.Stdout).Encode(repo)
994 if err != nil {
995 return err
996 }
997 return nil
998}
999
1000func printShardStats(fn string) error {
1001 f, err := os.Open(fn)
1002 if err != nil {
1003 return err
1004 }
1005
1006 iFile, err := zoekt.NewIndexFile(f)
1007 if err != nil {
1008 return err
1009 }
1010
1011 return zoekt.PrintNgramStats(iFile)
1012}
1013
1014func srcLogLevelIsDebug() bool {
1015 lvl := os.Getenv(sglog.EnvLogLevel)
1016 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug")
1017}
1018
1019func getEnvWithDefaultInt64(k string, defaultVal int64) int64 {
1020 v := os.Getenv(k)
1021 if v == "" {
1022 return defaultVal
1023 }
1024 i, err := strconv.ParseInt(v, 10, 64)
1025 if err != nil {
1026 log.Fatalf("error parsing ENV %s to int64: %s", k, err)
1027 }
1028 return i
1029}
1030
1031func getEnvWithDefaultInt(k string, defaultVal int) int {
1032 v := os.Getenv(k)
1033 if v == "" {
1034 return defaultVal
1035 }
1036 i, err := strconv.Atoi(k)
1037 if err != nil {
1038 log.Fatalf("error parsing ENV %s to int: %s", k, err)
1039 }
1040 return i
1041}
1042
1043func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 {
1044 v := os.Getenv(k)
1045 if v == "" {
1046 return defaultVal
1047 }
1048 i, err := strconv.ParseUint(v, 10, 64)
1049 if err != nil {
1050 log.Fatalf("error parsing ENV %s to uint64: %s", k, err)
1051 }
1052 return i
1053}
1054
1055func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 {
1056 v := os.Getenv(k)
1057 if v == "" {
1058 return defaultVal
1059 }
1060 f, err := strconv.ParseFloat(v, 64)
1061 if err != nil {
1062 log.Fatalf("error parsing ENV %s to float64: %s", k, err)
1063 }
1064 return f
1065}
1066
1067func getEnvWithDefaultString(k string, defaultVal string) string {
1068 v := os.Getenv(k)
1069 if v == "" {
1070 return defaultVal
1071 }
1072 return v
1073}
1074
1075func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration {
1076 v := os.Getenv(k)
1077 if v == "" {
1078 return defaultVal
1079 }
1080
1081 d, err := time.ParseDuration(v)
1082 if err != nil {
1083 log.Fatalf("error parsing ENV %s to duration: %s", k, err)
1084 }
1085 return d
1086}
1087
1088func getEnvWithDefaultBool(k string, defaultVal bool) bool {
1089 v := os.Getenv(k)
1090 if v == "" {
1091 return defaultVal
1092 }
1093
1094 b, err := strconv.ParseBool(v)
1095 if err != nil {
1096 log.Fatalf("error parsing ENV %s to bool: %s", k, err)
1097 }
1098 return b
1099}
1100
1101func getEnvWithDefaultEmptySet(k string) map[string]struct{} {
1102 set := map[string]struct{}{}
1103 for _, v := range strings.Split(os.Getenv(k), ",") {
1104 v = strings.TrimSpace(v)
1105 if v != "" {
1106 set[v] = struct{}{}
1107 }
1108 }
1109 return set
1110}
1111
1112func joinStringSet(set map[string]struct{}, sep string) string {
1113 var xs []string
1114 for x := range set {
1115 xs = append(xs, x)
1116 }
1117
1118 return strings.Join(xs, sep)
1119}
1120
1121func setCompoundShardCounter(indexDir string) {
1122 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt"))
1123 if err != nil {
1124 log.Printf("setCompoundShardCounter: %s\n", err)
1125 return
1126 }
1127 metricNumberCompoundShards.Set(float64(len(fns)))
1128}
1129
1130func rootCmd() *ffcli.Command {
1131 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError)
1132 conf := rootConfig{
1133 Main: true,
1134 }
1135 conf.registerRootFlags(rootFs)
1136
1137 return &ffcli.Command{
1138 FlagSet: rootFs,
1139 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]",
1140 Subcommands: []*ffcli.Command{debugCmd()},
1141 Exec: func(ctx context.Context, args []string) error {
1142 return startServer(conf)
1143 },
1144 }
1145}
1146
// rootConfig holds the settings of the indexserver root command. Its fields
// are bound to command-line flags by registerRootFlags (many of which take
// their defaults from environment variables) and are consumed by newServer
// and startServer.
type rootConfig struct {
	// Main is true if this rootConfig is for our main long running command (the
	// indexserver). Debug commands should not set this value. This is used to
	// determine if we need to run tmpfriend.
	Main bool

	// root is -sourcegraph_url: the Sourcegraph base URL, or a path to a local
	// directory, in which case the Sourcegraph API is faked.
	root string
	// interval is -interval: how often to sync with Sourcegraph.
	interval time.Duration
	// index is -index: the index directory; newServer creates it if missing.
	index string
	// indexConcurrency is -index_concurrency; newServer raises values < 1 to 1.
	indexConcurrency int64
	// listen is -listen: the debug HTTP address. When empty, startServer
	// serves no debug HTTP endpoints (neither TCP nor unix socket).
	listen string
	// hostname is -hostname: the name advertised to Sourcegraph when asking
	// for the list of repositories to index.
	hostname string
	// cpuFraction is -cpu_fraction: the fraction of GOMAXPROCS used for
	// indexing; newServer rejects values outside (0, 1].
	cpuFraction float64
	// blockProfileRate is -block_profile_rate, passed to
	// runtime.SetBlockProfileRate; values <= 0 disable the block profiler.
	blockProfileRate int

	// config values related to shard merging
	vacuumInterval time.Duration
	mergeInterval  time.Duration
	// targetSize and minSize are in MiB; newServer converts them to bytes.
	targetSize  int64
	minSize     int64
	minAgeDays  int
	maxPriority float64

	// config values related to backoff indexing repos with one or more consecutive failures
	backoffDuration    time.Duration
	maxBackoffDuration time.Duration

	// useGRPC is true if we should use the gRPC API to talk to Sourcegraph.
	useGRPC bool
}
1177
1178func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
1179 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
1180 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
1181 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of concurrent index jobs to run.")
1182 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use")
1183 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
1184 fs.StringVar(&rc.hostname, "hostname", hostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
1185 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
1186 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate")
1187 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
1188 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")
1189 fs.BoolVar(&rc.useGRPC, "use_grpc", getEnvWithDefaultBool("GRPC_ENABLED", false), "use the gRPC API to talk to Sourcegraph")
1190
1191 // flags related to shard merging
1192 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
1193 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
1194 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB")
1195 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB")
1196 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
1197 fs.Float64Var(&rc.maxPriority, "merge_max_priority", getEnvWithDefaultFloat64("SRC_MERGE_MAX_PRIORITY", 100), "the maximum priority a shard can have to be considered for merging.")
1198}
1199
1200func startServer(conf rootConfig) error {
1201 s, err := newServer(conf)
1202 if err != nil {
1203 return err
1204 }
1205
1206 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate)
1207 setCompoundShardCounter(s.IndexDir)
1208
1209 if conf.listen != "" {
1210
1211 mux := http.NewServeMux()
1212 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{
1213 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"},
1214 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"},
1215 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"},
1216 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"},
1217 }...)
1218 s.addDebugHandlers(mux)
1219
1220 go func() {
1221 debug.Printf("serving HTTP on %s", conf.listen)
1222 log.Fatal(http.ListenAndServe(conf.listen, mux))
1223
1224 }()
1225
1226 // Serve mux on a unix domain socket on a best-effort-basis so that
1227 // webserver can call the endpoints via the shared filesystem.
1228 //
1229 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen
1230 // on the socket due to permission errors. See
1231 // https://github.com/docker/for-mac/issues/6239
1232 go func() {
1233 serveHTTPOverSocket := func() error {
1234 socket := filepath.Join(s.IndexDir, "indexserver.sock")
1235 // We cannot bind a socket to an existing pathname.
1236 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) {
1237 return fmt.Errorf("error removing socket file: %s", socket)
1238 }
1239 // The "unix" network corresponds to stream sockets. (cf. unixgram,
1240 // unixpacket).
1241 l, err := net.Listen("unix", socket)
1242 if err != nil {
1243 return fmt.Errorf("failed to listen on socket %s: %w", socket, err)
1244 }
1245 // Indexserver (root) and webserver (Sourcegraph) run with
1246 // different users. Per default, the socket is created with
1247 // permission 755 (root root), which doesn't let webserver write to
1248 // it.
1249 //
1250 // See https://github.com/golang/go/issues/11822 for more context.
1251 if err := os.Chmod(socket, 0777); err != nil {
1252 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err)
1253 }
1254 debug.Printf("serving HTTP on %s", socket)
1255 return http.Serve(l, mux)
1256 }
1257 debug.Print(serveHTTPOverSocket())
1258 }()
1259 }
1260
1261 oc := &ownerChecker{
1262 Path: filepath.Join(conf.index, "owner.txt"),
1263 Hostname: conf.hostname,
1264 }
1265 go oc.Run()
1266
1267 logger := sglog.Scoped("metricsRegistration", "")
1268 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"}
1269
1270 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index})
1271 prometheus.DefaultRegisterer.MustRegister(c)
1272
1273 s.Run()
1274 return nil
1275}
1276
1277func newServer(conf rootConfig) (*Server, error) {
1278 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 {
1279 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0")
1280 }
1281 if conf.index == "" {
1282 return nil, fmt.Errorf("must set -index")
1283 }
1284 if conf.root == "" {
1285 return nil, fmt.Errorf("must set -sourcegraph_url")
1286 }
1287 rootURL, err := url.Parse(conf.root)
1288 if err != nil {
1289 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err)
1290 }
1291
1292 rootURL = addDefaultPort(rootURL)
1293
1294 // Tune GOMAXPROCS to match Linux container CPU quota.
1295 _, _ = maxprocs.Set()
1296
1297 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler.
1298 // The block profiler is disabled by default and should be enabled with care in production
1299 runtime.SetBlockProfileRate(conf.blockProfileRate)
1300
1301 // Automatically prepend our own path at the front, to minimize
1302 // required configuration.
1303 if l, err := os.Readlink("/proc/self/exe"); err == nil {
1304 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
1305 }
1306
1307 if _, err := os.Stat(conf.index); err != nil {
1308 if err := os.MkdirAll(conf.index, 0755); err != nil {
1309 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err)
1310 }
1311 }
1312
1313 if err := setupTmpDir(conf.Main, conf.index); err != nil {
1314 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err)
1315 }
1316
1317 if srcLogLevelIsDebug() {
1318 debug = log.New(os.Stderr, "", log.LstdFlags)
1319 }
1320
1321 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST")
1322 if len(reposWithSeparateIndexingMetrics) > 0 {
1323 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", "))
1324 }
1325
1326 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST")
1327 if len(deltaBuildRepositoriesAllowList) > 0 {
1328 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", "))
1329 }
1330
1331 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150)
1332 if deltaShardNumberFallbackThreshold > 0 {
1333 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold)
1334 } else {
1335 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards")
1336 }
1337
1338 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST")
1339 if len(reposShouldSkipSymbolsCalculation) > 0 {
1340 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", "))
1341 }
1342
1343 var sg Sourcegraph
1344 if rootURL.IsAbs() {
1345 var batchSize int
1346 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" {
1347 batchSize, err = strconv.Atoi(v)
1348 if err != nil {
1349 return nil, fmt.Errorf("Invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int")
1350 }
1351 }
1352
1353 opts := []SourcegraphClientOption{
1354 WithBatchSize(batchSize),
1355 WithShouldUseGRPC(conf.useGRPC),
1356 }
1357
1358 gRPCConnectionOptions := []grpc.DialOption{
1359 grpc.WithTransportCredentials(insecure.NewCredentials()),
1360 grpc.WithChainStreamInterceptor(internalActorStreamInterceptor()),
1361 grpc.WithChainUnaryInterceptor(internalActorUnaryInterceptor()),
1362 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
1363 }
1364
1365 // This dialer is used to connect via gRPC to the Sourcegraph instance.
1366 // This is done lazily, so we can provide the client to use regardless of
1367 // whether we enabled gRPC or not initially.
1368 cc, err := grpc.Dial(rootURL.Host, gRPCConnectionOptions...)
1369 if err != nil {
1370 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err)
1371 }
1372
1373 client := proto.NewZoektConfigurationServiceClient(cc)
1374 opts = append(opts, WithGRPCClient(client))
1375
1376 sg = newSourcegraphClient(rootURL, conf.hostname, opts...)
1377
1378 } else {
1379 sg = sourcegraphFake{
1380 RootDir: rootURL.String(),
1381 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags),
1382 }
1383 }
1384
1385 if conf.indexConcurrency < 1 {
1386 conf.indexConcurrency = 1
1387 }
1388
1389 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction)))
1390 if cpuCount < 1 {
1391 cpuCount = 1
1392 }
1393
1394 logger := sglog.Scoped("server", "periodically reindexes enabled repositories on sourcegraph")
1395
1396 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)
1397
1398 return &Server{
1399 logger: logger,
1400 Sourcegraph: sg,
1401 IndexDir: conf.index,
1402 IndexConcurrency: int(conf.indexConcurrency),
1403 Interval: conf.interval,
1404 CPUCount: cpuCount,
1405 queue: *q,
1406 shardMerging: zoekt.ShardMergingEnabled(),
1407 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList,
1408 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold,
1409 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
1410 hostname: conf.hostname,
1411 mergeOpts: mergeOpts{
1412 vacuumInterval: conf.vacuumInterval,
1413 mergeInterval: conf.mergeInterval,
1414 targetSizeBytes: conf.targetSize * 1024 * 1024,
1415 minSizeBytes: conf.minSize * 1024 * 1024,
1416 minAgeDays: conf.minAgeDays,
1417 maxPriority: conf.maxPriority,
1418 },
1419 }, err
1420}
1421
// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration
// for the indexed-search-configuration gRPC service. newServer passes it to
// grpc.WithDefaultServiceConfig when dialing the Sourcegraph instance.
//
// The default backoff strategy is modeled after the default settings used by
// retryablehttp.DefaultClient.
//
// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html):
// - Unavailable
// - Aborted
//
// NOTE(review): the retry and backoff policy described above lives in the
// embedded default_grpc_service_configuration.json, which is not visible from
// this file. Keep this comment in sync with that file.
//
//go:embed default_grpc_service_configuration.json
var defaultGRPCServiceConfigurationJSON string
1434
1435func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor {
1436 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1437 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1438 return invoker(ctx, method, req, reply, cc, opts...)
1439 }
1440}
1441
1442func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
1443 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
1444 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1445 return streamer(ctx, desc, cc, method, opts...)
1446 }
1447}
1448
// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
// If the scheme is "https", "443" is used. A nil URL, a non-absolute URL
// (e.g. a local directory path), a URL that already carries a port, and any
// other scheme are returned unchanged.
//
// IPv6 literal hosts are supported: "http://[::1]" yields host "[::1]:80".
//
// The original URL is not mutated. A copy is modified and returned.
func addDefaultPort(original *url.URL) *url.URL {
	if original == nil {
		return nil // don't panic
	}

	if !original.IsAbs() {
		return original // don't do anything if the URL doesn't look like a remote URL
	}

	if original.Port() != "" {
		return original
	}

	var port string
	switch original.Scheme {
	case "http":
		port = "80"
	case "https":
		port = "443"
	default:
		return original
	}

	u := cloneURL(original)
	// Use Hostname() rather than Host. For IPv6 literals, Host keeps its
	// brackets ("[::1]"), and net.JoinHostPort would bracket it a second time
	// ("[[::1]]:80"). Hostname() also drops an empty trailing port
	// ("example.com:").
	u.Host = net.JoinHostPort(u.Hostname(), port)
	return u
}

// cloneURL returns a copy of the URL. It is safe to mutate the returned URL.
// This is copied from net/http/clone.go
func cloneURL(u *url.URL) *url.URL {
	if u == nil {
		return nil
	}
	u2 := new(url.URL)
	*u2 = *u
	// Userinfo is held by pointer; copy it so the clone does not share it.
	if u.User != nil {
		u2.User = new(url.Userinfo)
		*u2.User = *u.User
	}
	return u2
}
1493
1494func main() {
1495 liblog := sglog.Init(sglog.Resource{
1496 Name: "zoekt-indexserver",
1497 Version: zoekt.Version,
1498 InstanceID: hostnameBestEffort(),
1499 })
1500 defer liblog.Sync()
1501
1502 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil {
1503 log.Fatal(err)
1504 }
1505}