fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
// repositories on sourcegraph
package main

import (
	"bytes"
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"html/template"
	"io"
	"io/fs"
	"log"
	"math"
	"math/rand"
	"net"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"text/tabwriter"
	"time"

	"github.com/keegancsmith/tmpfriend"
	"github.com/peterbourgon/ff/v3/ffcli"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	sglog "github.com/sourcegraph/log"
	"github.com/sourcegraph/zoekt/grpc/internalerrs"
	"github.com/sourcegraph/zoekt/grpc/messagesize"
	"go.uber.org/automaxprocs/maxprocs"
	"golang.org/x/net/trace"
	"golang.org/x/sys/unix"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/metadata"

	"github.com/sourcegraph/mountinfo"

	"github.com/sourcegraph/zoekt"
	"github.com/sourcegraph/zoekt/build"
	proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
	"github.com/sourcegraph/zoekt/debugserver"
	"github.com/sourcegraph/zoekt/internal/profiler"
)

// Prometheus metrics exported by the indexserver. They are registered with
// the default registry at package init via promauto.
var (
	// Buckets are 1, 10, 100, 1000, 10000 and 100000 seconds.
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 100000s (~27.8h)
	})

	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	// Observed by processQueue once per index job, labelled with the
	// resulting indexState and repoNameForMetric(name).
	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + indexTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	// Incremented by Index for every incremental index attempt.
	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is build.IndexState

	// Set by listIndexed to the number of repositories with shards on disk.
	// NOTE(review): the help text says "by code host", but this gauge has no
	// labels.
	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	metricNumAssigned = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_assigned",
		Help: "Number of repos assigned to this indexer by code host",
	})

	// Incremented by Index whenever it returns an error.
	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	// Incremented by Index right before it runs a full (non-noop, non-meta) index.
	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	// Incremented by Run with the number of repositories removed from the queue.
	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})
)

// set of repositories that we want to capture separate indexing metrics for.
// repoNameForMetric reports repositories outside this set with an empty name
// label to keep metric cardinality bounded.
var reposWithSeparateIndexingMetrics = make(map[string]struct{})

// indexState is the outcome of a single index job. It is used as the "state"
// label of metricIndexDuration and passed to Queue.SetIndexed.
type indexState string

const (
	indexStateFail        indexState = "fail"
	indexStateSuccess     indexState = "success"
	indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
	indexStateNoop        indexState = "noop"         // We didn't need to update index
	indexStateEmpty       indexState = "empty"        // index is empty (empty repo)
)

// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	// logger is the structured logger used for per-job log lines.
	logger sglog.Logger

	// Sourcegraph is the client used to list the repositories to index and to
	// fetch their IndexOptions.
	Sourcegraph Sourcegraph
	// NOTE(review): BatchSize is not referenced in this part of the file;
	// presumably a batch size for Sourcegraph requests — confirm.
	BatchSize int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration
	// CPUCount is the amount of parallelism to use when indexing a
	// repository.
	CPUCount int

	// queue holds the repositories waiting to be indexed. Run fills it and the
	// processQueue workers drain it.
	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is reported by the /debug/host handler.
	hostname string

	// mergeOpts configures shard merging; Run reads its vacuumInterval and
	// mergeInterval.
	mergeOpts mergeOpts
}

// debug is a logger for verbose diagnostics. As declared it discards all
// output. NOTE(review): presumably redirected when debug logging is enabled
// (see srcLogLevelIsDebug) — confirm.
var debug = log.New(io.Discard, "", log.LstdFlags)

