fork of https://github.com/sourcegraph/zoekt
1// Licensed under the Apache License, Version 2.0 (the "License");
2// you may not use this file except in compliance with the License.
3// You may obtain a copy of the License at
4//
5// http://www.apache.org/licenses/LICENSE-2.0
6//
7// Unless required by applicable law or agreed to in writing, software
8// distributed under the License is distributed on an "AS IS" BASIS,
9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10// See the License for the specific language governing permissions and
11// limitations under the License.
12
13// Command zoekt-sourcegraph-indexserver periodically reindexes repositories
14// from a Sourcegraph instance. It uses a "pull-based" design, where it periodically
15// reaches out to the Sourcegraph instance for the list of repositories to reindex.
16package main
17
18import (
19 "bytes"
20 "context"
21 _ "embed"
22 "encoding/json"
23 "errors"
24 "flag"
25 "fmt"
26 "html/template"
27 "io"
28 "io/fs"
29 "log"
30 "math"
31 "math/rand"
32 "net"
33 "net/http"
34 "net/url"
35 "os"
36 "os/exec"
37 "os/signal"
38 "path/filepath"
39 "runtime"
40 "slices"
41 "sort"
42 "strconv"
43 "strings"
44 "sync"
45 "text/tabwriter"
46 "time"
47
48 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
49 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
50 "github.com/peterbourgon/ff/v3/ffcli"
51 "github.com/prometheus/client_golang/prometheus"
52 "github.com/prometheus/client_golang/prometheus/promauto"
53 sglog "github.com/sourcegraph/log"
54 "github.com/sourcegraph/mountinfo"
55
56 "github.com/sourcegraph/zoekt"
57 configv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/sourcegraph/zoekt/configuration/v1"
58 indexserverv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/zoekt/indexserver/v1"
59 "github.com/sourcegraph/zoekt/grpc/defaults"
60 "github.com/sourcegraph/zoekt/grpc/grpcutil"
61 "github.com/sourcegraph/zoekt/grpc/internalerrs"
62 "github.com/sourcegraph/zoekt/grpc/messagesize"
63 "github.com/sourcegraph/zoekt/index"
64 "github.com/sourcegraph/zoekt/internal/debugserver"
65 "github.com/sourcegraph/zoekt/internal/profiler"
66 "github.com/sourcegraph/zoekt/internal/tenant"
67
68 "go.uber.org/automaxprocs/maxprocs"
69 "go.uber.org/multierr"
70 "golang.org/x/net/trace"
71 "golang.org/x/sys/unix"
72 "google.golang.org/grpc"
73 "google.golang.org/grpc/codes"
74 "google.golang.org/grpc/credentials/insecure"
75 "google.golang.org/grpc/metadata"
76)
77
78var (
79 metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
80 Name: "resolve_revisions_seconds",
81 Help: "A histogram of latencies for resolving all repository revisions.",
82 Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 27min
83 })
84
85 metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
86 Name: "resolve_revision_seconds",
87 Help: "A histogram of latencies for resolving a repository revision.",
88 Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
89 }, []string{"success"}) // success=true|false
90
91 metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
92 Name: "get_index_options_total",
93 Help: "The total number of times we tried to get index options for a repository. Includes errors.",
94 })
95 metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
96 Name: "get_index_options_error_total",
97 Help: "The total number of times we failed to get index options for a repository.",
98 })
99
100 metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
101 Name: "index_repo_seconds",
102 Help: "A histogram of latencies for indexing a repository.",
103 Buckets: prometheus.ExponentialBucketsRange(
104 (100 * time.Millisecond).Seconds(),
105 (40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
106 20),
107 }, []string{
108 "state", // state is an indexState
109 "name", // name of the repository that was indexed
110 })
111
112 metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{
113 Name: "index_indexing_delay_seconds",
114 Help: "A histogram of durations from when an index job is added to the queue, to the time it completes.",
115 Buckets: prometheus.ExponentialBuckets(60, 2, 14), // 1 minute -> 5.5 days
116 }, []string{
117 "state", // state is an indexState
118 "name", // the name of the repository that was indexed
119 })
120
121 metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
122 Name: "index_fetch_seconds",
123 Help: "A histogram of latencies for fetching a repository.",
124 Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
125 }, []string{
126 "success", // true|false
127 "name", // the name of the repository that the commits were fetched from
128 })
129
130 metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
131 Name: "index_incremental_index_state",
132 Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
133 }, []string{"state"}) // state is index.IndexState
134
135 metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
136 Name: "index_num_indexed",
137 Help: "Number of indexed repos by code host",
138 })
139
140 metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
141 Name: "index_failing_total",
142 Help: "Counts failures to index (indexing activity, should be used with rate())",
143 })
144
145 metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
146 Name: "index_indexing_total",
147 Help: "Counts indexings (indexing activity, should be used with rate())",
148 })
149
150 metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
151 Name: "index_num_stopped_tracking_total",
152 Help: "Counts the number of repos we stopped tracking.",
153 })
154
155 // clientMetricsOnce returns a singleton instance of the client metrics
156 // that are shared across all gRPC clients that this process creates.
157 //
158 // This function panics if the metrics cannot be registered with the default
159 // Prometheus registry.
160 clientMetricsOnce = sync.OnceValue(func() *grpcprom.ClientMetrics {
161 clientMetrics := grpcprom.NewClientMetrics(
162 grpcprom.WithClientCounterOptions(),
163 grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
164 grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC
165 grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC
166 )
167 prometheus.DefaultRegisterer.MustRegister(clientMetrics)
168 return clientMetrics
169 })
170)
171
// MaxFileSize is 1 MiB (1024 * 1024 bytes). It matches the limit used by
// Sourcegraph's searcher, see
// https://sourcegraph.sourcegraph.com/r/github.com/sourcegraph/sourcegraph/-/blob/cmd/searcher/internal/search/store.go?L32
const MaxFileSize = 1024 * 1024
174
// reposWithSeparateIndexingMetrics is the set of repository names that keep
// their own "name" label value on the per-repository indexing metrics; all
// other repositories share a single empty label value. It starts out empty.
var reposWithSeparateIndexingMetrics = map[string]struct{}{}
177
// indexState is the outcome of a single index job. Its string value is used
// as the "state" label of the indexing metrics.
type indexState string

