fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Command zoekt-sourcegraph-indexserver periodically reindexes enabled 2// repositories on sourcegraph 3package main 4 5import ( 6 "bytes" 7 "context" 8 _ "embed" 9 "encoding/json" 10 "errors" 11 "flag" 12 "fmt" 13 "html/template" 14 "io" 15 "io/fs" 16 "log" 17 "math" 18 "math/rand" 19 "net" 20 "net/http" 21 "net/url" 22 "os" 23 "os/exec" 24 "os/signal" 25 "path/filepath" 26 "runtime" 27 "sort" 28 "strconv" 29 "strings" 30 "sync" 31 "text/tabwriter" 32 "time" 33 34 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" 35 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry" 36 "github.com/peterbourgon/ff/v3/ffcli" 37 "github.com/prometheus/client_golang/prometheus" 38 "github.com/prometheus/client_golang/prometheus/promauto" 39 sglog "github.com/sourcegraph/log" 40 "github.com/sourcegraph/mountinfo" 41 "github.com/sourcegraph/zoekt" 42 "github.com/sourcegraph/zoekt/build" 43 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1" 44 "github.com/sourcegraph/zoekt/grpc/internalerrs" 45 "github.com/sourcegraph/zoekt/grpc/messagesize" 46 "github.com/sourcegraph/zoekt/internal/debugserver" 47 "github.com/sourcegraph/zoekt/internal/profiler" 48 "github.com/sourcegraph/zoekt/internal/tenant" 49 "go.uber.org/automaxprocs/maxprocs" 50 "golang.org/x/net/trace" 51 "golang.org/x/sys/unix" 52 "google.golang.org/grpc" 53 "google.golang.org/grpc/codes" 54 "google.golang.org/grpc/credentials/insecure" 55 "google.golang.org/grpc/metadata" 56) 57 58var ( 59 metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{ 60 Name: "resolve_revisions_seconds", 61 Help: "A histogram of latencies for resolving all repository revisions.", 62 Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 27min 63 }) 64 65 metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 66 Name: "resolve_revision_seconds", 67 Help: "A histogram of latencies for resolving a repository revision.", 68 Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s 69 }, []string{"success"}) // success=true|false 70 71 metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{ 72 Name: "get_index_options_total", 73 Help: "The total number of times we tried to get index options for a repository. Includes errors.", 74 }) 75 metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{ 76 Name: "get_index_options_error_total", 77 Help: "The total number of times we failed to get index options for a repository.", 78 }) 79 80 metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 81 Name: "index_repo_seconds", 82 Help: "A histogram of latencies for indexing a repository.", 83 Buckets: prometheus.ExponentialBucketsRange( 84 (100 * time.Millisecond).Seconds(), 85 (40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo 86 20), 87 }, []string{ 88 "state", // state is an indexState 89 "name", // name of the repository that was indexed 90 }) 91 92 metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{ 93 Name: "index_indexing_delay_seconds", 94 Help: "A histogram of durations from when an index job is added to the queue, to the time it completes.", 95 Buckets: prometheus.ExponentialBuckets(60, 2, 14), // 1 minute -> 5.5 days 96 }, []string{ 97 "state", // state is an indexState 98 "name", // the name of the repository that was indexed 99 }) 100 101 metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 102 Name: "index_fetch_seconds", 103 Help: "A histogram of latencies for fetching a repository.", 104 Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes 105 }, []string{ 106 "success", // true|false 107 "name", // the name of the repository that the commits were fetched from 108 }) 109 110 metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{ 111 Name: "index_incremental_index_state", 112 Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.", 113 }, []string{"state"}) // state is build.IndexState 114 115 metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{ 116 Name: "index_num_indexed", 117 Help: "Number of indexed repos by code host", 118 }) 119 120 metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{ 121 Name: "index_failing_total", 122 Help: "Counts failures to index (indexing activity, should be used with rate())", 123 }) 124 125 metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{ 126 Name: "index_indexing_total", 127 Help: "Counts indexings (indexing activity, should be used with rate())", 128 }) 129 130 metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{ 131 Name: "index_num_stopped_tracking_total", 132 Help: "Counts the number of repos we stopped tracking.", 133 }) 134 135 // clientMetricsOnce returns a singleton instance of the client metrics 136 // that are shared across all gRPC clients that this process creates. 137 // 138 // This function panics if the metrics cannot be registered with the default 139 // Prometheus registry. 140 clientMetricsOnce = sync.OnceValue(func() *grpcprom.ClientMetrics { 141 clientMetrics := grpcprom.NewClientMetrics( 142 grpcprom.WithClientCounterOptions(), 143 grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request 144 grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC 145 grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC 146 ) 147 prometheus.DefaultRegisterer.MustRegister(clientMetrics) 148 return clientMetrics 149 }) 150) 151 152// 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22 153// NOTE: if you change this, you must also update gitIndex to use the same value when fetching the repo. 154const MaxFileSize = 1 << 20 155 156// set of repositories that we want to capture separate indexing metrics for 157var reposWithSeparateIndexingMetrics = make(map[string]struct{}) 158 159type indexState string 160 161const ( 162 indexStateFail indexState = "fail" 163 indexStateSuccess indexState = "success" 164 indexStateSuccessMeta indexState = "success_meta" // We only updated metadata 165 indexStateNoop indexState = "noop" // We didn't need to update index 166 indexStateEmpty indexState = "empty" // index is empty (empty repo) 167) 168 169// Server is the main functionality of zoekt-sourcegraph-indexserver. It 170// exists to conveniently use all the options passed in via func main. 171type Server struct { 172 logger sglog.Logger 173 174 Sourcegraph Sourcegraph 175 BatchSize int 176 177 // IndexDir is the index directory to use. 178 IndexDir string 179 180 // IndexConcurrency is the number of repositories we index at once. 181 IndexConcurrency int 182 183 // Interval is how often we sync with Sourcegraph. 184 Interval time.Duration 185 186 // CPUCount is the number of CPUs to use for indexing shards. 187 CPUCount int 188 189 queue Queue 190 191 // muIndexDir protects the index directory from concurrent access. 192 muIndexDir indexMutex 193 194 // If true, shard merging is enabled. 195 shardMerging bool 196 197 // deltaBuildRepositoriesAllowList is an allowlist for repositories that we 198 // use delta-builds for instead of normal builds 199 deltaBuildRepositoriesAllowList map[string]struct{} 200 201 // deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist 202 // before attempting a delta build. 203 deltaShardNumberFallbackThreshold uint64 204 205 // repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that 206 // we skip calculating symbols metadata for during builds 207 repositoriesSkipSymbolsCalculationAllowList map[string]struct{} 208 209 hostname string 210 211 mergeOpts mergeOpts 212 213 // timeout defines how long the index server waits before killing an indexing job. 214 timeout time.Duration 215} 216 217var ( 218 debugLog = log.New(io.Discard, "[DEBUG] ", log.LstdFlags) 219 infoLog = log.New(os.Stderr, "[INFO] ", log.LstdFlags) 220 errorLog = log.New(os.Stderr, "[ERROR] ", log.LstdFlags) 221) 222 223// our index commands should output something every 100mb they process. 224// 225// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough 226// time." famous last words. A client was indexing a monorepo with 42 227// cores... 5m was not enough. 228const noOutputTimeout = 30 * time.Minute 229 230func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 231 out := &synchronizedBuffer{} 232 cmd.Stdout = out 233 cmd.Stderr = out 234 235 tr.LazyPrintf("%s", cmd.Args) 236 237 defer func() { 238 if err != nil { 239 outS := out.String() 240 tr.LazyPrintf("failed: %v", err) 241 tr.LazyPrintf("output: %s", out) 242 tr.SetError() 243 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 244 } 245 }() 246 247 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 248 249 if err := cmd.Start(); err != nil { 250 return err 251 } 252 253 errC := make(chan error) 254 go func() { 255 errC <- cmd.Wait() 256 }() 257 258 // This channel is set after we have sent sigquit. It allows us to follow up 259 // with a sigkill if the process doesn't quit after sigquit. 260 kill := make(<-chan time.Time) 261 262 lastLen := 0 263 for { 264 select { 265 case <-time.After(noOutputTimeout): 266 // Periodically check if we have had output. If not kill the process. 267 if out.Len() != lastLen { 268 lastLen = out.Len() 269 infoLog.Printf("still running %s", cmd.Args) 270 } else { 271 // Send quit (C-\) first so we get a stack dump. 272 infoLog.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 273 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 274 errorLog.Println("quit failed:", err) 275 } 276 277 // send sigkill if still running in 10s 278 kill = time.After(10 * time.Second) 279 } 280 281 case <-kill: 282 infoLog.Printf("still running, killing %s", cmd.Args) 283 if err := cmd.Process.Kill(); err != nil { 284 errorLog.Println("kill failed:", err) 285 } 286 287 case err := <-errC: 288 if err != nil { 289 return err 290 } 291 292 tr.LazyPrintf("success") 293 return nil 294 } 295 } 296} 297 298// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 299// monitor the buffer while it is being written to. 300type synchronizedBuffer struct { 301 mu sync.Mutex 302 b bytes.Buffer 303} 304 305func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 306 sb.mu.Lock() 307 defer sb.mu.Unlock() 308 return sb.b.Write(p) 309} 310 311func (sb *synchronizedBuffer) Len() int { 312 sb.mu.Lock() 313 defer sb.mu.Unlock() 314 return sb.b.Len() 315} 316 317func (sb *synchronizedBuffer) String() string { 318 sb.mu.Lock() 319 defer sb.mu.Unlock() 320 return sb.b.String() 321} 322 323// pauseFileName if present in IndexDir will stop index jobs from 324// running. This is to make it possible to experiment with the content of the 325// IndexDir without the indexserver writing to it. 326const pauseFileName = "PAUSE" 327 328// Run the sync loop. This blocks forever. 329func (s *Server) Run() { 330 removeIncompleteShards(s.IndexDir) 331 332 // Start a goroutine which updates the queue with commits to index. 333 go func() { 334 // We update the list of indexed repos every Interval. To speed up manual 335 // testing we also listen for SIGUSR1 to trigger updates. 336 // 337 // "pkill -SIGUSR1 zoekt-sourcegra" 338 for range jitterTicker(s.Interval, unix.SIGUSR1) { 339 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 340 infoLog.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 341 continue 342 } 343 344 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 345 if err != nil { 346 errorLog.Printf("error listing repos: %s", err) 347 continue 348 } 349 350 debugLog.Printf("updating index queue with %d repositories", len(repos.IDs)) 351 352 // Stop indexing repos we don't need to track anymore 353 removed := s.queue.MaybeRemoveMissing(repos.IDs) 354 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 355 if len(removed) > 0 { 356 infoLog.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 357 } 358 359 cleanupDone := make(chan struct{}) 360 go func() { 361 defer close(cleanupDone) 362 s.muIndexDir.Global(func() { 363 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 364 }) 365 }() 366 367 repos.IterateIndexOptions(s.queue.AddOrUpdate) 368 369 // IterateIndexOptions will only iterate over repositories that have 370 // changed since we last called list. However, we want to add all IDs 371 // back onto the queue just to check that what is on disk is still 372 // correct. This will use the last IndexOptions we stored in the 373 // queue. The repositories not on the queue (missing) need a forced 374 // fetch of IndexOptions. 375 missing := s.queue.Bump(repos.IDs) 376 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 377 378 setShardsCounter(s.IndexDir) 379 380 <-cleanupDone 381 } 382 }() 383 384 go func() { 385 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 386 if s.shardMerging { 387 s.vacuum() 388 } 389 } 390 }() 391 392 go func() { 393 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 394 if s.shardMerging { 395 s.doMerge() 396 } 397 } 398 }() 399 400 for i := 0; i < s.IndexConcurrency; i++ { 401 go s.processQueue() 402 } 403 404 // block forever 405 select {} 406} 407 408// formatList returns a comma-separated list of the first min(len(v), m) items. 409func formatListUint32(v []uint32, m int) string { 410 if len(v) < m { 411 m = len(v) 412 } 413 414 sb := strings.Builder{} 415 for i := 0; i < m; i++ { 416 fmt.Fprintf(&sb, "%d, ", v[i]) 417 } 418 419 if len(v) > m { 420 sb.WriteString("...") 421 } 422 423 return strings.TrimRight(sb.String(), ", ") 424} 425 426func (s *Server) processQueue() { 427 for { 428 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 429 time.Sleep(time.Second) 430 continue 431 } 432 433 item, ok := s.queue.Pop() 434 if !ok { 435 time.Sleep(time.Second) 436 continue 437 } 438 439 opts := item.Opts 440 args := s.indexArgs(opts) 441 442 ran := s.muIndexDir.With(opts.Name, func() { 443 // only record time taken once we hold the lock. This avoids us 444 // recording time taken while merging/cleanup runs. 445 start := time.Now() 446 447 state, err := s.Index(args) 448 449 elapsed := time.Since(start) 450 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 451 452 indexDelay := time.Since(item.DateAddedToQueue) 453 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds()) 454 455 if err != nil { 456 errorLog.Printf("error indexing %s: %s", args.String(), err) 457 } 458 459 switch state { 460 case indexStateSuccess: 461 var branches []string 462 for _, b := range args.Branches { 463 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 464 } 465 s.logger.Info("updated index", 466 sglog.Int("tenant", args.TenantID), 467 sglog.String("repo", args.Name), 468 sglog.Uint32("id", args.RepoID), 469 sglog.Strings("branches", branches), 470 sglog.Duration("duration", elapsed), 471 sglog.Duration("index_delay", indexDelay), 472 ) 473 case indexStateSuccessMeta: 474 infoLog.Printf("updated meta %s in %v", args.String(), elapsed) 475 } 476 s.queue.SetIndexed(opts, state) 477 }) 478 479 if !ran { 480 // Someone else is processing the repository. We can just skip this job 481 // since the repository will be added back to the queue and we will 482 // converge to the correct behaviour. 483 debugLog.Printf("index job for repository already running: %s", args) 484 continue 485 } 486 } 487} 488 489// repoNameForMetric returns a normalized version of the given repository name that is 490// suitable for use with Prometheus metrics. 491func repoNameForMetric(repo string) string { 492 // Check to see if we want to be able to capture separate indexing metrics for this repository. 493 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 494 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 495 return repo 496 } 497 498 return "" 499} 500 501func batched(slice []uint32, size int) <-chan []uint32 { 502 c := make(chan []uint32) 503 go func() { 504 for len(slice) > 0 { 505 if size > len(slice) { 506 size = len(slice) 507 } 508 c <- slice[:size] 509 slice = slice[size:] 510 } 511 close(c) 512 }() 513 return c 514} 515 516// jitterTicker returns a ticker which ticks with a jitter. Each tick is 517// uniformly selected from the range (d/2, d + d/2). It will tick on creation. 518// 519// sig is a list of signals which also cause the ticker to fire. This is a 520// convenience to allow manually triggering of the ticker. 521func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} { 522 ticker := make(chan struct{}) 523 524 go func() { 525 for { 526 ticker <- struct{}{} 527 ns := int64(d) 528 jitter := rand.Int63n(ns) 529 time.Sleep(time.Duration(ns/2 + jitter)) 530 } 531 }() 532 533 go func() { 534 if len(sig) == 0 { 535 return 536 } 537 538 c := make(chan os.Signal, 1) 539 signal.Notify(c, sig...) 540 for range c { 541 ticker <- struct{}{} 542 } 543 }() 544 545 return ticker 546} 547 548// Index starts an index job for repo name at commit. 549func (s *Server) Index(args *indexArgs) (state indexState, err error) { 550 tr := trace.New("index", args.Name) 551 tr.SetMaxEvents(30) // Ensure we capture all indexing events 552 553 defer func() { 554 if err != nil { 555 tr.SetError() 556 tr.LazyPrintf("error: %v", err) 557 state = indexStateFail 558 metricFailingTotal.Inc() 559 } 560 tr.LazyPrintf("state: %s", state) 561 tr.Finish() 562 }() 563 564 // Sourcegraph should always provide a tenant ID. 565 if args.TenantID < 1 { 566 return indexStateFail, tenant.ErrMissingTenant 567 } 568 569 tr.LazyPrintf("branches: %v", args.Branches) 570 571 if len(args.Branches) == 0 { 572 return indexStateEmpty, createEmptyShard(args) 573 } 574 575 repositoryName := args.Name 576 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok { 577 tr.LazyPrintf("marking this repository for delta build") 578 args.UseDelta = true 579 } 580 581 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold 582 583 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok { 584 tr.LazyPrintf("skipping symbols calculation") 585 args.Symbols = false 586 } 587 588 reason := "forced" 589 590 if args.Incremental { 591 bo := args.BuildOptions() 592 bo.SetDefaults() 593 incrementalState, fn := bo.IndexState() 594 reason = string(incrementalState) 595 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc() 596 597 switch incrementalState { 598 case build.IndexStateEqual: 599 debugLog.Printf("%s index already up to date. Shard=%s", args.String(), fn) 600 return indexStateNoop, nil 601 602 case build.IndexStateMeta: 603 infoLog.Printf("updating index.meta %s", args.String()) 604 605 // TODO(stefan) handle mergeMeta for tenant id. 606 if err := mergeMeta(bo); err != nil { 607 errorLog.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err) 608 } else { 609 return indexStateSuccessMeta, nil 610 } 611 612 case build.IndexStateCorrupt: 613 infoLog.Printf("falling back to full update: corrupt index: %s", args.String()) 614 } 615 } 616 617 infoLog.Printf("updating index %s reason=%s", args.String(), reason) 618 619 metricIndexingTotal.Inc() 620 c := gitIndexConfig{ 621 runCmd: func(cmd *exec.Cmd) error { 622 return s.loggedRun(tr, cmd) 623 }, 624 625 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) { 626 return args.BuildOptions().FindRepositoryMetadata() 627 }, 628 timeout: s.timeout, 629 } 630 631 err = gitIndex(c, args, s.Sourcegraph, s.logger) 632 if err != nil { 633 return indexStateFail, err 634 } 635 636 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil { 637 s.logger.Error("failed to update index status", 638 sglog.String("repo", args.Name), 639 sglog.Uint32("id", args.RepoID), 640 sglogBranches("branches", args.Branches), 641 sglog.Error(err), 642 ) 643 } 644 645 return indexStateSuccess, nil 646} 647 648// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so 649// it can update the zoekt_repos table. 650func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error { 651 // We need to read from disk for IndexTime. 652 _, metadata, ok, err := c.findRepositoryMetadata(args) 653 if err != nil { 654 return fmt.Errorf("failed to read metadata for new/updated index: %w", err) 655 } 656 if !ok { 657 return errors.New("failed to find metadata for new/updated index") 658 } 659 660 status := []indexStatus{{ 661 RepoID: args.RepoID, 662 Branches: args.Branches, 663 IndexTimeUnix: metadata.IndexTime.Unix(), 664 }} 665 if err := sg.UpdateIndexStatus(status); err != nil { 666 return fmt.Errorf("failed to update sourcegraph with status: %w", err) 667 } 668 669 return nil 670} 671 672func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field { 673 ss := make([]string, len(branches)) 674 for i, b := range branches { 675 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version) 676 } 677 return sglog.Strings(key, ss) 678} 679 680func (s *Server) indexArgs(opts IndexOptions) *indexArgs { 681 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0)) 682 return &indexArgs{ 683 IndexOptions: opts, 684 IndexDir: s.IndexDir, 685 Parallelism: parallelism, 686 Incremental: true, 687 FileLimit: MaxFileSize, 688 ShardMerging: s.shardMerging, 689 } 690} 691 692// parallelism consults both the server flags and index options to determine the number 693// of shards to index in parallel. If the CPUCount index option is provided, it always 694// overrides the server flag. 695func (s *Server) parallelism(opts IndexOptions, maxProcs int) int { 696 var parallelism int 697 if opts.ShardConcurrency > 0 { 698 parallelism = int(opts.ShardConcurrency) 699 } else { 700 parallelism = s.CPUCount 701 } 702 703 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs 704 if parallelism > 4*maxProcs { 705 parallelism = 4 * maxProcs 706 } 707 708 // If index concurrency is set, then divide the parallelism by the number of 709 // repos we're indexing in parallel 710 if s.IndexConcurrency > 1 { 711 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency))) 712 } 713 714 return parallelism 715} 716 717func createEmptyShard(args *indexArgs) error { 718 bo := args.BuildOptions() 719 bo.SetDefaults() 720 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 721 722 if args.Incremental && bo.IncrementalSkipIndexing() { 723 return nil 724 } 725 726 builder, err := build.NewBuilder(*bo) 727 if err != nil { 728 return err 729 } 730 return builder.Finish() 731} 732 733// addDebugHandlers adds handlers specific to indexserver. 734func (s *Server) addDebugHandlers(mux *http.ServeMux) { 735 // Sourcegraph's site admin view requires indexserver to serve it's admin view 736 // on "/". 737 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 738 739 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 740 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 741 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 742 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 743 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 744 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 745} 746 747func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 748 if r.Method != "GET" { 749 w.Header().Set("Allow", "GET") 750 w.WriteHeader(http.StatusMethodNotAllowed) 751 return 752 } 753 754 response := struct { 755 Hostname string 756 }{ 757 Hostname: s.hostname, 758 } 759 760 b, err := json.Marshal(response) 761 if err != nil { 762 http.Error(w, err.Error(), http.StatusInternalServerError) 763 return 764 } 765 766 w.Header().Set("Content-Type", "application/json; charset=utf-8") 767 w.Write(b) 768} 769 770var rootTmpl = template.Must(template.New("name").Parse(` 771<html> 772 <body> 773 <a href="debug">Debug</a><br /> 774 <a href="debug/requests">Traces</a><br /> 775 {{.IndexMsg}}<br /> 776 <br /> 777 <h3>Reindex</h3> 778 {{if .Repos}} 779 <a href="?show_repos=false">hide repos</a><br /> 780 <table style="margin-top: 20px"> 781 <th style="text-align:left">Name</th> 782 <th style="text-align:left">ID (click to reindex)</th> 783 {{range .Repos}} 784 <tr> 785 <td>{{.Name}}</td> 786 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 787 </tr> 788 {{end}} 789 </table> 790 {{else}} 791 <a href="?show_repos=true">show repos</a><br /> 792 {{end}} 793 </body> 794</html> 795`)) 796 797func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 798 if r.Method != "GET" { 799 w.Header().Set("Allow", "GET") 800 w.WriteHeader(http.StatusMethodNotAllowed) 801 return 802 } 803 804 values := r.URL.Query() 805 806 // ?id= 807 indexMsg := "" 808 if v := values.Get("id"); v != "" { 809 id, err := strconv.Atoi(v) 810 if err != nil { 811 http.Error(w, err.Error(), http.StatusBadRequest) 812 return 813 } 814 indexMsg, _ = s.forceIndex(uint32(id)) 815 } 816 817 // ?show_repos= 818 showRepos := false 819 if v := values.Get("show_repos"); v != "" { 820 showRepos, _ = strconv.ParseBool(v) 821 } 822 823 type Repo struct { 824 ID uint32 825 Name string 826 } 827 var data struct { 828 Repos []Repo 829 IndexMsg string 830 } 831 832 data.IndexMsg = indexMsg 833 834 if showRepos { 835 s.queue.Iterate(func(opts *IndexOptions) { 836 data.Repos = append(data.Repos, Repo{ 837 ID: opts.RepoID, 838 Name: opts.Name, 839 }) 840 }) 841 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 842 } 843 844 _ = rootTmpl.Execute(w, data) 845} 846 847// handleReindex triggers a reindex asynocronously. If a reindex was triggered 848// the request returns with status 202. The caller can infer the new state of 849// the index by calling List. 850func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 851 if r.Method != http.MethodPost { 852 w.Header().Set("Allow", http.MethodPost) 853 w.WriteHeader(http.StatusMethodNotAllowed) 854 return 855 } 856 857 err := r.ParseForm() 858 if err != nil { 859 http.Error(w, err.Error(), http.StatusBadRequest) 860 return 861 } 862 863 id, err := strconv.Atoi(r.Form.Get("repo")) 864 if err != nil { 865 http.Error(w, err.Error(), http.StatusBadRequest) 866 return 867 } 868 869 go func() { s.forceIndex(uint32(id)) }() 870 871 // 202 Accepted 872 w.WriteHeader(http.StatusAccepted) 873} 874 875func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 876 withIndexed := true 877 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 878 withIndexed = b 879 } 880 881 var indexed []uint32 882 if withIndexed { 883 indexed = listIndexed(s.IndexDir) 884 } 885 886 repos, err := s.Sourcegraph.List(r.Context(), indexed) 887 if err != nil { 888 http.Error(w, err.Error(), http.StatusInternalServerError) 889 return 890 } 891 892 bw := bytes.Buffer{} 893 894 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 895 896 _, err = fmt.Fprintf(tw, "ID\tName\n") 897 if err != nil { 898 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 899 return 900 } 901 902 s.queue.mu.Lock() 903 name := "" 904 for _, id := range repos.IDs { 905 if item := s.queue.get(id); item != nil { 906 name = item.opts.Name 907 } else { 908 name = "" 909 } 910 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 911 if err != nil { 912 debugLog.Printf("handleDebugList: %s\n", err.Error()) 913 } 914 } 915 s.queue.mu.Unlock() 916 917 if err != nil { 918 http.Error(w, err.Error(), http.StatusInternalServerError) 919 return 920 } 921 922 err = tw.Flush() 923 if err != nil { 924 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 925 return 926 } 927 928 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 929 930 if _, err := io.Copy(w, &bw); err != nil { 931 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 932 return 933 } 934} 935 936// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 937// can run this command during periods of low usage (evenings, weekends) to 938// trigger an initial merge run. In the steady-state, merges happen rarely, even 939// on busy instances, and users can rely on automatic merging instead. 940func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 941 // A merge operation can take very long, depending on the number merges and the 942 // target size of the compound shards. We run the merge in the background and 943 // return immediately to the user. 944 // 945 // We track the status of the merge with metricShardMergingRunning. 946 go func() { 947 s.doMerge() 948 }() 949 _, _ = w.Write([]byte("merging enqueued\n")) 950} 951 952func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 953 indexed := listIndexed(s.IndexDir) 954 955 bw := bytes.Buffer{} 956 957 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 958 959 _, err := fmt.Fprintf(tw, "ID\tName\n") 960 if err != nil { 961 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 962 return 963 } 964 965 s.queue.mu.Lock() 966 name := "" 967 for _, id := range indexed { 968 if item := s.queue.get(id); item != nil { 969 name = item.opts.Name 970 } else { 971 name = "" 972 } 973 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 974 if err != nil { 975 debugLog.Printf("handleDebugIndexed: %s\n", err.Error()) 976 } 977 } 978 s.queue.mu.Unlock() 979 980 if err != nil { 981 http.Error(w, err.Error(), http.StatusInternalServerError) 982 return 983 } 984 985 err = tw.Flush() 986 if err != nil { 987 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 988 return 989 } 990 991 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 992 993 if _, err := io.Copy(w, &bw); err != nil { 994 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 995 return 996 } 997} 998 999// forceIndex will run the index job for repo name now. It will return always 1000// return a string explaining what it did, even if it failed. 1001func (s *Server) forceIndex(id uint32) (string, error) { 1002 var opts IndexOptions 1003 var err error 1004 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 1005 opts = o 1006 }, func(_ uint32, e error) { 1007 err = e 1008 }, id) 1009 if err != nil { 1010 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 1011 } 1012 1013 args := s.indexArgs(opts) 1014 args.Incremental = false // force re-index 1015 1016 var state indexState 1017 ran := s.muIndexDir.With(opts.Name, func() { 1018 state, err = s.Index(args) 1019 }) 1020 if !ran { 1021 return fmt.Sprintf("index job for repository already running: %s", args), nil 1022 } 1023 if err != nil { 1024 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 1025 } 1026 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 1027} 1028 1029func listIndexed(indexDir string) []uint32 { 1030 index := getShards(indexDir) 1031 metricNumIndexed.Set(float64(len(index))) 1032 repoIDs := make([]uint32, 0, len(index)) 1033 for id := range index { 1034 repoIDs = append(repoIDs, id) 1035 } 1036 sort.Slice(repoIDs, func(i, j int) bool { 1037 return repoIDs[i] < repoIDs[j] 1038 }) 1039 return repoIDs 1040} 1041 1042// setupTmpDir sets up a temporary directory on the same volume as the 1043// indexes. 1044// 1045// If main is true we will delete older temp directories left around. main is 1046// false when this is a debug command. 1047func setupTmpDir(logger sglog.Logger, main bool, index string) error { 1048 // change the target tmp directory depending on if it's our main daemon or a 1049 // debug sub command. 1050 dir := ".indexserver.debug.tmp" 1051 if main { 1052 dir = ".indexserver.tmp" 1053 } 1054 1055 tmpRoot := filepath.Join(index, dir) 1056 1057 if main { 1058 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot)) 1059 err := os.RemoveAll(tmpRoot) 1060 if err != nil { 1061 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err)) 1062 } 1063 } 1064 1065 if err := os.MkdirAll(tmpRoot, 0o755); err != nil { 1066 return err 1067 } 1068 1069 return os.Setenv("TMPDIR", tmpRoot) 1070} 1071 1072func printMetaData(fn string) error { 1073 repo, indexMeta, err := zoekt.ReadMetadataPath(fn) 1074 if err != nil { 1075 return err 1076 } 1077 1078 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 1079 if err != nil { 1080 return err 1081 } 1082 1083 err = json.NewEncoder(os.Stdout).Encode(repo) 1084 if err != nil { 1085 return err 1086 } 1087 return nil 1088} 1089 1090func printShardStats(fn string) error { 1091 f, err := os.Open(fn) 1092 if err != nil { 1093 return err 1094 } 1095 1096 iFile, err := zoekt.NewIndexFile(f) 1097 if err != nil { 1098 return err 1099 } 1100 1101 return zoekt.PrintNgramStats(iFile) 1102} 1103 1104func srcLogLevelIsDebug() bool { 1105 lvl := os.Getenv(sglog.EnvLogLevel) 1106 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1107} 1108 1109func getEnvWithDefaultBool(k string, defaultVal bool) bool { 1110 v := os.Getenv(k) 1111 if v == "" { 1112 return defaultVal 1113 } 1114 b, err := strconv.ParseBool(v) 1115 if err != nil { 1116 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1117 } 1118 return b 1119} 1120 1121func getEnvWithDefaultInt64(k string, defaultVal int64) int64 { 1122 v := os.Getenv(k) 1123 if v == "" { 1124 return defaultVal 1125 } 1126 i, err := strconv.ParseInt(v, 10, 64) 1127 if err != nil { 1128 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1129 } 1130 return i 1131} 1132 1133func getEnvWithDefaultInt(k string, defaultVal int) int { 1134 v := os.Getenv(k) 1135 if v == "" { 1136 return defaultVal 1137 } 1138 i, err := strconv.Atoi(k) 1139 if err != nil { 1140 log.Fatalf("error parsing ENV %s to int: %s", k, err) 1141 } 1142 return i 1143} 1144 1145func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 { 1146 v := os.Getenv(k) 1147 if v == "" { 1148 return defaultVal 1149 } 1150 i, err := strconv.ParseUint(v, 10, 64) 1151 if err != nil { 1152 log.Fatalf("error parsing ENV %s to uint64: %s", k, err) 1153 } 1154 return i 1155} 1156 1157func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1158 v := os.Getenv(k) 1159 if v == "" { 1160 return defaultVal 1161 } 1162 f, err := strconv.ParseFloat(v, 64) 1163 if err != nil { 1164 log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1165 } 1166 return f 1167} 1168 1169func getEnvWithDefaultString(k string, defaultVal string) string { 1170 v := os.Getenv(k) 1171 if v == "" { 1172 return defaultVal 1173 } 1174 return v 1175} 1176 1177func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration { 1178 v := os.Getenv(k) 1179 if v == "" { 1180 return defaultVal 1181 } 1182 1183 d, err := time.ParseDuration(v) 1184 if err != nil { 1185 log.Fatalf("error parsing ENV %s to duration: %s", k, err) 1186 } 1187 return d 1188} 1189 1190func getEnvWithDefaultEmptySet(k string) map[string]struct{} { 1191 set := map[string]struct{}{} 1192 for _, v := range strings.Split(os.Getenv(k), ",") { 1193 v = strings.TrimSpace(v) 1194 if v != "" { 1195 set[v] = struct{}{} 1196 } 1197 } 1198 return set 1199} 1200 1201func joinStringSet(set map[string]struct{}, sep string) string { 1202 var xs []string 1203 for x := range set { 1204 xs = append(xs, x) 1205 } 1206 1207 return strings.Join(xs, sep) 1208} 1209 1210func setShardsCounter(indexDir string) { 1211 fns, err := filepath.Glob(filepath.Join(indexDir, "*.zoekt")) 1212 if err != nil { 1213 errorLog.Printf("setShardsCounter: %s\n", err) 1214 return 1215 } 1216 metricNumberShards.Set(float64(len(fns))) 1217 1218 compoundFns := make([]string, 0, len(fns)) 1219 for _, fn := range fns { 1220 if strings.HasPrefix(filepath.Base(fn), "compound-") { 1221 compoundFns = append(compoundFns, fn) 1222 } 1223 } 1224 metricNumberCompoundShards.Set(float64(len(fns))) 1225} 1226 1227func rootCmd() *ffcli.Command { 1228 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1229 conf := rootConfig{ 1230 Main: true, 1231 } 1232 conf.registerRootFlags(rootFs) 1233 1234 return &ffcli.Command{ 1235 FlagSet: rootFs, 1236 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1237 Subcommands: []*ffcli.Command{debugCmd()}, 1238 Exec: func(ctx context.Context, args []string) error { 1239 return startServer(conf) 1240 }, 1241 } 1242} 1243 1244type rootConfig struct { 1245 // Main is true if this rootConfig is for our main long running command (the 1246 // indexserver). Debug commands should not set this value. This is used to 1247 // determine if we need to run tmpfriend. 1248 Main bool 1249 1250 root string 1251 interval time.Duration 1252 index string 1253 indexConcurrency int64 1254 listen string 1255 hostname string 1256 cpuFraction float64 1257 1258 // config values related to shard merging 1259 disableShardMerging bool 1260 vacuumInterval time.Duration 1261 mergeInterval time.Duration 1262 targetSize int64 1263 minSize int64 1264 minAgeDays int 1265 1266 // config values related to backoff indexing repos with one or more consecutive failures 1267 backoffDuration time.Duration 1268 maxBackoffDuration time.Duration 1269} 1270 1271func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1272 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1273 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1274 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently") 1275 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use") 1276 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") 1277 fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") 1278 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1279 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.") 1280 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.") 1281 1282 // flags related to shard merging 1283 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging") 1284 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1285 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1286 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 1000), "the target size of compound shards in MiB") 1287 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 800), "the minimum size of a compound shard in MiB") 1288 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.") 1289} 1290 1291func startServer(conf rootConfig) error { 1292 s, err := newServer(conf) 1293 if err != nil { 1294 return err 1295 } 1296 1297 profiler.Init("zoekt-sourcegraph-indexserver") 1298 setShardsCounter(s.IndexDir) 1299 1300 if conf.listen != "" { 1301 1302 mux := http.NewServeMux() 1303 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1304 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1305 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1306 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"}, 1307 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1308 }...) 1309 s.addDebugHandlers(mux) 1310 1311 go func() { 1312 debugLog.Printf("serving HTTP on %s", conf.listen) 1313 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1314 }() 1315 1316 // Serve mux on a unix domain socket on a best-effort-basis so that 1317 // webserver can call the endpoints via the shared filesystem. 1318 // 1319 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1320 // on the socket due to permission errors. See 1321 // https://github.com/docker/for-mac/issues/6239 1322 go func() { 1323 serveHTTPOverSocket := func() error { 1324 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1325 // We cannot bind a socket to an existing pathname. 1326 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1327 return fmt.Errorf("error removing socket file: %s", socket) 1328 } 1329 // The "unix" network corresponds to stream sockets. (cf. unixgram, 1330 // unixpacket). 1331 l, err := net.Listen("unix", socket) 1332 if err != nil { 1333 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1334 } 1335 // Indexserver (root) and webserver (Sourcegraph) run with 1336 // different users. Per default, the socket is created with 1337 // permission 755 (root root), which doesn't let webserver write to 1338 // it. 1339 // 1340 // See https://github.com/golang/go/issues/11822 for more context. 1341 if err := os.Chmod(socket, 0o777); err != nil { 1342 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1343 } 1344 debugLog.Printf("serving HTTP on %s", socket) 1345 return http.Serve(l, mux) 1346 } 1347 debugLog.Print(serveHTTPOverSocket()) 1348 }() 1349 } 1350 1351 oc := &ownerChecker{ 1352 Path: filepath.Join(conf.index, "owner.txt"), 1353 Hostname: conf.hostname, 1354 } 1355 go oc.Run() 1356 1357 logger := sglog.Scoped("metricsRegistration") 1358 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1359 1360 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1361 prometheus.DefaultRegisterer.MustRegister(c) 1362 1363 s.Run() 1364 return nil 1365} 1366 1367func newServer(conf rootConfig) (*Server, error) { 1368 logger := sglog.Scoped("server") 1369 1370 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1371 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1372 } 1373 if conf.index == "" { 1374 return nil, fmt.Errorf("must set -index") 1375 } 1376 if conf.root == "" { 1377 return nil, fmt.Errorf("must set -sourcegraph_url") 1378 } 1379 rootURL, err := url.Parse(conf.root) 1380 if err != nil { 1381 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1382 } 1383 1384 rootURL = addDefaultPort(rootURL) 1385 1386 // Tune GOMAXPROCS to match Linux container CPU quota. 1387 _, _ = maxprocs.Set() 1388 1389 // Automatically prepend our own path at the front, to minimize 1390 // required configuration. 1391 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1392 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1393 } 1394 1395 if _, err := os.Stat(conf.index); err != nil { 1396 if err := os.MkdirAll(conf.index, 0o755); err != nil { 1397 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1398 } 1399 } 1400 1401 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil { 1402 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1403 } 1404 1405 if srcLogLevelIsDebug() { 1406 debugLog.SetOutput(os.Stderr) 1407 } 1408 1409 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1410 if len(reposWithSeparateIndexingMetrics) > 0 { 1411 debugLog.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1412 } 1413 1414 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1415 if len(deltaBuildRepositoriesAllowList) > 0 { 1416 debugLog.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1417 } 1418 1419 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1420 if deltaShardNumberFallbackThreshold > 0 { 1421 debugLog.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1422 } else { 1423 debugLog.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1424 } 1425 1426 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1427 if len(reposShouldSkipSymbolsCalculation) > 0 { 1428 debugLog.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1429 } 1430 1431 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout) 1432 if indexingTimeout != defaultIndexingTimeout { 1433 debugLog.Printf("using configured indexing timeout: %s", indexingTimeout) 1434 } 1435 1436 var sg Sourcegraph 1437 if rootURL.IsAbs() { 1438 var batchSize int 1439 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { 1440 batchSize, err = strconv.Atoi(v) 1441 if err != nil { 1442 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1443 } 1444 } 1445 1446 opts := []SourcegraphClientOption{ 1447 WithBatchSize(batchSize), 1448 } 1449 1450 logger := sglog.Scoped("zoektConfigurationGRPCClient") 1451 client, err := dialGRPCClient(rootURL.Host, logger) 1452 if err != nil { 1453 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1454 } 1455 1456 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...) 1457 1458 } else { 1459 sg = sourcegraphFake{ 1460 RootDir: rootURL.String(), 1461 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1462 } 1463 } 1464 1465 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction))) 1466 if cpuCount < 1 { 1467 cpuCount = 1 1468 } 1469 1470 if conf.indexConcurrency < 1 { 1471 conf.indexConcurrency = 1 1472 } else if conf.indexConcurrency > int64(cpuCount) { 1473 conf.indexConcurrency = int64(cpuCount) 1474 } 1475 1476 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1477 1478 return &Server{ 1479 logger: logger, 1480 Sourcegraph: sg, 1481 IndexDir: conf.index, 1482 IndexConcurrency: int(conf.indexConcurrency), 1483 Interval: conf.interval, 1484 CPUCount: cpuCount, 1485 queue: *q, 1486 shardMerging: !conf.disableShardMerging, 1487 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1488 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1489 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1490 hostname: conf.hostname, 1491 mergeOpts: mergeOpts{ 1492 vacuumInterval: conf.vacuumInterval, 1493 mergeInterval: conf.mergeInterval, 1494 targetSizeBytes: conf.targetSize * 1024 * 1024, 1495 minSizeBytes: conf.minSize * 1024 * 1024, 1496 minAgeDays: conf.minAgeDays, 1497 }, 1498 timeout: indexingTimeout, 1499 }, err 1500} 1501 1502// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration 1503// for the indexed-search-configuration gRPC service. 1504// 1505// The default backoff strategy is modeled after the default settings used by 1506// retryablehttp.DefaultClient. 1507// 1508// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1509// - Unavailable 1510// - Aborted 1511// 1512//go:embed default_grpc_service_configuration.json 1513var defaultGRPCServiceConfigurationJSON string 1514 1515func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1516 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1517 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1518 return invoker(ctx, method, req, reply, cc, opts...) 1519 } 1520} 1521 1522func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1523 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1524 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1525 return streamer(ctx, desc, cc, method, opts...) 1526 } 1527} 1528 1529// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPCs servers and clients are allowed to process. 1530// This can be overridden by providing custom Server/Dial options. 1531const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB 1532 1533func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) { 1534 metrics := clientMetricsOnce() 1535 1536 // If the service seems to be unavailable, this 1537 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1 1538 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s]) 1539 retryOpts := []retry.CallOption{ 1540 retry.WithMax(5), 1541 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)), 1542 retry.WithCodes(codes.Unavailable), 1543 } 1544 1545 opts := []grpc.DialOption{ 1546 grpc.WithTransportCredentials(insecure.NewCredentials()), 1547 grpc.WithChainStreamInterceptor( 1548 metrics.StreamClientInterceptor(), 1549 messagesize.StreamClientInterceptor, 1550 internalActorStreamInterceptor(), 1551 internalerrs.LoggingStreamClientInterceptor(logger), 1552 internalerrs.PrometheusStreamClientInterceptor, 1553 retry.StreamClientInterceptor(retryOpts...), 1554 ), 1555 grpc.WithChainUnaryInterceptor( 1556 metrics.UnaryClientInterceptor(), 1557 messagesize.UnaryClientInterceptor, 1558 internalActorUnaryInterceptor(), 1559 internalerrs.LoggingUnaryClientInterceptor(logger), 1560 internalerrs.PrometheusUnaryClientInterceptor, 1561 retry.UnaryClientInterceptor(retryOpts...), 1562 ), 1563 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON), 1564 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)), 1565 } 1566 1567 opts = append(opts, additionalOpts...) 1568 1569 // Ensure that the message size options are set last, so they override any other 1570 // client-specific options that tweak the message size. 1571 // 1572 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they 1573 // take precedence over everything else with a uniform size setting that's easy to reason about. 1574 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...) 1575 1576 // This dialer is used to connect via gRPC to the Sourcegraph instance. 1577 // This is done lazily, so we can provide the client to use regardless of 1578 // whether we enabled gRPC or not initially. 1579 cc, err := grpc.Dial(addr, opts...) 1580 if err != nil { 1581 return nil, fmt.Errorf("dialing %q: %w", addr, err) 1582 } 1583 1584 client := proto.NewZoektConfigurationServiceClient(cc) 1585 return client, nil 1586} 1587 1588// addDefaultPort adds a default port to a URL if one is not specified. 1589// 1590// If the URL scheme is "http" and no port is specified, "80" is used. 1591// If the scheme is "https", "443" is used. 1592// 1593// The original URL is not mutated. A copy is modified and returned. 1594func addDefaultPort(original *url.URL) *url.URL { 1595 if original == nil { 1596 return nil // don't panic 1597 } 1598 1599 if !original.IsAbs() { 1600 return original // don't do anything if the URL doesn't look like a remote URL 1601 } 1602 1603 if original.Scheme == "http" && original.Port() == "" { 1604 u := cloneURL(original) 1605 u.Host = net.JoinHostPort(u.Host, "80") 1606 return u 1607 } 1608 1609 if original.Scheme == "https" && original.Port() == "" { 1610 u := cloneURL(original) 1611 u.Host = net.JoinHostPort(u.Host, "443") 1612 return u 1613 } 1614 1615 return original 1616} 1617 1618// cloneURL returns a copy of the URL. It is safe to mutate the returned URL. 1619// This is copied from net/http/clone.go 1620func cloneURL(u *url.URL) *url.URL { 1621 if u == nil { 1622 return nil 1623 } 1624 u2 := new(url.URL) 1625 *u2 = *u 1626 if u.User != nil { 1627 u2.User = new(url.Userinfo) 1628 *u2.User = *u.User 1629 } 1630 return u2 1631} 1632 1633func main() { 1634 liblog := sglog.Init(sglog.Resource{ 1635 Name: "zoekt-indexserver", 1636 Version: zoekt.Version, 1637 InstanceID: zoekt.HostnameBestEffort(), 1638 }) 1639 defer liblog.Sync() 1640 1641 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1642 log.Fatal(err) 1643 } 1644} 1645 1646// getBoolFromEnvironmentVariables returns the boolean defined by the first environment 1647// variable listed in envVarNames that is set in the current process environment, or the defaultBool if none are set. 1648// 1649// An error is returned of the provided environment variables fails to parse as a boolean. 1650func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) { 1651 for _, envVar := range envVarNames { 1652 v := os.Getenv(envVar) 1653 if v == "" { 1654 continue 1655 } 1656 1657 b, err := strconv.ParseBool(v) 1658 if err != nil { 1659 return false, fmt.Errorf("parsing environment variable %q to boolean: %v", envVar, err) 1660 } 1661 1662 return b, nil 1663 } 1664 1665 return defaultBool, nil 1666}