// noOutputTimeout is how often loggedRun checks that a running command has
// produced new output. Our index commands should output something every
// 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
196const noOutputTimeout = 30 * time.Minute 197 198func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 199 out := &synchronizedBuffer{} 200 cmd.Stdout = out 201 cmd.Stderr = out 202 203 tr.LazyPrintf("%s", cmd.Args) 204 205 defer func() { 206 if err != nil { 207 outS := out.String() 208 tr.LazyPrintf("failed: %v", err) 209 tr.LazyPrintf("output: %s", out) 210 tr.SetError() 211 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 212 } 213 }() 214 215 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 216 217 if err := cmd.Start(); err != nil { 218 return err 219 } 220 221 errC := make(chan error) 222 go func() { 223 errC <- cmd.Wait() 224 }() 225 226 // This channel is set after we have sent sigquit. It allows us to follow up 227 // with a sigkill if the process doesn't quit after sigquit. 228 kill := make(<-chan time.Time) 229 230 lastLen := 0 231 for { 232 select { 233 case <-time.After(noOutputTimeout): 234 // Periodically check if we have had output. If not kill the process. 235 if out.Len() != lastLen { 236 lastLen = out.Len() 237 log.Printf("still running %s", cmd.Args) 238 } else { 239 // Send quit (C-\) first so we get a stack dump. 240 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 241 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 242 log.Println("quit failed:", err) 243 } 244 245 // send sigkill if still running in 10s 246 kill = time.After(10 * time.Second) 247 } 248 249 case <-kill: 250 log.Printf("still running, killing %s", cmd.Args) 251 if err := cmd.Process.Kill(); err != nil { 252 log.Println("kill failed:", err) 253 } 254 255 case err := <-errC: 256 if err != nil { 257 return err 258 } 259 260 tr.LazyPrintf("success") 261 return nil 262 } 263 } 264} 265 266// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 267// monitor the buffer while it is being written to. 
268type synchronizedBuffer struct { 269 mu sync.Mutex 270 b bytes.Buffer 271} 272 273func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 274 sb.mu.Lock() 275 defer sb.mu.Unlock() 276 return sb.b.Write(p) 277} 278 279func (sb *synchronizedBuffer) Len() int { 280 sb.mu.Lock() 281 defer sb.mu.Unlock() 282 return sb.b.Len() 283} 284 285func (sb *synchronizedBuffer) String() string { 286 sb.mu.Lock() 287 defer sb.mu.Unlock() 288 return sb.b.String() 289} 290 291// pauseFileName if present in IndexDir will stop index jobs from 292// running. This is to make it possible to experiment with the content of the 293// IndexDir without the indexserver writing to it. 294const pauseFileName = "PAUSE" 295 296// Run the sync loop. This blocks forever. 297func (s *Server) Run() { 298 removeIncompleteShards(s.IndexDir) 299 300 // Start a goroutine which updates the queue with commits to index. 301 go func() { 302 // We update the list of indexed repos every Interval. To speed up manual 303 // testing we also listen for SIGUSR1 to trigger updates. 
304 // 305 // "pkill -SIGUSR1 zoekt-sourcegra" 306 for range jitterTicker(s.Interval, unix.SIGUSR1) { 307 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 308 log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 309 continue 310 } 311 312 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 313 if err != nil { 314 log.Printf("error listing repos: %s", err) 315 continue 316 } 317 318 debug.Printf("updating index queue with %d repositories", len(repos.IDs)) 319 320 // Stop indexing repos we don't need to track anymore 321 removed := s.queue.MaybeRemoveMissing(repos.IDs) 322 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 323 if len(removed) > 0 { 324 log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 325 } 326 327 cleanupDone := make(chan struct{}) 328 go func() { 329 defer close(cleanupDone) 330 s.muIndexDir.Global(func() { 331 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 332 }) 333 }() 334 335 repos.IterateIndexOptions(s.queue.AddOrUpdate) 336 337 // IterateIndexOptions will only iterate over repositories that have 338 // changed since we last called list. However, we want to add all IDs 339 // back onto the queue just to check that what is on disk is still 340 // correct. This will use the last IndexOptions we stored in the 341 // queue. The repositories not on the queue (missing) need a forced 342 // fetch of IndexOptions. 343 missing := s.queue.Bump(repos.IDs) 344 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 
345 346 setCompoundShardCounter(s.IndexDir) 347 348 <-cleanupDone 349 } 350 }() 351 352 go func() { 353 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 354 if s.shardMerging { 355 s.vacuum() 356 } 357 } 358 }() 359 360 go func() { 361 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 362 if s.shardMerging { 363 s.doMerge() 364 } 365 } 366 }() 367 368 for i := 0; i < s.IndexConcurrency; i++ { 369 go s.processQueue() 370 } 371 372 // block forever 373 select {} 374} 375 376// formatList returns a comma-separated list of the first min(len(v), m) items. 377func formatListUint32(v []uint32, m int) string { 378 if len(v) < m { 379 m = len(v) 380 } 381 382 sb := strings.Builder{} 383 for i := 0; i < m; i++ { 384 fmt.Fprintf(&sb, "%d, ", v[i]) 385 } 386 387 if len(v) > m { 388 sb.WriteString("...") 389 } 390 391 return strings.TrimRight(sb.String(), ", ") 392} 393 394func (s *Server) processQueue() { 395 for { 396 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 397 time.Sleep(time.Second) 398 continue 399 } 400 401 opts, ok := s.queue.Pop() 402 if !ok { 403 time.Sleep(time.Second) 404 continue 405 } 406 407 args := s.indexArgs(opts) 408 409 ran := s.muIndexDir.With(opts.Name, func() { 410 // only record time taken once we hold the lock. This avoids us 411 // recording time taken while merging/cleanup runs. 
412 start := time.Now() 413 414 state, err := s.Index(args) 415 416 elapsed := time.Since(start) 417 418 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 419 420 if err != nil { 421 log.Printf("error indexing %s: %s", args.String(), err) 422 } 423 424 switch state { 425 case indexStateSuccess: 426 var branches []string 427 for _, b := range args.Branches { 428 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 429 } 430 s.logger.Info("updated index", 431 sglog.String("repo", args.Name), 432 sglog.Uint32("id", args.RepoID), 433 sglog.Strings("branches", branches), 434 sglog.Duration("duration", elapsed), 435 ) 436 case indexStateSuccessMeta: 437 log.Printf("updated meta %s in %v", args.String(), elapsed) 438 } 439 s.queue.SetIndexed(opts, state) 440 }) 441 442 if !ran { 443 // Someone else is processing the repository. We can just skip this job 444 // since the repository will be added back to the queue and we will 445 // converge to the correct behaviour. 446 debug.Printf("index job for repository already running: %s", args) 447 continue 448 } 449 } 450} 451 452// repoNameForMetric returns a normalized version of the given repository name that is 453// suitable for use with Prometheus metrics. 454func repoNameForMetric(repo string) string { 455 // Check to see if we want to be able to capture separate indexing metrics for this repository. 456 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 457 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 458 return repo 459 } 460 461 return "" 462} 463 464func batched(slice []uint32, size int) <-chan []uint32 { 465 c := make(chan []uint32) 466 go func() { 467 for len(slice) > 0 { 468 if size > len(slice) { 469 size = len(slice) 470 } 471 c <- slice[:size] 472 slice = slice[size:] 473 } 474 close(c) 475 }() 476 return c 477} 478 479// jitterTicker returns a ticker which ticks with a jitter. 
Each tick is 480// uniformly selected from the range (d/2, d + d/2). It will tick on creation. 481// 482// sig is a list of signals which also cause the ticker to fire. This is a 483// convenience to allow manually triggering of the ticker. 484func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} { 485 ticker := make(chan struct{}) 486 487 go func() { 488 for { 489 ticker <- struct{}{} 490 ns := int64(d) 491 jitter := rand.Int63n(ns) 492 time.Sleep(time.Duration(ns/2 + jitter)) 493 } 494 }() 495 496 go func() { 497 if len(sig) == 0 { 498 return 499 } 500 501 c := make(chan os.Signal, 1) 502 signal.Notify(c, sig...) 503 for range c { 504 ticker <- struct{}{} 505 } 506 }() 507 508 return ticker 509} 510 511// Index starts an index job for repo name at commit. 512func (s *Server) Index(args *indexArgs) (state indexState, err error) { 513 tr := trace.New("index", args.Name) 514 515 defer func() { 516 if err != nil { 517 tr.SetError() 518 tr.LazyPrintf("error: %v", err) 519 state = indexStateFail 520 metricFailingTotal.Inc() 521 } 522 tr.LazyPrintf("state: %s", state) 523 tr.Finish() 524 }() 525 526 tr.LazyPrintf("branches: %v", args.Branches) 527 528 if len(args.Branches) == 0 { 529 return indexStateEmpty, createEmptyShard(args) 530 } 531 532 repositoryName := args.Name 533 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok { 534 tr.LazyPrintf("marking this repository for delta build") 535 args.UseDelta = true 536 } 537 538 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold 539 540 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok { 541 tr.LazyPrintf("skipping symbols calculation") 542 args.Symbols = false 543 } 544 545 reason := "forced" 546 547 if args.Incremental { 548 bo := args.BuildOptions() 549 bo.SetDefaults() 550 incrementalState, fn := bo.IndexState() 551 reason = string(incrementalState) 552 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc() 553 
554 switch incrementalState { 555 case build.IndexStateEqual: 556 debug.Printf("%s index already up to date. Shard=%s", args.String(), fn) 557 return indexStateNoop, nil 558 559 case build.IndexStateMeta: 560 log.Printf("updating index.meta %s", args.String()) 561 562 if err := mergeMeta(bo); err != nil { 563 log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err) 564 } else { 565 return indexStateSuccessMeta, nil 566 } 567 568 case build.IndexStateCorrupt: 569 log.Printf("falling back to full update: corrupt index: %s", args.String()) 570 } 571 } 572 573 log.Printf("updating index %s reason=%s", args.String(), reason) 574 575 metricIndexingTotal.Inc() 576 c := gitIndexConfig{ 577 runCmd: func(cmd *exec.Cmd) error { 578 return s.loggedRun(tr, cmd) 579 }, 580 581 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) { 582 return args.BuildOptions().FindRepositoryMetadata() 583 }, 584 } 585 586 err = gitIndex(c, args, s.Sourcegraph, s.logger) 587 if err != nil { 588 return indexStateFail, err 589 } 590 591 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil { 592 s.logger.Error("failed to update index status", 593 sglog.String("repo", args.Name), 594 sglog.Uint32("id", args.RepoID), 595 sglogBranches("branches", args.Branches), 596 sglog.Error(err), 597 ) 598 } 599 600 return indexStateSuccess, nil 601} 602 603// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so 604// it can update the zoekt_repos table. 605func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error { 606 // We need to read from disk for IndexTime. 
607 _, metadata, ok, err := c.findRepositoryMetadata(args) 608 if err != nil { 609 return fmt.Errorf("failed to read metadata for new/updated index: %w", err) 610 } 611 if !ok { 612 return errors.New("failed to find metadata for new/updated index") 613 } 614 615 status := []indexStatus{{ 616 RepoID: args.RepoID, 617 Branches: args.Branches, 618 IndexTimeUnix: metadata.IndexTime.Unix(), 619 }} 620 if err := sg.UpdateIndexStatus(status); err != nil { 621 return fmt.Errorf("failed to update sourcegraph with status: %w", err) 622 } 623 624 return nil 625} 626 627func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field { 628 ss := make([]string, len(branches)) 629 for i, b := range branches { 630 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version) 631 } 632 return sglog.Strings(key, ss) 633} 634 635func (s *Server) indexArgs(opts IndexOptions) *indexArgs { 636 return &indexArgs{ 637 IndexOptions: opts, 638 639 IndexDir: s.IndexDir, 640 Parallelism: s.CPUCount, 641 642 Incremental: true, 643 644 // 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22 645 FileLimit: 1 << 20, 646 } 647} 648 649func createEmptyShard(args *indexArgs) error { 650 bo := args.BuildOptions() 651 bo.SetDefaults() 652 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 653 654 if args.Incremental && bo.IncrementalSkipIndexing() { 655 return nil 656 } 657 658 builder, err := build.NewBuilder(*bo) 659 if err != nil { 660 return err 661 } 662 return builder.Finish() 663} 664 665// addDebugHandlers adds handlers specific to indexserver. 666func (s *Server) addDebugHandlers(mux *http.ServeMux) { 667 // Sourcegraph's site admin view requires indexserver to serve it's admin view 668 // on "/". 
669 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 670 671 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 672 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 673 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 674 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 675 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 676 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 677} 678 679func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 680 if r.Method != "GET" { 681 w.Header().Set("Allow", "GET") 682 w.WriteHeader(http.StatusMethodNotAllowed) 683 return 684 } 685 686 response := struct { 687 Hostname string 688 }{ 689 Hostname: s.hostname, 690 } 691 692 b, err := json.Marshal(response) 693 if err != nil { 694 http.Error(w, err.Error(), http.StatusInternalServerError) 695 return 696 } 697 698 w.Header().Set("Content-Type", "application/json; charset=utf-8") 699 w.Write(b) 700} 701 702var rootTmpl = template.Must(template.New("name").Parse(` 703<html> 704 <body> 705 <a href="debug">Debug</a><br /> 706 <a href="debug/requests">Traces</a><br /> 707 {{.IndexMsg}}<br /> 708 <br /> 709 <h3>Reindex</h3> 710 {{if .Repos}} 711 <a href="?show_repos=false">hide repos</a><br /> 712 <table style="margin-top: 20px"> 713 <th style="text-align:left">Name</th> 714 <th style="text-align:left">ID</th> 715 {{range .Repos}} 716 <tr> 717 <td>{{.Name}}</td> 718 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 719 </tr> 720 {{end}} 721 </table> 722 {{else}} 723 <a href="?show_repos=true">show repos</a><br /> 724 {{end}} 725 </body> 726</html> 727`)) 728 729func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 730 if r.Method != "GET" { 731 w.Header().Set("Allow", "GET") 732 w.WriteHeader(http.StatusMethodNotAllowed) 733 return 734 } 735 736 values := r.URL.Query() 737 738 // ?id= 739 indexMsg := "" 740 if v := values.Get("id"); v != "" { 741 id, err := 
strconv.Atoi(v) 742 if err != nil { 743 http.Error(w, err.Error(), http.StatusBadRequest) 744 return 745 } 746 indexMsg, _ = s.forceIndex(uint32(id)) 747 } 748 749 // ?show_repos= 750 showRepos := false 751 if v := values.Get("show_repos"); v != "" { 752 showRepos, _ = strconv.ParseBool(v) 753 } 754 755 type Repo struct { 756 ID uint32 757 Name string 758 } 759 var data struct { 760 Repos []Repo 761 IndexMsg string 762 } 763 764 data.IndexMsg = indexMsg 765 766 if showRepos { 767 s.queue.Iterate(func(opts *IndexOptions) { 768 data.Repos = append(data.Repos, Repo{ 769 ID: opts.RepoID, 770 Name: opts.Name, 771 }) 772 }) 773 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 774 } 775 776 _ = rootTmpl.Execute(w, data) 777} 778 779// handleReindex triggers a reindex asynocronously. If a reindex was triggered 780// the request returns with status 202. The caller can infer the new state of 781// the index by calling List. 782func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 783 if r.Method != http.MethodPost { 784 w.Header().Set("Allow", http.MethodPost) 785 w.WriteHeader(http.StatusMethodNotAllowed) 786 return 787 } 788 789 err := r.ParseForm() 790 if err != nil { 791 http.Error(w, err.Error(), http.StatusBadRequest) 792 return 793 } 794 795 id, err := strconv.Atoi(r.Form.Get("repo")) 796 if err != nil { 797 http.Error(w, err.Error(), http.StatusBadRequest) 798 return 799 } 800 801 go func() { s.forceIndex(uint32(id)) }() 802 803 // 202 Accepted 804 w.WriteHeader(http.StatusAccepted) 805} 806 807func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 808 withIndexed := true 809 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 810 withIndexed = b 811 } 812 813 var indexed []uint32 814 if withIndexed { 815 indexed = listIndexed(s.IndexDir) 816 } 817 818 repos, err := s.Sourcegraph.List(r.Context(), indexed) 819 if err != nil { 820 http.Error(w, err.Error(), 
http.StatusInternalServerError) 821 return 822 } 823 824 bw := bytes.Buffer{} 825 826 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 827 828 _, err = fmt.Fprintf(tw, "ID\tName\n") 829 if err != nil { 830 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 831 return 832 } 833 834 s.queue.mu.Lock() 835 name := "" 836 for _, id := range repos.IDs { 837 if item := s.queue.get(id); item != nil { 838 name = item.opts.Name 839 } else { 840 name = "" 841 } 842 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 843 if err != nil { 844 debug.Printf("handleDebugList: %s\n", err.Error()) 845 } 846 } 847 s.queue.mu.Unlock() 848 849 if err != nil { 850 http.Error(w, err.Error(), http.StatusInternalServerError) 851 return 852 } 853 854 err = tw.Flush() 855 if err != nil { 856 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 857 return 858 } 859 860 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 861 862 if _, err := io.Copy(w, &bw); err != nil { 863 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 864 return 865 } 866} 867 868// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 869// can run this command during periods of low usage (evenings, weekends) to 870// trigger an initial merge run. In the steady-state, merges happen rarely, even 871// on busy instances, and users can rely on automatic merging instead. 872func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 873 874 // A merge operation can take very long, depending on the number merges and the 875 // target size of the compound shards. We run the merge in the background and 876 // return immediately to the user. 877 // 878 // We track the status of the merge with metricShardMergingRunning. 
879 go func() { 880 s.doMerge() 881 }() 882 _, _ = w.Write([]byte("merging enqueued\n")) 883} 884 885func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 886 indexed := listIndexed(s.IndexDir) 887 888 bw := bytes.Buffer{} 889 890 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 891 892 _, err := fmt.Fprintf(tw, "ID\tName\n") 893 if err != nil { 894 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 895 return 896 } 897 898 s.queue.mu.Lock() 899 name := "" 900 for _, id := range indexed { 901 if item := s.queue.get(id); item != nil { 902 name = item.opts.Name 903 } else { 904 name = "" 905 } 906 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 907 if err != nil { 908 debug.Printf("handleDebugIndexed: %s\n", err.Error()) 909 } 910 } 911 s.queue.mu.Unlock() 912 913 if err != nil { 914 http.Error(w, err.Error(), http.StatusInternalServerError) 915 return 916 } 917 918 err = tw.Flush() 919 if err != nil { 920 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 921 return 922 } 923 924 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 925 926 if _, err := io.Copy(w, &bw); err != nil { 927 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 928 return 929 } 930} 931 932// forceIndex will run the index job for repo name now. It will return always 933// return a string explaining what it did, even if it failed. 
934func (s *Server) forceIndex(id uint32) (string, error) { 935 var opts IndexOptions 936 var err error 937 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 938 opts = o 939 }, func(_ uint32, e error) { 940 err = e 941 }, id) 942 if err != nil { 943 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 944 } 945 946 args := s.indexArgs(opts) 947 args.Incremental = false // force re-index 948 949 var state indexState 950 ran := s.muIndexDir.With(opts.Name, func() { 951 state, err = s.Index(args) 952 }) 953 if !ran { 954 return fmt.Sprintf("index job for repository already running: %s", args), nil 955 } 956 if err != nil { 957 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 958 } 959 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 960} 961 962func listIndexed(indexDir string) []uint32 { 963 index := getShards(indexDir) 964 metricNumIndexed.Set(float64(len(index))) 965 repoIDs := make([]uint32, 0, len(index)) 966 for id := range index { 967 repoIDs = append(repoIDs, id) 968 } 969 sort.Slice(repoIDs, func(i, j int) bool { 970 return repoIDs[i] < repoIDs[j] 971 }) 972 return repoIDs 973} 974 975// setupTmpDir sets up a temporary directory on the same volume as the 976// indexes. 977// 978// If main is true we will delete older temp directories left around. main is 979// false when this is a debug command. 980func setupTmpDir(main bool, index string) error { 981 // change the target tmp directory depending on if its our main daemon or a 982 // debug sub command. 
983 dir := ".indexserver.debug.tmp" 984 if main { 985 dir = ".indexserver.tmp" 986 } 987 988 tmpRoot := filepath.Join(index, dir) 989 if err := os.MkdirAll(tmpRoot, 0755); err != nil { 990 return err 991 } 992 if !tmpfriend.IsTmpFriendDir(tmpRoot) { 993 _, err := tmpfriend.RootTempDir(tmpRoot) 994 return err 995 } 996 return nil 997} 998 999func printMetaData(fn string) error { 1000 repo, indexMeta, err := zoekt.ReadMetadataPath(fn) 1001 if err != nil { 1002 return err 1003 } 1004 1005 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 1006 if err != nil { 1007 return err 1008 } 1009 1010 err = json.NewEncoder(os.Stdout).Encode(repo) 1011 if err != nil { 1012 return err 1013 } 1014 return nil 1015} 1016 1017func printShardStats(fn string) error { 1018 f, err := os.Open(fn) 1019 if err != nil { 1020 return err 1021 } 1022 1023 iFile, err := zoekt.NewIndexFile(f) 1024 if err != nil { 1025 return err 1026 } 1027 1028 return zoekt.PrintNgramStats(iFile) 1029} 1030 1031func srcLogLevelIsDebug() bool { 1032 lvl := os.Getenv(sglog.EnvLogLevel) 1033 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1034} 1035 1036func getEnvWithDefaultInt64(k string, defaultVal int64) int64 { 1037 v := os.Getenv(k) 1038 if v == "" { 1039 return defaultVal 1040 } 1041 i, err := strconv.ParseInt(v, 10, 64) 1042 if err != nil { 1043 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1044 } 1045 return i 1046} 1047 1048func getEnvWithDefaultInt(k string, defaultVal int) int { 1049 v := os.Getenv(k) 1050 if v == "" { 1051 return defaultVal 1052 } 1053 i, err := strconv.Atoi(k) 1054 if err != nil { 1055 log.Fatalf("error parsing ENV %s to int: %s", k, err) 1056 } 1057 return i 1058} 1059 1060func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 { 1061 v := os.Getenv(k) 1062 if v == "" { 1063 return defaultVal 1064 } 1065 i, err := strconv.ParseUint(v, 10, 64) 1066 if err != nil { 1067 log.Fatalf("error parsing ENV %s to uint64: %s", k, err) 1068 } 1069 
return i 1070} 1071 1072func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1073 v := os.Getenv(k) 1074 if v == "" { 1075 return defaultVal 1076 } 1077 f, err := strconv.ParseFloat(v, 64) 1078 if err != nil { 1079 log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1080 } 1081 return f 1082} 1083 1084func getEnvWithDefaultString(k string, defaultVal string) string { 1085 v := os.Getenv(k) 1086 if v == "" { 1087 return defaultVal 1088 } 1089 return v 1090} 1091 1092func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration { 1093 v := os.Getenv(k) 1094 if v == "" { 1095 return defaultVal 1096 } 1097 1098 d, err := time.ParseDuration(v) 1099 if err != nil { 1100 log.Fatalf("error parsing ENV %s to duration: %s", k, err) 1101 } 1102 return d 1103} 1104 1105func getEnvWithDefaultBool(k string, defaultVal bool) bool { 1106 v := os.Getenv(k) 1107 if v == "" { 1108 return defaultVal 1109 } 1110 1111 b, err := strconv.ParseBool(v) 1112 if err != nil { 1113 log.Fatalf("error parsing ENV %s to bool: %s", k, err) 1114 } 1115 return b 1116} 1117 1118func getEnvWithDefaultEmptySet(k string) map[string]struct{} { 1119 set := map[string]struct{}{} 1120 for _, v := range strings.Split(os.Getenv(k), ",") { 1121 v = strings.TrimSpace(v) 1122 if v != "" { 1123 set[v] = struct{}{} 1124 } 1125 } 1126 return set 1127} 1128 1129func joinStringSet(set map[string]struct{}, sep string) string { 1130 var xs []string 1131 for x := range set { 1132 xs = append(xs, x) 1133 } 1134 1135 return strings.Join(xs, sep) 1136} 1137 1138func setCompoundShardCounter(indexDir string) { 1139 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt")) 1140 if err != nil { 1141 log.Printf("setCompoundShardCounter: %s\n", err) 1142 return 1143 } 1144 metricNumberCompoundShards.Set(float64(len(fns))) 1145} 1146 1147func rootCmd() *ffcli.Command { 1148 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1149 conf := rootConfig{ 1150 Main: true, 1151 } 
1152 conf.registerRootFlags(rootFs) 1153 1154 return &ffcli.Command{ 1155 FlagSet: rootFs, 1156 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1157 Subcommands: []*ffcli.Command{debugCmd()}, 1158 Exec: func(ctx context.Context, args []string) error { 1159 return startServer(conf) 1160 }, 1161 } 1162} 1163 1164type rootConfig struct { 1165 // Main is true if this rootConfig is for our main long running command (the 1166 // indexserver). Debug commands should not set this value. This is used to 1167 // determine if we need to run tmpfriend. 1168 Main bool 1169 1170 root string 1171 interval time.Duration 1172 index string 1173 indexConcurrency int64 1174 listen string 1175 hostname string 1176 cpuFraction float64 1177 blockProfileRate int 1178 1179 // config values related to shard merging 1180 vacuumInterval time.Duration 1181 mergeInterval time.Duration 1182 targetSize int64 1183 minSize int64 1184 minAgeDays int 1185 maxPriority float64 1186 1187 // config values related to backoff indexing repos with one or more consecutive failures 1188 backoffDuration time.Duration 1189 maxBackoffDuration time.Duration 1190 1191 // useGRPC is true if we should use the gRPC API to talk to Sourcegraph. 1192 useGRPC bool 1193} 1194 1195func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1196 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. 
If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1197 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1198 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of concurrent index jobs to run.") 1199 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use") 1200 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") 1201 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") 1202 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1203 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate") 1204 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.") 1205 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. 
A negative value disables indexing backoff.") 1206 fs.BoolVar(&rc.useGRPC, "use_grpc", getEnvWithDefaultBool("GRPC_ENABLED", false), "use the gRPC API to talk to Sourcegraph") 1207 1208 // flags related to shard merging 1209 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1210 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1211 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB") 1212 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB") 1213 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.") 1214 fs.Float64Var(&rc.maxPriority, "merge_max_priority", getEnvWithDefaultFloat64("SRC_MERGE_MAX_PRIORITY", 100), "the maximum priority a shard can have to be considered for merging.") 1215} 1216 1217func startServer(conf rootConfig) error { 1218 s, err := newServer(conf) 1219 if err != nil { 1220 return err 1221 } 1222 1223 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate) 1224 setCompoundShardCounter(s.IndexDir) 1225 1226 if conf.listen != "" { 1227 1228 mux := http.NewServeMux() 1229 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1230 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1231 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1232 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during 
re-balancing"}, 1233 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1234 }...) 1235 s.addDebugHandlers(mux) 1236 1237 go func() { 1238 debug.Printf("serving HTTP on %s", conf.listen) 1239 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1240 1241 }() 1242 1243 // Serve mux on a unix domain socket on a best-effort-basis so that 1244 // webserver can call the endpoints via the shared filesystem. 1245 // 1246 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1247 // on the socket due to permission errors. See 1248 // https://github.com/docker/for-mac/issues/6239 1249 go func() { 1250 serveHTTPOverSocket := func() error { 1251 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1252 // We cannot bind a socket to an existing pathname. 1253 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1254 return fmt.Errorf("error removing socket file: %s", socket) 1255 } 1256 // The "unix" network corresponds to stream sockets. (cf. unixgram, 1257 // unixpacket). 1258 l, err := net.Listen("unix", socket) 1259 if err != nil { 1260 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1261 } 1262 // Indexserver (root) and webserver (Sourcegraph) run with 1263 // different users. Per default, the socket is created with 1264 // permission 755 (root root), which doesn't let webserver write to 1265 // it. 1266 // 1267 // See https://github.com/golang/go/issues/11822 for more context. 
1268 if err := os.Chmod(socket, 0777); err != nil { 1269 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1270 } 1271 debug.Printf("serving HTTP on %s", socket) 1272 return http.Serve(l, mux) 1273 } 1274 debug.Print(serveHTTPOverSocket()) 1275 }() 1276 } 1277 1278 oc := &ownerChecker{ 1279 Path: filepath.Join(conf.index, "owner.txt"), 1280 Hostname: conf.hostname, 1281 } 1282 go oc.Run() 1283 1284 logger := sglog.Scoped("metricsRegistration", "") 1285 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1286 1287 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1288 prometheus.DefaultRegisterer.MustRegister(c) 1289 1290 s.Run() 1291 return nil 1292} 1293 1294func newServer(conf rootConfig) (*Server, error) { 1295 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1296 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1297 } 1298 if conf.index == "" { 1299 return nil, fmt.Errorf("must set -index") 1300 } 1301 if conf.root == "" { 1302 return nil, fmt.Errorf("must set -sourcegraph_url") 1303 } 1304 rootURL, err := url.Parse(conf.root) 1305 if err != nil { 1306 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1307 } 1308 1309 rootURL = addDefaultPort(rootURL) 1310 1311 // Tune GOMAXPROCS to match Linux container CPU quota. 1312 _, _ = maxprocs.Set() 1313 1314 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler. 1315 // The block profiler is disabled by default and should be enabled with care in production 1316 runtime.SetBlockProfileRate(conf.blockProfileRate) 1317 1318 // Automatically prepend our own path at the front, to minimize 1319 // required configuration. 
1320 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1321 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1322 } 1323 1324 if _, err := os.Stat(conf.index); err != nil { 1325 if err := os.MkdirAll(conf.index, 0755); err != nil { 1326 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1327 } 1328 } 1329 1330 if err := setupTmpDir(conf.Main, conf.index); err != nil { 1331 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1332 } 1333 1334 if srcLogLevelIsDebug() { 1335 debug = log.New(os.Stderr, "", log.LstdFlags) 1336 } 1337 1338 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1339 if len(reposWithSeparateIndexingMetrics) > 0 { 1340 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1341 } 1342 1343 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1344 if len(deltaBuildRepositoriesAllowList) > 0 { 1345 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1346 } 1347 1348 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1349 if deltaShardNumberFallbackThreshold > 0 { 1350 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1351 } else { 1352 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1353 } 1354 1355 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1356 if len(reposShouldSkipSymbolsCalculation) > 0 { 1357 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1358 } 1359 1360 var sg Sourcegraph 1361 if rootURL.IsAbs() { 1362 var batchSize int 1363 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { 
1364 batchSize, err = strconv.Atoi(v) 1365 if err != nil { 1366 return nil, fmt.Errorf("Invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1367 } 1368 } 1369 1370 opts := []SourcegraphClientOption{ 1371 WithBatchSize(batchSize), 1372 WithShouldUseGRPC(conf.useGRPC), 1373 } 1374 1375 logger := sglog.Scoped("zoektConfigurationGRPCClient", "") 1376 client, err := dialGRPCClient(rootURL.Host, logger) 1377 if err != nil { 1378 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1379 } 1380 1381 opts = append(opts, WithGRPCClient(client)) 1382 sg = newSourcegraphClient(rootURL, conf.hostname, opts...) 1383 1384 } else { 1385 sg = sourcegraphFake{ 1386 RootDir: rootURL.String(), 1387 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1388 } 1389 } 1390 1391 if conf.indexConcurrency < 1 { 1392 conf.indexConcurrency = 1 1393 } 1394 1395 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction))) 1396 if cpuCount < 1 { 1397 cpuCount = 1 1398 } 1399 1400 logger := sglog.Scoped("server", "periodically reindexes enabled repositories on sourcegraph") 1401 1402 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1403 1404 return &Server{ 1405 logger: logger, 1406 Sourcegraph: sg, 1407 IndexDir: conf.index, 1408 IndexConcurrency: int(conf.indexConcurrency), 1409 Interval: conf.interval, 1410 CPUCount: cpuCount, 1411 queue: *q, 1412 shardMerging: zoekt.ShardMergingEnabled(), 1413 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1414 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1415 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1416 hostname: conf.hostname, 1417 mergeOpts: mergeOpts{ 1418 vacuumInterval: conf.vacuumInterval, 1419 mergeInterval: conf.mergeInterval, 1420 targetSizeBytes: conf.targetSize * 1024 * 1024, 1421 minSizeBytes: conf.minSize * 1024 * 1024, 1422 minAgeDays: conf.minAgeDays, 1423 maxPriority: 
conf.maxPriority, 1424 }, 1425 }, err 1426} 1427 1428// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration 1429// for the indexed-search-configuration gRPC service. 1430// 1431// The default backoff strategy is modeled after the default settings used by 1432// retryablehttp.DefaultClient. 1433// 1434// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1435// - Unavailable 1436// - Aborted 1437// 1438//go:embed default_grpc_service_configuration.json 1439var defaultGRPCServiceConfigurationJSON string 1440 1441func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1442 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1443 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1444 return invoker(ctx, method, req, reply, cc, opts...) 1445 } 1446} 1447 1448func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1449 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1450 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1451 return streamer(ctx, desc, cc, method, opts...) 1452 } 1453} 1454 1455// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPCs servers and clients are allowed to process. 1456// This can be overridden by providing custom Server/Dial options. 
1457const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB 1458 1459func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) { 1460 opts := []grpc.DialOption{ 1461 grpc.WithTransportCredentials(insecure.NewCredentials()), 1462 grpc.WithChainStreamInterceptor( 1463 internalActorStreamInterceptor(), 1464 internalerrs.LoggingStreamClientInterceptor(logger), 1465 internalerrs.PrometheusStreamClientInterceptor, 1466 ), 1467 grpc.WithChainUnaryInterceptor( 1468 internalActorUnaryInterceptor(), 1469 internalerrs.LoggingUnaryClientInterceptor(logger), 1470 internalerrs.PrometheusUnaryClientInterceptor, 1471 ), 1472 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON), 1473 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)), 1474 } 1475 1476 opts = append(opts, additionalOpts...) 1477 1478 // Ensure that the message size options are set last, so they override any other 1479 // client-specific options that tweak the message size. 1480 // 1481 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they 1482 // take precedence over everything else with a uniform size setting that's easy to reason about. 1483 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...) 1484 1485 // This dialer is used to connect via gRPC to the Sourcegraph instance. 1486 // This is done lazily, so we can provide the client to use regardless of 1487 // whether we enabled gRPC or not initially. 1488 cc, err := grpc.Dial(addr, opts...) 1489 if err != nil { 1490 return nil, fmt.Errorf("dialing %q: %w", addr, err) 1491 } 1492 1493 client := proto.NewZoektConfigurationServiceClient(cc) 1494 return client, nil 1495} 1496 1497// addDefaultPort adds a default port to a URL if one is not specified. 1498// 1499// If the URL scheme is "http" and no port is specified, "80" is used. 
1500// If the scheme is "https", "443" is used. 1501// 1502// The original URL is not mutated. A copy is modified and returned. 1503func addDefaultPort(original *url.URL) *url.URL { 1504 if original == nil { 1505 return nil // don't panic 1506 } 1507 1508 if !original.IsAbs() { 1509 return original // don't do anything if the URL doesn't look like a remote URL 1510 } 1511 1512 if original.Scheme == "http" && original.Port() == "" { 1513 u := cloneURL(original) 1514 u.Host = net.JoinHostPort(u.Host, "80") 1515 return u 1516 } 1517 1518 if original.Scheme == "https" && original.Port() == "" { 1519 u := cloneURL(original) 1520 u.Host = net.JoinHostPort(u.Host, "443") 1521 return u 1522 } 1523 1524 return original 1525} 1526 1527// cloneURL returns a copy of the URL. It is safe to mutate the returned URL. 1528// This is copied from net/http/clone.go 1529func cloneURL(u *url.URL) *url.URL { 1530 if u == nil { 1531 return nil 1532 } 1533 u2 := new(url.URL) 1534 *u2 = *u 1535 if u.User != nil { 1536 u2.User = new(url.Userinfo) 1537 *u2.User = *u.User 1538 } 1539 return u2 1540} 1541 1542func main() { 1543 liblog := sglog.Init(sglog.Resource{ 1544 Name: "zoekt-indexserver", 1545 Version: zoekt.Version, 1546 InstanceID: zoekt.HostnameBestEffort(), 1547 }) 1548 defer liblog.Sync() 1549 1550 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1551 log.Fatal(err) 1552 } 1553}