fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Command zoekt-sourcegraph-indexserver periodically reindexes enabled 2// repositories on sourcegraph 3package main 4 5import ( 6 "bytes" 7 "context" 8 _ "embed" 9 "encoding/json" 10 "errors" 11 "flag" 12 "fmt" 13 "html/template" 14 "io" 15 "io/fs" 16 "log" 17 "math" 18 "math/rand" 19 "net" 20 "net/http" 21 "net/url" 22 "os" 23 "os/exec" 24 "os/signal" 25 "path/filepath" 26 "runtime" 27 "sort" 28 "strconv" 29 "strings" 30 "sync" 31 "text/tabwriter" 32 "time" 33 34 grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/openmetrics/v2" 35 "github.com/keegancsmith/tmpfriend" 36 "github.com/peterbourgon/ff/v3/ffcli" 37 "github.com/prometheus/client_golang/prometheus" 38 "github.com/prometheus/client_golang/prometheus/promauto" 39 sglog "github.com/sourcegraph/log" 40 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1" 41 "github.com/sourcegraph/zoekt/grpc/internalerrs" 42 "github.com/sourcegraph/zoekt/grpc/messagesize" 43 "go.uber.org/automaxprocs/maxprocs" 44 "golang.org/x/net/trace" 45 "golang.org/x/sys/unix" 46 "google.golang.org/grpc" 47 "google.golang.org/grpc/credentials/insecure" 48 "google.golang.org/grpc/metadata" 49 50 "github.com/sourcegraph/mountinfo" 51 52 "github.com/sourcegraph/zoekt" 53 "github.com/sourcegraph/zoekt/build" 54 "github.com/sourcegraph/zoekt/debugserver" 55 "github.com/sourcegraph/zoekt/internal/profiler" 56) 57 58var ( 59 metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{ 60 Name: "resolve_revisions_seconds", 61 Help: "A histogram of latencies for resolving all repository revisions.", 62 Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 27min 63 }) 64 65 metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 66 Name: "resolve_revision_seconds", 67 Help: "A histogram of latencies for resolving a repository revision.", 68 Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s 69 }, []string{"success"}) // success=true|false 70 71 metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{ 72 Name: "get_index_options_total", 73 Help: "The total number of times we tried to get index options for a repository. Includes errors.", 74 }) 75 metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{ 76 Name: "get_index_options_error_total", 77 Help: "The total number of times we failed to get index options for a repository.", 78 }) 79 80 metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 81 Name: "index_repo_seconds", 82 Help: "A histogram of latencies for indexing a repository.", 83 Buckets: prometheus.ExponentialBucketsRange( 84 (100 * time.Millisecond).Seconds(), 85 (40*time.Minute + indexTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo 86 20), 87 }, []string{ 88 "state", // state is an indexState 89 "name", // name of the repository that was indexed 90 }) 91 92 metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 93 Name: "index_fetch_seconds", 94 Help: "A histogram of latencies for fetching a repository.", 95 Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes 96 }, []string{ 97 "success", // true|false 98 "name", // the name of the repository that the commits were fetched from 99 }) 100 101 metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{ 102 Name: "index_incremental_index_state", 103 Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.", 104 }, []string{"state"}) // state is build.IndexState 105 106 metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{ 107 Name: "index_num_indexed", 108 Help: "Number of indexed repos by code host", 109 }) 110 111 metricNumAssigned = promauto.NewGauge(prometheus.GaugeOpts{ 112 Name: "index_num_assigned", 113 Help: "Number of repos assigned to this indexer by code host", 114 }) 115 116 metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{ 117 Name: "index_failing_total", 118 Help: "Counts failures to index (indexing activity, should be used with rate())", 119 }) 120 121 metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{ 122 Name: "index_indexing_total", 123 Help: "Counts indexings (indexing activity, should be used with rate())", 124 }) 125 126 metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{ 127 Name: "index_num_stopped_tracking_total", 128 Help: "Counts the number of repos we stopped tracking.", 129 }) 130 131 clientMetricsOnce sync.Once 132 clientMetrics *grpc_prometheus.ClientMetrics 133) 134 135// set of repositories that we want to capture separate indexing metrics for 136var reposWithSeparateIndexingMetrics = make(map[string]struct{}) 137 138type indexState string 139 140const ( 141 indexStateFail indexState = "fail" 142 indexStateSuccess indexState = "success" 143 indexStateSuccessMeta indexState = "success_meta" // We only updated metadata 144 indexStateNoop indexState = "noop" // We didn't need to update index 145 indexStateEmpty indexState = "empty" // index is empty (empty repo) 146) 147 148// Server is the main functionality of zoekt-sourcegraph-indexserver. It 149// exists to conveniently use all the options passed in via func main. 150type Server struct { 151 logger sglog.Logger 152 153 Sourcegraph Sourcegraph 154 BatchSize int 155 156 // IndexDir is the index directory to use. 157 IndexDir string 158 159 // IndexConcurrency is the number of repositories we index at once. 160 IndexConcurrency int 161 162 // Interval is how often we sync with Sourcegraph. 163 Interval time.Duration 164 // CPUCount is the amount of parallelism to use when indexing a 165 // repository. 166 CPUCount int 167 168 queue Queue 169 170 // muIndexDir protects the index directory from concurrent access. 171 muIndexDir indexMutex 172 173 // If true, shard merging is enabled. 174 shardMerging bool 175 176 // deltaBuildRepositoriesAllowList is an allowlist for repositories that we 177 // use delta-builds for instead of normal builds 178 deltaBuildRepositoriesAllowList map[string]struct{} 179 180 // deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist 181 // before attempting a delta build. 182 deltaShardNumberFallbackThreshold uint64 183 184 // repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that 185 // we skip calculating symbols metadata for during builds 186 repositoriesSkipSymbolsCalculationAllowList map[string]struct{} 187 188 hostname string 189 190 mergeOpts mergeOpts 191} 192 193var debug = log.New(io.Discard, "", log.LstdFlags) 194 195// our index commands should output something every 100mb they process. 196// 197// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough 198// time." famous last words. A client was indexing a monorepo with 42 199// cores... 5m was not enough. 200const noOutputTimeout = 30 * time.Minute 201 202func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 203 out := &synchronizedBuffer{} 204 cmd.Stdout = out 205 cmd.Stderr = out 206 207 tr.LazyPrintf("%s", cmd.Args) 208 209 defer func() { 210 if err != nil { 211 outS := out.String() 212 tr.LazyPrintf("failed: %v", err) 213 tr.LazyPrintf("output: %s", out) 214 tr.SetError() 215 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 216 } 217 }() 218 219 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 220 221 if err := cmd.Start(); err != nil { 222 return err 223 } 224 225 errC := make(chan error) 226 go func() { 227 errC <- cmd.Wait() 228 }() 229 230 // This channel is set after we have sent sigquit. It allows us to follow up 231 // with a sigkill if the process doesn't quit after sigquit. 232 kill := make(<-chan time.Time) 233 234 lastLen := 0 235 for { 236 select { 237 case <-time.After(noOutputTimeout): 238 // Periodically check if we have had output. If not kill the process. 239 if out.Len() != lastLen { 240 lastLen = out.Len() 241 log.Printf("still running %s", cmd.Args) 242 } else { 243 // Send quit (C-\) first so we get a stack dump. 244 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 245 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 246 log.Println("quit failed:", err) 247 } 248 249 // send sigkill if still running in 10s 250 kill = time.After(10 * time.Second) 251 } 252 253 case <-kill: 254 log.Printf("still running, killing %s", cmd.Args) 255 if err := cmd.Process.Kill(); err != nil { 256 log.Println("kill failed:", err) 257 } 258 259 case err := <-errC: 260 if err != nil { 261 return err 262 } 263 264 tr.LazyPrintf("success") 265 return nil 266 } 267 } 268} 269 270// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 271// monitor the buffer while it is being written to. 272type synchronizedBuffer struct { 273 mu sync.Mutex 274 b bytes.Buffer 275} 276 277func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 278 sb.mu.Lock() 279 defer sb.mu.Unlock() 280 return sb.b.Write(p) 281} 282 283func (sb *synchronizedBuffer) Len() int { 284 sb.mu.Lock() 285 defer sb.mu.Unlock() 286 return sb.b.Len() 287} 288 289func (sb *synchronizedBuffer) String() string { 290 sb.mu.Lock() 291 defer sb.mu.Unlock() 292 return sb.b.String() 293} 294 295// pauseFileName if present in IndexDir will stop index jobs from 296// running. This is to make it possible to experiment with the content of the 297// IndexDir without the indexserver writing to it. 298const pauseFileName = "PAUSE" 299 300// Run the sync loop. This blocks forever. 301func (s *Server) Run() { 302 removeIncompleteShards(s.IndexDir) 303 304 // Start a goroutine which updates the queue with commits to index. 305 go func() { 306 // We update the list of indexed repos every Interval. To speed up manual 307 // testing we also listen for SIGUSR1 to trigger updates. 308 // 309 // "pkill -SIGUSR1 zoekt-sourcegra" 310 for range jitterTicker(s.Interval, unix.SIGUSR1) { 311 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 312 log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 313 continue 314 } 315 316 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 317 if err != nil { 318 log.Printf("error listing repos: %s", err) 319 continue 320 } 321 322 debug.Printf("updating index queue with %d repositories", len(repos.IDs)) 323 324 // Stop indexing repos we don't need to track anymore 325 removed := s.queue.MaybeRemoveMissing(repos.IDs) 326 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 327 if len(removed) > 0 { 328 log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 329 } 330 331 cleanupDone := make(chan struct{}) 332 go func() { 333 defer close(cleanupDone) 334 s.muIndexDir.Global(func() { 335 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 336 }) 337 }() 338 339 repos.IterateIndexOptions(s.queue.AddOrUpdate) 340 341 // IterateIndexOptions will only iterate over repositories that have 342 // changed since we last called list. However, we want to add all IDs 343 // back onto the queue just to check that what is on disk is still 344 // correct. This will use the last IndexOptions we stored in the 345 // queue. The repositories not on the queue (missing) need a forced 346 // fetch of IndexOptions. 347 missing := s.queue.Bump(repos.IDs) 348 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 349 350 setCompoundShardCounter(s.IndexDir) 351 352 <-cleanupDone 353 } 354 }() 355 356 go func() { 357 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 358 if s.shardMerging { 359 s.vacuum() 360 } 361 } 362 }() 363 364 go func() { 365 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 366 if s.shardMerging { 367 s.doMerge() 368 } 369 } 370 }() 371 372 for i := 0; i < s.IndexConcurrency; i++ { 373 go s.processQueue() 374 } 375 376 // block forever 377 select {} 378} 379 380// formatList returns a comma-separated list of the first min(len(v), m) items. 381func formatListUint32(v []uint32, m int) string { 382 if len(v) < m { 383 m = len(v) 384 } 385 386 sb := strings.Builder{} 387 for i := 0; i < m; i++ { 388 fmt.Fprintf(&sb, "%d, ", v[i]) 389 } 390 391 if len(v) > m { 392 sb.WriteString("...") 393 } 394 395 return strings.TrimRight(sb.String(), ", ") 396} 397 398func (s *Server) processQueue() { 399 for { 400 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 401 time.Sleep(time.Second) 402 continue 403 } 404 405 opts, ok := s.queue.Pop() 406 if !ok { 407 time.Sleep(time.Second) 408 continue 409 } 410 411 args := s.indexArgs(opts) 412 413 ran := s.muIndexDir.With(opts.Name, func() { 414 // only record time taken once we hold the lock. This avoids us 415 // recording time taken while merging/cleanup runs. 416 start := time.Now() 417 418 state, err := s.Index(args) 419 420 elapsed := time.Since(start) 421 422 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 423 424 if err != nil { 425 log.Printf("error indexing %s: %s", args.String(), err) 426 } 427 428 switch state { 429 case indexStateSuccess: 430 var branches []string 431 for _, b := range args.Branches { 432 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 433 } 434 s.logger.Info("updated index", 435 sglog.String("repo", args.Name), 436 sglog.Uint32("id", args.RepoID), 437 sglog.Strings("branches", branches), 438 sglog.Duration("duration", elapsed), 439 ) 440 case indexStateSuccessMeta: 441 log.Printf("updated meta %s in %v", args.String(), elapsed) 442 } 443 s.queue.SetIndexed(opts, state) 444 }) 445 446 if !ran { 447 // Someone else is processing the repository. We can just skip this job 448 // since the repository will be added back to the queue and we will 449 // converge to the correct behaviour. 450 debug.Printf("index job for repository already running: %s", args) 451 continue 452 } 453 } 454} 455 456// repoNameForMetric returns a normalized version of the given repository name that is 457// suitable for use with Prometheus metrics. 458func repoNameForMetric(repo string) string { 459 // Check to see if we want to be able to capture separate indexing metrics for this repository. 460 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 461 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 462 return repo 463 } 464 465 return "" 466} 467 468func batched(slice []uint32, size int) <-chan []uint32 { 469 c := make(chan []uint32) 470 go func() { 471 for len(slice) > 0 { 472 if size > len(slice) { 473 size = len(slice) 474 } 475 c <- slice[:size] 476 slice = slice[size:] 477 } 478 close(c) 479 }() 480 return c 481} 482 483// jitterTicker returns a ticker which ticks with a jitter. Each tick is 484// uniformly selected from the range (d/2, d + d/2). It will tick on creation. 485// 486// sig is a list of signals which also cause the ticker to fire. This is a 487// convenience to allow manually triggering of the ticker. 488func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} { 489 ticker := make(chan struct{}) 490 491 go func() { 492 for { 493 ticker <- struct{}{} 494 ns := int64(d) 495 jitter := rand.Int63n(ns) 496 time.Sleep(time.Duration(ns/2 + jitter)) 497 } 498 }() 499 500 go func() { 501 if len(sig) == 0 { 502 return 503 } 504 505 c := make(chan os.Signal, 1) 506 signal.Notify(c, sig...) 507 for range c { 508 ticker <- struct{}{} 509 } 510 }() 511 512 return ticker 513} 514 515// Index starts an index job for repo name at commit. 516func (s *Server) Index(args *indexArgs) (state indexState, err error) { 517 tr := trace.New("index", args.Name) 518 519 defer func() { 520 if err != nil { 521 tr.SetError() 522 tr.LazyPrintf("error: %v", err) 523 state = indexStateFail 524 metricFailingTotal.Inc() 525 } 526 tr.LazyPrintf("state: %s", state) 527 tr.Finish() 528 }() 529 530 tr.LazyPrintf("branches: %v", args.Branches) 531 532 if len(args.Branches) == 0 { 533 return indexStateEmpty, createEmptyShard(args) 534 } 535 536 repositoryName := args.Name 537 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok { 538 tr.LazyPrintf("marking this repository for delta build") 539 args.UseDelta = true 540 } 541 542 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold 543 544 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok { 545 tr.LazyPrintf("skipping symbols calculation") 546 args.Symbols = false 547 } 548 549 reason := "forced" 550 551 if args.Incremental { 552 bo := args.BuildOptions() 553 bo.SetDefaults() 554 incrementalState, fn := bo.IndexState() 555 reason = string(incrementalState) 556 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc() 557 558 switch incrementalState { 559 case build.IndexStateEqual: 560 debug.Printf("%s index already up to date. Shard=%s", args.String(), fn) 561 return indexStateNoop, nil 562 563 case build.IndexStateMeta: 564 log.Printf("updating index.meta %s", args.String()) 565 566 if err := mergeMeta(bo); err != nil { 567 log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err) 568 } else { 569 return indexStateSuccessMeta, nil 570 } 571 572 case build.IndexStateCorrupt: 573 log.Printf("falling back to full update: corrupt index: %s", args.String()) 574 } 575 } 576 577 log.Printf("updating index %s reason=%s", args.String(), reason) 578 579 metricIndexingTotal.Inc() 580 c := gitIndexConfig{ 581 runCmd: func(cmd *exec.Cmd) error { 582 return s.loggedRun(tr, cmd) 583 }, 584 585 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) { 586 return args.BuildOptions().FindRepositoryMetadata() 587 }, 588 } 589 590 err = gitIndex(c, args, s.Sourcegraph, s.logger) 591 if err != nil { 592 return indexStateFail, err 593 } 594 595 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil { 596 s.logger.Error("failed to update index status", 597 sglog.String("repo", args.Name), 598 sglog.Uint32("id", args.RepoID), 599 sglogBranches("branches", args.Branches), 600 sglog.Error(err), 601 ) 602 } 603 604 return indexStateSuccess, nil 605} 606 607// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so 608// it can update the zoekt_repos table. 609func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error { 610 // We need to read from disk for IndexTime. 611 _, metadata, ok, err := c.findRepositoryMetadata(args) 612 if err != nil { 613 return fmt.Errorf("failed to read metadata for new/updated index: %w", err) 614 } 615 if !ok { 616 return errors.New("failed to find metadata for new/updated index") 617 } 618 619 status := []indexStatus{{ 620 RepoID: args.RepoID, 621 Branches: args.Branches, 622 IndexTimeUnix: metadata.IndexTime.Unix(), 623 }} 624 if err := sg.UpdateIndexStatus(status); err != nil { 625 return fmt.Errorf("failed to update sourcegraph with status: %w", err) 626 } 627 628 return nil 629} 630 631func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field { 632 ss := make([]string, len(branches)) 633 for i, b := range branches { 634 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version) 635 } 636 return sglog.Strings(key, ss) 637} 638 639func (s *Server) indexArgs(opts IndexOptions) *indexArgs { 640 return &indexArgs{ 641 IndexOptions: opts, 642 643 IndexDir: s.IndexDir, 644 Parallelism: s.CPUCount, 645 646 Incremental: true, 647 648 // 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22 649 FileLimit: 1 << 20, 650 } 651} 652 653func createEmptyShard(args *indexArgs) error { 654 bo := args.BuildOptions() 655 bo.SetDefaults() 656 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 657 658 if args.Incremental && bo.IncrementalSkipIndexing() { 659 return nil 660 } 661 662 builder, err := build.NewBuilder(*bo) 663 if err != nil { 664 return err 665 } 666 return builder.Finish() 667} 668 669// addDebugHandlers adds handlers specific to indexserver. 670func (s *Server) addDebugHandlers(mux *http.ServeMux) { 671 // Sourcegraph's site admin view requires indexserver to serve it's admin view 672 // on "/". 673 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 674 675 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 676 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 677 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 678 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 679 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 680 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 681} 682 683func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 684 if r.Method != "GET" { 685 w.Header().Set("Allow", "GET") 686 w.WriteHeader(http.StatusMethodNotAllowed) 687 return 688 } 689 690 response := struct { 691 Hostname string 692 }{ 693 Hostname: s.hostname, 694 } 695 696 b, err := json.Marshal(response) 697 if err != nil { 698 http.Error(w, err.Error(), http.StatusInternalServerError) 699 return 700 } 701 702 w.Header().Set("Content-Type", "application/json; charset=utf-8") 703 w.Write(b) 704} 705 706var rootTmpl = template.Must(template.New("name").Parse(` 707<html> 708 <body> 709 <a href="debug">Debug</a><br /> 710 <a href="debug/requests">Traces</a><br /> 711 {{.IndexMsg}}<br /> 712 <br /> 713 <h3>Reindex</h3> 714 {{if .Repos}} 715 <a href="?show_repos=false">hide repos</a><br /> 716 <table style="margin-top: 20px"> 717 <th style="text-align:left">Name</th> 718 <th style="text-align:left">ID</th> 719 {{range .Repos}} 720 <tr> 721 <td>{{.Name}}</td> 722 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 723 </tr> 724 {{end}} 725 </table> 726 {{else}} 727 <a href="?show_repos=true">show repos</a><br /> 728 {{end}} 729 </body> 730</html> 731`)) 732 733func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 734 if r.Method != "GET" { 735 w.Header().Set("Allow", "GET") 736 w.WriteHeader(http.StatusMethodNotAllowed) 737 return 738 } 739 740 values := r.URL.Query() 741 742 // ?id= 743 indexMsg := "" 744 if v := values.Get("id"); v != "" { 745 id, err := strconv.Atoi(v) 746 if err != nil { 747 http.Error(w, err.Error(), http.StatusBadRequest) 748 return 749 } 750 indexMsg, _ = s.forceIndex(uint32(id)) 751 } 752 753 // ?show_repos= 754 showRepos := false 755 if v := values.Get("show_repos"); v != "" { 756 showRepos, _ = strconv.ParseBool(v) 757 } 758 759 type Repo struct { 760 ID uint32 761 Name string 762 } 763 var data struct { 764 Repos []Repo 765 IndexMsg string 766 } 767 768 data.IndexMsg = indexMsg 769 770 if showRepos { 771 s.queue.Iterate(func(opts *IndexOptions) { 772 data.Repos = append(data.Repos, Repo{ 773 ID: opts.RepoID, 774 Name: opts.Name, 775 }) 776 }) 777 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 778 } 779 780 _ = rootTmpl.Execute(w, data) 781} 782 783// handleReindex triggers a reindex asynocronously. If a reindex was triggered 784// the request returns with status 202. The caller can infer the new state of 785// the index by calling List. 786func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 787 if r.Method != http.MethodPost { 788 w.Header().Set("Allow", http.MethodPost) 789 w.WriteHeader(http.StatusMethodNotAllowed) 790 return 791 } 792 793 err := r.ParseForm() 794 if err != nil { 795 http.Error(w, err.Error(), http.StatusBadRequest) 796 return 797 } 798 799 id, err := strconv.Atoi(r.Form.Get("repo")) 800 if err != nil { 801 http.Error(w, err.Error(), http.StatusBadRequest) 802 return 803 } 804 805 go func() { s.forceIndex(uint32(id)) }() 806 807 // 202 Accepted 808 w.WriteHeader(http.StatusAccepted) 809} 810 811func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 812 withIndexed := true 813 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 814 withIndexed = b 815 } 816 817 var indexed []uint32 818 if withIndexed { 819 indexed = listIndexed(s.IndexDir) 820 } 821 822 repos, err := s.Sourcegraph.List(r.Context(), indexed) 823 if err != nil { 824 http.Error(w, err.Error(), http.StatusInternalServerError) 825 return 826 } 827 828 bw := bytes.Buffer{} 829 830 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 831 832 _, err = fmt.Fprintf(tw, "ID\tName\n") 833 if err != nil { 834 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 835 return 836 } 837 838 s.queue.mu.Lock() 839 name := "" 840 for _, id := range repos.IDs { 841 if item := s.queue.get(id); item != nil { 842 name = item.opts.Name 843 } else { 844 name = "" 845 } 846 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 847 if err != nil { 848 debug.Printf("handleDebugList: %s\n", err.Error()) 849 } 850 } 851 s.queue.mu.Unlock() 852 853 if err != nil { 854 http.Error(w, err.Error(), http.StatusInternalServerError) 855 return 856 } 857 858 err = tw.Flush() 859 if err != nil { 860 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 861 return 862 } 863 864 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 865 866 if _, err := io.Copy(w, &bw); err != nil { 867 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 868 return 869 } 870} 871 872// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 873// can run this command during periods of low usage (evenings, weekends) to 874// trigger an initial merge run. In the steady-state, merges happen rarely, even 875// on busy instances, and users can rely on automatic merging instead. 876func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 877 878 // A merge operation can take very long, depending on the number merges and the 879 // target size of the compound shards. We run the merge in the background and 880 // return immediately to the user. 881 // 882 // We track the status of the merge with metricShardMergingRunning. 883 go func() { 884 s.doMerge() 885 }() 886 _, _ = w.Write([]byte("merging enqueued\n")) 887} 888 889func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 890 indexed := listIndexed(s.IndexDir) 891 892 bw := bytes.Buffer{} 893 894 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 895 896 _, err := fmt.Fprintf(tw, "ID\tName\n") 897 if err != nil { 898 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 899 return 900 } 901 902 s.queue.mu.Lock() 903 name := "" 904 for _, id := range indexed { 905 if item := s.queue.get(id); item != nil { 906 name = item.opts.Name 907 } else { 908 name = "" 909 } 910 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 911 if err != nil { 912 debug.Printf("handleDebugIndexed: %s\n", err.Error()) 913 } 914 } 915 s.queue.mu.Unlock() 916 917 if err != nil { 918 http.Error(w, err.Error(), http.StatusInternalServerError) 919 return 920 } 921 922 err = tw.Flush() 923 if err != nil { 924 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 925 return 926 } 927 928 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 929 930 if _, err := io.Copy(w, &bw); err != nil { 931 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 932 return 933 } 934} 935 936// forceIndex will run the index job for repo name now. It will return always 937// return a string explaining what it did, even if it failed. 938func (s *Server) forceIndex(id uint32) (string, error) { 939 var opts IndexOptions 940 var err error 941 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 942 opts = o 943 }, func(_ uint32, e error) { 944 err = e 945 }, id) 946 if err != nil { 947 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 948 } 949 950 args := s.indexArgs(opts) 951 args.Incremental = false // force re-index 952 953 var state indexState 954 ran := s.muIndexDir.With(opts.Name, func() { 955 state, err = s.Index(args) 956 }) 957 if !ran { 958 return fmt.Sprintf("index job for repository already running: %s", args), nil 959 } 960 if err != nil { 961 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 962 } 963 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 964} 965 966func listIndexed(indexDir string) []uint32 { 967 index := getShards(indexDir) 968 metricNumIndexed.Set(float64(len(index))) 969 repoIDs := make([]uint32, 0, len(index)) 970 for id := range index { 971 repoIDs = append(repoIDs, id) 972 } 973 sort.Slice(repoIDs, func(i, j int) bool { 974 return repoIDs[i] < repoIDs[j] 975 }) 976 return repoIDs 977} 978 979// setupTmpDir sets up a temporary directory on the same volume as the 980// indexes. 981// 982// If main is true we will delete older temp directories left around. main is 983// false when this is a debug command. 984func setupTmpDir(main bool, index string) error { 985 // change the target tmp directory depending on if its our main daemon or a 986 // debug sub command. 987 dir := ".indexserver.debug.tmp" 988 if main { 989 dir = ".indexserver.tmp" 990 } 991 992 tmpRoot := filepath.Join(index, dir) 993 if err := os.MkdirAll(tmpRoot, 0755); err != nil { 994 return err 995 } 996 if !tmpfriend.IsTmpFriendDir(tmpRoot) { 997 _, err := tmpfriend.RootTempDir(tmpRoot) 998 return err 999 } 1000 return nil 1001} 1002 1003func printMetaData(fn string) error { 1004 repo, indexMeta, err := zoekt.ReadMetadataPath(fn) 1005 if err != nil { 1006 return err 1007 } 1008 1009 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 1010 if err != nil { 1011 return err 1012 } 1013 1014 err = json.NewEncoder(os.Stdout).Encode(repo) 1015 if err != nil { 1016 return err 1017 } 1018 return nil 1019} 1020 1021func printShardStats(fn string) error { 1022 f, err := os.Open(fn) 1023 if err != nil { 1024 return err 1025 } 1026 1027 iFile, err := zoekt.NewIndexFile(f) 1028 if err != nil { 1029 return err 1030 } 1031 1032 return zoekt.PrintNgramStats(iFile) 1033} 1034 1035func srcLogLevelIsDebug() bool { 1036 lvl := os.Getenv(sglog.EnvLogLevel) 1037 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1038} 1039 1040func getEnvWithDefaultInt64(k string, defaultVal int64) int64 { 1041 v := os.Getenv(k) 1042 if v == "" { 1043 return defaultVal 1044 } 1045 i, err := strconv.ParseInt(v, 10, 64) 1046 if err != nil { 1047 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1048 } 1049 return i 1050} 1051 1052func getEnvWithDefaultInt(k string, defaultVal int) int { 1053 v := os.Getenv(k) 1054 if v == "" { 1055 return defaultVal 1056 } 1057 i, err := strconv.Atoi(k) 1058 if err != nil { 1059 log.Fatalf("error parsing ENV %s to int: %s", k, err) 1060 } 1061 return i 1062} 1063 1064func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 { 1065 v := os.Getenv(k) 1066 if v == "" { 1067 return defaultVal 1068 } 1069 i, err := strconv.ParseUint(v, 10, 64) 1070 if err != nil { 1071 log.Fatalf("error parsing ENV %s to uint64: %s", k, err) 1072 } 1073 return i 1074} 1075 1076func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1077 v := os.Getenv(k) 1078 if v == "" { 1079 return defaultVal 1080 } 1081 f, err := strconv.ParseFloat(v, 64) 1082 if err != nil { 1083 log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1084 } 1085 return f 1086} 1087 1088func getEnvWithDefaultString(k string, defaultVal string) string { 1089 v := os.Getenv(k) 1090 if v == "" { 1091 return defaultVal 1092 } 1093 return v 1094} 1095 1096func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration { 1097 v := os.Getenv(k) 1098 if v == "" { 1099 return defaultVal 1100 } 1101 1102 d, err := time.ParseDuration(v) 1103 if err != nil { 1104 log.Fatalf("error parsing ENV %s to duration: %s", k, err) 1105 } 1106 return d 1107} 1108 1109func getEnvWithDefaultBool(k string, defaultVal bool) bool { 1110 v := os.Getenv(k) 1111 if v == "" { 1112 return defaultVal 1113 } 1114 1115 b, err := strconv.ParseBool(v) 1116 if err != nil { 1117 log.Fatalf("error parsing ENV %s to bool: %s", k, err) 1118 } 1119 return b 1120} 1121 1122func getEnvWithDefaultEmptySet(k string) map[string]struct{} { 1123 set := map[string]struct{}{} 1124 for _, v := range strings.Split(os.Getenv(k), ",") { 1125 v = strings.TrimSpace(v) 1126 if v != "" { 1127 set[v] = struct{}{} 1128 } 1129 } 1130 return set 1131} 1132 1133func joinStringSet(set map[string]struct{}, sep string) string { 1134 var xs []string 1135 for x := range set { 1136 xs = append(xs, x) 1137 } 1138 1139 return strings.Join(xs, sep) 1140} 1141 1142func setCompoundShardCounter(indexDir string) { 1143 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt")) 1144 if err != nil { 1145 log.Printf("setCompoundShardCounter: %s\n", err) 1146 return 1147 } 1148 metricNumberCompoundShards.Set(float64(len(fns))) 1149} 1150 1151func rootCmd() *ffcli.Command { 1152 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1153 conf := rootConfig{ 1154 Main: true, 1155 } 1156 conf.registerRootFlags(rootFs) 1157 1158 return &ffcli.Command{ 1159 FlagSet: rootFs, 1160 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1161 Subcommands: []*ffcli.Command{debugCmd()}, 1162 Exec: func(ctx context.Context, args []string) error { 1163 return startServer(conf) 1164 }, 1165 } 1166} 1167 1168type rootConfig struct { 1169 // Main is true if this rootConfig is for our main long running command (the 1170 // indexserver). Debug commands should not set this value. This is used to 1171 // determine if we need to run tmpfriend. 1172 Main bool 1173 1174 root string 1175 interval time.Duration 1176 index string 1177 indexConcurrency int64 1178 listen string 1179 hostname string 1180 cpuFraction float64 1181 blockProfileRate int 1182 1183 // config values related to shard merging 1184 vacuumInterval time.Duration 1185 mergeInterval time.Duration 1186 targetSize int64 1187 minSize int64 1188 minAgeDays int 1189 maxPriority float64 1190 1191 // config values related to backoff indexing repos with one or more consecutive failures 1192 backoffDuration time.Duration 1193 maxBackoffDuration time.Duration 1194 1195 // useGRPC is true if we should use the gRPC API to talk to Sourcegraph. 1196 useGRPC bool 1197} 1198 1199func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1200 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1201 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1202 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of concurrent index jobs to run.") 1203 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use") 1204 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") 1205 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") 1206 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1207 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate") 1208 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.") 1209 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.") 1210 fs.BoolVar(&rc.useGRPC, "use_grpc", getEnvWithDefaultBool("GRPC_ENABLED", false), "use the gRPC API to talk to Sourcegraph") 1211 1212 // flags related to shard merging 1213 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1214 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1215 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB") 1216 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB") 1217 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.") 1218 fs.Float64Var(&rc.maxPriority, "merge_max_priority", getEnvWithDefaultFloat64("SRC_MERGE_MAX_PRIORITY", 100), "the maximum priority a shard can have to be considered for merging.") 1219} 1220 1221func startServer(conf rootConfig) error { 1222 s, err := newServer(conf) 1223 if err != nil { 1224 return err 1225 } 1226 1227 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate) 1228 setCompoundShardCounter(s.IndexDir) 1229 1230 if conf.listen != "" { 1231 1232 mux := http.NewServeMux() 1233 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1234 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1235 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1236 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"}, 1237 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1238 }...) 1239 s.addDebugHandlers(mux) 1240 1241 go func() { 1242 debug.Printf("serving HTTP on %s", conf.listen) 1243 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1244 1245 }() 1246 1247 // Serve mux on a unix domain socket on a best-effort-basis so that 1248 // webserver can call the endpoints via the shared filesystem. 1249 // 1250 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1251 // on the socket due to permission errors. See 1252 // https://github.com/docker/for-mac/issues/6239 1253 go func() { 1254 serveHTTPOverSocket := func() error { 1255 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1256 // We cannot bind a socket to an existing pathname. 1257 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1258 return fmt.Errorf("error removing socket file: %s", socket) 1259 } 1260 // The "unix" network corresponds to stream sockets. (cf. unixgram, 1261 // unixpacket). 1262 l, err := net.Listen("unix", socket) 1263 if err != nil { 1264 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1265 } 1266 // Indexserver (root) and webserver (Sourcegraph) run with 1267 // different users. Per default, the socket is created with 1268 // permission 755 (root root), which doesn't let webserver write to 1269 // it. 1270 // 1271 // See https://github.com/golang/go/issues/11822 for more context. 1272 if err := os.Chmod(socket, 0777); err != nil { 1273 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1274 } 1275 debug.Printf("serving HTTP on %s", socket) 1276 return http.Serve(l, mux) 1277 } 1278 debug.Print(serveHTTPOverSocket()) 1279 }() 1280 } 1281 1282 oc := &ownerChecker{ 1283 Path: filepath.Join(conf.index, "owner.txt"), 1284 Hostname: conf.hostname, 1285 } 1286 go oc.Run() 1287 1288 logger := sglog.Scoped("metricsRegistration", "") 1289 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1290 1291 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1292 prometheus.DefaultRegisterer.MustRegister(c) 1293 1294 s.Run() 1295 return nil 1296} 1297 1298func newServer(conf rootConfig) (*Server, error) { 1299 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1300 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1301 } 1302 if conf.index == "" { 1303 return nil, fmt.Errorf("must set -index") 1304 } 1305 if conf.root == "" { 1306 return nil, fmt.Errorf("must set -sourcegraph_url") 1307 } 1308 rootURL, err := url.Parse(conf.root) 1309 if err != nil { 1310 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1311 } 1312 1313 rootURL = addDefaultPort(rootURL) 1314 1315 // Tune GOMAXPROCS to match Linux container CPU quota. 1316 _, _ = maxprocs.Set() 1317 1318 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler. 1319 // The block profiler is disabled by default and should be enabled with care in production 1320 runtime.SetBlockProfileRate(conf.blockProfileRate) 1321 1322 // Automatically prepend our own path at the front, to minimize 1323 // required configuration. 1324 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1325 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1326 } 1327 1328 if _, err := os.Stat(conf.index); err != nil { 1329 if err := os.MkdirAll(conf.index, 0755); err != nil { 1330 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1331 } 1332 } 1333 1334 if err := setupTmpDir(conf.Main, conf.index); err != nil { 1335 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1336 } 1337 1338 if srcLogLevelIsDebug() { 1339 debug = log.New(os.Stderr, "", log.LstdFlags) 1340 } 1341 1342 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1343 if len(reposWithSeparateIndexingMetrics) > 0 { 1344 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1345 } 1346 1347 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1348 if len(deltaBuildRepositoriesAllowList) > 0 { 1349 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1350 } 1351 1352 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1353 if deltaShardNumberFallbackThreshold > 0 { 1354 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1355 } else { 1356 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1357 } 1358 1359 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1360 if len(reposShouldSkipSymbolsCalculation) > 0 { 1361 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1362 } 1363 1364 var sg Sourcegraph 1365 if rootURL.IsAbs() { 1366 var batchSize int 1367 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { 1368 batchSize, err = strconv.Atoi(v) 1369 if err != nil { 1370 return nil, fmt.Errorf("Invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1371 } 1372 } 1373 1374 opts := []SourcegraphClientOption{ 1375 WithBatchSize(batchSize), 1376 WithShouldUseGRPC(conf.useGRPC), 1377 } 1378 1379 logger := sglog.Scoped("zoektConfigurationGRPCClient", "") 1380 client, err := dialGRPCClient(rootURL.Host, logger) 1381 if err != nil { 1382 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1383 } 1384 1385 opts = append(opts, WithGRPCClient(client)) 1386 sg = newSourcegraphClient(rootURL, conf.hostname, opts...) 1387 1388 } else { 1389 sg = sourcegraphFake{ 1390 RootDir: rootURL.String(), 1391 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1392 } 1393 } 1394 1395 if conf.indexConcurrency < 1 { 1396 conf.indexConcurrency = 1 1397 } 1398 1399 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction))) 1400 if cpuCount < 1 { 1401 cpuCount = 1 1402 } 1403 1404 logger := sglog.Scoped("server", "periodically reindexes enabled repositories on sourcegraph") 1405 1406 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1407 1408 return &Server{ 1409 logger: logger, 1410 Sourcegraph: sg, 1411 IndexDir: conf.index, 1412 IndexConcurrency: int(conf.indexConcurrency), 1413 Interval: conf.interval, 1414 CPUCount: cpuCount, 1415 queue: *q, 1416 shardMerging: zoekt.ShardMergingEnabled(), 1417 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1418 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1419 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1420 hostname: conf.hostname, 1421 mergeOpts: mergeOpts{ 1422 vacuumInterval: conf.vacuumInterval, 1423 mergeInterval: conf.mergeInterval, 1424 targetSizeBytes: conf.targetSize * 1024 * 1024, 1425 minSizeBytes: conf.minSize * 1024 * 1024, 1426 minAgeDays: conf.minAgeDays, 1427 maxPriority: conf.maxPriority, 1428 }, 1429 }, err 1430} 1431 1432// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration 1433// for the indexed-search-configuration gRPC service. 1434// 1435// The default backoff strategy is modeled after the default settings used by 1436// retryablehttp.DefaultClient. 1437// 1438// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1439// - Unavailable 1440// - Aborted 1441// 1442//go:embed default_grpc_service_configuration.json 1443var defaultGRPCServiceConfigurationJSON string 1444 1445func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1446 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1447 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1448 return invoker(ctx, method, req, reply, cc, opts...) 1449 } 1450} 1451 1452func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1453 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1454 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1455 return streamer(ctx, desc, cc, method, opts...) 1456 } 1457} 1458 1459// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPCs servers and clients are allowed to process. 1460// This can be overridden by providing custom Server/Dial options. 1461const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB 1462 1463func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) { 1464 metrics := mustGetClientMetrics() 1465 1466 opts := []grpc.DialOption{ 1467 grpc.WithTransportCredentials(insecure.NewCredentials()), 1468 grpc.WithChainStreamInterceptor( 1469 grpc_prometheus.StreamClientInterceptor(metrics), 1470 internalActorStreamInterceptor(), 1471 internalerrs.LoggingStreamClientInterceptor(logger), 1472 internalerrs.PrometheusStreamClientInterceptor, 1473 ), 1474 grpc.WithChainUnaryInterceptor( 1475 grpc_prometheus.UnaryClientInterceptor(metrics), 1476 internalActorUnaryInterceptor(), 1477 internalerrs.LoggingUnaryClientInterceptor(logger), 1478 internalerrs.PrometheusUnaryClientInterceptor, 1479 ), 1480 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON), 1481 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)), 1482 } 1483 1484 opts = append(opts, additionalOpts...) 1485 1486 // Ensure that the message size options are set last, so they override any other 1487 // client-specific options that tweak the message size. 1488 // 1489 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they 1490 // take precedence over everything else with a uniform size setting that's easy to reason about. 1491 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...) 1492 1493 // This dialer is used to connect via gRPC to the Sourcegraph instance. 1494 // This is done lazily, so we can provide the client to use regardless of 1495 // whether we enabled gRPC or not initially. 1496 cc, err := grpc.Dial(addr, opts...) 1497 if err != nil { 1498 return nil, fmt.Errorf("dialing %q: %w", addr, err) 1499 } 1500 1501 client := proto.NewZoektConfigurationServiceClient(cc) 1502 return client, nil 1503} 1504 1505// mustGetClientMetrics returns a singleton instance of the client metrics 1506// that are shared across all gRPC clients that this process creates. 1507// 1508// This function panics if the metrics cannot be registered with the default 1509// Prometheus registry. 1510func mustGetClientMetrics() *grpc_prometheus.ClientMetrics { 1511 clientMetricsOnce.Do(func() { 1512 clientMetrics = grpc_prometheus.NewRegisteredClientMetrics(prometheus.DefaultRegisterer, 1513 grpc_prometheus.WithClientCounterOptions(), 1514 grpc_prometheus.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request 1515 grpc_prometheus.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC 1516 grpc_prometheus.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC 1517 ) 1518 }) 1519 1520 return clientMetrics 1521} 1522 1523// addDefaultPort adds a default port to a URL if one is not specified. 1524// 1525// If the URL scheme is "http" and no port is specified, "80" is used. 1526// If the scheme is "https", "443" is used. 1527// 1528// The original URL is not mutated. A copy is modified and returned. 1529func addDefaultPort(original *url.URL) *url.URL { 1530 if original == nil { 1531 return nil // don't panic 1532 } 1533 1534 if !original.IsAbs() { 1535 return original // don't do anything if the URL doesn't look like a remote URL 1536 } 1537 1538 if original.Scheme == "http" && original.Port() == "" { 1539 u := cloneURL(original) 1540 u.Host = net.JoinHostPort(u.Host, "80") 1541 return u 1542 } 1543 1544 if original.Scheme == "https" && original.Port() == "" { 1545 u := cloneURL(original) 1546 u.Host = net.JoinHostPort(u.Host, "443") 1547 return u 1548 } 1549 1550 return original 1551} 1552 1553// cloneURL returns a copy of the URL. It is safe to mutate the returned URL. 1554// This is copied from net/http/clone.go 1555func cloneURL(u *url.URL) *url.URL { 1556 if u == nil { 1557 return nil 1558 } 1559 u2 := new(url.URL) 1560 *u2 = *u 1561 if u.User != nil { 1562 u2.User = new(url.Userinfo) 1563 *u2.User = *u.User 1564 } 1565 return u2 1566} 1567 1568func main() { 1569 liblog := sglog.Init(sglog.Resource{ 1570 Name: "zoekt-indexserver", 1571 Version: zoekt.Version, 1572 InstanceID: zoekt.HostnameBestEffort(), 1573 }) 1574 defer liblog.Sync() 1575 1576 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1577 log.Fatal(err) 1578 } 1579}