fork of https://github.com/sourcegraph/zoekt
1// Licensed under the Apache License, Version 2.0 (the "License");
2// you may not use this file except in compliance with the License.
3// You may obtain a copy of the License at
4//
5// http://www.apache.org/licenses/LICENSE-2.0
6//
7// Unless required by applicable law or agreed to in writing, software
8// distributed under the License is distributed on an "AS IS" BASIS,
9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10// See the License for the specific language governing permissions and
11// limitations under the License.
12
13// Command zoekt-sourcegraph-indexserver periodically reindexes repositories
14// from a Sourcegraph instance. It uses a "pull-based" design, where it periodically
15// reaches out to the Sourcegraph instance for the list of repositories to reindex.
16package main
17
18import (
19 "bytes"
20 "context"
21 _ "embed"
22 "encoding/json"
23 "errors"
24 "flag"
25 "fmt"
26 "html/template"
27 "io"
28 "io/fs"
29 "log"
30 "math"
31 "math/rand"
32 "net"
33 "net/http"
34 "net/url"
35 "os"
36 "os/exec"
37 "os/signal"
38 "path/filepath"
39 "runtime"
40 "sort"
41 "strconv"
42 "strings"
43 "sync"
44 "text/tabwriter"
45 "time"
46
47 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
48 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
49 "github.com/peterbourgon/ff/v3/ffcli"
50 "github.com/prometheus/client_golang/prometheus"
51 "github.com/prometheus/client_golang/prometheus/promauto"
52 sglog "github.com/sourcegraph/log"
53 "github.com/sourcegraph/mountinfo"
54
55 "github.com/sourcegraph/zoekt"
56 configv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/sourcegraph/zoekt/configuration/v1"
57 indexserverv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/zoekt/indexserver/v1"
58 "github.com/sourcegraph/zoekt/grpc/defaults"
59 "github.com/sourcegraph/zoekt/grpc/grpcutil"
60 "github.com/sourcegraph/zoekt/grpc/internalerrs"
61 "github.com/sourcegraph/zoekt/grpc/messagesize"
62 "github.com/sourcegraph/zoekt/index"
63 "github.com/sourcegraph/zoekt/internal/debugserver"
64 "github.com/sourcegraph/zoekt/internal/profiler"
65 "github.com/sourcegraph/zoekt/internal/tenant"
66
67 "go.uber.org/automaxprocs/maxprocs"
68 "go.uber.org/multierr"
69 "golang.org/x/net/trace"
70 "golang.org/x/sys/unix"
71 "google.golang.org/grpc"
72 "google.golang.org/grpc/codes"
73 "google.golang.org/grpc/credentials/insecure"
74 "google.golang.org/grpc/metadata"
75)
76
var (
	// metricResolveRevisionsDuration records how long resolving the revisions
	// of all repositories takes.
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 100000s (~28h): 1, 10, 100, 1e3, 1e4, 1e5
	})

	// metricResolveRevisionDuration records how long resolving a single
	// repository revision takes, labeled by outcome.
	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	// metricGetIndexOptions and metricGetIndexOptionsError count attempts and
	// failures to fetch index options for a repository.
	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	// metricIndexDuration is observed by processQueue for every index job. The
	// clock starts once the repository's index-dir lock is held.
	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	// metricIndexingDelay is observed by processQueue with the time elapsed
	// since the job's item was added to the queue.
	metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_indexing_delay_seconds",
		Help:    "A histogram of durations from when an index job is added to the queue, to the time it completes.",
		Buckets: prometheus.ExponentialBuckets(60, 2, 14), // 1 minute -> 491520s (~5.7 days)
	}, []string{
		"state", // state is an indexState
		"name",  // the name of the repository that was indexed
	})

	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	// metricIndexIncrementalIndexState is incremented by Index with the
	// on-disk state found before an incremental build.
	// NOTE(review): the Help text still points at zoekt/build.IndexState; the
	// type now lives in package index — consider updating the text.
	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is index.IndexState

	// NOTE(review): Help says "by code host", but this gauge has no labels —
	// the description looks stale; confirm.
	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	// metricFailingTotal is incremented by Index whenever it returns an error.
	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	// metricIndexingTotal is incremented by Index right before a full build.
	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	// metricNumStoppedTrackingTotal is increased by Run by the number of
	// repositories removed from the queue on each sync.
	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})

	// clientMetricsOnce returns a singleton instance of the client metrics
	// that are shared across all gRPC clients that this process creates.
	//
	// This function panics if the metrics cannot be registered with the default
	// Prometheus registry.
	clientMetricsOnce = sync.OnceValue(func() *grpcprom.ClientMetrics {
		clientMetrics := grpcprom.NewClientMetrics(
			grpcprom.WithClientCounterOptions(),
			grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
			grpcprom.WithClientStreamRecvHistogram(),   // record how long it takes for a client to receive a message during a streaming RPC
			grpcprom.WithClientStreamSendHistogram(),   // record how long it takes for a client to send a message during a streaming RPC
		)
		prometheus.DefaultRegisterer.MustRegister(clientMetrics)
		return clientMetrics
	})
)
170
// MaxFileSize is the per-file size limit (1 MB) that indexArgs passes to every
// index job as FileLimit.
//
// 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
// NOTE: if you change this, you must also update gitIndex to use the same value when fetching the repo.
const MaxFileSize = 1 << 20

// reposWithSeparateIndexingMetrics is the set of repository names that keep
// their own "name" label value on the per-repository indexing metrics. Every
// other repository is reported under the empty name (see repoNameForMetric).
// NOTE(review): nothing in this part of the file fills it in — presumably it
// is populated from configuration at startup; confirm.
var reposWithSeparateIndexingMetrics = make(map[string]struct{})

// indexState is the outcome of one index job. It is used as the "state" label
// of metricIndexDuration and metricIndexingDelay and is handed to the queue
// via SetIndexed.
type indexState string

