fork of https://github.com/sourcegraph/zoekt
1// Licensed under the Apache License, Version 2.0 (the "License");
2// you may not use this file except in compliance with the License.
3// You may obtain a copy of the License at
4//
5// http://www.apache.org/licenses/LICENSE-2.0
6//
7// Unless required by applicable law or agreed to in writing, software
8// distributed under the License is distributed on an "AS IS" BASIS,
9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10// See the License for the specific language governing permissions and
11// limitations under the License.
12
13// Command zoekt-sourcegraph-indexserver periodically reindexes repositories
14// from a Sourcegraph instance. It uses a "pull-based" design, where it periodically
15// reaches out to the Sourcegraph instance for the list of repositories to reindex.
16package main
17
18import (
19 "bytes"
20 "context"
21 _ "embed"
22 "encoding/json"
23 "errors"
24 "flag"
25 "fmt"
26 "html/template"
27 "io"
28 "io/fs"
29 "log"
30 "math"
31 "math/rand"
32 "net"
33 "net/http"
34 "net/url"
35 "os"
36 "os/exec"
37 "os/signal"
38 "path"
39 "path/filepath"
40 "runtime"
41 "slices"
42 "sort"
43 "strconv"
44 "strings"
45 "sync"
46 "text/tabwriter"
47 "time"
48
49 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
50 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
51 "github.com/peterbourgon/ff/v3/ffcli"
52 "github.com/prometheus/client_golang/prometheus"
53 "github.com/prometheus/client_golang/prometheus/promauto"
54 sglog "github.com/sourcegraph/log"
55 "github.com/sourcegraph/mountinfo"
56
57 "github.com/sourcegraph/zoekt"
58 configv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/sourcegraph/zoekt/configuration/v1"
59 indexserverv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/zoekt/indexserver/v1"
60 "github.com/sourcegraph/zoekt/grpc/defaults"
61 "github.com/sourcegraph/zoekt/grpc/grpcutil"
62 "github.com/sourcegraph/zoekt/grpc/internalerrs"
63 "github.com/sourcegraph/zoekt/grpc/messagesize"
64 "github.com/sourcegraph/zoekt/index"
65 "github.com/sourcegraph/zoekt/internal/debugserver"
66 "github.com/sourcegraph/zoekt/internal/profiler"
67 "github.com/sourcegraph/zoekt/internal/tenant"
68
69 "go.uber.org/automaxprocs/maxprocs"
70 "go.uber.org/multierr"
71 "golang.org/x/net/trace"
72 "golang.org/x/sys/unix"
73 "google.golang.org/grpc"
74 "google.golang.org/grpc/codes"
75 "google.golang.org/grpc/credentials/insecure"
76 "google.golang.org/grpc/metadata"
77 "google.golang.org/grpc/status"
78)
79
80var (
81 metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
82 Name: "resolve_revisions_seconds",
83 Help: "A histogram of latencies for resolving all repository revisions.",
84 Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 27min
85 })
86
87 metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
88 Name: "resolve_revision_seconds",
89 Help: "A histogram of latencies for resolving a repository revision.",
90 Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
91 }, []string{"success"}) // success=true|false
92
93 metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
94 Name: "get_index_options_total",
95 Help: "The total number of times we tried to get index options for a repository. Includes errors.",
96 })
97 metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
98 Name: "get_index_options_error_total",
99 Help: "The total number of times we failed to get index options for a repository.",
100 })
101
102 metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
103 Name: "index_repo_seconds",
104 Help: "A histogram of latencies for indexing a repository.",
105 Buckets: prometheus.ExponentialBucketsRange(
106 (100 * time.Millisecond).Seconds(),
107 (40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
108 20),
109 }, []string{
110 "state", // state is an indexState
111 "name", // name of the repository that was indexed
112 })
113
114 metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{
115 Name: "index_indexing_delay_seconds",
116 Help: "A histogram of durations from when an index job is added to the queue, to the time it completes.",
117 Buckets: prometheus.ExponentialBuckets(60, 2, 14), // 1 minute -> 5.5 days
118 }, []string{
119 "state", // state is an indexState
120 "name", // the name of the repository that was indexed
121 })
122
123 metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
124 Name: "index_fetch_seconds",
125 Help: "A histogram of latencies for fetching a repository.",
126 Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
127 }, []string{
128 "success", // true|false
129 "name", // the name of the repository that the commits were fetched from
130 })
131
132 metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
133 Name: "index_incremental_index_state",
134 Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
135 }, []string{"state"}) // state is index.IndexState
136
137 metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
138 Name: "index_num_indexed",
139 Help: "Number of indexed repos by code host",
140 })
141
142 metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
143 Name: "index_failing_total",
144 Help: "Counts failures to index (indexing activity, should be used with rate())",
145 })
146
147 metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
148 Name: "index_indexing_total",
149 Help: "Counts indexings (indexing activity, should be used with rate())",
150 })
151
152 metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
153 Name: "index_num_stopped_tracking_total",
154 Help: "Counts the number of repos we stopped tracking.",
155 })
156
157 // clientMetricsOnce returns a singleton instance of the client metrics
158 // that are shared across all gRPC clients that this process creates.
159 //
160 // This function panics if the metrics cannot be registered with the default
161 // Prometheus registry.
162 clientMetricsOnce = sync.OnceValue(func() *grpcprom.ClientMetrics {
163 clientMetrics := grpcprom.NewClientMetrics(
164 grpcprom.WithClientCounterOptions(),
165 grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
166 grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC
167 grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC
168 )
169 prometheus.DefaultRegisterer.MustRegister(clientMetrics)
170 return clientMetrics
171 })
172)
173
// MaxFileSize is the file size limit (1 MiB, in bytes) passed as FileLimit to
// every index build. It matches
// https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
//
// NOTE: if you change this, you must also update gitIndex to use the same
// value when fetching the repo.
const MaxFileSize = 1024 * 1024
177
// reposWithSeparateIndexingMetrics is the set of repository names whose
// indexing metrics are labelled with their own name. All other repositories
// share the empty name so the metric cardinality stays bounded.
var reposWithSeparateIndexingMetrics = map[string]struct{}{}
180
// indexState is the outcome of one index job. Its string form is used as the
// "state" label of the indexing metrics.
type indexState string

// The possible outcomes of an index job.
const (
	// indexStateFail: the job returned an error.
	indexStateFail indexState = "fail"
	// indexStateSuccess: the repository was (re)indexed.
	indexStateSuccess indexState = "success"
	// indexStateSuccessMeta: we only updated metadata.
	indexStateSuccessMeta indexState = "success_meta"
	// indexStateNoop: we didn't need to update the index.
	indexStateNoop indexState = "noop"
	// indexStateEmpty: the index is empty (empty repo).
	indexStateEmpty indexState = "empty"
)
190
191// Server is the main functionality of zoekt-sourcegraph-indexserver. It
192// exists to conveniently use all the options passed in via func main.
193type Server struct {
194 logger sglog.Logger
195
196 Sourcegraph Sourcegraph
197
198 // rootURL is the root URL of the Sourcegraph instance.
199 rootURL *url.URL
200
201 BatchSize int
202
203 // IndexDir is the index directory to use.
204 IndexDir string
205
206 // IndexConcurrency is the number of repositories we index at once.
207 IndexConcurrency int
208
209 // indexSemaphore limits the number of concurrent index operations
210 indexSemaphore chan struct{}
211
212 // Interval is how often we sync with Sourcegraph.
213 Interval time.Duration
214
215 // CPUCount is the number of CPUs to use for indexing shards.
216 CPUCount int
217
218 queue Queue
219
220 // muIndexDir protects the index directory from concurrent access.
221 muIndexDir indexMutex
222
223 // If true, shard merging is enabled.
224 shardMerging bool
225
226 // deltaBuildRepositoriesAllowList is an allowlist for repositories that we
227 // use delta-builds for instead of normal builds
228 deltaBuildRepositoriesAllowList map[string]struct{}
229
230 // deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
231 // before attempting a delta build.
232 deltaShardNumberFallbackThreshold uint64
233
234 // repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
235 // we skip calculating symbols metadata for during builds
236 repositoriesSkipSymbolsCalculationAllowList map[string]struct{}
237
238 hostname string
239
240 mergeOpts mergeOpts
241
242 // timeout defines how long the index server waits before killing an indexing job.
243 timeout time.Duration
244
245 indexserverv1.UnimplementedSourcegraphIndexserverServiceServer
246}
247
// Package-level loggers. As declared here, debugLog discards its output while
// infoLog and errorLog write to stderr; all use the standard date/time flags.
var debugLog = log.New(io.Discard, "[DEBUG] ", log.LstdFlags)

var infoLog = log.New(os.Stderr, "[INFO] ", log.LstdFlags)

