fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
// repositories on sourcegraph
package main

import (
	"bytes"
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"html/template"
	"io"
	"io/fs"
	"log"
	"math"
	"math/rand"
	"net"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"text/tabwriter"
	"time"

	grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
	"github.com/peterbourgon/ff/v3/ffcli"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	sglog "github.com/sourcegraph/log"
	"github.com/sourcegraph/mountinfo"
	"go.uber.org/automaxprocs/maxprocs"
	"golang.org/x/net/trace"
	"golang.org/x/sys/unix"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/metadata"

	"github.com/sourcegraph/zoekt"
	"github.com/sourcegraph/zoekt/build"
	proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
	"github.com/sourcegraph/zoekt/debugserver"
	"github.com/sourcegraph/zoekt/grpc/internalerrs"
	"github.com/sourcegraph/zoekt/grpc/messagesize"
	"github.com/sourcegraph/zoekt/internal/profiler"
)

// Prometheus metrics exported by the indexserver. They are registered through
// promauto when the package is initialised.
var (
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 100000s (~27.8h): 1, 10, 100, 1000, 10000, 100000
	})

	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	// metricIndexDuration is observed once per index job with the job's final
	// indexState and the (normalized) repository name as labels.
	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	// metricIndexIncrementalIndexState is incremented for every incremental
	// index job with the build.IndexState found on disk.
	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is build.IndexState

	// metricNumIndexed is set to the number of repositories that have shards
	// on disk each time the index directory is listed.
	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	metricNumAssigned = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_assigned",
		Help: "Number of repos assigned to this indexer by code host",
	})

	// metricFailingTotal is incremented whenever an index job returns an error.
	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	// metricIndexingTotal is incremented whenever a full index run is started
	// (i.e. the job was not a noop or a metadata-only update).
	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})

	// NOTE(review): presumably clientMetricsOnce guards one-time creation of
	// clientMetrics; the code using them is outside this part of the file.
	clientMetricsOnce sync.Once
	clientMetrics     *grpcprom.ClientMetrics
)

// set of repositories that we want to capture separate indexing metrics for
var reposWithSeparateIndexingMetrics = make(map[string]struct{})

// indexState describes the outcome of a single index job. It is used as the
// "state" label of metricIndexDuration.
type indexState string

const (
	indexStateFail        indexState = "fail"
	indexStateSuccess     indexState = "success"
	indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
	indexStateNoop        indexState = "noop"         // We didn't need to update index
	indexStateEmpty       indexState = "empty"        // index is empty (empty repo)
)

// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	// logger is the structured logger used for index job results.
	logger sglog.Logger

	// Sourcegraph is the client used to list the repositories to index and to
	// fetch their index options.
	Sourcegraph Sourcegraph
	BatchSize   int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration

	// CPUCount is the number of CPUs to use for indexing shards.
	CPUCount int

	// queue holds the index jobs waiting to be processed.
	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is reported by the /debug/host endpoint.
	hostname string

	// mergeOpts carries the vacuum and merge intervals used by Run.
	mergeOpts mergeOpts

	// timeout defines how long the index server waits before killing an indexing job.
	timeout time.Duration
}

// debug is a logger for verbose diagnostics. It discards its output unless it
// is redirected elsewhere.
var debug = log.New(io.Discard, "", log.LstdFlags)

