fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
// repositories on sourcegraph
package main

import (
	"bytes"
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"html/template"
	"io"
	"io/fs"
	"log"
	"math"
	"math/rand"
	"net"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"text/tabwriter"
	"time"

	grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
	"github.com/peterbourgon/ff/v3/ffcli"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	sglog "github.com/sourcegraph/log"
	"github.com/sourcegraph/mountinfo"
	"go.uber.org/automaxprocs/maxprocs"
	"golang.org/x/net/trace"
	"golang.org/x/sys/unix"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/metadata"

	"github.com/sourcegraph/zoekt"
	"github.com/sourcegraph/zoekt/build"
	proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
	"github.com/sourcegraph/zoekt/debugserver"
	"github.com/sourcegraph/zoekt/grpc/internalerrs"
	"github.com/sourcegraph/zoekt/grpc/messagesize"
	"github.com/sourcegraph/zoekt/internal/profiler"
)

// Prometheus metrics exported by the indexserver. All of them are registered
// at package initialisation through promauto.
var (
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s, 10s, ... -> 100000s (~27.8h)
	})

	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_indexing_delay_seconds",
		Help:    "A histogram of durations from when an index job is added to the queue, to the time it completes.",
		Buckets: prometheus.ExponentialBuckets(60, 2, 12), // 1m, 2m, ... -> 122880s (~34h)
	}, []string{
		"state", // state is an indexState
		"name",  // the name of the repository that was indexed
	})

	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is build.IndexState

	// metricNumIndexed is set by listIndexed to the number of repositories
	// that currently have shards on disk. It carries no labels.
	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})

	clientMetricsOnce sync.Once
	clientMetrics     *grpcprom.ClientMetrics
)

// set of repositories that we want to capture separate indexing metrics for
var reposWithSeparateIndexingMetrics = make(map[string]struct{})

// indexState is the outcome of a single index job. Its string value is used
// as the "state" label on the indexing metrics.
type indexState string

const (
	indexStateFail        indexState = "fail"
	indexStateSuccess     indexState = "success"
	indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
	indexStateNoop        indexState = "noop"         // We didn't need to update index
	indexStateEmpty       indexState = "empty"        // index is empty (empty repo)
)

// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	// logger is the structured logger; the standard log package is used
	// alongside it in this file.
	logger sglog.Logger

	// Sourcegraph is the client used to list repositories, fetch index
	// options and report index status.
	Sourcegraph Sourcegraph
	// BatchSize is not read in this part of the file.
	// NOTE(review): presumably the request batch size used by the
	// Sourcegraph client - confirm against its definition.
	BatchSize int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration

	// CPUCount is the number of CPUs to use for indexing shards.
	CPUCount int

	// queue holds the index jobs that processQueue pops and runs.
	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is reported as JSON by the /debug/host handler.
	hostname string

	// mergeOpts supplies the vacuum and merge intervals used by Run.
	mergeOpts mergeOpts

	// timeout defines how long the index server waits before killing an indexing job.
	timeout time.Duration
}

// debug is a logger for verbose diagnostics. It is created writing to
// io.Discard, so its output is dropped unless something elsewhere redirects it.
var debug = log.New(io.Discard, "", log.LstdFlags)

