fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Command zoekt-sourcegraph-indexserver periodically reindexes enabled 2// repositories on sourcegraph 3package main 4 5import ( 6 "bytes" 7 "context" 8 _ "embed" 9 "encoding/json" 10 "errors" 11 "flag" 12 "fmt" 13 "html/template" 14 "io" 15 "io/fs" 16 "log" 17 "math" 18 "math/rand" 19 "net" 20 "net/http" 21 "net/url" 22 "os" 23 "os/exec" 24 "os/signal" 25 "path/filepath" 26 "runtime" 27 "sort" 28 "strconv" 29 "strings" 30 "sync" 31 "text/tabwriter" 32 "time" 33 34 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" 35 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry" 36 "github.com/peterbourgon/ff/v3/ffcli" 37 "github.com/prometheus/client_golang/prometheus" 38 "github.com/prometheus/client_golang/prometheus/promauto" 39 sglog "github.com/sourcegraph/log" 40 "github.com/sourcegraph/mountinfo" 41 "go.uber.org/automaxprocs/maxprocs" 42 "golang.org/x/net/trace" 43 "golang.org/x/sys/unix" 44 "google.golang.org/grpc" 45 "google.golang.org/grpc/codes" 46 "google.golang.org/grpc/credentials/insecure" 47 "google.golang.org/grpc/metadata" 48 49 "github.com/sourcegraph/zoekt" 50 "github.com/sourcegraph/zoekt/build" 51 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1" 52 "github.com/sourcegraph/zoekt/debugserver" 53 "github.com/sourcegraph/zoekt/grpc/internalerrs" 54 "github.com/sourcegraph/zoekt/grpc/messagesize" 55 "github.com/sourcegraph/zoekt/internal/profiler" 56) 57 58var ( 59 metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{ 60 Name: "resolve_revisions_seconds", 61 Help: "A histogram of latencies for resolving all repository revisions.", 62 Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 27min 63 }) 64 65 metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 66 Name: "resolve_revision_seconds", 67 Help: "A histogram of latencies for resolving a repository revision.", 68 Buckets: 
prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s 69 }, []string{"success"}) // success=true|false 70 71 metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{ 72 Name: "get_index_options_total", 73 Help: "The total number of times we tried to get index options for a repository. Includes errors.", 74 }) 75 metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{ 76 Name: "get_index_options_error_total", 77 Help: "The total number of times we failed to get index options for a repository.", 78 }) 79 80 metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 81 Name: "index_repo_seconds", 82 Help: "A histogram of latencies for indexing a repository.", 83 Buckets: prometheus.ExponentialBucketsRange( 84 (100 * time.Millisecond).Seconds(), 85 (40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo 86 20), 87 }, []string{ 88 "state", // state is an indexState 89 "name", // name of the repository that was indexed 90 }) 91 92 metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 93 Name: "index_fetch_seconds", 94 Help: "A histogram of latencies for fetching a repository.", 95 Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes 96 }, []string{ 97 "success", // true|false 98 "name", // the name of the repository that the commits were fetched from 99 }) 100 101 metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{ 102 Name: "index_incremental_index_state", 103 Help: "A count of the state on disk vs what we want to build. 
See zoekt/build.IndexState.", 104 }, []string{"state"}) // state is build.IndexState 105 106 metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{ 107 Name: "index_num_indexed", 108 Help: "Number of indexed repos by code host", 109 }) 110 111 metricNumAssigned = promauto.NewGauge(prometheus.GaugeOpts{ 112 Name: "index_num_assigned", 113 Help: "Number of repos assigned to this indexer by code host", 114 }) 115 116 metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{ 117 Name: "index_failing_total", 118 Help: "Counts failures to index (indexing activity, should be used with rate())", 119 }) 120 121 metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{ 122 Name: "index_indexing_total", 123 Help: "Counts indexings (indexing activity, should be used with rate())", 124 }) 125 126 metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{ 127 Name: "index_num_stopped_tracking_total", 128 Help: "Counts the number of repos we stopped tracking.", 129 }) 130 131 clientMetricsOnce sync.Once 132 clientMetrics *grpcprom.ClientMetrics 133) 134 135// set of repositories that we want to capture separate indexing metrics for 136var reposWithSeparateIndexingMetrics = make(map[string]struct{}) 137 138type indexState string 139 140const ( 141 indexStateFail indexState = "fail" 142 indexStateSuccess indexState = "success" 143 indexStateSuccessMeta indexState = "success_meta" // We only updated metadata 144 indexStateNoop indexState = "noop" // We didn't need to update index 145 indexStateEmpty indexState = "empty" // index is empty (empty repo) 146) 147 148// Server is the main functionality of zoekt-sourcegraph-indexserver. It 149// exists to conveniently use all the options passed in via func main. 150type Server struct { 151 logger sglog.Logger 152 153 Sourcegraph Sourcegraph 154 BatchSize int 155 156 // IndexDir is the index directory to use. 157 IndexDir string 158 159 // IndexConcurrency is the number of repositories we index at once. 
160 IndexConcurrency int 161 162 // Interval is how often we sync with Sourcegraph. 163 Interval time.Duration 164 // CPUCount is the amount of parallelism to use when indexing a 165 // repository. 166 CPUCount int 167 168 queue Queue 169 170 // muIndexDir protects the index directory from concurrent access. 171 muIndexDir indexMutex 172 173 // If true, shard merging is enabled. 174 shardMerging bool 175 176 // deltaBuildRepositoriesAllowList is an allowlist for repositories that we 177 // use delta-builds for instead of normal builds 178 deltaBuildRepositoriesAllowList map[string]struct{} 179 180 // deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist 181 // before attempting a delta build. 182 deltaShardNumberFallbackThreshold uint64 183 184 // repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that 185 // we skip calculating symbols metadata for during builds 186 repositoriesSkipSymbolsCalculationAllowList map[string]struct{} 187 188 hostname string 189 190 mergeOpts mergeOpts 191 192 // timeout defines how long the index server waits before killing an indexing job. 193 timeout time.Duration 194} 195 196var debug = log.New(io.Discard, "", log.LstdFlags) 197 198// our index commands should output something every 100mb they process. 199// 200// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough 201// time." famous last words. A client was indexing a monorepo with 42 202// cores... 5m was not enough. 
203const noOutputTimeout = 30 * time.Minute 204 205func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 206 out := &synchronizedBuffer{} 207 cmd.Stdout = out 208 cmd.Stderr = out 209 210 tr.LazyPrintf("%s", cmd.Args) 211 212 defer func() { 213 if err != nil { 214 outS := out.String() 215 tr.LazyPrintf("failed: %v", err) 216 tr.LazyPrintf("output: %s", out) 217 tr.SetError() 218 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 219 } 220 }() 221 222 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 223 224 if err := cmd.Start(); err != nil { 225 return err 226 } 227 228 errC := make(chan error) 229 go func() { 230 errC <- cmd.Wait() 231 }() 232 233 // This channel is set after we have sent sigquit. It allows us to follow up 234 // with a sigkill if the process doesn't quit after sigquit. 235 kill := make(<-chan time.Time) 236 237 lastLen := 0 238 for { 239 select { 240 case <-time.After(noOutputTimeout): 241 // Periodically check if we have had output. If not kill the process. 242 if out.Len() != lastLen { 243 lastLen = out.Len() 244 log.Printf("still running %s", cmd.Args) 245 } else { 246 // Send quit (C-\) first so we get a stack dump. 247 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 248 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 249 log.Println("quit failed:", err) 250 } 251 252 // send sigkill if still running in 10s 253 kill = time.After(10 * time.Second) 254 } 255 256 case <-kill: 257 log.Printf("still running, killing %s", cmd.Args) 258 if err := cmd.Process.Kill(); err != nil { 259 log.Println("kill failed:", err) 260 } 261 262 case err := <-errC: 263 if err != nil { 264 return err 265 } 266 267 tr.LazyPrintf("success") 268 return nil 269 } 270 } 271} 272 273// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 274// monitor the buffer while it is being written to. 
275type synchronizedBuffer struct { 276 mu sync.Mutex 277 b bytes.Buffer 278} 279 280func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 281 sb.mu.Lock() 282 defer sb.mu.Unlock() 283 return sb.b.Write(p) 284} 285 286func (sb *synchronizedBuffer) Len() int { 287 sb.mu.Lock() 288 defer sb.mu.Unlock() 289 return sb.b.Len() 290} 291 292func (sb *synchronizedBuffer) String() string { 293 sb.mu.Lock() 294 defer sb.mu.Unlock() 295 return sb.b.String() 296} 297 298// pauseFileName if present in IndexDir will stop index jobs from 299// running. This is to make it possible to experiment with the content of the 300// IndexDir without the indexserver writing to it. 301const pauseFileName = "PAUSE" 302 303// Run the sync loop. This blocks forever. 304func (s *Server) Run() { 305 removeIncompleteShards(s.IndexDir) 306 307 // Start a goroutine which updates the queue with commits to index. 308 go func() { 309 // We update the list of indexed repos every Interval. To speed up manual 310 // testing we also listen for SIGUSR1 to trigger updates. 
311 // 312 // "pkill -SIGUSR1 zoekt-sourcegra" 313 for range jitterTicker(s.Interval, unix.SIGUSR1) { 314 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 315 log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 316 continue 317 } 318 319 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 320 if err != nil { 321 log.Printf("error listing repos: %s", err) 322 continue 323 } 324 325 debug.Printf("updating index queue with %d repositories", len(repos.IDs)) 326 327 // Stop indexing repos we don't need to track anymore 328 removed := s.queue.MaybeRemoveMissing(repos.IDs) 329 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 330 if len(removed) > 0 { 331 log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 332 } 333 334 cleanupDone := make(chan struct{}) 335 go func() { 336 defer close(cleanupDone) 337 s.muIndexDir.Global(func() { 338 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 339 }) 340 }() 341 342 repos.IterateIndexOptions(s.queue.AddOrUpdate) 343 344 // IterateIndexOptions will only iterate over repositories that have 345 // changed since we last called list. However, we want to add all IDs 346 // back onto the queue just to check that what is on disk is still 347 // correct. This will use the last IndexOptions we stored in the 348 // queue. The repositories not on the queue (missing) need a forced 349 // fetch of IndexOptions. 350 missing := s.queue.Bump(repos.IDs) 351 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 
352 353 setCompoundShardCounter(s.IndexDir) 354 355 <-cleanupDone 356 } 357 }() 358 359 go func() { 360 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 361 if s.shardMerging { 362 s.vacuum() 363 } 364 } 365 }() 366 367 go func() { 368 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 369 if s.shardMerging { 370 s.doMerge() 371 } 372 } 373 }() 374 375 for i := 0; i < s.IndexConcurrency; i++ { 376 go s.processQueue() 377 } 378 379 // block forever 380 select {} 381} 382 383// formatList returns a comma-separated list of the first min(len(v), m) items. 384func formatListUint32(v []uint32, m int) string { 385 if len(v) < m { 386 m = len(v) 387 } 388 389 sb := strings.Builder{} 390 for i := 0; i < m; i++ { 391 fmt.Fprintf(&sb, "%d, ", v[i]) 392 } 393 394 if len(v) > m { 395 sb.WriteString("...") 396 } 397 398 return strings.TrimRight(sb.String(), ", ") 399} 400 401func (s *Server) processQueue() { 402 for { 403 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 404 time.Sleep(time.Second) 405 continue 406 } 407 408 opts, ok := s.queue.Pop() 409 if !ok { 410 time.Sleep(time.Second) 411 continue 412 } 413 414 args := s.indexArgs(opts) 415 416 ran := s.muIndexDir.With(opts.Name, func() { 417 // only record time taken once we hold the lock. This avoids us 418 // recording time taken while merging/cleanup runs. 
419 start := time.Now() 420 421 state, err := s.Index(args) 422 423 elapsed := time.Since(start) 424 425 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 426 427 if err != nil { 428 log.Printf("error indexing %s: %s", args.String(), err) 429 } 430 431 switch state { 432 case indexStateSuccess: 433 var branches []string 434 for _, b := range args.Branches { 435 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 436 } 437 s.logger.Info("updated index", 438 sglog.String("repo", args.Name), 439 sglog.Uint32("id", args.RepoID), 440 sglog.Strings("branches", branches), 441 sglog.Duration("duration", elapsed), 442 ) 443 case indexStateSuccessMeta: 444 log.Printf("updated meta %s in %v", args.String(), elapsed) 445 } 446 s.queue.SetIndexed(opts, state) 447 }) 448 449 if !ran { 450 // Someone else is processing the repository. We can just skip this job 451 // since the repository will be added back to the queue and we will 452 // converge to the correct behaviour. 453 debug.Printf("index job for repository already running: %s", args) 454 continue 455 } 456 } 457} 458 459// repoNameForMetric returns a normalized version of the given repository name that is 460// suitable for use with Prometheus metrics. 461func repoNameForMetric(repo string) string { 462 // Check to see if we want to be able to capture separate indexing metrics for this repository. 463 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 464 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 465 return repo 466 } 467 468 return "" 469} 470 471func batched(slice []uint32, size int) <-chan []uint32 { 472 c := make(chan []uint32) 473 go func() { 474 for len(slice) > 0 { 475 if size > len(slice) { 476 size = len(slice) 477 } 478 c <- slice[:size] 479 slice = slice[size:] 480 } 481 close(c) 482 }() 483 return c 484} 485 486// jitterTicker returns a ticker which ticks with a jitter. 
Each tick is 487// uniformly selected from the range (d/2, d + d/2). It will tick on creation. 488// 489// sig is a list of signals which also cause the ticker to fire. This is a 490// convenience to allow manually triggering of the ticker. 491func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} { 492 ticker := make(chan struct{}) 493 494 go func() { 495 for { 496 ticker <- struct{}{} 497 ns := int64(d) 498 jitter := rand.Int63n(ns) 499 time.Sleep(time.Duration(ns/2 + jitter)) 500 } 501 }() 502 503 go func() { 504 if len(sig) == 0 { 505 return 506 } 507 508 c := make(chan os.Signal, 1) 509 signal.Notify(c, sig...) 510 for range c { 511 ticker <- struct{}{} 512 } 513 }() 514 515 return ticker 516} 517 518// Index starts an index job for repo name at commit. 519func (s *Server) Index(args *indexArgs) (state indexState, err error) { 520 tr := trace.New("index", args.Name) 521 522 defer func() { 523 if err != nil { 524 tr.SetError() 525 tr.LazyPrintf("error: %v", err) 526 state = indexStateFail 527 metricFailingTotal.Inc() 528 } 529 tr.LazyPrintf("state: %s", state) 530 tr.Finish() 531 }() 532 533 tr.LazyPrintf("branches: %v", args.Branches) 534 535 if len(args.Branches) == 0 { 536 return indexStateEmpty, createEmptyShard(args) 537 } 538 539 repositoryName := args.Name 540 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok { 541 tr.LazyPrintf("marking this repository for delta build") 542 args.UseDelta = true 543 } 544 545 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold 546 547 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok { 548 tr.LazyPrintf("skipping symbols calculation") 549 args.Symbols = false 550 } 551 552 reason := "forced" 553 554 if args.Incremental { 555 bo := args.BuildOptions() 556 bo.SetDefaults() 557 incrementalState, fn := bo.IndexState() 558 reason = string(incrementalState) 559 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc() 560 
561 switch incrementalState { 562 case build.IndexStateEqual: 563 debug.Printf("%s index already up to date. Shard=%s", args.String(), fn) 564 return indexStateNoop, nil 565 566 case build.IndexStateMeta: 567 log.Printf("updating index.meta %s", args.String()) 568 569 if err := mergeMeta(bo); err != nil { 570 log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err) 571 } else { 572 return indexStateSuccessMeta, nil 573 } 574 575 case build.IndexStateCorrupt: 576 log.Printf("falling back to full update: corrupt index: %s", args.String()) 577 } 578 } 579 580 log.Printf("updating index %s reason=%s", args.String(), reason) 581 582 metricIndexingTotal.Inc() 583 c := gitIndexConfig{ 584 runCmd: func(cmd *exec.Cmd) error { 585 return s.loggedRun(tr, cmd) 586 }, 587 588 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) { 589 return args.BuildOptions().FindRepositoryMetadata() 590 }, 591 timeout: s.timeout, 592 } 593 594 err = gitIndex(c, args, s.Sourcegraph, s.logger) 595 if err != nil { 596 return indexStateFail, err 597 } 598 599 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil { 600 s.logger.Error("failed to update index status", 601 sglog.String("repo", args.Name), 602 sglog.Uint32("id", args.RepoID), 603 sglogBranches("branches", args.Branches), 604 sglog.Error(err), 605 ) 606 } 607 608 return indexStateSuccess, nil 609} 610 611// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so 612// it can update the zoekt_repos table. 613func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error { 614 // We need to read from disk for IndexTime. 
615 _, metadata, ok, err := c.findRepositoryMetadata(args) 616 if err != nil { 617 return fmt.Errorf("failed to read metadata for new/updated index: %w", err) 618 } 619 if !ok { 620 return errors.New("failed to find metadata for new/updated index") 621 } 622 623 status := []indexStatus{{ 624 RepoID: args.RepoID, 625 Branches: args.Branches, 626 IndexTimeUnix: metadata.IndexTime.Unix(), 627 }} 628 if err := sg.UpdateIndexStatus(status); err != nil { 629 return fmt.Errorf("failed to update sourcegraph with status: %w", err) 630 } 631 632 return nil 633} 634 635func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field { 636 ss := make([]string, len(branches)) 637 for i, b := range branches { 638 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version) 639 } 640 return sglog.Strings(key, ss) 641} 642 643func (s *Server) indexArgs(opts IndexOptions) *indexArgs { 644 return &indexArgs{ 645 IndexOptions: opts, 646 647 IndexDir: s.IndexDir, 648 Parallelism: s.CPUCount, 649 650 Incremental: true, 651 652 // 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22 653 FileLimit: 1 << 20, 654 } 655} 656 657func createEmptyShard(args *indexArgs) error { 658 bo := args.BuildOptions() 659 bo.SetDefaults() 660 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 661 662 if args.Incremental && bo.IncrementalSkipIndexing() { 663 return nil 664 } 665 666 builder, err := build.NewBuilder(*bo) 667 if err != nil { 668 return err 669 } 670 return builder.Finish() 671} 672 673// addDebugHandlers adds handlers specific to indexserver. 674func (s *Server) addDebugHandlers(mux *http.ServeMux) { 675 // Sourcegraph's site admin view requires indexserver to serve it's admin view 676 // on "/". 
677 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 678 679 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 680 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 681 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 682 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 683 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 684 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 685} 686 687func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 688 if r.Method != "GET" { 689 w.Header().Set("Allow", "GET") 690 w.WriteHeader(http.StatusMethodNotAllowed) 691 return 692 } 693 694 response := struct { 695 Hostname string 696 }{ 697 Hostname: s.hostname, 698 } 699 700 b, err := json.Marshal(response) 701 if err != nil { 702 http.Error(w, err.Error(), http.StatusInternalServerError) 703 return 704 } 705 706 w.Header().Set("Content-Type", "application/json; charset=utf-8") 707 w.Write(b) 708} 709 710var rootTmpl = template.Must(template.New("name").Parse(` 711<html> 712 <body> 713 <a href="debug">Debug</a><br /> 714 <a href="debug/requests">Traces</a><br /> 715 {{.IndexMsg}}<br /> 716 <br /> 717 <h3>Reindex</h3> 718 {{if .Repos}} 719 <a href="?show_repos=false">hide repos</a><br /> 720 <table style="margin-top: 20px"> 721 <th style="text-align:left">Name</th> 722 <th style="text-align:left">ID</th> 723 {{range .Repos}} 724 <tr> 725 <td>{{.Name}}</td> 726 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 727 </tr> 728 {{end}} 729 </table> 730 {{else}} 731 <a href="?show_repos=true">show repos</a><br /> 732 {{end}} 733 </body> 734</html> 735`)) 736 737func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 738 if r.Method != "GET" { 739 w.Header().Set("Allow", "GET") 740 w.WriteHeader(http.StatusMethodNotAllowed) 741 return 742 } 743 744 values := r.URL.Query() 745 746 // ?id= 747 indexMsg := "" 748 if v := values.Get("id"); v != "" { 749 id, err := 
strconv.Atoi(v) 750 if err != nil { 751 http.Error(w, err.Error(), http.StatusBadRequest) 752 return 753 } 754 indexMsg, _ = s.forceIndex(uint32(id)) 755 } 756 757 // ?show_repos= 758 showRepos := false 759 if v := values.Get("show_repos"); v != "" { 760 showRepos, _ = strconv.ParseBool(v) 761 } 762 763 type Repo struct { 764 ID uint32 765 Name string 766 } 767 var data struct { 768 Repos []Repo 769 IndexMsg string 770 } 771 772 data.IndexMsg = indexMsg 773 774 if showRepos { 775 s.queue.Iterate(func(opts *IndexOptions) { 776 data.Repos = append(data.Repos, Repo{ 777 ID: opts.RepoID, 778 Name: opts.Name, 779 }) 780 }) 781 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 782 } 783 784 _ = rootTmpl.Execute(w, data) 785} 786 787// handleReindex triggers a reindex asynocronously. If a reindex was triggered 788// the request returns with status 202. The caller can infer the new state of 789// the index by calling List. 790func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 791 if r.Method != http.MethodPost { 792 w.Header().Set("Allow", http.MethodPost) 793 w.WriteHeader(http.StatusMethodNotAllowed) 794 return 795 } 796 797 err := r.ParseForm() 798 if err != nil { 799 http.Error(w, err.Error(), http.StatusBadRequest) 800 return 801 } 802 803 id, err := strconv.Atoi(r.Form.Get("repo")) 804 if err != nil { 805 http.Error(w, err.Error(), http.StatusBadRequest) 806 return 807 } 808 809 go func() { s.forceIndex(uint32(id)) }() 810 811 // 202 Accepted 812 w.WriteHeader(http.StatusAccepted) 813} 814 815func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 816 withIndexed := true 817 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 818 withIndexed = b 819 } 820 821 var indexed []uint32 822 if withIndexed { 823 indexed = listIndexed(s.IndexDir) 824 } 825 826 repos, err := s.Sourcegraph.List(r.Context(), indexed) 827 if err != nil { 828 http.Error(w, err.Error(), 
http.StatusInternalServerError) 829 return 830 } 831 832 bw := bytes.Buffer{} 833 834 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 835 836 _, err = fmt.Fprintf(tw, "ID\tName\n") 837 if err != nil { 838 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 839 return 840 } 841 842 s.queue.mu.Lock() 843 name := "" 844 for _, id := range repos.IDs { 845 if item := s.queue.get(id); item != nil { 846 name = item.opts.Name 847 } else { 848 name = "" 849 } 850 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 851 if err != nil { 852 debug.Printf("handleDebugList: %s\n", err.Error()) 853 } 854 } 855 s.queue.mu.Unlock() 856 857 if err != nil { 858 http.Error(w, err.Error(), http.StatusInternalServerError) 859 return 860 } 861 862 err = tw.Flush() 863 if err != nil { 864 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 865 return 866 } 867 868 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 869 870 if _, err := io.Copy(w, &bw); err != nil { 871 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 872 return 873 } 874} 875 876// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 877// can run this command during periods of low usage (evenings, weekends) to 878// trigger an initial merge run. In the steady-state, merges happen rarely, even 879// on busy instances, and users can rely on automatic merging instead. 880func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 881 882 // A merge operation can take very long, depending on the number merges and the 883 // target size of the compound shards. We run the merge in the background and 884 // return immediately to the user. 885 // 886 // We track the status of the merge with metricShardMergingRunning. 
887 go func() { 888 s.doMerge() 889 }() 890 _, _ = w.Write([]byte("merging enqueued\n")) 891} 892 893func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 894 indexed := listIndexed(s.IndexDir) 895 896 bw := bytes.Buffer{} 897 898 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 899 900 _, err := fmt.Fprintf(tw, "ID\tName\n") 901 if err != nil { 902 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 903 return 904 } 905 906 s.queue.mu.Lock() 907 name := "" 908 for _, id := range indexed { 909 if item := s.queue.get(id); item != nil { 910 name = item.opts.Name 911 } else { 912 name = "" 913 } 914 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 915 if err != nil { 916 debug.Printf("handleDebugIndexed: %s\n", err.Error()) 917 } 918 } 919 s.queue.mu.Unlock() 920 921 if err != nil { 922 http.Error(w, err.Error(), http.StatusInternalServerError) 923 return 924 } 925 926 err = tw.Flush() 927 if err != nil { 928 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 929 return 930 } 931 932 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 933 934 if _, err := io.Copy(w, &bw); err != nil { 935 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 936 return 937 } 938} 939 940// forceIndex will run the index job for repo name now. It will return always 941// return a string explaining what it did, even if it failed. 
942func (s *Server) forceIndex(id uint32) (string, error) { 943 var opts IndexOptions 944 var err error 945 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 946 opts = o 947 }, func(_ uint32, e error) { 948 err = e 949 }, id) 950 if err != nil { 951 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 952 } 953 954 args := s.indexArgs(opts) 955 args.Incremental = false // force re-index 956 957 var state indexState 958 ran := s.muIndexDir.With(opts.Name, func() { 959 state, err = s.Index(args) 960 }) 961 if !ran { 962 return fmt.Sprintf("index job for repository already running: %s", args), nil 963 } 964 if err != nil { 965 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 966 } 967 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 968} 969 970func listIndexed(indexDir string) []uint32 { 971 index := getShards(indexDir) 972 metricNumIndexed.Set(float64(len(index))) 973 repoIDs := make([]uint32, 0, len(index)) 974 for id := range index { 975 repoIDs = append(repoIDs, id) 976 } 977 sort.Slice(repoIDs, func(i, j int) bool { 978 return repoIDs[i] < repoIDs[j] 979 }) 980 return repoIDs 981} 982 983// setupTmpDir sets up a temporary directory on the same volume as the 984// indexes. 985// 986// If main is true we will delete older temp directories left around. main is 987// false when this is a debug command. 988func setupTmpDir(logger sglog.Logger, main bool, index string) error { 989 // change the target tmp directory depending on if it's our main daemon or a 990 // debug sub command. 
991 dir := ".indexserver.debug.tmp" 992 if main { 993 dir = ".indexserver.tmp" 994 } 995 996 tmpRoot := filepath.Join(index, dir) 997 998 if main { 999 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot)) 1000 err := os.RemoveAll(tmpRoot) 1001 if err != nil { 1002 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err)) 1003 } 1004 } 1005 1006 if err := os.MkdirAll(tmpRoot, 0755); err != nil { 1007 return err 1008 } 1009 1010 return os.Setenv("TMPDIR", tmpRoot) 1011} 1012 1013func printMetaData(fn string) error { 1014 repo, indexMeta, err := zoekt.ReadMetadataPath(fn) 1015 if err != nil { 1016 return err 1017 } 1018 1019 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 1020 if err != nil { 1021 return err 1022 } 1023 1024 err = json.NewEncoder(os.Stdout).Encode(repo) 1025 if err != nil { 1026 return err 1027 } 1028 return nil 1029} 1030 1031func printShardStats(fn string) error { 1032 f, err := os.Open(fn) 1033 if err != nil { 1034 return err 1035 } 1036 1037 iFile, err := zoekt.NewIndexFile(f) 1038 if err != nil { 1039 return err 1040 } 1041 1042 return zoekt.PrintNgramStats(iFile) 1043} 1044 1045func srcLogLevelIsDebug() bool { 1046 lvl := os.Getenv(sglog.EnvLogLevel) 1047 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1048} 1049 1050func getEnvWithDefaultInt64(k string, defaultVal int64) int64 { 1051 v := os.Getenv(k) 1052 if v == "" { 1053 return defaultVal 1054 } 1055 i, err := strconv.ParseInt(v, 10, 64) 1056 if err != nil { 1057 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1058 } 1059 return i 1060} 1061 1062func getEnvWithDefaultInt(k string, defaultVal int) int { 1063 v := os.Getenv(k) 1064 if v == "" { 1065 return defaultVal 1066 } 1067 i, err := strconv.Atoi(k) 1068 if err != nil { 1069 log.Fatalf("error parsing ENV %s to int: %s", k, err) 1070 } 1071 return i 1072} 1073 1074func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 { 1075 v := os.Getenv(k) 1076 
if v == "" { 1077 return defaultVal 1078 } 1079 i, err := strconv.ParseUint(v, 10, 64) 1080 if err != nil { 1081 log.Fatalf("error parsing ENV %s to uint64: %s", k, err) 1082 } 1083 return i 1084} 1085 1086func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1087 v := os.Getenv(k) 1088 if v == "" { 1089 return defaultVal 1090 } 1091 f, err := strconv.ParseFloat(v, 64) 1092 if err != nil { 1093 log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1094 } 1095 return f 1096} 1097 1098func getEnvWithDefaultString(k string, defaultVal string) string { 1099 v := os.Getenv(k) 1100 if v == "" { 1101 return defaultVal 1102 } 1103 return v 1104} 1105 1106func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration { 1107 v := os.Getenv(k) 1108 if v == "" { 1109 return defaultVal 1110 } 1111 1112 d, err := time.ParseDuration(v) 1113 if err != nil { 1114 log.Fatalf("error parsing ENV %s to duration: %s", k, err) 1115 } 1116 return d 1117} 1118 1119func getEnvWithDefaultBool(k string, defaultVal bool) bool { 1120 v := os.Getenv(k) 1121 if v == "" { 1122 return defaultVal 1123 } 1124 1125 b, err := strconv.ParseBool(v) 1126 if err != nil { 1127 log.Fatalf("error parsing ENV %s to bool: %s", k, err) 1128 } 1129 return b 1130} 1131 1132func getEnvWithDefaultEmptySet(k string) map[string]struct{} { 1133 set := map[string]struct{}{} 1134 for _, v := range strings.Split(os.Getenv(k), ",") { 1135 v = strings.TrimSpace(v) 1136 if v != "" { 1137 set[v] = struct{}{} 1138 } 1139 } 1140 return set 1141} 1142 1143func joinStringSet(set map[string]struct{}, sep string) string { 1144 var xs []string 1145 for x := range set { 1146 xs = append(xs, x) 1147 } 1148 1149 return strings.Join(xs, sep) 1150} 1151 1152func setCompoundShardCounter(indexDir string) { 1153 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt")) 1154 if err != nil { 1155 log.Printf("setCompoundShardCounter: %s\n", err) 1156 return 1157 } 1158 
metricNumberCompoundShards.Set(float64(len(fns))) 1159} 1160 1161func rootCmd() *ffcli.Command { 1162 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1163 conf := rootConfig{ 1164 Main: true, 1165 } 1166 conf.registerRootFlags(rootFs) 1167 1168 return &ffcli.Command{ 1169 FlagSet: rootFs, 1170 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1171 Subcommands: []*ffcli.Command{debugCmd()}, 1172 Exec: func(ctx context.Context, args []string) error { 1173 return startServer(conf) 1174 }, 1175 } 1176} 1177 1178type rootConfig struct { 1179 // Main is true if this rootConfig is for our main long running command (the 1180 // indexserver). Debug commands should not set this value. This is used to 1181 // determine if we need to run tmpfriend. 1182 Main bool 1183 1184 root string 1185 interval time.Duration 1186 index string 1187 indexConcurrency int64 1188 listen string 1189 hostname string 1190 cpuFraction float64 1191 blockProfileRate int 1192 1193 // config values related to shard merging 1194 vacuumInterval time.Duration 1195 mergeInterval time.Duration 1196 targetSize int64 1197 minSize int64 1198 minAgeDays int 1199 maxPriority float64 1200 1201 // config values related to backoff indexing repos with one or more consecutive failures 1202 backoffDuration time.Duration 1203 maxBackoffDuration time.Duration 1204 1205 // useGRPC is true if we should use the gRPC API to talk to Sourcegraph. 1206 useGRPC bool 1207} 1208 1209func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1210 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. 
If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1211 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1212 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of concurrent index jobs to run.") 1213 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use") 1214 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") 1215 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") 1216 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1217 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate") 1218 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.") 1219 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. 
A negative value disables indexing backoff.") 1220 fs.BoolVar(&rc.useGRPC, "use_grpc", mustGetBoolFromEnvironmentVariables([]string{"GRPC_ENABLED", "SG_FEATURE_FLAG_GRPC"}, true), "use the gRPC API to talk to Sourcegraph") 1221 1222 // flags related to shard merging 1223 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1224 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1225 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB") 1226 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB") 1227 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.") 1228 fs.Float64Var(&rc.maxPriority, "merge_max_priority", getEnvWithDefaultFloat64("SRC_MERGE_MAX_PRIORITY", 100), "the maximum priority a shard can have to be considered for merging.") 1229} 1230 1231func startServer(conf rootConfig) error { 1232 s, err := newServer(conf) 1233 if err != nil { 1234 return err 1235 } 1236 1237 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate) 1238 setCompoundShardCounter(s.IndexDir) 1239 1240 if conf.listen != "" { 1241 1242 mux := http.NewServeMux() 1243 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1244 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1245 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1246 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes 
repositories which this instance temporarily holds during re-balancing"}, 1247 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1248 }...) 1249 s.addDebugHandlers(mux) 1250 1251 go func() { 1252 debug.Printf("serving HTTP on %s", conf.listen) 1253 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1254 1255 }() 1256 1257 // Serve mux on a unix domain socket on a best-effort-basis so that 1258 // webserver can call the endpoints via the shared filesystem. 1259 // 1260 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1261 // on the socket due to permission errors. See 1262 // https://github.com/docker/for-mac/issues/6239 1263 go func() { 1264 serveHTTPOverSocket := func() error { 1265 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1266 // We cannot bind a socket to an existing pathname. 1267 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1268 return fmt.Errorf("error removing socket file: %s", socket) 1269 } 1270 // The "unix" network corresponds to stream sockets. (cf. unixgram, 1271 // unixpacket). 1272 l, err := net.Listen("unix", socket) 1273 if err != nil { 1274 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1275 } 1276 // Indexserver (root) and webserver (Sourcegraph) run with 1277 // different users. Per default, the socket is created with 1278 // permission 755 (root root), which doesn't let webserver write to 1279 // it. 1280 // 1281 // See https://github.com/golang/go/issues/11822 for more context. 
1282 if err := os.Chmod(socket, 0777); err != nil { 1283 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1284 } 1285 debug.Printf("serving HTTP on %s", socket) 1286 return http.Serve(l, mux) 1287 } 1288 debug.Print(serveHTTPOverSocket()) 1289 }() 1290 } 1291 1292 oc := &ownerChecker{ 1293 Path: filepath.Join(conf.index, "owner.txt"), 1294 Hostname: conf.hostname, 1295 } 1296 go oc.Run() 1297 1298 logger := sglog.Scoped("metricsRegistration") 1299 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1300 1301 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1302 prometheus.DefaultRegisterer.MustRegister(c) 1303 1304 s.Run() 1305 return nil 1306} 1307 1308func newServer(conf rootConfig) (*Server, error) { 1309 logger := sglog.Scoped("server") 1310 1311 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1312 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1313 } 1314 if conf.index == "" { 1315 return nil, fmt.Errorf("must set -index") 1316 } 1317 if conf.root == "" { 1318 return nil, fmt.Errorf("must set -sourcegraph_url") 1319 } 1320 rootURL, err := url.Parse(conf.root) 1321 if err != nil { 1322 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1323 } 1324 1325 rootURL = addDefaultPort(rootURL) 1326 1327 // Tune GOMAXPROCS to match Linux container CPU quota. 1328 _, _ = maxprocs.Set() 1329 1330 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler. 1331 // The block profiler is disabled by default and should be enabled with care in production 1332 runtime.SetBlockProfileRate(conf.blockProfileRate) 1333 1334 // Automatically prepend our own path at the front, to minimize 1335 // required configuration. 
1336 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1337 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1338 } 1339 1340 if _, err := os.Stat(conf.index); err != nil { 1341 if err := os.MkdirAll(conf.index, 0755); err != nil { 1342 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1343 } 1344 } 1345 1346 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil { 1347 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1348 } 1349 1350 if srcLogLevelIsDebug() { 1351 debug = log.New(os.Stderr, "", log.LstdFlags) 1352 } 1353 1354 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1355 if len(reposWithSeparateIndexingMetrics) > 0 { 1356 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1357 } 1358 1359 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1360 if len(deltaBuildRepositoriesAllowList) > 0 { 1361 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1362 } 1363 1364 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1365 if deltaShardNumberFallbackThreshold > 0 { 1366 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1367 } else { 1368 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1369 } 1370 1371 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1372 if len(reposShouldSkipSymbolsCalculation) > 0 { 1373 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1374 } 1375 1376 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout) 1377 if indexingTimeout != 
defaultIndexingTimeout { 1378 debug.Printf("using configured indexing timeout: %s", indexingTimeout) 1379 } 1380 1381 var sg Sourcegraph 1382 if rootURL.IsAbs() { 1383 var batchSize int 1384 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { 1385 batchSize, err = strconv.Atoi(v) 1386 if err != nil { 1387 return nil, fmt.Errorf("Invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1388 } 1389 } 1390 1391 opts := []SourcegraphClientOption{ 1392 WithBatchSize(batchSize), 1393 WithShouldUseGRPC(conf.useGRPC), 1394 } 1395 1396 logger := sglog.Scoped("zoektConfigurationGRPCClient") 1397 client, err := dialGRPCClient(rootURL.Host, logger) 1398 if err != nil { 1399 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1400 } 1401 1402 opts = append(opts, WithGRPCClient(client)) 1403 sg = newSourcegraphClient(rootURL, conf.hostname, opts...) 1404 1405 } else { 1406 sg = sourcegraphFake{ 1407 RootDir: rootURL.String(), 1408 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1409 } 1410 } 1411 1412 if conf.indexConcurrency < 1 { 1413 conf.indexConcurrency = 1 1414 } 1415 1416 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction))) 1417 if cpuCount < 1 { 1418 cpuCount = 1 1419 } 1420 1421 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1422 1423 return &Server{ 1424 logger: logger, 1425 Sourcegraph: sg, 1426 IndexDir: conf.index, 1427 IndexConcurrency: int(conf.indexConcurrency), 1428 Interval: conf.interval, 1429 CPUCount: cpuCount, 1430 queue: *q, 1431 shardMerging: zoekt.ShardMergingEnabled(), 1432 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1433 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1434 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1435 hostname: conf.hostname, 1436 mergeOpts: mergeOpts{ 1437 vacuumInterval: conf.vacuumInterval, 1438 mergeInterval: conf.mergeInterval, 1439 targetSizeBytes: 
conf.targetSize * 1024 * 1024, 1440 minSizeBytes: conf.minSize * 1024 * 1024, 1441 minAgeDays: conf.minAgeDays, 1442 maxPriority: conf.maxPriority, 1443 }, 1444 timeout: indexingTimeout, 1445 }, err 1446} 1447 1448// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration 1449// for the indexed-search-configuration gRPC service. 1450// 1451// The default backoff strategy is modeled after the default settings used by 1452// retryablehttp.DefaultClient. 1453// 1454// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1455// - Unavailable 1456// - Aborted 1457// 1458//go:embed default_grpc_service_configuration.json 1459var defaultGRPCServiceConfigurationJSON string 1460 1461func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1462 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1463 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1464 return invoker(ctx, method, req, reply, cc, opts...) 1465 } 1466} 1467 1468func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1469 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1470 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1471 return streamer(ctx, desc, cc, method, opts...) 1472 } 1473} 1474 1475// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPCs servers and clients are allowed to process. 1476// This can be overridden by providing custom Server/Dial options. 
1477const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB 1478 1479func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) { 1480 metrics := mustGetClientMetrics() 1481 1482 // If the service seems to be unavailable, this 1483 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1 1484 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s]) 1485 retryOpts := []retry.CallOption{ 1486 retry.WithMax(5), 1487 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)), 1488 retry.WithCodes(codes.Unavailable), 1489 } 1490 1491 opts := []grpc.DialOption{ 1492 grpc.WithTransportCredentials(insecure.NewCredentials()), 1493 grpc.WithChainStreamInterceptor( 1494 metrics.StreamClientInterceptor(), 1495 messagesize.StreamClientInterceptor, 1496 internalActorStreamInterceptor(), 1497 internalerrs.LoggingStreamClientInterceptor(logger), 1498 internalerrs.PrometheusStreamClientInterceptor, 1499 retry.StreamClientInterceptor(retryOpts...), 1500 ), 1501 grpc.WithChainUnaryInterceptor( 1502 metrics.UnaryClientInterceptor(), 1503 messagesize.UnaryClientInterceptor, 1504 internalActorUnaryInterceptor(), 1505 internalerrs.LoggingUnaryClientInterceptor(logger), 1506 internalerrs.PrometheusUnaryClientInterceptor, 1507 retry.UnaryClientInterceptor(retryOpts...), 1508 ), 1509 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON), 1510 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)), 1511 } 1512 1513 opts = append(opts, additionalOpts...) 1514 1515 // Ensure that the message size options are set last, so they override any other 1516 // client-specific options that tweak the message size. 1517 // 1518 // The message size options are only provided if the environment variable is set. 
These options serve as an escape hatch, so they 1519 // take precedence over everything else with a uniform size setting that's easy to reason about. 1520 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...) 1521 1522 // This dialer is used to connect via gRPC to the Sourcegraph instance. 1523 // This is done lazily, so we can provide the client to use regardless of 1524 // whether we enabled gRPC or not initially. 1525 cc, err := grpc.Dial(addr, opts...) 1526 if err != nil { 1527 return nil, fmt.Errorf("dialing %q: %w", addr, err) 1528 } 1529 1530 client := proto.NewZoektConfigurationServiceClient(cc) 1531 return client, nil 1532} 1533 1534// mustGetClientMetrics returns a singleton instance of the client metrics 1535// that are shared across all gRPC clients that this process creates. 1536// 1537// This function panics if the metrics cannot be registered with the default 1538// Prometheus registry. 1539func mustGetClientMetrics() *grpcprom.ClientMetrics { 1540 clientMetricsOnce.Do(func() { 1541 clientMetrics = grpcprom.NewClientMetrics( 1542 grpcprom.WithClientCounterOptions(), 1543 grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request 1544 grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC 1545 grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC 1546 ) 1547 1548 prometheus.DefaultRegisterer.MustRegister(clientMetrics) 1549 }) 1550 1551 return clientMetrics 1552} 1553 1554// addDefaultPort adds a default port to a URL if one is not specified. 1555// 1556// If the URL scheme is "http" and no port is specified, "80" is used. 1557// If the scheme is "https", "443" is used. 1558// 1559// The original URL is not mutated. A copy is modified and returned. 
1560func addDefaultPort(original *url.URL) *url.URL { 1561 if original == nil { 1562 return nil // don't panic 1563 } 1564 1565 if !original.IsAbs() { 1566 return original // don't do anything if the URL doesn't look like a remote URL 1567 } 1568 1569 if original.Scheme == "http" && original.Port() == "" { 1570 u := cloneURL(original) 1571 u.Host = net.JoinHostPort(u.Host, "80") 1572 return u 1573 } 1574 1575 if original.Scheme == "https" && original.Port() == "" { 1576 u := cloneURL(original) 1577 u.Host = net.JoinHostPort(u.Host, "443") 1578 return u 1579 } 1580 1581 return original 1582} 1583 1584// cloneURL returns a copy of the URL. It is safe to mutate the returned URL. 1585// This is copied from net/http/clone.go 1586func cloneURL(u *url.URL) *url.URL { 1587 if u == nil { 1588 return nil 1589 } 1590 u2 := new(url.URL) 1591 *u2 = *u 1592 if u.User != nil { 1593 u2.User = new(url.Userinfo) 1594 *u2.User = *u.User 1595 } 1596 return u2 1597} 1598 1599func main() { 1600 liblog := sglog.Init(sglog.Resource{ 1601 Name: "zoekt-indexserver", 1602 Version: zoekt.Version, 1603 InstanceID: zoekt.HostnameBestEffort(), 1604 }) 1605 defer liblog.Sync() 1606 1607 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1608 log.Fatal(err) 1609 } 1610} 1611 1612// mustGetBoolFromEnvironmentVariables is like getBoolFromEnvironmentVariables, but it panics 1613// if any of the provided environment variables fails to parse as a boolean. 1614func mustGetBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) bool { 1615 value, err := getBoolFromEnvironmentVariables(envVarNames, defaultBool) 1616 if err != nil { 1617 panic(err) 1618 } 1619 1620 return value 1621} 1622 1623// getBoolFromEnvironmentVariables returns the boolean defined by the first environment 1624// variable listed in envVarNames that is set in the current process environment, or the defaultBool if none are set. 
//
// A variable set to the empty string is treated as unset. If the first
// non-empty variable fails to parse as a boolean, an error wrapping the
// strconv error is returned and the remaining variables are not consulted.
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	for _, envVar := range envVarNames {
		v := os.Getenv(envVar)
		if v == "" {
			continue
		}

		b, err := strconv.ParseBool(v)
		if err != nil {
			// %w keeps the *strconv.NumError reachable via errors.Is/As.
			return false, fmt.Errorf("parsing environment variable %q to boolean: %w", envVar, err)
		}

		return b, nil
	}

	return defaultBool, nil
}