fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Command zoekt-sourcegraph-indexserver periodically reindexes enabled 2// repositories on sourcegraph 3package main 4 5import ( 6 "bytes" 7 "context" 8 _ "embed" 9 "encoding/json" 10 "errors" 11 "flag" 12 "fmt" 13 "html/template" 14 "io" 15 "io/fs" 16 "log" 17 "math" 18 "math/rand" 19 "net" 20 "net/http" 21 "net/url" 22 "os" 23 "os/exec" 24 "os/signal" 25 "path/filepath" 26 "runtime" 27 "sort" 28 "strconv" 29 "strings" 30 "sync" 31 "text/tabwriter" 32 "time" 33 34 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" 35 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry" 36 "github.com/peterbourgon/ff/v3/ffcli" 37 "github.com/prometheus/client_golang/prometheus" 38 "github.com/prometheus/client_golang/prometheus/promauto" 39 sglog "github.com/sourcegraph/log" 40 "github.com/sourcegraph/mountinfo" 41 "go.uber.org/automaxprocs/maxprocs" 42 "golang.org/x/net/trace" 43 "golang.org/x/sys/unix" 44 "google.golang.org/grpc" 45 "google.golang.org/grpc/codes" 46 "google.golang.org/grpc/credentials/insecure" 47 "google.golang.org/grpc/metadata" 48 49 "github.com/sourcegraph/zoekt" 50 "github.com/sourcegraph/zoekt/build" 51 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1" 52 "github.com/sourcegraph/zoekt/debugserver" 53 "github.com/sourcegraph/zoekt/grpc/internalerrs" 54 "github.com/sourcegraph/zoekt/grpc/messagesize" 55 "github.com/sourcegraph/zoekt/internal/profiler" 56) 57 58var ( 59 metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{ 60 Name: "resolve_revisions_seconds", 61 Help: "A histogram of latencies for resolving all repository revisions.", 62 Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 27min 63 }) 64 65 metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 66 Name: "resolve_revision_seconds", 67 Help: "A histogram of latencies for resolving a repository revision.", 68 Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s 69 }, []string{"success"}) // success=true|false 70 71 metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{ 72 Name: "get_index_options_total", 73 Help: "The total number of times we tried to get index options for a repository. Includes errors.", 74 }) 75 metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{ 76 Name: "get_index_options_error_total", 77 Help: "The total number of times we failed to get index options for a repository.", 78 }) 79 80 metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 81 Name: "index_repo_seconds", 82 Help: "A histogram of latencies for indexing a repository.", 83 Buckets: prometheus.ExponentialBucketsRange( 84 (100 * time.Millisecond).Seconds(), 85 (40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo 86 20), 87 }, []string{ 88 "state", // state is an indexState 89 "name", // name of the repository that was indexed 90 }) 91 92 metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{ 93 Name: "index_indexing_delay_seconds", 94 Help: "A histogram of durations from when an index job is added to the queue, to the time it completes.", 95 Buckets: prometheus.ExponentialBuckets(60, 2, 12), // 1m -> ~3 days 96 }, []string{ 97 "state", // state is an indexState 98 "name", // the name of the repository that was indexed 99 }) 100 101 metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 102 Name: "index_fetch_seconds", 103 Help: "A histogram of latencies for fetching a repository.", 104 Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes 105 }, []string{ 106 "success", // true|false 107 "name", // the name of the repository that the commits were fetched from 108 }) 109 110 metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{ 111 Name: "index_incremental_index_state", 112 Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.", 113 }, []string{"state"}) // state is build.IndexState 114 115 metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{ 116 Name: "index_num_indexed", 117 Help: "Number of indexed repos by code host", 118 }) 119 120 metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{ 121 Name: "index_failing_total", 122 Help: "Counts failures to index (indexing activity, should be used with rate())", 123 }) 124 125 metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{ 126 Name: "index_indexing_total", 127 Help: "Counts indexings (indexing activity, should be used with rate())", 128 }) 129 130 metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{ 131 Name: "index_num_stopped_tracking_total", 132 Help: "Counts the number of repos we stopped tracking.", 133 }) 134 135 // clientMetricsOnce returns a singleton instance of the client metrics 136 // that are shared across all gRPC clients that this process creates. 137 // 138 // This function panics if the metrics cannot be registered with the default 139 // Prometheus registry. 140 clientMetricsOnce = sync.OnceValue(func() *grpcprom.ClientMetrics { 141 clientMetrics := grpcprom.NewClientMetrics( 142 grpcprom.WithClientCounterOptions(), 143 grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request 144 grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC 145 grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC 146 ) 147 prometheus.DefaultRegisterer.MustRegister(clientMetrics) 148 return clientMetrics 149 }) 150) 151 152// 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22 153// NOTE: if you change this, you must also update gitIndex to use the same value when fetching the repo. 154const MaxFileSize = 1 << 20 155 156// set of repositories that we want to capture separate indexing metrics for 157var reposWithSeparateIndexingMetrics = make(map[string]struct{}) 158 159type indexState string 160 161const ( 162 indexStateFail indexState = "fail" 163 indexStateSuccess indexState = "success" 164 indexStateSuccessMeta indexState = "success_meta" // We only updated metadata 165 indexStateNoop indexState = "noop" // We didn't need to update index 166 indexStateEmpty indexState = "empty" // index is empty (empty repo) 167) 168 169// Server is the main functionality of zoekt-sourcegraph-indexserver. It 170// exists to conveniently use all the options passed in via func main. 171type Server struct { 172 logger sglog.Logger 173 174 Sourcegraph Sourcegraph 175 BatchSize int 176 177 // IndexDir is the index directory to use. 178 IndexDir string 179 180 // IndexConcurrency is the number of repositories we index at once. 181 IndexConcurrency int 182 183 // Interval is how often we sync with Sourcegraph. 184 Interval time.Duration 185 186 // CPUCount is the number of CPUs to use for indexing shards. 187 CPUCount int 188 189 queue Queue 190 191 // muIndexDir protects the index directory from concurrent access. 192 muIndexDir indexMutex 193 194 // If true, shard merging is enabled. 195 shardMerging bool 196 197 // deltaBuildRepositoriesAllowList is an allowlist for repositories that we 198 // use delta-builds for instead of normal builds 199 deltaBuildRepositoriesAllowList map[string]struct{} 200 201 // deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist 202 // before attempting a delta build. 203 deltaShardNumberFallbackThreshold uint64 204 205 // repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that 206 // we skip calculating symbols metadata for during builds 207 repositoriesSkipSymbolsCalculationAllowList map[string]struct{} 208 209 hostname string 210 211 mergeOpts mergeOpts 212 213 // timeout defines how long the index server waits before killing an indexing job. 214 timeout time.Duration 215} 216 217var debug = log.New(io.Discard, "", log.LstdFlags) 218 219// our index commands should output something every 100mb they process. 220// 221// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough 222// time." famous last words. A client was indexing a monorepo with 42 223// cores... 5m was not enough. 224const noOutputTimeout = 30 * time.Minute 225 226func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 227 out := &synchronizedBuffer{} 228 cmd.Stdout = out 229 cmd.Stderr = out 230 231 tr.LazyPrintf("%s", cmd.Args) 232 233 defer func() { 234 if err != nil { 235 outS := out.String() 236 tr.LazyPrintf("failed: %v", err) 237 tr.LazyPrintf("output: %s", out) 238 tr.SetError() 239 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 240 } 241 }() 242 243 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 244 245 if err := cmd.Start(); err != nil { 246 return err 247 } 248 249 errC := make(chan error) 250 go func() { 251 errC <- cmd.Wait() 252 }() 253 254 // This channel is set after we have sent sigquit. It allows us to follow up 255 // with a sigkill if the process doesn't quit after sigquit. 256 kill := make(<-chan time.Time) 257 258 lastLen := 0 259 for { 260 select { 261 case <-time.After(noOutputTimeout): 262 // Periodically check if we have had output. If not kill the process. 263 if out.Len() != lastLen { 264 lastLen = out.Len() 265 log.Printf("still running %s", cmd.Args) 266 } else { 267 // Send quit (C-\) first so we get a stack dump. 268 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 269 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 270 log.Println("quit failed:", err) 271 } 272 273 // send sigkill if still running in 10s 274 kill = time.After(10 * time.Second) 275 } 276 277 case <-kill: 278 log.Printf("still running, killing %s", cmd.Args) 279 if err := cmd.Process.Kill(); err != nil { 280 log.Println("kill failed:", err) 281 } 282 283 case err := <-errC: 284 if err != nil { 285 return err 286 } 287 288 tr.LazyPrintf("success") 289 return nil 290 } 291 } 292} 293 294// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 295// monitor the buffer while it is being written to. 296type synchronizedBuffer struct { 297 mu sync.Mutex 298 b bytes.Buffer 299} 300 301func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 302 sb.mu.Lock() 303 defer sb.mu.Unlock() 304 return sb.b.Write(p) 305} 306 307func (sb *synchronizedBuffer) Len() int { 308 sb.mu.Lock() 309 defer sb.mu.Unlock() 310 return sb.b.Len() 311} 312 313func (sb *synchronizedBuffer) String() string { 314 sb.mu.Lock() 315 defer sb.mu.Unlock() 316 return sb.b.String() 317} 318 319// pauseFileName if present in IndexDir will stop index jobs from 320// running. This is to make it possible to experiment with the content of the 321// IndexDir without the indexserver writing to it. 322const pauseFileName = "PAUSE" 323 324// Run the sync loop. This blocks forever. 325func (s *Server) Run() { 326 removeIncompleteShards(s.IndexDir) 327 328 // Start a goroutine which updates the queue with commits to index. 329 go func() { 330 // We update the list of indexed repos every Interval. To speed up manual 331 // testing we also listen for SIGUSR1 to trigger updates. 332 // 333 // "pkill -SIGUSR1 zoekt-sourcegra" 334 for range jitterTicker(s.Interval, unix.SIGUSR1) { 335 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 336 log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 337 continue 338 } 339 340 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 341 if err != nil { 342 log.Printf("error listing repos: %s", err) 343 continue 344 } 345 346 debug.Printf("updating index queue with %d repositories", len(repos.IDs)) 347 348 // Stop indexing repos we don't need to track anymore 349 removed := s.queue.MaybeRemoveMissing(repos.IDs) 350 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 351 if len(removed) > 0 { 352 log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 353 } 354 355 cleanupDone := make(chan struct{}) 356 go func() { 357 defer close(cleanupDone) 358 s.muIndexDir.Global(func() { 359 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 360 }) 361 }() 362 363 repos.IterateIndexOptions(s.queue.AddOrUpdate) 364 365 // IterateIndexOptions will only iterate over repositories that have 366 // changed since we last called list. However, we want to add all IDs 367 // back onto the queue just to check that what is on disk is still 368 // correct. This will use the last IndexOptions we stored in the 369 // queue. The repositories not on the queue (missing) need a forced 370 // fetch of IndexOptions. 371 missing := s.queue.Bump(repos.IDs) 372 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 373 374 setCompoundShardCounter(s.IndexDir) 375 376 <-cleanupDone 377 } 378 }() 379 380 go func() { 381 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 382 if s.shardMerging { 383 s.vacuum() 384 } 385 } 386 }() 387 388 go func() { 389 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 390 if s.shardMerging { 391 s.doMerge() 392 } 393 } 394 }() 395 396 for i := 0; i < s.IndexConcurrency; i++ { 397 go s.processQueue() 398 } 399 400 // block forever 401 select {} 402} 403 404// formatList returns a comma-separated list of the first min(len(v), m) items. 405func formatListUint32(v []uint32, m int) string { 406 if len(v) < m { 407 m = len(v) 408 } 409 410 sb := strings.Builder{} 411 for i := 0; i < m; i++ { 412 fmt.Fprintf(&sb, "%d, ", v[i]) 413 } 414 415 if len(v) > m { 416 sb.WriteString("...") 417 } 418 419 return strings.TrimRight(sb.String(), ", ") 420} 421 422func (s *Server) processQueue() { 423 for { 424 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 425 time.Sleep(time.Second) 426 continue 427 } 428 429 item, ok := s.queue.Pop() 430 if !ok { 431 time.Sleep(time.Second) 432 continue 433 } 434 435 opts := item.Opts 436 args := s.indexArgs(opts) 437 438 ran := s.muIndexDir.With(opts.Name, func() { 439 // only record time taken once we hold the lock. This avoids us 440 // recording time taken while merging/cleanup runs. 441 start := time.Now() 442 443 state, err := s.Index(args) 444 445 elapsed := time.Since(start) 446 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 447 448 indexDelay := time.Since(item.DateAddedToQueue) 449 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds()) 450 451 if err != nil { 452 log.Printf("error indexing %s: %s", args.String(), err) 453 } 454 455 switch state { 456 case indexStateSuccess: 457 var branches []string 458 for _, b := range args.Branches { 459 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 460 } 461 s.logger.Info("updated index", 462 sglog.String("repo", args.Name), 463 sglog.Uint32("id", args.RepoID), 464 sglog.Strings("branches", branches), 465 sglog.Duration("duration", elapsed), 466 sglog.Duration("index_delay", indexDelay), 467 ) 468 case indexStateSuccessMeta: 469 log.Printf("updated meta %s in %v", args.String(), elapsed) 470 } 471 s.queue.SetIndexed(opts, state) 472 }) 473 474 if !ran { 475 // Someone else is processing the repository. We can just skip this job 476 // since the repository will be added back to the queue and we will 477 // converge to the correct behaviour. 478 debug.Printf("index job for repository already running: %s", args) 479 continue 480 } 481 } 482} 483 484// repoNameForMetric returns a normalized version of the given repository name that is 485// suitable for use with Prometheus metrics. 486func repoNameForMetric(repo string) string { 487 // Check to see if we want to be able to capture separate indexing metrics for this repository. 488 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 489 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 490 return repo 491 } 492 493 return "" 494} 495 496func batched(slice []uint32, size int) <-chan []uint32 { 497 c := make(chan []uint32) 498 go func() { 499 for len(slice) > 0 { 500 if size > len(slice) { 501 size = len(slice) 502 } 503 c <- slice[:size] 504 slice = slice[size:] 505 } 506 close(c) 507 }() 508 return c 509} 510 511// jitterTicker returns a ticker which ticks with a jitter. Each tick is 512// uniformly selected from the range (d/2, d + d/2). It will tick on creation. 513// 514// sig is a list of signals which also cause the ticker to fire. This is a 515// convenience to allow manually triggering of the ticker. 516func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} { 517 ticker := make(chan struct{}) 518 519 go func() { 520 for { 521 ticker <- struct{}{} 522 ns := int64(d) 523 jitter := rand.Int63n(ns) 524 time.Sleep(time.Duration(ns/2 + jitter)) 525 } 526 }() 527 528 go func() { 529 if len(sig) == 0 { 530 return 531 } 532 533 c := make(chan os.Signal, 1) 534 signal.Notify(c, sig...) 535 for range c { 536 ticker <- struct{}{} 537 } 538 }() 539 540 return ticker 541} 542 543// Index starts an index job for repo name at commit. 544func (s *Server) Index(args *indexArgs) (state indexState, err error) { 545 tr := trace.New("index", args.Name) 546 tr.SetMaxEvents(30) // Ensure we capture all indexing events 547 548 defer func() { 549 if err != nil { 550 tr.SetError() 551 tr.LazyPrintf("error: %v", err) 552 state = indexStateFail 553 metricFailingTotal.Inc() 554 } 555 tr.LazyPrintf("state: %s", state) 556 tr.Finish() 557 }() 558 559 tr.LazyPrintf("branches: %v", args.Branches) 560 561 if len(args.Branches) == 0 { 562 return indexStateEmpty, createEmptyShard(args) 563 } 564 565 repositoryName := args.Name 566 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok { 567 tr.LazyPrintf("marking this repository for delta build") 568 args.UseDelta = true 569 } 570 571 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold 572 573 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok { 574 tr.LazyPrintf("skipping symbols calculation") 575 args.Symbols = false 576 } 577 578 reason := "forced" 579 580 if args.Incremental { 581 bo := args.BuildOptions() 582 bo.SetDefaults() 583 incrementalState, fn := bo.IndexState() 584 reason = string(incrementalState) 585 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc() 586 587 switch incrementalState { 588 case build.IndexStateEqual: 589 debug.Printf("%s index already up to date. Shard=%s", args.String(), fn) 590 return indexStateNoop, nil 591 592 case build.IndexStateMeta: 593 log.Printf("updating index.meta %s", args.String()) 594 595 if err := mergeMeta(bo); err != nil { 596 log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err) 597 } else { 598 return indexStateSuccessMeta, nil 599 } 600 601 case build.IndexStateCorrupt: 602 log.Printf("falling back to full update: corrupt index: %s", args.String()) 603 } 604 } 605 606 log.Printf("updating index %s reason=%s", args.String(), reason) 607 608 metricIndexingTotal.Inc() 609 c := gitIndexConfig{ 610 runCmd: func(cmd *exec.Cmd) error { 611 return s.loggedRun(tr, cmd) 612 }, 613 614 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) { 615 return args.BuildOptions().FindRepositoryMetadata() 616 }, 617 timeout: s.timeout, 618 } 619 620 err = gitIndex(c, args, s.Sourcegraph, s.logger) 621 if err != nil { 622 return indexStateFail, err 623 } 624 625 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil { 626 s.logger.Error("failed to update index status", 627 sglog.String("repo", args.Name), 628 sglog.Uint32("id", args.RepoID), 629 sglogBranches("branches", args.Branches), 630 sglog.Error(err), 631 ) 632 } 633 634 return indexStateSuccess, nil 635} 636 637// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so 638// it can update the zoekt_repos table. 639func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error { 640 // We need to read from disk for IndexTime. 641 _, metadata, ok, err := c.findRepositoryMetadata(args) 642 if err != nil { 643 return fmt.Errorf("failed to read metadata for new/updated index: %w", err) 644 } 645 if !ok { 646 return errors.New("failed to find metadata for new/updated index") 647 } 648 649 status := []indexStatus{{ 650 RepoID: args.RepoID, 651 Branches: args.Branches, 652 IndexTimeUnix: metadata.IndexTime.Unix(), 653 }} 654 if err := sg.UpdateIndexStatus(status); err != nil { 655 return fmt.Errorf("failed to update sourcegraph with status: %w", err) 656 } 657 658 return nil 659} 660 661func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field { 662 ss := make([]string, len(branches)) 663 for i, b := range branches { 664 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version) 665 } 666 return sglog.Strings(key, ss) 667} 668 669func (s *Server) indexArgs(opts IndexOptions) *indexArgs { 670 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0)) 671 return &indexArgs{ 672 IndexOptions: opts, 673 IndexDir: s.IndexDir, 674 Parallelism: parallelism, 675 Incremental: true, 676 FileLimit: MaxFileSize, 677 ShardMerging: s.shardMerging, 678 } 679} 680 681// parallelism consults both the server flags and index options to determine the number 682// of shards to index in parallel. If the CPUCount index option is provided, it always 683// overrides the server flag. 684func (s *Server) parallelism(opts IndexOptions, maxProcs int) int { 685 var parallelism int 686 if opts.ShardConcurrency > 0 { 687 parallelism = int(opts.ShardConcurrency) 688 } else { 689 parallelism = s.CPUCount 690 } 691 692 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs 693 if parallelism > 4*maxProcs { 694 parallelism = 4 * maxProcs 695 } 696 697 // If index concurrency is set, then divide the parallelism by the number of 698 // repos we're indexing in parallel 699 if s.IndexConcurrency > 1 { 700 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency))) 701 } 702 703 return parallelism 704} 705 706func createEmptyShard(args *indexArgs) error { 707 bo := args.BuildOptions() 708 bo.SetDefaults() 709 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 710 711 if args.Incremental && bo.IncrementalSkipIndexing() { 712 return nil 713 } 714 715 builder, err := build.NewBuilder(*bo) 716 if err != nil { 717 return err 718 } 719 return builder.Finish() 720} 721 722// addDebugHandlers adds handlers specific to indexserver. 723func (s *Server) addDebugHandlers(mux *http.ServeMux) { 724 // Sourcegraph's site admin view requires indexserver to serve it's admin view 725 // on "/". 726 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 727 728 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 729 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 730 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 731 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 732 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 733 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 734} 735 736func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 737 if r.Method != "GET" { 738 w.Header().Set("Allow", "GET") 739 w.WriteHeader(http.StatusMethodNotAllowed) 740 return 741 } 742 743 response := struct { 744 Hostname string 745 }{ 746 Hostname: s.hostname, 747 } 748 749 b, err := json.Marshal(response) 750 if err != nil { 751 http.Error(w, err.Error(), http.StatusInternalServerError) 752 return 753 } 754 755 w.Header().Set("Content-Type", "application/json; charset=utf-8") 756 w.Write(b) 757} 758 759var rootTmpl = template.Must(template.New("name").Parse(` 760<html> 761 <body> 762 <a href="debug">Debug</a><br /> 763 <a href="debug/requests">Traces</a><br /> 764 {{.IndexMsg}}<br /> 765 <br /> 766 <h3>Reindex</h3> 767 {{if .Repos}} 768 <a href="?show_repos=false">hide repos</a><br /> 769 <table style="margin-top: 20px"> 770 <th style="text-align:left">Name</th> 771 <th style="text-align:left">ID (click to reindex)</th> 772 {{range .Repos}} 773 <tr> 774 <td>{{.Name}}</td> 775 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 776 </tr> 777 {{end}} 778 </table> 779 {{else}} 780 <a href="?show_repos=true">show repos</a><br /> 781 {{end}} 782 </body> 783</html> 784`)) 785 786func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 787 if r.Method != "GET" { 788 w.Header().Set("Allow", "GET") 789 w.WriteHeader(http.StatusMethodNotAllowed) 790 return 791 } 792 793 values := r.URL.Query() 794 795 // ?id= 796 indexMsg := "" 797 if v := values.Get("id"); v != "" { 798 id, err := strconv.Atoi(v) 799 if err != nil { 800 http.Error(w, err.Error(), http.StatusBadRequest) 801 return 802 } 803 indexMsg, _ = s.forceIndex(uint32(id)) 804 } 805 806 // ?show_repos= 807 showRepos := false 808 if v := values.Get("show_repos"); v != "" { 809 showRepos, _ = strconv.ParseBool(v) 810 } 811 812 type Repo struct { 813 ID uint32 814 Name string 815 } 816 var data struct { 817 Repos []Repo 818 IndexMsg string 819 } 820 821 data.IndexMsg = indexMsg 822 823 if showRepos { 824 s.queue.Iterate(func(opts *IndexOptions) { 825 data.Repos = append(data.Repos, Repo{ 826 ID: opts.RepoID, 827 Name: opts.Name, 828 }) 829 }) 830 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 831 } 832 833 _ = rootTmpl.Execute(w, data) 834} 835 836// handleReindex triggers a reindex asynocronously. If a reindex was triggered 837// the request returns with status 202. The caller can infer the new state of 838// the index by calling List. 839func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 840 if r.Method != http.MethodPost { 841 w.Header().Set("Allow", http.MethodPost) 842 w.WriteHeader(http.StatusMethodNotAllowed) 843 return 844 } 845 846 err := r.ParseForm() 847 if err != nil { 848 http.Error(w, err.Error(), http.StatusBadRequest) 849 return 850 } 851 852 id, err := strconv.Atoi(r.Form.Get("repo")) 853 if err != nil { 854 http.Error(w, err.Error(), http.StatusBadRequest) 855 return 856 } 857 858 go func() { s.forceIndex(uint32(id)) }() 859 860 // 202 Accepted 861 w.WriteHeader(http.StatusAccepted) 862} 863 864func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 865 withIndexed := true 866 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 867 withIndexed = b 868 } 869 870 var indexed []uint32 871 if withIndexed { 872 indexed = listIndexed(s.IndexDir) 873 } 874 875 repos, err := s.Sourcegraph.List(r.Context(), indexed) 876 if err != nil { 877 http.Error(w, err.Error(), http.StatusInternalServerError) 878 return 879 } 880 881 bw := bytes.Buffer{} 882 883 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 884 885 _, err = fmt.Fprintf(tw, "ID\tName\n") 886 if err != nil { 887 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 888 return 889 } 890 891 s.queue.mu.Lock() 892 name := "" 893 for _, id := range repos.IDs { 894 if item := s.queue.get(id); item != nil { 895 name = item.opts.Name 896 } else { 897 name = "" 898 } 899 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 900 if err != nil { 901 debug.Printf("handleDebugList: %s\n", err.Error()) 902 } 903 } 904 s.queue.mu.Unlock() 905 906 if err != nil { 907 http.Error(w, err.Error(), http.StatusInternalServerError) 908 return 909 } 910 911 err = tw.Flush() 912 if err != nil { 913 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 914 return 915 } 916 917 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 918 919 if _, err := io.Copy(w, &bw); err != nil { 920 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 921 return 922 } 923} 924 925// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 926// can run this command during periods of low usage (evenings, weekends) to 927// trigger an initial merge run. In the steady-state, merges happen rarely, even 928// on busy instances, and users can rely on automatic merging instead. 929func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 930 // A merge operation can take very long, depending on the number merges and the 931 // target size of the compound shards. We run the merge in the background and 932 // return immediately to the user. 933 // 934 // We track the status of the merge with metricShardMergingRunning. 935 go func() { 936 s.doMerge() 937 }() 938 _, _ = w.Write([]byte("merging enqueued\n")) 939} 940 941func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 942 indexed := listIndexed(s.IndexDir) 943 944 bw := bytes.Buffer{} 945 946 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 947 948 _, err := fmt.Fprintf(tw, "ID\tName\n") 949 if err != nil { 950 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 951 return 952 } 953 954 s.queue.mu.Lock() 955 name := "" 956 for _, id := range indexed { 957 if item := s.queue.get(id); item != nil { 958 name = item.opts.Name 959 } else { 960 name = "" 961 } 962 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 963 if err != nil { 964 debug.Printf("handleDebugIndexed: %s\n", err.Error()) 965 } 966 } 967 s.queue.mu.Unlock() 968 969 if err != nil { 970 http.Error(w, err.Error(), http.StatusInternalServerError) 971 return 972 } 973 974 err = tw.Flush() 975 if err != nil { 976 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 977 return 978 } 979 980 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 981 982 if _, err := io.Copy(w, &bw); err != nil { 983 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 984 return 985 } 986} 987 988// forceIndex will run the index job for repo name now. It will return always 989// return a string explaining what it did, even if it failed. 990func (s *Server) forceIndex(id uint32) (string, error) { 991 var opts IndexOptions 992 var err error 993 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 994 opts = o 995 }, func(_ uint32, e error) { 996 err = e 997 }, id) 998 if err != nil { 999 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 1000 } 1001 1002 args := s.indexArgs(opts) 1003 args.Incremental = false // force re-index 1004 1005 var state indexState 1006 ran := s.muIndexDir.With(opts.Name, func() { 1007 state, err = s.Index(args) 1008 }) 1009 if !ran { 1010 return fmt.Sprintf("index job for repository already running: %s", args), nil 1011 } 1012 if err != nil { 1013 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 1014 } 1015 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 1016} 1017 1018func listIndexed(indexDir string) []uint32 { 1019 index := getShards(indexDir) 1020 metricNumIndexed.Set(float64(len(index))) 1021 repoIDs := make([]uint32, 0, len(index)) 1022 for id := range index { 1023 repoIDs = append(repoIDs, id) 1024 } 1025 sort.Slice(repoIDs, func(i, j int) bool { 1026 return repoIDs[i] < repoIDs[j] 1027 }) 1028 return repoIDs 1029} 1030 1031// setupTmpDir sets up a temporary directory on the same volume as the 1032// indexes. 1033// 1034// If main is true we will delete older temp directories left around. main is 1035// false when this is a debug command. 1036func setupTmpDir(logger sglog.Logger, main bool, index string) error { 1037 // change the target tmp directory depending on if it's our main daemon or a 1038 // debug sub command. 1039 dir := ".indexserver.debug.tmp" 1040 if main { 1041 dir = ".indexserver.tmp" 1042 } 1043 1044 tmpRoot := filepath.Join(index, dir) 1045 1046 if main { 1047 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot)) 1048 err := os.RemoveAll(tmpRoot) 1049 if err != nil { 1050 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err)) 1051 } 1052 } 1053 1054 if err := os.MkdirAll(tmpRoot, 0o755); err != nil { 1055 return err 1056 } 1057 1058 return os.Setenv("TMPDIR", tmpRoot) 1059} 1060 1061func printMetaData(fn string) error { 1062 repo, indexMeta, err := zoekt.ReadMetadataPath(fn) 1063 if err != nil { 1064 return err 1065 } 1066 1067 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 1068 if err != nil { 1069 return err 1070 } 1071 1072 err = json.NewEncoder(os.Stdout).Encode(repo) 1073 if err != nil { 1074 return err 1075 } 1076 return nil 1077} 1078 1079func printShardStats(fn string) error { 1080 f, err := os.Open(fn) 1081 if err != nil { 1082 return err 1083 } 1084 1085 iFile, err := zoekt.NewIndexFile(f) 1086 if err != nil { 1087 return err 1088 } 1089 1090 return zoekt.PrintNgramStats(iFile) 1091} 1092 1093func srcLogLevelIsDebug() bool { 1094 lvl := os.Getenv(sglog.EnvLogLevel) 1095 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1096} 1097 1098func getEnvWithDefaultBool(k string, defaultVal bool) bool { 1099 v := os.Getenv(k) 1100 if v == "" { 1101 return defaultVal 1102 } 1103 b, err := strconv.ParseBool(v) 1104 if err != nil { 1105 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1106 } 1107 return b 1108} 1109 1110func getEnvWithDefaultInt64(k string, defaultVal int64) int64 { 1111 v := os.Getenv(k) 1112 if v == "" { 1113 return defaultVal 1114 } 1115 i, err := strconv.ParseInt(v, 10, 64) 1116 if err != nil { 1117 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1118 } 1119 return i 1120} 1121 1122func getEnvWithDefaultInt(k string, defaultVal int) int { 1123 v := os.Getenv(k) 1124 if v == "" { 1125 return defaultVal 1126 } 1127 i, err := strconv.Atoi(k) 1128 if err != nil { 1129 log.Fatalf("error parsing ENV %s to int: %s", k, err) 1130 } 1131 return i 1132} 1133 1134func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 { 1135 v := os.Getenv(k) 1136 if v == "" { 1137 return defaultVal 1138 } 1139 i, err := strconv.ParseUint(v, 10, 64) 1140 if err != nil { 1141 log.Fatalf("error parsing ENV %s to uint64: %s", k, err) 1142 } 1143 return i 1144} 1145 1146func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1147 v := os.Getenv(k) 1148 if v == "" { 1149 return defaultVal 1150 } 1151 f, err := strconv.ParseFloat(v, 64) 1152 if err != nil { 1153 log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1154 } 1155 return f 1156} 1157 1158func getEnvWithDefaultString(k string, defaultVal string) string { 1159 v := os.Getenv(k) 1160 if v == "" { 1161 return defaultVal 1162 } 1163 return v 1164} 1165 1166func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration { 1167 v := os.Getenv(k) 1168 if v == "" { 1169 return defaultVal 1170 } 1171 1172 d, err := time.ParseDuration(v) 1173 if err != nil { 1174 log.Fatalf("error parsing ENV %s to duration: %s", k, err) 1175 } 1176 return d 1177} 1178 1179func getEnvWithDefaultEmptySet(k string) map[string]struct{} { 1180 set := map[string]struct{}{} 1181 for _, v := range strings.Split(os.Getenv(k), ",") { 1182 v = strings.TrimSpace(v) 1183 if v != "" { 1184 set[v] = struct{}{} 1185 } 1186 } 1187 return set 1188} 1189 1190func joinStringSet(set map[string]struct{}, sep string) string { 1191 var xs []string 1192 for x := range set { 1193 xs = append(xs, x) 1194 } 1195 1196 return strings.Join(xs, sep) 1197} 1198 1199func setCompoundShardCounter(indexDir string) { 1200 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt")) 1201 if err != nil { 1202 log.Printf("setCompoundShardCounter: %s\n", err) 1203 return 1204 } 1205 metricNumberCompoundShards.Set(float64(len(fns))) 1206} 1207 1208func rootCmd() *ffcli.Command { 1209 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1210 conf := rootConfig{ 1211 Main: true, 1212 } 1213 conf.registerRootFlags(rootFs) 1214 1215 return &ffcli.Command{ 1216 FlagSet: rootFs, 1217 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1218 Subcommands: []*ffcli.Command{debugCmd()}, 1219 Exec: func(ctx context.Context, args []string) error { 1220 return startServer(conf) 1221 }, 1222 } 1223} 1224 1225type rootConfig struct { 1226 // Main is true if this rootConfig is for our main long running command (the 1227 // indexserver). Debug commands should not set this value. This is used to 1228 // determine if we need to run tmpfriend. 1229 Main bool 1230 1231 root string 1232 interval time.Duration 1233 index string 1234 indexConcurrency int64 1235 listen string 1236 hostname string 1237 cpuFraction float64 1238 blockProfileRate int 1239 1240 // config values related to shard merging 1241 disableShardMerging bool 1242 vacuumInterval time.Duration 1243 mergeInterval time.Duration 1244 targetSize int64 1245 minSize int64 1246 minAgeDays int 1247 1248 // config values related to backoff indexing repos with one or more consecutive failures 1249 backoffDuration time.Duration 1250 maxBackoffDuration time.Duration 1251} 1252 1253func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1254 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1255 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1256 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently") 1257 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use") 1258 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") 1259 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") 1260 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1261 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate") 1262 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.") 1263 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.") 1264 1265 // flags related to shard merging 1266 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging") 1267 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1268 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1269 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB") 1270 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB") 1271 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.") 1272} 1273 1274func startServer(conf rootConfig) error { 1275 s, err := newServer(conf) 1276 if err != nil { 1277 return err 1278 } 1279 1280 profiler.Init("zoekt-sourcegraph-indexserver") 1281 setCompoundShardCounter(s.IndexDir) 1282 1283 if conf.listen != "" { 1284 1285 mux := http.NewServeMux() 1286 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1287 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1288 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1289 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"}, 1290 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1291 }...) 1292 s.addDebugHandlers(mux) 1293 1294 go func() { 1295 debug.Printf("serving HTTP on %s", conf.listen) 1296 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1297 }() 1298 1299 // Serve mux on a unix domain socket on a best-effort-basis so that 1300 // webserver can call the endpoints via the shared filesystem. 1301 // 1302 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1303 // on the socket due to permission errors. See 1304 // https://github.com/docker/for-mac/issues/6239 1305 go func() { 1306 serveHTTPOverSocket := func() error { 1307 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1308 // We cannot bind a socket to an existing pathname. 1309 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1310 return fmt.Errorf("error removing socket file: %s", socket) 1311 } 1312 // The "unix" network corresponds to stream sockets. (cf. unixgram, 1313 // unixpacket). 1314 l, err := net.Listen("unix", socket) 1315 if err != nil { 1316 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1317 } 1318 // Indexserver (root) and webserver (Sourcegraph) run with 1319 // different users. Per default, the socket is created with 1320 // permission 755 (root root), which doesn't let webserver write to 1321 // it. 1322 // 1323 // See https://github.com/golang/go/issues/11822 for more context. 1324 if err := os.Chmod(socket, 0o777); err != nil { 1325 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1326 } 1327 debug.Printf("serving HTTP on %s", socket) 1328 return http.Serve(l, mux) 1329 } 1330 debug.Print(serveHTTPOverSocket()) 1331 }() 1332 } 1333 1334 oc := &ownerChecker{ 1335 Path: filepath.Join(conf.index, "owner.txt"), 1336 Hostname: conf.hostname, 1337 } 1338 go oc.Run() 1339 1340 logger := sglog.Scoped("metricsRegistration") 1341 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1342 1343 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1344 prometheus.DefaultRegisterer.MustRegister(c) 1345 1346 s.Run() 1347 return nil 1348} 1349 1350func newServer(conf rootConfig) (*Server, error) { 1351 logger := sglog.Scoped("server") 1352 1353 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1354 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1355 } 1356 if conf.index == "" { 1357 return nil, fmt.Errorf("must set -index") 1358 } 1359 if conf.root == "" { 1360 return nil, fmt.Errorf("must set -sourcegraph_url") 1361 } 1362 rootURL, err := url.Parse(conf.root) 1363 if err != nil { 1364 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1365 } 1366 1367 rootURL = addDefaultPort(rootURL) 1368 1369 // Tune GOMAXPROCS to match Linux container CPU quota. 1370 _, _ = maxprocs.Set() 1371 1372 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler. 1373 // The block profiler is disabled by default and should be enabled with care in production 1374 runtime.SetBlockProfileRate(conf.blockProfileRate) 1375 1376 // Automatically prepend our own path at the front, to minimize 1377 // required configuration. 1378 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1379 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1380 } 1381 1382 if _, err := os.Stat(conf.index); err != nil { 1383 if err := os.MkdirAll(conf.index, 0o755); err != nil { 1384 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1385 } 1386 } 1387 1388 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil { 1389 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1390 } 1391 1392 if srcLogLevelIsDebug() { 1393 debug = log.New(os.Stderr, "", log.LstdFlags) 1394 } 1395 1396 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1397 if len(reposWithSeparateIndexingMetrics) > 0 { 1398 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1399 } 1400 1401 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1402 if len(deltaBuildRepositoriesAllowList) > 0 { 1403 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1404 } 1405 1406 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1407 if deltaShardNumberFallbackThreshold > 0 { 1408 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1409 } else { 1410 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1411 } 1412 1413 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1414 if len(reposShouldSkipSymbolsCalculation) > 0 { 1415 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1416 } 1417 1418 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout) 1419 if indexingTimeout != defaultIndexingTimeout { 1420 debug.Printf("using configured indexing timeout: %s", indexingTimeout) 1421 } 1422 1423 var sg Sourcegraph 1424 if rootURL.IsAbs() { 1425 var batchSize int 1426 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { 1427 batchSize, err = strconv.Atoi(v) 1428 if err != nil { 1429 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1430 } 1431 } 1432 1433 opts := []SourcegraphClientOption{ 1434 WithBatchSize(batchSize), 1435 } 1436 1437 logger := sglog.Scoped("zoektConfigurationGRPCClient") 1438 client, err := dialGRPCClient(rootURL.Host, logger) 1439 if err != nil { 1440 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1441 } 1442 1443 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...) 1444 1445 } else { 1446 sg = sourcegraphFake{ 1447 RootDir: rootURL.String(), 1448 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1449 } 1450 } 1451 1452 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction))) 1453 if cpuCount < 1 { 1454 cpuCount = 1 1455 } 1456 1457 if conf.indexConcurrency < 1 { 1458 conf.indexConcurrency = 1 1459 } else if conf.indexConcurrency > int64(cpuCount) { 1460 conf.indexConcurrency = int64(cpuCount) 1461 } 1462 1463 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1464 1465 return &Server{ 1466 logger: logger, 1467 Sourcegraph: sg, 1468 IndexDir: conf.index, 1469 IndexConcurrency: int(conf.indexConcurrency), 1470 Interval: conf.interval, 1471 CPUCount: cpuCount, 1472 queue: *q, 1473 shardMerging: !conf.disableShardMerging, 1474 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1475 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1476 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1477 hostname: conf.hostname, 1478 mergeOpts: mergeOpts{ 1479 vacuumInterval: conf.vacuumInterval, 1480 mergeInterval: conf.mergeInterval, 1481 targetSizeBytes: conf.targetSize * 1024 * 1024, 1482 minSizeBytes: conf.minSize * 1024 * 1024, 1483 minAgeDays: conf.minAgeDays, 1484 }, 1485 timeout: indexingTimeout, 1486 }, err 1487} 1488 1489// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration 1490// for the indexed-search-configuration gRPC service. 1491// 1492// The default backoff strategy is modeled after the default settings used by 1493// retryablehttp.DefaultClient. 1494// 1495// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1496// - Unavailable 1497// - Aborted 1498// 1499//go:embed default_grpc_service_configuration.json 1500var defaultGRPCServiceConfigurationJSON string 1501 1502func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1503 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1504 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1505 return invoker(ctx, method, req, reply, cc, opts...) 1506 } 1507} 1508 1509func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1510 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1511 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1512 return streamer(ctx, desc, cc, method, opts...) 1513 } 1514} 1515 1516// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPCs servers and clients are allowed to process. 1517// This can be overridden by providing custom Server/Dial options. 1518const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB 1519 1520func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) { 1521 metrics := clientMetricsOnce() 1522 1523 // If the service seems to be unavailable, this 1524 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1 1525 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s]) 1526 retryOpts := []retry.CallOption{ 1527 retry.WithMax(5), 1528 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)), 1529 retry.WithCodes(codes.Unavailable), 1530 } 1531 1532 opts := []grpc.DialOption{ 1533 grpc.WithTransportCredentials(insecure.NewCredentials()), 1534 grpc.WithChainStreamInterceptor( 1535 metrics.StreamClientInterceptor(), 1536 messagesize.StreamClientInterceptor, 1537 internalActorStreamInterceptor(), 1538 internalerrs.LoggingStreamClientInterceptor(logger), 1539 internalerrs.PrometheusStreamClientInterceptor, 1540 retry.StreamClientInterceptor(retryOpts...), 1541 ), 1542 grpc.WithChainUnaryInterceptor( 1543 metrics.UnaryClientInterceptor(), 1544 messagesize.UnaryClientInterceptor, 1545 internalActorUnaryInterceptor(), 1546 internalerrs.LoggingUnaryClientInterceptor(logger), 1547 internalerrs.PrometheusUnaryClientInterceptor, 1548 retry.UnaryClientInterceptor(retryOpts...), 1549 ), 1550 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON), 1551 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)), 1552 } 1553 1554 opts = append(opts, additionalOpts...) 1555 1556 // Ensure that the message size options are set last, so they override any other 1557 // client-specific options that tweak the message size. 1558 // 1559 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they 1560 // take precedence over everything else with a uniform size setting that's easy to reason about. 1561 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...) 1562 1563 // This dialer is used to connect via gRPC to the Sourcegraph instance. 1564 // This is done lazily, so we can provide the client to use regardless of 1565 // whether we enabled gRPC or not initially. 1566 cc, err := grpc.Dial(addr, opts...) 1567 if err != nil { 1568 return nil, fmt.Errorf("dialing %q: %w", addr, err) 1569 } 1570 1571 client := proto.NewZoektConfigurationServiceClient(cc) 1572 return client, nil 1573} 1574 1575// addDefaultPort adds a default port to a URL if one is not specified. 1576// 1577// If the URL scheme is "http" and no port is specified, "80" is used. 1578// If the scheme is "https", "443" is used. 1579// 1580// The original URL is not mutated. A copy is modified and returned. 1581func addDefaultPort(original *url.URL) *url.URL { 1582 if original == nil { 1583 return nil // don't panic 1584 } 1585 1586 if !original.IsAbs() { 1587 return original // don't do anything if the URL doesn't look like a remote URL 1588 } 1589 1590 if original.Scheme == "http" && original.Port() == "" { 1591 u := cloneURL(original) 1592 u.Host = net.JoinHostPort(u.Host, "80") 1593 return u 1594 } 1595 1596 if original.Scheme == "https" && original.Port() == "" { 1597 u := cloneURL(original) 1598 u.Host = net.JoinHostPort(u.Host, "443") 1599 return u 1600 } 1601 1602 return original 1603} 1604 1605// cloneURL returns a copy of the URL. It is safe to mutate the returned URL. 1606// This is copied from net/http/clone.go 1607func cloneURL(u *url.URL) *url.URL { 1608 if u == nil { 1609 return nil 1610 } 1611 u2 := new(url.URL) 1612 *u2 = *u 1613 if u.User != nil { 1614 u2.User = new(url.Userinfo) 1615 *u2.User = *u.User 1616 } 1617 return u2 1618} 1619 1620func main() { 1621 liblog := sglog.Init(sglog.Resource{ 1622 Name: "zoekt-indexserver", 1623 Version: zoekt.Version, 1624 InstanceID: zoekt.HostnameBestEffort(), 1625 }) 1626 defer liblog.Sync() 1627 1628 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1629 log.Fatal(err) 1630 } 1631} 1632 1633// getBoolFromEnvironmentVariables returns the boolean defined by the first environment 1634// variable listed in envVarNames that is set in the current process environment, or the defaultBool if none are set. 1635// 1636// An error is returned of the provided environment variables fails to parse as a boolean. 1637func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) { 1638 for _, envVar := range envVarNames { 1639 v := os.Getenv(envVar) 1640 if v == "" { 1641 continue 1642 } 1643 1644 b, err := strconv.ParseBool(v) 1645 if err != nil { 1646 return false, fmt.Errorf("parsing environment variable %q to boolean: %v", envVar, err) 1647 } 1648 1649 return b, nil 1650 } 1651 1652 return defaultBool, nil 1653}