fork of https://github.com/sourcegraph/zoekt
1// Licensed under the Apache License, Version 2.0 (the "License");
2// you may not use this file except in compliance with the License.
3// You may obtain a copy of the License at
4//
5// http://www.apache.org/licenses/LICENSE-2.0
6//
7// Unless required by applicable law or agreed to in writing, software
8// distributed under the License is distributed on an "AS IS" BASIS,
9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10// See the License for the specific language governing permissions and
11// limitations under the License.
12
13// Command zoekt-sourcegraph-indexserver periodically reindexes repositories
14// from a Sourcegraph instance. It uses a "pull-based" design, where it periodically
15// reaches out to the Sourcegraph instance for the list of repositories to reindex.
16package main
17
18import (
19 "bytes"
20 "context"
21 _ "embed"
22 "encoding/json"
23 "errors"
24 "flag"
25 "fmt"
26 "html/template"
27 "io"
28 "io/fs"
29 "log"
30 "math"
31 "math/rand"
32 "net"
33 "net/http"
34 "net/url"
35 "os"
36 "os/exec"
37 "os/signal"
38 "path/filepath"
39 "runtime"
40 "sort"
41 "strconv"
42 "strings"
43 "sync"
44 "text/tabwriter"
45 "time"
46
47 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
48 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
49 "github.com/peterbourgon/ff/v3/ffcli"
50 "github.com/prometheus/client_golang/prometheus"
51 "github.com/prometheus/client_golang/prometheus/promauto"
52 sglog "github.com/sourcegraph/log"
53 "github.com/sourcegraph/mountinfo"
54 "github.com/sourcegraph/zoekt"
55 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
56 "github.com/sourcegraph/zoekt/grpc/internalerrs"
57 "github.com/sourcegraph/zoekt/grpc/messagesize"
58 "github.com/sourcegraph/zoekt/index"
59 "github.com/sourcegraph/zoekt/internal/debugserver"
60 "github.com/sourcegraph/zoekt/internal/profiler"
61 "github.com/sourcegraph/zoekt/internal/tenant"
62
63 "go.uber.org/automaxprocs/maxprocs"
64 "golang.org/x/net/trace"
65 "golang.org/x/sys/unix"
66 "google.golang.org/grpc"
67 "google.golang.org/grpc/codes"
68 "google.golang.org/grpc/credentials/insecure"
69 "google.golang.org/grpc/metadata"
70)
71
// Prometheus metrics exported by the indexserver. All of them are registered
// with the default registry via promauto (or explicitly, for the gRPC client
// metrics).
var (
	// metricResolveRevisionsDuration tracks how long it takes to resolve the
	// revisions of all repositories in one pass.
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 100000s (~27.8h)
	})

	// metricResolveRevisionDuration tracks how long it takes to resolve a
	// single repository revision, labelled by outcome.
	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	// metricGetIndexOptions counts every attempt to get index options,
	// including the failed ones.
	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	// metricGetIndexOptionsError counts only the failed attempts to get index
	// options.
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	// metricIndexDuration tracks how long an index job takes once it holds the
	// index directory lock (observed in processQueue).
	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	// metricIndexingDelay tracks the time between a job being added to the
	// queue and the job completing.
	metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_indexing_delay_seconds",
		Help:    "A histogram of durations from when an index job is added to the queue, to the time it completes.",
		Buckets: prometheus.ExponentialBuckets(60, 2, 14), // 1 minute -> ~5.7 days
	}, []string{
		"state", // state is an indexState
		"name",  // the name of the repository that was indexed
	})

	// metricFetchDuration tracks how long fetching a repository takes.
	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	// metricIndexIncrementalIndexState counts the incremental index states
	// observed by Server.Index before deciding how to build.
	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is index.IndexState

	// metricNumIndexed is set by listIndexed to the number of repositories
	// that currently have shards on disk.
	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	// metricFailingTotal is incremented whenever Server.Index returns an error.
	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	// metricIndexingTotal is incremented whenever Server.Index starts an
	// actual (non-noop, non-meta) index build.
	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	// metricNumStoppedTrackingTotal counts repositories removed from the queue
	// because Sourcegraph no longer lists them.
	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})

	// clientMetricsOnce returns a singleton instance of the client metrics
	// that are shared across all gRPC clients that this process creates.
	//
	// This function panics if the metrics cannot be registered with the default
	// Prometheus registry.
	clientMetricsOnce = sync.OnceValue(func() *grpcprom.ClientMetrics {
		clientMetrics := grpcprom.NewClientMetrics(
			grpcprom.WithClientCounterOptions(),
			grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
			grpcprom.WithClientStreamRecvHistogram(),   // record how long it takes for a client to receive a message during a streaming RPC
			grpcprom.WithClientStreamSendHistogram(),   // record how long it takes for a client to send a message during a streaming RPC
		)
		prometheus.DefaultRegisterer.MustRegister(clientMetrics)
		return clientMetrics
	})
)
165
// MaxFileSize is the file size limit passed to index jobs as FileLimit.
//
// 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
// NOTE: if you change this, you must also update gitIndex to use the same value when fetching the repo.
const MaxFileSize = 1 << 20
169
// reposWithSeparateIndexingMetrics is the set of repositories that we want to
// capture separate indexing metrics for. repoNameForMetric consults it to
// decide whether a repository name may be used as a metric label.
var reposWithSeparateIndexingMetrics = make(map[string]struct{})
172
// indexState describes the outcome of a single index job. It is used as the
// "state" label on the indexing metrics.
type indexState string