var errorLog = log.New(os.Stderr, "[ERROR] ", log.LstdFlags)
253
// noOutputTimeout is how long an index command may run without producing new
// output before loggedRun sends it SIGQUIT (followed by a kill). Our index
// commands are expected to output something for every ~100MB they process.
//
// 2020-11-24 Keegan: "This should be rather quick so 5m is more than enough
// time." Famous last words: a client was indexing a monorepo with 42 cores,
// and 5m was not enough.
const noOutputTimeout = time.Minute * 30
260
261func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) {
262 out := &synchronizedBuffer{}
263 cmd.Stdout = out
264 cmd.Stderr = out
265
266 tr.LazyPrintf("%s", cmd.Args)
267
268 defer func() {
269 if err != nil {
270 outS := out.String()
271 tr.LazyPrintf("failed: %v", err)
272 tr.LazyPrintf("output: %s", out)
273 tr.SetError()
274 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS)
275 }
276 }()
277
278 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args))
279
280 if err := cmd.Start(); err != nil {
281 return err
282 }
283
284 errC := make(chan error)
285 go func() {
286 errC <- cmd.Wait()
287 }()
288
289 // This channel is set after we have sent sigquit. It allows us to follow up
290 // with a sigkill if the process doesn't quit after sigquit.
291 kill := make(<-chan time.Time)
292
293 lastLen := 0
294 for {
295 select {
296 case <-time.After(noOutputTimeout):
297 // Periodically check if we have had output. If not kill the process.
298 if out.Len() != lastLen {
299 lastLen = out.Len()
300 infoLog.Printf("still running %s", cmd.Args)
301 } else {
302 // Send quit (C-\) first so we get a stack dump.
303 infoLog.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args)
304 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil {
305 errorLog.Println("quit failed:", err)
306 }
307
308 // send sigkill if still running in 10s
309 kill = time.After(10 * time.Second)
310 }
311
312 case <-kill:
313 infoLog.Printf("still running, killing %s", cmd.Args)
314 if err := cmd.Process.Kill(); err != nil {
315 errorLog.Println("kill failed:", err)
316 }
317
318 case err := <-errC:
319 if err != nil {
320 return err
321 }
322
323 tr.LazyPrintf("success")
324 return nil
325 }
326 }
327}
328
// synchronizedBuffer wraps a bytes.Buffer with a mutex. It is used so that a
// command can write to it (as Stdout and Stderr) while another goroutine
// monitors its length or contents.
type synchronizedBuffer struct {
	mu sync.Mutex
	b  bytes.Buffer
}

// synchronizedBuffer is handed to exec.Cmd as an io.Writer.
var _ io.Writer = (*synchronizedBuffer)(nil)

// Write appends p to the buffer. As with bytes.Buffer.Write, the returned
// error is always nil.
func (sb *synchronizedBuffer) Write(p []byte) (int, error) {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	return sb.b.Write(p)
}

// Len returns the number of bytes written so far.
func (sb *synchronizedBuffer) Len() int {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	return sb.b.Len()
}

// String returns a copy of everything written so far.
func (sb *synchronizedBuffer) String() string {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	return sb.b.String()
}
353
// pauseFileName is the name of a file which, when present in IndexDir, stops
// index jobs from running. This makes it possible to experiment with the
// contents of IndexDir without the indexserver writing to it.
const pauseFileName = "PAUSE"