// The possible outcomes of an index job.
const (
	indexStateFail        = indexState("fail")
	indexStateSuccess     = indexState("success")
	indexStateSuccessMeta = indexState("success_meta") // only the index metadata was updated
	indexStateNoop        = indexState("noop")         // the index did not need to be updated
	indexStateEmpty       = indexState("empty")        // the repository has no branches (empty repo)
)
187
188// Server is the main functionality of zoekt-sourcegraph-indexserver. It
189// exists to conveniently use all the options passed in via func main.
190type Server struct {
191 logger sglog.Logger
192
193 Sourcegraph Sourcegraph
194
195 // rootURL is the root URL of the Sourcegraph instance.
196 rootURL *url.URL
197
198 BatchSize int
199
200 // IndexDir is the index directory to use.
201 IndexDir string
202
203 // IndexConcurrency is the number of repositories we index at once.
204 IndexConcurrency int
205
206 // indexSemaphore limits the number of concurrent index operations
207 indexSemaphore chan struct{}
208
209 // Interval is how often we sync with Sourcegraph.
210 Interval time.Duration
211
212 // CPUCount is the number of CPUs to use for indexing shards.
213 CPUCount int
214
215 queue Queue
216
217 // muIndexDir protects the index directory from concurrent access.
218 muIndexDir indexMutex
219
220 // If true, shard merging is enabled.
221 shardMerging bool
222
223 // deltaBuildRepositoriesAllowList is an allowlist for repositories that we
224 // use delta-builds for instead of normal builds
225 deltaBuildRepositoriesAllowList map[string]struct{}
226
227 // deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
228 // before attempting a delta build.
229 deltaShardNumberFallbackThreshold uint64
230
231 // repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
232 // we skip calculating symbols metadata for during builds
233 repositoriesSkipSymbolsCalculationAllowList map[string]struct{}
234
235 hostname string
236
237 mergeOpts mergeOpts
238
239 // timeout defines how long the index server waits before killing an indexing job.
240 timeout time.Duration
241
242 indexserverv1.UnimplementedSourcegraphIndexserverServiceServer
243}
244
// Printf-style loggers shared by the whole command. All three use the
// standard date/time flags and differ only in prefix and destination.
var (
	// debugLog discards its output (at least by default).
	debugLog = log.New(io.Discard, "[DEBUG] ", log.LstdFlags)

	// infoLog and errorLog both write to stderr.
	infoLog  = log.New(os.Stderr, "[INFO] ", log.LstdFlags)
	errorLog = log.New(os.Stderr, "[ERROR] ", log.LstdFlags)
)
250
// noOutputTimeout is how long an index command run via loggedRun may go
// without producing output before it is sent SIGQUIT. Our index commands
// should output something every 100mb they process.
//
// History, 2020-11-24 (Keegan): this used to be 5m because "this should be
// rather quick so 5m is more than enough time." Famous last words: a client
// indexing a monorepo on 42 cores needed more than 5m.
const noOutputTimeout = time.Minute * 30
257
258func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) {
259 out := &synchronizedBuffer{}
260 cmd.Stdout = out
261 cmd.Stderr = out
262
263 tr.LazyPrintf("%s", cmd.Args)
264
265 defer func() {
266 if err != nil {
267 outS := out.String()
268 tr.LazyPrintf("failed: %v", err)
269 tr.LazyPrintf("output: %s", out)
270 tr.SetError()
271 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS)
272 }
273 }()
274
275 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args))
276
277 if err := cmd.Start(); err != nil {
278 return err
279 }
280
281 errC := make(chan error)
282 go func() {
283 errC <- cmd.Wait()
284 }()
285
286 // This channel is set after we have sent sigquit. It allows us to follow up
287 // with a sigkill if the process doesn't quit after sigquit.
288 kill := make(<-chan time.Time)
289
290 lastLen := 0
291 for {
292 select {
293 case <-time.After(noOutputTimeout):
294 // Periodically check if we have had output. If not kill the process.
295 if out.Len() != lastLen {
296 lastLen = out.Len()
297 infoLog.Printf("still running %s", cmd.Args)
298 } else {
299 // Send quit (C-\) first so we get a stack dump.
300 infoLog.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args)
301 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil {
302 errorLog.Println("quit failed:", err)
303 }
304
305 // send sigkill if still running in 10s
306 kill = time.After(10 * time.Second)
307 }
308
309 case <-kill:
310 infoLog.Printf("still running, killing %s", cmd.Args)
311 if err := cmd.Process.Kill(); err != nil {
312 errorLog.Println("kill failed:", err)
313 }
314
315 case err := <-errC:
316 if err != nil {
317 return err
318 }
319
320 tr.LazyPrintf("success")
321 return nil
322 }
323 }
324}
325
// synchronizedBuffer wraps a bytes.Buffer with a mutex. Used so we can
// monitor the buffer (Len, String) while a running command writes to it.
// The zero value is an empty buffer ready to use. It must not be copied after
// first use.
type synchronizedBuffer struct {
	mu sync.Mutex
	b  bytes.Buffer
}

// Write appends p to the buffer. It is safe for concurrent use.
func (sb *synchronizedBuffer) Write(p []byte) (int, error) {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	return sb.b.Write(p)
}

// Len returns the number of bytes written so far.
func (sb *synchronizedBuffer) Len() int {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	return sb.b.Len()
}

