fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
// repositories on sourcegraph
package main

import (
	"bytes"
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"html/template"
	"io"
	"io/fs"
	"log"
	"math"
	"math/rand"
	"net"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"text/tabwriter"
	"time"

	grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
	"github.com/peterbourgon/ff/v3/ffcli"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	sglog "github.com/sourcegraph/log"
	"github.com/sourcegraph/mountinfo"
	"go.uber.org/automaxprocs/maxprocs"
	"golang.org/x/net/trace"
	"golang.org/x/sys/unix"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/metadata"

	"github.com/sourcegraph/zoekt"
	"github.com/sourcegraph/zoekt/build"
	proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
	"github.com/sourcegraph/zoekt/debugserver"
	"github.com/sourcegraph/zoekt/grpc/internalerrs"
	"github.com/sourcegraph/zoekt/grpc/messagesize"
	"github.com/sourcegraph/zoekt/internal/profiler"
)

// Prometheus metrics exported by the indexserver. They are registered at
// package initialisation via promauto.
var (
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 100000s (~27.8h)
	})

	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	// metricIndexDuration is observed by processQueue once per index job.
	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	// metricIndexIncrementalIndexState is incremented by Server.Index for
	// every incremental index attempt.
	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is build.IndexState

	// metricNumIndexed is set by listIndexed to the number of repositories
	// found on disk.
	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	metricNumAssigned = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_assigned",
		Help: "Number of repos assigned to this indexer by code host",
	})

	// metricFailingTotal is incremented by Server.Index whenever it returns
	// an error.
	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	// metricIndexingTotal is incremented by Server.Index right before a full
	// (non-noop, non-meta) index run is started.
	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})

	clientMetricsOnce sync.Once
	clientMetrics     *grpcprom.ClientMetrics
)

// set of repositories that we want to capture separate indexing metrics for
var reposWithSeparateIndexingMetrics = make(map[string]struct{})

// indexState is the outcome of a single index job. It is used as the "state"
// label of metricIndexDuration.
type indexState string

const (
	indexStateFail        indexState = "fail"
	indexStateSuccess     indexState = "success"
	indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
	indexStateNoop        indexState = "noop"         // We didn't need to update index
	indexStateEmpty       indexState = "empty"        // index is empty (empty repo)
)

// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	logger sglog.Logger

	// Sourcegraph is the client used to list repositories, fetch their index
	// options and report index status.
	Sourcegraph Sourcegraph
	BatchSize   int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration
	// CPUCount is the amount of parallelism to use when indexing a
	// repository.
	CPUCount int

	// queue holds the pending index jobs. Run fills it and processQueue pops
	// from it.
	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is the value reported by the /debug/host handler.
	hostname string

	// mergeOpts carries the vacuum and merge intervals used by Run.
	mergeOpts mergeOpts

	// timeout defines how long the index server waits before killing an indexing job.
	timeout time.Duration
}

// debug is a logger whose output is discarded as constructed here.
// NOTE(review): presumably re-pointed at a real writer when debug logging is
// enabled elsewhere in the package; confirm.
var debug = log.New(io.Discard, "", log.LstdFlags)

