fork of https://github.com/sourcegraph/zoekt
1// Licensed under the Apache License, Version 2.0 (the "License");
2// you may not use this file except in compliance with the License.
3// You may obtain a copy of the License at
4//
5// http://www.apache.org/licenses/LICENSE-2.0
6//
7// Unless required by applicable law or agreed to in writing, software
8// distributed under the License is distributed on an "AS IS" BASIS,
9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10// See the License for the specific language governing permissions and
11// limitations under the License.
12
13// Command zoekt-sourcegraph-indexserver periodically reindexes repositories
14// from a Sourcegraph instance. It uses a "pull-based" design, where it periodically
15// reaches out to the Sourcegraph instance for the list of repositories to reindex.
16package main
17
18import (
19 "bytes"
20 "context"
21 _ "embed"
22 "encoding/json"
23 "errors"
24 "flag"
25 "fmt"
26 "html/template"
27 "io"
28 "io/fs"
29 "log"
30 "math"
31 "math/rand"
32 "net"
33 "net/http"
34 "net/url"
35 "os"
36 "os/exec"
37 "os/signal"
38 "path/filepath"
39 "runtime"
40 "slices"
41 "sort"
42 "strconv"
43 "strings"
44 "sync"
45 "text/tabwriter"
46 "time"
47 "unicode/utf8"
48
49 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
50 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
51 "github.com/peterbourgon/ff/v3/ffcli"
52 "github.com/prometheus/client_golang/prometheus"
53 "github.com/prometheus/client_golang/prometheus/promauto"
54 sglog "github.com/sourcegraph/log"
55 "github.com/sourcegraph/mountinfo"
56
57 "github.com/sourcegraph/zoekt"
58 configv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/sourcegraph/zoekt/configuration/v1"
59 indexserverv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/zoekt/indexserver/v1"
60 "github.com/sourcegraph/zoekt/grpc/defaults"
61 "github.com/sourcegraph/zoekt/grpc/grpcutil"
62 "github.com/sourcegraph/zoekt/grpc/internalerrs"
63 "github.com/sourcegraph/zoekt/grpc/messagesize"
64 "github.com/sourcegraph/zoekt/index"
65 "github.com/sourcegraph/zoekt/internal/debugserver"
66 "github.com/sourcegraph/zoekt/internal/profiler"
67 "github.com/sourcegraph/zoekt/internal/tenant"
68
69 "go.uber.org/automaxprocs/maxprocs"
70 "go.uber.org/multierr"
71 "golang.org/x/net/trace"
72 "golang.org/x/sys/unix"
73 "google.golang.org/grpc"
74 "google.golang.org/grpc/codes"
75 "google.golang.org/grpc/credentials/insecure"
76 "google.golang.org/grpc/metadata"
77)
78
79var (
80 metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
81 Name: "resolve_revisions_seconds",
82 Help: "A histogram of latencies for resolving all repository revisions.",
83 Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 27min
84 })
85
86 metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
87 Name: "resolve_revision_seconds",
88 Help: "A histogram of latencies for resolving a repository revision.",
89 Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
90 }, []string{"success"}) // success=true|false
91
92 metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
93 Name: "get_index_options_total",
94 Help: "The total number of times we tried to get index options for a repository. Includes errors.",
95 })
96 metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
97 Name: "get_index_options_error_total",
98 Help: "The total number of times we failed to get index options for a repository.",
99 })
100
101 metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
102 Name: "index_repo_seconds",
103 Help: "A histogram of latencies for indexing a repository.",
104 Buckets: prometheus.ExponentialBucketsRange(
105 (100 * time.Millisecond).Seconds(),
106 (40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
107 20),
108 }, []string{
109 "state", // state is an indexState
110 "name", // name of the repository that was indexed
111 })
112
113 metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{
114 Name: "index_indexing_delay_seconds",
115 Help: "A histogram of durations from when an index job is added to the queue, to the time it completes.",
116 Buckets: prometheus.ExponentialBuckets(60, 2, 14), // 1 minute -> 5.5 days
117 }, []string{
118 "state", // state is an indexState
119 "name", // the name of the repository that was indexed
120 })
121
122 metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
123 Name: "index_fetch_seconds",
124 Help: "A histogram of latencies for fetching a repository.",
125 Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
126 }, []string{
127 "success", // true|false
128 "name", // the name of the repository that the commits were fetched from
129 })
130
131 metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
132 Name: "index_incremental_index_state",
133 Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
134 }, []string{"state"}) // state is index.IndexState
135
136 metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
137 Name: "index_num_indexed",
138 Help: "Number of indexed repos by code host",
139 })
140
141 metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
142 Name: "index_failing_total",
143 Help: "Counts failures to index (indexing activity, should be used with rate())",
144 })
145
146 metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
147 Name: "index_indexing_total",
148 Help: "Counts indexings (indexing activity, should be used with rate())",
149 })
150
151 metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
152 Name: "index_num_stopped_tracking_total",
153 Help: "Counts the number of repos we stopped tracking.",
154 })
155
156 // clientMetricsOnce returns a singleton instance of the client metrics
157 // that are shared across all gRPC clients that this process creates.
158 //
159 // This function panics if the metrics cannot be registered with the default
160 // Prometheus registry.
161 clientMetricsOnce = sync.OnceValue(func() *grpcprom.ClientMetrics {
162 clientMetrics := grpcprom.NewClientMetrics(
163 grpcprom.WithClientCounterOptions(),
164 grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
165 grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC
166 grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC
167 )
168 prometheus.DefaultRegisterer.MustRegister(clientMetrics)
169 return clientMetrics
170 })
171)
172
// MaxFileSize is 1 MiB. It matches the limit used by Sourcegraph's searcher:
// https://sourcegraph.sourcegraph.com/r/github.com/sourcegraph/sourcegraph/-/blob/cmd/searcher/internal/search/store.go?L32
const MaxFileSize = 1024 * 1024

// reposWithSeparateIndexingMetrics is the set of repository names that keep
// their own "name" label value on the indexing metrics (see
// repoNameForMetric); every other repository shares a single label value.
var reposWithSeparateIndexingMetrics = map[string]struct{}{}

// indexState is the outcome of one index job. Its string value is used as
// the "state" label on the indexing metrics.
type indexState string

const (
	indexStateFail        indexState = "fail"
	indexStateSuccess     indexState = "success"
	indexStateSuccessMeta indexState = "success_meta" // only the index metadata was updated
	indexStateNoop        indexState = "noop"         // the index was already up to date
	indexStateEmpty       indexState = "empty"        // the repository has no branches (empty repo)
)
188
189// Server is the main functionality of zoekt-sourcegraph-indexserver. It
190// exists to conveniently use all the options passed in via func main.
191type Server struct {
192 logger sglog.Logger
193
194 Sourcegraph Sourcegraph
195
196 // rootURL is the root URL of the Sourcegraph instance.
197 rootURL *url.URL
198
199 BatchSize int
200
201 // IndexDir is the index directory to use.
202 IndexDir string
203
204 // IndexConcurrency is the number of repositories we index at once.
205 IndexConcurrency int
206
207 // indexSemaphore limits the number of concurrent index operations
208 indexSemaphore chan struct{}
209
210 // Interval is how often we sync with Sourcegraph.
211 Interval time.Duration
212
213 // CPUCount is the number of CPUs to use for indexing shards.
214 CPUCount int
215
216 queue Queue
217
218 // muIndexDir protects the index directory from concurrent access.
219 muIndexDir indexMutex
220
221 // If true, shard merging is enabled.
222 shardMerging bool
223
224 // deltaBuildRepositoriesAllowList is an allowlist for repositories that we
225 // use delta-builds for instead of normal builds
226 deltaBuildRepositoriesAllowList map[string]struct{}
227
228 // deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
229 // before attempting a delta build.
230 deltaShardNumberFallbackThreshold uint64
231
232 // repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
233 // we skip calculating symbols metadata for during builds
234 repositoriesSkipSymbolsCalculationAllowList map[string]struct{}
235
236 hostname string
237
238 mergeOpts mergeOpts
239
240 // timeout defines how long the index server waits before killing an indexing job.
241 timeout time.Duration
242
243 indexserverv1.UnimplementedSourcegraphIndexserverServiceServer
244}
245
// Leveled loggers used throughout the indexserver. As initialized here,
// debugLog writes to io.Discard while infoLog and errorLog write to stderr.
var debugLog = log.New(io.Discard, "[DEBUG] ", log.LstdFlags)