// String returns a copy of everything written so far.
func (sb *synchronizedBuffer) String() string {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	return sb.b.String()
}
350
// Indexing can be paused without restarting the indexserver, e.g. to
// experiment with the contents of IndexDir without the indexserver writing to
// it. isIndexingPaused consults both of these.
const (
	// pauseFileName is the name of a file which, if present in IndexDir,
	// stops index jobs from running. Its trimmed contents are logged as the
	// reason.
	pauseFileName = "PAUSE"

	// pauseEnvVar is an environment variable which, when true, also pauses
	// indexing.
	pauseEnvVar = "SRC_PAUSE_INDEXING"
)
358
359// isIndexingPaused checks if indexing should be paused based on either:
360// 1. The presence of a PAUSE file in the index directory
361// 2. The SRC_PAUSE_INDEXING environment variable being set to true
362func isIndexingPaused(indexDir string) (bool, string) {
363 // Check for PAUSE file first
364 if b, err := os.ReadFile(filepath.Join(indexDir, pauseFileName)); err == nil {
365 return true, fmt.Sprintf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b)))
366 }
367
368 // Then check environment variable
369 if getEnvWithDefaultBool(pauseEnvVar, false) {
370 return true, fmt.Sprintf("indexserver paused via %s environment variable", pauseEnvVar)
371 }
372
373 return false, ""
374}
375
376// Run the sync loop. This blocks forever.
377func (s *Server) Run() {
378 removeIncompleteShards(s.IndexDir)
379
380 // Start a goroutine which updates the queue with commits to index.
381 go func() {
382 // We update the list of indexed repos every Interval. To speed up manual
383 // testing we also listen for SIGUSR1 to trigger updates.
384 // "pkill -SIGUSR1 zoekt-sourcegra"
385 for range jitterTicker(s.Interval, unix.SIGUSR1) {
386 if paused, msg := isIndexingPaused(s.IndexDir); paused {
387 infoLog.Printf("%s", msg)
388 continue
389 }
390
391 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir))
392 if err != nil {
393 errorLog.Printf("error listing repos: %s", err)
394 continue
395 }
396
397 debugLog.Printf("updating index queue with %d repositories", len(repos.IDs))
398
399 // Stop indexing repos we don't need to track anymore
400 removed := s.queue.MaybeRemoveMissing(repos.IDs)
401 metricNumStoppedTrackingTotal.Add(float64(len(removed)))
402 if len(removed) > 0 {
403 infoLog.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5))
404 }
405
406 cleanupDone := make(chan struct{})
407 go func() {
408 defer close(cleanupDone)
409 s.muIndexDir.Global(func() {
410 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging)
411 })
412 }()
413
414 repos.IterateIndexOptions(s.queue.AddOrUpdate)
415
416 // IterateIndexOptions will only iterate over repositories that have
417 // changed since we last called list. However, we want to add all IDs
418 // back onto the queue just to check that what is on disk is still
419 // correct. This will use the last IndexOptions we stored in the
420 // queue. The repositories not on the queue (missing) need a forced
421 // fetch of IndexOptions.
422 missing := s.queue.Bump(repos.IDs)
423 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...)
424
425 setShardsCounter(s.IndexDir)
426
427 <-cleanupDone
428 }
429 }()
430
431 go func() {
432 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) {
433 if s.shardMerging {
434 s.vacuum()
435 }
436 }
437 }()
438
439 go func() {
440 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) {
441 if s.shardMerging {
442 s.doMerge()
443 }
444 }
445 }()
446
447 for range s.IndexConcurrency {
448 go s.processQueue()
449 }
450
451 // block forever
452 select {}
453}
454
// formatListUint32 returns a comma-separated list of the first
// min(len(v), m) items of v, followed by "..." if any items were left out.
// A negative m is treated as 0.
//
//	formatListUint32([]uint32{1, 2, 3}, 2) == "1, 2, ..."
func formatListUint32(v []uint32, m int) string {
	m = max(0, min(m, len(v)))

	parts := make([]string, 0, m+1)
	for _, id := range v[:m] {
		parts = append(parts, strconv.FormatUint(uint64(id), 10))
	}
	if len(v) > m {
		parts = append(parts, "...")
	}

	return strings.Join(parts, ", ")
}
472
473func (s *Server) processQueue() {
474 for {
475 if paused, _ := isIndexingPaused(s.IndexDir); paused {
476 time.Sleep(time.Second)
477 continue
478 }
479
480 item, ok := s.queue.Pop()
481 if !ok {
482 time.Sleep(time.Second)
483 continue
484 }
485
486 opts := item.Opts
487 args := s.indexArgs(opts)
488
489 ran := s.muIndexDir.With(opts.Name, func() {
490 // only record time taken once we hold the lock. This avoids us
491 // recording time taken while merging/cleanup runs.
492 start := time.Now()
493
494 state, err := s.index(context.Background(), args)
495
496 elapsed := time.Since(start)
497 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds())
498
499 indexDelay := time.Since(item.DateAddedToQueue)
500 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds())
501
502 if err != nil {
503 errorLog.Printf("error indexing %s: %s", args.String(), err)
504 }
505
506 switch state {
507 case indexStateSuccess:
508 var branches []string
509 for _, b := range args.Branches {
510 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
511 }
512 s.logger.Info("updated index",
513 sglog.Int("tenant", args.TenantID),
514 sglog.String("repo", args.Name),
515 sglog.Uint32("id", args.RepoID),
516 sglog.Strings("branches", branches),
517 sglog.Duration("duration", elapsed),
518 sglog.Duration("index_delay", indexDelay),
519 )
520 case indexStateSuccessMeta:
521 infoLog.Printf("updated meta %s in %v", args.String(), elapsed)
522 }
523 s.queue.SetIndexed(opts, state)
524 })
525
526 if !ran {
527 // Someone else is processing the repository. We can just skip this job
528 // since the repository will be added back to the queue and we will
529 // converge to the correct behaviour.
530 debugLog.Printf("index job for repository already running: %s", args)
531 continue
532 }
533 }
534}
535
536// repoNameForMetric returns a normalized version of the given repository name that is
537// suitable for use with Prometheus metrics.
538func repoNameForMetric(repo string) string {
539 // Check to see if we want to be able to capture separate indexing metrics for this repository.
540 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable.
541 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok {
542 return repo
543 }
544
545 return ""
546}
547
// batched streams slice in consecutive chunks of at most size elements over
// the returned channel, which is closed after the last chunk. An empty slice
// produces no chunks. A non-positive size sends the whole slice as a single
// chunk; without this guard, size 0 would send empty chunks forever and a
// negative size would panic.
//
// Chunks share slice's backing array, but each has its capacity clipped to its
// length, so appending to one chunk cannot overwrite the next.
//
// The producer goroutine exits only after every chunk has been received, so
// callers must drain the channel.
func batched(slice []uint32, size int) <-chan []uint32 {
	if size <= 0 {
		size = len(slice)
	}

	c := make(chan []uint32)
	go func() {
		defer close(c)
		for len(slice) > 0 {
			n := min(size, len(slice))
			c <- slice[:n:n]
			slice = slice[n:]
		}
	}()
	return c
}
562
// jitterTicker returns a channel that ticks forever with a random jitter. It
// ticks once right away; each subsequent pause is drawn uniformly from
// [d/2, d/2+d). The ticks are sent unbuffered, so pauses only start once the
// previous tick has been received.
//
// Every delivery of one of the signals in sig also produces an extra tick.
// This is a convenience to allow triggering the ticker manually.
func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} {
	ticks := make(chan struct{})

	// Timed source.
	go func() {
		base := int64(d)
		for {
			ticks <- struct{}{}
			pause := base/2 + rand.Int63n(base)
			time.Sleep(time.Duration(pause))
		}
	}()

	// Signal source, only needed when signals were requested.
	if len(sig) > 0 {
		go func() {
			notify := make(chan os.Signal, 1)
			signal.Notify(notify, sig...)
			for range notify {
				ticks <- struct{}{}
			}
		}()
	}

	return ticks
}
594
595// Index starts an index job for repo name at commit.
596func (s *Server) index(ctx context.Context, args *indexArgs) (state indexState, err error) {
597 tr := trace.New("index", args.Name)
598 tr.SetMaxEvents(30) // Ensure we capture all indexing events
599
600 defer func() {
601 if err != nil {
602 tr.SetError()
603 tr.LazyPrintf("error: %v", err)
604 state = indexStateFail
605 metricFailingTotal.Inc()
606 }
607 tr.LazyPrintf("state: %s", state)
608 tr.Finish()
609 }()
610
611 // Sourcegraph should always provide a tenant ID.
612 if args.TenantID < 1 {
613 return indexStateFail, tenant.ErrMissingTenant
614 }
615
616 tr.LazyPrintf("branches: %v", args.Branches)
617
618 if len(args.Branches) == 0 {
619 return indexStateEmpty, createEmptyShard(args)
620 }
621
622 repositoryName := args.Name
623 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
624 tr.LazyPrintf("marking this repository for delta build")
625 args.UseDelta = true
626 }
627
628 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold
629
630 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
631 tr.LazyPrintf("skipping symbols calculation")
632 args.Symbols = false
633 }
634
635 reason := "forced"
636
637 if args.Incremental {
638 bo := args.BuildOptions()
639 bo.SetDefaults()
640 incrementalState, fn := bo.IndexState()
641 reason = string(incrementalState)
642 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()
643
644 switch incrementalState {
645 case index.IndexStateEqual:
646 debugLog.Printf("%s index already up to date. Shard=%s", args.String(), fn)
647 return indexStateNoop, nil
648
649 case index.IndexStateMeta:
650 infoLog.Printf("updating index.meta %s", args.String())
651
652 // TODO(stefan) handle mergeMeta for tenant id.
653 if err := mergeMeta(bo); err != nil {
654 errorLog.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
655 } else {
656 return indexStateSuccessMeta, nil
657 }
658
659 case index.IndexStateCorrupt:
660 infoLog.Printf("falling back to full update: corrupt index: %s", args.String())
661 }
662 }
663
664 infoLog.Printf("updating index %s reason=%s", args.String(), reason)
665
666 metricIndexingTotal.Inc()
667 c := gitIndexConfig{
668 runCmd: func(cmd *exec.Cmd) error {
669 return s.loggedRun(tr, cmd)
670 },
671
672 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
673 return args.BuildOptions().FindRepositoryMetadata()
674 },
675 timeout: s.timeout,
676 }
677
678 logIndexStatusUpdateError := func(err error) {
679 s.logger.Error("failed to update index status",
680 sglog.String("repo", args.Name),
681 sglog.Uint32("id", args.RepoID),
682 sglogBranches("branches", args.Branches),
683 sglog.Error(err),
684 )
685 }
686
687 err = gitIndex(ctx, c, args, s.Sourcegraph, s.logger)
688 if err != nil {
689 if statusErr := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph, err); statusErr != nil {
690 logIndexStatusUpdateError(statusErr)
691 }
692 return indexStateFail, err
693 }
694
695 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph, nil); err != nil {
696 logIndexStatusUpdateError(err)
697 }
698
699 return indexStateSuccess, nil
700}
701
702// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so
703// it can update the zoekt_repos table.
704func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph, indexErr error) error {
705 state := configv1.UpdateIndexStatusRequest_Repository_STATE_SUCCESS
706 var failureMessage string
707 var indexTimeUnix int64
708 if indexErr != nil {
709 state = configv1.UpdateIndexStatusRequest_Repository_STATE_FAILURE
710 failureMessage = indexErr.Error()
711
712 // On failure, metadata may not exist yet. Include index time if we can,
713 // but do not block reporting the failure status.
714 if _, metadata, ok, err := c.findRepositoryMetadata(args); err == nil && ok {
715 indexTimeUnix = metadata.IndexTime.Unix()
716 }
717 } else {
718 // We need to read from disk for IndexTime.
719 _, metadata, ok, err := c.findRepositoryMetadata(args)
720 if err != nil {
721 return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
722 }
723 if !ok {
724 return errors.New("failed to find metadata for new/updated index")
725 }
726 indexTimeUnix = metadata.IndexTime.Unix()
727 }
728
729 status := []indexStatus{{
730 RepoID: args.RepoID,
731 Branches: args.Branches,
732 IndexTimeUnix: indexTimeUnix,
733 State: state,
734 FailureMessage: failureMessage,
735 }}
736
737 if err := sg.UpdateIndexStatus(status); err != nil {
738 return fmt.Errorf("failed to update sourcegraph with status: %w", err)
739 }
740
741 return nil
742}
743
744func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
745 ss := make([]string, len(branches))
746 for i, b := range branches {
747 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
748 }
749 return sglog.Strings(key, ss)
750}
751
752func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
753 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0))
754 return &indexArgs{
755 IndexOptions: opts,
756 IndexDir: s.IndexDir,
757 Parallelism: parallelism,
758 Incremental: true,
759 ShardMerging: s.shardMerging,
760 }
761}
762
763// parallelism consults both the server flags and index options to determine the number
764// of shards to index in parallel. If the CPUCount index option is provided, it always
765// overrides the server flag.
766func (s *Server) parallelism(opts IndexOptions, maxProcs int) int {
767 var parallelism int
768 if opts.ShardConcurrency > 0 {
769 parallelism = int(opts.ShardConcurrency)
770 } else {
771 parallelism = s.CPUCount
772 }
773
774 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs
775 if parallelism > 4*maxProcs {
776 parallelism = 4 * maxProcs
777 }
778
779 // If index concurrency is set, then divide the parallelism by the number of
780 // repos we're indexing in parallel
781 if s.IndexConcurrency > 1 {
782 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency)))
783 }
784
785 return parallelism
786}
787
788func createEmptyShard(args *indexArgs) error {
789 bo := args.BuildOptions()
790 bo.SetDefaults()
791 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}
792
793 if args.Incremental && bo.IncrementalSkipIndexing() {
794 return nil
795 }
796
797 builder, err := index.NewBuilder(*bo)
798 if err != nil {
799 return err
800 }
801 return builder.Finish()
802}
803
804// addDebugHandlers adds handlers specific to indexserver.
805func (s *Server) addDebugHandlers(mux *http.ServeMux) {
806 // Sourcegraph's site admin view requires indexserver to serve it's admin view
807 // on "/".
808 mux.Handle("/", http.HandlerFunc(s.handleRoot))
809
810 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex))
811 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed))
812 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList))
813 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge))
814 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue))
815 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost))
816}
817
818func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) {
819 if r.Method != "GET" {
820 w.Header().Set("Allow", "GET")
821 w.WriteHeader(http.StatusMethodNotAllowed)
822 return
823 }
824
825 response := struct {
826 Hostname string
827 }{
828 Hostname: s.hostname,
829 }
830
831 b, err := json.Marshal(response)
832 if err != nil {
833 http.Error(w, err.Error(), http.StatusInternalServerError)
834 return
835 }
836
837 w.Header().Set("Content-Type", "application/json; charset=utf-8")
838 w.Write(b)
839}
840
// rootTmpl renders the indexserver admin page served on "/". The page shows
// links to the debug pages and the message of the last forced reindex. It can
// also show a table of repositories whose IDs link to a forced reindex.
//
// Data must provide IndexMsg (string) and Repos (a slice of values with ID and
// Name fields); the table is shown only when Repos is non-empty.
var rootTmpl = template.Must(template.New("name").Parse(`
<html>
  <body>
    <a href="debug">Debug</a><br />
    <a href="debug/requests">Traces</a><br />
    {{.IndexMsg}}<br />
    <br />
    <h3>Reindex</h3>
    {{if .Repos}}
    <a href="?show_repos=false">hide repos</a><br />
    <table style="margin-top: 20px">
      <tr>
        <th style="text-align:left">Name</th>
        <th style="text-align:left">ID (click to reindex)</th>
      </tr>
      {{range .Repos}}
      <tr>
        <td>{{.Name}}</td>
        <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></td>
      </tr>
      {{end}}
    </table>
    {{else}}
    <a href="?show_repos=true">show repos</a><br />
    {{end}}
  </body>
</html>
`))
867
868func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
869 if r.Method != "GET" {
870 w.Header().Set("Allow", "GET")
871 w.WriteHeader(http.StatusMethodNotAllowed)
872 return
873 }
874
875 values := r.URL.Query()
876
877 // ?id=
878 indexMsg := ""
879 if v := values.Get("id"); v != "" {
880 id, err := strconv.Atoi(v)
881 if err != nil {
882 http.Error(w, err.Error(), http.StatusBadRequest)
883 return
884 }
885 indexMsg, _ = s.forceIndex(r.Context(), uint32(id))
886 }
887
888 // ?show_repos=
889 showRepos := false
890 if v := values.Get("show_repos"); v != "" {
891 showRepos, _ = strconv.ParseBool(v)
892 }
893
894 type Repo struct {
895 ID uint32
896 Name string
897 }
898 var data struct {
899 Repos []Repo
900 IndexMsg string
901 }
902
903 data.IndexMsg = indexMsg
904
905 if showRepos {
906 s.queue.Iterate(func(opts *IndexOptions) {
907 data.Repos = append(data.Repos, Repo{
908 ID: opts.RepoID,
909 Name: opts.Name,
910 })
911 })
912 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name })
913 }
914
915 _ = rootTmpl.Execute(w, data)
916}
917
918// handleReindex triggers a reindex asynocronously. If a reindex was triggered
919// the request returns with status 202. The caller can infer the new state of
920// the index by calling List.
921func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) {
922 if r.Method != http.MethodPost {
923 w.Header().Set("Allow", http.MethodPost)
924 w.WriteHeader(http.StatusMethodNotAllowed)
925 return
926 }
927
928 err := r.ParseForm()
929 if err != nil {
930 http.Error(w, err.Error(), http.StatusBadRequest)
931 return
932 }
933
934 id, err := strconv.Atoi(r.Form.Get("repo"))
935 if err != nil {
936 http.Error(w, err.Error(), http.StatusBadRequest)
937 return
938 }
939
940 go func() { s.forceIndex(context.Background(), uint32(id)) }()
941
942 // 202 Accepted
943 w.WriteHeader(http.StatusAccepted)
944}
945
946func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
947 withIndexed := true
948 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil {
949 withIndexed = b
950 }
951
952 var indexed []uint32
953 if withIndexed {
954 indexed = listIndexed(s.IndexDir)
955 }
956
957 repos, err := s.Sourcegraph.List(r.Context(), indexed)
958 if err != nil {
959 http.Error(w, err.Error(), http.StatusInternalServerError)
960 return
961 }
962
963 bw := bytes.Buffer{}
964
965 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
966
967 _, err = fmt.Fprintf(tw, "ID\tName\n")
968 if err != nil {
969 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
970 return
971 }
972
973 s.queue.mu.Lock()
974 name := ""
975 for _, id := range repos.IDs {
976 if item := s.queue.get(id); item != nil {
977 name = item.opts.Name
978 } else {
979 name = ""
980 }
981 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
982 if err != nil {
983 debugLog.Printf("handleDebugList: %s\n", err.Error())
984 }
985 }
986 s.queue.mu.Unlock()
987
988 if err != nil {
989 http.Error(w, err.Error(), http.StatusInternalServerError)
990 return
991 }
992
993 err = tw.Flush()
994 if err != nil {
995 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
996 return
997 }
998
999 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
1000
1001 if _, err := io.Copy(w, &bw); err != nil {
1002 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
1003 return
1004 }
1005}
1006
1007// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
1008// can run this command during periods of low usage (evenings, weekends) to
1009// trigger an initial merge run. In the steady-state, merges happen rarely, even
1010// on busy instances, and users can rely on automatic merging instead.
1011func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {
1012 // A merge operation can take very long, depending on the number merges and the
1013 // target size of the compound shards. We run the merge in the background and
1014 // return immediately to the user.
1015 //
1016 // We track the status of the merge with metricShardMergingRunning.
1017 go func() {
1018 s.doMerge()
1019 }()
1020 _, _ = w.Write([]byte("merging enqueued\n"))
1021}
1022
1023func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {
1024 indexed := listIndexed(s.IndexDir)
1025
1026 bw := bytes.Buffer{}
1027
1028 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
1029
1030 _, err := fmt.Fprintf(tw, "ID\tName\n")
1031 if err != nil {
1032 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
1033 return
1034 }
1035
1036 s.queue.mu.Lock()
1037 name := ""
1038 for _, id := range indexed {
1039 if item := s.queue.get(id); item != nil {
1040 name = item.opts.Name
1041 } else {
1042 name = ""
1043 }
1044 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
1045 if err != nil {
1046 debugLog.Printf("handleDebugIndexed: %s\n", err.Error())
1047 }
1048 }
1049 s.queue.mu.Unlock()
1050
1051 if err != nil {
1052 http.Error(w, err.Error(), http.StatusInternalServerError)
1053 return
1054 }
1055
1056 err = tw.Flush()
1057 if err != nil {
1058 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
1059 return
1060 }
1061
1062 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
1063
1064 if _, err := io.Copy(w, &bw); err != nil {
1065 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
1066 return
1067 }
1068}
1069
1070// forceIndex will run the index job for repo name now. It will return always
1071// return a string explaining what it did, even if it failed.
1072func (s *Server) forceIndex(ctx context.Context, id uint32) (string, error) {
1073 var opts IndexOptions
1074 var err error
1075 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) {
1076 opts = o
1077 }, func(_ uint32, e error) {
1078 err = e
1079 }, id)
1080 if err != nil {
1081 return fmt.Sprintf("Indexing %d failed: %v", id, err), err
1082 }
1083
1084 args := s.indexArgs(opts)
1085 args.Incremental = false // force re-index
1086
1087 var state indexState
1088 ran := s.muIndexDir.With(opts.Name, func() {
1089 state, err = s.index(ctx, args)
1090 })
1091 if !ran {
1092 return fmt.Sprintf("index job for repository already running: %s", args), nil
1093 }
1094 if err != nil {
1095 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err
1096 }
1097 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil
1098}
1099
1100// DeleteAllData deletes all shards in the index and trash dir belonging to the
1101// tenant associated with the request. The deletion is best-effort, which means
1102// we will delete as much as possible. If no error is returned, the caller can
1103// be certain that all data has been deleted.
1104func (s *Server) DeleteAllData(ctx context.Context, _ *indexserverv1.DeleteAllDataRequest) (*indexserverv1.DeleteAllDataResponse, error) {
1105 tnt, err := tenant.FromContext(ctx)
1106 if err != nil {
1107 return nil, err
1108 }
1109 s.logger.Warn("DeleteAllData", sglog.Int("tenant_id", tnt.ID()))
1110
1111 var merr error
1112 s.muIndexDir.Global(func() {
1113 // First, explode all compound shards that have repos from the tenant in
1114 // question. Because we hold the global lock, we can be sure that no new
1115 // merges start while we do this.
1116 if err := s.explodeTenantCompoundShards(ctx, func(path string) error {
1117 // We call explode in a separate process to protect indexserver.
1118 cmd := defaultExplodeCmd(path)
1119
1120 stdoutBuf := &bytes.Buffer{}
1121 stderrBuf := &bytes.Buffer{}
1122 cmd.Stdout = stdoutBuf
1123 cmd.Stderr = stderrBuf
1124
1125 err := cmd.Run()
1126 if err != nil {
1127 errorLog.Printf("explode failed: %v (stderr: %s)", err, stderrBuf.String())
1128 return err
1129 }
1130
1131 infoLog.Printf("exploded shard: %s", stdoutBuf.String())
1132
1133 return nil
1134 }); err != nil {
1135 merr = multierr.Append(merr, err)
1136 }
1137
1138 // Invariant: all shards from the tenant are simple shards.
1139
1140 if err := purgeTenantShards(ctx, s.IndexDir); err != nil {
1141 merr = multierr.Append(merr, err)
1142 }
1143 if err := purgeTenantShards(ctx, filepath.Join(s.IndexDir, ".trash")); err != nil {
1144 merr = multierr.Append(merr, err)
1145 }
1146 })
1147
1148 return &indexserverv1.DeleteAllDataResponse{}, merr
1149}
1150
1151func listIndexed(indexDir string) []uint32 {
1152 index := getShards(indexDir)
1153 metricNumIndexed.Set(float64(len(index)))
1154 repoIDs := make([]uint32, 0, len(index))
1155 for id := range index {
1156 repoIDs = append(repoIDs, id)
1157 }
1158 slices.Sort(repoIDs)
1159 return repoIDs
1160}
1161
1162// setupTmpDir sets up a temporary directory on the same volume as the
1163// indexes.
1164//
1165// If main is true we will delete older temp directories left around. main is
1166// false when this is a debug command.
1167func setupTmpDir(logger sglog.Logger, main bool, index string) error {
1168 // change the target tmp directory depending on if it's our main daemon or a
1169 // debug sub command.
1170 dir := ".indexserver.debug.tmp"
1171 if main {
1172 dir = ".indexserver.tmp"
1173 }
1174
1175 tmpRoot := filepath.Join(index, dir)
1176
1177 if main {
1178 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot))
1179 err := os.RemoveAll(tmpRoot)
1180 if err != nil {
1181 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err))
1182 }
1183 }
1184
1185 if err := os.MkdirAll(tmpRoot, 0o755); err != nil {
1186 return err
1187 }
1188
1189 return os.Setenv("TMPDIR", tmpRoot)
1190}
1191
1192func printMetaData(fn string) error {
1193 repo, indexMeta, err := index.ReadMetadataPath(fn)
1194 if err != nil {
1195 return err
1196 }
1197
1198 err = json.NewEncoder(os.Stdout).Encode(indexMeta)
1199 if err != nil {
1200 return err
1201 }
1202
1203 err = json.NewEncoder(os.Stdout).Encode(repo)
1204 if err != nil {
1205 return err
1206 }
1207 return nil
1208}
1209
1210func printShardStats(fn string) error {
1211 f, err := os.Open(fn)
1212 if err != nil {
1213 return err
1214 }
1215
1216 iFile, err := index.NewIndexFile(f)
1217 if err != nil {
1218 return err
1219 }
1220
1221 return index.PrintNgramStats(iFile)
1222}
1223
1224func srcLogLevelIsDebug() bool {
1225 lvl := os.Getenv(sglog.EnvLogLevel)
1226 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug")
1227}
1228
// getEnvWithDefaultBool returns the boolean value of environment variable k,
// parsed with strconv.ParseBool, or defaultVal if k is unset or empty.
// An unparseable value terminates the process via log.Fatalf.
func getEnvWithDefaultBool(k string, defaultVal bool) bool {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		// The message previously said "to int64", which was misleading.
		log.Fatalf("error parsing ENV %s to bool: %s", k, err)
	}
	return b
}
1240
// getEnvWithDefaultInt64 returns environment variable k parsed as a base-10
// int64, or defaultVal when k is unset or empty. An unparseable value
// terminates the process via log.Fatalf.
func getEnvWithDefaultInt64(k string, defaultVal int64) int64 {
	if raw := os.Getenv(k); raw != "" {
		n, err := strconv.ParseInt(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to int64: %s", k, err)
		}
		return n
	}
	return defaultVal
}
1252
// getEnvWithDefaultInt returns environment variable k parsed with
// strconv.Atoi, or defaultVal when k is unset or empty. An unparseable value
// terminates the process via log.Fatalf.
func getEnvWithDefaultInt(k string, defaultVal int) int {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	// Parse the value, not the key. Parsing k (as before) made every
	// explicitly set variable fail and abort the process.
	i, err := strconv.Atoi(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to int: %s", k, err)
	}
	return i
}
1264
// getEnvWithDefaultUint64 returns environment variable k parsed as a base-10
// uint64, or defaultVal when k is unset or empty. An unparseable value
// terminates the process via log.Fatalf.
func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 {
	if raw := os.Getenv(k); raw != "" {
		n, err := strconv.ParseUint(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to uint64: %s", k, err)
		}
		return n
	}
	return defaultVal
}
1276
// getEnvWithDefaultFloat64 returns environment variable k parsed as a
// float64, or defaultVal when k is unset or empty. An unparseable value
// terminates the process via log.Fatalf.
func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 {
	if raw := os.Getenv(k); raw != "" {
		x, err := strconv.ParseFloat(raw, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to float64: %s", k, err)
		}
		return x
	}
	return defaultVal
}
1288
// getEnvWithDefaultString returns the value of environment variable k, or
// defaultVal when k is unset or set to the empty string.
func getEnvWithDefaultString(k string, defaultVal string) string {
	if v := os.Getenv(k); v != "" {
		return v
	}
	return defaultVal
}
1296
// getEnvWithDefaultDuration returns environment variable k parsed with
// time.ParseDuration (e.g. "90s", "2h"), or defaultVal when k is unset or
// empty. An unparseable value terminates the process via log.Fatalf.
func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration {
	if raw := os.Getenv(k); raw != "" {
		dur, err := time.ParseDuration(raw)
		if err != nil {
			log.Fatalf("error parsing ENV %s to duration: %s", k, err)
		}
		return dur
	}
	return defaultVal
}
1309
// getEnvWithDefaultEmptySet parses environment variable k as a
// comma-separated list and returns its distinct, whitespace-trimmed,
// non-empty entries as a set. An unset or empty variable yields an empty,
// non-nil set.
func getEnvWithDefaultEmptySet(k string) map[string]struct{} {
	isComma := func(r rune) bool { return r == ',' }

	out := make(map[string]struct{})
	for _, field := range strings.FieldsFunc(os.Getenv(k), isComma) {
		if entry := strings.TrimSpace(field); entry != "" {
			out[entry] = struct{}{}
		}
	}
	return out
}
1320
// joinStringSet joins the members of set with sep in ascending lexical order.
// Sorting makes the output (used in log lines) stable across calls and runs,
// unlike raw map iteration order. An empty or nil set yields "".
func joinStringSet(set map[string]struct{}, sep string) string {
	xs := make([]string, 0, len(set))
	for x := range set {
		xs = append(xs, x)
	}
	slices.Sort(xs)
	return strings.Join(xs, sep)
}
1329
1330func setShardsCounter(indexDir string) {
1331 fns, err := filepath.Glob(filepath.Join(indexDir, "*.zoekt"))
1332 if err != nil {
1333 errorLog.Printf("setShardsCounter: %s\n", err)
1334 return
1335 }
1336 metricNumberShards.Set(float64(len(fns)))
1337
1338 compoundFns := make([]string, 0, len(fns))
1339 for _, fn := range fns {
1340 if strings.HasPrefix(filepath.Base(fn), "compound-") {
1341 compoundFns = append(compoundFns, fn)
1342 }
1343 }
1344 metricNumberCompoundShards.Set(float64(len(compoundFns)))
1345}
1346
1347func rootCmd() *ffcli.Command {
1348 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError)
1349 conf := rootConfig{
1350 Main: true,
1351 }
1352 conf.registerRootFlags(rootFs)
1353
1354 return &ffcli.Command{
1355 FlagSet: rootFs,
1356 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]",
1357 Subcommands: []*ffcli.Command{debugCmd()},
1358 Exec: func(ctx context.Context, args []string) error {
1359 return startServer(conf)
1360 },
1361 }
1362}
1363
// rootConfig holds the settings parsed from command-line flags (and, for some
// flags, environment-variable defaults) by registerRootFlags.
type rootConfig struct {
	// Main is true if this rootConfig is for our main long running command (the
	// indexserver). Debug commands should not set this value. It is passed to
	// setupTmpDir, which only deletes leftover temporary directories when Main
	// is true.
	Main bool

	// root is the Sourcegraph URL, or a local directory path in which case the
	// Sourcegraph API is faked (see the "sourcegraph_url" flag).
	root string
	// interval is how often to sync with Sourcegraph.
	interval time.Duration
	// index is the directory holding the index shards.
	index string
	// indexConcurrency is the requested number of repos to index concurrently.
	// newServer clamps it to [1, number of usable CPUs].
	indexConcurrency int64
	// listen is the HTTP/gRPC listen address; empty disables the listener.
	listen string
	// hostname is the name advertised to Sourcegraph when asking for the list
	// of repositories to index.
	hostname string
	// cpuFraction is the fraction of cores used for indexing, in (0, 1].
	cpuFraction float64

	// config values related to shard merging
	disableShardMerging bool
	vacuumInterval      time.Duration
	mergeInterval       time.Duration
	// targetSize and minSize are in MiB; newServer converts them to bytes.
	targetSize int64
	minSize    int64
	// minAgeDays excludes shards whose last commit is newer than this many days
	// from merging.
	minAgeDays int

	// config values related to backoff indexing repos with one or more consecutive failures
	backoffDuration    time.Duration
	maxBackoffDuration time.Duration
}
1390
// registerRootFlags binds the indexserver's command-line flags to rc's fields.
//
// Several defaults come from environment variables, read at registration time
// through the getEnvWithDefault* helpers. Those helpers call log.Fatalf when a
// variable is set but cannot be parsed.
func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
	fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
	fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
	fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently")
	fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", index.DefaultDir), "set index directory to use")
	fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
	// NOTE(review): the NODE_NAME behaviour presumably lives in
	// index.HostnameBestEffort; it is not visible here — confirm.
	fs.StringVar(&rc.hostname, "hostname", index.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
	fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
	fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
	fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")

	// flags related to shard merging
	//
	// NOTE(review): despite its name, the boolean "shard_merging" flag sets
	// disableShardMerging, so passing -shard_merging turns merging OFF (as its
	// usage text says). The name is kept for compatibility with existing
	// deployments.
	fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging")
	fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
	fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
	fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 1000), "the target size of compound shards in MiB")
	fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 800), "the minimum size of a compound shard in MiB")
	fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
}
1410
1411func startServer(conf rootConfig) error {
1412 s, err := newServer(conf)
1413 if err != nil {
1414 return err
1415 }
1416
1417 profiler.Init("zoekt-sourcegraph-indexserver")
1418 setShardsCounter(s.IndexDir)
1419
1420 if conf.listen != "" {
1421
1422 mux := http.NewServeMux()
1423 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{
1424 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"},
1425 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"},
1426 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"},
1427 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"},
1428 }...)
1429 s.addDebugHandlers(mux)
1430
1431 go func() {
1432 debugLog.Printf("serving HTTP on %s", conf.listen)
1433 mux := grpcutil.MultiplexGRPC(newGRPCServer(sglog.Scoped("indexserver"), s), mux)
1434 log.Fatal(http.ListenAndServe(conf.listen, mux))
1435 }()
1436
1437 // Serve mux on a unix domain socket on a best-effort-basis so that
1438 // webserver can call the endpoints via the shared filesystem.
1439 //
1440 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen
1441 // on the socket due to permission errors. See
1442 // https://github.com/docker/for-mac/issues/6239
1443 go func() {
1444 serveHTTPOverSocket := func() error {
1445 socket := filepath.Join(s.IndexDir, "indexserver.sock")
1446 // We cannot bind a socket to an existing pathname.
1447 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) {
1448 return fmt.Errorf("error removing socket file: %s", socket)
1449 }
1450 // The "unix" network corresponds to stream sockets. (cf. unixgram,
1451 // unixpacket).
1452 l, err := net.Listen("unix", socket)
1453 if err != nil {
1454 return fmt.Errorf("failed to listen on socket %s: %w", socket, err)
1455 }
1456 // Indexserver (root) and webserver (Sourcegraph) run with
1457 // different users. Per default, the socket is created with
1458 // permission 755 (root root), which doesn't let webserver write to
1459 // it.
1460 //
1461 // See https://github.com/golang/go/issues/11822 for more context.
1462 if err := os.Chmod(socket, 0o777); err != nil {
1463 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err)
1464 }
1465 debugLog.Printf("serving HTTP on %s", socket)
1466 return http.Serve(l, mux)
1467 }
1468 debugLog.Print(serveHTTPOverSocket())
1469 }()
1470 }
1471
1472 oc := &ownerChecker{
1473 Path: filepath.Join(conf.index, "owner.txt"),
1474 Hostname: conf.hostname,
1475 }
1476 go oc.Run()
1477
1478 logger := sglog.Scoped("metricsRegistration")
1479 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"}
1480
1481 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index})
1482 prometheus.DefaultRegisterer.MustRegister(c)
1483
1484 s.Run()
1485 return nil
1486}
1487
1488func newServer(conf rootConfig) (*Server, error) {
1489 logger := sglog.Scoped("server")
1490
1491 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 {
1492 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0")
1493 }
1494 if conf.index == "" {
1495 return nil, fmt.Errorf("must set -index")
1496 }
1497 if conf.root == "" {
1498 return nil, fmt.Errorf("must set -sourcegraph_url")
1499 }
1500 rootURL, err := url.Parse(conf.root)
1501 if err != nil {
1502 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err)
1503 }
1504
1505 rootURL = addDefaultPort(rootURL)
1506
1507 // Tune GOMAXPROCS to match Linux container CPU quota.
1508 _, _ = maxprocs.Set()
1509
1510 // Automatically prepend our own path at the front, to minimize
1511 // required configuration.
1512 if l, err := os.Readlink("/proc/self/exe"); err == nil {
1513 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
1514 }
1515
1516 if _, err := os.Stat(conf.index); err != nil {
1517 if err := os.MkdirAll(conf.index, 0o755); err != nil {
1518 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err)
1519 }
1520 }
1521
1522 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil {
1523 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err)
1524 }
1525
1526 if srcLogLevelIsDebug() {
1527 debugLog.SetOutput(os.Stderr)
1528 }
1529
1530 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST")
1531 if len(reposWithSeparateIndexingMetrics) > 0 {
1532 debugLog.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", "))
1533 }
1534
1535 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST")
1536 if len(deltaBuildRepositoriesAllowList) > 0 {
1537 debugLog.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", "))
1538 }
1539
1540 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150)
1541 if deltaShardNumberFallbackThreshold > 0 {
1542 debugLog.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold)
1543 } else {
1544 debugLog.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards")
1545 }
1546
1547 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST")
1548 if len(reposShouldSkipSymbolsCalculation) > 0 {
1549 debugLog.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", "))
1550 }
1551
1552 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout)
1553 if indexingTimeout != defaultIndexingTimeout {
1554 debugLog.Printf("using configured indexing timeout: %s", indexingTimeout)
1555 }
1556
1557 var sg Sourcegraph
1558 if rootURL.IsAbs() {
1559 var batchSize int
1560 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" {
1561 batchSize, err = strconv.Atoi(v)
1562 if err != nil {
1563 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int")
1564 }
1565 }
1566
1567 opts := []SourcegraphClientOption{
1568 WithBatchSize(batchSize),
1569 }
1570
1571 logger := sglog.Scoped("zoektConfigurationGRPCClient")
1572 client, err := dialGRPCClient(rootURL.Host, logger)
1573 if err != nil {
1574 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err)
1575 }
1576
1577 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...)
1578
1579 } else {
1580 sg = sourcegraphFake{
1581 RootDir: rootURL.String(),
1582 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags),
1583 }
1584 }
1585
1586 cpuCount := max(int(math.Round(float64(runtime.GOMAXPROCS(0))*(conf.cpuFraction))), 1)
1587
1588 if conf.indexConcurrency < 1 {
1589 conf.indexConcurrency = 1
1590 } else if conf.indexConcurrency > int64(cpuCount) {
1591 conf.indexConcurrency = int64(cpuCount)
1592 }
1593
1594 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)
1595
1596 return &Server{
1597 logger: logger,
1598 rootURL: rootURL,
1599 Sourcegraph: sg,
1600 IndexDir: conf.index,
1601 IndexConcurrency: int(conf.indexConcurrency),
1602 Interval: conf.interval,
1603 CPUCount: cpuCount,
1604 queue: *q,
1605 shardMerging: !conf.disableShardMerging,
1606 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList,
1607 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold,
1608 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
1609 hostname: conf.hostname,
1610 mergeOpts: mergeOpts{
1611 vacuumInterval: conf.vacuumInterval,
1612 mergeInterval: conf.mergeInterval,
1613 targetSizeBytes: conf.targetSize * 1024 * 1024,
1614 minSizeBytes: conf.minSize * 1024 * 1024,
1615 minAgeDays: conf.minAgeDays,
1616 },
1617 timeout: indexingTimeout,
1618 indexSemaphore: make(chan struct{}, int(conf.indexConcurrency)),
1619 }, err
1620}
1621
1622func newGRPCServer(logger sglog.Logger, s *Server, additionalOpts ...grpc.ServerOption) *grpc.Server {
1623 grpcServer := defaults.NewServer(logger, additionalOpts...)
1624 indexserverv1.RegisterSourcegraphIndexserverServiceServer(grpcServer, s)
1625 return grpcServer
1626}
1627
// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration
// for the indexed-search-configuration gRPC service. It is embedded at build
// time from default_grpc_service_configuration.json and passed to
// grpc.WithDefaultServiceConfig in dialGRPCClient.
//
// The default backoff strategy is modeled after the default settings used by
// retryablehttp.DefaultClient.
//
// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html):
//   - Unavailable
//   - Aborted
//
// NOTE(review): the retry policy itself lives in the embedded JSON file, not
// in Go code; keep the list above in sync with that file.
//
//go:embed default_grpc_service_configuration.json
var defaultGRPCServiceConfigurationJSON string
1640
1641func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor {
1642 return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1643 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1644 return invoker(ctx, method, req, reply, cc, opts...)
1645 }
1646}
1647
1648func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
1649 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
1650 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1651 return streamer(ctx, desc, cc, method, opts...)
1652 }
1653}
1654
// defaultGRPCMessageReceiveSizeBytes is the default maximum size, in bytes, of
// a gRPC message that may be received. In this file it is applied as the
// client's MaxCallRecvMsgSize in dialGRPCClient. It can be overridden by
// providing custom Server/Dial options.
const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MiB
1658
// dialGRPCClient creates a ZoektConfigurationService client for the
// Sourcegraph instance at addr, using an insecure (plaintext) transport.
//
// Every call passes through, in order: client metrics, message-size
// accounting, the internal-actor header, error logging, error metrics and
// retries (Unavailable only, up to 5 attempts). additionalOpts are applied
// after the defaults. Message-size options from the environment are applied
// last so they take precedence.
//
// NOTE(review): the returned client's underlying *grpc.ClientConn is never
// closed here; presumably it is meant to live for the whole process.
// grpc.Dial is deprecated in newer grpc-go releases in favour of
// grpc.NewClient. That switch changes the default name resolver, so verify
// before migrating.
func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (configv1.ZoektConfigurationServiceClient, error) {
	metrics := clientMetricsOnce()

	// If the service seems to be unavailable, this
	// will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1
	// Ex: (on the first retry attempt, we will wait between [.9s and 1.1s])
	retryOpts := []retry.CallOption{
		retry.WithMax(5),
		retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)),
		retry.WithCodes(codes.Unavailable),
	}

	// Interceptor order matters: earlier entries wrap later ones, so the
	// retry interceptor sits innermost and each retry attempt goes through
	// it again.
	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithChainStreamInterceptor(
			metrics.StreamClientInterceptor(),
			messagesize.StreamClientInterceptor,
			internalActorStreamInterceptor(),
			internalerrs.LoggingStreamClientInterceptor(logger),
			internalerrs.PrometheusStreamClientInterceptor,
			retry.StreamClientInterceptor(retryOpts...),
		),
		grpc.WithChainUnaryInterceptor(
			metrics.UnaryClientInterceptor(),
			messagesize.UnaryClientInterceptor,
			internalActorUnaryInterceptor(),
			internalerrs.LoggingUnaryClientInterceptor(logger),
			internalerrs.PrometheusUnaryClientInterceptor,
			retry.UnaryClientInterceptor(retryOpts...),
		),
		grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)),
	}

	opts = append(opts, additionalOpts...)

	// Ensure that the message size options are set last, so they override any other
	// client-specific options that tweak the message size.
	//
	// The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
	// take precedence over everything else with a uniform size setting that's easy to reason about.
	opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...)

	// This dialer is used to connect via gRPC to the Sourcegraph instance.
	// This is done lazily, so we can provide the client to use regardless of
	// whether we enabled gRPC or not initially.
	cc, err := grpc.Dial(addr, opts...)
	if err != nil {
		return nil, fmt.Errorf("dialing %q: %w", addr, err)
	}

	client := configv1.NewZoektConfigurationServiceClient(cc)
	return client, nil
}
1713
// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
// If the scheme is "https", "443" is used. URLs with any other scheme, URLs
// that already carry a port, and relative (non-absolute) URLs are returned
// unchanged. A nil URL yields nil.
//
// The original URL is not mutated. When a port is added, a modified copy is
// returned.
func addDefaultPort(original *url.URL) *url.URL {
	if original == nil {
		return nil // don't panic
	}

	// Don't do anything if the URL doesn't look like a remote URL or already
	// names a port.
	if !original.IsAbs() || original.Port() != "" {
		return original
	}

	var port string
	switch original.Scheme {
	case "http":
		port = "80"
	case "https":
		port = "443"
	default:
		return original
	}

	u := cloneURL(original)
	// Join with Hostname(), not Host. Host keeps the brackets around IPv6
	// literals (and a trailing ":" when the port is empty), which
	// JoinHostPort would bracket again, producing e.g. "[[::1]]:80".
	u.Host = net.JoinHostPort(u.Hostname(), port)
	return u
}

