fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Command zoekt-sourcegraph-indexserver periodically reindexes enabled 2// repositories on sourcegraph 3package main 4 5import ( 6 "bytes" 7 "context" 8 _ "embed" 9 "encoding/json" 10 "errors" 11 "flag" 12 "fmt" 13 "html/template" 14 "io" 15 "io/fs" 16 "log" 17 "math" 18 "math/rand" 19 "net" 20 "net/http" 21 "net/url" 22 "os" 23 "os/exec" 24 "os/signal" 25 "path/filepath" 26 "runtime" 27 "sort" 28 "strconv" 29 "strings" 30 "sync" 31 "text/tabwriter" 32 "time" 33 34 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" 35 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry" 36 "github.com/peterbourgon/ff/v3/ffcli" 37 "github.com/prometheus/client_golang/prometheus" 38 "github.com/prometheus/client_golang/prometheus/promauto" 39 sglog "github.com/sourcegraph/log" 40 "github.com/sourcegraph/mountinfo" 41 "go.uber.org/automaxprocs/maxprocs" 42 "golang.org/x/net/trace" 43 "golang.org/x/sys/unix" 44 "google.golang.org/grpc" 45 "google.golang.org/grpc/codes" 46 "google.golang.org/grpc/credentials/insecure" 47 "google.golang.org/grpc/metadata" 48 49 "github.com/sourcegraph/zoekt" 50 "github.com/sourcegraph/zoekt/build" 51 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1" 52 "github.com/sourcegraph/zoekt/debugserver" 53 "github.com/sourcegraph/zoekt/grpc/internalerrs" 54 "github.com/sourcegraph/zoekt/grpc/messagesize" 55 "github.com/sourcegraph/zoekt/internal/profiler" 56) 57 58var ( 59 metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{ 60 Name: "resolve_revisions_seconds", 61 Help: "A histogram of latencies for resolving all repository revisions.", 62 Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 27min 63 }) 64 65 metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 66 Name: "resolve_revision_seconds", 67 Help: "A histogram of latencies for resolving a repository revision.", 68 Buckets: 
prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s 69 }, []string{"success"}) // success=true|false 70 71 metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{ 72 Name: "get_index_options_total", 73 Help: "The total number of times we tried to get index options for a repository. Includes errors.", 74 }) 75 metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{ 76 Name: "get_index_options_error_total", 77 Help: "The total number of times we failed to get index options for a repository.", 78 }) 79 80 metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 81 Name: "index_repo_seconds", 82 Help: "A histogram of latencies for indexing a repository.", 83 Buckets: prometheus.ExponentialBucketsRange( 84 (100 * time.Millisecond).Seconds(), 85 (40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo 86 20), 87 }, []string{ 88 "state", // state is an indexState 89 "name", // name of the repository that was indexed 90 }) 91 92 metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{ 93 Name: "index_indexing_delay_seconds", 94 Help: "A histogram of durations from when an index job is added to the queue, to the time it completes.", 95 Buckets: prometheus.ExponentialBuckets(60, 2, 12), // 1m -> ~3 days 96 }, []string{ 97 "state", // state is an indexState 98 "name", // the name of the repository that was indexed 99 }) 100 101 metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 102 Name: "index_fetch_seconds", 103 Help: "A histogram of latencies for fetching a repository.", 104 Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes 105 }, []string{ 106 "success", // true|false 107 "name", // the name of the repository that the commits were fetched from 108 }) 109 110 metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{ 111 Name: 
"index_incremental_index_state", 112 Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.", 113 }, []string{"state"}) // state is build.IndexState 114 115 metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{ 116 Name: "index_num_indexed", 117 Help: "Number of indexed repos by code host", 118 }) 119 120 metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{ 121 Name: "index_failing_total", 122 Help: "Counts failures to index (indexing activity, should be used with rate())", 123 }) 124 125 metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{ 126 Name: "index_indexing_total", 127 Help: "Counts indexings (indexing activity, should be used with rate())", 128 }) 129 130 metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{ 131 Name: "index_num_stopped_tracking_total", 132 Help: "Counts the number of repos we stopped tracking.", 133 }) 134 135 clientMetricsOnce sync.Once 136 clientMetrics *grpcprom.ClientMetrics 137) 138 139// set of repositories that we want to capture separate indexing metrics for 140var reposWithSeparateIndexingMetrics = make(map[string]struct{}) 141 142type indexState string 143 144const ( 145 indexStateFail indexState = "fail" 146 indexStateSuccess indexState = "success" 147 indexStateSuccessMeta indexState = "success_meta" // We only updated metadata 148 indexStateNoop indexState = "noop" // We didn't need to update index 149 indexStateEmpty indexState = "empty" // index is empty (empty repo) 150) 151 152// Server is the main functionality of zoekt-sourcegraph-indexserver. It 153// exists to conveniently use all the options passed in via func main. 154type Server struct { 155 logger sglog.Logger 156 157 Sourcegraph Sourcegraph 158 BatchSize int 159 160 // IndexDir is the index directory to use. 161 IndexDir string 162 163 // IndexConcurrency is the number of repositories we index at once. 
164 IndexConcurrency int 165 166 // Interval is how often we sync with Sourcegraph. 167 Interval time.Duration 168 169 // CPUCount is the number of CPUs to use for indexing shards. 170 CPUCount int 171 172 queue Queue 173 174 // muIndexDir protects the index directory from concurrent access. 175 muIndexDir indexMutex 176 177 // If true, shard merging is enabled. 178 shardMerging bool 179 180 // deltaBuildRepositoriesAllowList is an allowlist for repositories that we 181 // use delta-builds for instead of normal builds 182 deltaBuildRepositoriesAllowList map[string]struct{} 183 184 // deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist 185 // before attempting a delta build. 186 deltaShardNumberFallbackThreshold uint64 187 188 // repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that 189 // we skip calculating symbols metadata for during builds 190 repositoriesSkipSymbolsCalculationAllowList map[string]struct{} 191 192 hostname string 193 194 mergeOpts mergeOpts 195 196 // timeout defines how long the index server waits before killing an indexing job. 197 timeout time.Duration 198} 199 200var debug = log.New(io.Discard, "", log.LstdFlags) 201 202// our index commands should output something every 100mb they process. 203// 204// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough 205// time." famous last words. A client was indexing a monorepo with 42 206// cores... 5m was not enough. 
207const noOutputTimeout = 30 * time.Minute 208 209func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 210 out := &synchronizedBuffer{} 211 cmd.Stdout = out 212 cmd.Stderr = out 213 214 tr.LazyPrintf("%s", cmd.Args) 215 216 defer func() { 217 if err != nil { 218 outS := out.String() 219 tr.LazyPrintf("failed: %v", err) 220 tr.LazyPrintf("output: %s", out) 221 tr.SetError() 222 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 223 } 224 }() 225 226 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 227 228 if err := cmd.Start(); err != nil { 229 return err 230 } 231 232 errC := make(chan error) 233 go func() { 234 errC <- cmd.Wait() 235 }() 236 237 // This channel is set after we have sent sigquit. It allows us to follow up 238 // with a sigkill if the process doesn't quit after sigquit. 239 kill := make(<-chan time.Time) 240 241 lastLen := 0 242 for { 243 select { 244 case <-time.After(noOutputTimeout): 245 // Periodically check if we have had output. If not kill the process. 246 if out.Len() != lastLen { 247 lastLen = out.Len() 248 log.Printf("still running %s", cmd.Args) 249 } else { 250 // Send quit (C-\) first so we get a stack dump. 251 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 252 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 253 log.Println("quit failed:", err) 254 } 255 256 // send sigkill if still running in 10s 257 kill = time.After(10 * time.Second) 258 } 259 260 case <-kill: 261 log.Printf("still running, killing %s", cmd.Args) 262 if err := cmd.Process.Kill(); err != nil { 263 log.Println("kill failed:", err) 264 } 265 266 case err := <-errC: 267 if err != nil { 268 return err 269 } 270 271 tr.LazyPrintf("success") 272 return nil 273 } 274 } 275} 276 277// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 278// monitor the buffer while it is being written to. 
279type synchronizedBuffer struct { 280 mu sync.Mutex 281 b bytes.Buffer 282} 283 284func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 285 sb.mu.Lock() 286 defer sb.mu.Unlock() 287 return sb.b.Write(p) 288} 289 290func (sb *synchronizedBuffer) Len() int { 291 sb.mu.Lock() 292 defer sb.mu.Unlock() 293 return sb.b.Len() 294} 295 296func (sb *synchronizedBuffer) String() string { 297 sb.mu.Lock() 298 defer sb.mu.Unlock() 299 return sb.b.String() 300} 301 302// pauseFileName if present in IndexDir will stop index jobs from 303// running. This is to make it possible to experiment with the content of the 304// IndexDir without the indexserver writing to it. 305const pauseFileName = "PAUSE" 306 307// Run the sync loop. This blocks forever. 308func (s *Server) Run() { 309 removeIncompleteShards(s.IndexDir) 310 311 // Start a goroutine which updates the queue with commits to index. 312 go func() { 313 // We update the list of indexed repos every Interval. To speed up manual 314 // testing we also listen for SIGUSR1 to trigger updates. 
315 // 316 // "pkill -SIGUSR1 zoekt-sourcegra" 317 for range jitterTicker(s.Interval, unix.SIGUSR1) { 318 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 319 log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 320 continue 321 } 322 323 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 324 if err != nil { 325 log.Printf("error listing repos: %s", err) 326 continue 327 } 328 329 debug.Printf("updating index queue with %d repositories", len(repos.IDs)) 330 331 // Stop indexing repos we don't need to track anymore 332 removed := s.queue.MaybeRemoveMissing(repos.IDs) 333 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 334 if len(removed) > 0 { 335 log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 336 } 337 338 cleanupDone := make(chan struct{}) 339 go func() { 340 defer close(cleanupDone) 341 s.muIndexDir.Global(func() { 342 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 343 }) 344 }() 345 346 repos.IterateIndexOptions(s.queue.AddOrUpdate) 347 348 // IterateIndexOptions will only iterate over repositories that have 349 // changed since we last called list. However, we want to add all IDs 350 // back onto the queue just to check that what is on disk is still 351 // correct. This will use the last IndexOptions we stored in the 352 // queue. The repositories not on the queue (missing) need a forced 353 // fetch of IndexOptions. 354 missing := s.queue.Bump(repos.IDs) 355 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 
356 357 setCompoundShardCounter(s.IndexDir) 358 359 <-cleanupDone 360 } 361 }() 362 363 go func() { 364 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 365 if s.shardMerging { 366 s.vacuum() 367 } 368 } 369 }() 370 371 go func() { 372 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 373 if s.shardMerging { 374 s.doMerge() 375 } 376 } 377 }() 378 379 for i := 0; i < s.IndexConcurrency; i++ { 380 go s.processQueue() 381 } 382 383 // block forever 384 select {} 385} 386 387// formatList returns a comma-separated list of the first min(len(v), m) items. 388func formatListUint32(v []uint32, m int) string { 389 if len(v) < m { 390 m = len(v) 391 } 392 393 sb := strings.Builder{} 394 for i := 0; i < m; i++ { 395 fmt.Fprintf(&sb, "%d, ", v[i]) 396 } 397 398 if len(v) > m { 399 sb.WriteString("...") 400 } 401 402 return strings.TrimRight(sb.String(), ", ") 403} 404 405func (s *Server) processQueue() { 406 for { 407 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 408 time.Sleep(time.Second) 409 continue 410 } 411 412 item, ok := s.queue.Pop() 413 if !ok { 414 time.Sleep(time.Second) 415 continue 416 } 417 418 opts := item.Opts 419 args := s.indexArgs(opts) 420 421 ran := s.muIndexDir.With(opts.Name, func() { 422 // only record time taken once we hold the lock. This avoids us 423 // recording time taken while merging/cleanup runs. 
424 start := time.Now() 425 426 state, err := s.Index(args) 427 428 elapsed := time.Since(start) 429 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 430 431 indexDelay := time.Since(item.DateAddedToQueue) 432 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds()) 433 434 if err != nil { 435 log.Printf("error indexing %s: %s", args.String(), err) 436 } 437 438 switch state { 439 case indexStateSuccess: 440 var branches []string 441 for _, b := range args.Branches { 442 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 443 } 444 s.logger.Info("updated index", 445 sglog.String("repo", args.Name), 446 sglog.Uint32("id", args.RepoID), 447 sglog.Strings("branches", branches), 448 sglog.Duration("duration", elapsed), 449 sglog.Duration("index_delay", indexDelay), 450 ) 451 case indexStateSuccessMeta: 452 log.Printf("updated meta %s in %v", args.String(), elapsed) 453 } 454 s.queue.SetIndexed(opts, state) 455 }) 456 457 if !ran { 458 // Someone else is processing the repository. We can just skip this job 459 // since the repository will be added back to the queue and we will 460 // converge to the correct behaviour. 461 debug.Printf("index job for repository already running: %s", args) 462 continue 463 } 464 } 465} 466 467// repoNameForMetric returns a normalized version of the given repository name that is 468// suitable for use with Prometheus metrics. 469func repoNameForMetric(repo string) string { 470 // Check to see if we want to be able to capture separate indexing metrics for this repository. 471 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 
472 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 473 return repo 474 } 475 476 return "" 477} 478 479func batched(slice []uint32, size int) <-chan []uint32 { 480 c := make(chan []uint32) 481 go func() { 482 for len(slice) > 0 { 483 if size > len(slice) { 484 size = len(slice) 485 } 486 c <- slice[:size] 487 slice = slice[size:] 488 } 489 close(c) 490 }() 491 return c 492} 493 494// jitterTicker returns a ticker which ticks with a jitter. Each tick is 495// uniformly selected from the range (d/2, d + d/2). It will tick on creation. 496// 497// sig is a list of signals which also cause the ticker to fire. This is a 498// convenience to allow manually triggering of the ticker. 499func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} { 500 ticker := make(chan struct{}) 501 502 go func() { 503 for { 504 ticker <- struct{}{} 505 ns := int64(d) 506 jitter := rand.Int63n(ns) 507 time.Sleep(time.Duration(ns/2 + jitter)) 508 } 509 }() 510 511 go func() { 512 if len(sig) == 0 { 513 return 514 } 515 516 c := make(chan os.Signal, 1) 517 signal.Notify(c, sig...) 518 for range c { 519 ticker <- struct{}{} 520 } 521 }() 522 523 return ticker 524} 525 526// Index starts an index job for repo name at commit. 
527func (s *Server) Index(args *indexArgs) (state indexState, err error) { 528 tr := trace.New("index", args.Name) 529 530 defer func() { 531 if err != nil { 532 tr.SetError() 533 tr.LazyPrintf("error: %v", err) 534 state = indexStateFail 535 metricFailingTotal.Inc() 536 } 537 tr.LazyPrintf("state: %s", state) 538 tr.Finish() 539 }() 540 541 tr.LazyPrintf("branches: %v", args.Branches) 542 543 if len(args.Branches) == 0 { 544 return indexStateEmpty, createEmptyShard(args) 545 } 546 547 repositoryName := args.Name 548 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok { 549 tr.LazyPrintf("marking this repository for delta build") 550 args.UseDelta = true 551 } 552 553 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold 554 555 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok { 556 tr.LazyPrintf("skipping symbols calculation") 557 args.Symbols = false 558 } 559 560 reason := "forced" 561 562 if args.Incremental { 563 bo := args.BuildOptions() 564 bo.SetDefaults() 565 incrementalState, fn := bo.IndexState() 566 reason = string(incrementalState) 567 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc() 568 569 switch incrementalState { 570 case build.IndexStateEqual: 571 debug.Printf("%s index already up to date. 
Shard=%s", args.String(), fn) 572 return indexStateNoop, nil 573 574 case build.IndexStateMeta: 575 log.Printf("updating index.meta %s", args.String()) 576 577 if err := mergeMeta(bo); err != nil { 578 log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err) 579 } else { 580 return indexStateSuccessMeta, nil 581 } 582 583 case build.IndexStateCorrupt: 584 log.Printf("falling back to full update: corrupt index: %s", args.String()) 585 } 586 } 587 588 log.Printf("updating index %s reason=%s", args.String(), reason) 589 590 metricIndexingTotal.Inc() 591 c := gitIndexConfig{ 592 runCmd: func(cmd *exec.Cmd) error { 593 return s.loggedRun(tr, cmd) 594 }, 595 596 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) { 597 return args.BuildOptions().FindRepositoryMetadata() 598 }, 599 timeout: s.timeout, 600 } 601 602 err = gitIndex(c, args, s.Sourcegraph, s.logger) 603 if err != nil { 604 return indexStateFail, err 605 } 606 607 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil { 608 s.logger.Error("failed to update index status", 609 sglog.String("repo", args.Name), 610 sglog.Uint32("id", args.RepoID), 611 sglogBranches("branches", args.Branches), 612 sglog.Error(err), 613 ) 614 } 615 616 return indexStateSuccess, nil 617} 618 619// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so 620// it can update the zoekt_repos table. 621func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error { 622 // We need to read from disk for IndexTime. 
623 _, metadata, ok, err := c.findRepositoryMetadata(args) 624 if err != nil { 625 return fmt.Errorf("failed to read metadata for new/updated index: %w", err) 626 } 627 if !ok { 628 return errors.New("failed to find metadata for new/updated index") 629 } 630 631 status := []indexStatus{{ 632 RepoID: args.RepoID, 633 Branches: args.Branches, 634 IndexTimeUnix: metadata.IndexTime.Unix(), 635 }} 636 if err := sg.UpdateIndexStatus(status); err != nil { 637 return fmt.Errorf("failed to update sourcegraph with status: %w", err) 638 } 639 640 return nil 641} 642 643func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field { 644 ss := make([]string, len(branches)) 645 for i, b := range branches { 646 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version) 647 } 648 return sglog.Strings(key, ss) 649} 650 651func (s *Server) indexArgs(opts IndexOptions) *indexArgs { 652 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0)) 653 return &indexArgs{ 654 IndexOptions: opts, 655 IndexDir: s.IndexDir, 656 Parallelism: parallelism, 657 Incremental: true, 658 659 // 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22 660 FileLimit: 1 << 20, 661 662 ShardMerging: s.shardMerging, 663 } 664} 665 666// parallelism consults both the server flags and index options to determine the number 667// of shards to index in parallel. If the CPUCount index option is provided, it always 668// overrides the server flag. 
669func (s *Server) parallelism(opts IndexOptions, maxProcs int) int { 670 var parallelism int 671 if opts.ShardConcurrency > 0 { 672 parallelism = int(opts.ShardConcurrency) 673 } else { 674 parallelism = s.CPUCount 675 } 676 677 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs 678 if parallelism > 4*maxProcs { 679 parallelism = 4 * maxProcs 680 } 681 682 // If index concurrency is set, then divide the parallelism by the number of 683 // repos we're indexing in parallel 684 if s.IndexConcurrency > 1 { 685 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency))) 686 } 687 688 return parallelism 689} 690 691func createEmptyShard(args *indexArgs) error { 692 bo := args.BuildOptions() 693 bo.SetDefaults() 694 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 695 696 if args.Incremental && bo.IncrementalSkipIndexing() { 697 return nil 698 } 699 700 builder, err := build.NewBuilder(*bo) 701 if err != nil { 702 return err 703 } 704 return builder.Finish() 705} 706 707// addDebugHandlers adds handlers specific to indexserver. 708func (s *Server) addDebugHandlers(mux *http.ServeMux) { 709 // Sourcegraph's site admin view requires indexserver to serve it's admin view 710 // on "/". 
711 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 712 713 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 714 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 715 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 716 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 717 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 718 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 719} 720 721func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 722 if r.Method != "GET" { 723 w.Header().Set("Allow", "GET") 724 w.WriteHeader(http.StatusMethodNotAllowed) 725 return 726 } 727 728 response := struct { 729 Hostname string 730 }{ 731 Hostname: s.hostname, 732 } 733 734 b, err := json.Marshal(response) 735 if err != nil { 736 http.Error(w, err.Error(), http.StatusInternalServerError) 737 return 738 } 739 740 w.Header().Set("Content-Type", "application/json; charset=utf-8") 741 w.Write(b) 742} 743 744var rootTmpl = template.Must(template.New("name").Parse(` 745<html> 746 <body> 747 <a href="debug">Debug</a><br /> 748 <a href="debug/requests">Traces</a><br /> 749 {{.IndexMsg}}<br /> 750 <br /> 751 <h3>Reindex</h3> 752 {{if .Repos}} 753 <a href="?show_repos=false">hide repos</a><br /> 754 <table style="margin-top: 20px"> 755 <th style="text-align:left">Name</th> 756 <th style="text-align:left">ID</th> 757 {{range .Repos}} 758 <tr> 759 <td>{{.Name}}</td> 760 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 761 </tr> 762 {{end}} 763 </table> 764 {{else}} 765 <a href="?show_repos=true">show repos</a><br /> 766 {{end}} 767 </body> 768</html> 769`)) 770 771func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 772 if r.Method != "GET" { 773 w.Header().Set("Allow", "GET") 774 w.WriteHeader(http.StatusMethodNotAllowed) 775 return 776 } 777 778 values := r.URL.Query() 779 780 // ?id= 781 indexMsg := "" 782 if v := values.Get("id"); v != "" { 783 id, err := 
strconv.Atoi(v) 784 if err != nil { 785 http.Error(w, err.Error(), http.StatusBadRequest) 786 return 787 } 788 indexMsg, _ = s.forceIndex(uint32(id)) 789 } 790 791 // ?show_repos= 792 showRepos := false 793 if v := values.Get("show_repos"); v != "" { 794 showRepos, _ = strconv.ParseBool(v) 795 } 796 797 type Repo struct { 798 ID uint32 799 Name string 800 } 801 var data struct { 802 Repos []Repo 803 IndexMsg string 804 } 805 806 data.IndexMsg = indexMsg 807 808 if showRepos { 809 s.queue.Iterate(func(opts *IndexOptions) { 810 data.Repos = append(data.Repos, Repo{ 811 ID: opts.RepoID, 812 Name: opts.Name, 813 }) 814 }) 815 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 816 } 817 818 _ = rootTmpl.Execute(w, data) 819} 820 821// handleReindex triggers a reindex asynocronously. If a reindex was triggered 822// the request returns with status 202. The caller can infer the new state of 823// the index by calling List. 824func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 825 if r.Method != http.MethodPost { 826 w.Header().Set("Allow", http.MethodPost) 827 w.WriteHeader(http.StatusMethodNotAllowed) 828 return 829 } 830 831 err := r.ParseForm() 832 if err != nil { 833 http.Error(w, err.Error(), http.StatusBadRequest) 834 return 835 } 836 837 id, err := strconv.Atoi(r.Form.Get("repo")) 838 if err != nil { 839 http.Error(w, err.Error(), http.StatusBadRequest) 840 return 841 } 842 843 go func() { s.forceIndex(uint32(id)) }() 844 845 // 202 Accepted 846 w.WriteHeader(http.StatusAccepted) 847} 848 849func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 850 withIndexed := true 851 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 852 withIndexed = b 853 } 854 855 var indexed []uint32 856 if withIndexed { 857 indexed = listIndexed(s.IndexDir) 858 } 859 860 repos, err := s.Sourcegraph.List(r.Context(), indexed) 861 if err != nil { 862 http.Error(w, err.Error(), 
http.StatusInternalServerError) 863 return 864 } 865 866 bw := bytes.Buffer{} 867 868 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 869 870 _, err = fmt.Fprintf(tw, "ID\tName\n") 871 if err != nil { 872 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 873 return 874 } 875 876 s.queue.mu.Lock() 877 name := "" 878 for _, id := range repos.IDs { 879 if item := s.queue.get(id); item != nil { 880 name = item.opts.Name 881 } else { 882 name = "" 883 } 884 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 885 if err != nil { 886 debug.Printf("handleDebugList: %s\n", err.Error()) 887 } 888 } 889 s.queue.mu.Unlock() 890 891 if err != nil { 892 http.Error(w, err.Error(), http.StatusInternalServerError) 893 return 894 } 895 896 err = tw.Flush() 897 if err != nil { 898 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 899 return 900 } 901 902 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 903 904 if _, err := io.Copy(w, &bw); err != nil { 905 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 906 return 907 } 908} 909 910// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 911// can run this command during periods of low usage (evenings, weekends) to 912// trigger an initial merge run. In the steady-state, merges happen rarely, even 913// on busy instances, and users can rely on automatic merging instead. 914func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 915 // A merge operation can take very long, depending on the number merges and the 916 // target size of the compound shards. We run the merge in the background and 917 // return immediately to the user. 918 // 919 // We track the status of the merge with metricShardMergingRunning. 
920 go func() { 921 s.doMerge() 922 }() 923 _, _ = w.Write([]byte("merging enqueued\n")) 924} 925 926func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 927 indexed := listIndexed(s.IndexDir) 928 929 bw := bytes.Buffer{} 930 931 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 932 933 _, err := fmt.Fprintf(tw, "ID\tName\n") 934 if err != nil { 935 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 936 return 937 } 938 939 s.queue.mu.Lock() 940 name := "" 941 for _, id := range indexed { 942 if item := s.queue.get(id); item != nil { 943 name = item.opts.Name 944 } else { 945 name = "" 946 } 947 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 948 if err != nil { 949 debug.Printf("handleDebugIndexed: %s\n", err.Error()) 950 } 951 } 952 s.queue.mu.Unlock() 953 954 if err != nil { 955 http.Error(w, err.Error(), http.StatusInternalServerError) 956 return 957 } 958 959 err = tw.Flush() 960 if err != nil { 961 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 962 return 963 } 964 965 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 966 967 if _, err := io.Copy(w, &bw); err != nil { 968 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 969 return 970 } 971} 972 973// forceIndex will run the index job for repo name now. It will return always 974// return a string explaining what it did, even if it failed. 
975func (s *Server) forceIndex(id uint32) (string, error) { 976 var opts IndexOptions 977 var err error 978 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 979 opts = o 980 }, func(_ uint32, e error) { 981 err = e 982 }, id) 983 if err != nil { 984 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 985 } 986 987 args := s.indexArgs(opts) 988 args.Incremental = false // force re-index 989 990 var state indexState 991 ran := s.muIndexDir.With(opts.Name, func() { 992 state, err = s.Index(args) 993 }) 994 if !ran { 995 return fmt.Sprintf("index job for repository already running: %s", args), nil 996 } 997 if err != nil { 998 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 999 } 1000 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 1001} 1002 1003func listIndexed(indexDir string) []uint32 { 1004 index := getShards(indexDir) 1005 metricNumIndexed.Set(float64(len(index))) 1006 repoIDs := make([]uint32, 0, len(index)) 1007 for id := range index { 1008 repoIDs = append(repoIDs, id) 1009 } 1010 sort.Slice(repoIDs, func(i, j int) bool { 1011 return repoIDs[i] < repoIDs[j] 1012 }) 1013 return repoIDs 1014} 1015 1016// setupTmpDir sets up a temporary directory on the same volume as the 1017// indexes. 1018// 1019// If main is true we will delete older temp directories left around. main is 1020// false when this is a debug command. 1021func setupTmpDir(logger sglog.Logger, main bool, index string) error { 1022 // change the target tmp directory depending on if it's our main daemon or a 1023 // debug sub command. 
1024 dir := ".indexserver.debug.tmp" 1025 if main { 1026 dir = ".indexserver.tmp" 1027 } 1028 1029 tmpRoot := filepath.Join(index, dir) 1030 1031 if main { 1032 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot)) 1033 err := os.RemoveAll(tmpRoot) 1034 if err != nil { 1035 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err)) 1036 } 1037 } 1038 1039 if err := os.MkdirAll(tmpRoot, 0o755); err != nil { 1040 return err 1041 } 1042 1043 return os.Setenv("TMPDIR", tmpRoot) 1044} 1045 1046func printMetaData(fn string) error { 1047 repo, indexMeta, err := zoekt.ReadMetadataPath(fn) 1048 if err != nil { 1049 return err 1050 } 1051 1052 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 1053 if err != nil { 1054 return err 1055 } 1056 1057 err = json.NewEncoder(os.Stdout).Encode(repo) 1058 if err != nil { 1059 return err 1060 } 1061 return nil 1062} 1063 1064func printShardStats(fn string) error { 1065 f, err := os.Open(fn) 1066 if err != nil { 1067 return err 1068 } 1069 1070 iFile, err := zoekt.NewIndexFile(f) 1071 if err != nil { 1072 return err 1073 } 1074 1075 return zoekt.PrintNgramStats(iFile) 1076} 1077 1078func srcLogLevelIsDebug() bool { 1079 lvl := os.Getenv(sglog.EnvLogLevel) 1080 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1081} 1082 1083func getEnvWithDefaultBool(k string, defaultVal bool) bool { 1084 v := os.Getenv(k) 1085 if v == "" { 1086 return defaultVal 1087 } 1088 b, err := strconv.ParseBool(v) 1089 if err != nil { 1090 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1091 } 1092 return b 1093} 1094 1095func getEnvWithDefaultInt64(k string, defaultVal int64) int64 { 1096 v := os.Getenv(k) 1097 if v == "" { 1098 return defaultVal 1099 } 1100 i, err := strconv.ParseInt(v, 10, 64) 1101 if err != nil { 1102 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1103 } 1104 return i 1105} 1106 1107func getEnvWithDefaultInt(k string, defaultVal int) int { 1108 v := 
os.Getenv(k) 1109 if v == "" { 1110 return defaultVal 1111 } 1112 i, err := strconv.Atoi(k) 1113 if err != nil { 1114 log.Fatalf("error parsing ENV %s to int: %s", k, err) 1115 } 1116 return i 1117} 1118 1119func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 { 1120 v := os.Getenv(k) 1121 if v == "" { 1122 return defaultVal 1123 } 1124 i, err := strconv.ParseUint(v, 10, 64) 1125 if err != nil { 1126 log.Fatalf("error parsing ENV %s to uint64: %s", k, err) 1127 } 1128 return i 1129} 1130 1131func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1132 v := os.Getenv(k) 1133 if v == "" { 1134 return defaultVal 1135 } 1136 f, err := strconv.ParseFloat(v, 64) 1137 if err != nil { 1138 log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1139 } 1140 return f 1141} 1142 1143func getEnvWithDefaultString(k string, defaultVal string) string { 1144 v := os.Getenv(k) 1145 if v == "" { 1146 return defaultVal 1147 } 1148 return v 1149} 1150 1151func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration { 1152 v := os.Getenv(k) 1153 if v == "" { 1154 return defaultVal 1155 } 1156 1157 d, err := time.ParseDuration(v) 1158 if err != nil { 1159 log.Fatalf("error parsing ENV %s to duration: %s", k, err) 1160 } 1161 return d 1162} 1163 1164func getEnvWithDefaultEmptySet(k string) map[string]struct{} { 1165 set := map[string]struct{}{} 1166 for _, v := range strings.Split(os.Getenv(k), ",") { 1167 v = strings.TrimSpace(v) 1168 if v != "" { 1169 set[v] = struct{}{} 1170 } 1171 } 1172 return set 1173} 1174 1175func joinStringSet(set map[string]struct{}, sep string) string { 1176 var xs []string 1177 for x := range set { 1178 xs = append(xs, x) 1179 } 1180 1181 return strings.Join(xs, sep) 1182} 1183 1184func setCompoundShardCounter(indexDir string) { 1185 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt")) 1186 if err != nil { 1187 log.Printf("setCompoundShardCounter: %s\n", err) 1188 return 1189 } 1190 
metricNumberCompoundShards.Set(float64(len(fns))) 1191} 1192 1193func rootCmd() *ffcli.Command { 1194 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1195 conf := rootConfig{ 1196 Main: true, 1197 } 1198 conf.registerRootFlags(rootFs) 1199 1200 return &ffcli.Command{ 1201 FlagSet: rootFs, 1202 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1203 Subcommands: []*ffcli.Command{debugCmd()}, 1204 Exec: func(ctx context.Context, args []string) error { 1205 return startServer(conf) 1206 }, 1207 } 1208} 1209 1210type rootConfig struct { 1211 // Main is true if this rootConfig is for our main long running command (the 1212 // indexserver). Debug commands should not set this value. This is used to 1213 // determine if we need to run tmpfriend. 1214 Main bool 1215 1216 root string 1217 interval time.Duration 1218 index string 1219 indexConcurrency int64 1220 listen string 1221 hostname string 1222 cpuFraction float64 1223 blockProfileRate int 1224 1225 // config values related to shard merging 1226 disableShardMerging bool 1227 vacuumInterval time.Duration 1228 mergeInterval time.Duration 1229 targetSize int64 1230 minSize int64 1231 minAgeDays int 1232 1233 // config values related to backoff indexing repos with one or more consecutive failures 1234 backoffDuration time.Duration 1235 maxBackoffDuration time.Duration 1236} 1237 1238func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1239 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. 
If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1240 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1241 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently") 1242 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use") 1243 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") 1244 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") 1245 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1246 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate") 1247 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.") 1248 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. 
A negative value disables indexing backoff.") 1249 1250 // flags related to shard merging 1251 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging") 1252 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1253 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1254 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB") 1255 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB") 1256 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.") 1257} 1258 1259func startServer(conf rootConfig) error { 1260 s, err := newServer(conf) 1261 if err != nil { 1262 return err 1263 } 1264 1265 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate) 1266 setCompoundShardCounter(s.IndexDir) 1267 1268 if conf.listen != "" { 1269 1270 mux := http.NewServeMux() 1271 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1272 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1273 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1274 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"}, 1275 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1276 
}...) 1277 s.addDebugHandlers(mux) 1278 1279 go func() { 1280 debug.Printf("serving HTTP on %s", conf.listen) 1281 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1282 }() 1283 1284 // Serve mux on a unix domain socket on a best-effort-basis so that 1285 // webserver can call the endpoints via the shared filesystem. 1286 // 1287 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1288 // on the socket due to permission errors. See 1289 // https://github.com/docker/for-mac/issues/6239 1290 go func() { 1291 serveHTTPOverSocket := func() error { 1292 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1293 // We cannot bind a socket to an existing pathname. 1294 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1295 return fmt.Errorf("error removing socket file: %s", socket) 1296 } 1297 // The "unix" network corresponds to stream sockets. (cf. unixgram, 1298 // unixpacket). 1299 l, err := net.Listen("unix", socket) 1300 if err != nil { 1301 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1302 } 1303 // Indexserver (root) and webserver (Sourcegraph) run with 1304 // different users. Per default, the socket is created with 1305 // permission 755 (root root), which doesn't let webserver write to 1306 // it. 1307 // 1308 // See https://github.com/golang/go/issues/11822 for more context. 
1309 if err := os.Chmod(socket, 0o777); err != nil { 1310 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1311 } 1312 debug.Printf("serving HTTP on %s", socket) 1313 return http.Serve(l, mux) 1314 } 1315 debug.Print(serveHTTPOverSocket()) 1316 }() 1317 } 1318 1319 oc := &ownerChecker{ 1320 Path: filepath.Join(conf.index, "owner.txt"), 1321 Hostname: conf.hostname, 1322 } 1323 go oc.Run() 1324 1325 logger := sglog.Scoped("metricsRegistration") 1326 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1327 1328 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1329 prometheus.DefaultRegisterer.MustRegister(c) 1330 1331 s.Run() 1332 return nil 1333} 1334 1335func newServer(conf rootConfig) (*Server, error) { 1336 logger := sglog.Scoped("server") 1337 1338 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1339 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1340 } 1341 if conf.index == "" { 1342 return nil, fmt.Errorf("must set -index") 1343 } 1344 if conf.root == "" { 1345 return nil, fmt.Errorf("must set -sourcegraph_url") 1346 } 1347 rootURL, err := url.Parse(conf.root) 1348 if err != nil { 1349 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1350 } 1351 1352 rootURL = addDefaultPort(rootURL) 1353 1354 // Tune GOMAXPROCS to match Linux container CPU quota. 1355 _, _ = maxprocs.Set() 1356 1357 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler. 1358 // The block profiler is disabled by default and should be enabled with care in production 1359 runtime.SetBlockProfileRate(conf.blockProfileRate) 1360 1361 // Automatically prepend our own path at the front, to minimize 1362 // required configuration. 
1363 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1364 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1365 } 1366 1367 if _, err := os.Stat(conf.index); err != nil { 1368 if err := os.MkdirAll(conf.index, 0o755); err != nil { 1369 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1370 } 1371 } 1372 1373 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil { 1374 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1375 } 1376 1377 if srcLogLevelIsDebug() { 1378 debug = log.New(os.Stderr, "", log.LstdFlags) 1379 } 1380 1381 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1382 if len(reposWithSeparateIndexingMetrics) > 0 { 1383 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1384 } 1385 1386 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1387 if len(deltaBuildRepositoriesAllowList) > 0 { 1388 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1389 } 1390 1391 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1392 if deltaShardNumberFallbackThreshold > 0 { 1393 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1394 } else { 1395 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1396 } 1397 1398 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1399 if len(reposShouldSkipSymbolsCalculation) > 0 { 1400 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1401 } 1402 1403 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout) 1404 if indexingTimeout != 
defaultIndexingTimeout { 1405 debug.Printf("using configured indexing timeout: %s", indexingTimeout) 1406 } 1407 1408 var sg Sourcegraph 1409 if rootURL.IsAbs() { 1410 var batchSize int 1411 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { 1412 batchSize, err = strconv.Atoi(v) 1413 if err != nil { 1414 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1415 } 1416 } 1417 1418 opts := []SourcegraphClientOption{ 1419 WithBatchSize(batchSize), 1420 } 1421 1422 logger := sglog.Scoped("zoektConfigurationGRPCClient") 1423 client, err := dialGRPCClient(rootURL.Host, logger) 1424 if err != nil { 1425 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1426 } 1427 1428 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...) 1429 1430 } else { 1431 sg = sourcegraphFake{ 1432 RootDir: rootURL.String(), 1433 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1434 } 1435 } 1436 1437 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction))) 1438 if cpuCount < 1 { 1439 cpuCount = 1 1440 } 1441 1442 if conf.indexConcurrency < 1 { 1443 conf.indexConcurrency = 1 1444 } else if conf.indexConcurrency > int64(cpuCount) { 1445 conf.indexConcurrency = int64(cpuCount) 1446 } 1447 1448 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1449 1450 return &Server{ 1451 logger: logger, 1452 Sourcegraph: sg, 1453 IndexDir: conf.index, 1454 IndexConcurrency: int(conf.indexConcurrency), 1455 Interval: conf.interval, 1456 CPUCount: cpuCount, 1457 queue: *q, 1458 shardMerging: !conf.disableShardMerging, 1459 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1460 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1461 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1462 hostname: conf.hostname, 1463 mergeOpts: mergeOpts{ 1464 vacuumInterval: conf.vacuumInterval, 1465 mergeInterval: conf.mergeInterval, 
1466 targetSizeBytes: conf.targetSize * 1024 * 1024, 1467 minSizeBytes: conf.minSize * 1024 * 1024, 1468 minAgeDays: conf.minAgeDays, 1469 }, 1470 timeout: indexingTimeout, 1471 }, err 1472} 1473 1474// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration 1475// for the indexed-search-configuration gRPC service. 1476// 1477// The default backoff strategy is modeled after the default settings used by 1478// retryablehttp.DefaultClient. 1479// 1480// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1481// - Unavailable 1482// - Aborted 1483// 1484//go:embed default_grpc_service_configuration.json 1485var defaultGRPCServiceConfigurationJSON string 1486 1487func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1488 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1489 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1490 return invoker(ctx, method, req, reply, cc, opts...) 1491 } 1492} 1493 1494func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1495 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1496 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1497 return streamer(ctx, desc, cc, method, opts...) 1498 } 1499} 1500 1501// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPCs servers and clients are allowed to process. 1502// This can be overridden by providing custom Server/Dial options. 
1503const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB 1504 1505func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) { 1506 metrics := mustGetClientMetrics() 1507 1508 // If the service seems to be unavailable, this 1509 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1 1510 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s]) 1511 retryOpts := []retry.CallOption{ 1512 retry.WithMax(5), 1513 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)), 1514 retry.WithCodes(codes.Unavailable), 1515 } 1516 1517 opts := []grpc.DialOption{ 1518 grpc.WithTransportCredentials(insecure.NewCredentials()), 1519 grpc.WithChainStreamInterceptor( 1520 metrics.StreamClientInterceptor(), 1521 messagesize.StreamClientInterceptor, 1522 internalActorStreamInterceptor(), 1523 internalerrs.LoggingStreamClientInterceptor(logger), 1524 internalerrs.PrometheusStreamClientInterceptor, 1525 retry.StreamClientInterceptor(retryOpts...), 1526 ), 1527 grpc.WithChainUnaryInterceptor( 1528 metrics.UnaryClientInterceptor(), 1529 messagesize.UnaryClientInterceptor, 1530 internalActorUnaryInterceptor(), 1531 internalerrs.LoggingUnaryClientInterceptor(logger), 1532 internalerrs.PrometheusUnaryClientInterceptor, 1533 retry.UnaryClientInterceptor(retryOpts...), 1534 ), 1535 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON), 1536 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)), 1537 } 1538 1539 opts = append(opts, additionalOpts...) 1540 1541 // Ensure that the message size options are set last, so they override any other 1542 // client-specific options that tweak the message size. 1543 // 1544 // The message size options are only provided if the environment variable is set. 
These options serve as an escape hatch, so they 1545 // take precedence over everything else with a uniform size setting that's easy to reason about. 1546 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...) 1547 1548 // This dialer is used to connect via gRPC to the Sourcegraph instance. 1549 // This is done lazily, so we can provide the client to use regardless of 1550 // whether we enabled gRPC or not initially. 1551 cc, err := grpc.Dial(addr, opts...) 1552 if err != nil { 1553 return nil, fmt.Errorf("dialing %q: %w", addr, err) 1554 } 1555 1556 client := proto.NewZoektConfigurationServiceClient(cc) 1557 return client, nil 1558} 1559 1560// mustGetClientMetrics returns a singleton instance of the client metrics 1561// that are shared across all gRPC clients that this process creates. 1562// 1563// This function panics if the metrics cannot be registered with the default 1564// Prometheus registry. 1565func mustGetClientMetrics() *grpcprom.ClientMetrics { 1566 clientMetricsOnce.Do(func() { 1567 clientMetrics = grpcprom.NewClientMetrics( 1568 grpcprom.WithClientCounterOptions(), 1569 grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request 1570 grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC 1571 grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC 1572 ) 1573 1574 prometheus.DefaultRegisterer.MustRegister(clientMetrics) 1575 }) 1576 1577 return clientMetrics 1578} 1579 1580// addDefaultPort adds a default port to a URL if one is not specified. 1581// 1582// If the URL scheme is "http" and no port is specified, "80" is used. 1583// If the scheme is "https", "443" is used. 1584// 1585// The original URL is not mutated. A copy is modified and returned. 
1586func addDefaultPort(original *url.URL) *url.URL { 1587 if original == nil { 1588 return nil // don't panic 1589 } 1590 1591 if !original.IsAbs() { 1592 return original // don't do anything if the URL doesn't look like a remote URL 1593 } 1594 1595 if original.Scheme == "http" && original.Port() == "" { 1596 u := cloneURL(original) 1597 u.Host = net.JoinHostPort(u.Host, "80") 1598 return u 1599 } 1600 1601 if original.Scheme == "https" && original.Port() == "" { 1602 u := cloneURL(original) 1603 u.Host = net.JoinHostPort(u.Host, "443") 1604 return u 1605 } 1606 1607 return original 1608} 1609 1610// cloneURL returns a copy of the URL. It is safe to mutate the returned URL. 1611// This is copied from net/http/clone.go 1612func cloneURL(u *url.URL) *url.URL { 1613 if u == nil { 1614 return nil 1615 } 1616 u2 := new(url.URL) 1617 *u2 = *u 1618 if u.User != nil { 1619 u2.User = new(url.Userinfo) 1620 *u2.User = *u.User 1621 } 1622 return u2 1623} 1624 1625func main() { 1626 liblog := sglog.Init(sglog.Resource{ 1627 Name: "zoekt-indexserver", 1628 Version: zoekt.Version, 1629 InstanceID: zoekt.HostnameBestEffort(), 1630 }) 1631 defer liblog.Sync() 1632 1633 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1634 log.Fatal(err) 1635 } 1636} 1637 1638// getBoolFromEnvironmentVariables returns the boolean defined by the first environment 1639// variable listed in envVarNames that is set in the current process environment, or the defaultBool if none are set. 1640// 1641// An error is returned of the provided environment variables fails to parse as a boolean. 
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	for _, envVar := range envVarNames {
		v := os.Getenv(envVar)
		if v == "" {
			// Unset and empty are treated alike: try the next name.
			continue
		}

		b, err := strconv.ParseBool(v)
		if err != nil {
			// Wrap with %w so callers can inspect the underlying
			// *strconv.NumError via errors.As / errors.Is.
			return false, fmt.Errorf("parsing environment variable %q to boolean: %w", envVar, err)
		}

		// The first non-empty variable wins; later names are not consulted.
		return b, nil
	}

	return defaultBool, nil
}