fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
// repositories on sourcegraph
package main

import (
	"bytes"
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"html/template"
	"io"
	"io/fs"
	"log"
	"math"
	"math/rand"
	"net"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"text/tabwriter"
	"time"

	"github.com/keegancsmith/tmpfriend"
	"github.com/peterbourgon/ff/v3/ffcli"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	sglog "github.com/sourcegraph/log"
	"go.uber.org/automaxprocs/maxprocs"
	"golang.org/x/net/trace"
	"golang.org/x/sys/unix"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/metadata"

	"github.com/sourcegraph/mountinfo"

	"github.com/sourcegraph/zoekt"
	"github.com/sourcegraph/zoekt/build"
	proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
	"github.com/sourcegraph/zoekt/debugserver"
	"github.com/sourcegraph/zoekt/internal/profiler"
)

// Prometheus metrics exported by the indexserver. They are created (and
// registered) via promauto when the package is initialized.
var (
	// metricResolveRevisionsDuration times resolving revisions for all
	// repositories in one go.
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s, 10s, ..., 100000s (~28h)
	})

	// metricResolveRevisionDuration times resolving a single repository
	// revision, labelled by whether it succeeded.
	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	// metricIndexDuration is observed by processQueue after each index job,
	// labelled with the resulting indexState and repoNameForMetric(name).
	// indexTimeout is declared elsewhere in this package.
	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + indexTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	// metricIndexIncrementalIndexState is incremented by Index with the
	// build.IndexState computed for incremental builds.
	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is build.IndexState

	// metricNumIndexed is set by listIndexed to the number of repositories
	// with shards on disk.
	// NOTE(review): Help says "by code host" but this gauge has no labels.
	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	// NOTE(review): as above, Help mentions "by code host" but there is no label.
	metricNumAssigned = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_assigned",
		Help: "Number of repos assigned to this indexer by code host",
	})

	// metricFailingTotal is incremented by Index whenever it returns an error.
	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	// metricIndexingTotal is incremented by Index each time it starts a full
	// (non-noop, non-meta-only) index build.
	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	// metricNumStoppedTrackingTotal counts repositories Run removed from the
	// queue because Sourcegraph no longer lists them.
	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})
)

// set of repositories that we want to capture separate indexing metrics for
// (consulted by repoNameForMetric; presumably populated at startup — confirm).
var reposWithSeparateIndexingMetrics = make(map[string]struct{})

// indexState is the outcome of a single Index call; it is also used as the
// "state" label of metricIndexDuration.
type indexState string

const (
	indexStateFail        indexState = "fail"
	indexStateSuccess     indexState = "success"
	indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
	indexStateNoop        indexState = "noop"         // We didn't need to update index
	indexStateEmpty       indexState = "empty"        // index is empty (empty repo)
)

// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	// logger is used for structured logging of index activity.
	logger sglog.Logger

	// Sourcegraph is the client used to list repositories, fetch index
	// options and report index status.
	Sourcegraph Sourcegraph
	// NOTE(review): BatchSize is not read in this part of the file;
	// presumably the batch size for Sourcegraph requests — confirm.
	BatchSize int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration
	// CPUCount is the amount of parallelism to use when indexing a
	// repository.
	CPUCount int

	// queue holds pending index jobs: Run's sync loop adds to it and the
	// processQueue workers pop from it.
	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is reported by the /debug/host handler.
	hostname string

	// mergeOpts configures shard merging; its vacuumInterval and
	// mergeInterval drive the background loops started by Run.
	mergeOpts mergeOpts
}

// debug is a logger whose output is discarded (io.Discard). Presumably it is
// redirected elsewhere when debug logging is enabled — confirm.
var debug = log.New(io.Discard, "", log.LstdFlags)

