fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
// repositories on sourcegraph
package main

import (
	"bytes"
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"html/template"
	"io"
	"io/fs"
	"log"
	"math"
	"math/rand"
	"net"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"text/tabwriter"
	"time"

	grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
	"github.com/peterbourgon/ff/v3/ffcli"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	sglog "github.com/sourcegraph/log"
	"github.com/sourcegraph/mountinfo"
	"go.uber.org/automaxprocs/maxprocs"
	"golang.org/x/net/trace"
	"golang.org/x/sys/unix"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/metadata"

	"github.com/sourcegraph/zoekt"
	"github.com/sourcegraph/zoekt/build"
	proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
	"github.com/sourcegraph/zoekt/debugserver"
	"github.com/sourcegraph/zoekt/grpc/internalerrs"
	"github.com/sourcegraph/zoekt/grpc/messagesize"
	"github.com/sourcegraph/zoekt/internal/profiler"
)

// Prometheus metrics exported by the indexserver. They are registered at
// package initialisation through promauto.
var (
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 100000s (~27.8h)
	})

	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is build.IndexState

	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})

	clientMetricsOnce sync.Once
	clientMetrics     *grpcprom.ClientMetrics
)

// set of repositories that we want to capture separate indexing metrics for
var reposWithSeparateIndexingMetrics = make(map[string]struct{})

// indexState is the outcome of an index job. It is used as the "state" label
// value of metricIndexDuration.
type indexState string

const (
	indexStateFail        indexState = "fail"
	indexStateSuccess     indexState = "success"
	indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
	indexStateNoop        indexState = "noop"         // We didn't need to update index
	indexStateEmpty       indexState = "empty"        // index is empty (empty repo)
)

// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	logger sglog.Logger

	Sourcegraph Sourcegraph
	BatchSize   int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration

	// CPUCount is the number of CPUs to use for indexing shards.
	CPUCount int

	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	hostname string

	mergeOpts mergeOpts

	// timeout defines how long the index server waits before killing an indexing job.
	timeout time.Duration
}

// debug is a logger whose output is discarded (io.Discard) as constructed
// here.
var debug = log.New(io.Discard, "", log.LstdFlags)

