Fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
// repositories on Sourcegraph.
package main

import (
	"bytes"
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"html/template"
	"io"
	"io/fs"
	"log"
	"math"
	"math/rand"
	"net"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"text/tabwriter"
	"time"

	grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
	"github.com/peterbourgon/ff/v3/ffcli"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	sglog "github.com/sourcegraph/log"
	"github.com/sourcegraph/mountinfo"
	"go.uber.org/automaxprocs/maxprocs"
	"golang.org/x/net/trace"
	"golang.org/x/sys/unix"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/metadata"

	"github.com/sourcegraph/zoekt"
	"github.com/sourcegraph/zoekt/build"
	proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
	"github.com/sourcegraph/zoekt/debugserver"
	"github.com/sourcegraph/zoekt/grpc/internalerrs"
	"github.com/sourcegraph/zoekt/grpc/messagesize"
	"github.com/sourcegraph/zoekt/internal/profiler"
)

// Process-wide Prometheus metrics. All of them are registered with the
// default registry at package initialization via promauto.
var (
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 100,000s (~27.8h)
	})

	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	// metricIndexDuration is observed by processQueue once the per-repository
	// lock is held. The "name" label goes through repoNameForMetric to keep
	// cardinality bounded.
	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	// metricIndexingDelay measures queue-add to index-complete latency.
	metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_indexing_delay_seconds",
		Help:    "A histogram of durations from when an index job is added to the queue, to the time it completes.",
		Buckets: prometheus.ExponentialBuckets(60, 2, 14), // 1 minute -> 60*2^13s (~5.7 days)
	}, []string{
		"state", // state is an indexState
		"name",  // the name of the repository that was indexed
	})

	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is build.IndexState

	// NOTE(review): the help text says "by code host", but this is a single
	// unlabeled gauge (listIndexed sets it to the total number of indexed
	// repositories).
	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})

	// clientMetricsOnce returns a singleton instance of the client metrics
	// that are shared across all gRPC clients that this process creates.
	//
	// This function panics if the metrics cannot be registered with the default
	// Prometheus registry.
	clientMetricsOnce = sync.OnceValue(func() *grpcprom.ClientMetrics {
		clientMetrics := grpcprom.NewClientMetrics(
			grpcprom.WithClientCounterOptions(),
			grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
			grpcprom.WithClientStreamRecvHistogram(),   // record how long it takes for a client to receive a message during a streaming RPC
			grpcprom.WithClientStreamSendHistogram(),   // record how long it takes for a client to send a message during a streaming RPC
		)
		prometheus.DefaultRegisterer.MustRegister(clientMetrics)
		return clientMetrics
	})
)

// MaxFileSize is the per-file size limit (1 MB) used when indexing.
//
// 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
// NOTE: if you change this, you must also update gitIndex to use the same value when fetching the repo.
const MaxFileSize = 1 << 20

// set of repositories that we want to capture separate indexing metrics for.
// Repositories not in this set are reported with an empty "name" label (see
// repoNameForMetric).
var reposWithSeparateIndexingMetrics = make(map[string]struct{})

// indexState is the outcome of a single index job. It is used as the "state"
// label on the indexing metrics.
type indexState string

const (
	indexStateFail        indexState = "fail"
	indexStateSuccess     indexState = "success"
	indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
	indexStateNoop        indexState = "noop"         // We didn't need to update index
	indexStateEmpty       indexState = "empty"        // index is empty (empty repo)
)

// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	logger sglog.Logger

	Sourcegraph Sourcegraph
	BatchSize   int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration

	// CPUCount is the number of CPUs to use for indexing shards. It is used
	// unless a repository's ShardConcurrency index option overrides it.
	CPUCount int

	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is reported by the /debug/host handler.
	hostname string

	// mergeOpts holds the vacuum and merge intervals used by Run, among
	// other merge settings.
	mergeOpts mergeOpts

	// timeout defines how long the index server waits before killing an indexing job.
	timeout time.Duration
}

// debug is a logger for verbose diagnostics. By default it discards all
// output; presumably it is redirected elsewhere when debug logging is
// enabled — TODO confirm.
var debug = log.New(io.Discard, "", log.LstdFlags)

