fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
// repositories on Sourcegraph.
package main

import (
	"bytes"
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"html/template"
	"io"
	"io/fs"
	"log"
	"math"
	"math/rand"
	"net"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"text/tabwriter"
	"time"

	grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
	"github.com/peterbourgon/ff/v3/ffcli"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	sglog "github.com/sourcegraph/log"
	"github.com/sourcegraph/mountinfo"
	"go.uber.org/automaxprocs/maxprocs"
	"golang.org/x/net/trace"
	"golang.org/x/sys/unix"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/metadata"

	"github.com/sourcegraph/zoekt"
	"github.com/sourcegraph/zoekt/build"
	proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
	"github.com/sourcegraph/zoekt/debugserver"
	"github.com/sourcegraph/zoekt/grpc/internalerrs"
	"github.com/sourcegraph/zoekt/grpc/messagesize"
	"github.com/sourcegraph/zoekt/internal/profiler"
	"github.com/sourcegraph/zoekt/internal/tenant"
)

// Prometheus metrics exported by the indexserver. They are registered with
// the default registry via promauto at package initialisation.
var (
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 100000s (~27.8h)
	})

	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_indexing_delay_seconds",
		Help:    "A histogram of durations from when an index job is added to the queue, to the time it completes.",
		Buckets: prometheus.ExponentialBuckets(60, 2, 14), // 1 minute -> ~5.7 days
	}, []string{
		"state", // state is an indexState
		"name",  // the name of the repository that was indexed
	})

	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is build.IndexState

	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})

	// clientMetricsOnce returns a singleton instance of the client metrics
	// that are shared across all gRPC clients that this process creates.
	//
	// This function panics if the metrics cannot be registered with the default
	// Prometheus registry.
	clientMetricsOnce = sync.OnceValue(func() *grpcprom.ClientMetrics {
		clientMetrics := grpcprom.NewClientMetrics(
			grpcprom.WithClientCounterOptions(),
			grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
			grpcprom.WithClientStreamRecvHistogram(),   // record how long it takes for a client to receive a message during a streaming RPC
			grpcprom.WithClientStreamSendHistogram(),   // record how long it takes for a client to send a message during a streaming RPC
		)
		prometheus.DefaultRegisterer.MustRegister(clientMetrics)
		return clientMetrics
	})
)

// MaxFileSize is the per-file size limit passed to index jobs as FileLimit.
//
// 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
// NOTE: if you change this, you must also update gitIndex to use the same value when fetching the repo.
const MaxFileSize = 1 << 20

// reposWithSeparateIndexingMetrics is the set of repositories that we want to
// capture separate indexing metrics for. Repositories in this set keep their
// own "name" label value; all others share the empty label (see
// repoNameForMetric).
var reposWithSeparateIndexingMetrics = make(map[string]struct{})

// indexState is the outcome of an index job. It is used as the "state" label
// of the indexing metrics.
type indexState string

const (
	indexStateFail        indexState = "fail"
	indexStateSuccess     indexState = "success"
	indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
	indexStateNoop        indexState = "noop"         // We didn't need to update index
	indexStateEmpty       indexState = "empty"        // index is empty (empty repo)
)

// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	logger sglog.Logger

	// Sourcegraph is the client used to list repositories, fetch their index
	// options and report index status.
	Sourcegraph Sourcegraph
	BatchSize   int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration

	// CPUCount is the number of CPUs to use for indexing shards.
	CPUCount int

	// queue holds the repositories waiting to be indexed.
	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is reported by the /debug/host endpoint.
	hostname string

	// mergeOpts carries the shard merging settings, including the vacuum and
	// merge intervals.
	mergeOpts mergeOpts

	// timeout defines how long the index server waits before killing an indexing job.
	timeout time.Duration
}

// debug is a logger for verbose diagnostics. As constructed here it discards
// everything written to it.
var debug = log.New(io.Discard, "", log.LstdFlags)

