fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Command zoekt-sourcegraph-indexserver periodically reindexes enabled 2// repositories on sourcegraph 3package main 4 5import ( 6 "bytes" 7 "context" 8 _ "embed" 9 "encoding/json" 10 "errors" 11 "flag" 12 "fmt" 13 "html/template" 14 "io" 15 "io/fs" 16 "log" 17 "math" 18 "math/rand" 19 "net" 20 "net/http" 21 "net/url" 22 "os" 23 "os/exec" 24 "os/signal" 25 "path/filepath" 26 "runtime" 27 "sort" 28 "strconv" 29 "strings" 30 "sync" 31 "text/tabwriter" 32 "time" 33 34 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" 35 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry" 36 "github.com/peterbourgon/ff/v3/ffcli" 37 "github.com/prometheus/client_golang/prometheus" 38 "github.com/prometheus/client_golang/prometheus/promauto" 39 sglog "github.com/sourcegraph/log" 40 "github.com/sourcegraph/mountinfo" 41 "go.uber.org/automaxprocs/maxprocs" 42 "golang.org/x/net/trace" 43 "golang.org/x/sys/unix" 44 "google.golang.org/grpc" 45 "google.golang.org/grpc/codes" 46 "google.golang.org/grpc/credentials/insecure" 47 "google.golang.org/grpc/metadata" 48 49 "github.com/sourcegraph/zoekt" 50 "github.com/sourcegraph/zoekt/build" 51 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1" 52 "github.com/sourcegraph/zoekt/debugserver" 53 "github.com/sourcegraph/zoekt/grpc/internalerrs" 54 "github.com/sourcegraph/zoekt/grpc/messagesize" 55 "github.com/sourcegraph/zoekt/internal/profiler" 56) 57 58var ( 59 metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{ 60 Name: "resolve_revisions_seconds", 61 Help: "A histogram of latencies for resolving all repository revisions.", 62 Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 27min 63 }) 64 65 metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 66 Name: "resolve_revision_seconds", 67 Help: "A histogram of latencies for resolving a repository revision.", 68 Buckets: 
prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s 69 }, []string{"success"}) // success=true|false 70 71 metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{ 72 Name: "get_index_options_total", 73 Help: "The total number of times we tried to get index options for a repository. Includes errors.", 74 }) 75 metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{ 76 Name: "get_index_options_error_total", 77 Help: "The total number of times we failed to get index options for a repository.", 78 }) 79 80 metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 81 Name: "index_repo_seconds", 82 Help: "A histogram of latencies for indexing a repository.", 83 Buckets: prometheus.ExponentialBucketsRange( 84 (100 * time.Millisecond).Seconds(), 85 (40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo 86 20), 87 }, []string{ 88 "state", // state is an indexState 89 "name", // name of the repository that was indexed 90 }) 91 92 metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{ 93 Name: "index_indexing_delay_seconds", 94 Help: "A histogram of durations from when an index job is added to the queue, to the time it completes.", 95 Buckets: prometheus.ExponentialBuckets(60, 2, 12), // 1m -> ~3 days 96 }, []string{ 97 "state", // state is an indexState 98 "name", // the name of the repository that was indexed 99 }) 100 101 metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 102 Name: "index_fetch_seconds", 103 Help: "A histogram of latencies for fetching a repository.", 104 Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes 105 }, []string{ 106 "success", // true|false 107 "name", // the name of the repository that the commits were fetched from 108 }) 109 110 metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{ 111 Name: 
"index_incremental_index_state", 112 Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.", 113 }, []string{"state"}) // state is build.IndexState 114 115 metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{ 116 Name: "index_num_indexed", 117 Help: "Number of indexed repos by code host", 118 }) 119 120 metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{ 121 Name: "index_failing_total", 122 Help: "Counts failures to index (indexing activity, should be used with rate())", 123 }) 124 125 metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{ 126 Name: "index_indexing_total", 127 Help: "Counts indexings (indexing activity, should be used with rate())", 128 }) 129 130 metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{ 131 Name: "index_num_stopped_tracking_total", 132 Help: "Counts the number of repos we stopped tracking.", 133 }) 134 135 clientMetricsOnce sync.Once 136 clientMetrics *grpcprom.ClientMetrics 137) 138 139// 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22 140// NOTE: if you change this, you must also update gitIndex to use the same value when fetching the repo. 141const MaxFileSize = 1 << 20 142 143// set of repositories that we want to capture separate indexing metrics for 144var reposWithSeparateIndexingMetrics = make(map[string]struct{}) 145 146type indexState string 147 148const ( 149 indexStateFail indexState = "fail" 150 indexStateSuccess indexState = "success" 151 indexStateSuccessMeta indexState = "success_meta" // We only updated metadata 152 indexStateNoop indexState = "noop" // We didn't need to update index 153 indexStateEmpty indexState = "empty" // index is empty (empty repo) 154) 155 156// Server is the main functionality of zoekt-sourcegraph-indexserver. It 157// exists to conveniently use all the options passed in via func main. 
158type Server struct { 159 logger sglog.Logger 160 161 Sourcegraph Sourcegraph 162 BatchSize int 163 164 // IndexDir is the index directory to use. 165 IndexDir string 166 167 // IndexConcurrency is the number of repositories we index at once. 168 IndexConcurrency int 169 170 // Interval is how often we sync with Sourcegraph. 171 Interval time.Duration 172 173 // CPUCount is the number of CPUs to use for indexing shards. 174 CPUCount int 175 176 queue Queue 177 178 // muIndexDir protects the index directory from concurrent access. 179 muIndexDir indexMutex 180 181 // If true, shard merging is enabled. 182 shardMerging bool 183 184 // deltaBuildRepositoriesAllowList is an allowlist for repositories that we 185 // use delta-builds for instead of normal builds 186 deltaBuildRepositoriesAllowList map[string]struct{} 187 188 // deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist 189 // before attempting a delta build. 190 deltaShardNumberFallbackThreshold uint64 191 192 // repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that 193 // we skip calculating symbols metadata for during builds 194 repositoriesSkipSymbolsCalculationAllowList map[string]struct{} 195 196 hostname string 197 198 mergeOpts mergeOpts 199 200 // timeout defines how long the index server waits before killing an indexing job. 201 timeout time.Duration 202} 203 204var debug = log.New(io.Discard, "", log.LstdFlags) 205 206// our index commands should output something every 100mb they process. 207// 208// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough 209// time." famous last words. A client was indexing a monorepo with 42 210// cores... 5m was not enough. 
211const noOutputTimeout = 30 * time.Minute 212 213func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 214 out := &synchronizedBuffer{} 215 cmd.Stdout = out 216 cmd.Stderr = out 217 218 tr.LazyPrintf("%s", cmd.Args) 219 220 defer func() { 221 if err != nil { 222 outS := out.String() 223 tr.LazyPrintf("failed: %v", err) 224 tr.LazyPrintf("output: %s", out) 225 tr.SetError() 226 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 227 } 228 }() 229 230 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 231 232 if err := cmd.Start(); err != nil { 233 return err 234 } 235 236 errC := make(chan error) 237 go func() { 238 errC <- cmd.Wait() 239 }() 240 241 // This channel is set after we have sent sigquit. It allows us to follow up 242 // with a sigkill if the process doesn't quit after sigquit. 243 kill := make(<-chan time.Time) 244 245 lastLen := 0 246 for { 247 select { 248 case <-time.After(noOutputTimeout): 249 // Periodically check if we have had output. If not kill the process. 250 if out.Len() != lastLen { 251 lastLen = out.Len() 252 log.Printf("still running %s", cmd.Args) 253 } else { 254 // Send quit (C-\) first so we get a stack dump. 255 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 256 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 257 log.Println("quit failed:", err) 258 } 259 260 // send sigkill if still running in 10s 261 kill = time.After(10 * time.Second) 262 } 263 264 case <-kill: 265 log.Printf("still running, killing %s", cmd.Args) 266 if err := cmd.Process.Kill(); err != nil { 267 log.Println("kill failed:", err) 268 } 269 270 case err := <-errC: 271 if err != nil { 272 return err 273 } 274 275 tr.LazyPrintf("success") 276 return nil 277 } 278 } 279} 280 281// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 282// monitor the buffer while it is being written to. 
283type synchronizedBuffer struct { 284 mu sync.Mutex 285 b bytes.Buffer 286} 287 288func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 289 sb.mu.Lock() 290 defer sb.mu.Unlock() 291 return sb.b.Write(p) 292} 293 294func (sb *synchronizedBuffer) Len() int { 295 sb.mu.Lock() 296 defer sb.mu.Unlock() 297 return sb.b.Len() 298} 299 300func (sb *synchronizedBuffer) String() string { 301 sb.mu.Lock() 302 defer sb.mu.Unlock() 303 return sb.b.String() 304} 305 306// pauseFileName if present in IndexDir will stop index jobs from 307// running. This is to make it possible to experiment with the content of the 308// IndexDir without the indexserver writing to it. 309const pauseFileName = "PAUSE" 310 311// Run the sync loop. This blocks forever. 312func (s *Server) Run() { 313 removeIncompleteShards(s.IndexDir) 314 315 // Start a goroutine which updates the queue with commits to index. 316 go func() { 317 // We update the list of indexed repos every Interval. To speed up manual 318 // testing we also listen for SIGUSR1 to trigger updates. 
319 // 320 // "pkill -SIGUSR1 zoekt-sourcegra" 321 for range jitterTicker(s.Interval, unix.SIGUSR1) { 322 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 323 log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 324 continue 325 } 326 327 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 328 if err != nil { 329 log.Printf("error listing repos: %s", err) 330 continue 331 } 332 333 debug.Printf("updating index queue with %d repositories", len(repos.IDs)) 334 335 // Stop indexing repos we don't need to track anymore 336 removed := s.queue.MaybeRemoveMissing(repos.IDs) 337 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 338 if len(removed) > 0 { 339 log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 340 } 341 342 cleanupDone := make(chan struct{}) 343 go func() { 344 defer close(cleanupDone) 345 s.muIndexDir.Global(func() { 346 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 347 }) 348 }() 349 350 repos.IterateIndexOptions(s.queue.AddOrUpdate) 351 352 // IterateIndexOptions will only iterate over repositories that have 353 // changed since we last called list. However, we want to add all IDs 354 // back onto the queue just to check that what is on disk is still 355 // correct. This will use the last IndexOptions we stored in the 356 // queue. The repositories not on the queue (missing) need a forced 357 // fetch of IndexOptions. 358 missing := s.queue.Bump(repos.IDs) 359 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 
360 361 setCompoundShardCounter(s.IndexDir) 362 363 <-cleanupDone 364 } 365 }() 366 367 go func() { 368 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 369 if s.shardMerging { 370 s.vacuum() 371 } 372 } 373 }() 374 375 go func() { 376 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 377 if s.shardMerging { 378 s.doMerge() 379 } 380 } 381 }() 382 383 for i := 0; i < s.IndexConcurrency; i++ { 384 go s.processQueue() 385 } 386 387 // block forever 388 select {} 389} 390 391// formatList returns a comma-separated list of the first min(len(v), m) items. 392func formatListUint32(v []uint32, m int) string { 393 if len(v) < m { 394 m = len(v) 395 } 396 397 sb := strings.Builder{} 398 for i := 0; i < m; i++ { 399 fmt.Fprintf(&sb, "%d, ", v[i]) 400 } 401 402 if len(v) > m { 403 sb.WriteString("...") 404 } 405 406 return strings.TrimRight(sb.String(), ", ") 407} 408 409func (s *Server) processQueue() { 410 for { 411 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 412 time.Sleep(time.Second) 413 continue 414 } 415 416 item, ok := s.queue.Pop() 417 if !ok { 418 time.Sleep(time.Second) 419 continue 420 } 421 422 opts := item.Opts 423 args := s.indexArgs(opts) 424 425 ran := s.muIndexDir.With(opts.Name, func() { 426 // only record time taken once we hold the lock. This avoids us 427 // recording time taken while merging/cleanup runs. 
428 start := time.Now() 429 430 state, err := s.Index(args) 431 432 elapsed := time.Since(start) 433 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 434 435 indexDelay := time.Since(item.DateAddedToQueue) 436 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds()) 437 438 if err != nil { 439 log.Printf("error indexing %s: %s", args.String(), err) 440 } 441 442 switch state { 443 case indexStateSuccess: 444 var branches []string 445 for _, b := range args.Branches { 446 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 447 } 448 s.logger.Info("updated index", 449 sglog.String("repo", args.Name), 450 sglog.Uint32("id", args.RepoID), 451 sglog.Strings("branches", branches), 452 sglog.Duration("duration", elapsed), 453 sglog.Duration("index_delay", indexDelay), 454 ) 455 case indexStateSuccessMeta: 456 log.Printf("updated meta %s in %v", args.String(), elapsed) 457 } 458 s.queue.SetIndexed(opts, state) 459 }) 460 461 if !ran { 462 // Someone else is processing the repository. We can just skip this job 463 // since the repository will be added back to the queue and we will 464 // converge to the correct behaviour. 465 debug.Printf("index job for repository already running: %s", args) 466 continue 467 } 468 } 469} 470 471// repoNameForMetric returns a normalized version of the given repository name that is 472// suitable for use with Prometheus metrics. 473func repoNameForMetric(repo string) string { 474 // Check to see if we want to be able to capture separate indexing metrics for this repository. 475 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 
476 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 477 return repo 478 } 479 480 return "" 481} 482 483func batched(slice []uint32, size int) <-chan []uint32 { 484 c := make(chan []uint32) 485 go func() { 486 for len(slice) > 0 { 487 if size > len(slice) { 488 size = len(slice) 489 } 490 c <- slice[:size] 491 slice = slice[size:] 492 } 493 close(c) 494 }() 495 return c 496} 497 498// jitterTicker returns a ticker which ticks with a jitter. Each tick is 499// uniformly selected from the range (d/2, d + d/2). It will tick on creation. 500// 501// sig is a list of signals which also cause the ticker to fire. This is a 502// convenience to allow manually triggering of the ticker. 503func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} { 504 ticker := make(chan struct{}) 505 506 go func() { 507 for { 508 ticker <- struct{}{} 509 ns := int64(d) 510 jitter := rand.Int63n(ns) 511 time.Sleep(time.Duration(ns/2 + jitter)) 512 } 513 }() 514 515 go func() { 516 if len(sig) == 0 { 517 return 518 } 519 520 c := make(chan os.Signal, 1) 521 signal.Notify(c, sig...) 522 for range c { 523 ticker <- struct{}{} 524 } 525 }() 526 527 return ticker 528} 529 530// Index starts an index job for repo name at commit. 
531func (s *Server) Index(args *indexArgs) (state indexState, err error) { 532 tr := trace.New("index", args.Name) 533 tr.SetMaxEvents(30) // Ensure we capture all indexing events 534 535 defer func() { 536 if err != nil { 537 tr.SetError() 538 tr.LazyPrintf("error: %v", err) 539 state = indexStateFail 540 metricFailingTotal.Inc() 541 } 542 tr.LazyPrintf("state: %s", state) 543 tr.Finish() 544 }() 545 546 tr.LazyPrintf("branches: %v", args.Branches) 547 548 if len(args.Branches) == 0 { 549 return indexStateEmpty, createEmptyShard(args) 550 } 551 552 repositoryName := args.Name 553 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok { 554 tr.LazyPrintf("marking this repository for delta build") 555 args.UseDelta = true 556 } 557 558 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold 559 560 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok { 561 tr.LazyPrintf("skipping symbols calculation") 562 args.Symbols = false 563 } 564 565 reason := "forced" 566 567 if args.Incremental { 568 bo := args.BuildOptions() 569 bo.SetDefaults() 570 incrementalState, fn := bo.IndexState() 571 reason = string(incrementalState) 572 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc() 573 574 switch incrementalState { 575 case build.IndexStateEqual: 576 debug.Printf("%s index already up to date. 
Shard=%s", args.String(), fn) 577 return indexStateNoop, nil 578 579 case build.IndexStateMeta: 580 log.Printf("updating index.meta %s", args.String()) 581 582 if err := mergeMeta(bo); err != nil { 583 log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err) 584 } else { 585 return indexStateSuccessMeta, nil 586 } 587 588 case build.IndexStateCorrupt: 589 log.Printf("falling back to full update: corrupt index: %s", args.String()) 590 } 591 } 592 593 log.Printf("updating index %s reason=%s", args.String(), reason) 594 595 metricIndexingTotal.Inc() 596 c := gitIndexConfig{ 597 runCmd: func(cmd *exec.Cmd) error { 598 return s.loggedRun(tr, cmd) 599 }, 600 601 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) { 602 return args.BuildOptions().FindRepositoryMetadata() 603 }, 604 timeout: s.timeout, 605 } 606 607 err = gitIndex(c, args, s.Sourcegraph, s.logger) 608 if err != nil { 609 return indexStateFail, err 610 } 611 612 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil { 613 s.logger.Error("failed to update index status", 614 sglog.String("repo", args.Name), 615 sglog.Uint32("id", args.RepoID), 616 sglogBranches("branches", args.Branches), 617 sglog.Error(err), 618 ) 619 } 620 621 return indexStateSuccess, nil 622} 623 624// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so 625// it can update the zoekt_repos table. 626func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error { 627 // We need to read from disk for IndexTime. 
628 _, metadata, ok, err := c.findRepositoryMetadata(args) 629 if err != nil { 630 return fmt.Errorf("failed to read metadata for new/updated index: %w", err) 631 } 632 if !ok { 633 return errors.New("failed to find metadata for new/updated index") 634 } 635 636 status := []indexStatus{{ 637 RepoID: args.RepoID, 638 Branches: args.Branches, 639 IndexTimeUnix: metadata.IndexTime.Unix(), 640 }} 641 if err := sg.UpdateIndexStatus(status); err != nil { 642 return fmt.Errorf("failed to update sourcegraph with status: %w", err) 643 } 644 645 return nil 646} 647 648func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field { 649 ss := make([]string, len(branches)) 650 for i, b := range branches { 651 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version) 652 } 653 return sglog.Strings(key, ss) 654} 655 656func (s *Server) indexArgs(opts IndexOptions) *indexArgs { 657 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0)) 658 return &indexArgs{ 659 IndexOptions: opts, 660 IndexDir: s.IndexDir, 661 Parallelism: parallelism, 662 Incremental: true, 663 FileLimit: MaxFileSize, 664 ShardMerging: s.shardMerging, 665 } 666} 667 668// parallelism consults both the server flags and index options to determine the number 669// of shards to index in parallel. If the CPUCount index option is provided, it always 670// overrides the server flag. 
671func (s *Server) parallelism(opts IndexOptions, maxProcs int) int { 672 var parallelism int 673 if opts.ShardConcurrency > 0 { 674 parallelism = int(opts.ShardConcurrency) 675 } else { 676 parallelism = s.CPUCount 677 } 678 679 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs 680 if parallelism > 4*maxProcs { 681 parallelism = 4 * maxProcs 682 } 683 684 // If index concurrency is set, then divide the parallelism by the number of 685 // repos we're indexing in parallel 686 if s.IndexConcurrency > 1 { 687 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency))) 688 } 689 690 return parallelism 691} 692 693func createEmptyShard(args *indexArgs) error { 694 bo := args.BuildOptions() 695 bo.SetDefaults() 696 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 697 698 if args.Incremental && bo.IncrementalSkipIndexing() { 699 return nil 700 } 701 702 builder, err := build.NewBuilder(*bo) 703 if err != nil { 704 return err 705 } 706 return builder.Finish() 707} 708 709// addDebugHandlers adds handlers specific to indexserver. 710func (s *Server) addDebugHandlers(mux *http.ServeMux) { 711 // Sourcegraph's site admin view requires indexserver to serve it's admin view 712 // on "/". 
713 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 714 715 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 716 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 717 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 718 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 719 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 720 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 721} 722 723func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 724 if r.Method != "GET" { 725 w.Header().Set("Allow", "GET") 726 w.WriteHeader(http.StatusMethodNotAllowed) 727 return 728 } 729 730 response := struct { 731 Hostname string 732 }{ 733 Hostname: s.hostname, 734 } 735 736 b, err := json.Marshal(response) 737 if err != nil { 738 http.Error(w, err.Error(), http.StatusInternalServerError) 739 return 740 } 741 742 w.Header().Set("Content-Type", "application/json; charset=utf-8") 743 w.Write(b) 744} 745 746var rootTmpl = template.Must(template.New("name").Parse(` 747<html> 748 <body> 749 <a href="debug">Debug</a><br /> 750 <a href="debug/requests">Traces</a><br /> 751 {{.IndexMsg}}<br /> 752 <br /> 753 <h3>Reindex</h3> 754 {{if .Repos}} 755 <a href="?show_repos=false">hide repos</a><br /> 756 <table style="margin-top: 20px"> 757 <th style="text-align:left">Name</th> 758 <th style="text-align:left">ID (click to reindex)</th> 759 {{range .Repos}} 760 <tr> 761 <td>{{.Name}}</td> 762 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 763 </tr> 764 {{end}} 765 </table> 766 {{else}} 767 <a href="?show_repos=true">show repos</a><br /> 768 {{end}} 769 </body> 770</html> 771`)) 772 773func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 774 if r.Method != "GET" { 775 w.Header().Set("Allow", "GET") 776 w.WriteHeader(http.StatusMethodNotAllowed) 777 return 778 } 779 780 values := r.URL.Query() 781 782 // ?id= 783 indexMsg := "" 784 if v := values.Get("id"); v != "" 
{ 785 id, err := strconv.Atoi(v) 786 if err != nil { 787 http.Error(w, err.Error(), http.StatusBadRequest) 788 return 789 } 790 indexMsg, _ = s.forceIndex(uint32(id)) 791 } 792 793 // ?show_repos= 794 showRepos := false 795 if v := values.Get("show_repos"); v != "" { 796 showRepos, _ = strconv.ParseBool(v) 797 } 798 799 type Repo struct { 800 ID uint32 801 Name string 802 } 803 var data struct { 804 Repos []Repo 805 IndexMsg string 806 } 807 808 data.IndexMsg = indexMsg 809 810 if showRepos { 811 s.queue.Iterate(func(opts *IndexOptions) { 812 data.Repos = append(data.Repos, Repo{ 813 ID: opts.RepoID, 814 Name: opts.Name, 815 }) 816 }) 817 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 818 } 819 820 _ = rootTmpl.Execute(w, data) 821} 822 823// handleReindex triggers a reindex asynocronously. If a reindex was triggered 824// the request returns with status 202. The caller can infer the new state of 825// the index by calling List. 826func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 827 if r.Method != http.MethodPost { 828 w.Header().Set("Allow", http.MethodPost) 829 w.WriteHeader(http.StatusMethodNotAllowed) 830 return 831 } 832 833 err := r.ParseForm() 834 if err != nil { 835 http.Error(w, err.Error(), http.StatusBadRequest) 836 return 837 } 838 839 id, err := strconv.Atoi(r.Form.Get("repo")) 840 if err != nil { 841 http.Error(w, err.Error(), http.StatusBadRequest) 842 return 843 } 844 845 go func() { s.forceIndex(uint32(id)) }() 846 847 // 202 Accepted 848 w.WriteHeader(http.StatusAccepted) 849} 850 851func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 852 withIndexed := true 853 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 854 withIndexed = b 855 } 856 857 var indexed []uint32 858 if withIndexed { 859 indexed = listIndexed(s.IndexDir) 860 } 861 862 repos, err := s.Sourcegraph.List(r.Context(), indexed) 863 if err != nil { 864 http.Error(w, 
err.Error(), http.StatusInternalServerError) 865 return 866 } 867 868 bw := bytes.Buffer{} 869 870 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 871 872 _, err = fmt.Fprintf(tw, "ID\tName\n") 873 if err != nil { 874 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 875 return 876 } 877 878 s.queue.mu.Lock() 879 name := "" 880 for _, id := range repos.IDs { 881 if item := s.queue.get(id); item != nil { 882 name = item.opts.Name 883 } else { 884 name = "" 885 } 886 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 887 if err != nil { 888 debug.Printf("handleDebugList: %s\n", err.Error()) 889 } 890 } 891 s.queue.mu.Unlock() 892 893 if err != nil { 894 http.Error(w, err.Error(), http.StatusInternalServerError) 895 return 896 } 897 898 err = tw.Flush() 899 if err != nil { 900 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 901 return 902 } 903 904 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 905 906 if _, err := io.Copy(w, &bw); err != nil { 907 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 908 return 909 } 910} 911 912// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 913// can run this command during periods of low usage (evenings, weekends) to 914// trigger an initial merge run. In the steady-state, merges happen rarely, even 915// on busy instances, and users can rely on automatic merging instead. 916func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 917 // A merge operation can take very long, depending on the number merges and the 918 // target size of the compound shards. We run the merge in the background and 919 // return immediately to the user. 920 // 921 // We track the status of the merge with metricShardMergingRunning. 
922 go func() { 923 s.doMerge() 924 }() 925 _, _ = w.Write([]byte("merging enqueued\n")) 926} 927 928func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 929 indexed := listIndexed(s.IndexDir) 930 931 bw := bytes.Buffer{} 932 933 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 934 935 _, err := fmt.Fprintf(tw, "ID\tName\n") 936 if err != nil { 937 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 938 return 939 } 940 941 s.queue.mu.Lock() 942 name := "" 943 for _, id := range indexed { 944 if item := s.queue.get(id); item != nil { 945 name = item.opts.Name 946 } else { 947 name = "" 948 } 949 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 950 if err != nil { 951 debug.Printf("handleDebugIndexed: %s\n", err.Error()) 952 } 953 } 954 s.queue.mu.Unlock() 955 956 if err != nil { 957 http.Error(w, err.Error(), http.StatusInternalServerError) 958 return 959 } 960 961 err = tw.Flush() 962 if err != nil { 963 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 964 return 965 } 966 967 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 968 969 if _, err := io.Copy(w, &bw); err != nil { 970 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 971 return 972 } 973} 974 975// forceIndex will run the index job for repo name now. It will return always 976// return a string explaining what it did, even if it failed. 
977func (s *Server) forceIndex(id uint32) (string, error) { 978 var opts IndexOptions 979 var err error 980 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 981 opts = o 982 }, func(_ uint32, e error) { 983 err = e 984 }, id) 985 if err != nil { 986 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 987 } 988 989 args := s.indexArgs(opts) 990 args.Incremental = false // force re-index 991 992 var state indexState 993 ran := s.muIndexDir.With(opts.Name, func() { 994 state, err = s.Index(args) 995 }) 996 if !ran { 997 return fmt.Sprintf("index job for repository already running: %s", args), nil 998 } 999 if err != nil { 1000 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 1001 } 1002 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 1003} 1004 1005func listIndexed(indexDir string) []uint32 { 1006 index := getShards(indexDir) 1007 metricNumIndexed.Set(float64(len(index))) 1008 repoIDs := make([]uint32, 0, len(index)) 1009 for id := range index { 1010 repoIDs = append(repoIDs, id) 1011 } 1012 sort.Slice(repoIDs, func(i, j int) bool { 1013 return repoIDs[i] < repoIDs[j] 1014 }) 1015 return repoIDs 1016} 1017 1018// setupTmpDir sets up a temporary directory on the same volume as the 1019// indexes. 1020// 1021// If main is true we will delete older temp directories left around. main is 1022// false when this is a debug command. 1023func setupTmpDir(logger sglog.Logger, main bool, index string) error { 1024 // change the target tmp directory depending on if it's our main daemon or a 1025 // debug sub command. 
1026 dir := ".indexserver.debug.tmp" 1027 if main { 1028 dir = ".indexserver.tmp" 1029 } 1030 1031 tmpRoot := filepath.Join(index, dir) 1032 1033 if main { 1034 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot)) 1035 err := os.RemoveAll(tmpRoot) 1036 if err != nil { 1037 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err)) 1038 } 1039 } 1040 1041 if err := os.MkdirAll(tmpRoot, 0o755); err != nil { 1042 return err 1043 } 1044 1045 return os.Setenv("TMPDIR", tmpRoot) 1046} 1047 1048func printMetaData(fn string) error { 1049 repo, indexMeta, err := zoekt.ReadMetadataPath(fn) 1050 if err != nil { 1051 return err 1052 } 1053 1054 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 1055 if err != nil { 1056 return err 1057 } 1058 1059 err = json.NewEncoder(os.Stdout).Encode(repo) 1060 if err != nil { 1061 return err 1062 } 1063 return nil 1064} 1065 1066func printShardStats(fn string) error { 1067 f, err := os.Open(fn) 1068 if err != nil { 1069 return err 1070 } 1071 1072 iFile, err := zoekt.NewIndexFile(f) 1073 if err != nil { 1074 return err 1075 } 1076 1077 return zoekt.PrintNgramStats(iFile) 1078} 1079 1080func srcLogLevelIsDebug() bool { 1081 lvl := os.Getenv(sglog.EnvLogLevel) 1082 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1083} 1084 1085func getEnvWithDefaultBool(k string, defaultVal bool) bool { 1086 v := os.Getenv(k) 1087 if v == "" { 1088 return defaultVal 1089 } 1090 b, err := strconv.ParseBool(v) 1091 if err != nil { 1092 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1093 } 1094 return b 1095} 1096 1097func getEnvWithDefaultInt64(k string, defaultVal int64) int64 { 1098 v := os.Getenv(k) 1099 if v == "" { 1100 return defaultVal 1101 } 1102 i, err := strconv.ParseInt(v, 10, 64) 1103 if err != nil { 1104 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1105 } 1106 return i 1107} 1108 1109func getEnvWithDefaultInt(k string, defaultVal int) int { 1110 v := 
os.Getenv(k) 1111 if v == "" { 1112 return defaultVal 1113 } 1114 i, err := strconv.Atoi(k) 1115 if err != nil { 1116 log.Fatalf("error parsing ENV %s to int: %s", k, err) 1117 } 1118 return i 1119} 1120 1121func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 { 1122 v := os.Getenv(k) 1123 if v == "" { 1124 return defaultVal 1125 } 1126 i, err := strconv.ParseUint(v, 10, 64) 1127 if err != nil { 1128 log.Fatalf("error parsing ENV %s to uint64: %s", k, err) 1129 } 1130 return i 1131} 1132 1133func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1134 v := os.Getenv(k) 1135 if v == "" { 1136 return defaultVal 1137 } 1138 f, err := strconv.ParseFloat(v, 64) 1139 if err != nil { 1140 log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1141 } 1142 return f 1143} 1144 1145func getEnvWithDefaultString(k string, defaultVal string) string { 1146 v := os.Getenv(k) 1147 if v == "" { 1148 return defaultVal 1149 } 1150 return v 1151} 1152 1153func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration { 1154 v := os.Getenv(k) 1155 if v == "" { 1156 return defaultVal 1157 } 1158 1159 d, err := time.ParseDuration(v) 1160 if err != nil { 1161 log.Fatalf("error parsing ENV %s to duration: %s", k, err) 1162 } 1163 return d 1164} 1165 1166func getEnvWithDefaultEmptySet(k string) map[string]struct{} { 1167 set := map[string]struct{}{} 1168 for _, v := range strings.Split(os.Getenv(k), ",") { 1169 v = strings.TrimSpace(v) 1170 if v != "" { 1171 set[v] = struct{}{} 1172 } 1173 } 1174 return set 1175} 1176 1177func joinStringSet(set map[string]struct{}, sep string) string { 1178 var xs []string 1179 for x := range set { 1180 xs = append(xs, x) 1181 } 1182 1183 return strings.Join(xs, sep) 1184} 1185 1186func setCompoundShardCounter(indexDir string) { 1187 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt")) 1188 if err != nil { 1189 log.Printf("setCompoundShardCounter: %s\n", err) 1190 return 1191 } 1192 
metricNumberCompoundShards.Set(float64(len(fns))) 1193} 1194 1195func rootCmd() *ffcli.Command { 1196 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1197 conf := rootConfig{ 1198 Main: true, 1199 } 1200 conf.registerRootFlags(rootFs) 1201 1202 return &ffcli.Command{ 1203 FlagSet: rootFs, 1204 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1205 Subcommands: []*ffcli.Command{debugCmd()}, 1206 Exec: func(ctx context.Context, args []string) error { 1207 return startServer(conf) 1208 }, 1209 } 1210} 1211 1212type rootConfig struct { 1213 // Main is true if this rootConfig is for our main long running command (the 1214 // indexserver). Debug commands should not set this value. This is used to 1215 // determine if we need to run tmpfriend. 1216 Main bool 1217 1218 root string 1219 interval time.Duration 1220 index string 1221 indexConcurrency int64 1222 listen string 1223 hostname string 1224 cpuFraction float64 1225 blockProfileRate int 1226 1227 // config values related to shard merging 1228 disableShardMerging bool 1229 vacuumInterval time.Duration 1230 mergeInterval time.Duration 1231 targetSize int64 1232 minSize int64 1233 minAgeDays int 1234 1235 // config values related to backoff indexing repos with one or more consecutive failures 1236 backoffDuration time.Duration 1237 maxBackoffDuration time.Duration 1238} 1239 1240func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1241 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. 
If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1242 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1243 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently") 1244 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use") 1245 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") 1246 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") 1247 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1248 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate") 1249 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.") 1250 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. 
A negative value disables indexing backoff.") 1251 1252 // flags related to shard merging 1253 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging") 1254 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1255 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1256 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB") 1257 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB") 1258 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.") 1259} 1260 1261func startServer(conf rootConfig) error { 1262 s, err := newServer(conf) 1263 if err != nil { 1264 return err 1265 } 1266 1267 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate) 1268 setCompoundShardCounter(s.IndexDir) 1269 1270 if conf.listen != "" { 1271 1272 mux := http.NewServeMux() 1273 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1274 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1275 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1276 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"}, 1277 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1278 
}...) 1279 s.addDebugHandlers(mux) 1280 1281 go func() { 1282 debug.Printf("serving HTTP on %s", conf.listen) 1283 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1284 }() 1285 1286 // Serve mux on a unix domain socket on a best-effort-basis so that 1287 // webserver can call the endpoints via the shared filesystem. 1288 // 1289 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1290 // on the socket due to permission errors. See 1291 // https://github.com/docker/for-mac/issues/6239 1292 go func() { 1293 serveHTTPOverSocket := func() error { 1294 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1295 // We cannot bind a socket to an existing pathname. 1296 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1297 return fmt.Errorf("error removing socket file: %s", socket) 1298 } 1299 // The "unix" network corresponds to stream sockets. (cf. unixgram, 1300 // unixpacket). 1301 l, err := net.Listen("unix", socket) 1302 if err != nil { 1303 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1304 } 1305 // Indexserver (root) and webserver (Sourcegraph) run with 1306 // different users. Per default, the socket is created with 1307 // permission 755 (root root), which doesn't let webserver write to 1308 // it. 1309 // 1310 // See https://github.com/golang/go/issues/11822 for more context. 
1311 if err := os.Chmod(socket, 0o777); err != nil { 1312 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1313 } 1314 debug.Printf("serving HTTP on %s", socket) 1315 return http.Serve(l, mux) 1316 } 1317 debug.Print(serveHTTPOverSocket()) 1318 }() 1319 } 1320 1321 oc := &ownerChecker{ 1322 Path: filepath.Join(conf.index, "owner.txt"), 1323 Hostname: conf.hostname, 1324 } 1325 go oc.Run() 1326 1327 logger := sglog.Scoped("metricsRegistration") 1328 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1329 1330 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1331 prometheus.DefaultRegisterer.MustRegister(c) 1332 1333 s.Run() 1334 return nil 1335} 1336 1337func newServer(conf rootConfig) (*Server, error) { 1338 logger := sglog.Scoped("server") 1339 1340 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1341 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1342 } 1343 if conf.index == "" { 1344 return nil, fmt.Errorf("must set -index") 1345 } 1346 if conf.root == "" { 1347 return nil, fmt.Errorf("must set -sourcegraph_url") 1348 } 1349 rootURL, err := url.Parse(conf.root) 1350 if err != nil { 1351 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1352 } 1353 1354 rootURL = addDefaultPort(rootURL) 1355 1356 // Tune GOMAXPROCS to match Linux container CPU quota. 1357 _, _ = maxprocs.Set() 1358 1359 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler. 1360 // The block profiler is disabled by default and should be enabled with care in production 1361 runtime.SetBlockProfileRate(conf.blockProfileRate) 1362 1363 // Automatically prepend our own path at the front, to minimize 1364 // required configuration. 
1365 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1366 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1367 } 1368 1369 if _, err := os.Stat(conf.index); err != nil { 1370 if err := os.MkdirAll(conf.index, 0o755); err != nil { 1371 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1372 } 1373 } 1374 1375 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil { 1376 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1377 } 1378 1379 if srcLogLevelIsDebug() { 1380 debug = log.New(os.Stderr, "", log.LstdFlags) 1381 } 1382 1383 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1384 if len(reposWithSeparateIndexingMetrics) > 0 { 1385 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1386 } 1387 1388 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1389 if len(deltaBuildRepositoriesAllowList) > 0 { 1390 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1391 } 1392 1393 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1394 if deltaShardNumberFallbackThreshold > 0 { 1395 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1396 } else { 1397 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1398 } 1399 1400 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1401 if len(reposShouldSkipSymbolsCalculation) > 0 { 1402 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1403 } 1404 1405 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout) 1406 if indexingTimeout != 
defaultIndexingTimeout { 1407 debug.Printf("using configured indexing timeout: %s", indexingTimeout) 1408 } 1409 1410 var sg Sourcegraph 1411 if rootURL.IsAbs() { 1412 var batchSize int 1413 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { 1414 batchSize, err = strconv.Atoi(v) 1415 if err != nil { 1416 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1417 } 1418 } 1419 1420 opts := []SourcegraphClientOption{ 1421 WithBatchSize(batchSize), 1422 } 1423 1424 logger := sglog.Scoped("zoektConfigurationGRPCClient") 1425 client, err := dialGRPCClient(rootURL.Host, logger) 1426 if err != nil { 1427 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1428 } 1429 1430 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...) 1431 1432 } else { 1433 sg = sourcegraphFake{ 1434 RootDir: rootURL.String(), 1435 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1436 } 1437 } 1438 1439 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction))) 1440 if cpuCount < 1 { 1441 cpuCount = 1 1442 } 1443 1444 if conf.indexConcurrency < 1 { 1445 conf.indexConcurrency = 1 1446 } else if conf.indexConcurrency > int64(cpuCount) { 1447 conf.indexConcurrency = int64(cpuCount) 1448 } 1449 1450 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1451 1452 return &Server{ 1453 logger: logger, 1454 Sourcegraph: sg, 1455 IndexDir: conf.index, 1456 IndexConcurrency: int(conf.indexConcurrency), 1457 Interval: conf.interval, 1458 CPUCount: cpuCount, 1459 queue: *q, 1460 shardMerging: !conf.disableShardMerging, 1461 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1462 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1463 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1464 hostname: conf.hostname, 1465 mergeOpts: mergeOpts{ 1466 vacuumInterval: conf.vacuumInterval, 1467 mergeInterval: conf.mergeInterval, 
1468 targetSizeBytes: conf.targetSize * 1024 * 1024, 1469 minSizeBytes: conf.minSize * 1024 * 1024, 1470 minAgeDays: conf.minAgeDays, 1471 }, 1472 timeout: indexingTimeout, 1473 }, err 1474} 1475 1476// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration 1477// for the indexed-search-configuration gRPC service. 1478// 1479// The default backoff strategy is modeled after the default settings used by 1480// retryablehttp.DefaultClient. 1481// 1482// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1483// - Unavailable 1484// - Aborted 1485// 1486//go:embed default_grpc_service_configuration.json 1487var defaultGRPCServiceConfigurationJSON string 1488 1489func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1490 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1491 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1492 return invoker(ctx, method, req, reply, cc, opts...) 1493 } 1494} 1495 1496func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1497 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1498 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1499 return streamer(ctx, desc, cc, method, opts...) 1500 } 1501} 1502 1503// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPCs servers and clients are allowed to process. 1504// This can be overridden by providing custom Server/Dial options. 
1505const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB 1506 1507func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) { 1508 metrics := mustGetClientMetrics() 1509 1510 // If the service seems to be unavailable, this 1511 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1 1512 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s]) 1513 retryOpts := []retry.CallOption{ 1514 retry.WithMax(5), 1515 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)), 1516 retry.WithCodes(codes.Unavailable), 1517 } 1518 1519 opts := []grpc.DialOption{ 1520 grpc.WithTransportCredentials(insecure.NewCredentials()), 1521 grpc.WithChainStreamInterceptor( 1522 metrics.StreamClientInterceptor(), 1523 messagesize.StreamClientInterceptor, 1524 internalActorStreamInterceptor(), 1525 internalerrs.LoggingStreamClientInterceptor(logger), 1526 internalerrs.PrometheusStreamClientInterceptor, 1527 retry.StreamClientInterceptor(retryOpts...), 1528 ), 1529 grpc.WithChainUnaryInterceptor( 1530 metrics.UnaryClientInterceptor(), 1531 messagesize.UnaryClientInterceptor, 1532 internalActorUnaryInterceptor(), 1533 internalerrs.LoggingUnaryClientInterceptor(logger), 1534 internalerrs.PrometheusUnaryClientInterceptor, 1535 retry.UnaryClientInterceptor(retryOpts...), 1536 ), 1537 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON), 1538 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)), 1539 } 1540 1541 opts = append(opts, additionalOpts...) 1542 1543 // Ensure that the message size options are set last, so they override any other 1544 // client-specific options that tweak the message size. 1545 // 1546 // The message size options are only provided if the environment variable is set. 
These options serve as an escape hatch, so they 1547 // take precedence over everything else with a uniform size setting that's easy to reason about. 1548 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...) 1549 1550 // This dialer is used to connect via gRPC to the Sourcegraph instance. 1551 // This is done lazily, so we can provide the client to use regardless of 1552 // whether we enabled gRPC or not initially. 1553 cc, err := grpc.Dial(addr, opts...) 1554 if err != nil { 1555 return nil, fmt.Errorf("dialing %q: %w", addr, err) 1556 } 1557 1558 client := proto.NewZoektConfigurationServiceClient(cc) 1559 return client, nil 1560} 1561 1562// mustGetClientMetrics returns a singleton instance of the client metrics 1563// that are shared across all gRPC clients that this process creates. 1564// 1565// This function panics if the metrics cannot be registered with the default 1566// Prometheus registry. 1567func mustGetClientMetrics() *grpcprom.ClientMetrics { 1568 clientMetricsOnce.Do(func() { 1569 clientMetrics = grpcprom.NewClientMetrics( 1570 grpcprom.WithClientCounterOptions(), 1571 grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request 1572 grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC 1573 grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC 1574 ) 1575 1576 prometheus.DefaultRegisterer.MustRegister(clientMetrics) 1577 }) 1578 1579 return clientMetrics 1580} 1581 1582// addDefaultPort adds a default port to a URL if one is not specified. 1583// 1584// If the URL scheme is "http" and no port is specified, "80" is used. 1585// If the scheme is "https", "443" is used. 1586// 1587// The original URL is not mutated. A copy is modified and returned. 
1588func addDefaultPort(original *url.URL) *url.URL { 1589 if original == nil { 1590 return nil // don't panic 1591 } 1592 1593 if !original.IsAbs() { 1594 return original // don't do anything if the URL doesn't look like a remote URL 1595 } 1596 1597 if original.Scheme == "http" && original.Port() == "" { 1598 u := cloneURL(original) 1599 u.Host = net.JoinHostPort(u.Host, "80") 1600 return u 1601 } 1602 1603 if original.Scheme == "https" && original.Port() == "" { 1604 u := cloneURL(original) 1605 u.Host = net.JoinHostPort(u.Host, "443") 1606 return u 1607 } 1608 1609 return original 1610} 1611 1612// cloneURL returns a copy of the URL. It is safe to mutate the returned URL. 1613// This is copied from net/http/clone.go 1614func cloneURL(u *url.URL) *url.URL { 1615 if u == nil { 1616 return nil 1617 } 1618 u2 := new(url.URL) 1619 *u2 = *u 1620 if u.User != nil { 1621 u2.User = new(url.Userinfo) 1622 *u2.User = *u.User 1623 } 1624 return u2 1625} 1626 1627func main() { 1628 liblog := sglog.Init(sglog.Resource{ 1629 Name: "zoekt-indexserver", 1630 Version: zoekt.Version, 1631 InstanceID: zoekt.HostnameBestEffort(), 1632 }) 1633 defer liblog.Sync() 1634 1635 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1636 log.Fatal(err) 1637 } 1638} 1639 1640// getBoolFromEnvironmentVariables returns the boolean defined by the first environment 1641// variable listed in envVarNames that is set in the current process environment, or the defaultBool if none are set. 1642// 1643// An error is returned of the provided environment variables fails to parse as a boolean. 
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	for _, envVar := range envVarNames {
		v := os.Getenv(envVar)
		if v == "" {
			// Unset and empty variables are treated the same: try the next name.
			continue
		}

		b, err := strconv.ParseBool(v)
		if err != nil {
			// Wrap with %w so callers can inspect the underlying parse error.
			return false, fmt.Errorf("parsing environment variable %q to boolean: %w", envVar, err)
		}

		return b, nil
	}

	return defaultBool, nil
}