fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Command zoekt-sourcegraph-indexserver periodically reindexes enabled 2// repositories on sourcegraph 3package main 4 5import ( 6 "bytes" 7 "context" 8 _ "embed" 9 "encoding/json" 10 "errors" 11 "flag" 12 "fmt" 13 "html/template" 14 "io" 15 "io/fs" 16 "log" 17 "math" 18 "math/rand" 19 "net" 20 "net/http" 21 "net/url" 22 "os" 23 "os/exec" 24 "os/signal" 25 "path/filepath" 26 "runtime" 27 "sort" 28 "strconv" 29 "strings" 30 "sync" 31 "text/tabwriter" 32 "time" 33 34 "github.com/keegancsmith/tmpfriend" 35 "github.com/peterbourgon/ff/v3/ffcli" 36 "github.com/prometheus/client_golang/prometheus" 37 "github.com/prometheus/client_golang/prometheus/promauto" 38 sglog "github.com/sourcegraph/log" 39 "go.uber.org/automaxprocs/maxprocs" 40 "golang.org/x/net/trace" 41 "golang.org/x/sys/unix" 42 "google.golang.org/grpc" 43 "google.golang.org/grpc/credentials/insecure" 44 "google.golang.org/grpc/metadata" 45 46 "github.com/sourcegraph/mountinfo" 47 48 "github.com/sourcegraph/zoekt" 49 "github.com/sourcegraph/zoekt/build" 50 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1" 51 "github.com/sourcegraph/zoekt/debugserver" 52 "github.com/sourcegraph/zoekt/internal/profiler" 53) 54 55var ( 56 metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{ 57 Name: "resolve_revisions_seconds", 58 Help: "A histogram of latencies for resolving all repository revisions.", 59 Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 27min 60 }) 61 62 metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 63 Name: "resolve_revision_seconds", 64 Help: "A histogram of latencies for resolving a repository revision.", 65 Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s 66 }, []string{"success"}) // success=true|false 67 68 metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{ 69 Name: "get_index_options_total", 70 Help: "The total number of times we tried to get index options for a repository. Includes errors.", 71 }) 72 metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{ 73 Name: "get_index_options_error_total", 74 Help: "The total number of times we failed to get index options for a repository.", 75 }) 76 77 metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 78 Name: "index_repo_seconds", 79 Help: "A histogram of latencies for indexing a repository.", 80 Buckets: prometheus.ExponentialBucketsRange( 81 (100 * time.Millisecond).Seconds(), 82 (40*time.Minute + indexTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo 83 20), 84 }, []string{ 85 "state", // state is an indexState 86 "name", // name of the repository that was indexed 87 }) 88 89 metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 90 Name: "index_fetch_seconds", 91 Help: "A histogram of latencies for fetching a repository.", 92 Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes 93 }, []string{ 94 "success", // true|false 95 "name", // the name of the repository that the commits were fetched from 96 }) 97 98 metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{ 99 Name: "index_incremental_index_state", 100 Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.", 101 }, []string{"state"}) // state is build.IndexState 102 103 metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{ 104 Name: "index_num_indexed", 105 Help: "Number of indexed repos by code host", 106 }) 107 108 metricNumAssigned = promauto.NewGauge(prometheus.GaugeOpts{ 109 Name: "index_num_assigned", 110 Help: "Number of repos assigned to this indexer by code host", 111 }) 112 113 metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{ 114 Name: "index_failing_total", 115 Help: "Counts failures to index (indexing activity, should be used with rate())", 116 }) 117 118 metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{ 119 Name: "index_indexing_total", 120 Help: "Counts indexings (indexing activity, should be used with rate())", 121 }) 122 123 metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{ 124 Name: "index_num_stopped_tracking_total", 125 Help: "Counts the number of repos we stopped tracking.", 126 }) 127) 128 129// set of repositories that we want to capture separate indexing metrics for 130var reposWithSeparateIndexingMetrics = make(map[string]struct{}) 131 132type indexState string 133 134const ( 135 indexStateFail indexState = "fail" 136 indexStateSuccess indexState = "success" 137 indexStateSuccessMeta indexState = "success_meta" // We only updated metadata 138 indexStateNoop indexState = "noop" // We didn't need to update index 139 indexStateEmpty indexState = "empty" // index is empty (empty repo) 140) 141 142// Server is the main functionality of zoekt-sourcegraph-indexserver. It 143// exists to conveniently use all the options passed in via func main. 144type Server struct { 145 logger sglog.Logger 146 147 Sourcegraph Sourcegraph 148 BatchSize int 149 150 // IndexDir is the index directory to use. 151 IndexDir string 152 153 // IndexConcurrency is the number of repositories we index at once. 154 IndexConcurrency int 155 156 // Interval is how often we sync with Sourcegraph. 157 Interval time.Duration 158 // CPUCount is the amount of parallelism to use when indexing a 159 // repository. 160 CPUCount int 161 162 queue Queue 163 164 // muIndexDir protects the index directory from concurrent access. 165 muIndexDir indexMutex 166 167 // If true, shard merging is enabled. 168 shardMerging bool 169 170 // deltaBuildRepositoriesAllowList is an allowlist for repositories that we 171 // use delta-builds for instead of normal builds 172 deltaBuildRepositoriesAllowList map[string]struct{} 173 174 // deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist 175 // before attempting a delta build. 176 deltaShardNumberFallbackThreshold uint64 177 178 // repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that 179 // we skip calculating symbols metadata for during builds 180 repositoriesSkipSymbolsCalculationAllowList map[string]struct{} 181 182 hostname string 183 184 mergeOpts mergeOpts 185} 186 187var debug = log.New(io.Discard, "", log.LstdFlags) 188 189// our index commands should output something every 100mb they process. 190// 191// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough 192// time." famous last words. A client was indexing a monorepo with 42 193// cores... 5m was not enough. 194const noOutputTimeout = 30 * time.Minute 195 196func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 197 out := &synchronizedBuffer{} 198 cmd.Stdout = out 199 cmd.Stderr = out 200 201 tr.LazyPrintf("%s", cmd.Args) 202 203 defer func() { 204 if err != nil { 205 outS := out.String() 206 tr.LazyPrintf("failed: %v", err) 207 tr.LazyPrintf("output: %s", out) 208 tr.SetError() 209 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 210 } 211 }() 212 213 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 214 215 if err := cmd.Start(); err != nil { 216 return err 217 } 218 219 errC := make(chan error) 220 go func() { 221 errC <- cmd.Wait() 222 }() 223 224 // This channel is set after we have sent sigquit. It allows us to follow up 225 // with a sigkill if the process doesn't quit after sigquit. 226 kill := make(<-chan time.Time) 227 228 lastLen := 0 229 for { 230 select { 231 case <-time.After(noOutputTimeout): 232 // Periodically check if we have had output. If not kill the process. 233 if out.Len() != lastLen { 234 lastLen = out.Len() 235 log.Printf("still running %s", cmd.Args) 236 } else { 237 // Send quit (C-\) first so we get a stack dump. 238 log.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 239 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 240 log.Println("quit failed:", err) 241 } 242 243 // send sigkill if still running in 10s 244 kill = time.After(10 * time.Second) 245 } 246 247 case <-kill: 248 log.Printf("still running, killing %s", cmd.Args) 249 if err := cmd.Process.Kill(); err != nil { 250 log.Println("kill failed:", err) 251 } 252 253 case err := <-errC: 254 if err != nil { 255 return err 256 } 257 258 tr.LazyPrintf("success") 259 return nil 260 } 261 } 262} 263 264// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 265// monitor the buffer while it is being written to. 266type synchronizedBuffer struct { 267 mu sync.Mutex 268 b bytes.Buffer 269} 270 271func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 272 sb.mu.Lock() 273 defer sb.mu.Unlock() 274 return sb.b.Write(p) 275} 276 277func (sb *synchronizedBuffer) Len() int { 278 sb.mu.Lock() 279 defer sb.mu.Unlock() 280 return sb.b.Len() 281} 282 283func (sb *synchronizedBuffer) String() string { 284 sb.mu.Lock() 285 defer sb.mu.Unlock() 286 return sb.b.String() 287} 288 289// pauseFileName if present in IndexDir will stop index jobs from 290// running. This is to make it possible to experiment with the content of the 291// IndexDir without the indexserver writing to it. 292const pauseFileName = "PAUSE" 293 294// Run the sync loop. This blocks forever. 295func (s *Server) Run() { 296 removeIncompleteShards(s.IndexDir) 297 298 // Start a goroutine which updates the queue with commits to index. 299 go func() { 300 // We update the list of indexed repos every Interval. To speed up manual 301 // testing we also listen for SIGUSR1 to trigger updates. 302 // 303 // "pkill -SIGUSR1 zoekt-sourcegra" 304 for range jitterTicker(s.Interval, unix.SIGUSR1) { 305 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 306 log.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 307 continue 308 } 309 310 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 311 if err != nil { 312 log.Printf("error listing repos: %s", err) 313 continue 314 } 315 316 debug.Printf("updating index queue with %d repositories", len(repos.IDs)) 317 318 // Stop indexing repos we don't need to track anymore 319 removed := s.queue.MaybeRemoveMissing(repos.IDs) 320 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 321 if len(removed) > 0 { 322 log.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 323 } 324 325 cleanupDone := make(chan struct{}) 326 go func() { 327 defer close(cleanupDone) 328 s.muIndexDir.Global(func() { 329 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 330 }) 331 }() 332 333 repos.IterateIndexOptions(s.queue.AddOrUpdate) 334 335 // IterateIndexOptions will only iterate over repositories that have 336 // changed since we last called list. However, we want to add all IDs 337 // back onto the queue just to check that what is on disk is still 338 // correct. This will use the last IndexOptions we stored in the 339 // queue. The repositories not on the queue (missing) need a forced 340 // fetch of IndexOptions. 341 missing := s.queue.Bump(repos.IDs) 342 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 343 344 setCompoundShardCounter(s.IndexDir) 345 346 <-cleanupDone 347 } 348 }() 349 350 go func() { 351 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 352 if s.shardMerging { 353 s.vacuum() 354 } 355 } 356 }() 357 358 go func() { 359 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 360 if s.shardMerging { 361 s.doMerge() 362 } 363 } 364 }() 365 366 for i := 0; i < s.IndexConcurrency; i++ { 367 go s.processQueue() 368 } 369 370 // block forever 371 select {} 372} 373 374// formatList returns a comma-separated list of the first min(len(v), m) items. 375func formatListUint32(v []uint32, m int) string { 376 if len(v) < m { 377 m = len(v) 378 } 379 380 sb := strings.Builder{} 381 for i := 0; i < m; i++ { 382 fmt.Fprintf(&sb, "%d, ", v[i]) 383 } 384 385 if len(v) > m { 386 sb.WriteString("...") 387 } 388 389 return strings.TrimRight(sb.String(), ", ") 390} 391 392func (s *Server) processQueue() { 393 for { 394 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 395 time.Sleep(time.Second) 396 continue 397 } 398 399 opts, ok := s.queue.Pop() 400 if !ok { 401 time.Sleep(time.Second) 402 continue 403 } 404 405 args := s.indexArgs(opts) 406 407 ran := s.muIndexDir.With(opts.Name, func() { 408 // only record time taken once we hold the lock. This avoids us 409 // recording time taken while merging/cleanup runs. 410 start := time.Now() 411 412 state, err := s.Index(args) 413 414 elapsed := time.Since(start) 415 416 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 417 418 if err != nil { 419 log.Printf("error indexing %s: %s", args.String(), err) 420 } 421 422 switch state { 423 case indexStateSuccess: 424 var branches []string 425 for _, b := range args.Branches { 426 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 427 } 428 s.logger.Info("updated index", 429 sglog.String("repo", args.Name), 430 sglog.Uint32("id", args.RepoID), 431 sglog.Strings("branches", branches), 432 sglog.Duration("duration", elapsed), 433 ) 434 case indexStateSuccessMeta: 435 log.Printf("updated meta %s in %v", args.String(), elapsed) 436 } 437 s.queue.SetIndexed(opts, state) 438 }) 439 440 if !ran { 441 // Someone else is processing the repository. We can just skip this job 442 // since the repository will be added back to the queue and we will 443 // converge to the correct behaviour. 444 debug.Printf("index job for repository already running: %s", args) 445 continue 446 } 447 } 448} 449 450// repoNameForMetric returns a normalized version of the given repository name that is 451// suitable for use with Prometheus metrics. 452func repoNameForMetric(repo string) string { 453 // Check to see if we want to be able to capture separate indexing metrics for this repository. 454 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 455 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 456 return repo 457 } 458 459 return "" 460} 461 462func batched(slice []uint32, size int) <-chan []uint32 { 463 c := make(chan []uint32) 464 go func() { 465 for len(slice) > 0 { 466 if size > len(slice) { 467 size = len(slice) 468 } 469 c <- slice[:size] 470 slice = slice[size:] 471 } 472 close(c) 473 }() 474 return c 475} 476 477// jitterTicker returns a ticker which ticks with a jitter. Each tick is 478// uniformly selected from the range (d/2, d + d/2). It will tick on creation. 479// 480// sig is a list of signals which also cause the ticker to fire. This is a 481// convenience to allow manually triggering of the ticker. 482func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} { 483 ticker := make(chan struct{}) 484 485 go func() { 486 for { 487 ticker <- struct{}{} 488 ns := int64(d) 489 jitter := rand.Int63n(ns) 490 time.Sleep(time.Duration(ns/2 + jitter)) 491 } 492 }() 493 494 go func() { 495 if len(sig) == 0 { 496 return 497 } 498 499 c := make(chan os.Signal, 1) 500 signal.Notify(c, sig...) 501 for range c { 502 ticker <- struct{}{} 503 } 504 }() 505 506 return ticker 507} 508 509// Index starts an index job for repo name at commit. 510func (s *Server) Index(args *indexArgs) (state indexState, err error) { 511 tr := trace.New("index", args.Name) 512 513 defer func() { 514 if err != nil { 515 tr.SetError() 516 tr.LazyPrintf("error: %v", err) 517 state = indexStateFail 518 metricFailingTotal.Inc() 519 } 520 tr.LazyPrintf("state: %s", state) 521 tr.Finish() 522 }() 523 524 tr.LazyPrintf("branches: %v", args.Branches) 525 526 if len(args.Branches) == 0 { 527 return indexStateEmpty, createEmptyShard(args) 528 } 529 530 repositoryName := args.Name 531 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok { 532 tr.LazyPrintf("marking this repository for delta build") 533 args.UseDelta = true 534 } 535 536 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold 537 538 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok { 539 tr.LazyPrintf("skipping symbols calculation") 540 args.Symbols = false 541 } 542 543 reason := "forced" 544 545 if args.Incremental { 546 bo := args.BuildOptions() 547 bo.SetDefaults() 548 incrementalState, fn := bo.IndexState() 549 reason = string(incrementalState) 550 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc() 551 552 switch incrementalState { 553 case build.IndexStateEqual: 554 debug.Printf("%s index already up to date. Shard=%s", args.String(), fn) 555 return indexStateNoop, nil 556 557 case build.IndexStateMeta: 558 log.Printf("updating index.meta %s", args.String()) 559 560 if err := mergeMeta(bo); err != nil { 561 log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err) 562 } else { 563 return indexStateSuccessMeta, nil 564 } 565 566 case build.IndexStateCorrupt: 567 log.Printf("falling back to full update: corrupt index: %s", args.String()) 568 } 569 } 570 571 log.Printf("updating index %s reason=%s", args.String(), reason) 572 573 metricIndexingTotal.Inc() 574 c := gitIndexConfig{ 575 runCmd: func(cmd *exec.Cmd) error { 576 return s.loggedRun(tr, cmd) 577 }, 578 579 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) { 580 return args.BuildOptions().FindRepositoryMetadata() 581 }, 582 } 583 584 err = gitIndex(c, args, s.Sourcegraph, s.logger) 585 if err != nil { 586 return indexStateFail, err 587 } 588 589 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil { 590 s.logger.Error("failed to update index status", 591 sglog.String("repo", args.Name), 592 sglog.Uint32("id", args.RepoID), 593 sglogBranches("branches", args.Branches), 594 sglog.Error(err), 595 ) 596 } 597 598 return indexStateSuccess, nil 599} 600 601// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so 602// it can update the zoekt_repos table. 603func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error { 604 // We need to read from disk for IndexTime. 605 _, metadata, ok, err := c.findRepositoryMetadata(args) 606 if err != nil { 607 return fmt.Errorf("failed to read metadata for new/updated index: %w", err) 608 } 609 if !ok { 610 return errors.New("failed to find metadata for new/updated index") 611 } 612 613 status := []indexStatus{{ 614 RepoID: args.RepoID, 615 Branches: args.Branches, 616 IndexTimeUnix: metadata.IndexTime.Unix(), 617 }} 618 if err := sg.UpdateIndexStatus(status); err != nil { 619 return fmt.Errorf("failed to update sourcegraph with status: %w", err) 620 } 621 622 return nil 623} 624 625func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field { 626 ss := make([]string, len(branches)) 627 for i, b := range branches { 628 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version) 629 } 630 return sglog.Strings(key, ss) 631} 632 633func (s *Server) indexArgs(opts IndexOptions) *indexArgs { 634 return &indexArgs{ 635 IndexOptions: opts, 636 637 IndexDir: s.IndexDir, 638 Parallelism: s.CPUCount, 639 640 Incremental: true, 641 642 // 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22 643 FileLimit: 1 << 20, 644 } 645} 646 647func createEmptyShard(args *indexArgs) error { 648 bo := args.BuildOptions() 649 bo.SetDefaults() 650 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 651 652 if args.Incremental && bo.IncrementalSkipIndexing() { 653 return nil 654 } 655 656 builder, err := build.NewBuilder(*bo) 657 if err != nil { 658 return err 659 } 660 return builder.Finish() 661} 662 663// addDebugHandlers adds handlers specific to indexserver. 664func (s *Server) addDebugHandlers(mux *http.ServeMux) { 665 // Sourcegraph's site admin view requires indexserver to serve it's admin view 666 // on "/". 667 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 668 669 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 670 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 671 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 672 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 673 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 674 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 675} 676 677func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 678 if r.Method != "GET" { 679 w.Header().Set("Allow", "GET") 680 w.WriteHeader(http.StatusMethodNotAllowed) 681 return 682 } 683 684 response := struct { 685 Hostname string 686 }{ 687 Hostname: s.hostname, 688 } 689 690 b, err := json.Marshal(response) 691 if err != nil { 692 http.Error(w, err.Error(), http.StatusInternalServerError) 693 return 694 } 695 696 w.Header().Set("Content-Type", "application/json; charset=utf-8") 697 w.Write(b) 698} 699 700var rootTmpl = template.Must(template.New("name").Parse(` 701<html> 702 <body> 703 <a href="debug">Debug</a><br /> 704 <a href="debug/requests">Traces</a><br /> 705 {{.IndexMsg}}<br /> 706 <br /> 707 <h3>Reindex</h3> 708 {{if .Repos}} 709 <a href="?show_repos=false">hide repos</a><br /> 710 <table style="margin-top: 20px"> 711 <th style="text-align:left">Name</th> 712 <th style="text-align:left">ID</th> 713 {{range .Repos}} 714 <tr> 715 <td>{{.Name}}</td> 716 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 717 </tr> 718 {{end}} 719 </table> 720 {{else}} 721 <a href="?show_repos=true">show repos</a><br /> 722 {{end}} 723 </body> 724</html> 725`)) 726 727func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 728 if r.Method != "GET" { 729 w.Header().Set("Allow", "GET") 730 w.WriteHeader(http.StatusMethodNotAllowed) 731 return 732 } 733 734 values := r.URL.Query() 735 736 // ?id= 737 indexMsg := "" 738 if v := values.Get("id"); v != "" { 739 id, err := strconv.Atoi(v) 740 if err != nil { 741 http.Error(w, err.Error(), http.StatusBadRequest) 742 return 743 } 744 indexMsg, _ = s.forceIndex(uint32(id)) 745 } 746 747 // ?show_repos= 748 showRepos := false 749 if v := values.Get("show_repos"); v != "" { 750 showRepos, _ = strconv.ParseBool(v) 751 } 752 753 type Repo struct { 754 ID uint32 755 Name string 756 } 757 var data struct { 758 Repos []Repo 759 IndexMsg string 760 } 761 762 data.IndexMsg = indexMsg 763 764 if showRepos { 765 s.queue.Iterate(func(opts *IndexOptions) { 766 data.Repos = append(data.Repos, Repo{ 767 ID: opts.RepoID, 768 Name: opts.Name, 769 }) 770 }) 771 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 772 } 773 774 _ = rootTmpl.Execute(w, data) 775} 776 777// handleReindex triggers a reindex asynocronously. If a reindex was triggered 778// the request returns with status 202. The caller can infer the new state of 779// the index by calling List. 780func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 781 if r.Method != http.MethodPost { 782 w.Header().Set("Allow", http.MethodPost) 783 w.WriteHeader(http.StatusMethodNotAllowed) 784 return 785 } 786 787 err := r.ParseForm() 788 if err != nil { 789 http.Error(w, err.Error(), http.StatusBadRequest) 790 return 791 } 792 793 id, err := strconv.Atoi(r.Form.Get("repo")) 794 if err != nil { 795 http.Error(w, err.Error(), http.StatusBadRequest) 796 return 797 } 798 799 go func() { s.forceIndex(uint32(id)) }() 800 801 // 202 Accepted 802 w.WriteHeader(http.StatusAccepted) 803} 804 805func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 806 withIndexed := true 807 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 808 withIndexed = b 809 } 810 811 var indexed []uint32 812 if withIndexed { 813 indexed = listIndexed(s.IndexDir) 814 } 815 816 repos, err := s.Sourcegraph.List(r.Context(), indexed) 817 if err != nil { 818 http.Error(w, err.Error(), http.StatusInternalServerError) 819 return 820 } 821 822 bw := bytes.Buffer{} 823 824 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 825 826 _, err = fmt.Fprintf(tw, "ID\tName\n") 827 if err != nil { 828 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 829 return 830 } 831 832 s.queue.mu.Lock() 833 name := "" 834 for _, id := range repos.IDs { 835 if item := s.queue.get(id); item != nil { 836 name = item.opts.Name 837 } else { 838 name = "" 839 } 840 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 841 if err != nil { 842 debug.Printf("handleDebugList: %s\n", err.Error()) 843 } 844 } 845 s.queue.mu.Unlock() 846 847 if err != nil { 848 http.Error(w, err.Error(), http.StatusInternalServerError) 849 return 850 } 851 852 err = tw.Flush() 853 if err != nil { 854 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 855 return 856 } 857 858 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 859 860 if _, err := io.Copy(w, &bw); err != nil { 861 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 862 return 863 } 864} 865 866// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 867// can run this command during periods of low usage (evenings, weekends) to 868// trigger an initial merge run. In the steady-state, merges happen rarely, even 869// on busy instances, and users can rely on automatic merging instead. 870func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 871 872 // A merge operation can take very long, depending on the number merges and the 873 // target size of the compound shards. We run the merge in the background and 874 // return immediately to the user. 875 // 876 // We track the status of the merge with metricShardMergingRunning. 877 go func() { 878 s.doMerge() 879 }() 880 _, _ = w.Write([]byte("merging enqueued\n")) 881} 882 883func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 884 indexed := listIndexed(s.IndexDir) 885 886 bw := bytes.Buffer{} 887 888 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 889 890 _, err := fmt.Fprintf(tw, "ID\tName\n") 891 if err != nil { 892 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 893 return 894 } 895 896 s.queue.mu.Lock() 897 name := "" 898 for _, id := range indexed { 899 if item := s.queue.get(id); item != nil { 900 name = item.opts.Name 901 } else { 902 name = "" 903 } 904 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 905 if err != nil { 906 debug.Printf("handleDebugIndexed: %s\n", err.Error()) 907 } 908 } 909 s.queue.mu.Unlock() 910 911 if err != nil { 912 http.Error(w, err.Error(), http.StatusInternalServerError) 913 return 914 } 915 916 err = tw.Flush() 917 if err != nil { 918 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 919 return 920 } 921 922 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 923 924 if _, err := io.Copy(w, &bw); err != nil { 925 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 926 return 927 } 928} 929 930// forceIndex will run the index job for repo name now. It will return always 931// return a string explaining what it did, even if it failed. 932func (s *Server) forceIndex(id uint32) (string, error) { 933 var opts IndexOptions 934 var err error 935 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 936 opts = o 937 }, func(_ uint32, e error) { 938 err = e 939 }, id) 940 if err != nil { 941 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 942 } 943 944 args := s.indexArgs(opts) 945 args.Incremental = false // force re-index 946 947 var state indexState 948 ran := s.muIndexDir.With(opts.Name, func() { 949 state, err = s.Index(args) 950 }) 951 if !ran { 952 return fmt.Sprintf("index job for repository already running: %s", args), nil 953 } 954 if err != nil { 955 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 956 } 957 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 958} 959 960func listIndexed(indexDir string) []uint32 { 961 index := getShards(indexDir) 962 metricNumIndexed.Set(float64(len(index))) 963 repoIDs := make([]uint32, 0, len(index)) 964 for id := range index { 965 repoIDs = append(repoIDs, id) 966 } 967 sort.Slice(repoIDs, func(i, j int) bool { 968 return repoIDs[i] < repoIDs[j] 969 }) 970 return repoIDs 971} 972 973// setupTmpDir sets up a temporary directory on the same volume as the 974// indexes. 975// 976// If main is true we will delete older temp directories left around. main is 977// false when this is a debug command. 978func setupTmpDir(main bool, index string) error { 979 // change the target tmp directory depending on if its our main daemon or a 980 // debug sub command. 981 dir := ".indexserver.debug.tmp" 982 if main { 983 dir = ".indexserver.tmp" 984 } 985 986 tmpRoot := filepath.Join(index, dir) 987 if err := os.MkdirAll(tmpRoot, 0755); err != nil { 988 return err 989 } 990 if !tmpfriend.IsTmpFriendDir(tmpRoot) { 991 _, err := tmpfriend.RootTempDir(tmpRoot) 992 return err 993 } 994 return nil 995} 996 997func printMetaData(fn string) error { 998 repo, indexMeta, err := zoekt.ReadMetadataPath(fn) 999 if err != nil { 1000 return err 1001 } 1002 1003 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 1004 if err != nil { 1005 return err 1006 } 1007 1008 err = json.NewEncoder(os.Stdout).Encode(repo) 1009 if err != nil { 1010 return err 1011 } 1012 return nil 1013} 1014 1015func printShardStats(fn string) error { 1016 f, err := os.Open(fn) 1017 if err != nil { 1018 return err 1019 } 1020 1021 iFile, err := zoekt.NewIndexFile(f) 1022 if err != nil { 1023 return err 1024 } 1025 1026 return zoekt.PrintNgramStats(iFile) 1027} 1028 1029func srcLogLevelIsDebug() bool { 1030 lvl := os.Getenv(sglog.EnvLogLevel) 1031 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1032} 1033 1034func getEnvWithDefaultInt64(k string, defaultVal int64) int64 { 1035 v := os.Getenv(k) 1036 if v == "" { 1037 return defaultVal 1038 } 1039 i, err := strconv.ParseInt(v, 10, 64) 1040 if err != nil { 1041 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1042 } 1043 return i 1044} 1045 1046func getEnvWithDefaultInt(k string, defaultVal int) int { 1047 v := os.Getenv(k) 1048 if v == "" { 1049 return defaultVal 1050 } 1051 i, err := strconv.Atoi(k) 1052 if err != nil { 1053 log.Fatalf("error parsing ENV %s to int: %s", k, err) 1054 } 1055 return i 1056} 1057 1058func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 { 1059 v := os.Getenv(k) 1060 if v == "" { 1061 return defaultVal 1062 } 1063 i, err := strconv.ParseUint(v, 10, 64) 1064 if err != nil { 1065 log.Fatalf("error parsing ENV %s to uint64: %s", k, err) 1066 } 1067 return i 1068} 1069 1070func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1071 v := os.Getenv(k) 1072 if v == "" { 1073 return defaultVal 1074 } 1075 f, err := strconv.ParseFloat(v, 64) 1076 if err != nil { 1077 log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1078 } 1079 return f 1080} 1081 1082func getEnvWithDefaultString(k string, defaultVal string) string { 1083 v := os.Getenv(k) 1084 if v == "" { 1085 return defaultVal 1086 } 1087 return v 1088} 1089 1090func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration { 1091 v := os.Getenv(k) 1092 if v == "" { 1093 return defaultVal 1094 } 1095 1096 d, err := time.ParseDuration(v) 1097 if err != nil { 1098 log.Fatalf("error parsing ENV %s to duration: %s", k, err) 1099 } 1100 return d 1101} 1102 1103func getEnvWithDefaultBool(k string, defaultVal bool) bool { 1104 v := os.Getenv(k) 1105 if v == "" { 1106 return defaultVal 1107 } 1108 1109 b, err := strconv.ParseBool(v) 1110 if err != nil { 1111 log.Fatalf("error parsing ENV %s to bool: %s", k, err) 1112 } 1113 return b 1114} 1115 1116func getEnvWithDefaultEmptySet(k string) map[string]struct{} { 1117 set := map[string]struct{}{} 1118 for _, v := range strings.Split(os.Getenv(k), ",") { 1119 v = strings.TrimSpace(v) 1120 if v != "" { 1121 set[v] = struct{}{} 1122 } 1123 } 1124 return set 1125} 1126 1127func joinStringSet(set map[string]struct{}, sep string) string { 1128 var xs []string 1129 for x := range set { 1130 xs = append(xs, x) 1131 } 1132 1133 return strings.Join(xs, sep) 1134} 1135 1136func setCompoundShardCounter(indexDir string) { 1137 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt")) 1138 if err != nil { 1139 log.Printf("setCompoundShardCounter: %s\n", err) 1140 return 1141 } 1142 metricNumberCompoundShards.Set(float64(len(fns))) 1143} 1144 1145func rootCmd() *ffcli.Command { 1146 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1147 conf := rootConfig{ 1148 Main: true, 1149 } 1150 conf.registerRootFlags(rootFs) 1151 1152 return &ffcli.Command{ 1153 FlagSet: rootFs, 1154 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1155 Subcommands: []*ffcli.Command{debugCmd()}, 1156 Exec: func(ctx context.Context, args []string) error { 1157 return startServer(conf) 1158 }, 1159 } 1160} 1161 1162type rootConfig struct { 1163 // Main is true if this rootConfig is for our main long running command (the 1164 // indexserver). Debug commands should not set this value. This is used to 1165 // determine if we need to run tmpfriend. 1166 Main bool 1167 1168 root string 1169 interval time.Duration 1170 index string 1171 indexConcurrency int64 1172 listen string 1173 hostname string 1174 cpuFraction float64 1175 blockProfileRate int 1176 1177 // config values related to shard merging 1178 vacuumInterval time.Duration 1179 mergeInterval time.Duration 1180 targetSize int64 1181 minSize int64 1182 minAgeDays int 1183 maxPriority float64 1184 1185 // config values related to backoff indexing repos with one or more consecutive failures 1186 backoffDuration time.Duration 1187 maxBackoffDuration time.Duration 1188 1189 // useGRPC is true if we should use the gRPC API to talk to Sourcegraph. 1190 useGRPC bool 1191} 1192 1193func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1194 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1195 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1196 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of concurrent index jobs to run.") 1197 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use") 1198 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") 1199 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") 1200 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1201 fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate") 1202 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.") 1203 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.") 1204 fs.BoolVar(&rc.useGRPC, "use_grpc", getEnvWithDefaultBool("GRPC_ENABLED", false), "use the gRPC API to talk to Sourcegraph") 1205 1206 // flags related to shard merging 1207 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1208 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1209 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB") 1210 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB") 1211 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.") 1212 fs.Float64Var(&rc.maxPriority, "merge_max_priority", getEnvWithDefaultFloat64("SRC_MERGE_MAX_PRIORITY", 100), "the maximum priority a shard can have to be considered for merging.") 1213} 1214 1215func startServer(conf rootConfig) error { 1216 s, err := newServer(conf) 1217 if err != nil { 1218 return err 1219 } 1220 1221 profiler.Init("zoekt-sourcegraph-indexserver", zoekt.Version, conf.blockProfileRate) 1222 setCompoundShardCounter(s.IndexDir) 1223 1224 if conf.listen != "" { 1225 1226 mux := http.NewServeMux() 1227 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1228 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1229 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1230 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"}, 1231 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1232 }...) 1233 s.addDebugHandlers(mux) 1234 1235 go func() { 1236 debug.Printf("serving HTTP on %s", conf.listen) 1237 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1238 1239 }() 1240 1241 // Serve mux on a unix domain socket on a best-effort-basis so that 1242 // webserver can call the endpoints via the shared filesystem. 1243 // 1244 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1245 // on the socket due to permission errors. See 1246 // https://github.com/docker/for-mac/issues/6239 1247 go func() { 1248 serveHTTPOverSocket := func() error { 1249 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1250 // We cannot bind a socket to an existing pathname. 1251 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1252 return fmt.Errorf("error removing socket file: %s", socket) 1253 } 1254 // The "unix" network corresponds to stream sockets. (cf. unixgram, 1255 // unixpacket). 1256 l, err := net.Listen("unix", socket) 1257 if err != nil { 1258 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1259 } 1260 // Indexserver (root) and webserver (Sourcegraph) run with 1261 // different users. Per default, the socket is created with 1262 // permission 755 (root root), which doesn't let webserver write to 1263 // it. 1264 // 1265 // See https://github.com/golang/go/issues/11822 for more context. 1266 if err := os.Chmod(socket, 0777); err != nil { 1267 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1268 } 1269 debug.Printf("serving HTTP on %s", socket) 1270 return http.Serve(l, mux) 1271 } 1272 debug.Print(serveHTTPOverSocket()) 1273 }() 1274 } 1275 1276 oc := &ownerChecker{ 1277 Path: filepath.Join(conf.index, "owner.txt"), 1278 Hostname: conf.hostname, 1279 } 1280 go oc.Run() 1281 1282 logger := sglog.Scoped("metricsRegistration", "") 1283 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1284 1285 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1286 prometheus.DefaultRegisterer.MustRegister(c) 1287 1288 s.Run() 1289 return nil 1290} 1291 1292func newServer(conf rootConfig) (*Server, error) { 1293 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1294 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1295 } 1296 if conf.index == "" { 1297 return nil, fmt.Errorf("must set -index") 1298 } 1299 if conf.root == "" { 1300 return nil, fmt.Errorf("must set -sourcegraph_url") 1301 } 1302 rootURL, err := url.Parse(conf.root) 1303 if err != nil { 1304 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1305 } 1306 1307 rootURL = addDefaultPort(rootURL) 1308 1309 // Tune GOMAXPROCS to match Linux container CPU quota. 1310 _, _ = maxprocs.Set() 1311 1312 // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler. 1313 // The block profiler is disabled by default and should be enabled with care in production 1314 runtime.SetBlockProfileRate(conf.blockProfileRate) 1315 1316 // Automatically prepend our own path at the front, to minimize 1317 // required configuration. 1318 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1319 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1320 } 1321 1322 if _, err := os.Stat(conf.index); err != nil { 1323 if err := os.MkdirAll(conf.index, 0755); err != nil { 1324 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1325 } 1326 } 1327 1328 if err := setupTmpDir(conf.Main, conf.index); err != nil { 1329 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1330 } 1331 1332 if srcLogLevelIsDebug() { 1333 debug = log.New(os.Stderr, "", log.LstdFlags) 1334 } 1335 1336 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1337 if len(reposWithSeparateIndexingMetrics) > 0 { 1338 debug.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1339 } 1340 1341 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1342 if len(deltaBuildRepositoriesAllowList) > 0 { 1343 debug.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1344 } 1345 1346 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1347 if deltaShardNumberFallbackThreshold > 0 { 1348 debug.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1349 } else { 1350 debug.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1351 } 1352 1353 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1354 if len(reposShouldSkipSymbolsCalculation) > 0 { 1355 debug.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1356 } 1357 1358 var sg Sourcegraph 1359 if rootURL.IsAbs() { 1360 var batchSize int 1361 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { 1362 batchSize, err = strconv.Atoi(v) 1363 if err != nil { 1364 return nil, fmt.Errorf("Invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1365 } 1366 } 1367 1368 opts := []SourcegraphClientOption{ 1369 WithBatchSize(batchSize), 1370 WithShouldUseGRPC(conf.useGRPC), 1371 } 1372 1373 gRPCConnectionOptions := []grpc.DialOption{ 1374 grpc.WithTransportCredentials(insecure.NewCredentials()), 1375 grpc.WithChainStreamInterceptor(internalActorStreamInterceptor()), 1376 grpc.WithChainUnaryInterceptor(internalActorUnaryInterceptor()), 1377 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON), 1378 } 1379 1380 // This dialer is used to connect via gRPC to the Sourcegraph instance. 1381 // This is done lazily, so we can provide the client to use regardless of 1382 // whether we enabled gRPC or not initially. 1383 cc, err := grpc.Dial(rootURL.Host, gRPCConnectionOptions...) 1384 if err != nil { 1385 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1386 } 1387 1388 client := proto.NewZoektConfigurationServiceClient(cc) 1389 opts = append(opts, WithGRPCClient(client)) 1390 1391 sg = newSourcegraphClient(rootURL, conf.hostname, opts...) 1392 1393 } else { 1394 sg = sourcegraphFake{ 1395 RootDir: rootURL.String(), 1396 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1397 } 1398 } 1399 1400 if conf.indexConcurrency < 1 { 1401 conf.indexConcurrency = 1 1402 } 1403 1404 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction))) 1405 if cpuCount < 1 { 1406 cpuCount = 1 1407 } 1408 1409 logger := sglog.Scoped("server", "periodically reindexes enabled repositories on sourcegraph") 1410 1411 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1412 1413 return &Server{ 1414 logger: logger, 1415 Sourcegraph: sg, 1416 IndexDir: conf.index, 1417 IndexConcurrency: int(conf.indexConcurrency), 1418 Interval: conf.interval, 1419 CPUCount: cpuCount, 1420 queue: *q, 1421 shardMerging: zoekt.ShardMergingEnabled(), 1422 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1423 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1424 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1425 hostname: conf.hostname, 1426 mergeOpts: mergeOpts{ 1427 vacuumInterval: conf.vacuumInterval, 1428 mergeInterval: conf.mergeInterval, 1429 targetSizeBytes: conf.targetSize * 1024 * 1024, 1430 minSizeBytes: conf.minSize * 1024 * 1024, 1431 minAgeDays: conf.minAgeDays, 1432 maxPriority: conf.maxPriority, 1433 }, 1434 }, err 1435} 1436 1437// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration 1438// for the indexed-search-configuration gRPC service. 1439// 1440// The default backoff strategy is modeled after the default settings used by 1441// retryablehttp.DefaultClient. 1442// 1443// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1444// - Unavailable 1445// - Aborted 1446// 1447//go:embed default_grpc_service_configuration.json 1448var defaultGRPCServiceConfigurationJSON string 1449 1450func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1451 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1452 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1453 return invoker(ctx, method, req, reply, cc, opts...) 1454 } 1455} 1456 1457func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1458 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1459 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1460 return streamer(ctx, desc, cc, method, opts...) 1461 } 1462} 1463 1464// addDefaultPort adds a default port to a URL if one is not specified. 1465// 1466// If the URL scheme is "http" and no port is specified, "80" is used. 1467// If the scheme is "https", "443" is used. 1468// 1469// The original URL is not mutated. A copy is modified and returned. 1470func addDefaultPort(original *url.URL) *url.URL { 1471 if original == nil { 1472 return nil // don't panic 1473 } 1474 1475 if !original.IsAbs() { 1476 return original // don't do anything if the URL doesn't look like a remote URL 1477 } 1478 1479 if original.Scheme == "http" && original.Port() == "" { 1480 u := cloneURL(original) 1481 u.Host = net.JoinHostPort(u.Host, "80") 1482 return u 1483 } 1484 1485 if original.Scheme == "https" && original.Port() == "" { 1486 u := cloneURL(original) 1487 u.Host = net.JoinHostPort(u.Host, "443") 1488 return u 1489 } 1490 1491 return original 1492} 1493 1494// cloneURL returns a copy of the URL. It is safe to mutate the returned URL. 1495// This is copied from net/http/clone.go 1496func cloneURL(u *url.URL) *url.URL { 1497 if u == nil { 1498 return nil 1499 } 1500 u2 := new(url.URL) 1501 *u2 = *u 1502 if u.User != nil { 1503 u2.User = new(url.Userinfo) 1504 *u2.User = *u.User 1505 } 1506 return u2 1507} 1508 1509func main() { 1510 liblog := sglog.Init(sglog.Resource{ 1511 Name: "zoekt-indexserver", 1512 Version: zoekt.Version, 1513 InstanceID: zoekt.HostnameBestEffort(), 1514 }) 1515 defer liblog.Sync() 1516 1517 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1518 log.Fatal(err) 1519 } 1520}