// our index commands should output something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
198const noOutputTimeout = 30 * time.Minute 199 200func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 201 out := &synchronizedBuffer{} 202 cmd.Stdout = out 203 cmd.Stderr = out 204 205 tr.LazyPrintf("%s", cmd.Args) 206 207 defer func() { 208 if err != nil { 209 outS := out.String() 210 tr.LazyPrintf("failed: %v", err) 211 tr.LazyPrintf("output: %s", out) 212 tr.SetError() 213 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 214 } 215 }() 216 217 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 218 219 if err := cmd.Start(); err != nil { 220 return err 221 } 222 223 errC := make(chan error) 224 go func() { 225 errC <- cmd.Wait() 226 }() 227 228 // This channel is set after we have sent sigquit. It allows us to follow up 229 // with a sigkill if the process doesn't quit after sigquit. 230 kill := make(<-chan time.Time) 231 232 lastLen := 0 233 for { 234 select { 235 case <-time.After(noOutputTimeout): 236 // Periodically check if we have had output. If not kill the process. 237 if out.Len() != lastLen { 238 lastLen = out.Len() 239 log.Printf("still running %s", cmd.Args) 240 } else { 241 // Send quit (C-\) first so we get a stack dump. 242 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 243 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 244 log.Println("quit failed:", err) 245 } 246 247 // send sigkill if still running in 10s 248 kill = time.After(10 * time.Second) 249 } 250 251 case <-kill: 252 log.Printf("still running, killing %s", cmd.Args) 253 if err := cmd.Process.Kill(); err != nil { 254 log.Println("kill failed:", err) 255 } 256 257 case err := <-errC: 258 if err != nil { 259 return err 260 } 261 262 tr.LazyPrintf("success") 263 return nil 264 } 265 } 266} 267 268// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 269// monitor the buffer while it is being written to. 
270type synchronizedBuffer struct { 271 mu sync.Mutex 272 b bytes.Buffer 273} 274 275func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 276 sb.mu.Lock() 277 defer sb.mu.Unlock() 278 return sb.b.Write(p) 279} 280 281func (sb *synchronizedBuffer) Len() int { 282 sb.mu.Lock() 283 defer sb.mu.Unlock() 284 return sb.b.Len() 285} 286 287func (sb *synchronizedBuffer) String() string { 288 sb.mu.Lock() 289 defer sb.mu.Unlock() 290 return sb.b.String() 291} 292 293// pauseFileName if present in IndexDir will stop index jobs from 294// running. This is to make it possible to experiment with the content of the 295// IndexDir without the indexserver writing to it. 296const pauseFileName = "PAUSE" 297 298// Run the sync loop. This blocks forever. 299func (s *Server) Run() { 300 removeIncompleteShards(s.IndexDir) 301 302 // Start a goroutine which updates the queue with commits to index. 303 go func() { 304 // We update the list of indexed repos every Interval. To speed up manual 305 // testing we also listen for SIGUSR1 to trigger updates. 
306 // 307 // "pkill -SIGUSR1 zoekt-sourcegra" 308 for range jitterTicker(s.Interval, unix.SIGUSR1) { 309 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 310 log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 311 continue 312 } 313 314 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 315 if err != nil { 316 log.Printf("error listing repos: %s", err) 317 continue 318 } 319 320 debug.Printf("updating index queue with %d repositories", len(repos.IDs)) 321 322 // Stop indexing repos we don't need to track anymore 323 removed := s.queue.MaybeRemoveMissing(repos.IDs) 324 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 325 if len(removed) > 0 { 326 log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 327 } 328 329 cleanupDone := make(chan struct{}) 330 go func() { 331 defer close(cleanupDone) 332 s.muIndexDir.Global(func() { 333 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 334 }) 335 }() 336 337 repos.IterateIndexOptions(s.queue.AddOrUpdate) 338 339 // IterateIndexOptions will only iterate over repositories that have 340 // changed since we last called list. However, we want to add all IDs 341 // back onto the queue just to check that what is on disk is still 342 // correct. This will use the last IndexOptions we stored in the 343 // queue. The repositories not on the queue (missing) need a forced 344 // fetch of IndexOptions. 345 missing := s.queue.Bump(repos.IDs) 346 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 
347 348 setCompoundShardCounter(s.IndexDir) 349 350 <-cleanupDone 351 } 352 }() 353 354 go func() { 355 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 356 if s.shardMerging { 357 s.vacuum() 358 } 359 } 360 }() 361 362 go func() { 363 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 364 if s.shardMerging { 365 s.doMerge() 366 } 367 } 368 }() 369 370 for i := 0; i < s.IndexConcurrency; i++ { 371 go s.processQueue() 372 } 373 374 // block forever 375 select {} 376} 377 378// formatList returns a comma-separated list of the first min(len(v), m) items. 379func formatListUint32(v []uint32, m int) string { 380 if len(v) < m { 381 m = len(v) 382 } 383 384 sb := strings.Builder{} 385 for i := 0; i < m; i++ { 386 fmt.Fprintf(&sb, "%d, ", v[i]) 387 } 388 389 if len(v) > m { 390 sb.WriteString("...") 391 } 392 393 return strings.TrimRight(sb.String(), ", ") 394} 395 396func (s *Server) processQueue() { 397 for { 398 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 399 time.Sleep(time.Second) 400 continue 401 } 402 403 opts, ok := s.queue.Pop() 404 if !ok { 405 time.Sleep(time.Second) 406 continue 407 } 408 409 args := s.indexArgs(opts) 410 411 ran := s.muIndexDir.With(opts.Name, func() { 412 // only record time taken once we hold the lock. This avoids us 413 // recording time taken while merging/cleanup runs. 
414 start := time.Now() 415 416 state, err := s.Index(args) 417 418 elapsed := time.Since(start) 419 420 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 421 422 if err != nil { 423 log.Printf("error indexing %s: %s", args.String(), err) 424 } 425 426 switch state { 427 case indexStateSuccess: 428 var branches []string 429 for _, b := range args.Branches { 430 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 431 } 432 s.logger.Info("updated index", 433 sglog.String("repo", args.Name), 434 sglog.Uint32("id", args.RepoID), 435 sglog.Strings("branches", branches), 436 sglog.Duration("duration", elapsed), 437 ) 438 case indexStateSuccessMeta: 439 log.Printf("updated meta %s in %v", args.String(), elapsed) 440 } 441 s.queue.SetIndexed(opts, state) 442 }) 443 444 if !ran { 445 // Someone else is processing the repository. We can just skip this job 446 // since the repository will be added back to the queue and we will 447 // converge to the correct behaviour. 448 debug.Printf("index job for repository already running: %s", args) 449 continue 450 } 451 } 452} 453 454// repoNameForMetric returns a normalized version of the given repository name that is 455// suitable for use with Prometheus metrics. 456func repoNameForMetric(repo string) string { 457 // Check to see if we want to be able to capture separate indexing metrics for this repository. 458 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 459 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 460 return repo 461 } 462 463 return "" 464} 465 466func batched(slice []uint32, size int) <-chan []uint32 { 467 c := make(chan []uint32) 468 go func() { 469 for len(slice) > 0 { 470 if size > len(slice) { 471 size = len(slice) 472 } 473 c <- slice[:size] 474 slice = slice[size:] 475 } 476 close(c) 477 }() 478 return c 479} 480 481// jitterTicker returns a ticker which ticks with a jitter. 
// Each tick is uniformly selected from the range [d/2, d + d/2). It will tick
// on creation.
//
// sig is a list of signals which also cause the ticker to fire. This is a
// convenience to allow manually triggering of the ticker.
//
// d must be positive: rand.Int63n panics when its argument is <= 0. The
// returned channel is unbuffered, so a tick is delivered only when a receiver
// is ready.
func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} {
	ticker := make(chan struct{})

	go func() {
		for {
			ticker <- struct{}{}
			ns := int64(d)
			jitter := rand.Int63n(ns)
			time.Sleep(time.Duration(ns/2 + jitter))
		}
	}()

	go func() {
		if len(sig) == 0 {
			return
		}

		c := make(chan os.Signal, 1)
		signal.Notify(c, sig...)
		for range c {
			ticker <- struct{}{}
		}
	}()

	return ticker
}

