fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Command zoekt-sourcegraph-indexserver periodically reindexes repositories
// from a Sourcegraph instance. It uses a "pull-based" design, where it periodically
// reaches out to the Sourcegraph instance for the list of repositories to reindex.
package main

import (
	"bytes"
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"html/template"
	"io"
	"io/fs"
	"log"
	"math"
	"math/rand"
	"net"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"runtime"
	"slices"
	"sort"
	"strconv"
	"strings"
	"sync"
	"text/tabwriter"
	"time"

	grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
	"github.com/peterbourgon/ff/v3/ffcli"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	sglog "github.com/sourcegraph/log"
	"github.com/sourcegraph/mountinfo"

	"github.com/sourcegraph/zoekt"
	configv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/sourcegraph/zoekt/configuration/v1"
	indexserverv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/zoekt/indexserver/v1"
	"github.com/sourcegraph/zoekt/grpc/defaults"
	"github.com/sourcegraph/zoekt/grpc/grpcutil"
	"github.com/sourcegraph/zoekt/grpc/internalerrs"
	"github.com/sourcegraph/zoekt/grpc/messagesize"
	"github.com/sourcegraph/zoekt/index"
	"github.com/sourcegraph/zoekt/internal/debugserver"
	"github.com/sourcegraph/zoekt/internal/profiler"
	"github.com/sourcegraph/zoekt/internal/tenant"

	"go.uber.org/automaxprocs/maxprocs"
	"go.uber.org/multierr"
	"golang.org/x/net/trace"
	"golang.org/x/sys/unix"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/metadata"
)

// Prometheus metrics exported by the indexserver. They are registered with the
// default registry at package initialisation via promauto.
var (
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 100000s (~27.8h)
	})

	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_indexing_delay_seconds",
		Help:    "A histogram of durations from when an index job is added to the queue, to the time it completes.",
		Buckets: prometheus.ExponentialBuckets(60, 2, 14), // 1 minute -> ~5.7 days
	}, []string{
		"state", // state is an indexState
		"name",  // the name of the repository that was indexed
	})

	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is index.IndexState

	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})

	// clientMetricsOnce returns a singleton instance of the client metrics
	// that are shared across all gRPC clients that this process creates.
	//
	// This function panics if the metrics cannot be registered with the default
	// Prometheus registry.
	clientMetricsOnce = sync.OnceValue(func() *grpcprom.ClientMetrics {
		clientMetrics := grpcprom.NewClientMetrics(
			grpcprom.WithClientCounterOptions(),
			grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
			grpcprom.WithClientStreamRecvHistogram(),   // record how long it takes for a client to receive a message during a streaming RPC
			grpcprom.WithClientStreamSendHistogram(),   // record how long it takes for a client to send a message during a streaming RPC
		)
		prometheus.DefaultRegisterer.MustRegister(clientMetrics)
		return clientMetrics
	})
)

// MaxFileSize is 1 MB; match https://sourcegraph.sourcegraph.com/r/github.com/sourcegraph/sourcegraph/-/blob/cmd/searcher/internal/search/store.go?L32
const MaxFileSize = 1 << 20

// set of repositories that we want to capture separate indexing metrics for
var reposWithSeparateIndexingMetrics = make(map[string]struct{})

// indexState is the outcome of an index job. Its string value is used as the
// "state" label on the indexing metrics above.
type indexState string

const (
	indexStateFail        indexState = "fail"
	indexStateSuccess     indexState = "success"
	indexStateSuccessMeta indexState = "success_meta" // We only updated metadata
	indexStateNoop        indexState = "noop"         // We didn't need to update index
	indexStateEmpty       indexState = "empty"        // index is empty (empty repo)
)

// Server is the main functionality of zoekt-sourcegraph-indexserver. It
// exists to conveniently use all the options passed in via func main.
type Server struct {
	// logger is the structured logger used alongside the package-level
	// debugLog/infoLog/errorLog loggers.
	logger sglog.Logger

	// Sourcegraph is the client used to list repositories, fetch their index
	// options and report index status back.
	Sourcegraph Sourcegraph

	// rootURL is the root URL of the Sourcegraph instance.
	rootURL *url.URL

	// NOTE(review): BatchSize is not read in this part of the file; presumably
	// it bounds how many repositories are requested per call — confirm at the
	// call site.
	BatchSize int

	// IndexDir is the index directory to use.
	IndexDir string

	// IndexConcurrency is the number of repositories we index at once.
	IndexConcurrency int

	// indexSemaphore limits the number of concurrent index operations
	indexSemaphore chan struct{}

	// Interval is how often we sync with Sourcegraph.
	Interval time.Duration

	// CPUCount is the number of CPUs to use for indexing shards.
	CPUCount int

	// queue holds the pending index jobs. Run fills it and processQueue pops
	// from it.
	queue Queue

	// muIndexDir protects the index directory from concurrent access.
	muIndexDir indexMutex

	// If true, shard merging is enabled.
	shardMerging bool

	// deltaBuildRepositoriesAllowList is an allowlist for repositories that we
	// use delta-builds for instead of normal builds
	deltaBuildRepositoriesAllowList map[string]struct{}

	// deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
	// before attempting a delta build.
	deltaShardNumberFallbackThreshold uint64

	// repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that
	// we skip calculating symbols metadata for during builds
	repositoriesSkipSymbolsCalculationAllowList map[string]struct{}

	// hostname is the value reported by the /debug/host handler.
	hostname string

	// mergeOpts carries the shard merging settings, including the vacuum and
	// merge intervals used by Run.
	mergeOpts mergeOpts

	// timeout defines how long the index server waits before killing an indexing job.
	timeout time.Duration

	indexserverv1.UnimplementedSourcegraphIndexserverServiceServer
}

// Package-level loggers. debugLog writes to io.Discard here, so debug output
// is dropped unless its destination is changed elsewhere.
var (
	debugLog = log.New(io.Discard, "[DEBUG] ", log.LstdFlags)
	infoLog  = log.New(os.Stderr, "[INFO] ", log.LstdFlags)
	errorLog = log.New(os.Stderr, "[ERROR] ", log.LstdFlags)
)