var infoLog = log.New(os.Stderr, "[INFO] ", log.LstdFlags)

var errorLog = log.New(os.Stderr, "[ERROR] ", log.LstdFlags)
251
// noOutputTimeout is how long an index command may run without producing any
// output before loggedRun asks it to quit. Our index commands should output
// something for every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
const noOutputTimeout time.Duration = 30 * time.Minute

// Failure messages sent to Sourcegraph are limited to maxFailureMessageBytes.
// Longer messages keep their head and tail, joined by
// failureMessageTruncationMarker.
const (
	maxFailureMessageBytes         = 12 << 10 // 12 KiB
	failureMessageTruncationMarker = "\n\n[Error message truncated]\n\n"
)
263
264func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) {
265 out := &synchronizedBuffer{}
266 cmd.Stdout = out
267 cmd.Stderr = out
268
269 tr.LazyPrintf("%s", cmd.Args)
270
271 defer func() {
272 if err != nil {
273 outS := out.String()
274 tr.LazyPrintf("failed: %v", err)
275 tr.LazyPrintf("output: %s", out)
276 tr.SetError()
277 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS)
278 }
279 }()
280
281 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args))
282
283 if err := cmd.Start(); err != nil {
284 return err
285 }
286
287 errC := make(chan error)
288 go func() {
289 errC <- cmd.Wait()
290 }()
291
292 // This channel is set after we have sent sigquit. It allows us to follow up
293 // with a sigkill if the process doesn't quit after sigquit.
294 kill := make(<-chan time.Time)
295
296 lastLen := 0
297 for {
298 select {
299 case <-time.After(noOutputTimeout):
300 // Periodically check if we have had output. If not kill the process.
301 if out.Len() != lastLen {
302 lastLen = out.Len()
303 infoLog.Printf("still running %s", cmd.Args)
304 } else {
305 // Send quit (C-\) first so we get a stack dump.
306 infoLog.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args)
307 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil {
308 errorLog.Println("quit failed:", err)
309 }
310
311 // send sigkill if still running in 10s
312 kill = time.After(10 * time.Second)
313 }
314
315 case <-kill:
316 infoLog.Printf("still running, killing %s", cmd.Args)
317 if err := cmd.Process.Kill(); err != nil {
318 errorLog.Println("kill failed:", err)
319 }
320
321 case err := <-errC:
322 if err != nil {
323 return err
324 }
325
326 tr.LazyPrintf("success")
327 return nil
328 }
329 }
330}
331
// synchronizedBuffer is a bytes.Buffer guarded by a mutex. It lets a command
// write to it (as stdout/stderr) while another goroutine monitors its length
// or contents.
type synchronizedBuffer struct {
	mu sync.Mutex
	b  bytes.Buffer
}

// synchronizedBuffer is used as an io.Writer (exec.Cmd.Stdout/Stderr).
var _ io.Writer = (*synchronizedBuffer)(nil)

// Write appends p to the buffer. It implements io.Writer.
func (sb *synchronizedBuffer) Write(p []byte) (int, error) {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	return sb.b.Write(p)
}

// Len returns the number of bytes currently held in the buffer.
func (sb *synchronizedBuffer) Len() int {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	return sb.b.Len()
}