// noOutputTimeout is how long a command run via loggedRun may go without
// producing output before it is sent SIGQUIT (and SIGKILL 10s later).
//
// our index commands should output something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
194const noOutputTimeout = 30 * time.Minute 195 196func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 197 out := &synchronizedBuffer{} 198 cmd.Stdout = out 199 cmd.Stderr = out 200 201 tr.LazyPrintf("%s", cmd.Args) 202 203 defer func() { 204 if err != nil { 205 outS := out.String() 206 tr.LazyPrintf("failed: %v", err) 207 tr.LazyPrintf("output: %s", out) 208 tr.SetError() 209 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 210 } 211 }() 212 213 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 214 215 if err := cmd.Start(); err != nil { 216 return err 217 } 218 219 errC := make(chan error) 220 go func() { 221 errC <- cmd.Wait() 222 }() 223 224 // This channel is set after we have sent sigquit. It allows us to follow up 225 // with a sigkill if the process doesn't quit after sigquit. 226 kill := make(<-chan time.Time) 227 228 lastLen := 0 229 for { 230 select { 231 case <-time.After(noOutputTimeout): 232 // Periodically check if we have had output. If not kill the process. 233 if out.Len() != lastLen { 234 lastLen = out.Len() 235 log.Printf("still running %s", cmd.Args) 236 } else { 237 // Send quit (C-\) first so we get a stack dump. 238 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 239 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 240 log.Println("quit failed:", err) 241 } 242 243 // send sigkill if still running in 10s 244 kill = time.After(10 * time.Second) 245 } 246 247 case <-kill: 248 log.Printf("still running, killing %s", cmd.Args) 249 if err := cmd.Process.Kill(); err != nil { 250 log.Println("kill failed:", err) 251 } 252 253 case err := <-errC: 254 if err != nil { 255 return err 256 } 257 258 tr.LazyPrintf("success") 259 return nil 260 } 261 } 262} 263 264// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 265// monitor the buffer while it is being written to. 
266type synchronizedBuffer struct { 267 mu sync.Mutex 268 b bytes.Buffer 269} 270 271func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 272 sb.mu.Lock() 273 defer sb.mu.Unlock() 274 return sb.b.Write(p) 275} 276 277func (sb *synchronizedBuffer) Len() int { 278 sb.mu.Lock() 279 defer sb.mu.Unlock() 280 return sb.b.Len() 281} 282 283func (sb *synchronizedBuffer) String() string { 284 sb.mu.Lock() 285 defer sb.mu.Unlock() 286 return sb.b.String() 287} 288 289// pauseFileName if present in IndexDir will stop index jobs from 290// running. This is to make it possible to experiment with the content of the 291// IndexDir without the indexserver writing to it. 292const pauseFileName = "PAUSE" 293 294// Run the sync loop. This blocks forever. 295func (s *Server) Run() { 296 removeIncompleteShards(s.IndexDir) 297 298 // Start a goroutine which updates the queue with commits to index. 299 go func() { 300 // We update the list of indexed repos every Interval. To speed up manual 301 // testing we also listen for SIGUSR1 to trigger updates. 
302 // 303 // "pkill -SIGUSR1 zoekt-sourcegra" 304 for range jitterTicker(s.Interval, unix.SIGUSR1) { 305 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 306 log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 307 continue 308 } 309 310 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 311 if err != nil { 312 log.Printf("error listing repos: %s", err) 313 continue 314 } 315 316 debug.Printf("updating index queue with %d repositories", len(repos.IDs)) 317 318 // Stop indexing repos we don't need to track anymore 319 removed := s.queue.MaybeRemoveMissing(repos.IDs) 320 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 321 if len(removed) > 0 { 322 log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 323 } 324 325 cleanupDone := make(chan struct{}) 326 go func() { 327 defer close(cleanupDone) 328 s.muIndexDir.Global(func() { 329 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 330 }) 331 }() 332 333 repos.IterateIndexOptions(s.queue.AddOrUpdate) 334 335 // IterateIndexOptions will only iterate over repositories that have 336 // changed since we last called list. However, we want to add all IDs 337 // back onto the queue just to check that what is on disk is still 338 // correct. This will use the last IndexOptions we stored in the 339 // queue. The repositories not on the queue (missing) need a forced 340 // fetch of IndexOptions. 341 missing := s.queue.Bump(repos.IDs) 342 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 
343 344 setCompoundShardCounter(s.IndexDir) 345 346 <-cleanupDone 347 } 348 }() 349 350 go func() { 351 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 352 if s.shardMerging { 353 s.vacuum() 354 } 355 } 356 }() 357 358 go func() { 359 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 360 if s.shardMerging { 361 s.doMerge() 362 } 363 } 364 }() 365 366 for i := 0; i < s.IndexConcurrency; i++ { 367 go s.processQueue() 368 } 369 370 // block forever 371 select {} 372} 373 374// formatList returns a comma-separated list of the first min(len(v), m) items. 375func formatListUint32(v []uint32, m int) string { 376 if len(v) < m { 377 m = len(v) 378 } 379 380 sb := strings.Builder{} 381 for i := 0; i < m; i++ { 382 fmt.Fprintf(&sb, "%d, ", v[i]) 383 } 384 385 if len(v) > m { 386 sb.WriteString("...") 387 } 388 389 return strings.TrimRight(sb.String(), ", ") 390} 391 392func (s *Server) processQueue() { 393 for { 394 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 395 time.Sleep(time.Second) 396 continue 397 } 398 399 opts, ok := s.queue.Pop() 400 if !ok { 401 time.Sleep(time.Second) 402 continue 403 } 404 405 args := s.indexArgs(opts) 406 407 ran := s.muIndexDir.With(opts.Name, func() { 408 // only record time taken once we hold the lock. This avoids us 409 // recording time taken while merging/cleanup runs. 
410 start := time.Now() 411 412 state, err := s.Index(args) 413 414 elapsed := time.Since(start) 415 416 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 417 418 if err != nil { 419 log.Printf("error indexing %s: %s", args.String(), err) 420 } 421 422 switch state { 423 case indexStateSuccess: 424 var branches []string 425 for _, b := range args.Branches { 426 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 427 } 428 s.logger.Info("updated index", 429 sglog.String("repo", args.Name), 430 sglog.Uint32("id", args.RepoID), 431 sglog.Strings("branches", branches), 432 sglog.Duration("duration", elapsed), 433 ) 434 case indexStateSuccessMeta: 435 log.Printf("updated meta %s in %v", args.String(), elapsed) 436 } 437 s.queue.SetIndexed(opts, state) 438 }) 439 440 if !ran { 441 // Someone else is processing the repository. We can just skip this job 442 // since the repository will be added back to the queue and we will 443 // converge to the correct behaviour. 444 debug.Printf("index job for repository already running: %s", args) 445 continue 446 } 447 } 448} 449 450// repoNameForMetric returns a normalized version of the given repository name that is 451// suitable for use with Prometheus metrics. 452func repoNameForMetric(repo string) string { 453 // Check to see if we want to be able to capture separate indexing metrics for this repository. 454 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 455 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 456 return repo 457 } 458 459 return "" 460} 461 462func batched(slice []uint32, size int) <-chan []uint32 { 463 c := make(chan []uint32) 464 go func() { 465 for len(slice) > 0 { 466 if size > len(slice) { 467 size = len(slice) 468 } 469 c <- slice[:size] 470 slice = slice[size:] 471 } 472 close(c) 473 }() 474 return c 475} 476 477// jitterTicker returns a ticker which ticks with a jitter. 
Each tick is 478// uniformly selected from the range (d/2, d + d/2). It will tick on creation. 479// 480// sig is a list of signals which also cause the ticker to fire. This is a 481// convenience to allow manually triggering of the ticker. 482func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} { 483 ticker := make(chan struct{}) 484 485 go func() { 486 for { 487 ticker <- struct{}{} 488 ns := int64(d) 489 jitter := rand.Int63n(ns) 490 time.Sleep(time.Duration(ns/2 + jitter)) 491 } 492 }() 493 494 go func() { 495 if len(sig) == 0 { 496 return 497 } 498 499 c := make(chan os.Signal, 1) 500 signal.Notify(c, sig...) 501 for range c { 502 ticker <- struct{}{} 503 } 504 }() 505 506 return ticker 507} 508 509// Index starts an index job for repo name at commit. 510func (s *Server) Index(args *indexArgs) (state indexState, err error) { 511 tr := trace.New("index", args.Name) 512 513 defer func() { 514 if err != nil { 515 tr.SetError() 516 tr.LazyPrintf("error: %v", err) 517 state = indexStateFail 518 metricFailingTotal.Inc() 519 } 520 tr.LazyPrintf("state: %s", state) 521 tr.Finish() 522 }() 523 524 tr.LazyPrintf("branches: %v", args.Branches) 525 526 if len(args.Branches) == 0 { 527 return indexStateEmpty, createEmptyShard(args) 528 } 529 530 repositoryName := args.Name 531 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok { 532 tr.LazyPrintf("marking this repository for delta build") 533 args.UseDelta = true 534 } 535 536 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold 537 538 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok { 539 tr.LazyPrintf("skipping symbols calculation") 540 args.Symbols = false 541 } 542 543 reason := "forced" 544 545 if args.Incremental { 546 bo := args.BuildOptions() 547 bo.SetDefaults() 548 incrementalState, fn := bo.IndexState() 549 reason = string(incrementalState) 550 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc() 551 
552 switch incrementalState { 553 case build.IndexStateEqual: 554 debug.Printf("%s index already up to date. Shard=%s", args.String(), fn) 555 return indexStateNoop, nil 556 557 case build.IndexStateMeta: 558 log.Printf("updating index.meta %s", args.String()) 559 560 if err := mergeMeta(bo); err != nil { 561 log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err) 562 } else { 563 return indexStateSuccessMeta, nil 564 } 565 566 case build.IndexStateCorrupt: 567 log.Printf("falling back to full update: corrupt index: %s", args.String()) 568 } 569 } 570 571 log.Printf("updating index %s reason=%s", args.String(), reason) 572 573 metricIndexingTotal.Inc() 574 c := gitIndexConfig{ 575 runCmd: func(cmd *exec.Cmd) error { 576 return s.loggedRun(tr, cmd) 577 }, 578 579 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) { 580 return args.BuildOptions().FindRepositoryMetadata() 581 }, 582 } 583 584 err = gitIndex(c, args, s.Sourcegraph, s.logger) 585 if err != nil { 586 return indexStateFail, err 587 } 588 589 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil { 590 s.logger.Error("failed to update index status", 591 sglog.String("repo", args.Name), 592 sglog.Uint32("id", args.RepoID), 593 sglogBranches("branches", args.Branches), 594 sglog.Error(err), 595 ) 596 } 597 598 return indexStateSuccess, nil 599} 600 601// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so 602// it can update the zoekt_repos table. 603func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error { 604 // We need to read from disk for IndexTime. 
605 _, metadata, ok, err := c.findRepositoryMetadata(args) 606 if err != nil { 607 return fmt.Errorf("failed to read metadata for new/updated index: %w", err) 608 } 609 if !ok { 610 return errors.New("failed to find metadata for new/updated index") 611 } 612 613 status := []indexStatus{{ 614 RepoID: args.RepoID, 615 Branches: args.Branches, 616 IndexTimeUnix: metadata.IndexTime.Unix(), 617 }} 618 if err := sg.UpdateIndexStatus(status); err != nil { 619 return fmt.Errorf("failed to update sourcegraph with status: %w", err) 620 } 621 622 return nil 623} 624 625func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field { 626 ss := make([]string, len(branches)) 627 for i, b := range branches { 628 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version) 629 } 630 return sglog.Strings(key, ss) 631} 632 633func (s *Server) indexArgs(opts IndexOptions) *indexArgs { 634 return &indexArgs{ 635 IndexOptions: opts, 636 637 IndexDir: s.IndexDir, 638 Parallelism: s.CPUCount, 639 640 Incremental: true, 641 642 // 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22 643 FileLimit: 1 << 20, 644 } 645} 646 647func createEmptyShard(args *indexArgs) error { 648 bo := args.BuildOptions() 649 bo.SetDefaults() 650 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 651 652 if args.Incremental && bo.IncrementalSkipIndexing() { 653 return nil 654 } 655 656 builder, err := build.NewBuilder(*bo) 657 if err != nil { 658 return err 659 } 660 return builder.Finish() 661} 662 663// addDebugHandlers adds handlers specific to indexserver. 664func (s *Server) addDebugHandlers(mux *http.ServeMux) { 665 // Sourcegraph's site admin view requires indexserver to serve it's admin view 666 // on "/". 
667 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 668 669 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 670 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 671 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 672 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 673 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 674 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 675} 676 677func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 678 if r.Method != "GET" { 679 w.Header().Set("Allow", "GET") 680 w.WriteHeader(http.StatusMethodNotAllowed) 681 return 682 } 683 684 response := struct { 685 Hostname string 686 }{ 687 Hostname: s.hostname, 688 } 689 690 b, err := json.Marshal(response) 691 if err != nil { 692 http.Error(w, err.Error(), http.StatusInternalServerError) 693 return 694 } 695 696 w.Header().Set("Content-Type", "application/json; charset=utf-8") 697 w.Write(b) 698} 699 700var rootTmpl = template.Must(template.New("name").Parse(` 701<html> 702 <body> 703 <a href="debug">Debug</a><br /> 704 <a href="debug/requests">Traces</a><br /> 705 {{.IndexMsg}}<br /> 706 <br /> 707 <h3>Reindex</h3> 708 {{if .Repos}} 709 <a href="?show_repos=false">hide repos</a><br /> 710 <table style="margin-top: 20px"> 711 <th style="text-align:left">Name</th> 712 <th style="text-align:left">ID</th> 713 {{range .Repos}} 714 <tr> 715 <td>{{.Name}}</td> 716 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 717 </tr> 718 {{end}} 719 </table> 720 {{else}} 721 <a href="?show_repos=true">show repos</a><br /> 722 {{end}} 723 </body> 724</html> 725`)) 726 727func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 728 if r.Method != "GET" { 729 w.Header().Set("Allow", "GET") 730 w.WriteHeader(http.StatusMethodNotAllowed) 731 return 732 } 733 734 values := r.URL.Query() 735 736 // ?id= 737 indexMsg := "" 738 if v := values.Get("id"); v != "" { 739 id, err := 
strconv.Atoi(v) 740 if err != nil { 741 http.Error(w, err.Error(), http.StatusBadRequest) 742 return 743 } 744 indexMsg, _ = s.forceIndex(uint32(id)) 745 } 746 747 // ?show_repos= 748 showRepos := false 749 if v := values.Get("show_repos"); v != "" { 750 showRepos, _ = strconv.ParseBool(v) 751 } 752 753 type Repo struct { 754 ID uint32 755 Name string 756 } 757 var data struct { 758 Repos []Repo 759 IndexMsg string 760 } 761 762 data.IndexMsg = indexMsg 763 764 if showRepos { 765 s.queue.Iterate(func(opts *IndexOptions) { 766 data.Repos = append(data.Repos, Repo{ 767 ID: opts.RepoID, 768 Name: opts.Name, 769 }) 770 }) 771 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 772 } 773 774 _ = rootTmpl.Execute(w, data) 775} 776 777// handleReindex triggers a reindex asynocronously. If a reindex was triggered 778// the request returns with status 202. The caller can infer the new state of 779// the index by calling List. 780func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 781 if r.Method != http.MethodPost { 782 w.Header().Set("Allow", http.MethodPost) 783 w.WriteHeader(http.StatusMethodNotAllowed) 784 return 785 } 786 787 err := r.ParseForm() 788 if err != nil { 789 http.Error(w, err.Error(), http.StatusBadRequest) 790 return 791 } 792 793 id, err := strconv.Atoi(r.Form.Get("repo")) 794 if err != nil { 795 http.Error(w, err.Error(), http.StatusBadRequest) 796 return 797 } 798 799 go func() { s.forceIndex(uint32(id)) }() 800 801 // 202 Accepted 802 w.WriteHeader(http.StatusAccepted) 803} 804 805func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 806 withIndexed := true 807 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 808 withIndexed = b 809 } 810 811 var indexed []uint32 812 if withIndexed { 813 indexed = listIndexed(s.IndexDir) 814 } 815 816 repos, err := s.Sourcegraph.List(r.Context(), indexed) 817 if err != nil { 818 http.Error(w, err.Error(), 
http.StatusInternalServerError) 819 return 820 } 821 822 bw := bytes.Buffer{} 823 824 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 825 826 _, err = fmt.Fprintf(tw, "ID\tName\n") 827 if err != nil { 828 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 829 return 830 } 831 832 s.queue.mu.Lock() 833 name := "" 834 for _, id := range repos.IDs { 835 if item := s.queue.get(id); item != nil { 836 name = item.opts.Name 837 } else { 838 name = "" 839 } 840 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 841 if err != nil { 842 debug.Printf("handleDebugList: %s\n", err.Error()) 843 } 844 } 845 s.queue.mu.Unlock() 846 847 if err != nil { 848 http.Error(w, err.Error(), http.StatusInternalServerError) 849 return 850 } 851 852 err = tw.Flush() 853 if err != nil { 854 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 855 return 856 } 857 858 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 859 860 if _, err := io.Copy(w, &bw); err != nil { 861 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 862 return 863 } 864} 865 866// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 867// can run this command during periods of low usage (evenings, weekends) to 868// trigger an initial merge run. In the steady-state, merges happen rarely, even 869// on busy instances, and users can rely on automatic merging instead. 870func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 871 872 // A merge operation can take very long, depending on the number merges and the 873 // target size of the compound shards. We run the merge in the background and 874 // return immediately to the user. 875 // 876 // We track the status of the merge with metricShardMergingRunning. 
877 go func() { 878 s.doMerge() 879 }() 880 _, _ = w.Write([]byte("merging enqueued\n")) 881} 882 883func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 884 indexed := listIndexed(s.IndexDir) 885 886 bw := bytes.Buffer{} 887 888 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 889 890 _, err := fmt.Fprintf(tw, "ID\tName\n") 891 if err != nil { 892 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 893 return 894 } 895 896 s.queue.mu.Lock() 897 name := "" 898 for _, id := range indexed { 899 if item := s.queue.get(id); item != nil { 900 name = item.opts.Name 901 } else { 902 name = "" 903 } 904 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 905 if err != nil { 906 debug.Printf("handleDebugIndexed: %s\n", err.Error()) 907 } 908 } 909 s.queue.mu.Unlock() 910 911 if err != nil { 912 http.Error(w, err.Error(), http.StatusInternalServerError) 913 return 914 } 915 916 err = tw.Flush() 917 if err != nil { 918 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 919 return 920 } 921 922 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 923 924 if _, err := io.Copy(w, &bw); err != nil { 925 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 926 return 927 } 928} 929 930// forceIndex will run the index job for repo name now. It will return always 931// return a string explaining what it did, even if it failed. 
932func (s *Server) forceIndex(id uint32) (string, error) { 933 var opts IndexOptions 934 var err error 935 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 936 opts = o 937 }, func(_ uint32, e error) { 938 err = e 939 }, id) 940 if err != nil { 941 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 942 } 943 944 args := s.indexArgs(opts) 945 args.Incremental = false // force re-index 946 947 var state indexState 948 ran := s.muIndexDir.With(opts.Name, func() { 949 state, err = s.Index(args) 950 }) 951 if !ran { 952 return fmt.Sprintf("index job for repository already running: %s", args), nil 953 } 954 if err != nil { 955 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 956 } 957 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 958} 959 960func listIndexed(indexDir string) []uint32 { 961 index := getShards(indexDir) 962 metricNumIndexed.Set(float64(len(index))) 963 repoIDs := make([]uint32, 0, len(index)) 964 for id := range index { 965 repoIDs = append(repoIDs, id) 966 } 967 sort.Slice(repoIDs, func(i, j int) bool { 968 return repoIDs[i] < repoIDs[j] 969 }) 970 return repoIDs 971} 972 973func hostnameBestEffort() string { 974 if h := os.Getenv("NODE_NAME"); h != "" { 975 return h 976 } 977 if h := os.Getenv("HOSTNAME"); h != "" { 978 return h 979 } 980 hostname, _ := os.Hostname() 981 return hostname 982} 983 984// setupTmpDir sets up a temporary directory on the same volume as the 985// indexes. 986// 987// If main is true we will delete older temp directories left around. main is 988// false when this is a debug command. 989func setupTmpDir(main bool, index string) error { 990 // change the target tmp directory depending on if its our main daemon or a 991 // debug sub command. 
992 dir := ".indexserver.debug.tmp" 993 if main { 994 dir = ".indexserver.tmp" 995 } 996 997 tmpRoot := filepath.Join(index, dir) 998 if err := os.MkdirAll(tmpRoot, 0755); err != nil { 999 return err 1000 } 1001 if !tmpfriend.IsTmpFriendDir(tmpRoot) { 1002 _, err := tmpfriend.RootTempDir(tmpRoot) 1003 return err 1004 } 1005 return nil 1006} 1007 1008func printMetaData(fn string) error { 1009 repo, indexMeta, err := zoekt.ReadMetadataPath(fn) 1010 if err != nil { 1011 return err 1012 } 1013 1014 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 1015 if err != nil { 1016 return err 1017 } 1018 1019 err = json.NewEncoder(os.Stdout).Encode(repo) 1020 if err != nil { 1021 return err 1022 } 1023 return nil 1024} 1025 1026func printShardStats(fn string) error { 1027 f, err := os.Open(fn) 1028 if err != nil { 1029 return err 1030 } 1031 1032 iFile, err := zoekt.NewIndexFile(f) 1033 if err != nil { 1034 return err 1035 } 1036 1037 return zoekt.PrintNgramStats(iFile) 1038} 1039 1040func srcLogLevelIsDebug() bool { 1041 lvl := os.Getenv(sglog.EnvLogLevel) 1042 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1043} 1044 1045func getEnvWithDefaultInt64(k string, defaultVal int64) int64 { 1046 v := os.Getenv(k) 1047 if v == "" { 1048 return defaultVal 1049 } 1050 i, err := strconv.ParseInt(v, 10, 64) 1051 if err != nil { 1052 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1053 } 1054 return i 1055} 1056 1057func getEnvWithDefaultInt(k string, defaultVal int) int { 1058 v := os.Getenv(k) 1059 if v == "" { 1060 return defaultVal 1061 } 1062 i, err := strconv.Atoi(k) 1063 if err != nil { 1064 log.Fatalf("error parsing ENV %s to int: %s", k, err) 1065 } 1066 return i 1067} 1068 1069func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 { 1070 v := os.Getenv(k) 1071 if v == "" { 1072 return defaultVal 1073 } 1074 i, err := strconv.ParseUint(v, 10, 64) 1075 if err != nil { 1076 log.Fatalf("error parsing ENV %s to uint64: %s", k, err) 1077 } 
1078 return i 1079} 1080 1081func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1082 v := os.Getenv(k) 1083 if v == "" { 1084 return defaultVal 1085 } 1086 f, err := strconv.ParseFloat(v, 64) 1087 if err != nil { 1088 log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1089 } 1090 return f 1091} 1092 1093func getEnvWithDefaultString(k string, defaultVal string) string { 1094 v := os.Getenv(k) 1095 if v == "" { 1096 return defaultVal 1097 } 1098 return v 1099} 1100 1101func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration { 1102 v := os.Getenv(k) 1103 if v == "" { 1104 return defaultVal 1105 } 1106 1107 d, err := time.ParseDuration(v) 1108 if err != nil { 1109 log.Fatalf("error parsing ENV %s to duration: %s", k, err) 1110 } 1111 return d 1112} 1113 1114func getEnvWithDefaultBool(k string, defaultVal bool) bool { 1115 v := os.Getenv(k) 1116 if v == "" { 1117 return defaultVal 1118 } 1119 1120 b, err := strconv.ParseBool(v) 1121 if err != nil { 1122 log.Fatalf("error parsing ENV %s to bool: %s", k, err) 1123 } 1124 return b 1125} 1126 1127func getEnvWithDefaultEmptySet(k string) map[string]struct{} { 1128 set := map[string]struct{}{} 1129 for _, v := range strings.Split(os.Getenv(k), ",") { 1130 v = strings.TrimSpace(v) 1131 if v != "" { 1132 set[v] = struct{}{} 1133 } 1134 } 1135 return set 1136} 1137 1138func joinStringSet(set map[string]struct{}, sep string) string { 1139 var xs []string 1140 for x := range set { 1141 xs = append(xs, x) 1142 } 1143 1144 return strings.Join(xs, sep) 1145} 1146 1147func setCompoundShardCounter(indexDir string) { 1148 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt")) 1149 if err != nil { 1150 log.Printf("setCompoundShardCounter: %s\n", err) 1151 return 1152 } 1153 metricNumberCompoundShards.Set(float64(len(fns))) 1154} 1155 1156func rootCmd() *ffcli.Command { 1157 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1158 conf := rootConfig{ 1159 Main: true, 
1160 } 1161 conf.registerRootFlags(rootFs) 1162 1163 return &ffcli.Command{ 1164 FlagSet: rootFs, 1165 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1166 Subcommands: []*ffcli.Command{debugCmd()}, 1167 Exec: func(ctx context.Context, args []string) error { 1168 return startServer(conf) 1169 }, 1170 } 1171} 1172 1173type rootConfig struct { 1174 // Main is true if this rootConfig is for our main long running command (the 1175 // indexserver). Debug commands should not set this value. This is used to 1176 // determine if we need to run tmpfriend. 1177 Main bool 1178 1179 root string 1180 interval time.Duration 1181 index string 1182 indexConcurrency int64 1183 listen string 1184 hostname string 1185 cpuFraction float64 1186 blockProfileRate int 1187 1188 // config values related to shard merging 1189 vacuumInterval time.Duration 1190 mergeInterval time.Duration 1191 targetSize int64 1192 minSize int64 1193 minAgeDays int 1194 maxPriority float64 1195 1196 // config values related to backoff indexing repos with one or more consecutive failures 1197 backoffDuration time.Duration 1198 maxBackoffDuration time.Duration 1199 1200 // useGRPC is true if we should use the gRPC API to talk to Sourcegraph. 1201 useGRPC bool 1202} 1203 1204func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1205 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. 
If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1206 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1207 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of concurrent index jobs to run.") 1208 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use") 1209 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") 1210 fs.StringVar(&rc.hostname, "hostname", hostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") 1211 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1212 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate") 1213 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.") 1214 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. 
A negative value disables indexing backoff.") 1215 fs.BoolVar(&rc.useGRPC, "use_grpc", getEnvWithDefaultBool("GRPC_ENABLED", false), "use the gRPC API to talk to Sourcegraph") 1216 1217 // flags related to shard merging 1218 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1219 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1220 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB") 1221 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB") 1222 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.") 1223 fs.Float64Var(&rc.maxPriority, "merge_max_priority", getEnvWithDefaultFloat64("SRC_MERGE_MAX_PRIORITY", 100), "the maximum priority a shard can have to be considered for merging.") 1224} 1225 1226func startServer(conf rootConfig) error { 1227 s, err := newServer(conf) 1228 if err != nil { 1229 return err 1230 } 1231 1232 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate) 1233 setCompoundShardCounter(s.IndexDir) 1234 1235 if conf.listen != "" { 1236 1237 mux := http.NewServeMux() 1238 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1239 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1240 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1241 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during 
re-balancing"}, 1242 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1243 }...) 1244 s.addDebugHandlers(mux) 1245 1246 go func() { 1247 debug.Printf("serving HTTP on %s", conf.listen) 1248 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1249 1250 }() 1251 1252 // Serve mux on a unix domain socket on a best-effort-basis so that 1253 // webserver can call the endpoints via the shared filesystem. 1254 // 1255 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1256 // on the socket due to permission errors. See 1257 // https://github.com/docker/for-mac/issues/6239 1258 go func() { 1259 serveHTTPOverSocket := func() error { 1260 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1261 // We cannot bind a socket to an existing pathname. 1262 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1263 return fmt.Errorf("error removing socket file: %s", socket) 1264 } 1265 // The "unix" network corresponds to stream sockets. (cf. unixgram, 1266 // unixpacket). 1267 l, err := net.Listen("unix", socket) 1268 if err != nil { 1269 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1270 } 1271 // Indexserver (root) and webserver (Sourcegraph) run with 1272 // different users. Per default, the socket is created with 1273 // permission 755 (root root), which doesn't let webserver write to 1274 // it. 1275 // 1276 // See https://github.com/golang/go/issues/11822 for more context. 
1277 if err := os.Chmod(socket, 0777); err != nil { 1278 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1279 } 1280 debug.Printf("serving HTTP on %s", socket) 1281 return http.Serve(l, mux) 1282 } 1283 debug.Print(serveHTTPOverSocket()) 1284 }() 1285 } 1286 1287 oc := &ownerChecker{ 1288 Path: filepath.Join(conf.index, "owner.txt"), 1289 Hostname: conf.hostname, 1290 } 1291 go oc.Run() 1292 1293 logger := sglog.Scoped("metricsRegistration", "") 1294 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1295 1296 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1297 prometheus.DefaultRegisterer.MustRegister(c) 1298 1299 s.Run() 1300 return nil 1301} 1302 1303func newServer(conf rootConfig) (*Server, error) { 1304 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1305 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1306 } 1307 if conf.index == "" { 1308 return nil, fmt.Errorf("must set -index") 1309 } 1310 if conf.root == "" { 1311 return nil, fmt.Errorf("must set -sourcegraph_url") 1312 } 1313 rootURL, err := url.Parse(conf.root) 1314 if err != nil { 1315 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1316 } 1317 1318 rootURL = addDefaultPort(rootURL) 1319 1320 // Tune GOMAXPROCS to match Linux container CPU quota. 1321 _, _ = maxprocs.Set() 1322 1323 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler. 1324 // The block profiler is disabled by default and should be enabled with care in production 1325 runtime.SetBlockProfileRate(conf.blockProfileRate) 1326 1327 // Automatically prepend our own path at the front, to minimize 1328 // required configuration. 
1329 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1330 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1331 } 1332 1333 if _, err := os.Stat(conf.index); err != nil { 1334 if err := os.MkdirAll(conf.index, 0755); err != nil { 1335 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1336 } 1337 } 1338 1339 if err := setupTmpDir(conf.Main, conf.index); err != nil { 1340 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1341 } 1342 1343 if srcLogLevelIsDebug() { 1344 debug = log.New(os.Stderr, "", log.LstdFlags) 1345 } 1346 1347 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1348 if len(reposWithSeparateIndexingMetrics) > 0 { 1349 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1350 } 1351 1352 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1353 if len(deltaBuildRepositoriesAllowList) > 0 { 1354 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1355 } 1356 1357 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1358 if deltaShardNumberFallbackThreshold > 0 { 1359 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1360 } else { 1361 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1362 } 1363 1364 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1365 if len(reposShouldSkipSymbolsCalculation) > 0 { 1366 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1367 } 1368 1369 var sg Sourcegraph 1370 if rootURL.IsAbs() { 1371 var batchSize int 1372 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { 
1373 batchSize, err = strconv.Atoi(v) 1374 if err != nil { 1375 return nil, fmt.Errorf("Invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1376 } 1377 } 1378 1379 opts := []SourcegraphClientOption{ 1380 WithBatchSize(batchSize), 1381 WithShouldUseGRPC(conf.useGRPC), 1382 } 1383 1384 gRPCConnectionOptions := []grpc.DialOption{ 1385 grpc.WithTransportCredentials(insecure.NewCredentials()), 1386 grpc.WithChainStreamInterceptor(internalActorStreamInterceptor()), 1387 grpc.WithChainUnaryInterceptor(internalActorUnaryInterceptor()), 1388 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON), 1389 } 1390 1391 // This dialer is used to connect via gRPC to the Sourcegraph instance. 1392 // This is done lazily, so we can provide the client to use regardless of 1393 // whether we enabled gRPC or not initially. 1394 cc, err := grpc.Dial(rootURL.Host, gRPCConnectionOptions...) 1395 if err != nil { 1396 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1397 } 1398 1399 client := proto.NewZoektConfigurationServiceClient(cc) 1400 opts = append(opts, WithGRPCClient(client)) 1401 1402 sg = newSourcegraphClient(rootURL, conf.hostname, opts...) 
1403 1404 } else { 1405 sg = sourcegraphFake{ 1406 RootDir: rootURL.String(), 1407 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1408 } 1409 } 1410 1411 if conf.indexConcurrency < 1 { 1412 conf.indexConcurrency = 1 1413 } 1414 1415 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction))) 1416 if cpuCount < 1 { 1417 cpuCount = 1 1418 } 1419 1420 logger := sglog.Scoped("server", "periodically reindexes enabled repositories on sourcegraph") 1421 1422 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1423 1424 return &Server{ 1425 logger: logger, 1426 Sourcegraph: sg, 1427 IndexDir: conf.index, 1428 IndexConcurrency: int(conf.indexConcurrency), 1429 Interval: conf.interval, 1430 CPUCount: cpuCount, 1431 queue: *q, 1432 shardMerging: zoekt.ShardMergingEnabled(), 1433 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1434 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1435 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1436 hostname: conf.hostname, 1437 mergeOpts: mergeOpts{ 1438 vacuumInterval: conf.vacuumInterval, 1439 mergeInterval: conf.mergeInterval, 1440 targetSizeBytes: conf.targetSize * 1024 * 1024, 1441 minSizeBytes: conf.minSize * 1024 * 1024, 1442 minAgeDays: conf.minAgeDays, 1443 maxPriority: conf.maxPriority, 1444 }, 1445 }, err 1446} 1447 1448// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration 1449// for the indexed-search-configuration gRPC service. 1450// 1451// The default backoff strategy is modeled after the default settings used by 1452// retryablehttp.DefaultClient. 
1453// 1454// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1455// - Unavailable 1456// - Aborted 1457// 1458//go:embed default_grpc_service_configuration.json 1459var defaultGRPCServiceConfigurationJSON string 1460 1461func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1462 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1463 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1464 return invoker(ctx, method, req, reply, cc, opts...) 1465 } 1466} 1467 1468func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1469 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1470 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1471 return streamer(ctx, desc, cc, method, opts...) 1472 } 1473} 1474 1475// addDefaultPort adds a default port to a URL if one is not specified. 1476// 1477// If the URL scheme is "http" and no port is specified, "80" is used. 1478// If the scheme is "https", "443" is used. 1479// 1480// The original URL is not mutated. A copy is modified and returned. 
1481func addDefaultPort(original *url.URL) *url.URL { 1482 if original == nil { 1483 return nil // don't panic 1484 } 1485 1486 if !original.IsAbs() { 1487 return original // don't do anything if the URL doesn't look like a remote URL 1488 } 1489 1490 if original.Scheme == "http" && original.Port() == "" { 1491 u := cloneURL(original) 1492 u.Host = net.JoinHostPort(u.Host, "80") 1493 return u 1494 } 1495 1496 if original.Scheme == "https" && original.Port() == "" { 1497 u := cloneURL(original) 1498 u.Host = net.JoinHostPort(u.Host, "443") 1499 return u 1500 } 1501 1502 return original 1503} 1504 1505// cloneURL returns a copy of the URL. It is safe to mutate the returned URL. 1506// This is copied from net/http/clone.go 1507func cloneURL(u *url.URL) *url.URL { 1508 if u == nil { 1509 return nil 1510 } 1511 u2 := new(url.URL) 1512 *u2 = *u 1513 if u.User != nil { 1514 u2.User = new(url.Userinfo) 1515 *u2.User = *u.User 1516 } 1517 return u2 1518} 1519 1520func main() { 1521 liblog := sglog.Init(sglog.Resource{ 1522 Name: "zoekt-indexserver", 1523 Version: zoekt.Version, 1524 InstanceID: hostnameBestEffort(), 1525 }) 1526 defer liblog.Sync() 1527 1528 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1529 log.Fatal(err) 1530 } 1531}