fork of https://github.com/sourcegraph/zoekt
1// Licensed under the Apache License, Version 2.0 (the "License");
2// you may not use this file except in compliance with the License.
3// You may obtain a copy of the License at
4//
5// http://www.apache.org/licenses/LICENSE-2.0
6//
7// Unless required by applicable law or agreed to in writing, software
8// distributed under the License is distributed on an "AS IS" BASIS,
9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10// See the License for the specific language governing permissions and
11// limitations under the License.
12
13// Command zoekt-sourcegraph-indexserver periodically reindexes repositories
14// from a Sourcegraph instance. It uses a "pull-based" design, where it periodically
15// reaches out to the Sourcegraph instance for the list of repositories to reindex.
16package main
17
18import (
19 "bytes"
20 "context"
21 _ "embed"
22 "encoding/json"
23 "errors"
24 "flag"
25 "fmt"
26 "html/template"
27 "io"
28 "io/fs"
29 "log"
30 "math"
31 "math/rand"
32 "net"
33 "net/http"
34 "net/url"
35 "os"
36 "os/exec"
37 "os/signal"
38 "path/filepath"
39 "runtime"
40 "slices"
41 "sort"
42 "strconv"
43 "strings"
44 "sync"
45 "text/tabwriter"
46 "time"
47
48 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
49 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
50 "github.com/peterbourgon/ff/v3/ffcli"
51 "github.com/prometheus/client_golang/prometheus"
52 "github.com/prometheus/client_golang/prometheus/promauto"
53 sglog "github.com/sourcegraph/log"
54 "github.com/sourcegraph/mountinfo"
55
56 "github.com/sourcegraph/zoekt"
57 configv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/sourcegraph/zoekt/configuration/v1"
58 indexserverv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/zoekt/indexserver/v1"
59 "github.com/sourcegraph/zoekt/grpc/defaults"
60 "github.com/sourcegraph/zoekt/grpc/grpcutil"
61 "github.com/sourcegraph/zoekt/grpc/internalerrs"
62 "github.com/sourcegraph/zoekt/grpc/messagesize"
63 "github.com/sourcegraph/zoekt/index"
64 "github.com/sourcegraph/zoekt/internal/debugserver"
65 "github.com/sourcegraph/zoekt/internal/profiler"
66 "github.com/sourcegraph/zoekt/internal/tenant"
67
68 "go.uber.org/automaxprocs/maxprocs"
69 "go.uber.org/multierr"
70 "golang.org/x/net/trace"
71 "golang.org/x/sys/unix"
72 "google.golang.org/grpc"
73 "google.golang.org/grpc/codes"
74 "google.golang.org/grpc/credentials/insecure"
75 "google.golang.org/grpc/metadata"
76)
77
// Prometheus metrics exported by the indexserver. All of them are registered
// with the default registry via promauto at package initialization.
var (
	// metricResolveRevisionsDuration observes latencies for resolving all
	// repository revisions.
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 100000s (~27.8h)
	})

	// metricResolveRevisionDuration observes latencies for resolving a single
	// repository revision, partitioned by outcome.
	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	// metricGetIndexOptions counts attempts (including failures) to fetch
	// index options for a repository.
	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	// metricGetIndexOptionsError counts failed attempts to fetch index options.
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	// metricIndexDuration observes how long a single index job took. It is
	// recorded in processQueue once the per-repository lock is held.
	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	// metricIndexingDelay observes the time between a job being added to the
	// queue and the job completing.
	metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_indexing_delay_seconds",
		Help:    "A histogram of durations from when an index job is added to the queue, to the time it completes.",
		Buckets: prometheus.ExponentialBuckets(60, 2, 14), // 1 minute -> 491520s (~5.7 days)
	}, []string{
		"state", // state is an indexState
		"name",  // the name of the repository that was indexed
	})

	// metricFetchDuration observes latencies for fetching a repository.
	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	// metricIndexIncrementalIndexState counts, per incremental index state,
	// how often an incremental check produced that state (see Server.index).
	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is index.IndexState

	// metricNumIndexed is a gauge of the number of indexed repositories.
	// NOTE(review): the help text says "by code host" but the gauge has no
	// labels — confirm which is intended.
	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	// metricFailingTotal is incremented whenever Server.index finishes with an
	// error.
	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	// metricIndexingTotal is incremented whenever Server.index starts a full
	// (non-noop, non-meta-only) index run.
	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	// metricNumStoppedTrackingTotal counts repositories removed from the queue
	// because Sourcegraph no longer lists them.
	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})

	// clientMetricsOnce returns a singleton instance of the client metrics
	// that are shared across all gRPC clients that this process creates.
	//
	// This function panics if the metrics cannot be registered with the default
	// Prometheus registry.
	clientMetricsOnce = sync.OnceValue(func() *grpcprom.ClientMetrics {
		clientMetrics := grpcprom.NewClientMetrics(
			grpcprom.WithClientCounterOptions(),
			grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
			grpcprom.WithClientStreamRecvHistogram(),   // record how long it takes for a client to receive a message during a streaming RPC
			grpcprom.WithClientStreamSendHistogram(),   // record how long it takes for a client to send a message during a streaming RPC
		)
		prometheus.DefaultRegisterer.MustRegister(clientMetrics)
		return clientMetrics
	})
)
171
// MaxFileSize is 1 MB (1 << 20 bytes); match
// https://sourcegraph.sourcegraph.com/r/github.com/sourcegraph/sourcegraph/-/blob/cmd/searcher/internal/search/store.go?L32
const MaxFileSize = 1 << 20

// reposWithSeparateIndexingMetrics is the set of repositories that we want to
// capture separate indexing metrics for. repoNameForMetric consults it to
// decide whether a repository name may be used as a metric label.
var reposWithSeparateIndexingMetrics = make(map[string]struct{})

// indexState describes the outcome of an index job. Its string form is used
// as the "state" label on the indexing metrics.
type indexState string

const (
	indexStateFail        indexState = "fail"         // The index job returned an error
	indexStateSuccess     indexState = "success"      // A full index run completed
	indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
	indexStateNoop        indexState = "noop"         // We didn't need to update index
	indexStateEmpty       indexState = "empty"        // index is empty (empty repo)
)
187
// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	// logger is the structured logger used for index and status messages.
	logger sglog.Logger

	// Sourcegraph is the client used to list repositories, fetch index
	// options and report index status.
	Sourcegraph Sourcegraph

	// rootURL is the root URL of the Sourcegraph instance.
	rootURL *url.URL

	// BatchSize is not read by any code visible in this part of the file.
	// NOTE(review): presumably the batch size used when talking to
	// Sourcegraph — confirm against its users.
	BatchSize int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// indexSemaphore limits the number of concurrent index operations
	indexSemaphore chan struct{}

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration

	// CPUCount is the number of CPUs to use for indexing shards.
	CPUCount int

	// queue holds the pending index jobs. The sync loop in Run fills it and
	// processQueue pops from it.
	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is the value reported by the /debug/host endpoint.
	hostname string

	// mergeOpts configures shard merging; its vacuumInterval and mergeInterval
	// fields drive the periodic vacuum and merge loops started by Run.
	mergeOpts mergeOpts

	// timeout defines how long the index server waits before killing an indexing job.
	timeout time.Duration

	// Embedded generated gRPC type.
	// NOTE(review): presumably supplies default implementations for the
	// indexserver gRPC service methods — confirm against the generated code.
	indexserverv1.UnimplementedSourcegraphIndexserverServiceServer
}
244
// Package-level loggers. As initialized here, debugLog discards everything
// written to it, while infoLog and errorLog write to stderr.
var (
	debugLog = log.New(io.Discard, "[DEBUG] ", log.LstdFlags)
	infoLog  = log.New(os.Stderr, "[INFO] ", log.LstdFlags)
	errorLog = log.New(os.Stderr, "[ERROR] ", log.LstdFlags)
)