// Index starts an index job for repo name at commit.
//
// The returned state describes what was done. Whenever err is non-nil the
// deferred handler forces state to indexStateFail and increments
// metricFailingTotal.
func (s *Server) Index(args *indexArgs) (state indexState, err error) {
	tr := trace.New("index", args.Name)

	defer func() {
		if err != nil {
			tr.SetError()
			tr.LazyPrintf("error: %v", err)
			state = indexStateFail
			metricFailingTotal.Inc()
		}
		tr.LazyPrintf("state: %s", state)
		tr.Finish()
	}()

	tr.LazyPrintf("branches: %v", args.Branches)

	// No branches: write an empty shard instead of running git indexing.
	if len(args.Branches) == 0 {
		return indexStateEmpty, createEmptyShard(args)
	}

	repositoryName := args.Name
	if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok {
		tr.LazyPrintf("marking this repository for delta build")
		args.UseDelta = true
	}

	args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold

	if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok {
		tr.LazyPrintf("skipping symbols calculation")
		args.Symbols = false
	}

	// reason is only used for the log line below; it stays "forced" for
	// non-incremental jobs.
	reason := "forced"

	if args.Incremental {
		bo := args.BuildOptions()
		bo.SetDefaults()
		incrementalState, fn := bo.IndexState()
		reason = string(incrementalState)
		metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc()

		switch incrementalState {
		case build.IndexStateEqual:
			debug.Printf("%s index already up to date. Shard=%s", args.String(), fn)
			return indexStateNoop, nil

		case build.IndexStateMeta:
			log.Printf("updating index.meta %s", args.String())

			// On failure we fall out of the switch and do a full index below.
			if err := mergeMeta(bo); err != nil {
				log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
			} else {
				return indexStateSuccessMeta, nil
			}

		case build.IndexStateCorrupt:
			log.Printf("falling back to full update: corrupt index: %s", args.String())
		}
	}

	log.Printf("updating index %s reason=%s", args.String(), reason)

	metricIndexingTotal.Inc()
	c := gitIndexConfig{
		runCmd: func(cmd *exec.Cmd) error {
			return s.loggedRun(tr, cmd)
		},

		findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) {
			return args.BuildOptions().FindRepositoryMetadata()
		},
		timeout: s.timeout,
	}

	err = gitIndex(c, args, s.Sourcegraph, s.logger)
	if err != nil {
		return indexStateFail, err
	}

	// A failed status update is logged but does not fail the index job.
	if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil {
		s.logger.Error("failed to update index status",
			sglog.String("repo", args.Name),
			sglog.Uint32("id", args.RepoID),
			sglogBranches("branches", args.Branches),
			sglog.Error(err),
		)
	}

	return indexStateSuccess, nil
}

// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so
// it can update the zoekt_repos table.
//
// It returns an error if the on-disk metadata cannot be read or found, or if
// the update call to Sourcegraph fails.
func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error {
	// We need to read from disk for IndexTime.
	_, metadata, ok, err := c.findRepositoryMetadata(args)
	if err != nil {
		return fmt.Errorf("failed to read metadata for new/updated index: %w", err)
	}
	if !ok {
		return errors.New("failed to find metadata for new/updated index")
	}

	status := []indexStatus{{
		RepoID:        args.RepoID,
		Branches:      args.Branches,
		IndexTimeUnix: metadata.IndexTime.Unix(),
	}}
	if err := sg.UpdateIndexStatus(status); err != nil {
		return fmt.Errorf("failed to update sourcegraph with status: %w", err)
	}

	return nil
}

// sglogBranches returns a log field named key that holds one "name=version"
// string per branch.
func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
	ss := make([]string, len(branches))
	for i, b := range branches {
		ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version)
	}
	return sglog.Strings(key, ss)
}

// indexArgs builds the arguments for an incremental index job of opts using
// the server's index directory, parallelism and shard merging settings.
func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
	parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0))
	return &indexArgs{
		IndexOptions: opts,
		IndexDir:     s.IndexDir,
		Parallelism:  parallelism,
		Incremental:  true,

		// 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
		FileLimit: 1 << 20,

		ShardMerging: s.shardMerging,
	}
}