// pauseEnvVar is the environment variable that, when set to true, also pauses
// indexing.
const pauseEnvVar = "SRC_PAUSE_INDEXING"
361
362// isIndexingPaused checks if indexing should be paused based on either:
363// 1. The presence of a PAUSE file in the index directory
364// 2. The SRC_PAUSE_INDEXING environment variable being set to true
365func isIndexingPaused(indexDir string) (bool, string) {
366 // Check for PAUSE file first
367 if b, err := os.ReadFile(filepath.Join(indexDir, pauseFileName)); err == nil {
368 return true, fmt.Sprintf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b)))
369 }
370
371 // Then check environment variable
372 if getEnvWithDefaultBool(pauseEnvVar, false) {
373 return true, fmt.Sprintf("indexserver paused via %s environment variable", pauseEnvVar)
374 }
375
376 return false, ""
377}
378
379// Run the sync loop. This blocks forever.
380func (s *Server) Run() {
381 removeIncompleteShards(s.IndexDir)
382
383 // Start a goroutine which updates the queue with commits to index.
384 go func() {
385 // We update the list of indexed repos every Interval. To speed up manual
386 // testing we also listen for SIGUSR1 to trigger updates.
387 // "pkill -SIGUSR1 zoekt-sourcegra"
388 for range jitterTicker(s.Interval, unix.SIGUSR1) {
389 if paused, msg := isIndexingPaused(s.IndexDir); paused {
390 infoLog.Printf(msg)
391 continue
392 }
393
394 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir))
395 if err != nil {
396 errorLog.Printf("error listing repos: %s", err)
397 continue
398 }
399
400 debugLog.Printf("updating index queue with %d repositories", len(repos.IDs))
401
402 // Stop indexing repos we don't need to track anymore
403 removed := s.queue.MaybeRemoveMissing(repos.IDs)
404 metricNumStoppedTrackingTotal.Add(float64(len(removed)))
405 if len(removed) > 0 {
406 infoLog.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5))
407 }
408
409 cleanupDone := make(chan struct{})
410 go func() {
411 defer close(cleanupDone)
412 s.muIndexDir.Global(func() {
413 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging)
414 })
415 }()
416
417 repos.IterateIndexOptions(s.queue.AddOrUpdate)
418
419 // IterateIndexOptions will only iterate over repositories that have
420 // changed since we last called list. However, we want to add all IDs
421 // back onto the queue just to check that what is on disk is still
422 // correct. This will use the last IndexOptions we stored in the
423 // queue. The repositories not on the queue (missing) need a forced
424 // fetch of IndexOptions.
425 missing := s.queue.Bump(repos.IDs)
426 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...)
427
428 setShardsCounter(s.IndexDir)
429
430 <-cleanupDone
431 }
432 }()
433
434 go func() {
435 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) {
436 if s.shardMerging {
437 s.vacuum()
438 }
439 }
440 }()
441
442 go func() {
443 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) {
444 if s.shardMerging {
445 s.doMerge()
446 }
447 }
448 }()
449
450 for range s.IndexConcurrency {
451 go s.processQueue()
452 }
453
454 // block forever
455 select {}
456}
457
// formatListUint32 returns a ", "-separated list of the first min(len(v), m)
// items of v. If v has more items than that, a trailing "..." entry is added.
// A non-positive m lists no items; an empty v always yields "".
func formatListUint32(v []uint32, m int) string {
	m = max(0, min(len(v), m))

	parts := make([]string, 0, m+1)
	for _, x := range v[:m] {
		parts = append(parts, strconv.FormatUint(uint64(x), 10))
	}
	if len(v) > m {
		parts = append(parts, "...")
	}

	return strings.Join(parts, ", ")
}
475
476func (s *Server) processQueue() {
477 for {
478 if paused, _ := isIndexingPaused(s.IndexDir); paused {
479 time.Sleep(time.Second)
480 continue
481 }
482
483 item, ok := s.queue.Pop()
484 if !ok {
485 time.Sleep(time.Second)
486 continue
487 }
488
489 opts := item.Opts
490 args := s.indexArgs(opts)
491
492 ran := s.muIndexDir.With(opts.Name, func() {
493 // only record time taken once we hold the lock. This avoids us
494 // recording time taken while merging/cleanup runs.
495 start := time.Now()
496
497 state, err := s.index(context.Background(), args)
498
499 elapsed := time.Since(start)
500 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds())
501
502 indexDelay := time.Since(item.DateAddedToQueue)
503 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds())
504
505 if err != nil {
506 errorLog.Printf("error indexing %s: %s", args.String(), err)
507 }
508
509 switch state {
510 case indexStateSuccess:
511 var branches []string
512 for _, b := range args.Branches {
513 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
514 }
515 s.logger.Info("updated index",
516 sglog.Int("tenant", args.TenantID),
517 sglog.String("repo", args.Name),
518 sglog.Uint32("id", args.RepoID),
519 sglog.Strings("branches", branches),
520 sglog.Duration("duration", elapsed),
521 sglog.Duration("index_delay", indexDelay),
522 )
523 case indexStateSuccessMeta:
524 infoLog.Printf("updated meta %s in %v", args.String(), elapsed)
525 }
526 s.queue.SetIndexed(opts, state)
527 })
528
529 if !ran {
530 // Someone else is processing the repository. We can just skip this job
531 // since the repository will be added back to the queue and we will
532 // converge to the correct behaviour.
533 debugLog.Printf("index job for repository already running: %s", args)
534 continue
535 }
536 }
537}
538
539// repoNameForMetric returns a normalized version of the given repository name that is
540// suitable for use with Prometheus metrics.
541func repoNameForMetric(repo string) string {
542 // Check to see if we want to be able to capture separate indexing metrics for this repository.
543 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable.
544 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok {
545 return repo
546 }
547
548 return ""
549}
550
// batched returns a channel that yields consecutive sub-slices of slice, each
// holding at most size elements, and is closed after the last one. The batches
// share slice's backing array.
//
// A non-positive size yields all of slice as a single batch. This avoids
// looping forever on size 0 or panicking on a negative size.
//
// The producing goroutine only exits once every batch has been received, so
// callers must drain the channel.
func batched(slice []uint32, size int) <-chan []uint32 {
	c := make(chan []uint32)
	go func() {
		defer close(c)
		if size <= 0 {
			size = len(slice)
		}
		for len(slice) > 0 {
			n := min(size, len(slice))
			c <- slice[:n]
			slice = slice[n:]
		}
	}()
	return c
}
565
// jitterTicker returns a channel that receives a value immediately and then
// repeatedly, waiting between ticks for a random duration drawn uniformly
// from [d/2, 3d/2).
//
// If d is not positive, no periodic ticks are produced after the initial one.
// rand.Int63n panics on non-positive arguments, and a zero period would
// otherwise spin.
//
// sig is a list of signals which also cause the ticker to fire. This is a
// convenience to allow manually triggering of the ticker.
//
// The channel is unbuffered, so ticks are delivered only as fast as they are
// received, and the ticker cannot be stopped.
func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} {
	ticker := make(chan struct{})

	go func() {
		ticker <- struct{}{}
		if d <= 0 {
			return
		}
		ns := int64(d)
		for {
			jitter := rand.Int63n(ns)
			time.Sleep(time.Duration(ns/2 + jitter))
			ticker <- struct{}{}
		}
	}()

	go func() {
		if len(sig) == 0 {
			return
		}

		c := make(chan os.Signal, 1)
		signal.Notify(c, sig...)
		for range c {
			ticker <- struct{}{}
		}
	}()

	return ticker
}
597
598// Index starts an index job for repo name at commit.
599func (s *Server) index(ctx context.Context, args *indexArgs) (state indexState, err error) {
600 tr := trace.New("index", args.Name)
601 tr.SetMaxEvents(30) // Ensure we capture all indexing events
602
603 defer func() {
604 if err != nil {
605 tr.SetError()
606 tr.LazyPrintf("error: %v", err)
607 state = indexStateFail
608 metricFailingTotal.Inc()
609 }
610 tr.LazyPrintf("state: %s", state)
611 tr.Finish()
612 }()
613
614 // Sourcegraph should always provide a tenant ID.
615 if args.TenantID < 1 {
616 return indexStateFail, tenant.ErrMissingTenant
617 }
618
619 tr.LazyPrintf("branches: %v", args.Branches)
620
621 if len(args.Branches) == 0 {
622 return indexStateEmpty, createEmptyShard(args)
623 }
624
625 repositoryName := args.Name
626 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
627 tr.LazyPrintf("marking this repository for delta build")
628 args.UseDelta = true
629 }
630
631 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold
632
633 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
634 tr.LazyPrintf("skipping symbols calculation")
635 args.Symbols = false
636 }
637
638 reason := "forced"
639
640 if args.Incremental {
641 bo := args.BuildOptions()
642 bo.SetDefaults()
643 incrementalState, fn := bo.IndexState()
644 reason = string(incrementalState)
645 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()
646
647 switch incrementalState {
648 case index.IndexStateEqual:
649 debugLog.Printf("%s index already up to date. Shard=%s", args.String(), fn)
650 return indexStateNoop, nil
651
652 case index.IndexStateMeta:
653 infoLog.Printf("updating index.meta %s", args.String())
654
655 // TODO(stefan) handle mergeMeta for tenant id.
656 if err := mergeMeta(bo); err != nil {
657 errorLog.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
658 } else {
659 return indexStateSuccessMeta, nil
660 }
661
662 case index.IndexStateCorrupt:
663 infoLog.Printf("falling back to full update: corrupt index: %s", args.String())
664 }
665 }
666
667 infoLog.Printf("updating index %s reason=%s", args.String(), reason)
668
669 metricIndexingTotal.Inc()
670 c := gitIndexConfig{
671 runCmd: func(cmd *exec.Cmd) error {
672 return s.loggedRun(tr, cmd)
673 },
674
675 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
676 return args.BuildOptions().FindRepositoryMetadata()
677 },
678 timeout: s.timeout,
679 }
680
681 err = gitIndex(ctx, c, args, s.Sourcegraph, s.logger)
682 if err != nil {
683 return indexStateFail, err
684 }
685
686 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil {
687 s.logger.Error("failed to update index status",
688 sglog.String("repo", args.Name),
689 sglog.Uint32("id", args.RepoID),
690 sglogBranches("branches", args.Branches),
691 sglog.Error(err),
692 )
693 }
694
695 return indexStateSuccess, nil
696}
697
698// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so
699// it can update the zoekt_repos table.
700func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error {
701 // We need to read from disk for IndexTime.
702 _, metadata, ok, err := c.findRepositoryMetadata(args)
703 if err != nil {
704 return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
705 }
706 if !ok {
707 return errors.New("failed to find metadata for new/updated index")
708 }
709
710 status := []indexStatus{{
711 RepoID: args.RepoID,
712 Branches: args.Branches,
713 IndexTimeUnix: metadata.IndexTime.Unix(),
714 }}
715 if err := sg.UpdateIndexStatus(status); err != nil {
716 return fmt.Errorf("failed to update sourcegraph with status: %w", err)
717 }
718
719 return nil
720}
721
722func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
723 ss := make([]string, len(branches))
724 for i, b := range branches {
725 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
726 }
727 return sglog.Strings(key, ss)
728}
729
730func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
731 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0))
732 return &indexArgs{
733 IndexOptions: opts,
734 IndexDir: s.IndexDir,
735 Parallelism: parallelism,
736 Incremental: true,
737 FileLimit: MaxFileSize,
738 ShardMerging: s.shardMerging,
739 }
740}
741
742// parallelism consults both the server flags and index options to determine the number
743// of shards to index in parallel. If the CPUCount index option is provided, it always
744// overrides the server flag.
745func (s *Server) parallelism(opts IndexOptions, maxProcs int) int {
746 var parallelism int
747 if opts.ShardConcurrency > 0 {
748 parallelism = int(opts.ShardConcurrency)
749 } else {
750 parallelism = s.CPUCount
751 }
752
753 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs
754 if parallelism > 4*maxProcs {
755 parallelism = 4 * maxProcs
756 }
757
758 // If index concurrency is set, then divide the parallelism by the number of
759 // repos we're indexing in parallel
760 if s.IndexConcurrency > 1 {
761 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency)))
762 }
763
764 return parallelism
765}
766
767func createEmptyShard(args *indexArgs) error {
768 bo := args.BuildOptions()
769 bo.SetDefaults()
770 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}
771
772 if args.Incremental && bo.IncrementalSkipIndexing() {
773 return nil
774 }
775
776 builder, err := index.NewBuilder(*bo)
777 if err != nil {
778 return err
779 }
780 return builder.Finish()
781}
782
783// addDebugHandlers adds handlers specific to indexserver.
784func (s *Server) addDebugHandlers(mux *http.ServeMux) {
785 // Sourcegraph's site admin view requires indexserver to serve it's admin view
786 // on "/".
787 mux.Handle("/", http.HandlerFunc(s.handleRoot))
788
789 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex))
790 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed))
791 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList))
792 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge))
793 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue))
794 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost))
795}
796
797func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) {
798 if r.Method != "GET" {
799 w.Header().Set("Allow", "GET")
800 w.WriteHeader(http.StatusMethodNotAllowed)
801 return
802 }
803
804 response := struct {
805 Hostname string
806 }{
807 Hostname: s.hostname,
808 }
809
810 b, err := json.Marshal(response)
811 if err != nil {
812 http.Error(w, err.Error(), http.StatusInternalServerError)
813 return
814 }
815
816 w.Header().Set("Content-Type", "application/json; charset=utf-8")
817 w.Write(b)
818}
819
// rootTmpl renders the indexserver's root page. It expects a value with an
// IndexMsg string and an optional Repos list whose items have Name and ID.
// Each repository ID links back to the page with ?id= to trigger a reindex.
var rootTmpl = template.Must(template.New("name").Parse(`
<html>
  <body>
    <a href="debug">Debug</a><br />
    <a href="debug/requests">Traces</a><br />
    {{.IndexMsg}}<br />
    <br />
    <h3>Reindex</h3>
    {{if .Repos}}
      <a href="?show_repos=false">hide repos</a><br />
      <table style="margin-top: 20px">
        <tr>
          <th style="text-align:left">Name</th>
          <th style="text-align:left">ID (click to reindex)</th>
        </tr>
        {{range .Repos}}
          <tr>
            <td>{{.Name}}</td>
            <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></td>
          </tr>
        {{end}}
      </table>
    {{else}}
      <a href="?show_repos=true">show repos</a><br />
    {{end}}
  </body>
</html>
`))
846
847func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
848 if r.Method != "GET" {
849 w.Header().Set("Allow", "GET")
850 w.WriteHeader(http.StatusMethodNotAllowed)
851 return
852 }
853
854 values := r.URL.Query()
855
856 // ?id=
857 indexMsg := ""
858 if v := values.Get("id"); v != "" {
859 id, err := strconv.Atoi(v)
860 if err != nil {
861 http.Error(w, err.Error(), http.StatusBadRequest)
862 return
863 }
864 indexMsg, _ = s.forceIndex(r.Context(), uint32(id))
865 }
866
867 // ?show_repos=
868 showRepos := false
869 if v := values.Get("show_repos"); v != "" {
870 showRepos, _ = strconv.ParseBool(v)
871 }
872
873 type Repo struct {
874 ID uint32
875 Name string
876 }
877 var data struct {
878 Repos []Repo
879 IndexMsg string
880 }
881
882 data.IndexMsg = indexMsg
883
884 if showRepos {
885 s.queue.Iterate(func(opts *IndexOptions) {
886 data.Repos = append(data.Repos, Repo{
887 ID: opts.RepoID,
888 Name: opts.Name,
889 })
890 })
891 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name })
892 }
893
894 _ = rootTmpl.Execute(w, data)
895}
896
897// handleReindex triggers a reindex asynocronously. If a reindex was triggered
898// the request returns with status 202. The caller can infer the new state of
899// the index by calling List.
900func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) {
901 if r.Method != http.MethodPost {
902 w.Header().Set("Allow", http.MethodPost)
903 w.WriteHeader(http.StatusMethodNotAllowed)
904 return
905 }
906
907 err := r.ParseForm()
908 if err != nil {
909 http.Error(w, err.Error(), http.StatusBadRequest)
910 return
911 }
912
913 id, err := strconv.Atoi(r.Form.Get("repo"))
914 if err != nil {
915 http.Error(w, err.Error(), http.StatusBadRequest)
916 return
917 }
918
919 go func() { s.forceIndex(r.Context(), uint32(id)) }()
920
921 // 202 Accepted
922 w.WriteHeader(http.StatusAccepted)
923}
924
925func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
926 withIndexed := true
927 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil {
928 withIndexed = b
929 }
930
931 var indexed []uint32
932 if withIndexed {
933 indexed = listIndexed(s.IndexDir)
934 }
935
936 repos, err := s.Sourcegraph.List(r.Context(), indexed)
937 if err != nil {
938 http.Error(w, err.Error(), http.StatusInternalServerError)
939 return
940 }
941
942 bw := bytes.Buffer{}
943
944 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
945
946 _, err = fmt.Fprintf(tw, "ID\tName\n")
947 if err != nil {
948 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
949 return
950 }
951
952 s.queue.mu.Lock()
953 name := ""
954 for _, id := range repos.IDs {
955 if item := s.queue.get(id); item != nil {
956 name = item.opts.Name
957 } else {
958 name = ""
959 }
960 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
961 if err != nil {
962 debugLog.Printf("handleDebugList: %s\n", err.Error())
963 }
964 }
965 s.queue.mu.Unlock()
966
967 if err != nil {
968 http.Error(w, err.Error(), http.StatusInternalServerError)
969 return
970 }
971
972 err = tw.Flush()
973 if err != nil {
974 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
975 return
976 }
977
978 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
979
980 if _, err := io.Copy(w, &bw); err != nil {
981 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
982 return
983 }
984}
985
986// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
987// can run this command during periods of low usage (evenings, weekends) to
988// trigger an initial merge run. In the steady-state, merges happen rarely, even
989// on busy instances, and users can rely on automatic merging instead.
990func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {
991 // A merge operation can take very long, depending on the number merges and the
992 // target size of the compound shards. We run the merge in the background and
993 // return immediately to the user.
994 //
995 // We track the status of the merge with metricShardMergingRunning.
996 go func() {
997 s.doMerge()
998 }()
999 _, _ = w.Write([]byte("merging enqueued\n"))
1000}
1001
1002func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {
1003 indexed := listIndexed(s.IndexDir)
1004
1005 bw := bytes.Buffer{}
1006
1007 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
1008
1009 _, err := fmt.Fprintf(tw, "ID\tName\n")
1010 if err != nil {
1011 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
1012 return
1013 }
1014
1015 s.queue.mu.Lock()
1016 name := ""
1017 for _, id := range indexed {
1018 if item := s.queue.get(id); item != nil {
1019 name = item.opts.Name
1020 } else {
1021 name = ""
1022 }
1023 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
1024 if err != nil {
1025 debugLog.Printf("handleDebugIndexed: %s\n", err.Error())
1026 }
1027 }
1028 s.queue.mu.Unlock()
1029
1030 if err != nil {
1031 http.Error(w, err.Error(), http.StatusInternalServerError)
1032 return
1033 }
1034
1035 err = tw.Flush()
1036 if err != nil {
1037 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
1038 return
1039 }
1040
1041 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
1042
1043 if _, err := io.Copy(w, &bw); err != nil {
1044 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
1045 return
1046 }
1047}
1048
// forceIndex fetches the current index options for repository id from
// Sourcegraph and immediately runs a full (non-incremental) index job for it.
// The returned string always describes what happened, including on failure.
// If another job already holds the per-repository lock, nothing is indexed
// and the explanatory message is returned with a nil error.
func (s *Server) forceIndex(ctx context.Context, id uint32) (string, error) {
	var (
		opts     IndexOptions
		fetchErr error
	)
	s.Sourcegraph.ForceIterateIndexOptions(
		func(o IndexOptions) { opts = o },
		func(_ uint32, e error) { fetchErr = e },
		id,
	)
	if fetchErr != nil {
		return fmt.Sprintf("Indexing %d failed: %v", id, fetchErr), fetchErr
	}

	args := s.indexArgs(opts)
	args.Incremental = false // force re-index

	var (
		state    indexState
		indexErr error
	)
	ran := s.muIndexDir.With(opts.Name, func() {
		state, indexErr = s.index(ctx, args)
	})

	switch {
	case !ran:
		return fmt.Sprintf("index job for repository already running: %s", args), nil
	case indexErr != nil:
		return fmt.Sprintf("Indexing %s failed: %s", args.String(), indexErr), indexErr
	default:
		return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil
	}
}
1078
// Index implements the gRPC method for indexing a repository. If the repository
// has no shards in the index dir but exists in the trash, it will attempt to
// recover it before indexing. The method returns metadata about the indexed
// repository including branches and the time of indexing. If the repository
// is empty, the response only carries the repository ID.
//
// Possible gRPC error codes:
//   - InvalidArgument: returned when the index options are missing
//   - Canceled: returned when the context is canceled while waiting for an index slot
//   - DeadlineExceeded: returned when no index slot becomes free within the server timeout
//   - AlreadyExists: returned when the repository is already being indexed by another job
//   - Internal: returned when indexing fails
func (s *Server) Index(ctx context.Context, req *indexserverv1.IndexRequest) (*indexserverv1.IndexResponse, error) {
	return s.indexGRPC(ctx, req, s.index)
}
1093
// indexGRPC implements Index. It takes the index function as a parameter so
// tests can substitute a fake; production gRPC calls should go through
// Server.Index instead.
//
// The call first waits for a free slot in s.indexSemaphore (giving up when ctx
// is done or after s.timeout). It then tries to restore the repository from
// the trash if it has no shards in the index dir. Finally it runs indexFunc
// while holding the per-repository lock in s.muIndexDir. All errors returned
// to the caller are gRPC status errors.
func (s *Server) indexGRPC(ctx context.Context, req *indexserverv1.IndexRequest, indexFunc func(ctx context.Context, args *indexArgs) (indexState, error)) (*indexserverv1.IndexResponse, error) {
	// Validate request
	if req.Options == nil {
		return nil, status.Error(codes.InvalidArgument, "index options are required")
	}

	// Try to acquire semaphore, but respect context cancellation and the
	// server timeout.
	select {
	case s.indexSemaphore <- struct{}{}:
		defer func() { <-s.indexSemaphore }()
	case <-ctx.Done():
		return nil, status.Error(codes.Canceled, "context canceled while waiting for index slot")
	case <-time.After(s.timeout):
		return nil, status.Error(codes.DeadlineExceeded, "timed out while waiting for index slot")
	}

	var opts IndexOptions
	opts.FromProto(req.Options)
	opts.CloneURL = s.rootURL.ResolveReference(&url.URL{Path: path.Join("/.internal/git", req.Options.Name)}).String()
	args := s.indexArgs(opts)

	// Recover the index from the trash if possible. First we have to make sure
	// the shard doesn't exist in the index dir, because it might potentially be
	// overwritten if we recover a shard from the trash. If we can find a shard
	// in the index dir, we skip recovery. In both cases we continue and let the
	// index process figure out whether the shard is up-to-date or requires a
	// reindex.
	if _, _, ok, err := args.BuildOptions().FindRepositoryMetadata(); err == nil && !ok {
		if s.recoverFromTrash(opts.RepoID) {
			infoLog.Printf("restored repository %d from trash", opts.RepoID)
		}
	}

	// readMetadata reads the repository and index metadata written by the index
	// job. Failures are returned as codes.Internal status errors so clients see
	// a meaningful code instead of codes.Unknown. The "not found" case is
	// reported separately because there is no underlying error to wrap.
	readMetadata := func() (*zoekt.Repository, int64, error) {
		repo, metadata, ok, err := args.BuildOptions().FindRepositoryMetadata()
		if err != nil {
			return nil, 0, status.Errorf(codes.Internal, "failed to read metadata for %s: %v", args.String(), err)
		}
		if !ok {
			return nil, 0, status.Errorf(codes.Internal, "failed to read metadata for %s: no index metadata found", args.String())
		}
		return repo, metadata.IndexTime.Unix(), nil
	}

	// Index the repository.
	var (
		indexErr      error
		indexTimeUnix int64
		zoektRepo     *zoekt.Repository
	)

	ran := s.muIndexDir.With(opts.Name, func() {
		// only record time taken once we hold the lock. This avoids us
		// recording time taken while merging/cleanup runs.
		start := time.Now()

		var state indexState
		state, indexErr = indexFunc(ctx, args)
		elapsed := time.Since(start)
		metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds())

		if indexErr != nil {
			errorLog.Printf("error indexing %s: %s", args.String(), indexErr)
			indexErr = status.Errorf(codes.Internal, "failed to index repository: %v", indexErr)
			return
		}

		switch state {
		case indexStateSuccess:
			zoektRepo, indexTimeUnix, indexErr = readMetadata()
			if indexErr != nil {
				return
			}
			var branches []string
			for _, b := range zoektRepo.Branches {
				branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
			}
			s.logger.Info("updated index",
				sglog.Int("tenant", args.TenantID),
				sglog.String("repo", args.Name),
				sglog.Uint32("id", args.RepoID),
				sglog.Strings("branches", branches),
				sglog.Duration("duration", elapsed),
			)
		case indexStateSuccessMeta:
			zoektRepo, indexTimeUnix, indexErr = readMetadata()
			if indexErr != nil {
				return
			}
			infoLog.Printf("updated meta %s in %v", args.String(), elapsed)
		case indexStateNoop:
			zoektRepo, indexTimeUnix, indexErr = readMetadata()
		case indexStateEmpty:
			// Repository exists but is empty, return OK with a response that
			// only carries the repository ID.
			zoektRepo = &zoekt.Repository{
				ID: args.RepoID,
			}
		case indexStateFail:
			// This should never happen, because indexStateFail implies
			// indexErr!=nil and we exit early, but we'll handle it gracefully
			// just in case.
			indexErr = status.Error(codes.Internal, "failed to index repository")
		default:
			indexErr = status.Errorf(codes.Internal, "unknown index state: %s", state)
		}
	})

	if !ran {
		// Someone else is processing the repository.
		debugLog.Printf("index job for repository already running: %s", args)
		return nil, status.Error(codes.AlreadyExists, "repository is already being indexed")
	}

	if indexErr != nil {
		return nil, indexErr
	}

	if zoektRepo == nil {
		// This should never happen, because zoektRepo=nil means we either
		// failed to index or failed to read the metadata, both of which yield
		// indexErr!=nil.
		return nil, status.Errorf(codes.Internal, "unexpected error: zoektRepo is nil for options %+v", opts)
	}

	// Convert branches to proto format
	repoBranchesProto := make([]*configv1.ZoektRepositoryBranch, len(zoektRepo.Branches))
	for i, b := range zoektRepo.Branches {
		repoBranchesProto[i] = &configv1.ZoektRepositoryBranch{
			Name:    b.Name,
			Version: b.Version,
		}
	}

	return &indexserverv1.IndexResponse{
		RepoId:        zoektRepo.ID,
		Branches:      repoBranchesProto,
		IndexTimeUnix: indexTimeUnix,
	}, nil
}
1233
// recoverFromTrash attempts to recover the shards of repoID. It first looks
// in the trash directory (IndexDir/.trash) and moves any shards found there
// back into IndexDir. Failing that, it removes the repository's tombstone
// from a compound shard. It returns true if the repository was recovered.
//
// Recovery is best-effort: a failure to unset a tombstone is logged and
// reported as "not recovered" rather than returned.
func (s *Server) recoverFromTrash(repoID uint32) bool {
	trashShards := getShards(filepath.Join(s.IndexDir, ".trash"))
	if shards, ok := trashShards[repoID]; ok {
		moveAll(s.IndexDir, shards)
		return true
	}

	tombstones := getTombstonedRepos(s.IndexDir)
	tombstone, ok := tombstones[repoID]
	if !ok {
		return false
	}

	if err := index.UnsetTombstone(tombstone.Path, repoID); err != nil {
		// Previously this error was silently dropped; surface it so operators
		// can tell why a tombstoned repository was reindexed from scratch.
		errorLog.Printf("failed to unset tombstone for repository %d in %s: %s", repoID, tombstone.Path, err)
		return false
	}
	return true
}
1252
// Delete implements the gRPC method for deleting repositories. For every ID in
// req.RepoIds it processes the repository's shards in the index dir. When
// shard merging is enabled, shards for which maybeSetTombstone succeeds are
// tombstoned in place. All remaining (simple) shards are moved to the trash
// dir.
//
// The work runs under the global index-dir lock in a separate goroutine, so
// the RPC can return early on context cancellation or after s.timeout.
// Returning early does not stop that goroutine. It keeps going until it
// finishes, or until it notices ctx cancellation between repositories.
func (s *Server) Delete(ctx context.Context, req *indexserverv1.DeleteRequest) (*indexserverv1.DeleteResponse, error) {
	var deleteErr error
	finished := make(chan struct{})

	go func() {
		defer close(finished)
		s.muIndexDir.Global(func() {
			shardsByRepo := getShards(s.IndexDir)
			for _, repoID := range req.RepoIds {
				if ctx.Err() != nil {
					deleteErr = status.Error(codes.Canceled, "context canceled")
					return
				}

				shards, found := shardsByRepo[repoID]
				if !found {
					continue
				}

				// Filter in place, keeping only shards that were not tombstoned.
				toTrash := shards[:0]
				for _, sh := range shards {
					tombstoned := s.shardMerging && maybeSetTombstone([]shard{sh}, repoID)
					if !tombstoned {
						toTrash = append(toTrash, sh)
					}
				}

				if len(toTrash) > 0 {
					moveAll(filepath.Join(s.IndexDir, ".trash"), toTrash)
				}
			}
		})
	}()

	select {
	case <-ctx.Done():
		return nil, status.Error(codes.Canceled, "context canceled")
	case <-time.After(s.timeout):
		return nil, status.Error(codes.DeadlineExceeded, "timed out while waiting for delete operation to complete")
	case <-finished:
	}

	// Safe to read: close(finished) happens after the last write to deleteErr.
	if deleteErr != nil {
		return nil, deleteErr
	}
	return &indexserverv1.DeleteResponse{}, nil
}
1304
// DeleteAllData deletes all shards in the index and trash dir belonging to the
// tenant associated with the request (the tenant is taken from ctx). The
// deletion is best-effort, which means we will delete as much as possible and
// return the combined errors of all failed steps. If no error is returned, the
// caller can be certain that all data has been deleted.
//
// While holding the global index-dir lock, it proceeds in three steps:
//  1. Explode compound shards that contain the tenant's repos. Each explode
//     runs as a separate process built by defaultExplodeCmd.
//  2. Purge the tenant's shards from the index dir.
//  3. Purge the tenant's shards from the trash dir.
//
// On error the response is nil, following the (T, error) convention.
func (s *Server) DeleteAllData(ctx context.Context, _ *indexserverv1.DeleteAllDataRequest) (*indexserverv1.DeleteAllDataResponse, error) {
	tnt, err := tenant.FromContext(ctx)
	if err != nil {
		return nil, err
	}
	s.logger.Warn("DeleteAllData", sglog.Int("tenant_id", tnt.ID()))

	var merr error
	s.muIndexDir.Global(func() {
		// First, explode all compound shards that have repos from the tenant in
		// question. Because we hold the global lock, we can be sure that no new
		// merges start while we do this.
		if err := s.explodeTenantCompoundShards(ctx, func(path string) error {
			// We call explode in a separate process to protect indexserver.
			cmd := defaultExplodeCmd(path)

			stdoutBuf := &bytes.Buffer{}
			stderrBuf := &bytes.Buffer{}
			cmd.Stdout = stdoutBuf
			cmd.Stderr = stderrBuf

			err := cmd.Run()
			if err != nil {
				errorLog.Printf("explode failed: %v (stderr: %s)", err, stderrBuf.String())
				return err
			}

			infoLog.Printf("exploded shard: %s", stdoutBuf.String())

			return nil
		}); err != nil {
			merr = multierr.Append(merr, err)
		}

		// Invariant: all shards from the tenant are simple shards.

		if err := purgeTenantShards(ctx, s.IndexDir); err != nil {
			merr = multierr.Append(merr, err)
		}
		if err := purgeTenantShards(ctx, filepath.Join(s.IndexDir, ".trash")); err != nil {
			merr = multierr.Append(merr, err)
		}
	})

	if merr != nil {
		return nil, merr
	}
	return &indexserverv1.DeleteAllDataResponse{}, nil
}
1355
// listIndexed returns the IDs of all repositories that have shards in
// indexDir, sorted ascending. As a side effect it updates metricNumIndexed
// with the number of such repositories.
func listIndexed(indexDir string) []uint32 {
	shardsByRepo := getShards(indexDir)
	metricNumIndexed.Set(float64(len(shardsByRepo)))

	ids := make([]uint32, len(shardsByRepo))
	i := 0
	for id := range shardsByRepo {
		ids[i] = id
		i++
	}
	slices.Sort(ids)
	return ids
}
1366
// setupTmpDir creates a temporary directory inside the index directory, so it
// lives on the same volume as the shards, and points TMPDIR at it.
//
// main distinguishes the long-running daemon from debug sub commands. The two
// use different directory names, and only the daemon wipes whatever a previous
// run left behind. A failed wipe is logged but not fatal.
func setupTmpDir(logger sglog.Logger, main bool, index string) error {
	tmpRoot := filepath.Join(index, ".indexserver.debug.tmp")
	if main {
		tmpRoot = filepath.Join(index, ".indexserver.tmp")

		logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot))
		if err := os.RemoveAll(tmpRoot); err != nil {
			logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err))
		}
	}

	if err := os.MkdirAll(tmpRoot, 0o755); err != nil {
		return err
	}
	return os.Setenv("TMPDIR", tmpRoot)
}
1396
// printMetaData reads the metadata of the shard file fn and writes it to
// stdout as two consecutive JSON documents: the index metadata first, then
// the repository metadata. It stops at the first read or encode error.
func printMetaData(fn string) error {
	repo, indexMeta, err := index.ReadMetadataPath(fn)
	if err != nil {
		return err
	}

	enc := json.NewEncoder(os.Stdout)
	for _, doc := range []any{indexMeta, repo} {
		if err := enc.Encode(doc); err != nil {
			return err
		}
	}
	return nil
}
1414
// printShardStats prints n-gram statistics for the shard file fn.
//
// Both the opened *os.File and the IndexFile built on top of it are released
// before returning. Previously neither was closed, leaking a file descriptor
// (and, for mmap-backed implementations, the mapping).
func printShardStats(fn string) error {
	f, err := os.Open(fn)
	if err != nil {
		return err
	}
	// Deferred first, so it runs last. If the IndexFile implementation already
	// closed f, this may return an "already closed" error, which is ignored.
	defer f.Close()

	iFile, err := index.NewIndexFile(f)
	if err != nil {
		return err
	}
	defer iFile.Close()

	return index.PrintNgramStats(iFile)
}
1428
// srcLogLevelIsDebug reports whether the Sourcegraph log level environment
// variable (sglog.EnvLogLevel) selects debug logging. Both "dbug" and "debug"
// are accepted, case-insensitively.
func srcLogLevelIsDebug() bool {
	level := os.Getenv(sglog.EnvLogLevel)
	for _, debugSpelling := range []string{"dbug", "debug"} {
		if strings.EqualFold(level, debugSpelling) {
			return true
		}
	}
	return false
}
1433
// getEnvWithDefaultBool returns environment variable k parsed as a bool, or
// defaultVal when k is unset or empty. Accepted spellings are those of
// strconv.ParseBool. The process exits via log.Fatalf if the value cannot be
// parsed.
func getEnvWithDefaultBool(k string, defaultVal bool) bool {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		// The message previously said "int64", which was misleading for a bool.
		log.Fatalf("error parsing ENV %s to bool: %s", k, err)
	}
	return b
}
1445
// getEnvWithDefaultInt64 reads environment variable k as a base-10 int64.
// An unset or empty variable yields defaultVal. A malformed value terminates
// the process via log.Fatalf.
func getEnvWithDefaultInt64(k string, defaultVal int64) int64 {
	if raw := os.Getenv(k); raw != "" {
		n, err := strconv.ParseInt(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to int64: %s", k, err)
		}
		return n
	}
	return defaultVal
}
1457
// getEnvWithDefaultInt returns environment variable k parsed as an int, or
// defaultVal when k is unset or empty. The process exits via log.Fatalf if the
// value cannot be parsed.
func getEnvWithDefaultInt(k string, defaultVal int) int {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	// Parse the value, not the variable name. Parsing k made every non-empty
	// setting (e.g. SRC_MERGE_MIN_AGE) fail and kill the process.
	i, err := strconv.Atoi(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to int: %s", k, err)
	}
	return i
}
1469
// getEnvWithDefaultUint64 reads environment variable k as a base-10 uint64.
// Unset or empty variables yield defaultVal. Malformed values are fatal.
func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 {
	raw := os.Getenv(k)
	if len(raw) == 0 {
		return defaultVal
	}

	parsed, err := strconv.ParseUint(raw, 10, 64)
	if err != nil {
		log.Fatalf("error parsing ENV %s to uint64: %s", k, err)
	}
	return parsed
}
1481
// getEnvWithDefaultFloat64 reads environment variable k as a float64. An
// unset or empty variable yields defaultVal. A malformed value terminates
// the process via log.Fatalf.
func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 {
	if raw := os.Getenv(k); raw != "" {
		f, err := strconv.ParseFloat(raw, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to float64: %s", k, err)
		}
		return f
	}
	return defaultVal
}
1493
// getEnvWithDefaultString returns the value of environment variable k, or
// defaultVal if it is unset or set to the empty string.
func getEnvWithDefaultString(k string, defaultVal string) string {
	if v := os.Getenv(k); v != "" {
		return v
	}
	return defaultVal
}
1501
// getEnvWithDefaultDuration reads environment variable k using
// time.ParseDuration syntax (e.g. "90s", "2h"). An unset or empty variable
// yields defaultVal. A malformed value terminates the process via log.Fatalf.
func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration {
	if raw := os.Getenv(k); raw != "" {
		d, err := time.ParseDuration(raw)
		if err != nil {
			log.Fatalf("error parsing ENV %s to duration: %s", k, err)
		}
		return d
	}
	return defaultVal
}
1514
// getEnvWithDefaultEmptySet parses environment variable k as a
// comma-separated list. It returns the distinct, whitespace-trimmed, non-empty
// entries as a set. An unset variable yields an empty (non-nil) set.
func getEnvWithDefaultEmptySet(k string) map[string]struct{} {
	entries := strings.Split(os.Getenv(k), ",")
	set := make(map[string]struct{}, len(entries))
	for _, raw := range entries {
		if item := strings.TrimSpace(raw); item != "" {
			set[item] = struct{}{}
		}
	}
	return set
}
1525
1526func joinStringSet(set map[string]struct{}, sep string) string {
1527 var xs []string
1528 for x := range set {
1529 xs = append(xs, x)
1530 }
1531
1532 return strings.Join(xs, sep)
1533}
1534
// setShardsCounter updates metricNumberShards with the number of *.zoekt
// files in indexDir. It updates metricNumberCompoundShards with how many of
// those are compound shards (base name starting with "compound-"). If
// globbing fails, the error is logged and neither metric is touched.
func setShardsCounter(indexDir string) {
	shardFiles, err := filepath.Glob(filepath.Join(indexDir, "*.zoekt"))
	if err != nil {
		errorLog.Printf("setShardsCounter: %s\n", err)
		return
	}
	metricNumberShards.Set(float64(len(shardFiles)))

	compound := 0
	for _, fn := range shardFiles {
		if strings.HasPrefix(filepath.Base(fn), "compound-") {
			compound++
		}
	}
	metricNumberCompoundShards.Set(float64(compound))
}
1551
// rootCmd builds the top-level command. Without a subcommand it starts the
// long-running indexserver; its config is marked Main. The debug subcommands
// are registered underneath. Flag values are bound into conf by
// registerRootFlags and read when Exec runs.
func rootCmd() *ffcli.Command {
	conf := rootConfig{Main: true}
	rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError)
	conf.registerRootFlags(rootFs)

	cmd := &ffcli.Command{
		FlagSet:     rootFs,
		ShortUsage:  "zoekt-sourcegraph-indexserver [flags] [<subcommand>]",
		Subcommands: []*ffcli.Command{debugCmd()},
	}
	cmd.Exec = func(ctx context.Context, args []string) error {
		return startServer(conf)
	}
	return cmd
}
1568
// rootConfig holds the configuration shared by the indexserver and its debug
// commands. Apart from Main, its fields are populated from command-line flags
// (many with environment-variable defaults) by registerRootFlags.
type rootConfig struct {
	// Main is true if this rootConfig is for our main long running command (the
	// indexserver). Debug commands should not set this value. newServer passes
	// it to setupTmpDir, which only removes leftover temp directories (and uses
	// a different directory name) when Main is true.
	Main bool

	// root is the Sourcegraph URL (-sourcegraph_url, default
	// $SRC_FRONTEND_INTERNAL). If it is not an absolute URL, newServer uses a
	// fake Sourcegraph rooted at that directory instead.
	root string

	// interval controls how often to sync with Sourcegraph (-interval).
	interval time.Duration

	// index is the index directory (-index, default $DATA_DIR).
	index string

	// indexConcurrency is the number of repos to index concurrently
	// (-index_concurrency / SRC_INDEX_CONCURRENCY). newServer clamps it to
	// [1, number of CPUs available for indexing].
	indexConcurrency int64

	// listen is the HTTP/gRPC listen address (-listen). startServer does not
	// serve anything when it is empty.
	listen string

	// hostname is the name advertised to Sourcegraph (-hostname).
	hostname string

	// cpuFraction is the fraction of cores used for indexing (-cpu_fraction).
	// newServer rejects values outside (0, 1].
	cpuFraction float64

	// config values related to shard merging

	// disableShardMerging turns shard merging off. NOTE: it is bound to the
	// flag named "-shard_merging" (env SRC_DISABLE_SHARD_MERGING), so
	// -shard_merging=true DISABLES merging.
	disableShardMerging bool

	// vacuumInterval controls how often vacuum runs (-vacuum_interval).
	vacuumInterval time.Duration

	// mergeInterval controls how often merging runs (-merge_interval).
	mergeInterval time.Duration

	// targetSize is the target size of compound shards in MiB
	// (-merge_target_size). newServer converts it to bytes.
	targetSize int64

	// minSize is the minimum size of a compound shard in MiB
	// (-merge_min_size). newServer converts it to bytes.
	minSize int64

	// minAgeDays is the minimum number of days since the last commit for a
	// shard to be considered for merging (-merge_min_age).
	minAgeDays int

	// config values related to backoff indexing repos with one or more consecutive failures

	// backoffDuration is the per-failure backoff step (-backoff_duration).
	// A negative value disables indexing backoff.
	backoffDuration time.Duration

	// maxBackoffDuration caps the backoff (-max_backoff_duration). A negative
	// value disables indexing backoff.
	maxBackoffDuration time.Duration
}
1595
// registerRootFlags registers the indexserver's root flags on fs and binds
// them to the fields of rc. Many defaults come from environment variables,
// which are read at registration time. An unparsable value in one of those
// variables terminates the process (the getEnvWithDefault* helpers call
// log.Fatalf).
func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
	fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
	fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
	fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently")
	fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", index.DefaultDir), "set index directory to use")
	fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
	fs.StringVar(&rc.hostname, "hostname", index.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
	fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
	fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
	fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")

	// flags related to shard merging

	// The flag keeps its historical name "shard_merging" for compatibility,
	// but it is bound to disableShardMerging: passing -shard_merging=true turns
	// merging OFF. The help text now says so explicitly.
	fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging. Despite the flag name, -shard_merging=true disables merging. Defaults to the value of SRC_DISABLE_SHARD_MERGING.")
	fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
	fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
	fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 1000), "the target size of compound shards in MiB")
	fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 800), "the minimum size of a compound shard in MiB")
	fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
}
1615
// startServer builds the indexserver from conf and runs it.
//
// Unless conf.listen is empty, it serves the debug pages plus the gRPC API on
// conf.listen; a failure of that listener is fatal (log.Fatal). It also serves
// the HTTP mux (without gRPC), best-effort, on a unix socket in the index
// directory. It starts the owner.txt checker and registers the mountinfo
// Prometheus collector, then calls s.Run. An error is returned only if the
// server cannot be constructed.
func startServer(conf rootConfig) error {
	s, err := newServer(conf)
	if err != nil {
		return err
	}

	profiler.Init("zoekt-sourcegraph-indexserver")
	setShardsCounter(s.IndexDir)

	if conf.listen != "" {

		mux := http.NewServeMux()
		debugserver.AddHandlers(mux, true, []debugserver.DebugPage{
			{Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"},
			{Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"},
			{Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"},
			{Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"},
		}...)
		s.addDebugHandlers(mux)

		go func() {
			debugLog.Printf("serving HTTP on %s", conf.listen)
			// gRPC and plain HTTP share the same TCP listener.
			grpcAndHTTP := grpcutil.MultiplexGRPC(newGRPCServer(sglog.Scoped("indexserver"), s), mux)
			log.Fatal(http.ListenAndServe(conf.listen, grpcAndHTTP))
		}()

		// Serve mux on a unix domain socket on a best-effort-basis so that
		// webserver can call the endpoints via the shared filesystem.
		//
		// 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen
		// on the socket due to permission errors. See
		// https://github.com/docker/for-mac/issues/6239
		go func() {
			serveHTTPOverSocket := func() error {
				socket := filepath.Join(s.IndexDir, "indexserver.sock")
				// We cannot bind a socket to an existing pathname.
				if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) {
					// Include the underlying error; it was previously dropped.
					return fmt.Errorf("error removing socket file %s: %w", socket, err)
				}
				// The "unix" network corresponds to stream sockets. (cf. unixgram,
				// unixpacket).
				l, err := net.Listen("unix", socket)
				if err != nil {
					return fmt.Errorf("failed to listen on socket %s: %w", socket, err)
				}
				// Indexserver (root) and webserver (Sourcegraph) run with
				// different users. Per default, the socket is created with
				// permission 755 (root root), which doesn't let webserver write to
				// it.
				//
				// See https://github.com/golang/go/issues/11822 for more context.
				if err := os.Chmod(socket, 0o777); err != nil {
					return fmt.Errorf("failed to change permission of socket %s: %w", socket, err)
				}
				debugLog.Printf("serving HTTP on %s", socket)
				return http.Serve(l, mux)
			}
			// Best effort: failures are only logged at debug level.
			debugLog.Print(serveHTTPOverSocket())
		}()
	}

	oc := &ownerChecker{
		Path:     filepath.Join(conf.index, "owner.txt"),
		Hostname: conf.hostname,
	}
	go oc.Run()

	logger := sglog.Scoped("metricsRegistration")
	opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"}

	c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index})
	prometheus.DefaultRegisterer.MustRegister(c)

	s.Run()
	return nil
}
1692
// newServer validates conf and builds a Server from it.
//
// Besides constructing the Server, it has process-wide side effects:
//   - it may adjust GOMAXPROCS
//   - it prepends the executable's directory to PATH
//   - it creates the index directory and sets TMPDIR (via setupTmpDir)
//   - it may redirect debugLog to stderr
//   - it sets the package-level reposWithSeparateIndexingMetrics
//
// If conf.root is an absolute URL, repositories come from Sourcegraph over
// gRPC. Otherwise conf.root is treated as a directory served by a fake
// Sourcegraph. conf.indexConcurrency is clamped to [1, cpuCount].
func newServer(conf rootConfig) (*Server, error) {
	logger := sglog.Scoped("server")

	if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 {
		return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0")
	}
	if conf.index == "" {
		return nil, fmt.Errorf("must set -index")
	}
	if conf.root == "" {
		return nil, fmt.Errorf("must set -sourcegraph_url")
	}
	rootURL, err := url.Parse(conf.root)
	if err != nil {
		return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err)
	}

	rootURL = addDefaultPort(rootURL)

	// Tune GOMAXPROCS to match Linux container CPU quota.
	_, _ = maxprocs.Set()

	// Automatically prepend our own path at the front, to minimize
	// required configuration.
	if l, err := os.Readlink("/proc/self/exe"); err == nil {
		os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
	}

	if _, err := os.Stat(conf.index); err != nil {
		if err := os.MkdirAll(conf.index, 0o755); err != nil {
			return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err)
		}
	}

	if err := setupTmpDir(logger, conf.Main, conf.index); err != nil {
		return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err)
	}

	if srcLogLevelIsDebug() {
		debugLog.SetOutput(os.Stderr)
	}

	reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST")
	if len(reposWithSeparateIndexingMetrics) > 0 {
		debugLog.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", "))
	}

	deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST")
	if len(deltaBuildRepositoriesAllowList) > 0 {
		debugLog.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", "))
	}

	deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150)
	if deltaShardNumberFallbackThreshold > 0 {
		debugLog.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold)
	} else {
		debugLog.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards")
	}

	reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST")
	if len(reposShouldSkipSymbolsCalculation) > 0 {
		debugLog.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", "))
	}

	indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout)
	if indexingTimeout != defaultIndexingTimeout {
		debugLog.Printf("using configured indexing timeout: %s", indexingTimeout)
	}

	var sg Sourcegraph
	if rootURL.IsAbs() {
		var batchSize int
		if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" {
			batchSize, err = strconv.Atoi(v)
			if err != nil {
				return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int: %w", err)
			}
		}

		opts := []SourcegraphClientOption{
			WithBatchSize(batchSize),
		}

		logger := sglog.Scoped("zoektConfigurationGRPCClient")
		client, err := dialGRPCClient(rootURL.Host, logger)
		if err != nil {
			return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err)
		}

		sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...)

	} else {
		sg = sourcegraphFake{
			RootDir: rootURL.String(),
			Log:     log.New(os.Stderr, "sourcegraph: ", log.LstdFlags),
		}
	}

	cpuCount := max(int(math.Round(float64(runtime.GOMAXPROCS(0))*(conf.cpuFraction))), 1)

	if conf.indexConcurrency < 1 {
		conf.indexConcurrency = 1
	} else if conf.indexConcurrency > int64(cpuCount) {
		conf.indexConcurrency = int64(cpuCount)
	}

	// NOTE(review): this copies the Queue value (including its mu) returned by
	// NewQueue into the Server. It is only safe because q is not used after
	// this point; go vet's copylocks check may flag it.
	q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)

	// Return an explicit nil error: every failure above has already returned,
	// and reusing the outer err variable here was fragile.
	return &Server{
		logger:                            logger,
		rootURL:                           rootURL,
		Sourcegraph:                       sg,
		IndexDir:                          conf.index,
		IndexConcurrency:                  int(conf.indexConcurrency),
		Interval:                          conf.interval,
		CPUCount:                          cpuCount,
		queue:                             *q,
		shardMerging:                      !conf.disableShardMerging,
		deltaBuildRepositoriesAllowList:   deltaBuildRepositoriesAllowList,
		deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold,
		repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
		hostname: conf.hostname,
		mergeOpts: mergeOpts{
			vacuumInterval:  conf.vacuumInterval,
			mergeInterval:   conf.mergeInterval,
			targetSizeBytes: conf.targetSize * 1024 * 1024,
			minSizeBytes:    conf.minSize * 1024 * 1024,
			minAgeDays:      conf.minAgeDays,
		},
		timeout:        indexingTimeout,
		indexSemaphore: make(chan struct{}, int(conf.indexConcurrency)),
	}, nil
}
1826
1827func newGRPCServer(logger sglog.Logger, s *Server, additionalOpts ...grpc.ServerOption) *grpc.Server {
1828 grpcServer := defaults.NewServer(logger, additionalOpts...)
1829 indexserverv1.RegisterSourcegraphIndexserverServiceServer(grpcServer, s)
1830 return grpcServer
1831}
1832
// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration
// for the indexed-search-configuration gRPC service. It is embedded at build
// time from default_grpc_service_configuration.json and installed via
// grpc.WithDefaultServiceConfig in dialGRPCClient.
//
// The default backoff strategy is modeled after the default settings used by
// retryablehttp.DefaultClient.
//
// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html):
//   - Unavailable
//   - Aborted
//
// NOTE(review): the actual retry policy lives in the embedded JSON file, not
// in this Go source; keep this comment in sync with that file.
//
//go:embed default_grpc_service_configuration.json
var defaultGRPCServiceConfigurationJSON string
1845
// internalActorUnaryInterceptor returns a unary client interceptor that adds
// the "X-Sourcegraph-Actor-UID: internal" metadata to every outgoing call
// before invoking it.
func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor {
	interceptor := func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		outgoing := metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
		return invoker(outgoing, method, req, reply, cc, opts...)
	}
	return interceptor
}
1852
// internalActorStreamInterceptor returns a stream client interceptor that adds
// the "X-Sourcegraph-Actor-UID: internal" metadata to every outgoing stream
// before opening it.
func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
	interceptor := func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
		outgoing := metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
		return streamer(outgoing, desc, cc, method, opts...)
	}
	return interceptor
}
1859
// defaultGRPCMessageReceiveSizeBytes is the default maximum size, in bytes, of
// a message that gRPC clients and servers are allowed to process: 90 MiB
// (90 * 2^20 bytes). It can be overridden by providing custom Server/Dial
// options.
const defaultGRPCMessageReceiveSizeBytes = 90 << 20 // 90 MiB
1863
// dialGRPCClient returns a client for the ZoektConfigurationService at addr.
//
// The connection uses insecure transport credentials and
// defaultGRPCServiceConfigurationJSON as its default service config. Unary
// and stream calls pass through the same interceptor chain, in this order:
//  1. client metrics
//  2. message-size metrics
//  3. the internal-actor header
//  4. error logging and error metrics
//  5. a retry interceptor
//
// The retry interceptor retries codes.Unavailable, configured with
// retry.WithMax(5) and exponential backoff (1s base, 10% jitter). The default
// receive limit is defaultGRPCMessageReceiveSizeBytes. Caller-supplied
// additionalOpts are applied next. Message-size options from the environment
// are applied last, so they take precedence over everything else.
func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (configv1.ZoektConfigurationServiceClient, error) {
	metrics := clientMetricsOnce()

	// e.g. the first retry waits between 0.9s and 1.1s; later waits grow
	// exponentially.
	retryOpts := []retry.CallOption{
		retry.WithMax(5),
		retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)),
		retry.WithCodes(codes.Unavailable),
	}

	streamChain := grpc.WithChainStreamInterceptor(
		metrics.StreamClientInterceptor(),
		messagesize.StreamClientInterceptor,
		internalActorStreamInterceptor(),
		internalerrs.LoggingStreamClientInterceptor(logger),
		internalerrs.PrometheusStreamClientInterceptor,
		retry.StreamClientInterceptor(retryOpts...),
	)
	unaryChain := grpc.WithChainUnaryInterceptor(
		metrics.UnaryClientInterceptor(),
		messagesize.UnaryClientInterceptor,
		internalActorUnaryInterceptor(),
		internalerrs.LoggingUnaryClientInterceptor(logger),
		internalerrs.PrometheusUnaryClientInterceptor,
		retry.UnaryClientInterceptor(retryOpts...),
	)

	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		streamChain,
		unaryChain,
		grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)),
	}
	opts = append(opts, additionalOpts...)

	// Environment-provided message size limits are an escape hatch: appending
	// them last lets them override any client-specific message size settings.
	opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...)

	// Unless a blocking option is supplied, grpc.Dial returns without waiting
	// for the connection to be established.
	conn, err := grpc.Dial(addr, opts...)
	if err != nil {
		return nil, fmt.Errorf("dialing %q: %w", addr, err)
	}

	return configv1.NewZoektConfigurationServiceClient(conn), nil
}
1918
1919// addDefaultPort adds a default port to a URL if one is not specified.
1920//
1921// If the URL scheme is "http" and no port is specified, "80" is used.
1922// If the scheme is "https", "443" is used.
1923//
1924// The original URL is not mutated. A copy is modified and returned.
1925func addDefaultPort(original *url.URL) *url.URL {
1926 if original == nil {
1927 return nil // don't panic
1928 }
1929
1930 if !original.IsAbs() {
1931 return original // don't do anything if the URL doesn't look like a remote URL
1932 }
1933
1934 if original.Scheme == "http" && original.Port() == "" {
1935 u := cloneURL(original)
1936 u.Host = net.JoinHostPort(u.Host, "80")
1937 return u
1938 }
1939
1940 if original.Scheme == "https" && original.Port() == "" {
1941 u := cloneURL(original)
1942 u.Host = net.JoinHostPort(u.Host, "443")
1943 return u
1944 }
1945
1946 return original
1947}
1948
// cloneURL returns an independent copy of u. Both the url.URL value and its
// Userinfo (when present) are duplicated, so mutating the result never
// affects u. A nil input yields nil.
// Adapted from net/http/clone.go.
func cloneURL(u *url.URL) *url.URL {
	if u == nil {
		return nil
	}

	dup := *u
	if info := u.User; info != nil {
		infoCopy := *info
		dup.User = &infoCopy
	}
	return &dup
}
1963
1964func main() {
1965 liblog := sglog.Init(sglog.Resource{
1966 Name: "zoekt-indexserver",
1967 Version: index.Version,
1968 InstanceID: index.HostnameBestEffort(),
1969 })
1970 defer liblog.Sync()
1971
1972 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil {
1973 log.Fatal(err)
1974 }
1975}
1976
// getBoolFromEnvironmentVariables returns the boolean value of the first
// environment variable in envVarNames that is set to a non-empty value. If
// none is, it returns defaultBool. An empty value is treated the same as an
// unset variable.
//
// Names are consulted in order. Only the first non-empty value is parsed, with
// strconv.ParseBool, and later names are ignored. If that value fails to
// parse, the function returns false and an error wrapping the underlying
// *strconv.NumError.
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	for _, envVar := range envVarNames {
		v := os.Getenv(envVar)
		if v == "" {
			continue
		}

		b, err := strconv.ParseBool(v)
		if err != nil {
			// %w (rather than %v) keeps the *strconv.NumError reachable via errors.As/Is.
			return false, fmt.Errorf("parsing environment variable %q to boolean: %w", envVar, err)
		}

		return b, nil
	}

	return defaultBool, nil
}