// noOutputTimeout is how long loggedRun lets a command run without producing
// any new output before it sends the command SIGQUIT.
//
// Our index commands should output something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
const noOutputTimeout = 30 * time.Minute
257
258func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) {
259 out := &synchronizedBuffer{}
260 cmd.Stdout = out
261 cmd.Stderr = out
262
263 tr.LazyPrintf("%s", cmd.Args)
264
265 defer func() {
266 if err != nil {
267 outS := out.String()
268 tr.LazyPrintf("failed: %v", err)
269 tr.LazyPrintf("output: %s", out)
270 tr.SetError()
271 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS)
272 }
273 }()
274
275 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args))
276
277 if err := cmd.Start(); err != nil {
278 return err
279 }
280
281 errC := make(chan error)
282 go func() {
283 errC <- cmd.Wait()
284 }()
285
286 // This channel is set after we have sent sigquit. It allows us to follow up
287 // with a sigkill if the process doesn't quit after sigquit.
288 kill := make(<-chan time.Time)
289
290 lastLen := 0
291 for {
292 select {
293 case <-time.After(noOutputTimeout):
294 // Periodically check if we have had output. If not kill the process.
295 if out.Len() != lastLen {
296 lastLen = out.Len()
297 infoLog.Printf("still running %s", cmd.Args)
298 } else {
299 // Send quit (C-\) first so we get a stack dump.
300 infoLog.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args)
301 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil {
302 errorLog.Println("quit failed:", err)
303 }
304
305 // send sigkill if still running in 10s
306 kill = time.After(10 * time.Second)
307 }
308
309 case <-kill:
310 infoLog.Printf("still running, killing %s", cmd.Args)
311 if err := cmd.Process.Kill(); err != nil {
312 errorLog.Println("kill failed:", err)
313 }
314
315 case err := <-errC:
316 if err != nil {
317 return err
318 }
319
320 tr.LazyPrintf("success")
321 return nil
322 }
323 }
324}
325
// synchronizedBuffer is a bytes.Buffer whose operations are serialized by a
// mutex, so one goroutine can inspect the buffer while another is still
// writing to it.
type synchronizedBuffer struct {
	mu sync.Mutex
	b  bytes.Buffer
}

// Write appends p to the buffer while holding the lock. It returns whatever
// the underlying bytes.Buffer.Write returns.
func (buf *synchronizedBuffer) Write(p []byte) (n int, err error) {
	buf.mu.Lock()
	defer buf.mu.Unlock()

	n, err = buf.b.Write(p)
	return n, err
}

// Len reports the number of bytes currently held in the buffer.
func (buf *synchronizedBuffer) Len() int {
	buf.mu.Lock()
	defer buf.mu.Unlock()

	size := buf.b.Len()
	return size
}

