fork of https://github.com/sourcegraph/zoekt
1// Licensed under the Apache License, Version 2.0 (the "License");
2// you may not use this file except in compliance with the License.
3// You may obtain a copy of the License at
4//
5// http://www.apache.org/licenses/LICENSE-2.0
6//
7// Unless required by applicable law or agreed to in writing, software
8// distributed under the License is distributed on an "AS IS" BASIS,
9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10// See the License for the specific language governing permissions and
11// limitations under the License.
12
13// Command zoekt-sourcegraph-indexserver periodically reindexes repositories
14// from a Sourcegraph instance. It uses a "pull-based" design, where it periodically
15// reaches out to the Sourcegraph instance for the list of repositories to reindex.
16package main
17
18import (
19 "bytes"
20 "context"
21 _ "embed"
22 "encoding/json"
23 "errors"
24 "flag"
25 "fmt"
26 "html/template"
27 "io"
28 "io/fs"
29 "log"
30 "math"
31 "math/rand"
32 "net"
33 "net/http"
34 "net/url"
35 "os"
36 "os/exec"
37 "os/signal"
38 "path/filepath"
39 "runtime"
40 "slices"
41 "sort"
42 "strconv"
43 "strings"
44 "sync"
45 "text/tabwriter"
46 "time"
47
48 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
49 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
50 "github.com/peterbourgon/ff/v3/ffcli"
51 "github.com/prometheus/client_golang/prometheus"
52 "github.com/prometheus/client_golang/prometheus/promauto"
53 sglog "github.com/sourcegraph/log"
54 "github.com/sourcegraph/mountinfo"
55
56 "github.com/sourcegraph/zoekt"
57 configv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/sourcegraph/zoekt/configuration/v1"
58 indexserverv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/zoekt/indexserver/v1"
59 "github.com/sourcegraph/zoekt/grpc/defaults"
60 "github.com/sourcegraph/zoekt/grpc/grpcutil"
61 "github.com/sourcegraph/zoekt/grpc/internalerrs"
62 "github.com/sourcegraph/zoekt/grpc/messagesize"
63 "github.com/sourcegraph/zoekt/index"
64 "github.com/sourcegraph/zoekt/internal/debugserver"
65 "github.com/sourcegraph/zoekt/internal/profiler"
66 "github.com/sourcegraph/zoekt/internal/tenant"
67
68 "go.uber.org/automaxprocs/maxprocs"
69 "go.uber.org/multierr"
70 "golang.org/x/net/trace"
71 "golang.org/x/sys/unix"
72 "google.golang.org/grpc"
73 "google.golang.org/grpc/codes"
74 "google.golang.org/grpc/credentials/insecure"
75 "google.golang.org/grpc/metadata"
76)
77
// Prometheus metrics exported by the indexserver, plus the shared gRPC client
// metrics singleton.
var (
	// metricResolveRevisionsDuration observes how long it takes to resolve the
	// revisions of all repositories.
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 100000s (~27.8h)
	})

	// metricResolveRevisionDuration observes how long it takes to resolve the
	// revision of a single repository, labelled by outcome.
	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	// metricGetIndexOptions counts every attempt to fetch index options,
	// including failed ones.
	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	// metricGetIndexOptionsError counts only the failed attempts to fetch
	// index options.
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	// metricIndexDuration observes how long a single index job took. It is
	// recorded by processQueue once the job finished, whatever its state.
	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	// metricIndexingDelay observes the time between a job being added to the
	// queue and the job completing.
	metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_indexing_delay_seconds",
		Help:    "A histogram of durations from when an index job is added to the queue, to the time it completes.",
		Buckets: prometheus.ExponentialBuckets(60, 2, 14), // 1 minute -> 491520s (~5.7 days)
	}, []string{
		"state", // state is an indexState
		"name",  // the name of the repository that was indexed
	})

	// metricFetchDuration observes how long fetching a repository took.
	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	// metricIndexIncrementalIndexState counts, per incremental index state,
	// how often index() observed that state before deciding what to build.
	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is index.IndexState

	// metricNumIndexed is a gauge for the number of indexed repositories.
	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	// metricFailingTotal is incremented by index() whenever a job returns an
	// error.
	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	// metricIndexingTotal is incremented by index() each time it actually
	// starts a (non-noop, non-meta-only) index run.
	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	// metricNumStoppedTrackingTotal counts repositories removed from the queue
	// because Sourcegraph no longer lists them.
	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})

	// clientMetricsOnce returns a singleton instance of the client metrics
	// that are shared across all gRPC clients that this process creates.
	//
	// This function panics if the metrics cannot be registered with the default
	// Prometheus registry.
	clientMetricsOnce = sync.OnceValue(func() *grpcprom.ClientMetrics {
		clientMetrics := grpcprom.NewClientMetrics(
			grpcprom.WithClientCounterOptions(),
			grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
			grpcprom.WithClientStreamRecvHistogram(),   // record how long it takes for a client to receive a message during a streaming RPC
			grpcprom.WithClientStreamSendHistogram(),   // record how long it takes for a client to send a message during a streaming RPC
		)
		prometheus.DefaultRegisterer.MustRegister(clientMetrics)
		return clientMetrics
	})
)
171
// MaxFileSize is the file size limit passed to index jobs as
// indexArgs.FileLimit. It is 1 MiB (1 << 20 bytes), chosen to match
// https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
//
// NOTE: if you change this, you must also update gitIndex to use the same value when fetching the repo.
const MaxFileSize = 1 << 20

// reposWithSeparateIndexingMetrics is the set of repository names that get
// their own "name" label value in the indexing metrics (see
// repoNameForMetric). All other repositories share the empty label value.
var reposWithSeparateIndexingMetrics = make(map[string]struct{})

// indexState describes the outcome of a single index job. Its string value is
// used as the "state" label on the indexing metrics.
type indexState string

const (
	indexStateFail        indexState = "fail"         // The index job returned an error
	indexStateSuccess     indexState = "success"      // A full index run completed
	indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
	indexStateNoop        indexState = "noop"         // We didn't need to update index
	indexStateEmpty       indexState = "empty"        // index is empty (empty repo)
)
188
// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	// logger is the structured logger used for index job events.
	logger sglog.Logger

	// Sourcegraph is the client used to list repositories, fetch their index
	// options and report index status.
	Sourcegraph Sourcegraph

	// rootURL is the root URL of the Sourcegraph instance.
	rootURL *url.URL

	// BatchSize is not referenced in the visible part of this file;
	// presumably the batch size for requests to Sourcegraph — TODO confirm.
	BatchSize int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// indexSemaphore limits the number of concurrent index operations
	indexSemaphore chan struct{}

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration

	// CPUCount is the number of CPUs to use for indexing shards.
	CPUCount int

	// queue holds the repositories waiting to be indexed together with the
	// last index options we saw for them.
	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is the value served by the /debug/host endpoint.
	hostname string

	// mergeOpts configures shard merging; Run reads its vacuumInterval and
	// mergeInterval fields to schedule the vacuum and merge loops.
	mergeOpts mergeOpts

	// timeout defines how long the index server waits before killing an indexing job.
	timeout time.Duration

	// Embedded so that Server satisfies the generated gRPC service interface.
	indexserverv1.UnimplementedSourcegraphIndexserverServiceServer
}
245
// Package-level loggers. debugLog discards its output by default, while
// infoLog and errorLog write to stderr.
var (
	debugLog = log.New(io.Discard, "[DEBUG] ", log.LstdFlags)
	infoLog  = log.New(os.Stderr, "[INFO] ", log.LstdFlags)
	errorLog = log.New(os.Stderr, "[ERROR] ", log.LstdFlags)
)