// parallelism consults both the server flags and index options to determine the number
// of shards to index in parallel. If the ShardConcurrency index option is provided, it always
// overrides the server flag.
656func (s *Server) parallelism(opts IndexOptions, maxProcs int) int { 657 var parallelism int 658 if opts.ShardConcurrency > 0 { 659 parallelism = int(opts.ShardConcurrency) 660 } else { 661 parallelism = s.CPUCount 662 } 663 664 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs 665 if parallelism > 4*maxProcs { 666 parallelism = 4 * maxProcs 667 } 668 669 // If index concurrency is set, then divide the parallelism by the number of 670 // repos we're indexing in parallel 671 if s.IndexConcurrency > 1 { 672 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency))) 673 } 674 675 return parallelism 676} 677 678func createEmptyShard(args *indexArgs) error { 679 bo := args.BuildOptions() 680 bo.SetDefaults() 681 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 682 683 if args.Incremental && bo.IncrementalSkipIndexing() { 684 return nil 685 } 686 687 builder, err := build.NewBuilder(*bo) 688 if err != nil { 689 return err 690 } 691 return builder.Finish() 692} 693 694// addDebugHandlers adds handlers specific to indexserver. 695func (s *Server) addDebugHandlers(mux *http.ServeMux) { 696 // Sourcegraph's site admin view requires indexserver to serve it's admin view 697 // on "/". 
698 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 699 700 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 701 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 702 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 703 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 704 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 705 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 706} 707 708func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 709 if r.Method != "GET" { 710 w.Header().Set("Allow", "GET") 711 w.WriteHeader(http.StatusMethodNotAllowed) 712 return 713 } 714 715 response := struct { 716 Hostname string 717 }{ 718 Hostname: s.hostname, 719 } 720 721 b, err := json.Marshal(response) 722 if err != nil { 723 http.Error(w, err.Error(), http.StatusInternalServerError) 724 return 725 } 726 727 w.Header().Set("Content-Type", "application/json; charset=utf-8") 728 w.Write(b) 729} 730 731var rootTmpl = template.Must(template.New("name").Parse(` 732<html> 733 <body> 734 <a href="debug">Debug</a><br /> 735 <a href="debug/requests">Traces</a><br /> 736 {{.IndexMsg}}<br /> 737 <br /> 738 <h3>Reindex</h3> 739 {{if .Repos}} 740 <a href="?show_repos=false">hide repos</a><br /> 741 <table style="margin-top: 20px"> 742 <th style="text-align:left">Name</th> 743 <th style="text-align:left">ID</th> 744 {{range .Repos}} 745 <tr> 746 <td>{{.Name}}</td> 747 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 748 </tr> 749 {{end}} 750 </table> 751 {{else}} 752 <a href="?show_repos=true">show repos</a><br /> 753 {{end}} 754 </body> 755</html> 756`)) 757 758func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 759 if r.Method != "GET" { 760 w.Header().Set("Allow", "GET") 761 w.WriteHeader(http.StatusMethodNotAllowed) 762 return 763 } 764 765 values := r.URL.Query() 766 767 // ?id= 768 indexMsg := "" 769 if v := values.Get("id"); v != "" { 770 id, err := 
strconv.Atoi(v) 771 if err != nil { 772 http.Error(w, err.Error(), http.StatusBadRequest) 773 return 774 } 775 indexMsg, _ = s.forceIndex(uint32(id)) 776 } 777 778 // ?show_repos= 779 showRepos := false 780 if v := values.Get("show_repos"); v != "" { 781 showRepos, _ = strconv.ParseBool(v) 782 } 783 784 type Repo struct { 785 ID uint32 786 Name string 787 } 788 var data struct { 789 Repos []Repo 790 IndexMsg string 791 } 792 793 data.IndexMsg = indexMsg 794 795 if showRepos { 796 s.queue.Iterate(func(opts *IndexOptions) { 797 data.Repos = append(data.Repos, Repo{ 798 ID: opts.RepoID, 799 Name: opts.Name, 800 }) 801 }) 802 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 803 } 804 805 _ = rootTmpl.Execute(w, data) 806} 807 808// handleReindex triggers a reindex asynocronously. If a reindex was triggered 809// the request returns with status 202. The caller can infer the new state of 810// the index by calling List. 811func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 812 if r.Method != http.MethodPost { 813 w.Header().Set("Allow", http.MethodPost) 814 w.WriteHeader(http.StatusMethodNotAllowed) 815 return 816 } 817 818 err := r.ParseForm() 819 if err != nil { 820 http.Error(w, err.Error(), http.StatusBadRequest) 821 return 822 } 823 824 id, err := strconv.Atoi(r.Form.Get("repo")) 825 if err != nil { 826 http.Error(w, err.Error(), http.StatusBadRequest) 827 return 828 } 829 830 go func() { s.forceIndex(uint32(id)) }() 831 832 // 202 Accepted 833 w.WriteHeader(http.StatusAccepted) 834} 835 836func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 837 withIndexed := true 838 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 839 withIndexed = b 840 } 841 842 var indexed []uint32 843 if withIndexed { 844 indexed = listIndexed(s.IndexDir) 845 } 846 847 repos, err := s.Sourcegraph.List(r.Context(), indexed) 848 if err != nil { 849 http.Error(w, err.Error(), 
http.StatusInternalServerError) 850 return 851 } 852 853 bw := bytes.Buffer{} 854 855 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 856 857 _, err = fmt.Fprintf(tw, "ID\tName\n") 858 if err != nil { 859 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 860 return 861 } 862 863 s.queue.mu.Lock() 864 name := "" 865 for _, id := range repos.IDs { 866 if item := s.queue.get(id); item != nil { 867 name = item.opts.Name 868 } else { 869 name = "" 870 } 871 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 872 if err != nil { 873 debug.Printf("handleDebugList: %s\n", err.Error()) 874 } 875 } 876 s.queue.mu.Unlock() 877 878 if err != nil { 879 http.Error(w, err.Error(), http.StatusInternalServerError) 880 return 881 } 882 883 err = tw.Flush() 884 if err != nil { 885 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 886 return 887 } 888 889 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 890 891 if _, err := io.Copy(w, &bw); err != nil { 892 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 893 return 894 } 895} 896 897// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 898// can run this command during periods of low usage (evenings, weekends) to 899// trigger an initial merge run. In the steady-state, merges happen rarely, even 900// on busy instances, and users can rely on automatic merging instead. 901func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 902 // A merge operation can take very long, depending on the number merges and the 903 // target size of the compound shards. We run the merge in the background and 904 // return immediately to the user. 905 // 906 // We track the status of the merge with metricShardMergingRunning. 
907 go func() { 908 s.doMerge() 909 }() 910 _, _ = w.Write([]byte("merging enqueued\n")) 911} 912 913func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 914 indexed := listIndexed(s.IndexDir) 915 916 bw := bytes.Buffer{} 917 918 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 919 920 _, err := fmt.Fprintf(tw, "ID\tName\n") 921 if err != nil { 922 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 923 return 924 } 925 926 s.queue.mu.Lock() 927 name := "" 928 for _, id := range indexed { 929 if item := s.queue.get(id); item != nil { 930 name = item.opts.Name 931 } else { 932 name = "" 933 } 934 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 935 if err != nil { 936 debug.Printf("handleDebugIndexed: %s\n", err.Error()) 937 } 938 } 939 s.queue.mu.Unlock() 940 941 if err != nil { 942 http.Error(w, err.Error(), http.StatusInternalServerError) 943 return 944 } 945 946 err = tw.Flush() 947 if err != nil { 948 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 949 return 950 } 951 952 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 953 954 if _, err := io.Copy(w, &bw); err != nil { 955 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 956 return 957 } 958} 959 960// forceIndex will run the index job for repo name now. It will return always 961// return a string explaining what it did, even if it failed. 
962func (s *Server) forceIndex(id uint32) (string, error) { 963 var opts IndexOptions 964 var err error 965 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 966 opts = o 967 }, func(_ uint32, e error) { 968 err = e 969 }, id) 970 if err != nil { 971 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 972 } 973 974 args := s.indexArgs(opts) 975 args.Incremental = false // force re-index 976 977 var state indexState 978 ran := s.muIndexDir.With(opts.Name, func() { 979 state, err = s.Index(args) 980 }) 981 if !ran { 982 return fmt.Sprintf("index job for repository already running: %s", args), nil 983 } 984 if err != nil { 985 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 986 } 987 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 988} 989 990func listIndexed(indexDir string) []uint32 { 991 index := getShards(indexDir) 992 metricNumIndexed.Set(float64(len(index))) 993 repoIDs := make([]uint32, 0, len(index)) 994 for id := range index { 995 repoIDs = append(repoIDs, id) 996 } 997 sort.Slice(repoIDs, func(i, j int) bool { 998 return repoIDs[i] < repoIDs[j] 999 }) 1000 return repoIDs 1001} 1002 1003// setupTmpDir sets up a temporary directory on the same volume as the 1004// indexes. 1005// 1006// If main is true we will delete older temp directories left around. main is 1007// false when this is a debug command. 1008func setupTmpDir(logger sglog.Logger, main bool, index string) error { 1009 // change the target tmp directory depending on if it's our main daemon or a 1010 // debug sub command. 
1011 dir := ".indexserver.debug.tmp" 1012 if main { 1013 dir = ".indexserver.tmp" 1014 } 1015 1016 tmpRoot := filepath.Join(index, dir) 1017 1018 if main { 1019 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot)) 1020 err := os.RemoveAll(tmpRoot) 1021 if err != nil { 1022 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err)) 1023 } 1024 } 1025 1026 if err := os.MkdirAll(tmpRoot, 0o755); err != nil { 1027 return err 1028 } 1029 1030 return os.Setenv("TMPDIR", tmpRoot) 1031} 1032 1033func printMetaData(fn string) error { 1034 repo, indexMeta, err := zoekt.ReadMetadataPath(fn) 1035 if err != nil { 1036 return err 1037 } 1038 1039 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 1040 if err != nil { 1041 return err 1042 } 1043 1044 err = json.NewEncoder(os.Stdout).Encode(repo) 1045 if err != nil { 1046 return err 1047 } 1048 return nil 1049} 1050 1051func printShardStats(fn string) error { 1052 f, err := os.Open(fn) 1053 if err != nil { 1054 return err 1055 } 1056 1057 iFile, err := zoekt.NewIndexFile(f) 1058 if err != nil { 1059 return err 1060 } 1061 1062 return zoekt.PrintNgramStats(iFile) 1063} 1064 1065func srcLogLevelIsDebug() bool { 1066 lvl := os.Getenv(sglog.EnvLogLevel) 1067 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1068} 1069 1070func getEnvWithDefaultBool(k string, defaultVal bool) bool { 1071 v := os.Getenv(k) 1072 if v == "" { 1073 return defaultVal 1074 } 1075 b, err := strconv.ParseBool(v) 1076 if err != nil { 1077 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1078 } 1079 return b 1080} 1081 1082func getEnvWithDefaultInt64(k string, defaultVal int64) int64 { 1083 v := os.Getenv(k) 1084 if v == "" { 1085 return defaultVal 1086 } 1087 i, err := strconv.ParseInt(v, 10, 64) 1088 if err != nil { 1089 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1090 } 1091 return i 1092} 1093 1094func getEnvWithDefaultInt(k string, defaultVal int) int { 1095 v := 
os.Getenv(k) 1096 if v == "" { 1097 return defaultVal 1098 } 1099 i, err := strconv.Atoi(k) 1100 if err != nil { 1101 log.Fatalf("error parsing ENV %s to int: %s", k, err) 1102 } 1103 return i 1104} 1105 1106func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 { 1107 v := os.Getenv(k) 1108 if v == "" { 1109 return defaultVal 1110 } 1111 i, err := strconv.ParseUint(v, 10, 64) 1112 if err != nil { 1113 log.Fatalf("error parsing ENV %s to uint64: %s", k, err) 1114 } 1115 return i 1116} 1117 1118func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1119 v := os.Getenv(k) 1120 if v == "" { 1121 return defaultVal 1122 } 1123 f, err := strconv.ParseFloat(v, 64) 1124 if err != nil { 1125 log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1126 } 1127 return f 1128} 1129 1130func getEnvWithDefaultString(k string, defaultVal string) string { 1131 v := os.Getenv(k) 1132 if v == "" { 1133 return defaultVal 1134 } 1135 return v 1136} 1137 1138func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration { 1139 v := os.Getenv(k) 1140 if v == "" { 1141 return defaultVal 1142 } 1143 1144 d, err := time.ParseDuration(v) 1145 if err != nil { 1146 log.Fatalf("error parsing ENV %s to duration: %s", k, err) 1147 } 1148 return d 1149} 1150 1151func getEnvWithDefaultEmptySet(k string) map[string]struct{} { 1152 set := map[string]struct{}{} 1153 for _, v := range strings.Split(os.Getenv(k), ",") { 1154 v = strings.TrimSpace(v) 1155 if v != "" { 1156 set[v] = struct{}{} 1157 } 1158 } 1159 return set 1160} 1161 1162func joinStringSet(set map[string]struct{}, sep string) string { 1163 var xs []string 1164 for x := range set { 1165 xs = append(xs, x) 1166 } 1167 1168 return strings.Join(xs, sep) 1169} 1170 1171func setCompoundShardCounter(indexDir string) { 1172 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt")) 1173 if err != nil { 1174 log.Printf("setCompoundShardCounter: %s\n", err) 1175 return 1176 } 1177 
metricNumberCompoundShards.Set(float64(len(fns))) 1178} 1179 1180func rootCmd() *ffcli.Command { 1181 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1182 conf := rootConfig{ 1183 Main: true, 1184 } 1185 conf.registerRootFlags(rootFs) 1186 1187 return &ffcli.Command{ 1188 FlagSet: rootFs, 1189 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1190 Subcommands: []*ffcli.Command{debugCmd()}, 1191 Exec: func(ctx context.Context, args []string) error { 1192 return startServer(conf) 1193 }, 1194 } 1195} 1196 1197type rootConfig struct { 1198 // Main is true if this rootConfig is for our main long running command (the 1199 // indexserver). Debug commands should not set this value. This is used to 1200 // determine if we need to run tmpfriend. 1201 Main bool 1202 1203 root string 1204 interval time.Duration 1205 index string 1206 indexConcurrency int64 1207 listen string 1208 hostname string 1209 cpuFraction float64 1210 blockProfileRate int 1211 1212 // config values related to shard merging 1213 disableShardMerging bool 1214 vacuumInterval time.Duration 1215 mergeInterval time.Duration 1216 targetSize int64 1217 minSize int64 1218 minAgeDays int 1219 1220 // config values related to backoff indexing repos with one or more consecutive failures 1221 backoffDuration time.Duration 1222 maxBackoffDuration time.Duration 1223} 1224 1225func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1226 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. 
If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1227 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1228 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently") 1229 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use") 1230 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") 1231 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") 1232 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1233 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate") 1234 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.") 1235 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. 
A negative value disables indexing backoff.") 1236 1237 // flags related to shard merging 1238 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging") 1239 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1240 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1241 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB") 1242 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB") 1243 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.") 1244} 1245 1246func startServer(conf rootConfig) error { 1247 s, err := newServer(conf) 1248 if err != nil { 1249 return err 1250 } 1251 1252 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate) 1253 setCompoundShardCounter(s.IndexDir) 1254 1255 if conf.listen != "" { 1256 1257 mux := http.NewServeMux() 1258 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1259 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1260 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1261 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"}, 1262 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1263 
}...) 1264 s.addDebugHandlers(mux) 1265 1266 go func() { 1267 debug.Printf("serving HTTP on %s", conf.listen) 1268 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1269 }() 1270 1271 // Serve mux on a unix domain socket on a best-effort-basis so that 1272 // webserver can call the endpoints via the shared filesystem. 1273 // 1274 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1275 // on the socket due to permission errors. See 1276 // https://github.com/docker/for-mac/issues/6239 1277 go func() { 1278 serveHTTPOverSocket := func() error { 1279 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1280 // We cannot bind a socket to an existing pathname. 1281 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1282 return fmt.Errorf("error removing socket file: %s", socket) 1283 } 1284 // The "unix" network corresponds to stream sockets. (cf. unixgram, 1285 // unixpacket). 1286 l, err := net.Listen("unix", socket) 1287 if err != nil { 1288 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1289 } 1290 // Indexserver (root) and webserver (Sourcegraph) run with 1291 // different users. Per default, the socket is created with 1292 // permission 755 (root root), which doesn't let webserver write to 1293 // it. 1294 // 1295 // See https://github.com/golang/go/issues/11822 for more context. 
1296 if err := os.Chmod(socket, 0o777); err != nil { 1297 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1298 } 1299 debug.Printf("serving HTTP on %s", socket) 1300 return http.Serve(l, mux) 1301 } 1302 debug.Print(serveHTTPOverSocket()) 1303 }() 1304 } 1305 1306 oc := &ownerChecker{ 1307 Path: filepath.Join(conf.index, "owner.txt"), 1308 Hostname: conf.hostname, 1309 } 1310 go oc.Run() 1311 1312 logger := sglog.Scoped("metricsRegistration") 1313 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1314 1315 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1316 prometheus.DefaultRegisterer.MustRegister(c) 1317 1318 s.Run() 1319 return nil 1320} 1321 1322func newServer(conf rootConfig) (*Server, error) { 1323 logger := sglog.Scoped("server") 1324 1325 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1326 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1327 } 1328 if conf.index == "" { 1329 return nil, fmt.Errorf("must set -index") 1330 } 1331 if conf.root == "" { 1332 return nil, fmt.Errorf("must set -sourcegraph_url") 1333 } 1334 rootURL, err := url.Parse(conf.root) 1335 if err != nil { 1336 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1337 } 1338 1339 rootURL = addDefaultPort(rootURL) 1340 1341 // Tune GOMAXPROCS to match Linux container CPU quota. 1342 _, _ = maxprocs.Set() 1343 1344 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler. 1345 // The block profiler is disabled by default and should be enabled with care in production 1346 runtime.SetBlockProfileRate(conf.blockProfileRate) 1347 1348 // Automatically prepend our own path at the front, to minimize 1349 // required configuration. 
1350 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1351 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1352 } 1353 1354 if _, err := os.Stat(conf.index); err != nil { 1355 if err := os.MkdirAll(conf.index, 0o755); err != nil { 1356 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1357 } 1358 } 1359 1360 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil { 1361 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1362 } 1363 1364 if srcLogLevelIsDebug() { 1365 debug = log.New(os.Stderr, "", log.LstdFlags) 1366 } 1367 1368 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1369 if len(reposWithSeparateIndexingMetrics) > 0 { 1370 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1371 } 1372 1373 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1374 if len(deltaBuildRepositoriesAllowList) > 0 { 1375 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1376 } 1377 1378 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1379 if deltaShardNumberFallbackThreshold > 0 { 1380 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1381 } else { 1382 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1383 } 1384 1385 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1386 if len(reposShouldSkipSymbolsCalculation) > 0 { 1387 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1388 } 1389 1390 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout) 1391 if indexingTimeout != 
defaultIndexingTimeout { 1392 debug.Printf("using configured indexing timeout: %s", indexingTimeout) 1393 } 1394 1395 var sg Sourcegraph 1396 if rootURL.IsAbs() { 1397 var batchSize int 1398 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { 1399 batchSize, err = strconv.Atoi(v) 1400 if err != nil { 1401 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1402 } 1403 } 1404 1405 opts := []SourcegraphClientOption{ 1406 WithBatchSize(batchSize), 1407 } 1408 1409 logger := sglog.Scoped("zoektConfigurationGRPCClient") 1410 client, err := dialGRPCClient(rootURL.Host, logger) 1411 if err != nil { 1412 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1413 } 1414 1415 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...) 1416 1417 } else { 1418 sg = sourcegraphFake{ 1419 RootDir: rootURL.String(), 1420 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1421 } 1422 } 1423 1424 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction))) 1425 if cpuCount < 1 { 1426 cpuCount = 1 1427 } 1428 1429 if conf.indexConcurrency < 1 { 1430 conf.indexConcurrency = 1 1431 } else if conf.indexConcurrency > int64(cpuCount) { 1432 conf.indexConcurrency = int64(cpuCount) 1433 } 1434 1435 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1436 1437 return &Server{ 1438 logger: logger, 1439 Sourcegraph: sg, 1440 IndexDir: conf.index, 1441 IndexConcurrency: int(conf.indexConcurrency), 1442 Interval: conf.interval, 1443 CPUCount: cpuCount, 1444 queue: *q, 1445 shardMerging: !conf.disableShardMerging, 1446 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1447 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1448 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1449 hostname: conf.hostname, 1450 mergeOpts: mergeOpts{ 1451 vacuumInterval: conf.vacuumInterval, 1452 mergeInterval: conf.mergeInterval, 
1453 targetSizeBytes: conf.targetSize * 1024 * 1024, 1454 minSizeBytes: conf.minSize * 1024 * 1024, 1455 minAgeDays: conf.minAgeDays, 1456 }, 1457 timeout: indexingTimeout, 1458 }, err 1459} 1460 1461// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration 1462// for the indexed-search-configuration gRPC service. 1463// 1464// The default backoff strategy is modeled after the default settings used by 1465// retryablehttp.DefaultClient. 1466// 1467// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1468// - Unavailable 1469// - Aborted 1470// 1471//go:embed default_grpc_service_configuration.json 1472var defaultGRPCServiceConfigurationJSON string 1473 1474func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1475 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1476 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1477 return invoker(ctx, method, req, reply, cc, opts...) 1478 } 1479} 1480 1481func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1482 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1483 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1484 return streamer(ctx, desc, cc, method, opts...) 1485 } 1486} 1487 1488// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPCs servers and clients are allowed to process. 1489// This can be overridden by providing custom Server/Dial options. 
1490const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB 1491 1492func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) { 1493 metrics := mustGetClientMetrics() 1494 1495 // If the service seems to be unavailable, this 1496 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1 1497 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s]) 1498 retryOpts := []retry.CallOption{ 1499 retry.WithMax(5), 1500 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)), 1501 retry.WithCodes(codes.Unavailable), 1502 } 1503 1504 opts := []grpc.DialOption{ 1505 grpc.WithTransportCredentials(insecure.NewCredentials()), 1506 grpc.WithChainStreamInterceptor( 1507 metrics.StreamClientInterceptor(), 1508 messagesize.StreamClientInterceptor, 1509 internalActorStreamInterceptor(), 1510 internalerrs.LoggingStreamClientInterceptor(logger), 1511 internalerrs.PrometheusStreamClientInterceptor, 1512 retry.StreamClientInterceptor(retryOpts...), 1513 ), 1514 grpc.WithChainUnaryInterceptor( 1515 metrics.UnaryClientInterceptor(), 1516 messagesize.UnaryClientInterceptor, 1517 internalActorUnaryInterceptor(), 1518 internalerrs.LoggingUnaryClientInterceptor(logger), 1519 internalerrs.PrometheusUnaryClientInterceptor, 1520 retry.UnaryClientInterceptor(retryOpts...), 1521 ), 1522 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON), 1523 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)), 1524 } 1525 1526 opts = append(opts, additionalOpts...) 1527 1528 // Ensure that the message size options are set last, so they override any other 1529 // client-specific options that tweak the message size. 1530 // 1531 // The message size options are only provided if the environment variable is set. 
These options serve as an escape hatch, so they 1532 // take precedence over everything else with a uniform size setting that's easy to reason about. 1533 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...) 1534 1535 // This dialer is used to connect via gRPC to the Sourcegraph instance. 1536 // This is done lazily, so we can provide the client to use regardless of 1537 // whether we enabled gRPC or not initially. 1538 cc, err := grpc.Dial(addr, opts...) 1539 if err != nil { 1540 return nil, fmt.Errorf("dialing %q: %w", addr, err) 1541 } 1542 1543 client := proto.NewZoektConfigurationServiceClient(cc) 1544 return client, nil 1545} 1546 1547// mustGetClientMetrics returns a singleton instance of the client metrics 1548// that are shared across all gRPC clients that this process creates. 1549// 1550// This function panics if the metrics cannot be registered with the default 1551// Prometheus registry. 1552func mustGetClientMetrics() *grpcprom.ClientMetrics { 1553 clientMetricsOnce.Do(func() { 1554 clientMetrics = grpcprom.NewClientMetrics( 1555 grpcprom.WithClientCounterOptions(), 1556 grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request 1557 grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC 1558 grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC 1559 ) 1560 1561 prometheus.DefaultRegisterer.MustRegister(clientMetrics) 1562 }) 1563 1564 return clientMetrics 1565} 1566 1567// addDefaultPort adds a default port to a URL if one is not specified. 1568// 1569// If the URL scheme is "http" and no port is specified, "80" is used. 1570// If the scheme is "https", "443" is used. 1571// 1572// The original URL is not mutated. A copy is modified and returned. 
1573func addDefaultPort(original *url.URL) *url.URL { 1574 if original == nil { 1575 return nil // don't panic 1576 } 1577 1578 if !original.IsAbs() { 1579 return original // don't do anything if the URL doesn't look like a remote URL 1580 } 1581 1582 if original.Scheme == "http" && original.Port() == "" { 1583 u := cloneURL(original) 1584 u.Host = net.JoinHostPort(u.Host, "80") 1585 return u 1586 } 1587 1588 if original.Scheme == "https" && original.Port() == "" { 1589 u := cloneURL(original) 1590 u.Host = net.JoinHostPort(u.Host, "443") 1591 return u 1592 } 1593 1594 return original 1595} 1596 1597// cloneURL returns a copy of the URL. It is safe to mutate the returned URL. 1598// This is copied from net/http/clone.go 1599func cloneURL(u *url.URL) *url.URL { 1600 if u == nil { 1601 return nil 1602 } 1603 u2 := new(url.URL) 1604 *u2 = *u 1605 if u.User != nil { 1606 u2.User = new(url.Userinfo) 1607 *u2.User = *u.User 1608 } 1609 return u2 1610} 1611 1612func main() { 1613 liblog := sglog.Init(sglog.Resource{ 1614 Name: "zoekt-indexserver", 1615 Version: zoekt.Version, 1616 InstanceID: zoekt.HostnameBestEffort(), 1617 }) 1618 defer liblog.Sync() 1619 1620 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1621 log.Fatal(err) 1622 } 1623} 1624 1625// getBoolFromEnvironmentVariables returns the boolean defined by the first environment 1626// variable listed in envVarNames that is set in the current process environment, or the defaultBool if none are set. 1627// 1628// An error is returned of the provided environment variables fails to parse as a boolean. 
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	for _, envVar := range envVarNames {
		v := os.Getenv(envVar)
		if v == "" {
			// Unset and empty variables are treated alike: try the next name.
			continue
		}

		b, err := strconv.ParseBool(v)
		if err != nil {
			// Wrap with %w so that callers can inspect the underlying parse
			// error; the rendered message is unchanged.
			return false, fmt.Errorf("parsing environment variable %q to boolean: %w", envVar, err)
		}

		// The first variable that is set decides the result.
		return b, nil
	}

	return defaultBool, nil
}