const (
	indexStateFail        indexState = "fail"         // Index returned an error
	indexStateSuccess     indexState = "success"      // a full build completed
	indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
	indexStateNoop        indexState = "noop"         // We didn't need to update index
	indexStateEmpty       indexState = "empty"        // index is empty (empty repo)
)
187
// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
//
// Beyond the indexing loop started by Run, it serves the debug HTTP handlers
// registered by addDebugHandlers. It also serves the indexserverv1
// SourcegraphIndexserverService gRPC API (e.g. DeleteAllData).
type Server struct {
	// logger is the structured logger used for per-job and command logging.
	logger sglog.Logger

	// Sourcegraph is the client used to list repositories, fetch their index
	// options and report index status back to Sourcegraph.
	Sourcegraph Sourcegraph
	// BatchSize is not read in this part of the file.
	// NOTE(review): presumably the size of batches of repository IDs sent to
	// Sourcegraph (cf. batched) — confirm.
	BatchSize int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once. Run
	// starts this many processQueue workers, and parallelism divides each
	// job's shard parallelism by it.
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph. Run jitters each wait to
	// somewhere in [Interval/2, 3*Interval/2).
	Interval time.Duration

	// CPUCount is the number of CPUs to use for indexing shards. It is the
	// shard parallelism used when a job's index options do not set
	// ShardConcurrency.
	CPUCount int

	// queue holds the repositories waiting to be indexed, together with the
	// last IndexOptions stored for each.
	queue Queue

	// muIndexDir protects the index directory from concurrent access. Index
	// jobs lock a single repository (With); cleanup and DeleteAllData take the
	// global lock (Global).
	muIndexDir indexMutex

	// If true, shard merging is enabled. Run then periodically vacuums and
	// merges shards. The flag is also passed to cleanup and to every index job.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is reported by the /debug/host handler.
	hostname string

	// mergeOpts configures shard merging. Run reads vacuumInterval and
	// mergeInterval from it.
	mergeOpts mergeOpts

	// timeout defines how long the index server waits before killing an indexing job.
	// It is handed to gitIndex through gitIndexConfig.
	timeout time.Duration

	// Embedded per the grpc-go convention so that service methods Server does
	// not implement still satisfy the interface (answering Unimplemented).
	indexserverv1.UnimplementedSourcegraphIndexserverServiceServer
}
237
var (
	// debugLog discards everything by default (io.Discard).
	// NOTE(review): presumably its writer is replaced when debug logging is
	// enabled — confirm.
	debugLog = log.New(io.Discard, "[DEBUG] ", log.LstdFlags)
	// infoLog and errorLog write date/time-stamped, prefixed lines to stderr.
	infoLog  = log.New(os.Stderr, "[INFO] ", log.LstdFlags)
	errorLog = log.New(os.Stderr, "[ERROR] ", log.LstdFlags)
)

// noOutputTimeout is how long loggedRun lets a command run without producing
// any new output before it sends SIGQUIT, followed by SIGKILL 10 seconds later.
// Our index commands should output something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
const noOutputTimeout = 30 * time.Minute
250
251func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) {
252 out := &synchronizedBuffer{}
253 cmd.Stdout = out
254 cmd.Stderr = out
255
256 tr.LazyPrintf("%s", cmd.Args)
257
258 defer func() {
259 if err != nil {
260 outS := out.String()
261 tr.LazyPrintf("failed: %v", err)
262 tr.LazyPrintf("output: %s", out)
263 tr.SetError()
264 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS)
265 }
266 }()
267
268 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args))
269
270 if err := cmd.Start(); err != nil {
271 return err
272 }
273
274 errC := make(chan error)
275 go func() {
276 errC <- cmd.Wait()
277 }()
278
279 // This channel is set after we have sent sigquit. It allows us to follow up
280 // with a sigkill if the process doesn't quit after sigquit.
281 kill := make(<-chan time.Time)
282
283 lastLen := 0
284 for {
285 select {
286 case <-time.After(noOutputTimeout):
287 // Periodically check if we have had output. If not kill the process.
288 if out.Len() != lastLen {
289 lastLen = out.Len()
290 infoLog.Printf("still running %s", cmd.Args)
291 } else {
292 // Send quit (C-\) first so we get a stack dump.
293 infoLog.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args)
294 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil {
295 errorLog.Println("quit failed:", err)
296 }
297
298 // send sigkill if still running in 10s
299 kill = time.After(10 * time.Second)
300 }
301
302 case <-kill:
303 infoLog.Printf("still running, killing %s", cmd.Args)
304 if err := cmd.Process.Kill(); err != nil {
305 errorLog.Println("kill failed:", err)
306 }
307
308 case err := <-errC:
309 if err != nil {
310 return err
311 }
312
313 tr.LazyPrintf("success")
314 return nil
315 }
316 }
317}
318
// synchronizedBuffer is a bytes.Buffer guarded by a mutex. loggedRun uses one
// as both Stdout and Stderr of a subprocess and polls Len from its watchdog
// loop while the process may still be writing, so every access takes the
// lock. The zero value is an empty buffer ready to use. It contains a mutex, so
// it must not be copied after first use.
type synchronizedBuffer struct {
	mu sync.Mutex
	b  bytes.Buffer
}

// Compile-time check that synchronizedBuffer can serve as exec.Cmd output.
var _ io.Writer = (*synchronizedBuffer)(nil)

// Write appends p to the buffer. It implements io.Writer.
func (sb *synchronizedBuffer) Write(p []byte) (int, error) {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	return sb.b.Write(p)
}

// Len returns the number of bytes written so far.
func (sb *synchronizedBuffer) Len() int {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	return sb.b.Len()
}