// noOutputTimeout is how long loggedRun lets a command run without producing
// any new output before it asks the command to quit. Our index commands
// should output something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
const noOutputTimeout = 30 * time.Minute
258
// loggedRun starts cmd, captures its combined stdout/stderr and waits for it
// to finish, recording progress on tr.
//
// While the command runs, its output is checked every noOutputTimeout. If no
// new output was produced in that window the process is sent SIGQUIT, followed
// by a kill 10 seconds later if it is still running.
//
// On failure the returned error contains the command's arguments, the
// underlying error and everything the command wrote to stdout/stderr.
func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) {
	// Both streams share one buffer so the watchdog below can detect any output.
	out := &synchronizedBuffer{}
	cmd.Stdout = out
	cmd.Stderr = out

	tr.LazyPrintf("%s", cmd.Args)

	// Decorate any error (including a failed Start) with the captured output.
	defer func() {
		if err != nil {
			outS := out.String()
			tr.LazyPrintf("failed: %v", err)
			tr.LazyPrintf("output: %s", out)
			tr.SetError()
			err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS)
		}
	}()

	s.logger.Debug("logged run", sglog.Strings("args", cmd.Args))

	if err := cmd.Start(); err != nil {
		return err
	}

	// The loop below only returns after receiving from errC, so this
	// goroutine's send always finds a receiver.
	errC := make(chan error)
	go func() {
		errC <- cmd.Wait()
	}()

	// This channel is set after we have sent sigquit. It allows us to follow up
	// with a sigkill if the process doesn't quit after sigquit.
	kill := make(<-chan time.Time)

	// lastLen is the output length seen at the previous watchdog check.
	lastLen := 0
	for {
		select {
		case <-time.After(noOutputTimeout):
			// Periodically check if we have had output. If not kill the process.
			if out.Len() != lastLen {
				lastLen = out.Len()
				infoLog.Printf("still running %s", cmd.Args)
			} else {
				// Send quit (C-\) first so we get a stack dump.
				infoLog.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args)
				if err := cmd.Process.Signal(unix.SIGQUIT); err != nil {
					errorLog.Println("quit failed:", err)
				}

				// send sigkill if still running in 10s
				kill = time.After(10 * time.Second)
			}

		case <-kill:
			infoLog.Printf("still running, killing %s", cmd.Args)
			if err := cmd.Process.Kill(); err != nil {
				errorLog.Println("kill failed:", err)
			}

		case err := <-errC:
			// The command exited; this is the only way out of the loop.
			if err != nil {
				return err
			}

			tr.LazyPrintf("success")
			return nil
		}
	}
}
326
// synchronizedBuffer is a bytes.Buffer guarded by a mutex. It exists so that
// one goroutine can inspect the buffer (length, contents) while another is
// still writing to it.
type synchronizedBuffer struct {
	mu sync.Mutex
	b  bytes.Buffer
}

// Write appends p to the buffer while holding the lock.
func (buf *synchronizedBuffer) Write(p []byte) (int, error) {
	buf.mu.Lock()
	defer buf.mu.Unlock()

	return buf.b.Write(p)
}

// Len reports the number of bytes currently held in the buffer.
func (buf *synchronizedBuffer) Len() int {
	buf.mu.Lock()
	defer buf.mu.Unlock()

	return buf.b.Len()
}

