fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Licensed under the Apache License, Version 2.0 (the "License"); 2// you may not use this file except in compliance with the License. 3// You may obtain a copy of the License at 4// 5// http://www.apache.org/licenses/LICENSE-2.0 6// 7// Unless required by applicable law or agreed to in writing, software 8// distributed under the License is distributed on an "AS IS" BASIS, 9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 10// See the License for the specific language governing permissions and 11// limitations under the License. 12 13// Command zoekt-sourcegraph-indexserver periodically reindexes repositories 14// from a Sourcegraph instance. It uses a "pull-based" design, where it periodically 15// reaches out to the Sourcegraph instance for the list of repositories to reindex. 16package main 17 18import ( 19 "bytes" 20 "context" 21 _ "embed" 22 "encoding/json" 23 "errors" 24 "flag" 25 "fmt" 26 "html/template" 27 "io" 28 "io/fs" 29 "log" 30 "math" 31 "math/rand" 32 "net" 33 "net/http" 34 "net/url" 35 "os" 36 "os/exec" 37 "os/signal" 38 "path/filepath" 39 "runtime" 40 "slices" 41 "sort" 42 "strconv" 43 "strings" 44 "sync" 45 "text/tabwriter" 46 "time" 47 48 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" 49 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry" 50 "github.com/peterbourgon/ff/v3/ffcli" 51 "github.com/prometheus/client_golang/prometheus" 52 "github.com/prometheus/client_golang/prometheus/promauto" 53 sglog "github.com/sourcegraph/log" 54 "github.com/sourcegraph/mountinfo" 55 56 "github.com/sourcegraph/zoekt" 57 configv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/sourcegraph/zoekt/configuration/v1" 58 indexserverv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/zoekt/indexserver/v1" 59 "github.com/sourcegraph/zoekt/grpc/defaults" 60 "github.com/sourcegraph/zoekt/grpc/grpcutil" 61 
"github.com/sourcegraph/zoekt/grpc/internalerrs" 62 "github.com/sourcegraph/zoekt/grpc/messagesize" 63 "github.com/sourcegraph/zoekt/index" 64 "github.com/sourcegraph/zoekt/internal/debugserver" 65 "github.com/sourcegraph/zoekt/internal/profiler" 66 "github.com/sourcegraph/zoekt/internal/tenant" 67 68 "go.uber.org/automaxprocs/maxprocs" 69 "go.uber.org/multierr" 70 "golang.org/x/net/trace" 71 "golang.org/x/sys/unix" 72 "google.golang.org/grpc" 73 "google.golang.org/grpc/codes" 74 "google.golang.org/grpc/credentials/insecure" 75 "google.golang.org/grpc/metadata" 76) 77 78var ( 79 metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{ 80 Name: "resolve_revisions_seconds", 81 Help: "A histogram of latencies for resolving all repository revisions.", 82 Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 27min 83 }) 84 85 metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 86 Name: "resolve_revision_seconds", 87 Help: "A histogram of latencies for resolving a repository revision.", 88 Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s 89 }, []string{"success"}) // success=true|false 90 91 metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{ 92 Name: "get_index_options_total", 93 Help: "The total number of times we tried to get index options for a repository. 
Includes errors.", 94 }) 95 metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{ 96 Name: "get_index_options_error_total", 97 Help: "The total number of times we failed to get index options for a repository.", 98 }) 99 100 metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 101 Name: "index_repo_seconds", 102 Help: "A histogram of latencies for indexing a repository.", 103 Buckets: prometheus.ExponentialBucketsRange( 104 (100 * time.Millisecond).Seconds(), 105 (40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo 106 20), 107 }, []string{ 108 "state", // state is an indexState 109 "name", // name of the repository that was indexed 110 }) 111 112 metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{ 113 Name: "index_indexing_delay_seconds", 114 Help: "A histogram of durations from when an index job is added to the queue, to the time it completes.", 115 Buckets: prometheus.ExponentialBuckets(60, 2, 14), // 1 minute -> 5.5 days 116 }, []string{ 117 "state", // state is an indexState 118 "name", // the name of the repository that was indexed 119 }) 120 121 metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 122 Name: "index_fetch_seconds", 123 Help: "A histogram of latencies for fetching a repository.", 124 Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes 125 }, []string{ 126 "success", // true|false 127 "name", // the name of the repository that the commits were fetched from 128 }) 129 130 metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{ 131 Name: "index_incremental_index_state", 132 Help: "A count of the state on disk vs what we want to build. 
See zoekt/build.IndexState.", 133 }, []string{"state"}) // state is index.IndexState 134 135 metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{ 136 Name: "index_num_indexed", 137 Help: "Number of indexed repos by code host", 138 }) 139 140 metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{ 141 Name: "index_failing_total", 142 Help: "Counts failures to index (indexing activity, should be used with rate())", 143 }) 144 145 metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{ 146 Name: "index_indexing_total", 147 Help: "Counts indexings (indexing activity, should be used with rate())", 148 }) 149 150 metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{ 151 Name: "index_num_stopped_tracking_total", 152 Help: "Counts the number of repos we stopped tracking.", 153 }) 154 155 // clientMetricsOnce returns a singleton instance of the client metrics 156 // that are shared across all gRPC clients that this process creates. 157 // 158 // This function panics if the metrics cannot be registered with the default 159 // Prometheus registry. 
160 clientMetricsOnce = sync.OnceValue(func() *grpcprom.ClientMetrics { 161 clientMetrics := grpcprom.NewClientMetrics( 162 grpcprom.WithClientCounterOptions(), 163 grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request 164 grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC 165 grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC 166 ) 167 prometheus.DefaultRegisterer.MustRegister(clientMetrics) 168 return clientMetrics 169 }) 170) 171 172// 1 MB; match https://sourcegraph.sourcegraph.com/r/github.com/sourcegraph/sourcegraph/-/blob/cmd/searcher/internal/search/store.go?L32 173const MaxFileSize = 1 << 20 174 175// set of repositories that we want to capture separate indexing metrics for 176var reposWithSeparateIndexingMetrics = make(map[string]struct{}) 177 178type indexState string 179 180const ( 181 indexStateFail indexState = "fail" 182 indexStateSuccess indexState = "success" 183 indexStateSuccessMeta indexState = "success_meta" // We only updated metadata 184 indexStateNoop indexState = "noop" // We didn't need to update index 185 indexStateEmpty indexState = "empty" // index is empty (empty repo) 186) 187 188// Server is the main functionality of zoekt-sourcegraph-indexserver. It 189// exists to conveniently use all the options passed in via func main. 190type Server struct { 191 logger sglog.Logger 192 193 Sourcegraph Sourcegraph 194 195 // rootURL is the root URL of the Sourcegraph instance. 196 rootURL *url.URL 197 198 BatchSize int 199 200 // IndexDir is the index directory to use. 201 IndexDir string 202 203 // IndexConcurrency is the number of repositories we index at once. 204 IndexConcurrency int 205 206 // indexSemaphore limits the number of concurrent index operations 207 indexSemaphore chan struct{} 208 209 // Interval is how often we sync with Sourcegraph. 
210 Interval time.Duration 211 212 // CPUCount is the number of CPUs to use for indexing shards. 213 CPUCount int 214 215 queue Queue 216 217 // muIndexDir protects the index directory from concurrent access. 218 muIndexDir indexMutex 219 220 // If true, shard merging is enabled. 221 shardMerging bool 222 223 // deltaBuildRepositoriesAllowList is an allowlist for repositories that we 224 // use delta-builds for instead of normal builds 225 deltaBuildRepositoriesAllowList map[string]struct{} 226 227 // deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist 228 // before attempting a delta build. 229 deltaShardNumberFallbackThreshold uint64 230 231 // repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that 232 // we skip calculating symbols metadata for during builds 233 repositoriesSkipSymbolsCalculationAllowList map[string]struct{} 234 235 hostname string 236 237 mergeOpts mergeOpts 238 239 // timeout defines how long the index server waits before killing an indexing job. 240 timeout time.Duration 241 242 indexserverv1.UnimplementedSourcegraphIndexserverServiceServer 243} 244 245var ( 246 debugLog = log.New(io.Discard, "[DEBUG] ", log.LstdFlags) 247 infoLog = log.New(os.Stderr, "[INFO] ", log.LstdFlags) 248 errorLog = log.New(os.Stderr, "[ERROR] ", log.LstdFlags) 249) 250 251// our index commands should output something every 100mb they process. 252// 253// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough 254// time." famous last words. A client was indexing a monorepo with 42 255// cores... 5m was not enough. 
256const noOutputTimeout = 30 * time.Minute 257 258func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 259 out := &synchronizedBuffer{} 260 cmd.Stdout = out 261 cmd.Stderr = out 262 263 tr.LazyPrintf("%s", cmd.Args) 264 265 defer func() { 266 if err != nil { 267 outS := out.String() 268 tr.LazyPrintf("failed: %v", err) 269 tr.LazyPrintf("output: %s", out) 270 tr.SetError() 271 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 272 } 273 }() 274 275 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 276 277 if err := cmd.Start(); err != nil { 278 return err 279 } 280 281 errC := make(chan error) 282 go func() { 283 errC <- cmd.Wait() 284 }() 285 286 // This channel is set after we have sent sigquit. It allows us to follow up 287 // with a sigkill if the process doesn't quit after sigquit. 288 kill := make(<-chan time.Time) 289 290 lastLen := 0 291 for { 292 select { 293 case <-time.After(noOutputTimeout): 294 // Periodically check if we have had output. If not kill the process. 295 if out.Len() != lastLen { 296 lastLen = out.Len() 297 infoLog.Printf("still running %s", cmd.Args) 298 } else { 299 // Send quit (C-\) first so we get a stack dump. 300 infoLog.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 301 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 302 errorLog.Println("quit failed:", err) 303 } 304 305 // send sigkill if still running in 10s 306 kill = time.After(10 * time.Second) 307 } 308 309 case <-kill: 310 infoLog.Printf("still running, killing %s", cmd.Args) 311 if err := cmd.Process.Kill(); err != nil { 312 errorLog.Println("kill failed:", err) 313 } 314 315 case err := <-errC: 316 if err != nil { 317 return err 318 } 319 320 tr.LazyPrintf("success") 321 return nil 322 } 323 } 324} 325 326// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 327// monitor the buffer while it is being written to. 
328type synchronizedBuffer struct { 329 mu sync.Mutex 330 b bytes.Buffer 331} 332 333func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 334 sb.mu.Lock() 335 defer sb.mu.Unlock() 336 return sb.b.Write(p) 337} 338 339func (sb *synchronizedBuffer) Len() int { 340 sb.mu.Lock() 341 defer sb.mu.Unlock() 342 return sb.b.Len() 343} 344 345func (sb *synchronizedBuffer) String() string { 346 sb.mu.Lock() 347 defer sb.mu.Unlock() 348 return sb.b.String() 349} 350 351// pauseFileName if present in IndexDir will stop index jobs from 352// running. This is to make it possible to experiment with the content of the 353// IndexDir without the indexserver writing to it. 354const ( 355 pauseFileName = "PAUSE" 356 pauseEnvVar = "SRC_PAUSE_INDEXING" 357) 358 359// isIndexingPaused checks if indexing should be paused based on either: 360// 1. The presence of a PAUSE file in the index directory 361// 2. The SRC_PAUSE_INDEXING environment variable being set to true 362func isIndexingPaused(indexDir string) (bool, string) { 363 // Check for PAUSE file first 364 if b, err := os.ReadFile(filepath.Join(indexDir, pauseFileName)); err == nil { 365 return true, fmt.Sprintf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 366 } 367 368 // Then check environment variable 369 if getEnvWithDefaultBool(pauseEnvVar, false) { 370 return true, fmt.Sprintf("indexserver paused via %s environment variable", pauseEnvVar) 371 } 372 373 return false, "" 374} 375 376// Run the sync loop. This blocks forever. 377func (s *Server) Run() { 378 removeIncompleteShards(s.IndexDir) 379 380 // Start a goroutine which updates the queue with commits to index. 381 go func() { 382 // We update the list of indexed repos every Interval. To speed up manual 383 // testing we also listen for SIGUSR1 to trigger updates. 
384 // "pkill -SIGUSR1 zoekt-sourcegra" 385 for range jitterTicker(s.Interval, unix.SIGUSR1) { 386 if paused, msg := isIndexingPaused(s.IndexDir); paused { 387 infoLog.Printf("%s", msg) 388 continue 389 } 390 391 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 392 if err != nil { 393 errorLog.Printf("error listing repos: %s", err) 394 continue 395 } 396 397 debugLog.Printf("updating index queue with %d repositories", len(repos.IDs)) 398 399 // Stop indexing repos we don't need to track anymore 400 removed := s.queue.MaybeRemoveMissing(repos.IDs) 401 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 402 if len(removed) > 0 { 403 infoLog.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 404 } 405 406 cleanupDone := make(chan struct{}) 407 go func() { 408 defer close(cleanupDone) 409 s.muIndexDir.Global(func() { 410 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 411 }) 412 }() 413 414 repos.IterateIndexOptions(s.queue.AddOrUpdate) 415 416 // IterateIndexOptions will only iterate over repositories that have 417 // changed since we last called list. However, we want to add all IDs 418 // back onto the queue just to check that what is on disk is still 419 // correct. This will use the last IndexOptions we stored in the 420 // queue. The repositories not on the queue (missing) need a forced 421 // fetch of IndexOptions. 422 missing := s.queue.Bump(repos.IDs) 423 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 
424 425 setShardsCounter(s.IndexDir) 426 427 <-cleanupDone 428 } 429 }() 430 431 go func() { 432 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 433 if s.shardMerging { 434 s.vacuum() 435 } 436 } 437 }() 438 439 go func() { 440 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 441 if s.shardMerging { 442 s.doMerge() 443 } 444 } 445 }() 446 447 for range s.IndexConcurrency { 448 go s.processQueue() 449 } 450 451 // block forever 452 select {} 453} 454 455// formatList returns a comma-separated list of the first min(len(v), m) items. 456func formatListUint32(v []uint32, m int) string { 457 if len(v) < m { 458 m = len(v) 459 } 460 461 sb := strings.Builder{} 462 for i := range m { 463 fmt.Fprintf(&sb, "%d, ", v[i]) 464 } 465 466 if len(v) > m { 467 sb.WriteString("...") 468 } 469 470 return strings.TrimRight(sb.String(), ", ") 471} 472 473func (s *Server) processQueue() { 474 for { 475 if paused, _ := isIndexingPaused(s.IndexDir); paused { 476 time.Sleep(time.Second) 477 continue 478 } 479 480 item, ok := s.queue.Pop() 481 if !ok { 482 time.Sleep(time.Second) 483 continue 484 } 485 486 opts := item.Opts 487 args := s.indexArgs(opts) 488 489 ran := s.muIndexDir.With(opts.Name, func() { 490 // only record time taken once we hold the lock. This avoids us 491 // recording time taken while merging/cleanup runs. 
492 start := time.Now() 493 494 state, err := s.index(context.Background(), args) 495 496 elapsed := time.Since(start) 497 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 498 499 indexDelay := time.Since(item.DateAddedToQueue) 500 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds()) 501 502 if err != nil { 503 errorLog.Printf("error indexing %s: %s", args.String(), err) 504 } 505 506 switch state { 507 case indexStateSuccess: 508 var branches []string 509 for _, b := range args.Branches { 510 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 511 } 512 s.logger.Info("updated index", 513 sglog.Int("tenant", args.TenantID), 514 sglog.String("repo", args.Name), 515 sglog.Uint32("id", args.RepoID), 516 sglog.Strings("branches", branches), 517 sglog.Duration("duration", elapsed), 518 sglog.Duration("index_delay", indexDelay), 519 ) 520 case indexStateSuccessMeta: 521 infoLog.Printf("updated meta %s in %v", args.String(), elapsed) 522 } 523 s.queue.SetIndexed(opts, state) 524 }) 525 526 if !ran { 527 // Someone else is processing the repository. We can just skip this job 528 // since the repository will be added back to the queue and we will 529 // converge to the correct behaviour. 530 debugLog.Printf("index job for repository already running: %s", args) 531 continue 532 } 533 } 534} 535 536// repoNameForMetric returns a normalized version of the given repository name that is 537// suitable for use with Prometheus metrics. 538func repoNameForMetric(repo string) string { 539 // Check to see if we want to be able to capture separate indexing metrics for this repository. 540 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 
541 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 542 return repo 543 } 544 545 return "" 546} 547 548func batched(slice []uint32, size int) <-chan []uint32 { 549 c := make(chan []uint32) 550 go func() { 551 for len(slice) > 0 { 552 if size > len(slice) { 553 size = len(slice) 554 } 555 c <- slice[:size] 556 slice = slice[size:] 557 } 558 close(c) 559 }() 560 return c 561} 562 563// jitterTicker returns a ticker which ticks with a jitter. Each tick is 564// uniformly selected from the range (d/2, d + d/2). It will tick on creation. 565// 566// sig is a list of signals which also cause the ticker to fire. This is a 567// convenience to allow manually triggering of the ticker. 568func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} { 569 ticker := make(chan struct{}) 570 571 go func() { 572 for { 573 ticker <- struct{}{} 574 ns := int64(d) 575 jitter := rand.Int63n(ns) 576 time.Sleep(time.Duration(ns/2 + jitter)) 577 } 578 }() 579 580 go func() { 581 if len(sig) == 0 { 582 return 583 } 584 585 c := make(chan os.Signal, 1) 586 signal.Notify(c, sig...) 587 for range c { 588 ticker <- struct{}{} 589 } 590 }() 591 592 return ticker 593} 594 595// Index starts an index job for repo name at commit. 596func (s *Server) index(ctx context.Context, args *indexArgs) (state indexState, err error) { 597 tr := trace.New("index", args.Name) 598 tr.SetMaxEvents(30) // Ensure we capture all indexing events 599 600 defer func() { 601 if err != nil { 602 tr.SetError() 603 tr.LazyPrintf("error: %v", err) 604 state = indexStateFail 605 metricFailingTotal.Inc() 606 } 607 tr.LazyPrintf("state: %s", state) 608 tr.Finish() 609 }() 610 611 // Sourcegraph should always provide a tenant ID. 
612 if args.TenantID < 1 { 613 return indexStateFail, tenant.ErrMissingTenant 614 } 615 616 tr.LazyPrintf("branches: %v", args.Branches) 617 618 if len(args.Branches) == 0 { 619 return indexStateEmpty, createEmptyShard(args) 620 } 621 622 repositoryName := args.Name 623 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok { 624 tr.LazyPrintf("marking this repository for delta build") 625 args.UseDelta = true 626 } 627 628 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold 629 630 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok { 631 tr.LazyPrintf("skipping symbols calculation") 632 args.Symbols = false 633 } 634 635 reason := "forced" 636 637 if args.Incremental { 638 bo := args.BuildOptions() 639 bo.SetDefaults() 640 incrementalState, fn := bo.IndexState() 641 reason = string(incrementalState) 642 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc() 643 644 switch incrementalState { 645 case index.IndexStateEqual: 646 debugLog.Printf("%s index already up to date. Shard=%s", args.String(), fn) 647 return indexStateNoop, nil 648 649 case index.IndexStateMeta: 650 infoLog.Printf("updating index.meta %s", args.String()) 651 652 // TODO(stefan) handle mergeMeta for tenant id. 
653 if err := mergeMeta(bo); err != nil { 654 errorLog.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err) 655 } else { 656 return indexStateSuccessMeta, nil 657 } 658 659 case index.IndexStateCorrupt: 660 infoLog.Printf("falling back to full update: corrupt index: %s", args.String()) 661 } 662 } 663 664 infoLog.Printf("updating index %s reason=%s", args.String(), reason) 665 666 metricIndexingTotal.Inc() 667 c := gitIndexConfig{ 668 runCmd: func(cmd *exec.Cmd) error { 669 return s.loggedRun(tr, cmd) 670 }, 671 672 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) { 673 return args.BuildOptions().FindRepositoryMetadata() 674 }, 675 timeout: s.timeout, 676 } 677 678 logIndexStatusUpdateError := func(err error) { 679 s.logger.Error("failed to update index status", 680 sglog.String("repo", args.Name), 681 sglog.Uint32("id", args.RepoID), 682 sglogBranches("branches", args.Branches), 683 sglog.Error(err), 684 ) 685 } 686 687 err = gitIndex(ctx, c, args, s.Sourcegraph, s.logger) 688 if err != nil { 689 if statusErr := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph, err); statusErr != nil { 690 logIndexStatusUpdateError(statusErr) 691 } 692 return indexStateFail, err 693 } 694 695 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph, nil); err != nil { 696 logIndexStatusUpdateError(err) 697 } 698 699 return indexStateSuccess, nil 700} 701 702// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so 703// it can update the zoekt_repos table. 
704func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph, indexErr error) error { 705 state := configv1.UpdateIndexStatusRequest_Repository_STATE_SUCCESS 706 var failureMessage string 707 var indexTimeUnix int64 708 if indexErr != nil { 709 state = configv1.UpdateIndexStatusRequest_Repository_STATE_FAILURE 710 failureMessage = indexErr.Error() 711 712 // On failure, metadata may not exist yet. Include index time if we can, 713 // but do not block reporting the failure status. 714 if _, metadata, ok, err := c.findRepositoryMetadata(args); err == nil && ok { 715 indexTimeUnix = metadata.IndexTime.Unix() 716 } 717 } else { 718 // We need to read from disk for IndexTime. 719 _, metadata, ok, err := c.findRepositoryMetadata(args) 720 if err != nil { 721 return fmt.Errorf("failed to read metadata for new/updated index: %w", err) 722 } 723 if !ok { 724 return errors.New("failed to find metadata for new/updated index") 725 } 726 indexTimeUnix = metadata.IndexTime.Unix() 727 } 728 729 status := []indexStatus{{ 730 RepoID: args.RepoID, 731 Branches: args.Branches, 732 IndexTimeUnix: indexTimeUnix, 733 State: state, 734 FailureMessage: failureMessage, 735 }} 736 737 if err := sg.UpdateIndexStatus(status); err != nil { 738 return fmt.Errorf("failed to update sourcegraph with status: %w", err) 739 } 740 741 return nil 742} 743 744func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field { 745 ss := make([]string, len(branches)) 746 for i, b := range branches { 747 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version) 748 } 749 return sglog.Strings(key, ss) 750} 751 752func (s *Server) indexArgs(opts IndexOptions) *indexArgs { 753 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0)) 754 return &indexArgs{ 755 IndexOptions: opts, 756 IndexDir: s.IndexDir, 757 Parallelism: parallelism, 758 Incremental: true, 759 ShardMerging: s.shardMerging, 760 } 761} 762 763// parallelism consults both the server flags and index options to 
determine the number 764// of shards to index in parallel. If the CPUCount index option is provided, it always 765// overrides the server flag. 766func (s *Server) parallelism(opts IndexOptions, maxProcs int) int { 767 var parallelism int 768 if opts.ShardConcurrency > 0 { 769 parallelism = int(opts.ShardConcurrency) 770 } else { 771 parallelism = s.CPUCount 772 } 773 774 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs 775 if parallelism > 4*maxProcs { 776 parallelism = 4 * maxProcs 777 } 778 779 // If index concurrency is set, then divide the parallelism by the number of 780 // repos we're indexing in parallel 781 if s.IndexConcurrency > 1 { 782 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency))) 783 } 784 785 return parallelism 786} 787 788func createEmptyShard(args *indexArgs) error { 789 bo := args.BuildOptions() 790 bo.SetDefaults() 791 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 792 793 if args.Incremental && bo.IncrementalSkipIndexing() { 794 return nil 795 } 796 797 builder, err := index.NewBuilder(*bo) 798 if err != nil { 799 return err 800 } 801 return builder.Finish() 802} 803 804// addDebugHandlers adds handlers specific to indexserver. 805func (s *Server) addDebugHandlers(mux *http.ServeMux) { 806 // Sourcegraph's site admin view requires indexserver to serve it's admin view 807 // on "/". 
808 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 809 810 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 811 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 812 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 813 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 814 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 815 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 816} 817 818func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 819 if r.Method != "GET" { 820 w.Header().Set("Allow", "GET") 821 w.WriteHeader(http.StatusMethodNotAllowed) 822 return 823 } 824 825 response := struct { 826 Hostname string 827 }{ 828 Hostname: s.hostname, 829 } 830 831 b, err := json.Marshal(response) 832 if err != nil { 833 http.Error(w, err.Error(), http.StatusInternalServerError) 834 return 835 } 836 837 w.Header().Set("Content-Type", "application/json; charset=utf-8") 838 w.Write(b) 839} 840 841var rootTmpl = template.Must(template.New("name").Parse(` 842<html> 843 <body> 844 <a href="debug">Debug</a><br /> 845 <a href="debug/requests">Traces</a><br /> 846 {{.IndexMsg}}<br /> 847 <br /> 848 <h3>Reindex</h3> 849 {{if .Repos}} 850 <a href="?show_repos=false">hide repos</a><br /> 851 <table style="margin-top: 20px"> 852 <th style="text-align:left">Name</th> 853 <th style="text-align:left">ID (click to reindex)</th> 854 {{range .Repos}} 855 <tr> 856 <td>{{.Name}}</td> 857 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 858 </tr> 859 {{end}} 860 </table> 861 {{else}} 862 <a href="?show_repos=true">show repos</a><br /> 863 {{end}} 864 </body> 865</html> 866`)) 867 868func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 869 if r.Method != "GET" { 870 w.Header().Set("Allow", "GET") 871 w.WriteHeader(http.StatusMethodNotAllowed) 872 return 873 } 874 875 values := r.URL.Query() 876 877 // ?id= 878 indexMsg := "" 879 if v := values.Get("id"); v != "" 
{ 880 id, err := strconv.Atoi(v) 881 if err != nil { 882 http.Error(w, err.Error(), http.StatusBadRequest) 883 return 884 } 885 indexMsg, _ = s.forceIndex(r.Context(), uint32(id)) 886 } 887 888 // ?show_repos= 889 showRepos := false 890 if v := values.Get("show_repos"); v != "" { 891 showRepos, _ = strconv.ParseBool(v) 892 } 893 894 type Repo struct { 895 ID uint32 896 Name string 897 } 898 var data struct { 899 Repos []Repo 900 IndexMsg string 901 } 902 903 data.IndexMsg = indexMsg 904 905 if showRepos { 906 s.queue.Iterate(func(opts *IndexOptions) { 907 data.Repos = append(data.Repos, Repo{ 908 ID: opts.RepoID, 909 Name: opts.Name, 910 }) 911 }) 912 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 913 } 914 915 _ = rootTmpl.Execute(w, data) 916} 917 918// handleReindex triggers a reindex asynocronously. If a reindex was triggered 919// the request returns with status 202. The caller can infer the new state of 920// the index by calling List. 
921func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 922 if r.Method != http.MethodPost { 923 w.Header().Set("Allow", http.MethodPost) 924 w.WriteHeader(http.StatusMethodNotAllowed) 925 return 926 } 927 928 err := r.ParseForm() 929 if err != nil { 930 http.Error(w, err.Error(), http.StatusBadRequest) 931 return 932 } 933 934 id, err := strconv.Atoi(r.Form.Get("repo")) 935 if err != nil { 936 http.Error(w, err.Error(), http.StatusBadRequest) 937 return 938 } 939 940 go func() { s.forceIndex(context.Background(), uint32(id)) }() 941 942 // 202 Accepted 943 w.WriteHeader(http.StatusAccepted) 944} 945 946func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 947 withIndexed := true 948 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 949 withIndexed = b 950 } 951 952 var indexed []uint32 953 if withIndexed { 954 indexed = listIndexed(s.IndexDir) 955 } 956 957 repos, err := s.Sourcegraph.List(r.Context(), indexed) 958 if err != nil { 959 http.Error(w, err.Error(), http.StatusInternalServerError) 960 return 961 } 962 963 bw := bytes.Buffer{} 964 965 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 966 967 _, err = fmt.Fprintf(tw, "ID\tName\n") 968 if err != nil { 969 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 970 return 971 } 972 973 s.queue.mu.Lock() 974 name := "" 975 for _, id := range repos.IDs { 976 if item := s.queue.get(id); item != nil { 977 name = item.opts.Name 978 } else { 979 name = "" 980 } 981 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 982 if err != nil { 983 debugLog.Printf("handleDebugList: %s\n", err.Error()) 984 } 985 } 986 s.queue.mu.Unlock() 987 988 if err != nil { 989 http.Error(w, err.Error(), http.StatusInternalServerError) 990 return 991 } 992 993 err = tw.Flush() 994 if err != nil { 995 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 996 return 997 } 998 999 
w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 1000 1001 if _, err := io.Copy(w, &bw); err != nil { 1002 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 1003 return 1004 } 1005} 1006 1007// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 1008// can run this command during periods of low usage (evenings, weekends) to 1009// trigger an initial merge run. In the steady-state, merges happen rarely, even 1010// on busy instances, and users can rely on automatic merging instead. 1011func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 1012 // A merge operation can take very long, depending on the number merges and the 1013 // target size of the compound shards. We run the merge in the background and 1014 // return immediately to the user. 1015 // 1016 // We track the status of the merge with metricShardMergingRunning. 1017 go func() { 1018 s.doMerge() 1019 }() 1020 _, _ = w.Write([]byte("merging enqueued\n")) 1021} 1022 1023func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 1024 indexed := listIndexed(s.IndexDir) 1025 1026 bw := bytes.Buffer{} 1027 1028 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 1029 1030 _, err := fmt.Fprintf(tw, "ID\tName\n") 1031 if err != nil { 1032 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 1033 return 1034 } 1035 1036 s.queue.mu.Lock() 1037 name := "" 1038 for _, id := range indexed { 1039 if item := s.queue.get(id); item != nil { 1040 name = item.opts.Name 1041 } else { 1042 name = "" 1043 } 1044 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 1045 if err != nil { 1046 debugLog.Printf("handleDebugIndexed: %s\n", err.Error()) 1047 } 1048 } 1049 s.queue.mu.Unlock() 1050 1051 if err != nil { 1052 http.Error(w, err.Error(), http.StatusInternalServerError) 1053 return 1054 } 1055 1056 err = tw.Flush() 1057 if err != nil { 1058 http.Error(w, 
fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 1059 return 1060 } 1061 1062 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 1063 1064 if _, err := io.Copy(w, &bw); err != nil { 1065 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 1066 return 1067 } 1068} 1069 1070// forceIndex will run the index job for repo name now. It will return always 1071// return a string explaining what it did, even if it failed. 1072func (s *Server) forceIndex(ctx context.Context, id uint32) (string, error) { 1073 var opts IndexOptions 1074 var err error 1075 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 1076 opts = o 1077 }, func(_ uint32, e error) { 1078 err = e 1079 }, id) 1080 if err != nil { 1081 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 1082 } 1083 1084 args := s.indexArgs(opts) 1085 args.Incremental = false // force re-index 1086 1087 var state indexState 1088 ran := s.muIndexDir.With(opts.Name, func() { 1089 state, err = s.index(ctx, args) 1090 }) 1091 if !ran { 1092 return fmt.Sprintf("index job for repository already running: %s", args), nil 1093 } 1094 if err != nil { 1095 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 1096 } 1097 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 1098} 1099 1100// DeleteAllData deletes all shards in the index and trash dir belonging to the 1101// tenant associated with the request. The deletion is best-effort, which means 1102// we will delete as much as possible. If no error is returned, the caller can 1103// be certain that all data has been deleted. 
1104func (s *Server) DeleteAllData(ctx context.Context, _ *indexserverv1.DeleteAllDataRequest) (*indexserverv1.DeleteAllDataResponse, error) { 1105 tnt, err := tenant.FromContext(ctx) 1106 if err != nil { 1107 return nil, err 1108 } 1109 s.logger.Warn("DeleteAllData", sglog.Int("tenant_id", tnt.ID())) 1110 1111 var merr error 1112 s.muIndexDir.Global(func() { 1113 // First, explode all compound shards that have repos from the tenant in 1114 // question. Because we hold the global lock, we can be sure that no new 1115 // merges start while we do this. 1116 if err := s.explodeTenantCompoundShards(ctx, func(path string) error { 1117 // We call explode in a separate process to protect indexserver. 1118 cmd := defaultExplodeCmd(path) 1119 1120 stdoutBuf := &bytes.Buffer{} 1121 stderrBuf := &bytes.Buffer{} 1122 cmd.Stdout = stdoutBuf 1123 cmd.Stderr = stderrBuf 1124 1125 err := cmd.Run() 1126 if err != nil { 1127 errorLog.Printf("explode failed: %v (stderr: %s)", err, stderrBuf.String()) 1128 return err 1129 } 1130 1131 infoLog.Printf("exploded shard: %s", stdoutBuf.String()) 1132 1133 return nil 1134 }); err != nil { 1135 merr = multierr.Append(merr, err) 1136 } 1137 1138 // Invariant: all shards from the tenant are simple shards. 1139 1140 if err := purgeTenantShards(ctx, s.IndexDir); err != nil { 1141 merr = multierr.Append(merr, err) 1142 } 1143 if err := purgeTenantShards(ctx, filepath.Join(s.IndexDir, ".trash")); err != nil { 1144 merr = multierr.Append(merr, err) 1145 } 1146 }) 1147 1148 return &indexserverv1.DeleteAllDataResponse{}, merr 1149} 1150 1151func listIndexed(indexDir string) []uint32 { 1152 index := getShards(indexDir) 1153 metricNumIndexed.Set(float64(len(index))) 1154 repoIDs := make([]uint32, 0, len(index)) 1155 for id := range index { 1156 repoIDs = append(repoIDs, id) 1157 } 1158 slices.Sort(repoIDs) 1159 return repoIDs 1160} 1161 1162// setupTmpDir sets up a temporary directory on the same volume as the 1163// indexes. 
1164// 1165// If main is true we will delete older temp directories left around. main is 1166// false when this is a debug command. 1167func setupTmpDir(logger sglog.Logger, main bool, index string) error { 1168 // change the target tmp directory depending on if it's our main daemon or a 1169 // debug sub command. 1170 dir := ".indexserver.debug.tmp" 1171 if main { 1172 dir = ".indexserver.tmp" 1173 } 1174 1175 tmpRoot := filepath.Join(index, dir) 1176 1177 if main { 1178 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot)) 1179 err := os.RemoveAll(tmpRoot) 1180 if err != nil { 1181 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err)) 1182 } 1183 } 1184 1185 if err := os.MkdirAll(tmpRoot, 0o755); err != nil { 1186 return err 1187 } 1188 1189 return os.Setenv("TMPDIR", tmpRoot) 1190} 1191 1192func printMetaData(fn string) error { 1193 repo, indexMeta, err := index.ReadMetadataPath(fn) 1194 if err != nil { 1195 return err 1196 } 1197 1198 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 1199 if err != nil { 1200 return err 1201 } 1202 1203 err = json.NewEncoder(os.Stdout).Encode(repo) 1204 if err != nil { 1205 return err 1206 } 1207 return nil 1208} 1209 1210func printShardStats(fn string) error { 1211 f, err := os.Open(fn) 1212 if err != nil { 1213 return err 1214 } 1215 1216 iFile, err := index.NewIndexFile(f) 1217 if err != nil { 1218 return err 1219 } 1220 1221 return index.PrintNgramStats(iFile) 1222} 1223 1224func srcLogLevelIsDebug() bool { 1225 lvl := os.Getenv(sglog.EnvLogLevel) 1226 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1227} 1228 1229func getEnvWithDefaultBool(k string, defaultVal bool) bool { 1230 v := os.Getenv(k) 1231 if v == "" { 1232 return defaultVal 1233 } 1234 b, err := strconv.ParseBool(v) 1235 if err != nil { 1236 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1237 } 1238 return b 1239} 1240 1241func getEnvWithDefaultInt64(k string, defaultVal 
// getEnvWithDefaultInt returns the environment variable k parsed as an int, or
// defaultVal if k is unset or empty. The process exits via log.Fatalf if the
// value cannot be parsed.
func getEnvWithDefaultInt(k string, defaultVal int) int {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	// Parse the value, not the variable name: parsing k made every explicitly
	// configured value fatal.
	i, err := strconv.Atoi(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to int: %s", k, err)
	}
	return i
}

// getEnvWithDefaultUint64 returns the environment variable k parsed as a
// base-10 uint64, or defaultVal if k is unset or empty. The process exits via
// log.Fatalf if the value cannot be parsed.
func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	i, err := strconv.ParseUint(v, 10, 64)
	if err != nil {
		log.Fatalf("error parsing ENV %s to uint64: %s", k, err)
	}
	return i
}

// getEnvWithDefaultFloat64 returns the environment variable k parsed as a
// float64, or defaultVal if k is unset or empty. The process exits via
// log.Fatalf if the value cannot be parsed.
func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}
	f, err := strconv.ParseFloat(v, 64)
	if err != nil {
		log.Fatalf("error parsing ENV %s to float64: %s", k, err)
	}
	return f
}