// our index commands should output something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
225const noOutputTimeout = 30 * time.Minute 226 227func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 228 out := &synchronizedBuffer{} 229 cmd.Stdout = out 230 cmd.Stderr = out 231 232 tr.LazyPrintf("%s", cmd.Args) 233 234 defer func() { 235 if err != nil { 236 outS := out.String() 237 tr.LazyPrintf("failed: %v", err) 238 tr.LazyPrintf("output: %s", out) 239 tr.SetError() 240 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 241 } 242 }() 243 244 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 245 246 if err := cmd.Start(); err != nil { 247 return err 248 } 249 250 errC := make(chan error) 251 go func() { 252 errC <- cmd.Wait() 253 }() 254 255 // This channel is set after we have sent sigquit. It allows us to follow up 256 // with a sigkill if the process doesn't quit after sigquit. 257 kill := make(<-chan time.Time) 258 259 lastLen := 0 260 for { 261 select { 262 case <-time.After(noOutputTimeout): 263 // Periodically check if we have had output. If not kill the process. 264 if out.Len() != lastLen { 265 lastLen = out.Len() 266 log.Printf("still running %s", cmd.Args) 267 } else { 268 // Send quit (C-\) first so we get a stack dump. 269 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 270 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 271 log.Println("quit failed:", err) 272 } 273 274 // send sigkill if still running in 10s 275 kill = time.After(10 * time.Second) 276 } 277 278 case <-kill: 279 log.Printf("still running, killing %s", cmd.Args) 280 if err := cmd.Process.Kill(); err != nil { 281 log.Println("kill failed:", err) 282 } 283 284 case err := <-errC: 285 if err != nil { 286 return err 287 } 288 289 tr.LazyPrintf("success") 290 return nil 291 } 292 } 293} 294 295// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 296// monitor the buffer while it is being written to. 
297type synchronizedBuffer struct { 298 mu sync.Mutex 299 b bytes.Buffer 300} 301 302func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 303 sb.mu.Lock() 304 defer sb.mu.Unlock() 305 return sb.b.Write(p) 306} 307 308func (sb *synchronizedBuffer) Len() int { 309 sb.mu.Lock() 310 defer sb.mu.Unlock() 311 return sb.b.Len() 312} 313 314func (sb *synchronizedBuffer) String() string { 315 sb.mu.Lock() 316 defer sb.mu.Unlock() 317 return sb.b.String() 318} 319 320// pauseFileName if present in IndexDir will stop index jobs from 321// running. This is to make it possible to experiment with the content of the 322// IndexDir without the indexserver writing to it. 323const pauseFileName = "PAUSE" 324 325// Run the sync loop. This blocks forever. 326func (s *Server) Run() { 327 removeIncompleteShards(s.IndexDir) 328 329 // Start a goroutine which updates the queue with commits to index. 330 go func() { 331 // We update the list of indexed repos every Interval. To speed up manual 332 // testing we also listen for SIGUSR1 to trigger updates. 
333 // 334 // "pkill -SIGUSR1 zoekt-sourcegra" 335 for range jitterTicker(s.Interval, unix.SIGUSR1) { 336 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 337 log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 338 continue 339 } 340 341 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 342 if err != nil { 343 log.Printf("error listing repos: %s", err) 344 continue 345 } 346 347 debug.Printf("updating index queue with %d repositories", len(repos.IDs)) 348 349 // Stop indexing repos we don't need to track anymore 350 removed := s.queue.MaybeRemoveMissing(repos.IDs) 351 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 352 if len(removed) > 0 { 353 log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 354 } 355 356 cleanupDone := make(chan struct{}) 357 go func() { 358 defer close(cleanupDone) 359 s.muIndexDir.Global(func() { 360 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 361 }) 362 }() 363 364 repos.IterateIndexOptions(s.queue.AddOrUpdate) 365 366 // IterateIndexOptions will only iterate over repositories that have 367 // changed since we last called list. However, we want to add all IDs 368 // back onto the queue just to check that what is on disk is still 369 // correct. This will use the last IndexOptions we stored in the 370 // queue. The repositories not on the queue (missing) need a forced 371 // fetch of IndexOptions. 372 missing := s.queue.Bump(repos.IDs) 373 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 
374 375 setCompoundShardCounter(s.IndexDir) 376 377 <-cleanupDone 378 } 379 }() 380 381 go func() { 382 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 383 if s.shardMerging { 384 s.vacuum() 385 } 386 } 387 }() 388 389 go func() { 390 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 391 if s.shardMerging { 392 s.doMerge() 393 } 394 } 395 }() 396 397 for i := 0; i < s.IndexConcurrency; i++ { 398 go s.processQueue() 399 } 400 401 // block forever 402 select {} 403} 404 405// formatList returns a comma-separated list of the first min(len(v), m) items. 406func formatListUint32(v []uint32, m int) string { 407 if len(v) < m { 408 m = len(v) 409 } 410 411 sb := strings.Builder{} 412 for i := 0; i < m; i++ { 413 fmt.Fprintf(&sb, "%d, ", v[i]) 414 } 415 416 if len(v) > m { 417 sb.WriteString("...") 418 } 419 420 return strings.TrimRight(sb.String(), ", ") 421} 422 423func (s *Server) processQueue() { 424 for { 425 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 426 time.Sleep(time.Second) 427 continue 428 } 429 430 item, ok := s.queue.Pop() 431 if !ok { 432 time.Sleep(time.Second) 433 continue 434 } 435 436 opts := item.Opts 437 args := s.indexArgs(opts) 438 439 ran := s.muIndexDir.With(opts.Name, func() { 440 // only record time taken once we hold the lock. This avoids us 441 // recording time taken while merging/cleanup runs. 
442 start := time.Now() 443 444 state, err := s.Index(args) 445 446 elapsed := time.Since(start) 447 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 448 449 indexDelay := time.Since(item.DateAddedToQueue) 450 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds()) 451 452 if err != nil { 453 log.Printf("error indexing %s: %s", args.String(), err) 454 } 455 456 switch state { 457 case indexStateSuccess: 458 var branches []string 459 for _, b := range args.Branches { 460 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 461 } 462 s.logger.Info("updated index", 463 sglog.Int("tenant", args.TenantID), 464 sglog.String("repo", args.Name), 465 sglog.Uint32("id", args.RepoID), 466 sglog.Strings("branches", branches), 467 sglog.Duration("duration", elapsed), 468 sglog.Duration("index_delay", indexDelay), 469 ) 470 case indexStateSuccessMeta: 471 log.Printf("updated meta %s in %v", args.String(), elapsed) 472 } 473 s.queue.SetIndexed(opts, state) 474 }) 475 476 if !ran { 477 // Someone else is processing the repository. We can just skip this job 478 // since the repository will be added back to the queue and we will 479 // converge to the correct behaviour. 480 debug.Printf("index job for repository already running: %s", args) 481 continue 482 } 483 } 484} 485 486// repoNameForMetric returns a normalized version of the given repository name that is 487// suitable for use with Prometheus metrics. 488func repoNameForMetric(repo string) string { 489 // Check to see if we want to be able to capture separate indexing metrics for this repository. 490 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 
491 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 492 return repo 493 } 494 495 return "" 496} 497 498func batched(slice []uint32, size int) <-chan []uint32 { 499 c := make(chan []uint32) 500 go func() { 501 for len(slice) > 0 { 502 if size > len(slice) { 503 size = len(slice) 504 } 505 c <- slice[:size] 506 slice = slice[size:] 507 } 508 close(c) 509 }() 510 return c 511} 512 513// jitterTicker returns a ticker which ticks with a jitter. Each tick is 514// uniformly selected from the range (d/2, d + d/2). It will tick on creation. 515// 516// sig is a list of signals which also cause the ticker to fire. This is a 517// convenience to allow manually triggering of the ticker. 518func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} { 519 ticker := make(chan struct{}) 520 521 go func() { 522 for { 523 ticker <- struct{}{} 524 ns := int64(d) 525 jitter := rand.Int63n(ns) 526 time.Sleep(time.Duration(ns/2 + jitter)) 527 } 528 }() 529 530 go func() { 531 if len(sig) == 0 { 532 return 533 } 534 535 c := make(chan os.Signal, 1) 536 signal.Notify(c, sig...) 537 for range c { 538 ticker <- struct{}{} 539 } 540 }() 541 542 return ticker 543} 544 545// Index starts an index job for repo name at commit. 546func (s *Server) Index(args *indexArgs) (state indexState, err error) { 547 tr := trace.New("index", args.Name) 548 tr.SetMaxEvents(30) // Ensure we capture all indexing events 549 550 defer func() { 551 if err != nil { 552 tr.SetError() 553 tr.LazyPrintf("error: %v", err) 554 state = indexStateFail 555 metricFailingTotal.Inc() 556 } 557 tr.LazyPrintf("state: %s", state) 558 tr.Finish() 559 }() 560 561 // Sourcegraph should always provide a tenant ID. 
562 if args.TenantID < 1 { 563 return indexStateFail, tenant.ErrMissingTenant 564 } 565 566 tr.LazyPrintf("branches: %v", args.Branches) 567 568 if len(args.Branches) == 0 { 569 return indexStateEmpty, createEmptyShard(args) 570 } 571 572 repositoryName := args.Name 573 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok { 574 tr.LazyPrintf("marking this repository for delta build") 575 args.UseDelta = true 576 } 577 578 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold 579 580 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok { 581 tr.LazyPrintf("skipping symbols calculation") 582 args.Symbols = false 583 } 584 585 reason := "forced" 586 587 if args.Incremental { 588 bo := args.BuildOptions() 589 bo.SetDefaults() 590 incrementalState, fn := bo.IndexState() 591 reason = string(incrementalState) 592 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc() 593 594 switch incrementalState { 595 case build.IndexStateEqual: 596 debug.Printf("%s index already up to date. Shard=%s", args.String(), fn) 597 return indexStateNoop, nil 598 599 case build.IndexStateMeta: 600 log.Printf("updating index.meta %s", args.String()) 601 602 // TODO(stefan) handle mergeMeta for tenant id. 
603 if err := mergeMeta(bo); err != nil { 604 log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err) 605 } else { 606 return indexStateSuccessMeta, nil 607 } 608 609 case build.IndexStateCorrupt: 610 log.Printf("falling back to full update: corrupt index: %s", args.String()) 611 } 612 } 613 614 log.Printf("updating index %s reason=%s", args.String(), reason) 615 616 metricIndexingTotal.Inc() 617 c := gitIndexConfig{ 618 runCmd: func(cmd *exec.Cmd) error { 619 return s.loggedRun(tr, cmd) 620 }, 621 622 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) { 623 return args.BuildOptions().FindRepositoryMetadata() 624 }, 625 timeout: s.timeout, 626 } 627 628 err = gitIndex(c, args, s.Sourcegraph, s.logger) 629 if err != nil { 630 return indexStateFail, err 631 } 632 633 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil { 634 s.logger.Error("failed to update index status", 635 sglog.String("repo", args.Name), 636 sglog.Uint32("id", args.RepoID), 637 sglogBranches("branches", args.Branches), 638 sglog.Error(err), 639 ) 640 } 641 642 return indexStateSuccess, nil 643} 644 645// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so 646// it can update the zoekt_repos table. 647func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error { 648 // We need to read from disk for IndexTime. 
649 _, metadata, ok, err := c.findRepositoryMetadata(args) 650 if err != nil { 651 return fmt.Errorf("failed to read metadata for new/updated index: %w", err) 652 } 653 if !ok { 654 return errors.New("failed to find metadata for new/updated index") 655 } 656 657 status := []indexStatus{{ 658 RepoID: args.RepoID, 659 Branches: args.Branches, 660 IndexTimeUnix: metadata.IndexTime.Unix(), 661 }} 662 if err := sg.UpdateIndexStatus(status); err != nil { 663 return fmt.Errorf("failed to update sourcegraph with status: %w", err) 664 } 665 666 return nil 667} 668 669func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field { 670 ss := make([]string, len(branches)) 671 for i, b := range branches { 672 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version) 673 } 674 return sglog.Strings(key, ss) 675} 676 677func (s *Server) indexArgs(opts IndexOptions) *indexArgs { 678 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0)) 679 return &indexArgs{ 680 IndexOptions: opts, 681 IndexDir: s.IndexDir, 682 Parallelism: parallelism, 683 Incremental: true, 684 FileLimit: MaxFileSize, 685 ShardMerging: s.shardMerging, 686 } 687} 688 689// parallelism consults both the server flags and index options to determine the number 690// of shards to index in parallel. If the CPUCount index option is provided, it always 691// overrides the server flag. 
692func (s *Server) parallelism(opts IndexOptions, maxProcs int) int { 693 var parallelism int 694 if opts.ShardConcurrency > 0 { 695 parallelism = int(opts.ShardConcurrency) 696 } else { 697 parallelism = s.CPUCount 698 } 699 700 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs 701 if parallelism > 4*maxProcs { 702 parallelism = 4 * maxProcs 703 } 704 705 // If index concurrency is set, then divide the parallelism by the number of 706 // repos we're indexing in parallel 707 if s.IndexConcurrency > 1 { 708 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency))) 709 } 710 711 return parallelism 712} 713 714func createEmptyShard(args *indexArgs) error { 715 bo := args.BuildOptions() 716 bo.SetDefaults() 717 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 718 719 if args.Incremental && bo.IncrementalSkipIndexing() { 720 return nil 721 } 722 723 builder, err := build.NewBuilder(*bo) 724 if err != nil { 725 return err 726 } 727 return builder.Finish() 728} 729 730// addDebugHandlers adds handlers specific to indexserver. 731func (s *Server) addDebugHandlers(mux *http.ServeMux) { 732 // Sourcegraph's site admin view requires indexserver to serve it's admin view 733 // on "/". 
734 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 735 736 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 737 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 738 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 739 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 740 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 741 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 742} 743 744func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 745 if r.Method != "GET" { 746 w.Header().Set("Allow", "GET") 747 w.WriteHeader(http.StatusMethodNotAllowed) 748 return 749 } 750 751 response := struct { 752 Hostname string 753 }{ 754 Hostname: s.hostname, 755 } 756 757 b, err := json.Marshal(response) 758 if err != nil { 759 http.Error(w, err.Error(), http.StatusInternalServerError) 760 return 761 } 762 763 w.Header().Set("Content-Type", "application/json; charset=utf-8") 764 w.Write(b) 765} 766 767var rootTmpl = template.Must(template.New("name").Parse(` 768<html> 769 <body> 770 <a href="debug">Debug</a><br /> 771 <a href="debug/requests">Traces</a><br /> 772 {{.IndexMsg}}<br /> 773 <br /> 774 <h3>Reindex</h3> 775 {{if .Repos}} 776 <a href="?show_repos=false">hide repos</a><br /> 777 <table style="margin-top: 20px"> 778 <th style="text-align:left">Name</th> 779 <th style="text-align:left">ID (click to reindex)</th> 780 {{range .Repos}} 781 <tr> 782 <td>{{.Name}}</td> 783 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 784 </tr> 785 {{end}} 786 </table> 787 {{else}} 788 <a href="?show_repos=true">show repos</a><br /> 789 {{end}} 790 </body> 791</html> 792`)) 793 794func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 795 if r.Method != "GET" { 796 w.Header().Set("Allow", "GET") 797 w.WriteHeader(http.StatusMethodNotAllowed) 798 return 799 } 800 801 values := r.URL.Query() 802 803 // ?id= 804 indexMsg := "" 805 if v := values.Get("id"); v != "" 
{ 806 id, err := strconv.Atoi(v) 807 if err != nil { 808 http.Error(w, err.Error(), http.StatusBadRequest) 809 return 810 } 811 indexMsg, _ = s.forceIndex(uint32(id)) 812 } 813 814 // ?show_repos= 815 showRepos := false 816 if v := values.Get("show_repos"); v != "" { 817 showRepos, _ = strconv.ParseBool(v) 818 } 819 820 type Repo struct { 821 ID uint32 822 Name string 823 } 824 var data struct { 825 Repos []Repo 826 IndexMsg string 827 } 828 829 data.IndexMsg = indexMsg 830 831 if showRepos { 832 s.queue.Iterate(func(opts *IndexOptions) { 833 data.Repos = append(data.Repos, Repo{ 834 ID: opts.RepoID, 835 Name: opts.Name, 836 }) 837 }) 838 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 839 } 840 841 _ = rootTmpl.Execute(w, data) 842} 843 844// handleReindex triggers a reindex asynocronously. If a reindex was triggered 845// the request returns with status 202. The caller can infer the new state of 846// the index by calling List. 847func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 848 if r.Method != http.MethodPost { 849 w.Header().Set("Allow", http.MethodPost) 850 w.WriteHeader(http.StatusMethodNotAllowed) 851 return 852 } 853 854 err := r.ParseForm() 855 if err != nil { 856 http.Error(w, err.Error(), http.StatusBadRequest) 857 return 858 } 859 860 id, err := strconv.Atoi(r.Form.Get("repo")) 861 if err != nil { 862 http.Error(w, err.Error(), http.StatusBadRequest) 863 return 864 } 865 866 go func() { s.forceIndex(uint32(id)) }() 867 868 // 202 Accepted 869 w.WriteHeader(http.StatusAccepted) 870} 871 872func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 873 withIndexed := true 874 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 875 withIndexed = b 876 } 877 878 var indexed []uint32 879 if withIndexed { 880 indexed = listIndexed(s.IndexDir) 881 } 882 883 repos, err := s.Sourcegraph.List(r.Context(), indexed) 884 if err != nil { 885 http.Error(w, 
err.Error(), http.StatusInternalServerError) 886 return 887 } 888 889 bw := bytes.Buffer{} 890 891 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 892 893 _, err = fmt.Fprintf(tw, "ID\tName\n") 894 if err != nil { 895 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 896 return 897 } 898 899 s.queue.mu.Lock() 900 name := "" 901 for _, id := range repos.IDs { 902 if item := s.queue.get(id); item != nil { 903 name = item.opts.Name 904 } else { 905 name = "" 906 } 907 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 908 if err != nil { 909 debug.Printf("handleDebugList: %s\n", err.Error()) 910 } 911 } 912 s.queue.mu.Unlock() 913 914 if err != nil { 915 http.Error(w, err.Error(), http.StatusInternalServerError) 916 return 917 } 918 919 err = tw.Flush() 920 if err != nil { 921 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 922 return 923 } 924 925 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 926 927 if _, err := io.Copy(w, &bw); err != nil { 928 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 929 return 930 } 931} 932 933// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 934// can run this command during periods of low usage (evenings, weekends) to 935// trigger an initial merge run. In the steady-state, merges happen rarely, even 936// on busy instances, and users can rely on automatic merging instead. 937func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 938 // A merge operation can take very long, depending on the number merges and the 939 // target size of the compound shards. We run the merge in the background and 940 // return immediately to the user. 941 // 942 // We track the status of the merge with metricShardMergingRunning. 
943 go func() { 944 s.doMerge() 945 }() 946 _, _ = w.Write([]byte("merging enqueued\n")) 947} 948 949func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 950 indexed := listIndexed(s.IndexDir) 951 952 bw := bytes.Buffer{} 953 954 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 955 956 _, err := fmt.Fprintf(tw, "ID\tName\n") 957 if err != nil { 958 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 959 return 960 } 961 962 s.queue.mu.Lock() 963 name := "" 964 for _, id := range indexed { 965 if item := s.queue.get(id); item != nil { 966 name = item.opts.Name 967 } else { 968 name = "" 969 } 970 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 971 if err != nil { 972 debug.Printf("handleDebugIndexed: %s\n", err.Error()) 973 } 974 } 975 s.queue.mu.Unlock() 976 977 if err != nil { 978 http.Error(w, err.Error(), http.StatusInternalServerError) 979 return 980 } 981 982 err = tw.Flush() 983 if err != nil { 984 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 985 return 986 } 987 988 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 989 990 if _, err := io.Copy(w, &bw); err != nil { 991 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 992 return 993 } 994} 995 996// forceIndex will run the index job for repo name now. It will return always 997// return a string explaining what it did, even if it failed. 
998func (s *Server) forceIndex(id uint32) (string, error) { 999 var opts IndexOptions 1000 var err error 1001 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 1002 opts = o 1003 }, func(_ uint32, e error) { 1004 err = e 1005 }, id) 1006 if err != nil { 1007 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 1008 } 1009 1010 args := s.indexArgs(opts) 1011 args.Incremental = false // force re-index 1012 1013 var state indexState 1014 ran := s.muIndexDir.With(opts.Name, func() { 1015 state, err = s.Index(args) 1016 }) 1017 if !ran { 1018 return fmt.Sprintf("index job for repository already running: %s", args), nil 1019 } 1020 if err != nil { 1021 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 1022 } 1023 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 1024} 1025 1026func listIndexed(indexDir string) []uint32 { 1027 index := getShards(indexDir) 1028 metricNumIndexed.Set(float64(len(index))) 1029 repoIDs := make([]uint32, 0, len(index)) 1030 for id := range index { 1031 repoIDs = append(repoIDs, id) 1032 } 1033 sort.Slice(repoIDs, func(i, j int) bool { 1034 return repoIDs[i] < repoIDs[j] 1035 }) 1036 return repoIDs 1037} 1038 1039// setupTmpDir sets up a temporary directory on the same volume as the 1040// indexes. 1041// 1042// If main is true we will delete older temp directories left around. main is 1043// false when this is a debug command. 1044func setupTmpDir(logger sglog.Logger, main bool, index string) error { 1045 // change the target tmp directory depending on if it's our main daemon or a 1046 // debug sub command. 
1047 dir := ".indexserver.debug.tmp" 1048 if main { 1049 dir = ".indexserver.tmp" 1050 } 1051 1052 tmpRoot := filepath.Join(index, dir) 1053 1054 if main { 1055 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot)) 1056 err := os.RemoveAll(tmpRoot) 1057 if err != nil { 1058 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err)) 1059 } 1060 } 1061 1062 if err := os.MkdirAll(tmpRoot, 0o755); err != nil { 1063 return err 1064 } 1065 1066 return os.Setenv("TMPDIR", tmpRoot) 1067} 1068 1069func printMetaData(fn string) error { 1070 repo, indexMeta, err := zoekt.ReadMetadataPath(fn) 1071 if err != nil { 1072 return err 1073 } 1074 1075 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 1076 if err != nil { 1077 return err 1078 } 1079 1080 err = json.NewEncoder(os.Stdout).Encode(repo) 1081 if err != nil { 1082 return err 1083 } 1084 return nil 1085} 1086 1087func printShardStats(fn string) error { 1088 f, err := os.Open(fn) 1089 if err != nil { 1090 return err 1091 } 1092 1093 iFile, err := zoekt.NewIndexFile(f) 1094 if err != nil { 1095 return err 1096 } 1097 1098 return zoekt.PrintNgramStats(iFile) 1099} 1100 1101func srcLogLevelIsDebug() bool { 1102 lvl := os.Getenv(sglog.EnvLogLevel) 1103 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1104} 1105 1106func getEnvWithDefaultBool(k string, defaultVal bool) bool { 1107 v := os.Getenv(k) 1108 if v == "" { 1109 return defaultVal 1110 } 1111 b, err := strconv.ParseBool(v) 1112 if err != nil { 1113 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1114 } 1115 return b 1116} 1117 1118func getEnvWithDefaultInt64(k string, defaultVal int64) int64 { 1119 v := os.Getenv(k) 1120 if v == "" { 1121 return defaultVal 1122 } 1123 i, err := strconv.ParseInt(v, 10, 64) 1124 if err != nil { 1125 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1126 } 1127 return i 1128} 1129 1130func getEnvWithDefaultInt(k string, defaultVal int) int { 1131 v := 
os.Getenv(k) 1132 if v == "" { 1133 return defaultVal 1134 } 1135 i, err := strconv.Atoi(k) 1136 if err != nil { 1137 log.Fatalf("error parsing ENV %s to int: %s", k, err) 1138 } 1139 return i 1140} 1141 1142func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 { 1143 v := os.Getenv(k) 1144 if v == "" { 1145 return defaultVal 1146 } 1147 i, err := strconv.ParseUint(v, 10, 64) 1148 if err != nil { 1149 log.Fatalf("error parsing ENV %s to uint64: %s", k, err) 1150 } 1151 return i 1152} 1153 1154func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1155 v := os.Getenv(k) 1156 if v == "" { 1157 return defaultVal 1158 } 1159 f, err := strconv.ParseFloat(v, 64) 1160 if err != nil { 1161 log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1162 } 1163 return f 1164} 1165 1166func getEnvWithDefaultString(k string, defaultVal string) string { 1167 v := os.Getenv(k) 1168 if v == "" { 1169 return defaultVal 1170 } 1171 return v 1172} 1173 1174func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration { 1175 v := os.Getenv(k) 1176 if v == "" { 1177 return defaultVal 1178 } 1179 1180 d, err := time.ParseDuration(v) 1181 if err != nil { 1182 log.Fatalf("error parsing ENV %s to duration: %s", k, err) 1183 } 1184 return d 1185} 1186 1187func getEnvWithDefaultEmptySet(k string) map[string]struct{} { 1188 set := map[string]struct{}{} 1189 for _, v := range strings.Split(os.Getenv(k), ",") { 1190 v = strings.TrimSpace(v) 1191 if v != "" { 1192 set[v] = struct{}{} 1193 } 1194 } 1195 return set 1196} 1197 1198func joinStringSet(set map[string]struct{}, sep string) string { 1199 var xs []string 1200 for x := range set { 1201 xs = append(xs, x) 1202 } 1203 1204 return strings.Join(xs, sep) 1205} 1206 1207func setCompoundShardCounter(indexDir string) { 1208 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt")) 1209 if err != nil { 1210 log.Printf("setCompoundShardCounter: %s\n", err) 1211 return 1212 } 1213 
metricNumberCompoundShards.Set(float64(len(fns))) 1214} 1215 1216func rootCmd() *ffcli.Command { 1217 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1218 conf := rootConfig{ 1219 Main: true, 1220 } 1221 conf.registerRootFlags(rootFs) 1222 1223 return &ffcli.Command{ 1224 FlagSet: rootFs, 1225 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1226 Subcommands: []*ffcli.Command{debugCmd()}, 1227 Exec: func(ctx context.Context, args []string) error { 1228 return startServer(conf) 1229 }, 1230 } 1231} 1232 1233type rootConfig struct { 1234 // Main is true if this rootConfig is for our main long running command (the 1235 // indexserver). Debug commands should not set this value. This is used to 1236 // determine if we need to run tmpfriend. 1237 Main bool 1238 1239 root string 1240 interval time.Duration 1241 index string 1242 indexConcurrency int64 1243 listen string 1244 hostname string 1245 cpuFraction float64 1246 1247 // config values related to shard merging 1248 disableShardMerging bool 1249 vacuumInterval time.Duration 1250 mergeInterval time.Duration 1251 targetSize int64 1252 minSize int64 1253 minAgeDays int 1254 1255 // config values related to backoff indexing repos with one or more consecutive failures 1256 backoffDuration time.Duration 1257 maxBackoffDuration time.Duration 1258} 1259 1260func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1261 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. 
If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1262 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1263 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently") 1264 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use") 1265 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") 1266 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") 1267 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1268 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.") 1269 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. 
A negative value disables indexing backoff.") 1270 1271 // flags related to shard merging 1272 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging") 1273 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1274 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1275 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 1000), "the target size of compound shards in MiB") 1276 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 800), "the minimum size of a compound shard in MiB") 1277 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.") 1278} 1279 1280func startServer(conf rootConfig) error { 1281 s, err := newServer(conf) 1282 if err != nil { 1283 return err 1284 } 1285 1286 profiler.Init("zoekt-sourcegraph-indexserver") 1287 setCompoundShardCounter(s.IndexDir) 1288 1289 if conf.listen != "" { 1290 1291 mux := http.NewServeMux() 1292 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1293 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1294 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1295 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"}, 1296 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1297 }...) 
1298 s.addDebugHandlers(mux) 1299 1300 go func() { 1301 debug.Printf("serving HTTP on %s", conf.listen) 1302 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1303 }() 1304 1305 // Serve mux on a unix domain socket on a best-effort-basis so that 1306 // webserver can call the endpoints via the shared filesystem. 1307 // 1308 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1309 // on the socket due to permission errors. See 1310 // https://github.com/docker/for-mac/issues/6239 1311 go func() { 1312 serveHTTPOverSocket := func() error { 1313 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1314 // We cannot bind a socket to an existing pathname. 1315 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1316 return fmt.Errorf("error removing socket file: %s", socket) 1317 } 1318 // The "unix" network corresponds to stream sockets. (cf. unixgram, 1319 // unixpacket). 1320 l, err := net.Listen("unix", socket) 1321 if err != nil { 1322 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1323 } 1324 // Indexserver (root) and webserver (Sourcegraph) run with 1325 // different users. Per default, the socket is created with 1326 // permission 755 (root root), which doesn't let webserver write to 1327 // it. 1328 // 1329 // See https://github.com/golang/go/issues/11822 for more context. 
1330 if err := os.Chmod(socket, 0o777); err != nil { 1331 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1332 } 1333 debug.Printf("serving HTTP on %s", socket) 1334 return http.Serve(l, mux) 1335 } 1336 debug.Print(serveHTTPOverSocket()) 1337 }() 1338 } 1339 1340 oc := &ownerChecker{ 1341 Path: filepath.Join(conf.index, "owner.txt"), 1342 Hostname: conf.hostname, 1343 } 1344 go oc.Run() 1345 1346 logger := sglog.Scoped("metricsRegistration") 1347 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1348 1349 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1350 prometheus.DefaultRegisterer.MustRegister(c) 1351 1352 s.Run() 1353 return nil 1354} 1355 1356func newServer(conf rootConfig) (*Server, error) { 1357 logger := sglog.Scoped("server") 1358 1359 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1360 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1361 } 1362 if conf.index == "" { 1363 return nil, fmt.Errorf("must set -index") 1364 } 1365 if conf.root == "" { 1366 return nil, fmt.Errorf("must set -sourcegraph_url") 1367 } 1368 rootURL, err := url.Parse(conf.root) 1369 if err != nil { 1370 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1371 } 1372 1373 rootURL = addDefaultPort(rootURL) 1374 1375 // Tune GOMAXPROCS to match Linux container CPU quota. 1376 _, _ = maxprocs.Set() 1377 1378 // Automatically prepend our own path at the front, to minimize 1379 // required configuration. 
1380 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1381 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1382 } 1383 1384 if _, err := os.Stat(conf.index); err != nil { 1385 if err := os.MkdirAll(conf.index, 0o755); err != nil { 1386 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1387 } 1388 } 1389 1390 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil { 1391 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1392 } 1393 1394 if srcLogLevelIsDebug() { 1395 debug = log.New(os.Stderr, "", log.LstdFlags) 1396 } 1397 1398 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1399 if len(reposWithSeparateIndexingMetrics) > 0 { 1400 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1401 } 1402 1403 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1404 if len(deltaBuildRepositoriesAllowList) > 0 { 1405 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1406 } 1407 1408 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1409 if deltaShardNumberFallbackThreshold > 0 { 1410 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1411 } else { 1412 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1413 } 1414 1415 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1416 if len(reposShouldSkipSymbolsCalculation) > 0 { 1417 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1418 } 1419 1420 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout) 1421 if indexingTimeout != 
defaultIndexingTimeout { 1422 debug.Printf("using configured indexing timeout: %s", indexingTimeout) 1423 } 1424 1425 var sg Sourcegraph 1426 if rootURL.IsAbs() { 1427 var batchSize int 1428 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { 1429 batchSize, err = strconv.Atoi(v) 1430 if err != nil { 1431 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1432 } 1433 } 1434 1435 opts := []SourcegraphClientOption{ 1436 WithBatchSize(batchSize), 1437 } 1438 1439 logger := sglog.Scoped("zoektConfigurationGRPCClient") 1440 client, err := dialGRPCClient(rootURL.Host, logger) 1441 if err != nil { 1442 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1443 } 1444 1445 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...) 1446 1447 } else { 1448 sg = sourcegraphFake{ 1449 RootDir: rootURL.String(), 1450 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1451 } 1452 } 1453 1454 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction))) 1455 if cpuCount < 1 { 1456 cpuCount = 1 1457 } 1458 1459 if conf.indexConcurrency < 1 { 1460 conf.indexConcurrency = 1 1461 } else if conf.indexConcurrency > int64(cpuCount) { 1462 conf.indexConcurrency = int64(cpuCount) 1463 } 1464 1465 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1466 1467 return &Server{ 1468 logger: logger, 1469 Sourcegraph: sg, 1470 IndexDir: conf.index, 1471 IndexConcurrency: int(conf.indexConcurrency), 1472 Interval: conf.interval, 1473 CPUCount: cpuCount, 1474 queue: *q, 1475 shardMerging: !conf.disableShardMerging, 1476 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1477 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1478 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1479 hostname: conf.hostname, 1480 mergeOpts: mergeOpts{ 1481 vacuumInterval: conf.vacuumInterval, 1482 mergeInterval: conf.mergeInterval, 
1483 targetSizeBytes: conf.targetSize * 1024 * 1024, 1484 minSizeBytes: conf.minSize * 1024 * 1024, 1485 minAgeDays: conf.minAgeDays, 1486 }, 1487 timeout: indexingTimeout, 1488 }, err 1489} 1490 1491// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration 1492// for the indexed-search-configuration gRPC service. 1493// 1494// The default backoff strategy is modeled after the default settings used by 1495// retryablehttp.DefaultClient. 1496// 1497// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1498// - Unavailable 1499// - Aborted 1500// 1501//go:embed default_grpc_service_configuration.json 1502var defaultGRPCServiceConfigurationJSON string 1503 1504func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1505 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1506 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1507 return invoker(ctx, method, req, reply, cc, opts...) 1508 } 1509} 1510 1511func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1512 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1513 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1514 return streamer(ctx, desc, cc, method, opts...) 1515 } 1516} 1517 1518// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPCs servers and clients are allowed to process. 1519// This can be overridden by providing custom Server/Dial options. 
1520const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB 1521 1522func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) { 1523 metrics := clientMetricsOnce() 1524 1525 // If the service seems to be unavailable, this 1526 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1 1527 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s]) 1528 retryOpts := []retry.CallOption{ 1529 retry.WithMax(5), 1530 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)), 1531 retry.WithCodes(codes.Unavailable), 1532 } 1533 1534 opts := []grpc.DialOption{ 1535 grpc.WithTransportCredentials(insecure.NewCredentials()), 1536 grpc.WithChainStreamInterceptor( 1537 metrics.StreamClientInterceptor(), 1538 messagesize.StreamClientInterceptor, 1539 internalActorStreamInterceptor(), 1540 internalerrs.LoggingStreamClientInterceptor(logger), 1541 internalerrs.PrometheusStreamClientInterceptor, 1542 retry.StreamClientInterceptor(retryOpts...), 1543 ), 1544 grpc.WithChainUnaryInterceptor( 1545 metrics.UnaryClientInterceptor(), 1546 messagesize.UnaryClientInterceptor, 1547 internalActorUnaryInterceptor(), 1548 internalerrs.LoggingUnaryClientInterceptor(logger), 1549 internalerrs.PrometheusUnaryClientInterceptor, 1550 retry.UnaryClientInterceptor(retryOpts...), 1551 ), 1552 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON), 1553 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)), 1554 } 1555 1556 opts = append(opts, additionalOpts...) 1557 1558 // Ensure that the message size options are set last, so they override any other 1559 // client-specific options that tweak the message size. 1560 // 1561 // The message size options are only provided if the environment variable is set. 
These options serve as an escape hatch, so they 1562 // take precedence over everything else with a uniform size setting that's easy to reason about. 1563 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...) 1564 1565 // This dialer is used to connect via gRPC to the Sourcegraph instance. 1566 // This is done lazily, so we can provide the client to use regardless of 1567 // whether we enabled gRPC or not initially. 1568 cc, err := grpc.Dial(addr, opts...) 1569 if err != nil { 1570 return nil, fmt.Errorf("dialing %q: %w", addr, err) 1571 } 1572 1573 client := proto.NewZoektConfigurationServiceClient(cc) 1574 return client, nil 1575} 1576 1577// addDefaultPort adds a default port to a URL if one is not specified. 1578// 1579// If the URL scheme is "http" and no port is specified, "80" is used. 1580// If the scheme is "https", "443" is used. 1581// 1582// The original URL is not mutated. A copy is modified and returned. 1583func addDefaultPort(original *url.URL) *url.URL { 1584 if original == nil { 1585 return nil // don't panic 1586 } 1587 1588 if !original.IsAbs() { 1589 return original // don't do anything if the URL doesn't look like a remote URL 1590 } 1591 1592 if original.Scheme == "http" && original.Port() == "" { 1593 u := cloneURL(original) 1594 u.Host = net.JoinHostPort(u.Host, "80") 1595 return u 1596 } 1597 1598 if original.Scheme == "https" && original.Port() == "" { 1599 u := cloneURL(original) 1600 u.Host = net.JoinHostPort(u.Host, "443") 1601 return u 1602 } 1603 1604 return original 1605} 1606 1607// cloneURL returns a copy of the URL. It is safe to mutate the returned URL. 
1608// This is copied from net/http/clone.go 1609func cloneURL(u *url.URL) *url.URL { 1610 if u == nil { 1611 return nil 1612 } 1613 u2 := new(url.URL) 1614 *u2 = *u 1615 if u.User != nil { 1616 u2.User = new(url.Userinfo) 1617 *u2.User = *u.User 1618 } 1619 return u2 1620} 1621 1622func main() { 1623 liblog := sglog.Init(sglog.Resource{ 1624 Name: "zoekt-indexserver", 1625 Version: zoekt.Version, 1626 InstanceID: zoekt.HostnameBestEffort(), 1627 }) 1628 defer liblog.Sync() 1629 1630 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1631 log.Fatal(err) 1632 } 1633} 1634 1635// getBoolFromEnvironmentVariables returns the boolean defined by the first environment 1636// variable listed in envVarNames that is set in the current process environment, or the defaultBool if none are set. 1637// 1638// An error is returned of the provided environment variables fails to parse as a boolean. 1639func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) { 1640 for _, envVar := range envVarNames { 1641 v := os.Getenv(envVar) 1642 if v == "" { 1643 continue 1644 } 1645 1646 b, err := strconv.ParseBool(v) 1647 if err != nil { 1648 return false, fmt.Errorf("parsing environment variable %q to boolean: %v", envVar, err) 1649 } 1650 1651 return b, nil 1652 } 1653 1654 return defaultBool, nil 1655}