fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
// repositories on sourcegraph
package main

import (
	"bytes"
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"html/template"
	"io"
	"io/fs"
	"log"
	"math"
	"math/rand"
	"net"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"text/tabwriter"
	"time"

	grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
	"github.com/peterbourgon/ff/v3/ffcli"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	sglog "github.com/sourcegraph/log"
	"github.com/sourcegraph/mountinfo"
	"go.uber.org/automaxprocs/maxprocs"
	"golang.org/x/net/trace"
	"golang.org/x/sys/unix"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/metadata"

	"github.com/sourcegraph/zoekt"
	"github.com/sourcegraph/zoekt/build"
	proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
	"github.com/sourcegraph/zoekt/debugserver"
	"github.com/sourcegraph/zoekt/grpc/internalerrs"
	"github.com/sourcegraph/zoekt/grpc/messagesize"
	"github.com/sourcegraph/zoekt/internal/profiler"
)

// Prometheus metrics exported by the indexserver. They are registered through
// promauto when the package is initialised.
var (
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 100000s (~27.8h)
	})

	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is build.IndexState

	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})

	// NOTE(review): presumably clientMetricsOnce guards the one-time
	// initialisation of clientMetrics; the initialisation is not visible in
	// this part of the file, so confirm before relying on it.
	clientMetricsOnce sync.Once
	clientMetrics     *grpcprom.ClientMetrics
)

// set of repositories that we want to capture separate indexing metrics for
var reposWithSeparateIndexingMetrics = make(map[string]struct{})

// indexState is the outcome of a single index job. It is used as the "state"
// label of metricIndexDuration.
type indexState string

const (
	indexStateFail        indexState = "fail"
	indexStateSuccess     indexState = "success"
	indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
	indexStateNoop        indexState = "noop"         // We didn't need to update index
	indexStateEmpty       indexState = "empty"        // index is empty (empty repo)
)

// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	logger sglog.Logger

	Sourcegraph Sourcegraph
	BatchSize   int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration

	// CPUCount is the number of CPUs to use for indexing shards.
	CPUCount int

	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	hostname string

	mergeOpts mergeOpts

	// timeout defines how long the index server waits before killing an indexing job.
	timeout time.Duration
}

// debug is a logger for verbose diagnostics. As initialised here its output
// is discarded; presumably it is redirected elsewhere when debug logging is
// enabled (not visible in this part of the file).
var debug = log.New(io.Discard, "", log.LstdFlags)