// String returns a copy of everything written so far.
func (sb *synchronizedBuffer) String() string {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	return sb.b.String()
}
343
// pauseFileName is the name of a file that, if present in IndexDir, pauses the
// indexserver:
//   - the sync loop in Run skips its iterations and logs the file's trimmed
//     contents;
//   - processQueue workers stop popping jobs and re-check once a second.
//
// This is to make it possible to experiment with the content of the IndexDir
// without the indexserver writing to it.
const pauseFileName = "PAUSE"
348
349// Run the sync loop. This blocks forever.
350func (s *Server) Run() {
351 removeIncompleteShards(s.IndexDir)
352
353 // Start a goroutine which updates the queue with commits to index.
354 go func() {
355 // We update the list of indexed repos every Interval. To speed up manual
356 // testing we also listen for SIGUSR1 to trigger updates.
357 //
358 // "pkill -SIGUSR1 zoekt-sourcegra"
359 for range jitterTicker(s.Interval, unix.SIGUSR1) {
360 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
361 infoLog.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b)))
362 continue
363 }
364
365 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir))
366 if err != nil {
367 errorLog.Printf("error listing repos: %s", err)
368 continue
369 }
370
371 debugLog.Printf("updating index queue with %d repositories", len(repos.IDs))
372
373 // Stop indexing repos we don't need to track anymore
374 removed := s.queue.MaybeRemoveMissing(repos.IDs)
375 metricNumStoppedTrackingTotal.Add(float64(len(removed)))
376 if len(removed) > 0 {
377 infoLog.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5))
378 }
379
380 cleanupDone := make(chan struct{})
381 go func() {
382 defer close(cleanupDone)
383 s.muIndexDir.Global(func() {
384 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging)
385 })
386 }()
387
388 repos.IterateIndexOptions(s.queue.AddOrUpdate)
389
390 // IterateIndexOptions will only iterate over repositories that have
391 // changed since we last called list. However, we want to add all IDs
392 // back onto the queue just to check that what is on disk is still
393 // correct. This will use the last IndexOptions we stored in the
394 // queue. The repositories not on the queue (missing) need a forced
395 // fetch of IndexOptions.
396 missing := s.queue.Bump(repos.IDs)
397 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...)
398
399 setShardsCounter(s.IndexDir)
400
401 <-cleanupDone
402 }
403 }()
404
405 go func() {
406 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) {
407 if s.shardMerging {
408 s.vacuum()
409 }
410 }
411 }()
412
413 go func() {
414 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) {
415 if s.shardMerging {
416 s.doMerge()
417 }
418 }
419 }()
420
421 for i := 0; i < s.IndexConcurrency; i++ {
422 go s.processQueue()
423 }
424
425 // block forever
426 select {}
427}
428
// formatListUint32 renders the first min(len(v), m) values of v as a
// comma-separated list, e.g. "1, 2, 3". If some values were left out, it
// appends ", ..." (just "..." when none were printed). It is used to keep log
// lines about many repository IDs short.
func formatListUint32(v []uint32, m int) string {
	shown := m
	if shown > len(v) {
		shown = len(v)
	}

	var parts []string
	for i := 0; i < shown; i++ {
		parts = append(parts, fmt.Sprintf("%d", v[i]))
	}
	if len(v) > shown {
		parts = append(parts, "...")
	}

	return strings.Join(parts, ", ")
}
446
447func (s *Server) processQueue() {
448 for {
449 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
450 time.Sleep(time.Second)
451 continue
452 }
453
454 item, ok := s.queue.Pop()
455 if !ok {
456 time.Sleep(time.Second)
457 continue
458 }
459
460 opts := item.Opts
461 args := s.indexArgs(opts)
462
463 ran := s.muIndexDir.With(opts.Name, func() {
464 // only record time taken once we hold the lock. This avoids us
465 // recording time taken while merging/cleanup runs.
466 start := time.Now()
467
468 state, err := s.Index(args)
469
470 elapsed := time.Since(start)
471 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds())
472
473 indexDelay := time.Since(item.DateAddedToQueue)
474 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds())
475
476 if err != nil {
477 errorLog.Printf("error indexing %s: %s", args.String(), err)
478 }
479
480 switch state {
481 case indexStateSuccess:
482 var branches []string
483 for _, b := range args.Branches {
484 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
485 }
486 s.logger.Info("updated index",
487 sglog.Int("tenant", args.TenantID),
488 sglog.String("repo", args.Name),
489 sglog.Uint32("id", args.RepoID),
490 sglog.Strings("branches", branches),
491 sglog.Duration("duration", elapsed),
492 sglog.Duration("index_delay", indexDelay),
493 )
494 case indexStateSuccessMeta:
495 infoLog.Printf("updated meta %s in %v", args.String(), elapsed)
496 }
497 s.queue.SetIndexed(opts, state)
498 })
499
500 if !ran {
501 // Someone else is processing the repository. We can just skip this job
502 // since the repository will be added back to the queue and we will
503 // converge to the correct behaviour.
504 debugLog.Printf("index job for repository already running: %s", args)
505 continue
506 }
507 }
508}
509
510// repoNameForMetric returns a normalized version of the given repository name that is
511// suitable for use with Prometheus metrics.
512func repoNameForMetric(repo string) string {
513 // Check to see if we want to be able to capture separate indexing metrics for this repository.
514 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable.
515 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok {
516 return repo
517 }
518
519 return ""
520}
521
// batched splits slice into consecutive chunks of at most size elements. It
// sends them, in order, on the returned channel, which is closed once every
// element has been sent. An empty slice yields no chunks.
//
// Each chunk aliases slice's backing array, but its capacity is clipped to its
// length. Appending to one chunk therefore never overwrites the next one.
//
// A non-positive size yields the whole slice as a single chunk. The naive loop
// would instead spin forever sending empty chunks (size == 0) or panic
// (size < 0).
//
// The caller must drain the channel. The sending goroutine blocks until every
// chunk has been received.
func batched(slice []uint32, size int) <-chan []uint32 {
	c := make(chan []uint32)
	if size <= 0 {
		size = len(slice)
	}
	go func() {
		defer close(c)
		for len(slice) > 0 {
			n := size
			if n > len(slice) {
				n = len(slice)
			}
			c <- slice[:n:n]
			slice = slice[n:]
		}
	}()
	return c
}
536
// jitterTicker returns a channel that receives a value right away and then
// again after each pause. Each pause is drawn uniformly from [d/2, 3d/2), so
// ticks jitter around d.
//
// Receiving any of the signals in sig also fires the ticker, which allows
// manual triggering. At most one pending signal is buffered, so a burst of
// signals collapses into one extra tick.
//
// NOTE: d must be positive. rand.Int63n panics on a non-positive argument, and
// this happens inside the ticker goroutine right after the first tick.
func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} {
	out := make(chan struct{})

	go func() {
		half := int64(d) / 2
		for {
			out <- struct{}{}
			pause := half + rand.Int63n(int64(d))
			time.Sleep(time.Duration(pause))
		}
	}()

	go func() {
		if len(sig) > 0 {
			sigC := make(chan os.Signal, 1)
			signal.Notify(sigC, sig...)
			for range sigC {
				out <- struct{}{}
			}
		}
	}()

	return out
}
568
// Index builds or refreshes the on-disk index for the repository described by
// args and reports what it did as an indexState. Steps, in order:
//   - Fail immediately if args has no tenant ID (TenantID < 1).
//   - With no branches, create an empty shard (indexStateEmpty).
//   - Apply the server's delta-build and skip-symbols allowlists to args.
//     Note that this mutates args.
//   - If args.Incremental is set, compare against what is on disk:
//     return indexStateNoop if it is up to date; try a metadata-only update
//     (indexStateSuccessMeta) when only metadata differs; otherwise fall
//     through to a full build.
//   - Run gitIndex for a full build, then push the new index time to
//     Sourcegraph on a best-effort basis (failures there are only logged).
//
// Whatever path returns a non-nil error, the deferred function forces the
// state to indexStateFail, increments metricFailingTotal and marks the trace
// as errored.
func (s *Server) Index(args *indexArgs) (state indexState, err error) {
	tr := trace.New("index", args.Name)
	tr.SetMaxEvents(30) // Ensure we capture all indexing events

	defer func() {
		if err != nil {
			tr.SetError()
			tr.LazyPrintf("error: %v", err)
			state = indexStateFail
			metricFailingTotal.Inc()
		}
		tr.LazyPrintf("state: %s", state)
		tr.Finish()
	}()

	// Sourcegraph should always provide a tenant ID.
	if args.TenantID < 1 {
		return indexStateFail, tenant.ErrMissingTenant
	}

	tr.LazyPrintf("branches: %v", args.Branches)

	// Nothing to index: write an empty shard for the repository instead.
	if len(args.Branches) == 0 {
		return indexStateEmpty, createEmptyShard(args)
	}

	// Per-repository overrides from the server's allowlists (mutates args).
	repositoryName := args.Name
	if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
		tr.LazyPrintf("marking this repository for delta build")
		args.UseDelta = true
	}

	args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold

	if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
		tr.LazyPrintf("skipping symbols calculation")
		args.Symbols = false
	}

	// reason ends up in the "updating index" log line below. It is "forced"
	// for non-incremental runs and the on-disk index state otherwise.
	reason := "forced"

	if args.Incremental {
		// Compare the build we want against what is already on disk.
		bo := args.BuildOptions()
		bo.SetDefaults()
		incrementalState, fn := bo.IndexState()
		reason = string(incrementalState)
		metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()

		switch incrementalState {
		case index.IndexStateEqual:
			debugLog.Printf("%s index already up to date. Shard=%s", args.String(), fn)
			return indexStateNoop, nil

		case index.IndexStateMeta:
			infoLog.Printf("updating index.meta %s", args.String())

			// TODO(stefan) handle mergeMeta for tenant id.
			if err := mergeMeta(bo); err != nil {
				errorLog.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
			} else {
				return indexStateSuccessMeta, nil
			}

		case index.IndexStateCorrupt:
			infoLog.Printf("falling back to full update: corrupt index: %s", args.String())
		}
		// Every other state, as well as a failed metadata update, falls
		// through to a full build.
	}

	infoLog.Printf("updating index %s reason=%s", args.String(), reason)

	metricIndexingTotal.Inc()
	c := gitIndexConfig{
		// Subprocesses go through loggedRun, so their output is captured on
		// the trace and the no-output watchdog applies.
		runCmd: func(cmd *exec.Cmd) error {
			return s.loggedRun(tr, cmd)
		},

		findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
			return args.BuildOptions().FindRepositoryMetadata()
		},
		timeout: s.timeout,
	}

	err = gitIndex(c, args, s.Sourcegraph, s.logger)
	if err != nil {
		return indexStateFail, err
	}

	// Best effort: a failure to report status is logged but does not fail
	// the index job.
	if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil {
		s.logger.Error("failed to update index status",
			sglog.String("repo", args.Name),
			sglog.Uint32("id", args.RepoID),
			sglogBranches("branches", args.Branches),
			sglog.Error(err),
		)
	}

	return indexStateSuccess, nil
}
668
669// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so
670// it can update the zoekt_repos table.
671func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error {
672 // We need to read from disk for IndexTime.
673 _, metadata, ok, err := c.findRepositoryMetadata(args)
674 if err != nil {
675 return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
676 }
677 if !ok {
678 return errors.New("failed to find metadata for new/updated index")
679 }
680
681 status := []indexStatus{{
682 RepoID: args.RepoID,
683 Branches: args.Branches,
684 IndexTimeUnix: metadata.IndexTime.Unix(),
685 }}
686 if err := sg.UpdateIndexStatus(status); err != nil {
687 return fmt.Errorf("failed to update sourcegraph with status: %w", err)
688 }
689
690 return nil
691}
692
693func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
694 ss := make([]string, len(branches))
695 for i, b := range branches {
696 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
697 }
698 return sglog.Strings(key, ss)
699}
700
701func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
702 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0))
703 return &indexArgs{
704 IndexOptions: opts,
705 IndexDir: s.IndexDir,
706 Parallelism: parallelism,
707 Incremental: true,
708 FileLimit: MaxFileSize,
709 ShardMerging: s.shardMerging,
710 }
711}
712
713// parallelism consults both the server flags and index options to determine the number
714// of shards to index in parallel. If the CPUCount index option is provided, it always
715// overrides the server flag.
716func (s *Server) parallelism(opts IndexOptions, maxProcs int) int {
717 var parallelism int
718 if opts.ShardConcurrency > 0 {
719 parallelism = int(opts.ShardConcurrency)
720 } else {
721 parallelism = s.CPUCount
722 }
723
724 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs
725 if parallelism > 4*maxProcs {
726 parallelism = 4 * maxProcs
727 }
728
729 // If index concurrency is set, then divide the parallelism by the number of
730 // repos we're indexing in parallel
731 if s.IndexConcurrency > 1 {
732 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency)))
733 }
734
735 return parallelism
736}
737
738func createEmptyShard(args *indexArgs) error {
739 bo := args.BuildOptions()
740 bo.SetDefaults()
741 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}
742
743 if args.Incremental && bo.IncrementalSkipIndexing() {
744 return nil
745 }
746
747 builder, err := index.NewBuilder(*bo)
748 if err != nil {
749 return err
750 }
751 return builder.Finish()
752}
753
754// addDebugHandlers adds handlers specific to indexserver.
755func (s *Server) addDebugHandlers(mux *http.ServeMux) {
756 // Sourcegraph's site admin view requires indexserver to serve it's admin view
757 // on "/".
758 mux.Handle("/", http.HandlerFunc(s.handleRoot))
759
760 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex))
761 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed))
762 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList))
763 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge))
764 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue))
765 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost))
766}
767
768func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) {
769 if r.Method != "GET" {
770 w.Header().Set("Allow", "GET")
771 w.WriteHeader(http.StatusMethodNotAllowed)
772 return
773 }
774
775 response := struct {
776 Hostname string
777 }{
778 Hostname: s.hostname,
779 }
780
781 b, err := json.Marshal(response)
782 if err != nil {
783 http.Error(w, err.Error(), http.StatusInternalServerError)
784 return
785 }
786
787 w.Header().Set("Content-Type", "application/json; charset=utf-8")
788 w.Write(b)
789}
790
// rootTmpl renders the indexserver admin page served on "/" by handleRoot. It
// expects data with an IndexMsg string and a Repos slice whose elements have
// ID and Name fields. When Repos is non-empty, it shows a table in which each
// ID links to a forced reindex of that repository.
//
// The table markup is well formed: header cells sit inside a <tr>, and each
// ID cell is closed with </td>.
var rootTmpl = template.Must(template.New("name").Parse(`
<html>
  <body>
    <a href="debug">Debug</a><br />
    <a href="debug/requests">Traces</a><br />
    {{.IndexMsg}}<br />
    <br />
    <h3>Reindex</h3>
    {{if .Repos}}
    <a href="?show_repos=false">hide repos</a><br />
    <table style="margin-top: 20px">
      <tr>
        <th style="text-align:left">Name</th>
        <th style="text-align:left">ID (click to reindex)</th>
      </tr>
      {{range .Repos}}
      <tr>
        <td>{{.Name}}</td>
        <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></td>
      </tr>
      {{end}}
    </table>
    {{else}}
    <a href="?show_repos=true">show repos</a><br />
    {{end}}
  </body>
</html>
`))
817
818func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
819 if r.Method != "GET" {
820 w.Header().Set("Allow", "GET")
821 w.WriteHeader(http.StatusMethodNotAllowed)
822 return
823 }
824
825 values := r.URL.Query()
826
827 // ?id=
828 indexMsg := ""
829 if v := values.Get("id"); v != "" {
830 id, err := strconv.Atoi(v)
831 if err != nil {
832 http.Error(w, err.Error(), http.StatusBadRequest)
833 return
834 }
835 indexMsg, _ = s.forceIndex(uint32(id))
836 }
837
838 // ?show_repos=
839 showRepos := false
840 if v := values.Get("show_repos"); v != "" {
841 showRepos, _ = strconv.ParseBool(v)
842 }
843
844 type Repo struct {
845 ID uint32
846 Name string
847 }
848 var data struct {
849 Repos []Repo
850 IndexMsg string
851 }
852
853 data.IndexMsg = indexMsg
854
855 if showRepos {
856 s.queue.Iterate(func(opts *IndexOptions) {
857 data.Repos = append(data.Repos, Repo{
858 ID: opts.RepoID,
859 Name: opts.Name,
860 })
861 })
862 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name })
863 }
864
865 _ = rootTmpl.Execute(w, data)
866}
867
868// handleReindex triggers a reindex asynocronously. If a reindex was triggered
869// the request returns with status 202. The caller can infer the new state of
870// the index by calling List.
871func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) {
872 if r.Method != http.MethodPost {
873 w.Header().Set("Allow", http.MethodPost)
874 w.WriteHeader(http.StatusMethodNotAllowed)
875 return
876 }
877
878 err := r.ParseForm()
879 if err != nil {
880 http.Error(w, err.Error(), http.StatusBadRequest)
881 return
882 }
883
884 id, err := strconv.Atoi(r.Form.Get("repo"))
885 if err != nil {
886 http.Error(w, err.Error(), http.StatusBadRequest)
887 return
888 }
889
890 go func() { s.forceIndex(uint32(id)) }()
891
892 // 202 Accepted
893 w.WriteHeader(http.StatusAccepted)
894}
895
896func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
897 withIndexed := true
898 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil {
899 withIndexed = b
900 }
901
902 var indexed []uint32
903 if withIndexed {
904 indexed = listIndexed(s.IndexDir)
905 }
906
907 repos, err := s.Sourcegraph.List(r.Context(), indexed)
908 if err != nil {
909 http.Error(w, err.Error(), http.StatusInternalServerError)
910 return
911 }
912
913 bw := bytes.Buffer{}
914
915 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
916
917 _, err = fmt.Fprintf(tw, "ID\tName\n")
918 if err != nil {
919 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
920 return
921 }
922
923 s.queue.mu.Lock()
924 name := ""
925 for _, id := range repos.IDs {
926 if item := s.queue.get(id); item != nil {
927 name = item.opts.Name
928 } else {
929 name = ""
930 }
931 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
932 if err != nil {
933 debugLog.Printf("handleDebugList: %s\n", err.Error())
934 }
935 }
936 s.queue.mu.Unlock()
937
938 if err != nil {
939 http.Error(w, err.Error(), http.StatusInternalServerError)
940 return
941 }
942
943 err = tw.Flush()
944 if err != nil {
945 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
946 return
947 }
948
949 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
950
951 if _, err := io.Copy(w, &bw); err != nil {
952 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
953 return
954 }
955}
956
957// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
958// can run this command during periods of low usage (evenings, weekends) to
959// trigger an initial merge run. In the steady-state, merges happen rarely, even
960// on busy instances, and users can rely on automatic merging instead.
961func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {
962 // A merge operation can take very long, depending on the number merges and the
963 // target size of the compound shards. We run the merge in the background and
964 // return immediately to the user.
965 //
966 // We track the status of the merge with metricShardMergingRunning.
967 go func() {
968 s.doMerge()
969 }()
970 _, _ = w.Write([]byte("merging enqueued\n"))
971}
972
973func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {
974 indexed := listIndexed(s.IndexDir)
975
976 bw := bytes.Buffer{}
977
978 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
979
980 _, err := fmt.Fprintf(tw, "ID\tName\n")
981 if err != nil {
982 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
983 return
984 }
985
986 s.queue.mu.Lock()
987 name := ""
988 for _, id := range indexed {
989 if item := s.queue.get(id); item != nil {
990 name = item.opts.Name
991 } else {
992 name = ""
993 }
994 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
995 if err != nil {
996 debugLog.Printf("handleDebugIndexed: %s\n", err.Error())
997 }
998 }
999 s.queue.mu.Unlock()
1000
1001 if err != nil {
1002 http.Error(w, err.Error(), http.StatusInternalServerError)
1003 return
1004 }
1005
1006 err = tw.Flush()
1007 if err != nil {
1008 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
1009 return
1010 }
1011
1012 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
1013
1014 if _, err := io.Copy(w, &bw); err != nil {
1015 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
1016 return
1017 }
1018}
1019
1020// forceIndex will run the index job for repo name now. It will return always
1021// return a string explaining what it did, even if it failed.
1022func (s *Server) forceIndex(id uint32) (string, error) {
1023 var opts IndexOptions
1024 var err error
1025 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) {
1026 opts = o
1027 }, func(_ uint32, e error) {
1028 err = e
1029 }, id)
1030 if err != nil {
1031 return fmt.Sprintf("Indexing %d failed: %v", id, err), err
1032 }
1033
1034 args := s.indexArgs(opts)
1035 args.Incremental = false // force re-index
1036
1037 var state indexState
1038 ran := s.muIndexDir.With(opts.Name, func() {
1039 state, err = s.Index(args)
1040 })
1041 if !ran {
1042 return fmt.Sprintf("index job for repository already running: %s", args), nil
1043 }
1044 if err != nil {
1045 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err
1046 }
1047 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil
1048}
1049
1050// DeleteAllData deletes all shards in the index and trash dir belonging to the
1051// tenant associated with the request. The deletion is best-effort, which means
1052// we will delete as much as possible. If no error is returned, the caller can
1053// be certain that all data has been deleted.
1054func (s *Server) DeleteAllData(ctx context.Context, _ *indexserverv1.DeleteAllDataRequest) (*indexserverv1.DeleteAllDataResponse, error) {
1055 tnt, err := tenant.FromContext(ctx)
1056 if err != nil {
1057 return nil, err
1058 }
1059 s.logger.Warn("DeleteAllData", sglog.Int("tenant_id", tnt.ID()))
1060
1061 var merr error
1062 s.muIndexDir.Global(func() {
1063 // First, explode all compound shards that have repos from the tenant in
1064 // question. Because we hold the global lock, we can be sure that no new
1065 // merges start while we do this.
1066 if err := s.explodeTenantCompoundShards(ctx, func(path string) error {
1067 // We call explode in a separate process to protect indexserver.
1068 cmd := defaultExplodeCmd(path)
1069
1070 stdoutBuf := &bytes.Buffer{}
1071 stderrBuf := &bytes.Buffer{}
1072 cmd.Stdout = stdoutBuf
1073 cmd.Stderr = stderrBuf
1074
1075 err := cmd.Run()
1076 if err != nil {
1077 errorLog.Printf("explode failed: %v (stderr: %s)", err, stderrBuf.String())
1078 return err
1079 }
1080
1081 infoLog.Printf("exploded shard: %s", stdoutBuf.String())
1082
1083 return nil
1084 }); err != nil {
1085 merr = multierr.Append(merr, err)
1086 }
1087
1088 // Invariant: all shards from the tenant are simple shards.
1089
1090 if err := purgeTenantShards(ctx, s.IndexDir); err != nil {
1091 merr = multierr.Append(merr, err)
1092 }
1093 if err := purgeTenantShards(ctx, filepath.Join(s.IndexDir, ".trash")); err != nil {
1094 merr = multierr.Append(merr, err)
1095 }
1096 })
1097
1098 return &indexserverv1.DeleteAllDataResponse{}, merr
1099}
1100
1101func listIndexed(indexDir string) []uint32 {
1102 index := getShards(indexDir)
1103 metricNumIndexed.Set(float64(len(index)))
1104 repoIDs := make([]uint32, 0, len(index))
1105 for id := range index {
1106 repoIDs = append(repoIDs, id)
1107 }
1108 sort.Slice(repoIDs, func(i, j int) bool {
1109 return repoIDs[i] < repoIDs[j]
1110 })
1111 return repoIDs
1112}
1113
1114// setupTmpDir sets up a temporary directory on the same volume as the
1115// indexes.
1116//
1117// If main is true we will delete older temp directories left around. main is
1118// false when this is a debug command.
1119func setupTmpDir(logger sglog.Logger, main bool, index string) error {
1120 // change the target tmp directory depending on if it's our main daemon or a
1121 // debug sub command.
1122 dir := ".indexserver.debug.tmp"
1123 if main {
1124 dir = ".indexserver.tmp"
1125 }
1126
1127 tmpRoot := filepath.Join(index, dir)
1128
1129 if main {
1130 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot))
1131 err := os.RemoveAll(tmpRoot)
1132 if err != nil {
1133 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err))
1134 }
1135 }
1136
1137 if err := os.MkdirAll(tmpRoot, 0o755); err != nil {
1138 return err
1139 }
1140
1141 return os.Setenv("TMPDIR", tmpRoot)
1142}
1143
1144func printMetaData(fn string) error {
1145 repo, indexMeta, err := index.ReadMetadataPath(fn)
1146 if err != nil {
1147 return err
1148 }
1149
1150 err = json.NewEncoder(os.Stdout).Encode(indexMeta)
1151 if err != nil {
1152 return err
1153 }
1154
1155 err = json.NewEncoder(os.Stdout).Encode(repo)
1156 if err != nil {
1157 return err
1158 }
1159 return nil
1160}
1161
1162func printShardStats(fn string) error {
1163 f, err := os.Open(fn)
1164 if err != nil {
1165 return err
1166 }
1167
1168 iFile, err := index.NewIndexFile(f)
1169 if err != nil {
1170 return err
1171 }
1172
1173 return index.PrintNgramStats(iFile)
1174}
1175
1176func srcLogLevelIsDebug() bool {
1177 lvl := os.Getenv(sglog.EnvLogLevel)
1178 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug")
1179}
1180
// getEnvWithDefaultBool returns the value of environment variable k parsed with
// strconv.ParseBool, or defaultVal if k is unset or empty. It terminates the
// process (log.Fatalf) if the value is not a valid boolean.
func getEnvWithDefaultBool(k string, defaultVal bool) bool {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		// The message previously claimed an int64 conversion.
		log.Fatalf("error parsing ENV %s to bool: %s", k, err)
	}
	return b
}
1192
// getEnvWithDefaultInt64 reads environment variable k as a base-10 int64.
// An unset or empty variable yields defaultVal; a malformed value terminates
// the process via log.Fatalf.
func getEnvWithDefaultInt64(k string, defaultVal int64) int64 {
	if raw := os.Getenv(k); raw != "" {
		n, err := strconv.ParseInt(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to int64: %s", k, err)
		}
		return n
	}
	return defaultVal
}
1204
// getEnvWithDefaultInt returns the value of environment variable k parsed as a
// base-10 int, or defaultVal if k is unset or empty. It terminates the process
// (log.Fatalf) if the value is not a valid int.
func getEnvWithDefaultInt(k string, defaultVal int) int {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	// Parse the variable's value. The previous code parsed k (the variable's
	// name), which made setting any such variable fatal.
	i, err := strconv.Atoi(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to int: %s", k, err)
	}
	return i
}
1216
// getEnvWithDefaultUint64 reads environment variable k as a base-10 uint64.
// An unset or empty variable yields defaultVal; a malformed value terminates
// the process via log.Fatalf.
func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 {
	if raw := os.Getenv(k); raw != "" {
		n, err := strconv.ParseUint(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to uint64: %s", k, err)
		}
		return n
	}
	return defaultVal
}
1228
// getEnvWithDefaultFloat64 reads environment variable k as a float64.
// An unset or empty variable yields defaultVal; a malformed value terminates
// the process via log.Fatalf.
func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 {
	if raw := os.Getenv(k); raw != "" {
		x, err := strconv.ParseFloat(raw, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to float64: %s", k, err)
		}
		return x
	}
	return defaultVal
}
1240
// getEnvWithDefaultString returns the value of environment variable k, or
// defaultVal if k is unset or set to the empty string.
func getEnvWithDefaultString(k string, defaultVal string) string {
	if v := os.Getenv(k); v != "" {
		return v
	}
	return defaultVal
}
1248
// getEnvWithDefaultDuration reads environment variable k with
// time.ParseDuration (e.g. "90s", "8h"). An unset or empty variable yields
// defaultVal; a malformed value terminates the process via log.Fatalf.
func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration {
	if raw := os.Getenv(k); raw != "" {
		d, err := time.ParseDuration(raw)
		if err != nil {
			log.Fatalf("error parsing ENV %s to duration: %s", k, err)
		}
		return d
	}
	return defaultVal
}
1261
// getEnvWithDefaultEmptySet parses environment variable k as a comma-separated
// list and returns its distinct, whitespace-trimmed, non-empty entries as a
// set. If k is unset or has no entries, the result is an empty, non-nil map.
func getEnvWithDefaultEmptySet(k string) map[string]struct{} {
	entries := strings.Split(os.Getenv(k), ",")
	result := make(map[string]struct{}, len(entries))
	for i := range entries {
		entry := strings.TrimSpace(entries[i])
		if entry == "" {
			continue
		}
		result[entry] = struct{}{}
	}
	return result
}
1272
// joinStringSet joins the members of set with sep, in ascending lexical order.
//
// Members are sorted so the output is deterministic; iterating the map directly
// would yield a different order on every call.
func joinStringSet(set map[string]struct{}, sep string) string {
	xs := make([]string, 0, len(set))
	for x := range set {
		xs = append(xs, x)
	}
	sort.Strings(xs)

	return strings.Join(xs, sep)
}
1281
1282func setShardsCounter(indexDir string) {
1283 fns, err := filepath.Glob(filepath.Join(indexDir, "*.zoekt"))
1284 if err != nil {
1285 errorLog.Printf("setShardsCounter: %s\n", err)
1286 return
1287 }
1288 metricNumberShards.Set(float64(len(fns)))
1289
1290 compoundFns := make([]string, 0, len(fns))
1291 for _, fn := range fns {
1292 if strings.HasPrefix(filepath.Base(fn), "compound-") {
1293 compoundFns = append(compoundFns, fn)
1294 }
1295 }
1296 metricNumberCompoundShards.Set(float64(len(compoundFns)))
1297}
1298
1299func rootCmd() *ffcli.Command {
1300 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError)
1301 conf := rootConfig{
1302 Main: true,
1303 }
1304 conf.registerRootFlags(rootFs)
1305
1306 return &ffcli.Command{
1307 FlagSet: rootFs,
1308 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]",
1309 Subcommands: []*ffcli.Command{debugCmd()},
1310 Exec: func(ctx context.Context, args []string) error {
1311 return startServer(conf)
1312 },
1313 }
1314}
1315
// rootConfig holds the configuration of the root indexserver command. Its
// fields are bound to command-line flags by registerRootFlags and consumed by
// newServer and startServer.
type rootConfig struct {
	// Main is true if this rootConfig is for our main long running command (the
	// indexserver). Debug commands should not set this value. newServer passes
	// it to setupTmpDir, which uses it to choose the temporary directory's name
	// and to decide whether temporary files left over from earlier runs are
	// removed.
	Main bool

	// root (-sourcegraph_url) is the Sourcegraph URL, or a local directory whose
	// repositories are served by a fake Sourcegraph API.
	root string
	// interval (-interval) is how often to sync with Sourcegraph.
	interval time.Duration
	// index (-index) is the directory holding the index shards.
	index string
	// indexConcurrency (-index_concurrency) is the number of repos indexed
	// concurrently; newServer clamps it to [1, number of usable CPUs].
	indexConcurrency int64
	// listen (-listen) is the HTTP/gRPC listen address; startServer skips the
	// listeners entirely when it is empty.
	listen string
	// hostname (-hostname) is the name advertised to Sourcegraph.
	hostname string
	// cpuFraction (-cpu_fraction) is the fraction of cores used for indexing;
	// newServer rejects values outside (0.0, 1.0].
	cpuFraction float64

	// config values related to shard merging
	//
	// disableShardMerging is set by the (confusingly named) -shard_merging flag.
	// targetSize and minSize are in MiB; newServer converts them to bytes.
	// minAgeDays is the time since the last commit, in days; shards with newer
	// commits are excluded from merging.
	disableShardMerging bool
	vacuumInterval      time.Duration
	mergeInterval       time.Duration
	targetSize          int64
	minSize             int64
	minAgeDays          int

	// config values related to backoff indexing repos with one or more consecutive failures
	//
	// Both are passed to NewQueue; per the flag help text, a negative value
	// disables indexing backoff.
	backoffDuration    time.Duration
	maxBackoffDuration time.Duration
}
1342
1343func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
1344 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
1345 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
1346 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently")
1347 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", index.DefaultDir), "set index directory to use")
1348 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
1349 fs.StringVar(&rc.hostname, "hostname", index.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
1350 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
1351 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
1352 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")
1353
1354 // flags related to shard merging
1355 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging")
1356 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
1357 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
1358 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 1000), "the target size of compound shards in MiB")
1359 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 800), "the minimum size of a compound shard in MiB")
1360 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
1361}
1362
1363func startServer(conf rootConfig) error {
1364 s, err := newServer(conf)
1365 if err != nil {
1366 return err
1367 }
1368
1369 profiler.Init("zoekt-sourcegraph-indexserver")
1370 setShardsCounter(s.IndexDir)
1371
1372 if conf.listen != "" {
1373
1374 mux := http.NewServeMux()
1375 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{
1376 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"},
1377 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"},
1378 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"},
1379 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"},
1380 }...)
1381 s.addDebugHandlers(mux)
1382
1383 go func() {
1384 debugLog.Printf("serving HTTP on %s", conf.listen)
1385 mux := grpcutil.MultiplexGRPC(newGRPCServer(sglog.Scoped("indexserver"), s), mux)
1386 log.Fatal(http.ListenAndServe(conf.listen, mux))
1387 }()
1388
1389 // Serve mux on a unix domain socket on a best-effort-basis so that
1390 // webserver can call the endpoints via the shared filesystem.
1391 //
1392 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen
1393 // on the socket due to permission errors. See
1394 // https://github.com/docker/for-mac/issues/6239
1395 go func() {
1396 serveHTTPOverSocket := func() error {
1397 socket := filepath.Join(s.IndexDir, "indexserver.sock")
1398 // We cannot bind a socket to an existing pathname.
1399 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) {
1400 return fmt.Errorf("error removing socket file: %s", socket)
1401 }
1402 // The "unix" network corresponds to stream sockets. (cf. unixgram,
1403 // unixpacket).
1404 l, err := net.Listen("unix", socket)
1405 if err != nil {
1406 return fmt.Errorf("failed to listen on socket %s: %w", socket, err)
1407 }
1408 // Indexserver (root) and webserver (Sourcegraph) run with
1409 // different users. Per default, the socket is created with
1410 // permission 755 (root root), which doesn't let webserver write to
1411 // it.
1412 //
1413 // See https://github.com/golang/go/issues/11822 for more context.
1414 if err := os.Chmod(socket, 0o777); err != nil {
1415 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err)
1416 }
1417 debugLog.Printf("serving HTTP on %s", socket)
1418 return http.Serve(l, mux)
1419 }
1420 debugLog.Print(serveHTTPOverSocket())
1421 }()
1422 }
1423
1424 oc := &ownerChecker{
1425 Path: filepath.Join(conf.index, "owner.txt"),
1426 Hostname: conf.hostname,
1427 }
1428 go oc.Run()
1429
1430 logger := sglog.Scoped("metricsRegistration")
1431 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"}
1432
1433 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index})
1434 prometheus.DefaultRegisterer.MustRegister(c)
1435
1436 s.Run()
1437 return nil
1438}
1439
1440func newServer(conf rootConfig) (*Server, error) {
1441 logger := sglog.Scoped("server")
1442
1443 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 {
1444 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0")
1445 }
1446 if conf.index == "" {
1447 return nil, fmt.Errorf("must set -index")
1448 }
1449 if conf.root == "" {
1450 return nil, fmt.Errorf("must set -sourcegraph_url")
1451 }
1452 rootURL, err := url.Parse(conf.root)
1453 if err != nil {
1454 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err)
1455 }
1456
1457 rootURL = addDefaultPort(rootURL)
1458
1459 // Tune GOMAXPROCS to match Linux container CPU quota.
1460 _, _ = maxprocs.Set()
1461
1462 // Automatically prepend our own path at the front, to minimize
1463 // required configuration.
1464 if l, err := os.Readlink("/proc/self/exe"); err == nil {
1465 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
1466 }
1467
1468 if _, err := os.Stat(conf.index); err != nil {
1469 if err := os.MkdirAll(conf.index, 0o755); err != nil {
1470 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err)
1471 }
1472 }
1473
1474 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil {
1475 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err)
1476 }
1477
1478 if srcLogLevelIsDebug() {
1479 debugLog.SetOutput(os.Stderr)
1480 }
1481
1482 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST")
1483 if len(reposWithSeparateIndexingMetrics) > 0 {
1484 debugLog.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", "))
1485 }
1486
1487 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST")
1488 if len(deltaBuildRepositoriesAllowList) > 0 {
1489 debugLog.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", "))
1490 }
1491
1492 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150)
1493 if deltaShardNumberFallbackThreshold > 0 {
1494 debugLog.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold)
1495 } else {
1496 debugLog.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards")
1497 }
1498
1499 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST")
1500 if len(reposShouldSkipSymbolsCalculation) > 0 {
1501 debugLog.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", "))
1502 }
1503
1504 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout)
1505 if indexingTimeout != defaultIndexingTimeout {
1506 debugLog.Printf("using configured indexing timeout: %s", indexingTimeout)
1507 }
1508
1509 var sg Sourcegraph
1510 if rootURL.IsAbs() {
1511 var batchSize int
1512 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" {
1513 batchSize, err = strconv.Atoi(v)
1514 if err != nil {
1515 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int")
1516 }
1517 }
1518
1519 opts := []SourcegraphClientOption{
1520 WithBatchSize(batchSize),
1521 }
1522
1523 logger := sglog.Scoped("zoektConfigurationGRPCClient")
1524 client, err := dialGRPCClient(rootURL.Host, logger)
1525 if err != nil {
1526 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err)
1527 }
1528
1529 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...)
1530
1531 } else {
1532 sg = sourcegraphFake{
1533 RootDir: rootURL.String(),
1534 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags),
1535 }
1536 }
1537
1538 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction)))
1539 if cpuCount < 1 {
1540 cpuCount = 1
1541 }
1542
1543 if conf.indexConcurrency < 1 {
1544 conf.indexConcurrency = 1
1545 } else if conf.indexConcurrency > int64(cpuCount) {
1546 conf.indexConcurrency = int64(cpuCount)
1547 }
1548
1549 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)
1550
1551 return &Server{
1552 logger: logger,
1553 Sourcegraph: sg,
1554 IndexDir: conf.index,
1555 IndexConcurrency: int(conf.indexConcurrency),
1556 Interval: conf.interval,
1557 CPUCount: cpuCount,
1558 queue: *q,
1559 shardMerging: !conf.disableShardMerging,
1560 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList,
1561 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold,
1562 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
1563 hostname: conf.hostname,
1564 mergeOpts: mergeOpts{
1565 vacuumInterval: conf.vacuumInterval,
1566 mergeInterval: conf.mergeInterval,
1567 targetSizeBytes: conf.targetSize * 1024 * 1024,
1568 minSizeBytes: conf.minSize * 1024 * 1024,
1569 minAgeDays: conf.minAgeDays,
1570 },
1571 timeout: indexingTimeout,
1572 }, err
1573}
1574
1575func newGRPCServer(logger sglog.Logger, s *Server, additionalOpts ...grpc.ServerOption) *grpc.Server {
1576 grpcServer := defaults.NewServer(logger, additionalOpts...)
1577 indexserverv1.RegisterSourcegraphIndexserverServiceServer(grpcServer, s)
1578 return grpcServer
1579}
1580
// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration
// for the indexed-search-configuration gRPC service. Its contents are embedded
// at build time from default_grpc_service_configuration.json, and
// dialGRPCClient installs it via grpc.WithDefaultServiceConfig.
//
// The default backoff strategy is modeled after the default settings used by
// retryablehttp.DefaultClient.
//
// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html):
// - Unavailable
// - Aborted
//
// NOTE(review): the retry policy lives in the embedded JSON file, which is not
// visible here. Confirm that the list above still matches it. dialGRPCClient
// also installs a separate client retry interceptor that only retries
// codes.Unavailable.
//
//go:embed default_grpc_service_configuration.json
var defaultGRPCServiceConfigurationJSON string
1593
1594func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor {
1595 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1596 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1597 return invoker(ctx, method, req, reply, cc, opts...)
1598 }
1599}
1600
1601func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
1602 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
1603 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1604 return streamer(ctx, desc, cc, method, opts...)
1605 }
1606}
1607
// defaultGRPCMessageReceiveSizeBytes is the default maximum size of a message
// that gRPC servers and clients are allowed to receive. In this file it is
// applied to the Sourcegraph configuration client by dialGRPCClient (via
// grpc.MaxCallRecvMsgSize). It can be overridden by providing custom
// Server/Dial options.
const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MiB
1611
1612func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (configv1.ZoektConfigurationServiceClient, error) {
1613 metrics := clientMetricsOnce()
1614
1615 // If the service seems to be unavailable, this
1616 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1
1617 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s])
1618 retryOpts := []retry.CallOption{
1619 retry.WithMax(5),
1620 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)),
1621 retry.WithCodes(codes.Unavailable),
1622 }
1623
1624 opts := []grpc.DialOption{
1625 grpc.WithTransportCredentials(insecure.NewCredentials()),
1626 grpc.WithChainStreamInterceptor(
1627 metrics.StreamClientInterceptor(),
1628 messagesize.StreamClientInterceptor,
1629 internalActorStreamInterceptor(),
1630 internalerrs.LoggingStreamClientInterceptor(logger),
1631 internalerrs.PrometheusStreamClientInterceptor,
1632 retry.StreamClientInterceptor(retryOpts...),
1633 ),
1634 grpc.WithChainUnaryInterceptor(
1635 metrics.UnaryClientInterceptor(),
1636 messagesize.UnaryClientInterceptor,
1637 internalActorUnaryInterceptor(),
1638 internalerrs.LoggingUnaryClientInterceptor(logger),
1639 internalerrs.PrometheusUnaryClientInterceptor,
1640 retry.UnaryClientInterceptor(retryOpts...),
1641 ),
1642 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
1643 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)),
1644 }
1645
1646 opts = append(opts, additionalOpts...)
1647
1648 // Ensure that the message size options are set last, so they override any other
1649 // client-specific options that tweak the message size.
1650 //
1651 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
1652 // take precedence over everything else with a uniform size setting that's easy to reason about.
1653 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...)
1654
1655 // This dialer is used to connect via gRPC to the Sourcegraph instance.
1656 // This is done lazily, so we can provide the client to use regardless of
1657 // whether we enabled gRPC or not initially.
1658 cc, err := grpc.Dial(addr, opts...)
1659 if err != nil {
1660 return nil, fmt.Errorf("dialing %q: %w", addr, err)
1661 }
1662
1663 client := configv1.NewZoektConfigurationServiceClient(cc)
1664 return client, nil
1665}
1666
// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
// If the scheme is "https", "443" is used. URLs without a scheme (e.g. local
// paths), URLs that already carry a port, URLs with any other scheme, and nil
// are returned unchanged.
//
// The original URL is not mutated. A copy is modified and returned.
func addDefaultPort(original *url.URL) *url.URL {
	if original == nil {
		return nil // don't panic
	}

	if !original.IsAbs() {
		return original // don't do anything if the URL doesn't look like a remote URL
	}

	if original.Port() != "" {
		return original
	}

	var port string
	switch original.Scheme {
	case "http":
		port = "80"
	case "https":
		port = "443"
	default:
		return original
	}

	u := cloneURL(original)
	// Use Hostname() rather than Host. For IPv6 literals Host already contains
	// the brackets ("[::1]"), and JoinHostPort would add a second pair
	// ("[[::1]]:80"). Hostname() strips them, so JoinHostPort yields "[::1]:80".
	u.Host = net.JoinHostPort(u.Hostname(), port)
	return u
}