// our index commands should output something every 100mb they process.
//
// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough
// time." famous last words. A client was indexing a monorepo with 42
// cores... 5m was not enough.
256const noOutputTimeout = 30 * time.Minute 257 258func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 259 out := &synchronizedBuffer{} 260 cmd.Stdout = out 261 cmd.Stderr = out 262 263 tr.LazyPrintf("%s", cmd.Args) 264 265 defer func() { 266 if err != nil { 267 outS := out.String() 268 tr.LazyPrintf("failed: %v", err) 269 tr.LazyPrintf("output: %s", out) 270 tr.SetError() 271 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 272 } 273 }() 274 275 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 276 277 if err := cmd.Start(); err != nil { 278 return err 279 } 280 281 errC := make(chan error) 282 go func() { 283 errC <- cmd.Wait() 284 }() 285 286 // This channel is set after we have sent sigquit. It allows us to follow up 287 // with a sigkill if the process doesn't quit after sigquit. 288 kill := make(<-chan time.Time) 289 290 lastLen := 0 291 for { 292 select { 293 case <-time.After(noOutputTimeout): 294 // Periodically check if we have had output. If not kill the process. 295 if out.Len() != lastLen { 296 lastLen = out.Len() 297 infoLog.Printf("still running %s", cmd.Args) 298 } else { 299 // Send quit (C-\) first so we get a stack dump. 300 infoLog.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 301 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 302 errorLog.Println("quit failed:", err) 303 } 304 305 // send sigkill if still running in 10s 306 kill = time.After(10 * time.Second) 307 } 308 309 case <-kill: 310 infoLog.Printf("still running, killing %s", cmd.Args) 311 if err := cmd.Process.Kill(); err != nil { 312 errorLog.Println("kill failed:", err) 313 } 314 315 case err := <-errC: 316 if err != nil { 317 return err 318 } 319 320 tr.LazyPrintf("success") 321 return nil 322 } 323 } 324} 325 326// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 327// monitor the buffer while it is being written to. 
328type synchronizedBuffer struct { 329 mu sync.Mutex 330 b bytes.Buffer 331} 332 333func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 334 sb.mu.Lock() 335 defer sb.mu.Unlock() 336 return sb.b.Write(p) 337} 338 339func (sb *synchronizedBuffer) Len() int { 340 sb.mu.Lock() 341 defer sb.mu.Unlock() 342 return sb.b.Len() 343} 344 345func (sb *synchronizedBuffer) String() string { 346 sb.mu.Lock() 347 defer sb.mu.Unlock() 348 return sb.b.String() 349} 350 351// pauseFileName if present in IndexDir will stop index jobs from 352// running. This is to make it possible to experiment with the content of the 353// IndexDir without the indexserver writing to it. 354const ( 355 pauseFileName = "PAUSE" 356 pauseEnvVar = "SRC_PAUSE_INDEXING" 357) 358 359// isIndexingPaused checks if indexing should be paused based on either: 360// 1. The presence of a PAUSE file in the index directory 361// 2. The SRC_PAUSE_INDEXING environment variable being set to true 362func isIndexingPaused(indexDir string) (bool, string) { 363 // Check for PAUSE file first 364 if b, err := os.ReadFile(filepath.Join(indexDir, pauseFileName)); err == nil { 365 return true, fmt.Sprintf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 366 } 367 368 // Then check environment variable 369 if getEnvWithDefaultBool(pauseEnvVar, false) { 370 return true, fmt.Sprintf("indexserver paused via %s environment variable", pauseEnvVar) 371 } 372 373 return false, "" 374} 375 376// Run the sync loop. This blocks forever. 377func (s *Server) Run() { 378 removeIncompleteShards(s.IndexDir) 379 380 // Start a goroutine which updates the queue with commits to index. 381 go func() { 382 // We update the list of indexed repos every Interval. To speed up manual 383 // testing we also listen for SIGUSR1 to trigger updates. 
384 // "pkill -SIGUSR1 zoekt-sourcegra" 385 for range jitterTicker(s.Interval, unix.SIGUSR1) { 386 if paused, msg := isIndexingPaused(s.IndexDir); paused { 387 infoLog.Printf("%s", msg) 388 continue 389 } 390 391 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 392 if err != nil { 393 errorLog.Printf("error listing repos: %s", err) 394 continue 395 } 396 397 debugLog.Printf("updating index queue with %d repositories", len(repos.IDs)) 398 399 // Stop indexing repos we don't need to track anymore 400 removed := s.queue.MaybeRemoveMissing(repos.IDs) 401 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 402 if len(removed) > 0 { 403 infoLog.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 404 } 405 406 cleanupDone := make(chan struct{}) 407 go func() { 408 defer close(cleanupDone) 409 s.muIndexDir.Global(func() { 410 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 411 }) 412 }() 413 414 repos.IterateIndexOptions(s.queue.AddOrUpdate) 415 416 // IterateIndexOptions will only iterate over repositories that have 417 // changed since we last called list. However, we want to add all IDs 418 // back onto the queue just to check that what is on disk is still 419 // correct. This will use the last IndexOptions we stored in the 420 // queue. The repositories not on the queue (missing) need a forced 421 // fetch of IndexOptions. 422 missing := s.queue.Bump(repos.IDs) 423 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 
424 425 setShardsCounter(s.IndexDir) 426 427 <-cleanupDone 428 } 429 }() 430 431 go func() { 432 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 433 if s.shardMerging { 434 s.vacuum() 435 } 436 } 437 }() 438 439 go func() { 440 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 441 if s.shardMerging { 442 s.doMerge() 443 } 444 } 445 }() 446 447 for range s.IndexConcurrency { 448 go s.processQueue() 449 } 450 451 // block forever 452 select {} 453} 454 455// formatList returns a comma-separated list of the first min(len(v), m) items. 456func formatListUint32(v []uint32, m int) string { 457 if len(v) < m { 458 m = len(v) 459 } 460 461 sb := strings.Builder{} 462 for i := range m { 463 fmt.Fprintf(&sb, "%d, ", v[i]) 464 } 465 466 if len(v) > m { 467 sb.WriteString("...") 468 } 469 470 return strings.TrimRight(sb.String(), ", ") 471} 472 473func (s *Server) processQueue() { 474 for { 475 if paused, _ := isIndexingPaused(s.IndexDir); paused { 476 time.Sleep(time.Second) 477 continue 478 } 479 480 item, ok := s.queue.Pop() 481 if !ok { 482 time.Sleep(time.Second) 483 continue 484 } 485 486 opts := item.Opts 487 args := s.indexArgs(opts) 488 489 ran := s.muIndexDir.With(opts.Name, func() { 490 // only record time taken once we hold the lock. This avoids us 491 // recording time taken while merging/cleanup runs. 
492 start := time.Now() 493 494 state, err := s.index(context.Background(), args) 495 496 elapsed := time.Since(start) 497 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 498 499 indexDelay := time.Since(item.DateAddedToQueue) 500 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds()) 501 502 if err != nil { 503 errorLog.Printf("error indexing %s: %s", args.String(), err) 504 } 505 506 switch state { 507 case indexStateSuccess: 508 var branches []string 509 for _, b := range args.Branches { 510 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 511 } 512 s.logger.Info("updated index", 513 sglog.Int("tenant", args.TenantID), 514 sglog.String("repo", args.Name), 515 sglog.Uint32("id", args.RepoID), 516 sglog.Strings("branches", branches), 517 sglog.Duration("duration", elapsed), 518 sglog.Duration("index_delay", indexDelay), 519 ) 520 case indexStateSuccessMeta: 521 infoLog.Printf("updated meta %s in %v", args.String(), elapsed) 522 } 523 s.queue.SetIndexed(opts, state) 524 }) 525 526 if !ran { 527 // Someone else is processing the repository. We can just skip this job 528 // since the repository will be added back to the queue and we will 529 // converge to the correct behaviour. 530 debugLog.Printf("index job for repository already running: %s", args) 531 continue 532 } 533 } 534} 535 536// repoNameForMetric returns a normalized version of the given repository name that is 537// suitable for use with Prometheus metrics. 538func repoNameForMetric(repo string) string { 539 // Check to see if we want to be able to capture separate indexing metrics for this repository. 540 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 
541 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 542 return repo 543 } 544 545 return "" 546} 547 548func batched(slice []uint32, size int) <-chan []uint32 { 549 c := make(chan []uint32) 550 go func() { 551 for len(slice) > 0 { 552 if size > len(slice) { 553 size = len(slice) 554 } 555 c <- slice[:size] 556 slice = slice[size:] 557 } 558 close(c) 559 }() 560 return c 561} 562 563// jitterTicker returns a ticker which ticks with a jitter. Each tick is 564// uniformly selected from the range (d/2, d + d/2). It will tick on creation. 565// 566// sig is a list of signals which also cause the ticker to fire. This is a 567// convenience to allow manually triggering of the ticker. 568func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} { 569 ticker := make(chan struct{}) 570 571 go func() { 572 for { 573 ticker <- struct{}{} 574 ns := int64(d) 575 jitter := rand.Int63n(ns) 576 time.Sleep(time.Duration(ns/2 + jitter)) 577 } 578 }() 579 580 go func() { 581 if len(sig) == 0 { 582 return 583 } 584 585 c := make(chan os.Signal, 1) 586 signal.Notify(c, sig...) 587 for range c { 588 ticker <- struct{}{} 589 } 590 }() 591 592 return ticker 593} 594 595// Index starts an index job for repo name at commit. 596func (s *Server) index(ctx context.Context, args *indexArgs) (state indexState, err error) { 597 tr := trace.New("index", args.Name) 598 tr.SetMaxEvents(30) // Ensure we capture all indexing events 599 600 defer func() { 601 if err != nil { 602 tr.SetError() 603 tr.LazyPrintf("error: %v", err) 604 state = indexStateFail 605 metricFailingTotal.Inc() 606 } 607 tr.LazyPrintf("state: %s", state) 608 tr.Finish() 609 }() 610 611 // Sourcegraph should always provide a tenant ID. 
612 if args.TenantID < 1 { 613 return indexStateFail, tenant.ErrMissingTenant 614 } 615 616 tr.LazyPrintf("branches: %v", args.Branches) 617 618 if len(args.Branches) == 0 { 619 return indexStateEmpty, createEmptyShard(args) 620 } 621 622 repositoryName := args.Name 623 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok { 624 tr.LazyPrintf("marking this repository for delta build") 625 args.UseDelta = true 626 } 627 628 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold 629 630 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok { 631 tr.LazyPrintf("skipping symbols calculation") 632 args.Symbols = false 633 } 634 635 reason := "forced" 636 637 if args.Incremental { 638 bo := args.BuildOptions() 639 bo.SetDefaults() 640 incrementalState, fn := bo.IndexState() 641 reason = string(incrementalState) 642 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc() 643 644 switch incrementalState { 645 case index.IndexStateEqual: 646 debugLog.Printf("%s index already up to date. Shard=%s", args.String(), fn) 647 return indexStateNoop, nil 648 649 case index.IndexStateMeta: 650 infoLog.Printf("updating index.meta %s", args.String()) 651 652 // TODO(stefan) handle mergeMeta for tenant id. 
653 if err := mergeMeta(bo); err != nil { 654 errorLog.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err) 655 } else { 656 return indexStateSuccessMeta, nil 657 } 658 659 case index.IndexStateCorrupt: 660 infoLog.Printf("falling back to full update: corrupt index: %s", args.String()) 661 } 662 } 663 664 infoLog.Printf("updating index %s reason=%s", args.String(), reason) 665 666 metricIndexingTotal.Inc() 667 c := gitIndexConfig{ 668 runCmd: func(cmd *exec.Cmd) error { 669 return s.loggedRun(tr, cmd) 670 }, 671 672 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) { 673 return args.BuildOptions().FindRepositoryMetadata() 674 }, 675 timeout: s.timeout, 676 } 677 678 err = gitIndex(ctx, c, args, s.Sourcegraph, s.logger) 679 if err != nil { 680 return indexStateFail, err 681 } 682 683 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil { 684 s.logger.Error("failed to update index status", 685 sglog.String("repo", args.Name), 686 sglog.Uint32("id", args.RepoID), 687 sglogBranches("branches", args.Branches), 688 sglog.Error(err), 689 ) 690 } 691 692 return indexStateSuccess, nil 693} 694 695// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so 696// it can update the zoekt_repos table. 697func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error { 698 // We need to read from disk for IndexTime. 
699 _, metadata, ok, err := c.findRepositoryMetadata(args) 700 if err != nil { 701 return fmt.Errorf("failed to read metadata for new/updated index: %w", err) 702 } 703 if !ok { 704 return errors.New("failed to find metadata for new/updated index") 705 } 706 707 status := []indexStatus{{ 708 RepoID: args.RepoID, 709 Branches: args.Branches, 710 IndexTimeUnix: metadata.IndexTime.Unix(), 711 }} 712 if err := sg.UpdateIndexStatus(status); err != nil { 713 return fmt.Errorf("failed to update sourcegraph with status: %w", err) 714 } 715 716 return nil 717} 718 719func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field { 720 ss := make([]string, len(branches)) 721 for i, b := range branches { 722 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version) 723 } 724 return sglog.Strings(key, ss) 725} 726 727func (s *Server) indexArgs(opts IndexOptions) *indexArgs { 728 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0)) 729 return &indexArgs{ 730 IndexOptions: opts, 731 IndexDir: s.IndexDir, 732 Parallelism: parallelism, 733 Incremental: true, 734 ShardMerging: s.shardMerging, 735 } 736} 737 738// parallelism consults both the server flags and index options to determine the number 739// of shards to index in parallel. If the CPUCount index option is provided, it always 740// overrides the server flag. 
741func (s *Server) parallelism(opts IndexOptions, maxProcs int) int { 742 var parallelism int 743 if opts.ShardConcurrency > 0 { 744 parallelism = int(opts.ShardConcurrency) 745 } else { 746 parallelism = s.CPUCount 747 } 748 749 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs 750 if parallelism > 4*maxProcs { 751 parallelism = 4 * maxProcs 752 } 753 754 // If index concurrency is set, then divide the parallelism by the number of 755 // repos we're indexing in parallel 756 if s.IndexConcurrency > 1 { 757 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency))) 758 } 759 760 return parallelism 761} 762 763func createEmptyShard(args *indexArgs) error { 764 bo := args.BuildOptions() 765 bo.SetDefaults() 766 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 767 768 if args.Incremental && bo.IncrementalSkipIndexing() { 769 return nil 770 } 771 772 builder, err := index.NewBuilder(*bo) 773 if err != nil { 774 return err 775 } 776 return builder.Finish() 777} 778 779// addDebugHandlers adds handlers specific to indexserver. 780func (s *Server) addDebugHandlers(mux *http.ServeMux) { 781 // Sourcegraph's site admin view requires indexserver to serve it's admin view 782 // on "/". 
783 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 784 785 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 786 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 787 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 788 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 789 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 790 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 791} 792 793func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 794 if r.Method != "GET" { 795 w.Header().Set("Allow", "GET") 796 w.WriteHeader(http.StatusMethodNotAllowed) 797 return 798 } 799 800 response := struct { 801 Hostname string 802 }{ 803 Hostname: s.hostname, 804 } 805 806 b, err := json.Marshal(response) 807 if err != nil { 808 http.Error(w, err.Error(), http.StatusInternalServerError) 809 return 810 } 811 812 w.Header().Set("Content-Type", "application/json; charset=utf-8") 813 w.Write(b) 814} 815 816var rootTmpl = template.Must(template.New("name").Parse(` 817<html> 818 <body> 819 <a href="debug">Debug</a><br /> 820 <a href="debug/requests">Traces</a><br /> 821 {{.IndexMsg}}<br /> 822 <br /> 823 <h3>Reindex</h3> 824 {{if .Repos}} 825 <a href="?show_repos=false">hide repos</a><br /> 826 <table style="margin-top: 20px"> 827 <th style="text-align:left">Name</th> 828 <th style="text-align:left">ID (click to reindex)</th> 829 {{range .Repos}} 830 <tr> 831 <td>{{.Name}}</td> 832 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 833 </tr> 834 {{end}} 835 </table> 836 {{else}} 837 <a href="?show_repos=true">show repos</a><br /> 838 {{end}} 839 </body> 840</html> 841`)) 842 843func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 844 if r.Method != "GET" { 845 w.Header().Set("Allow", "GET") 846 w.WriteHeader(http.StatusMethodNotAllowed) 847 return 848 } 849 850 values := r.URL.Query() 851 852 // ?id= 853 indexMsg := "" 854 if v := values.Get("id"); v != "" 
{ 855 id, err := strconv.Atoi(v) 856 if err != nil { 857 http.Error(w, err.Error(), http.StatusBadRequest) 858 return 859 } 860 indexMsg, _ = s.forceIndex(r.Context(), uint32(id)) 861 } 862 863 // ?show_repos= 864 showRepos := false 865 if v := values.Get("show_repos"); v != "" { 866 showRepos, _ = strconv.ParseBool(v) 867 } 868 869 type Repo struct { 870 ID uint32 871 Name string 872 } 873 var data struct { 874 Repos []Repo 875 IndexMsg string 876 } 877 878 data.IndexMsg = indexMsg 879 880 if showRepos { 881 s.queue.Iterate(func(opts *IndexOptions) { 882 data.Repos = append(data.Repos, Repo{ 883 ID: opts.RepoID, 884 Name: opts.Name, 885 }) 886 }) 887 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 888 } 889 890 _ = rootTmpl.Execute(w, data) 891} 892 893// handleReindex triggers a reindex asynocronously. If a reindex was triggered 894// the request returns with status 202. The caller can infer the new state of 895// the index by calling List. 
896func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 897 if r.Method != http.MethodPost { 898 w.Header().Set("Allow", http.MethodPost) 899 w.WriteHeader(http.StatusMethodNotAllowed) 900 return 901 } 902 903 err := r.ParseForm() 904 if err != nil { 905 http.Error(w, err.Error(), http.StatusBadRequest) 906 return 907 } 908 909 id, err := strconv.Atoi(r.Form.Get("repo")) 910 if err != nil { 911 http.Error(w, err.Error(), http.StatusBadRequest) 912 return 913 } 914 915 go func() { s.forceIndex(r.Context(), uint32(id)) }() 916 917 // 202 Accepted 918 w.WriteHeader(http.StatusAccepted) 919} 920 921func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 922 withIndexed := true 923 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 924 withIndexed = b 925 } 926 927 var indexed []uint32 928 if withIndexed { 929 indexed = listIndexed(s.IndexDir) 930 } 931 932 repos, err := s.Sourcegraph.List(r.Context(), indexed) 933 if err != nil { 934 http.Error(w, err.Error(), http.StatusInternalServerError) 935 return 936 } 937 938 bw := bytes.Buffer{} 939 940 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 941 942 _, err = fmt.Fprintf(tw, "ID\tName\n") 943 if err != nil { 944 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 945 return 946 } 947 948 s.queue.mu.Lock() 949 name := "" 950 for _, id := range repos.IDs { 951 if item := s.queue.get(id); item != nil { 952 name = item.opts.Name 953 } else { 954 name = "" 955 } 956 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 957 if err != nil { 958 debugLog.Printf("handleDebugList: %s\n", err.Error()) 959 } 960 } 961 s.queue.mu.Unlock() 962 963 if err != nil { 964 http.Error(w, err.Error(), http.StatusInternalServerError) 965 return 966 } 967 968 err = tw.Flush() 969 if err != nil { 970 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 971 return 972 } 973 974 w.Header().Set("Content-Length", 
strconv.Itoa(bw.Len())) 975 976 if _, err := io.Copy(w, &bw); err != nil { 977 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 978 return 979 } 980} 981 982// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 983// can run this command during periods of low usage (evenings, weekends) to 984// trigger an initial merge run. In the steady-state, merges happen rarely, even 985// on busy instances, and users can rely on automatic merging instead. 986func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 987 // A merge operation can take very long, depending on the number merges and the 988 // target size of the compound shards. We run the merge in the background and 989 // return immediately to the user. 990 // 991 // We track the status of the merge with metricShardMergingRunning. 992 go func() { 993 s.doMerge() 994 }() 995 _, _ = w.Write([]byte("merging enqueued\n")) 996} 997 998func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 999 indexed := listIndexed(s.IndexDir) 1000 1001 bw := bytes.Buffer{} 1002 1003 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 1004 1005 _, err := fmt.Fprintf(tw, "ID\tName\n") 1006 if err != nil { 1007 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 1008 return 1009 } 1010 1011 s.queue.mu.Lock() 1012 name := "" 1013 for _, id := range indexed { 1014 if item := s.queue.get(id); item != nil { 1015 name = item.opts.Name 1016 } else { 1017 name = "" 1018 } 1019 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 1020 if err != nil { 1021 debugLog.Printf("handleDebugIndexed: %s\n", err.Error()) 1022 } 1023 } 1024 s.queue.mu.Unlock() 1025 1026 if err != nil { 1027 http.Error(w, err.Error(), http.StatusInternalServerError) 1028 return 1029 } 1030 1031 err = tw.Flush() 1032 if err != nil { 1033 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), 
http.StatusInternalServerError) 1034 return 1035 } 1036 1037 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 1038 1039 if _, err := io.Copy(w, &bw); err != nil { 1040 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 1041 return 1042 } 1043} 1044 1045// forceIndex will run the index job for repo name now. It will return always 1046// return a string explaining what it did, even if it failed. 1047func (s *Server) forceIndex(ctx context.Context, id uint32) (string, error) { 1048 var opts IndexOptions 1049 var err error 1050 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 1051 opts = o 1052 }, func(_ uint32, e error) { 1053 err = e 1054 }, id) 1055 if err != nil { 1056 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 1057 } 1058 1059 args := s.indexArgs(opts) 1060 args.Incremental = false // force re-index 1061 1062 var state indexState 1063 ran := s.muIndexDir.With(opts.Name, func() { 1064 state, err = s.index(ctx, args) 1065 }) 1066 if !ran { 1067 return fmt.Sprintf("index job for repository already running: %s", args), nil 1068 } 1069 if err != nil { 1070 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 1071 } 1072 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 1073} 1074 1075// DeleteAllData deletes all shards in the index and trash dir belonging to the 1076// tenant associated with the request. The deletion is best-effort, which means 1077// we will delete as much as possible. If no error is returned, the caller can 1078// be certain that all data has been deleted. 
1079func (s *Server) DeleteAllData(ctx context.Context, _ *indexserverv1.DeleteAllDataRequest) (*indexserverv1.DeleteAllDataResponse, error) { 1080 tnt, err := tenant.FromContext(ctx) 1081 if err != nil { 1082 return nil, err 1083 } 1084 s.logger.Warn("DeleteAllData", sglog.Int("tenant_id", tnt.ID())) 1085 1086 var merr error 1087 s.muIndexDir.Global(func() { 1088 // First, explode all compound shards that have repos from the tenant in 1089 // question. Because we hold the global lock, we can be sure that no new 1090 // merges start while we do this. 1091 if err := s.explodeTenantCompoundShards(ctx, func(path string) error { 1092 // We call explode in a separate process to protect indexserver. 1093 cmd := defaultExplodeCmd(path) 1094 1095 stdoutBuf := &bytes.Buffer{} 1096 stderrBuf := &bytes.Buffer{} 1097 cmd.Stdout = stdoutBuf 1098 cmd.Stderr = stderrBuf 1099 1100 err := cmd.Run() 1101 if err != nil { 1102 errorLog.Printf("explode failed: %v (stderr: %s)", err, stderrBuf.String()) 1103 return err 1104 } 1105 1106 infoLog.Printf("exploded shard: %s", stdoutBuf.String()) 1107 1108 return nil 1109 }); err != nil { 1110 merr = multierr.Append(merr, err) 1111 } 1112 1113 // Invariant: all shards from the tenant are simple shards. 1114 1115 if err := purgeTenantShards(ctx, s.IndexDir); err != nil { 1116 merr = multierr.Append(merr, err) 1117 } 1118 if err := purgeTenantShards(ctx, filepath.Join(s.IndexDir, ".trash")); err != nil { 1119 merr = multierr.Append(merr, err) 1120 } 1121 }) 1122 1123 return &indexserverv1.DeleteAllDataResponse{}, merr 1124} 1125 1126func listIndexed(indexDir string) []uint32 { 1127 index := getShards(indexDir) 1128 metricNumIndexed.Set(float64(len(index))) 1129 repoIDs := make([]uint32, 0, len(index)) 1130 for id := range index { 1131 repoIDs = append(repoIDs, id) 1132 } 1133 slices.Sort(repoIDs) 1134 return repoIDs 1135} 1136 1137// setupTmpDir sets up a temporary directory on the same volume as the 1138// indexes. 
1139// 1140// If main is true we will delete older temp directories left around. main is 1141// false when this is a debug command. 1142func setupTmpDir(logger sglog.Logger, main bool, index string) error { 1143 // change the target tmp directory depending on if it's our main daemon or a 1144 // debug sub command. 1145 dir := ".indexserver.debug.tmp" 1146 if main { 1147 dir = ".indexserver.tmp" 1148 } 1149 1150 tmpRoot := filepath.Join(index, dir) 1151 1152 if main { 1153 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot)) 1154 err := os.RemoveAll(tmpRoot) 1155 if err != nil { 1156 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err)) 1157 } 1158 } 1159 1160 if err := os.MkdirAll(tmpRoot, 0o755); err != nil { 1161 return err 1162 } 1163 1164 return os.Setenv("TMPDIR", tmpRoot) 1165} 1166 1167func printMetaData(fn string) error { 1168 repo, indexMeta, err := index.ReadMetadataPath(fn) 1169 if err != nil { 1170 return err 1171 } 1172 1173 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 1174 if err != nil { 1175 return err 1176 } 1177 1178 err = json.NewEncoder(os.Stdout).Encode(repo) 1179 if err != nil { 1180 return err 1181 } 1182 return nil 1183} 1184 1185func printShardStats(fn string) error { 1186 f, err := os.Open(fn) 1187 if err != nil { 1188 return err 1189 } 1190 1191 iFile, err := index.NewIndexFile(f) 1192 if err != nil { 1193 return err 1194 } 1195 1196 return index.PrintNgramStats(iFile) 1197} 1198 1199func srcLogLevelIsDebug() bool { 1200 lvl := os.Getenv(sglog.EnvLogLevel) 1201 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1202} 1203 1204func getEnvWithDefaultBool(k string, defaultVal bool) bool { 1205 v := os.Getenv(k) 1206 if v == "" { 1207 return defaultVal 1208 } 1209 b, err := strconv.ParseBool(v) 1210 if err != nil { 1211 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1212 } 1213 return b 1214} 1215 1216func getEnvWithDefaultInt64(k string, defaultVal 
int64) int64 { 1217 v := os.Getenv(k) 1218 if v == "" { 1219 return defaultVal 1220 } 1221 i, err := strconv.ParseInt(v, 10, 64) 1222 if err != nil { 1223 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1224 } 1225 return i 1226} 1227 1228func getEnvWithDefaultInt(k string, defaultVal int) int { 1229 v := os.Getenv(k) 1230 if v == "" { 1231 return defaultVal 1232 } 1233 i, err := strconv.Atoi(k) 1234 if err != nil { 1235 log.Fatalf("error parsing ENV %s to int: %s", k, err) 1236 } 1237 return i 1238} 1239 1240func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 { 1241 v := os.Getenv(k) 1242 if v == "" { 1243 return defaultVal 1244 } 1245 i, err := strconv.ParseUint(v, 10, 64) 1246 if err != nil { 1247 log.Fatalf("error parsing ENV %s to uint64: %s", k, err) 1248 } 1249 return i 1250} 1251 1252func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1253 v := os.Getenv(k) 1254 if v == "" { 1255 return defaultVal 1256 } 1257 f, err := strconv.ParseFloat(v, 64) 1258 if err != nil { 1259 log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1260 } 1261 return f 1262} 1263 1264func getEnvWithDefaultString(k string, defaultVal string) string { 1265 v := os.Getenv(k) 1266 if v == "" { 1267 return defaultVal 1268 } 1269 return v 1270} 1271 1272func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration { 1273 v := os.Getenv(k) 1274 if v == "" { 1275 return defaultVal 1276 } 1277 1278 d, err := time.ParseDuration(v) 1279 if err != nil { 1280 log.Fatalf("error parsing ENV %s to duration: %s", k, err) 1281 } 1282 return d 1283} 1284 1285func getEnvWithDefaultEmptySet(k string) map[string]struct{} { 1286 set := map[string]struct{}{} 1287 for _, v := range strings.Split(os.Getenv(k), ",") { 1288 v = strings.TrimSpace(v) 1289 if v != "" { 1290 set[v] = struct{}{} 1291 } 1292 } 1293 return set 1294} 1295 1296func joinStringSet(set map[string]struct{}, sep string) string { 1297 var xs []string 1298 for x := range set { 1299 
xs = append(xs, x) 1300 } 1301 1302 return strings.Join(xs, sep) 1303} 1304 1305func setShardsCounter(indexDir string) { 1306 fns, err := filepath.Glob(filepath.Join(indexDir, "*.zoekt")) 1307 if err != nil { 1308 errorLog.Printf("setShardsCounter: %s\n", err) 1309 return 1310 } 1311 metricNumberShards.Set(float64(len(fns))) 1312 1313 compoundFns := make([]string, 0, len(fns)) 1314 for _, fn := range fns { 1315 if strings.HasPrefix(filepath.Base(fn), "compound-") { 1316 compoundFns = append(compoundFns, fn) 1317 } 1318 } 1319 metricNumberCompoundShards.Set(float64(len(compoundFns))) 1320} 1321 1322func rootCmd() *ffcli.Command { 1323 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1324 conf := rootConfig{ 1325 Main: true, 1326 } 1327 conf.registerRootFlags(rootFs) 1328 1329 return &ffcli.Command{ 1330 FlagSet: rootFs, 1331 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1332 Subcommands: []*ffcli.Command{debugCmd()}, 1333 Exec: func(ctx context.Context, args []string) error { 1334 return startServer(conf) 1335 }, 1336 } 1337} 1338 1339type rootConfig struct { 1340 // Main is true if this rootConfig is for our main long running command (the 1341 // indexserver). Debug commands should not set this value. This is used to 1342 // determine if we need to run tmpfriend. 
1343 Main bool 1344 1345 root string 1346 interval time.Duration 1347 index string 1348 indexConcurrency int64 1349 listen string 1350 hostname string 1351 cpuFraction float64 1352 1353 // config values related to shard merging 1354 disableShardMerging bool 1355 vacuumInterval time.Duration 1356 mergeInterval time.Duration 1357 targetSize int64 1358 minSize int64 1359 minAgeDays int 1360 1361 // config values related to backoff indexing repos with one or more consecutive failures 1362 backoffDuration time.Duration 1363 maxBackoffDuration time.Duration 1364} 1365 1366func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1367 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1368 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1369 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently") 1370 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", index.DefaultDir), "set index directory to use") 1371 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") 1372 fs.StringVar(&rc.hostname, "hostname", index.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") 1373 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1374 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. 
A negative value disables indexing backoff.") 1375 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.") 1376 1377 // flags related to shard merging 1378 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging") 1379 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1380 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1381 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 1000), "the target size of compound shards in MiB") 1382 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 800), "the minimum size of a compound shard in MiB") 1383 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. 
Shards with newer commits are excluded from merging.") 1384} 1385 1386func startServer(conf rootConfig) error { 1387 s, err := newServer(conf) 1388 if err != nil { 1389 return err 1390 } 1391 1392 profiler.Init("zoekt-sourcegraph-indexserver") 1393 setShardsCounter(s.IndexDir) 1394 1395 if conf.listen != "" { 1396 1397 mux := http.NewServeMux() 1398 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1399 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1400 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1401 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"}, 1402 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1403 }...) 1404 s.addDebugHandlers(mux) 1405 1406 go func() { 1407 debugLog.Printf("serving HTTP on %s", conf.listen) 1408 mux := grpcutil.MultiplexGRPC(newGRPCServer(sglog.Scoped("indexserver"), s), mux) 1409 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1410 }() 1411 1412 // Serve mux on a unix domain socket on a best-effort-basis so that 1413 // webserver can call the endpoints via the shared filesystem. 1414 // 1415 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1416 // on the socket due to permission errors. See 1417 // https://github.com/docker/for-mac/issues/6239 1418 go func() { 1419 serveHTTPOverSocket := func() error { 1420 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1421 // We cannot bind a socket to an existing pathname. 1422 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1423 return fmt.Errorf("error removing socket file: %s", socket) 1424 } 1425 // The "unix" network corresponds to stream sockets. (cf. 
unixgram, 1426 // unixpacket). 1427 l, err := net.Listen("unix", socket) 1428 if err != nil { 1429 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1430 } 1431 // Indexserver (root) and webserver (Sourcegraph) run with 1432 // different users. Per default, the socket is created with 1433 // permission 755 (root root), which doesn't let webserver write to 1434 // it. 1435 // 1436 // See https://github.com/golang/go/issues/11822 for more context. 1437 if err := os.Chmod(socket, 0o777); err != nil { 1438 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1439 } 1440 debugLog.Printf("serving HTTP on %s", socket) 1441 return http.Serve(l, mux) 1442 } 1443 debugLog.Print(serveHTTPOverSocket()) 1444 }() 1445 } 1446 1447 oc := &ownerChecker{ 1448 Path: filepath.Join(conf.index, "owner.txt"), 1449 Hostname: conf.hostname, 1450 } 1451 go oc.Run() 1452 1453 logger := sglog.Scoped("metricsRegistration") 1454 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1455 1456 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1457 prometheus.DefaultRegisterer.MustRegister(c) 1458 1459 s.Run() 1460 return nil 1461} 1462 1463func newServer(conf rootConfig) (*Server, error) { 1464 logger := sglog.Scoped("server") 1465 1466 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1467 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1468 } 1469 if conf.index == "" { 1470 return nil, fmt.Errorf("must set -index") 1471 } 1472 if conf.root == "" { 1473 return nil, fmt.Errorf("must set -sourcegraph_url") 1474 } 1475 rootURL, err := url.Parse(conf.root) 1476 if err != nil { 1477 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1478 } 1479 1480 rootURL = addDefaultPort(rootURL) 1481 1482 // Tune GOMAXPROCS to match Linux container CPU quota. 1483 _, _ = maxprocs.Set() 1484 1485 // Automatically prepend our own path at the front, to minimize 1486 // required configuration. 
1487 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1488 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1489 } 1490 1491 if _, err := os.Stat(conf.index); err != nil { 1492 if err := os.MkdirAll(conf.index, 0o755); err != nil { 1493 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1494 } 1495 } 1496 1497 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil { 1498 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1499 } 1500 1501 if srcLogLevelIsDebug() { 1502 debugLog.SetOutput(os.Stderr) 1503 } 1504 1505 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1506 if len(reposWithSeparateIndexingMetrics) > 0 { 1507 debugLog.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1508 } 1509 1510 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1511 if len(deltaBuildRepositoriesAllowList) > 0 { 1512 debugLog.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1513 } 1514 1515 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1516 if deltaShardNumberFallbackThreshold > 0 { 1517 debugLog.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1518 } else { 1519 debugLog.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1520 } 1521 1522 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1523 if len(reposShouldSkipSymbolsCalculation) > 0 { 1524 debugLog.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1525 } 1526 1527 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout) 1528 if indexingTimeout != 
defaultIndexingTimeout { 1529 debugLog.Printf("using configured indexing timeout: %s", indexingTimeout) 1530 } 1531 1532 var sg Sourcegraph 1533 if rootURL.IsAbs() { 1534 var batchSize int 1535 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { 1536 batchSize, err = strconv.Atoi(v) 1537 if err != nil { 1538 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1539 } 1540 } 1541 1542 opts := []SourcegraphClientOption{ 1543 WithBatchSize(batchSize), 1544 } 1545 1546 logger := sglog.Scoped("zoektConfigurationGRPCClient") 1547 client, err := dialGRPCClient(rootURL.Host, logger) 1548 if err != nil { 1549 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1550 } 1551 1552 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...) 1553 1554 } else { 1555 sg = sourcegraphFake{ 1556 RootDir: rootURL.String(), 1557 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1558 } 1559 } 1560 1561 cpuCount := max(int(math.Round(float64(runtime.GOMAXPROCS(0))*(conf.cpuFraction))), 1) 1562 1563 if conf.indexConcurrency < 1 { 1564 conf.indexConcurrency = 1 1565 } else if conf.indexConcurrency > int64(cpuCount) { 1566 conf.indexConcurrency = int64(cpuCount) 1567 } 1568 1569 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1570 1571 return &Server{ 1572 logger: logger, 1573 rootURL: rootURL, 1574 Sourcegraph: sg, 1575 IndexDir: conf.index, 1576 IndexConcurrency: int(conf.indexConcurrency), 1577 Interval: conf.interval, 1578 CPUCount: cpuCount, 1579 queue: *q, 1580 shardMerging: !conf.disableShardMerging, 1581 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1582 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1583 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1584 hostname: conf.hostname, 1585 mergeOpts: mergeOpts{ 1586 vacuumInterval: conf.vacuumInterval, 1587 mergeInterval: conf.mergeInterval, 1588 
targetSizeBytes: conf.targetSize * 1024 * 1024, 1589 minSizeBytes: conf.minSize * 1024 * 1024, 1590 minAgeDays: conf.minAgeDays, 1591 }, 1592 timeout: indexingTimeout, 1593 indexSemaphore: make(chan struct{}, int(conf.indexConcurrency)), 1594 }, err 1595} 1596 1597func newGRPCServer(logger sglog.Logger, s *Server, additionalOpts ...grpc.ServerOption) *grpc.Server { 1598 grpcServer := defaults.NewServer(logger, additionalOpts...) 1599 indexserverv1.RegisterSourcegraphIndexserverServiceServer(grpcServer, s) 1600 return grpcServer 1601} 1602 1603// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration 1604// for the indexed-search-configuration gRPC service. 1605// 1606// The default backoff strategy is modeled after the default settings used by 1607// retryablehttp.DefaultClient. 1608// 1609// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1610// - Unavailable 1611// - Aborted 1612// 1613//go:embed default_grpc_service_configuration.json 1614var defaultGRPCServiceConfigurationJSON string 1615 1616func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1617 return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1618 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1619 return invoker(ctx, method, req, reply, cc, opts...) 1620 } 1621} 1622 1623func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1624 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1625 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1626 return streamer(ctx, desc, cc, method, opts...) 1627 } 1628} 1629 1630// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPCs servers and clients are allowed to process. 
1631// This can be overridden by providing custom Server/Dial options. 1632const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB 1633 1634func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (configv1.ZoektConfigurationServiceClient, error) { 1635 metrics := clientMetricsOnce() 1636 1637 // If the service seems to be unavailable, this 1638 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1 1639 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s]) 1640 retryOpts := []retry.CallOption{ 1641 retry.WithMax(5), 1642 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)), 1643 retry.WithCodes(codes.Unavailable), 1644 } 1645 1646 opts := []grpc.DialOption{ 1647 grpc.WithTransportCredentials(insecure.NewCredentials()), 1648 grpc.WithChainStreamInterceptor( 1649 metrics.StreamClientInterceptor(), 1650 messagesize.StreamClientInterceptor, 1651 internalActorStreamInterceptor(), 1652 internalerrs.LoggingStreamClientInterceptor(logger), 1653 internalerrs.PrometheusStreamClientInterceptor, 1654 retry.StreamClientInterceptor(retryOpts...), 1655 ), 1656 grpc.WithChainUnaryInterceptor( 1657 metrics.UnaryClientInterceptor(), 1658 messagesize.UnaryClientInterceptor, 1659 internalActorUnaryInterceptor(), 1660 internalerrs.LoggingUnaryClientInterceptor(logger), 1661 internalerrs.PrometheusUnaryClientInterceptor, 1662 retry.UnaryClientInterceptor(retryOpts...), 1663 ), 1664 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON), 1665 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)), 1666 } 1667 1668 opts = append(opts, additionalOpts...) 1669 1670 // Ensure that the message size options are set last, so they override any other 1671 // client-specific options that tweak the message size. 1672 // 1673 // The message size options are only provided if the environment variable is set. 
These options serve as an escape hatch, so they 1674 // take precedence over everything else with a uniform size setting that's easy to reason about. 1675 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...) 1676 1677 // This dialer is used to connect via gRPC to the Sourcegraph instance. 1678 // This is done lazily, so we can provide the client to use regardless of 1679 // whether we enabled gRPC or not initially. 1680 cc, err := grpc.Dial(addr, opts...) 1681 if err != nil { 1682 return nil, fmt.Errorf("dialing %q: %w", addr, err) 1683 } 1684 1685 client := configv1.NewZoektConfigurationServiceClient(cc) 1686 return client, nil 1687} 1688 1689// addDefaultPort adds a default port to a URL if one is not specified. 1690// 1691// If the URL scheme is "http" and no port is specified, "80" is used. 1692// If the scheme is "https", "443" is used. 1693// 1694// The original URL is not mutated. A copy is modified and returned. 1695func addDefaultPort(original *url.URL) *url.URL { 1696 if original == nil { 1697 return nil // don't panic 1698 } 1699 1700 if !original.IsAbs() { 1701 return original // don't do anything if the URL doesn't look like a remote URL 1702 } 1703 1704 if original.Scheme == "http" && original.Port() == "" { 1705 u := cloneURL(original) 1706 u.Host = net.JoinHostPort(u.Host, "80") 1707 return u 1708 } 1709 1710 if original.Scheme == "https" && original.Port() == "" { 1711 u := cloneURL(original) 1712 u.Host = net.JoinHostPort(u.Host, "443") 1713 return u 1714 } 1715 1716 return original 1717} 1718 1719// cloneURL returns a copy of the URL. It is safe to mutate the returned URL. 
1720// This is copied from net/http/clone.go 1721func cloneURL(u *url.URL) *url.URL { 1722 if u == nil { 1723 return nil 1724 } 1725 u2 := new(url.URL) 1726 *u2 = *u 1727 if u.User != nil { 1728 u2.User = new(url.Userinfo) 1729 *u2.User = *u.User 1730 } 1731 return u2 1732} 1733 1734func main() { 1735 liblog := sglog.Init(sglog.Resource{ 1736 Name: "zoekt-indexserver", 1737 Version: index.Version, 1738 InstanceID: index.HostnameBestEffort(), 1739 }) 1740 defer liblog.Sync() 1741 1742 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1743 log.Fatal(err) 1744 } 1745} 1746 1747// getBoolFromEnvironmentVariables returns the boolean defined by the first environment 1748// variable listed in envVarNames that is set in the current process environment, or the defaultBool if none are set. 1749// 1750// An error is returned of the provided environment variables fails to parse as a boolean. 1751func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) { 1752 for _, envVar := range envVarNames { 1753 v := os.Getenv(envVar) 1754 if v == "" { 1755 continue 1756 } 1757 1758 b, err := strconv.ParseBool(v) 1759 if err != nil { 1760 return false, fmt.Errorf("parsing environment variable %q to boolean: %v", envVar, err) 1761 } 1762 1763 return b, nil 1764 } 1765 1766 return defaultBool, nil 1767}