// our index commands should output something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
203const noOutputTimeout = 30 * time.Minute 204 205func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 206 out := &synchronizedBuffer{} 207 cmd.Stdout = out 208 cmd.Stderr = out 209 210 tr.LazyPrintf("%s", cmd.Args) 211 212 defer func() { 213 if err != nil { 214 outS := out.String() 215 tr.LazyPrintf("failed: %v", err) 216 tr.LazyPrintf("output: %s", out) 217 tr.SetError() 218 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 219 } 220 }() 221 222 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 223 224 if err := cmd.Start(); err != nil { 225 return err 226 } 227 228 errC := make(chan error) 229 go func() { 230 errC <- cmd.Wait() 231 }() 232 233 // This channel is set after we have sent sigquit. It allows us to follow up 234 // with a sigkill if the process doesn't quit after sigquit. 235 kill := make(<-chan time.Time) 236 237 lastLen := 0 238 for { 239 select { 240 case <-time.After(noOutputTimeout): 241 // Periodically check if we have had output. If not kill the process. 242 if out.Len() != lastLen { 243 lastLen = out.Len() 244 log.Printf("still running %s", cmd.Args) 245 } else { 246 // Send quit (C-\) first so we get a stack dump. 247 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 248 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 249 log.Println("quit failed:", err) 250 } 251 252 // send sigkill if still running in 10s 253 kill = time.After(10 * time.Second) 254 } 255 256 case <-kill: 257 log.Printf("still running, killing %s", cmd.Args) 258 if err := cmd.Process.Kill(); err != nil { 259 log.Println("kill failed:", err) 260 } 261 262 case err := <-errC: 263 if err != nil { 264 return err 265 } 266 267 tr.LazyPrintf("success") 268 return nil 269 } 270 } 271} 272 273// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 274// monitor the buffer while it is being written to. 
275type synchronizedBuffer struct { 276 mu sync.Mutex 277 b bytes.Buffer 278} 279 280func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 281 sb.mu.Lock() 282 defer sb.mu.Unlock() 283 return sb.b.Write(p) 284} 285 286func (sb *synchronizedBuffer) Len() int { 287 sb.mu.Lock() 288 defer sb.mu.Unlock() 289 return sb.b.Len() 290} 291 292func (sb *synchronizedBuffer) String() string { 293 sb.mu.Lock() 294 defer sb.mu.Unlock() 295 return sb.b.String() 296} 297 298// pauseFileName if present in IndexDir will stop index jobs from 299// running. This is to make it possible to experiment with the content of the 300// IndexDir without the indexserver writing to it. 301const pauseFileName = "PAUSE" 302 303// Run the sync loop. This blocks forever. 304func (s *Server) Run() { 305 removeIncompleteShards(s.IndexDir) 306 307 // Start a goroutine which updates the queue with commits to index. 308 go func() { 309 // We update the list of indexed repos every Interval. To speed up manual 310 // testing we also listen for SIGUSR1 to trigger updates. 
311 // 312 // "pkill -SIGUSR1 zoekt-sourcegra" 313 for range jitterTicker(s.Interval, unix.SIGUSR1) { 314 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 315 log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 316 continue 317 } 318 319 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 320 if err != nil { 321 log.Printf("error listing repos: %s", err) 322 continue 323 } 324 325 debug.Printf("updating index queue with %d repositories", len(repos.IDs)) 326 327 // Stop indexing repos we don't need to track anymore 328 removed := s.queue.MaybeRemoveMissing(repos.IDs) 329 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 330 if len(removed) > 0 { 331 log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 332 } 333 334 cleanupDone := make(chan struct{}) 335 go func() { 336 defer close(cleanupDone) 337 s.muIndexDir.Global(func() { 338 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 339 }) 340 }() 341 342 repos.IterateIndexOptions(s.queue.AddOrUpdate) 343 344 // IterateIndexOptions will only iterate over repositories that have 345 // changed since we last called list. However, we want to add all IDs 346 // back onto the queue just to check that what is on disk is still 347 // correct. This will use the last IndexOptions we stored in the 348 // queue. The repositories not on the queue (missing) need a forced 349 // fetch of IndexOptions. 350 missing := s.queue.Bump(repos.IDs) 351 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 
352 353 setCompoundShardCounter(s.IndexDir) 354 355 <-cleanupDone 356 } 357 }() 358 359 go func() { 360 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 361 if s.shardMerging { 362 s.vacuum() 363 } 364 } 365 }() 366 367 go func() { 368 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 369 if s.shardMerging { 370 s.doMerge() 371 } 372 } 373 }() 374 375 for i := 0; i < s.IndexConcurrency; i++ { 376 go s.processQueue() 377 } 378 379 // block forever 380 select {} 381} 382 383// formatList returns a comma-separated list of the first min(len(v), m) items. 384func formatListUint32(v []uint32, m int) string { 385 if len(v) < m { 386 m = len(v) 387 } 388 389 sb := strings.Builder{} 390 for i := 0; i < m; i++ { 391 fmt.Fprintf(&sb, "%d, ", v[i]) 392 } 393 394 if len(v) > m { 395 sb.WriteString("...") 396 } 397 398 return strings.TrimRight(sb.String(), ", ") 399} 400 401func (s *Server) processQueue() { 402 for { 403 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 404 time.Sleep(time.Second) 405 continue 406 } 407 408 opts, ok := s.queue.Pop() 409 if !ok { 410 time.Sleep(time.Second) 411 continue 412 } 413 414 args := s.indexArgs(opts) 415 416 ran := s.muIndexDir.With(opts.Name, func() { 417 // only record time taken once we hold the lock. This avoids us 418 // recording time taken while merging/cleanup runs. 
419 start := time.Now() 420 421 state, err := s.Index(args) 422 423 elapsed := time.Since(start) 424 425 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 426 427 if err != nil { 428 log.Printf("error indexing %s: %s", args.String(), err) 429 } 430 431 switch state { 432 case indexStateSuccess: 433 var branches []string 434 for _, b := range args.Branches { 435 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 436 } 437 s.logger.Info("updated index", 438 sglog.String("repo", args.Name), 439 sglog.Uint32("id", args.RepoID), 440 sglog.Strings("branches", branches), 441 sglog.Duration("duration", elapsed), 442 ) 443 case indexStateSuccessMeta: 444 log.Printf("updated meta %s in %v", args.String(), elapsed) 445 } 446 s.queue.SetIndexed(opts, state) 447 }) 448 449 if !ran { 450 // Someone else is processing the repository. We can just skip this job 451 // since the repository will be added back to the queue and we will 452 // converge to the correct behaviour. 453 debug.Printf("index job for repository already running: %s", args) 454 continue 455 } 456 } 457} 458 459// repoNameForMetric returns a normalized version of the given repository name that is 460// suitable for use with Prometheus metrics. 461func repoNameForMetric(repo string) string { 462 // Check to see if we want to be able to capture separate indexing metrics for this repository. 463 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 464 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 465 return repo 466 } 467 468 return "" 469} 470 471func batched(slice []uint32, size int) <-chan []uint32 { 472 c := make(chan []uint32) 473 go func() { 474 for len(slice) > 0 { 475 if size > len(slice) { 476 size = len(slice) 477 } 478 c <- slice[:size] 479 slice = slice[size:] 480 } 481 close(c) 482 }() 483 return c 484} 485 486// jitterTicker returns a ticker which ticks with a jitter. 
Each tick is 487// uniformly selected from the range (d/2, d + d/2). It will tick on creation. 488// 489// sig is a list of signals which also cause the ticker to fire. This is a 490// convenience to allow manually triggering of the ticker. 491func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} { 492 ticker := make(chan struct{}) 493 494 go func() { 495 for { 496 ticker <- struct{}{} 497 ns := int64(d) 498 jitter := rand.Int63n(ns) 499 time.Sleep(time.Duration(ns/2 + jitter)) 500 } 501 }() 502 503 go func() { 504 if len(sig) == 0 { 505 return 506 } 507 508 c := make(chan os.Signal, 1) 509 signal.Notify(c, sig...) 510 for range c { 511 ticker <- struct{}{} 512 } 513 }() 514 515 return ticker 516} 517 518// Index starts an index job for repo name at commit. 519func (s *Server) Index(args *indexArgs) (state indexState, err error) { 520 tr := trace.New("index", args.Name) 521 522 defer func() { 523 if err != nil { 524 tr.SetError() 525 tr.LazyPrintf("error: %v", err) 526 state = indexStateFail 527 metricFailingTotal.Inc() 528 } 529 tr.LazyPrintf("state: %s", state) 530 tr.Finish() 531 }() 532 533 tr.LazyPrintf("branches: %v", args.Branches) 534 535 if len(args.Branches) == 0 { 536 return indexStateEmpty, createEmptyShard(args) 537 } 538 539 repositoryName := args.Name 540 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok { 541 tr.LazyPrintf("marking this repository for delta build") 542 args.UseDelta = true 543 } 544 545 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold 546 547 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok { 548 tr.LazyPrintf("skipping symbols calculation") 549 args.Symbols = false 550 } 551 552 reason := "forced" 553 554 if args.Incremental { 555 bo := args.BuildOptions() 556 bo.SetDefaults() 557 incrementalState, fn := bo.IndexState() 558 reason = string(incrementalState) 559 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc() 560 
561 switch incrementalState { 562 case build.IndexStateEqual: 563 debug.Printf("%s index already up to date. Shard=%s", args.String(), fn) 564 return indexStateNoop, nil 565 566 case build.IndexStateMeta: 567 log.Printf("updating index.meta %s", args.String()) 568 569 if err := mergeMeta(bo); err != nil { 570 log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err) 571 } else { 572 return indexStateSuccessMeta, nil 573 } 574 575 case build.IndexStateCorrupt: 576 log.Printf("falling back to full update: corrupt index: %s", args.String()) 577 } 578 } 579 580 log.Printf("updating index %s reason=%s", args.String(), reason) 581 582 metricIndexingTotal.Inc() 583 c := gitIndexConfig{ 584 runCmd: func(cmd *exec.Cmd) error { 585 return s.loggedRun(tr, cmd) 586 }, 587 588 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) { 589 return args.BuildOptions().FindRepositoryMetadata() 590 }, 591 timeout: s.timeout, 592 } 593 594 err = gitIndex(c, args, s.Sourcegraph, s.logger) 595 if err != nil { 596 return indexStateFail, err 597 } 598 599 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil { 600 s.logger.Error("failed to update index status", 601 sglog.String("repo", args.Name), 602 sglog.Uint32("id", args.RepoID), 603 sglogBranches("branches", args.Branches), 604 sglog.Error(err), 605 ) 606 } 607 608 return indexStateSuccess, nil 609} 610 611// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so 612// it can update the zoekt_repos table. 613func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error { 614 // We need to read from disk for IndexTime. 
615 _, metadata, ok, err := c.findRepositoryMetadata(args) 616 if err != nil { 617 return fmt.Errorf("failed to read metadata for new/updated index: %w", err) 618 } 619 if !ok { 620 return errors.New("failed to find metadata for new/updated index") 621 } 622 623 status := []indexStatus{{ 624 RepoID: args.RepoID, 625 Branches: args.Branches, 626 IndexTimeUnix: metadata.IndexTime.Unix(), 627 }} 628 if err := sg.UpdateIndexStatus(status); err != nil { 629 return fmt.Errorf("failed to update sourcegraph with status: %w", err) 630 } 631 632 return nil 633} 634 635func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field { 636 ss := make([]string, len(branches)) 637 for i, b := range branches { 638 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version) 639 } 640 return sglog.Strings(key, ss) 641} 642 643func (s *Server) indexArgs(opts IndexOptions) *indexArgs { 644 parallelism := math.Ceil(float64(s.CPUCount) / float64(s.IndexConcurrency)) 645 return &indexArgs{ 646 IndexOptions: opts, 647 648 IndexDir: s.IndexDir, 649 Parallelism: int(parallelism), 650 651 Incremental: true, 652 653 // 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22 654 FileLimit: 1 << 20, 655 } 656} 657 658func createEmptyShard(args *indexArgs) error { 659 bo := args.BuildOptions() 660 bo.SetDefaults() 661 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 662 663 if args.Incremental && bo.IncrementalSkipIndexing() { 664 return nil 665 } 666 667 builder, err := build.NewBuilder(*bo) 668 if err != nil { 669 return err 670 } 671 return builder.Finish() 672} 673 674// addDebugHandlers adds handlers specific to indexserver. 675func (s *Server) addDebugHandlers(mux *http.ServeMux) { 676 // Sourcegraph's site admin view requires indexserver to serve it's admin view 677 // on "/". 
678 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 679 680 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 681 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 682 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 683 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 684 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 685 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 686} 687 688func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 689 if r.Method != "GET" { 690 w.Header().Set("Allow", "GET") 691 w.WriteHeader(http.StatusMethodNotAllowed) 692 return 693 } 694 695 response := struct { 696 Hostname string 697 }{ 698 Hostname: s.hostname, 699 } 700 701 b, err := json.Marshal(response) 702 if err != nil { 703 http.Error(w, err.Error(), http.StatusInternalServerError) 704 return 705 } 706 707 w.Header().Set("Content-Type", "application/json; charset=utf-8") 708 w.Write(b) 709} 710 711var rootTmpl = template.Must(template.New("name").Parse(` 712<html> 713 <body> 714 <a href="debug">Debug</a><br /> 715 <a href="debug/requests">Traces</a><br /> 716 {{.IndexMsg}}<br /> 717 <br /> 718 <h3>Reindex</h3> 719 {{if .Repos}} 720 <a href="?show_repos=false">hide repos</a><br /> 721 <table style="margin-top: 20px"> 722 <th style="text-align:left">Name</th> 723 <th style="text-align:left">ID</th> 724 {{range .Repos}} 725 <tr> 726 <td>{{.Name}}</td> 727 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 728 </tr> 729 {{end}} 730 </table> 731 {{else}} 732 <a href="?show_repos=true">show repos</a><br /> 733 {{end}} 734 </body> 735</html> 736`)) 737 738func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 739 if r.Method != "GET" { 740 w.Header().Set("Allow", "GET") 741 w.WriteHeader(http.StatusMethodNotAllowed) 742 return 743 } 744 745 values := r.URL.Query() 746 747 // ?id= 748 indexMsg := "" 749 if v := values.Get("id"); v != "" { 750 id, err := 
strconv.Atoi(v) 751 if err != nil { 752 http.Error(w, err.Error(), http.StatusBadRequest) 753 return 754 } 755 indexMsg, _ = s.forceIndex(uint32(id)) 756 } 757 758 // ?show_repos= 759 showRepos := false 760 if v := values.Get("show_repos"); v != "" { 761 showRepos, _ = strconv.ParseBool(v) 762 } 763 764 type Repo struct { 765 ID uint32 766 Name string 767 } 768 var data struct { 769 Repos []Repo 770 IndexMsg string 771 } 772 773 data.IndexMsg = indexMsg 774 775 if showRepos { 776 s.queue.Iterate(func(opts *IndexOptions) { 777 data.Repos = append(data.Repos, Repo{ 778 ID: opts.RepoID, 779 Name: opts.Name, 780 }) 781 }) 782 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 783 } 784 785 _ = rootTmpl.Execute(w, data) 786} 787 788// handleReindex triggers a reindex asynocronously. If a reindex was triggered 789// the request returns with status 202. The caller can infer the new state of 790// the index by calling List. 791func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 792 if r.Method != http.MethodPost { 793 w.Header().Set("Allow", http.MethodPost) 794 w.WriteHeader(http.StatusMethodNotAllowed) 795 return 796 } 797 798 err := r.ParseForm() 799 if err != nil { 800 http.Error(w, err.Error(), http.StatusBadRequest) 801 return 802 } 803 804 id, err := strconv.Atoi(r.Form.Get("repo")) 805 if err != nil { 806 http.Error(w, err.Error(), http.StatusBadRequest) 807 return 808 } 809 810 go func() { s.forceIndex(uint32(id)) }() 811 812 // 202 Accepted 813 w.WriteHeader(http.StatusAccepted) 814} 815 816func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 817 withIndexed := true 818 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 819 withIndexed = b 820 } 821 822 var indexed []uint32 823 if withIndexed { 824 indexed = listIndexed(s.IndexDir) 825 } 826 827 repos, err := s.Sourcegraph.List(r.Context(), indexed) 828 if err != nil { 829 http.Error(w, err.Error(), 
http.StatusInternalServerError) 830 return 831 } 832 833 bw := bytes.Buffer{} 834 835 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 836 837 _, err = fmt.Fprintf(tw, "ID\tName\n") 838 if err != nil { 839 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 840 return 841 } 842 843 s.queue.mu.Lock() 844 name := "" 845 for _, id := range repos.IDs { 846 if item := s.queue.get(id); item != nil { 847 name = item.opts.Name 848 } else { 849 name = "" 850 } 851 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 852 if err != nil { 853 debug.Printf("handleDebugList: %s\n", err.Error()) 854 } 855 } 856 s.queue.mu.Unlock() 857 858 if err != nil { 859 http.Error(w, err.Error(), http.StatusInternalServerError) 860 return 861 } 862 863 err = tw.Flush() 864 if err != nil { 865 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 866 return 867 } 868 869 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 870 871 if _, err := io.Copy(w, &bw); err != nil { 872 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 873 return 874 } 875} 876 877// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 878// can run this command during periods of low usage (evenings, weekends) to 879// trigger an initial merge run. In the steady-state, merges happen rarely, even 880// on busy instances, and users can rely on automatic merging instead. 881func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 882 883 // A merge operation can take very long, depending on the number merges and the 884 // target size of the compound shards. We run the merge in the background and 885 // return immediately to the user. 886 // 887 // We track the status of the merge with metricShardMergingRunning. 
888 go func() { 889 s.doMerge() 890 }() 891 _, _ = w.Write([]byte("merging enqueued\n")) 892} 893 894func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 895 indexed := listIndexed(s.IndexDir) 896 897 bw := bytes.Buffer{} 898 899 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 900 901 _, err := fmt.Fprintf(tw, "ID\tName\n") 902 if err != nil { 903 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 904 return 905 } 906 907 s.queue.mu.Lock() 908 name := "" 909 for _, id := range indexed { 910 if item := s.queue.get(id); item != nil { 911 name = item.opts.Name 912 } else { 913 name = "" 914 } 915 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 916 if err != nil { 917 debug.Printf("handleDebugIndexed: %s\n", err.Error()) 918 } 919 } 920 s.queue.mu.Unlock() 921 922 if err != nil { 923 http.Error(w, err.Error(), http.StatusInternalServerError) 924 return 925 } 926 927 err = tw.Flush() 928 if err != nil { 929 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 930 return 931 } 932 933 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 934 935 if _, err := io.Copy(w, &bw); err != nil { 936 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 937 return 938 } 939} 940 941// forceIndex will run the index job for repo name now. It will return always 942// return a string explaining what it did, even if it failed. 
943func (s *Server) forceIndex(id uint32) (string, error) { 944 var opts IndexOptions 945 var err error 946 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 947 opts = o 948 }, func(_ uint32, e error) { 949 err = e 950 }, id) 951 if err != nil { 952 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 953 } 954 955 args := s.indexArgs(opts) 956 args.Incremental = false // force re-index 957 958 var state indexState 959 ran := s.muIndexDir.With(opts.Name, func() { 960 state, err = s.Index(args) 961 }) 962 if !ran { 963 return fmt.Sprintf("index job for repository already running: %s", args), nil 964 } 965 if err != nil { 966 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 967 } 968 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 969} 970 971func listIndexed(indexDir string) []uint32 { 972 index := getShards(indexDir) 973 metricNumIndexed.Set(float64(len(index))) 974 repoIDs := make([]uint32, 0, len(index)) 975 for id := range index { 976 repoIDs = append(repoIDs, id) 977 } 978 sort.Slice(repoIDs, func(i, j int) bool { 979 return repoIDs[i] < repoIDs[j] 980 }) 981 return repoIDs 982} 983 984// setupTmpDir sets up a temporary directory on the same volume as the 985// indexes. 986// 987// If main is true we will delete older temp directories left around. main is 988// false when this is a debug command. 989func setupTmpDir(logger sglog.Logger, main bool, index string) error { 990 // change the target tmp directory depending on if it's our main daemon or a 991 // debug sub command. 
992 dir := ".indexserver.debug.tmp" 993 if main { 994 dir = ".indexserver.tmp" 995 } 996 997 tmpRoot := filepath.Join(index, dir) 998 999 if main { 1000 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot)) 1001 err := os.RemoveAll(tmpRoot) 1002 if err != nil { 1003 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err)) 1004 } 1005 } 1006 1007 if err := os.MkdirAll(tmpRoot, 0755); err != nil { 1008 return err 1009 } 1010 1011 return os.Setenv("TMPDIR", tmpRoot) 1012} 1013 1014func printMetaData(fn string) error { 1015 repo, indexMeta, err := zoekt.ReadMetadataPath(fn) 1016 if err != nil { 1017 return err 1018 } 1019 1020 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 1021 if err != nil { 1022 return err 1023 } 1024 1025 err = json.NewEncoder(os.Stdout).Encode(repo) 1026 if err != nil { 1027 return err 1028 } 1029 return nil 1030} 1031 1032func printShardStats(fn string) error { 1033 f, err := os.Open(fn) 1034 if err != nil { 1035 return err 1036 } 1037 1038 iFile, err := zoekt.NewIndexFile(f) 1039 if err != nil { 1040 return err 1041 } 1042 1043 return zoekt.PrintNgramStats(iFile) 1044} 1045 1046func srcLogLevelIsDebug() bool { 1047 lvl := os.Getenv(sglog.EnvLogLevel) 1048 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1049} 1050 1051func getEnvWithDefaultInt64(k string, defaultVal int64) int64 { 1052 v := os.Getenv(k) 1053 if v == "" { 1054 return defaultVal 1055 } 1056 i, err := strconv.ParseInt(v, 10, 64) 1057 if err != nil { 1058 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1059 } 1060 return i 1061} 1062 1063func getEnvWithDefaultInt(k string, defaultVal int) int { 1064 v := os.Getenv(k) 1065 if v == "" { 1066 return defaultVal 1067 } 1068 i, err := strconv.Atoi(k) 1069 if err != nil { 1070 log.Fatalf("error parsing ENV %s to int: %s", k, err) 1071 } 1072 return i 1073} 1074 1075func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 { 1076 v := os.Getenv(k) 
1077 if v == "" { 1078 return defaultVal 1079 } 1080 i, err := strconv.ParseUint(v, 10, 64) 1081 if err != nil { 1082 log.Fatalf("error parsing ENV %s to uint64: %s", k, err) 1083 } 1084 return i 1085} 1086 1087func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1088 v := os.Getenv(k) 1089 if v == "" { 1090 return defaultVal 1091 } 1092 f, err := strconv.ParseFloat(v, 64) 1093 if err != nil { 1094 log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1095 } 1096 return f 1097} 1098 1099func getEnvWithDefaultString(k string, defaultVal string) string { 1100 v := os.Getenv(k) 1101 if v == "" { 1102 return defaultVal 1103 } 1104 return v 1105} 1106 1107func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration { 1108 v := os.Getenv(k) 1109 if v == "" { 1110 return defaultVal 1111 } 1112 1113 d, err := time.ParseDuration(v) 1114 if err != nil { 1115 log.Fatalf("error parsing ENV %s to duration: %s", k, err) 1116 } 1117 return d 1118} 1119 1120func getEnvWithDefaultBool(k string, defaultVal bool) bool { 1121 v := os.Getenv(k) 1122 if v == "" { 1123 return defaultVal 1124 } 1125 1126 b, err := strconv.ParseBool(v) 1127 if err != nil { 1128 log.Fatalf("error parsing ENV %s to bool: %s", k, err) 1129 } 1130 return b 1131} 1132 1133func getEnvWithDefaultEmptySet(k string) map[string]struct{} { 1134 set := map[string]struct{}{} 1135 for _, v := range strings.Split(os.Getenv(k), ",") { 1136 v = strings.TrimSpace(v) 1137 if v != "" { 1138 set[v] = struct{}{} 1139 } 1140 } 1141 return set 1142} 1143 1144func joinStringSet(set map[string]struct{}, sep string) string { 1145 var xs []string 1146 for x := range set { 1147 xs = append(xs, x) 1148 } 1149 1150 return strings.Join(xs, sep) 1151} 1152 1153func setCompoundShardCounter(indexDir string) { 1154 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt")) 1155 if err != nil { 1156 log.Printf("setCompoundShardCounter: %s\n", err) 1157 return 1158 } 1159 
metricNumberCompoundShards.Set(float64(len(fns))) 1160} 1161 1162func rootCmd() *ffcli.Command { 1163 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1164 conf := rootConfig{ 1165 Main: true, 1166 } 1167 conf.registerRootFlags(rootFs) 1168 1169 return &ffcli.Command{ 1170 FlagSet: rootFs, 1171 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1172 Subcommands: []*ffcli.Command{debugCmd()}, 1173 Exec: func(ctx context.Context, args []string) error { 1174 return startServer(conf) 1175 }, 1176 } 1177} 1178 1179type rootConfig struct { 1180 // Main is true if this rootConfig is for our main long running command (the 1181 // indexserver). Debug commands should not set this value. This is used to 1182 // determine if we need to run tmpfriend. 1183 Main bool 1184 1185 root string 1186 interval time.Duration 1187 index string 1188 indexConcurrency int64 1189 listen string 1190 hostname string 1191 cpuFraction float64 1192 blockProfileRate int 1193 1194 // config values related to shard merging 1195 vacuumInterval time.Duration 1196 mergeInterval time.Duration 1197 targetSize int64 1198 minSize int64 1199 minAgeDays int 1200 maxPriority float64 1201 1202 // config values related to backoff indexing repos with one or more consecutive failures 1203 backoffDuration time.Duration 1204 maxBackoffDuration time.Duration 1205 1206 // useGRPC is true if we should use the gRPC API to talk to Sourcegraph. 1207 useGRPC bool 1208} 1209 1210func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1211 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. 
If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1212 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1213 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of concurrent index jobs to run.") 1214 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use") 1215 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") 1216 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") 1217 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1218 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate") 1219 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.") 1220 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. 
A negative value disables indexing backoff.") 1221 fs.BoolVar(&rc.useGRPC, "use_grpc", mustGetBoolFromEnvironmentVariables([]string{"GRPC_ENABLED", "SG_FEATURE_FLAG_GRPC"}, true), "use the gRPC API to talk to Sourcegraph") 1222 1223 // flags related to shard merging 1224 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1225 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1226 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB") 1227 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB") 1228 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.") 1229 fs.Float64Var(&rc.maxPriority, "merge_max_priority", getEnvWithDefaultFloat64("SRC_MERGE_MAX_PRIORITY", 100), "the maximum priority a shard can have to be considered for merging.") 1230} 1231 1232func startServer(conf rootConfig) error { 1233 s, err := newServer(conf) 1234 if err != nil { 1235 return err 1236 } 1237 1238 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate) 1239 setCompoundShardCounter(s.IndexDir) 1240 1241 if conf.listen != "" { 1242 1243 mux := http.NewServeMux() 1244 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1245 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1246 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1247 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes 
repositories which this instance temporarily holds during re-balancing"}, 1248 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1249 }...) 1250 s.addDebugHandlers(mux) 1251 1252 go func() { 1253 debug.Printf("serving HTTP on %s", conf.listen) 1254 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1255 1256 }() 1257 1258 // Serve mux on a unix domain socket on a best-effort-basis so that 1259 // webserver can call the endpoints via the shared filesystem. 1260 // 1261 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1262 // on the socket due to permission errors. See 1263 // https://github.com/docker/for-mac/issues/6239 1264 go func() { 1265 serveHTTPOverSocket := func() error { 1266 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1267 // We cannot bind a socket to an existing pathname. 1268 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1269 return fmt.Errorf("error removing socket file: %s", socket) 1270 } 1271 // The "unix" network corresponds to stream sockets. (cf. unixgram, 1272 // unixpacket). 1273 l, err := net.Listen("unix", socket) 1274 if err != nil { 1275 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1276 } 1277 // Indexserver (root) and webserver (Sourcegraph) run with 1278 // different users. Per default, the socket is created with 1279 // permission 755 (root root), which doesn't let webserver write to 1280 // it. 1281 // 1282 // See https://github.com/golang/go/issues/11822 for more context. 
1283 if err := os.Chmod(socket, 0777); err != nil { 1284 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1285 } 1286 debug.Printf("serving HTTP on %s", socket) 1287 return http.Serve(l, mux) 1288 } 1289 debug.Print(serveHTTPOverSocket()) 1290 }() 1291 } 1292 1293 oc := &ownerChecker{ 1294 Path: filepath.Join(conf.index, "owner.txt"), 1295 Hostname: conf.hostname, 1296 } 1297 go oc.Run() 1298 1299 logger := sglog.Scoped("metricsRegistration") 1300 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1301 1302 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1303 prometheus.DefaultRegisterer.MustRegister(c) 1304 1305 s.Run() 1306 return nil 1307} 1308 1309func newServer(conf rootConfig) (*Server, error) { 1310 logger := sglog.Scoped("server") 1311 1312 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1313 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1314 } 1315 if conf.index == "" { 1316 return nil, fmt.Errorf("must set -index") 1317 } 1318 if conf.root == "" { 1319 return nil, fmt.Errorf("must set -sourcegraph_url") 1320 } 1321 rootURL, err := url.Parse(conf.root) 1322 if err != nil { 1323 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1324 } 1325 1326 rootURL = addDefaultPort(rootURL) 1327 1328 // Tune GOMAXPROCS to match Linux container CPU quota. 1329 _, _ = maxprocs.Set() 1330 1331 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler. 1332 // The block profiler is disabled by default and should be enabled with care in production 1333 runtime.SetBlockProfileRate(conf.blockProfileRate) 1334 1335 // Automatically prepend our own path at the front, to minimize 1336 // required configuration. 
1337 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1338 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1339 } 1340 1341 if _, err := os.Stat(conf.index); err != nil { 1342 if err := os.MkdirAll(conf.index, 0755); err != nil { 1343 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1344 } 1345 } 1346 1347 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil { 1348 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1349 } 1350 1351 if srcLogLevelIsDebug() { 1352 debug = log.New(os.Stderr, "", log.LstdFlags) 1353 } 1354 1355 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1356 if len(reposWithSeparateIndexingMetrics) > 0 { 1357 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1358 } 1359 1360 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1361 if len(deltaBuildRepositoriesAllowList) > 0 { 1362 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1363 } 1364 1365 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1366 if deltaShardNumberFallbackThreshold > 0 { 1367 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1368 } else { 1369 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1370 } 1371 1372 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1373 if len(reposShouldSkipSymbolsCalculation) > 0 { 1374 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1375 } 1376 1377 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout) 1378 if indexingTimeout != 
defaultIndexingTimeout { 1379 debug.Printf("using configured indexing timeout: %s", indexingTimeout) 1380 } 1381 1382 var sg Sourcegraph 1383 if rootURL.IsAbs() { 1384 var batchSize int 1385 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { 1386 batchSize, err = strconv.Atoi(v) 1387 if err != nil { 1388 return nil, fmt.Errorf("Invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1389 } 1390 } 1391 1392 opts := []SourcegraphClientOption{ 1393 WithBatchSize(batchSize), 1394 WithShouldUseGRPC(conf.useGRPC), 1395 } 1396 1397 logger := sglog.Scoped("zoektConfigurationGRPCClient") 1398 client, err := dialGRPCClient(rootURL.Host, logger) 1399 if err != nil { 1400 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1401 } 1402 1403 opts = append(opts, WithGRPCClient(client)) 1404 sg = newSourcegraphClient(rootURL, conf.hostname, opts...) 1405 1406 } else { 1407 sg = sourcegraphFake{ 1408 RootDir: rootURL.String(), 1409 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1410 } 1411 } 1412 1413 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction))) 1414 if cpuCount < 1 { 1415 cpuCount = 1 1416 } 1417 1418 if conf.indexConcurrency < 1 { 1419 conf.indexConcurrency = 1 1420 } else if conf.indexConcurrency > int64(cpuCount) { 1421 conf.indexConcurrency = int64(cpuCount) 1422 } 1423 1424 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1425 1426 return &Server{ 1427 logger: logger, 1428 Sourcegraph: sg, 1429 IndexDir: conf.index, 1430 IndexConcurrency: int(conf.indexConcurrency), 1431 Interval: conf.interval, 1432 CPUCount: cpuCount, 1433 queue: *q, 1434 shardMerging: zoekt.ShardMergingEnabled(), 1435 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1436 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1437 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1438 hostname: conf.hostname, 1439 mergeOpts: mergeOpts{ 
1440 vacuumInterval: conf.vacuumInterval, 1441 mergeInterval: conf.mergeInterval, 1442 targetSizeBytes: conf.targetSize * 1024 * 1024, 1443 minSizeBytes: conf.minSize * 1024 * 1024, 1444 minAgeDays: conf.minAgeDays, 1445 maxPriority: conf.maxPriority, 1446 }, 1447 timeout: indexingTimeout, 1448 }, err 1449} 1450 1451// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration 1452// for the indexed-search-configuration gRPC service. 1453// 1454// The default backoff strategy is modeled after the default settings used by 1455// retryablehttp.DefaultClient. 1456// 1457// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1458// - Unavailable 1459// - Aborted 1460// 1461//go:embed default_grpc_service_configuration.json 1462var defaultGRPCServiceConfigurationJSON string 1463 1464func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1465 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1466 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1467 return invoker(ctx, method, req, reply, cc, opts...) 1468 } 1469} 1470 1471func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1472 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1473 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1474 return streamer(ctx, desc, cc, method, opts...) 1475 } 1476} 1477 1478// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPCs servers and clients are allowed to process. 1479// This can be overridden by providing custom Server/Dial options. 
1480const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB 1481 1482func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) { 1483 metrics := mustGetClientMetrics() 1484 1485 // If the service seems to be unavailable, this 1486 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1 1487 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s]) 1488 retryOpts := []retry.CallOption{ 1489 retry.WithMax(5), 1490 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)), 1491 retry.WithCodes(codes.Unavailable), 1492 } 1493 1494 opts := []grpc.DialOption{ 1495 grpc.WithTransportCredentials(insecure.NewCredentials()), 1496 grpc.WithChainStreamInterceptor( 1497 metrics.StreamClientInterceptor(), 1498 messagesize.StreamClientInterceptor, 1499 internalActorStreamInterceptor(), 1500 internalerrs.LoggingStreamClientInterceptor(logger), 1501 internalerrs.PrometheusStreamClientInterceptor, 1502 retry.StreamClientInterceptor(retryOpts...), 1503 ), 1504 grpc.WithChainUnaryInterceptor( 1505 metrics.UnaryClientInterceptor(), 1506 messagesize.UnaryClientInterceptor, 1507 internalActorUnaryInterceptor(), 1508 internalerrs.LoggingUnaryClientInterceptor(logger), 1509 internalerrs.PrometheusUnaryClientInterceptor, 1510 retry.UnaryClientInterceptor(retryOpts...), 1511 ), 1512 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON), 1513 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)), 1514 } 1515 1516 opts = append(opts, additionalOpts...) 1517 1518 // Ensure that the message size options are set last, so they override any other 1519 // client-specific options that tweak the message size. 1520 // 1521 // The message size options are only provided if the environment variable is set. 
These options serve as an escape hatch, so they 1522 // take precedence over everything else with a uniform size setting that's easy to reason about. 1523 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...) 1524 1525 // This dialer is used to connect via gRPC to the Sourcegraph instance. 1526 // This is done lazily, so we can provide the client to use regardless of 1527 // whether we enabled gRPC or not initially. 1528 cc, err := grpc.Dial(addr, opts...) 1529 if err != nil { 1530 return nil, fmt.Errorf("dialing %q: %w", addr, err) 1531 } 1532 1533 client := proto.NewZoektConfigurationServiceClient(cc) 1534 return client, nil 1535} 1536 1537// mustGetClientMetrics returns a singleton instance of the client metrics 1538// that are shared across all gRPC clients that this process creates. 1539// 1540// This function panics if the metrics cannot be registered with the default 1541// Prometheus registry. 1542func mustGetClientMetrics() *grpcprom.ClientMetrics { 1543 clientMetricsOnce.Do(func() { 1544 clientMetrics = grpcprom.NewClientMetrics( 1545 grpcprom.WithClientCounterOptions(), 1546 grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request 1547 grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC 1548 grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC 1549 ) 1550 1551 prometheus.DefaultRegisterer.MustRegister(clientMetrics) 1552 }) 1553 1554 return clientMetrics 1555} 1556 1557// addDefaultPort adds a default port to a URL if one is not specified. 1558// 1559// If the URL scheme is "http" and no port is specified, "80" is used. 1560// If the scheme is "https", "443" is used. 1561// 1562// The original URL is not mutated. A copy is modified and returned. 
1563func addDefaultPort(original *url.URL) *url.URL { 1564 if original == nil { 1565 return nil // don't panic 1566 } 1567 1568 if !original.IsAbs() { 1569 return original // don't do anything if the URL doesn't look like a remote URL 1570 } 1571 1572 if original.Scheme == "http" && original.Port() == "" { 1573 u := cloneURL(original) 1574 u.Host = net.JoinHostPort(u.Host, "80") 1575 return u 1576 } 1577 1578 if original.Scheme == "https" && original.Port() == "" { 1579 u := cloneURL(original) 1580 u.Host = net.JoinHostPort(u.Host, "443") 1581 return u 1582 } 1583 1584 return original 1585} 1586 1587// cloneURL returns a copy of the URL. It is safe to mutate the returned URL. 1588// This is copied from net/http/clone.go 1589func cloneURL(u *url.URL) *url.URL { 1590 if u == nil { 1591 return nil 1592 } 1593 u2 := new(url.URL) 1594 *u2 = *u 1595 if u.User != nil { 1596 u2.User = new(url.Userinfo) 1597 *u2.User = *u.User 1598 } 1599 return u2 1600} 1601 1602func main() { 1603 liblog := sglog.Init(sglog.Resource{ 1604 Name: "zoekt-indexserver", 1605 Version: zoekt.Version, 1606 InstanceID: zoekt.HostnameBestEffort(), 1607 }) 1608 defer liblog.Sync() 1609 1610 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1611 log.Fatal(err) 1612 } 1613} 1614 1615// mustGetBoolFromEnvironmentVariables is like getBoolFromEnvironmentVariables, but it panics 1616// if any of the provided environment variables fails to parse as a boolean. 1617func mustGetBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) bool { 1618 value, err := getBoolFromEnvironmentVariables(envVarNames, defaultBool) 1619 if err != nil { 1620 panic(err) 1621 } 1622 1623 return value 1624} 1625 1626// getBoolFromEnvironmentVariables returns the boolean defined by the first environment 1627// variable listed in envVarNames that is set in the current process environment, or the defaultBool if none are set. 
1628// 1629// An error is returned of the provided environment variables fails to parse as a boolean. 1630func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) { 1631 for _, envVar := range envVarNames { 1632 v := os.Getenv(envVar) 1633 if v == "" { 1634 continue 1635 } 1636 1637 b, err := strconv.ParseBool(v) 1638 if err != nil { 1639 return false, fmt.Errorf("parsing environment variable %q to boolean: %v", envVar, err) 1640 } 1641 1642 return b, nil 1643 } 1644 1645 return defaultBool, nil 1646}