// getEnvWithDefaultString returns the environment variable k, or defaultVal if
// k is unset or empty.
func getEnvWithDefaultString(k string, defaultVal string) string {
	if v := os.Getenv(k); v != "" {
		return v
	}
	return defaultVal
}

// getEnvWithDefaultDuration returns the environment variable k parsed with
// time.ParseDuration, or defaultVal if k is unset or empty. The process exits
// via log.Fatalf if the value cannot be parsed.
func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration {
	v := os.Getenv(k)
	if v == "" {
		return defaultVal
	}

	d, err := time.ParseDuration(v)
	if err != nil {
		log.Fatalf("error parsing ENV %s to duration: %s", k, err)
	}
	return d
}

// getEnvWithDefaultEmptySet parses the environment variable k as a
// comma-separated list and returns its non-empty, whitespace-trimmed elements
// as a set. An unset or empty variable yields an empty (non-nil) set.
func getEnvWithDefaultEmptySet(k string) map[string]struct{} {
	set := map[string]struct{}{}
	for _, v := range strings.Split(os.Getenv(k), ",") {
		v = strings.TrimSpace(v)
		if v != "" {
			set[v] = struct{}{}
		}
	}
	return set
}

// joinStringSet joins the members of set with sep. The members are sorted
// first so that the result (used in log messages) is stable between runs
// instead of following Go's randomized map iteration order.
func joinStringSet(set map[string]struct{}, sep string) string {
	xs := make([]string, 0, len(set))
	for x := range set {
		xs = append(xs, x)
	}
	sort.Strings(xs)

	return strings.Join(xs, sep)
}
xs = append(xs, x) 1325 } 1326 1327 return strings.Join(xs, sep) 1328} 1329 1330func setShardsCounter(indexDir string) { 1331 fns, err := filepath.Glob(filepath.Join(indexDir, "*.zoekt")) 1332 if err != nil { 1333 errorLog.Printf("setShardsCounter: %s\n", err) 1334 return 1335 } 1336 metricNumberShards.Set(float64(len(fns))) 1337 1338 compoundFns := make([]string, 0, len(fns)) 1339 for _, fn := range fns { 1340 if strings.HasPrefix(filepath.Base(fn), "compound-") { 1341 compoundFns = append(compoundFns, fn) 1342 } 1343 } 1344 metricNumberCompoundShards.Set(float64(len(compoundFns))) 1345} 1346 1347func rootCmd() *ffcli.Command { 1348 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1349 conf := rootConfig{ 1350 Main: true, 1351 } 1352 conf.registerRootFlags(rootFs) 1353 1354 return &ffcli.Command{ 1355 FlagSet: rootFs, 1356 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1357 Subcommands: []*ffcli.Command{debugCmd()}, 1358 Exec: func(ctx context.Context, args []string) error { 1359 return startServer(conf) 1360 }, 1361 } 1362} 1363 1364type rootConfig struct { 1365 // Main is true if this rootConfig is for our main long running command (the 1366 // indexserver). Debug commands should not set this value. This is used to 1367 // determine if we need to run tmpfriend. 
// registerRootFlags binds every field of rc that is configurable from the
// command line to a flag on fs. Several defaults are taken from environment
// variables through the getEnvWithDefault* helpers, which terminate the
// process if a variable is set to an unparsable value, so calling this method
// can exit the program.
func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
	fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
	fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
	fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently")
	fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", index.DefaultDir), "set index directory to use")
	fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
	// NOTE(review): the help text mentions NODE_NAME; presumably
	// index.HostnameBestEffort reads it — confirm against that function.
	fs.StringVar(&rc.hostname, "hostname", index.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
	fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
	fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
	fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")

	// flags related to shard merging
	//
	// NOTE(review): the flag is named "shard_merging" but it sets
	// disableShardMerging, i.e. passing -shard_merging turns merging OFF.
	// Renaming would break existing invocations, so it is only flagged here.
	fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging")
	fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
	fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
	fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 1000), "the target size of compound shards in MiB")
	fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 800), "the minimum size of a compound shard in MiB")
	fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
}
Shards with newer commits are excluded from merging.") 1409} 1410 1411func startServer(conf rootConfig) error { 1412 s, err := newServer(conf) 1413 if err != nil { 1414 return err 1415 } 1416 1417 profiler.Init("zoekt-sourcegraph-indexserver") 1418 setShardsCounter(s.IndexDir) 1419 1420 if conf.listen != "" { 1421 1422 mux := http.NewServeMux() 1423 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1424 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1425 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1426 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"}, 1427 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1428 }...) 1429 s.addDebugHandlers(mux) 1430 1431 go func() { 1432 debugLog.Printf("serving HTTP on %s", conf.listen) 1433 mux := grpcutil.MultiplexGRPC(newGRPCServer(sglog.Scoped("indexserver"), s), mux) 1434 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1435 }() 1436 1437 // Serve mux on a unix domain socket on a best-effort-basis so that 1438 // webserver can call the endpoints via the shared filesystem. 1439 // 1440 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1441 // on the socket due to permission errors. See 1442 // https://github.com/docker/for-mac/issues/6239 1443 go func() { 1444 serveHTTPOverSocket := func() error { 1445 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1446 // We cannot bind a socket to an existing pathname. 1447 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1448 return fmt.Errorf("error removing socket file: %s", socket) 1449 } 1450 // The "unix" network corresponds to stream sockets. (cf. 
unixgram, 1451 // unixpacket). 1452 l, err := net.Listen("unix", socket) 1453 if err != nil { 1454 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1455 } 1456 // Indexserver (root) and webserver (Sourcegraph) run with 1457 // different users. Per default, the socket is created with 1458 // permission 755 (root root), which doesn't let webserver write to 1459 // it. 1460 // 1461 // See https://github.com/golang/go/issues/11822 for more context. 1462 if err := os.Chmod(socket, 0o777); err != nil { 1463 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1464 } 1465 debugLog.Printf("serving HTTP on %s", socket) 1466 return http.Serve(l, mux) 1467 } 1468 debugLog.Print(serveHTTPOverSocket()) 1469 }() 1470 } 1471 1472 oc := &ownerChecker{ 1473 Path: filepath.Join(conf.index, "owner.txt"), 1474 Hostname: conf.hostname, 1475 } 1476 go oc.Run() 1477 1478 logger := sglog.Scoped("metricsRegistration") 1479 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1480 1481 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1482 prometheus.DefaultRegisterer.MustRegister(c) 1483 1484 s.Run() 1485 return nil 1486} 1487 1488func newServer(conf rootConfig) (*Server, error) { 1489 logger := sglog.Scoped("server") 1490 1491 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1492 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1493 } 1494 if conf.index == "" { 1495 return nil, fmt.Errorf("must set -index") 1496 } 1497 if conf.root == "" { 1498 return nil, fmt.Errorf("must set -sourcegraph_url") 1499 } 1500 rootURL, err := url.Parse(conf.root) 1501 if err != nil { 1502 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1503 } 1504 1505 rootURL = addDefaultPort(rootURL) 1506 1507 // Tune GOMAXPROCS to match Linux container CPU quota. 1508 _, _ = maxprocs.Set() 1509 1510 // Automatically prepend our own path at the front, to minimize 1511 // required configuration. 
1512 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1513 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1514 } 1515 1516 if _, err := os.Stat(conf.index); err != nil { 1517 if err := os.MkdirAll(conf.index, 0o755); err != nil { 1518 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1519 } 1520 } 1521 1522 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil { 1523 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1524 } 1525 1526 if srcLogLevelIsDebug() { 1527 debugLog.SetOutput(os.Stderr) 1528 } 1529 1530 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1531 if len(reposWithSeparateIndexingMetrics) > 0 { 1532 debugLog.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1533 } 1534 1535 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1536 if len(deltaBuildRepositoriesAllowList) > 0 { 1537 debugLog.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1538 } 1539 1540 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1541 if deltaShardNumberFallbackThreshold > 0 { 1542 debugLog.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1543 } else { 1544 debugLog.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1545 } 1546 1547 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1548 if len(reposShouldSkipSymbolsCalculation) > 0 { 1549 debugLog.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1550 } 1551 1552 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout) 1553 if indexingTimeout != 
defaultIndexingTimeout { 1554 debugLog.Printf("using configured indexing timeout: %s", indexingTimeout) 1555 } 1556 1557 var sg Sourcegraph 1558 if rootURL.IsAbs() { 1559 var batchSize int 1560 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { 1561 batchSize, err = strconv.Atoi(v) 1562 if err != nil { 1563 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1564 } 1565 } 1566 1567 opts := []SourcegraphClientOption{ 1568 WithBatchSize(batchSize), 1569 } 1570 1571 logger := sglog.Scoped("zoektConfigurationGRPCClient") 1572 client, err := dialGRPCClient(rootURL.Host, logger) 1573 if err != nil { 1574 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1575 } 1576 1577 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...) 1578 1579 } else { 1580 sg = sourcegraphFake{ 1581 RootDir: rootURL.String(), 1582 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1583 } 1584 } 1585 1586 cpuCount := max(int(math.Round(float64(runtime.GOMAXPROCS(0))*(conf.cpuFraction))), 1) 1587 1588 if conf.indexConcurrency < 1 { 1589 conf.indexConcurrency = 1 1590 } else if conf.indexConcurrency > int64(cpuCount) { 1591 conf.indexConcurrency = int64(cpuCount) 1592 } 1593 1594 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1595 1596 return &Server{ 1597 logger: logger, 1598 rootURL: rootURL, 1599 Sourcegraph: sg, 1600 IndexDir: conf.index, 1601 IndexConcurrency: int(conf.indexConcurrency), 1602 Interval: conf.interval, 1603 CPUCount: cpuCount, 1604 queue: *q, 1605 shardMerging: !conf.disableShardMerging, 1606 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1607 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1608 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1609 hostname: conf.hostname, 1610 mergeOpts: mergeOpts{ 1611 vacuumInterval: conf.vacuumInterval, 1612 mergeInterval: conf.mergeInterval, 1613 
targetSizeBytes: conf.targetSize * 1024 * 1024, 1614 minSizeBytes: conf.minSize * 1024 * 1024, 1615 minAgeDays: conf.minAgeDays, 1616 }, 1617 timeout: indexingTimeout, 1618 indexSemaphore: make(chan struct{}, int(conf.indexConcurrency)), 1619 }, err 1620} 1621 1622func newGRPCServer(logger sglog.Logger, s *Server, additionalOpts ...grpc.ServerOption) *grpc.Server { 1623 grpcServer := defaults.NewServer(logger, additionalOpts...) 1624 indexserverv1.RegisterSourcegraphIndexserverServiceServer(grpcServer, s) 1625 return grpcServer 1626} 1627 1628// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration 1629// for the indexed-search-configuration gRPC service. 1630// 1631// The default backoff strategy is modeled after the default settings used by 1632// retryablehttp.DefaultClient. 1633// 1634// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1635// - Unavailable 1636// - Aborted 1637// 1638//go:embed default_grpc_service_configuration.json 1639var defaultGRPCServiceConfigurationJSON string 1640 1641func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1642 return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1643 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1644 return invoker(ctx, method, req, reply, cc, opts...) 1645 } 1646} 1647 1648func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1649 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1650 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1651 return streamer(ctx, desc, cc, method, opts...) 1652 } 1653} 1654 1655// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPCs servers and clients are allowed to process. 
// dialGRPCClient creates a ZoektConfigurationService client for the
// Sourcegraph instance at addr.
//
// The connection uses insecure (plaintext) transport credentials. Both the
// stream and the unary interceptor chains are installed in the same order:
// metrics, message size, internal actor metadata, error logging, error
// metrics and finally retry. additionalOpts are appended after the defaults,
// and the message size options derived from the environment are appended last
// so that they win over everything else.
//
// The returned error wraps the dial failure together with addr.
func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (configv1.ZoektConfigurationServiceClient, error) {
	metrics := clientMetricsOnce()

	// If the service seems to be unavailable, this
	// will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1
	// Ex: (on the first retry attempt, we will wait between [.9s and 1.1s])
	retryOpts := []retry.CallOption{
		retry.WithMax(5),
		retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)),
		retry.WithCodes(codes.Unavailable),
	}

	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		// NOTE(review): interceptors run in the order listed; keep the stream
		// and unary chains in sync when changing either one.
		grpc.WithChainStreamInterceptor(
			metrics.StreamClientInterceptor(),
			messagesize.StreamClientInterceptor,
			internalActorStreamInterceptor(),
			internalerrs.LoggingStreamClientInterceptor(logger),
			internalerrs.PrometheusStreamClientInterceptor,
			retry.StreamClientInterceptor(retryOpts...),
		),
		grpc.WithChainUnaryInterceptor(
			metrics.UnaryClientInterceptor(),
			messagesize.UnaryClientInterceptor,
			internalActorUnaryInterceptor(),
			internalerrs.LoggingUnaryClientInterceptor(logger),
			internalerrs.PrometheusUnaryClientInterceptor,
			retry.UnaryClientInterceptor(retryOpts...),
		),
		grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)),
	}

	opts = append(opts, additionalOpts...)

	// Ensure that the message size options are set last, so they override any other
	// client-specific options that tweak the message size.
	//
	// The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they
	// take precedence over everything else with a uniform size setting that's easy to reason about.
	opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...)

	// This dialer is used to connect via gRPC to the Sourcegraph instance.
	// This is done lazily, so we can provide the client to use regardless of
	// whether we enabled gRPC or not initially.
	cc, err := grpc.Dial(addr, opts...)
	if err != nil {
		return nil, fmt.Errorf("dialing %q: %w", addr, err)
	}

	client := configv1.NewZoektConfigurationServiceClient(cc)
	return client, nil
}
// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
// If the scheme is "https", "443" is used. URLs that are nil, not absolute,
// already carry a port, or use any other scheme are returned unchanged.
//
// The original URL is not mutated. A copy is modified and returned.
func addDefaultPort(original *url.URL) *url.URL {
	if original == nil {
		return nil // don't panic
	}

	// Leave the URL alone if it doesn't look like a remote URL or if a port
	// is already present.
	if !original.IsAbs() || original.Port() != "" {
		return original
	}

	var port string
	switch original.Scheme {
	case "http":
		port = "80"
	case "https":
		port = "443"
	default:
		return original
	}

	u := cloneURL(original)
	// Join with Hostname(), not Host: Host still carries the brackets of an
	// IPv6 literal (and a dangling ":" for "host:"), and JoinHostPort would
	// bracket it a second time, producing e.g. "[[::1]]:80".
	u.Host = net.JoinHostPort(original.Hostname(), port)
	return u
}

