fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Command zoekt-sourcegraph-indexserver periodically reindexes repositories
// from a Sourcegraph instance. It uses a "pull-based" design, where it periodically
// reaches out to the Sourcegraph instance for the list of repositories to reindex.
package main

import (
	"bytes"
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"html/template"
	"io"
	"io/fs"
	"log"
	"math"
	"math/rand"
	"net"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"runtime"
	"slices"
	"sort"
	"strconv"
	"strings"
	"sync"
	"text/tabwriter"
	"time"

	grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
	"github.com/peterbourgon/ff/v3/ffcli"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	sglog "github.com/sourcegraph/log"
	"github.com/sourcegraph/mountinfo"

	"github.com/sourcegraph/zoekt"
	configv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/sourcegraph/zoekt/configuration/v1"
	indexserverv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/zoekt/indexserver/v1"
	"github.com/sourcegraph/zoekt/grpc/defaults"
	"github.com/sourcegraph/zoekt/grpc/grpcutil"
	"github.com/sourcegraph/zoekt/grpc/internalerrs"
	"github.com/sourcegraph/zoekt/grpc/messagesize"
	"github.com/sourcegraph/zoekt/index"
	"github.com/sourcegraph/zoekt/internal/debugserver"
	"github.com/sourcegraph/zoekt/internal/profiler"
	"github.com/sourcegraph/zoekt/internal/tenant"

	"go.uber.org/automaxprocs/maxprocs"
	"go.uber.org/multierr"
	"golang.org/x/net/trace"
	"golang.org/x/sys/unix"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/metadata"
)

// Prometheus metrics exported by indexserver. All of them are registered with
// the default registry at package initialisation time via promauto.
var (
	// Buckets: 1, 10, 100, 1000, 10000, 100000 seconds.
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 100000s (~27.8h)
	})

	// Buckets: 0.25, 0.5, 1, 2 seconds.
	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	// The upper bucket bound grows with defaultIndexingTimeout so that jobs
	// running up to the timeout (plus clone time) still land in a bucket.
	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	// Buckets: 60 * 2^k seconds for k = 0..13.
	metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_indexing_delay_seconds",
		Help:    "A histogram of durations from when an index job is added to the queue, to the time it completes.",
		Buckets: prometheus.ExponentialBuckets(60, 2, 14), // 1 minute -> 491520s (~5.7 days)
	}, []string{
		"state", // state is an indexState
		"name",  // the name of the repository that was indexed
	})

	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	// NOTE(review): the Help text points at zoekt/build.IndexState, while the
	// label values are index.IndexState (see the trailing comment below).
	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is index.IndexState

	// Set to the number of repositories found on disk by listIndexed.
	// NOTE(review): the Help text says "by code host", but this gauge has no labels.
	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})

	// clientMetricsOnce returns a singleton instance of the client metrics
	// that are shared across all gRPC clients that this process creates.
	//
	// This function panics if the metrics cannot be registered with the default
	// Prometheus registry.
	clientMetricsOnce = sync.OnceValue(func() *grpcprom.ClientMetrics {
		clientMetrics := grpcprom.NewClientMetrics(
			grpcprom.WithClientCounterOptions(),
			grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
			grpcprom.WithClientStreamRecvHistogram(),   // record how long it takes for a client to receive a message during a streaming RPC
			grpcprom.WithClientStreamSendHistogram(),   // record how long it takes for a client to send a message during a streaming RPC
		)
		prometheus.DefaultRegisterer.MustRegister(clientMetrics)
		return clientMetrics
	})
)