// String returns the buffered contents as a string.
func (buf *synchronizedBuffer) String() string {
	buf.mu.Lock()
	defer buf.mu.Unlock()

	contents := buf.b.String()
	return contents
}
350
// pauseFileName if present in IndexDir will stop index jobs from
// running. This is to make it possible to experiment with the content of the
// IndexDir without the indexserver writing to it.
//
// pauseEnvVar names an environment variable that isIndexingPaused reads as a
// boolean; when it is true indexing is paused as well.
const (
	pauseFileName = "PAUSE"
	pauseEnvVar   = "SRC_PAUSE_INDEXING"
)
358
359// isIndexingPaused checks if indexing should be paused based on either:
360// 1. The presence of a PAUSE file in the index directory
361// 2. The SRC_PAUSE_INDEXING environment variable being set to true
362func isIndexingPaused(indexDir string) (bool, string) {
363 // Check for PAUSE file first
364 if b, err := os.ReadFile(filepath.Join(indexDir, pauseFileName)); err == nil {
365 return true, fmt.Sprintf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b)))
366 }
367
368 // Then check environment variable
369 if getEnvWithDefaultBool(pauseEnvVar, false) {
370 return true, fmt.Sprintf("indexserver paused via %s environment variable", pauseEnvVar)
371 }
372
373 return false, ""
374}
375
// Run starts the server's background loops and then blocks forever.
//
// After calling removeIncompleteShards on IndexDir it starts:
//   - a sync loop that refreshes the queue from Sourcegraph every Interval
//     (or on SIGUSR1) and runs cleanup on the index directory,
//   - a vacuum loop and a merge loop, which only do work when shard merging
//     is enabled,
//   - IndexConcurrency goroutines running processQueue.
func (s *Server) Run() {
	removeIncompleteShards(s.IndexDir)

	// Start a goroutine which updates the queue with commits to index.
	go func() {
		// We update the list of indexed repos every Interval. To speed up manual
		// testing we also listen for SIGUSR1 to trigger updates.
		// "pkill -SIGUSR1 zoekt-sourcegra"
		for range jitterTicker(s.Interval, unix.SIGUSR1) {
			// While paused we skip the whole sync, including cleanup.
			if paused, msg := isIndexingPaused(s.IndexDir); paused {
				infoLog.Printf("%s", msg)
				continue
			}

			repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir))
			if err != nil {
				errorLog.Printf("error listing repos: %s", err)
				continue
			}

			debugLog.Printf("updating index queue with %d repositories", len(repos.IDs))

			// Stop indexing repos we don't need to track anymore
			removed := s.queue.MaybeRemoveMissing(repos.IDs)
			metricNumStoppedTrackingTotal.Add(float64(len(removed)))
			if len(removed) > 0 {
				infoLog.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5))
			}

			// Run cleanup concurrently with the queue updates below. We wait
			// for it at the end of the iteration so that at most one cleanup
			// started by this loop is in flight at a time.
			cleanupDone := make(chan struct{})
			go func() {
				defer close(cleanupDone)
				s.muIndexDir.Global(func() {
					cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging)
				})
			}()

			repos.IterateIndexOptions(s.queue.AddOrUpdate)

			// IterateIndexOptions will only iterate over repositories that have
			// changed since we last called list. However, we want to add all IDs
			// back onto the queue just to check that what is on disk is still
			// correct. This will use the last IndexOptions we stored in the
			// queue. The repositories not on the queue (missing) need a forced
			// fetch of IndexOptions.
			missing := s.queue.Bump(repos.IDs)
			// Errors for individual repositories are deliberately ignored here
			// (the error callback is a no-op).
			s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...)

			setShardsCounter(s.IndexDir)

			<-cleanupDone
		}
	}()

	// Periodic vacuum. The ticker also fires on SIGUSR1; nothing happens
	// unless shard merging is enabled.
	go func() {
		for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) {
			if s.shardMerging {
				s.vacuum()
			}
		}
	}()

	// Periodic merge. The ticker also fires on SIGUSR1; nothing happens
	// unless shard merging is enabled.
	go func() {
		for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) {
			if s.shardMerging {
				s.doMerge()
			}
		}
	}()

	// Start one queue worker per unit of index concurrency.
	for range s.IndexConcurrency {
		go s.processQueue()
	}

	// block forever
	select {}
}
454
// formatListUint32 renders the first min(len(v), m) items of v as a
// comma-separated list. When v holds more than m items the list is followed
// by "..." to signal that it was truncated.
func formatListUint32(v []uint32, m int) string {
	// Number of items that are actually printed; never negative.
	shown := max(0, min(m, len(v)))

	parts := make([]string, 0, shown+1)
	for _, id := range v[:shown] {
		parts = append(parts, fmt.Sprintf("%d", id))
	}

	// Anything beyond the limit is summarized by an ellipsis.
	if len(v) > m {
		parts = append(parts, "...")
	}

	return strings.Join(parts, ", ")
}
472
473func (s *Server) processQueue() {
474 for {
475 if paused, _ := isIndexingPaused(s.IndexDir); paused {
476 time.Sleep(time.Second)
477 continue
478 }
479
480 item, ok := s.queue.Pop()
481 if !ok {
482 time.Sleep(time.Second)
483 continue
484 }
485
486 opts := item.Opts
487 args := s.indexArgs(opts)
488
489 ran := s.muIndexDir.With(opts.Name, func() {
490 // only record time taken once we hold the lock. This avoids us
491 // recording time taken while merging/cleanup runs.
492 start := time.Now()
493
494 state, err := s.index(context.Background(), args)
495
496 elapsed := time.Since(start)
497 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds())
498
499 indexDelay := time.Since(item.DateAddedToQueue)
500 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds())
501
502 if err != nil {
503 errorLog.Printf("error indexing %s: %s", args.String(), err)
504 }
505
506 switch state {
507 case indexStateSuccess:
508 var branches []string
509 for _, b := range args.Branches {
510 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
511 }
512 s.logger.Info("updated index",
513 sglog.Int("tenant", args.TenantID),
514 sglog.String("repo", args.Name),
515 sglog.Uint32("id", args.RepoID),
516 sglog.Strings("branches", branches),
517 sglog.Duration("duration", elapsed),
518 sglog.Duration("index_delay", indexDelay),
519 )
520 case indexStateSuccessMeta:
521 infoLog.Printf("updated meta %s in %v", args.String(), elapsed)
522 }
523 s.queue.SetIndexed(opts, state)
524 })
525
526 if !ran {
527 // Someone else is processing the repository. We can just skip this job
528 // since the repository will be added back to the queue and we will
529 // converge to the correct behaviour.
530 debugLog.Printf("index job for repository already running: %s", args)
531 continue
532 }
533 }
534}
535
536// repoNameForMetric returns a normalized version of the given repository name that is
537// suitable for use with Prometheus metrics.
538func repoNameForMetric(repo string) string {
539 // Check to see if we want to be able to capture separate indexing metrics for this repository.
540 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable.
541 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok {
542 return repo
543 }
544
545 return ""
546}
547
// batched splits slice into consecutive batches of at most size elements and
// delivers them, in order, on the returned channel. The channel is closed
// once every element has been delivered. An empty slice yields no batches.
//
// A non-positive size means "no limit": the whole slice is delivered as a
// single batch. (Previously size == 0 produced an endless stream of empty
// batches and a negative size panicked in the sender goroutine.)
//
// Each batch is a view into slice with its capacity clipped to its length, so
// a receiver appending to a batch cannot overwrite elements of later batches.
//
// The channel is unbuffered; the caller must drain it, otherwise the sender
// goroutine stays blocked.
func batched(slice []uint32, size int) <-chan []uint32 {
	c := make(chan []uint32)
	go func() {
		defer close(c)

		if size <= 0 {
			size = len(slice)
		}

		for len(slice) > 0 {
			n := min(size, len(slice))
			// Three-index slice: cap == len, so appends reallocate.
			c <- slice[:n:n]
			slice = slice[n:]
		}
	}()
	return c
}
562
// jitterTicker returns a channel that ticks roughly every d. The pause after
// each tick is drawn uniformly from [d/2, d + d/2). The first tick is sent
// immediately on creation.
//
// sig is an optional list of signals that additionally cause a tick, which is
// a convenient way to trigger the ticker by hand.
//
// d must be positive: the jitter is computed with rand.Int63n(int64(d)),
// which panics for a non-positive argument. The returned channel is
// unbuffered and never closed, and the goroutines feeding it never exit.
func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} {
	ticks := make(chan struct{})

	// Time-driven ticks.
	go func() {
		base := int64(d)
		for {
			ticks <- struct{}{}
			pause := time.Duration(base/2 + rand.Int63n(base))
			time.Sleep(pause)
		}
	}()

	// Signal-driven ticks; only needed when signals were requested.
	if len(sig) > 0 {
		go func() {
			signals := make(chan os.Signal, 1)
			signal.Notify(signals, sig...)
			for range signals {
				ticks <- struct{}{}
			}
		}()
	}

	return ticks
}
594
// index runs an index job for the repository described by args and reports
// the resulting indexState.
//
// Depending on args and the state on disk this creates an empty shard (no
// branches), does nothing (incremental and already up to date), only updates
// the index metadata, or runs a full build via gitIndex. After a successful
// full build the new status is pushed to Sourcegraph; a failure to do so is
// logged but does not fail the job.
//
// Whenever a non-nil error is returned the state is indexStateFail and
// metricFailingTotal is incremented. Note that index mutates args (UseDelta,
// DeltaShardNumberFallbackThreshold and Symbols).
func (s *Server) index(ctx context.Context, args *indexArgs) (state indexState, err error) {
	tr := trace.New("index", args.Name)
	tr.SetMaxEvents(30) // Ensure we capture all indexing events

	// Runs on every return path: it normalizes the state on error, records
	// the outcome on the trace and finishes the trace.
	defer func() {
		if err != nil {
			tr.SetError()
			tr.LazyPrintf("error: %v", err)
			state = indexStateFail
			metricFailingTotal.Inc()
		}
		tr.LazyPrintf("state: %s", state)
		tr.Finish()
	}()

	// Sourcegraph should always provide a tenant ID.
	if args.TenantID < 1 {
		return indexStateFail, tenant.ErrMissingTenant
	}

	tr.LazyPrintf("branches: %v", args.Branches)

	// No branches: write an empty shard instead of running a build. If
	// createEmptyShard fails, the deferred function turns the state into
	// indexStateFail.
	if len(args.Branches) == 0 {
		return indexStateEmpty, createEmptyShard(args)
	}

	repositoryName := args.Name
	if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
		tr.LazyPrintf("marking this repository for delta build")
		args.UseDelta = true
	}

	args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold

	if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
		tr.LazyPrintf("skipping symbols calculation")
		args.Symbols = false
	}

	// reason is only used for logging why a full index run happens. It stays
	// "forced" for non-incremental jobs.
	reason := "forced"

	if args.Incremental {
		bo := args.BuildOptions()
		bo.SetDefaults()
		incrementalState, fn := bo.IndexState()
		reason = string(incrementalState)
		metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()

		// Cases that do not return fall out of the switch into the full
		// update below.
		switch incrementalState {
		case index.IndexStateEqual:
			debugLog.Printf("%s index already up to date. Shard=%s", args.String(), fn)
			return indexStateNoop, nil

		case index.IndexStateMeta:
			infoLog.Printf("updating index.meta %s", args.String())

			// TODO(stefan) handle mergeMeta for tenant id.
			if err := mergeMeta(bo); err != nil {
				errorLog.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
			} else {
				return indexStateSuccessMeta, nil
			}

		case index.IndexStateCorrupt:
			infoLog.Printf("falling back to full update: corrupt index: %s", args.String())
		}
	}

	infoLog.Printf("updating index %s reason=%s", args.String(), reason)

	metricIndexingTotal.Inc()
	c := gitIndexConfig{
		// Commands run through loggedRun so that their output ends up on this
		// job's trace.
		runCmd: func(cmd *exec.Cmd) error {
			return s.loggedRun(tr, cmd)
		},

		findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
			return args.BuildOptions().FindRepositoryMetadata()
		},
		timeout: s.timeout,
	}

	err = gitIndex(ctx, c, args, s.Sourcegraph, s.logger)
	if err != nil {
		return indexStateFail, err
	}

	// Best effort: the index itself succeeded, so a failed status update is
	// only logged.
	if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil {
		s.logger.Error("failed to update index status",
			sglog.String("repo", args.Name),
			sglog.Uint32("id", args.RepoID),
			sglogBranches("branches", args.Branches),
			sglog.Error(err),
		)
	}

	return indexStateSuccess, nil
}
694
695// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so
696// it can update the zoekt_repos table.
697func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error {
698 // We need to read from disk for IndexTime.
699 _, metadata, ok, err := c.findRepositoryMetadata(args)
700 if err != nil {
701 return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
702 }
703 if !ok {
704 return errors.New("failed to find metadata for new/updated index")
705 }
706
707 status := []indexStatus{{
708 RepoID: args.RepoID,
709 Branches: args.Branches,
710 IndexTimeUnix: metadata.IndexTime.Unix(),
711 }}
712 if err := sg.UpdateIndexStatus(status); err != nil {
713 return fmt.Errorf("failed to update sourcegraph with status: %w", err)
714 }
715
716 return nil
717}
718
719func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
720 ss := make([]string, len(branches))
721 for i, b := range branches {
722 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
723 }
724 return sglog.Strings(key, ss)
725}
726
727func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
728 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0))
729 return &indexArgs{
730 IndexOptions: opts,
731 IndexDir: s.IndexDir,
732 Parallelism: parallelism,
733 Incremental: true,
734 ShardMerging: s.shardMerging,
735 }
736}
737
738// parallelism consults both the server flags and index options to determine the number
739// of shards to index in parallel. If the CPUCount index option is provided, it always
740// overrides the server flag.
741func (s *Server) parallelism(opts IndexOptions, maxProcs int) int {
742 var parallelism int
743 if opts.ShardConcurrency > 0 {
744 parallelism = int(opts.ShardConcurrency)
745 } else {
746 parallelism = s.CPUCount
747 }
748
749 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs
750 if parallelism > 4*maxProcs {
751 parallelism = 4 * maxProcs
752 }
753
754 // If index concurrency is set, then divide the parallelism by the number of
755 // repos we're indexing in parallel
756 if s.IndexConcurrency > 1 {
757 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency)))
758 }
759
760 return parallelism
761}
762
763func createEmptyShard(args *indexArgs) error {
764 bo := args.BuildOptions()
765 bo.SetDefaults()
766 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}
767
768 if args.Incremental && bo.IncrementalSkipIndexing() {
769 return nil
770 }
771
772 builder, err := index.NewBuilder(*bo)
773 if err != nil {
774 return err
775 }
776 return builder.Finish()
777}
778
779// addDebugHandlers adds handlers specific to indexserver.
780func (s *Server) addDebugHandlers(mux *http.ServeMux) {
781 // Sourcegraph's site admin view requires indexserver to serve it's admin view
782 // on "/".
783 mux.Handle("/", http.HandlerFunc(s.handleRoot))
784
785 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex))
786 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed))
787 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList))
788 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge))
789 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue))
790 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost))
791}
792
793func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) {
794 if r.Method != "GET" {
795 w.Header().Set("Allow", "GET")
796 w.WriteHeader(http.StatusMethodNotAllowed)
797 return
798 }
799
800 response := struct {
801 Hostname string
802 }{
803 Hostname: s.hostname,
804 }
805
806 b, err := json.Marshal(response)
807 if err != nil {
808 http.Error(w, err.Error(), http.StatusInternalServerError)
809 return
810 }
811
812 w.Header().Set("Content-Type", "application/json; charset=utf-8")
813 w.Write(b)
814}
815
// rootTmpl renders the admin page served on "/". It expects a value with an
// IndexMsg string and a Repos list whose elements have Name and ID fields.
// When Repos is non-empty a table of repositories is shown, each with a link
// that triggers a reindex of that repository; otherwise only a link to show
// the repositories is rendered.
var rootTmpl = template.Must(template.New("name").Parse(`
<html>
  <body>
    <a href="debug">Debug</a><br />
    <a href="debug/requests">Traces</a><br />
    {{.IndexMsg}}<br />
    <br />
    <h3>Reindex</h3>
    {{if .Repos}}
    <a href="?show_repos=false">hide repos</a><br />
    <table style="margin-top: 20px">
      <th style="text-align:left">Name</th>
      <th style="text-align:left">ID (click to reindex)</th>
      {{range .Repos}}
      <tr>
        <td>{{.Name}}</td>
        <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></td>
      </tr>
      {{end}}
    </table>
    {{else}}
    <a href="?show_repos=true">show repos</a><br />
    {{end}}
  </body>
</html>
`))
842
843func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
844 if r.Method != "GET" {
845 w.Header().Set("Allow", "GET")
846 w.WriteHeader(http.StatusMethodNotAllowed)
847 return
848 }
849
850 values := r.URL.Query()
851
852 // ?id=
853 indexMsg := ""
854 if v := values.Get("id"); v != "" {
855 id, err := strconv.Atoi(v)
856 if err != nil {
857 http.Error(w, err.Error(), http.StatusBadRequest)
858 return
859 }
860 indexMsg, _ = s.forceIndex(r.Context(), uint32(id))
861 }
862
863 // ?show_repos=
864 showRepos := false
865 if v := values.Get("show_repos"); v != "" {
866 showRepos, _ = strconv.ParseBool(v)
867 }
868
869 type Repo struct {
870 ID uint32
871 Name string
872 }
873 var data struct {
874 Repos []Repo
875 IndexMsg string
876 }
877
878 data.IndexMsg = indexMsg
879
880 if showRepos {
881 s.queue.Iterate(func(opts *IndexOptions) {
882 data.Repos = append(data.Repos, Repo{
883 ID: opts.RepoID,
884 Name: opts.Name,
885 })
886 })
887 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name })
888 }
889
890 _ = rootTmpl.Execute(w, data)
891}
892
893// handleReindex triggers a reindex asynocronously. If a reindex was triggered
894// the request returns with status 202. The caller can infer the new state of
895// the index by calling List.
896func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) {
897 if r.Method != http.MethodPost {
898 w.Header().Set("Allow", http.MethodPost)
899 w.WriteHeader(http.StatusMethodNotAllowed)
900 return
901 }
902
903 err := r.ParseForm()
904 if err != nil {
905 http.Error(w, err.Error(), http.StatusBadRequest)
906 return
907 }
908
909 id, err := strconv.Atoi(r.Form.Get("repo"))
910 if err != nil {
911 http.Error(w, err.Error(), http.StatusBadRequest)
912 return
913 }
914
915 go func() { s.forceIndex(r.Context(), uint32(id)) }()
916
917 // 202 Accepted
918 w.WriteHeader(http.StatusAccepted)
919}
920
921func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
922 withIndexed := true
923 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil {
924 withIndexed = b
925 }
926
927 var indexed []uint32
928 if withIndexed {
929 indexed = listIndexed(s.IndexDir)
930 }
931
932 repos, err := s.Sourcegraph.List(r.Context(), indexed)
933 if err != nil {
934 http.Error(w, err.Error(), http.StatusInternalServerError)
935 return
936 }
937
938 bw := bytes.Buffer{}
939
940 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
941
942 _, err = fmt.Fprintf(tw, "ID\tName\n")
943 if err != nil {
944 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
945 return
946 }
947
948 s.queue.mu.Lock()
949 name := ""
950 for _, id := range repos.IDs {
951 if item := s.queue.get(id); item != nil {
952 name = item.opts.Name
953 } else {
954 name = ""
955 }
956 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
957 if err != nil {
958 debugLog.Printf("handleDebugList: %s\n", err.Error())
959 }
960 }
961 s.queue.mu.Unlock()
962
963 if err != nil {
964 http.Error(w, err.Error(), http.StatusInternalServerError)
965 return
966 }
967
968 err = tw.Flush()
969 if err != nil {
970 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
971 return
972 }
973
974 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
975
976 if _, err := io.Copy(w, &bw); err != nil {
977 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
978 return
979 }
980}
981
982// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
983// can run this command during periods of low usage (evenings, weekends) to
984// trigger an initial merge run. In the steady-state, merges happen rarely, even
985// on busy instances, and users can rely on automatic merging instead.
986func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {
987 // A merge operation can take very long, depending on the number merges and the
988 // target size of the compound shards. We run the merge in the background and
989 // return immediately to the user.
990 //
991 // We track the status of the merge with metricShardMergingRunning.
992 go func() {
993 s.doMerge()
994 }()
995 _, _ = w.Write([]byte("merging enqueued\n"))
996}
997
998func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {
999 indexed := listIndexed(s.IndexDir)
1000
1001 bw := bytes.Buffer{}
1002
1003 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
1004
1005 _, err := fmt.Fprintf(tw, "ID\tName\n")
1006 if err != nil {
1007 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
1008 return
1009 }
1010
1011 s.queue.mu.Lock()
1012 name := ""
1013 for _, id := range indexed {
1014 if item := s.queue.get(id); item != nil {
1015 name = item.opts.Name
1016 } else {
1017 name = ""
1018 }
1019 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
1020 if err != nil {
1021 debugLog.Printf("handleDebugIndexed: %s\n", err.Error())
1022 }
1023 }
1024 s.queue.mu.Unlock()
1025
1026 if err != nil {
1027 http.Error(w, err.Error(), http.StatusInternalServerError)
1028 return
1029 }
1030
1031 err = tw.Flush()
1032 if err != nil {
1033 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
1034 return
1035 }
1036
1037 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
1038
1039 if _, err := io.Copy(w, &bw); err != nil {
1040 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
1041 return
1042 }
1043}
1044
1045// forceIndex will run the index job for repo name now. It will return always
1046// return a string explaining what it did, even if it failed.
1047func (s *Server) forceIndex(ctx context.Context, id uint32) (string, error) {
1048 var opts IndexOptions
1049 var err error
1050 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) {
1051 opts = o
1052 }, func(_ uint32, e error) {
1053 err = e
1054 }, id)
1055 if err != nil {
1056 return fmt.Sprintf("Indexing %d failed: %v", id, err), err
1057 }
1058
1059 args := s.indexArgs(opts)
1060 args.Incremental = false // force re-index
1061
1062 var state indexState
1063 ran := s.muIndexDir.With(opts.Name, func() {
1064 state, err = s.index(ctx, args)
1065 })
1066 if !ran {
1067 return fmt.Sprintf("index job for repository already running: %s", args), nil
1068 }
1069 if err != nil {
1070 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err
1071 }
1072 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil
1073}
1074
1075// DeleteAllData deletes all shards in the index and trash dir belonging to the
1076// tenant associated with the request. The deletion is best-effort, which means
1077// we will delete as much as possible. If no error is returned, the caller can
1078// be certain that all data has been deleted.
1079func (s *Server) DeleteAllData(ctx context.Context, _ *indexserverv1.DeleteAllDataRequest) (*indexserverv1.DeleteAllDataResponse, error) {
1080 tnt, err := tenant.FromContext(ctx)
1081 if err != nil {
1082 return nil, err
1083 }
1084 s.logger.Warn("DeleteAllData", sglog.Int("tenant_id", tnt.ID()))
1085
1086 var merr error
1087 s.muIndexDir.Global(func() {
1088 // First, explode all compound shards that have repos from the tenant in
1089 // question. Because we hold the global lock, we can be sure that no new
1090 // merges start while we do this.
1091 if err := s.explodeTenantCompoundShards(ctx, func(path string) error {
1092 // We call explode in a separate process to protect indexserver.
1093 cmd := defaultExplodeCmd(path)
1094
1095 stdoutBuf := &bytes.Buffer{}
1096 stderrBuf := &bytes.Buffer{}
1097 cmd.Stdout = stdoutBuf
1098 cmd.Stderr = stderrBuf
1099
1100 err := cmd.Run()
1101 if err != nil {
1102 errorLog.Printf("explode failed: %v (stderr: %s)", err, stderrBuf.String())
1103 return err
1104 }
1105
1106 infoLog.Printf("exploded shard: %s", stdoutBuf.String())
1107
1108 return nil
1109 }); err != nil {
1110 merr = multierr.Append(merr, err)
1111 }
1112
1113 // Invariant: all shards from the tenant are simple shards.
1114
1115 if err := purgeTenantShards(ctx, s.IndexDir); err != nil {
1116 merr = multierr.Append(merr, err)
1117 }
1118 if err := purgeTenantShards(ctx, filepath.Join(s.IndexDir, ".trash")); err != nil {
1119 merr = multierr.Append(merr, err)
1120 }
1121 })
1122
1123 return &indexserverv1.DeleteAllDataResponse{}, merr
1124}
1125
1126func listIndexed(indexDir string) []uint32 {
1127 index := getShards(indexDir)
1128 metricNumIndexed.Set(float64(len(index)))
1129 repoIDs := make([]uint32, 0, len(index))
1130 for id := range index {
1131 repoIDs = append(repoIDs, id)
1132 }
1133 slices.Sort(repoIDs)
1134 return repoIDs
1135}
1136
1137// setupTmpDir sets up a temporary directory on the same volume as the
1138// indexes.
1139//
1140// If main is true we will delete older temp directories left around. main is
1141// false when this is a debug command.
1142func setupTmpDir(logger sglog.Logger, main bool, index string) error {
1143 // change the target tmp directory depending on if it's our main daemon or a
1144 // debug sub command.
1145 dir := ".indexserver.debug.tmp"
1146 if main {
1147 dir = ".indexserver.tmp"
1148 }
1149
1150 tmpRoot := filepath.Join(index, dir)
1151
1152 if main {
1153 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot))
1154 err := os.RemoveAll(tmpRoot)
1155 if err != nil {
1156 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err))
1157 }
1158 }
1159
1160 if err := os.MkdirAll(tmpRoot, 0o755); err != nil {
1161 return err
1162 }
1163
1164 return os.Setenv("TMPDIR", tmpRoot)
1165}
1166
1167func printMetaData(fn string) error {
1168 repo, indexMeta, err := index.ReadMetadataPath(fn)
1169 if err != nil {
1170 return err
1171 }
1172
1173 err = json.NewEncoder(os.Stdout).Encode(indexMeta)
1174 if err != nil {
1175 return err
1176 }
1177
1178 err = json.NewEncoder(os.Stdout).Encode(repo)
1179 if err != nil {
1180 return err
1181 }
1182 return nil
1183}
1184
1185func printShardStats(fn string) error {
1186 f, err := os.Open(fn)
1187 if err != nil {
1188 return err
1189 }
1190
1191 iFile, err := index.NewIndexFile(f)
1192 if err != nil {
1193 return err
1194 }
1195
1196 return index.PrintNgramStats(iFile)
1197}
1198
1199func srcLogLevelIsDebug() bool {
1200 lvl := os.Getenv(sglog.EnvLogLevel)
1201 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug")
1202}
1203
// getEnvWithDefaultBool returns the value of environment variable k parsed
// with strconv.ParseBool, or defaultVal when k is unset or empty. A value that
// cannot be parsed terminates the process via log.Fatalf.
func getEnvWithDefaultBool(k string, defaultVal bool) bool {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to bool: %s", k, err)
	}
	return b
}
1215
// getEnvWithDefaultInt64 returns environment variable k parsed as a base-10
// int64. When k is unset or empty, defaultVal is returned instead. A value
// that does not parse terminates the process via log.Fatalf.
func getEnvWithDefaultInt64(k string, defaultVal int64) int64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseInt(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to int64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1227
// getEnvWithDefaultInt returns the value of environment variable k parsed
// with strconv.Atoi, or defaultVal when k is unset or empty. A value that
// cannot be parsed terminates the process via log.Fatalf.
func getEnvWithDefaultInt(k string, defaultVal int) int {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	// Parse the value v, not the variable name k.
	i, err := strconv.Atoi(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to int: %s", k, err)
	}
	return i
}
1239
// getEnvWithDefaultUint64 returns environment variable k parsed as a base-10
// uint64. When k is unset or empty, defaultVal is returned instead. A value
// that does not parse terminates the process via log.Fatalf.
func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseUint(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to uint64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1251
// getEnvWithDefaultFloat64 returns environment variable k parsed as a
// float64. When k is unset or empty, defaultVal is returned instead. A value
// that does not parse terminates the process via log.Fatalf.
func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseFloat(raw, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to float64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1263
// getEnvWithDefaultString returns the value of environment variable k, falling
// back to defaultVal when k is unset or empty.
func getEnvWithDefaultString(k string, defaultVal string) string {
	if value := os.Getenv(k); value != "" {
		return value
	}
	return defaultVal
}
1271
// getEnvWithDefaultDuration returns environment variable k parsed with
// time.ParseDuration. When k is unset or empty, defaultVal is returned
// instead. A value that does not parse terminates the process via log.Fatalf.
func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := time.ParseDuration(raw)
		if err != nil {
			log.Fatalf("error parsing ENV %s to duration: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1284
// getEnvWithDefaultEmptySet parses environment variable k as a comma-separated
// list and returns its entries as a set. Surrounding whitespace is trimmed
// from each entry and empty entries are dropped, so an unset or empty variable
// yields an empty, non-nil set.
func getEnvWithDefaultEmptySet(k string) map[string]struct{} {
	entries := strings.Split(os.Getenv(k), ",")
	set := make(map[string]struct{}, len(entries))
	for _, entry := range entries {
		entry = strings.TrimSpace(entry)
		if entry == "" {
			continue
		}
		set[entry] = struct{}{}
	}
	return set
}
1295
// joinStringSet concatenates the members of set, separated by sep. Members are
// emitted in ascending lexical order so that the result is deterministic; map
// iteration order alone would make it vary from call to call.
func joinStringSet(set map[string]struct{}, sep string) string {
	xs := make([]string, 0, len(set))
	for x := range set {
		xs = append(xs, x)
	}
	slices.Sort(xs)

	return strings.Join(xs, sep)
}
1304
1305func setShardsCounter(indexDir string) {
1306 fns, err := filepath.Glob(filepath.Join(indexDir, "*.zoekt"))
1307 if err != nil {
1308 errorLog.Printf("setShardsCounter: %s\n", err)
1309 return
1310 }
1311 metricNumberShards.Set(float64(len(fns)))
1312
1313 compoundFns := make([]string, 0, len(fns))
1314 for _, fn := range fns {
1315 if strings.HasPrefix(filepath.Base(fn), "compound-") {
1316 compoundFns = append(compoundFns, fn)
1317 }
1318 }
1319 metricNumberCompoundShards.Set(float64(len(compoundFns)))
1320}
1321
1322func rootCmd() *ffcli.Command {
1323 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError)
1324 conf := rootConfig{
1325 Main: true,
1326 }
1327 conf.registerRootFlags(rootFs)
1328
1329 return &ffcli.Command{
1330 FlagSet: rootFs,
1331 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]",
1332 Subcommands: []*ffcli.Command{debugCmd()},
1333 Exec: func(ctx context.Context, args []string) error {
1334 return startServer(conf)
1335 },
1336 }
1337}
1338
// rootConfig holds the settings of the root command. Every field other than
// Main is bound to a command-line flag by registerRootFlags.
type rootConfig struct {
	// Main is true if this rootConfig is for our main long running command
	// (the indexserver). Debug commands should not set this value. newServer
	// passes it on to setupTmpDir, which uses it to pick the temp directory
	// and to decide whether left-over temp files are removed.
	Main bool

	// root is the -sourcegraph_url flag.
	root string
	// interval is the -interval flag.
	interval time.Duration
	// index is the -index flag, the index directory.
	index string
	// indexConcurrency is the -index_concurrency flag.
	indexConcurrency int64
	// listen is the -listen flag.
	listen string
	// hostname is the -hostname flag.
	hostname string
	// cpuFraction is the -cpu_fraction flag.
	cpuFraction float64

	// Settings related to shard merging.

	// disableShardMerging is the -shard_merging flag.
	disableShardMerging bool
	// vacuumInterval is the -vacuum_interval flag.
	vacuumInterval time.Duration
	// mergeInterval is the -merge_interval flag.
	mergeInterval time.Duration
	// targetSize is the -merge_target_size flag, in MiB.
	targetSize int64
	// minSize is the -merge_min_size flag, in MiB.
	minSize int64
	// minAgeDays is the -merge_min_age flag, in days.
	minAgeDays int

	// Settings related to backing off from indexing repos with one or more
	// consecutive failures.

	// backoffDuration is the -backoff_duration flag.
	backoffDuration time.Duration
	// maxBackoffDuration is the -max_backoff_duration flag.
	maxBackoffDuration time.Duration
}
1365
1366func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
1367 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
1368 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
1369 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently")
1370 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", index.DefaultDir), "set index directory to use")
1371 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
1372 fs.StringVar(&rc.hostname, "hostname", index.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
1373 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
1374 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
1375 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")
1376
1377 // flags related to shard merging
1378 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging")
1379 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
1380 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
1381 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 1000), "the target size of compound shards in MiB")
1382 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 800), "the minimum size of a compound shard in MiB")
1383 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
1384}
1385
1386func startServer(conf rootConfig) error {
1387 s, err := newServer(conf)
1388 if err != nil {
1389 return err
1390 }
1391
1392 profiler.Init("zoekt-sourcegraph-indexserver")
1393 setShardsCounter(s.IndexDir)
1394
1395 if conf.listen != "" {
1396
1397 mux := http.NewServeMux()
1398 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{
1399 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"},
1400 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"},
1401 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"},
1402 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"},
1403 }...)
1404 s.addDebugHandlers(mux)
1405
1406 go func() {
1407 debugLog.Printf("serving HTTP on %s", conf.listen)
1408 mux := grpcutil.MultiplexGRPC(newGRPCServer(sglog.Scoped("indexserver"), s), mux)
1409 log.Fatal(http.ListenAndServe(conf.listen, mux))
1410 }()
1411
1412 // Serve mux on a unix domain socket on a best-effort-basis so that
1413 // webserver can call the endpoints via the shared filesystem.
1414 //
1415 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen
1416 // on the socket due to permission errors. See
1417 // https://github.com/docker/for-mac/issues/6239
1418 go func() {
1419 serveHTTPOverSocket := func() error {
1420 socket := filepath.Join(s.IndexDir, "indexserver.sock")
1421 // We cannot bind a socket to an existing pathname.
1422 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) {
1423 return fmt.Errorf("error removing socket file: %s", socket)
1424 }
1425 // The "unix" network corresponds to stream sockets. (cf. unixgram,
1426 // unixpacket).
1427 l, err := net.Listen("unix", socket)
1428 if err != nil {
1429 return fmt.Errorf("failed to listen on socket %s: %w", socket, err)
1430 }
1431 // Indexserver (root) and webserver (Sourcegraph) run with
1432 // different users. Per default, the socket is created with
1433 // permission 755 (root root), which doesn't let webserver write to
1434 // it.
1435 //
1436 // See https://github.com/golang/go/issues/11822 for more context.
1437 if err := os.Chmod(socket, 0o777); err != nil {
1438 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err)
1439 }
1440 debugLog.Printf("serving HTTP on %s", socket)
1441 return http.Serve(l, mux)
1442 }
1443 debugLog.Print(serveHTTPOverSocket())
1444 }()
1445 }
1446
1447 oc := &ownerChecker{
1448 Path: filepath.Join(conf.index, "owner.txt"),
1449 Hostname: conf.hostname,
1450 }
1451 go oc.Run()
1452
1453 logger := sglog.Scoped("metricsRegistration")
1454 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"}
1455
1456 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index})
1457 prometheus.DefaultRegisterer.MustRegister(c)
1458
1459 s.Run()
1460 return nil
1461}
1462
1463func newServer(conf rootConfig) (*Server, error) {
1464 logger := sglog.Scoped("server")
1465
1466 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 {
1467 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0")
1468 }
1469 if conf.index == "" {
1470 return nil, fmt.Errorf("must set -index")
1471 }
1472 if conf.root == "" {
1473 return nil, fmt.Errorf("must set -sourcegraph_url")
1474 }
1475 rootURL, err := url.Parse(conf.root)
1476 if err != nil {
1477 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err)
1478 }
1479
1480 rootURL = addDefaultPort(rootURL)
1481
1482 // Tune GOMAXPROCS to match Linux container CPU quota.
1483 _, _ = maxprocs.Set()
1484
1485 // Automatically prepend our own path at the front, to minimize
1486 // required configuration.
1487 if l, err := os.Readlink("/proc/self/exe"); err == nil {
1488 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
1489 }
1490
1491 if _, err := os.Stat(conf.index); err != nil {
1492 if err := os.MkdirAll(conf.index, 0o755); err != nil {
1493 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err)
1494 }
1495 }
1496
1497 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil {
1498 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err)
1499 }
1500
1501 if srcLogLevelIsDebug() {
1502 debugLog.SetOutput(os.Stderr)
1503 }
1504
1505 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST")
1506 if len(reposWithSeparateIndexingMetrics) > 0 {
1507 debugLog.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", "))
1508 }
1509
1510 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST")
1511 if len(deltaBuildRepositoriesAllowList) > 0 {
1512 debugLog.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", "))
1513 }
1514
1515 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150)
1516 if deltaShardNumberFallbackThreshold > 0 {
1517 debugLog.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold)
1518 } else {
1519 debugLog.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards")
1520 }
1521
1522 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST")
1523 if len(reposShouldSkipSymbolsCalculation) > 0 {
1524 debugLog.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", "))
1525 }
1526
1527 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout)
1528 if indexingTimeout != defaultIndexingTimeout {
1529 debugLog.Printf("using configured indexing timeout: %s", indexingTimeout)
1530 }
1531
1532 var sg Sourcegraph
1533 if rootURL.IsAbs() {
1534 var batchSize int
1535 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" {
1536 batchSize, err = strconv.Atoi(v)
1537 if err != nil {
1538 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int")
1539 }
1540 }
1541
1542 opts := []SourcegraphClientOption{
1543 WithBatchSize(batchSize),
1544 }
1545
1546 logger := sglog.Scoped("zoektConfigurationGRPCClient")
1547 client, err := dialGRPCClient(rootURL.Host, logger)
1548 if err != nil {
1549 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err)
1550 }
1551
1552 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...)
1553
1554 } else {
1555 sg = sourcegraphFake{
1556 RootDir: rootURL.String(),
1557 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags),
1558 }
1559 }
1560
1561 cpuCount := max(int(math.Round(float64(runtime.GOMAXPROCS(0))*(conf.cpuFraction))), 1)
1562
1563 if conf.indexConcurrency < 1 {
1564 conf.indexConcurrency = 1
1565 } else if conf.indexConcurrency > int64(cpuCount) {
1566 conf.indexConcurrency = int64(cpuCount)
1567 }
1568
1569 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)
1570
1571 return &Server{
1572 logger: logger,
1573 rootURL: rootURL,
1574 Sourcegraph: sg,
1575 IndexDir: conf.index,
1576 IndexConcurrency: int(conf.indexConcurrency),
1577 Interval: conf.interval,
1578 CPUCount: cpuCount,
1579 queue: *q,
1580 shardMerging: !conf.disableShardMerging,
1581 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList,
1582 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold,
1583 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
1584 hostname: conf.hostname,
1585 mergeOpts: mergeOpts{
1586 vacuumInterval: conf.vacuumInterval,
1587 mergeInterval: conf.mergeInterval,
1588 targetSizeBytes: conf.targetSize * 1024 * 1024,
1589 minSizeBytes: conf.minSize * 1024 * 1024,
1590 minAgeDays: conf.minAgeDays,
1591 },
1592 timeout: indexingTimeout,
1593 indexSemaphore: make(chan struct{}, int(conf.indexConcurrency)),
1594 }, err
1595}
1596
1597func newGRPCServer(logger sglog.Logger, s *Server, additionalOpts ...grpc.ServerOption) *grpc.Server {
1598 grpcServer := defaults.NewServer(logger, additionalOpts...)
1599 indexserverv1.RegisterSourcegraphIndexserverServiceServer(grpcServer, s)
1600 return grpcServer
1601}
1602
1603// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration
1604// for the indexed-search-configuration gRPC service.
1605//
1606// The default backoff strategy is modeled after the default settings used by
1607// retryablehttp.DefaultClient.
1608//
1609// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html):
1610// - Unavailable
1611// - Aborted
1612//
1613//go:embed default_grpc_service_configuration.json
1614var defaultGRPCServiceConfigurationJSON string
1615
1616func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor {
1617 return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1618 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1619 return invoker(ctx, method, req, reply, cc, opts...)
1620 }
1621}
1622
1623func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
1624 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
1625 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1626 return streamer(ctx, desc, cc, method, opts...)
1627 }
1628}
1629
// defaultGRPCMessageReceiveSizeBytes is the default size limit for messages
// that gRPC servers and clients are allowed to process. It can be overridden
// by providing custom Server/Dial options.
const defaultGRPCMessageReceiveSizeBytes = 90 << 20 // 90 MiB
1633
1634func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (configv1.ZoektConfigurationServiceClient, error) {
1635 metrics := clientMetricsOnce()
1636
1637 // If the service seems to be unavailable, this
1638 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1
1639 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s])
1640 retryOpts := []retry.CallOption{
1641 retry.WithMax(5),
1642 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)),
1643 retry.WithCodes(codes.Unavailable),
1644 }
1645
1646 opts := []grpc.DialOption{
1647 grpc.WithTransportCredentials(insecure.NewCredentials()),
1648 grpc.WithChainStreamInterceptor(
1649 metrics.StreamClientInterceptor(),
1650 messagesize.StreamClientInterceptor,
1651 internalActorStreamInterceptor(),
1652 internalerrs.LoggingStreamClientInterceptor(logger),
1653 internalerrs.PrometheusStreamClientInterceptor,
1654 retry.StreamClientInterceptor(retryOpts...),
1655 ),
1656 grpc.WithChainUnaryInterceptor(
1657 metrics.UnaryClientInterceptor(),
1658 messagesize.UnaryClientInterceptor,
1659 internalActorUnaryInterceptor(),
1660 internalerrs.LoggingUnaryClientInterceptor(logger),
1661 internalerrs.PrometheusUnaryClientInterceptor,
1662 retry.UnaryClientInterceptor(retryOpts...),
1663 ),
1664 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
1665 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)),
1666 }
1667
1668 opts = append(opts, additionalOpts...)
1669
1670 // Ensure that the message size options are set last, so they override any other
1671 // client-specific options that tweak the message size.
1672 //
1673 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
1674 // take precedence over everything else with a uniform size setting that's easy to reason about.
1675 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...)
1676
1677 // This dialer is used to connect via gRPC to the Sourcegraph instance.
1678 // This is done lazily, so we can provide the client to use regardless of
1679 // whether we enabled gRPC or not initially.
1680 cc, err := grpc.Dial(addr, opts...)
1681 if err != nil {
1682 return nil, fmt.Errorf("dialing %q: %w", addr, err)
1683 }
1684
1685 client := configv1.NewZoektConfigurationServiceClient(cc)
1686 return client, nil
1687}
1688
// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
// If the scheme is "https", "443" is used. URLs with any other scheme,
// relative URLs, and URLs that already carry a port are returned as is.
//
// The original URL is not mutated. A copy is modified and returned.
func addDefaultPort(original *url.URL) *url.URL {
	if original == nil {
		return nil // don't panic
	}

	// Leave the URL alone if it doesn't look like a remote URL or if it
	// already specifies a port.
	if !original.IsAbs() || original.Port() != "" {
		return original
	}

	var port string
	switch original.Scheme {
	case "http":
		port = "80"
	case "https":
		port = "443"
	default:
		return original
	}

	u := cloneURL(original)
	// Join against Hostname rather than Host: Host keeps the square brackets
	// of an IPv6 literal (and a dangling ":" from an empty port), which
	// JoinHostPort would wrap in a second pair of brackets.
	u.Host = net.JoinHostPort(original.Hostname(), port)
	return u
}