const (
	indexStateFail        indexState = "fail"         // The index job returned an error
	indexStateSuccess     indexState = "success"      // A new index was built
	indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
	indexStateNoop        indexState = "noop"         // We didn't need to update index
	indexStateEmpty       indexState = "empty"        // index is empty (empty repo)
)
182
// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	// logger is the structured logger used for index and status messages.
	logger sglog.Logger

	// Sourcegraph is the client used to list repositories, fetch index
	// options and report index status.
	Sourcegraph Sourcegraph
	// BatchSize is presumably the number of repositories per request to
	// Sourcegraph; it is not used in this part of the file — TODO confirm.
	BatchSize int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration

	// CPUCount is the number of CPUs to use for indexing shards.
	CPUCount int

	// queue holds the pending index jobs; processQueue pops from it.
	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is reported by the /debug/host endpoint.
	hostname string

	// mergeOpts carries the shard merging settings, including the vacuum and
	// merge intervals used by Run.
	mergeOpts mergeOpts

	// timeout defines how long the index server waits before killing an indexing job.
	timeout time.Duration
}
230
// Package-level loggers. debugLog discards its output as initialised here;
// infoLog and errorLog write to stderr with a level prefix.
var (
	debugLog = log.New(io.Discard, "[DEBUG] ", log.LstdFlags)
	infoLog  = log.New(os.Stderr, "[INFO] ", log.LstdFlags)
	errorLog = log.New(os.Stderr, "[ERROR] ", log.LstdFlags)
)
236
// noOutputTimeout is how long loggedRun lets a command run without producing
// any new output before it sends SIGQUIT (and later SIGKILL). Our index
// commands should output something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
const noOutputTimeout = 30 * time.Minute
243
244func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) {
245 out := &synchronizedBuffer{}
246 cmd.Stdout = out
247 cmd.Stderr = out
248
249 tr.LazyPrintf("%s", cmd.Args)
250
251 defer func() {
252 if err != nil {
253 outS := out.String()
254 tr.LazyPrintf("failed: %v", err)
255 tr.LazyPrintf("output: %s", out)
256 tr.SetError()
257 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS)
258 }
259 }()
260
261 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args))
262
263 if err := cmd.Start(); err != nil {
264 return err
265 }
266
267 errC := make(chan error)
268 go func() {
269 errC <- cmd.Wait()
270 }()
271
272 // This channel is set after we have sent sigquit. It allows us to follow up
273 // with a sigkill if the process doesn't quit after sigquit.
274 kill := make(<-chan time.Time)
275
276 lastLen := 0
277 for {
278 select {
279 case <-time.After(noOutputTimeout):
280 // Periodically check if we have had output. If not kill the process.
281 if out.Len() != lastLen {
282 lastLen = out.Len()
283 infoLog.Printf("still running %s", cmd.Args)
284 } else {
285 // Send quit (C-\) first so we get a stack dump.
286 infoLog.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args)
287 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil {
288 errorLog.Println("quit failed:", err)
289 }
290
291 // send sigkill if still running in 10s
292 kill = time.After(10 * time.Second)
293 }
294
295 case <-kill:
296 infoLog.Printf("still running, killing %s", cmd.Args)
297 if err := cmd.Process.Kill(); err != nil {
298 errorLog.Println("kill failed:", err)
299 }
300
301 case err := <-errC:
302 if err != nil {
303 return err
304 }
305
306 tr.LazyPrintf("success")
307 return nil
308 }
309 }
310}
311
// synchronizedBuffer is a bytes.Buffer guarded by a mutex. It lets one
// goroutine inspect the buffer (length or contents) while another is still
// writing to it.
type synchronizedBuffer struct {
	mu sync.Mutex
	b  bytes.Buffer
}

// Write appends p to the buffer while holding the lock.
func (s *synchronizedBuffer) Write(p []byte) (n int, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	n, err = s.b.Write(p)
	return n, err
}

// Len reports the number of bytes currently held in the buffer.
func (s *synchronizedBuffer) Len() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	size := s.b.Len()
	return size
}

