fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

// Command zoekt-sourcegraph-indexserver periodically reindexes enabled
// repositories on sourcegraph
package main

import (
	"bytes"
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"html/template"
	"io"
	"io/fs"
	"log"
	"math"
	"math/rand"
	"net"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"text/tabwriter"
	"time"

	grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
	"github.com/peterbourgon/ff/v3/ffcli"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	sglog "github.com/sourcegraph/log"
	"github.com/sourcegraph/mountinfo"
	"go.uber.org/automaxprocs/maxprocs"
	"golang.org/x/net/trace"
	"golang.org/x/sys/unix"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/metadata"

	"github.com/sourcegraph/zoekt"
	"github.com/sourcegraph/zoekt/build"
	proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
	"github.com/sourcegraph/zoekt/debugserver"
	"github.com/sourcegraph/zoekt/grpc/internalerrs"
	"github.com/sourcegraph/zoekt/grpc/messagesize"
	"github.com/sourcegraph/zoekt/internal/profiler"
	"github.com/sourcegraph/zoekt/internal/tenant"
)

// Prometheus metrics exported by the indexserver. They are registered with
// the default registry through promauto when the package is initialised.
var (
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s, 10s, ... 100000s (~27.8h)
	})

	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_indexing_delay_seconds",
		Help:    "A histogram of durations from when an index job is added to the queue, to the time it completes.",
		Buckets: prometheus.ExponentialBuckets(60, 2, 14), // 1 minute -> 491520s (~5.7 days)
	}, []string{
		"state", // state is an indexState
		"name",  // the name of the repository that was indexed
	})

	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is build.IndexState

	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})

	// clientMetricsOnce returns a singleton instance of the client metrics
	// that are shared across all gRPC clients that this process creates.
	//
	// This function panics if the metrics cannot be registered with the default
	// Prometheus registry.
	clientMetricsOnce = sync.OnceValue(func() *grpcprom.ClientMetrics {
		clientMetrics := grpcprom.NewClientMetrics(
			grpcprom.WithClientCounterOptions(),
			grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
			grpcprom.WithClientStreamRecvHistogram(),   // record how long it takes for a client to receive a message during a streaming RPC
			grpcprom.WithClientStreamSendHistogram(),   // record how long it takes for a client to send a message during a streaming RPC
		)
		prometheus.DefaultRegisterer.MustRegister(clientMetrics)
		return clientMetrics
	})
)

// 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
// NOTE: if you change this, you must also update gitIndex to use the same value when fetching the repo.
const MaxFileSize = 1 << 20

// set of repositories that we want to capture separate indexing metrics for
var reposWithSeparateIndexingMetrics = make(map[string]struct{})

// indexState is the outcome of a single index job. Its string value is used
// as the "state" label on the indexing metrics above.
type indexState string

const (
	indexStateFail        indexState = "fail"         // The index job returned an error
	indexStateSuccess     indexState = "success"      // A new index was built
	indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
	indexStateNoop        indexState = "noop"         // We didn't need to update index
	indexStateEmpty       indexState = "empty"        // index is empty (empty repo)
)

// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	// logger is the structured logger used for index job reporting.
	logger sglog.Logger

	// Sourcegraph is the client used to list repositories, fetch their index
	// options and report index status back.
	Sourcegraph Sourcegraph
	BatchSize   int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration

	// CPUCount is the number of CPUs to use for indexing shards.
	CPUCount int

	// queue holds the pending index jobs consumed by processQueue.
	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is the value reported by the /debug/host endpoint.
	hostname string

	// mergeOpts carries the vacuum and merge intervals used by Run.
	mergeOpts mergeOpts

	// timeout defines how long the index server waits before killing an indexing job.
	timeout time.Duration
}

// Package-level loggers. debugLog discards its output as declared here; the
// info and error loggers write to stderr.
var (
	debugLog = log.New(io.Discard, "[DEBUG] ", log.LstdFlags)
	infoLog  = log.New(os.Stderr, "[INFO] ", log.LstdFlags)
	errorLog = log.New(os.Stderr, "[ERROR] ", log.LstdFlags)
)