// cloneURL returns a copy of the URL. It is safe to mutate the returned URL.
// This is copied from net/http/clone.go
func cloneURL(u *url.URL) *url.URL {
	if u == nil {
		return nil
	}
	u2 := new(url.URL)
	*u2 = *u
	// User is a pointer, so it needs its own copy to keep the clone
	// independent of the original.
	if u.User != nil {
		u2.User = new(url.Userinfo)
		*u2.User = *u.User
	}
	return u2
}
1733
1734func main() {
1735 liblog := sglog.Init(sglog.Resource{
1736 Name: "zoekt-indexserver",
1737 Version: index.Version,
1738 InstanceID: index.HostnameBestEffort(),
1739 })
1740 defer liblog.Sync()
1741
1742 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil {
1743 log.Fatal(err)
1744 }
1745}
1746
// getBoolFromEnvironmentVariables returns the boolean held by the first
// environment variable in envVarNames that is set to a non-empty value in the
// current process environment, or defaultBool if none of them is.
//
// An error is returned if that first non-empty value fails to parse as a
// boolean; later variables are not consulted in that case.
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	for _, name := range envVarNames {
		if raw := os.Getenv(name); raw != "" {
			parsed, err := strconv.ParseBool(raw)
			if err != nil {
				return false, fmt.Errorf("parsing environment variable %q to boolean: %v", name, err)
			}
			return parsed, nil
		}
	}

	return defaultBool, nil
}