// cloneURL returns a copy of the URL. It is safe to mutate the returned URL.
// The Userinfo, if any, is copied as well so that it is not shared with u.
// This is copied from net/http/clone.go
func cloneURL(u *url.URL) *url.URL {
	if u == nil {
		return nil
	}
	u2 := new(url.URL)
	*u2 = *u
	if u.User != nil {
		u2.User = new(url.Userinfo)
		*u2.User = *u.User
	}
	return u2
}
1711
1712func main() {
1713 liblog := sglog.Init(sglog.Resource{
1714 Name: "zoekt-indexserver",
1715 Version: index.Version,
1716 InstanceID: index.HostnameBestEffort(),
1717 })
1718 defer liblog.Sync()
1719
1720 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil {
1721 log.Fatal(err)
1722 }
1723}
1724
// getBoolFromEnvironmentVariables returns the boolean value of the first
// environment variable in envVarNames that is set to a non-empty value in the
// current process environment, or defaultBool if none are.
//
// Only that first non-empty variable is parsed (with strconv.ParseBool). If it
// fails to parse, an error wrapping the parse error is returned together with
// false.
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	for _, envVar := range envVarNames {
		v := os.Getenv(envVar)
		if v == "" {
			continue
		}

		b, err := strconv.ParseBool(v)
		if err != nil {
			return false, fmt.Errorf("parsing environment variable %q to boolean: %w", envVar, err)
		}

		return b, nil
	}

	return defaultBool, nil
}