// our index commands should output something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
224const noOutputTimeout = 30 * time.Minute 225 226func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 227 out := &synchronizedBuffer{} 228 cmd.Stdout = out 229 cmd.Stderr = out 230 231 tr.LazyPrintf("%s", cmd.Args) 232 233 defer func() { 234 if err != nil { 235 outS := out.String() 236 tr.LazyPrintf("failed: %v", err) 237 tr.LazyPrintf("output: %s", out) 238 tr.SetError() 239 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 240 } 241 }() 242 243 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 244 245 if err := cmd.Start(); err != nil { 246 return err 247 } 248 249 errC := make(chan error) 250 go func() { 251 errC <- cmd.Wait() 252 }() 253 254 // This channel is set after we have sent sigquit. It allows us to follow up 255 // with a sigkill if the process doesn't quit after sigquit. 256 kill := make(<-chan time.Time) 257 258 lastLen := 0 259 for { 260 select { 261 case <-time.After(noOutputTimeout): 262 // Periodically check if we have had output. If not kill the process. 263 if out.Len() != lastLen { 264 lastLen = out.Len() 265 log.Printf("still running %s", cmd.Args) 266 } else { 267 // Send quit (C-\) first so we get a stack dump. 268 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 269 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 270 log.Println("quit failed:", err) 271 } 272 273 // send sigkill if still running in 10s 274 kill = time.After(10 * time.Second) 275 } 276 277 case <-kill: 278 log.Printf("still running, killing %s", cmd.Args) 279 if err := cmd.Process.Kill(); err != nil { 280 log.Println("kill failed:", err) 281 } 282 283 case err := <-errC: 284 if err != nil { 285 return err 286 } 287 288 tr.LazyPrintf("success") 289 return nil 290 } 291 } 292} 293 294// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 295// monitor the buffer while it is being written to. 
296type synchronizedBuffer struct { 297 mu sync.Mutex 298 b bytes.Buffer 299} 300 301func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 302 sb.mu.Lock() 303 defer sb.mu.Unlock() 304 return sb.b.Write(p) 305} 306 307func (sb *synchronizedBuffer) Len() int { 308 sb.mu.Lock() 309 defer sb.mu.Unlock() 310 return sb.b.Len() 311} 312 313func (sb *synchronizedBuffer) String() string { 314 sb.mu.Lock() 315 defer sb.mu.Unlock() 316 return sb.b.String() 317} 318 319// pauseFileName if present in IndexDir will stop index jobs from 320// running. This is to make it possible to experiment with the content of the 321// IndexDir without the indexserver writing to it. 322const pauseFileName = "PAUSE" 323 324// Run the sync loop. This blocks forever. 325func (s *Server) Run() { 326 removeIncompleteShards(s.IndexDir) 327 328 // Start a goroutine which updates the queue with commits to index. 329 go func() { 330 // We update the list of indexed repos every Interval. To speed up manual 331 // testing we also listen for SIGUSR1 to trigger updates. 
332 // 333 // "pkill -SIGUSR1 zoekt-sourcegra" 334 for range jitterTicker(s.Interval, unix.SIGUSR1) { 335 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 336 log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 337 continue 338 } 339 340 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 341 if err != nil { 342 log.Printf("error listing repos: %s", err) 343 continue 344 } 345 346 debug.Printf("updating index queue with %d repositories", len(repos.IDs)) 347 348 // Stop indexing repos we don't need to track anymore 349 removed := s.queue.MaybeRemoveMissing(repos.IDs) 350 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 351 if len(removed) > 0 { 352 log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 353 } 354 355 cleanupDone := make(chan struct{}) 356 go func() { 357 defer close(cleanupDone) 358 s.muIndexDir.Global(func() { 359 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 360 }) 361 }() 362 363 repos.IterateIndexOptions(s.queue.AddOrUpdate) 364 365 // IterateIndexOptions will only iterate over repositories that have 366 // changed since we last called list. However, we want to add all IDs 367 // back onto the queue just to check that what is on disk is still 368 // correct. This will use the last IndexOptions we stored in the 369 // queue. The repositories not on the queue (missing) need a forced 370 // fetch of IndexOptions. 371 missing := s.queue.Bump(repos.IDs) 372 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 
373 374 setCompoundShardCounter(s.IndexDir) 375 376 <-cleanupDone 377 } 378 }() 379 380 go func() { 381 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 382 if s.shardMerging { 383 s.vacuum() 384 } 385 } 386 }() 387 388 go func() { 389 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 390 if s.shardMerging { 391 s.doMerge() 392 } 393 } 394 }() 395 396 for i := 0; i < s.IndexConcurrency; i++ { 397 go s.processQueue() 398 } 399 400 // block forever 401 select {} 402} 403 404// formatList returns a comma-separated list of the first min(len(v), m) items. 405func formatListUint32(v []uint32, m int) string { 406 if len(v) < m { 407 m = len(v) 408 } 409 410 sb := strings.Builder{} 411 for i := 0; i < m; i++ { 412 fmt.Fprintf(&sb, "%d, ", v[i]) 413 } 414 415 if len(v) > m { 416 sb.WriteString("...") 417 } 418 419 return strings.TrimRight(sb.String(), ", ") 420} 421 422func (s *Server) processQueue() { 423 for { 424 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 425 time.Sleep(time.Second) 426 continue 427 } 428 429 item, ok := s.queue.Pop() 430 if !ok { 431 time.Sleep(time.Second) 432 continue 433 } 434 435 opts := item.Opts 436 args := s.indexArgs(opts) 437 438 ran := s.muIndexDir.With(opts.Name, func() { 439 // only record time taken once we hold the lock. This avoids us 440 // recording time taken while merging/cleanup runs. 
441 start := time.Now() 442 443 state, err := s.Index(args) 444 445 elapsed := time.Since(start) 446 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 447 448 indexDelay := time.Since(item.DateAddedToQueue) 449 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds()) 450 451 if err != nil { 452 log.Printf("error indexing %s: %s", args.String(), err) 453 } 454 455 switch state { 456 case indexStateSuccess: 457 var branches []string 458 for _, b := range args.Branches { 459 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 460 } 461 s.logger.Info("updated index", 462 sglog.String("repo", args.Name), 463 sglog.Uint32("id", args.RepoID), 464 sglog.Strings("branches", branches), 465 sglog.Duration("duration", elapsed), 466 sglog.Duration("index_delay", indexDelay), 467 ) 468 case indexStateSuccessMeta: 469 log.Printf("updated meta %s in %v", args.String(), elapsed) 470 } 471 s.queue.SetIndexed(opts, state) 472 }) 473 474 if !ran { 475 // Someone else is processing the repository. We can just skip this job 476 // since the repository will be added back to the queue and we will 477 // converge to the correct behaviour. 478 debug.Printf("index job for repository already running: %s", args) 479 continue 480 } 481 } 482} 483 484// repoNameForMetric returns a normalized version of the given repository name that is 485// suitable for use with Prometheus metrics. 486func repoNameForMetric(repo string) string { 487 // Check to see if we want to be able to capture separate indexing metrics for this repository. 488 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 
489 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 490 return repo 491 } 492 493 return "" 494} 495 496func batched(slice []uint32, size int) <-chan []uint32 { 497 c := make(chan []uint32) 498 go func() { 499 for len(slice) > 0 { 500 if size > len(slice) { 501 size = len(slice) 502 } 503 c <- slice[:size] 504 slice = slice[size:] 505 } 506 close(c) 507 }() 508 return c 509} 510 511// jitterTicker returns a ticker which ticks with a jitter. Each tick is 512// uniformly selected from the range (d/2, d + d/2). It will tick on creation. 513// 514// sig is a list of signals which also cause the ticker to fire. This is a 515// convenience to allow manually triggering of the ticker. 516func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} { 517 ticker := make(chan struct{}) 518 519 go func() { 520 for { 521 ticker <- struct{}{} 522 ns := int64(d) 523 jitter := rand.Int63n(ns) 524 time.Sleep(time.Duration(ns/2 + jitter)) 525 } 526 }() 527 528 go func() { 529 if len(sig) == 0 { 530 return 531 } 532 533 c := make(chan os.Signal, 1) 534 signal.Notify(c, sig...) 535 for range c { 536 ticker <- struct{}{} 537 } 538 }() 539 540 return ticker 541} 542 543// Index starts an index job for repo name at commit. 
func (s *Server) Index(args *indexArgs) (state indexState, err error) {
	tr := trace.New("index", args.Name)
	tr.SetMaxEvents(30) // Ensure we capture all indexing events

	// Runs on every return path. A non-nil error forces the reported state to
	// indexStateFail and increments metricFailingTotal, whatever state the
	// return statement supplied.
	defer func() {
		if err != nil {
			tr.SetError()
			tr.LazyPrintf("error: %v", err)
			state = indexStateFail
			metricFailingTotal.Inc()
		}
		tr.LazyPrintf("state: %s", state)
		tr.Finish()
	}()

	tr.LazyPrintf("branches: %v", args.Branches)

	// A repository without branches gets an empty placeholder shard.
	if len(args.Branches) == 0 {
		return indexStateEmpty, createEmptyShard(args)
	}

	repositoryName := args.Name
	if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
		tr.LazyPrintf("marking this repository for delta build")
		args.UseDelta = true
	}

	args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold

	if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
		tr.LazyPrintf("skipping symbols calculation")
		args.Symbols = false
	}

	// reason is logged with the full update below. It stays "forced" for
	// non-incremental runs; otherwise it is the on-disk build.IndexState.
	reason := "forced"

	if args.Incremental {
		bo := args.BuildOptions()
		bo.SetDefaults()
		incrementalState, fn := bo.IndexState()
		reason = string(incrementalState)
		metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()

		// Only the Equal case and a successful Meta merge return early. Every
		// other state falls through to a full index below.
		switch incrementalState {
		case build.IndexStateEqual:
			debug.Printf("%s index already up to date. Shard=%s", args.String(), fn)
			return indexStateNoop, nil

		case build.IndexStateMeta:
			log.Printf("updating index.meta %s", args.String())

			if err := mergeMeta(bo); err != nil {
				log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
			} else {
				return indexStateSuccessMeta, nil
			}

		case build.IndexStateCorrupt:
			log.Printf("falling back to full update: corrupt index: %s", args.String())
		}
	}

	log.Printf("updating index %s reason=%s", args.String(), reason)

	metricIndexingTotal.Inc()
	c := gitIndexConfig{
		// Every command gitIndex runs is traced on tr and watched for output
		// progress by loggedRun.
		runCmd: func(cmd *exec.Cmd) error {
			return s.loggedRun(tr, cmd)
		},

		findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
			return args.BuildOptions().FindRepositoryMetadata()
		},
		timeout: s.timeout,
	}

	err = gitIndex(c, args, s.Sourcegraph, s.logger)
	if err != nil {
		return indexStateFail, err
	}

	// Failing to report status is logged but does not fail the index job.
	if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil {
		s.logger.Error("failed to update index status",
			sglog.String("repo", args.Name),
			sglog.Uint32("id", args.RepoID),
			sglogBranches("branches", args.Branches),
			sglog.Error(err),
		)
	}

	return indexStateSuccess, nil
}

// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so
// it can update the zoekt_repos table.
//
// It reads the freshly written shard metadata via c.findRepositoryMetadata.
// It returns an error if the metadata cannot be read or found, or if the
// Sourcegraph update fails.
func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error {
	// We need to read from disk for IndexTime.
	_, metadata, ok, err := c.findRepositoryMetadata(args)
	if err != nil {
		return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
	}
	if !ok {
		return errors.New("failed to find metadata for new/updated index")
	}

	status := []indexStatus{{
		RepoID:        args.RepoID,
		Branches:      args.Branches,
		IndexTimeUnix: metadata.IndexTime.Unix(),
	}}
	if err := sg.UpdateIndexStatus(status); err != nil {
		return fmt.Errorf("failed to update sourcegraph with status: %w", err)
	}

	return nil
}

// sglogBranches returns a log field under key. The field lists each branch as
// "name=version".
func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
	ss := make([]string, len(branches))
	for i, b := range branches {
		ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
	}
	return sglog.Strings(key, ss)
}

// indexArgs builds the index job arguments for opts. Jobs are incremental by
// default and limited to MaxFileSize per file. Shard parallelism comes from
// s.parallelism, bounded by GOMAXPROCS.
func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
	parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0))
	return &indexArgs{
		IndexOptions: opts,
		IndexDir:     s.IndexDir,
		Parallelism:  parallelism,
		Incremental:  true,
		FileLimit:    MaxFileSize,
		ShardMerging: s.shardMerging,
	}
}