// String returns a snapshot of the buffer's contents as a string.
func (buf *synchronizedBuffer) String() string {
	buf.mu.Lock()
	defer buf.mu.Unlock()

	return buf.b.String()
}
351
// Indexing can be paused in two ways, both checked by isIndexingPaused.
const (
	// pauseFileName if present in IndexDir will stop index jobs from
	// running. This is to make it possible to experiment with the content of the
	// IndexDir without the indexserver writing to it.
	pauseFileName = "PAUSE"

	// pauseEnvVar is the name of the environment variable that pauses
	// indexing when it is set to true.
	pauseEnvVar = "SRC_PAUSE_INDEXING"
)
359
360// isIndexingPaused checks if indexing should be paused based on either:
361// 1. The presence of a PAUSE file in the index directory
362// 2. The SRC_PAUSE_INDEXING environment variable being set to true
363func isIndexingPaused(indexDir string) (bool, string) {
364 // Check for PAUSE file first
365 if b, err := os.ReadFile(filepath.Join(indexDir, pauseFileName)); err == nil {
366 return true, fmt.Sprintf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b)))
367 }
368
369 // Then check environment variable
370 if getEnvWithDefaultBool(pauseEnvVar, false) {
371 return true, fmt.Sprintf("indexserver paused via %s environment variable", pauseEnvVar)
372 }
373
374 return false, ""
375}
376
377// Run the sync loop. This blocks forever.
378func (s *Server) Run() {
379 removeIncompleteShards(s.IndexDir)
380
381 // Start a goroutine which updates the queue with commits to index.
382 go func() {
383 // We update the list of indexed repos every Interval. To speed up manual
384 // testing we also listen for SIGUSR1 to trigger updates.
385 // "pkill -SIGUSR1 zoekt-sourcegra"
386 for range jitterTicker(s.Interval, unix.SIGUSR1) {
387 if paused, msg := isIndexingPaused(s.IndexDir); paused {
388 infoLog.Printf(msg)
389 continue
390 }
391
392 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir))
393 if err != nil {
394 errorLog.Printf("error listing repos: %s", err)
395 continue
396 }
397
398 debugLog.Printf("updating index queue with %d repositories", len(repos.IDs))
399
400 // Stop indexing repos we don't need to track anymore
401 removed := s.queue.MaybeRemoveMissing(repos.IDs)
402 metricNumStoppedTrackingTotal.Add(float64(len(removed)))
403 if len(removed) > 0 {
404 infoLog.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5))
405 }
406
407 cleanupDone := make(chan struct{})
408 go func() {
409 defer close(cleanupDone)
410 s.muIndexDir.Global(func() {
411 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging)
412 })
413 }()
414
415 repos.IterateIndexOptions(s.queue.AddOrUpdate)
416
417 // IterateIndexOptions will only iterate over repositories that have
418 // changed since we last called list. However, we want to add all IDs
419 // back onto the queue just to check that what is on disk is still
420 // correct. This will use the last IndexOptions we stored in the
421 // queue. The repositories not on the queue (missing) need a forced
422 // fetch of IndexOptions.
423 missing := s.queue.Bump(repos.IDs)
424 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...)
425
426 setShardsCounter(s.IndexDir)
427
428 <-cleanupDone
429 }
430 }()
431
432 go func() {
433 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) {
434 if s.shardMerging {
435 s.vacuum()
436 }
437 }
438 }()
439
440 go func() {
441 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) {
442 if s.shardMerging {
443 s.doMerge()
444 }
445 }
446 }()
447
448 for range s.IndexConcurrency {
449 go s.processQueue()
450 }
451
452 // block forever
453 select {}
454}
455
// formatListUint32 renders at most the first m items of v as a
// comma-separated list. If v has more items than were rendered, the list ends
// with "..." to signal the truncation.
func formatListUint32(v []uint32, m int) string {
	shown := len(v)
	if m < shown {
		shown = m
	}

	var parts []string
	for i := 0; i < shown; i++ {
		parts = append(parts, strconv.FormatUint(uint64(v[i]), 10))
	}
	if len(v) > shown {
		parts = append(parts, "...")
	}

	return strings.Join(parts, ", ")
}
473
474func (s *Server) processQueue() {
475 for {
476 if paused, _ := isIndexingPaused(s.IndexDir); paused {
477 time.Sleep(time.Second)
478 continue
479 }
480
481 item, ok := s.queue.Pop()
482 if !ok {
483 time.Sleep(time.Second)
484 continue
485 }
486
487 opts := item.Opts
488 args := s.indexArgs(opts)
489
490 ran := s.muIndexDir.With(opts.Name, func() {
491 // only record time taken once we hold the lock. This avoids us
492 // recording time taken while merging/cleanup runs.
493 start := time.Now()
494
495 state, err := s.index(context.Background(), args)
496
497 elapsed := time.Since(start)
498 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds())
499
500 indexDelay := time.Since(item.DateAddedToQueue)
501 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds())
502
503 if err != nil {
504 errorLog.Printf("error indexing %s: %s", args.String(), err)
505 }
506
507 switch state {
508 case indexStateSuccess:
509 var branches []string
510 for _, b := range args.Branches {
511 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
512 }
513 s.logger.Info("updated index",
514 sglog.Int("tenant", args.TenantID),
515 sglog.String("repo", args.Name),
516 sglog.Uint32("id", args.RepoID),
517 sglog.Strings("branches", branches),
518 sglog.Duration("duration", elapsed),
519 sglog.Duration("index_delay", indexDelay),
520 )
521 case indexStateSuccessMeta:
522 infoLog.Printf("updated meta %s in %v", args.String(), elapsed)
523 }
524 s.queue.SetIndexed(opts, state)
525 })
526
527 if !ran {
528 // Someone else is processing the repository. We can just skip this job
529 // since the repository will be added back to the queue and we will
530 // converge to the correct behaviour.
531 debugLog.Printf("index job for repository already running: %s", args)
532 continue
533 }
534 }
535}
536
537// repoNameForMetric returns a normalized version of the given repository name that is
538// suitable for use with Prometheus metrics.
539func repoNameForMetric(repo string) string {
540 // Check to see if we want to be able to capture separate indexing metrics for this repository.
541 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable.
542 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok {
543 return repo
544 }
545
546 return ""
547}
548
// batched splits slice into consecutive batches of at most size elements and
// delivers them, in order, on the returned channel. The channel is closed
// after the last batch has been received; an empty slice yields no batches.
//
// A non-positive size disables batching: the whole slice is delivered as a
// single batch. (Previously a size of 0 looped forever sending empty batches
// and a negative size panicked inside the goroutine.)
//
// Batches are views into slice rather than copies, but their capacity is
// capped so that appending to a batch never overwrites the elements of a
// later batch.
//
// The caller must drain the channel, otherwise the sending goroutine blocks
// forever.
func batched(slice []uint32, size int) <-chan []uint32 {
	if size <= 0 {
		size = len(slice)
	}

	c := make(chan []uint32)
	go func() {
		defer close(c)
		for len(slice) > 0 {
			n := size
			if n > len(slice) {
				n = len(slice)
			}
			c <- slice[:n:n]
			slice = slice[n:]
		}
	}()
	return c
}
563
// jitterTicker returns a channel that fires immediately and then repeatedly,
// with a randomized pause between ticks. Every pause is d/2 plus a uniformly
// chosen duration below d, i.e. it lies between d/2 and d + d/2.
//
// d must be positive: the jitter is drawn with rand.Int63n, which panics for
// a non-positive argument.
//
// sig is an optional list of signals which also make the ticker fire. This is
// a convenience to allow triggering a tick manually.
//
// The ticker cannot be stopped, and the channel is unbuffered, so ticks are
// only delivered while a receiver is waiting.
func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} {
	ticks := make(chan struct{})

	// Time-driven ticks.
	go func() {
		base := int64(d)
		for {
			ticks <- struct{}{}
			pause := base/2 + rand.Int63n(base)
			time.Sleep(time.Duration(pause))
		}
	}()

	// Signal-driven ticks, only needed when signals were requested.
	if len(sig) > 0 {
		go func() {
			received := make(chan os.Signal, 1)
			signal.Notify(received, sig...)
			for range received {
				ticks <- struct{}{}
			}
		}()
	}

	return ticks
}
595
// index runs a single index job described by args and reports what happened
// as an indexState.
//
// The job is rejected if args carries no valid tenant ID. A repository without
// branches gets an empty shard. For incremental jobs the state on disk is
// consulted first: an up-to-date index is a no-op and a metadata-only change
// is applied in place if possible. Everything else results in a full run of
// gitIndex, after which the new status is pushed to Sourcegraph (a failure to
// do so is only logged).
//
// Whenever a non-nil error is returned the state is indexStateFail and
// metricFailingTotal is incremented. Note that args is mutated: UseDelta,
// DeltaShardNumberFallbackThreshold and Symbols may be overwritten based on
// the server's configuration.
func (s *Server) index(ctx context.Context, args *indexArgs) (state indexState, err error) {
	tr := trace.New("index", args.Name)
	tr.SetMaxEvents(30) // Ensure we capture all indexing events

	// Record the outcome on the trace. This also normalizes the state: any
	// error turns the job into a failure, whatever state was returned.
	defer func() {
		if err != nil {
			tr.SetError()
			tr.LazyPrintf("error: %v", err)
			state = indexStateFail
			metricFailingTotal.Inc()
		}
		tr.LazyPrintf("state: %s", state)
		tr.Finish()
	}()

	// Sourcegraph should always provide a tenant ID.
	if args.TenantID < 1 {
		return indexStateFail, tenant.ErrMissingTenant
	}

	tr.LazyPrintf("branches: %v", args.Branches)

	// Nothing to index: write an empty shard instead.
	if len(args.Branches) == 0 {
		return indexStateEmpty, createEmptyShard(args)
	}

	repositoryName := args.Name
	if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
		tr.LazyPrintf("marking this repository for delta build")
		args.UseDelta = true
	}

	args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold

	if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
		tr.LazyPrintf("skipping symbols calculation")
		args.Symbols = false
	}

	// reason is only used for logging why a full index run is happening.
	reason := "forced"

	if args.Incremental {
		bo := args.BuildOptions()
		bo.SetDefaults()
		incrementalState, fn := bo.IndexState()
		reason = string(incrementalState)
		metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()

		// Any state not handled by a returning case below falls through to
		// the full index run.
		switch incrementalState {
		case index.IndexStateEqual:
			debugLog.Printf("%s index already up to date. Shard=%s", args.String(), fn)
			return indexStateNoop, nil

		case index.IndexStateMeta:
			infoLog.Printf("updating index.meta %s", args.String())

			// TODO(stefan) handle mergeMeta for tenant id.
			if err := mergeMeta(bo); err != nil {
				errorLog.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
			} else {
				return indexStateSuccessMeta, nil
			}

		case index.IndexStateCorrupt:
			infoLog.Printf("falling back to full update: corrupt index: %s", args.String())
		}
	}

	infoLog.Printf("updating index %s reason=%s", args.String(), reason)

	metricIndexingTotal.Inc()
	c := gitIndexConfig{
		// Commands are run via loggedRun so their output ends up on the trace.
		runCmd: func(cmd *exec.Cmd) error {
			return s.loggedRun(tr, cmd)
		},

		findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
			return args.BuildOptions().FindRepositoryMetadata()
		},
		timeout: s.timeout,
	}

	err = gitIndex(ctx, c, args, s.Sourcegraph, s.logger)
	if err != nil {
		return indexStateFail, err
	}

	// The index itself succeeded, so a failed status update does not fail
	// the job.
	if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil {
		s.logger.Error("failed to update index status",
			sglog.String("repo", args.Name),
			sglog.Uint32("id", args.RepoID),
			sglogBranches("branches", args.Branches),
			sglog.Error(err),
		)
	}

	return indexStateSuccess, nil
}
695
696// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so
697// it can update the zoekt_repos table.
698func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error {
699 // We need to read from disk for IndexTime.
700 _, metadata, ok, err := c.findRepositoryMetadata(args)
701 if err != nil {
702 return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
703 }
704 if !ok {
705 return errors.New("failed to find metadata for new/updated index")
706 }
707
708 status := []indexStatus{{
709 RepoID: args.RepoID,
710 Branches: args.Branches,
711 IndexTimeUnix: metadata.IndexTime.Unix(),
712 }}
713 if err := sg.UpdateIndexStatus(status); err != nil {
714 return fmt.Errorf("failed to update sourcegraph with status: %w", err)
715 }
716
717 return nil
718}
719
720func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
721 ss := make([]string, len(branches))
722 for i, b := range branches {
723 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
724 }
725 return sglog.Strings(key, ss)
726}
727
728func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
729 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0))
730 return &indexArgs{
731 IndexOptions: opts,
732 IndexDir: s.IndexDir,
733 Parallelism: parallelism,
734 Incremental: true,
735 FileLimit: MaxFileSize,
736 ShardMerging: s.shardMerging,
737 }
738}
739
740// parallelism consults both the server flags and index options to determine the number
741// of shards to index in parallel. If the CPUCount index option is provided, it always
742// overrides the server flag.
743func (s *Server) parallelism(opts IndexOptions, maxProcs int) int {
744 var parallelism int
745 if opts.ShardConcurrency > 0 {
746 parallelism = int(opts.ShardConcurrency)
747 } else {
748 parallelism = s.CPUCount
749 }
750
751 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs
752 if parallelism > 4*maxProcs {
753 parallelism = 4 * maxProcs
754 }
755
756 // If index concurrency is set, then divide the parallelism by the number of
757 // repos we're indexing in parallel
758 if s.IndexConcurrency > 1 {
759 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency)))
760 }
761
762 return parallelism
763}
764
765func createEmptyShard(args *indexArgs) error {
766 bo := args.BuildOptions()
767 bo.SetDefaults()
768 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}
769
770 if args.Incremental && bo.IncrementalSkipIndexing() {
771 return nil
772 }
773
774 builder, err := index.NewBuilder(*bo)
775 if err != nil {
776 return err
777 }
778 return builder.Finish()
779}
780
781// addDebugHandlers adds handlers specific to indexserver.
782func (s *Server) addDebugHandlers(mux *http.ServeMux) {
783 // Sourcegraph's site admin view requires indexserver to serve it's admin view
784 // on "/".
785 mux.Handle("/", http.HandlerFunc(s.handleRoot))
786
787 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex))
788 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed))
789 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList))
790 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge))
791 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue))
792 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost))
793}
794
795func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) {
796 if r.Method != "GET" {
797 w.Header().Set("Allow", "GET")
798 w.WriteHeader(http.StatusMethodNotAllowed)
799 return
800 }
801
802 response := struct {
803 Hostname string
804 }{
805 Hostname: s.hostname,
806 }
807
808 b, err := json.Marshal(response)
809 if err != nil {
810 http.Error(w, err.Error(), http.StatusInternalServerError)
811 return
812 }
813
814 w.Header().Set("Content-Type", "application/json; charset=utf-8")
815 w.Write(b)
816}
817
// rootTmpl renders the admin page served by handleRoot. It expects a value
// with the fields IndexMsg (a status message, may be empty) and Repos (a list
// of items with Name and ID). If Repos is empty only a "show repos" link is
// rendered; otherwise every repository is listed with a link that triggers a
// reindex of it.
var rootTmpl = template.Must(template.New("name").Parse(`
<html>
  <body>
    <a href="debug">Debug</a><br />
    <a href="debug/requests">Traces</a><br />
    {{.IndexMsg}}<br />
    <br />
    <h3>Reindex</h3>
    {{if .Repos}}
    <a href="?show_repos=false">hide repos</a><br />
    <table style="margin-top: 20px">
      <tr>
        <th style="text-align:left">Name</th>
        <th style="text-align:left">ID (click to reindex)</th>
      </tr>
      {{range .Repos}}
      <tr>
        <td>{{.Name}}</td>
        <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></td>
      </tr>
      {{end}}
    </table>
    {{else}}
    <a href="?show_repos=true">show repos</a><br />
    {{end}}
  </body>
</html>
`))
844
845func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
846 if r.Method != "GET" {
847 w.Header().Set("Allow", "GET")
848 w.WriteHeader(http.StatusMethodNotAllowed)
849 return
850 }
851
852 values := r.URL.Query()
853
854 // ?id=
855 indexMsg := ""
856 if v := values.Get("id"); v != "" {
857 id, err := strconv.Atoi(v)
858 if err != nil {
859 http.Error(w, err.Error(), http.StatusBadRequest)
860 return
861 }
862 indexMsg, _ = s.forceIndex(r.Context(), uint32(id))
863 }
864
865 // ?show_repos=
866 showRepos := false
867 if v := values.Get("show_repos"); v != "" {
868 showRepos, _ = strconv.ParseBool(v)
869 }
870
871 type Repo struct {
872 ID uint32
873 Name string
874 }
875 var data struct {
876 Repos []Repo
877 IndexMsg string
878 }
879
880 data.IndexMsg = indexMsg
881
882 if showRepos {
883 s.queue.Iterate(func(opts *IndexOptions) {
884 data.Repos = append(data.Repos, Repo{
885 ID: opts.RepoID,
886 Name: opts.Name,
887 })
888 })
889 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name })
890 }
891
892 _ = rootTmpl.Execute(w, data)
893}
894
895// handleReindex triggers a reindex asynocronously. If a reindex was triggered
896// the request returns with status 202. The caller can infer the new state of
897// the index by calling List.
898func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) {
899 if r.Method != http.MethodPost {
900 w.Header().Set("Allow", http.MethodPost)
901 w.WriteHeader(http.StatusMethodNotAllowed)
902 return
903 }
904
905 err := r.ParseForm()
906 if err != nil {
907 http.Error(w, err.Error(), http.StatusBadRequest)
908 return
909 }
910
911 id, err := strconv.Atoi(r.Form.Get("repo"))
912 if err != nil {
913 http.Error(w, err.Error(), http.StatusBadRequest)
914 return
915 }
916
917 go func() { s.forceIndex(r.Context(), uint32(id)) }()
918
919 // 202 Accepted
920 w.WriteHeader(http.StatusAccepted)
921}
922
923func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
924 withIndexed := true
925 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil {
926 withIndexed = b
927 }
928
929 var indexed []uint32
930 if withIndexed {
931 indexed = listIndexed(s.IndexDir)
932 }
933
934 repos, err := s.Sourcegraph.List(r.Context(), indexed)
935 if err != nil {
936 http.Error(w, err.Error(), http.StatusInternalServerError)
937 return
938 }
939
940 bw := bytes.Buffer{}
941
942 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
943
944 _, err = fmt.Fprintf(tw, "ID\tName\n")
945 if err != nil {
946 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
947 return
948 }
949
950 s.queue.mu.Lock()
951 name := ""
952 for _, id := range repos.IDs {
953 if item := s.queue.get(id); item != nil {
954 name = item.opts.Name
955 } else {
956 name = ""
957 }
958 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
959 if err != nil {
960 debugLog.Printf("handleDebugList: %s\n", err.Error())
961 }
962 }
963 s.queue.mu.Unlock()
964
965 if err != nil {
966 http.Error(w, err.Error(), http.StatusInternalServerError)
967 return
968 }
969
970 err = tw.Flush()
971 if err != nil {
972 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
973 return
974 }
975
976 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
977
978 if _, err := io.Copy(w, &bw); err != nil {
979 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
980 return
981 }
982}
983
984// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
985// can run this command during periods of low usage (evenings, weekends) to
986// trigger an initial merge run. In the steady-state, merges happen rarely, even
987// on busy instances, and users can rely on automatic merging instead.
988func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {
989 // A merge operation can take very long, depending on the number merges and the
990 // target size of the compound shards. We run the merge in the background and
991 // return immediately to the user.
992 //
993 // We track the status of the merge with metricShardMergingRunning.
994 go func() {
995 s.doMerge()
996 }()
997 _, _ = w.Write([]byte("merging enqueued\n"))
998}
999
1000func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {
1001 indexed := listIndexed(s.IndexDir)
1002
1003 bw := bytes.Buffer{}
1004
1005 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
1006
1007 _, err := fmt.Fprintf(tw, "ID\tName\n")
1008 if err != nil {
1009 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
1010 return
1011 }
1012
1013 s.queue.mu.Lock()
1014 name := ""
1015 for _, id := range indexed {
1016 if item := s.queue.get(id); item != nil {
1017 name = item.opts.Name
1018 } else {
1019 name = ""
1020 }
1021 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
1022 if err != nil {
1023 debugLog.Printf("handleDebugIndexed: %s\n", err.Error())
1024 }
1025 }
1026 s.queue.mu.Unlock()
1027
1028 if err != nil {
1029 http.Error(w, err.Error(), http.StatusInternalServerError)
1030 return
1031 }
1032
1033 err = tw.Flush()
1034 if err != nil {
1035 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
1036 return
1037 }
1038
1039 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
1040
1041 if _, err := io.Copy(w, &bw); err != nil {
1042 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
1043 return
1044 }
1045}
1046
1047// forceIndex will run the index job for repo name now. It will return always
1048// return a string explaining what it did, even if it failed.
1049func (s *Server) forceIndex(ctx context.Context, id uint32) (string, error) {
1050 var opts IndexOptions
1051 var err error
1052 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) {
1053 opts = o
1054 }, func(_ uint32, e error) {
1055 err = e
1056 }, id)
1057 if err != nil {
1058 return fmt.Sprintf("Indexing %d failed: %v", id, err), err
1059 }
1060
1061 args := s.indexArgs(opts)
1062 args.Incremental = false // force re-index
1063
1064 var state indexState
1065 ran := s.muIndexDir.With(opts.Name, func() {
1066 state, err = s.index(ctx, args)
1067 })
1068 if !ran {
1069 return fmt.Sprintf("index job for repository already running: %s", args), nil
1070 }
1071 if err != nil {
1072 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err
1073 }
1074 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil
1075}
1076
1077// DeleteAllData deletes all shards in the index and trash dir belonging to the
1078// tenant associated with the request. The deletion is best-effort, which means
1079// we will delete as much as possible. If no error is returned, the caller can
1080// be certain that all data has been deleted.
1081func (s *Server) DeleteAllData(ctx context.Context, _ *indexserverv1.DeleteAllDataRequest) (*indexserverv1.DeleteAllDataResponse, error) {
1082 tnt, err := tenant.FromContext(ctx)
1083 if err != nil {
1084 return nil, err
1085 }
1086 s.logger.Warn("DeleteAllData", sglog.Int("tenant_id", tnt.ID()))
1087
1088 var merr error
1089 s.muIndexDir.Global(func() {
1090 // First, explode all compound shards that have repos from the tenant in
1091 // question. Because we hold the global lock, we can be sure that no new
1092 // merges start while we do this.
1093 if err := s.explodeTenantCompoundShards(ctx, func(path string) error {
1094 // We call explode in a separate process to protect indexserver.
1095 cmd := defaultExplodeCmd(path)
1096
1097 stdoutBuf := &bytes.Buffer{}
1098 stderrBuf := &bytes.Buffer{}
1099 cmd.Stdout = stdoutBuf
1100 cmd.Stderr = stderrBuf
1101
1102 err := cmd.Run()
1103 if err != nil {
1104 errorLog.Printf("explode failed: %v (stderr: %s)", err, stderrBuf.String())
1105 return err
1106 }
1107
1108 infoLog.Printf("exploded shard: %s", stdoutBuf.String())
1109
1110 return nil
1111 }); err != nil {
1112 merr = multierr.Append(merr, err)
1113 }
1114
1115 // Invariant: all shards from the tenant are simple shards.
1116
1117 if err := purgeTenantShards(ctx, s.IndexDir); err != nil {
1118 merr = multierr.Append(merr, err)
1119 }
1120 if err := purgeTenantShards(ctx, filepath.Join(s.IndexDir, ".trash")); err != nil {
1121 merr = multierr.Append(merr, err)
1122 }
1123 })
1124
1125 return &indexserverv1.DeleteAllDataResponse{}, merr
1126}
1127
1128func listIndexed(indexDir string) []uint32 {
1129 index := getShards(indexDir)
1130 metricNumIndexed.Set(float64(len(index)))
1131 repoIDs := make([]uint32, 0, len(index))
1132 for id := range index {
1133 repoIDs = append(repoIDs, id)
1134 }
1135 slices.Sort(repoIDs)
1136 return repoIDs
1137}
1138
1139// setupTmpDir sets up a temporary directory on the same volume as the
1140// indexes.
1141//
1142// If main is true we will delete older temp directories left around. main is
1143// false when this is a debug command.
1144func setupTmpDir(logger sglog.Logger, main bool, index string) error {
1145 // change the target tmp directory depending on if it's our main daemon or a
1146 // debug sub command.
1147 dir := ".indexserver.debug.tmp"
1148 if main {
1149 dir = ".indexserver.tmp"
1150 }
1151
1152 tmpRoot := filepath.Join(index, dir)
1153
1154 if main {
1155 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot))
1156 err := os.RemoveAll(tmpRoot)
1157 if err != nil {
1158 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err))
1159 }
1160 }
1161
1162 if err := os.MkdirAll(tmpRoot, 0o755); err != nil {
1163 return err
1164 }
1165
1166 return os.Setenv("TMPDIR", tmpRoot)
1167}
1168
1169func printMetaData(fn string) error {
1170 repo, indexMeta, err := index.ReadMetadataPath(fn)
1171 if err != nil {
1172 return err
1173 }
1174
1175 err = json.NewEncoder(os.Stdout).Encode(indexMeta)
1176 if err != nil {
1177 return err
1178 }
1179
1180 err = json.NewEncoder(os.Stdout).Encode(repo)
1181 if err != nil {
1182 return err
1183 }
1184 return nil
1185}
1186
1187func printShardStats(fn string) error {
1188 f, err := os.Open(fn)
1189 if err != nil {
1190 return err
1191 }
1192
1193 iFile, err := index.NewIndexFile(f)
1194 if err != nil {
1195 return err
1196 }
1197
1198 return index.PrintNgramStats(iFile)
1199}
1200
1201func srcLogLevelIsDebug() bool {
1202 lvl := os.Getenv(sglog.EnvLogLevel)
1203 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug")
1204}
1205
// getEnvWithDefaultBool returns the boolean value of environment variable k,
// or defaultVal when k is unset or empty. The value is parsed with
// strconv.ParseBool; a value that does not parse terminates the process via
// log.Fatalf.
func getEnvWithDefaultBool(k string, defaultVal bool) bool {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		// The message previously claimed "int64", which was misleading for a
		// boolean setting.
		log.Fatalf("error parsing ENV %s to bool: %s", k, err)
	}
	return b
}
1217
// getEnvWithDefaultInt64 reads environment variable k as a base-10 int64.
// defaultVal is returned when k is unset or empty; a value that does not parse
// terminates the process via log.Fatalf.
func getEnvWithDefaultInt64(k string, defaultVal int64) int64 {
	raw := os.Getenv(k)
	if len(raw) == 0 {
		return defaultVal
	}

	parsed, parseErr := strconv.ParseInt(raw, 10, 64)
	if parseErr != nil {
		log.Fatalf("error parsing ENV %s to int64: %s", k, parseErr)
	}
	return parsed
}
1229
// getEnvWithDefaultInt returns the integer value of environment variable k, or
// defaultVal when k is unset or empty. A value that does not parse as a
// base-10 int terminates the process via log.Fatalf.
func getEnvWithDefaultInt(k string, defaultVal int) int {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	// Parse the VALUE. The key was parsed here before, which made every
	// non-empty setting fatal.
	i, err := strconv.Atoi(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to int: %s", k, err)
	}
	return i
}
1241
// getEnvWithDefaultUint64 reads environment variable k as a base-10 uint64.
// defaultVal is returned when k is unset or empty; a value that does not parse
// terminates the process via log.Fatalf.
func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, parseErr := strconv.ParseUint(raw, 10, 64)
		if parseErr != nil {
			log.Fatalf("error parsing ENV %s to uint64: %s", k, parseErr)
		}
		return parsed
	}
	return defaultVal
}
1253
// getEnvWithDefaultFloat64 reads environment variable k as a float64.
// defaultVal is returned when k is unset or empty; a value that does not parse
// terminates the process via log.Fatalf.
func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 {
	raw := os.Getenv(k)
	if len(raw) == 0 {
		return defaultVal
	}

	parsed, parseErr := strconv.ParseFloat(raw, 64)
	if parseErr != nil {
		log.Fatalf("error parsing ENV %s to float64: %s", k, parseErr)
	}
	return parsed
}
1265
// getEnvWithDefaultString returns the value of environment variable k, or
// defaultVal when k is unset or empty.
func getEnvWithDefaultString(k string, defaultVal string) string {
	if value := os.Getenv(k); value != "" {
		return value
	}
	return defaultVal
}
1273
// getEnvWithDefaultDuration reads environment variable k as a duration in the
// syntax accepted by time.ParseDuration. defaultVal is returned when k is
// unset or empty; a value that does not parse terminates the process via
// log.Fatalf.
func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration {
	raw := os.Getenv(k)
	if len(raw) == 0 {
		return defaultVal
	}

	parsed, parseErr := time.ParseDuration(raw)
	if parseErr != nil {
		log.Fatalf("error parsing ENV %s to duration: %s", k, parseErr)
	}
	return parsed
}
1286
// getEnvWithDefaultEmptySet splits environment variable k on commas and
// returns the distinct, whitespace-trimmed, non-empty entries as a set. The
// result is an empty (non-nil) set when k is unset or holds no entries.
func getEnvWithDefaultEmptySet(k string) map[string]struct{} {
	members := make(map[string]struct{})
	for _, field := range strings.Split(os.Getenv(k), ",") {
		if entry := strings.TrimSpace(field); entry != "" {
			members[entry] = struct{}{}
		}
	}
	return members
}
1297
// joinStringSet returns the members of set joined by sep. Members are emitted
// in ascending order so that the result is deterministic; ranging over the map
// directly would produce a different order from call to call.
func joinStringSet(set map[string]struct{}, sep string) string {
	xs := make([]string, 0, len(set))
	for x := range set {
		xs = append(xs, x)
	}
	sort.Strings(xs)

	return strings.Join(xs, sep)
}
1306
1307func setShardsCounter(indexDir string) {
1308 fns, err := filepath.Glob(filepath.Join(indexDir, "*.zoekt"))
1309 if err != nil {
1310 errorLog.Printf("setShardsCounter: %s\n", err)
1311 return
1312 }
1313 metricNumberShards.Set(float64(len(fns)))
1314
1315 compoundFns := make([]string, 0, len(fns))
1316 for _, fn := range fns {
1317 if strings.HasPrefix(filepath.Base(fn), "compound-") {
1318 compoundFns = append(compoundFns, fn)
1319 }
1320 }
1321 metricNumberCompoundShards.Set(float64(len(compoundFns)))
1322}
1323
1324func rootCmd() *ffcli.Command {
1325 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError)
1326 conf := rootConfig{
1327 Main: true,
1328 }
1329 conf.registerRootFlags(rootFs)
1330
1331 return &ffcli.Command{
1332 FlagSet: rootFs,
1333 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]",
1334 Subcommands: []*ffcli.Command{debugCmd()},
1335 Exec: func(ctx context.Context, args []string) error {
1336 return startServer(conf)
1337 },
1338 }
1339}
1340
// rootConfig holds the settings of the indexserver command. Apart from Main,
// every field is bound to a command-line flag in registerRootFlags.
type rootConfig struct {
	// Main is true if this rootConfig is for our main long running command (the
	// indexserver). Debug commands should not set this value. This is used to
	// determine if we need to run tmpfriend.
	Main bool

	// root is the -sourcegraph_url value: the Sourcegraph URL, or a path to a
	// directory of repositories.
	root string

	// interval is how often to sync with Sourcegraph (-interval).
	interval time.Duration

	// index is the index directory (-index).
	index string

	// indexConcurrency is the number of repos to index concurrently
	// (-index_concurrency).
	indexConcurrency int64

	// listen is the address to listen on (-listen).
	listen string

	// hostname is the name advertised to Sourcegraph when asking for the list
	// of repositories to index (-hostname).
	hostname string

	// cpuFraction is the fraction of the cores used for indexing
	// (-cpu_fraction).
	cpuFraction float64

	// config values related to shard merging

	// disableShardMerging turns shard merging off. It is bound to the flag
	// named -shard_merging.
	disableShardMerging bool

	// vacuumInterval is how often vacuum runs (-vacuum_interval).
	vacuumInterval time.Duration

	// mergeInterval is how often merge runs (-merge_interval).
	mergeInterval time.Duration

	// targetSize is the target size of compound shards in MiB
	// (-merge_target_size).
	targetSize int64

	// minSize is the minimum size of a compound shard in MiB
	// (-merge_min_size).
	minSize int64

	// minAgeDays is the time since the last commit in days; shards with newer
	// commits are excluded from merging (-merge_min_age).
	minAgeDays int

	// config values related to backoff indexing repos with one or more consecutive failures

	// backoffDuration is how long enqueue operations back off for a repository
	// whose previous indexing attempt failed (-backoff_duration).
	backoffDuration time.Duration

	// maxBackoffDuration is the maximum duration to back off from enqueueing a
	// repo for indexing (-max_backoff_duration).
	maxBackoffDuration time.Duration
}
1367
// registerRootFlags binds the fields of rc to flags on fs. Several defaults
// are read from environment variables while the flags are being registered, so
// a malformed value in one of those variables terminates the process (via the
// getEnvWithDefault* helpers) before any flag is parsed.
func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
	fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
	fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
	fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently")
	fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", index.DefaultDir), "set index directory to use")
	fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
	fs.StringVar(&rc.hostname, "hostname", index.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
	fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
	fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
	fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")

	// flags related to shard merging
	//
	// NOTE(review): the flag is named "shard_merging" but it sets
	// disableShardMerging, so passing -shard_merging turns merging OFF.
	fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging")
	fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
	fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
	fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 1000), "the target size of compound shards in MiB")
	fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 800), "the minimum size of a compound shard in MiB")
	fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
}
1387
1388func startServer(conf rootConfig) error {
1389 s, err := newServer(conf)
1390 if err != nil {
1391 return err
1392 }
1393
1394 profiler.Init("zoekt-sourcegraph-indexserver")
1395 setShardsCounter(s.IndexDir)
1396
1397 if conf.listen != "" {
1398
1399 mux := http.NewServeMux()
1400 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{
1401 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"},
1402 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"},
1403 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"},
1404 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"},
1405 }...)
1406 s.addDebugHandlers(mux)
1407
1408 go func() {
1409 debugLog.Printf("serving HTTP on %s", conf.listen)
1410 mux := grpcutil.MultiplexGRPC(newGRPCServer(sglog.Scoped("indexserver"), s), mux)
1411 log.Fatal(http.ListenAndServe(conf.listen, mux))
1412 }()
1413
1414 // Serve mux on a unix domain socket on a best-effort-basis so that
1415 // webserver can call the endpoints via the shared filesystem.
1416 //
1417 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen
1418 // on the socket due to permission errors. See
1419 // https://github.com/docker/for-mac/issues/6239
1420 go func() {
1421 serveHTTPOverSocket := func() error {
1422 socket := filepath.Join(s.IndexDir, "indexserver.sock")
1423 // We cannot bind a socket to an existing pathname.
1424 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) {
1425 return fmt.Errorf("error removing socket file: %s", socket)
1426 }
1427 // The "unix" network corresponds to stream sockets. (cf. unixgram,
1428 // unixpacket).
1429 l, err := net.Listen("unix", socket)
1430 if err != nil {
1431 return fmt.Errorf("failed to listen on socket %s: %w", socket, err)
1432 }
1433 // Indexserver (root) and webserver (Sourcegraph) run with
1434 // different users. Per default, the socket is created with
1435 // permission 755 (root root), which doesn't let webserver write to
1436 // it.
1437 //
1438 // See https://github.com/golang/go/issues/11822 for more context.
1439 if err := os.Chmod(socket, 0o777); err != nil {
1440 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err)
1441 }
1442 debugLog.Printf("serving HTTP on %s", socket)
1443 return http.Serve(l, mux)
1444 }
1445 debugLog.Print(serveHTTPOverSocket())
1446 }()
1447 }
1448
1449 oc := &ownerChecker{
1450 Path: filepath.Join(conf.index, "owner.txt"),
1451 Hostname: conf.hostname,
1452 }
1453 go oc.Run()
1454
1455 logger := sglog.Scoped("metricsRegistration")
1456 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"}
1457
1458 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index})
1459 prometheus.DefaultRegisterer.MustRegister(c)
1460
1461 s.Run()
1462 return nil
1463}
1464
// newServer validates conf, prepares the process environment and the index
// directory, and returns a Server configured from conf and from several
// environment variables.
//
// Side effects visible in this function: GOMAXPROCS may be adjusted, PATH is
// prefixed with the directory of the running executable when /proc/self/exe
// can be resolved, the index directory is created if os.Stat on it fails,
// TMPDIR is set up through setupTmpDir, debugLog is pointed at stderr when the
// log level is debug, and the package-level reposWithSeparateIndexingMetrics
// is assigned.
//
// An error is returned when conf is invalid (cpu_fraction outside (0, 1],
// empty -index or -sourcegraph_url, unparsable URL), when the directories
// cannot be created, when SRC_REPO_CONFIG_BATCH_SIZE is not an int, or when
// the gRPC client cannot be created.
func newServer(conf rootConfig) (*Server, error) {
	logger := sglog.Scoped("server")

	if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 {
		return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0")
	}
	if conf.index == "" {
		return nil, fmt.Errorf("must set -index")
	}
	if conf.root == "" {
		return nil, fmt.Errorf("must set -sourcegraph_url")
	}
	rootURL, err := url.Parse(conf.root)
	if err != nil {
		return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err)
	}

	rootURL = addDefaultPort(rootURL)

	// Tune GOMAXPROCS to match Linux container CPU quota.
	_, _ = maxprocs.Set()

	// Automatically prepend our own path at the front, to minimize
	// required configuration.
	if l, err := os.Readlink("/proc/self/exe"); err == nil {
		os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
	}

	// Create the index directory if it cannot be stat'ed.
	if _, err := os.Stat(conf.index); err != nil {
		if err := os.MkdirAll(conf.index, 0o755); err != nil {
			return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err)
		}
	}

	if err := setupTmpDir(logger, conf.Main, conf.index); err != nil {
		return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err)
	}

	if srcLogLevelIsDebug() {
		debugLog.SetOutput(os.Stderr)
	}

	// The settings below are configured through environment variables only;
	// each one is logged to debugLog when it is in effect.
	reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST")
	if len(reposWithSeparateIndexingMetrics) > 0 {
		debugLog.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", "))
	}

	deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST")
	if len(deltaBuildRepositoriesAllowList) > 0 {
		debugLog.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", "))
	}

	deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150)
	if deltaShardNumberFallbackThreshold > 0 {
		debugLog.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold)
	} else {
		debugLog.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards")
	}

	reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST")
	if len(reposShouldSkipSymbolsCalculation) > 0 {
		debugLog.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", "))
	}

	indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout)
	if indexingTimeout != defaultIndexingTimeout {
		debugLog.Printf("using configured indexing timeout: %s", indexingTimeout)
	}

	// An absolute URL (one with a scheme) selects the real gRPC-backed
	// Sourcegraph client; anything else is treated as a directory and served
	// by sourcegraphFake.
	var sg Sourcegraph
	if rootURL.IsAbs() {
		// batchSize stays 0 when SRC_REPO_CONFIG_BATCH_SIZE is unset.
		var batchSize int
		if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" {
			batchSize, err = strconv.Atoi(v)
			if err != nil {
				return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int")
			}
		}

		opts := []SourcegraphClientOption{
			WithBatchSize(batchSize),
		}

		logger := sglog.Scoped("zoektConfigurationGRPCClient")
		client, err := dialGRPCClient(rootURL.Host, logger)
		if err != nil {
			return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err)
		}

		sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...)

	} else {
		sg = sourcegraphFake{
			RootDir: rootURL.String(),
			Log:     log.New(os.Stderr, "sourcegraph: ", log.LstdFlags),
		}
	}

	// Number of CPUs to use: GOMAXPROCS scaled by cpuFraction, rounded, and
	// never less than 1.
	cpuCount := max(int(math.Round(float64(runtime.GOMAXPROCS(0))*(conf.cpuFraction))), 1)

	// Clamp the index concurrency to the range [1, cpuCount].
	if conf.indexConcurrency < 1 {
		conf.indexConcurrency = 1
	} else if conf.indexConcurrency > int64(cpuCount) {
		conf.indexConcurrency = int64(cpuCount)
	}

	q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)

	return &Server{
		logger:      logger,
		rootURL:     rootURL,
		Sourcegraph: sg,
		IndexDir:    conf.index,
		IndexConcurrency: int(conf.indexConcurrency),
		Interval:    conf.interval,
		CPUCount:    cpuCount,
		// NOTE(review): this copies the Queue value that NewQueue returned.
		// The queue has a mu field that is locked elsewhere in this file, so
		// confirm that copying it here is intended.
		queue:        *q,
		shardMerging: !conf.disableShardMerging,
		deltaBuildRepositoriesAllowList:             deltaBuildRepositoriesAllowList,
		deltaShardNumberFallbackThreshold:           deltaShardNumberFallbackThreshold,
		repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
		hostname: conf.hostname,
		mergeOpts: mergeOpts{
			vacuumInterval: conf.vacuumInterval,
			mergeInterval:  conf.mergeInterval,
			// The configured sizes are in MiB; convert them to bytes.
			targetSizeBytes: conf.targetSize * 1024 * 1024,
			minSizeBytes:    conf.minSize * 1024 * 1024,
			minAgeDays:      conf.minAgeDays,
		},
		timeout:        indexingTimeout,
		indexSemaphore: make(chan struct{}, int(conf.indexConcurrency)),
		// err is always nil here: every earlier assignment to it is followed
		// by a return on failure.
	}, err
}
1598
1599func newGRPCServer(logger sglog.Logger, s *Server, additionalOpts ...grpc.ServerOption) *grpc.Server {
1600 grpcServer := defaults.NewServer(logger, additionalOpts...)
1601 indexserverv1.RegisterSourcegraphIndexserverServiceServer(grpcServer, s)
1602 return grpcServer
1603}
1604
// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration
// for the indexed-search-configuration gRPC service. Its content is embedded at
// build time from default_grpc_service_configuration.json, and dialGRPCClient
// passes it to grpc.WithDefaultServiceConfig.
//
// The default backoff strategy is modeled after the default settings used by
// retryablehttp.DefaultClient.
//
// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html):
//   - Unavailable
//   - Aborted
//
//go:embed default_grpc_service_configuration.json
var defaultGRPCServiceConfigurationJSON string
1617
1618func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor {
1619 return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1620 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1621 return invoker(ctx, method, req, reply, cc, opts...)
1622 }
1623}
1624
1625func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
1626 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
1627 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1628 return streamer(ctx, desc, cc, method, opts...)
1629 }
1630}
1631
// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPC servers and clients are allowed to process.
// This can be overridden by providing custom Server/Dial options.
const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MiB
1635
1636func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (configv1.ZoektConfigurationServiceClient, error) {
1637 metrics := clientMetricsOnce()
1638
1639 // If the service seems to be unavailable, this
1640 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1
1641 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s])
1642 retryOpts := []retry.CallOption{
1643 retry.WithMax(5),
1644 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)),
1645 retry.WithCodes(codes.Unavailable),
1646 }
1647
1648 opts := []grpc.DialOption{
1649 grpc.WithTransportCredentials(insecure.NewCredentials()),
1650 grpc.WithChainStreamInterceptor(
1651 metrics.StreamClientInterceptor(),
1652 messagesize.StreamClientInterceptor,
1653 internalActorStreamInterceptor(),
1654 internalerrs.LoggingStreamClientInterceptor(logger),
1655 internalerrs.PrometheusStreamClientInterceptor,
1656 retry.StreamClientInterceptor(retryOpts...),
1657 ),
1658 grpc.WithChainUnaryInterceptor(
1659 metrics.UnaryClientInterceptor(),
1660 messagesize.UnaryClientInterceptor,
1661 internalActorUnaryInterceptor(),
1662 internalerrs.LoggingUnaryClientInterceptor(logger),
1663 internalerrs.PrometheusUnaryClientInterceptor,
1664 retry.UnaryClientInterceptor(retryOpts...),
1665 ),
1666 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
1667 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)),
1668 }
1669
1670 opts = append(opts, additionalOpts...)
1671
1672 // Ensure that the message size options are set last, so they override any other
1673 // client-specific options that tweak the message size.
1674 //
1675 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
1676 // take precedence over everything else with a uniform size setting that's easy to reason about.
1677 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...)
1678
1679 // This dialer is used to connect via gRPC to the Sourcegraph instance.
1680 // This is done lazily, so we can provide the client to use regardless of
1681 // whether we enabled gRPC or not initially.
1682 cc, err := grpc.Dial(addr, opts...)
1683 if err != nil {
1684 return nil, fmt.Errorf("dialing %q: %w", addr, err)
1685 }
1686
1687 client := configv1.NewZoektConfigurationServiceClient(cc)
1688 return client, nil
1689}
1690
// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
// If the scheme is "https", "443" is used. URLs with any other scheme,
// relative URLs and URLs that already carry a port are returned as they are.
// A nil URL yields nil.
//
// The original URL is not mutated. A copy is modified and returned.
func addDefaultPort(original *url.URL) *url.URL {
	if original == nil {
		return nil // don't panic
	}

	// Nothing to do if the URL doesn't look like a remote URL, or if it
	// already names a port.
	if !original.IsAbs() || original.Port() != "" {
		return original
	}

	var port string
	switch original.Scheme {
	case "http":
		port = "80"
	case "https":
		port = "443"
	default:
		return original
	}

	u := cloneURL(original)
	// Join on Hostname rather than Host: Host keeps the square brackets of an
	// IPv6 literal (and a dangling ":"), which net.JoinHostPort would wrap in
	// a second pair of brackets, e.g. "[[::1]]:80".
	u.Host = net.JoinHostPort(u.Hostname(), port)
	return u
}