// our index commands should output something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
229const noOutputTimeout = 30 * time.Minute 230 231func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 232 out := &synchronizedBuffer{} 233 cmd.Stdout = out 234 cmd.Stderr = out 235 236 tr.LazyPrintf("%s", cmd.Args) 237 238 defer func() { 239 if err != nil { 240 outS := out.String() 241 tr.LazyPrintf("failed: %v", err) 242 tr.LazyPrintf("output: %s", out) 243 tr.SetError() 244 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 245 } 246 }() 247 248 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 249 250 if err := cmd.Start(); err != nil { 251 return err 252 } 253 254 errC := make(chan error) 255 go func() { 256 errC <- cmd.Wait() 257 }() 258 259 // This channel is set after we have sent sigquit. It allows us to follow up 260 // with a sigkill if the process doesn't quit after sigquit. 261 kill := make(<-chan time.Time) 262 263 lastLen := 0 264 for { 265 select { 266 case <-time.After(noOutputTimeout): 267 // Periodically check if we have had output. If not kill the process. 268 if out.Len() != lastLen { 269 lastLen = out.Len() 270 infoLog.Printf("still running %s", cmd.Args) 271 } else { 272 // Send quit (C-\) first so we get a stack dump. 273 infoLog.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 274 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 275 errorLog.Println("quit failed:", err) 276 } 277 278 // send sigkill if still running in 10s 279 kill = time.After(10 * time.Second) 280 } 281 282 case <-kill: 283 infoLog.Printf("still running, killing %s", cmd.Args) 284 if err := cmd.Process.Kill(); err != nil { 285 errorLog.Println("kill failed:", err) 286 } 287 288 case err := <-errC: 289 if err != nil { 290 return err 291 } 292 293 tr.LazyPrintf("success") 294 return nil 295 } 296 } 297} 298 299// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 300// monitor the buffer while it is being written to. 
301type synchronizedBuffer struct { 302 mu sync.Mutex 303 b bytes.Buffer 304} 305 306func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 307 sb.mu.Lock() 308 defer sb.mu.Unlock() 309 return sb.b.Write(p) 310} 311 312func (sb *synchronizedBuffer) Len() int { 313 sb.mu.Lock() 314 defer sb.mu.Unlock() 315 return sb.b.Len() 316} 317 318func (sb *synchronizedBuffer) String() string { 319 sb.mu.Lock() 320 defer sb.mu.Unlock() 321 return sb.b.String() 322} 323 324// pauseFileName if present in IndexDir will stop index jobs from 325// running. This is to make it possible to experiment with the content of the 326// IndexDir without the indexserver writing to it. 327const pauseFileName = "PAUSE" 328 329// Run the sync loop. This blocks forever. 330func (s *Server) Run() { 331 removeIncompleteShards(s.IndexDir) 332 333 // Start a goroutine which updates the queue with commits to index. 334 go func() { 335 // We update the list of indexed repos every Interval. To speed up manual 336 // testing we also listen for SIGUSR1 to trigger updates. 
337 // 338 // "pkill -SIGUSR1 zoekt-sourcegra" 339 for range jitterTicker(s.Interval, unix.SIGUSR1) { 340 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 341 infoLog.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 342 continue 343 } 344 345 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 346 if err != nil { 347 errorLog.Printf("error listing repos: %s", err) 348 continue 349 } 350 351 debugLog.Printf("updating index queue with %d repositories", len(repos.IDs)) 352 353 // Stop indexing repos we don't need to track anymore 354 removed := s.queue.MaybeRemoveMissing(repos.IDs) 355 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 356 if len(removed) > 0 { 357 infoLog.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 358 } 359 360 cleanupDone := make(chan struct{}) 361 go func() { 362 defer close(cleanupDone) 363 s.muIndexDir.Global(func() { 364 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 365 }) 366 }() 367 368 repos.IterateIndexOptions(s.queue.AddOrUpdate) 369 370 // IterateIndexOptions will only iterate over repositories that have 371 // changed since we last called list. However, we want to add all IDs 372 // back onto the queue just to check that what is on disk is still 373 // correct. This will use the last IndexOptions we stored in the 374 // queue. The repositories not on the queue (missing) need a forced 375 // fetch of IndexOptions. 376 missing := s.queue.Bump(repos.IDs) 377 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 
378 379 setCompoundShardCounter(s.IndexDir) 380 381 <-cleanupDone 382 } 383 }() 384 385 go func() { 386 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 387 if s.shardMerging { 388 s.vacuum() 389 } 390 } 391 }() 392 393 go func() { 394 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 395 if s.shardMerging { 396 s.doMerge() 397 } 398 } 399 }() 400 401 for i := 0; i < s.IndexConcurrency; i++ { 402 go s.processQueue() 403 } 404 405 // block forever 406 select {} 407} 408 409// formatList returns a comma-separated list of the first min(len(v), m) items. 410func formatListUint32(v []uint32, m int) string { 411 if len(v) < m { 412 m = len(v) 413 } 414 415 sb := strings.Builder{} 416 for i := 0; i < m; i++ { 417 fmt.Fprintf(&sb, "%d, ", v[i]) 418 } 419 420 if len(v) > m { 421 sb.WriteString("...") 422 } 423 424 return strings.TrimRight(sb.String(), ", ") 425} 426 427func (s *Server) processQueue() { 428 for { 429 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 430 time.Sleep(time.Second) 431 continue 432 } 433 434 item, ok := s.queue.Pop() 435 if !ok { 436 time.Sleep(time.Second) 437 continue 438 } 439 440 opts := item.Opts 441 args := s.indexArgs(opts) 442 443 ran := s.muIndexDir.With(opts.Name, func() { 444 // only record time taken once we hold the lock. This avoids us 445 // recording time taken while merging/cleanup runs. 
446 start := time.Now() 447 448 state, err := s.Index(args) 449 450 elapsed := time.Since(start) 451 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 452 453 indexDelay := time.Since(item.DateAddedToQueue) 454 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds()) 455 456 if err != nil { 457 errorLog.Printf("error indexing %s: %s", args.String(), err) 458 } 459 460 switch state { 461 case indexStateSuccess: 462 var branches []string 463 for _, b := range args.Branches { 464 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 465 } 466 s.logger.Info("updated index", 467 sglog.Int("tenant", args.TenantID), 468 sglog.String("repo", args.Name), 469 sglog.Uint32("id", args.RepoID), 470 sglog.Strings("branches", branches), 471 sglog.Duration("duration", elapsed), 472 sglog.Duration("index_delay", indexDelay), 473 ) 474 case indexStateSuccessMeta: 475 infoLog.Printf("updated meta %s in %v", args.String(), elapsed) 476 } 477 s.queue.SetIndexed(opts, state) 478 }) 479 480 if !ran { 481 // Someone else is processing the repository. We can just skip this job 482 // since the repository will be added back to the queue and we will 483 // converge to the correct behaviour. 484 debugLog.Printf("index job for repository already running: %s", args) 485 continue 486 } 487 } 488} 489 490// repoNameForMetric returns a normalized version of the given repository name that is 491// suitable for use with Prometheus metrics. 492func repoNameForMetric(repo string) string { 493 // Check to see if we want to be able to capture separate indexing metrics for this repository. 494 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 
495 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 496 return repo 497 } 498 499 return "" 500} 501 502func batched(slice []uint32, size int) <-chan []uint32 { 503 c := make(chan []uint32) 504 go func() { 505 for len(slice) > 0 { 506 if size > len(slice) { 507 size = len(slice) 508 } 509 c <- slice[:size] 510 slice = slice[size:] 511 } 512 close(c) 513 }() 514 return c 515} 516 517// jitterTicker returns a ticker which ticks with a jitter. Each tick is 518// uniformly selected from the range (d/2, d + d/2). It will tick on creation. 519// 520// sig is a list of signals which also cause the ticker to fire. This is a 521// convenience to allow manually triggering of the ticker. 522func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} { 523 ticker := make(chan struct{}) 524 525 go func() { 526 for { 527 ticker <- struct{}{} 528 ns := int64(d) 529 jitter := rand.Int63n(ns) 530 time.Sleep(time.Duration(ns/2 + jitter)) 531 } 532 }() 533 534 go func() { 535 if len(sig) == 0 { 536 return 537 } 538 539 c := make(chan os.Signal, 1) 540 signal.Notify(c, sig...) 541 for range c { 542 ticker <- struct{}{} 543 } 544 }() 545 546 return ticker 547} 548 549// Index starts an index job for repo name at commit. 550func (s *Server) Index(args *indexArgs) (state indexState, err error) { 551 tr := trace.New("index", args.Name) 552 tr.SetMaxEvents(30) // Ensure we capture all indexing events 553 554 defer func() { 555 if err != nil { 556 tr.SetError() 557 tr.LazyPrintf("error: %v", err) 558 state = indexStateFail 559 metricFailingTotal.Inc() 560 } 561 tr.LazyPrintf("state: %s", state) 562 tr.Finish() 563 }() 564 565 // Sourcegraph should always provide a tenant ID. 
566 if args.TenantID < 1 { 567 return indexStateFail, tenant.ErrMissingTenant 568 } 569 570 tr.LazyPrintf("branches: %v", args.Branches) 571 572 if len(args.Branches) == 0 { 573 return indexStateEmpty, createEmptyShard(args) 574 } 575 576 repositoryName := args.Name 577 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok { 578 tr.LazyPrintf("marking this repository for delta build") 579 args.UseDelta = true 580 } 581 582 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold 583 584 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok { 585 tr.LazyPrintf("skipping symbols calculation") 586 args.Symbols = false 587 } 588 589 reason := "forced" 590 591 if args.Incremental { 592 bo := args.BuildOptions() 593 bo.SetDefaults() 594 incrementalState, fn := bo.IndexState() 595 reason = string(incrementalState) 596 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc() 597 598 switch incrementalState { 599 case build.IndexStateEqual: 600 debugLog.Printf("%s index already up to date. Shard=%s", args.String(), fn) 601 return indexStateNoop, nil 602 603 case build.IndexStateMeta: 604 infoLog.Printf("updating index.meta %s", args.String()) 605 606 // TODO(stefan) handle mergeMeta for tenant id. 
607 if err := mergeMeta(bo); err != nil { 608 errorLog.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err) 609 } else { 610 return indexStateSuccessMeta, nil 611 } 612 613 case build.IndexStateCorrupt: 614 infoLog.Printf("falling back to full update: corrupt index: %s", args.String()) 615 } 616 } 617 618 infoLog.Printf("updating index %s reason=%s", args.String(), reason) 619 620 metricIndexingTotal.Inc() 621 c := gitIndexConfig{ 622 runCmd: func(cmd *exec.Cmd) error { 623 return s.loggedRun(tr, cmd) 624 }, 625 626 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) { 627 return args.BuildOptions().FindRepositoryMetadata() 628 }, 629 timeout: s.timeout, 630 } 631 632 err = gitIndex(c, args, s.Sourcegraph, s.logger) 633 if err != nil { 634 return indexStateFail, err 635 } 636 637 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil { 638 s.logger.Error("failed to update index status", 639 sglog.String("repo", args.Name), 640 sglog.Uint32("id", args.RepoID), 641 sglogBranches("branches", args.Branches), 642 sglog.Error(err), 643 ) 644 } 645 646 return indexStateSuccess, nil 647} 648 649// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so 650// it can update the zoekt_repos table. 651func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error { 652 // We need to read from disk for IndexTime. 
653 _, metadata, ok, err := c.findRepositoryMetadata(args) 654 if err != nil { 655 return fmt.Errorf("failed to read metadata for new/updated index: %w", err) 656 } 657 if !ok { 658 return errors.New("failed to find metadata for new/updated index") 659 } 660 661 status := []indexStatus{{ 662 RepoID: args.RepoID, 663 Branches: args.Branches, 664 IndexTimeUnix: metadata.IndexTime.Unix(), 665 }} 666 if err := sg.UpdateIndexStatus(status); err != nil { 667 return fmt.Errorf("failed to update sourcegraph with status: %w", err) 668 } 669 670 return nil 671} 672 673func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field { 674 ss := make([]string, len(branches)) 675 for i, b := range branches { 676 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version) 677 } 678 return sglog.Strings(key, ss) 679} 680 681func (s *Server) indexArgs(opts IndexOptions) *indexArgs { 682 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0)) 683 return &indexArgs{ 684 IndexOptions: opts, 685 IndexDir: s.IndexDir, 686 Parallelism: parallelism, 687 Incremental: true, 688 FileLimit: MaxFileSize, 689 ShardMerging: s.shardMerging, 690 } 691} 692 693// parallelism consults both the server flags and index options to determine the number 694// of shards to index in parallel. If the CPUCount index option is provided, it always 695// overrides the server flag. 
696func (s *Server) parallelism(opts IndexOptions, maxProcs int) int { 697 var parallelism int 698 if opts.ShardConcurrency > 0 { 699 parallelism = int(opts.ShardConcurrency) 700 } else { 701 parallelism = s.CPUCount 702 } 703 704 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs 705 if parallelism > 4*maxProcs { 706 parallelism = 4 * maxProcs 707 } 708 709 // If index concurrency is set, then divide the parallelism by the number of 710 // repos we're indexing in parallel 711 if s.IndexConcurrency > 1 { 712 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency))) 713 } 714 715 return parallelism 716} 717 718func createEmptyShard(args *indexArgs) error { 719 bo := args.BuildOptions() 720 bo.SetDefaults() 721 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 722 723 if args.Incremental && bo.IncrementalSkipIndexing() { 724 return nil 725 } 726 727 builder, err := build.NewBuilder(*bo) 728 if err != nil { 729 return err 730 } 731 return builder.Finish() 732} 733 734// addDebugHandlers adds handlers specific to indexserver. 735func (s *Server) addDebugHandlers(mux *http.ServeMux) { 736 // Sourcegraph's site admin view requires indexserver to serve it's admin view 737 // on "/". 
738 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 739 740 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 741 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 742 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 743 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 744 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 745 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 746} 747 748func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 749 if r.Method != "GET" { 750 w.Header().Set("Allow", "GET") 751 w.WriteHeader(http.StatusMethodNotAllowed) 752 return 753 } 754 755 response := struct { 756 Hostname string 757 }{ 758 Hostname: s.hostname, 759 } 760 761 b, err := json.Marshal(response) 762 if err != nil { 763 http.Error(w, err.Error(), http.StatusInternalServerError) 764 return 765 } 766 767 w.Header().Set("Content-Type", "application/json; charset=utf-8") 768 w.Write(b) 769} 770 771var rootTmpl = template.Must(template.New("name").Parse(` 772<html> 773 <body> 774 <a href="debug">Debug</a><br /> 775 <a href="debug/requests">Traces</a><br /> 776 {{.IndexMsg}}<br /> 777 <br /> 778 <h3>Reindex</h3> 779 {{if .Repos}} 780 <a href="?show_repos=false">hide repos</a><br /> 781 <table style="margin-top: 20px"> 782 <th style="text-align:left">Name</th> 783 <th style="text-align:left">ID (click to reindex)</th> 784 {{range .Repos}} 785 <tr> 786 <td>{{.Name}}</td> 787 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 788 </tr> 789 {{end}} 790 </table> 791 {{else}} 792 <a href="?show_repos=true">show repos</a><br /> 793 {{end}} 794 </body> 795</html> 796`)) 797 798func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 799 if r.Method != "GET" { 800 w.Header().Set("Allow", "GET") 801 w.WriteHeader(http.StatusMethodNotAllowed) 802 return 803 } 804 805 values := r.URL.Query() 806 807 // ?id= 808 indexMsg := "" 809 if v := values.Get("id"); v != "" 
{ 810 id, err := strconv.Atoi(v) 811 if err != nil { 812 http.Error(w, err.Error(), http.StatusBadRequest) 813 return 814 } 815 indexMsg, _ = s.forceIndex(uint32(id)) 816 } 817 818 // ?show_repos= 819 showRepos := false 820 if v := values.Get("show_repos"); v != "" { 821 showRepos, _ = strconv.ParseBool(v) 822 } 823 824 type Repo struct { 825 ID uint32 826 Name string 827 } 828 var data struct { 829 Repos []Repo 830 IndexMsg string 831 } 832 833 data.IndexMsg = indexMsg 834 835 if showRepos { 836 s.queue.Iterate(func(opts *IndexOptions) { 837 data.Repos = append(data.Repos, Repo{ 838 ID: opts.RepoID, 839 Name: opts.Name, 840 }) 841 }) 842 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 843 } 844 845 _ = rootTmpl.Execute(w, data) 846} 847 848// handleReindex triggers a reindex asynocronously. If a reindex was triggered 849// the request returns with status 202. The caller can infer the new state of 850// the index by calling List. 851func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 852 if r.Method != http.MethodPost { 853 w.Header().Set("Allow", http.MethodPost) 854 w.WriteHeader(http.StatusMethodNotAllowed) 855 return 856 } 857 858 err := r.ParseForm() 859 if err != nil { 860 http.Error(w, err.Error(), http.StatusBadRequest) 861 return 862 } 863 864 id, err := strconv.Atoi(r.Form.Get("repo")) 865 if err != nil { 866 http.Error(w, err.Error(), http.StatusBadRequest) 867 return 868 } 869 870 go func() { s.forceIndex(uint32(id)) }() 871 872 // 202 Accepted 873 w.WriteHeader(http.StatusAccepted) 874} 875 876func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 877 withIndexed := true 878 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 879 withIndexed = b 880 } 881 882 var indexed []uint32 883 if withIndexed { 884 indexed = listIndexed(s.IndexDir) 885 } 886 887 repos, err := s.Sourcegraph.List(r.Context(), indexed) 888 if err != nil { 889 http.Error(w, 
err.Error(), http.StatusInternalServerError) 890 return 891 } 892 893 bw := bytes.Buffer{} 894 895 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 896 897 _, err = fmt.Fprintf(tw, "ID\tName\n") 898 if err != nil { 899 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 900 return 901 } 902 903 s.queue.mu.Lock() 904 name := "" 905 for _, id := range repos.IDs { 906 if item := s.queue.get(id); item != nil { 907 name = item.opts.Name 908 } else { 909 name = "" 910 } 911 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 912 if err != nil { 913 debugLog.Printf("handleDebugList: %s\n", err.Error()) 914 } 915 } 916 s.queue.mu.Unlock() 917 918 if err != nil { 919 http.Error(w, err.Error(), http.StatusInternalServerError) 920 return 921 } 922 923 err = tw.Flush() 924 if err != nil { 925 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 926 return 927 } 928 929 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 930 931 if _, err := io.Copy(w, &bw); err != nil { 932 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 933 return 934 } 935} 936 937// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 938// can run this command during periods of low usage (evenings, weekends) to 939// trigger an initial merge run. In the steady-state, merges happen rarely, even 940// on busy instances, and users can rely on automatic merging instead. 941func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 942 // A merge operation can take very long, depending on the number merges and the 943 // target size of the compound shards. We run the merge in the background and 944 // return immediately to the user. 945 // 946 // We track the status of the merge with metricShardMergingRunning. 
947 go func() { 948 s.doMerge() 949 }() 950 _, _ = w.Write([]byte("merging enqueued\n")) 951} 952 953func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 954 indexed := listIndexed(s.IndexDir) 955 956 bw := bytes.Buffer{} 957 958 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 959 960 _, err := fmt.Fprintf(tw, "ID\tName\n") 961 if err != nil { 962 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 963 return 964 } 965 966 s.queue.mu.Lock() 967 name := "" 968 for _, id := range indexed { 969 if item := s.queue.get(id); item != nil { 970 name = item.opts.Name 971 } else { 972 name = "" 973 } 974 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 975 if err != nil { 976 debugLog.Printf("handleDebugIndexed: %s\n", err.Error()) 977 } 978 } 979 s.queue.mu.Unlock() 980 981 if err != nil { 982 http.Error(w, err.Error(), http.StatusInternalServerError) 983 return 984 } 985 986 err = tw.Flush() 987 if err != nil { 988 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 989 return 990 } 991 992 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 993 994 if _, err := io.Copy(w, &bw); err != nil { 995 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 996 return 997 } 998} 999 1000// forceIndex will run the index job for repo name now. It will return always 1001// return a string explaining what it did, even if it failed. 
1002func (s *Server) forceIndex(id uint32) (string, error) { 1003 var opts IndexOptions 1004 var err error 1005 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 1006 opts = o 1007 }, func(_ uint32, e error) { 1008 err = e 1009 }, id) 1010 if err != nil { 1011 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 1012 } 1013 1014 args := s.indexArgs(opts) 1015 args.Incremental = false // force re-index 1016 1017 var state indexState 1018 ran := s.muIndexDir.With(opts.Name, func() { 1019 state, err = s.Index(args) 1020 }) 1021 if !ran { 1022 return fmt.Sprintf("index job for repository already running: %s", args), nil 1023 } 1024 if err != nil { 1025 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 1026 } 1027 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 1028} 1029 1030func listIndexed(indexDir string) []uint32 { 1031 index := getShards(indexDir) 1032 metricNumIndexed.Set(float64(len(index))) 1033 repoIDs := make([]uint32, 0, len(index)) 1034 for id := range index { 1035 repoIDs = append(repoIDs, id) 1036 } 1037 sort.Slice(repoIDs, func(i, j int) bool { 1038 return repoIDs[i] < repoIDs[j] 1039 }) 1040 return repoIDs 1041} 1042 1043// setupTmpDir sets up a temporary directory on the same volume as the 1044// indexes. 1045// 1046// If main is true we will delete older temp directories left around. main is 1047// false when this is a debug command. 1048func setupTmpDir(logger sglog.Logger, main bool, index string) error { 1049 // change the target tmp directory depending on if it's our main daemon or a 1050 // debug sub command. 
1051 dir := ".indexserver.debug.tmp" 1052 if main { 1053 dir = ".indexserver.tmp" 1054 } 1055 1056 tmpRoot := filepath.Join(index, dir) 1057 1058 if main { 1059 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot)) 1060 err := os.RemoveAll(tmpRoot) 1061 if err != nil { 1062 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err)) 1063 } 1064 } 1065 1066 if err := os.MkdirAll(tmpRoot, 0o755); err != nil { 1067 return err 1068 } 1069 1070 return os.Setenv("TMPDIR", tmpRoot) 1071} 1072 1073func printMetaData(fn string) error { 1074 repo, indexMeta, err := zoekt.ReadMetadataPath(fn) 1075 if err != nil { 1076 return err 1077 } 1078 1079 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 1080 if err != nil { 1081 return err 1082 } 1083 1084 err = json.NewEncoder(os.Stdout).Encode(repo) 1085 if err != nil { 1086 return err 1087 } 1088 return nil 1089} 1090 1091func printShardStats(fn string) error { 1092 f, err := os.Open(fn) 1093 if err != nil { 1094 return err 1095 } 1096 1097 iFile, err := zoekt.NewIndexFile(f) 1098 if err != nil { 1099 return err 1100 } 1101 1102 return zoekt.PrintNgramStats(iFile) 1103} 1104 1105func srcLogLevelIsDebug() bool { 1106 lvl := os.Getenv(sglog.EnvLogLevel) 1107 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1108} 1109 1110func getEnvWithDefaultBool(k string, defaultVal bool) bool { 1111 v := os.Getenv(k) 1112 if v == "" { 1113 return defaultVal 1114 } 1115 b, err := strconv.ParseBool(v) 1116 if err != nil { 1117 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1118 } 1119 return b 1120} 1121 1122func getEnvWithDefaultInt64(k string, defaultVal int64) int64 { 1123 v := os.Getenv(k) 1124 if v == "" { 1125 return defaultVal 1126 } 1127 i, err := strconv.ParseInt(v, 10, 64) 1128 if err != nil { 1129 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1130 } 1131 return i 1132} 1133 1134func getEnvWithDefaultInt(k string, defaultVal int) int { 1135 v := 
os.Getenv(k) 1136 if v == "" { 1137 return defaultVal 1138 } 1139 i, err := strconv.Atoi(k) 1140 if err != nil { 1141 log.Fatalf("error parsing ENV %s to int: %s", k, err) 1142 } 1143 return i 1144} 1145 1146func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 { 1147 v := os.Getenv(k) 1148 if v == "" { 1149 return defaultVal 1150 } 1151 i, err := strconv.ParseUint(v, 10, 64) 1152 if err != nil { 1153 log.Fatalf("error parsing ENV %s to uint64: %s", k, err) 1154 } 1155 return i 1156} 1157 1158func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1159 v := os.Getenv(k) 1160 if v == "" { 1161 return defaultVal 1162 } 1163 f, err := strconv.ParseFloat(v, 64) 1164 if err != nil { 1165 log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1166 } 1167 return f 1168} 1169 1170func getEnvWithDefaultString(k string, defaultVal string) string { 1171 v := os.Getenv(k) 1172 if v == "" { 1173 return defaultVal 1174 } 1175 return v 1176} 1177 1178func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration { 1179 v := os.Getenv(k) 1180 if v == "" { 1181 return defaultVal 1182 } 1183 1184 d, err := time.ParseDuration(v) 1185 if err != nil { 1186 log.Fatalf("error parsing ENV %s to duration: %s", k, err) 1187 } 1188 return d 1189} 1190 1191func getEnvWithDefaultEmptySet(k string) map[string]struct{} { 1192 set := map[string]struct{}{} 1193 for _, v := range strings.Split(os.Getenv(k), ",") { 1194 v = strings.TrimSpace(v) 1195 if v != "" { 1196 set[v] = struct{}{} 1197 } 1198 } 1199 return set 1200} 1201 1202func joinStringSet(set map[string]struct{}, sep string) string { 1203 var xs []string 1204 for x := range set { 1205 xs = append(xs, x) 1206 } 1207 1208 return strings.Join(xs, sep) 1209} 1210 1211func setCompoundShardCounter(indexDir string) { 1212 fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt")) 1213 if err != nil { 1214 errorLog.Printf("setCompoundShardCounter: %s\n", err) 1215 return 1216 } 1217 
metricNumberCompoundShards.Set(float64(len(fns))) 1218} 1219 1220func rootCmd() *ffcli.Command { 1221 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1222 conf := rootConfig{ 1223 Main: true, 1224 } 1225 conf.registerRootFlags(rootFs) 1226 1227 return &ffcli.Command{ 1228 FlagSet: rootFs, 1229 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1230 Subcommands: []*ffcli.Command{debugCmd()}, 1231 Exec: func(ctx context.Context, args []string) error { 1232 return startServer(conf) 1233 }, 1234 } 1235} 1236 1237type rootConfig struct { 1238 // Main is true if this rootConfig is for our main long running command (the 1239 // indexserver). Debug commands should not set this value. This is used to 1240 // determine if we need to run tmpfriend. 1241 Main bool 1242 1243 root string 1244 interval time.Duration 1245 index string 1246 indexConcurrency int64 1247 listen string 1248 hostname string 1249 cpuFraction float64 1250 1251 // config values related to shard merging 1252 disableShardMerging bool 1253 vacuumInterval time.Duration 1254 mergeInterval time.Duration 1255 targetSize int64 1256 minSize int64 1257 minAgeDays int 1258 1259 // config values related to backoff indexing repos with one or more consecutive failures 1260 backoffDuration time.Duration 1261 maxBackoffDuration time.Duration 1262} 1263 1264func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1265 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. 
If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1266 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1267 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently") 1268 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use") 1269 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") 1270 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") 1271 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1272 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.") 1273 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. 
A negative value disables indexing backoff.") 1274 1275 // flags related to shard merging 1276 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging") 1277 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1278 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1279 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 1000), "the target size of compound shards in MiB") 1280 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 800), "the minimum size of a compound shard in MiB") 1281 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.") 1282} 1283 1284func startServer(conf rootConfig) error { 1285 s, err := newServer(conf) 1286 if err != nil { 1287 return err 1288 } 1289 1290 profiler.Init("zoekt-sourcegraph-indexserver") 1291 setCompoundShardCounter(s.IndexDir) 1292 1293 if conf.listen != "" { 1294 1295 mux := http.NewServeMux() 1296 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1297 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1298 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1299 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"}, 1300 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1301 }...) 
1302 s.addDebugHandlers(mux) 1303 1304 go func() { 1305 debugLog.Printf("serving HTTP on %s", conf.listen) 1306 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1307 }() 1308 1309 // Serve mux on a unix domain socket on a best-effort-basis so that 1310 // webserver can call the endpoints via the shared filesystem. 1311 // 1312 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1313 // on the socket due to permission errors. See 1314 // https://github.com/docker/for-mac/issues/6239 1315 go func() { 1316 serveHTTPOverSocket := func() error { 1317 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1318 // We cannot bind a socket to an existing pathname. 1319 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1320 return fmt.Errorf("error removing socket file: %s", socket) 1321 } 1322 // The "unix" network corresponds to stream sockets. (cf. unixgram, 1323 // unixpacket). 1324 l, err := net.Listen("unix", socket) 1325 if err != nil { 1326 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1327 } 1328 // Indexserver (root) and webserver (Sourcegraph) run with 1329 // different users. Per default, the socket is created with 1330 // permission 755 (root root), which doesn't let webserver write to 1331 // it. 1332 // 1333 // See https://github.com/golang/go/issues/11822 for more context. 
1334 if err := os.Chmod(socket, 0o777); err != nil { 1335 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1336 } 1337 debugLog.Printf("serving HTTP on %s", socket) 1338 return http.Serve(l, mux) 1339 } 1340 debugLog.Print(serveHTTPOverSocket()) 1341 }() 1342 } 1343 1344 oc := &ownerChecker{ 1345 Path: filepath.Join(conf.index, "owner.txt"), 1346 Hostname: conf.hostname, 1347 } 1348 go oc.Run() 1349 1350 logger := sglog.Scoped("metricsRegistration") 1351 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1352 1353 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1354 prometheus.DefaultRegisterer.MustRegister(c) 1355 1356 s.Run() 1357 return nil 1358} 1359 1360func newServer(conf rootConfig) (*Server, error) { 1361 logger := sglog.Scoped("server") 1362 1363 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1364 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1365 } 1366 if conf.index == "" { 1367 return nil, fmt.Errorf("must set -index") 1368 } 1369 if conf.root == "" { 1370 return nil, fmt.Errorf("must set -sourcegraph_url") 1371 } 1372 rootURL, err := url.Parse(conf.root) 1373 if err != nil { 1374 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1375 } 1376 1377 rootURL = addDefaultPort(rootURL) 1378 1379 // Tune GOMAXPROCS to match Linux container CPU quota. 1380 _, _ = maxprocs.Set() 1381 1382 // Automatically prepend our own path at the front, to minimize 1383 // required configuration. 
1384 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1385 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1386 } 1387 1388 if _, err := os.Stat(conf.index); err != nil { 1389 if err := os.MkdirAll(conf.index, 0o755); err != nil { 1390 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1391 } 1392 } 1393 1394 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil { 1395 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1396 } 1397 1398 if srcLogLevelIsDebug() { 1399 debugLog.SetOutput(os.Stderr) 1400 } 1401 1402 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1403 if len(reposWithSeparateIndexingMetrics) > 0 { 1404 debugLog.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1405 } 1406 1407 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1408 if len(deltaBuildRepositoriesAllowList) > 0 { 1409 debugLog.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1410 } 1411 1412 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1413 if deltaShardNumberFallbackThreshold > 0 { 1414 debugLog.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1415 } else { 1416 debugLog.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1417 } 1418 1419 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1420 if len(reposShouldSkipSymbolsCalculation) > 0 { 1421 debugLog.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1422 } 1423 1424 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout) 1425 if indexingTimeout != 
defaultIndexingTimeout { 1426 debugLog.Printf("using configured indexing timeout: %s", indexingTimeout) 1427 } 1428 1429 var sg Sourcegraph 1430 if rootURL.IsAbs() { 1431 var batchSize int 1432 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { 1433 batchSize, err = strconv.Atoi(v) 1434 if err != nil { 1435 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1436 } 1437 } 1438 1439 opts := []SourcegraphClientOption{ 1440 WithBatchSize(batchSize), 1441 } 1442 1443 logger := sglog.Scoped("zoektConfigurationGRPCClient") 1444 client, err := dialGRPCClient(rootURL.Host, logger) 1445 if err != nil { 1446 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1447 } 1448 1449 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...) 1450 1451 } else { 1452 sg = sourcegraphFake{ 1453 RootDir: rootURL.String(), 1454 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1455 } 1456 } 1457 1458 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction))) 1459 if cpuCount < 1 { 1460 cpuCount = 1 1461 } 1462 1463 if conf.indexConcurrency < 1 { 1464 conf.indexConcurrency = 1 1465 } else if conf.indexConcurrency > int64(cpuCount) { 1466 conf.indexConcurrency = int64(cpuCount) 1467 } 1468 1469 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1470 1471 return &Server{ 1472 logger: logger, 1473 Sourcegraph: sg, 1474 IndexDir: conf.index, 1475 IndexConcurrency: int(conf.indexConcurrency), 1476 Interval: conf.interval, 1477 CPUCount: cpuCount, 1478 queue: *q, 1479 shardMerging: !conf.disableShardMerging, 1480 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1481 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1482 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1483 hostname: conf.hostname, 1484 mergeOpts: mergeOpts{ 1485 vacuumInterval: conf.vacuumInterval, 1486 mergeInterval: 
conf.mergeInterval, 1487 targetSizeBytes: conf.targetSize * 1024 * 1024, 1488 minSizeBytes: conf.minSize * 1024 * 1024, 1489 minAgeDays: conf.minAgeDays, 1490 }, 1491 timeout: indexingTimeout, 1492 }, err 1493} 1494 1495// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration 1496// for the indexed-search-configuration gRPC service. 1497// 1498// The default backoff strategy is modeled after the default settings used by 1499// retryablehttp.DefaultClient. 1500// 1501// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1502// - Unavailable 1503// - Aborted 1504// 1505//go:embed default_grpc_service_configuration.json 1506var defaultGRPCServiceConfigurationJSON string 1507 1508func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1509 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1510 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1511 return invoker(ctx, method, req, reply, cc, opts...) 1512 } 1513} 1514 1515func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1516 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1517 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1518 return streamer(ctx, desc, cc, method, opts...) 1519 } 1520} 1521 1522// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPCs servers and clients are allowed to process. 1523// This can be overridden by providing custom Server/Dial options. 
1524const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB 1525 1526func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) { 1527 metrics := clientMetricsOnce() 1528 1529 // If the service seems to be unavailable, this 1530 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1 1531 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s]) 1532 retryOpts := []retry.CallOption{ 1533 retry.WithMax(5), 1534 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)), 1535 retry.WithCodes(codes.Unavailable), 1536 } 1537 1538 opts := []grpc.DialOption{ 1539 grpc.WithTransportCredentials(insecure.NewCredentials()), 1540 grpc.WithChainStreamInterceptor( 1541 metrics.StreamClientInterceptor(), 1542 messagesize.StreamClientInterceptor, 1543 internalActorStreamInterceptor(), 1544 internalerrs.LoggingStreamClientInterceptor(logger), 1545 internalerrs.PrometheusStreamClientInterceptor, 1546 retry.StreamClientInterceptor(retryOpts...), 1547 ), 1548 grpc.WithChainUnaryInterceptor( 1549 metrics.UnaryClientInterceptor(), 1550 messagesize.UnaryClientInterceptor, 1551 internalActorUnaryInterceptor(), 1552 internalerrs.LoggingUnaryClientInterceptor(logger), 1553 internalerrs.PrometheusUnaryClientInterceptor, 1554 retry.UnaryClientInterceptor(retryOpts...), 1555 ), 1556 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON), 1557 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)), 1558 } 1559 1560 opts = append(opts, additionalOpts...) 1561 1562 // Ensure that the message size options are set last, so they override any other 1563 // client-specific options that tweak the message size. 1564 // 1565 // The message size options are only provided if the environment variable is set. 
These options serve as an escape hatch, so they 1566 // take precedence over everything else with a uniform size setting that's easy to reason about. 1567 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...) 1568 1569 // This dialer is used to connect via gRPC to the Sourcegraph instance. 1570 // This is done lazily, so we can provide the client to use regardless of 1571 // whether we enabled gRPC or not initially. 1572 cc, err := grpc.Dial(addr, opts...) 1573 if err != nil { 1574 return nil, fmt.Errorf("dialing %q: %w", addr, err) 1575 } 1576 1577 client := proto.NewZoektConfigurationServiceClient(cc) 1578 return client, nil 1579} 1580 1581// addDefaultPort adds a default port to a URL if one is not specified. 1582// 1583// If the URL scheme is "http" and no port is specified, "80" is used. 1584// If the scheme is "https", "443" is used. 1585// 1586// The original URL is not mutated. A copy is modified and returned. 1587func addDefaultPort(original *url.URL) *url.URL { 1588 if original == nil { 1589 return nil // don't panic 1590 } 1591 1592 if !original.IsAbs() { 1593 return original // don't do anything if the URL doesn't look like a remote URL 1594 } 1595 1596 if original.Scheme == "http" && original.Port() == "" { 1597 u := cloneURL(original) 1598 u.Host = net.JoinHostPort(u.Host, "80") 1599 return u 1600 } 1601 1602 if original.Scheme == "https" && original.Port() == "" { 1603 u := cloneURL(original) 1604 u.Host = net.JoinHostPort(u.Host, "443") 1605 return u 1606 } 1607 1608 return original 1609} 1610 1611// cloneURL returns a copy of the URL. It is safe to mutate the returned URL. 
1612// This is copied from net/http/clone.go 1613func cloneURL(u *url.URL) *url.URL { 1614 if u == nil { 1615 return nil 1616 } 1617 u2 := new(url.URL) 1618 *u2 = *u 1619 if u.User != nil { 1620 u2.User = new(url.Userinfo) 1621 *u2.User = *u.User 1622 } 1623 return u2 1624} 1625 1626func main() { 1627 liblog := sglog.Init(sglog.Resource{ 1628 Name: "zoekt-indexserver", 1629 Version: zoekt.Version, 1630 InstanceID: zoekt.HostnameBestEffort(), 1631 }) 1632 defer liblog.Sync() 1633 1634 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1635 log.Fatal(err) 1636 } 1637} 1638 1639// getBoolFromEnvironmentVariables returns the boolean defined by the first environment 1640// variable listed in envVarNames that is set in the current process environment, or the defaultBool if none are set. 1641// 1642// An error is returned of the provided environment variables fails to parse as a boolean. 1643func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) { 1644 for _, envVar := range envVarNames { 1645 v := os.Getenv(envVar) 1646 if v == "" { 1647 continue 1648 } 1649 1650 b, err := strconv.ParseBool(v) 1651 if err != nil { 1652 return false, fmt.Errorf("parsing environment variable %q to boolean: %v", envVar, err) 1653 } 1654 1655 return b, nil 1656 } 1657 1658 return defaultBool, nil 1659}