Fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Command zoekt-sourcegraph-indexserver periodically reindexes enabled 2// repositories on sourcegraph 3package main 4 5import ( 6 "bytes" 7 "context" 8 _ "embed" 9 "encoding/json" 10 "errors" 11 "flag" 12 "fmt" 13 "html/template" 14 "io" 15 "io/fs" 16 "log" 17 "math" 18 "math/rand" 19 "net" 20 "net/http" 21 "net/url" 22 "os" 23 "os/exec" 24 "os/signal" 25 "path/filepath" 26 "runtime" 27 "sort" 28 "strconv" 29 "strings" 30 "sync" 31 "text/tabwriter" 32 "time" 33 34 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" 35 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry" 36 "github.com/peterbourgon/ff/v3/ffcli" 37 "github.com/prometheus/client_golang/prometheus" 38 "github.com/prometheus/client_golang/prometheus/promauto" 39 sglog "github.com/sourcegraph/log" 40 "github.com/sourcegraph/mountinfo" 41 "go.uber.org/automaxprocs/maxprocs" 42 "golang.org/x/net/trace" 43 "golang.org/x/sys/unix" 44 "google.golang.org/grpc" 45 "google.golang.org/grpc/codes" 46 "google.golang.org/grpc/credentials/insecure" 47 "google.golang.org/grpc/metadata" 48 49 "github.com/sourcegraph/zoekt" 50 "github.com/sourcegraph/zoekt/build" 51 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1" 52 "github.com/sourcegraph/zoekt/debugserver" 53 "github.com/sourcegraph/zoekt/grpc/internalerrs" 54 "github.com/sourcegraph/zoekt/grpc/messagesize" 55 "github.com/sourcegraph/zoekt/internal/profiler" 56) 57 58var ( 59 metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{ 60 Name: "resolve_revisions_seconds", 61 Help: "A histogram of latencies for resolving all repository revisions.", 62 Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 27min 63 }) 64 65 metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 66 Name: "resolve_revision_seconds", 67 Help: "A histogram of latencies for resolving a repository revision.", 68 Buckets: 
prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s 69 }, []string{"success"}) // success=true|false 70 71 metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{ 72 Name: "get_index_options_total", 73 Help: "The total number of times we tried to get index options for a repository. Includes errors.", 74 }) 75 metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{ 76 Name: "get_index_options_error_total", 77 Help: "The total number of times we failed to get index options for a repository.", 78 }) 79 80 metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 81 Name: "index_repo_seconds", 82 Help: "A histogram of latencies for indexing a repository.", 83 Buckets: prometheus.ExponentialBucketsRange( 84 (100 * time.Millisecond).Seconds(), 85 (40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo 86 20), 87 }, []string{ 88 "state", // state is an indexState 89 "name", // name of the repository that was indexed 90 }) 91 92 metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{ 93 Name: "index_indexing_delay_seconds", 94 Help: "A histogram of durations from when an index job is added to the queue, to the time it completes.", 95 Buckets: []float64{ 96 60, // 1m 97 300, // 5m 98 1200, // 20m 99 2400, // 40m 100 3600, // 1h 101 10800, // 3h 102 18000, // 5h 103 36000, // 10h 104 43200, // 12h 105 54000, // 15h 106 72000, // 20h 107 86400, // 24h 108 108000, // 30h 109 126000, // 35h 110 172800, // 48h 111 }}, []string{ 112 "state", // state is an indexState 113 "name", // the name of the repository that was indexed 114 }) 115 116 metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 117 Name: "index_fetch_seconds", 118 Help: "A histogram of latencies for fetching a repository.", 119 Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes 120 }, []string{ 121 
"success", // true|false 122 "name", // the name of the repository that the commits were fetched from 123 }) 124 125 metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{ 126 Name: "index_incremental_index_state", 127 Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.", 128 }, []string{"state"}) // state is build.IndexState 129 130 metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{ 131 Name: "index_num_indexed", 132 Help: "Number of indexed repos by code host", 133 }) 134 135 metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{ 136 Name: "index_failing_total", 137 Help: "Counts failures to index (indexing activity, should be used with rate())", 138 }) 139 140 metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{ 141 Name: "index_indexing_total", 142 Help: "Counts indexings (indexing activity, should be used with rate())", 143 }) 144 145 metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{ 146 Name: "index_num_stopped_tracking_total", 147 Help: "Counts the number of repos we stopped tracking.", 148 }) 149 150 clientMetricsOnce sync.Once 151 clientMetrics *grpcprom.ClientMetrics 152) 153 154// set of repositories that we want to capture separate indexing metrics for 155var reposWithSeparateIndexingMetrics = make(map[string]struct{}) 156 157type indexState string 158 159const ( 160 indexStateFail indexState = "fail" 161 indexStateSuccess indexState = "success" 162 indexStateSuccessMeta indexState = "success_meta" // We only updated metadata 163 indexStateNoop indexState = "noop" // We didn't need to update index 164 indexStateEmpty indexState = "empty" // index is empty (empty repo) 165) 166 167// Server is the main functionality of zoekt-sourcegraph-indexserver. It 168// exists to conveniently use all the options passed in via func main. 
169type Server struct { 170 logger sglog.Logger 171 172 Sourcegraph Sourcegraph 173 BatchSize int 174 175 // IndexDir is the index directory to use. 176 IndexDir string 177 178 // IndexConcurrency is the number of repositories we index at once. 179 IndexConcurrency int 180 181 // Interval is how often we sync with Sourcegraph. 182 Interval time.Duration 183 184 // CPUCount is the number of CPUs to use for indexing shards. 185 CPUCount int 186 187 queue Queue 188 189 // muIndexDir protects the index directory from concurrent access. 190 muIndexDir indexMutex 191 192 // If true, shard merging is enabled. 193 shardMerging bool 194 195 // deltaBuildRepositoriesAllowList is an allowlist for repositories that we 196 // use delta-builds for instead of normal builds 197 deltaBuildRepositoriesAllowList map[string]struct{} 198 199 // deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist 200 // before attempting a delta build. 201 deltaShardNumberFallbackThreshold uint64 202 203 // repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that 204 // we skip calculating symbols metadata for during builds 205 repositoriesSkipSymbolsCalculationAllowList map[string]struct{} 206 207 hostname string 208 209 mergeOpts mergeOpts 210 211 // timeout defines how long the index server waits before killing an indexing job. 212 timeout time.Duration 213} 214 215var debug = log.New(io.Discard, "", log.LstdFlags) 216 217// our index commands should output something every 100mb they process. 218// 219// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough 220// time." famous last words. A client was indexing a monorepo with 42 221// cores... 5m was not enough. 
222const noOutputTimeout = 30 * time.Minute 223 224func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 225 out := &synchronizedBuffer{} 226 cmd.Stdout = out 227 cmd.Stderr = out 228 229 tr.LazyPrintf("%s", cmd.Args) 230 231 defer func() { 232 if err != nil { 233 outS := out.String() 234 tr.LazyPrintf("failed: %v", err) 235 tr.LazyPrintf("output: %s", out) 236 tr.SetError() 237 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 238 } 239 }() 240 241 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 242 243 if err := cmd.Start(); err != nil { 244 return err 245 } 246 247 errC := make(chan error) 248 go func() { 249 errC <- cmd.Wait() 250 }() 251 252 // This channel is set after we have sent sigquit. It allows us to follow up 253 // with a sigkill if the process doesn't quit after sigquit. 254 kill := make(<-chan time.Time) 255 256 lastLen := 0 257 for { 258 select { 259 case <-time.After(noOutputTimeout): 260 // Periodically check if we have had output. If not kill the process. 261 if out.Len() != lastLen { 262 lastLen = out.Len() 263 log.Printf("still running %s", cmd.Args) 264 } else { 265 // Send quit (C-\) first so we get a stack dump. 266 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 267 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 268 log.Println("quit failed:", err) 269 } 270 271 // send sigkill if still running in 10s 272 kill = time.After(10 * time.Second) 273 } 274 275 case <-kill: 276 log.Printf("still running, killing %s", cmd.Args) 277 if err := cmd.Process.Kill(); err != nil { 278 log.Println("kill failed:", err) 279 } 280 281 case err := <-errC: 282 if err != nil { 283 return err 284 } 285 286 tr.LazyPrintf("success") 287 return nil 288 } 289 } 290} 291 292// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 293// monitor the buffer while it is being written to. 
294type synchronizedBuffer struct { 295 mu sync.Mutex 296 b bytes.Buffer 297} 298 299func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 300 sb.mu.Lock() 301 defer sb.mu.Unlock() 302 return sb.b.Write(p) 303} 304 305func (sb *synchronizedBuffer) Len() int { 306 sb.mu.Lock() 307 defer sb.mu.Unlock() 308 return sb.b.Len() 309} 310 311func (sb *synchronizedBuffer) String() string { 312 sb.mu.Lock() 313 defer sb.mu.Unlock() 314 return sb.b.String() 315} 316 317// pauseFileName if present in IndexDir will stop index jobs from 318// running. This is to make it possible to experiment with the content of the 319// IndexDir without the indexserver writing to it. 320const pauseFileName = "PAUSE" 321 322// Run the sync loop. This blocks forever. 323func (s *Server) Run() { 324 removeIncompleteShards(s.IndexDir) 325 326 // Start a goroutine which updates the queue with commits to index. 327 go func() { 328 // We update the list of indexed repos every Interval. To speed up manual 329 // testing we also listen for SIGUSR1 to trigger updates. 
330 // 331 // "pkill -SIGUSR1 zoekt-sourcegra" 332 for range jitterTicker(s.Interval, unix.SIGUSR1) { 333 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 334 log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 335 continue 336 } 337 338 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 339 if err != nil { 340 log.Printf("error listing repos: %s", err) 341 continue 342 } 343 344 debug.Printf("updating index queue with %d repositories", len(repos.IDs)) 345 346 // Stop indexing repos we don't need to track anymore 347 removed := s.queue.MaybeRemoveMissing(repos.IDs) 348 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 349 if len(removed) > 0 { 350 log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 351 } 352 353 cleanupDone := make(chan struct{}) 354 go func() { 355 defer close(cleanupDone) 356 s.muIndexDir.Global(func() { 357 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 358 }) 359 }() 360 361 repos.IterateIndexOptions(s.queue.AddOrUpdate) 362 363 // IterateIndexOptions will only iterate over repositories that have 364 // changed since we last called list. However, we want to add all IDs 365 // back onto the queue just to check that what is on disk is still 366 // correct. This will use the last IndexOptions we stored in the 367 // queue. The repositories not on the queue (missing) need a forced 368 // fetch of IndexOptions. 369 missing := s.queue.Bump(repos.IDs) 370 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 
371 372 setCompoundShardCounter(s.IndexDir) 373 374 <-cleanupDone 375 } 376 }() 377 378 go func() { 379 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 380 if s.shardMerging { 381 s.vacuum() 382 } 383 } 384 }() 385 386 go func() { 387 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 388 if s.shardMerging { 389 s.doMerge() 390 } 391 } 392 }() 393 394 for i := 0; i < s.IndexConcurrency; i++ { 395 go s.processQueue() 396 } 397 398 // block forever 399 select {} 400} 401 402// formatList returns a comma-separated list of the first min(len(v), m) items. 403func formatListUint32(v []uint32, m int) string { 404 if len(v) < m { 405 m = len(v) 406 } 407 408 sb := strings.Builder{} 409 for i := 0; i < m; i++ { 410 fmt.Fprintf(&sb, "%d, ", v[i]) 411 } 412 413 if len(v) > m { 414 sb.WriteString("...") 415 } 416 417 return strings.TrimRight(sb.String(), ", ") 418} 419 420func (s *Server) processQueue() { 421 for { 422 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 423 time.Sleep(time.Second) 424 continue 425 } 426 427 item, ok := s.queue.Pop() 428 if !ok { 429 time.Sleep(time.Second) 430 continue 431 } 432 433 opts := item.Opts 434 args := s.indexArgs(opts) 435 436 ran := s.muIndexDir.With(opts.Name, func() { 437 // only record time taken once we hold the lock. This avoids us 438 // recording time taken while merging/cleanup runs. 
439 start := time.Now() 440 441 state, err := s.Index(args) 442 443 elapsed := time.Since(start) 444 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 445 446 indexDelay := time.Since(item.DateAddedToQueue) 447 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds()) 448 449 if err != nil { 450 log.Printf("error indexing %s: %s", args.String(), err) 451 } 452 453 switch state { 454 case indexStateSuccess: 455 var branches []string 456 for _, b := range args.Branches { 457 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 458 } 459 s.logger.Info("updated index", 460 sglog.String("repo", args.Name), 461 sglog.Uint32("id", args.RepoID), 462 sglog.Strings("branches", branches), 463 sglog.Duration("duration", elapsed), 464 sglog.Duration("index_delay", indexDelay), 465 ) 466 case indexStateSuccessMeta: 467 log.Printf("updated meta %s in %v", args.String(), elapsed) 468 } 469 s.queue.SetIndexed(opts, state) 470 }) 471 472 if !ran { 473 // Someone else is processing the repository. We can just skip this job 474 // since the repository will be added back to the queue and we will 475 // converge to the correct behaviour. 476 debug.Printf("index job for repository already running: %s", args) 477 continue 478 } 479 } 480} 481 482// repoNameForMetric returns a normalized version of the given repository name that is 483// suitable for use with Prometheus metrics. 484func repoNameForMetric(repo string) string { 485 // Check to see if we want to be able to capture separate indexing metrics for this repository. 486 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 
487 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 488 return repo 489 } 490 491 return "" 492} 493 494func batched(slice []uint32, size int) <-chan []uint32 { 495 c := make(chan []uint32) 496 go func() { 497 for len(slice) > 0 { 498 if size > len(slice) { 499 size = len(slice) 500 } 501 c <- slice[:size] 502 slice = slice[size:] 503 } 504 close(c) 505 }() 506 return c 507} 508 509// jitterTicker returns a ticker which ticks with a jitter. Each tick is 510// uniformly selected from the range (d/2, d + d/2). It will tick on creation. 511// 512// sig is a list of signals which also cause the ticker to fire. This is a 513// convenience to allow manually triggering of the ticker. 514func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} { 515 ticker := make(chan struct{}) 516 517 go func() { 518 for { 519 ticker <- struct{}{} 520 ns := int64(d) 521 jitter := rand.Int63n(ns) 522 time.Sleep(time.Duration(ns/2 + jitter)) 523 } 524 }() 525 526 go func() { 527 if len(sig) == 0 { 528 return 529 } 530 531 c := make(chan os.Signal, 1) 532 signal.Notify(c, sig...) 533 for range c { 534 ticker <- struct{}{} 535 } 536 }() 537 538 return ticker 539} 540 541// Index starts an index job for repo name at commit. 
542func (s *Server) Index(args *indexArgs) (state indexState, err error) { 543 tr := trace.New("index", args.Name) 544 545 defer func() { 546 if err != nil { 547 tr.SetError() 548 tr.LazyPrintf("error: %v", err) 549 state = indexStateFail 550 metricFailingTotal.Inc() 551 } 552 tr.LazyPrintf("state: %s", state) 553 tr.Finish() 554 }() 555 556 tr.LazyPrintf("branches: %v", args.Branches) 557 558 if len(args.Branches) == 0 { 559 return indexStateEmpty, createEmptyShard(args) 560 } 561 562 repositoryName := args.Name 563 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok { 564 tr.LazyPrintf("marking this repository for delta build") 565 args.UseDelta = true 566 } 567 568 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold 569 570 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok { 571 tr.LazyPrintf("skipping symbols calculation") 572 args.Symbols = false 573 } 574 575 reason := "forced" 576 577 if args.Incremental { 578 bo := args.BuildOptions() 579 bo.SetDefaults() 580 incrementalState, fn := bo.IndexState() 581 reason = string(incrementalState) 582 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc() 583 584 switch incrementalState { 585 case build.IndexStateEqual: 586 debug.Printf("%s index already up to date. 
Shard=%s", args.String(), fn) 587 return indexStateNoop, nil 588 589 case build.IndexStateMeta: 590 log.Printf("updating index.meta %s", args.String()) 591 592 if err := mergeMeta(bo); err != nil { 593 log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err) 594 } else { 595 return indexStateSuccessMeta, nil 596 } 597 598 case build.IndexStateCorrupt: 599 log.Printf("falling back to full update: corrupt index: %s", args.String()) 600 } 601 } 602 603 log.Printf("updating index %s reason=%s", args.String(), reason) 604 605 metricIndexingTotal.Inc() 606 c := gitIndexConfig{ 607 runCmd: func(cmd *exec.Cmd) error { 608 return s.loggedRun(tr, cmd) 609 }, 610 611 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) { 612 return args.BuildOptions().FindRepositoryMetadata() 613 }, 614 timeout: s.timeout, 615 } 616 617 err = gitIndex(c, args, s.Sourcegraph, s.logger) 618 if err != nil { 619 return indexStateFail, err 620 } 621 622 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil { 623 s.logger.Error("failed to update index status", 624 sglog.String("repo", args.Name), 625 sglog.Uint32("id", args.RepoID), 626 sglogBranches("branches", args.Branches), 627 sglog.Error(err), 628 ) 629 } 630 631 return indexStateSuccess, nil 632} 633 634// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so 635// it can update the zoekt_repos table. 636func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error { 637 // We need to read from disk for IndexTime. 
638 _, metadata, ok, err := c.findRepositoryMetadata(args) 639 if err != nil { 640 return fmt.Errorf("failed to read metadata for new/updated index: %w", err) 641 } 642 if !ok { 643 return errors.New("failed to find metadata for new/updated index") 644 } 645 646 status := []indexStatus{{ 647 RepoID: args.RepoID, 648 Branches: args.Branches, 649 IndexTimeUnix: metadata.IndexTime.Unix(), 650 }} 651 if err := sg.UpdateIndexStatus(status); err != nil { 652 return fmt.Errorf("failed to update sourcegraph with status: %w", err) 653 } 654 655 return nil 656} 657 658func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field { 659 ss := make([]string, len(branches)) 660 for i, b := range branches { 661 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version) 662 } 663 return sglog.Strings(key, ss) 664} 665 666func (s *Server) indexArgs(opts IndexOptions) *indexArgs { 667 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0)) 668 return &indexArgs{ 669 IndexOptions: opts, 670 IndexDir: s.IndexDir, 671 Parallelism: parallelism, 672 Incremental: true, 673 674 // 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22 675 FileLimit: 1 << 20, 676 677 ShardMerging: s.shardMerging, 678 } 679} 680 681// parallelism consults both the server flags and index options to determine the number 682// of shards to index in parallel. If the CPUCount index option is provided, it always 683// overrides the server flag. 
684func (s *Server) parallelism(opts IndexOptions, maxProcs int) int { 685 var parallelism int 686 if opts.ShardConcurrency > 0 { 687 parallelism = int(opts.ShardConcurrency) 688 } else { 689 parallelism = s.CPUCount 690 } 691 692 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs 693 if parallelism > 4*maxProcs { 694 parallelism = 4 * maxProcs 695 } 696 697 // If index concurrency is set, then divide the parallelism by the number of 698 // repos we're indexing in parallel 699 if s.IndexConcurrency > 1 { 700 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency))) 701 } 702 703 return parallelism 704} 705 706func createEmptyShard(args *indexArgs) error { 707 bo := args.BuildOptions() 708 bo.SetDefaults() 709 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 710 711 if args.Incremental && bo.IncrementalSkipIndexing() { 712 return nil 713 } 714 715 builder, err := build.NewBuilder(*bo) 716 if err != nil { 717 return err 718 } 719 return builder.Finish() 720} 721 722// addDebugHandlers adds handlers specific to indexserver. 723func (s *Server) addDebugHandlers(mux *http.ServeMux) { 724 // Sourcegraph's site admin view requires indexserver to serve it's admin view 725 // on "/". 
726 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 727 728 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 729 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 730 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 731 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 732 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 733 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 734} 735 736func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 737 if r.Method != "GET" { 738 w.Header().Set("Allow", "GET") 739 w.WriteHeader(http.StatusMethodNotAllowed) 740 return 741 } 742 743 response := struct { 744 Hostname string 745 }{ 746 Hostname: s.hostname, 747 } 748 749 b, err := json.Marshal(response) 750 if err != nil { 751 http.Error(w, err.Error(), http.StatusInternalServerError) 752 return 753 } 754 755 w.Header().Set("Content-Type", "application/json; charset=utf-8") 756 w.Write(b) 757} 758 759var rootTmpl = template.Must(template.New("name").Parse(` 760<html> 761 <body> 762 <a href="debug">Debug</a><br /> 763 <a href="debug/requests">Traces</a><br /> 764 {{.IndexMsg}}<br /> 765 <br /> 766 <h3>Reindex</h3> 767 {{if .Repos}} 768 <a href="?show_repos=false">hide repos</a><br /> 769 <table style="margin-top: 20px"> 770 <th style="text-align:left">Name</th> 771 <th style="text-align:left">ID</th> 772 {{range .Repos}} 773 <tr> 774 <td>{{.Name}}</td> 775 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 776 </tr> 777 {{end}} 778 </table> 779 {{else}} 780 <a href="?show_repos=true">show repos</a><br /> 781 {{end}} 782 </body> 783</html> 784`)) 785 786func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 787 if r.Method != "GET" { 788 w.Header().Set("Allow", "GET") 789 w.WriteHeader(http.StatusMethodNotAllowed) 790 return 791 } 792 793 values := r.URL.Query() 794 795 // ?id= 796 indexMsg := "" 797 if v := values.Get("id"); v != "" { 798 id, err := 
strconv.Atoi(v) 799 if err != nil { 800 http.Error(w, err.Error(), http.StatusBadRequest) 801 return 802 } 803 indexMsg, _ = s.forceIndex(uint32(id)) 804 } 805 806 // ?show_repos= 807 showRepos := false 808 if v := values.Get("show_repos"); v != "" { 809 showRepos, _ = strconv.ParseBool(v) 810 } 811 812 type Repo struct { 813 ID uint32 814 Name string 815 } 816 var data struct { 817 Repos []Repo 818 IndexMsg string 819 } 820 821 data.IndexMsg = indexMsg 822 823 if showRepos { 824 s.queue.Iterate(func(opts *IndexOptions) { 825 data.Repos = append(data.Repos, Repo{ 826 ID: opts.RepoID, 827 Name: opts.Name, 828 }) 829 }) 830 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 831 } 832 833 _ = rootTmpl.Execute(w, data) 834} 835 836// handleReindex triggers a reindex asynocronously. If a reindex was triggered 837// the request returns with status 202. The caller can infer the new state of 838// the index by calling List. 839func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 840 if r.Method != http.MethodPost { 841 w.Header().Set("Allow", http.MethodPost) 842 w.WriteHeader(http.StatusMethodNotAllowed) 843 return 844 } 845 846 err := r.ParseForm() 847 if err != nil { 848 http.Error(w, err.Error(), http.StatusBadRequest) 849 return 850 } 851 852 id, err := strconv.Atoi(r.Form.Get("repo")) 853 if err != nil { 854 http.Error(w, err.Error(), http.StatusBadRequest) 855 return 856 } 857 858 go func() { s.forceIndex(uint32(id)) }() 859 860 // 202 Accepted 861 w.WriteHeader(http.StatusAccepted) 862} 863 864func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 865 withIndexed := true 866 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 867 withIndexed = b 868 } 869 870 var indexed []uint32 871 if withIndexed { 872 indexed = listIndexed(s.IndexDir) 873 } 874 875 repos, err := s.Sourcegraph.List(r.Context(), indexed) 876 if err != nil { 877 http.Error(w, err.Error(), 
http.StatusInternalServerError) 878 return 879 } 880 881 bw := bytes.Buffer{} 882 883 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 884 885 _, err = fmt.Fprintf(tw, "ID\tName\n") 886 if err != nil { 887 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 888 return 889 } 890 891 s.queue.mu.Lock() 892 name := "" 893 for _, id := range repos.IDs { 894 if item := s.queue.get(id); item != nil { 895 name = item.opts.Name 896 } else { 897 name = "" 898 } 899 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 900 if err != nil { 901 debug.Printf("handleDebugList: %s\n", err.Error()) 902 } 903 } 904 s.queue.mu.Unlock() 905 906 if err != nil { 907 http.Error(w, err.Error(), http.StatusInternalServerError) 908 return 909 } 910 911 err = tw.Flush() 912 if err != nil { 913 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 914 return 915 } 916 917 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 918 919 if _, err := io.Copy(w, &bw); err != nil { 920 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 921 return 922 } 923} 924 925// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 926// can run this command during periods of low usage (evenings, weekends) to 927// trigger an initial merge run. In the steady-state, merges happen rarely, even 928// on busy instances, and users can rely on automatic merging instead. 929func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 930 // A merge operation can take very long, depending on the number merges and the 931 // target size of the compound shards. We run the merge in the background and 932 // return immediately to the user. 933 // 934 // We track the status of the merge with metricShardMergingRunning. 
935 go func() { 936 s.doMerge() 937 }() 938 _, _ = w.Write([]byte("merging enqueued\n")) 939} 940 941func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 942 indexed := listIndexed(s.IndexDir) 943 944 bw := bytes.Buffer{} 945 946 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 947 948 _, err := fmt.Fprintf(tw, "ID\tName\n") 949 if err != nil { 950 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 951 return 952 } 953 954 s.queue.mu.Lock() 955 name := "" 956 for _, id := range indexed { 957 if item := s.queue.get(id); item != nil { 958 name = item.opts.Name 959 } else { 960 name = "" 961 } 962 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 963 if err != nil { 964 debug.Printf("handleDebugIndexed: %s\n", err.Error()) 965 } 966 } 967 s.queue.mu.Unlock() 968 969 if err != nil { 970 http.Error(w, err.Error(), http.StatusInternalServerError) 971 return 972 } 973 974 err = tw.Flush() 975 if err != nil { 976 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 977 return 978 } 979 980 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 981 982 if _, err := io.Copy(w, &bw); err != nil { 983 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 984 return 985 } 986} 987 988// forceIndex will run the index job for repo name now. It will return always 989// return a string explaining what it did, even if it failed. 
990func (s *Server) forceIndex(id uint32) (string, error) { 991 var opts IndexOptions 992 var err error 993 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 994 opts = o 995 }, func(_ uint32, e error) { 996 err = e 997 }, id) 998 if err != nil { 999 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 1000 } 1001 1002 args := s.indexArgs(opts) 1003 args.Incremental = false // force re-index 1004 1005 var state indexState 1006 ran := s.muIndexDir.With(opts.Name, func() { 1007 state, err = s.Index(args) 1008 }) 1009 if !ran { 1010 return fmt.Sprintf("index job for repository already running: %s", args), nil 1011 } 1012 if err != nil { 1013 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 1014 } 1015 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 1016} 1017 1018func listIndexed(indexDir string) []uint32 { 1019 index := getShards(indexDir) 1020 metricNumIndexed.Set(float64(len(index))) 1021 repoIDs := make([]uint32, 0, len(index)) 1022 for id := range index { 1023 repoIDs = append(repoIDs, id) 1024 } 1025 sort.Slice(repoIDs, func(i, j int) bool { 1026 return repoIDs[i] < repoIDs[j] 1027 }) 1028 return repoIDs 1029} 1030 1031// setupTmpDir sets up a temporary directory on the same volume as the 1032// indexes. 1033// 1034// If main is true we will delete older temp directories left around. main is 1035// false when this is a debug command. 1036func setupTmpDir(logger sglog.Logger, main bool, index string) error { 1037 // change the target tmp directory depending on if it's our main daemon or a 1038 // debug sub command. 
1039 dir := ".indexserver.debug.tmp" 1040 if main { 1041 dir = ".indexserver.tmp" 1042 } 1043 1044 tmpRoot := filepath.Join(index, dir) 1045 1046 if main { 1047 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot)) 1048 err := os.RemoveAll(tmpRoot) 1049 if err != nil { 1050 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err)) 1051 } 1052 } 1053 1054 if err := os.MkdirAll(tmpRoot, 0o755); err != nil { 1055 return err 1056 } 1057 1058 return os.Setenv("TMPDIR", tmpRoot) 1059} 1060 1061func printMetaData(fn string) error { 1062 repo, indexMeta, err := zoekt.ReadMetadataPath(fn) 1063 if err != nil { 1064 return err 1065 } 1066 1067 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 1068 if err != nil { 1069 return err 1070 } 1071 1072 err = json.NewEncoder(os.Stdout).Encode(repo) 1073 if err != nil { 1074 return err 1075 } 1076 return nil 1077} 1078 1079func printShardStats(fn string) error { 1080 f, err := os.Open(fn) 1081 if err != nil { 1082 return err 1083 } 1084 1085 iFile, err := zoekt.NewIndexFile(f) 1086 if err != nil { 1087 return err 1088 } 1089 1090 return zoekt.PrintNgramStats(iFile) 1091} 1092 1093func srcLogLevelIsDebug() bool { 1094 lvl := os.Getenv(sglog.EnvLogLevel) 1095 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1096} 1097 1098func getEnvWithDefaultBool(k string, defaultVal bool) bool { 1099 v := os.Getenv(k) 1100 if v == "" { 1101 return defaultVal 1102 } 1103 b, err := strconv.ParseBool(v) 1104 if err != nil { 1105 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1106 } 1107 return b 1108} 1109 1110func getEnvWithDefaultInt64(k string, defaultVal int64) int64 { 1111 v := os.Getenv(k) 1112 if v == "" { 1113 return defaultVal 1114 } 1115 i, err := strconv.ParseInt(v, 10, 64) 1116 if err != nil { 1117 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1118 } 1119 return i 1120} 1121 1122func getEnvWithDefaultInt(k string, defaultVal int) int { 1123 v := 
os.Getenv(k) 1124 if v == "" { 1125 return defaultVal 1126 } 1127 i, err := strconv.Atoi(k) 1128 if err != nil { 1129 log.Fatalf("error parsing ENV %s to int: %s", k, err) 1130 } 1131 return i 1132} 1133 1134func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 { 1135 v := os.Getenv(k) 1136 if v == "" { 1137 return defaultVal 1138 } 1139 i, err := strconv.ParseUint(v, 10, 64) 1140 if err != nil { 1141 log.Fatalf("error parsing ENV %s to uint64: %s", k, err) 1142 } 1143 return i 1144} 1145 1146func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1147 v := os.Getenv(k) 1148 if v == "" { 1149 return defaultVal 1150 } 1151 f, err := strconv.ParseFloat(v, 64) 1152 if err != nil { 1153 log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1154 } 1155 return f 1156} 1157 1158func getEnvWithDefaultString(k string, defaultVal string) string { 1159 v := os.Getenv(k) 1160 if v == "" { 1161 return defaultVal 1162 } 1163 return v 1164} 1165 1166func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration { 1167 v := os.Getenv(k) 1168 if v == "" { 1169 return defaultVal 1170 } 1171 1172 d, err := time.ParseDuration(v) 1173 if err != nil { 1174 log.Fatalf("error parsing ENV %s to duration: %s", k, err) 1175 } 1176 return d 1177} 1178 1179func getEnvWithDefaultEmptySet(k string) map[string]struct{} { 1180 set := map[string]struct{}{} 1181 for _, v := range strings.Split(os.Getenv(k), ",") { 1182 v = strings.TrimSpace(v) 1183 if v != "" { 1184 set[v] = struct{}{} 1185 } 1186 } 1187 return set 1188} 1189 1190func joinStringSet(set map[string]struct{}, sep string) string { 1191 var xs []string 1192 for x := range set { 1193 xs = append(xs, x) 1194 } 1195 1196 return strings.Join(xs, sep) 1197} 1198 1199func setCompoundShardCounter(indexDir string) { 1200 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt")) 1201 if err != nil { 1202 log.Printf("setCompoundShardCounter: %s\n", err) 1203 return 1204 } 1205 
metricNumberCompoundShards.Set(float64(len(fns))) 1206} 1207 1208func rootCmd() *ffcli.Command { 1209 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1210 conf := rootConfig{ 1211 Main: true, 1212 } 1213 conf.registerRootFlags(rootFs) 1214 1215 return &ffcli.Command{ 1216 FlagSet: rootFs, 1217 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1218 Subcommands: []*ffcli.Command{debugCmd()}, 1219 Exec: func(ctx context.Context, args []string) error { 1220 return startServer(conf) 1221 }, 1222 } 1223} 1224 1225type rootConfig struct { 1226 // Main is true if this rootConfig is for our main long running command (the 1227 // indexserver). Debug commands should not set this value. This is used to 1228 // determine if we need to run tmpfriend. 1229 Main bool 1230 1231 root string 1232 interval time.Duration 1233 index string 1234 indexConcurrency int64 1235 listen string 1236 hostname string 1237 cpuFraction float64 1238 blockProfileRate int 1239 1240 // config values related to shard merging 1241 disableShardMerging bool 1242 vacuumInterval time.Duration 1243 mergeInterval time.Duration 1244 targetSize int64 1245 minSize int64 1246 minAgeDays int 1247 1248 // config values related to backoff indexing repos with one or more consecutive failures 1249 backoffDuration time.Duration 1250 maxBackoffDuration time.Duration 1251} 1252 1253func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1254 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. 
If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1255 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1256 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently") 1257 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use") 1258 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") 1259 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") 1260 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1261 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate") 1262 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.") 1263 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. 
A negative value disables indexing backoff.") 1264 1265 // flags related to shard merging 1266 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging") 1267 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1268 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1269 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB") 1270 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB") 1271 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.") 1272} 1273 1274func startServer(conf rootConfig) error { 1275 s, err := newServer(conf) 1276 if err != nil { 1277 return err 1278 } 1279 1280 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate) 1281 setCompoundShardCounter(s.IndexDir) 1282 1283 if conf.listen != "" { 1284 1285 mux := http.NewServeMux() 1286 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1287 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1288 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1289 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"}, 1290 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1291 
}...) 1292 s.addDebugHandlers(mux) 1293 1294 go func() { 1295 debug.Printf("serving HTTP on %s", conf.listen) 1296 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1297 }() 1298 1299 // Serve mux on a unix domain socket on a best-effort-basis so that 1300 // webserver can call the endpoints via the shared filesystem. 1301 // 1302 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1303 // on the socket due to permission errors. See 1304 // https://github.com/docker/for-mac/issues/6239 1305 go func() { 1306 serveHTTPOverSocket := func() error { 1307 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1308 // We cannot bind a socket to an existing pathname. 1309 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1310 return fmt.Errorf("error removing socket file: %s", socket) 1311 } 1312 // The "unix" network corresponds to stream sockets. (cf. unixgram, 1313 // unixpacket). 1314 l, err := net.Listen("unix", socket) 1315 if err != nil { 1316 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1317 } 1318 // Indexserver (root) and webserver (Sourcegraph) run with 1319 // different users. Per default, the socket is created with 1320 // permission 755 (root root), which doesn't let webserver write to 1321 // it. 1322 // 1323 // See https://github.com/golang/go/issues/11822 for more context. 
1324 if err := os.Chmod(socket, 0o777); err != nil { 1325 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1326 } 1327 debug.Printf("serving HTTP on %s", socket) 1328 return http.Serve(l, mux) 1329 } 1330 debug.Print(serveHTTPOverSocket()) 1331 }() 1332 } 1333 1334 oc := &ownerChecker{ 1335 Path: filepath.Join(conf.index, "owner.txt"), 1336 Hostname: conf.hostname, 1337 } 1338 go oc.Run() 1339 1340 logger := sglog.Scoped("metricsRegistration") 1341 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1342 1343 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1344 prometheus.DefaultRegisterer.MustRegister(c) 1345 1346 s.Run() 1347 return nil 1348} 1349 1350func newServer(conf rootConfig) (*Server, error) { 1351 logger := sglog.Scoped("server") 1352 1353 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1354 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1355 } 1356 if conf.index == "" { 1357 return nil, fmt.Errorf("must set -index") 1358 } 1359 if conf.root == "" { 1360 return nil, fmt.Errorf("must set -sourcegraph_url") 1361 } 1362 rootURL, err := url.Parse(conf.root) 1363 if err != nil { 1364 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1365 } 1366 1367 rootURL = addDefaultPort(rootURL) 1368 1369 // Tune GOMAXPROCS to match Linux container CPU quota. 1370 _, _ = maxprocs.Set() 1371 1372 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler. 1373 // The block profiler is disabled by default and should be enabled with care in production 1374 runtime.SetBlockProfileRate(conf.blockProfileRate) 1375 1376 // Automatically prepend our own path at the front, to minimize 1377 // required configuration. 
1378 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1379 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1380 } 1381 1382 if _, err := os.Stat(conf.index); err != nil { 1383 if err := os.MkdirAll(conf.index, 0o755); err != nil { 1384 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1385 } 1386 } 1387 1388 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil { 1389 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1390 } 1391 1392 if srcLogLevelIsDebug() { 1393 debug = log.New(os.Stderr, "", log.LstdFlags) 1394 } 1395 1396 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1397 if len(reposWithSeparateIndexingMetrics) > 0 { 1398 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1399 } 1400 1401 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1402 if len(deltaBuildRepositoriesAllowList) > 0 { 1403 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1404 } 1405 1406 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1407 if deltaShardNumberFallbackThreshold > 0 { 1408 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1409 } else { 1410 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1411 } 1412 1413 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1414 if len(reposShouldSkipSymbolsCalculation) > 0 { 1415 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1416 } 1417 1418 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout) 1419 if indexingTimeout != 
defaultIndexingTimeout { 1420 debug.Printf("using configured indexing timeout: %s", indexingTimeout) 1421 } 1422 1423 var sg Sourcegraph 1424 if rootURL.IsAbs() { 1425 var batchSize int 1426 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { 1427 batchSize, err = strconv.Atoi(v) 1428 if err != nil { 1429 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1430 } 1431 } 1432 1433 opts := []SourcegraphClientOption{ 1434 WithBatchSize(batchSize), 1435 } 1436 1437 logger := sglog.Scoped("zoektConfigurationGRPCClient") 1438 client, err := dialGRPCClient(rootURL.Host, logger) 1439 if err != nil { 1440 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1441 } 1442 1443 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...) 1444 1445 } else { 1446 sg = sourcegraphFake{ 1447 RootDir: rootURL.String(), 1448 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1449 } 1450 } 1451 1452 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction))) 1453 if cpuCount < 1 { 1454 cpuCount = 1 1455 } 1456 1457 if conf.indexConcurrency < 1 { 1458 conf.indexConcurrency = 1 1459 } else if conf.indexConcurrency > int64(cpuCount) { 1460 conf.indexConcurrency = int64(cpuCount) 1461 } 1462 1463 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1464 1465 return &Server{ 1466 logger: logger, 1467 Sourcegraph: sg, 1468 IndexDir: conf.index, 1469 IndexConcurrency: int(conf.indexConcurrency), 1470 Interval: conf.interval, 1471 CPUCount: cpuCount, 1472 queue: *q, 1473 shardMerging: !conf.disableShardMerging, 1474 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1475 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1476 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1477 hostname: conf.hostname, 1478 mergeOpts: mergeOpts{ 1479 vacuumInterval: conf.vacuumInterval, 1480 mergeInterval: conf.mergeInterval, 
1481 targetSizeBytes: conf.targetSize * 1024 * 1024, 1482 minSizeBytes: conf.minSize * 1024 * 1024, 1483 minAgeDays: conf.minAgeDays, 1484 }, 1485 timeout: indexingTimeout, 1486 }, err 1487} 1488 1489// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration 1490// for the indexed-search-configuration gRPC service. 1491// 1492// The default backoff strategy is modeled after the default settings used by 1493// retryablehttp.DefaultClient. 1494// 1495// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1496// - Unavailable 1497// - Aborted 1498// 1499//go:embed default_grpc_service_configuration.json 1500var defaultGRPCServiceConfigurationJSON string 1501 1502func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1503 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1504 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1505 return invoker(ctx, method, req, reply, cc, opts...) 1506 } 1507} 1508 1509func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1510 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1511 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1512 return streamer(ctx, desc, cc, method, opts...) 1513 } 1514} 1515 1516// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPCs servers and clients are allowed to process. 1517// This can be overridden by providing custom Server/Dial options. 
1518const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB 1519 1520func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) { 1521 metrics := mustGetClientMetrics() 1522 1523 // If the service seems to be unavailable, this 1524 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1 1525 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s]) 1526 retryOpts := []retry.CallOption{ 1527 retry.WithMax(5), 1528 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)), 1529 retry.WithCodes(codes.Unavailable), 1530 } 1531 1532 opts := []grpc.DialOption{ 1533 grpc.WithTransportCredentials(insecure.NewCredentials()), 1534 grpc.WithChainStreamInterceptor( 1535 metrics.StreamClientInterceptor(), 1536 messagesize.StreamClientInterceptor, 1537 internalActorStreamInterceptor(), 1538 internalerrs.LoggingStreamClientInterceptor(logger), 1539 internalerrs.PrometheusStreamClientInterceptor, 1540 retry.StreamClientInterceptor(retryOpts...), 1541 ), 1542 grpc.WithChainUnaryInterceptor( 1543 metrics.UnaryClientInterceptor(), 1544 messagesize.UnaryClientInterceptor, 1545 internalActorUnaryInterceptor(), 1546 internalerrs.LoggingUnaryClientInterceptor(logger), 1547 internalerrs.PrometheusUnaryClientInterceptor, 1548 retry.UnaryClientInterceptor(retryOpts...), 1549 ), 1550 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON), 1551 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)), 1552 } 1553 1554 opts = append(opts, additionalOpts...) 1555 1556 // Ensure that the message size options are set last, so they override any other 1557 // client-specific options that tweak the message size. 1558 // 1559 // The message size options are only provided if the environment variable is set. 
These options serve as an escape hatch, so they 1560 // take precedence over everything else with a uniform size setting that's easy to reason about. 1561 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...) 1562 1563 // This dialer is used to connect via gRPC to the Sourcegraph instance. 1564 // This is done lazily, so we can provide the client to use regardless of 1565 // whether we enabled gRPC or not initially. 1566 cc, err := grpc.Dial(addr, opts...) 1567 if err != nil { 1568 return nil, fmt.Errorf("dialing %q: %w", addr, err) 1569 } 1570 1571 client := proto.NewZoektConfigurationServiceClient(cc) 1572 return client, nil 1573} 1574 1575// mustGetClientMetrics returns a singleton instance of the client metrics 1576// that are shared across all gRPC clients that this process creates. 1577// 1578// This function panics if the metrics cannot be registered with the default 1579// Prometheus registry. 1580func mustGetClientMetrics() *grpcprom.ClientMetrics { 1581 clientMetricsOnce.Do(func() { 1582 clientMetrics = grpcprom.NewClientMetrics( 1583 grpcprom.WithClientCounterOptions(), 1584 grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request 1585 grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC 1586 grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC 1587 ) 1588 1589 prometheus.DefaultRegisterer.MustRegister(clientMetrics) 1590 }) 1591 1592 return clientMetrics 1593} 1594 1595// addDefaultPort adds a default port to a URL if one is not specified. 1596// 1597// If the URL scheme is "http" and no port is specified, "80" is used. 1598// If the scheme is "https", "443" is used. 1599// 1600// The original URL is not mutated. A copy is modified and returned. 
1601func addDefaultPort(original *url.URL) *url.URL { 1602 if original == nil { 1603 return nil // don't panic 1604 } 1605 1606 if !original.IsAbs() { 1607 return original // don't do anything if the URL doesn't look like a remote URL 1608 } 1609 1610 if original.Scheme == "http" && original.Port() == "" { 1611 u := cloneURL(original) 1612 u.Host = net.JoinHostPort(u.Host, "80") 1613 return u 1614 } 1615 1616 if original.Scheme == "https" && original.Port() == "" { 1617 u := cloneURL(original) 1618 u.Host = net.JoinHostPort(u.Host, "443") 1619 return u 1620 } 1621 1622 return original 1623} 1624 1625// cloneURL returns a copy of the URL. It is safe to mutate the returned URL. 1626// This is copied from net/http/clone.go 1627func cloneURL(u *url.URL) *url.URL { 1628 if u == nil { 1629 return nil 1630 } 1631 u2 := new(url.URL) 1632 *u2 = *u 1633 if u.User != nil { 1634 u2.User = new(url.Userinfo) 1635 *u2.User = *u.User 1636 } 1637 return u2 1638} 1639 1640func main() { 1641 liblog := sglog.Init(sglog.Resource{ 1642 Name: "zoekt-indexserver", 1643 Version: zoekt.Version, 1644 InstanceID: zoekt.HostnameBestEffort(), 1645 }) 1646 defer liblog.Sync() 1647 1648 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1649 log.Fatal(err) 1650 } 1651} 1652 1653// getBoolFromEnvironmentVariables returns the boolean defined by the first environment 1654// variable listed in envVarNames that is set in the current process environment, or the defaultBool if none are set. 1655// 1656// An error is returned of the provided environment variables fails to parse as a boolean. 
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	for _, envVar := range envVarNames {
		v := os.Getenv(envVar)
		if v == "" {
			// Unset and empty are treated the same: fall through to the next name.
			continue
		}

		b, err := strconv.ParseBool(v)
		if err != nil {
			// Wrap with %w so callers can inspect the *strconv.NumError.
			return false, fmt.Errorf("parsing environment variable %q to boolean: %w", envVar, err)
		}

		return b, nil
	}

	return defaultBool, nil
}