// MaxFileSize is the per-file size limit in bytes (1 MiB) passed as FileLimit
// to index jobs.
// 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
// NOTE: if you change this, you must also update gitIndex to use the same value when fetching the repo.
174const MaxFileSize = 1 << 20 175 176// set of repositories that we want to capture separate indexing metrics for 177var reposWithSeparateIndexingMetrics = make(map[string]struct{}) 178 179type indexState string 180 181const ( 182 indexStateFail indexState = "fail" 183 indexStateSuccess indexState = "success" 184 indexStateSuccessMeta indexState = "success_meta" // We only updated metadata 185 indexStateNoop indexState = "noop" // We didn't need to update index 186 indexStateEmpty indexState = "empty" // index is empty (empty repo) 187) 188 189// Server is the main functionality of zoekt-sourcegraph-indexserver. It 190// exists to conveniently use all the options passed in via func main. 191type Server struct { 192 logger sglog.Logger 193 194 Sourcegraph Sourcegraph 195 196 // rootURL is the root URL of the Sourcegraph instance. 197 rootURL *url.URL 198 199 BatchSize int 200 201 // IndexDir is the index directory to use. 202 IndexDir string 203 204 // IndexConcurrency is the number of repositories we index at once. 205 IndexConcurrency int 206 207 // indexSemaphore limits the number of concurrent index operations 208 indexSemaphore chan struct{} 209 210 // Interval is how often we sync with Sourcegraph. 211 Interval time.Duration 212 213 // CPUCount is the number of CPUs to use for indexing shards. 214 CPUCount int 215 216 queue Queue 217 218 // muIndexDir protects the index directory from concurrent access. 219 muIndexDir indexMutex 220 221 // If true, shard merging is enabled. 222 shardMerging bool 223 224 // deltaBuildRepositoriesAllowList is an allowlist for repositories that we 225 // use delta-builds for instead of normal builds 226 deltaBuildRepositoriesAllowList map[string]struct{} 227 228 // deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist 229 // before attempting a delta build. 
230 deltaShardNumberFallbackThreshold uint64 231 232 // repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that 233 // we skip calculating symbols metadata for during builds 234 repositoriesSkipSymbolsCalculationAllowList map[string]struct{} 235 236 hostname string 237 238 mergeOpts mergeOpts 239 240 // timeout defines how long the index server waits before killing an indexing job. 241 timeout time.Duration 242 243 indexserverv1.UnimplementedSourcegraphIndexserverServiceServer 244} 245 246var ( 247 debugLog = log.New(io.Discard, "[DEBUG] ", log.LstdFlags) 248 infoLog = log.New(os.Stderr, "[INFO] ", log.LstdFlags) 249 errorLog = log.New(os.Stderr, "[ERROR] ", log.LstdFlags) 250) 251 252// our index commands should output something every 100mb they process. 253// 254// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough 255// time." famous last words. A client was indexing a monorepo with 42 256// cores... 5m was not enough. 257const noOutputTimeout = 30 * time.Minute 258 259func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 260 out := &synchronizedBuffer{} 261 cmd.Stdout = out 262 cmd.Stderr = out 263 264 tr.LazyPrintf("%s", cmd.Args) 265 266 defer func() { 267 if err != nil { 268 outS := out.String() 269 tr.LazyPrintf("failed: %v", err) 270 tr.LazyPrintf("output: %s", out) 271 tr.SetError() 272 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 273 } 274 }() 275 276 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 277 278 if err := cmd.Start(); err != nil { 279 return err 280 } 281 282 errC := make(chan error) 283 go func() { 284 errC <- cmd.Wait() 285 }() 286 287 // This channel is set after we have sent sigquit. It allows us to follow up 288 // with a sigkill if the process doesn't quit after sigquit. 
289 kill := make(<-chan time.Time) 290 291 lastLen := 0 292 for { 293 select { 294 case <-time.After(noOutputTimeout): 295 // Periodically check if we have had output. If not kill the process. 296 if out.Len() != lastLen { 297 lastLen = out.Len() 298 infoLog.Printf("still running %s", cmd.Args) 299 } else { 300 // Send quit (C-\) first so we get a stack dump. 301 infoLog.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 302 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 303 errorLog.Println("quit failed:", err) 304 } 305 306 // send sigkill if still running in 10s 307 kill = time.After(10 * time.Second) 308 } 309 310 case <-kill: 311 infoLog.Printf("still running, killing %s", cmd.Args) 312 if err := cmd.Process.Kill(); err != nil { 313 errorLog.Println("kill failed:", err) 314 } 315 316 case err := <-errC: 317 if err != nil { 318 return err 319 } 320 321 tr.LazyPrintf("success") 322 return nil 323 } 324 } 325} 326 327// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 328// monitor the buffer while it is being written to. 329type synchronizedBuffer struct { 330 mu sync.Mutex 331 b bytes.Buffer 332} 333 334func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 335 sb.mu.Lock() 336 defer sb.mu.Unlock() 337 return sb.b.Write(p) 338} 339 340func (sb *synchronizedBuffer) Len() int { 341 sb.mu.Lock() 342 defer sb.mu.Unlock() 343 return sb.b.Len() 344} 345 346func (sb *synchronizedBuffer) String() string { 347 sb.mu.Lock() 348 defer sb.mu.Unlock() 349 return sb.b.String() 350} 351 352// pauseFileName if present in IndexDir will stop index jobs from 353// running. This is to make it possible to experiment with the content of the 354// IndexDir without the indexserver writing to it. 355const ( 356 pauseFileName = "PAUSE" 357 pauseEnvVar = "SRC_PAUSE_INDEXING" 358) 359 360// isIndexingPaused checks if indexing should be paused based on either: 361// 1. The presence of a PAUSE file in the index directory 362// 2. 
The SRC_PAUSE_INDEXING environment variable being set to true 363func isIndexingPaused(indexDir string) (bool, string) { 364 // Check for PAUSE file first 365 if b, err := os.ReadFile(filepath.Join(indexDir, pauseFileName)); err == nil { 366 return true, fmt.Sprintf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 367 } 368 369 // Then check environment variable 370 if getEnvWithDefaultBool(pauseEnvVar, false) { 371 return true, fmt.Sprintf("indexserver paused via %s environment variable", pauseEnvVar) 372 } 373 374 return false, "" 375} 376 377// Run the sync loop. This blocks forever. 378func (s *Server) Run() { 379 removeIncompleteShards(s.IndexDir) 380 381 // Start a goroutine which updates the queue with commits to index. 382 go func() { 383 // We update the list of indexed repos every Interval. To speed up manual 384 // testing we also listen for SIGUSR1 to trigger updates. 385 // "pkill -SIGUSR1 zoekt-sourcegra" 386 for range jitterTicker(s.Interval, unix.SIGUSR1) { 387 if paused, msg := isIndexingPaused(s.IndexDir); paused { 388 infoLog.Printf("%s", msg) 389 continue 390 } 391 392 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 393 if err != nil { 394 errorLog.Printf("error listing repos: %s", err) 395 continue 396 } 397 398 debugLog.Printf("updating index queue with %d repositories", len(repos.IDs)) 399 400 // Stop indexing repos we don't need to track anymore 401 removed := s.queue.MaybeRemoveMissing(repos.IDs) 402 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 403 if len(removed) > 0 { 404 infoLog.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 405 } 406 407 cleanupDone := make(chan struct{}) 408 go func() { 409 defer close(cleanupDone) 410 s.muIndexDir.Global(func() { 411 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 412 }) 413 }() 414 415 repos.IterateIndexOptions(s.queue.AddOrUpdate) 416 417 // IterateIndexOptions will only 
iterate over repositories that have 418 // changed since we last called list. However, we want to add all IDs 419 // back onto the queue just to check that what is on disk is still 420 // correct. This will use the last IndexOptions we stored in the 421 // queue. The repositories not on the queue (missing) need a forced 422 // fetch of IndexOptions. 423 missing := s.queue.Bump(repos.IDs) 424 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 425 426 setShardsCounter(s.IndexDir) 427 428 <-cleanupDone 429 } 430 }() 431 432 go func() { 433 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 434 if s.shardMerging { 435 s.vacuum() 436 } 437 } 438 }() 439 440 go func() { 441 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 442 if s.shardMerging { 443 s.doMerge() 444 } 445 } 446 }() 447 448 for range s.IndexConcurrency { 449 go s.processQueue() 450 } 451 452 // block forever 453 select {} 454} 455 456// formatList returns a comma-separated list of the first min(len(v), m) items. 457func formatListUint32(v []uint32, m int) string { 458 if len(v) < m { 459 m = len(v) 460 } 461 462 sb := strings.Builder{} 463 for i := range m { 464 fmt.Fprintf(&sb, "%d, ", v[i]) 465 } 466 467 if len(v) > m { 468 sb.WriteString("...") 469 } 470 471 return strings.TrimRight(sb.String(), ", ") 472} 473 474func (s *Server) processQueue() { 475 for { 476 if paused, _ := isIndexingPaused(s.IndexDir); paused { 477 time.Sleep(time.Second) 478 continue 479 } 480 481 item, ok := s.queue.Pop() 482 if !ok { 483 time.Sleep(time.Second) 484 continue 485 } 486 487 opts := item.Opts 488 args := s.indexArgs(opts) 489 490 ran := s.muIndexDir.With(opts.Name, func() { 491 // only record time taken once we hold the lock. This avoids us 492 // recording time taken while merging/cleanup runs. 
493 start := time.Now() 494 495 state, err := s.index(context.Background(), args) 496 497 elapsed := time.Since(start) 498 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 499 500 indexDelay := time.Since(item.DateAddedToQueue) 501 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds()) 502 503 if err != nil { 504 errorLog.Printf("error indexing %s: %s", args.String(), err) 505 } 506 507 switch state { 508 case indexStateSuccess: 509 var branches []string 510 for _, b := range args.Branches { 511 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 512 } 513 s.logger.Info("updated index", 514 sglog.Int("tenant", args.TenantID), 515 sglog.String("repo", args.Name), 516 sglog.Uint32("id", args.RepoID), 517 sglog.Strings("branches", branches), 518 sglog.Duration("duration", elapsed), 519 sglog.Duration("index_delay", indexDelay), 520 ) 521 case indexStateSuccessMeta: 522 infoLog.Printf("updated meta %s in %v", args.String(), elapsed) 523 } 524 s.queue.SetIndexed(opts, state) 525 }) 526 527 if !ran { 528 // Someone else is processing the repository. We can just skip this job 529 // since the repository will be added back to the queue and we will 530 // converge to the correct behaviour. 531 debugLog.Printf("index job for repository already running: %s", args) 532 continue 533 } 534 } 535} 536 537// repoNameForMetric returns a normalized version of the given repository name that is 538// suitable for use with Prometheus metrics. 539func repoNameForMetric(repo string) string { 540 // Check to see if we want to be able to capture separate indexing metrics for this repository. 541 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 
542 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 543 return repo 544 } 545 546 return "" 547} 548 549func batched(slice []uint32, size int) <-chan []uint32 { 550 c := make(chan []uint32) 551 go func() { 552 for len(slice) > 0 { 553 if size > len(slice) { 554 size = len(slice) 555 } 556 c <- slice[:size] 557 slice = slice[size:] 558 } 559 close(c) 560 }() 561 return c 562} 563 564// jitterTicker returns a ticker which ticks with a jitter. Each tick is 565// uniformly selected from the range (d/2, d + d/2). It will tick on creation. 566// 567// sig is a list of signals which also cause the ticker to fire. This is a 568// convenience to allow manually triggering of the ticker. 569func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} { 570 ticker := make(chan struct{}) 571 572 go func() { 573 for { 574 ticker <- struct{}{} 575 ns := int64(d) 576 jitter := rand.Int63n(ns) 577 time.Sleep(time.Duration(ns/2 + jitter)) 578 } 579 }() 580 581 go func() { 582 if len(sig) == 0 { 583 return 584 } 585 586 c := make(chan os.Signal, 1) 587 signal.Notify(c, sig...) 588 for range c { 589 ticker <- struct{}{} 590 } 591 }() 592 593 return ticker 594} 595 596// Index starts an index job for repo name at commit. 597func (s *Server) index(ctx context.Context, args *indexArgs) (state indexState, err error) { 598 tr := trace.New("index", args.Name) 599 tr.SetMaxEvents(30) // Ensure we capture all indexing events 600 601 defer func() { 602 if err != nil { 603 tr.SetError() 604 tr.LazyPrintf("error: %v", err) 605 state = indexStateFail 606 metricFailingTotal.Inc() 607 } 608 tr.LazyPrintf("state: %s", state) 609 tr.Finish() 610 }() 611 612 // Sourcegraph should always provide a tenant ID. 
613 if args.TenantID < 1 { 614 return indexStateFail, tenant.ErrMissingTenant 615 } 616 617 tr.LazyPrintf("branches: %v", args.Branches) 618 619 if len(args.Branches) == 0 { 620 return indexStateEmpty, createEmptyShard(args) 621 } 622 623 repositoryName := args.Name 624 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok { 625 tr.LazyPrintf("marking this repository for delta build") 626 args.UseDelta = true 627 } 628 629 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold 630 631 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok { 632 tr.LazyPrintf("skipping symbols calculation") 633 args.Symbols = false 634 } 635 636 reason := "forced" 637 638 if args.Incremental { 639 bo := args.BuildOptions() 640 bo.SetDefaults() 641 incrementalState, fn := bo.IndexState() 642 reason = string(incrementalState) 643 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc() 644 645 switch incrementalState { 646 case index.IndexStateEqual: 647 debugLog.Printf("%s index already up to date. Shard=%s", args.String(), fn) 648 return indexStateNoop, nil 649 650 case index.IndexStateMeta: 651 infoLog.Printf("updating index.meta %s", args.String()) 652 653 // TODO(stefan) handle mergeMeta for tenant id. 
654 if err := mergeMeta(bo); err != nil { 655 errorLog.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err) 656 } else { 657 return indexStateSuccessMeta, nil 658 } 659 660 case index.IndexStateCorrupt: 661 infoLog.Printf("falling back to full update: corrupt index: %s", args.String()) 662 } 663 } 664 665 infoLog.Printf("updating index %s reason=%s", args.String(), reason) 666 667 metricIndexingTotal.Inc() 668 c := gitIndexConfig{ 669 runCmd: func(cmd *exec.Cmd) error { 670 return s.loggedRun(tr, cmd) 671 }, 672 673 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) { 674 return args.BuildOptions().FindRepositoryMetadata() 675 }, 676 timeout: s.timeout, 677 } 678 679 err = gitIndex(ctx, c, args, s.Sourcegraph, s.logger) 680 if err != nil { 681 return indexStateFail, err 682 } 683 684 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil { 685 s.logger.Error("failed to update index status", 686 sglog.String("repo", args.Name), 687 sglog.Uint32("id", args.RepoID), 688 sglogBranches("branches", args.Branches), 689 sglog.Error(err), 690 ) 691 } 692 693 return indexStateSuccess, nil 694} 695 696// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so 697// it can update the zoekt_repos table. 698func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error { 699 // We need to read from disk for IndexTime. 
700 _, metadata, ok, err := c.findRepositoryMetadata(args) 701 if err != nil { 702 return fmt.Errorf("failed to read metadata for new/updated index: %w", err) 703 } 704 if !ok { 705 return errors.New("failed to find metadata for new/updated index") 706 } 707 708 status := []indexStatus{{ 709 RepoID: args.RepoID, 710 Branches: args.Branches, 711 IndexTimeUnix: metadata.IndexTime.Unix(), 712 }} 713 if err := sg.UpdateIndexStatus(status); err != nil { 714 return fmt.Errorf("failed to update sourcegraph with status: %w", err) 715 } 716 717 return nil 718} 719 720func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field { 721 ss := make([]string, len(branches)) 722 for i, b := range branches { 723 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version) 724 } 725 return sglog.Strings(key, ss) 726} 727 728func (s *Server) indexArgs(opts IndexOptions) *indexArgs { 729 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0)) 730 return &indexArgs{ 731 IndexOptions: opts, 732 IndexDir: s.IndexDir, 733 Parallelism: parallelism, 734 Incremental: true, 735 FileLimit: MaxFileSize, 736 ShardMerging: s.shardMerging, 737 } 738} 739 740// parallelism consults both the server flags and index options to determine the number 741// of shards to index in parallel. If the CPUCount index option is provided, it always 742// overrides the server flag. 
743func (s *Server) parallelism(opts IndexOptions, maxProcs int) int { 744 var parallelism int 745 if opts.ShardConcurrency > 0 { 746 parallelism = int(opts.ShardConcurrency) 747 } else { 748 parallelism = s.CPUCount 749 } 750 751 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs 752 if parallelism > 4*maxProcs { 753 parallelism = 4 * maxProcs 754 } 755 756 // If index concurrency is set, then divide the parallelism by the number of 757 // repos we're indexing in parallel 758 if s.IndexConcurrency > 1 { 759 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency))) 760 } 761 762 return parallelism 763} 764 765func createEmptyShard(args *indexArgs) error { 766 bo := args.BuildOptions() 767 bo.SetDefaults() 768 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 769 770 if args.Incremental && bo.IncrementalSkipIndexing() { 771 return nil 772 } 773 774 builder, err := index.NewBuilder(*bo) 775 if err != nil { 776 return err 777 } 778 return builder.Finish() 779} 780 781// addDebugHandlers adds handlers specific to indexserver. 782func (s *Server) addDebugHandlers(mux *http.ServeMux) { 783 // Sourcegraph's site admin view requires indexserver to serve it's admin view 784 // on "/". 
785 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 786 787 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 788 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 789 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 790 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 791 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 792 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 793} 794 795func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 796 if r.Method != "GET" { 797 w.Header().Set("Allow", "GET") 798 w.WriteHeader(http.StatusMethodNotAllowed) 799 return 800 } 801 802 response := struct { 803 Hostname string 804 }{ 805 Hostname: s.hostname, 806 } 807 808 b, err := json.Marshal(response) 809 if err != nil { 810 http.Error(w, err.Error(), http.StatusInternalServerError) 811 return 812 } 813 814 w.Header().Set("Content-Type", "application/json; charset=utf-8") 815 w.Write(b) 816} 817 818var rootTmpl = template.Must(template.New("name").Parse(` 819<html> 820 <body> 821 <a href="debug">Debug</a><br /> 822 <a href="debug/requests">Traces</a><br /> 823 {{.IndexMsg}}<br /> 824 <br /> 825 <h3>Reindex</h3> 826 {{if .Repos}} 827 <a href="?show_repos=false">hide repos</a><br /> 828 <table style="margin-top: 20px"> 829 <th style="text-align:left">Name</th> 830 <th style="text-align:left">ID (click to reindex)</th> 831 {{range .Repos}} 832 <tr> 833 <td>{{.Name}}</td> 834 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 835 </tr> 836 {{end}} 837 </table> 838 {{else}} 839 <a href="?show_repos=true">show repos</a><br /> 840 {{end}} 841 </body> 842</html> 843`)) 844 845func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 846 if r.Method != "GET" { 847 w.Header().Set("Allow", "GET") 848 w.WriteHeader(http.StatusMethodNotAllowed) 849 return 850 } 851 852 values := r.URL.Query() 853 854 // ?id= 855 indexMsg := "" 856 if v := values.Get("id"); v != "" 
{ 857 id, err := strconv.Atoi(v) 858 if err != nil { 859 http.Error(w, err.Error(), http.StatusBadRequest) 860 return 861 } 862 indexMsg, _ = s.forceIndex(r.Context(), uint32(id)) 863 } 864 865 // ?show_repos= 866 showRepos := false 867 if v := values.Get("show_repos"); v != "" { 868 showRepos, _ = strconv.ParseBool(v) 869 } 870 871 type Repo struct { 872 ID uint32 873 Name string 874 } 875 var data struct { 876 Repos []Repo 877 IndexMsg string 878 } 879 880 data.IndexMsg = indexMsg 881 882 if showRepos { 883 s.queue.Iterate(func(opts *IndexOptions) { 884 data.Repos = append(data.Repos, Repo{ 885 ID: opts.RepoID, 886 Name: opts.Name, 887 }) 888 }) 889 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 890 } 891 892 _ = rootTmpl.Execute(w, data) 893} 894 895// handleReindex triggers a reindex asynocronously. If a reindex was triggered 896// the request returns with status 202. The caller can infer the new state of 897// the index by calling List. 
898func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 899 if r.Method != http.MethodPost { 900 w.Header().Set("Allow", http.MethodPost) 901 w.WriteHeader(http.StatusMethodNotAllowed) 902 return 903 } 904 905 err := r.ParseForm() 906 if err != nil { 907 http.Error(w, err.Error(), http.StatusBadRequest) 908 return 909 } 910 911 id, err := strconv.Atoi(r.Form.Get("repo")) 912 if err != nil { 913 http.Error(w, err.Error(), http.StatusBadRequest) 914 return 915 } 916 917 go func() { s.forceIndex(r.Context(), uint32(id)) }() 918 919 // 202 Accepted 920 w.WriteHeader(http.StatusAccepted) 921} 922 923func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 924 withIndexed := true 925 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 926 withIndexed = b 927 } 928 929 var indexed []uint32 930 if withIndexed { 931 indexed = listIndexed(s.IndexDir) 932 } 933 934 repos, err := s.Sourcegraph.List(r.Context(), indexed) 935 if err != nil { 936 http.Error(w, err.Error(), http.StatusInternalServerError) 937 return 938 } 939 940 bw := bytes.Buffer{} 941 942 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 943 944 _, err = fmt.Fprintf(tw, "ID\tName\n") 945 if err != nil { 946 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 947 return 948 } 949 950 s.queue.mu.Lock() 951 name := "" 952 for _, id := range repos.IDs { 953 if item := s.queue.get(id); item != nil { 954 name = item.opts.Name 955 } else { 956 name = "" 957 } 958 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 959 if err != nil { 960 debugLog.Printf("handleDebugList: %s\n", err.Error()) 961 } 962 } 963 s.queue.mu.Unlock() 964 965 if err != nil { 966 http.Error(w, err.Error(), http.StatusInternalServerError) 967 return 968 } 969 970 err = tw.Flush() 971 if err != nil { 972 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 973 return 974 } 975 976 w.Header().Set("Content-Length", 
strconv.Itoa(bw.Len())) 977 978 if _, err := io.Copy(w, &bw); err != nil { 979 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 980 return 981 } 982} 983 984// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 985// can run this command during periods of low usage (evenings, weekends) to 986// trigger an initial merge run. In the steady-state, merges happen rarely, even 987// on busy instances, and users can rely on automatic merging instead. 988func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 989 // A merge operation can take very long, depending on the number merges and the 990 // target size of the compound shards. We run the merge in the background and 991 // return immediately to the user. 992 // 993 // We track the status of the merge with metricShardMergingRunning. 994 go func() { 995 s.doMerge() 996 }() 997 _, _ = w.Write([]byte("merging enqueued\n")) 998} 999 1000func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 1001 indexed := listIndexed(s.IndexDir) 1002 1003 bw := bytes.Buffer{} 1004 1005 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 1006 1007 _, err := fmt.Fprintf(tw, "ID\tName\n") 1008 if err != nil { 1009 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 1010 return 1011 } 1012 1013 s.queue.mu.Lock() 1014 name := "" 1015 for _, id := range indexed { 1016 if item := s.queue.get(id); item != nil { 1017 name = item.opts.Name 1018 } else { 1019 name = "" 1020 } 1021 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 1022 if err != nil { 1023 debugLog.Printf("handleDebugIndexed: %s\n", err.Error()) 1024 } 1025 } 1026 s.queue.mu.Unlock() 1027 1028 if err != nil { 1029 http.Error(w, err.Error(), http.StatusInternalServerError) 1030 return 1031 } 1032 1033 err = tw.Flush() 1034 if err != nil { 1035 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), 
http.StatusInternalServerError) 1036 return 1037 } 1038 1039 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 1040 1041 if _, err := io.Copy(w, &bw); err != nil { 1042 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 1043 return 1044 } 1045} 1046 1047// forceIndex will run the index job for repo name now. It will return always 1048// return a string explaining what it did, even if it failed. 1049func (s *Server) forceIndex(ctx context.Context, id uint32) (string, error) { 1050 var opts IndexOptions 1051 var err error 1052 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 1053 opts = o 1054 }, func(_ uint32, e error) { 1055 err = e 1056 }, id) 1057 if err != nil { 1058 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 1059 } 1060 1061 args := s.indexArgs(opts) 1062 args.Incremental = false // force re-index 1063 1064 var state indexState 1065 ran := s.muIndexDir.With(opts.Name, func() { 1066 state, err = s.index(ctx, args) 1067 }) 1068 if !ran { 1069 return fmt.Sprintf("index job for repository already running: %s", args), nil 1070 } 1071 if err != nil { 1072 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 1073 } 1074 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 1075} 1076 1077// DeleteAllData deletes all shards in the index and trash dir belonging to the 1078// tenant associated with the request. The deletion is best-effort, which means 1079// we will delete as much as possible. If no error is returned, the caller can 1080// be certain that all data has been deleted. 
1081func (s *Server) DeleteAllData(ctx context.Context, _ *indexserverv1.DeleteAllDataRequest) (*indexserverv1.DeleteAllDataResponse, error) { 1082 tnt, err := tenant.FromContext(ctx) 1083 if err != nil { 1084 return nil, err 1085 } 1086 s.logger.Warn("DeleteAllData", sglog.Int("tenant_id", tnt.ID())) 1087 1088 var merr error 1089 s.muIndexDir.Global(func() { 1090 // First, explode all compound shards that have repos from the tenant in 1091 // question. Because we hold the global lock, we can be sure that no new 1092 // merges start while we do this. 1093 if err := s.explodeTenantCompoundShards(ctx, func(path string) error { 1094 // We call explode in a separate process to protect indexserver. 1095 cmd := defaultExplodeCmd(path) 1096 1097 stdoutBuf := &bytes.Buffer{} 1098 stderrBuf := &bytes.Buffer{} 1099 cmd.Stdout = stdoutBuf 1100 cmd.Stderr = stderrBuf 1101 1102 err := cmd.Run() 1103 if err != nil { 1104 errorLog.Printf("explode failed: %v (stderr: %s)", err, stderrBuf.String()) 1105 return err 1106 } 1107 1108 infoLog.Printf("exploded shard: %s", stdoutBuf.String()) 1109 1110 return nil 1111 }); err != nil { 1112 merr = multierr.Append(merr, err) 1113 } 1114 1115 // Invariant: all shards from the tenant are simple shards. 1116 1117 if err := purgeTenantShards(ctx, s.IndexDir); err != nil { 1118 merr = multierr.Append(merr, err) 1119 } 1120 if err := purgeTenantShards(ctx, filepath.Join(s.IndexDir, ".trash")); err != nil { 1121 merr = multierr.Append(merr, err) 1122 } 1123 }) 1124 1125 return &indexserverv1.DeleteAllDataResponse{}, merr 1126} 1127 1128func listIndexed(indexDir string) []uint32 { 1129 index := getShards(indexDir) 1130 metricNumIndexed.Set(float64(len(index))) 1131 repoIDs := make([]uint32, 0, len(index)) 1132 for id := range index { 1133 repoIDs = append(repoIDs, id) 1134 } 1135 slices.Sort(repoIDs) 1136 return repoIDs 1137} 1138 1139// setupTmpDir sets up a temporary directory on the same volume as the 1140// indexes. 
1141// 1142// If main is true we will delete older temp directories left around. main is 1143// false when this is a debug command. 1144func setupTmpDir(logger sglog.Logger, main bool, index string) error { 1145 // change the target tmp directory depending on if it's our main daemon or a 1146 // debug sub command. 1147 dir := ".indexserver.debug.tmp" 1148 if main { 1149 dir = ".indexserver.tmp" 1150 } 1151 1152 tmpRoot := filepath.Join(index, dir) 1153 1154 if main { 1155 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot)) 1156 err := os.RemoveAll(tmpRoot) 1157 if err != nil { 1158 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err)) 1159 } 1160 } 1161 1162 if err := os.MkdirAll(tmpRoot, 0o755); err != nil { 1163 return err 1164 } 1165 1166 return os.Setenv("TMPDIR", tmpRoot) 1167} 1168 1169func printMetaData(fn string) error { 1170 repo, indexMeta, err := index.ReadMetadataPath(fn) 1171 if err != nil { 1172 return err 1173 } 1174 1175 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 1176 if err != nil { 1177 return err 1178 } 1179 1180 err = json.NewEncoder(os.Stdout).Encode(repo) 1181 if err != nil { 1182 return err 1183 } 1184 return nil 1185} 1186 1187func printShardStats(fn string) error { 1188 f, err := os.Open(fn) 1189 if err != nil { 1190 return err 1191 } 1192 1193 iFile, err := index.NewIndexFile(f) 1194 if err != nil { 1195 return err 1196 } 1197 1198 return index.PrintNgramStats(iFile) 1199} 1200 1201func srcLogLevelIsDebug() bool { 1202 lvl := os.Getenv(sglog.EnvLogLevel) 1203 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1204} 1205 1206func getEnvWithDefaultBool(k string, defaultVal bool) bool { 1207 v := os.Getenv(k) 1208 if v == "" { 1209 return defaultVal 1210 } 1211 b, err := strconv.ParseBool(v) 1212 if err != nil { 1213 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1214 } 1215 return b 1216} 1217 1218func getEnvWithDefaultInt64(k string, defaultVal 
int64) int64 { 1219 v := os.Getenv(k) 1220 if v == "" { 1221 return defaultVal 1222 } 1223 i, err := strconv.ParseInt(v, 10, 64) 1224 if err != nil { 1225 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1226 } 1227 return i 1228} 1229 1230func getEnvWithDefaultInt(k string, defaultVal int) int { 1231 v := os.Getenv(k) 1232 if v == "" { 1233 return defaultVal 1234 } 1235 i, err := strconv.Atoi(k) 1236 if err != nil { 1237 log.Fatalf("error parsing ENV %s to int: %s", k, err) 1238 } 1239 return i 1240} 1241 1242func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 { 1243 v := os.Getenv(k) 1244 if v == "" { 1245 return defaultVal 1246 } 1247 i, err := strconv.ParseUint(v, 10, 64) 1248 if err != nil { 1249 log.Fatalf("error parsing ENV %s to uint64: %s", k, err) 1250 } 1251 return i 1252} 1253 1254func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1255 v := os.Getenv(k) 1256 if v == "" { 1257 return defaultVal 1258 } 1259 f, err := strconv.ParseFloat(v, 64) 1260 if err != nil { 1261 log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1262 } 1263 return f 1264} 1265 1266func getEnvWithDefaultString(k string, defaultVal string) string { 1267 v := os.Getenv(k) 1268 if v == "" { 1269 return defaultVal 1270 } 1271 return v 1272} 1273 1274func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration { 1275 v := os.Getenv(k) 1276 if v == "" { 1277 return defaultVal 1278 } 1279 1280 d, err := time.ParseDuration(v) 1281 if err != nil { 1282 log.Fatalf("error parsing ENV %s to duration: %s", k, err) 1283 } 1284 return d 1285} 1286 1287func getEnvWithDefaultEmptySet(k string) map[string]struct{} { 1288 set := map[string]struct{}{} 1289 for _, v := range strings.Split(os.Getenv(k), ",") { 1290 v = strings.TrimSpace(v) 1291 if v != "" { 1292 set[v] = struct{}{} 1293 } 1294 } 1295 return set 1296} 1297 1298func joinStringSet(set map[string]struct{}, sep string) string { 1299 var xs []string 1300 for x := range set { 1301 
xs = append(xs, x) 1302 } 1303 1304 return strings.Join(xs, sep) 1305} 1306 1307func setShardsCounter(indexDir string) { 1308 fns, err := filepath.Glob(filepath.Join(indexDir, "*.zoekt")) 1309 if err != nil { 1310 errorLog.Printf("setShardsCounter: %s\n", err) 1311 return 1312 } 1313 metricNumberShards.Set(float64(len(fns))) 1314 1315 compoundFns := make([]string, 0, len(fns)) 1316 for _, fn := range fns { 1317 if strings.HasPrefix(filepath.Base(fn), "compound-") { 1318 compoundFns = append(compoundFns, fn) 1319 } 1320 } 1321 metricNumberCompoundShards.Set(float64(len(compoundFns))) 1322} 1323 1324func rootCmd() *ffcli.Command { 1325 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1326 conf := rootConfig{ 1327 Main: true, 1328 } 1329 conf.registerRootFlags(rootFs) 1330 1331 return &ffcli.Command{ 1332 FlagSet: rootFs, 1333 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1334 Subcommands: []*ffcli.Command{debugCmd()}, 1335 Exec: func(ctx context.Context, args []string) error { 1336 return startServer(conf) 1337 }, 1338 } 1339} 1340 1341type rootConfig struct { 1342 // Main is true if this rootConfig is for our main long running command (the 1343 // indexserver). Debug commands should not set this value. This is used to 1344 // determine if we need to run tmpfriend. 
1345 Main bool 1346 1347 root string 1348 interval time.Duration 1349 index string 1350 indexConcurrency int64 1351 listen string 1352 hostname string 1353 cpuFraction float64 1354 1355 // config values related to shard merging 1356 disableShardMerging bool 1357 vacuumInterval time.Duration 1358 mergeInterval time.Duration 1359 targetSize int64 1360 minSize int64 1361 minAgeDays int 1362 1363 // config values related to backoff indexing repos with one or more consecutive failures 1364 backoffDuration time.Duration 1365 maxBackoffDuration time.Duration 1366} 1367 1368func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1369 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1370 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1371 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently") 1372 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", index.DefaultDir), "set index directory to use") 1373 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") 1374 fs.StringVar(&rc.hostname, "hostname", index.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") 1375 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1376 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. 
A negative value disables indexing backoff.") 1377 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.") 1378 1379 // flags related to shard merging 1380 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging") 1381 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1382 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1383 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 1000), "the target size of compound shards in MiB") 1384 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 800), "the minimum size of a compound shard in MiB") 1385 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. 
Shards with newer commits are excluded from merging.") 1386} 1387 1388func startServer(conf rootConfig) error { 1389 s, err := newServer(conf) 1390 if err != nil { 1391 return err 1392 } 1393 1394 profiler.Init("zoekt-sourcegraph-indexserver") 1395 setShardsCounter(s.IndexDir) 1396 1397 if conf.listen != "" { 1398 1399 mux := http.NewServeMux() 1400 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1401 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1402 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1403 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"}, 1404 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1405 }...) 1406 s.addDebugHandlers(mux) 1407 1408 go func() { 1409 debugLog.Printf("serving HTTP on %s", conf.listen) 1410 mux := grpcutil.MultiplexGRPC(newGRPCServer(sglog.Scoped("indexserver"), s), mux) 1411 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1412 }() 1413 1414 // Serve mux on a unix domain socket on a best-effort-basis so that 1415 // webserver can call the endpoints via the shared filesystem. 1416 // 1417 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1418 // on the socket due to permission errors. See 1419 // https://github.com/docker/for-mac/issues/6239 1420 go func() { 1421 serveHTTPOverSocket := func() error { 1422 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1423 // We cannot bind a socket to an existing pathname. 1424 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1425 return fmt.Errorf("error removing socket file: %s", socket) 1426 } 1427 // The "unix" network corresponds to stream sockets. (cf. 
unixgram, 1428 // unixpacket). 1429 l, err := net.Listen("unix", socket) 1430 if err != nil { 1431 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1432 } 1433 // Indexserver (root) and webserver (Sourcegraph) run with 1434 // different users. Per default, the socket is created with 1435 // permission 755 (root root), which doesn't let webserver write to 1436 // it. 1437 // 1438 // See https://github.com/golang/go/issues/11822 for more context. 1439 if err := os.Chmod(socket, 0o777); err != nil { 1440 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1441 } 1442 debugLog.Printf("serving HTTP on %s", socket) 1443 return http.Serve(l, mux) 1444 } 1445 debugLog.Print(serveHTTPOverSocket()) 1446 }() 1447 } 1448 1449 oc := &ownerChecker{ 1450 Path: filepath.Join(conf.index, "owner.txt"), 1451 Hostname: conf.hostname, 1452 } 1453 go oc.Run() 1454 1455 logger := sglog.Scoped("metricsRegistration") 1456 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1457 1458 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1459 prometheus.DefaultRegisterer.MustRegister(c) 1460 1461 s.Run() 1462 return nil 1463} 1464 1465func newServer(conf rootConfig) (*Server, error) { 1466 logger := sglog.Scoped("server") 1467 1468 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1469 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1470 } 1471 if conf.index == "" { 1472 return nil, fmt.Errorf("must set -index") 1473 } 1474 if conf.root == "" { 1475 return nil, fmt.Errorf("must set -sourcegraph_url") 1476 } 1477 rootURL, err := url.Parse(conf.root) 1478 if err != nil { 1479 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1480 } 1481 1482 rootURL = addDefaultPort(rootURL) 1483 1484 // Tune GOMAXPROCS to match Linux container CPU quota. 1485 _, _ = maxprocs.Set() 1486 1487 // Automatically prepend our own path at the front, to minimize 1488 // required configuration. 
1489 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1490 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1491 } 1492 1493 if _, err := os.Stat(conf.index); err != nil { 1494 if err := os.MkdirAll(conf.index, 0o755); err != nil { 1495 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1496 } 1497 } 1498 1499 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil { 1500 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1501 } 1502 1503 if srcLogLevelIsDebug() { 1504 debugLog.SetOutput(os.Stderr) 1505 } 1506 1507 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1508 if len(reposWithSeparateIndexingMetrics) > 0 { 1509 debugLog.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1510 } 1511 1512 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1513 if len(deltaBuildRepositoriesAllowList) > 0 { 1514 debugLog.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1515 } 1516 1517 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1518 if deltaShardNumberFallbackThreshold > 0 { 1519 debugLog.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1520 } else { 1521 debugLog.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1522 } 1523 1524 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1525 if len(reposShouldSkipSymbolsCalculation) > 0 { 1526 debugLog.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1527 } 1528 1529 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout) 1530 if indexingTimeout != 
defaultIndexingTimeout { 1531 debugLog.Printf("using configured indexing timeout: %s", indexingTimeout) 1532 } 1533 1534 var sg Sourcegraph 1535 if rootURL.IsAbs() { 1536 var batchSize int 1537 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { 1538 batchSize, err = strconv.Atoi(v) 1539 if err != nil { 1540 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1541 } 1542 } 1543 1544 opts := []SourcegraphClientOption{ 1545 WithBatchSize(batchSize), 1546 } 1547 1548 logger := sglog.Scoped("zoektConfigurationGRPCClient") 1549 client, err := dialGRPCClient(rootURL.Host, logger) 1550 if err != nil { 1551 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1552 } 1553 1554 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...) 1555 1556 } else { 1557 sg = sourcegraphFake{ 1558 RootDir: rootURL.String(), 1559 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1560 } 1561 } 1562 1563 cpuCount := max(int(math.Round(float64(runtime.GOMAXPROCS(0))*(conf.cpuFraction))), 1) 1564 1565 if conf.indexConcurrency < 1 { 1566 conf.indexConcurrency = 1 1567 } else if conf.indexConcurrency > int64(cpuCount) { 1568 conf.indexConcurrency = int64(cpuCount) 1569 } 1570 1571 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1572 1573 return &Server{ 1574 logger: logger, 1575 rootURL: rootURL, 1576 Sourcegraph: sg, 1577 IndexDir: conf.index, 1578 IndexConcurrency: int(conf.indexConcurrency), 1579 Interval: conf.interval, 1580 CPUCount: cpuCount, 1581 queue: *q, 1582 shardMerging: !conf.disableShardMerging, 1583 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1584 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1585 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1586 hostname: conf.hostname, 1587 mergeOpts: mergeOpts{ 1588 vacuumInterval: conf.vacuumInterval, 1589 mergeInterval: conf.mergeInterval, 1590 
targetSizeBytes: conf.targetSize * 1024 * 1024, 1591 minSizeBytes: conf.minSize * 1024 * 1024, 1592 minAgeDays: conf.minAgeDays, 1593 }, 1594 timeout: indexingTimeout, 1595 indexSemaphore: make(chan struct{}, int(conf.indexConcurrency)), 1596 }, err 1597} 1598 1599func newGRPCServer(logger sglog.Logger, s *Server, additionalOpts ...grpc.ServerOption) *grpc.Server { 1600 grpcServer := defaults.NewServer(logger, additionalOpts...) 1601 indexserverv1.RegisterSourcegraphIndexserverServiceServer(grpcServer, s) 1602 return grpcServer 1603} 1604 1605// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration 1606// for the indexed-search-configuration gRPC service. 1607// 1608// The default backoff strategy is modeled after the default settings used by 1609// retryablehttp.DefaultClient. 1610// 1611// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1612// - Unavailable 1613// - Aborted 1614// 1615//go:embed default_grpc_service_configuration.json 1616var defaultGRPCServiceConfigurationJSON string 1617 1618func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1619 return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1620 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1621 return invoker(ctx, method, req, reply, cc, opts...) 1622 } 1623} 1624 1625func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1626 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1627 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1628 return streamer(ctx, desc, cc, method, opts...) 1629 } 1630} 1631 1632// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPCs servers and clients are allowed to process. 
1633// This can be overridden by providing custom Server/Dial options. 1634const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB 1635 1636func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (configv1.ZoektConfigurationServiceClient, error) { 1637 metrics := clientMetricsOnce() 1638 1639 // If the service seems to be unavailable, this 1640 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1 1641 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s]) 1642 retryOpts := []retry.CallOption{ 1643 retry.WithMax(5), 1644 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)), 1645 retry.WithCodes(codes.Unavailable), 1646 } 1647 1648 opts := []grpc.DialOption{ 1649 grpc.WithTransportCredentials(insecure.NewCredentials()), 1650 grpc.WithChainStreamInterceptor( 1651 metrics.StreamClientInterceptor(), 1652 messagesize.StreamClientInterceptor, 1653 internalActorStreamInterceptor(), 1654 internalerrs.LoggingStreamClientInterceptor(logger), 1655 internalerrs.PrometheusStreamClientInterceptor, 1656 retry.StreamClientInterceptor(retryOpts...), 1657 ), 1658 grpc.WithChainUnaryInterceptor( 1659 metrics.UnaryClientInterceptor(), 1660 messagesize.UnaryClientInterceptor, 1661 internalActorUnaryInterceptor(), 1662 internalerrs.LoggingUnaryClientInterceptor(logger), 1663 internalerrs.PrometheusUnaryClientInterceptor, 1664 retry.UnaryClientInterceptor(retryOpts...), 1665 ), 1666 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON), 1667 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)), 1668 } 1669 1670 opts = append(opts, additionalOpts...) 1671 1672 // Ensure that the message size options are set last, so they override any other 1673 // client-specific options that tweak the message size. 1674 // 1675 // The message size options are only provided if the environment variable is set. 
These options serve as an escape hatch, so they 1676 // take precedence over everything else with a uniform size setting that's easy to reason about. 1677 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...) 1678 1679 // This dialer is used to connect via gRPC to the Sourcegraph instance. 1680 // This is done lazily, so we can provide the client to use regardless of 1681 // whether we enabled gRPC or not initially. 1682 cc, err := grpc.Dial(addr, opts...) 1683 if err != nil { 1684 return nil, fmt.Errorf("dialing %q: %w", addr, err) 1685 } 1686 1687 client := configv1.NewZoektConfigurationServiceClient(cc) 1688 return client, nil 1689} 1690 1691// addDefaultPort adds a default port to a URL if one is not specified. 1692// 1693// If the URL scheme is "http" and no port is specified, "80" is used. 1694// If the scheme is "https", "443" is used. 1695// 1696// The original URL is not mutated. A copy is modified and returned. 1697func addDefaultPort(original *url.URL) *url.URL { 1698 if original == nil { 1699 return nil // don't panic 1700 } 1701 1702 if !original.IsAbs() { 1703 return original // don't do anything if the URL doesn't look like a remote URL 1704 } 1705 1706 if original.Scheme == "http" && original.Port() == "" { 1707 u := cloneURL(original) 1708 u.Host = net.JoinHostPort(u.Host, "80") 1709 return u 1710 } 1711 1712 if original.Scheme == "https" && original.Port() == "" { 1713 u := cloneURL(original) 1714 u.Host = net.JoinHostPort(u.Host, "443") 1715 return u 1716 } 1717 1718 return original 1719} 1720 1721// cloneURL returns a copy of the URL. It is safe to mutate the returned URL. 
1722// This is copied from net/http/clone.go 1723func cloneURL(u *url.URL) *url.URL { 1724 if u == nil { 1725 return nil 1726 } 1727 u2 := new(url.URL) 1728 *u2 = *u 1729 if u.User != nil { 1730 u2.User = new(url.Userinfo) 1731 *u2.User = *u.User 1732 } 1733 return u2 1734} 1735 1736func main() { 1737 liblog := sglog.Init(sglog.Resource{ 1738 Name: "zoekt-indexserver", 1739 Version: index.Version, 1740 InstanceID: index.HostnameBestEffort(), 1741 }) 1742 defer liblog.Sync() 1743 1744 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1745 log.Fatal(err) 1746 } 1747} 1748 1749// getBoolFromEnvironmentVariables returns the boolean defined by the first environment 1750// variable listed in envVarNames that is set in the current process environment, or the defaultBool if none are set. 1751// 1752// An error is returned of the provided environment variables fails to parse as a boolean. 1753func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) { 1754 for _, envVar := range envVarNames { 1755 v := os.Getenv(envVar) 1756 if v == "" { 1757 continue 1758 } 1759 1760 b, err := strconv.ParseBool(v) 1761 if err != nil { 1762 return false, fmt.Errorf("parsing environment variable %q to boolean: %v", envVar, err) 1763 } 1764 1765 return b, nil 1766 } 1767 1768 return defaultBool, nil 1769}