fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
// repositories on sourcegraph
package main

import (
	"bytes"
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"html/template"
	"io"
	"io/fs"
	"log"
	"math"
	"math/rand"
	"net"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"text/tabwriter"
	"time"

	"github.com/keegancsmith/tmpfriend"
	"github.com/peterbourgon/ff/v3/ffcli"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	sglog "github.com/sourcegraph/log"
	"go.uber.org/automaxprocs/maxprocs"
	"golang.org/x/net/trace"
	"golang.org/x/sys/unix"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/metadata"

	"github.com/sourcegraph/mountinfo"

	"github.com/sourcegraph/zoekt"
	"github.com/sourcegraph/zoekt/build"
	proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
	"github.com/sourcegraph/zoekt/debugserver"
	"github.com/sourcegraph/zoekt/internal/profiler"
)

// Prometheus metrics exported by the indexserver. They are registered at
// package initialisation through promauto.
var (
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s, 10s, ... 100000s (~28h)
	})

	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + indexTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is build.IndexState

	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	metricNumAssigned = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_assigned",
		Help: "Number of repos assigned to this indexer by code host",
	})

	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})
)

// set of repositories that we want to capture separate indexing metrics for
var reposWithSeparateIndexingMetrics = make(map[string]struct{})

// indexState is the outcome of a single index job. It is used as the "state"
// label of metricIndexDuration.
type indexState string

const (
	indexStateFail        indexState = "fail"
	indexStateSuccess     indexState = "success"
	indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
	indexStateNoop        indexState = "noop"         // We didn't need to update index
	indexStateEmpty       indexState = "empty"        // index is empty (empty repo)
)

// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	// logger is the structured logger used for index status messages.
	logger sglog.Logger

	// Sourcegraph is the client used to list repositories, fetch index
	// options and report index status.
	Sourcegraph Sourcegraph
	// NOTE(review): BatchSize is not referenced in the visible part of this
	// file; presumably the batch size for requests to Sourcegraph - confirm.
	BatchSize int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration
	// CPUCount is the amount of parallelism to use when indexing a
	// repository.
	CPUCount int

	// queue holds the pending index jobs. Run keeps it up to date and
	// processQueue pops jobs from it.
	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is the name reported by the /debug/host handler.
	hostname string

	// mergeOpts configures shard merging; Run reads the vacuum and merge
	// intervals from it.
	mergeOpts mergeOpts
}

// debug is a logger for verbose output. It discards everything by default.
var debug = log.New(io.Discard, "", log.LstdFlags)