// cloneURL returns a copy of the URL. It is safe to mutate the returned URL.
// The Userinfo, if any, is copied as well, so the copy shares no pointers with
// u. A nil URL yields nil. This is copied from net/http/clone.go
func cloneURL(u *url.URL) *url.URL {
	if u == nil {
		return nil
	}
	u2 := new(url.URL)
	*u2 = *u
	if u.User != nil {
		u2.User = new(url.Userinfo)
		*u2.User = *u.User
	}
	return u2
}
1758
1759func main() {
1760 liblog := sglog.Init(sglog.Resource{
1761 Name: "zoekt-indexserver",
1762 Version: index.Version,
1763 InstanceID: index.HostnameBestEffort(),
1764 })
1765 defer liblog.Sync()
1766
1767 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil {
1768 log.Fatal(err)
1769 }
1770}
1771
// getBoolFromEnvironmentVariables returns the boolean held by the first
// variable in envVarNames that is set to a non-empty value in the process
// environment, or defaultBool if none is.
//
// Only that first non-empty variable is parsed (with strconv.ParseBool). If it
// does not parse as a boolean, an error naming the variable is returned and
// later variables are not consulted.
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	for _, name := range envVarNames {
		if raw := os.Getenv(name); raw != "" {
			parsed, err := strconv.ParseBool(raw)
			if err != nil {
				return false, fmt.Errorf("parsing environment variable %q to boolean: %v", name, err)
			}
			return parsed, nil
		}
	}
	return defaultBool, nil
}