// cloneURL returns a copy of the URL. It is safe to mutate the returned URL.
// This is copied from net/http/clone.go
func cloneURL(u *url.URL) *url.URL {
	if u == nil {
		return nil
	}
	u2 := new(url.URL)
	*u2 = *u
	if u.User != nil {
		// Userinfo is held by pointer; copy it so that the clone does not
		// share it with the original.
		u2.User = new(url.Userinfo)
		*u2.User = *u.User
	}
	return u2
}
1735
1736func main() {
1737 liblog := sglog.Init(sglog.Resource{
1738 Name: "zoekt-indexserver",
1739 Version: index.Version,
1740 InstanceID: index.HostnameBestEffort(),
1741 })
1742 defer liblog.Sync()
1743
1744 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil {
1745 log.Fatal(err)
1746 }
1747}
1748
// getBoolFromEnvironmentVariables returns the boolean defined by the first
// environment variable listed in envVarNames that is set to a non-empty value
// in the current process environment, or defaultBool if none are set.
//
// An error is returned if that environment variable fails to parse as a
// boolean. The error wraps the underlying strconv error, so callers can
// inspect it with errors.Is / errors.As.
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	for _, envVar := range envVarNames {
		v := os.Getenv(envVar)
		if v == "" {
			// Unset and empty are treated alike: try the next name.
			continue
		}

		b, err := strconv.ParseBool(v)
		if err != nil {
			return false, fmt.Errorf("parsing environment variable %q to boolean: %w", envVar, err)
		}

		return b, nil
	}

	return defaultBool, nil
}