fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Command zoekt-sourcegraph-indexserver periodically reindexes enabled 2// repositories on sourcegraph 3package main 4 5import ( 6 "bytes" 7 "context" 8 _ "embed" 9 "encoding/json" 10 "errors" 11 "flag" 12 "fmt" 13 "html/template" 14 "io" 15 "io/fs" 16 "log" 17 "math" 18 "math/rand" 19 "net" 20 "net/http" 21 "net/url" 22 "os" 23 "os/exec" 24 "os/signal" 25 "path/filepath" 26 "runtime" 27 "sort" 28 "strconv" 29 "strings" 30 "sync" 31 "text/tabwriter" 32 "time" 33 34 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" 35 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry" 36 "github.com/peterbourgon/ff/v3/ffcli" 37 "github.com/prometheus/client_golang/prometheus" 38 "github.com/prometheus/client_golang/prometheus/promauto" 39 sglog "github.com/sourcegraph/log" 40 "github.com/sourcegraph/mountinfo" 41 "go.uber.org/automaxprocs/maxprocs" 42 "golang.org/x/net/trace" 43 "golang.org/x/sys/unix" 44 "google.golang.org/grpc" 45 "google.golang.org/grpc/codes" 46 "google.golang.org/grpc/credentials/insecure" 47 "google.golang.org/grpc/metadata" 48 49 "github.com/sourcegraph/zoekt" 50 "github.com/sourcegraph/zoekt/build" 51 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1" 52 "github.com/sourcegraph/zoekt/debugserver" 53 "github.com/sourcegraph/zoekt/grpc/internalerrs" 54 "github.com/sourcegraph/zoekt/grpc/messagesize" 55 "github.com/sourcegraph/zoekt/internal/profiler" 56) 57 58var ( 59 metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{ 60 Name: "resolve_revisions_seconds", 61 Help: "A histogram of latencies for resolving all repository revisions.", 62 Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 27min 63 }) 64 65 metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 66 Name: "resolve_revision_seconds", 67 Help: "A histogram of latencies for resolving a repository revision.", 68 Buckets: 
prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s 69 }, []string{"success"}) // success=true|false 70 71 metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{ 72 Name: "get_index_options_total", 73 Help: "The total number of times we tried to get index options for a repository. Includes errors.", 74 }) 75 metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{ 76 Name: "get_index_options_error_total", 77 Help: "The total number of times we failed to get index options for a repository.", 78 }) 79 80 metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 81 Name: "index_repo_seconds", 82 Help: "A histogram of latencies for indexing a repository.", 83 Buckets: prometheus.ExponentialBucketsRange( 84 (100 * time.Millisecond).Seconds(), 85 (40*time.Minute + indexTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo 86 20), 87 }, []string{ 88 "state", // state is an indexState 89 "name", // name of the repository that was indexed 90 }) 91 92 metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 93 Name: "index_fetch_seconds", 94 Help: "A histogram of latencies for fetching a repository.", 95 Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes 96 }, []string{ 97 "success", // true|false 98 "name", // the name of the repository that the commits were fetched from 99 }) 100 101 metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{ 102 Name: "index_incremental_index_state", 103 Help: "A count of the state on disk vs what we want to build. 
See zoekt/build.IndexState.", 104 }, []string{"state"}) // state is build.IndexState 105 106 metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{ 107 Name: "index_num_indexed", 108 Help: "Number of indexed repos by code host", 109 }) 110 111 metricNumAssigned = promauto.NewGauge(prometheus.GaugeOpts{ 112 Name: "index_num_assigned", 113 Help: "Number of repos assigned to this indexer by code host", 114 }) 115 116 metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{ 117 Name: "index_failing_total", 118 Help: "Counts failures to index (indexing activity, should be used with rate())", 119 }) 120 121 metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{ 122 Name: "index_indexing_total", 123 Help: "Counts indexings (indexing activity, should be used with rate())", 124 }) 125 126 metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{ 127 Name: "index_num_stopped_tracking_total", 128 Help: "Counts the number of repos we stopped tracking.", 129 }) 130 131 clientMetricsOnce sync.Once 132 clientMetrics *grpcprom.ClientMetrics 133) 134 135// set of repositories that we want to capture separate indexing metrics for 136var reposWithSeparateIndexingMetrics = make(map[string]struct{}) 137 138type indexState string 139 140const ( 141 indexStateFail indexState = "fail" 142 indexStateSuccess indexState = "success" 143 indexStateSuccessMeta indexState = "success_meta" // We only updated metadata 144 indexStateNoop indexState = "noop" // We didn't need to update index 145 indexStateEmpty indexState = "empty" // index is empty (empty repo) 146) 147 148// Server is the main functionality of zoekt-sourcegraph-indexserver. It 149// exists to conveniently use all the options passed in via func main. 150type Server struct { 151 logger sglog.Logger 152 153 Sourcegraph Sourcegraph 154 BatchSize int 155 156 // IndexDir is the index directory to use. 157 IndexDir string 158 159 // IndexConcurrency is the number of repositories we index at once. 
160 IndexConcurrency int 161 162 // Interval is how often we sync with Sourcegraph. 163 Interval time.Duration 164 // CPUCount is the amount of parallelism to use when indexing a 165 // repository. 166 CPUCount int 167 168 queue Queue 169 170 // muIndexDir protects the index directory from concurrent access. 171 muIndexDir indexMutex 172 173 // If true, shard merging is enabled. 174 shardMerging bool 175 176 // deltaBuildRepositoriesAllowList is an allowlist for repositories that we 177 // use delta-builds for instead of normal builds 178 deltaBuildRepositoriesAllowList map[string]struct{} 179 180 // deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist 181 // before attempting a delta build. 182 deltaShardNumberFallbackThreshold uint64 183 184 // repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that 185 // we skip calculating symbols metadata for during builds 186 repositoriesSkipSymbolsCalculationAllowList map[string]struct{} 187 188 hostname string 189 190 mergeOpts mergeOpts 191} 192 193var debug = log.New(io.Discard, "", log.LstdFlags) 194 195// our index commands should output something every 100mb they process. 196// 197// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough 198// time." famous last words. A client was indexing a monorepo with 42 199// cores... 5m was not enough. 
200const noOutputTimeout = 30 * time.Minute 201 202func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 203 out := &synchronizedBuffer{} 204 cmd.Stdout = out 205 cmd.Stderr = out 206 207 tr.LazyPrintf("%s", cmd.Args) 208 209 defer func() { 210 if err != nil { 211 outS := out.String() 212 tr.LazyPrintf("failed: %v", err) 213 tr.LazyPrintf("output: %s", out) 214 tr.SetError() 215 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 216 } 217 }() 218 219 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 220 221 if err := cmd.Start(); err != nil { 222 return err 223 } 224 225 errC := make(chan error) 226 go func() { 227 errC <- cmd.Wait() 228 }() 229 230 // This channel is set after we have sent sigquit. It allows us to follow up 231 // with a sigkill if the process doesn't quit after sigquit. 232 kill := make(<-chan time.Time) 233 234 lastLen := 0 235 for { 236 select { 237 case <-time.After(noOutputTimeout): 238 // Periodically check if we have had output. If not kill the process. 239 if out.Len() != lastLen { 240 lastLen = out.Len() 241 log.Printf("still running %s", cmd.Args) 242 } else { 243 // Send quit (C-\) first so we get a stack dump. 244 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 245 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 246 log.Println("quit failed:", err) 247 } 248 249 // send sigkill if still running in 10s 250 kill = time.After(10 * time.Second) 251 } 252 253 case <-kill: 254 log.Printf("still running, killing %s", cmd.Args) 255 if err := cmd.Process.Kill(); err != nil { 256 log.Println("kill failed:", err) 257 } 258 259 case err := <-errC: 260 if err != nil { 261 return err 262 } 263 264 tr.LazyPrintf("success") 265 return nil 266 } 267 } 268} 269 270// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 271// monitor the buffer while it is being written to. 
272type synchronizedBuffer struct { 273 mu sync.Mutex 274 b bytes.Buffer 275} 276 277func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 278 sb.mu.Lock() 279 defer sb.mu.Unlock() 280 return sb.b.Write(p) 281} 282 283func (sb *synchronizedBuffer) Len() int { 284 sb.mu.Lock() 285 defer sb.mu.Unlock() 286 return sb.b.Len() 287} 288 289func (sb *synchronizedBuffer) String() string { 290 sb.mu.Lock() 291 defer sb.mu.Unlock() 292 return sb.b.String() 293} 294 295// pauseFileName if present in IndexDir will stop index jobs from 296// running. This is to make it possible to experiment with the content of the 297// IndexDir without the indexserver writing to it. 298const pauseFileName = "PAUSE" 299 300// Run the sync loop. This blocks forever. 301func (s *Server) Run() { 302 removeIncompleteShards(s.IndexDir) 303 304 // Start a goroutine which updates the queue with commits to index. 305 go func() { 306 // We update the list of indexed repos every Interval. To speed up manual 307 // testing we also listen for SIGUSR1 to trigger updates. 
308 // 309 // "pkill -SIGUSR1 zoekt-sourcegra" 310 for range jitterTicker(s.Interval, unix.SIGUSR1) { 311 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 312 log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 313 continue 314 } 315 316 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 317 if err != nil { 318 log.Printf("error listing repos: %s", err) 319 continue 320 } 321 322 debug.Printf("updating index queue with %d repositories", len(repos.IDs)) 323 324 // Stop indexing repos we don't need to track anymore 325 removed := s.queue.MaybeRemoveMissing(repos.IDs) 326 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 327 if len(removed) > 0 { 328 log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 329 } 330 331 cleanupDone := make(chan struct{}) 332 go func() { 333 defer close(cleanupDone) 334 s.muIndexDir.Global(func() { 335 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 336 }) 337 }() 338 339 repos.IterateIndexOptions(s.queue.AddOrUpdate) 340 341 // IterateIndexOptions will only iterate over repositories that have 342 // changed since we last called list. However, we want to add all IDs 343 // back onto the queue just to check that what is on disk is still 344 // correct. This will use the last IndexOptions we stored in the 345 // queue. The repositories not on the queue (missing) need a forced 346 // fetch of IndexOptions. 347 missing := s.queue.Bump(repos.IDs) 348 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 
349 350 setCompoundShardCounter(s.IndexDir) 351 352 <-cleanupDone 353 } 354 }() 355 356 go func() { 357 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 358 if s.shardMerging { 359 s.vacuum() 360 } 361 } 362 }() 363 364 go func() { 365 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 366 if s.shardMerging { 367 s.doMerge() 368 } 369 } 370 }() 371 372 for i := 0; i < s.IndexConcurrency; i++ { 373 go s.processQueue() 374 } 375 376 // block forever 377 select {} 378} 379 380// formatList returns a comma-separated list of the first min(len(v), m) items. 381func formatListUint32(v []uint32, m int) string { 382 if len(v) < m { 383 m = len(v) 384 } 385 386 sb := strings.Builder{} 387 for i := 0; i < m; i++ { 388 fmt.Fprintf(&sb, "%d, ", v[i]) 389 } 390 391 if len(v) > m { 392 sb.WriteString("...") 393 } 394 395 return strings.TrimRight(sb.String(), ", ") 396} 397 398func (s *Server) processQueue() { 399 for { 400 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 401 time.Sleep(time.Second) 402 continue 403 } 404 405 opts, ok := s.queue.Pop() 406 if !ok { 407 time.Sleep(time.Second) 408 continue 409 } 410 411 args := s.indexArgs(opts) 412 413 ran := s.muIndexDir.With(opts.Name, func() { 414 // only record time taken once we hold the lock. This avoids us 415 // recording time taken while merging/cleanup runs. 
416 start := time.Now() 417 418 state, err := s.Index(args) 419 420 elapsed := time.Since(start) 421 422 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 423 424 if err != nil { 425 log.Printf("error indexing %s: %s", args.String(), err) 426 } 427 428 switch state { 429 case indexStateSuccess: 430 var branches []string 431 for _, b := range args.Branches { 432 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 433 } 434 s.logger.Info("updated index", 435 sglog.String("repo", args.Name), 436 sglog.Uint32("id", args.RepoID), 437 sglog.Strings("branches", branches), 438 sglog.Duration("duration", elapsed), 439 ) 440 case indexStateSuccessMeta: 441 log.Printf("updated meta %s in %v", args.String(), elapsed) 442 } 443 s.queue.SetIndexed(opts, state) 444 }) 445 446 if !ran { 447 // Someone else is processing the repository. We can just skip this job 448 // since the repository will be added back to the queue and we will 449 // converge to the correct behaviour. 450 debug.Printf("index job for repository already running: %s", args) 451 continue 452 } 453 } 454} 455 456// repoNameForMetric returns a normalized version of the given repository name that is 457// suitable for use with Prometheus metrics. 458func repoNameForMetric(repo string) string { 459 // Check to see if we want to be able to capture separate indexing metrics for this repository. 460 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 461 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 462 return repo 463 } 464 465 return "" 466} 467 468func batched(slice []uint32, size int) <-chan []uint32 { 469 c := make(chan []uint32) 470 go func() { 471 for len(slice) > 0 { 472 if size > len(slice) { 473 size = len(slice) 474 } 475 c <- slice[:size] 476 slice = slice[size:] 477 } 478 close(c) 479 }() 480 return c 481} 482 483// jitterTicker returns a ticker which ticks with a jitter. 
Each tick is 484// uniformly selected from the range (d/2, d + d/2). It will tick on creation. 485// 486// sig is a list of signals which also cause the ticker to fire. This is a 487// convenience to allow manually triggering of the ticker. 488func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} { 489 ticker := make(chan struct{}) 490 491 go func() { 492 for { 493 ticker <- struct{}{} 494 ns := int64(d) 495 jitter := rand.Int63n(ns) 496 time.Sleep(time.Duration(ns/2 + jitter)) 497 } 498 }() 499 500 go func() { 501 if len(sig) == 0 { 502 return 503 } 504 505 c := make(chan os.Signal, 1) 506 signal.Notify(c, sig...) 507 for range c { 508 ticker <- struct{}{} 509 } 510 }() 511 512 return ticker 513} 514 515// Index starts an index job for repo name at commit. 516func (s *Server) Index(args *indexArgs) (state indexState, err error) { 517 tr := trace.New("index", args.Name) 518 519 defer func() { 520 if err != nil { 521 tr.SetError() 522 tr.LazyPrintf("error: %v", err) 523 state = indexStateFail 524 metricFailingTotal.Inc() 525 } 526 tr.LazyPrintf("state: %s", state) 527 tr.Finish() 528 }() 529 530 tr.LazyPrintf("branches: %v", args.Branches) 531 532 if len(args.Branches) == 0 { 533 return indexStateEmpty, createEmptyShard(args) 534 } 535 536 repositoryName := args.Name 537 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok { 538 tr.LazyPrintf("marking this repository for delta build") 539 args.UseDelta = true 540 } 541 542 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold 543 544 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok { 545 tr.LazyPrintf("skipping symbols calculation") 546 args.Symbols = false 547 } 548 549 reason := "forced" 550 551 if args.Incremental { 552 bo := args.BuildOptions() 553 bo.SetDefaults() 554 incrementalState, fn := bo.IndexState() 555 reason = string(incrementalState) 556 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc() 557 
558 switch incrementalState { 559 case build.IndexStateEqual: 560 debug.Printf("%s index already up to date. Shard=%s", args.String(), fn) 561 return indexStateNoop, nil 562 563 case build.IndexStateMeta: 564 log.Printf("updating index.meta %s", args.String()) 565 566 if err := mergeMeta(bo); err != nil { 567 log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err) 568 } else { 569 return indexStateSuccessMeta, nil 570 } 571 572 case build.IndexStateCorrupt: 573 log.Printf("falling back to full update: corrupt index: %s", args.String()) 574 } 575 } 576 577 log.Printf("updating index %s reason=%s", args.String(), reason) 578 579 metricIndexingTotal.Inc() 580 c := gitIndexConfig{ 581 runCmd: func(cmd *exec.Cmd) error { 582 return s.loggedRun(tr, cmd) 583 }, 584 585 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) { 586 return args.BuildOptions().FindRepositoryMetadata() 587 }, 588 } 589 590 err = gitIndex(c, args, s.Sourcegraph, s.logger) 591 if err != nil { 592 return indexStateFail, err 593 } 594 595 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil { 596 s.logger.Error("failed to update index status", 597 sglog.String("repo", args.Name), 598 sglog.Uint32("id", args.RepoID), 599 sglogBranches("branches", args.Branches), 600 sglog.Error(err), 601 ) 602 } 603 604 return indexStateSuccess, nil 605} 606 607// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so 608// it can update the zoekt_repos table. 609func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error { 610 // We need to read from disk for IndexTime. 
611 _, metadata, ok, err := c.findRepositoryMetadata(args) 612 if err != nil { 613 return fmt.Errorf("failed to read metadata for new/updated index: %w", err) 614 } 615 if !ok { 616 return errors.New("failed to find metadata for new/updated index") 617 } 618 619 status := []indexStatus{{ 620 RepoID: args.RepoID, 621 Branches: args.Branches, 622 IndexTimeUnix: metadata.IndexTime.Unix(), 623 }} 624 if err := sg.UpdateIndexStatus(status); err != nil { 625 return fmt.Errorf("failed to update sourcegraph with status: %w", err) 626 } 627 628 return nil 629} 630 631func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field { 632 ss := make([]string, len(branches)) 633 for i, b := range branches { 634 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version) 635 } 636 return sglog.Strings(key, ss) 637} 638 639func (s *Server) indexArgs(opts IndexOptions) *indexArgs { 640 return &indexArgs{ 641 IndexOptions: opts, 642 643 IndexDir: s.IndexDir, 644 Parallelism: s.CPUCount, 645 646 Incremental: true, 647 648 // 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22 649 FileLimit: 1 << 20, 650 } 651} 652 653func createEmptyShard(args *indexArgs) error { 654 bo := args.BuildOptions() 655 bo.SetDefaults() 656 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 657 658 if args.Incremental && bo.IncrementalSkipIndexing() { 659 return nil 660 } 661 662 builder, err := build.NewBuilder(*bo) 663 if err != nil { 664 return err 665 } 666 return builder.Finish() 667} 668 669// addDebugHandlers adds handlers specific to indexserver. 670func (s *Server) addDebugHandlers(mux *http.ServeMux) { 671 // Sourcegraph's site admin view requires indexserver to serve it's admin view 672 // on "/". 
673 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 674 675 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 676 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 677 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 678 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 679 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 680 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 681} 682 683func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 684 if r.Method != "GET" { 685 w.Header().Set("Allow", "GET") 686 w.WriteHeader(http.StatusMethodNotAllowed) 687 return 688 } 689 690 response := struct { 691 Hostname string 692 }{ 693 Hostname: s.hostname, 694 } 695 696 b, err := json.Marshal(response) 697 if err != nil { 698 http.Error(w, err.Error(), http.StatusInternalServerError) 699 return 700 } 701 702 w.Header().Set("Content-Type", "application/json; charset=utf-8") 703 w.Write(b) 704} 705 706var rootTmpl = template.Must(template.New("name").Parse(` 707<html> 708 <body> 709 <a href="debug">Debug</a><br /> 710 <a href="debug/requests">Traces</a><br /> 711 {{.IndexMsg}}<br /> 712 <br /> 713 <h3>Reindex</h3> 714 {{if .Repos}} 715 <a href="?show_repos=false">hide repos</a><br /> 716 <table style="margin-top: 20px"> 717 <th style="text-align:left">Name</th> 718 <th style="text-align:left">ID</th> 719 {{range .Repos}} 720 <tr> 721 <td>{{.Name}}</td> 722 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 723 </tr> 724 {{end}} 725 </table> 726 {{else}} 727 <a href="?show_repos=true">show repos</a><br /> 728 {{end}} 729 </body> 730</html> 731`)) 732 733func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 734 if r.Method != "GET" { 735 w.Header().Set("Allow", "GET") 736 w.WriteHeader(http.StatusMethodNotAllowed) 737 return 738 } 739 740 values := r.URL.Query() 741 742 // ?id= 743 indexMsg := "" 744 if v := values.Get("id"); v != "" { 745 id, err := 
strconv.Atoi(v) 746 if err != nil { 747 http.Error(w, err.Error(), http.StatusBadRequest) 748 return 749 } 750 indexMsg, _ = s.forceIndex(uint32(id)) 751 } 752 753 // ?show_repos= 754 showRepos := false 755 if v := values.Get("show_repos"); v != "" { 756 showRepos, _ = strconv.ParseBool(v) 757 } 758 759 type Repo struct { 760 ID uint32 761 Name string 762 } 763 var data struct { 764 Repos []Repo 765 IndexMsg string 766 } 767 768 data.IndexMsg = indexMsg 769 770 if showRepos { 771 s.queue.Iterate(func(opts *IndexOptions) { 772 data.Repos = append(data.Repos, Repo{ 773 ID: opts.RepoID, 774 Name: opts.Name, 775 }) 776 }) 777 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 778 } 779 780 _ = rootTmpl.Execute(w, data) 781} 782 783// handleReindex triggers a reindex asynocronously. If a reindex was triggered 784// the request returns with status 202. The caller can infer the new state of 785// the index by calling List. 786func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 787 if r.Method != http.MethodPost { 788 w.Header().Set("Allow", http.MethodPost) 789 w.WriteHeader(http.StatusMethodNotAllowed) 790 return 791 } 792 793 err := r.ParseForm() 794 if err != nil { 795 http.Error(w, err.Error(), http.StatusBadRequest) 796 return 797 } 798 799 id, err := strconv.Atoi(r.Form.Get("repo")) 800 if err != nil { 801 http.Error(w, err.Error(), http.StatusBadRequest) 802 return 803 } 804 805 go func() { s.forceIndex(uint32(id)) }() 806 807 // 202 Accepted 808 w.WriteHeader(http.StatusAccepted) 809} 810 811func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 812 withIndexed := true 813 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 814 withIndexed = b 815 } 816 817 var indexed []uint32 818 if withIndexed { 819 indexed = listIndexed(s.IndexDir) 820 } 821 822 repos, err := s.Sourcegraph.List(r.Context(), indexed) 823 if err != nil { 824 http.Error(w, err.Error(), 
http.StatusInternalServerError) 825 return 826 } 827 828 bw := bytes.Buffer{} 829 830 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 831 832 _, err = fmt.Fprintf(tw, "ID\tName\n") 833 if err != nil { 834 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 835 return 836 } 837 838 s.queue.mu.Lock() 839 name := "" 840 for _, id := range repos.IDs { 841 if item := s.queue.get(id); item != nil { 842 name = item.opts.Name 843 } else { 844 name = "" 845 } 846 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 847 if err != nil { 848 debug.Printf("handleDebugList: %s\n", err.Error()) 849 } 850 } 851 s.queue.mu.Unlock() 852 853 if err != nil { 854 http.Error(w, err.Error(), http.StatusInternalServerError) 855 return 856 } 857 858 err = tw.Flush() 859 if err != nil { 860 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 861 return 862 } 863 864 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 865 866 if _, err := io.Copy(w, &bw); err != nil { 867 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 868 return 869 } 870} 871 872// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 873// can run this command during periods of low usage (evenings, weekends) to 874// trigger an initial merge run. In the steady-state, merges happen rarely, even 875// on busy instances, and users can rely on automatic merging instead. 876func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 877 878 // A merge operation can take very long, depending on the number merges and the 879 // target size of the compound shards. We run the merge in the background and 880 // return immediately to the user. 881 // 882 // We track the status of the merge with metricShardMergingRunning. 
883 go func() { 884 s.doMerge() 885 }() 886 _, _ = w.Write([]byte("merging enqueued\n")) 887} 888 889func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 890 indexed := listIndexed(s.IndexDir) 891 892 bw := bytes.Buffer{} 893 894 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 895 896 _, err := fmt.Fprintf(tw, "ID\tName\n") 897 if err != nil { 898 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 899 return 900 } 901 902 s.queue.mu.Lock() 903 name := "" 904 for _, id := range indexed { 905 if item := s.queue.get(id); item != nil { 906 name = item.opts.Name 907 } else { 908 name = "" 909 } 910 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 911 if err != nil { 912 debug.Printf("handleDebugIndexed: %s\n", err.Error()) 913 } 914 } 915 s.queue.mu.Unlock() 916 917 if err != nil { 918 http.Error(w, err.Error(), http.StatusInternalServerError) 919 return 920 } 921 922 err = tw.Flush() 923 if err != nil { 924 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 925 return 926 } 927 928 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 929 930 if _, err := io.Copy(w, &bw); err != nil { 931 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 932 return 933 } 934} 935 936// forceIndex will run the index job for repo name now. It will return always 937// return a string explaining what it did, even if it failed. 
938func (s *Server) forceIndex(id uint32) (string, error) { 939 var opts IndexOptions 940 var err error 941 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 942 opts = o 943 }, func(_ uint32, e error) { 944 err = e 945 }, id) 946 if err != nil { 947 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 948 } 949 950 args := s.indexArgs(opts) 951 args.Incremental = false // force re-index 952 953 var state indexState 954 ran := s.muIndexDir.With(opts.Name, func() { 955 state, err = s.Index(args) 956 }) 957 if !ran { 958 return fmt.Sprintf("index job for repository already running: %s", args), nil 959 } 960 if err != nil { 961 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 962 } 963 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 964} 965 966func listIndexed(indexDir string) []uint32 { 967 index := getShards(indexDir) 968 metricNumIndexed.Set(float64(len(index))) 969 repoIDs := make([]uint32, 0, len(index)) 970 for id := range index { 971 repoIDs = append(repoIDs, id) 972 } 973 sort.Slice(repoIDs, func(i, j int) bool { 974 return repoIDs[i] < repoIDs[j] 975 }) 976 return repoIDs 977} 978 979// setupTmpDir sets up a temporary directory on the same volume as the 980// indexes. 981// 982// If main is true we will delete older temp directories left around. main is 983// false when this is a debug command. 984func setupTmpDir(logger sglog.Logger, main bool, index string) error { 985 // change the target tmp directory depending on if it's our main daemon or a 986 // debug sub command. 
987 dir := ".indexserver.debug.tmp" 988 if main { 989 dir = ".indexserver.tmp" 990 } 991 992 tmpRoot := filepath.Join(index, dir) 993 994 if main { 995 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot)) 996 err := os.RemoveAll(tmpRoot) 997 if err != nil { 998 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err)) 999 } 1000 } 1001 1002 if err := os.MkdirAll(tmpRoot, 0755); err != nil { 1003 return err 1004 } 1005 1006 return os.Setenv("TMPDIR", tmpRoot) 1007} 1008 1009func printMetaData(fn string) error { 1010 repo, indexMeta, err := zoekt.ReadMetadataPath(fn) 1011 if err != nil { 1012 return err 1013 } 1014 1015 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 1016 if err != nil { 1017 return err 1018 } 1019 1020 err = json.NewEncoder(os.Stdout).Encode(repo) 1021 if err != nil { 1022 return err 1023 } 1024 return nil 1025} 1026 1027func printShardStats(fn string) error { 1028 f, err := os.Open(fn) 1029 if err != nil { 1030 return err 1031 } 1032 1033 iFile, err := zoekt.NewIndexFile(f) 1034 if err != nil { 1035 return err 1036 } 1037 1038 return zoekt.PrintNgramStats(iFile) 1039} 1040 1041func srcLogLevelIsDebug() bool { 1042 lvl := os.Getenv(sglog.EnvLogLevel) 1043 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1044} 1045 1046func getEnvWithDefaultInt64(k string, defaultVal int64) int64 { 1047 v := os.Getenv(k) 1048 if v == "" { 1049 return defaultVal 1050 } 1051 i, err := strconv.ParseInt(v, 10, 64) 1052 if err != nil { 1053 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1054 } 1055 return i 1056} 1057 1058func getEnvWithDefaultInt(k string, defaultVal int) int { 1059 v := os.Getenv(k) 1060 if v == "" { 1061 return defaultVal 1062 } 1063 i, err := strconv.Atoi(k) 1064 if err != nil { 1065 log.Fatalf("error parsing ENV %s to int: %s", k, err) 1066 } 1067 return i 1068} 1069 1070func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 { 1071 v := os.Getenv(k) 1072 if 
v == "" { 1073 return defaultVal 1074 } 1075 i, err := strconv.ParseUint(v, 10, 64) 1076 if err != nil { 1077 log.Fatalf("error parsing ENV %s to uint64: %s", k, err) 1078 } 1079 return i 1080} 1081 1082func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1083 v := os.Getenv(k) 1084 if v == "" { 1085 return defaultVal 1086 } 1087 f, err := strconv.ParseFloat(v, 64) 1088 if err != nil { 1089 log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1090 } 1091 return f 1092} 1093 1094func getEnvWithDefaultString(k string, defaultVal string) string { 1095 v := os.Getenv(k) 1096 if v == "" { 1097 return defaultVal 1098 } 1099 return v 1100} 1101 1102func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration { 1103 v := os.Getenv(k) 1104 if v == "" { 1105 return defaultVal 1106 } 1107 1108 d, err := time.ParseDuration(v) 1109 if err != nil { 1110 log.Fatalf("error parsing ENV %s to duration: %s", k, err) 1111 } 1112 return d 1113} 1114 1115func getEnvWithDefaultBool(k string, defaultVal bool) bool { 1116 v := os.Getenv(k) 1117 if v == "" { 1118 return defaultVal 1119 } 1120 1121 b, err := strconv.ParseBool(v) 1122 if err != nil { 1123 log.Fatalf("error parsing ENV %s to bool: %s", k, err) 1124 } 1125 return b 1126} 1127 1128func getEnvWithDefaultEmptySet(k string) map[string]struct{} { 1129 set := map[string]struct{}{} 1130 for _, v := range strings.Split(os.Getenv(k), ",") { 1131 v = strings.TrimSpace(v) 1132 if v != "" { 1133 set[v] = struct{}{} 1134 } 1135 } 1136 return set 1137} 1138 1139func joinStringSet(set map[string]struct{}, sep string) string { 1140 var xs []string 1141 for x := range set { 1142 xs = append(xs, x) 1143 } 1144 1145 return strings.Join(xs, sep) 1146} 1147 1148func setCompoundShardCounter(indexDir string) { 1149 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt")) 1150 if err != nil { 1151 log.Printf("setCompoundShardCounter: %s\n", err) 1152 return 1153 } 1154 
metricNumberCompoundShards.Set(float64(len(fns))) 1155} 1156 1157func rootCmd() *ffcli.Command { 1158 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1159 conf := rootConfig{ 1160 Main: true, 1161 } 1162 conf.registerRootFlags(rootFs) 1163 1164 return &ffcli.Command{ 1165 FlagSet: rootFs, 1166 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1167 Subcommands: []*ffcli.Command{debugCmd()}, 1168 Exec: func(ctx context.Context, args []string) error { 1169 return startServer(conf) 1170 }, 1171 } 1172} 1173 1174type rootConfig struct { 1175 // Main is true if this rootConfig is for our main long running command (the 1176 // indexserver). Debug commands should not set this value. This is used to 1177 // determine if we need to run tmpfriend. 1178 Main bool 1179 1180 root string 1181 interval time.Duration 1182 index string 1183 indexConcurrency int64 1184 listen string 1185 hostname string 1186 cpuFraction float64 1187 blockProfileRate int 1188 1189 // config values related to shard merging 1190 vacuumInterval time.Duration 1191 mergeInterval time.Duration 1192 targetSize int64 1193 minSize int64 1194 minAgeDays int 1195 maxPriority float64 1196 1197 // config values related to backoff indexing repos with one or more consecutive failures 1198 backoffDuration time.Duration 1199 maxBackoffDuration time.Duration 1200 1201 // useGRPC is true if we should use the gRPC API to talk to Sourcegraph. 1202 useGRPC bool 1203} 1204 1205func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1206 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. 
If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1207 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1208 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of concurrent index jobs to run.") 1209 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use") 1210 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") 1211 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") 1212 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1213 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate") 1214 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.") 1215 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. 
A negative value disables indexing backoff.") 1216 fs.BoolVar(&rc.useGRPC, "use_grpc", mustGetBoolFromEnvironmentVariables([]string{"GRPC_ENABLED", "SG_FEATURE_FLAG_GRPC"}, true), "use the gRPC API to talk to Sourcegraph") 1217 1218 // flags related to shard merging 1219 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1220 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1221 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB") 1222 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB") 1223 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.") 1224 fs.Float64Var(&rc.maxPriority, "merge_max_priority", getEnvWithDefaultFloat64("SRC_MERGE_MAX_PRIORITY", 100), "the maximum priority a shard can have to be considered for merging.") 1225} 1226 1227func startServer(conf rootConfig) error { 1228 s, err := newServer(conf) 1229 if err != nil { 1230 return err 1231 } 1232 1233 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate) 1234 setCompoundShardCounter(s.IndexDir) 1235 1236 if conf.listen != "" { 1237 1238 mux := http.NewServeMux() 1239 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1240 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1241 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1242 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes 
repositories which this instance temporarily holds during re-balancing"}, 1243 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1244 }...) 1245 s.addDebugHandlers(mux) 1246 1247 go func() { 1248 debug.Printf("serving HTTP on %s", conf.listen) 1249 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1250 1251 }() 1252 1253 // Serve mux on a unix domain socket on a best-effort-basis so that 1254 // webserver can call the endpoints via the shared filesystem. 1255 // 1256 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1257 // on the socket due to permission errors. See 1258 // https://github.com/docker/for-mac/issues/6239 1259 go func() { 1260 serveHTTPOverSocket := func() error { 1261 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1262 // We cannot bind a socket to an existing pathname. 1263 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1264 return fmt.Errorf("error removing socket file: %s", socket) 1265 } 1266 // The "unix" network corresponds to stream sockets. (cf. unixgram, 1267 // unixpacket). 1268 l, err := net.Listen("unix", socket) 1269 if err != nil { 1270 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1271 } 1272 // Indexserver (root) and webserver (Sourcegraph) run with 1273 // different users. Per default, the socket is created with 1274 // permission 755 (root root), which doesn't let webserver write to 1275 // it. 1276 // 1277 // See https://github.com/golang/go/issues/11822 for more context. 
1278 if err := os.Chmod(socket, 0777); err != nil { 1279 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1280 } 1281 debug.Printf("serving HTTP on %s", socket) 1282 return http.Serve(l, mux) 1283 } 1284 debug.Print(serveHTTPOverSocket()) 1285 }() 1286 } 1287 1288 oc := &ownerChecker{ 1289 Path: filepath.Join(conf.index, "owner.txt"), 1290 Hostname: conf.hostname, 1291 } 1292 go oc.Run() 1293 1294 logger := sglog.Scoped("metricsRegistration", "") 1295 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1296 1297 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1298 prometheus.DefaultRegisterer.MustRegister(c) 1299 1300 s.Run() 1301 return nil 1302} 1303 1304func newServer(conf rootConfig) (*Server, error) { 1305 logger := sglog.Scoped("server", "periodically reindexes enabled repositories on sourcegraph") 1306 1307 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1308 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1309 } 1310 if conf.index == "" { 1311 return nil, fmt.Errorf("must set -index") 1312 } 1313 if conf.root == "" { 1314 return nil, fmt.Errorf("must set -sourcegraph_url") 1315 } 1316 rootURL, err := url.Parse(conf.root) 1317 if err != nil { 1318 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1319 } 1320 1321 rootURL = addDefaultPort(rootURL) 1322 1323 // Tune GOMAXPROCS to match Linux container CPU quota. 1324 _, _ = maxprocs.Set() 1325 1326 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler. 1327 // The block profiler is disabled by default and should be enabled with care in production 1328 runtime.SetBlockProfileRate(conf.blockProfileRate) 1329 1330 // Automatically prepend our own path at the front, to minimize 1331 // required configuration. 
1332 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1333 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1334 } 1335 1336 if _, err := os.Stat(conf.index); err != nil { 1337 if err := os.MkdirAll(conf.index, 0755); err != nil { 1338 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1339 } 1340 } 1341 1342 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil { 1343 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1344 } 1345 1346 if srcLogLevelIsDebug() { 1347 debug = log.New(os.Stderr, "", log.LstdFlags) 1348 } 1349 1350 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1351 if len(reposWithSeparateIndexingMetrics) > 0 { 1352 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1353 } 1354 1355 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1356 if len(deltaBuildRepositoriesAllowList) > 0 { 1357 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1358 } 1359 1360 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1361 if deltaShardNumberFallbackThreshold > 0 { 1362 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1363 } else { 1364 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1365 } 1366 1367 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1368 if len(reposShouldSkipSymbolsCalculation) > 0 { 1369 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1370 } 1371 1372 var sg Sourcegraph 1373 if rootURL.IsAbs() { 1374 var batchSize int 1375 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v 
!= "" { 1376 batchSize, err = strconv.Atoi(v) 1377 if err != nil { 1378 return nil, fmt.Errorf("Invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1379 } 1380 } 1381 1382 opts := []SourcegraphClientOption{ 1383 WithBatchSize(batchSize), 1384 WithShouldUseGRPC(conf.useGRPC), 1385 } 1386 1387 logger := sglog.Scoped("zoektConfigurationGRPCClient", "") 1388 client, err := dialGRPCClient(rootURL.Host, logger) 1389 if err != nil { 1390 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1391 } 1392 1393 opts = append(opts, WithGRPCClient(client)) 1394 sg = newSourcegraphClient(rootURL, conf.hostname, opts...) 1395 1396 } else { 1397 sg = sourcegraphFake{ 1398 RootDir: rootURL.String(), 1399 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1400 } 1401 } 1402 1403 if conf.indexConcurrency < 1 { 1404 conf.indexConcurrency = 1 1405 } 1406 1407 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction))) 1408 if cpuCount < 1 { 1409 cpuCount = 1 1410 } 1411 1412 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1413 1414 return &Server{ 1415 logger: logger, 1416 Sourcegraph: sg, 1417 IndexDir: conf.index, 1418 IndexConcurrency: int(conf.indexConcurrency), 1419 Interval: conf.interval, 1420 CPUCount: cpuCount, 1421 queue: *q, 1422 shardMerging: zoekt.ShardMergingEnabled(), 1423 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1424 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1425 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1426 hostname: conf.hostname, 1427 mergeOpts: mergeOpts{ 1428 vacuumInterval: conf.vacuumInterval, 1429 mergeInterval: conf.mergeInterval, 1430 targetSizeBytes: conf.targetSize * 1024 * 1024, 1431 minSizeBytes: conf.minSize * 1024 * 1024, 1432 minAgeDays: conf.minAgeDays, 1433 maxPriority: conf.maxPriority, 1434 }, 1435 }, err 1436} 1437 1438// defaultGRPCServiceConfigurationJSON is the default 
gRPC service configuration 1439// for the indexed-search-configuration gRPC service. 1440// 1441// The default backoff strategy is modeled after the default settings used by 1442// retryablehttp.DefaultClient. 1443// 1444// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1445// - Unavailable 1446// - Aborted 1447// 1448//go:embed default_grpc_service_configuration.json 1449var defaultGRPCServiceConfigurationJSON string 1450 1451func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1452 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1453 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1454 return invoker(ctx, method, req, reply, cc, opts...) 1455 } 1456} 1457 1458func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1459 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1460 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1461 return streamer(ctx, desc, cc, method, opts...) 1462 } 1463} 1464 1465// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPCs servers and clients are allowed to process. 1466// This can be overridden by providing custom Server/Dial options. 
const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MiB

// dialGRPCClient creates a gRPC client connection to addr and returns a
// ZoektConfigurationServiceClient backed by it.
//
// The connection uses insecure transport credentials and is configured with
// the metrics, message-size, internal-actor, error-logging, Prometheus and
// retry interceptors (in that order), the embedded default service
// configuration, and the default receive message size. additionalOpts are
// appended after these defaults, followed by any message size options taken
// from the environment, so later options are positioned to override earlier
// ones.
func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) {
	metrics := mustGetClientMetrics()

	// If the service seems to be unavailable, this
	// will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1
	// Ex: (on the first retry attempt, we will wait between [.9s and 1.1s])
	retryOpts := []retry.CallOption{
		retry.WithMax(5),
		retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)),
		retry.WithCodes(codes.Unavailable),
	}

	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithChainStreamInterceptor(
			metrics.StreamClientInterceptor(),
			messagesize.StreamClientInterceptor,
			internalActorStreamInterceptor(),
			internalerrs.LoggingStreamClientInterceptor(logger),
			internalerrs.PrometheusStreamClientInterceptor,
			retry.StreamClientInterceptor(retryOpts...),
		),
		grpc.WithChainUnaryInterceptor(
			metrics.UnaryClientInterceptor(),
			messagesize.UnaryClientInterceptor,
			internalActorUnaryInterceptor(),
			internalerrs.LoggingUnaryClientInterceptor(logger),
			internalerrs.PrometheusUnaryClientInterceptor,
			retry.UnaryClientInterceptor(retryOpts...),
		),
		grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)),
	}

	opts = append(opts, additionalOpts...)

	// Ensure that the message size options are set last, so they override any other
	// client-specific options that tweak the message size.
	//
	// The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
	// take precedence over everything else with a uniform size setting that's easy to reason about.
	opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...)

	// This dialer is used to connect via gRPC to the Sourcegraph instance.
	// This is done lazily, so we can provide the client to use regardless of
	// whether we enabled gRPC or not initially.
	cc, err := grpc.Dial(addr, opts...)
	if err != nil {
		return nil, fmt.Errorf("dialing %q: %w", addr, err)
	}

	client := proto.NewZoektConfigurationServiceClient(cc)
	return client, nil
}