// String returns a copy of the buffer's contents.
func (sb *synchronizedBuffer) String() string {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	return sb.b.String()
}
356
// Indexing can be paused, for example to experiment with the contents of
// IndexDir without the indexserver writing to it. Do this either by creating
// a file named pauseFileName inside IndexDir or by setting the pauseEnvVar
// environment variable to true. See isIndexingPaused.
const (
	pauseFileName = "PAUSE"
	pauseEnvVar   = "SRC_PAUSE_INDEXING"
)
364
365// isIndexingPaused checks if indexing should be paused based on either:
366// 1. The presence of a PAUSE file in the index directory
367// 2. The SRC_PAUSE_INDEXING environment variable being set to true
368func isIndexingPaused(indexDir string) (bool, string) {
369 // Check for PAUSE file first
370 if b, err := os.ReadFile(filepath.Join(indexDir, pauseFileName)); err == nil {
371 return true, fmt.Sprintf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b)))
372 }
373
374 // Then check environment variable
375 if getEnvWithDefaultBool(pauseEnvVar, false) {
376 return true, fmt.Sprintf("indexserver paused via %s environment variable", pauseEnvVar)
377 }
378
379 return false, ""
380}
381
382// Run the sync loop. This blocks forever.
383func (s *Server) Run() {
384 removeIncompleteShards(s.IndexDir)
385
386 // Start a goroutine which updates the queue with commits to index.
387 go func() {
388 // We update the list of indexed repos every Interval. To speed up manual
389 // testing we also listen for SIGUSR1 to trigger updates.
390 // "pkill -SIGUSR1 zoekt-sourcegra"
391 for range jitterTicker(s.Interval, unix.SIGUSR1) {
392 if paused, msg := isIndexingPaused(s.IndexDir); paused {
393 infoLog.Printf("%s", msg)
394 continue
395 }
396
397 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir))
398 if err != nil {
399 errorLog.Printf("error listing repos: %s", err)
400 continue
401 }
402
403 debugLog.Printf("updating index queue with %d repositories", len(repos.IDs))
404
405 // Stop indexing repos we don't need to track anymore
406 removed := s.queue.MaybeRemoveMissing(repos.IDs)
407 metricNumStoppedTrackingTotal.Add(float64(len(removed)))
408 if len(removed) > 0 {
409 infoLog.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5))
410 }
411
412 cleanupDone := make(chan struct{})
413 go func() {
414 defer close(cleanupDone)
415 s.muIndexDir.Global(func() {
416 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging)
417 })
418 }()
419
420 repos.IterateIndexOptions(s.queue.AddOrUpdate)
421
422 // IterateIndexOptions will only iterate over repositories that have
423 // changed since we last called list. However, we want to add all IDs
424 // back onto the queue just to check that what is on disk is still
425 // correct. This will use the last IndexOptions we stored in the
426 // queue. The repositories not on the queue (missing) need a forced
427 // fetch of IndexOptions.
428 missing := s.queue.Bump(repos.IDs)
429 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...)
430
431 setShardsCounter(s.IndexDir)
432
433 <-cleanupDone
434 }
435 }()
436
437 go func() {
438 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) {
439 if s.shardMerging {
440 s.vacuum()
441 }
442 }
443 }()
444
445 go func() {
446 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) {
447 if s.shardMerging {
448 s.doMerge()
449 }
450 }
451 }()
452
453 for range s.IndexConcurrency {
454 go s.processQueue()
455 }
456
457 // block forever
458 select {}
459}
460
// formatListUint32 returns a comma-separated list of the first
// min(len(v), m) items of v, followed by ", ..." when v holds more items than
// were listed. A non-positive m lists no items.
//
// For example, formatListUint32([]uint32{1, 2, 3}, 2) returns "1, 2, ...".
func formatListUint32(v []uint32, m int) string {
	n := max(0, min(len(v), m))

	parts := make([]string, 0, n+1)
	for _, id := range v[:n] {
		parts = append(parts, strconv.FormatUint(uint64(id), 10))
	}
	if len(v) > n {
		parts = append(parts, "...")
	}
	return strings.Join(parts, ", ")
}
478
479func (s *Server) processQueue() {
480 for {
481 if paused, _ := isIndexingPaused(s.IndexDir); paused {
482 time.Sleep(time.Second)
483 continue
484 }
485
486 item, ok := s.queue.Pop()
487 if !ok {
488 time.Sleep(time.Second)
489 continue
490 }
491
492 opts := item.Opts
493 args := s.indexArgs(opts)
494
495 ran := s.muIndexDir.With(opts.Name, func() {
496 // only record time taken once we hold the lock. This avoids us
497 // recording time taken while merging/cleanup runs.
498 start := time.Now()
499
500 state, err := s.index(context.Background(), args)
501
502 elapsed := time.Since(start)
503 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds())
504
505 indexDelay := time.Since(item.DateAddedToQueue)
506 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds())
507
508 if err != nil {
509 errorLog.Printf("error indexing %s: %s", args.String(), err)
510 }
511
512 switch state {
513 case indexStateSuccess:
514 var branches []string
515 for _, b := range args.Branches {
516 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
517 }
518 s.logger.Info("updated index",
519 sglog.Int("tenant", args.TenantID),
520 sglog.String("repo", args.Name),
521 sglog.Uint32("id", args.RepoID),
522 sglog.Strings("branches", branches),
523 sglog.Duration("duration", elapsed),
524 sglog.Duration("index_delay", indexDelay),
525 )
526 case indexStateSuccessMeta:
527 infoLog.Printf("updated meta %s in %v", args.String(), elapsed)
528 }
529 s.queue.SetIndexed(opts, state)
530 })
531
532 if !ran {
533 // Someone else is processing the repository. We can just skip this job
534 // since the repository will be added back to the queue and we will
535 // converge to the correct behaviour.
536 debugLog.Printf("index job for repository already running: %s", args)
537 continue
538 }
539 }
540}
541
542// repoNameForMetric returns a normalized version of the given repository name that is
543// suitable for use with Prometheus metrics.
544func repoNameForMetric(repo string) string {
545 // Check to see if we want to be able to capture separate indexing metrics for this repository.
546 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable.
547 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok {
548 return repo
549 }
550
551 return ""
552}
553
// batched splits slice into consecutive chunks of at most size elements and
// sends them, in order, on the returned channel. The channel is closed after
// the last chunk. Chunks share slice's backing array.
//
// A non-positive size means "no limit": the whole slice is sent as a single
// batch. This avoids the endless loop of empty batches that size == 0 would
// cause, and the slicing panic a negative size would cause.
//
// The producing goroutine blocks until each batch is received, so callers
// must drain the channel to let it exit.
func batched(slice []uint32, size int) <-chan []uint32 {
	if size <= 0 {
		size = len(slice)
	}

	c := make(chan []uint32)
	go func() {
		defer close(c)
		for len(slice) > 0 {
			n := min(size, len(slice))
			c <- slice[:n]
			slice = slice[n:]
		}
	}()
	return c
}
568
// jitterTicker returns a channel that ticks immediately and then again after
// each randomized pause. Every pause is d/2 plus a uniformly random duration
// in [0, d), so it lies in [d/2, 3d/2).
//
// Receiving any of the signals in sig also produces a tick. This is a
// convenience for triggering the ticker by hand.
//
// d must be positive: rand.Int63n panics otherwise (after the first tick).
func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} {
	ticks := make(chan struct{})

	go func() {
		half := int64(d) / 2
		for {
			ticks <- struct{}{}
			pause := half + rand.Int63n(int64(d))
			time.Sleep(time.Duration(pause))
		}
	}()

	if len(sig) > 0 {
		go func() {
			notify := make(chan os.Signal, 1)
			signal.Notify(notify, sig...)
			for range notify {
				ticks <- struct{}{}
			}
		}()
	}

	return ticks
}
600
601// Index starts an index job for repo name at commit.
602func (s *Server) index(ctx context.Context, args *indexArgs) (state indexState, err error) {
603 tr := trace.New("index", args.Name)
604 tr.SetMaxEvents(30) // Ensure we capture all indexing events
605
606 defer func() {
607 if err != nil {
608 tr.SetError()
609 tr.LazyPrintf("error: %v", err)
610 state = indexStateFail
611 metricFailingTotal.Inc()
612 }
613 tr.LazyPrintf("state: %s", state)
614 tr.Finish()
615 }()
616
617 // Sourcegraph should always provide a tenant ID.
618 if args.TenantID < 1 {
619 return indexStateFail, tenant.ErrMissingTenant
620 }
621
622 tr.LazyPrintf("branches: %v", args.Branches)
623
624 if len(args.Branches) == 0 {
625 return indexStateEmpty, createEmptyShard(args)
626 }
627
628 repositoryName := args.Name
629 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
630 tr.LazyPrintf("marking this repository for delta build")
631 args.UseDelta = true
632 }
633
634 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold
635
636 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
637 tr.LazyPrintf("skipping symbols calculation")
638 args.Symbols = false
639 }
640
641 reason := "forced"
642
643 if args.Incremental {
644 bo := args.BuildOptions()
645 bo.SetDefaults()
646 incrementalState, fn := bo.IndexState()
647 reason = string(incrementalState)
648 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()
649
650 switch incrementalState {
651 case index.IndexStateEqual:
652 debugLog.Printf("%s index already up to date. Shard=%s", args.String(), fn)
653 return indexStateNoop, nil
654
655 case index.IndexStateMeta:
656 infoLog.Printf("updating index.meta %s", args.String())
657
658 // TODO(stefan) handle mergeMeta for tenant id.
659 if err := mergeMeta(bo); err != nil {
660 errorLog.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
661 } else {
662 return indexStateSuccessMeta, nil
663 }
664
665 case index.IndexStateCorrupt:
666 infoLog.Printf("falling back to full update: corrupt index: %s", args.String())
667 }
668 }
669
670 infoLog.Printf("updating index %s reason=%s", args.String(), reason)
671
672 metricIndexingTotal.Inc()
673 c := gitIndexConfig{
674 runCmd: func(cmd *exec.Cmd) error {
675 return s.loggedRun(tr, cmd)
676 },
677
678 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
679 return args.BuildOptions().FindRepositoryMetadata()
680 },
681 timeout: s.timeout,
682 }
683
684 logIndexStatusUpdateError := func(err error) {
685 s.logger.Error("failed to update index status",
686 sglog.String("repo", args.Name),
687 sglog.Uint32("id", args.RepoID),
688 sglogBranches("branches", args.Branches),
689 sglog.Error(err),
690 )
691 }
692
693 err = gitIndex(ctx, c, args, s.Sourcegraph, s.logger)
694 if err != nil {
695 if statusErr := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph, err); statusErr != nil {
696 logIndexStatusUpdateError(statusErr)
697 }
698 return indexStateFail, err
699 }
700
701 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph, nil); err != nil {
702 logIndexStatusUpdateError(err)
703 }
704
705 return indexStateSuccess, nil
706}
707
708// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so
709// it can update the zoekt_repos table.
710func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph, indexErr error) error {
711 state := configv1.UpdateIndexStatusRequest_Repository_STATE_SUCCESS
712 var failureMessage string
713 var indexTimeUnix int64
714 if indexErr != nil {
715 state = configv1.UpdateIndexStatusRequest_Repository_STATE_FAILURE
716 failureMessage = truncateFailureMessageForSourcegraph(indexErr.Error())
717
718 // On failure, metadata may not exist yet. Include index time if we can,
719 // but do not block reporting the failure status.
720 if _, metadata, ok, err := c.findRepositoryMetadata(args); err == nil && ok {
721 indexTimeUnix = metadata.IndexTime.Unix()
722 }
723 } else {
724 // We need to read from disk for IndexTime.
725 _, metadata, ok, err := c.findRepositoryMetadata(args)
726 if err != nil {
727 return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
728 }
729 if !ok {
730 return errors.New("failed to find metadata for new/updated index")
731 }
732 indexTimeUnix = metadata.IndexTime.Unix()
733 }
734
735 status := []indexStatus{{
736 RepoID: args.RepoID,
737 Branches: args.Branches,
738 IndexTimeUnix: indexTimeUnix,
739 State: state,
740 FailureMessage: failureMessage,
741 }}
742
743 if err := sg.UpdateIndexStatus(status); err != nil {
744 return fmt.Errorf("failed to update sourcegraph with status: %w", err)
745 }
746
747 return nil
748}
749
750func truncateFailureMessageForSourcegraph(s string) string {
751 if len(s) <= maxFailureMessageBytes {
752 return s
753 }
754
755 budget := maxFailureMessageBytes - len(failureMessageTruncationMarker)
756 headBytes := budget / 2
757 tailBytes := budget - headBytes
758
759 return utf8PrefixBytes(s, headBytes) + failureMessageTruncationMarker + utf8SuffixBytes(s, tailBytes)
760}
761
// utf8PrefixBytes returns the longest prefix of s that is at most maxBytes
// long and does not split a UTF-8 encoded rune. It returns "" when maxBytes
// is not positive (and s is longer than maxBytes).
func utf8PrefixBytes(s string, maxBytes int) string {
	switch {
	case len(s) <= maxBytes:
		return s
	case maxBytes <= 0:
		return ""
	}

	// Walk the cut point back until the byte after it starts a rune.
	cut := maxBytes
	for cut > 0 && !utf8.RuneStart(s[cut]) {
		cut--
	}
	return s[:cut]
}
777
// utf8SuffixBytes returns the longest suffix of s that is at most maxBytes
// long and does not start in the middle of a UTF-8 encoded rune. It returns ""
// when maxBytes is not positive (and s is longer than maxBytes).
func utf8SuffixBytes(s string, maxBytes int) string {
	switch {
	case len(s) <= maxBytes:
		return s
	case maxBytes <= 0:
		return ""
	}

	// Move the start forward until it sits on the first byte of a rune.
	start := len(s) - maxBytes
	for ; start < len(s); start++ {
		if utf8.RuneStart(s[start]) {
			break
		}
	}
	return s[start:]
}
794
795func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
796 ss := make([]string, len(branches))
797 for i, b := range branches {
798 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
799 }
800 return sglog.Strings(key, ss)
801}
802
803func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
804 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0))
805 return &indexArgs{
806 IndexOptions: opts,
807 IndexDir: s.IndexDir,
808 Parallelism: parallelism,
809 Incremental: true,
810 ShardMerging: s.shardMerging,
811 }
812}
813
814// parallelism consults both the server flags and index options to determine the number
815// of shards to index in parallel. If the CPUCount index option is provided, it always
816// overrides the server flag.
817func (s *Server) parallelism(opts IndexOptions, maxProcs int) int {
818 var parallelism int
819 if opts.ShardConcurrency > 0 {
820 parallelism = int(opts.ShardConcurrency)
821 } else {
822 parallelism = s.CPUCount
823 }
824
825 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs
826 if parallelism > 4*maxProcs {
827 parallelism = 4 * maxProcs
828 }
829
830 // If index concurrency is set, then divide the parallelism by the number of
831 // repos we're indexing in parallel
832 if s.IndexConcurrency > 1 {
833 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency)))
834 }
835
836 return parallelism
837}
838
839func createEmptyShard(args *indexArgs) error {
840 bo := args.BuildOptions()
841 bo.SetDefaults()
842 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}
843
844 if args.Incremental && bo.IncrementalSkipIndexing() {
845 return nil
846 }
847
848 builder, err := index.NewBuilder(*bo)
849 if err != nil {
850 return err
851 }
852 return builder.Finish()
853}
854
855// addDebugHandlers adds handlers specific to indexserver.
856func (s *Server) addDebugHandlers(mux *http.ServeMux) {
857 // Sourcegraph's site admin view requires indexserver to serve it's admin view
858 // on "/".
859 mux.Handle("/", http.HandlerFunc(s.handleRoot))
860
861 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex))
862 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed))
863 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList))
864 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge))
865 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue))
866 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost))
867}
868
869func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) {
870 if r.Method != "GET" {
871 w.Header().Set("Allow", "GET")
872 w.WriteHeader(http.StatusMethodNotAllowed)
873 return
874 }
875
876 response := struct {
877 Hostname string
878 }{
879 Hostname: s.hostname,
880 }
881
882 b, err := json.Marshal(response)
883 if err != nil {
884 http.Error(w, err.Error(), http.StatusInternalServerError)
885 return
886 }
887
888 w.Header().Set("Content-Type", "application/json; charset=utf-8")
889 w.Write(b)
890}
891
// rootTmpl renders the admin page served on "/" by handleRoot. It expects a
// value with an IndexMsg string and a Repos slice whose elements have Name
// and ID fields. The header cells sit in their own row, and each data cell is
// closed with </td> (the previous template closed one with a stray </id>).
var rootTmpl = template.Must(template.New("name").Parse(`
<html>
  <body>
    <a href="debug">Debug</a><br />
    <a href="debug/requests">Traces</a><br />
    {{.IndexMsg}}<br />
    <br />
    <h3>Reindex</h3>
    {{if .Repos}}
      <a href="?show_repos=false">hide repos</a><br />
      <table style="margin-top: 20px">
        <tr>
          <th style="text-align:left">Name</th>
          <th style="text-align:left">ID (click to reindex)</th>
        </tr>
        {{range .Repos}}
          <tr>
            <td>{{.Name}}</td>
            <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></td>
          </tr>
        {{end}}
      </table>
    {{else}}
      <a href="?show_repos=true">show repos</a><br />
    {{end}}
  </body>
</html>
`))
918
919func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
920 if r.Method != "GET" {
921 w.Header().Set("Allow", "GET")
922 w.WriteHeader(http.StatusMethodNotAllowed)
923 return
924 }
925
926 values := r.URL.Query()
927
928 // ?id=
929 indexMsg := ""
930 if v := values.Get("id"); v != "" {
931 id, err := strconv.Atoi(v)
932 if err != nil {
933 http.Error(w, err.Error(), http.StatusBadRequest)
934 return
935 }
936 indexMsg, _ = s.forceIndex(r.Context(), uint32(id))
937 }
938
939 // ?show_repos=
940 showRepos := false
941 if v := values.Get("show_repos"); v != "" {
942 showRepos, _ = strconv.ParseBool(v)
943 }
944
945 type Repo struct {
946 ID uint32
947 Name string
948 }
949 var data struct {
950 Repos []Repo
951 IndexMsg string
952 }
953
954 data.IndexMsg = indexMsg
955
956 if showRepos {
957 s.queue.Iterate(func(opts *IndexOptions) {
958 data.Repos = append(data.Repos, Repo{
959 ID: opts.RepoID,
960 Name: opts.Name,
961 })
962 })
963 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name })
964 }
965
966 _ = rootTmpl.Execute(w, data)
967}
968
969// handleReindex triggers a reindex asynocronously. If a reindex was triggered
970// the request returns with status 202. The caller can infer the new state of
971// the index by calling List.
972func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) {
973 if r.Method != http.MethodPost {
974 w.Header().Set("Allow", http.MethodPost)
975 w.WriteHeader(http.StatusMethodNotAllowed)
976 return
977 }
978
979 err := r.ParseForm()
980 if err != nil {
981 http.Error(w, err.Error(), http.StatusBadRequest)
982 return
983 }
984
985 id, err := strconv.Atoi(r.Form.Get("repo"))
986 if err != nil {
987 http.Error(w, err.Error(), http.StatusBadRequest)
988 return
989 }
990
991 go func() { s.forceIndex(context.Background(), uint32(id)) }()
992
993 // 202 Accepted
994 w.WriteHeader(http.StatusAccepted)
995}
996
997func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
998 withIndexed := true
999 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil {
1000 withIndexed = b
1001 }
1002
1003 var indexed []uint32
1004 if withIndexed {
1005 indexed = listIndexed(s.IndexDir)
1006 }
1007
1008 repos, err := s.Sourcegraph.List(r.Context(), indexed)
1009 if err != nil {
1010 http.Error(w, err.Error(), http.StatusInternalServerError)
1011 return
1012 }
1013
1014 bw := bytes.Buffer{}
1015
1016 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
1017
1018 _, err = fmt.Fprintf(tw, "ID\tName\n")
1019 if err != nil {
1020 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
1021 return
1022 }
1023
1024 s.queue.mu.Lock()
1025 name := ""
1026 for _, id := range repos.IDs {
1027 if item := s.queue.get(id); item != nil {
1028 name = item.opts.Name
1029 } else {
1030 name = ""
1031 }
1032 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
1033 if err != nil {
1034 debugLog.Printf("handleDebugList: %s\n", err.Error())
1035 }
1036 }
1037 s.queue.mu.Unlock()
1038
1039 if err != nil {
1040 http.Error(w, err.Error(), http.StatusInternalServerError)
1041 return
1042 }
1043
1044 err = tw.Flush()
1045 if err != nil {
1046 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
1047 return
1048 }
1049
1050 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
1051
1052 if _, err := io.Copy(w, &bw); err != nil {
1053 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
1054 return
1055 }
1056}
1057
1058// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
1059// can run this command during periods of low usage (evenings, weekends) to
1060// trigger an initial merge run. In the steady-state, merges happen rarely, even
1061// on busy instances, and users can rely on automatic merging instead.
1062func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {
1063 // A merge operation can take very long, depending on the number merges and the
1064 // target size of the compound shards. We run the merge in the background and
1065 // return immediately to the user.
1066 //
1067 // We track the status of the merge with metricShardMergingRunning.
1068 go func() {
1069 s.doMerge()
1070 }()
1071 _, _ = w.Write([]byte("merging enqueued\n"))
1072}
1073
1074func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {
1075 indexed := listIndexed(s.IndexDir)
1076
1077 bw := bytes.Buffer{}
1078
1079 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
1080
1081 _, err := fmt.Fprintf(tw, "ID\tName\n")
1082 if err != nil {
1083 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
1084 return
1085 }
1086
1087 s.queue.mu.Lock()
1088 name := ""
1089 for _, id := range indexed {
1090 if item := s.queue.get(id); item != nil {
1091 name = item.opts.Name
1092 } else {
1093 name = ""
1094 }
1095 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
1096 if err != nil {
1097 debugLog.Printf("handleDebugIndexed: %s\n", err.Error())
1098 }
1099 }
1100 s.queue.mu.Unlock()
1101
1102 if err != nil {
1103 http.Error(w, err.Error(), http.StatusInternalServerError)
1104 return
1105 }
1106
1107 err = tw.Flush()
1108 if err != nil {
1109 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
1110 return
1111 }
1112
1113 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
1114
1115 if _, err := io.Copy(w, &bw); err != nil {
1116 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
1117 return
1118 }
1119}
1120
1121// forceIndex will run the index job for repo name now. It will return always
1122// return a string explaining what it did, even if it failed.
1123func (s *Server) forceIndex(ctx context.Context, id uint32) (string, error) {
1124 var opts IndexOptions
1125 var err error
1126 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) {
1127 opts = o
1128 }, func(_ uint32, e error) {
1129 err = e
1130 }, id)
1131 if err != nil {
1132 return fmt.Sprintf("Indexing %d failed: %v", id, err), err
1133 }
1134
1135 args := s.indexArgs(opts)
1136 args.Incremental = false // force re-index
1137
1138 var state indexState
1139 ran := s.muIndexDir.With(opts.Name, func() {
1140 state, err = s.index(ctx, args)
1141 })
1142 if !ran {
1143 return fmt.Sprintf("index job for repository already running: %s", args), nil
1144 }
1145 if err != nil {
1146 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err
1147 }
1148 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil
1149}
1150
1151// DeleteAllData deletes all shards in the index and trash dir belonging to the
1152// tenant associated with the request. The deletion is best-effort, which means
1153// we will delete as much as possible. If no error is returned, the caller can
1154// be certain that all data has been deleted.
1155func (s *Server) DeleteAllData(ctx context.Context, _ *indexserverv1.DeleteAllDataRequest) (*indexserverv1.DeleteAllDataResponse, error) {
1156 tnt, err := tenant.FromContext(ctx)
1157 if err != nil {
1158 return nil, err
1159 }
1160 s.logger.Warn("DeleteAllData", sglog.Int("tenant_id", tnt.ID()))
1161
1162 var merr error
1163 s.muIndexDir.Global(func() {
1164 // First, explode all compound shards that have repos from the tenant in
1165 // question. Because we hold the global lock, we can be sure that no new
1166 // merges start while we do this.
1167 if err := s.explodeTenantCompoundShards(ctx, func(path string) error {
1168 // We call explode in a separate process to protect indexserver.
1169 cmd := defaultExplodeCmd(path)
1170
1171 stdoutBuf := &bytes.Buffer{}
1172 stderrBuf := &bytes.Buffer{}
1173 cmd.Stdout = stdoutBuf
1174 cmd.Stderr = stderrBuf
1175
1176 err := cmd.Run()
1177 if err != nil {
1178 errorLog.Printf("explode failed: %v (stderr: %s)", err, stderrBuf.String())
1179 return err
1180 }
1181
1182 infoLog.Printf("exploded shard: %s", stdoutBuf.String())
1183
1184 return nil
1185 }); err != nil {
1186 merr = multierr.Append(merr, err)
1187 }
1188
1189 // Invariant: all shards from the tenant are simple shards.
1190
1191 if err := purgeTenantShards(ctx, s.IndexDir); err != nil {
1192 merr = multierr.Append(merr, err)
1193 }
1194 if err := purgeTenantShards(ctx, filepath.Join(s.IndexDir, ".trash")); err != nil {
1195 merr = multierr.Append(merr, err)
1196 }
1197 })
1198
1199 return &indexserverv1.DeleteAllDataResponse{}, merr
1200}
1201
1202func listIndexed(indexDir string) []uint32 {
1203 index := getShards(indexDir)
1204 metricNumIndexed.Set(float64(len(index)))
1205 repoIDs := make([]uint32, 0, len(index))
1206 for id := range index {
1207 repoIDs = append(repoIDs, id)
1208 }
1209 slices.Sort(repoIDs)
1210 return repoIDs
1211}
1212
1213// setupTmpDir sets up a temporary directory on the same volume as the
1214// indexes.
1215//
1216// If main is true we will delete older temp directories left around. main is
1217// false when this is a debug command.
1218func setupTmpDir(logger sglog.Logger, main bool, index string) error {
1219 // change the target tmp directory depending on if it's our main daemon or a
1220 // debug sub command.
1221 dir := ".indexserver.debug.tmp"
1222 if main {
1223 dir = ".indexserver.tmp"
1224 }
1225
1226 tmpRoot := filepath.Join(index, dir)
1227
1228 if main {
1229 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot))
1230 err := os.RemoveAll(tmpRoot)
1231 if err != nil {
1232 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err))
1233 }
1234 }
1235
1236 if err := os.MkdirAll(tmpRoot, 0o755); err != nil {
1237 return err
1238 }
1239
1240 return os.Setenv("TMPDIR", tmpRoot)
1241}
1242
1243func printMetaData(fn string) error {
1244 repo, indexMeta, err := index.ReadMetadataPath(fn)
1245 if err != nil {
1246 return err
1247 }
1248
1249 err = json.NewEncoder(os.Stdout).Encode(indexMeta)
1250 if err != nil {
1251 return err
1252 }
1253
1254 err = json.NewEncoder(os.Stdout).Encode(repo)
1255 if err != nil {
1256 return err
1257 }
1258 return nil
1259}
1260
1261func printShardStats(fn string) error {
1262 f, err := os.Open(fn)
1263 if err != nil {
1264 return err
1265 }
1266
1267 iFile, err := index.NewIndexFile(f)
1268 if err != nil {
1269 return err
1270 }
1271
1272 return index.PrintNgramStats(iFile)
1273}
1274
1275func srcLogLevelIsDebug() bool {
1276 lvl := os.Getenv(sglog.EnvLogLevel)
1277 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug")
1278}
1279
// getEnvWithDefaultBool returns the environment variable k parsed with
// strconv.ParseBool, or defaultVal if k is unset or empty. An unparsable
// value terminates the process via log.Fatalf.
func getEnvWithDefaultBool(k string, defaultVal bool) bool {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		// The message previously claimed an int64 parse; this helper parses bools.
		log.Fatalf("error parsing ENV %s to bool: %s", k, err)
	}
	return b
}
1291
// getEnvWithDefaultInt64 returns the environment variable k parsed as a
// base-10 int64, or defaultVal if k is unset or empty. An unparsable value
// terminates the process via log.Fatalf.
func getEnvWithDefaultInt64(k string, defaultVal int64) int64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseInt(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to int64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1303
// getEnvWithDefaultInt returns the environment variable k parsed as an int,
// or defaultVal if k is unset or empty. An unparsable value terminates the
// process via log.Fatalf.
func getEnvWithDefaultInt(k string, defaultVal int) int {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	// Parse the variable's value v, not its name k.
	i, err := strconv.Atoi(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to int: %s", k, err)
	}
	return i
}
1315
// getEnvWithDefaultUint64 returns the environment variable k parsed as a
// base-10 uint64, or defaultVal if k is unset or empty. An unparsable value
// terminates the process via log.Fatalf.
func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseUint(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to uint64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1327
// getEnvWithDefaultFloat64 returns the environment variable k parsed as a
// float64, or defaultVal if k is unset or empty. An unparsable value
// terminates the process via log.Fatalf.
func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseFloat(raw, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to float64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1339
// getEnvWithDefaultString returns the value of the environment variable k,
// or defaultVal if k is unset or empty.
func getEnvWithDefaultString(k string, defaultVal string) string {
	if v := os.Getenv(k); v != "" {
		return v
	}
	return defaultVal
}
1347
// getEnvWithDefaultDuration returns the environment variable k parsed with
// time.ParseDuration (e.g. "90s", "2h"), or defaultVal if k is unset or
// empty. An unparsable value terminates the process via log.Fatalf.
func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := time.ParseDuration(raw)
		if err != nil {
			log.Fatalf("error parsing ENV %s to duration: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1360
// getEnvWithDefaultEmptySet treats the environment variable k as a
// comma-separated list and returns its distinct, whitespace-trimmed,
// non-empty entries as a set. The result is never nil; it is empty when k is
// unset or contains no entries.
func getEnvWithDefaultEmptySet(k string) map[string]struct{} {
	entries := strings.Split(os.Getenv(k), ",")
	result := make(map[string]struct{})
	for i := range entries {
		if item := strings.TrimSpace(entries[i]); item != "" {
			result[item] = struct{}{}
		}
	}
	return result
}
1371
// joinStringSet joins the members of set with sep. Members are sorted so the
// result is deterministic; map iteration order is random, which made log
// lines built from this function vary from run to run. An empty or nil set
// yields "".
func joinStringSet(set map[string]struct{}, sep string) string {
	xs := make([]string, 0, len(set))
	for x := range set {
		xs = append(xs, x)
	}
	slices.Sort(xs)
	return strings.Join(xs, sep)
}
1380
1381func setShardsCounter(indexDir string) {
1382 fns, err := filepath.Glob(filepath.Join(indexDir, "*.zoekt"))
1383 if err != nil {
1384 errorLog.Printf("setShardsCounter: %s\n", err)
1385 return
1386 }
1387 metricNumberShards.Set(float64(len(fns)))
1388
1389 compoundFns := make([]string, 0, len(fns))
1390 for _, fn := range fns {
1391 if strings.HasPrefix(filepath.Base(fn), "compound-") {
1392 compoundFns = append(compoundFns, fn)
1393 }
1394 }
1395 metricNumberCompoundShards.Set(float64(len(compoundFns)))
1396}
1397
1398func rootCmd() *ffcli.Command {
1399 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError)
1400 conf := rootConfig{
1401 Main: true,
1402 }
1403 conf.registerRootFlags(rootFs)
1404
1405 return &ffcli.Command{
1406 FlagSet: rootFs,
1407 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]",
1408 Subcommands: []*ffcli.Command{debugCmd()},
1409 Exec: func(ctx context.Context, args []string) error {
1410 return startServer(conf)
1411 },
1412 }
1413}
1414
// rootConfig holds the indexserver configuration. Its fields are populated
// from command-line flags (several with environment-variable defaults) by
// registerRootFlags.
type rootConfig struct {
	// Main is true if this rootConfig is for our main long running command (the
	// indexserver). Debug commands should not set this value. It is passed to
	// setupTmpDir, which uses it to choose the tmp directory name and to decide
	// whether to remove a tmp directory left over from a previous run.
	Main bool

	// root is the -sourcegraph_url value: a Sourcegraph URL, or a local
	// directory whose repos are indexed through a faked Sourcegraph API.
	root string
	// interval is how often to sync with Sourcegraph (-interval).
	interval time.Duration
	// index is the index directory (-index, default taken from DATA_DIR).
	index string
	// indexConcurrency is the number of repos to index concurrently
	// (-index_concurrency, default taken from SRC_INDEX_CONCURRENCY).
	indexConcurrency int64
	// listen is the address for the HTTP/gRPC listener (-listen).
	listen string
	// hostname is the name advertised to Sourcegraph when asking for the list
	// of repositories to index (-hostname).
	hostname string
	// cpuFraction is the fraction of the cores used for indexing
	// (-cpu_fraction).
	cpuFraction float64

	// config values related to shard merging
	disableShardMerging bool
	vacuumInterval      time.Duration
	mergeInterval       time.Duration
	targetSize          int64 // target size of compound shards, in MiB
	minSize             int64 // minimum size of a compound shard, in MiB
	minAgeDays          int   // shards with a commit newer than this many days are not merged

	// config values related to backoff indexing repos with one or more consecutive failures
	backoffDuration    time.Duration // a negative value disables indexing backoff
	maxBackoffDuration time.Duration // a negative value disables indexing backoff
}
1441
// registerRootFlags registers the indexserver's command-line flags on fs and
// binds each one to the corresponding rootConfig field.
//
// Several flag defaults are read from environment variables while the flags
// are being registered. The getEnvWithDefault* helpers call log.Fatalf on an
// unparsable value, so an invalid environment variable aborts the process
// here, in registration order.
func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
	fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
	fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
	fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently")
	fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", index.DefaultDir), "set index directory to use")
	fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
	fs.StringVar(&rc.hostname, "hostname", index.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
	fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
	fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
	fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")

	// flags related to shard merging
	//
	// NOTE: despite its name, -shard_merging sets disableShardMerging:
	// -shard_merging=true (or SRC_DISABLE_SHARD_MERGING=true) DISABLES merging.
	// The flag name is kept as-is for command-line compatibility.
	fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging")
	fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
	fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
	fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 1000), "the target size of compound shards in MiB")
	fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 800), "the minimum size of a compound shard in MiB")
	fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
}
1461
// startServer builds the indexserver from conf and runs it.
//
// If conf.listen is non-empty, the debug pages are registered on an HTTP mux.
// That mux is served, multiplexed with the indexserver gRPC service, on
// conf.listen. The plain HTTP mux is also served, best-effort, on a unix
// socket in the index directory. startServer then starts the owner checker,
// registers a mountinfo Prometheus collector for the index directory, and
// calls s.Run.
//
// It returns an error only if newServer fails; otherwise it returns nil after
// s.Run returns.
func startServer(conf rootConfig) error {
	s, err := newServer(conf)
	if err != nil {
		return err
	}

	profiler.Init("zoekt-sourcegraph-indexserver")
	setShardsCounter(s.IndexDir)

	if conf.listen != "" {

		mux := http.NewServeMux()
		debugserver.AddHandlers(mux, true, []debugserver.DebugPage{
			{Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"},
			{Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"},
			{Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"},
			{Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"},
		}...)
		s.addDebugHandlers(mux)

		go func() {
			debugLog.Printf("serving HTTP on %s", conf.listen)
			// This mux shadows the outer one: it multiplexes the gRPC server
			// and the HTTP handlers on conf.listen. If listening fails,
			// log.Fatal terminates the whole process.
			mux := grpcutil.MultiplexGRPC(newGRPCServer(sglog.Scoped("indexserver"), s), mux)
			log.Fatal(http.ListenAndServe(conf.listen, mux))
		}()

		// Serve mux on a unix domain socket on a best-effort-basis so that
		// webserver can call the endpoints via the shared filesystem.
		//
		// This uses the outer, plain HTTP mux, so the socket does not serve
		// the gRPC service. Failures are only logged.
		//
		// 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen
		// on the socket due to permission errors. See
		// https://github.com/docker/for-mac/issues/6239
		go func() {
			serveHTTPOverSocket := func() error {
				socket := filepath.Join(s.IndexDir, "indexserver.sock")
				// We cannot bind a socket to an existing pathname.
				if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) {
					return fmt.Errorf("error removing socket file: %s", socket)
				}
				// The "unix" network corresponds to stream sockets. (cf. unixgram,
				// unixpacket).
				l, err := net.Listen("unix", socket)
				if err != nil {
					return fmt.Errorf("failed to listen on socket %s: %w", socket, err)
				}
				// Indexserver (root) and webserver (Sourcegraph) run with
				// different users. Per default, the socket is created with
				// permission 755 (root root), which doesn't let webserver write to
				// it.
				//
				// See https://github.com/golang/go/issues/11822 for more context.
				if err := os.Chmod(socket, 0o777); err != nil {
					return fmt.Errorf("failed to change permission of socket %s: %w", socket, err)
				}
				debugLog.Printf("serving HTTP on %s", socket)
				return http.Serve(l, mux)
			}
			debugLog.Print(serveHTTPOverSocket())
		}()
	}

	// Runs for the lifetime of the process in its own goroutine.
	oc := &ownerChecker{
		Path:     filepath.Join(conf.index, "owner.txt"),
		Hostname: conf.hostname,
	}
	go oc.Run()

	// Export mount information for the index directory. MustRegister panics
	// if the collector cannot be registered.
	logger := sglog.Scoped("metricsRegistration")
	opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"}

	c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index})
	prometheus.DefaultRegisterer.MustRegister(c)

	s.Run()
	return nil
}
1538
1539func newServer(conf rootConfig) (*Server, error) {
1540 logger := sglog.Scoped("server")
1541
1542 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 {
1543 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0")
1544 }
1545 if conf.index == "" {
1546 return nil, fmt.Errorf("must set -index")
1547 }
1548 if conf.root == "" {
1549 return nil, fmt.Errorf("must set -sourcegraph_url")
1550 }
1551 rootURL, err := url.Parse(conf.root)
1552 if err != nil {
1553 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err)
1554 }
1555
1556 rootURL = addDefaultPort(rootURL)
1557
1558 // Tune GOMAXPROCS to match Linux container CPU quota.
1559 _, _ = maxprocs.Set()
1560
1561 // Automatically prepend our own path at the front, to minimize
1562 // required configuration.
1563 if l, err := os.Readlink("/proc/self/exe"); err == nil {
1564 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
1565 }
1566
1567 if _, err := os.Stat(conf.index); err != nil {
1568 if err := os.MkdirAll(conf.index, 0o755); err != nil {
1569 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err)
1570 }
1571 }
1572
1573 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil {
1574 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err)
1575 }
1576
1577 if srcLogLevelIsDebug() {
1578 debugLog.SetOutput(os.Stderr)
1579 }
1580
1581 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST")
1582 if len(reposWithSeparateIndexingMetrics) > 0 {
1583 debugLog.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", "))
1584 }
1585
1586 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST")
1587 if len(deltaBuildRepositoriesAllowList) > 0 {
1588 debugLog.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", "))
1589 }
1590
1591 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150)
1592 if deltaShardNumberFallbackThreshold > 0 {
1593 debugLog.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold)
1594 } else {
1595 debugLog.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards")
1596 }
1597
1598 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST")
1599 if len(reposShouldSkipSymbolsCalculation) > 0 {
1600 debugLog.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", "))
1601 }
1602
1603 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout)
1604 if indexingTimeout != defaultIndexingTimeout {
1605 debugLog.Printf("using configured indexing timeout: %s", indexingTimeout)
1606 }
1607
1608 var sg Sourcegraph
1609 if rootURL.IsAbs() {
1610 var batchSize int
1611 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" {
1612 batchSize, err = strconv.Atoi(v)
1613 if err != nil {
1614 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int")
1615 }
1616 }
1617
1618 opts := []SourcegraphClientOption{
1619 WithBatchSize(batchSize),
1620 }
1621
1622 logger := sglog.Scoped("zoektConfigurationGRPCClient")
1623 client, err := dialGRPCClient(rootURL.Host, logger)
1624 if err != nil {
1625 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err)
1626 }
1627
1628 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...)
1629
1630 } else {
1631 sg = sourcegraphFake{
1632 RootDir: rootURL.String(),
1633 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags),
1634 }
1635 }
1636
1637 cpuCount := max(int(math.Round(float64(runtime.GOMAXPROCS(0))*(conf.cpuFraction))), 1)
1638
1639 if conf.indexConcurrency < 1 {
1640 conf.indexConcurrency = 1
1641 } else if conf.indexConcurrency > int64(cpuCount) {
1642 conf.indexConcurrency = int64(cpuCount)
1643 }
1644
1645 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)
1646
1647 return &Server{
1648 logger: logger,
1649 rootURL: rootURL,
1650 Sourcegraph: sg,
1651 IndexDir: conf.index,
1652 IndexConcurrency: int(conf.indexConcurrency),
1653 Interval: conf.interval,
1654 CPUCount: cpuCount,
1655 queue: *q,
1656 shardMerging: !conf.disableShardMerging,
1657 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList,
1658 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold,
1659 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
1660 hostname: conf.hostname,
1661 mergeOpts: mergeOpts{
1662 vacuumInterval: conf.vacuumInterval,
1663 mergeInterval: conf.mergeInterval,
1664 targetSizeBytes: conf.targetSize * 1024 * 1024,
1665 minSizeBytes: conf.minSize * 1024 * 1024,
1666 minAgeDays: conf.minAgeDays,
1667 },
1668 timeout: indexingTimeout,
1669 indexSemaphore: make(chan struct{}, int(conf.indexConcurrency)),
1670 }, err
1671}
1672
1673func newGRPCServer(logger sglog.Logger, s *Server, additionalOpts ...grpc.ServerOption) *grpc.Server {
1674 grpcServer := defaults.NewServer(logger, additionalOpts...)
1675 indexserverv1.RegisterSourcegraphIndexserverServiceServer(grpcServer, s)
1676 return grpcServer
1677}
1678
// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration
// for the indexed-search-configuration gRPC service. It is embedded at build
// time from default_grpc_service_configuration.json and installed via
// grpc.WithDefaultServiceConfig in dialGRPCClient.
//
// The default backoff strategy is modeled after the default settings used by
// retryablehttp.DefaultClient.
//
// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html):
//   - Unavailable
//   - Aborted
//
//go:embed default_grpc_service_configuration.json
var defaultGRPCServiceConfigurationJSON string
1691
1692func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor {
1693 return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1694 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1695 return invoker(ctx, method, req, reply, cc, opts...)
1696 }
1697}
1698
1699func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
1700 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
1701 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1702 return streamer(ctx, desc, cc, method, opts...)
1703 }
1704}
1705
// defaultGRPCMessageReceiveSizeBytes is the default maximum size, in bytes, of
// a gRPC message we are willing to receive (90 MiB). It can be overridden by
// supplying custom Server/Dial options.
const defaultGRPCMessageReceiveSizeBytes = 90 << 20
1709
// dialGRPCClient creates a ZoektConfigurationService client for the
// Sourcegraph instance reachable at addr, which is passed to grpc.Dial as the
// target.
//
// The connection uses insecure (plaintext) transport credentials. Both the
// stream and the unary interceptor chains do the following:
//   - record client metrics;
//   - enforce message sizes;
//   - tag calls with the internal actor;
//   - log and count gRPC errors;
//   - retry calls that fail with codes.Unavailable.
//
// The embedded default service config and a 90 MiB default receive limit
// also apply. additionalOpts are appended after these defaults, and
// message-size options from the environment (if any) are appended last so
// they take precedence.
//
// NOTE(review): per its doc comment, the embedded default service config also
// retries (on Unavailable and Aborted). Confirm that it and the retry
// interceptor below do not compound into more attempts than intended.
func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (configv1.ZoektConfigurationServiceClient, error) {
	metrics := clientMetricsOnce()

	// If the service seems to be unavailable, this
	// will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1
	// Ex: (on the first retry attempt, we will wait between [.9s and 1.1s])
	retryOpts := []retry.CallOption{
		retry.WithMax(5),
		retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)),
		retry.WithCodes(codes.Unavailable),
	}

	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithChainStreamInterceptor(
			metrics.StreamClientInterceptor(),
			messagesize.StreamClientInterceptor,
			internalActorStreamInterceptor(),
			internalerrs.LoggingStreamClientInterceptor(logger),
			internalerrs.PrometheusStreamClientInterceptor,
			retry.StreamClientInterceptor(retryOpts...),
		),
		grpc.WithChainUnaryInterceptor(
			metrics.UnaryClientInterceptor(),
			messagesize.UnaryClientInterceptor,
			internalActorUnaryInterceptor(),
			internalerrs.LoggingUnaryClientInterceptor(logger),
			internalerrs.PrometheusUnaryClientInterceptor,
			retry.UnaryClientInterceptor(retryOpts...),
		),
		grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)),
	}

	opts = append(opts, additionalOpts...)

	// Ensure that the message size options are set last, so they override any other
	// client-specific options that tweak the message size.
	//
	// The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
	// take precedence over everything else with a uniform size setting that's easy to reason about.
	opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...)

	// This dialer is used to connect via gRPC to the Sourcegraph instance.
	// This is done lazily, so we can provide the client to use regardless of
	// whether we enabled gRPC or not initially.
	cc, err := grpc.Dial(addr, opts...)
	if err != nil {
		return nil, fmt.Errorf("dialing %q: %w", addr, err)
	}

	client := configv1.NewZoektConfigurationServiceClient(cc)
	return client, nil
}
1764
// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
// If the scheme is "https", "443" is used. Nil and non-absolute URLs, URLs
// that already carry a port, and other schemes are returned unchanged.
//
// The original URL is not mutated. A copy is modified and returned.
func addDefaultPort(original *url.URL) *url.URL {
	if original == nil {
		return nil // don't panic
	}

	if !original.IsAbs() {
		return original // don't do anything if the URL doesn't look like a remote URL
	}

	if original.Port() != "" {
		return original
	}

	var port string
	switch original.Scheme {
	case "http":
		port = "80"
	case "https":
		port = "443"
	default:
		return original
	}

	u := cloneURL(original)
	// Use Hostname(), not Host: for IPv6 literals Host keeps the brackets
	// ("[::1]"), and JoinHostPort would bracket them again ("[[::1]]:80").
	u.Host = net.JoinHostPort(u.Hostname(), port)
	return u
}