// our index commands should output something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
194const noOutputTimeout = 30 * time.Minute 195 196func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 197 out := &synchronizedBuffer{} 198 cmd.Stdout = out 199 cmd.Stderr = out 200 201 tr.LazyPrintf("%s", cmd.Args) 202 203 defer func() { 204 if err != nil { 205 outS := out.String() 206 tr.LazyPrintf("failed: %v", err) 207 tr.LazyPrintf("output: %s", out) 208 tr.SetError() 209 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 210 } 211 }() 212 213 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 214 215 if err := cmd.Start(); err != nil { 216 return err 217 } 218 219 errC := make(chan error) 220 go func() { 221 errC <- cmd.Wait() 222 }() 223 224 // This channel is set after we have sent sigquit. It allows us to follow up 225 // with a sigkill if the process doesn't quit after sigquit. 226 kill := make(<-chan time.Time) 227 228 lastLen := 0 229 for { 230 select { 231 case <-time.After(noOutputTimeout): 232 // Periodically check if we have had output. If not kill the process. 233 if out.Len() != lastLen { 234 lastLen = out.Len() 235 log.Printf("still running %s", cmd.Args) 236 } else { 237 // Send quit (C-\) first so we get a stack dump. 238 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 239 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 240 log.Println("quit failed:", err) 241 } 242 243 // send sigkill if still running in 10s 244 kill = time.After(10 * time.Second) 245 } 246 247 case <-kill: 248 log.Printf("still running, killing %s", cmd.Args) 249 if err := cmd.Process.Kill(); err != nil { 250 log.Println("kill failed:", err) 251 } 252 253 case err := <-errC: 254 if err != nil { 255 return err 256 } 257 258 tr.LazyPrintf("success") 259 return nil 260 } 261 } 262} 263 264// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 265// monitor the buffer while it is being written to. 
266type synchronizedBuffer struct { 267 mu sync.Mutex 268 b bytes.Buffer 269} 270 271func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 272 sb.mu.Lock() 273 defer sb.mu.Unlock() 274 return sb.b.Write(p) 275} 276 277func (sb *synchronizedBuffer) Len() int { 278 sb.mu.Lock() 279 defer sb.mu.Unlock() 280 return sb.b.Len() 281} 282 283func (sb *synchronizedBuffer) String() string { 284 sb.mu.Lock() 285 defer sb.mu.Unlock() 286 return sb.b.String() 287} 288 289// pauseFileName if present in IndexDir will stop index jobs from 290// running. This is to make it possible to experiment with the content of the 291// IndexDir without the indexserver writing to it. 292const pauseFileName = "PAUSE" 293 294// Run the sync loop. This blocks forever. 295func (s *Server) Run() { 296 removeIncompleteShards(s.IndexDir) 297 298 // Start a goroutine which updates the queue with commits to index. 299 go func() { 300 // We update the list of indexed repos every Interval. To speed up manual 301 // testing we also listen for SIGUSR1 to trigger updates. 
302 // 303 // "pkill -SIGUSR1 zoekt-sourcegra" 304 for range jitterTicker(s.Interval, unix.SIGUSR1) { 305 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 306 log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 307 continue 308 } 309 310 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 311 if err != nil { 312 log.Printf("error listing repos: %s", err) 313 continue 314 } 315 316 debug.Printf("updating index queue with %d repositories", len(repos.IDs)) 317 318 // Stop indexing repos we don't need to track anymore 319 removed := s.queue.MaybeRemoveMissing(repos.IDs) 320 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 321 if len(removed) > 0 { 322 log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 323 } 324 325 cleanupDone := make(chan struct{}) 326 go func() { 327 defer close(cleanupDone) 328 s.muIndexDir.Global(func() { 329 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 330 }) 331 }() 332 333 repos.IterateIndexOptions(s.queue.AddOrUpdate) 334 335 // IterateIndexOptions will only iterate over repositories that have 336 // changed since we last called list. However, we want to add all IDs 337 // back onto the queue just to check that what is on disk is still 338 // correct. This will use the last IndexOptions we stored in the 339 // queue. The repositories not on the queue (missing) need a forced 340 // fetch of IndexOptions. 341 missing := s.queue.Bump(repos.IDs) 342 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 
343 344 setCompoundShardCounter(s.IndexDir) 345 346 <-cleanupDone 347 } 348 }() 349 350 go func() { 351 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 352 if s.shardMerging { 353 s.vacuum() 354 } 355 } 356 }() 357 358 go func() { 359 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 360 if s.shardMerging { 361 s.doMerge() 362 } 363 } 364 }() 365 366 for i := 0; i < s.IndexConcurrency; i++ { 367 go s.processQueue() 368 } 369 370 // block forever 371 select {} 372} 373 374// formatList returns a comma-separated list of the first min(len(v), m) items. 375func formatListUint32(v []uint32, m int) string { 376 if len(v) < m { 377 m = len(v) 378 } 379 380 sb := strings.Builder{} 381 for i := 0; i < m; i++ { 382 fmt.Fprintf(&sb, "%d, ", v[i]) 383 } 384 385 if len(v) > m { 386 sb.WriteString("...") 387 } 388 389 return strings.TrimRight(sb.String(), ", ") 390} 391 392func (s *Server) processQueue() { 393 for { 394 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 395 time.Sleep(time.Second) 396 continue 397 } 398 399 opts, ok := s.queue.Pop() 400 if !ok { 401 time.Sleep(time.Second) 402 continue 403 } 404 405 args := s.indexArgs(opts) 406 407 ran := s.muIndexDir.With(opts.Name, func() { 408 // only record time taken once we hold the lock. This avoids us 409 // recording time taken while merging/cleanup runs. 
410 start := time.Now() 411 412 state, err := s.Index(args) 413 414 elapsed := time.Since(start) 415 416 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 417 418 if err != nil { 419 log.Printf("error indexing %s: %s", args.String(), err) 420 } 421 422 switch state { 423 case indexStateSuccess: 424 var branches []string 425 for _, b := range args.Branches { 426 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 427 } 428 s.logger.Info("updated index", 429 sglog.String("repo", args.Name), 430 sglog.Uint32("id", args.RepoID), 431 sglog.Strings("branches", branches), 432 sglog.Duration("duration", elapsed), 433 ) 434 case indexStateSuccessMeta: 435 log.Printf("updated meta %s in %v", args.String(), elapsed) 436 } 437 s.queue.SetIndexed(opts, state) 438 }) 439 440 if !ran { 441 // Someone else is processing the repository. We can just skip this job 442 // since the repository will be added back to the queue and we will 443 // converge to the correct behaviour. 444 debug.Printf("index job for repository already running: %s", args) 445 continue 446 } 447 } 448} 449 450// repoNameForMetric returns a normalized version of the given repository name that is 451// suitable for use with Prometheus metrics. 452func repoNameForMetric(repo string) string { 453 // Check to see if we want to be able to capture separate indexing metrics for this repository. 454 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 455 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 456 return repo 457 } 458 459 return "" 460} 461 462func batched(slice []uint32, size int) <-chan []uint32 { 463 c := make(chan []uint32) 464 go func() { 465 for len(slice) > 0 { 466 if size > len(slice) { 467 size = len(slice) 468 } 469 c <- slice[:size] 470 slice = slice[size:] 471 } 472 close(c) 473 }() 474 return c 475} 476 477// jitterTicker returns a ticker which ticks with a jitter. 
Each tick is 478// uniformly selected from the range (d/2, d + d/2). It will tick on creation. 479// 480// sig is a list of signals which also cause the ticker to fire. This is a 481// convenience to allow manually triggering of the ticker. 482func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} { 483 ticker := make(chan struct{}) 484 485 go func() { 486 for { 487 ticker <- struct{}{} 488 ns := int64(d) 489 jitter := rand.Int63n(ns) 490 time.Sleep(time.Duration(ns/2 + jitter)) 491 } 492 }() 493 494 go func() { 495 if len(sig) == 0 { 496 return 497 } 498 499 c := make(chan os.Signal, 1) 500 signal.Notify(c, sig...) 501 for range c { 502 ticker <- struct{}{} 503 } 504 }() 505 506 return ticker 507} 508 509// Index starts an index job for repo name at commit. 510func (s *Server) Index(args *indexArgs) (state indexState, err error) { 511 tr := trace.New("index", args.Name) 512 513 defer func() { 514 if err != nil { 515 tr.SetError() 516 tr.LazyPrintf("error: %v", err) 517 state = indexStateFail 518 metricFailingTotal.Inc() 519 } 520 tr.LazyPrintf("state: %s", state) 521 tr.Finish() 522 }() 523 524 tr.LazyPrintf("branches: %v", args.Branches) 525 526 if len(args.Branches) == 0 { 527 return indexStateEmpty, createEmptyShard(args) 528 } 529 530 repositoryName := args.Name 531 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok { 532 tr.LazyPrintf("marking this repository for delta build") 533 args.UseDelta = true 534 } 535 536 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold 537 538 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok { 539 tr.LazyPrintf("skipping symbols calculation") 540 args.Symbols = false 541 } 542 543 reason := "forced" 544 545 if args.Incremental { 546 bo := args.BuildOptions() 547 bo.SetDefaults() 548 incrementalState, fn := bo.IndexState() 549 reason = string(incrementalState) 550 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc() 551 
552 switch incrementalState { 553 case build.IndexStateEqual: 554 debug.Printf("%s index already up to date. Shard=%s", args.String(), fn) 555 return indexStateNoop, nil 556 557 case build.IndexStateMeta: 558 log.Printf("updating index.meta %s", args.String()) 559 560 if err := mergeMeta(bo); err != nil { 561 log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err) 562 } else { 563 return indexStateSuccessMeta, nil 564 } 565 566 case build.IndexStateCorrupt: 567 log.Printf("falling back to full update: corrupt index: %s", args.String()) 568 } 569 } 570 571 log.Printf("updating index %s reason=%s", args.String(), reason) 572 573 metricIndexingTotal.Inc() 574 c := gitIndexConfig{ 575 runCmd: func(cmd *exec.Cmd) error { 576 return s.loggedRun(tr, cmd) 577 }, 578 579 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, ok bool, err error) { 580 return args.BuildOptions().FindRepositoryMetadata() 581 }, 582 } 583 584 err = gitIndex(c, args, s.Sourcegraph, s.logger) 585 if err != nil { 586 return indexStateFail, err 587 } 588 589 status := []indexStatus{{RepoID: args.RepoID, Branches: args.Branches}} 590 if err := s.Sourcegraph.UpdateIndexStatus(status); err != nil { 591 branches := make([]string, len(args.Branches)) 592 for i, b := range args.Branches { 593 branches[i] = fmt.Sprintf("%s=%s", b.Name, b.Version) 594 } 595 596 s.logger.Error("failed to update index status", 597 sglog.String("repo", args.Name), 598 sglog.Uint32("id", args.RepoID), 599 sglog.Strings("branches", branches), 600 sglog.Error(err), 601 ) 602 } 603 604 return indexStateSuccess, nil 605} 606 607func (s *Server) indexArgs(opts IndexOptions) *indexArgs { 608 return &indexArgs{ 609 IndexOptions: opts, 610 611 IndexDir: s.IndexDir, 612 Parallelism: s.CPUCount, 613 614 Incremental: true, 615 616 // 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22 617 FileLimit: 1 
<< 20, 618 } 619} 620 621func createEmptyShard(args *indexArgs) error { 622 bo := args.BuildOptions() 623 bo.SetDefaults() 624 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 625 626 if args.Incremental && bo.IncrementalSkipIndexing() { 627 return nil 628 } 629 630 builder, err := build.NewBuilder(*bo) 631 if err != nil { 632 return err 633 } 634 return builder.Finish() 635} 636 637// addDebugHandlers adds handlers specific to indexserver. 638func (s *Server) addDebugHandlers(mux *http.ServeMux) { 639 // Sourcegraph's site admin view requires indexserver to serve it's admin view 640 // on "/". 641 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 642 643 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 644 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 645 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 646 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 647 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 648 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 649} 650 651func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 652 if r.Method != "GET" { 653 w.Header().Set("Allow", "GET") 654 w.WriteHeader(http.StatusMethodNotAllowed) 655 return 656 } 657 658 response := struct { 659 Hostname string 660 }{ 661 Hostname: s.hostname, 662 } 663 664 b, err := json.Marshal(response) 665 if err != nil { 666 http.Error(w, err.Error(), http.StatusInternalServerError) 667 return 668 } 669 670 w.Header().Set("Content-Type", "application/json; charset=utf-8") 671 w.Write(b) 672} 673 674var rootTmpl = template.Must(template.New("name").Parse(` 675<html> 676 <body> 677 <a href="debug">Debug</a><br /> 678 <a href="debug/requests">Traces</a><br /> 679 {{.IndexMsg}}<br /> 680 <br /> 681 <h3>Reindex</h3> 682 {{if .Repos}} 683 <a href="?show_repos=false">hide repos</a><br /> 684 <table 
style="margin-top: 20px"> 685 <th style="text-align:left">Name</th> 686 <th style="text-align:left">ID</th> 687 {{range .Repos}} 688 <tr> 689 <td>{{.Name}}</td> 690 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 691 </tr> 692 {{end}} 693 </table> 694 {{else}} 695 <a href="?show_repos=true">show repos</a><br /> 696 {{end}} 697 </body> 698</html> 699`)) 700 701func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 702 if r.Method != "GET" { 703 w.Header().Set("Allow", "GET") 704 w.WriteHeader(http.StatusMethodNotAllowed) 705 return 706 } 707 708 values := r.URL.Query() 709 710 // ?id= 711 indexMsg := "" 712 if v := values.Get("id"); v != "" { 713 id, err := strconv.Atoi(v) 714 if err != nil { 715 http.Error(w, err.Error(), http.StatusBadRequest) 716 return 717 } 718 indexMsg, _ = s.forceIndex(uint32(id)) 719 } 720 721 // ?show_repos= 722 showRepos := false 723 if v := values.Get("show_repos"); v != "" { 724 showRepos, _ = strconv.ParseBool(v) 725 } 726 727 type Repo struct { 728 ID uint32 729 Name string 730 } 731 var data struct { 732 Repos []Repo 733 IndexMsg string 734 } 735 736 data.IndexMsg = indexMsg 737 738 if showRepos { 739 s.queue.Iterate(func(opts *IndexOptions) { 740 data.Repos = append(data.Repos, Repo{ 741 ID: opts.RepoID, 742 Name: opts.Name, 743 }) 744 }) 745 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 746 } 747 748 _ = rootTmpl.Execute(w, data) 749} 750 751// handleReindex triggers a reindex asynocronously. If a reindex was triggered 752// the request returns with status 202. The caller can infer the new state of 753// the index by calling List. 
754func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 755 if r.Method != http.MethodPost { 756 w.Header().Set("Allow", http.MethodPost) 757 w.WriteHeader(http.StatusMethodNotAllowed) 758 return 759 } 760 761 err := r.ParseForm() 762 if err != nil { 763 http.Error(w, err.Error(), http.StatusBadRequest) 764 return 765 } 766 767 id, err := strconv.Atoi(r.Form.Get("repo")) 768 if err != nil { 769 http.Error(w, err.Error(), http.StatusBadRequest) 770 return 771 } 772 773 go func() { s.forceIndex(uint32(id)) }() 774 775 // 202 Accepted 776 w.WriteHeader(http.StatusAccepted) 777} 778 779func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 780 withIndexed := true 781 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 782 withIndexed = b 783 } 784 785 var indexed []uint32 786 if withIndexed { 787 indexed = listIndexed(s.IndexDir) 788 } 789 790 repos, err := s.Sourcegraph.List(r.Context(), indexed) 791 if err != nil { 792 http.Error(w, err.Error(), http.StatusInternalServerError) 793 return 794 } 795 796 bw := bytes.Buffer{} 797 798 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 799 800 _, err = fmt.Fprintf(tw, "ID\tName\n") 801 if err != nil { 802 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 803 return 804 } 805 806 s.queue.mu.Lock() 807 name := "" 808 for _, id := range repos.IDs { 809 if item := s.queue.get(id); item != nil { 810 name = item.opts.Name 811 } else { 812 name = "" 813 } 814 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 815 if err != nil { 816 debug.Printf("handleDebugList: %s\n", err.Error()) 817 } 818 } 819 s.queue.mu.Unlock() 820 821 if err != nil { 822 http.Error(w, err.Error(), http.StatusInternalServerError) 823 return 824 } 825 826 err = tw.Flush() 827 if err != nil { 828 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 829 return 830 } 831 832 w.Header().Set("Content-Length", 
strconv.Itoa(bw.Len())) 833 834 if _, err := io.Copy(w, &bw); err != nil { 835 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 836 return 837 } 838} 839 840// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 841// can run this command during periods of low usage (evenings, weekends) to 842// trigger an initial merge run. In the steady-state, merges happen rarely, even 843// on busy instances, and users can rely on automatic merging instead. 844func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 845 846 // A merge operation can take very long, depending on the number merges and the 847 // target size of the compound shards. We run the merge in the background and 848 // return immediately to the user. 849 // 850 // We track the status of the merge with metricShardMergingRunning. 851 go func() { 852 s.doMerge() 853 }() 854 _, _ = w.Write([]byte("merging enqueued\n")) 855} 856 857func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 858 indexed := listIndexed(s.IndexDir) 859 860 bw := bytes.Buffer{} 861 862 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 863 864 _, err := fmt.Fprintf(tw, "ID\tName\n") 865 if err != nil { 866 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 867 return 868 } 869 870 s.queue.mu.Lock() 871 name := "" 872 for _, id := range indexed { 873 if item := s.queue.get(id); item != nil { 874 name = item.opts.Name 875 } else { 876 name = "" 877 } 878 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 879 if err != nil { 880 debug.Printf("handleDebugIndexed: %s\n", err.Error()) 881 } 882 } 883 s.queue.mu.Unlock() 884 885 if err != nil { 886 http.Error(w, err.Error(), http.StatusInternalServerError) 887 return 888 } 889 890 err = tw.Flush() 891 if err != nil { 892 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 893 return 894 } 895 896 
w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 897 898 if _, err := io.Copy(w, &bw); err != nil { 899 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 900 return 901 } 902} 903 904// forceIndex will run the index job for repo name now. It will return always 905// return a string explaining what it did, even if it failed. 906func (s *Server) forceIndex(id uint32) (string, error) { 907 var opts IndexOptions 908 var err error 909 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 910 opts = o 911 }, func(_ uint32, e error) { 912 err = e 913 }, id) 914 if err != nil { 915 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 916 } 917 918 args := s.indexArgs(opts) 919 args.Incremental = false // force re-index 920 921 var state indexState 922 ran := s.muIndexDir.With(opts.Name, func() { 923 state, err = s.Index(args) 924 }) 925 if !ran { 926 return fmt.Sprintf("index job for repository already running: %s", args), nil 927 } 928 if err != nil { 929 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 930 } 931 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 932} 933 934func listIndexed(indexDir string) []uint32 { 935 index := getShards(indexDir) 936 metricNumIndexed.Set(float64(len(index))) 937 repoIDs := make([]uint32, 0, len(index)) 938 for id := range index { 939 repoIDs = append(repoIDs, id) 940 } 941 sort.Slice(repoIDs, func(i, j int) bool { 942 return repoIDs[i] < repoIDs[j] 943 }) 944 return repoIDs 945} 946 947func hostnameBestEffort() string { 948 if h := os.Getenv("NODE_NAME"); h != "" { 949 return h 950 } 951 if h := os.Getenv("HOSTNAME"); h != "" { 952 return h 953 } 954 hostname, _ := os.Hostname() 955 return hostname 956} 957 958// setupTmpDir sets up a temporary directory on the same volume as the 959// indexes. 960// 961// If main is true we will delete older temp directories left around. 
main is 962// false when this is a debug command. 963func setupTmpDir(main bool, index string) error { 964 // change the target tmp directory depending on if its our main daemon or a 965 // debug sub command. 966 dir := ".indexserver.debug.tmp" 967 if main { 968 dir = ".indexserver.tmp" 969 } 970 971 tmpRoot := filepath.Join(index, dir) 972 if err := os.MkdirAll(tmpRoot, 0755); err != nil { 973 return err 974 } 975 if !tmpfriend.IsTmpFriendDir(tmpRoot) { 976 _, err := tmpfriend.RootTempDir(tmpRoot) 977 return err 978 } 979 return nil 980} 981 982func printMetaData(fn string) error { 983 repo, indexMeta, err := zoekt.ReadMetadataPath(fn) 984 if err != nil { 985 return err 986 } 987 988 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 989 if err != nil { 990 return err 991 } 992 993 err = json.NewEncoder(os.Stdout).Encode(repo) 994 if err != nil { 995 return err 996 } 997 return nil 998} 999 1000func printShardStats(fn string) error { 1001 f, err := os.Open(fn) 1002 if err != nil { 1003 return err 1004 } 1005 1006 iFile, err := zoekt.NewIndexFile(f) 1007 if err != nil { 1008 return err 1009 } 1010 1011 return zoekt.PrintNgramStats(iFile) 1012} 1013 1014func srcLogLevelIsDebug() bool { 1015 lvl := os.Getenv(sglog.EnvLogLevel) 1016 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1017} 1018 1019func getEnvWithDefaultInt64(k string, defaultVal int64) int64 { 1020 v := os.Getenv(k) 1021 if v == "" { 1022 return defaultVal 1023 } 1024 i, err := strconv.ParseInt(v, 10, 64) 1025 if err != nil { 1026 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1027 } 1028 return i 1029} 1030 1031func getEnvWithDefaultInt(k string, defaultVal int) int { 1032 v := os.Getenv(k) 1033 if v == "" { 1034 return defaultVal 1035 } 1036 i, err := strconv.Atoi(k) 1037 if err != nil { 1038 log.Fatalf("error parsing ENV %s to int: %s", k, err) 1039 } 1040 return i 1041} 1042 1043func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 { 1044 v := os.Getenv(k) 
1045 if v == "" { 1046 return defaultVal 1047 } 1048 i, err := strconv.ParseUint(v, 10, 64) 1049 if err != nil { 1050 log.Fatalf("error parsing ENV %s to uint64: %s", k, err) 1051 } 1052 return i 1053} 1054 1055func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1056 v := os.Getenv(k) 1057 if v == "" { 1058 return defaultVal 1059 } 1060 f, err := strconv.ParseFloat(v, 64) 1061 if err != nil { 1062 log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1063 } 1064 return f 1065} 1066 1067func getEnvWithDefaultString(k string, defaultVal string) string { 1068 v := os.Getenv(k) 1069 if v == "" { 1070 return defaultVal 1071 } 1072 return v 1073} 1074 1075func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration { 1076 v := os.Getenv(k) 1077 if v == "" { 1078 return defaultVal 1079 } 1080 1081 d, err := time.ParseDuration(v) 1082 if err != nil { 1083 log.Fatalf("error parsing ENV %s to duration: %s", k, err) 1084 } 1085 return d 1086} 1087 1088func getEnvWithDefaultBool(k string, defaultVal bool) bool { 1089 v := os.Getenv(k) 1090 if v == "" { 1091 return defaultVal 1092 } 1093 1094 b, err := strconv.ParseBool(v) 1095 if err != nil { 1096 log.Fatalf("error parsing ENV %s to bool: %s", k, err) 1097 } 1098 return b 1099} 1100 1101func getEnvWithDefaultEmptySet(k string) map[string]struct{} { 1102 set := map[string]struct{}{} 1103 for _, v := range strings.Split(os.Getenv(k), ",") { 1104 v = strings.TrimSpace(v) 1105 if v != "" { 1106 set[v] = struct{}{} 1107 } 1108 } 1109 return set 1110} 1111 1112func joinStringSet(set map[string]struct{}, sep string) string { 1113 var xs []string 1114 for x := range set { 1115 xs = append(xs, x) 1116 } 1117 1118 return strings.Join(xs, sep) 1119} 1120 1121func setCompoundShardCounter(indexDir string) { 1122 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt")) 1123 if err != nil { 1124 log.Printf("setCompoundShardCounter: %s\n", err) 1125 return 1126 } 1127 
metricNumberCompoundShards.Set(float64(len(fns))) 1128} 1129 1130func rootCmd() *ffcli.Command { 1131 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1132 conf := rootConfig{ 1133 Main: true, 1134 } 1135 conf.registerRootFlags(rootFs) 1136 1137 return &ffcli.Command{ 1138 FlagSet: rootFs, 1139 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1140 Subcommands: []*ffcli.Command{debugCmd()}, 1141 Exec: func(ctx context.Context, args []string) error { 1142 return startServer(conf) 1143 }, 1144 } 1145} 1146 1147type rootConfig struct { 1148 // Main is true if this rootConfig is for our main long running command (the 1149 // indexserver). Debug commands should not set this value. This is used to 1150 // determine if we need to run tmpfriend. 1151 Main bool 1152 1153 root string 1154 interval time.Duration 1155 index string 1156 indexConcurrency int64 1157 listen string 1158 hostname string 1159 cpuFraction float64 1160 blockProfileRate int 1161 1162 // config values related to shard merging 1163 vacuumInterval time.Duration 1164 mergeInterval time.Duration 1165 targetSize int64 1166 minSize int64 1167 minAgeDays int 1168 maxPriority float64 1169 1170 // config values related to backoff indexing repos with one or more consecutive failures 1171 backoffDuration time.Duration 1172 maxBackoffDuration time.Duration 1173 1174 // useGRPC is true if we should use the gRPC API to talk to Sourcegraph. 1175 useGRPC bool 1176} 1177 1178func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1179 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. 
If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1180 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1181 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of concurrent index jobs to run.") 1182 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use") 1183 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") 1184 fs.StringVar(&rc.hostname, "hostname", hostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") 1185 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1186 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate") 1187 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.") 1188 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. 
A negative value disables indexing backoff.") 1189 fs.BoolVar(&rc.useGRPC, "use_grpc", getEnvWithDefaultBool("GRPC_ENABLED", false), "use the gRPC API to talk to Sourcegraph") 1190 1191 // flags related to shard merging 1192 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1193 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1194 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB") 1195 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB") 1196 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.") 1197 fs.Float64Var(&rc.maxPriority, "merge_max_priority", getEnvWithDefaultFloat64("SRC_MERGE_MAX_PRIORITY", 100), "the maximum priority a shard can have to be considered for merging.") 1198} 1199 1200func startServer(conf rootConfig) error { 1201 s, err := newServer(conf) 1202 if err != nil { 1203 return err 1204 } 1205 1206 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate) 1207 setCompoundShardCounter(s.IndexDir) 1208 1209 if conf.listen != "" { 1210 1211 mux := http.NewServeMux() 1212 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1213 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1214 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1215 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during 
re-balancing"}, 1216 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1217 }...) 1218 s.addDebugHandlers(mux) 1219 1220 go func() { 1221 debug.Printf("serving HTTP on %s", conf.listen) 1222 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1223 1224 }() 1225 1226 // Serve mux on a unix domain socket on a best-effort-basis so that 1227 // webserver can call the endpoints via the shared filesystem. 1228 // 1229 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1230 // on the socket due to permission errors. See 1231 // https://github.com/docker/for-mac/issues/6239 1232 go func() { 1233 serveHTTPOverSocket := func() error { 1234 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1235 // We cannot bind a socket to an existing pathname. 1236 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1237 return fmt.Errorf("error removing socket file: %s", socket) 1238 } 1239 // The "unix" network corresponds to stream sockets. (cf. unixgram, 1240 // unixpacket). 1241 l, err := net.Listen("unix", socket) 1242 if err != nil { 1243 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1244 } 1245 // Indexserver (root) and webserver (Sourcegraph) run with 1246 // different users. Per default, the socket is created with 1247 // permission 755 (root root), which doesn't let webserver write to 1248 // it. 1249 // 1250 // See https://github.com/golang/go/issues/11822 for more context. 
1251 if err := os.Chmod(socket, 0777); err != nil { 1252 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1253 } 1254 debug.Printf("serving HTTP on %s", socket) 1255 return http.Serve(l, mux) 1256 } 1257 debug.Print(serveHTTPOverSocket()) 1258 }() 1259 } 1260 1261 oc := &ownerChecker{ 1262 Path: filepath.Join(conf.index, "owner.txt"), 1263 Hostname: conf.hostname, 1264 } 1265 go oc.Run() 1266 1267 logger := sglog.Scoped("metricsRegistration", "") 1268 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1269 1270 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1271 prometheus.DefaultRegisterer.MustRegister(c) 1272 1273 s.Run() 1274 return nil 1275} 1276 1277func newServer(conf rootConfig) (*Server, error) { 1278 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1279 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1280 } 1281 if conf.index == "" { 1282 return nil, fmt.Errorf("must set -index") 1283 } 1284 if conf.root == "" { 1285 return nil, fmt.Errorf("must set -sourcegraph_url") 1286 } 1287 rootURL, err := url.Parse(conf.root) 1288 if err != nil { 1289 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1290 } 1291 1292 rootURL = addDefaultPort(rootURL) 1293 1294 // Tune GOMAXPROCS to match Linux container CPU quota. 1295 _, _ = maxprocs.Set() 1296 1297 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler. 1298 // The block profiler is disabled by default and should be enabled with care in production 1299 runtime.SetBlockProfileRate(conf.blockProfileRate) 1300 1301 // Automatically prepend our own path at the front, to minimize 1302 // required configuration. 
1303 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1304 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1305 } 1306 1307 if _, err := os.Stat(conf.index); err != nil { 1308 if err := os.MkdirAll(conf.index, 0755); err != nil { 1309 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1310 } 1311 } 1312 1313 if err := setupTmpDir(conf.Main, conf.index); err != nil { 1314 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1315 } 1316 1317 if srcLogLevelIsDebug() { 1318 debug = log.New(os.Stderr, "", log.LstdFlags) 1319 } 1320 1321 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1322 if len(reposWithSeparateIndexingMetrics) > 0 { 1323 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1324 } 1325 1326 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1327 if len(deltaBuildRepositoriesAllowList) > 0 { 1328 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1329 } 1330 1331 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1332 if deltaShardNumberFallbackThreshold > 0 { 1333 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1334 } else { 1335 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1336 } 1337 1338 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1339 if len(reposShouldSkipSymbolsCalculation) > 0 { 1340 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1341 } 1342 1343 var sg Sourcegraph 1344 if rootURL.IsAbs() { 1345 var batchSize int 1346 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { 
1347 batchSize, err = strconv.Atoi(v) 1348 if err != nil { 1349 return nil, fmt.Errorf("Invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1350 } 1351 } 1352 1353 opts := []SourcegraphClientOption{ 1354 WithBatchSize(batchSize), 1355 WithShouldUseGRPC(conf.useGRPC), 1356 } 1357 1358 gRPCConnectionOptions := []grpc.DialOption{ 1359 grpc.WithTransportCredentials(insecure.NewCredentials()), 1360 grpc.WithChainStreamInterceptor(internalActorStreamInterceptor()), 1361 grpc.WithChainUnaryInterceptor(internalActorUnaryInterceptor()), 1362 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON), 1363 } 1364 1365 // This dialer is used to connect via gRPC to the Sourcegraph instance. 1366 // This is done lazily, so we can provide the client to use regardless of 1367 // whether we enabled gRPC or not initially. 1368 cc, err := grpc.Dial(rootURL.Host, gRPCConnectionOptions...) 1369 if err != nil { 1370 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1371 } 1372 1373 client := proto.NewZoektConfigurationServiceClient(cc) 1374 opts = append(opts, WithGRPCClient(client)) 1375 1376 sg = newSourcegraphClient(rootURL, conf.hostname, opts...) 
1377 1378 } else { 1379 sg = sourcegraphFake{ 1380 RootDir: rootURL.String(), 1381 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1382 } 1383 } 1384 1385 if conf.indexConcurrency < 1 { 1386 conf.indexConcurrency = 1 1387 } 1388 1389 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction))) 1390 if cpuCount < 1 { 1391 cpuCount = 1 1392 } 1393 1394 logger := sglog.Scoped("server", "periodically reindexes enabled repositories on sourcegraph") 1395 1396 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1397 1398 return &Server{ 1399 logger: logger, 1400 Sourcegraph: sg, 1401 IndexDir: conf.index, 1402 IndexConcurrency: int(conf.indexConcurrency), 1403 Interval: conf.interval, 1404 CPUCount: cpuCount, 1405 queue: *q, 1406 shardMerging: zoekt.ShardMergingEnabled(), 1407 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1408 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1409 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1410 hostname: conf.hostname, 1411 mergeOpts: mergeOpts{ 1412 vacuumInterval: conf.vacuumInterval, 1413 mergeInterval: conf.mergeInterval, 1414 targetSizeBytes: conf.targetSize * 1024 * 1024, 1415 minSizeBytes: conf.minSize * 1024 * 1024, 1416 minAgeDays: conf.minAgeDays, 1417 maxPriority: conf.maxPriority, 1418 }, 1419 }, err 1420} 1421 1422// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration 1423// for the indexed-search-configuration gRPC service. 1424// 1425// The default backoff strategy is modeled after the default settings used by 1426// retryablehttp.DefaultClient. 
1427// 1428// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1429// - Unavailable 1430// - Aborted 1431// 1432//go:embed default_grpc_service_configuration.json 1433var defaultGRPCServiceConfigurationJSON string 1434 1435func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1436 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1437 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1438 return invoker(ctx, method, req, reply, cc, opts...) 1439 } 1440} 1441 1442func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1443 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1444 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1445 return streamer(ctx, desc, cc, method, opts...) 1446 } 1447} 1448 1449// addDefaultPort adds a default port to a URL if one is not specified. 1450// 1451// If the URL scheme is "http" and no port is specified, "80" is used. 1452// If the scheme is "https", "443" is used. 1453// 1454// The original URL is not mutated. A copy is modified and returned. 
1455func addDefaultPort(original *url.URL) *url.URL { 1456 if original == nil { 1457 return nil // don't panic 1458 } 1459 1460 if !original.IsAbs() { 1461 return original // don't do anything if the URL doesn't look like a remote URL 1462 } 1463 1464 if original.Scheme == "http" && original.Port() == "" { 1465 u := cloneURL(original) 1466 u.Host = net.JoinHostPort(u.Host, "80") 1467 return u 1468 } 1469 1470 if original.Scheme == "https" && original.Port() == "" { 1471 u := cloneURL(original) 1472 u.Host = net.JoinHostPort(u.Host, "443") 1473 return u 1474 } 1475 1476 return original 1477} 1478 1479// cloneURL returns a copy of the URL. It is safe to mutate the returned URL. 1480// This is copied from net/http/clone.go 1481func cloneURL(u *url.URL) *url.URL { 1482 if u == nil { 1483 return nil 1484 } 1485 u2 := new(url.URL) 1486 *u2 = *u 1487 if u.User != nil { 1488 u2.User = new(url.Userinfo) 1489 *u2.User = *u.User 1490 } 1491 return u2 1492} 1493 1494func main() { 1495 liblog := sglog.Init(sglog.Resource{ 1496 Name: "zoekt-indexserver", 1497 Version: zoekt.Version, 1498 InstanceID: hostnameBestEffort(), 1499 }) 1500 defer liblog.Sync() 1501 1502 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1503 log.Fatal(err) 1504 } 1505}