// String returns the current buffer contents as a string.
func (s *synchronizedBuffer) String() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	contents := s.b.String()
	return contents
}
336
// pauseFileName is the name of a file which, if present in IndexDir, stops
// index jobs from running: Run skips its sync iteration and processQueue
// stops popping jobs while the file exists. This makes it possible to
// experiment with the content of the IndexDir without the indexserver writing
// to it.
const pauseFileName = "PAUSE"
341
// Run the sync loop. This blocks forever.
//
// It removes incomplete shards left in IndexDir, then starts background
// goroutines: one that periodically syncs the queue with Sourcegraph, one for
// vacuuming and one for merging (both only act when shard merging is
// enabled), and IndexConcurrency workers running processQueue.
func (s *Server) Run() {
	removeIncompleteShards(s.IndexDir)

	// Start a goroutine which updates the queue with commits to index.
	go func() {
		// We update the list of indexed repos every Interval. To speed up manual
		// testing we also listen for SIGUSR1 to trigger updates.
		//
		// "pkill -SIGUSR1 zoekt-sourcegra"
		for range jitterTicker(s.Interval, unix.SIGUSR1) {
			// A readable PAUSE file skips this sync iteration; its content is
			// logged as the reason.
			if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
				infoLog.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b)))
				continue
			}

			repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir))
			if err != nil {
				errorLog.Printf("error listing repos: %s", err)
				continue
			}

			debugLog.Printf("updating index queue with %d repositories", len(repos.IDs))

			// Stop indexing repos we don't need to track anymore
			removed := s.queue.MaybeRemoveMissing(repos.IDs)
			metricNumStoppedTrackingTotal.Add(float64(len(removed)))
			if len(removed) > 0 {
				infoLog.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5))
			}

			// Run cleanup concurrently with the queue update below, under the
			// global index directory lock. We wait for it at the end of the
			// iteration.
			cleanupDone := make(chan struct{})
			go func() {
				defer close(cleanupDone)
				s.muIndexDir.Global(func() {
					cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging)
				})
			}()

			repos.IterateIndexOptions(s.queue.AddOrUpdate)

			// IterateIndexOptions will only iterate over repositories that have
			// changed since we last called list. However, we want to add all IDs
			// back onto the queue just to check that what is on disk is still
			// correct. This will use the last IndexOptions we stored in the
			// queue. The repositories not on the queue (missing) need a forced
			// fetch of IndexOptions.
			missing := s.queue.Bump(repos.IDs)
			// Errors from the forced fetch are deliberately ignored here.
			s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...)

			setShardsCounter(s.IndexDir)

			<-cleanupDone
		}
	}()

	// Periodically vacuum, but only when shard merging is enabled.
	go func() {
		for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) {
			if s.shardMerging {
				s.vacuum()
			}
		}
	}()

	// Periodically merge shards, but only when shard merging is enabled.
	go func() {
		for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) {
			if s.shardMerging {
				s.doMerge()
			}
		}
	}()

	// Start the workers that pop jobs off the queue and index them.
	for i := 0; i < s.IndexConcurrency; i++ {
		go s.processQueue()
	}

	// block forever
	select {}
}
421
// formatListUint32 renders the first min(len(v), m) items of v as a
// comma-separated list. When v holds more items than were rendered, the list
// ends with "..." to signal the truncation.
func formatListUint32(v []uint32, m int) string {
	limit := m
	if len(v) < limit {
		limit = len(v)
	}

	var shown []string
	for i := 0; i < limit; i++ {
		shown = append(shown, fmt.Sprintf("%d", v[i]))
	}

	out := strings.Join(shown, ", ")
	if len(v) > limit {
		// The ellipsis is just another list entry when items precede it.
		if len(shown) > 0 {
			out += ", "
		}
		out += "..."
	}
	return out
}
439
// processQueue is the worker loop for index jobs. It never returns. Each
// iteration pops a job off the queue and runs it while holding the
// per-repository index directory lock, recording duration and delay metrics
// and the resulting state on the queue. While the PAUSE file exists, or when
// the queue is empty, it sleeps for a second and retries.
func (s *Server) processQueue() {
	for {
		// Do not pop jobs while the indexserver is manually paused.
		if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil {
			time.Sleep(time.Second)
			continue
		}

		item, ok := s.queue.Pop()
		if !ok {
			// Nothing to do right now.
			time.Sleep(time.Second)
			continue
		}

		opts := item.Opts
		args := s.indexArgs(opts)

		ran := s.muIndexDir.With(opts.Name, func() {
			// only record time taken once we hold the lock. This avoids us
			// recording time taken while merging/cleanup runs.
			start := time.Now()

			state, err := s.Index(args)

			elapsed := time.Since(start)
			metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds())

			// Unlike elapsed, the delay also includes the time the job spent
			// waiting on the queue.
			indexDelay := time.Since(item.DateAddedToQueue)
			metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds())

			if err != nil {
				errorLog.Printf("error indexing %s: %s", args.String(), err)
			}

			switch state {
			case indexStateSuccess:
				var branches []string
				for _, b := range args.Branches {
					branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
				}
				s.logger.Info("updated index",
					sglog.Int("tenant", args.TenantID),
					sglog.String("repo", args.Name),
					sglog.Uint32("id", args.RepoID),
					sglog.Strings("branches", branches),
					sglog.Duration("duration", elapsed),
					sglog.Duration("index_delay", indexDelay),
				)
			case indexStateSuccessMeta:
				infoLog.Printf("updated meta %s in %v", args.String(), elapsed)
			}
			// Record the outcome on the queue regardless of the state.
			s.queue.SetIndexed(opts, state)
		})

		if !ran {
			// Someone else is processing the repository. We can just skip this job
			// since the repository will be added back to the queue and we will
			// converge to the correct behaviour.
			debugLog.Printf("index job for repository already running: %s", args)
			continue
		}
	}
}
502
503// repoNameForMetric returns a normalized version of the given repository name that is
504// suitable for use with Prometheus metrics.
505func repoNameForMetric(repo string) string {
506 // Check to see if we want to be able to capture separate indexing metrics for this repository.
507 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable.
508 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok {
509 return repo
510 }
511
512 return ""
513}
514
// batched splits slice into consecutive batches of at most size elements and
// sends them, in order, on the returned channel. The channel is closed once
// every element has been sent. An empty slice yields no batches.
//
// A non-positive size disables batching: the whole slice is sent as a single
// batch. (A size of zero would otherwise never make progress, and a negative
// size would panic when slicing.)
//
// The batches alias the backing array of slice. The caller must drain the
// channel, otherwise the sending goroutine stays blocked.
func batched(slice []uint32, size int) <-chan []uint32 {
	c := make(chan []uint32)
	go func() {
		defer close(c)

		if size <= 0 {
			size = len(slice)
		}

		for len(slice) > 0 {
			n := size
			if n > len(slice) {
				n = len(slice)
			}
			c <- slice[:n]
			slice = slice[n:]
		}
	}()
	return c
}
529
// jitterTicker returns a ticker which ticks with a jitter. The wait between
// two ticks is uniformly selected from the range [d/2, d + d/2). It will tick
// on creation.
//
// If d is not positive there is no interval to wait for: the ticker fires
// once on creation and afterwards only in response to signals. (Previously a
// non-positive d made rand.Int63n panic in the background goroutine.)
//
// sig is a list of signals which also cause the ticker to fire. This is a
// convenience to allow manually triggering of the ticker.
//
// The returned channel is unbuffered and never closed; the goroutines feeding
// it run for the lifetime of the process.
func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} {
	ticker := make(chan struct{})

	go func() {
		for {
			ticker <- struct{}{}

			ns := int64(d)
			if ns <= 0 {
				// rand.Int63n panics for n <= 0, so stop periodic ticking.
				return
			}
			jitter := rand.Int63n(ns)
			time.Sleep(time.Duration(ns/2 + jitter))
		}
	}()

	if len(sig) > 0 {
		go func() {
			c := make(chan os.Signal, 1)
			signal.Notify(c, sig...)
			for range c {
				ticker <- struct{}{}
			}
		}()
	}

	return ticker
}
561
// Index runs an index job for the repository and branches described by args
// and reports the resulting state.
//
// Depending on args and the state on disk it creates an empty shard (no
// branches), does nothing (index already up to date), only updates the
// metadata, or runs a full gitIndex build followed by a status update to
// Sourcegraph. Whenever a non-nil error is returned the state is
// indexStateFail and metricFailingTotal is incremented. A failure to update
// the status on Sourcegraph is only logged and does not fail the job.
//
// Note that args is modified: UseDelta, DeltaShardNumberFallbackThreshold and
// Symbols may be overwritten based on the server configuration.
func (s *Server) Index(args *indexArgs) (state indexState, err error) {
	tr := trace.New("index", args.Name)
	tr.SetMaxEvents(30) // Ensure we capture all indexing events

	// The deferred func relies on the named results: it forces the state to
	// indexStateFail on error and records the final state on the trace.
	defer func() {
		if err != nil {
			tr.SetError()
			tr.LazyPrintf("error: %v", err)
			state = indexStateFail
			metricFailingTotal.Inc()
		}
		tr.LazyPrintf("state: %s", state)
		tr.Finish()
	}()

	// Sourcegraph should always provide a tenant ID.
	if args.TenantID < 1 {
		return indexStateFail, tenant.ErrMissingTenant
	}

	tr.LazyPrintf("branches: %v", args.Branches)

	// Without branches there is nothing to index; create an empty shard
	// instead.
	if len(args.Branches) == 0 {
		return indexStateEmpty, createEmptyShard(args)
	}

	repositoryName := args.Name
	if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
		tr.LazyPrintf("marking this repository for delta build")
		args.UseDelta = true
	}

	args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold

	if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
		tr.LazyPrintf("skipping symbols calculation")
		args.Symbols = false
	}

	// reason is only used for logging why we run a full index. It stays
	// "forced" for non-incremental jobs.
	reason := "forced"

	if args.Incremental {
		bo := args.BuildOptions()
		bo.SetDefaults()
		incrementalState, fn := bo.IndexState()
		reason = string(incrementalState)
		metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()

		switch incrementalState {
		case index.IndexStateEqual:
			debugLog.Printf("%s index already up to date. Shard=%s", args.String(), fn)
			return indexStateNoop, nil

		case index.IndexStateMeta:
			infoLog.Printf("updating index.meta %s", args.String())

			// TODO(stefan) handle mergeMeta for tenant id.
			if err := mergeMeta(bo); err != nil {
				errorLog.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
			} else {
				return indexStateSuccessMeta, nil
			}

		case index.IndexStateCorrupt:
			infoLog.Printf("falling back to full update: corrupt index: %s", args.String())
		}
	}

	infoLog.Printf("updating index %s reason=%s", args.String(), reason)

	metricIndexingTotal.Inc()
	c := gitIndexConfig{
		// Run commands through loggedRun so their output ends up on the trace
		// and hung commands are killed.
		runCmd: func(cmd *exec.Cmd) error {
			return s.loggedRun(tr, cmd)
		},

		findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
			return args.BuildOptions().FindRepositoryMetadata()
		},
		timeout: s.timeout,
	}

	err = gitIndex(c, args, s.Sourcegraph, s.logger)
	if err != nil {
		return indexStateFail, err
	}

	// Best effort: the index was built successfully, so a failure to report
	// the status is logged but does not change the result.
	if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil {
		s.logger.Error("failed to update index status",
			sglog.String("repo", args.Name),
			sglog.Uint32("id", args.RepoID),
			sglogBranches("branches", args.Branches),
			sglog.Error(err),
		)
	}

	return indexStateSuccess, nil
}
661
662// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so
663// it can update the zoekt_repos table.
664func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error {
665 // We need to read from disk for IndexTime.
666 _, metadata, ok, err := c.findRepositoryMetadata(args)
667 if err != nil {
668 return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
669 }
670 if !ok {
671 return errors.New("failed to find metadata for new/updated index")
672 }
673
674 status := []indexStatus{{
675 RepoID: args.RepoID,
676 Branches: args.Branches,
677 IndexTimeUnix: metadata.IndexTime.Unix(),
678 }}
679 if err := sg.UpdateIndexStatus(status); err != nil {
680 return fmt.Errorf("failed to update sourcegraph with status: %w", err)
681 }
682
683 return nil
684}
685
686func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
687 ss := make([]string, len(branches))
688 for i, b := range branches {
689 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
690 }
691 return sglog.Strings(key, ss)
692}
693
694func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
695 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0))
696 return &indexArgs{
697 IndexOptions: opts,
698 IndexDir: s.IndexDir,
699 Parallelism: parallelism,
700 Incremental: true,
701 FileLimit: MaxFileSize,
702 ShardMerging: s.shardMerging,
703 }
704}
705
706// parallelism consults both the server flags and index options to determine the number
707// of shards to index in parallel. If the CPUCount index option is provided, it always
708// overrides the server flag.
709func (s *Server) parallelism(opts IndexOptions, maxProcs int) int {
710 var parallelism int
711 if opts.ShardConcurrency > 0 {
712 parallelism = int(opts.ShardConcurrency)
713 } else {
714 parallelism = s.CPUCount
715 }
716
717 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs
718 if parallelism > 4*maxProcs {
719 parallelism = 4 * maxProcs
720 }
721
722 // If index concurrency is set, then divide the parallelism by the number of
723 // repos we're indexing in parallel
724 if s.IndexConcurrency > 1 {
725 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency)))
726 }
727
728 return parallelism
729}
730
731func createEmptyShard(args *indexArgs) error {
732 bo := args.BuildOptions()
733 bo.SetDefaults()
734 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}
735
736 if args.Incremental && bo.IncrementalSkipIndexing() {
737 return nil
738 }
739
740 builder, err := index.NewBuilder(*bo)
741 if err != nil {
742 return err
743 }
744 return builder.Finish()
745}
746
747// addDebugHandlers adds handlers specific to indexserver.
748func (s *Server) addDebugHandlers(mux *http.ServeMux) {
749 // Sourcegraph's site admin view requires indexserver to serve it's admin view
750 // on "/".
751 mux.Handle("/", http.HandlerFunc(s.handleRoot))
752
753 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex))
754 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed))
755 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList))
756 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge))
757 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue))
758 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost))
759}
760
761func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) {
762 if r.Method != "GET" {
763 w.Header().Set("Allow", "GET")
764 w.WriteHeader(http.StatusMethodNotAllowed)
765 return
766 }
767
768 response := struct {
769 Hostname string
770 }{
771 Hostname: s.hostname,
772 }
773
774 b, err := json.Marshal(response)
775 if err != nil {
776 http.Error(w, err.Error(), http.StatusInternalServerError)
777 return
778 }
779
780 w.Header().Set("Content-Type", "application/json; charset=utf-8")
781 w.Write(b)
782}
783
// rootTmpl renders the admin page served on "/". It expects a value with the
// fields IndexMsg (a message about the last requested reindex, may be empty)
// and Repos (a list of items with ID and Name fields). When Repos is empty
// only a link to show the repositories is rendered; otherwise a table is
// rendered in which each ID links to a reindex of that repository.
var rootTmpl = template.Must(template.New("name").Parse(`
<html>
	<body>
		<a href="debug">Debug</a><br />
		<a href="debug/requests">Traces</a><br />
		{{.IndexMsg}}<br />
		<br />
		<h3>Reindex</h3>
		{{if .Repos}}
			<a href="?show_repos=false">hide repos</a><br />
			<table style="margin-top: 20px">
				<tr>
					<th style="text-align:left">Name</th>
					<th style="text-align:left">ID (click to reindex)</th>
				</tr>
				{{range .Repos}}
					<tr>
						<td>{{.Name}}</td>
						<td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></td>
					</tr>
				{{end}}
			</table>
		{{else}}
			<a href="?show_repos=true">show repos</a><br />
		{{end}}
	</body>
</html>
`))
810
811func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
812 if r.Method != "GET" {
813 w.Header().Set("Allow", "GET")
814 w.WriteHeader(http.StatusMethodNotAllowed)
815 return
816 }
817
818 values := r.URL.Query()
819
820 // ?id=
821 indexMsg := ""
822 if v := values.Get("id"); v != "" {
823 id, err := strconv.Atoi(v)
824 if err != nil {
825 http.Error(w, err.Error(), http.StatusBadRequest)
826 return
827 }
828 indexMsg, _ = s.forceIndex(uint32(id))
829 }
830
831 // ?show_repos=
832 showRepos := false
833 if v := values.Get("show_repos"); v != "" {
834 showRepos, _ = strconv.ParseBool(v)
835 }
836
837 type Repo struct {
838 ID uint32
839 Name string
840 }
841 var data struct {
842 Repos []Repo
843 IndexMsg string
844 }
845
846 data.IndexMsg = indexMsg
847
848 if showRepos {
849 s.queue.Iterate(func(opts *IndexOptions) {
850 data.Repos = append(data.Repos, Repo{
851 ID: opts.RepoID,
852 Name: opts.Name,
853 })
854 })
855 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name })
856 }
857
858 _ = rootTmpl.Execute(w, data)
859}
860
861// handleReindex triggers a reindex asynocronously. If a reindex was triggered
862// the request returns with status 202. The caller can infer the new state of
863// the index by calling List.
864func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) {
865 if r.Method != http.MethodPost {
866 w.Header().Set("Allow", http.MethodPost)
867 w.WriteHeader(http.StatusMethodNotAllowed)
868 return
869 }
870
871 err := r.ParseForm()
872 if err != nil {
873 http.Error(w, err.Error(), http.StatusBadRequest)
874 return
875 }
876
877 id, err := strconv.Atoi(r.Form.Get("repo"))
878 if err != nil {
879 http.Error(w, err.Error(), http.StatusBadRequest)
880 return
881 }
882
883 go func() { s.forceIndex(uint32(id)) }()
884
885 // 202 Accepted
886 w.WriteHeader(http.StatusAccepted)
887}
888
889func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
890 withIndexed := true
891 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil {
892 withIndexed = b
893 }
894
895 var indexed []uint32
896 if withIndexed {
897 indexed = listIndexed(s.IndexDir)
898 }
899
900 repos, err := s.Sourcegraph.List(r.Context(), indexed)
901 if err != nil {
902 http.Error(w, err.Error(), http.StatusInternalServerError)
903 return
904 }
905
906 bw := bytes.Buffer{}
907
908 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
909
910 _, err = fmt.Fprintf(tw, "ID\tName\n")
911 if err != nil {
912 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
913 return
914 }
915
916 s.queue.mu.Lock()
917 name := ""
918 for _, id := range repos.IDs {
919 if item := s.queue.get(id); item != nil {
920 name = item.opts.Name
921 } else {
922 name = ""
923 }
924 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
925 if err != nil {
926 debugLog.Printf("handleDebugList: %s\n", err.Error())
927 }
928 }
929 s.queue.mu.Unlock()
930
931 if err != nil {
932 http.Error(w, err.Error(), http.StatusInternalServerError)
933 return
934 }
935
936 err = tw.Flush()
937 if err != nil {
938 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
939 return
940 }
941
942 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
943
944 if _, err := io.Copy(w, &bw); err != nil {
945 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
946 return
947 }
948}
949
950// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
951// can run this command during periods of low usage (evenings, weekends) to
952// trigger an initial merge run. In the steady-state, merges happen rarely, even
953// on busy instances, and users can rely on automatic merging instead.
954func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {
955 // A merge operation can take very long, depending on the number merges and the
956 // target size of the compound shards. We run the merge in the background and
957 // return immediately to the user.
958 //
959 // We track the status of the merge with metricShardMergingRunning.
960 go func() {
961 s.doMerge()
962 }()
963 _, _ = w.Write([]byte("merging enqueued\n"))
964}
965
966func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {
967 indexed := listIndexed(s.IndexDir)
968
969 bw := bytes.Buffer{}
970
971 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0)
972
973 _, err := fmt.Fprintf(tw, "ID\tName\n")
974 if err != nil {
975 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError)
976 return
977 }
978
979 s.queue.mu.Lock()
980 name := ""
981 for _, id := range indexed {
982 if item := s.queue.get(id); item != nil {
983 name = item.opts.Name
984 } else {
985 name = ""
986 }
987 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name)
988 if err != nil {
989 debugLog.Printf("handleDebugIndexed: %s\n", err.Error())
990 }
991 }
992 s.queue.mu.Unlock()
993
994 if err != nil {
995 http.Error(w, err.Error(), http.StatusInternalServerError)
996 return
997 }
998
999 err = tw.Flush()
1000 if err != nil {
1001 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError)
1002 return
1003 }
1004
1005 w.Header().Set("Content-Length", strconv.Itoa(bw.Len()))
1006
1007 if _, err := io.Copy(w, &bw); err != nil {
1008 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError)
1009 return
1010 }
1011}
1012
1013// forceIndex will run the index job for repo name now. It will return always
1014// return a string explaining what it did, even if it failed.
1015func (s *Server) forceIndex(id uint32) (string, error) {
1016 var opts IndexOptions
1017 var err error
1018 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) {
1019 opts = o
1020 }, func(_ uint32, e error) {
1021 err = e
1022 }, id)
1023 if err != nil {
1024 return fmt.Sprintf("Indexing %d failed: %v", id, err), err
1025 }
1026
1027 args := s.indexArgs(opts)
1028 args.Incremental = false // force re-index
1029
1030 var state indexState
1031 ran := s.muIndexDir.With(opts.Name, func() {
1032 state, err = s.Index(args)
1033 })
1034 if !ran {
1035 return fmt.Sprintf("index job for repository already running: %s", args), nil
1036 }
1037 if err != nil {
1038 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err
1039 }
1040 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil
1041}
1042
1043func listIndexed(indexDir string) []uint32 {
1044 index := getShards(indexDir)
1045 metricNumIndexed.Set(float64(len(index)))
1046 repoIDs := make([]uint32, 0, len(index))
1047 for id := range index {
1048 repoIDs = append(repoIDs, id)
1049 }
1050 sort.Slice(repoIDs, func(i, j int) bool {
1051 return repoIDs[i] < repoIDs[j]
1052 })
1053 return repoIDs
1054}
1055
1056// setupTmpDir sets up a temporary directory on the same volume as the
1057// indexes.
1058//
1059// If main is true we will delete older temp directories left around. main is
1060// false when this is a debug command.
1061func setupTmpDir(logger sglog.Logger, main bool, index string) error {
1062 // change the target tmp directory depending on if it's our main daemon or a
1063 // debug sub command.
1064 dir := ".indexserver.debug.tmp"
1065 if main {
1066 dir = ".indexserver.tmp"
1067 }
1068
1069 tmpRoot := filepath.Join(index, dir)
1070
1071 if main {
1072 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot))
1073 err := os.RemoveAll(tmpRoot)
1074 if err != nil {
1075 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err))
1076 }
1077 }
1078
1079 if err := os.MkdirAll(tmpRoot, 0o755); err != nil {
1080 return err
1081 }
1082
1083 return os.Setenv("TMPDIR", tmpRoot)
1084}
1085
1086func printMetaData(fn string) error {
1087 repo, indexMeta, err := index.ReadMetadataPath(fn)
1088 if err != nil {
1089 return err
1090 }
1091
1092 err = json.NewEncoder(os.Stdout).Encode(indexMeta)
1093 if err != nil {
1094 return err
1095 }
1096
1097 err = json.NewEncoder(os.Stdout).Encode(repo)
1098 if err != nil {
1099 return err
1100 }
1101 return nil
1102}
1103
1104func printShardStats(fn string) error {
1105 f, err := os.Open(fn)
1106 if err != nil {
1107 return err
1108 }
1109
1110 iFile, err := index.NewIndexFile(f)
1111 if err != nil {
1112 return err
1113 }
1114
1115 return index.PrintNgramStats(iFile)
1116}
1117
1118func srcLogLevelIsDebug() bool {
1119 lvl := os.Getenv(sglog.EnvLogLevel)
1120 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug")
1121}
1122
// getEnvWithDefaultBool returns the value of the environment variable k parsed
// as a bool (using strconv.ParseBool), or defaultVal if k is unset or empty.
//
// If the value cannot be parsed, the process is terminated via log.Fatalf.
func getEnvWithDefaultBool(k string, defaultVal bool) bool {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to bool: %s", k, err)
	}
	return b
}
1134
// getEnvWithDefaultInt64 returns the environment variable k parsed as a
// base-10 int64. An unset or empty variable yields defaultVal; a value that
// does not parse terminates the process via log.Fatalf.
func getEnvWithDefaultInt64(k string, defaultVal int64) int64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseInt(raw, 10, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to int64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1146
// getEnvWithDefaultInt returns the value of the environment variable k parsed
// as a base-10 int, or defaultVal if k is unset or empty.
//
// If the value cannot be parsed, the process is terminated via log.Fatalf.
func getEnvWithDefaultInt(k string, defaultVal int) int {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	// Parse the variable's value, not its name.
	i, err := strconv.Atoi(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to int: %s", k, err)
	}
	return i
}
1158
// getEnvWithDefaultUint64 returns the environment variable k parsed as a
// base-10 uint64. An unset or empty variable yields defaultVal; a value that
// does not parse terminates the process via log.Fatalf.
func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 {
	raw := os.Getenv(k)
	if len(raw) == 0 {
		return defaultVal
	}

	parsed, parseErr := strconv.ParseUint(raw, 10, 64)
	if parseErr != nil {
		log.Fatalf("error parsing ENV %s to uint64: %s", k, parseErr)
	}
	return parsed
}
1170
// getEnvWithDefaultFloat64 returns the environment variable k parsed as a
// float64. An unset or empty variable yields defaultVal; a value that does not
// parse terminates the process via log.Fatalf.
func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := strconv.ParseFloat(raw, 64)
		if err != nil {
			log.Fatalf("error parsing ENV %s to float64: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1182
// getEnvWithDefaultString returns the value of the environment variable k, or
// defaultVal when k is unset or set to the empty string.
func getEnvWithDefaultString(k string, defaultVal string) string {
	if value := os.Getenv(k); value != "" {
		return value
	}
	return defaultVal
}
1190
// getEnvWithDefaultDuration returns the environment variable k parsed with
// time.ParseDuration. An unset or empty variable yields defaultVal; a value
// that does not parse terminates the process via log.Fatalf.
func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration {
	if raw := os.Getenv(k); raw != "" {
		parsed, err := time.ParseDuration(raw)
		if err != nil {
			log.Fatalf("error parsing ENV %s to duration: %s", k, err)
		}
		return parsed
	}
	return defaultVal
}
1203
// getEnvWithDefaultEmptySet interprets the environment variable k as a
// comma-separated list and returns its entries as a set. Surrounding
// whitespace is trimmed from every entry and empty entries are dropped, so an
// unset or empty variable produces an empty (non-nil) set.
func getEnvWithDefaultEmptySet(k string) map[string]struct{} {
	members := make(map[string]struct{})
	for _, field := range strings.Split(os.Getenv(k), ",") {
		entry := strings.TrimSpace(field)
		if entry == "" {
			continue
		}
		members[entry] = struct{}{}
	}
	return members
}
1214
// joinStringSet returns the members of set joined with sep.
//
// Map iteration order is randomized in Go, so the members are sorted first to
// make the result deterministic. An empty or nil set yields "".
func joinStringSet(set map[string]struct{}, sep string) string {
	xs := make([]string, 0, len(set))
	for x := range set {
		xs = append(xs, x)
	}
	sort.Strings(xs)

	return strings.Join(xs, sep)
}
1223
1224func setShardsCounter(indexDir string) {
1225 fns, err := filepath.Glob(filepath.Join(indexDir, "*.zoekt"))
1226 if err != nil {
1227 errorLog.Printf("setShardsCounter: %s\n", err)
1228 return
1229 }
1230 metricNumberShards.Set(float64(len(fns)))
1231
1232 compoundFns := make([]string, 0, len(fns))
1233 for _, fn := range fns {
1234 if strings.HasPrefix(filepath.Base(fn), "compound-") {
1235 compoundFns = append(compoundFns, fn)
1236 }
1237 }
1238 metricNumberCompoundShards.Set(float64(len(fns)))
1239}
1240
1241func rootCmd() *ffcli.Command {
1242 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError)
1243 conf := rootConfig{
1244 Main: true,
1245 }
1246 conf.registerRootFlags(rootFs)
1247
1248 return &ffcli.Command{
1249 FlagSet: rootFs,
1250 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]",
1251 Subcommands: []*ffcli.Command{debugCmd()},
1252 Exec: func(ctx context.Context, args []string) error {
1253 return startServer(conf)
1254 },
1255 }
1256}
1257
// rootConfig holds the settings of the root command. Apart from Main, every
// field is bound to a command-line flag by registerRootFlags.
type rootConfig struct {
	// Main is true if this rootConfig is for our main long running command (the
	// indexserver). Debug commands should not set this value. newServer passes
	// it on to setupTmpDir, which only removes a leftover temp directory when
	// it is true.
	Main bool

	// root is the Sourcegraph URL (flag "sourcegraph_url"), or a path to a
	// directory of repositories, in which case the Sourcegraph API is faked.
	root string
	// interval is how often to sync with Sourcegraph (flag "interval").
	interval time.Duration
	// index is the index directory (flag "index").
	index string
	// indexConcurrency is the number of repos to index concurrently (flag
	// "index_concurrency"). newServer clamps it to at least 1 and at most the
	// computed CPU count.
	indexConcurrency int64
	// listen is the address the HTTP server listens on (flag "listen"). An
	// empty value makes startServer skip serving HTTP altogether.
	listen string
	// hostname is the name advertised to Sourcegraph when asking for the list
	// of repositories to index (flag "hostname").
	hostname string
	// cpuFraction is the fraction of the cores used for indexing (flag
	// "cpu_fraction"). newServer rejects values outside (0.0, 1.0].
	cpuFraction float64

	// config values related to shard merging

	// disableShardMerging disables shard merging (flag "shard_merging").
	disableShardMerging bool
	// vacuumInterval is how often vacuum runs (flag "vacuum_interval").
	vacuumInterval time.Duration
	// mergeInterval is how often merge runs (flag "merge_interval").
	mergeInterval time.Duration
	// targetSize is the target size of compound shards in MiB (flag
	// "merge_target_size").
	targetSize int64
	// minSize is the minimum size of a compound shard in MiB (flag
	// "merge_min_size").
	minSize int64
	// minAgeDays is the time since the last commit in days; shards with newer
	// commits are excluded from merging (flag "merge_min_age").
	minAgeDays int

	// config values related to backoff indexing repos with one or more consecutive failures

	// backoffDuration is how long enqueue operations are held back for a
	// repository whose previous indexing attempt failed (flag
	// "backoff_duration").
	backoffDuration time.Duration
	// maxBackoffDuration is the maximum duration to back off from enqueueing a
	// repo for indexing (flag "max_backoff_duration").
	maxBackoffDuration time.Duration
}
1284
1285func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
1286 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
1287 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
1288 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently")
1289 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", index.DefaultDir), "set index directory to use")
1290 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
1291 fs.StringVar(&rc.hostname, "hostname", index.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
1292 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
1293 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
1294 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")
1295
1296 // flags related to shard merging
1297 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging")
1298 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
1299 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
1300 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 1000), "the target size of compound shards in MiB")
1301 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 800), "the minimum size of a compound shard in MiB")
1302 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
1303}
1304
1305func startServer(conf rootConfig) error {
1306 s, err := newServer(conf)
1307 if err != nil {
1308 return err
1309 }
1310
1311 profiler.Init("zoekt-sourcegraph-indexserver")
1312 setShardsCounter(s.IndexDir)
1313
1314 if conf.listen != "" {
1315
1316 mux := http.NewServeMux()
1317 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{
1318 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"},
1319 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"},
1320 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"},
1321 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"},
1322 }...)
1323 s.addDebugHandlers(mux)
1324
1325 go func() {
1326 debugLog.Printf("serving HTTP on %s", conf.listen)
1327 log.Fatal(http.ListenAndServe(conf.listen, mux))
1328 }()
1329
1330 // Serve mux on a unix domain socket on a best-effort-basis so that
1331 // webserver can call the endpoints via the shared filesystem.
1332 //
1333 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen
1334 // on the socket due to permission errors. See
1335 // https://github.com/docker/for-mac/issues/6239
1336 go func() {
1337 serveHTTPOverSocket := func() error {
1338 socket := filepath.Join(s.IndexDir, "indexserver.sock")
1339 // We cannot bind a socket to an existing pathname.
1340 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) {
1341 return fmt.Errorf("error removing socket file: %s", socket)
1342 }
1343 // The "unix" network corresponds to stream sockets. (cf. unixgram,
1344 // unixpacket).
1345 l, err := net.Listen("unix", socket)
1346 if err != nil {
1347 return fmt.Errorf("failed to listen on socket %s: %w", socket, err)
1348 }
1349 // Indexserver (root) and webserver (Sourcegraph) run with
1350 // different users. Per default, the socket is created with
1351 // permission 755 (root root), which doesn't let webserver write to
1352 // it.
1353 //
1354 // See https://github.com/golang/go/issues/11822 for more context.
1355 if err := os.Chmod(socket, 0o777); err != nil {
1356 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err)
1357 }
1358 debugLog.Printf("serving HTTP on %s", socket)
1359 return http.Serve(l, mux)
1360 }
1361 debugLog.Print(serveHTTPOverSocket())
1362 }()
1363 }
1364
1365 oc := &ownerChecker{
1366 Path: filepath.Join(conf.index, "owner.txt"),
1367 Hostname: conf.hostname,
1368 }
1369 go oc.Run()
1370
1371 logger := sglog.Scoped("metricsRegistration")
1372 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"}
1373
1374 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index})
1375 prometheus.DefaultRegisterer.MustRegister(c)
1376
1377 s.Run()
1378 return nil
1379}
1380
1381func newServer(conf rootConfig) (*Server, error) {
1382 logger := sglog.Scoped("server")
1383
1384 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 {
1385 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0")
1386 }
1387 if conf.index == "" {
1388 return nil, fmt.Errorf("must set -index")
1389 }
1390 if conf.root == "" {
1391 return nil, fmt.Errorf("must set -sourcegraph_url")
1392 }
1393 rootURL, err := url.Parse(conf.root)
1394 if err != nil {
1395 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err)
1396 }
1397
1398 rootURL = addDefaultPort(rootURL)
1399
1400 // Tune GOMAXPROCS to match Linux container CPU quota.
1401 _, _ = maxprocs.Set()
1402
1403 // Automatically prepend our own path at the front, to minimize
1404 // required configuration.
1405 if l, err := os.Readlink("/proc/self/exe"); err == nil {
1406 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH"))
1407 }
1408
1409 if _, err := os.Stat(conf.index); err != nil {
1410 if err := os.MkdirAll(conf.index, 0o755); err != nil {
1411 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err)
1412 }
1413 }
1414
1415 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil {
1416 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err)
1417 }
1418
1419 if srcLogLevelIsDebug() {
1420 debugLog.SetOutput(os.Stderr)
1421 }
1422
1423 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST")
1424 if len(reposWithSeparateIndexingMetrics) > 0 {
1425 debugLog.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", "))
1426 }
1427
1428 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST")
1429 if len(deltaBuildRepositoriesAllowList) > 0 {
1430 debugLog.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", "))
1431 }
1432
1433 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150)
1434 if deltaShardNumberFallbackThreshold > 0 {
1435 debugLog.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold)
1436 } else {
1437 debugLog.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards")
1438 }
1439
1440 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST")
1441 if len(reposShouldSkipSymbolsCalculation) > 0 {
1442 debugLog.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", "))
1443 }
1444
1445 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout)
1446 if indexingTimeout != defaultIndexingTimeout {
1447 debugLog.Printf("using configured indexing timeout: %s", indexingTimeout)
1448 }
1449
1450 var sg Sourcegraph
1451 if rootURL.IsAbs() {
1452 var batchSize int
1453 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" {
1454 batchSize, err = strconv.Atoi(v)
1455 if err != nil {
1456 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int")
1457 }
1458 }
1459
1460 opts := []SourcegraphClientOption{
1461 WithBatchSize(batchSize),
1462 }
1463
1464 logger := sglog.Scoped("zoektConfigurationGRPCClient")
1465 client, err := dialGRPCClient(rootURL.Host, logger)
1466 if err != nil {
1467 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err)
1468 }
1469
1470 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...)
1471
1472 } else {
1473 sg = sourcegraphFake{
1474 RootDir: rootURL.String(),
1475 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags),
1476 }
1477 }
1478
1479 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction)))
1480 if cpuCount < 1 {
1481 cpuCount = 1
1482 }
1483
1484 if conf.indexConcurrency < 1 {
1485 conf.indexConcurrency = 1
1486 } else if conf.indexConcurrency > int64(cpuCount) {
1487 conf.indexConcurrency = int64(cpuCount)
1488 }
1489
1490 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)
1491
1492 return &Server{
1493 logger: logger,
1494 Sourcegraph: sg,
1495 IndexDir: conf.index,
1496 IndexConcurrency: int(conf.indexConcurrency),
1497 Interval: conf.interval,
1498 CPUCount: cpuCount,
1499 queue: *q,
1500 shardMerging: !conf.disableShardMerging,
1501 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList,
1502 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold,
1503 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
1504 hostname: conf.hostname,
1505 mergeOpts: mergeOpts{
1506 vacuumInterval: conf.vacuumInterval,
1507 mergeInterval: conf.mergeInterval,
1508 targetSizeBytes: conf.targetSize * 1024 * 1024,
1509 minSizeBytes: conf.minSize * 1024 * 1024,
1510 minAgeDays: conf.minAgeDays,
1511 },
1512 timeout: indexingTimeout,
1513 }, err
1514}
1515
// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration
// for the indexed-search-configuration gRPC service. Its contents are embedded
// at build time from default_grpc_service_configuration.json, and
// dialGRPCClient passes it to grpc.WithDefaultServiceConfig.
//
// The default backoff strategy is modeled after the default settings used by
// retryablehttp.DefaultClient.
//
// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html):
//   - Unavailable
//   - Aborted
//
//go:embed default_grpc_service_configuration.json
var defaultGRPCServiceConfigurationJSON string
1528
1529func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor {
1530 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1531 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1532 return invoker(ctx, method, req, reply, cc, opts...)
1533 }
1534}
1535
1536func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
1537 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
1538 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal")
1539 return streamer(ctx, desc, cc, method, opts...)
1540 }
1541}
1542
// defaultGRPCMessageReceiveSizeBytes is the default maximum size, in bytes, of
// a message that gRPC servers and clients are allowed to process. Within this
// file, dialGRPCClient applies it through grpc.MaxCallRecvMsgSize.
// This can be overridden by providing custom Server/Dial options.
const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MiB
1546
1547func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) {
1548 metrics := clientMetricsOnce()
1549
1550 // If the service seems to be unavailable, this
1551 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1
1552 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s])
1553 retryOpts := []retry.CallOption{
1554 retry.WithMax(5),
1555 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)),
1556 retry.WithCodes(codes.Unavailable),
1557 }
1558
1559 opts := []grpc.DialOption{
1560 grpc.WithTransportCredentials(insecure.NewCredentials()),
1561 grpc.WithChainStreamInterceptor(
1562 metrics.StreamClientInterceptor(),
1563 messagesize.StreamClientInterceptor,
1564 internalActorStreamInterceptor(),
1565 internalerrs.LoggingStreamClientInterceptor(logger),
1566 internalerrs.PrometheusStreamClientInterceptor,
1567 retry.StreamClientInterceptor(retryOpts...),
1568 ),
1569 grpc.WithChainUnaryInterceptor(
1570 metrics.UnaryClientInterceptor(),
1571 messagesize.UnaryClientInterceptor,
1572 internalActorUnaryInterceptor(),
1573 internalerrs.LoggingUnaryClientInterceptor(logger),
1574 internalerrs.PrometheusUnaryClientInterceptor,
1575 retry.UnaryClientInterceptor(retryOpts...),
1576 ),
1577 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
1578 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)),
1579 }
1580
1581 opts = append(opts, additionalOpts...)
1582
1583 // Ensure that the message size options are set last, so they override any other
1584 // client-specific options that tweak the message size.
1585 //
1586 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
1587 // take precedence over everything else with a uniform size setting that's easy to reason about.
1588 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...)
1589
1590 // This dialer is used to connect via gRPC to the Sourcegraph instance.
1591 // This is done lazily, so we can provide the client to use regardless of
1592 // whether we enabled gRPC or not initially.
1593 cc, err := grpc.Dial(addr, opts...)
1594 if err != nil {
1595 return nil, fmt.Errorf("dialing %q: %w", addr, err)
1596 }
1597
1598 client := proto.NewZoektConfigurationServiceClient(cc)
1599 return client, nil
1600}
1601
// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
// If the scheme is "https", "443" is used. URLs with any other scheme, with an
// explicit port, or that are not absolute are returned as they are, and a nil
// URL yields nil.
//
// The original URL is not mutated. A copy is modified and returned.
func addDefaultPort(original *url.URL) *url.URL {
	if original == nil {
		return nil // don't panic
	}

	if !original.IsAbs() {
		return original // don't do anything if the URL doesn't look like a remote URL
	}

	if original.Port() != "" {
		return original
	}

	var port string
	switch original.Scheme {
	case "http":
		port = "80"
	case "https":
		port = "443"
	default:
		return original
	}

	u := cloneURL(original)
	// Join with Hostname() rather than Host: for an IPv6 literal Host still
	// carries its square brackets ("[::1]"), and net.JoinHostPort would wrap
	// it in a second pair, producing the invalid "[[::1]]:80".
	u.Host = net.JoinHostPort(u.Hostname(), port)
	return u
}