// our index commands should output something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
207const noOutputTimeout = 30 * time.Minute 208 209func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 210 out := &synchronizedBuffer{} 211 cmd.Stdout = out 212 cmd.Stderr = out 213 214 tr.LazyPrintf("%s", cmd.Args) 215 216 defer func() { 217 if err != nil { 218 outS := out.String() 219 tr.LazyPrintf("failed: %v", err) 220 tr.LazyPrintf("output: %s", out) 221 tr.SetError() 222 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 223 } 224 }() 225 226 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 227 228 if err := cmd.Start(); err != nil { 229 return err 230 } 231 232 errC := make(chan error) 233 go func() { 234 errC <- cmd.Wait() 235 }() 236 237 // This channel is set after we have sent sigquit. It allows us to follow up 238 // with a sigkill if the process doesn't quit after sigquit. 239 kill := make(<-chan time.Time) 240 241 lastLen := 0 242 for { 243 select { 244 case <-time.After(noOutputTimeout): 245 // Periodically check if we have had output. If not kill the process. 246 if out.Len() != lastLen { 247 lastLen = out.Len() 248 log.Printf("still running %s", cmd.Args) 249 } else { 250 // Send quit (C-\) first so we get a stack dump. 251 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 252 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 253 log.Println("quit failed:", err) 254 } 255 256 // send sigkill if still running in 10s 257 kill = time.After(10 * time.Second) 258 } 259 260 case <-kill: 261 log.Printf("still running, killing %s", cmd.Args) 262 if err := cmd.Process.Kill(); err != nil { 263 log.Println("kill failed:", err) 264 } 265 266 case err := <-errC: 267 if err != nil { 268 return err 269 } 270 271 tr.LazyPrintf("success") 272 return nil 273 } 274 } 275} 276 277// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 278// monitor the buffer while it is being written to. 
279type synchronizedBuffer struct { 280 mu sync.Mutex 281 b bytes.Buffer 282} 283 284func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 285 sb.mu.Lock() 286 defer sb.mu.Unlock() 287 return sb.b.Write(p) 288} 289 290func (sb *synchronizedBuffer) Len() int { 291 sb.mu.Lock() 292 defer sb.mu.Unlock() 293 return sb.b.Len() 294} 295 296func (sb *synchronizedBuffer) String() string { 297 sb.mu.Lock() 298 defer sb.mu.Unlock() 299 return sb.b.String() 300} 301 302// pauseFileName if present in IndexDir will stop index jobs from 303// running. This is to make it possible to experiment with the content of the 304// IndexDir without the indexserver writing to it. 305const pauseFileName = "PAUSE" 306 307// Run the sync loop. This blocks forever. 308func (s *Server) Run() { 309 removeIncompleteShards(s.IndexDir) 310 311 // Start a goroutine which updates the queue with commits to index. 312 go func() { 313 // We update the list of indexed repos every Interval. To speed up manual 314 // testing we also listen for SIGUSR1 to trigger updates. 
315 // 316 // "pkill -SIGUSR1 zoekt-sourcegra" 317 for range jitterTicker(s.Interval, unix.SIGUSR1) { 318 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 319 log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 320 continue 321 } 322 323 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 324 if err != nil { 325 log.Printf("error listing repos: %s", err) 326 continue 327 } 328 329 debug.Printf("updating index queue with %d repositories", len(repos.IDs)) 330 331 // Stop indexing repos we don't need to track anymore 332 removed := s.queue.MaybeRemoveMissing(repos.IDs) 333 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 334 if len(removed) > 0 { 335 log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 336 } 337 338 cleanupDone := make(chan struct{}) 339 go func() { 340 defer close(cleanupDone) 341 s.muIndexDir.Global(func() { 342 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 343 }) 344 }() 345 346 repos.IterateIndexOptions(s.queue.AddOrUpdate) 347 348 // IterateIndexOptions will only iterate over repositories that have 349 // changed since we last called list. However, we want to add all IDs 350 // back onto the queue just to check that what is on disk is still 351 // correct. This will use the last IndexOptions we stored in the 352 // queue. The repositories not on the queue (missing) need a forced 353 // fetch of IndexOptions. 354 missing := s.queue.Bump(repos.IDs) 355 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 
356 357 setCompoundShardCounter(s.IndexDir) 358 359 <-cleanupDone 360 } 361 }() 362 363 go func() { 364 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 365 if s.shardMerging { 366 s.vacuum() 367 } 368 } 369 }() 370 371 go func() { 372 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 373 if s.shardMerging { 374 s.doMerge() 375 } 376 } 377 }() 378 379 for i := 0; i < s.IndexConcurrency; i++ { 380 go s.processQueue() 381 } 382 383 // block forever 384 select {} 385} 386 387// formatList returns a comma-separated list of the first min(len(v), m) items. 388func formatListUint32(v []uint32, m int) string { 389 if len(v) < m { 390 m = len(v) 391 } 392 393 sb := strings.Builder{} 394 for i := 0; i < m; i++ { 395 fmt.Fprintf(&sb, "%d, ", v[i]) 396 } 397 398 if len(v) > m { 399 sb.WriteString("...") 400 } 401 402 return strings.TrimRight(sb.String(), ", ") 403} 404 405func (s *Server) processQueue() { 406 for { 407 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 408 time.Sleep(time.Second) 409 continue 410 } 411 412 item, ok := s.queue.Pop() 413 if !ok { 414 time.Sleep(time.Second) 415 continue 416 } 417 418 opts := item.Opts 419 args := s.indexArgs(opts) 420 421 ran := s.muIndexDir.With(opts.Name, func() { 422 // only record time taken once we hold the lock. This avoids us 423 // recording time taken while merging/cleanup runs. 
424 start := time.Now() 425 426 state, err := s.Index(args) 427 428 elapsed := time.Since(start) 429 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 430 431 indexDelay := time.Since(item.DateAddedToQueue) 432 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds()) 433 434 if err != nil { 435 log.Printf("error indexing %s: %s", args.String(), err) 436 } 437 438 switch state { 439 case indexStateSuccess: 440 var branches []string 441 for _, b := range args.Branches { 442 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 443 } 444 s.logger.Info("updated index", 445 sglog.String("repo", args.Name), 446 sglog.Uint32("id", args.RepoID), 447 sglog.Strings("branches", branches), 448 sglog.Duration("duration", elapsed), 449 sglog.Duration("index_delay", indexDelay), 450 ) 451 case indexStateSuccessMeta: 452 log.Printf("updated meta %s in %v", args.String(), elapsed) 453 } 454 s.queue.SetIndexed(opts, state) 455 }) 456 457 if !ran { 458 // Someone else is processing the repository. We can just skip this job 459 // since the repository will be added back to the queue and we will 460 // converge to the correct behaviour. 461 debug.Printf("index job for repository already running: %s", args) 462 continue 463 } 464 } 465} 466 467// repoNameForMetric returns a normalized version of the given repository name that is 468// suitable for use with Prometheus metrics. 469func repoNameForMetric(repo string) string { 470 // Check to see if we want to be able to capture separate indexing metrics for this repository. 471 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 
472 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 473 return repo 474 } 475 476 return "" 477} 478 479func batched(slice []uint32, size int) <-chan []uint32 { 480 c := make(chan []uint32) 481 go func() { 482 for len(slice) > 0 { 483 if size > len(slice) { 484 size = len(slice) 485 } 486 c <- slice[:size] 487 slice = slice[size:] 488 } 489 close(c) 490 }() 491 return c 492} 493 494// jitterTicker returns a ticker which ticks with a jitter. Each tick is 495// uniformly selected from the range (d/2, d + d/2). It will tick on creation. 496// 497// sig is a list of signals which also cause the ticker to fire. This is a 498// convenience to allow manually triggering of the ticker. 499func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} { 500 ticker := make(chan struct{}) 501 502 go func() { 503 for { 504 ticker <- struct{}{} 505 ns := int64(d) 506 jitter := rand.Int63n(ns) 507 time.Sleep(time.Duration(ns/2 + jitter)) 508 } 509 }() 510 511 go func() { 512 if len(sig) == 0 { 513 return 514 } 515 516 c := make(chan os.Signal, 1) 517 signal.Notify(c, sig...) 518 for range c { 519 ticker <- struct{}{} 520 } 521 }() 522 523 return ticker 524} 525 526// Index starts an index job for repo name at commit. 
func (s *Server) Index(args *indexArgs) (state indexState, err error) {
	tr := trace.New("index", args.Name)
	tr.SetMaxEvents(30) // Ensure we capture all indexing events

	// The deferred function runs after the named results have been set. Any
	// non-nil err forces state to indexStateFail and bumps the failure
	// counter, whatever state the return statement supplied.
	defer func() {
		if err != nil {
			tr.SetError()
			tr.LazyPrintf("error: %v", err)
			state = indexStateFail
			metricFailingTotal.Inc()
		}
		tr.LazyPrintf("state: %s", state)
		tr.Finish()
	}()

	tr.LazyPrintf("branches: %v", args.Branches)

	// No branches to index: write an empty shard for the repository instead.
	if len(args.Branches) == 0 {
		return indexStateEmpty, createEmptyShard(args)
	}

	// Apply the server-level allowlists and thresholds to this job. Note that
	// this mutates the caller's args.
	repositoryName := args.Name
	if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
		tr.LazyPrintf("marking this repository for delta build")
		args.UseDelta = true
	}

	args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold

	if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
		tr.LazyPrintf("skipping symbols calculation")
		args.Symbols = false
	}

	// reason is only used in the log line below. It stays "forced" for
	// non-incremental jobs and otherwise names the incremental index state.
	reason := "forced"

	if args.Incremental {
		bo := args.BuildOptions()
		bo.SetDefaults()
		incrementalState, fn := bo.IndexState()
		reason = string(incrementalState)
		metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()

		// Every case that does not return falls through to the full index
		// below.
		switch incrementalState {
		case build.IndexStateEqual:
			debug.Printf("%s index already up to date. Shard=%s", args.String(), fn)
			return indexStateNoop, nil

		case build.IndexStateMeta:
			log.Printf("updating index.meta %s", args.String())

			// This err shadows the named result on purpose: a failed metadata
			// merge is only logged and must not mark the job as failed.
			if err := mergeMeta(bo); err != nil {
				log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
			} else {
				return indexStateSuccessMeta, nil
			}

		case build.IndexStateCorrupt:
			log.Printf("falling back to full update: corrupt index: %s", args.String())
		}
	}

	log.Printf("updating index %s reason=%s", args.String(), reason)

	metricIndexingTotal.Inc()
	c := gitIndexConfig{
		// Run the indexing commands through loggedRun so their output is
		// captured and attached to this job's trace.
		runCmd: func(cmd *exec.Cmd) error {
			return s.loggedRun(tr, cmd)
		},

		findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
			return args.BuildOptions().FindRepositoryMetadata()
		},
		timeout: s.timeout,
	}

	err = gitIndex(c, args, s.Sourcegraph, s.logger)
	if err != nil {
		return indexStateFail, err
	}

	// Reporting the new status is best effort: the index itself succeeded, so
	// a failure here is logged and the job is still reported as a success.
	if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil {
		s.logger.Error("failed to update index status",
			sglog.String("repo", args.Name),
			sglog.Uint32("id", args.RepoID),
			sglogBranches("branches", args.Branches),
			sglog.Error(err),
		)
	}

	return indexStateSuccess, nil
}

// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so
// it can update the zoekt_repos table.
//
// It returns an error if the metadata of the freshly written index cannot be
// read or found on disk, or if the update call to Sourcegraph fails.
func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error {
	// We need to read from disk for IndexTime.
	_, metadata, ok, err := c.findRepositoryMetadata(args)
	if err != nil {
		return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
	}
	if !ok {
		return errors.New("failed to find metadata for new/updated index")
	}

	status := []indexStatus{{
		RepoID:        args.RepoID,
		Branches:      args.Branches,
		IndexTimeUnix: metadata.IndexTime.Unix(),
	}}
	if err := sg.UpdateIndexStatus(status); err != nil {
		return fmt.Errorf("failed to update sourcegraph with status: %w", err)
	}

	return nil
}

// sglogBranches returns a log field named key that holds one "name=version"
// string per branch.
func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
	ss := make([]string, len(branches))
	for i, b := range branches {
		ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
	}
	return sglog.Strings(key, ss)
}

// indexArgs builds the arguments for an index job from opts and the server
// configuration. Jobs built here are always incremental; forceIndex clears
// that flag afterwards to force a full re-index.
func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
	parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0))
	return &indexArgs{
		IndexOptions: opts,
		IndexDir:     s.IndexDir,
		Parallelism:  parallelism,
		Incremental:  true,

		// 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
		FileLimit: 1 << 20,

		ShardMerging: s.shardMerging,
	}
}