// parallelism consults both the server flags and index options to determine the number
// of shards to index in parallel. If the ShardConcurrency index option is provided (> 0),
// it always overrides the server's CPUCount flag.
684func (s *Server) parallelism(opts IndexOptions, maxProcs int) int { 685 var parallelism int 686 if opts.ShardConcurrency > 0 { 687 parallelism = int(opts.ShardConcurrency) 688 } else { 689 parallelism = s.CPUCount 690 } 691 692 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs 693 if parallelism > 4*maxProcs { 694 parallelism = 4 * maxProcs 695 } 696 697 // If index concurrency is set, then divide the parallelism by the number of 698 // repos we're indexing in parallel 699 if s.IndexConcurrency > 1 { 700 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency))) 701 } 702 703 return parallelism 704} 705 706func createEmptyShard(args *indexArgs) error { 707 bo := args.BuildOptions() 708 bo.SetDefaults() 709 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 710 711 if args.Incremental && bo.IncrementalSkipIndexing() { 712 return nil 713 } 714 715 builder, err := build.NewBuilder(*bo) 716 if err != nil { 717 return err 718 } 719 return builder.Finish() 720} 721 722// addDebugHandlers adds handlers specific to indexserver. 723func (s *Server) addDebugHandlers(mux *http.ServeMux) { 724 // Sourcegraph's site admin view requires indexserver to serve it's admin view 725 // on "/". 
726 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 727 728 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 729 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 730 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 731 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 732 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 733 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 734} 735 736func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 737 if r.Method != "GET" { 738 w.Header().Set("Allow", "GET") 739 w.WriteHeader(http.StatusMethodNotAllowed) 740 return 741 } 742 743 response := struct { 744 Hostname string 745 }{ 746 Hostname: s.hostname, 747 } 748 749 b, err := json.Marshal(response) 750 if err != nil { 751 http.Error(w, err.Error(), http.StatusInternalServerError) 752 return 753 } 754 755 w.Header().Set("Content-Type", "application/json; charset=utf-8") 756 w.Write(b) 757} 758 759var rootTmpl = template.Must(template.New("name").Parse(` 760<html> 761 <body> 762 <a href="debug">Debug</a><br /> 763 <a href="debug/requests">Traces</a><br /> 764 {{.IndexMsg}}<br /> 765 <br /> 766 <h3>Reindex</h3> 767 {{if .Repos}} 768 <a href="?show_repos=false">hide repos</a><br /> 769 <table style="margin-top: 20px"> 770 <th style="text-align:left">Name</th> 771 <th style="text-align:left">ID (click to reindex)</th> 772 {{range .Repos}} 773 <tr> 774 <td>{{.Name}}</td> 775 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 776 </tr> 777 {{end}} 778 </table> 779 {{else}} 780 <a href="?show_repos=true">show repos</a><br /> 781 {{end}} 782 </body> 783</html> 784`)) 785 786func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 787 if r.Method != "GET" { 788 w.Header().Set("Allow", "GET") 789 w.WriteHeader(http.StatusMethodNotAllowed) 790 return 791 } 792 793 values := r.URL.Query() 794 795 // ?id= 796 indexMsg := "" 797 if v := values.Get("id"); v != "" 
{ 798 id, err := strconv.Atoi(v) 799 if err != nil { 800 http.Error(w, err.Error(), http.StatusBadRequest) 801 return 802 } 803 indexMsg, _ = s.forceIndex(uint32(id)) 804 } 805 806 // ?show_repos= 807 showRepos := false 808 if v := values.Get("show_repos"); v != "" { 809 showRepos, _ = strconv.ParseBool(v) 810 } 811 812 type Repo struct { 813 ID uint32 814 Name string 815 } 816 var data struct { 817 Repos []Repo 818 IndexMsg string 819 } 820 821 data.IndexMsg = indexMsg 822 823 if showRepos { 824 s.queue.Iterate(func(opts *IndexOptions) { 825 data.Repos = append(data.Repos, Repo{ 826 ID: opts.RepoID, 827 Name: opts.Name, 828 }) 829 }) 830 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 831 } 832 833 _ = rootTmpl.Execute(w, data) 834} 835 836// handleReindex triggers a reindex asynocronously. If a reindex was triggered 837// the request returns with status 202. The caller can infer the new state of 838// the index by calling List. 839func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 840 if r.Method != http.MethodPost { 841 w.Header().Set("Allow", http.MethodPost) 842 w.WriteHeader(http.StatusMethodNotAllowed) 843 return 844 } 845 846 err := r.ParseForm() 847 if err != nil { 848 http.Error(w, err.Error(), http.StatusBadRequest) 849 return 850 } 851 852 id, err := strconv.Atoi(r.Form.Get("repo")) 853 if err != nil { 854 http.Error(w, err.Error(), http.StatusBadRequest) 855 return 856 } 857 858 go func() { s.forceIndex(uint32(id)) }() 859 860 // 202 Accepted 861 w.WriteHeader(http.StatusAccepted) 862} 863 864func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 865 withIndexed := true 866 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 867 withIndexed = b 868 } 869 870 var indexed []uint32 871 if withIndexed { 872 indexed = listIndexed(s.IndexDir) 873 } 874 875 repos, err := s.Sourcegraph.List(r.Context(), indexed) 876 if err != nil { 877 http.Error(w, 
err.Error(), http.StatusInternalServerError) 878 return 879 } 880 881 bw := bytes.Buffer{} 882 883 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 884 885 _, err = fmt.Fprintf(tw, "ID\tName\n") 886 if err != nil { 887 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 888 return 889 } 890 891 s.queue.mu.Lock() 892 name := "" 893 for _, id := range repos.IDs { 894 if item := s.queue.get(id); item != nil { 895 name = item.opts.Name 896 } else { 897 name = "" 898 } 899 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 900 if err != nil { 901 debug.Printf("handleDebugList: %s\n", err.Error()) 902 } 903 } 904 s.queue.mu.Unlock() 905 906 if err != nil { 907 http.Error(w, err.Error(), http.StatusInternalServerError) 908 return 909 } 910 911 err = tw.Flush() 912 if err != nil { 913 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 914 return 915 } 916 917 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 918 919 if _, err := io.Copy(w, &bw); err != nil { 920 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 921 return 922 } 923} 924 925// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 926// can run this command during periods of low usage (evenings, weekends) to 927// trigger an initial merge run. In the steady-state, merges happen rarely, even 928// on busy instances, and users can rely on automatic merging instead. 929func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 930 // A merge operation can take very long, depending on the number merges and the 931 // target size of the compound shards. We run the merge in the background and 932 // return immediately to the user. 933 // 934 // We track the status of the merge with metricShardMergingRunning. 
935 go func() { 936 s.doMerge() 937 }() 938 _, _ = w.Write([]byte("merging enqueued\n")) 939} 940 941func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 942 indexed := listIndexed(s.IndexDir) 943 944 bw := bytes.Buffer{} 945 946 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 947 948 _, err := fmt.Fprintf(tw, "ID\tName\n") 949 if err != nil { 950 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 951 return 952 } 953 954 s.queue.mu.Lock() 955 name := "" 956 for _, id := range indexed { 957 if item := s.queue.get(id); item != nil { 958 name = item.opts.Name 959 } else { 960 name = "" 961 } 962 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 963 if err != nil { 964 debug.Printf("handleDebugIndexed: %s\n", err.Error()) 965 } 966 } 967 s.queue.mu.Unlock() 968 969 if err != nil { 970 http.Error(w, err.Error(), http.StatusInternalServerError) 971 return 972 } 973 974 err = tw.Flush() 975 if err != nil { 976 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 977 return 978 } 979 980 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 981 982 if _, err := io.Copy(w, &bw); err != nil { 983 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 984 return 985 } 986} 987 988// forceIndex will run the index job for repo name now. It will return always 989// return a string explaining what it did, even if it failed. 
990func (s *Server) forceIndex(id uint32) (string, error) { 991 var opts IndexOptions 992 var err error 993 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 994 opts = o 995 }, func(_ uint32, e error) { 996 err = e 997 }, id) 998 if err != nil { 999 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 1000 } 1001 1002 args := s.indexArgs(opts) 1003 args.Incremental = false // force re-index 1004 1005 var state indexState 1006 ran := s.muIndexDir.With(opts.Name, func() { 1007 state, err = s.Index(args) 1008 }) 1009 if !ran { 1010 return fmt.Sprintf("index job for repository already running: %s", args), nil 1011 } 1012 if err != nil { 1013 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 1014 } 1015 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 1016} 1017 1018func listIndexed(indexDir string) []uint32 { 1019 index := getShards(indexDir) 1020 metricNumIndexed.Set(float64(len(index))) 1021 repoIDs := make([]uint32, 0, len(index)) 1022 for id := range index { 1023 repoIDs = append(repoIDs, id) 1024 } 1025 sort.Slice(repoIDs, func(i, j int) bool { 1026 return repoIDs[i] < repoIDs[j] 1027 }) 1028 return repoIDs 1029} 1030 1031// setupTmpDir sets up a temporary directory on the same volume as the 1032// indexes. 1033// 1034// If main is true we will delete older temp directories left around. main is 1035// false when this is a debug command. 1036func setupTmpDir(logger sglog.Logger, main bool, index string) error { 1037 // change the target tmp directory depending on if it's our main daemon or a 1038 // debug sub command. 
1039 dir := ".indexserver.debug.tmp" 1040 if main { 1041 dir = ".indexserver.tmp" 1042 } 1043 1044 tmpRoot := filepath.Join(index, dir) 1045 1046 if main { 1047 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot)) 1048 err := os.RemoveAll(tmpRoot) 1049 if err != nil { 1050 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err)) 1051 } 1052 } 1053 1054 if err := os.MkdirAll(tmpRoot, 0o755); err != nil { 1055 return err 1056 } 1057 1058 return os.Setenv("TMPDIR", tmpRoot) 1059} 1060 1061func printMetaData(fn string) error { 1062 repo, indexMeta, err := zoekt.ReadMetadataPath(fn) 1063 if err != nil { 1064 return err 1065 } 1066 1067 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 1068 if err != nil { 1069 return err 1070 } 1071 1072 err = json.NewEncoder(os.Stdout).Encode(repo) 1073 if err != nil { 1074 return err 1075 } 1076 return nil 1077} 1078 1079func printShardStats(fn string) error { 1080 f, err := os.Open(fn) 1081 if err != nil { 1082 return err 1083 } 1084 1085 iFile, err := zoekt.NewIndexFile(f) 1086 if err != nil { 1087 return err 1088 } 1089 1090 return zoekt.PrintNgramStats(iFile) 1091} 1092 1093func srcLogLevelIsDebug() bool { 1094 lvl := os.Getenv(sglog.EnvLogLevel) 1095 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1096} 1097 1098func getEnvWithDefaultBool(k string, defaultVal bool) bool { 1099 v := os.Getenv(k) 1100 if v == "" { 1101 return defaultVal 1102 } 1103 b, err := strconv.ParseBool(v) 1104 if err != nil { 1105 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1106 } 1107 return b 1108} 1109 1110func getEnvWithDefaultInt64(k string, defaultVal int64) int64 { 1111 v := os.Getenv(k) 1112 if v == "" { 1113 return defaultVal 1114 } 1115 i, err := strconv.ParseInt(v, 10, 64) 1116 if err != nil { 1117 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1118 } 1119 return i 1120} 1121 1122func getEnvWithDefaultInt(k string, defaultVal int) int { 1123 v := 
os.Getenv(k) 1124 if v == "" { 1125 return defaultVal 1126 } 1127 i, err := strconv.Atoi(k) 1128 if err != nil { 1129 log.Fatalf("error parsing ENV %s to int: %s", k, err) 1130 } 1131 return i 1132} 1133 1134func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 { 1135 v := os.Getenv(k) 1136 if v == "" { 1137 return defaultVal 1138 } 1139 i, err := strconv.ParseUint(v, 10, 64) 1140 if err != nil { 1141 log.Fatalf("error parsing ENV %s to uint64: %s", k, err) 1142 } 1143 return i 1144} 1145 1146func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1147 v := os.Getenv(k) 1148 if v == "" { 1149 return defaultVal 1150 } 1151 f, err := strconv.ParseFloat(v, 64) 1152 if err != nil { 1153 log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1154 } 1155 return f 1156} 1157 1158func getEnvWithDefaultString(k string, defaultVal string) string { 1159 v := os.Getenv(k) 1160 if v == "" { 1161 return defaultVal 1162 } 1163 return v 1164} 1165 1166func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration { 1167 v := os.Getenv(k) 1168 if v == "" { 1169 return defaultVal 1170 } 1171 1172 d, err := time.ParseDuration(v) 1173 if err != nil { 1174 log.Fatalf("error parsing ENV %s to duration: %s", k, err) 1175 } 1176 return d 1177} 1178 1179func getEnvWithDefaultEmptySet(k string) map[string]struct{} { 1180 set := map[string]struct{}{} 1181 for _, v := range strings.Split(os.Getenv(k), ",") { 1182 v = strings.TrimSpace(v) 1183 if v != "" { 1184 set[v] = struct{}{} 1185 } 1186 } 1187 return set 1188} 1189 1190func joinStringSet(set map[string]struct{}, sep string) string { 1191 var xs []string 1192 for x := range set { 1193 xs = append(xs, x) 1194 } 1195 1196 return strings.Join(xs, sep) 1197} 1198 1199func setCompoundShardCounter(indexDir string) { 1200 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt")) 1201 if err != nil { 1202 log.Printf("setCompoundShardCounter: %s\n", err) 1203 return 1204 } 1205 
metricNumberCompoundShards.Set(float64(len(fns))) 1206} 1207 1208func rootCmd() *ffcli.Command { 1209 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1210 conf := rootConfig{ 1211 Main: true, 1212 } 1213 conf.registerRootFlags(rootFs) 1214 1215 return &ffcli.Command{ 1216 FlagSet: rootFs, 1217 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1218 Subcommands: []*ffcli.Command{debugCmd()}, 1219 Exec: func(ctx context.Context, args []string) error { 1220 return startServer(conf) 1221 }, 1222 } 1223} 1224 1225type rootConfig struct { 1226 // Main is true if this rootConfig is for our main long running command (the 1227 // indexserver). Debug commands should not set this value. This is used to 1228 // determine if we need to run tmpfriend. 1229 Main bool 1230 1231 root string 1232 interval time.Duration 1233 index string 1234 indexConcurrency int64 1235 listen string 1236 hostname string 1237 cpuFraction float64 1238 1239 // config values related to shard merging 1240 disableShardMerging bool 1241 vacuumInterval time.Duration 1242 mergeInterval time.Duration 1243 targetSize int64 1244 minSize int64 1245 minAgeDays int 1246 1247 // config values related to backoff indexing repos with one or more consecutive failures 1248 backoffDuration time.Duration 1249 maxBackoffDuration time.Duration 1250} 1251 1252func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1253 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. 
If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1254 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1255 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently") 1256 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use") 1257 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") 1258 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") 1259 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1260 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.") 1261 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. 
A negative value disables indexing backoff.") 1262 1263 // flags related to shard merging 1264 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging") 1265 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1266 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1267 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB") 1268 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB") 1269 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.") 1270} 1271 1272func startServer(conf rootConfig) error { 1273 s, err := newServer(conf) 1274 if err != nil { 1275 return err 1276 } 1277 1278 profiler.Init("zoekt-sourcegraph-indexserver") 1279 setCompoundShardCounter(s.IndexDir) 1280 1281 if conf.listen != "" { 1282 1283 mux := http.NewServeMux() 1284 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1285 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1286 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1287 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"}, 1288 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1289 }...) 
1290 s.addDebugHandlers(mux) 1291 1292 go func() { 1293 debug.Printf("serving HTTP on %s", conf.listen) 1294 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1295 }() 1296 1297 // Serve mux on a unix domain socket on a best-effort-basis so that 1298 // webserver can call the endpoints via the shared filesystem. 1299 // 1300 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1301 // on the socket due to permission errors. See 1302 // https://github.com/docker/for-mac/issues/6239 1303 go func() { 1304 serveHTTPOverSocket := func() error { 1305 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1306 // We cannot bind a socket to an existing pathname. 1307 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1308 return fmt.Errorf("error removing socket file: %s", socket) 1309 } 1310 // The "unix" network corresponds to stream sockets. (cf. unixgram, 1311 // unixpacket). 1312 l, err := net.Listen("unix", socket) 1313 if err != nil { 1314 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1315 } 1316 // Indexserver (root) and webserver (Sourcegraph) run with 1317 // different users. Per default, the socket is created with 1318 // permission 755 (root root), which doesn't let webserver write to 1319 // it. 1320 // 1321 // See https://github.com/golang/go/issues/11822 for more context. 
1322 if err := os.Chmod(socket, 0o777); err != nil { 1323 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1324 } 1325 debug.Printf("serving HTTP on %s", socket) 1326 return http.Serve(l, mux) 1327 } 1328 debug.Print(serveHTTPOverSocket()) 1329 }() 1330 } 1331 1332 oc := &ownerChecker{ 1333 Path: filepath.Join(conf.index, "owner.txt"), 1334 Hostname: conf.hostname, 1335 } 1336 go oc.Run() 1337 1338 logger := sglog.Scoped("metricsRegistration") 1339 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1340 1341 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1342 prometheus.DefaultRegisterer.MustRegister(c) 1343 1344 s.Run() 1345 return nil 1346} 1347 1348func newServer(conf rootConfig) (*Server, error) { 1349 logger := sglog.Scoped("server") 1350 1351 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1352 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1353 } 1354 if conf.index == "" { 1355 return nil, fmt.Errorf("must set -index") 1356 } 1357 if conf.root == "" { 1358 return nil, fmt.Errorf("must set -sourcegraph_url") 1359 } 1360 rootURL, err := url.Parse(conf.root) 1361 if err != nil { 1362 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1363 } 1364 1365 rootURL = addDefaultPort(rootURL) 1366 1367 // Tune GOMAXPROCS to match Linux container CPU quota. 1368 _, _ = maxprocs.Set() 1369 1370 // Automatically prepend our own path at the front, to minimize 1371 // required configuration. 
1372 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1373 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1374 } 1375 1376 if _, err := os.Stat(conf.index); err != nil { 1377 if err := os.MkdirAll(conf.index, 0o755); err != nil { 1378 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1379 } 1380 } 1381 1382 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil { 1383 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1384 } 1385 1386 if srcLogLevelIsDebug() { 1387 debug = log.New(os.Stderr, "", log.LstdFlags) 1388 } 1389 1390 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1391 if len(reposWithSeparateIndexingMetrics) > 0 { 1392 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1393 } 1394 1395 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1396 if len(deltaBuildRepositoriesAllowList) > 0 { 1397 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1398 } 1399 1400 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1401 if deltaShardNumberFallbackThreshold > 0 { 1402 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1403 } else { 1404 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1405 } 1406 1407 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1408 if len(reposShouldSkipSymbolsCalculation) > 0 { 1409 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1410 } 1411 1412 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout) 1413 if indexingTimeout != 
defaultIndexingTimeout { 1414 debug.Printf("using configured indexing timeout: %s", indexingTimeout) 1415 } 1416 1417 var sg Sourcegraph 1418 if rootURL.IsAbs() { 1419 var batchSize int 1420 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { 1421 batchSize, err = strconv.Atoi(v) 1422 if err != nil { 1423 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1424 } 1425 } 1426 1427 opts := []SourcegraphClientOption{ 1428 WithBatchSize(batchSize), 1429 } 1430 1431 logger := sglog.Scoped("zoektConfigurationGRPCClient") 1432 client, err := dialGRPCClient(rootURL.Host, logger) 1433 if err != nil { 1434 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1435 } 1436 1437 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...) 1438 1439 } else { 1440 sg = sourcegraphFake{ 1441 RootDir: rootURL.String(), 1442 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1443 } 1444 } 1445 1446 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction))) 1447 if cpuCount < 1 { 1448 cpuCount = 1 1449 } 1450 1451 if conf.indexConcurrency < 1 { 1452 conf.indexConcurrency = 1 1453 } else if conf.indexConcurrency > int64(cpuCount) { 1454 conf.indexConcurrency = int64(cpuCount) 1455 } 1456 1457 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1458 1459 return &Server{ 1460 logger: logger, 1461 Sourcegraph: sg, 1462 IndexDir: conf.index, 1463 IndexConcurrency: int(conf.indexConcurrency), 1464 Interval: conf.interval, 1465 CPUCount: cpuCount, 1466 queue: *q, 1467 shardMerging: !conf.disableShardMerging, 1468 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1469 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1470 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1471 hostname: conf.hostname, 1472 mergeOpts: mergeOpts{ 1473 vacuumInterval: conf.vacuumInterval, 1474 mergeInterval: conf.mergeInterval, 
1475 targetSizeBytes: conf.targetSize * 1024 * 1024, 1476 minSizeBytes: conf.minSize * 1024 * 1024, 1477 minAgeDays: conf.minAgeDays, 1478 }, 1479 timeout: indexingTimeout, 1480 }, err 1481} 1482 1483// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration 1484// for the indexed-search-configuration gRPC service. 1485// 1486// The default backoff strategy is modeled after the default settings used by 1487// retryablehttp.DefaultClient. 1488// 1489// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1490// - Unavailable 1491// - Aborted 1492// 1493//go:embed default_grpc_service_configuration.json 1494var defaultGRPCServiceConfigurationJSON string 1495 1496func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1497 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1498 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1499 return invoker(ctx, method, req, reply, cc, opts...) 1500 } 1501} 1502 1503func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1504 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1505 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1506 return streamer(ctx, desc, cc, method, opts...) 1507 } 1508} 1509 1510// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPCs servers and clients are allowed to process. 1511// This can be overridden by providing custom Server/Dial options. 
1512const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB 1513 1514func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) { 1515 metrics := clientMetricsOnce() 1516 1517 // If the service seems to be unavailable, this 1518 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1 1519 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s]) 1520 retryOpts := []retry.CallOption{ 1521 retry.WithMax(5), 1522 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)), 1523 retry.WithCodes(codes.Unavailable), 1524 } 1525 1526 opts := []grpc.DialOption{ 1527 grpc.WithTransportCredentials(insecure.NewCredentials()), 1528 grpc.WithChainStreamInterceptor( 1529 metrics.StreamClientInterceptor(), 1530 messagesize.StreamClientInterceptor, 1531 internalActorStreamInterceptor(), 1532 internalerrs.LoggingStreamClientInterceptor(logger), 1533 internalerrs.PrometheusStreamClientInterceptor, 1534 retry.StreamClientInterceptor(retryOpts...), 1535 ), 1536 grpc.WithChainUnaryInterceptor( 1537 metrics.UnaryClientInterceptor(), 1538 messagesize.UnaryClientInterceptor, 1539 internalActorUnaryInterceptor(), 1540 internalerrs.LoggingUnaryClientInterceptor(logger), 1541 internalerrs.PrometheusUnaryClientInterceptor, 1542 retry.UnaryClientInterceptor(retryOpts...), 1543 ), 1544 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON), 1545 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)), 1546 } 1547 1548 opts = append(opts, additionalOpts...) 1549 1550 // Ensure that the message size options are set last, so they override any other 1551 // client-specific options that tweak the message size. 1552 // 1553 // The message size options are only provided if the environment variable is set. 
These options serve as an escape hatch, so they 1554 // take precedence over everything else with a uniform size setting that's easy to reason about. 1555 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...) 1556 1557 // This dialer is used to connect via gRPC to the Sourcegraph instance. 1558 // This is done lazily, so we can provide the client to use regardless of 1559 // whether we enabled gRPC or not initially. 1560 cc, err := grpc.Dial(addr, opts...) 1561 if err != nil { 1562 return nil, fmt.Errorf("dialing %q: %w", addr, err) 1563 } 1564 1565 client := proto.NewZoektConfigurationServiceClient(cc) 1566 return client, nil 1567} 1568 1569// addDefaultPort adds a default port to a URL if one is not specified. 1570// 1571// If the URL scheme is "http" and no port is specified, "80" is used. 1572// If the scheme is "https", "443" is used. 1573// 1574// The original URL is not mutated. A copy is modified and returned. 1575func addDefaultPort(original *url.URL) *url.URL { 1576 if original == nil { 1577 return nil // don't panic 1578 } 1579 1580 if !original.IsAbs() { 1581 return original // don't do anything if the URL doesn't look like a remote URL 1582 } 1583 1584 if original.Scheme == "http" && original.Port() == "" { 1585 u := cloneURL(original) 1586 u.Host = net.JoinHostPort(u.Host, "80") 1587 return u 1588 } 1589 1590 if original.Scheme == "https" && original.Port() == "" { 1591 u := cloneURL(original) 1592 u.Host = net.JoinHostPort(u.Host, "443") 1593 return u 1594 } 1595 1596 return original 1597} 1598 1599// cloneURL returns a copy of the URL. It is safe to mutate the returned URL. 
1600// This is copied from net/http/clone.go 1601func cloneURL(u *url.URL) *url.URL { 1602 if u == nil { 1603 return nil 1604 } 1605 u2 := new(url.URL) 1606 *u2 = *u 1607 if u.User != nil { 1608 u2.User = new(url.Userinfo) 1609 *u2.User = *u.User 1610 } 1611 return u2 1612} 1613 1614func main() { 1615 liblog := sglog.Init(sglog.Resource{ 1616 Name: "zoekt-indexserver", 1617 Version: zoekt.Version, 1618 InstanceID: zoekt.HostnameBestEffort(), 1619 }) 1620 defer liblog.Sync() 1621 1622 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1623 log.Fatal(err) 1624 } 1625} 1626 1627// getBoolFromEnvironmentVariables returns the boolean defined by the first environment 1628// variable listed in envVarNames that is set in the current process environment, or the defaultBool if none are set. 1629// 1630// An error is returned of the provided environment variables fails to parse as a boolean. 1631func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) { 1632 for _, envVar := range envVarNames { 1633 v := os.Getenv(envVar) 1634 if v == "" { 1635 continue 1636 } 1637 1638 b, err := strconv.ParseBool(v) 1639 if err != nil { 1640 return false, fmt.Errorf("parsing environment variable %q to boolean: %v", envVar, err) 1641 } 1642 1643 return b, nil 1644 } 1645 1646 return defaultBool, nil 1647}