// cloneURL returns a copy of the URL. It is safe to mutate the returned URL:
// the User field, when present, is copied as well rather than shared.
// This is copied from net/http/clone.go
func cloneURL(u *url.URL) *url.URL {
	if u == nil {
		return nil
	}
	u2 := new(url.URL)
	*u2 = *u
	if u.User != nil {
		u2.User = new(url.Userinfo)
		*u2.User = *u.User
	}
	return u2
}
1646
1647func main() {
1648 liblog := sglog.Init(sglog.Resource{
1649 Name: "zoekt-indexserver",
1650 Version: index.Version,
1651 InstanceID: index.HostnameBestEffort(),
1652 })
1653 defer liblog.Sync()
1654
1655 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil {
1656 log.Fatal(err)
1657 }
1658}
1659
// getBoolFromEnvironmentVariables returns the boolean held by the first
// environment variable in envVarNames that is set to a non-empty value in the
// current process environment, or defaultBool if none of them is.
//
// An error is returned if that first non-empty variable fails to parse as a
// boolean; later variables are not consulted in that case.
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	for _, name := range envVarNames {
		raw := os.Getenv(name)
		if raw != "" {
			parsed, err := strconv.ParseBool(raw)
			if err == nil {
				return parsed, nil
			}
			return false, fmt.Errorf("parsing environment variable %q to boolean: %v", name, err)
		}
	}

	return defaultBool, nil
}