// parallelism consults both the server flags and index options to determine the number
// of shards to index in parallel. If the ShardConcurrency index option is set
// (> 0), it always overrides the server's CPUCount. The result is capped at
// 4*maxProcs and then divided by IndexConcurrency, rounding up.
670func (s *Server) parallelism(opts IndexOptions, maxProcs int) int { 671 var parallelism int 672 if opts.ShardConcurrency > 0 { 673 parallelism = int(opts.ShardConcurrency) 674 } else { 675 parallelism = s.CPUCount 676 } 677 678 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs 679 if parallelism > 4*maxProcs { 680 parallelism = 4 * maxProcs 681 } 682 683 // If index concurrency is set, then divide the parallelism by the number of 684 // repos we're indexing in parallel 685 if s.IndexConcurrency > 1 { 686 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency))) 687 } 688 689 return parallelism 690} 691 692func createEmptyShard(args *indexArgs) error { 693 bo := args.BuildOptions() 694 bo.SetDefaults() 695 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 696 697 if args.Incremental && bo.IncrementalSkipIndexing() { 698 return nil 699 } 700 701 builder, err := build.NewBuilder(*bo) 702 if err != nil { 703 return err 704 } 705 return builder.Finish() 706} 707 708// addDebugHandlers adds handlers specific to indexserver. 709func (s *Server) addDebugHandlers(mux *http.ServeMux) { 710 // Sourcegraph's site admin view requires indexserver to serve it's admin view 711 // on "/". 
712 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 713 714 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 715 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 716 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 717 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 718 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 719 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 720} 721 722func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 723 if r.Method != "GET" { 724 w.Header().Set("Allow", "GET") 725 w.WriteHeader(http.StatusMethodNotAllowed) 726 return 727 } 728 729 response := struct { 730 Hostname string 731 }{ 732 Hostname: s.hostname, 733 } 734 735 b, err := json.Marshal(response) 736 if err != nil { 737 http.Error(w, err.Error(), http.StatusInternalServerError) 738 return 739 } 740 741 w.Header().Set("Content-Type", "application/json; charset=utf-8") 742 w.Write(b) 743} 744 745var rootTmpl = template.Must(template.New("name").Parse(` 746<html> 747 <body> 748 <a href="debug">Debug</a><br /> 749 <a href="debug/requests">Traces</a><br /> 750 {{.IndexMsg}}<br /> 751 <br /> 752 <h3>Reindex</h3> 753 {{if .Repos}} 754 <a href="?show_repos=false">hide repos</a><br /> 755 <table style="margin-top: 20px"> 756 <th style="text-align:left">Name</th> 757 <th style="text-align:left">ID</th> 758 {{range .Repos}} 759 <tr> 760 <td>{{.Name}}</td> 761 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 762 </tr> 763 {{end}} 764 </table> 765 {{else}} 766 <a href="?show_repos=true">show repos</a><br /> 767 {{end}} 768 </body> 769</html> 770`)) 771 772func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 773 if r.Method != "GET" { 774 w.Header().Set("Allow", "GET") 775 w.WriteHeader(http.StatusMethodNotAllowed) 776 return 777 } 778 779 values := r.URL.Query() 780 781 // ?id= 782 indexMsg := "" 783 if v := values.Get("id"); v != "" { 784 id, err := 
strconv.Atoi(v) 785 if err != nil { 786 http.Error(w, err.Error(), http.StatusBadRequest) 787 return 788 } 789 indexMsg, _ = s.forceIndex(uint32(id)) 790 } 791 792 // ?show_repos= 793 showRepos := false 794 if v := values.Get("show_repos"); v != "" { 795 showRepos, _ = strconv.ParseBool(v) 796 } 797 798 type Repo struct { 799 ID uint32 800 Name string 801 } 802 var data struct { 803 Repos []Repo 804 IndexMsg string 805 } 806 807 data.IndexMsg = indexMsg 808 809 if showRepos { 810 s.queue.Iterate(func(opts *IndexOptions) { 811 data.Repos = append(data.Repos, Repo{ 812 ID: opts.RepoID, 813 Name: opts.Name, 814 }) 815 }) 816 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 817 } 818 819 _ = rootTmpl.Execute(w, data) 820} 821 822// handleReindex triggers a reindex asynocronously. If a reindex was triggered 823// the request returns with status 202. The caller can infer the new state of 824// the index by calling List. 825func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 826 if r.Method != http.MethodPost { 827 w.Header().Set("Allow", http.MethodPost) 828 w.WriteHeader(http.StatusMethodNotAllowed) 829 return 830 } 831 832 err := r.ParseForm() 833 if err != nil { 834 http.Error(w, err.Error(), http.StatusBadRequest) 835 return 836 } 837 838 id, err := strconv.Atoi(r.Form.Get("repo")) 839 if err != nil { 840 http.Error(w, err.Error(), http.StatusBadRequest) 841 return 842 } 843 844 go func() { s.forceIndex(uint32(id)) }() 845 846 // 202 Accepted 847 w.WriteHeader(http.StatusAccepted) 848} 849 850func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 851 withIndexed := true 852 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 853 withIndexed = b 854 } 855 856 var indexed []uint32 857 if withIndexed { 858 indexed = listIndexed(s.IndexDir) 859 } 860 861 repos, err := s.Sourcegraph.List(r.Context(), indexed) 862 if err != nil { 863 http.Error(w, err.Error(), 
http.StatusInternalServerError) 864 return 865 } 866 867 bw := bytes.Buffer{} 868 869 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 870 871 _, err = fmt.Fprintf(tw, "ID\tName\n") 872 if err != nil { 873 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 874 return 875 } 876 877 s.queue.mu.Lock() 878 name := "" 879 for _, id := range repos.IDs { 880 if item := s.queue.get(id); item != nil { 881 name = item.opts.Name 882 } else { 883 name = "" 884 } 885 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 886 if err != nil { 887 debug.Printf("handleDebugList: %s\n", err.Error()) 888 } 889 } 890 s.queue.mu.Unlock() 891 892 if err != nil { 893 http.Error(w, err.Error(), http.StatusInternalServerError) 894 return 895 } 896 897 err = tw.Flush() 898 if err != nil { 899 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 900 return 901 } 902 903 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 904 905 if _, err := io.Copy(w, &bw); err != nil { 906 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 907 return 908 } 909} 910 911// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 912// can run this command during periods of low usage (evenings, weekends) to 913// trigger an initial merge run. In the steady-state, merges happen rarely, even 914// on busy instances, and users can rely on automatic merging instead. 915func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 916 // A merge operation can take very long, depending on the number merges and the 917 // target size of the compound shards. We run the merge in the background and 918 // return immediately to the user. 919 // 920 // We track the status of the merge with metricShardMergingRunning. 
921 go func() { 922 s.doMerge() 923 }() 924 _, _ = w.Write([]byte("merging enqueued\n")) 925} 926 927func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 928 indexed := listIndexed(s.IndexDir) 929 930 bw := bytes.Buffer{} 931 932 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 933 934 _, err := fmt.Fprintf(tw, "ID\tName\n") 935 if err != nil { 936 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 937 return 938 } 939 940 s.queue.mu.Lock() 941 name := "" 942 for _, id := range indexed { 943 if item := s.queue.get(id); item != nil { 944 name = item.opts.Name 945 } else { 946 name = "" 947 } 948 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 949 if err != nil { 950 debug.Printf("handleDebugIndexed: %s\n", err.Error()) 951 } 952 } 953 s.queue.mu.Unlock() 954 955 if err != nil { 956 http.Error(w, err.Error(), http.StatusInternalServerError) 957 return 958 } 959 960 err = tw.Flush() 961 if err != nil { 962 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 963 return 964 } 965 966 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 967 968 if _, err := io.Copy(w, &bw); err != nil { 969 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 970 return 971 } 972} 973 974// forceIndex will run the index job for repo name now. It will return always 975// return a string explaining what it did, even if it failed. 
976func (s *Server) forceIndex(id uint32) (string, error) { 977 var opts IndexOptions 978 var err error 979 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 980 opts = o 981 }, func(_ uint32, e error) { 982 err = e 983 }, id) 984 if err != nil { 985 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 986 } 987 988 args := s.indexArgs(opts) 989 args.Incremental = false // force re-index 990 991 var state indexState 992 ran := s.muIndexDir.With(opts.Name, func() { 993 state, err = s.Index(args) 994 }) 995 if !ran { 996 return fmt.Sprintf("index job for repository already running: %s", args), nil 997 } 998 if err != nil { 999 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 1000 } 1001 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 1002} 1003 1004func listIndexed(indexDir string) []uint32 { 1005 index := getShards(indexDir) 1006 metricNumIndexed.Set(float64(len(index))) 1007 repoIDs := make([]uint32, 0, len(index)) 1008 for id := range index { 1009 repoIDs = append(repoIDs, id) 1010 } 1011 sort.Slice(repoIDs, func(i, j int) bool { 1012 return repoIDs[i] < repoIDs[j] 1013 }) 1014 return repoIDs 1015} 1016 1017// setupTmpDir sets up a temporary directory on the same volume as the 1018// indexes. 1019// 1020// If main is true we will delete older temp directories left around. main is 1021// false when this is a debug command. 1022func setupTmpDir(logger sglog.Logger, main bool, index string) error { 1023 // change the target tmp directory depending on if it's our main daemon or a 1024 // debug sub command. 
1025 dir := ".indexserver.debug.tmp" 1026 if main { 1027 dir = ".indexserver.tmp" 1028 } 1029 1030 tmpRoot := filepath.Join(index, dir) 1031 1032 if main { 1033 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot)) 1034 err := os.RemoveAll(tmpRoot) 1035 if err != nil { 1036 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err)) 1037 } 1038 } 1039 1040 if err := os.MkdirAll(tmpRoot, 0o755); err != nil { 1041 return err 1042 } 1043 1044 return os.Setenv("TMPDIR", tmpRoot) 1045} 1046 1047func printMetaData(fn string) error { 1048 repo, indexMeta, err := zoekt.ReadMetadataPath(fn) 1049 if err != nil { 1050 return err 1051 } 1052 1053 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 1054 if err != nil { 1055 return err 1056 } 1057 1058 err = json.NewEncoder(os.Stdout).Encode(repo) 1059 if err != nil { 1060 return err 1061 } 1062 return nil 1063} 1064 1065func printShardStats(fn string) error { 1066 f, err := os.Open(fn) 1067 if err != nil { 1068 return err 1069 } 1070 1071 iFile, err := zoekt.NewIndexFile(f) 1072 if err != nil { 1073 return err 1074 } 1075 1076 return zoekt.PrintNgramStats(iFile) 1077} 1078 1079func srcLogLevelIsDebug() bool { 1080 lvl := os.Getenv(sglog.EnvLogLevel) 1081 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1082} 1083 1084func getEnvWithDefaultBool(k string, defaultVal bool) bool { 1085 v := os.Getenv(k) 1086 if v == "" { 1087 return defaultVal 1088 } 1089 b, err := strconv.ParseBool(v) 1090 if err != nil { 1091 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1092 } 1093 return b 1094} 1095 1096func getEnvWithDefaultInt64(k string, defaultVal int64) int64 { 1097 v := os.Getenv(k) 1098 if v == "" { 1099 return defaultVal 1100 } 1101 i, err := strconv.ParseInt(v, 10, 64) 1102 if err != nil { 1103 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1104 } 1105 return i 1106} 1107 1108func getEnvWithDefaultInt(k string, defaultVal int) int { 1109 v := 
os.Getenv(k) 1110 if v == "" { 1111 return defaultVal 1112 } 1113 i, err := strconv.Atoi(k) 1114 if err != nil { 1115 log.Fatalf("error parsing ENV %s to int: %s", k, err) 1116 } 1117 return i 1118} 1119 1120func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 { 1121 v := os.Getenv(k) 1122 if v == "" { 1123 return defaultVal 1124 } 1125 i, err := strconv.ParseUint(v, 10, 64) 1126 if err != nil { 1127 log.Fatalf("error parsing ENV %s to uint64: %s", k, err) 1128 } 1129 return i 1130} 1131 1132func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1133 v := os.Getenv(k) 1134 if v == "" { 1135 return defaultVal 1136 } 1137 f, err := strconv.ParseFloat(v, 64) 1138 if err != nil { 1139 log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1140 } 1141 return f 1142} 1143 1144func getEnvWithDefaultString(k string, defaultVal string) string { 1145 v := os.Getenv(k) 1146 if v == "" { 1147 return defaultVal 1148 } 1149 return v 1150} 1151 1152func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration { 1153 v := os.Getenv(k) 1154 if v == "" { 1155 return defaultVal 1156 } 1157 1158 d, err := time.ParseDuration(v) 1159 if err != nil { 1160 log.Fatalf("error parsing ENV %s to duration: %s", k, err) 1161 } 1162 return d 1163} 1164 1165func getEnvWithDefaultEmptySet(k string) map[string]struct{} { 1166 set := map[string]struct{}{} 1167 for _, v := range strings.Split(os.Getenv(k), ",") { 1168 v = strings.TrimSpace(v) 1169 if v != "" { 1170 set[v] = struct{}{} 1171 } 1172 } 1173 return set 1174} 1175 1176func joinStringSet(set map[string]struct{}, sep string) string { 1177 var xs []string 1178 for x := range set { 1179 xs = append(xs, x) 1180 } 1181 1182 return strings.Join(xs, sep) 1183} 1184 1185func setCompoundShardCounter(indexDir string) { 1186 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt")) 1187 if err != nil { 1188 log.Printf("setCompoundShardCounter: %s\n", err) 1189 return 1190 } 1191 
metricNumberCompoundShards.Set(float64(len(fns))) 1192} 1193 1194func rootCmd() *ffcli.Command { 1195 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1196 conf := rootConfig{ 1197 Main: true, 1198 } 1199 conf.registerRootFlags(rootFs) 1200 1201 return &ffcli.Command{ 1202 FlagSet: rootFs, 1203 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1204 Subcommands: []*ffcli.Command{debugCmd()}, 1205 Exec: func(ctx context.Context, args []string) error { 1206 return startServer(conf) 1207 }, 1208 } 1209} 1210 1211type rootConfig struct { 1212 // Main is true if this rootConfig is for our main long running command (the 1213 // indexserver). Debug commands should not set this value. This is used to 1214 // determine if we need to run tmpfriend. 1215 Main bool 1216 1217 root string 1218 interval time.Duration 1219 index string 1220 indexConcurrency int64 1221 listen string 1222 hostname string 1223 cpuFraction float64 1224 blockProfileRate int 1225 1226 // config values related to shard merging 1227 disableShardMerging bool 1228 vacuumInterval time.Duration 1229 mergeInterval time.Duration 1230 targetSize int64 1231 minSize int64 1232 minAgeDays int 1233 1234 // config values related to backoff indexing repos with one or more consecutive failures 1235 backoffDuration time.Duration 1236 maxBackoffDuration time.Duration 1237} 1238 1239func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1240 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. 
If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1241 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1242 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently") 1243 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use") 1244 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") 1245 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") 1246 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1247 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate") 1248 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.") 1249 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. 
A negative value disables indexing backoff.") 1250 1251 // flags related to shard merging 1252 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging") 1253 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1254 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1255 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB") 1256 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB") 1257 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.") 1258} 1259 1260func startServer(conf rootConfig) error { 1261 s, err := newServer(conf) 1262 if err != nil { 1263 return err 1264 } 1265 1266 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate) 1267 setCompoundShardCounter(s.IndexDir) 1268 1269 if conf.listen != "" { 1270 1271 mux := http.NewServeMux() 1272 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1273 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1274 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1275 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"}, 1276 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1277 
}...) 1278 s.addDebugHandlers(mux) 1279 1280 go func() { 1281 debug.Printf("serving HTTP on %s", conf.listen) 1282 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1283 }() 1284 1285 // Serve mux on a unix domain socket on a best-effort-basis so that 1286 // webserver can call the endpoints via the shared filesystem. 1287 // 1288 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1289 // on the socket due to permission errors. See 1290 // https://github.com/docker/for-mac/issues/6239 1291 go func() { 1292 serveHTTPOverSocket := func() error { 1293 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1294 // We cannot bind a socket to an existing pathname. 1295 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1296 return fmt.Errorf("error removing socket file: %s", socket) 1297 } 1298 // The "unix" network corresponds to stream sockets. (cf. unixgram, 1299 // unixpacket). 1300 l, err := net.Listen("unix", socket) 1301 if err != nil { 1302 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1303 } 1304 // Indexserver (root) and webserver (Sourcegraph) run with 1305 // different users. Per default, the socket is created with 1306 // permission 755 (root root), which doesn't let webserver write to 1307 // it. 1308 // 1309 // See https://github.com/golang/go/issues/11822 for more context. 
1310 if err := os.Chmod(socket, 0o777); err != nil { 1311 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1312 } 1313 debug.Printf("serving HTTP on %s", socket) 1314 return http.Serve(l, mux) 1315 } 1316 debug.Print(serveHTTPOverSocket()) 1317 }() 1318 } 1319 1320 oc := &ownerChecker{ 1321 Path: filepath.Join(conf.index, "owner.txt"), 1322 Hostname: conf.hostname, 1323 } 1324 go oc.Run() 1325 1326 logger := sglog.Scoped("metricsRegistration") 1327 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1328 1329 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1330 prometheus.DefaultRegisterer.MustRegister(c) 1331 1332 s.Run() 1333 return nil 1334} 1335 1336func newServer(conf rootConfig) (*Server, error) { 1337 logger := sglog.Scoped("server") 1338 1339 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1340 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1341 } 1342 if conf.index == "" { 1343 return nil, fmt.Errorf("must set -index") 1344 } 1345 if conf.root == "" { 1346 return nil, fmt.Errorf("must set -sourcegraph_url") 1347 } 1348 rootURL, err := url.Parse(conf.root) 1349 if err != nil { 1350 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1351 } 1352 1353 rootURL = addDefaultPort(rootURL) 1354 1355 // Tune GOMAXPROCS to match Linux container CPU quota. 1356 _, _ = maxprocs.Set() 1357 1358 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler. 1359 // The block profiler is disabled by default and should be enabled with care in production 1360 runtime.SetBlockProfileRate(conf.blockProfileRate) 1361 1362 // Automatically prepend our own path at the front, to minimize 1363 // required configuration. 
1364 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1365 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1366 } 1367 1368 if _, err := os.Stat(conf.index); err != nil { 1369 if err := os.MkdirAll(conf.index, 0o755); err != nil { 1370 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1371 } 1372 } 1373 1374 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil { 1375 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1376 } 1377 1378 if srcLogLevelIsDebug() { 1379 debug = log.New(os.Stderr, "", log.LstdFlags) 1380 } 1381 1382 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1383 if len(reposWithSeparateIndexingMetrics) > 0 { 1384 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1385 } 1386 1387 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1388 if len(deltaBuildRepositoriesAllowList) > 0 { 1389 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1390 } 1391 1392 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1393 if deltaShardNumberFallbackThreshold > 0 { 1394 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1395 } else { 1396 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1397 } 1398 1399 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1400 if len(reposShouldSkipSymbolsCalculation) > 0 { 1401 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1402 } 1403 1404 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout) 1405 if indexingTimeout != 
defaultIndexingTimeout { 1406 debug.Printf("using configured indexing timeout: %s", indexingTimeout) 1407 } 1408 1409 var sg Sourcegraph 1410 if rootURL.IsAbs() { 1411 var batchSize int 1412 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { 1413 batchSize, err = strconv.Atoi(v) 1414 if err != nil { 1415 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1416 } 1417 } 1418 1419 opts := []SourcegraphClientOption{ 1420 WithBatchSize(batchSize), 1421 } 1422 1423 logger := sglog.Scoped("zoektConfigurationGRPCClient") 1424 client, err := dialGRPCClient(rootURL.Host, logger) 1425 if err != nil { 1426 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1427 } 1428 1429 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...) 1430 1431 } else { 1432 sg = sourcegraphFake{ 1433 RootDir: rootURL.String(), 1434 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1435 } 1436 } 1437 1438 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction))) 1439 if cpuCount < 1 { 1440 cpuCount = 1 1441 } 1442 1443 if conf.indexConcurrency < 1 { 1444 conf.indexConcurrency = 1 1445 } else if conf.indexConcurrency > int64(cpuCount) { 1446 conf.indexConcurrency = int64(cpuCount) 1447 } 1448 1449 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1450 1451 return &Server{ 1452 logger: logger, 1453 Sourcegraph: sg, 1454 IndexDir: conf.index, 1455 IndexConcurrency: int(conf.indexConcurrency), 1456 Interval: conf.interval, 1457 CPUCount: cpuCount, 1458 queue: *q, 1459 shardMerging: !conf.disableShardMerging, 1460 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1461 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1462 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1463 hostname: conf.hostname, 1464 mergeOpts: mergeOpts{ 1465 vacuumInterval: conf.vacuumInterval, 1466 mergeInterval: conf.mergeInterval, 
1467 targetSizeBytes: conf.targetSize * 1024 * 1024, 1468 minSizeBytes: conf.minSize * 1024 * 1024, 1469 minAgeDays: conf.minAgeDays, 1470 }, 1471 timeout: indexingTimeout, 1472 }, err 1473} 1474 1475// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration 1476// for the indexed-search-configuration gRPC service. 1477// 1478// The default backoff strategy is modeled after the default settings used by 1479// retryablehttp.DefaultClient. 1480// 1481// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1482// - Unavailable 1483// - Aborted 1484// 1485//go:embed default_grpc_service_configuration.json 1486var defaultGRPCServiceConfigurationJSON string 1487 1488func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1489 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1490 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1491 return invoker(ctx, method, req, reply, cc, opts...) 1492 } 1493} 1494 1495func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1496 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1497 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1498 return streamer(ctx, desc, cc, method, opts...) 1499 } 1500} 1501 1502// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPCs servers and clients are allowed to process. 1503// This can be overridden by providing custom Server/Dial options. 
1504const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB 1505 1506func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) { 1507 metrics := mustGetClientMetrics() 1508 1509 // If the service seems to be unavailable, this 1510 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1 1511 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s]) 1512 retryOpts := []retry.CallOption{ 1513 retry.WithMax(5), 1514 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)), 1515 retry.WithCodes(codes.Unavailable), 1516 } 1517 1518 opts := []grpc.DialOption{ 1519 grpc.WithTransportCredentials(insecure.NewCredentials()), 1520 grpc.WithChainStreamInterceptor( 1521 metrics.StreamClientInterceptor(), 1522 messagesize.StreamClientInterceptor, 1523 internalActorStreamInterceptor(), 1524 internalerrs.LoggingStreamClientInterceptor(logger), 1525 internalerrs.PrometheusStreamClientInterceptor, 1526 retry.StreamClientInterceptor(retryOpts...), 1527 ), 1528 grpc.WithChainUnaryInterceptor( 1529 metrics.UnaryClientInterceptor(), 1530 messagesize.UnaryClientInterceptor, 1531 internalActorUnaryInterceptor(), 1532 internalerrs.LoggingUnaryClientInterceptor(logger), 1533 internalerrs.PrometheusUnaryClientInterceptor, 1534 retry.UnaryClientInterceptor(retryOpts...), 1535 ), 1536 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON), 1537 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)), 1538 } 1539 1540 opts = append(opts, additionalOpts...) 1541 1542 // Ensure that the message size options are set last, so they override any other 1543 // client-specific options that tweak the message size. 1544 // 1545 // The message size options are only provided if the environment variable is set. 
These options serve as an escape hatch, so they 1546 // take precedence over everything else with a uniform size setting that's easy to reason about. 1547 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...) 1548 1549 // This dialer is used to connect via gRPC to the Sourcegraph instance. 1550 // This is done lazily, so we can provide the client to use regardless of 1551 // whether we enabled gRPC or not initially. 1552 cc, err := grpc.Dial(addr, opts...) 1553 if err != nil { 1554 return nil, fmt.Errorf("dialing %q: %w", addr, err) 1555 } 1556 1557 client := proto.NewZoektConfigurationServiceClient(cc) 1558 return client, nil 1559} 1560 1561// mustGetClientMetrics returns a singleton instance of the client metrics 1562// that are shared across all gRPC clients that this process creates. 1563// 1564// This function panics if the metrics cannot be registered with the default 1565// Prometheus registry. 1566func mustGetClientMetrics() *grpcprom.ClientMetrics { 1567 clientMetricsOnce.Do(func() { 1568 clientMetrics = grpcprom.NewClientMetrics( 1569 grpcprom.WithClientCounterOptions(), 1570 grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request 1571 grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC 1572 grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC 1573 ) 1574 1575 prometheus.DefaultRegisterer.MustRegister(clientMetrics) 1576 }) 1577 1578 return clientMetrics 1579} 1580 1581// addDefaultPort adds a default port to a URL if one is not specified. 1582// 1583// If the URL scheme is "http" and no port is specified, "80" is used. 1584// If the scheme is "https", "443" is used. 1585// 1586// The original URL is not mutated. A copy is modified and returned. 
1587func addDefaultPort(original *url.URL) *url.URL { 1588 if original == nil { 1589 return nil // don't panic 1590 } 1591 1592 if !original.IsAbs() { 1593 return original // don't do anything if the URL doesn't look like a remote URL 1594 } 1595 1596 if original.Scheme == "http" && original.Port() == "" { 1597 u := cloneURL(original) 1598 u.Host = net.JoinHostPort(u.Host, "80") 1599 return u 1600 } 1601 1602 if original.Scheme == "https" && original.Port() == "" { 1603 u := cloneURL(original) 1604 u.Host = net.JoinHostPort(u.Host, "443") 1605 return u 1606 } 1607 1608 return original 1609} 1610 1611// cloneURL returns a copy of the URL. It is safe to mutate the returned URL. 1612// This is copied from net/http/clone.go 1613func cloneURL(u *url.URL) *url.URL { 1614 if u == nil { 1615 return nil 1616 } 1617 u2 := new(url.URL) 1618 *u2 = *u 1619 if u.User != nil { 1620 u2.User = new(url.Userinfo) 1621 *u2.User = *u.User 1622 } 1623 return u2 1624} 1625 1626func main() { 1627 liblog := sglog.Init(sglog.Resource{ 1628 Name: "zoekt-indexserver", 1629 Version: zoekt.Version, 1630 InstanceID: zoekt.HostnameBestEffort(), 1631 }) 1632 defer liblog.Sync() 1633 1634 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1635 log.Fatal(err) 1636 } 1637} 1638 1639// getBoolFromEnvironmentVariables returns the boolean defined by the first environment 1640// variable listed in envVarNames that is set in the current process environment, or the defaultBool if none are set. 1641// 1642// An error is returned of the provided environment variables fails to parse as a boolean. 
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	for i := range envVarNames {
		name := envVarNames[i]

		raw := os.Getenv(name)
		if raw != "" {
			// The first variable that is set decides the outcome, whether it
			// parses or not; later names are never consulted.
			parsed, parseErr := strconv.ParseBool(raw)
			if parseErr == nil {
				return parsed, nil
			}
			return false, fmt.Errorf("parsing environment variable %q to boolean: %v", name, parseErr)
		}
	}

	// None of the variables is set (or the list is empty).
	return defaultBool, nil
}