// cloneURL returns a copy of the URL. It is safe to mutate the returned URL,
// including its User info, without affecting u.
// This is copied from net/http/clone.go
func cloneURL(u *url.URL) *url.URL {
	if u == nil {
		return nil
	}
	u2 := new(url.URL)
	*u2 = *u
	if u.User != nil {
		u2.User = new(url.Userinfo)
		*u2.User = *u.User
	}
	return u2
}
1809
1810func main() {
1811 liblog := sglog.Init(sglog.Resource{
1812 Name: "zoekt-indexserver",
1813 Version: index.Version,
1814 InstanceID: index.HostnameBestEffort(),
1815 })
1816 defer liblog.Sync()
1817
1818 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil {
1819 log.Fatal(err)
1820 }
1821}
1822
// getBoolFromEnvironmentVariables looks through envVarNames in order and
// parses the first variable that is set to a non-empty value as a boolean.
// If none of them is set, defaultBool is returned.
//
// An error is returned if the first set variable fails to parse as a boolean;
// later variables are not consulted in that case.
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	name, raw := "", ""
	for _, candidate := range envVarNames {
		if v := os.Getenv(candidate); v != "" {
			name, raw = candidate, v
			break
		}
	}

	if raw == "" {
		return defaultBool, nil
	}

	parsed, err := strconv.ParseBool(raw)
	if err != nil {
		return false, fmt.Errorf("parsing environment variable %q to boolean: %v", name, err)
	}
	return parsed, nil
}