// our index commands should output something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
203const noOutputTimeout = 30 * time.Minute 204 205func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 206 out := &synchronizedBuffer{} 207 cmd.Stdout = out 208 cmd.Stderr = out 209 210 tr.LazyPrintf("%s", cmd.Args) 211 212 defer func() { 213 if err != nil { 214 outS := out.String() 215 tr.LazyPrintf("failed: %v", err) 216 tr.LazyPrintf("output: %s", out) 217 tr.SetError() 218 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 219 } 220 }() 221 222 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 223 224 if err := cmd.Start(); err != nil { 225 return err 226 } 227 228 errC := make(chan error) 229 go func() { 230 errC <- cmd.Wait() 231 }() 232 233 // This channel is set after we have sent sigquit. It allows us to follow up 234 // with a sigkill if the process doesn't quit after sigquit. 235 kill := make(<-chan time.Time) 236 237 lastLen := 0 238 for { 239 select { 240 case <-time.After(noOutputTimeout): 241 // Periodically check if we have had output. If not kill the process. 242 if out.Len() != lastLen { 243 lastLen = out.Len() 244 log.Printf("still running %s", cmd.Args) 245 } else { 246 // Send quit (C-\) first so we get a stack dump. 247 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 248 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 249 log.Println("quit failed:", err) 250 } 251 252 // send sigkill if still running in 10s 253 kill = time.After(10 * time.Second) 254 } 255 256 case <-kill: 257 log.Printf("still running, killing %s", cmd.Args) 258 if err := cmd.Process.Kill(); err != nil { 259 log.Println("kill failed:", err) 260 } 261 262 case err := <-errC: 263 if err != nil { 264 return err 265 } 266 267 tr.LazyPrintf("success") 268 return nil 269 } 270 } 271} 272 273// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 274// monitor the buffer while it is being written to. 
275type synchronizedBuffer struct { 276 mu sync.Mutex 277 b bytes.Buffer 278} 279 280func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 281 sb.mu.Lock() 282 defer sb.mu.Unlock() 283 return sb.b.Write(p) 284} 285 286func (sb *synchronizedBuffer) Len() int { 287 sb.mu.Lock() 288 defer sb.mu.Unlock() 289 return sb.b.Len() 290} 291 292func (sb *synchronizedBuffer) String() string { 293 sb.mu.Lock() 294 defer sb.mu.Unlock() 295 return sb.b.String() 296} 297 298// pauseFileName if present in IndexDir will stop index jobs from 299// running. This is to make it possible to experiment with the content of the 300// IndexDir without the indexserver writing to it. 301const pauseFileName = "PAUSE" 302 303// Run the sync loop. This blocks forever. 304func (s *Server) Run() { 305 removeIncompleteShards(s.IndexDir) 306 307 // Start a goroutine which updates the queue with commits to index. 308 go func() { 309 // We update the list of indexed repos every Interval. To speed up manual 310 // testing we also listen for SIGUSR1 to trigger updates. 
311 // 312 // "pkill -SIGUSR1 zoekt-sourcegra" 313 for range jitterTicker(s.Interval, unix.SIGUSR1) { 314 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 315 log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 316 continue 317 } 318 319 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 320 if err != nil { 321 log.Printf("error listing repos: %s", err) 322 continue 323 } 324 325 debug.Printf("updating index queue with %d repositories", len(repos.IDs)) 326 327 // Stop indexing repos we don't need to track anymore 328 removed := s.queue.MaybeRemoveMissing(repos.IDs) 329 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 330 if len(removed) > 0 { 331 log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 332 } 333 334 cleanupDone := make(chan struct{}) 335 go func() { 336 defer close(cleanupDone) 337 s.muIndexDir.Global(func() { 338 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 339 }) 340 }() 341 342 repos.IterateIndexOptions(s.queue.AddOrUpdate) 343 344 // IterateIndexOptions will only iterate over repositories that have 345 // changed since we last called list. However, we want to add all IDs 346 // back onto the queue just to check that what is on disk is still 347 // correct. This will use the last IndexOptions we stored in the 348 // queue. The repositories not on the queue (missing) need a forced 349 // fetch of IndexOptions. 350 missing := s.queue.Bump(repos.IDs) 351 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 
352 353 setCompoundShardCounter(s.IndexDir) 354 355 <-cleanupDone 356 } 357 }() 358 359 go func() { 360 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 361 if s.shardMerging { 362 s.vacuum() 363 } 364 } 365 }() 366 367 go func() { 368 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 369 if s.shardMerging { 370 s.doMerge() 371 } 372 } 373 }() 374 375 for i := 0; i < s.IndexConcurrency; i++ { 376 go s.processQueue() 377 } 378 379 // block forever 380 select {} 381} 382 383// formatList returns a comma-separated list of the first min(len(v), m) items. 384func formatListUint32(v []uint32, m int) string { 385 if len(v) < m { 386 m = len(v) 387 } 388 389 sb := strings.Builder{} 390 for i := 0; i < m; i++ { 391 fmt.Fprintf(&sb, "%d, ", v[i]) 392 } 393 394 if len(v) > m { 395 sb.WriteString("...") 396 } 397 398 return strings.TrimRight(sb.String(), ", ") 399} 400 401func (s *Server) processQueue() { 402 for { 403 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 404 time.Sleep(time.Second) 405 continue 406 } 407 408 opts, ok := s.queue.Pop() 409 if !ok { 410 time.Sleep(time.Second) 411 continue 412 } 413 414 args := s.indexArgs(opts) 415 416 ran := s.muIndexDir.With(opts.Name, func() { 417 // only record time taken once we hold the lock. This avoids us 418 // recording time taken while merging/cleanup runs. 
419 start := time.Now() 420 421 state, err := s.Index(args) 422 423 elapsed := time.Since(start) 424 425 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 426 427 if err != nil { 428 log.Printf("error indexing %s: %s", args.String(), err) 429 } 430 431 switch state { 432 case indexStateSuccess: 433 var branches []string 434 for _, b := range args.Branches { 435 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 436 } 437 s.logger.Info("updated index", 438 sglog.String("repo", args.Name), 439 sglog.Uint32("id", args.RepoID), 440 sglog.Strings("branches", branches), 441 sglog.Duration("duration", elapsed), 442 ) 443 case indexStateSuccessMeta: 444 log.Printf("updated meta %s in %v", args.String(), elapsed) 445 } 446 s.queue.SetIndexed(opts, state) 447 }) 448 449 if !ran { 450 // Someone else is processing the repository. We can just skip this job 451 // since the repository will be added back to the queue and we will 452 // converge to the correct behaviour. 453 debug.Printf("index job for repository already running: %s", args) 454 continue 455 } 456 } 457} 458 459// repoNameForMetric returns a normalized version of the given repository name that is 460// suitable for use with Prometheus metrics. 461func repoNameForMetric(repo string) string { 462 // Check to see if we want to be able to capture separate indexing metrics for this repository. 463 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 464 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 465 return repo 466 } 467 468 return "" 469} 470 471func batched(slice []uint32, size int) <-chan []uint32 { 472 c := make(chan []uint32) 473 go func() { 474 for len(slice) > 0 { 475 if size > len(slice) { 476 size = len(slice) 477 } 478 c <- slice[:size] 479 slice = slice[size:] 480 } 481 close(c) 482 }() 483 return c 484} 485 486// jitterTicker returns a ticker which ticks with a jitter. 
Each tick is 487// uniformly selected from the range (d/2, d + d/2). It will tick on creation. 488// 489// sig is a list of signals which also cause the ticker to fire. This is a 490// convenience to allow manually triggering of the ticker. 491func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} { 492 ticker := make(chan struct{}) 493 494 go func() { 495 for { 496 ticker <- struct{}{} 497 ns := int64(d) 498 jitter := rand.Int63n(ns) 499 time.Sleep(time.Duration(ns/2 + jitter)) 500 } 501 }() 502 503 go func() { 504 if len(sig) == 0 { 505 return 506 } 507 508 c := make(chan os.Signal, 1) 509 signal.Notify(c, sig...) 510 for range c { 511 ticker <- struct{}{} 512 } 513 }() 514 515 return ticker 516} 517 518// Index starts an index job for repo name at commit. 519func (s *Server) Index(args *indexArgs) (state indexState, err error) { 520 tr := trace.New("index", args.Name) 521 522 defer func() { 523 if err != nil { 524 tr.SetError() 525 tr.LazyPrintf("error: %v", err) 526 state = indexStateFail 527 metricFailingTotal.Inc() 528 } 529 tr.LazyPrintf("state: %s", state) 530 tr.Finish() 531 }() 532 533 tr.LazyPrintf("branches: %v", args.Branches) 534 535 if len(args.Branches) == 0 { 536 return indexStateEmpty, createEmptyShard(args) 537 } 538 539 repositoryName := args.Name 540 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok { 541 tr.LazyPrintf("marking this repository for delta build") 542 args.UseDelta = true 543 } 544 545 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold 546 547 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok { 548 tr.LazyPrintf("skipping symbols calculation") 549 args.Symbols = false 550 } 551 552 reason := "forced" 553 554 if args.Incremental { 555 bo := args.BuildOptions() 556 bo.SetDefaults() 557 incrementalState, fn := bo.IndexState() 558 reason = string(incrementalState) 559 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc() 560 
561 switch incrementalState { 562 case build.IndexStateEqual: 563 debug.Printf("%s index already up to date. Shard=%s", args.String(), fn) 564 return indexStateNoop, nil 565 566 case build.IndexStateMeta: 567 log.Printf("updating index.meta %s", args.String()) 568 569 if err := mergeMeta(bo); err != nil { 570 log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err) 571 } else { 572 return indexStateSuccessMeta, nil 573 } 574 575 case build.IndexStateCorrupt: 576 log.Printf("falling back to full update: corrupt index: %s", args.String()) 577 } 578 } 579 580 log.Printf("updating index %s reason=%s", args.String(), reason) 581 582 metricIndexingTotal.Inc() 583 c := gitIndexConfig{ 584 runCmd: func(cmd *exec.Cmd) error { 585 return s.loggedRun(tr, cmd) 586 }, 587 588 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) { 589 return args.BuildOptions().FindRepositoryMetadata() 590 }, 591 timeout: s.timeout, 592 } 593 594 err = gitIndex(c, args, s.Sourcegraph, s.logger) 595 if err != nil { 596 return indexStateFail, err 597 } 598 599 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil { 600 s.logger.Error("failed to update index status", 601 sglog.String("repo", args.Name), 602 sglog.Uint32("id", args.RepoID), 603 sglogBranches("branches", args.Branches), 604 sglog.Error(err), 605 ) 606 } 607 608 return indexStateSuccess, nil 609} 610 611// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so 612// it can update the zoekt_repos table. 613func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error { 614 // We need to read from disk for IndexTime. 
615 _, metadata, ok, err := c.findRepositoryMetadata(args) 616 if err != nil { 617 return fmt.Errorf("failed to read metadata for new/updated index: %w", err) 618 } 619 if !ok { 620 return errors.New("failed to find metadata for new/updated index") 621 } 622 623 status := []indexStatus{{ 624 RepoID: args.RepoID, 625 Branches: args.Branches, 626 IndexTimeUnix: metadata.IndexTime.Unix(), 627 }} 628 if err := sg.UpdateIndexStatus(status); err != nil { 629 return fmt.Errorf("failed to update sourcegraph with status: %w", err) 630 } 631 632 return nil 633} 634 635func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field { 636 ss := make([]string, len(branches)) 637 for i, b := range branches { 638 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version) 639 } 640 return sglog.Strings(key, ss) 641} 642 643func (s *Server) indexArgs(opts IndexOptions) *indexArgs { 644 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0)) 645 return &indexArgs{ 646 IndexOptions: opts, 647 IndexDir: s.IndexDir, 648 Parallelism: parallelism, 649 Incremental: true, 650 651 // 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22 652 FileLimit: 1 << 20, 653 } 654} 655 656// parallelism consults both the server flags and index options to determine the number 657// of shards to index in parallel. If the CPUCount index option is provided, it always 658// overrides the server flag. 
659func (s *Server) parallelism(opts IndexOptions, maxProcs int) int { 660 var parallelism int 661 if opts.ShardConcurrency > 0 { 662 parallelism = int(opts.ShardConcurrency) 663 } else { 664 parallelism = s.CPUCount 665 } 666 667 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs 668 if parallelism > 4 * maxProcs { 669 parallelism = 4 * maxProcs 670 } 671 672 // If index concurrency is set, then divide the parallelism by the number of 673 // repos we're indexing in parallel 674 if s.IndexConcurrency > 1 { 675 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency))) 676 } 677 678 return parallelism 679} 680 681func createEmptyShard(args *indexArgs) error { 682 bo := args.BuildOptions() 683 bo.SetDefaults() 684 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 685 686 if args.Incremental && bo.IncrementalSkipIndexing() { 687 return nil 688 } 689 690 builder, err := build.NewBuilder(*bo) 691 if err != nil { 692 return err 693 } 694 return builder.Finish() 695} 696 697// addDebugHandlers adds handlers specific to indexserver. 698func (s *Server) addDebugHandlers(mux *http.ServeMux) { 699 // Sourcegraph's site admin view requires indexserver to serve it's admin view 700 // on "/". 
701 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 702 703 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 704 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 705 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 706 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 707 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 708 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 709} 710 711func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 712 if r.Method != "GET" { 713 w.Header().Set("Allow", "GET") 714 w.WriteHeader(http.StatusMethodNotAllowed) 715 return 716 } 717 718 response := struct { 719 Hostname string 720 }{ 721 Hostname: s.hostname, 722 } 723 724 b, err := json.Marshal(response) 725 if err != nil { 726 http.Error(w, err.Error(), http.StatusInternalServerError) 727 return 728 } 729 730 w.Header().Set("Content-Type", "application/json; charset=utf-8") 731 w.Write(b) 732} 733 734var rootTmpl = template.Must(template.New("name").Parse(` 735<html> 736 <body> 737 <a href="debug">Debug</a><br /> 738 <a href="debug/requests">Traces</a><br /> 739 {{.IndexMsg}}<br /> 740 <br /> 741 <h3>Reindex</h3> 742 {{if .Repos}} 743 <a href="?show_repos=false">hide repos</a><br /> 744 <table style="margin-top: 20px"> 745 <th style="text-align:left">Name</th> 746 <th style="text-align:left">ID</th> 747 {{range .Repos}} 748 <tr> 749 <td>{{.Name}}</td> 750 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 751 </tr> 752 {{end}} 753 </table> 754 {{else}} 755 <a href="?show_repos=true">show repos</a><br /> 756 {{end}} 757 </body> 758</html> 759`)) 760 761func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 762 if r.Method != "GET" { 763 w.Header().Set("Allow", "GET") 764 w.WriteHeader(http.StatusMethodNotAllowed) 765 return 766 } 767 768 values := r.URL.Query() 769 770 // ?id= 771 indexMsg := "" 772 if v := values.Get("id"); v != "" { 773 id, err := 
strconv.Atoi(v) 774 if err != nil { 775 http.Error(w, err.Error(), http.StatusBadRequest) 776 return 777 } 778 indexMsg, _ = s.forceIndex(uint32(id)) 779 } 780 781 // ?show_repos= 782 showRepos := false 783 if v := values.Get("show_repos"); v != "" { 784 showRepos, _ = strconv.ParseBool(v) 785 } 786 787 type Repo struct { 788 ID uint32 789 Name string 790 } 791 var data struct { 792 Repos []Repo 793 IndexMsg string 794 } 795 796 data.IndexMsg = indexMsg 797 798 if showRepos { 799 s.queue.Iterate(func(opts *IndexOptions) { 800 data.Repos = append(data.Repos, Repo{ 801 ID: opts.RepoID, 802 Name: opts.Name, 803 }) 804 }) 805 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 806 } 807 808 _ = rootTmpl.Execute(w, data) 809} 810 811// handleReindex triggers a reindex asynocronously. If a reindex was triggered 812// the request returns with status 202. The caller can infer the new state of 813// the index by calling List. 814func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 815 if r.Method != http.MethodPost { 816 w.Header().Set("Allow", http.MethodPost) 817 w.WriteHeader(http.StatusMethodNotAllowed) 818 return 819 } 820 821 err := r.ParseForm() 822 if err != nil { 823 http.Error(w, err.Error(), http.StatusBadRequest) 824 return 825 } 826 827 id, err := strconv.Atoi(r.Form.Get("repo")) 828 if err != nil { 829 http.Error(w, err.Error(), http.StatusBadRequest) 830 return 831 } 832 833 go func() { s.forceIndex(uint32(id)) }() 834 835 // 202 Accepted 836 w.WriteHeader(http.StatusAccepted) 837} 838 839func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 840 withIndexed := true 841 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 842 withIndexed = b 843 } 844 845 var indexed []uint32 846 if withIndexed { 847 indexed = listIndexed(s.IndexDir) 848 } 849 850 repos, err := s.Sourcegraph.List(r.Context(), indexed) 851 if err != nil { 852 http.Error(w, err.Error(), 
http.StatusInternalServerError) 853 return 854 } 855 856 bw := bytes.Buffer{} 857 858 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 859 860 _, err = fmt.Fprintf(tw, "ID\tName\n") 861 if err != nil { 862 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 863 return 864 } 865 866 s.queue.mu.Lock() 867 name := "" 868 for _, id := range repos.IDs { 869 if item := s.queue.get(id); item != nil { 870 name = item.opts.Name 871 } else { 872 name = "" 873 } 874 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 875 if err != nil { 876 debug.Printf("handleDebugList: %s\n", err.Error()) 877 } 878 } 879 s.queue.mu.Unlock() 880 881 if err != nil { 882 http.Error(w, err.Error(), http.StatusInternalServerError) 883 return 884 } 885 886 err = tw.Flush() 887 if err != nil { 888 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 889 return 890 } 891 892 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 893 894 if _, err := io.Copy(w, &bw); err != nil { 895 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 896 return 897 } 898} 899 900// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 901// can run this command during periods of low usage (evenings, weekends) to 902// trigger an initial merge run. In the steady-state, merges happen rarely, even 903// on busy instances, and users can rely on automatic merging instead. 904func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 905 906 // A merge operation can take very long, depending on the number merges and the 907 // target size of the compound shards. We run the merge in the background and 908 // return immediately to the user. 909 // 910 // We track the status of the merge with metricShardMergingRunning. 
911 go func() { 912 s.doMerge() 913 }() 914 _, _ = w.Write([]byte("merging enqueued\n")) 915} 916 917func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 918 indexed := listIndexed(s.IndexDir) 919 920 bw := bytes.Buffer{} 921 922 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 923 924 _, err := fmt.Fprintf(tw, "ID\tName\n") 925 if err != nil { 926 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 927 return 928 } 929 930 s.queue.mu.Lock() 931 name := "" 932 for _, id := range indexed { 933 if item := s.queue.get(id); item != nil { 934 name = item.opts.Name 935 } else { 936 name = "" 937 } 938 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 939 if err != nil { 940 debug.Printf("handleDebugIndexed: %s\n", err.Error()) 941 } 942 } 943 s.queue.mu.Unlock() 944 945 if err != nil { 946 http.Error(w, err.Error(), http.StatusInternalServerError) 947 return 948 } 949 950 err = tw.Flush() 951 if err != nil { 952 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 953 return 954 } 955 956 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 957 958 if _, err := io.Copy(w, &bw); err != nil { 959 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 960 return 961 } 962} 963 964// forceIndex will run the index job for repo name now. It will return always 965// return a string explaining what it did, even if it failed. 
966func (s *Server) forceIndex(id uint32) (string, error) { 967 var opts IndexOptions 968 var err error 969 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 970 opts = o 971 }, func(_ uint32, e error) { 972 err = e 973 }, id) 974 if err != nil { 975 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 976 } 977 978 args := s.indexArgs(opts) 979 args.Incremental = false // force re-index 980 981 var state indexState 982 ran := s.muIndexDir.With(opts.Name, func() { 983 state, err = s.Index(args) 984 }) 985 if !ran { 986 return fmt.Sprintf("index job for repository already running: %s", args), nil 987 } 988 if err != nil { 989 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 990 } 991 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 992} 993 994func listIndexed(indexDir string) []uint32 { 995 index := getShards(indexDir) 996 metricNumIndexed.Set(float64(len(index))) 997 repoIDs := make([]uint32, 0, len(index)) 998 for id := range index { 999 repoIDs = append(repoIDs, id) 1000 } 1001 sort.Slice(repoIDs, func(i, j int) bool { 1002 return repoIDs[i] < repoIDs[j] 1003 }) 1004 return repoIDs 1005} 1006 1007// setupTmpDir sets up a temporary directory on the same volume as the 1008// indexes. 1009// 1010// If main is true we will delete older temp directories left around. main is 1011// false when this is a debug command. 1012func setupTmpDir(logger sglog.Logger, main bool, index string) error { 1013 // change the target tmp directory depending on if it's our main daemon or a 1014 // debug sub command. 
1015 dir := ".indexserver.debug.tmp" 1016 if main { 1017 dir = ".indexserver.tmp" 1018 } 1019 1020 tmpRoot := filepath.Join(index, dir) 1021 1022 if main { 1023 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot)) 1024 err := os.RemoveAll(tmpRoot) 1025 if err != nil { 1026 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err)) 1027 } 1028 } 1029 1030 if err := os.MkdirAll(tmpRoot, 0755); err != nil { 1031 return err 1032 } 1033 1034 return os.Setenv("TMPDIR", tmpRoot) 1035} 1036 1037func printMetaData(fn string) error { 1038 repo, indexMeta, err := zoekt.ReadMetadataPath(fn) 1039 if err != nil { 1040 return err 1041 } 1042 1043 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 1044 if err != nil { 1045 return err 1046 } 1047 1048 err = json.NewEncoder(os.Stdout).Encode(repo) 1049 if err != nil { 1050 return err 1051 } 1052 return nil 1053} 1054 1055func printShardStats(fn string) error { 1056 f, err := os.Open(fn) 1057 if err != nil { 1058 return err 1059 } 1060 1061 iFile, err := zoekt.NewIndexFile(f) 1062 if err != nil { 1063 return err 1064 } 1065 1066 return zoekt.PrintNgramStats(iFile) 1067} 1068 1069func srcLogLevelIsDebug() bool { 1070 lvl := os.Getenv(sglog.EnvLogLevel) 1071 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1072} 1073 1074func getEnvWithDefaultInt64(k string, defaultVal int64) int64 { 1075 v := os.Getenv(k) 1076 if v == "" { 1077 return defaultVal 1078 } 1079 i, err := strconv.ParseInt(v, 10, 64) 1080 if err != nil { 1081 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1082 } 1083 return i 1084} 1085 1086func getEnvWithDefaultInt(k string, defaultVal int) int { 1087 v := os.Getenv(k) 1088 if v == "" { 1089 return defaultVal 1090 } 1091 i, err := strconv.Atoi(k) 1092 if err != nil { 1093 log.Fatalf("error parsing ENV %s to int: %s", k, err) 1094 } 1095 return i 1096} 1097 1098func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 { 1099 v := 
os.Getenv(k) 1100 if v == "" { 1101 return defaultVal 1102 } 1103 i, err := strconv.ParseUint(v, 10, 64) 1104 if err != nil { 1105 log.Fatalf("error parsing ENV %s to uint64: %s", k, err) 1106 } 1107 return i 1108} 1109 1110func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1111 v := os.Getenv(k) 1112 if v == "" { 1113 return defaultVal 1114 } 1115 f, err := strconv.ParseFloat(v, 64) 1116 if err != nil { 1117 log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1118 } 1119 return f 1120} 1121 1122func getEnvWithDefaultString(k string, defaultVal string) string { 1123 v := os.Getenv(k) 1124 if v == "" { 1125 return defaultVal 1126 } 1127 return v 1128} 1129 1130func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration { 1131 v := os.Getenv(k) 1132 if v == "" { 1133 return defaultVal 1134 } 1135 1136 d, err := time.ParseDuration(v) 1137 if err != nil { 1138 log.Fatalf("error parsing ENV %s to duration: %s", k, err) 1139 } 1140 return d 1141} 1142 1143func getEnvWithDefaultBool(k string, defaultVal bool) bool { 1144 v := os.Getenv(k) 1145 if v == "" { 1146 return defaultVal 1147 } 1148 1149 b, err := strconv.ParseBool(v) 1150 if err != nil { 1151 log.Fatalf("error parsing ENV %s to bool: %s", k, err) 1152 } 1153 return b 1154} 1155 1156func getEnvWithDefaultEmptySet(k string) map[string]struct{} { 1157 set := map[string]struct{}{} 1158 for _, v := range strings.Split(os.Getenv(k), ",") { 1159 v = strings.TrimSpace(v) 1160 if v != "" { 1161 set[v] = struct{}{} 1162 } 1163 } 1164 return set 1165} 1166 1167func joinStringSet(set map[string]struct{}, sep string) string { 1168 var xs []string 1169 for x := range set { 1170 xs = append(xs, x) 1171 } 1172 1173 return strings.Join(xs, sep) 1174} 1175 1176func setCompoundShardCounter(indexDir string) { 1177 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt")) 1178 if err != nil { 1179 log.Printf("setCompoundShardCounter: %s\n", err) 1180 return 1181 } 1182 
metricNumberCompoundShards.Set(float64(len(fns))) 1183} 1184 1185func rootCmd() *ffcli.Command { 1186 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1187 conf := rootConfig{ 1188 Main: true, 1189 } 1190 conf.registerRootFlags(rootFs) 1191 1192 return &ffcli.Command{ 1193 FlagSet: rootFs, 1194 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1195 Subcommands: []*ffcli.Command{debugCmd()}, 1196 Exec: func(ctx context.Context, args []string) error { 1197 return startServer(conf) 1198 }, 1199 } 1200} 1201 1202type rootConfig struct { 1203 // Main is true if this rootConfig is for our main long running command (the 1204 // indexserver). Debug commands should not set this value. This is used to 1205 // determine if we need to run tmpfriend. 1206 Main bool 1207 1208 root string 1209 interval time.Duration 1210 index string 1211 indexConcurrency int64 1212 listen string 1213 hostname string 1214 cpuFraction float64 1215 blockProfileRate int 1216 1217 // config values related to shard merging 1218 vacuumInterval time.Duration 1219 mergeInterval time.Duration 1220 targetSize int64 1221 minSize int64 1222 minAgeDays int 1223 maxPriority float64 1224 1225 // config values related to backoff indexing repos with one or more consecutive failures 1226 backoffDuration time.Duration 1227 maxBackoffDuration time.Duration 1228 1229 // useGRPC is true if we should use the gRPC API to talk to Sourcegraph. 1230 useGRPC bool 1231} 1232 1233func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1234 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. 
If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1235 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1236 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently") 1237 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use") 1238 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") 1239 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") 1240 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1241 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate") 1242 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.") 1243 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. 
A negative value disables indexing backoff.") 1244 fs.BoolVar(&rc.useGRPC, "use_grpc", mustGetBoolFromEnvironmentVariables([]string{"GRPC_ENABLED", "SG_FEATURE_FLAG_GRPC"}, true), "use the gRPC API to talk to Sourcegraph") 1245 1246 // flags related to shard merging 1247 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1248 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1249 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB") 1250 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB") 1251 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.") 1252 fs.Float64Var(&rc.maxPriority, "merge_max_priority", getEnvWithDefaultFloat64("SRC_MERGE_MAX_PRIORITY", 100), "the maximum priority a shard can have to be considered for merging.") 1253} 1254 1255func startServer(conf rootConfig) error { 1256 s, err := newServer(conf) 1257 if err != nil { 1258 return err 1259 } 1260 1261 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate) 1262 setCompoundShardCounter(s.IndexDir) 1263 1264 if conf.listen != "" { 1265 1266 mux := http.NewServeMux() 1267 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1268 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1269 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1270 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes 
repositories which this instance temporarily holds during re-balancing"}, 1271 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1272 }...) 1273 s.addDebugHandlers(mux) 1274 1275 go func() { 1276 debug.Printf("serving HTTP on %s", conf.listen) 1277 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1278 1279 }() 1280 1281 // Serve mux on a unix domain socket on a best-effort-basis so that 1282 // webserver can call the endpoints via the shared filesystem. 1283 // 1284 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1285 // on the socket due to permission errors. See 1286 // https://github.com/docker/for-mac/issues/6239 1287 go func() { 1288 serveHTTPOverSocket := func() error { 1289 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1290 // We cannot bind a socket to an existing pathname. 1291 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1292 return fmt.Errorf("error removing socket file: %s", socket) 1293 } 1294 // The "unix" network corresponds to stream sockets. (cf. unixgram, 1295 // unixpacket). 1296 l, err := net.Listen("unix", socket) 1297 if err != nil { 1298 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1299 } 1300 // Indexserver (root) and webserver (Sourcegraph) run with 1301 // different users. Per default, the socket is created with 1302 // permission 755 (root root), which doesn't let webserver write to 1303 // it. 1304 // 1305 // See https://github.com/golang/go/issues/11822 for more context. 
1306 if err := os.Chmod(socket, 0777); err != nil { 1307 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1308 } 1309 debug.Printf("serving HTTP on %s", socket) 1310 return http.Serve(l, mux) 1311 } 1312 debug.Print(serveHTTPOverSocket()) 1313 }() 1314 } 1315 1316 oc := &ownerChecker{ 1317 Path: filepath.Join(conf.index, "owner.txt"), 1318 Hostname: conf.hostname, 1319 } 1320 go oc.Run() 1321 1322 logger := sglog.Scoped("metricsRegistration") 1323 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1324 1325 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1326 prometheus.DefaultRegisterer.MustRegister(c) 1327 1328 s.Run() 1329 return nil 1330} 1331 1332func newServer(conf rootConfig) (*Server, error) { 1333 logger := sglog.Scoped("server") 1334 1335 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1336 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1337 } 1338 if conf.index == "" { 1339 return nil, fmt.Errorf("must set -index") 1340 } 1341 if conf.root == "" { 1342 return nil, fmt.Errorf("must set -sourcegraph_url") 1343 } 1344 rootURL, err := url.Parse(conf.root) 1345 if err != nil { 1346 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1347 } 1348 1349 rootURL = addDefaultPort(rootURL) 1350 1351 // Tune GOMAXPROCS to match Linux container CPU quota. 1352 _, _ = maxprocs.Set() 1353 1354 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler. 1355 // The block profiler is disabled by default and should be enabled with care in production 1356 runtime.SetBlockProfileRate(conf.blockProfileRate) 1357 1358 // Automatically prepend our own path at the front, to minimize 1359 // required configuration. 
1360 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1361 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1362 } 1363 1364 if _, err := os.Stat(conf.index); err != nil { 1365 if err := os.MkdirAll(conf.index, 0755); err != nil { 1366 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1367 } 1368 } 1369 1370 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil { 1371 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1372 } 1373 1374 if srcLogLevelIsDebug() { 1375 debug = log.New(os.Stderr, "", log.LstdFlags) 1376 } 1377 1378 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1379 if len(reposWithSeparateIndexingMetrics) > 0 { 1380 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1381 } 1382 1383 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1384 if len(deltaBuildRepositoriesAllowList) > 0 { 1385 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1386 } 1387 1388 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1389 if deltaShardNumberFallbackThreshold > 0 { 1390 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1391 } else { 1392 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1393 } 1394 1395 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1396 if len(reposShouldSkipSymbolsCalculation) > 0 { 1397 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1398 } 1399 1400 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout) 1401 if indexingTimeout != 
defaultIndexingTimeout { 1402 debug.Printf("using configured indexing timeout: %s", indexingTimeout) 1403 } 1404 1405 var sg Sourcegraph 1406 if rootURL.IsAbs() { 1407 var batchSize int 1408 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { 1409 batchSize, err = strconv.Atoi(v) 1410 if err != nil { 1411 return nil, fmt.Errorf("Invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1412 } 1413 } 1414 1415 opts := []SourcegraphClientOption{ 1416 WithBatchSize(batchSize), 1417 WithShouldUseGRPC(conf.useGRPC), 1418 } 1419 1420 logger := sglog.Scoped("zoektConfigurationGRPCClient") 1421 client, err := dialGRPCClient(rootURL.Host, logger) 1422 if err != nil { 1423 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1424 } 1425 1426 opts = append(opts, WithGRPCClient(client)) 1427 sg = newSourcegraphClient(rootURL, conf.hostname, opts...) 1428 1429 } else { 1430 sg = sourcegraphFake{ 1431 RootDir: rootURL.String(), 1432 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1433 } 1434 } 1435 1436 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction))) 1437 if cpuCount < 1 { 1438 cpuCount = 1 1439 } 1440 1441 if conf.indexConcurrency < 1 { 1442 conf.indexConcurrency = 1 1443 } else if conf.indexConcurrency > int64(cpuCount) { 1444 conf.indexConcurrency = int64(cpuCount) 1445 } 1446 1447 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1448 1449 return &Server{ 1450 logger: logger, 1451 Sourcegraph: sg, 1452 IndexDir: conf.index, 1453 IndexConcurrency: int(conf.indexConcurrency), 1454 Interval: conf.interval, 1455 CPUCount: cpuCount, 1456 queue: *q, 1457 shardMerging: zoekt.ShardMergingEnabled(), 1458 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1459 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1460 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1461 hostname: conf.hostname, 1462 mergeOpts: mergeOpts{ 
1463 vacuumInterval: conf.vacuumInterval, 1464 mergeInterval: conf.mergeInterval, 1465 targetSizeBytes: conf.targetSize * 1024 * 1024, 1466 minSizeBytes: conf.minSize * 1024 * 1024, 1467 minAgeDays: conf.minAgeDays, 1468 maxPriority: conf.maxPriority, 1469 }, 1470 timeout: indexingTimeout, 1471 }, err 1472} 1473 1474// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration 1475// for the indexed-search-configuration gRPC service. 1476// 1477// The default backoff strategy is modeled after the default settings used by 1478// retryablehttp.DefaultClient. 1479// 1480// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1481// - Unavailable 1482// - Aborted 1483// 1484//go:embed default_grpc_service_configuration.json 1485var defaultGRPCServiceConfigurationJSON string 1486 1487func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1488 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1489 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1490 return invoker(ctx, method, req, reply, cc, opts...) 1491 } 1492} 1493 1494func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1495 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1496 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1497 return streamer(ctx, desc, cc, method, opts...) 1498 } 1499} 1500 1501// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPCs servers and clients are allowed to process. 1502// This can be overridden by providing custom Server/Dial options. 
1503const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB 1504 1505func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) { 1506 metrics := mustGetClientMetrics() 1507 1508 // If the service seems to be unavailable, this 1509 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1 1510 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s]) 1511 retryOpts := []retry.CallOption{ 1512 retry.WithMax(5), 1513 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)), 1514 retry.WithCodes(codes.Unavailable), 1515 } 1516 1517 opts := []grpc.DialOption{ 1518 grpc.WithTransportCredentials(insecure.NewCredentials()), 1519 grpc.WithChainStreamInterceptor( 1520 metrics.StreamClientInterceptor(), 1521 messagesize.StreamClientInterceptor, 1522 internalActorStreamInterceptor(), 1523 internalerrs.LoggingStreamClientInterceptor(logger), 1524 internalerrs.PrometheusStreamClientInterceptor, 1525 retry.StreamClientInterceptor(retryOpts...), 1526 ), 1527 grpc.WithChainUnaryInterceptor( 1528 metrics.UnaryClientInterceptor(), 1529 messagesize.UnaryClientInterceptor, 1530 internalActorUnaryInterceptor(), 1531 internalerrs.LoggingUnaryClientInterceptor(logger), 1532 internalerrs.PrometheusUnaryClientInterceptor, 1533 retry.UnaryClientInterceptor(retryOpts...), 1534 ), 1535 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON), 1536 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)), 1537 } 1538 1539 opts = append(opts, additionalOpts...) 1540 1541 // Ensure that the message size options are set last, so they override any other 1542 // client-specific options that tweak the message size. 1543 // 1544 // The message size options are only provided if the environment variable is set. 
These options serve as an escape hatch, so they 1545 // take precedence over everything else with a uniform size setting that's easy to reason about. 1546 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...) 1547 1548 // This dialer is used to connect via gRPC to the Sourcegraph instance. 1549 // This is done lazily, so we can provide the client to use regardless of 1550 // whether we enabled gRPC or not initially. 1551 cc, err := grpc.Dial(addr, opts...) 1552 if err != nil { 1553 return nil, fmt.Errorf("dialing %q: %w", addr, err) 1554 } 1555 1556 client := proto.NewZoektConfigurationServiceClient(cc) 1557 return client, nil 1558} 1559 1560// mustGetClientMetrics returns a singleton instance of the client metrics 1561// that are shared across all gRPC clients that this process creates. 1562// 1563// This function panics if the metrics cannot be registered with the default 1564// Prometheus registry. 1565func mustGetClientMetrics() *grpcprom.ClientMetrics { 1566 clientMetricsOnce.Do(func() { 1567 clientMetrics = grpcprom.NewClientMetrics( 1568 grpcprom.WithClientCounterOptions(), 1569 grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request 1570 grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC 1571 grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC 1572 ) 1573 1574 prometheus.DefaultRegisterer.MustRegister(clientMetrics) 1575 }) 1576 1577 return clientMetrics 1578} 1579 1580// addDefaultPort adds a default port to a URL if one is not specified. 1581// 1582// If the URL scheme is "http" and no port is specified, "80" is used. 1583// If the scheme is "https", "443" is used. 1584// 1585// The original URL is not mutated. A copy is modified and returned. 
1586func addDefaultPort(original *url.URL) *url.URL { 1587 if original == nil { 1588 return nil // don't panic 1589 } 1590 1591 if !original.IsAbs() { 1592 return original // don't do anything if the URL doesn't look like a remote URL 1593 } 1594 1595 if original.Scheme == "http" && original.Port() == "" { 1596 u := cloneURL(original) 1597 u.Host = net.JoinHostPort(u.Host, "80") 1598 return u 1599 } 1600 1601 if original.Scheme == "https" && original.Port() == "" { 1602 u := cloneURL(original) 1603 u.Host = net.JoinHostPort(u.Host, "443") 1604 return u 1605 } 1606 1607 return original 1608} 1609 1610// cloneURL returns a copy of the URL. It is safe to mutate the returned URL. 1611// This is copied from net/http/clone.go 1612func cloneURL(u *url.URL) *url.URL { 1613 if u == nil { 1614 return nil 1615 } 1616 u2 := new(url.URL) 1617 *u2 = *u 1618 if u.User != nil { 1619 u2.User = new(url.Userinfo) 1620 *u2.User = *u.User 1621 } 1622 return u2 1623} 1624 1625func main() { 1626 liblog := sglog.Init(sglog.Resource{ 1627 Name: "zoekt-indexserver", 1628 Version: zoekt.Version, 1629 InstanceID: zoekt.HostnameBestEffort(), 1630 }) 1631 defer liblog.Sync() 1632 1633 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1634 log.Fatal(err) 1635 } 1636} 1637 1638// mustGetBoolFromEnvironmentVariables is like getBoolFromEnvironmentVariables, but it panics 1639// if any of the provided environment variables fails to parse as a boolean. 1640func mustGetBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) bool { 1641 value, err := getBoolFromEnvironmentVariables(envVarNames, defaultBool) 1642 if err != nil { 1643 panic(err) 1644 } 1645 1646 return value 1647} 1648 1649// getBoolFromEnvironmentVariables returns the boolean defined by the first environment 1650// variable listed in envVarNames that is set in the current process environment, or the defaultBool if none are set. 
1651// 1652// An error is returned of the provided environment variables fails to parse as a boolean. 1653func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) { 1654 for _, envVar := range envVarNames { 1655 v := os.Getenv(envVar) 1656 if v == "" { 1657 continue 1658 } 1659 1660 b, err := strconv.ParseBool(v) 1661 if err != nil { 1662 return false, fmt.Errorf("parsing environment variable %q to boolean: %v", envVar, err) 1663 } 1664 1665 return b, nil 1666 } 1667 1668 return defaultBool, nil 1669}