fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1// Licensed under the Apache License, Version 2.0 (the "License"); 2// you may not use this file except in compliance with the License. 3// You may obtain a copy of the License at 4// 5// http://www.apache.org/licenses/LICENSE-2.0 6// 7// Unless required by applicable law or agreed to in writing, software 8// distributed under the License is distributed on an "AS IS" BASIS, 9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 10// See the License for the specific language governing permissions and 11// limitations under the License. 12 13// Command zoekt-sourcegraph-indexserver periodically reindexes repositories 14// from a Sourcegraph instance. It uses a "pull-based" design, where it periodically 15// reaches out to the Sourcegraph instance for the list of repositories to reindex. 16package main 17 18import ( 19 "bytes" 20 "context" 21 _ "embed" 22 "encoding/json" 23 "errors" 24 "flag" 25 "fmt" 26 "html/template" 27 "io" 28 "io/fs" 29 "log" 30 "math" 31 "math/rand" 32 "net" 33 "net/http" 34 "net/url" 35 "os" 36 "os/exec" 37 "os/signal" 38 "path/filepath" 39 "runtime" 40 "slices" 41 "sort" 42 "strconv" 43 "strings" 44 "sync" 45 "text/tabwriter" 46 "time" 47 "unicode/utf8" 48 49 grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" 50 "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry" 51 "github.com/peterbourgon/ff/v3/ffcli" 52 "github.com/prometheus/client_golang/prometheus" 53 "github.com/prometheus/client_golang/prometheus/promauto" 54 sglog "github.com/sourcegraph/log" 55 "github.com/sourcegraph/mountinfo" 56 57 "github.com/sourcegraph/zoekt" 58 configv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/sourcegraph/zoekt/configuration/v1" 59 indexserverv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/zoekt/indexserver/v1" 60 "github.com/sourcegraph/zoekt/grpc/defaults" 61 "github.com/sourcegraph/zoekt/grpc/grpcutil" 62 "github.com/sourcegraph/zoekt/grpc/internalerrs" 63 "github.com/sourcegraph/zoekt/grpc/messagesize" 64 "github.com/sourcegraph/zoekt/index" 65 "github.com/sourcegraph/zoekt/internal/debugserver" 66 "github.com/sourcegraph/zoekt/internal/profiler" 67 "github.com/sourcegraph/zoekt/internal/tenant" 68 69 "go.uber.org/automaxprocs/maxprocs" 70 "go.uber.org/multierr" 71 "golang.org/x/net/trace" 72 "golang.org/x/sys/unix" 73 "google.golang.org/grpc" 74 "google.golang.org/grpc/codes" 75 "google.golang.org/grpc/credentials/insecure" 76 "google.golang.org/grpc/metadata" 77) 78 79var ( 80 metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{ 81 Name: "resolve_revisions_seconds", 82 Help: "A histogram of latencies for resolving all repository revisions.", 83 Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 27min 84 }) 85 86 metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 87 Name: "resolve_revision_seconds", 88 Help: "A histogram of latencies for resolving a repository revision.", 89 Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s 90 }, []string{"success"}) // success=true|false 91 92 metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{ 93 Name: "get_index_options_total", 94 Help: "The total number of times we tried to get index options for a repository. Includes errors.", 95 }) 96 metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{ 97 Name: "get_index_options_error_total", 98 Help: "The total number of times we failed to get index options for a repository.", 99 }) 100 101 metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 102 Name: "index_repo_seconds", 103 Help: "A histogram of latencies for indexing a repository.", 104 Buckets: prometheus.ExponentialBucketsRange( 105 (100 * time.Millisecond).Seconds(), 106 (40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo 107 20), 108 }, []string{ 109 "state", // state is an indexState 110 "name", // name of the repository that was indexed 111 }) 112 113 metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{ 114 Name: "index_indexing_delay_seconds", 115 Help: "A histogram of durations from when an index job is added to the queue, to the time it completes.", 116 Buckets: prometheus.ExponentialBuckets(60, 2, 14), // 1 minute -> 5.5 days 117 }, []string{ 118 "state", // state is an indexState 119 "name", // the name of the repository that was indexed 120 }) 121 122 metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 123 Name: "index_fetch_seconds", 124 Help: "A histogram of latencies for fetching a repository.", 125 Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes 126 }, []string{ 127 "success", // true|false 128 "name", // the name of the repository that the commits were fetched from 129 }) 130 131 metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{ 132 Name: "index_incremental_index_state", 133 Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.", 134 }, []string{"state"}) // state is index.IndexState 135 136 metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{ 137 Name: "index_num_indexed", 138 Help: "Number of indexed repos by code host", 139 }) 140 141 metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{ 142 Name: "index_failing_total", 143 Help: "Counts failures to index (indexing activity, should be used with rate())", 144 }) 145 146 metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{ 147 Name: "index_indexing_total", 148 Help: "Counts indexings (indexing activity, should be used with rate())", 149 }) 150 151 metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{ 152 Name: "index_num_stopped_tracking_total", 153 Help: "Counts the number of repos we stopped tracking.", 154 }) 155 156 // clientMetricsOnce returns a singleton instance of the client metrics 157 // that are shared across all gRPC clients that this process creates. 158 // 159 // This function panics if the metrics cannot be registered with the default 160 // Prometheus registry. 161 clientMetricsOnce = sync.OnceValue(func() *grpcprom.ClientMetrics { 162 clientMetrics := grpcprom.NewClientMetrics( 163 grpcprom.WithClientCounterOptions(), 164 grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request 165 grpcprom.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC 166 grpcprom.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC 167 ) 168 prometheus.DefaultRegisterer.MustRegister(clientMetrics) 169 return clientMetrics 170 }) 171) 172 173// 1 MB; match https://sourcegraph.sourcegraph.com/r/github.com/sourcegraph/sourcegraph/-/blob/cmd/searcher/internal/search/store.go?L32 174const MaxFileSize = 1 << 20 175 176// set of repositories that we want to capture separate indexing metrics for 177var reposWithSeparateIndexingMetrics = make(map[string]struct{}) 178 179type indexState string 180 181const ( 182 indexStateFail indexState = "fail" 183 indexStateSuccess indexState = "success" 184 indexStateSuccessMeta indexState = "success_meta" // We only updated metadata 185 indexStateNoop indexState = "noop" // We didn't need to update index 186 indexStateEmpty indexState = "empty" // index is empty (empty repo) 187) 188 189// Server is the main functionality of zoekt-sourcegraph-indexserver. It 190// exists to conveniently use all the options passed in via func main. 191type Server struct { 192 logger sglog.Logger 193 194 Sourcegraph Sourcegraph 195 196 // rootURL is the root URL of the Sourcegraph instance. 197 rootURL *url.URL 198 199 BatchSize int 200 201 // IndexDir is the index directory to use. 202 IndexDir string 203 204 // IndexConcurrency is the number of repositories we index at once. 205 IndexConcurrency int 206 207 // indexSemaphore limits the number of concurrent index operations 208 indexSemaphore chan struct{} 209 210 // Interval is how often we sync with Sourcegraph. 211 Interval time.Duration 212 213 // CPUCount is the number of CPUs to use for indexing shards. 214 CPUCount int 215 216 queue Queue 217 218 // muIndexDir protects the index directory from concurrent access. 219 muIndexDir indexMutex 220 221 // If true, shard merging is enabled. 222 shardMerging bool 223 224 // deltaBuildRepositoriesAllowList is an allowlist for repositories that we 225 // use delta-builds for instead of normal builds 226 deltaBuildRepositoriesAllowList map[string]struct{} 227 228 // deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist 229 // before attempting a delta build. 230 deltaShardNumberFallbackThreshold uint64 231 232 // repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that 233 // we skip calculating symbols metadata for during builds 234 repositoriesSkipSymbolsCalculationAllowList map[string]struct{} 235 236 hostname string 237 238 mergeOpts mergeOpts 239 240 // timeout defines how long the index server waits before killing an indexing job. 241 timeout time.Duration 242 243 indexserverv1.UnimplementedSourcegraphIndexserverServiceServer 244} 245 246var ( 247 debugLog = log.New(io.Discard, "[DEBUG] ", log.LstdFlags) 248 infoLog = log.New(os.Stderr, "[INFO] ", log.LstdFlags) 249 errorLog = log.New(os.Stderr, "[ERROR] ", log.LstdFlags) 250) 251 252// our index commands should output something every 100mb they process. 253// 254// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough 255// time." famous last words. A client was indexing a monorepo with 42 256// cores... 5m was not enough. 257const noOutputTimeout = 30 * time.Minute 258 259const ( 260 maxFailureMessageBytes = 12 * 1024 261 failureMessageTruncationMarker = "\n\n[Error message truncated]\n\n" 262) 263 264func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 265 out := &synchronizedBuffer{} 266 cmd.Stdout = out 267 cmd.Stderr = out 268 269 tr.LazyPrintf("%s", cmd.Args) 270 271 defer func() { 272 if err != nil { 273 outS := out.String() 274 tr.LazyPrintf("failed: %v", err) 275 tr.LazyPrintf("output: %s", out) 276 tr.SetError() 277 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 278 } 279 }() 280 281 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 282 283 if err := cmd.Start(); err != nil { 284 return err 285 } 286 287 errC := make(chan error) 288 go func() { 289 errC <- cmd.Wait() 290 }() 291 292 // This channel is set after we have sent sigquit. It allows us to follow up 293 // with a sigkill if the process doesn't quit after sigquit. 294 kill := make(<-chan time.Time) 295 296 lastLen := 0 297 for { 298 select { 299 case <-time.After(noOutputTimeout): 300 // Periodically check if we have had output. If not kill the process. 301 if out.Len() != lastLen { 302 lastLen = out.Len() 303 infoLog.Printf("still running %s", cmd.Args) 304 } else { 305 // Send quit (C-\) first so we get a stack dump. 306 infoLog.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 307 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 308 errorLog.Println("quit failed:", err) 309 } 310 311 // send sigkill if still running in 10s 312 kill = time.After(10 * time.Second) 313 } 314 315 case <-kill: 316 infoLog.Printf("still running, killing %s", cmd.Args) 317 if err := cmd.Process.Kill(); err != nil { 318 errorLog.Println("kill failed:", err) 319 } 320 321 case err := <-errC: 322 if err != nil { 323 return err 324 } 325 326 tr.LazyPrintf("success") 327 return nil 328 } 329 } 330} 331 332// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 333// monitor the buffer while it is being written to. 334type synchronizedBuffer struct { 335 mu sync.Mutex 336 b bytes.Buffer 337} 338 339func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 340 sb.mu.Lock() 341 defer sb.mu.Unlock() 342 return sb.b.Write(p) 343} 344 345func (sb *synchronizedBuffer) Len() int { 346 sb.mu.Lock() 347 defer sb.mu.Unlock() 348 return sb.b.Len() 349} 350 351func (sb *synchronizedBuffer) String() string { 352 sb.mu.Lock() 353 defer sb.mu.Unlock() 354 return sb.b.String() 355} 356 357// pauseFileName if present in IndexDir will stop index jobs from 358// running. This is to make it possible to experiment with the content of the 359// IndexDir without the indexserver writing to it. 360const ( 361 pauseFileName = "PAUSE" 362 pauseEnvVar = "SRC_PAUSE_INDEXING" 363) 364 365// isIndexingPaused checks if indexing should be paused based on either: 366// 1. The presence of a PAUSE file in the index directory 367// 2. The SRC_PAUSE_INDEXING environment variable being set to true 368func isIndexingPaused(indexDir string) (bool, string) { 369 // Check for PAUSE file first 370 if b, err := os.ReadFile(filepath.Join(indexDir, pauseFileName)); err == nil { 371 return true, fmt.Sprintf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 372 } 373 374 // Then check environment variable 375 if getEnvWithDefaultBool(pauseEnvVar, false) { 376 return true, fmt.Sprintf("indexserver paused via %s environment variable", pauseEnvVar) 377 } 378 379 return false, "" 380} 381 382// Run the sync loop. This blocks forever. 383func (s *Server) Run() { 384 removeIncompleteShards(s.IndexDir) 385 386 // Start a goroutine which updates the queue with commits to index. 387 go func() { 388 // We update the list of indexed repos every Interval. To speed up manual 389 // testing we also listen for SIGUSR1 to trigger updates. 390 // "pkill -SIGUSR1 zoekt-sourcegra" 391 for range jitterTicker(s.Interval, unix.SIGUSR1) { 392 if paused, msg := isIndexingPaused(s.IndexDir); paused { 393 infoLog.Printf("%s", msg) 394 continue 395 } 396 397 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 398 if err != nil { 399 errorLog.Printf("error listing repos: %s", err) 400 continue 401 } 402 403 debugLog.Printf("updating index queue with %d repositories", len(repos.IDs)) 404 405 // Stop indexing repos we don't need to track anymore 406 removed := s.queue.MaybeRemoveMissing(repos.IDs) 407 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 408 if len(removed) > 0 { 409 infoLog.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 410 } 411 412 cleanupDone := make(chan struct{}) 413 go func() { 414 defer close(cleanupDone) 415 s.muIndexDir.Global(func() { 416 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 417 }) 418 }() 419 420 repos.IterateIndexOptions(s.queue.AddOrUpdate) 421 422 // IterateIndexOptions will only iterate over repositories that have 423 // changed since we last called list. However, we want to add all IDs 424 // back onto the queue just to check that what is on disk is still 425 // correct. This will use the last IndexOptions we stored in the 426 // queue. The repositories not on the queue (missing) need a forced 427 // fetch of IndexOptions. 428 missing := s.queue.Bump(repos.IDs) 429 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 430 431 setShardsCounter(s.IndexDir) 432 433 <-cleanupDone 434 } 435 }() 436 437 go func() { 438 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 439 if s.shardMerging { 440 s.vacuum() 441 } 442 } 443 }() 444 445 go func() { 446 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 447 if s.shardMerging { 448 s.doMerge() 449 } 450 } 451 }() 452 453 for range s.IndexConcurrency { 454 go s.processQueue() 455 } 456 457 // block forever 458 select {} 459} 460 461// formatList returns a comma-separated list of the first min(len(v), m) items. 462func formatListUint32(v []uint32, m int) string { 463 if len(v) < m { 464 m = len(v) 465 } 466 467 sb := strings.Builder{} 468 for i := range m { 469 fmt.Fprintf(&sb, "%d, ", v[i]) 470 } 471 472 if len(v) > m { 473 sb.WriteString("...") 474 } 475 476 return strings.TrimRight(sb.String(), ", ") 477} 478 479func (s *Server) processQueue() { 480 for { 481 if paused, _ := isIndexingPaused(s.IndexDir); paused { 482 time.Sleep(time.Second) 483 continue 484 } 485 486 item, ok := s.queue.Pop() 487 if !ok { 488 time.Sleep(time.Second) 489 continue 490 } 491 492 opts := item.Opts 493 args := s.indexArgs(opts) 494 495 ran := s.muIndexDir.With(opts.Name, func() { 496 // only record time taken once we hold the lock. This avoids us 497 // recording time taken while merging/cleanup runs. 498 start := time.Now() 499 500 state, err := s.index(context.Background(), args) 501 502 elapsed := time.Since(start) 503 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 504 505 indexDelay := time.Since(item.DateAddedToQueue) 506 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds()) 507 508 if err != nil { 509 errorLog.Printf("error indexing %s: %s", args.String(), err) 510 } 511 512 switch state { 513 case indexStateSuccess: 514 var branches []string 515 for _, b := range args.Branches { 516 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 517 } 518 s.logger.Info("updated index", 519 sglog.Int("tenant", args.TenantID), 520 sglog.String("repo", args.Name), 521 sglog.Uint32("id", args.RepoID), 522 sglog.Strings("branches", branches), 523 sglog.Duration("duration", elapsed), 524 sglog.Duration("index_delay", indexDelay), 525 ) 526 case indexStateSuccessMeta: 527 infoLog.Printf("updated meta %s in %v", args.String(), elapsed) 528 } 529 s.queue.SetIndexed(opts, state) 530 }) 531 532 if !ran { 533 // Someone else is processing the repository. We can just skip this job 534 // since the repository will be added back to the queue and we will 535 // converge to the correct behaviour. 536 debugLog.Printf("index job for repository already running: %s", args) 537 continue 538 } 539 } 540} 541 542// repoNameForMetric returns a normalized version of the given repository name that is 543// suitable for use with Prometheus metrics. 544func repoNameForMetric(repo string) string { 545 // Check to see if we want to be able to capture separate indexing metrics for this repository. 546 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 547 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 548 return repo 549 } 550 551 return "" 552} 553 554func batched(slice []uint32, size int) <-chan []uint32 { 555 c := make(chan []uint32) 556 go func() { 557 for len(slice) > 0 { 558 if size > len(slice) { 559 size = len(slice) 560 } 561 c <- slice[:size] 562 slice = slice[size:] 563 } 564 close(c) 565 }() 566 return c 567} 568 569// jitterTicker returns a ticker which ticks with a jitter. Each tick is 570// uniformly selected from the range (d/2, d + d/2). It will tick on creation. 571// 572// sig is a list of signals which also cause the ticker to fire. This is a 573// convenience to allow manually triggering of the ticker. 574func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} { 575 ticker := make(chan struct{}) 576 577 go func() { 578 for { 579 ticker <- struct{}{} 580 ns := int64(d) 581 jitter := rand.Int63n(ns) 582 time.Sleep(time.Duration(ns/2 + jitter)) 583 } 584 }() 585 586 go func() { 587 if len(sig) == 0 { 588 return 589 } 590 591 c := make(chan os.Signal, 1) 592 signal.Notify(c, sig...) 593 for range c { 594 ticker <- struct{}{} 595 } 596 }() 597 598 return ticker 599} 600 601// Index starts an index job for repo name at commit. 602func (s *Server) index(ctx context.Context, args *indexArgs) (state indexState, err error) { 603 tr := trace.New("index", args.Name) 604 tr.SetMaxEvents(30) // Ensure we capture all indexing events 605 606 defer func() { 607 if err != nil { 608 tr.SetError() 609 tr.LazyPrintf("error: %v", err) 610 state = indexStateFail 611 metricFailingTotal.Inc() 612 } 613 tr.LazyPrintf("state: %s", state) 614 tr.Finish() 615 }() 616 617 // Sourcegraph should always provide a tenant ID. 618 if args.TenantID < 1 { 619 return indexStateFail, tenant.ErrMissingTenant 620 } 621 622 tr.LazyPrintf("branches: %v", args.Branches) 623 624 if len(args.Branches) == 0 { 625 return indexStateEmpty, createEmptyShard(args) 626 } 627 628 repositoryName := args.Name 629 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok { 630 tr.LazyPrintf("marking this repository for delta build") 631 args.UseDelta = true 632 } 633 634 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold 635 636 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok { 637 tr.LazyPrintf("skipping symbols calculation") 638 args.Symbols = false 639 } 640 641 reason := "forced" 642 643 if args.Incremental { 644 bo := args.BuildOptions() 645 bo.SetDefaults() 646 incrementalState, fn := bo.IndexState() 647 reason = string(incrementalState) 648 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc() 649 650 switch incrementalState { 651 case index.IndexStateEqual: 652 debugLog.Printf("%s index already up to date. Shard=%s", args.String(), fn) 653 return indexStateNoop, nil 654 655 case index.IndexStateMeta: 656 infoLog.Printf("updating index.meta %s", args.String()) 657 658 // TODO(stefan) handle mergeMeta for tenant id. 659 if err := mergeMeta(bo); err != nil { 660 errorLog.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err) 661 } else { 662 return indexStateSuccessMeta, nil 663 } 664 665 case index.IndexStateCorrupt: 666 infoLog.Printf("falling back to full update: corrupt index: %s", args.String()) 667 } 668 } 669 670 infoLog.Printf("updating index %s reason=%s", args.String(), reason) 671 672 metricIndexingTotal.Inc() 673 c := gitIndexConfig{ 674 runCmd: func(cmd *exec.Cmd) error { 675 return s.loggedRun(tr, cmd) 676 }, 677 678 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) { 679 return args.BuildOptions().FindRepositoryMetadata() 680 }, 681 timeout: s.timeout, 682 } 683 684 logIndexStatusUpdateError := func(err error) { 685 s.logger.Error("failed to update index status", 686 sglog.String("repo", args.Name), 687 sglog.Uint32("id", args.RepoID), 688 sglogBranches("branches", args.Branches), 689 sglog.Error(err), 690 ) 691 } 692 693 err = gitIndex(ctx, c, args, s.Sourcegraph, s.logger) 694 if err != nil { 695 if statusErr := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph, err); statusErr != nil { 696 logIndexStatusUpdateError(statusErr) 697 } 698 return indexStateFail, err 699 } 700 701 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph, nil); err != nil { 702 logIndexStatusUpdateError(err) 703 } 704 705 return indexStateSuccess, nil 706} 707 708// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so 709// it can update the zoekt_repos table. 710func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph, indexErr error) error { 711 state := configv1.UpdateIndexStatusRequest_Repository_STATE_SUCCESS 712 var failureMessage string 713 var indexTimeUnix int64 714 if indexErr != nil { 715 state = configv1.UpdateIndexStatusRequest_Repository_STATE_FAILURE 716 failureMessage = truncateFailureMessageForSourcegraph(indexErr.Error()) 717 718 // On failure, metadata may not exist yet. Include index time if we can, 719 // but do not block reporting the failure status. 720 if _, metadata, ok, err := c.findRepositoryMetadata(args); err == nil && ok { 721 indexTimeUnix = metadata.IndexTime.Unix() 722 } 723 } else { 724 // We need to read from disk for IndexTime. 725 _, metadata, ok, err := c.findRepositoryMetadata(args) 726 if err != nil { 727 return fmt.Errorf("failed to read metadata for new/updated index: %w", err) 728 } 729 if !ok { 730 return errors.New("failed to find metadata for new/updated index") 731 } 732 indexTimeUnix = metadata.IndexTime.Unix() 733 } 734 735 status := []indexStatus{{ 736 RepoID: args.RepoID, 737 Branches: args.Branches, 738 IndexTimeUnix: indexTimeUnix, 739 State: state, 740 FailureMessage: failureMessage, 741 }} 742 743 if err := sg.UpdateIndexStatus(status); err != nil { 744 return fmt.Errorf("failed to update sourcegraph with status: %w", err) 745 } 746 747 return nil 748} 749 750func truncateFailureMessageForSourcegraph(s string) string { 751 if len(s) <= maxFailureMessageBytes { 752 return s 753 } 754 755 budget := maxFailureMessageBytes - len(failureMessageTruncationMarker) 756 headBytes := budget / 2 757 tailBytes := budget - headBytes 758 759 return utf8PrefixBytes(s, headBytes) + failureMessageTruncationMarker + utf8SuffixBytes(s, tailBytes) 760} 761 762func utf8PrefixBytes(s string, maxBytes int) string { 763 if len(s) <= maxBytes { 764 return s 765 } 766 767 if maxBytes <= 0 { 768 return "" 769 } 770 771 for maxBytes > 0 && !utf8.RuneStart(s[maxBytes]) { 772 maxBytes-- 773 } 774 775 return s[:maxBytes] 776} 777 778func utf8SuffixBytes(s string, maxBytes int) string { 779 if len(s) <= maxBytes { 780 return s 781 } 782 783 if maxBytes <= 0 { 784 return "" 785 } 786 787 start := len(s) - maxBytes 788 for start < len(s) && !utf8.RuneStart(s[start]) { 789 start++ 790 } 791 792 return s[start:] 793} 794 795func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field { 796 ss := make([]string, len(branches)) 797 for i, b := range branches { 798 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version) 799 } 800 return sglog.Strings(key, ss) 801} 802 803func (s *Server) indexArgs(opts IndexOptions) *indexArgs { 804 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0)) 805 return &indexArgs{ 806 IndexOptions: opts, 807 IndexDir: s.IndexDir, 808 Parallelism: parallelism, 809 Incremental: true, 810 ShardMerging: s.shardMerging, 811 } 812} 813 814// parallelism consults both the server flags and index options to determine the number 815// of shards to index in parallel. If the CPUCount index option is provided, it always 816// overrides the server flag. 817func (s *Server) parallelism(opts IndexOptions, maxProcs int) int { 818 var parallelism int 819 if opts.ShardConcurrency > 0 { 820 parallelism = int(opts.ShardConcurrency) 821 } else { 822 parallelism = s.CPUCount 823 } 824 825 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs 826 if parallelism > 4*maxProcs { 827 parallelism = 4 * maxProcs 828 } 829 830 // If index concurrency is set, then divide the parallelism by the number of 831 // repos we're indexing in parallel 832 if s.IndexConcurrency > 1 { 833 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency))) 834 } 835 836 return parallelism 837} 838 839func createEmptyShard(args *indexArgs) error { 840 bo := args.BuildOptions() 841 bo.SetDefaults() 842 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 843 844 if args.Incremental && bo.IncrementalSkipIndexing() { 845 return nil 846 } 847 848 builder, err := index.NewBuilder(*bo) 849 if err != nil { 850 return err 851 } 852 return builder.Finish() 853} 854 855// addDebugHandlers adds handlers specific to indexserver. 856func (s *Server) addDebugHandlers(mux *http.ServeMux) { 857 // Sourcegraph's site admin view requires indexserver to serve it's admin view 858 // on "/". 859 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 860 861 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 862 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 863 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 864 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 865 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 866 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 867} 868 869func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 870 if r.Method != "GET" { 871 w.Header().Set("Allow", "GET") 872 w.WriteHeader(http.StatusMethodNotAllowed) 873 return 874 } 875 876 response := struct { 877 Hostname string 878 }{ 879 Hostname: s.hostname, 880 } 881 882 b, err := json.Marshal(response) 883 if err != nil { 884 http.Error(w, err.Error(), http.StatusInternalServerError) 885 return 886 } 887 888 w.Header().Set("Content-Type", "application/json; charset=utf-8") 889 w.Write(b) 890} 891 892var rootTmpl = template.Must(template.New("name").Parse(` 893<html> 894 <body> 895 <a href="debug">Debug</a><br /> 896 <a href="debug/requests">Traces</a><br /> 897 {{.IndexMsg}}<br /> 898 <br /> 899 <h3>Reindex</h3> 900 {{if .Repos}} 901 <a href="?show_repos=false">hide repos</a><br /> 902 <table style="margin-top: 20px"> 903 <th style="text-align:left">Name</th> 904 <th style="text-align:left">ID (click to reindex)</th> 905 {{range .Repos}} 906 <tr> 907 <td>{{.Name}}</td> 908 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 909 </tr> 910 {{end}} 911 </table> 912 {{else}} 913 <a href="?show_repos=true">show repos</a><br /> 914 {{end}} 915 </body> 916</html> 917`)) 918 919func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 920 if r.Method != "GET" { 921 w.Header().Set("Allow", "GET") 922 w.WriteHeader(http.StatusMethodNotAllowed) 923 return 924 } 925 926 values := r.URL.Query() 927 928 // ?id= 929 indexMsg := "" 930 if v := values.Get("id"); v != "" { 931 id, err := strconv.Atoi(v) 932 if err != nil { 933 http.Error(w, err.Error(), http.StatusBadRequest) 934 return 935 } 936 indexMsg, _ = s.forceIndex(r.Context(), uint32(id)) 937 } 938 939 // ?show_repos= 940 showRepos := false 941 if v := values.Get("show_repos"); v != "" { 942 showRepos, _ = strconv.ParseBool(v) 943 } 944 945 type Repo struct { 946 ID uint32 947 Name string 948 } 949 var data struct { 950 Repos []Repo 951 IndexMsg string 952 } 953 954 data.IndexMsg = indexMsg 955 956 if showRepos { 957 s.queue.Iterate(func(opts *IndexOptions) { 958 data.Repos = append(data.Repos, Repo{ 959 ID: opts.RepoID, 960 Name: opts.Name, 961 }) 962 }) 963 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 964 } 965 966 _ = rootTmpl.Execute(w, data) 967} 968 969// handleReindex triggers a reindex asynocronously. If a reindex was triggered 970// the request returns with status 202. The caller can infer the new state of 971// the index by calling List. 972func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 973 if r.Method != http.MethodPost { 974 w.Header().Set("Allow", http.MethodPost) 975 w.WriteHeader(http.StatusMethodNotAllowed) 976 return 977 } 978 979 err := r.ParseForm() 980 if err != nil { 981 http.Error(w, err.Error(), http.StatusBadRequest) 982 return 983 } 984 985 id, err := strconv.Atoi(r.Form.Get("repo")) 986 if err != nil { 987 http.Error(w, err.Error(), http.StatusBadRequest) 988 return 989 } 990 991 go func() { s.forceIndex(context.Background(), uint32(id)) }() 992 993 // 202 Accepted 994 w.WriteHeader(http.StatusAccepted) 995} 996 997func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 998 withIndexed := true 999 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 1000 withIndexed = b 1001 } 1002 1003 var indexed []uint32 1004 if withIndexed { 1005 indexed = listIndexed(s.IndexDir) 1006 } 1007 1008 repos, err := s.Sourcegraph.List(r.Context(), indexed) 1009 if err != nil { 1010 http.Error(w, err.Error(), http.StatusInternalServerError) 1011 return 1012 } 1013 1014 bw := bytes.Buffer{} 1015 1016 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 1017 1018 _, err = fmt.Fprintf(tw, "ID\tName\n") 1019 if err != nil { 1020 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 1021 return 1022 } 1023 1024 s.queue.mu.Lock() 1025 name := "" 1026 for _, id := range repos.IDs { 1027 if item := s.queue.get(id); item != nil { 1028 name = item.opts.Name 1029 } else { 1030 name = "" 1031 } 1032 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 1033 if err != nil { 1034 debugLog.Printf("handleDebugList: %s\n", err.Error()) 1035 } 1036 } 1037 s.queue.mu.Unlock() 1038 1039 if err != nil { 1040 http.Error(w, err.Error(), http.StatusInternalServerError) 1041 return 1042 } 1043 1044 err = tw.Flush() 1045 if err != nil { 1046 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 1047 return 1048 } 1049 1050 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 1051 1052 if _, err := io.Copy(w, &bw); err != nil { 1053 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 1054 return 1055 } 1056} 1057 1058// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 1059// can run this command during periods of low usage (evenings, weekends) to 1060// trigger an initial merge run. In the steady-state, merges happen rarely, even 1061// on busy instances, and users can rely on automatic merging instead. 1062func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 1063 // A merge operation can take very long, depending on the number merges and the 1064 // target size of the compound shards. We run the merge in the background and 1065 // return immediately to the user. 1066 // 1067 // We track the status of the merge with metricShardMergingRunning. 1068 go func() { 1069 s.doMerge() 1070 }() 1071 _, _ = w.Write([]byte("merging enqueued\n")) 1072} 1073 1074func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 1075 indexed := listIndexed(s.IndexDir) 1076 1077 bw := bytes.Buffer{} 1078 1079 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 1080 1081 _, err := fmt.Fprintf(tw, "ID\tName\n") 1082 if err != nil { 1083 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 1084 return 1085 } 1086 1087 s.queue.mu.Lock() 1088 name := "" 1089 for _, id := range indexed { 1090 if item := s.queue.get(id); item != nil { 1091 name = item.opts.Name 1092 } else { 1093 name = "" 1094 } 1095 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 1096 if err != nil { 1097 debugLog.Printf("handleDebugIndexed: %s\n", err.Error()) 1098 } 1099 } 1100 s.queue.mu.Unlock() 1101 1102 if err != nil { 1103 http.Error(w, err.Error(), http.StatusInternalServerError) 1104 return 1105 } 1106 1107 err = tw.Flush() 1108 if err != nil { 1109 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 1110 return 1111 } 1112 1113 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 1114 1115 if _, err := io.Copy(w, &bw); err != nil { 1116 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 1117 return 1118 } 1119} 1120 1121// forceIndex will run the index job for repo name now. It will return always 1122// return a string explaining what it did, even if it failed. 1123func (s *Server) forceIndex(ctx context.Context, id uint32) (string, error) { 1124 var opts IndexOptions 1125 var err error 1126 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 1127 opts = o 1128 }, func(_ uint32, e error) { 1129 err = e 1130 }, id) 1131 if err != nil { 1132 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 1133 } 1134 1135 args := s.indexArgs(opts) 1136 args.Incremental = false // force re-index 1137 1138 var state indexState 1139 ran := s.muIndexDir.With(opts.Name, func() { 1140 state, err = s.index(ctx, args) 1141 }) 1142 if !ran { 1143 return fmt.Sprintf("index job for repository already running: %s", args), nil 1144 } 1145 if err != nil { 1146 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 1147 } 1148 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 1149} 1150 1151// DeleteAllData deletes all shards in the index and trash dir belonging to the 1152// tenant associated with the request. The deletion is best-effort, which means 1153// we will delete as much as possible. If no error is returned, the caller can 1154// be certain that all data has been deleted. 1155func (s *Server) DeleteAllData(ctx context.Context, _ *indexserverv1.DeleteAllDataRequest) (*indexserverv1.DeleteAllDataResponse, error) { 1156 tnt, err := tenant.FromContext(ctx) 1157 if err != nil { 1158 return nil, err 1159 } 1160 s.logger.Warn("DeleteAllData", sglog.Int("tenant_id", tnt.ID())) 1161 1162 var merr error 1163 s.muIndexDir.Global(func() { 1164 // First, explode all compound shards that have repos from the tenant in 1165 // question. Because we hold the global lock, we can be sure that no new 1166 // merges start while we do this. 1167 if err := s.explodeTenantCompoundShards(ctx, func(path string) error { 1168 // We call explode in a separate process to protect indexserver. 1169 cmd := defaultExplodeCmd(path) 1170 1171 stdoutBuf := &bytes.Buffer{} 1172 stderrBuf := &bytes.Buffer{} 1173 cmd.Stdout = stdoutBuf 1174 cmd.Stderr = stderrBuf 1175 1176 err := cmd.Run() 1177 if err != nil { 1178 errorLog.Printf("explode failed: %v (stderr: %s)", err, stderrBuf.String()) 1179 return err 1180 } 1181 1182 infoLog.Printf("exploded shard: %s", stdoutBuf.String()) 1183 1184 return nil 1185 }); err != nil { 1186 merr = multierr.Append(merr, err) 1187 } 1188 1189 // Invariant: all shards from the tenant are simple shards. 1190 1191 if err := purgeTenantShards(ctx, s.IndexDir); err != nil { 1192 merr = multierr.Append(merr, err) 1193 } 1194 if err := purgeTenantShards(ctx, filepath.Join(s.IndexDir, ".trash")); err != nil { 1195 merr = multierr.Append(merr, err) 1196 } 1197 }) 1198 1199 return &indexserverv1.DeleteAllDataResponse{}, merr 1200} 1201 1202func listIndexed(indexDir string) []uint32 { 1203 index := getShards(indexDir) 1204 metricNumIndexed.Set(float64(len(index))) 1205 repoIDs := make([]uint32, 0, len(index)) 1206 for id := range index { 1207 repoIDs = append(repoIDs, id) 1208 } 1209 slices.Sort(repoIDs) 1210 return repoIDs 1211} 1212 1213// setupTmpDir sets up a temporary directory on the same volume as the 1214// indexes. 1215// 1216// If main is true we will delete older temp directories left around. main is 1217// false when this is a debug command. 1218func setupTmpDir(logger sglog.Logger, main bool, index string) error { 1219 // change the target tmp directory depending on if it's our main daemon or a 1220 // debug sub command. 1221 dir := ".indexserver.debug.tmp" 1222 if main { 1223 dir = ".indexserver.tmp" 1224 } 1225 1226 tmpRoot := filepath.Join(index, dir) 1227 1228 if main { 1229 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot)) 1230 err := os.RemoveAll(tmpRoot) 1231 if err != nil { 1232 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err)) 1233 } 1234 } 1235 1236 if err := os.MkdirAll(tmpRoot, 0o755); err != nil { 1237 return err 1238 } 1239 1240 return os.Setenv("TMPDIR", tmpRoot) 1241} 1242 1243func printMetaData(fn string) error { 1244 repo, indexMeta, err := index.ReadMetadataPath(fn) 1245 if err != nil { 1246 return err 1247 } 1248 1249 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 1250 if err != nil { 1251 return err 1252 } 1253 1254 err = json.NewEncoder(os.Stdout).Encode(repo) 1255 if err != nil { 1256 return err 1257 } 1258 return nil 1259} 1260 1261func printShardStats(fn string) error { 1262 f, err := os.Open(fn) 1263 if err != nil { 1264 return err 1265 } 1266 1267 iFile, err := index.NewIndexFile(f) 1268 if err != nil { 1269 return err 1270 } 1271 1272 return index.PrintNgramStats(iFile) 1273} 1274 1275func srcLogLevelIsDebug() bool { 1276 lvl := os.Getenv(sglog.EnvLogLevel) 1277 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1278} 1279 1280func getEnvWithDefaultBool(k string, defaultVal bool) bool { 1281 v := os.Getenv(k) 1282 if v == "" { 1283 return defaultVal 1284 } 1285 b, err := strconv.ParseBool(v) 1286 if err != nil { 1287 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1288 } 1289 return b 1290} 1291 1292func getEnvWithDefaultInt64(k string, defaultVal int64) int64 { 1293 v := os.Getenv(k) 1294 if v == "" { 1295 return defaultVal 1296 } 1297 i, err := strconv.ParseInt(v, 10, 64) 1298 if err != nil { 1299 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1300 } 1301 return i 1302} 1303 1304func getEnvWithDefaultInt(k string, defaultVal int) int { 1305 v := os.Getenv(k) 1306 if v == "" { 1307 return defaultVal 1308 } 1309 i, err := strconv.Atoi(k) 1310 if err != nil { 1311 log.Fatalf("error parsing ENV %s to int: %s", k, err) 1312 } 1313 return i 1314} 1315 1316func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 { 1317 v := os.Getenv(k) 1318 if v == "" { 1319 return defaultVal 1320 } 1321 i, err := strconv.ParseUint(v, 10, 64) 1322 if err != nil { 1323 log.Fatalf("error parsing ENV %s to uint64: %s", k, err) 1324 } 1325 return i 1326} 1327 1328func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1329 v := os.Getenv(k) 1330 if v == "" { 1331 return defaultVal 1332 } 1333 f, err := strconv.ParseFloat(v, 64) 1334 if err != nil { 1335 log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1336 } 1337 return f 1338} 1339 1340func getEnvWithDefaultString(k string, defaultVal string) string { 1341 v := os.Getenv(k) 1342 if v == "" { 1343 return defaultVal 1344 } 1345 return v 1346} 1347 1348func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration { 1349 v := os.Getenv(k) 1350 if v == "" { 1351 return defaultVal 1352 } 1353 1354 d, err := time.ParseDuration(v) 1355 if err != nil { 1356 log.Fatalf("error parsing ENV %s to duration: %s", k, err) 1357 } 1358 return d 1359} 1360 1361func getEnvWithDefaultEmptySet(k string) map[string]struct{} { 1362 set := map[string]struct{}{} 1363 for _, v := range strings.Split(os.Getenv(k), ",") { 1364 v = strings.TrimSpace(v) 1365 if v != "" { 1366 set[v] = struct{}{} 1367 } 1368 } 1369 return set 1370} 1371 1372func joinStringSet(set map[string]struct{}, sep string) string { 1373 var xs []string 1374 for x := range set { 1375 xs = append(xs, x) 1376 } 1377 1378 return strings.Join(xs, sep) 1379} 1380 1381func setShardsCounter(indexDir string) { 1382 fns, err := filepath.Glob(filepath.Join(indexDir, "*.zoekt")) 1383 if err != nil { 1384 errorLog.Printf("setShardsCounter: %s\n", err) 1385 return 1386 } 1387 metricNumberShards.Set(float64(len(fns))) 1388 1389 compoundFns := make([]string, 0, len(fns)) 1390 for _, fn := range fns { 1391 if strings.HasPrefix(filepath.Base(fn), "compound-") { 1392 compoundFns = append(compoundFns, fn) 1393 } 1394 } 1395 metricNumberCompoundShards.Set(float64(len(compoundFns))) 1396} 1397 1398func rootCmd() *ffcli.Command { 1399 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1400 conf := rootConfig{ 1401 Main: true, 1402 } 1403 conf.registerRootFlags(rootFs) 1404 1405 return &ffcli.Command{ 1406 FlagSet: rootFs, 1407 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1408 Subcommands: []*ffcli.Command{debugCmd()}, 1409 Exec: func(ctx context.Context, args []string) error { 1410 return startServer(conf) 1411 }, 1412 } 1413} 1414 1415type rootConfig struct { 1416 // Main is true if this rootConfig is for our main long running command (the 1417 // indexserver). Debug commands should not set this value. This is used to 1418 // determine if we need to run tmpfriend. 1419 Main bool 1420 1421 root string 1422 interval time.Duration 1423 index string 1424 indexConcurrency int64 1425 listen string 1426 hostname string 1427 cpuFraction float64 1428 1429 // config values related to shard merging 1430 disableShardMerging bool 1431 vacuumInterval time.Duration 1432 mergeInterval time.Duration 1433 targetSize int64 1434 minSize int64 1435 minAgeDays int 1436 1437 // config values related to backoff indexing repos with one or more consecutive failures 1438 backoffDuration time.Duration 1439 maxBackoffDuration time.Duration 1440} 1441 1442func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1443 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1444 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1445 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently") 1446 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", index.DefaultDir), "set index directory to use") 1447 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") 1448 fs.StringVar(&rc.hostname, "hostname", index.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") 1449 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1450 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.") 1451 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.") 1452 1453 // flags related to shard merging 1454 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging") 1455 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1456 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1457 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 1000), "the target size of compound shards in MiB") 1458 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 800), "the minimum size of a compound shard in MiB") 1459 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.") 1460} 1461 1462func startServer(conf rootConfig) error { 1463 s, err := newServer(conf) 1464 if err != nil { 1465 return err 1466 } 1467 1468 profiler.Init("zoekt-sourcegraph-indexserver") 1469 setShardsCounter(s.IndexDir) 1470 1471 if conf.listen != "" { 1472 1473 mux := http.NewServeMux() 1474 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1475 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1476 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1477 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"}, 1478 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1479 }...) 1480 s.addDebugHandlers(mux) 1481 1482 go func() { 1483 debugLog.Printf("serving HTTP on %s", conf.listen) 1484 mux := grpcutil.MultiplexGRPC(newGRPCServer(sglog.Scoped("indexserver"), s), mux) 1485 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1486 }() 1487 1488 // Serve mux on a unix domain socket on a best-effort-basis so that 1489 // webserver can call the endpoints via the shared filesystem. 1490 // 1491 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1492 // on the socket due to permission errors. See 1493 // https://github.com/docker/for-mac/issues/6239 1494 go func() { 1495 serveHTTPOverSocket := func() error { 1496 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1497 // We cannot bind a socket to an existing pathname. 1498 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1499 return fmt.Errorf("error removing socket file: %s", socket) 1500 } 1501 // The "unix" network corresponds to stream sockets. (cf. unixgram, 1502 // unixpacket). 1503 l, err := net.Listen("unix", socket) 1504 if err != nil { 1505 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1506 } 1507 // Indexserver (root) and webserver (Sourcegraph) run with 1508 // different users. Per default, the socket is created with 1509 // permission 755 (root root), which doesn't let webserver write to 1510 // it. 1511 // 1512 // See https://github.com/golang/go/issues/11822 for more context. 1513 if err := os.Chmod(socket, 0o777); err != nil { 1514 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1515 } 1516 debugLog.Printf("serving HTTP on %s", socket) 1517 return http.Serve(l, mux) 1518 } 1519 debugLog.Print(serveHTTPOverSocket()) 1520 }() 1521 } 1522 1523 oc := &ownerChecker{ 1524 Path: filepath.Join(conf.index, "owner.txt"), 1525 Hostname: conf.hostname, 1526 } 1527 go oc.Run() 1528 1529 logger := sglog.Scoped("metricsRegistration") 1530 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1531 1532 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1533 prometheus.DefaultRegisterer.MustRegister(c) 1534 1535 s.Run() 1536 return nil 1537} 1538 1539func newServer(conf rootConfig) (*Server, error) { 1540 logger := sglog.Scoped("server") 1541 1542 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1543 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1544 } 1545 if conf.index == "" { 1546 return nil, fmt.Errorf("must set -index") 1547 } 1548 if conf.root == "" { 1549 return nil, fmt.Errorf("must set -sourcegraph_url") 1550 } 1551 rootURL, err := url.Parse(conf.root) 1552 if err != nil { 1553 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1554 } 1555 1556 rootURL = addDefaultPort(rootURL) 1557 1558 // Tune GOMAXPROCS to match Linux container CPU quota. 1559 _, _ = maxprocs.Set() 1560 1561 // Automatically prepend our own path at the front, to minimize 1562 // required configuration. 1563 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1564 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1565 } 1566 1567 if _, err := os.Stat(conf.index); err != nil { 1568 if err := os.MkdirAll(conf.index, 0o755); err != nil { 1569 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1570 } 1571 } 1572 1573 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil { 1574 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1575 } 1576 1577 if srcLogLevelIsDebug() { 1578 debugLog.SetOutput(os.Stderr) 1579 } 1580 1581 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1582 if len(reposWithSeparateIndexingMetrics) > 0 { 1583 debugLog.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1584 } 1585 1586 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1587 if len(deltaBuildRepositoriesAllowList) > 0 { 1588 debugLog.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1589 } 1590 1591 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1592 if deltaShardNumberFallbackThreshold > 0 { 1593 debugLog.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1594 } else { 1595 debugLog.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1596 } 1597 1598 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1599 if len(reposShouldSkipSymbolsCalculation) > 0 { 1600 debugLog.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1601 } 1602 1603 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout) 1604 if indexingTimeout != defaultIndexingTimeout { 1605 debugLog.Printf("using configured indexing timeout: %s", indexingTimeout) 1606 } 1607 1608 var sg Sourcegraph 1609 if rootURL.IsAbs() { 1610 var batchSize int 1611 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { 1612 batchSize, err = strconv.Atoi(v) 1613 if err != nil { 1614 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1615 } 1616 } 1617 1618 opts := []SourcegraphClientOption{ 1619 WithBatchSize(batchSize), 1620 } 1621 1622 logger := sglog.Scoped("zoektConfigurationGRPCClient") 1623 client, err := dialGRPCClient(rootURL.Host, logger) 1624 if err != nil { 1625 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1626 } 1627 1628 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...) 1629 1630 } else { 1631 sg = sourcegraphFake{ 1632 RootDir: rootURL.String(), 1633 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1634 } 1635 } 1636 1637 cpuCount := max(int(math.Round(float64(runtime.GOMAXPROCS(0))*(conf.cpuFraction))), 1) 1638 1639 if conf.indexConcurrency < 1 { 1640 conf.indexConcurrency = 1 1641 } else if conf.indexConcurrency > int64(cpuCount) { 1642 conf.indexConcurrency = int64(cpuCount) 1643 } 1644 1645 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1646 1647 return &Server{ 1648 logger: logger, 1649 rootURL: rootURL, 1650 Sourcegraph: sg, 1651 IndexDir: conf.index, 1652 IndexConcurrency: int(conf.indexConcurrency), 1653 Interval: conf.interval, 1654 CPUCount: cpuCount, 1655 queue: *q, 1656 shardMerging: !conf.disableShardMerging, 1657 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1658 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1659 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1660 hostname: conf.hostname, 1661 mergeOpts: mergeOpts{ 1662 vacuumInterval: conf.vacuumInterval, 1663 mergeInterval: conf.mergeInterval, 1664 targetSizeBytes: conf.targetSize * 1024 * 1024, 1665 minSizeBytes: conf.minSize * 1024 * 1024, 1666 minAgeDays: conf.minAgeDays, 1667 }, 1668 timeout: indexingTimeout, 1669 indexSemaphore: make(chan struct{}, int(conf.indexConcurrency)), 1670 }, err 1671} 1672 1673func newGRPCServer(logger sglog.Logger, s *Server, additionalOpts ...grpc.ServerOption) *grpc.Server { 1674 grpcServer := defaults.NewServer(logger, additionalOpts...) 1675 indexserverv1.RegisterSourcegraphIndexserverServiceServer(grpcServer, s) 1676 return grpcServer 1677} 1678 1679// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration 1680// for the indexed-search-configuration gRPC service. 1681// 1682// The default backoff strategy is modeled after the default settings used by 1683// retryablehttp.DefaultClient. 1684// 1685// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1686// - Unavailable 1687// - Aborted 1688// 1689//go:embed default_grpc_service_configuration.json 1690var defaultGRPCServiceConfigurationJSON string 1691 1692func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1693 return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1694 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1695 return invoker(ctx, method, req, reply, cc, opts...) 1696 } 1697} 1698 1699func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1700 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1701 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1702 return streamer(ctx, desc, cc, method, opts...) 1703 } 1704} 1705 1706// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPCs servers and clients are allowed to process. 1707// This can be overridden by providing custom Server/Dial options. 1708const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB 1709 1710func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (configv1.ZoektConfigurationServiceClient, error) { 1711 metrics := clientMetricsOnce() 1712 1713 // If the service seems to be unavailable, this 1714 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1 1715 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s]) 1716 retryOpts := []retry.CallOption{ 1717 retry.WithMax(5), 1718 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)), 1719 retry.WithCodes(codes.Unavailable), 1720 } 1721 1722 opts := []grpc.DialOption{ 1723 grpc.WithTransportCredentials(insecure.NewCredentials()), 1724 grpc.WithChainStreamInterceptor( 1725 metrics.StreamClientInterceptor(), 1726 messagesize.StreamClientInterceptor, 1727 internalActorStreamInterceptor(), 1728 internalerrs.LoggingStreamClientInterceptor(logger), 1729 internalerrs.PrometheusStreamClientInterceptor, 1730 retry.StreamClientInterceptor(retryOpts...), 1731 ), 1732 grpc.WithChainUnaryInterceptor( 1733 metrics.UnaryClientInterceptor(), 1734 messagesize.UnaryClientInterceptor, 1735 internalActorUnaryInterceptor(), 1736 internalerrs.LoggingUnaryClientInterceptor(logger), 1737 internalerrs.PrometheusUnaryClientInterceptor, 1738 retry.UnaryClientInterceptor(retryOpts...), 1739 ), 1740 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON), 1741 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)), 1742 } 1743 1744 opts = append(opts, additionalOpts...) 1745 1746 // Ensure that the message size options are set last, so they override any other 1747 // client-specific options that tweak the message size. 1748 // 1749 // The message size options are only provided if the environment variable is set. These options serve as an escape hatch, so they 1750 // take precedence over everything else with a uniform size setting that's easy to reason about. 1751 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...) 1752 1753 // This dialer is used to connect via gRPC to the Sourcegraph instance. 1754 // This is done lazily, so we can provide the client to use regardless of 1755 // whether we enabled gRPC or not initially. 1756 cc, err := grpc.Dial(addr, opts...) 1757 if err != nil { 1758 return nil, fmt.Errorf("dialing %q: %w", addr, err) 1759 } 1760 1761 client := configv1.NewZoektConfigurationServiceClient(cc) 1762 return client, nil 1763} 1764 1765// addDefaultPort adds a default port to a URL if one is not specified. 1766// 1767// If the URL scheme is "http" and no port is specified, "80" is used. 1768// If the scheme is "https", "443" is used. 1769// 1770// The original URL is not mutated. A copy is modified and returned. 1771func addDefaultPort(original *url.URL) *url.URL { 1772 if original == nil { 1773 return nil // don't panic 1774 } 1775 1776 if !original.IsAbs() { 1777 return original // don't do anything if the URL doesn't look like a remote URL 1778 } 1779 1780 if original.Scheme == "http" && original.Port() == "" { 1781 u := cloneURL(original) 1782 u.Host = net.JoinHostPort(u.Host, "80") 1783 return u 1784 } 1785 1786 if original.Scheme == "https" && original.Port() == "" { 1787 u := cloneURL(original) 1788 u.Host = net.JoinHostPort(u.Host, "443") 1789 return u 1790 } 1791 1792 return original 1793} 1794 1795// cloneURL returns a copy of the URL. It is safe to mutate the returned URL. 1796// This is copied from net/http/clone.go 1797func cloneURL(u *url.URL) *url.URL { 1798 if u == nil { 1799 return nil 1800 } 1801 u2 := new(url.URL) 1802 *u2 = *u 1803 if u.User != nil { 1804 u2.User = new(url.Userinfo) 1805 *u2.User = *u.User 1806 } 1807 return u2 1808} 1809 1810func main() { 1811 liblog := sglog.Init(sglog.Resource{ 1812 Name: "zoekt-indexserver", 1813 Version: index.Version, 1814 InstanceID: index.HostnameBestEffort(), 1815 }) 1816 defer liblog.Sync() 1817 1818 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1819 log.Fatal(err) 1820 } 1821} 1822 1823// getBoolFromEnvironmentVariables returns the boolean defined by the first environment 1824// variable listed in envVarNames that is set in the current process environment, or the defaultBool if none are set. 1825// 1826// An error is returned of the provided environment variables fails to parse as a boolean. 1827func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) { 1828 for _, envVar := range envVarNames { 1829 v := os.Getenv(envVar) 1830 if v == "" { 1831 continue 1832 } 1833 1834 b, err := strconv.ParseBool(v) 1835 if err != nil { 1836 return false, fmt.Errorf("parsing environment variable %q to boolean: %v", envVar, err) 1837 } 1838 1839 return b, nil 1840 } 1841 1842 return defaultBool, nil 1843}