// cloneURL returns a copy of the URL. It is safe to mutate the returned URL.
// The user info, if any, is copied as well so that the two URLs do not share
// it. This is copied from net/http/clone.go
func cloneURL(u *url.URL) *url.URL {
	if u == nil {
		return nil
	}
	u2 := new(url.URL)
	*u2 = *u
	if u.User != nil {
		u2.User = new(url.Userinfo)
		*u2.User = *u.User
	}
	return u2
}
1745// This is copied from net/http/clone.go 1746func cloneURL(u *url.URL) *url.URL { 1747 if u == nil { 1748 return nil 1749 } 1750 u2 := new(url.URL) 1751 *u2 = *u 1752 if u.User != nil { 1753 u2.User = new(url.Userinfo) 1754 *u2.User = *u.User 1755 } 1756 return u2 1757} 1758 1759func main() { 1760 liblog := sglog.Init(sglog.Resource{ 1761 Name: "zoekt-indexserver", 1762 Version: index.Version, 1763 InstanceID: index.HostnameBestEffort(), 1764 }) 1765 defer liblog.Sync() 1766 1767 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1768 log.Fatal(err) 1769 } 1770} 1771 1772// getBoolFromEnvironmentVariables returns the boolean defined by the first environment 1773// variable listed in envVarNames that is set in the current process environment, or the defaultBool if none are set. 1774// 1775// An error is returned of the provided environment variables fails to parse as a boolean. 1776func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) { 1777 for _, envVar := range envVarNames { 1778 v := os.Getenv(envVar) 1779 if v == "" { 1780 continue 1781 } 1782 1783 b, err := strconv.ParseBool(v) 1784 if err != nil { 1785 return false, fmt.Errorf("parsing environment variable %q to boolean: %v", envVar, err) 1786 } 1787 1788 return b, nil 1789 } 1790 1791 return defaultBool, nil 1792}