// our index commands should output something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
198const noOutputTimeout = 30 * time.Minute 199 200func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 201 out := &synchronizedBuffer{} 202 cmd.Stdout = out 203 cmd.Stderr = out 204 205 tr.LazyPrintf("%s", cmd.Args) 206 207 defer func() { 208 if err != nil { 209 outS := out.String() 210 tr.LazyPrintf("failed: %v", err) 211 tr.LazyPrintf("output: %s", out) 212 tr.SetError() 213 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 214 } 215 }() 216 217 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 218 219 if err := cmd.Start(); err != nil { 220 return err 221 } 222 223 errC := make(chan error) 224 go func() { 225 errC <- cmd.Wait() 226 }() 227 228 // This channel is set after we have sent sigquit. It allows us to follow up 229 // with a sigkill if the process doesn't quit after sigquit. 230 kill := make(<-chan time.Time) 231 232 lastLen := 0 233 for { 234 select { 235 case <-time.After(noOutputTimeout): 236 // Periodically check if we have had output. If not kill the process. 237 if out.Len() != lastLen { 238 lastLen = out.Len() 239 log.Printf("still running %s", cmd.Args) 240 } else { 241 // Send quit (C-\) first so we get a stack dump. 242 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 243 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 244 log.Println("quit failed:", err) 245 } 246 247 // send sigkill if still running in 10s 248 kill = time.After(10 * time.Second) 249 } 250 251 case <-kill: 252 log.Printf("still running, killing %s", cmd.Args) 253 if err := cmd.Process.Kill(); err != nil { 254 log.Println("kill failed:", err) 255 } 256 257 case err := <-errC: 258 if err != nil { 259 return err 260 } 261 262 tr.LazyPrintf("success") 263 return nil 264 } 265 } 266} 267 268// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 269// monitor the buffer while it is being written to. 
270type synchronizedBuffer struct { 271 mu sync.Mutex 272 b bytes.Buffer 273} 274 275func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 276 sb.mu.Lock() 277 defer sb.mu.Unlock() 278 return sb.b.Write(p) 279} 280 281func (sb *synchronizedBuffer) Len() int { 282 sb.mu.Lock() 283 defer sb.mu.Unlock() 284 return sb.b.Len() 285} 286 287func (sb *synchronizedBuffer) String() string { 288 sb.mu.Lock() 289 defer sb.mu.Unlock() 290 return sb.b.String() 291} 292 293// pauseFileName if present in IndexDir will stop index jobs from 294// running. This is to make it possible to experiment with the content of the 295// IndexDir without the indexserver writing to it. 296const pauseFileName = "PAUSE" 297 298// Run the sync loop. This blocks forever. 299func (s *Server) Run() { 300 removeIncompleteShards(s.IndexDir) 301 302 // Start a goroutine which updates the queue with commits to index. 303 go func() { 304 // We update the list of indexed repos every Interval. To speed up manual 305 // testing we also listen for SIGUSR1 to trigger updates. 
306 // 307 // "pkill -SIGUSR1 zoekt-sourcegra" 308 for range jitterTicker(s.Interval, unix.SIGUSR1) { 309 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 310 log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 311 continue 312 } 313 314 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 315 if err != nil { 316 log.Printf("error listing repos: %s", err) 317 continue 318 } 319 320 debug.Printf("updating index queue with %d repositories", len(repos.IDs)) 321 322 // Stop indexing repos we don't need to track anymore 323 removed := s.queue.MaybeRemoveMissing(repos.IDs) 324 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 325 if len(removed) > 0 { 326 log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 327 } 328 329 cleanupDone := make(chan struct{}) 330 go func() { 331 defer close(cleanupDone) 332 s.muIndexDir.Global(func() { 333 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 334 }) 335 }() 336 337 repos.IterateIndexOptions(s.queue.AddOrUpdate) 338 339 // IterateIndexOptions will only iterate over repositories that have 340 // changed since we last called list. However, we want to add all IDs 341 // back onto the queue just to check that what is on disk is still 342 // correct. This will use the last IndexOptions we stored in the 343 // queue. The repositories not on the queue (missing) need a forced 344 // fetch of IndexOptions. 345 missing := s.queue.Bump(repos.IDs) 346 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 
347 348 setCompoundShardCounter(s.IndexDir) 349 350 <-cleanupDone 351 } 352 }() 353 354 go func() { 355 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 356 if s.shardMerging { 357 s.vacuum() 358 } 359 } 360 }() 361 362 go func() { 363 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 364 if s.shardMerging { 365 s.doMerge() 366 } 367 } 368 }() 369 370 for i := 0; i < s.IndexConcurrency; i++ { 371 go s.processQueue() 372 } 373 374 // block forever 375 select {} 376} 377 378// formatList returns a comma-separated list of the first min(len(v), m) items. 379func formatListUint32(v []uint32, m int) string { 380 if len(v) < m { 381 m = len(v) 382 } 383 384 sb := strings.Builder{} 385 for i := 0; i < m; i++ { 386 fmt.Fprintf(&sb, "%d, ", v[i]) 387 } 388 389 if len(v) > m { 390 sb.WriteString("...") 391 } 392 393 return strings.TrimRight(sb.String(), ", ") 394} 395 396func (s *Server) processQueue() { 397 for { 398 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 399 time.Sleep(time.Second) 400 continue 401 } 402 403 opts, ok := s.queue.Pop() 404 if !ok { 405 time.Sleep(time.Second) 406 continue 407 } 408 409 args := s.indexArgs(opts) 410 411 ran := s.muIndexDir.With(opts.Name, func() { 412 // only record time taken once we hold the lock. This avoids us 413 // recording time taken while merging/cleanup runs. 
414 start := time.Now() 415 416 state, err := s.Index(args) 417 418 elapsed := time.Since(start) 419 420 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 421 422 if err != nil { 423 log.Printf("error indexing %s: %s", args.String(), err) 424 } 425 426 switch state { 427 case indexStateSuccess: 428 var branches []string 429 for _, b := range args.Branches { 430 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 431 } 432 s.logger.Info("updated index", 433 sglog.String("repo", args.Name), 434 sglog.Uint32("id", args.RepoID), 435 sglog.Strings("branches", branches), 436 sglog.Duration("duration", elapsed), 437 ) 438 case indexStateSuccessMeta: 439 log.Printf("updated meta %s in %v", args.String(), elapsed) 440 } 441 s.queue.SetIndexed(opts, state) 442 }) 443 444 if !ran { 445 // Someone else is processing the repository. We can just skip this job 446 // since the repository will be added back to the queue and we will 447 // converge to the correct behaviour. 448 debug.Printf("index job for repository already running: %s", args) 449 continue 450 } 451 } 452} 453 454// repoNameForMetric returns a normalized version of the given repository name that is 455// suitable for use with Prometheus metrics. 456func repoNameForMetric(repo string) string { 457 // Check to see if we want to be able to capture separate indexing metrics for this repository. 458 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 459 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 460 return repo 461 } 462 463 return "" 464} 465 466func batched(slice []uint32, size int) <-chan []uint32 { 467 c := make(chan []uint32) 468 go func() { 469 for len(slice) > 0 { 470 if size > len(slice) { 471 size = len(slice) 472 } 473 c <- slice[:size] 474 slice = slice[size:] 475 } 476 close(c) 477 }() 478 return c 479} 480 481// jitterTicker returns a ticker which ticks with a jitter. 
Each tick is 482// uniformly selected from the range (d/2, d + d/2). It will tick on creation. 483// 484// sig is a list of signals which also cause the ticker to fire. This is a 485// convenience to allow manually triggering of the ticker. 486func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} { 487 ticker := make(chan struct{}) 488 489 go func() { 490 for { 491 ticker <- struct{}{} 492 ns := int64(d) 493 jitter := rand.Int63n(ns) 494 time.Sleep(time.Duration(ns/2 + jitter)) 495 } 496 }() 497 498 go func() { 499 if len(sig) == 0 { 500 return 501 } 502 503 c := make(chan os.Signal, 1) 504 signal.Notify(c, sig...) 505 for range c { 506 ticker <- struct{}{} 507 } 508 }() 509 510 return ticker 511} 512 513// Index starts an index job for repo name at commit. 514func (s *Server) Index(args *indexArgs) (state indexState, err error) { 515 tr := trace.New("index", args.Name) 516 517 defer func() { 518 if err != nil { 519 tr.SetError() 520 tr.LazyPrintf("error: %v", err) 521 state = indexStateFail 522 metricFailingTotal.Inc() 523 } 524 tr.LazyPrintf("state: %s", state) 525 tr.Finish() 526 }() 527 528 tr.LazyPrintf("branches: %v", args.Branches) 529 530 if len(args.Branches) == 0 { 531 return indexStateEmpty, createEmptyShard(args) 532 } 533 534 repositoryName := args.Name 535 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok { 536 tr.LazyPrintf("marking this repository for delta build") 537 args.UseDelta = true 538 } 539 540 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold 541 542 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok { 543 tr.LazyPrintf("skipping symbols calculation") 544 args.Symbols = false 545 } 546 547 reason := "forced" 548 549 if args.Incremental { 550 bo := args.BuildOptions() 551 bo.SetDefaults() 552 incrementalState, fn := bo.IndexState() 553 reason = string(incrementalState) 554 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc() 555 
556 switch incrementalState { 557 case build.IndexStateEqual: 558 debug.Printf("%s index already up to date. Shard=%s", args.String(), fn) 559 return indexStateNoop, nil 560 561 case build.IndexStateMeta: 562 log.Printf("updating index.meta %s", args.String()) 563 564 if err := mergeMeta(bo); err != nil { 565 log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err) 566 } else { 567 return indexStateSuccessMeta, nil 568 } 569 570 case build.IndexStateCorrupt: 571 log.Printf("falling back to full update: corrupt index: %s", args.String()) 572 } 573 } 574 575 log.Printf("updating index %s reason=%s", args.String(), reason) 576 577 metricIndexingTotal.Inc() 578 c := gitIndexConfig{ 579 runCmd: func(cmd *exec.Cmd) error { 580 return s.loggedRun(tr, cmd) 581 }, 582 583 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) { 584 return args.BuildOptions().FindRepositoryMetadata() 585 }, 586 timeout: s.timeout, 587 } 588 589 err = gitIndex(c, args, s.Sourcegraph, s.logger) 590 if err != nil { 591 return indexStateFail, err 592 } 593 594 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil { 595 s.logger.Error("failed to update index status", 596 sglog.String("repo", args.Name), 597 sglog.Uint32("id", args.RepoID), 598 sglogBranches("branches", args.Branches), 599 sglog.Error(err), 600 ) 601 } 602 603 return indexStateSuccess, nil 604} 605 606// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so 607// it can update the zoekt_repos table. 608func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error { 609 // We need to read from disk for IndexTime. 
610 _, metadata, ok, err := c.findRepositoryMetadata(args) 611 if err != nil { 612 return fmt.Errorf("failed to read metadata for new/updated index: %w", err) 613 } 614 if !ok { 615 return errors.New("failed to find metadata for new/updated index") 616 } 617 618 status := []indexStatus{{ 619 RepoID: args.RepoID, 620 Branches: args.Branches, 621 IndexTimeUnix: metadata.IndexTime.Unix(), 622 }} 623 if err := sg.UpdateIndexStatus(status); err != nil { 624 return fmt.Errorf("failed to update sourcegraph with status: %w", err) 625 } 626 627 return nil 628} 629 630func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field { 631 ss := make([]string, len(branches)) 632 for i, b := range branches { 633 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version) 634 } 635 return sglog.Strings(key, ss) 636} 637 638func (s *Server) indexArgs(opts IndexOptions) *indexArgs { 639 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0)) 640 return &indexArgs{ 641 IndexOptions: opts, 642 IndexDir: s.IndexDir, 643 Parallelism: parallelism, 644 Incremental: true, 645 646 // 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22 647 FileLimit: 1 << 20, 648 } 649} 650 651// parallelism consults both the server flags and index options to determine the number 652// of shards to index in parallel. If the CPUCount index option is provided, it always 653// overrides the server flag. 
654func (s *Server) parallelism(opts IndexOptions, maxProcs int) int { 655 var parallelism int 656 if opts.ShardConcurrency > 0 { 657 parallelism = int(opts.ShardConcurrency) 658 } else { 659 parallelism = s.CPUCount 660 } 661 662 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs 663 if parallelism > 4*maxProcs { 664 parallelism = 4 * maxProcs 665 } 666 667 // If index concurrency is set, then divide the parallelism by the number of 668 // repos we're indexing in parallel 669 if s.IndexConcurrency > 1 { 670 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency))) 671 } 672 673 return parallelism 674} 675 676func createEmptyShard(args *indexArgs) error { 677 bo := args.BuildOptions() 678 bo.SetDefaults() 679 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 680 681 if args.Incremental && bo.IncrementalSkipIndexing() { 682 return nil 683 } 684 685 builder, err := build.NewBuilder(*bo) 686 if err != nil { 687 return err 688 } 689 return builder.Finish() 690} 691 692// addDebugHandlers adds handlers specific to indexserver. 693func (s *Server) addDebugHandlers(mux *http.ServeMux) { 694 // Sourcegraph's site admin view requires indexserver to serve it's admin view 695 // on "/". 
696 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 697 698 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 699 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 700 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 701 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 702 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 703 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 704} 705 706func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 707 if r.Method != "GET" { 708 w.Header().Set("Allow", "GET") 709 w.WriteHeader(http.StatusMethodNotAllowed) 710 return 711 } 712 713 response := struct { 714 Hostname string 715 }{ 716 Hostname: s.hostname, 717 } 718 719 b, err := json.Marshal(response) 720 if err != nil { 721 http.Error(w, err.Error(), http.StatusInternalServerError) 722 return 723 } 724 725 w.Header().Set("Content-Type", "application/json; charset=utf-8") 726 w.Write(b) 727} 728 729var rootTmpl = template.Must(template.New("name").Parse(` 730<html> 731 <body> 732 <a href="debug">Debug</a><br /> 733 <a href="debug/requests">Traces</a><br /> 734 {{.IndexMsg}}<br /> 735 <br /> 736 <h3>Reindex</h3> 737 {{if .Repos}} 738 <a href="?show_repos=false">hide repos</a><br /> 739 <table style="margin-top: 20px"> 740 <th style="text-align:left">Name</th> 741 <th style="text-align:left">ID</th> 742 {{range .Repos}} 743 <tr> 744 <td>{{.Name}}</td> 745 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 746 </tr> 747 {{end}} 748 </table> 749 {{else}} 750 <a href="?show_repos=true">show repos</a><br /> 751 {{end}} 752 </body> 753</html> 754`)) 755 756func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 757 if r.Method != "GET" { 758 w.Header().Set("Allow", "GET") 759 w.WriteHeader(http.StatusMethodNotAllowed) 760 return 761 } 762 763 values := r.URL.Query() 764 765 // ?id= 766 indexMsg := "" 767 if v := values.Get("id"); v != "" { 768 id, err := 
strconv.Atoi(v) 769 if err != nil { 770 http.Error(w, err.Error(), http.StatusBadRequest) 771 return 772 } 773 indexMsg, _ = s.forceIndex(uint32(id)) 774 } 775 776 // ?show_repos= 777 showRepos := false 778 if v := values.Get("show_repos"); v != "" { 779 showRepos, _ = strconv.ParseBool(v) 780 } 781 782 type Repo struct { 783 ID uint32 784 Name string 785 } 786 var data struct { 787 Repos []Repo 788 IndexMsg string 789 } 790 791 data.IndexMsg = indexMsg 792 793 if showRepos { 794 s.queue.Iterate(func(opts *IndexOptions) { 795 data.Repos = append(data.Repos, Repo{ 796 ID: opts.RepoID, 797 Name: opts.Name, 798 }) 799 }) 800 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 801 } 802 803 _ = rootTmpl.Execute(w, data) 804} 805 806// handleReindex triggers a reindex asynocronously. If a reindex was triggered 807// the request returns with status 202. The caller can infer the new state of 808// the index by calling List. 809func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 810 if r.Method != http.MethodPost { 811 w.Header().Set("Allow", http.MethodPost) 812 w.WriteHeader(http.StatusMethodNotAllowed) 813 return 814 } 815 816 err := r.ParseForm() 817 if err != nil { 818 http.Error(w, err.Error(), http.StatusBadRequest) 819 return 820 } 821 822 id, err := strconv.Atoi(r.Form.Get("repo")) 823 if err != nil { 824 http.Error(w, err.Error(), http.StatusBadRequest) 825 return 826 } 827 828 go func() { s.forceIndex(uint32(id)) }() 829 830 // 202 Accepted 831 w.WriteHeader(http.StatusAccepted) 832} 833 834func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 835 withIndexed := true 836 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 837 withIndexed = b 838 } 839 840 var indexed []uint32 841 if withIndexed { 842 indexed = listIndexed(s.IndexDir) 843 } 844 845 repos, err := s.Sourcegraph.List(r.Context(), indexed) 846 if err != nil { 847 http.Error(w, err.Error(), 
http.StatusInternalServerError) 848 return 849 } 850 851 bw := bytes.Buffer{} 852 853 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 854 855 _, err = fmt.Fprintf(tw, "ID\tName\n") 856 if err != nil { 857 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 858 return 859 } 860 861 s.queue.mu.Lock() 862 name := "" 863 for _, id := range repos.IDs { 864 if item := s.queue.get(id); item != nil { 865 name = item.opts.Name 866 } else { 867 name = "" 868 } 869 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 870 if err != nil { 871 debug.Printf("handleDebugList: %s\n", err.Error()) 872 } 873 } 874 s.queue.mu.Unlock() 875 876 if err != nil { 877 http.Error(w, err.Error(), http.StatusInternalServerError) 878 return 879 } 880 881 err = tw.Flush() 882 if err != nil { 883 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 884 return 885 } 886 887 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 888 889 if _, err := io.Copy(w, &bw); err != nil { 890 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 891 return 892 } 893} 894 895// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 896// can run this command during periods of low usage (evenings, weekends) to 897// trigger an initial merge run. In the steady-state, merges happen rarely, even 898// on busy instances, and users can rely on automatic merging instead. 899func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 900 // A merge operation can take very long, depending on the number merges and the 901 // target size of the compound shards. We run the merge in the background and 902 // return immediately to the user. 903 // 904 // We track the status of the merge with metricShardMergingRunning. 
905 go func() { 906 s.doMerge() 907 }() 908 _, _ = w.Write([]byte("merging enqueued\n")) 909} 910 911func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 912 indexed := listIndexed(s.IndexDir) 913 914 bw := bytes.Buffer{} 915 916 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 917 918 _, err := fmt.Fprintf(tw, "ID\tName\n") 919 if err != nil { 920 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 921 return 922 } 923 924 s.queue.mu.Lock() 925 name := "" 926 for _, id := range indexed { 927 if item := s.queue.get(id); item != nil { 928 name = item.opts.Name 929 } else { 930 name = "" 931 } 932 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 933 if err != nil { 934 debug.Printf("handleDebugIndexed: %s\n", err.Error()) 935 } 936 } 937 s.queue.mu.Unlock() 938 939 if err != nil { 940 http.Error(w, err.Error(), http.StatusInternalServerError) 941 return 942 } 943 944 err = tw.Flush() 945 if err != nil { 946 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 947 return 948 } 949 950 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 951 952 if _, err := io.Copy(w, &bw); err != nil { 953 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 954 return 955 } 956} 957 958// forceIndex will run the index job for repo name now. It will return always 959// return a string explaining what it did, even if it failed. 
960func (s *Server) forceIndex(id uint32) (string, error) { 961 var opts IndexOptions 962 var err error 963 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 964 opts = o 965 }, func(_ uint32, e error) { 966 err = e 967 }, id) 968 if err != nil { 969 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 970 } 971 972 args := s.indexArgs(opts) 973 args.Incremental = false // force re-index 974 975 var state indexState 976 ran := s.muIndexDir.With(opts.Name, func() { 977 state, err = s.Index(args) 978 }) 979 if !ran { 980 return fmt.Sprintf("index job for repository already running: %s", args), nil 981 } 982 if err != nil { 983 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 984 } 985 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 986} 987 988func listIndexed(indexDir string) []uint32 { 989 index := getShards(indexDir) 990 metricNumIndexed.Set(float64(len(index))) 991 repoIDs := make([]uint32, 0, len(index)) 992 for id := range index { 993 repoIDs = append(repoIDs, id) 994 } 995 sort.Slice(repoIDs, func(i, j int) bool { 996 return repoIDs[i] < repoIDs[j] 997 }) 998 return repoIDs 999} 1000 1001// setupTmpDir sets up a temporary directory on the same volume as the 1002// indexes. 1003// 1004// If main is true we will delete older temp directories left around. main is 1005// false when this is a debug command. 1006func setupTmpDir(logger sglog.Logger, main bool, index string) error { 1007 // change the target tmp directory depending on if it's our main daemon or a 1008 // debug sub command. 
1009 dir := ".indexserver.debug.tmp" 1010 if main { 1011 dir = ".indexserver.tmp" 1012 } 1013 1014 tmpRoot := filepath.Join(index, dir) 1015 1016 if main { 1017 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot)) 1018 err := os.RemoveAll(tmpRoot) 1019 if err != nil { 1020 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err)) 1021 } 1022 } 1023 1024 if err := os.MkdirAll(tmpRoot, 0o755); err != nil { 1025 return err 1026 } 1027 1028 return os.Setenv("TMPDIR", tmpRoot) 1029} 1030 1031func printMetaData(fn string) error { 1032 repo, indexMeta, err := zoekt.ReadMetadataPath(fn) 1033 if err != nil { 1034 return err 1035 } 1036 1037 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 1038 if err != nil { 1039 return err 1040 } 1041 1042 err = json.NewEncoder(os.Stdout).Encode(repo) 1043 if err != nil { 1044 return err 1045 } 1046 return nil 1047} 1048 1049func printShardStats(fn string) error { 1050 f, err := os.Open(fn) 1051 if err != nil { 1052 return err 1053 } 1054 1055 iFile, err := zoekt.NewIndexFile(f) 1056 if err != nil { 1057 return err 1058 } 1059 1060 return zoekt.PrintNgramStats(iFile) 1061} 1062 1063func srcLogLevelIsDebug() bool { 1064 lvl := os.Getenv(sglog.EnvLogLevel) 1065 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1066} 1067 1068func getEnvWithDefaultInt64(k string, defaultVal int64) int64 { 1069 v := os.Getenv(k) 1070 if v == "" { 1071 return defaultVal 1072 } 1073 i, err := strconv.ParseInt(v, 10, 64) 1074 if err != nil { 1075 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1076 } 1077 return i 1078} 1079 1080func getEnvWithDefaultInt(k string, defaultVal int) int { 1081 v := os.Getenv(k) 1082 if v == "" { 1083 return defaultVal 1084 } 1085 i, err := strconv.Atoi(k) 1086 if err != nil { 1087 log.Fatalf("error parsing ENV %s to int: %s", k, err) 1088 } 1089 return i 1090} 1091 1092func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 { 1093 v := 
os.Getenv(k) 1094 if v == "" { 1095 return defaultVal 1096 } 1097 i, err := strconv.ParseUint(v, 10, 64) 1098 if err != nil { 1099 log.Fatalf("error parsing ENV %s to uint64: %s", k, err) 1100 } 1101 return i 1102} 1103 1104func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1105 v := os.Getenv(k) 1106 if v == "" { 1107 return defaultVal 1108 } 1109 f, err := strconv.ParseFloat(v, 64) 1110 if err != nil { 1111 log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1112 } 1113 return f 1114} 1115 1116func getEnvWithDefaultString(k string, defaultVal string) string { 1117 v := os.Getenv(k) 1118 if v == "" { 1119 return defaultVal 1120 } 1121 return v 1122} 1123 1124func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration { 1125 v := os.Getenv(k) 1126 if v == "" { 1127 return defaultVal 1128 } 1129 1130 d, err := time.ParseDuration(v) 1131 if err != nil { 1132 log.Fatalf("error parsing ENV %s to duration: %s", k, err) 1133 } 1134 return d 1135} 1136 1137func getEnvWithDefaultEmptySet(k string) map[string]struct{} { 1138 set := map[string]struct{}{} 1139 for _, v := range strings.Split(os.Getenv(k), ",") { 1140 v = strings.TrimSpace(v) 1141 if v != "" { 1142 set[v] = struct{}{} 1143 } 1144 } 1145 return set 1146} 1147 1148func joinStringSet(set map[string]struct{}, sep string) string { 1149 var xs []string 1150 for x := range set { 1151 xs = append(xs, x) 1152 } 1153 1154 return strings.Join(xs, sep) 1155} 1156 1157func setCompoundShardCounter(indexDir string) { 1158 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt")) 1159 if err != nil { 1160 log.Printf("setCompoundShardCounter: %s\n", err) 1161 return 1162 } 1163 metricNumberCompoundShards.Set(float64(len(fns))) 1164} 1165 1166func rootCmd() *ffcli.Command { 1167 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1168 conf := rootConfig{ 1169 Main: true, 1170 } 1171 conf.registerRootFlags(rootFs) 1172 1173 return &ffcli.Command{ 1174 FlagSet: rootFs, 
1175 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1176 Subcommands: []*ffcli.Command{debugCmd()}, 1177 Exec: func(ctx context.Context, args []string) error { 1178 return startServer(conf) 1179 }, 1180 } 1181} 1182 1183type rootConfig struct { 1184 // Main is true if this rootConfig is for our main long running command (the 1185 // indexserver). Debug commands should not set this value. This is used to 1186 // determine if we need to run tmpfriend. 1187 Main bool 1188 1189 root string 1190 interval time.Duration 1191 index string 1192 indexConcurrency int64 1193 listen string 1194 hostname string 1195 cpuFraction float64 1196 blockProfileRate int 1197 1198 // config values related to shard merging 1199 vacuumInterval time.Duration 1200 mergeInterval time.Duration 1201 targetSize int64 1202 minSize int64 1203 minAgeDays int 1204 maxPriority float64 1205 1206 // config values related to backoff indexing repos with one or more consecutive failures 1207 backoffDuration time.Duration 1208 maxBackoffDuration time.Duration 1209} 1210 1211func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1212 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1213 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1214 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently") 1215 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use") 1216 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") 1217 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. 
Can also be set via the NODE_NAME environment variable.") 1218 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1219 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate") 1220 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.") 1221 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.") 1222 1223 // flags related to shard merging 1224 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1225 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1226 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB") 1227 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB") 1228 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. 
Shards with newer commits are excluded from merging.") 1229 fs.Float64Var(&rc.maxPriority, "merge_max_priority", getEnvWithDefaultFloat64("SRC_MERGE_MAX_PRIORITY", 100), "the maximum priority a shard can have to be considered for merging.") 1230} 1231 1232func startServer(conf rootConfig) error { 1233 s, err := newServer(conf) 1234 if err != nil { 1235 return err 1236 } 1237 1238 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate) 1239 setCompoundShardCounter(s.IndexDir) 1240 1241 if conf.listen != "" { 1242 1243 mux := http.NewServeMux() 1244 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1245 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1246 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1247 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"}, 1248 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1249 }...) 1250 s.addDebugHandlers(mux) 1251 1252 go func() { 1253 debug.Printf("serving HTTP on %s", conf.listen) 1254 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1255 }() 1256 1257 // Serve mux on a unix domain socket on a best-effort-basis so that 1258 // webserver can call the endpoints via the shared filesystem. 1259 // 1260 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1261 // on the socket due to permission errors. See 1262 // https://github.com/docker/for-mac/issues/6239 1263 go func() { 1264 serveHTTPOverSocket := func() error { 1265 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1266 // We cannot bind a socket to an existing pathname. 
1267 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1268 return fmt.Errorf("error removing socket file: %s", socket) 1269 } 1270 // The "unix" network corresponds to stream sockets. (cf. unixgram, 1271 // unixpacket). 1272 l, err := net.Listen("unix", socket) 1273 if err != nil { 1274 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1275 } 1276 // Indexserver (root) and webserver (Sourcegraph) run with 1277 // different users. Per default, the socket is created with 1278 // permission 755 (root root), which doesn't let webserver write to 1279 // it. 1280 // 1281 // See https://github.com/golang/go/issues/11822 for more context. 1282 if err := os.Chmod(socket, 0o777); err != nil { 1283 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1284 } 1285 debug.Printf("serving HTTP on %s", socket) 1286 return http.Serve(l, mux) 1287 } 1288 debug.Print(serveHTTPOverSocket()) 1289 }() 1290 } 1291 1292 oc := &ownerChecker{ 1293 Path: filepath.Join(conf.index, "owner.txt"), 1294 Hostname: conf.hostname, 1295 } 1296 go oc.Run() 1297 1298 logger := sglog.Scoped("metricsRegistration") 1299 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1300 1301 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1302 prometheus.DefaultRegisterer.MustRegister(c) 1303 1304 s.Run() 1305 return nil 1306} 1307 1308func newServer(conf rootConfig) (*Server, error) { 1309 logger := sglog.Scoped("server") 1310 1311 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1312 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1313 } 1314 if conf.index == "" { 1315 return nil, fmt.Errorf("must set -index") 1316 } 1317 if conf.root == "" { 1318 return nil, fmt.Errorf("must set -sourcegraph_url") 1319 } 1320 rootURL, err := url.Parse(conf.root) 1321 if err != nil { 1322 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1323 } 1324 1325 rootURL = 
addDefaultPort(rootURL) 1326 1327 // Tune GOMAXPROCS to match Linux container CPU quota. 1328 _, _ = maxprocs.Set() 1329 1330 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler. 1331 // The block profiler is disabled by default and should be enabled with care in production 1332 runtime.SetBlockProfileRate(conf.blockProfileRate) 1333 1334 // Automatically prepend our own path at the front, to minimize 1335 // required configuration. 1336 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1337 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1338 } 1339 1340 if _, err := os.Stat(conf.index); err != nil { 1341 if err := os.MkdirAll(conf.index, 0o755); err != nil { 1342 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1343 } 1344 } 1345 1346 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil { 1347 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1348 } 1349 1350 if srcLogLevelIsDebug() { 1351 debug = log.New(os.Stderr, "", log.LstdFlags) 1352 } 1353 1354 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1355 if len(reposWithSeparateIndexingMetrics) > 0 { 1356 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1357 } 1358 1359 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1360 if len(deltaBuildRepositoriesAllowList) > 0 { 1361 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1362 } 1363 1364 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1365 if deltaShardNumberFallbackThreshold > 0 { 1366 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1367 } else { 1368 debug.Printf("disabling delta build 
fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1369 } 1370 1371 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1372 if len(reposShouldSkipSymbolsCalculation) > 0 { 1373 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1374 } 1375 1376 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout) 1377 if indexingTimeout != defaultIndexingTimeout { 1378 debug.Printf("using configured indexing timeout: %s", indexingTimeout) 1379 } 1380 1381 var sg Sourcegraph 1382 if rootURL.IsAbs() { 1383 var batchSize int 1384 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { 1385 batchSize, err = strconv.Atoi(v) 1386 if err != nil { 1387 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1388 } 1389 } 1390 1391 opts := []SourcegraphClientOption{ 1392 WithBatchSize(batchSize), 1393 } 1394 1395 logger := sglog.Scoped("zoektConfigurationGRPCClient") 1396 client, err := dialGRPCClient(rootURL.Host, logger) 1397 if err != nil { 1398 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1399 } 1400 1401 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...) 
1402 1403 } else { 1404 sg = sourcegraphFake{ 1405 RootDir: rootURL.String(), 1406 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1407 } 1408 } 1409 1410 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction))) 1411 if cpuCount < 1 { 1412 cpuCount = 1 1413 } 1414 1415 if conf.indexConcurrency < 1 { 1416 conf.indexConcurrency = 1 1417 } else if conf.indexConcurrency > int64(cpuCount) { 1418 conf.indexConcurrency = int64(cpuCount) 1419 } 1420 1421 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1422 1423 return &Server{ 1424 logger: logger, 1425 Sourcegraph: sg, 1426 IndexDir: conf.index, 1427 IndexConcurrency: int(conf.indexConcurrency), 1428 Interval: conf.interval, 1429 CPUCount: cpuCount, 1430 queue: *q, 1431 shardMerging: zoekt.ShardMergingEnabled(), 1432 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1433 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1434 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1435 hostname: conf.hostname, 1436 mergeOpts: mergeOpts{ 1437 vacuumInterval: conf.vacuumInterval, 1438 mergeInterval: conf.mergeInterval, 1439 targetSizeBytes: conf.targetSize * 1024 * 1024, 1440 minSizeBytes: conf.minSize * 1024 * 1024, 1441 minAgeDays: conf.minAgeDays, 1442 maxPriority: conf.maxPriority, 1443 }, 1444 timeout: indexingTimeout, 1445 }, err 1446} 1447 1448// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration 1449// for the indexed-search-configuration gRPC service. 1450// 1451// The default backoff strategy is modeled after the default settings used by 1452// retryablehttp.DefaultClient. 
1453// 1454// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1455// - Unavailable 1456// - Aborted 1457// 1458//go:embed default_grpc_service_configuration.json 1459var defaultGRPCServiceConfigurationJSON string 1460 1461func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1462 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1463 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1464 return invoker(ctx, method, req, reply, cc, opts...) 1465 } 1466} 1467 1468func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1469 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1470 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1471 return streamer(ctx, desc, cc, method, opts...) 1472 } 1473} 1474 1475// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPCs servers and clients are allowed to process. 1476// This can be overridden by providing custom Server/Dial options. 
1477const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB 1478 1479func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) { 1480 metrics := mustGetClientMetrics() 1481 1482 // If the service seems to be unavailable, this 1483 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1 1484 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s]) 1485 retryOpts := []retry.CallOption{ 1486 retry.WithMax(5), 1487 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)), 1488 retry.WithCodes(codes.Unavailable), 1489 } 1490 1491 opts := []grpc.DialOption{ 1492 grpc.WithTransportCredentials(insecure.NewCredentials()), 1493 grpc.WithChainStreamInterceptor( 1494 metrics.StreamClientInterceptor(), 1495 messagesize.StreamClientInterceptor, 1496 internalActorStreamInterceptor(), 1497 internalerrs.LoggingStreamClientInterceptor(logger), 1498 internalerrs.PrometheusStreamClientInterceptor, 1499 retry.StreamClientInterceptor(retryOpts...), 1500 ), 1501 grpc.WithChainUnaryInterceptor( 1502 metrics.UnaryClientInterceptor(), 1503 messagesize.UnaryClientInterceptor, 1504 internalActorUnaryInterceptor(), 1505 internalerrs.LoggingUnaryClientInterceptor(logger), 1506 internalerrs.PrometheusUnaryClientInterceptor, 1507 retry.UnaryClientInterceptor(retryOpts...), 1508 ), 1509 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON), 1510 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)), 1511 } 1512 1513 opts = append(opts, additionalOpts...) 1514 1515 // Ensure that the message size options are set last, so they override any other 1516 // client-specific options that tweak the message size. 1517 // 1518 // The message size options are only provided if the environment variable is set. 
These options serve as an escape hatch, so they 1519 // take precedence over everything else with a uniform size setting that's easy to reason about. 1520 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...) 1521 1522 // This dialer is used to connect via gRPC to the Sourcegraph instance. 1523 // This is done lazily, so we can provide the client to use regardless of 1524 // whether we enabled gRPC or not initially. 1525 cc, err := grpc.Dial(addr, opts...) 1526 if err != nil { 1527 return nil, fmt.Errorf("dialing %q: %w", addr, err) 1528 } 1529 1530 client := proto.NewZoektConfigurationServiceClient(cc) 1531 return client, nil 1532} 1533 1534// mustGetClientMetrics returns a singleton instance of the client metrics 1535// that are shared across all gRPC clients that this process creates. 1536// 1537// This function panics if the metrics cannot be registered with the default 1538// Prometheus registry. 1539func mustGetClientMetrics() *grpcprom.ClientMetrics { 1540 clientMetricsOnce.Do(func() { 1541 clientMetrics = grpcprom.NewClientMetrics( 1542 grpcprom.WithClientCounterOptions(), 1543 grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request 1544 grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC 1545 grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC 1546 ) 1547 1548 prometheus.DefaultRegisterer.MustRegister(clientMetrics) 1549 }) 1550 1551 return clientMetrics 1552} 1553 1554// addDefaultPort adds a default port to a URL if one is not specified. 1555// 1556// If the URL scheme is "http" and no port is specified, "80" is used. 1557// If the scheme is "https", "443" is used. 1558// 1559// The original URL is not mutated. A copy is modified and returned. 
1560func addDefaultPort(original *url.URL) *url.URL { 1561 if original == nil { 1562 return nil // don't panic 1563 } 1564 1565 if !original.IsAbs() { 1566 return original // don't do anything if the URL doesn't look like a remote URL 1567 } 1568 1569 if original.Scheme == "http" && original.Port() == "" { 1570 u := cloneURL(original) 1571 u.Host = net.JoinHostPort(u.Host, "80") 1572 return u 1573 } 1574 1575 if original.Scheme == "https" && original.Port() == "" { 1576 u := cloneURL(original) 1577 u.Host = net.JoinHostPort(u.Host, "443") 1578 return u 1579 } 1580 1581 return original 1582} 1583 1584// cloneURL returns a copy of the URL. It is safe to mutate the returned URL. 1585// This is copied from net/http/clone.go 1586func cloneURL(u *url.URL) *url.URL { 1587 if u == nil { 1588 return nil 1589 } 1590 u2 := new(url.URL) 1591 *u2 = *u 1592 if u.User != nil { 1593 u2.User = new(url.Userinfo) 1594 *u2.User = *u.User 1595 } 1596 return u2 1597} 1598 1599func main() { 1600 liblog := sglog.Init(sglog.Resource{ 1601 Name: "zoekt-indexserver", 1602 Version: zoekt.Version, 1603 InstanceID: zoekt.HostnameBestEffort(), 1604 }) 1605 defer liblog.Sync() 1606 1607 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1608 log.Fatal(err) 1609 } 1610} 1611 1612// getBoolFromEnvironmentVariables returns the boolean defined by the first environment 1613// variable listed in envVarNames that is set in the current process environment, or the defaultBool if none are set. 1614// 1615// An error is returned of the provided environment variables fails to parse as a boolean. 
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	for _, envVar := range envVarNames {
		v := os.Getenv(envVar)
		if v == "" {
			// Unset and empty variables are treated alike: try the next name.
			continue
		}

		// The first non-empty variable decides the result; later names are
		// not consulted even if this one is malformed.
		b, err := strconv.ParseBool(v)
		if err != nil {
			// Wrap with %w so callers can inspect the underlying parse error.
			return false, fmt.Errorf("parsing environment variable %q to boolean: %w", envVar, err)
		}

		return b, nil
	}

	return defaultBool, nil
}