// mustGetClientMetrics returns a singleton instance of the client metrics
// that are shared across all gRPC clients that this process creates.
//
// This function panics if the metrics cannot be registered with the default
// Prometheus registry.
func mustGetClientMetrics() *grpcprom.ClientMetrics {
	// clientMetricsOnce guards creation and registration so that they happen
	// at most once per process.
	clientMetricsOnce.Do(func() {
		clientMetrics = grpcprom.NewClientMetrics(
			grpcprom.WithClientCounterOptions(),
			grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
			grpcprom.WithClientStreamRecvHistogram(),   // record how long it takes for a client to receive a message during a streaming RPC
			grpcprom.WithClientStreamSendHistogram(),   // record how long it takes for a client to send a message during a streaming RPC
		)

		prometheus.DefaultRegisterer.MustRegister(clientMetrics)
	})

	return clientMetrics
}

// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
// If the scheme is "https", "443" is used.
//
// The original URL is not mutated. A copy is modified and returned.
1550func addDefaultPort(original *url.URL) *url.URL { 1551 if original == nil { 1552 return nil // don't panic 1553 } 1554 1555 if !original.IsAbs() { 1556 return original // don't do anything if the URL doesn't look like a remote URL 1557 } 1558 1559 if original.Scheme == "http" && original.Port() == "" { 1560 u := cloneURL(original) 1561 u.Host = net.JoinHostPort(u.Host, "80") 1562 return u 1563 } 1564 1565 if original.Scheme == "https" && original.Port() == "" { 1566 u := cloneURL(original) 1567 u.Host = net.JoinHostPort(u.Host, "443") 1568 return u 1569 } 1570 1571 return original 1572} 1573 1574// cloneURL returns a copy of the URL. It is safe to mutate the returned URL. 1575// This is copied from net/http/clone.go 1576func cloneURL(u *url.URL) *url.URL { 1577 if u == nil { 1578 return nil 1579 } 1580 u2 := new(url.URL) 1581 *u2 = *u 1582 if u.User != nil { 1583 u2.User = new(url.Userinfo) 1584 *u2.User = *u.User 1585 } 1586 return u2 1587} 1588 1589func main() { 1590 liblog := sglog.Init(sglog.Resource{ 1591 Name: "zoekt-indexserver", 1592 Version: zoekt.Version, 1593 InstanceID: zoekt.HostnameBestEffort(), 1594 }) 1595 defer liblog.Sync() 1596 1597 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1598 log.Fatal(err) 1599 } 1600} 1601 1602// mustGetBoolFromEnvironmentVariables is like getBoolFromEnvironmentVariables, but it panics 1603// if any of the provided environment variables fails to parse as a boolean. 1604func mustGetBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) bool { 1605 value, err := getBoolFromEnvironmentVariables(envVarNames, defaultBool) 1606 if err != nil { 1607 panic(err) 1608 } 1609 1610 return value 1611} 1612 1613// getBoolFromEnvironmentVariables returns the boolean defined by the first environment 1614// variable listed in envVarNames that is set in the current process environment, or the defaultBool if none are set. 
//
// Variables that are unset or set to the empty string are skipped.
//
// An error is returned if that variable's value fails to parse as a boolean.
// The error wraps the underlying strconv error.
func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) {
	for _, envVar := range envVarNames {
		v := os.Getenv(envVar)
		if v == "" {
			continue
		}

		// Only the first non-empty variable is consulted; it decides the
		// result whether or not it parses.
		b, err := strconv.ParseBool(v)
		if err != nil {
			return false, fmt.Errorf("parsing environment variable %q to boolean: %w", envVar, err)
		}

		return b, nil
	}

	return defaultBool, nil
}