fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Command zoekt-sourcegraph-indexserver periodically reindexes repositories
// from a Sourcegraph instance. It uses a "pull-based" design, where it periodically
// reaches out to the Sourcegraph instance for the list of repositories to reindex.
package main

import (
	"bytes"
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"html/template"
	"io"
	"io/fs"
	"log"
	"math"
	"math/rand"
	"net"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"os/signal"
	"path"
	"path/filepath"
	"runtime"
	"slices"
	"sort"
	"strconv"
	"strings"
	"sync"
	"text/tabwriter"
	"time"

	grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
	"github.com/peterbourgon/ff/v3/ffcli"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	sglog "github.com/sourcegraph/log"
	"github.com/sourcegraph/mountinfo"

	"github.com/sourcegraph/zoekt"
	configv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/sourcegraph/zoekt/configuration/v1"
	indexserverv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/zoekt/indexserver/v1"
	"github.com/sourcegraph/zoekt/grpc/defaults"
	"github.com/sourcegraph/zoekt/grpc/grpcutil"
	"github.com/sourcegraph/zoekt/grpc/internalerrs"
	"github.com/sourcegraph/zoekt/grpc/messagesize"
	"github.com/sourcegraph/zoekt/index"
	"github.com/sourcegraph/zoekt/internal/debugserver"
	"github.com/sourcegraph/zoekt/internal/profiler"
	"github.com/sourcegraph/zoekt/internal/tenant"

	"go.uber.org/automaxprocs/maxprocs"
	"go.uber.org/multierr"
	"golang.org/x/net/trace"
	"golang.org/x/sys/unix"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)

// Prometheus metrics exported by the indexserver. Everything created through
// promauto is registered at package initialisation; the gRPC client metrics
// are registered lazily on first use of clientMetricsOnce.
var (
	// Time taken to resolve the revisions of all repositories.
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 100000s (~27.8h)
	})

	// Time taken to resolve a single repository revision.
	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	// Duration of a single index job, labelled by outcome and repository name.
	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	// Time between a job being queued and the job completing.
	metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_indexing_delay_seconds",
		Help:    "A histogram of durations from when an index job is added to the queue, to the time it completes.",
		Buckets: prometheus.ExponentialBuckets(60, 2, 14), // 1 minute -> ~5.7 days
	}, []string{
		"state", // state is an indexState
		"name",  // the name of the repository that was indexed
	})

	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is index.IndexState

	// NOTE(review): declared without labels, despite the "by code host"
	// wording in Help.
	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})

	// clientMetricsOnce returns a singleton instance of the client metrics
	// that are shared across all gRPC clients that this process creates.
	//
	// This function panics if the metrics cannot be registered with the default
	// Prometheus registry.
	clientMetricsOnce = sync.OnceValue(func() *grpcprom.ClientMetrics {
		clientMetrics := grpcprom.NewClientMetrics(
			grpcprom.WithClientCounterOptions(),
			grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
			grpcprom.WithClientStreamRecvHistogram(),   // record how long it takes for a client to receive a message during a streaming RPC
			grpcprom.WithClientStreamSendHistogram(),   // record how long it takes for a client to send a message during a streaming RPC
		)
		prometheus.DefaultRegisterer.MustRegister(clientMetrics)
		return clientMetrics
	})
)

// 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
// NOTE: if you change this, you must also update gitIndex to use the same value when fetching the repo.
176const MaxFileSize = 1 << 20 177 178// set of repositories that we want to capture separate indexing metrics for 179var reposWithSeparateIndexingMetrics = make(map[string]struct{}) 180 181type indexState string 182 183const ( 184 indexStateFail indexState = "fail" 185 indexStateSuccess indexState = "success" 186 indexStateSuccessMeta indexState = "success_meta" // We only updated metadata 187 indexStateNoop indexState = "noop" // We didn't need to update index 188 indexStateEmpty indexState = "empty" // index is empty (empty repo) 189) 190 191// Server is the main functionality of zoekt-sourcegraph-indexserver. It 192// exists to conveniently use all the options passed in via func main. 193type Server struct { 194 logger sglog.Logger 195 196 Sourcegraph Sourcegraph 197 198 // rootURL is the root URL of the Sourcegraph instance. 199 rootURL *url.URL 200 201 BatchSize int 202 203 // IndexDir is the index directory to use. 204 IndexDir string 205 206 // IndexConcurrency is the number of repositories we index at once. 207 IndexConcurrency int 208 209 // indexSemaphore limits the number of concurrent index operations 210 indexSemaphore chan struct{} 211 212 // Interval is how often we sync with Sourcegraph. 213 Interval time.Duration 214 215 // CPUCount is the number of CPUs to use for indexing shards. 216 CPUCount int 217 218 queue Queue 219 220 // muIndexDir protects the index directory from concurrent access. 221 muIndexDir indexMutex 222 223 // If true, shard merging is enabled. 224 shardMerging bool 225 226 // deltaBuildRepositoriesAllowList is an allowlist for repositories that we 227 // use delta-builds for instead of normal builds 228 deltaBuildRepositoriesAllowList map[string]struct{} 229 230 // deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist 231 // before attempting a delta build. 
232 deltaShardNumberFallbackThreshold uint64 233 234 // repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that 235 // we skip calculating symbols metadata for during builds 236 repositoriesSkipSymbolsCalculationAllowList map[string]struct{} 237 238 hostname string 239 240 mergeOpts mergeOpts 241 242 // timeout defines how long the index server waits before killing an indexing job. 243 timeout time.Duration 244 245 indexserverv1.UnimplementedSourcegraphIndexserverServiceServer 246} 247 248var ( 249 debugLog = log.New(io.Discard, "[DEBUG] ", log.LstdFlags) 250 infoLog = log.New(os.Stderr, "[INFO] ", log.LstdFlags) 251 errorLog = log.New(os.Stderr, "[ERROR] ", log.LstdFlags) 252) 253 254// our index commands should output something every 100mb they process. 255// 256// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough 257// time." famous last words. A client was indexing a monorepo with 42 258// cores... 5m was not enough. 259const noOutputTimeout = 30 * time.Minute 260 261func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 262 out := &synchronizedBuffer{} 263 cmd.Stdout = out 264 cmd.Stderr = out 265 266 tr.LazyPrintf("%s", cmd.Args) 267 268 defer func() { 269 if err != nil { 270 outS := out.String() 271 tr.LazyPrintf("failed: %v", err) 272 tr.LazyPrintf("output: %s", out) 273 tr.SetError() 274 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 275 } 276 }() 277 278 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 279 280 if err := cmd.Start(); err != nil { 281 return err 282 } 283 284 errC := make(chan error) 285 go func() { 286 errC <- cmd.Wait() 287 }() 288 289 // This channel is set after we have sent sigquit. It allows us to follow up 290 // with a sigkill if the process doesn't quit after sigquit. 
291 kill := make(<-chan time.Time) 292 293 lastLen := 0 294 for { 295 select { 296 case <-time.After(noOutputTimeout): 297 // Periodically check if we have had output. If not kill the process. 298 if out.Len() != lastLen { 299 lastLen = out.Len() 300 infoLog.Printf("still running %s", cmd.Args) 301 } else { 302 // Send quit (C-\) first so we get a stack dump. 303 infoLog.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 304 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 305 errorLog.Println("quit failed:", err) 306 } 307 308 // send sigkill if still running in 10s 309 kill = time.After(10 * time.Second) 310 } 311 312 case <-kill: 313 infoLog.Printf("still running, killing %s", cmd.Args) 314 if err := cmd.Process.Kill(); err != nil { 315 errorLog.Println("kill failed:", err) 316 } 317 318 case err := <-errC: 319 if err != nil { 320 return err 321 } 322 323 tr.LazyPrintf("success") 324 return nil 325 } 326 } 327} 328 329// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 330// monitor the buffer while it is being written to. 331type synchronizedBuffer struct { 332 mu sync.Mutex 333 b bytes.Buffer 334} 335 336func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 337 sb.mu.Lock() 338 defer sb.mu.Unlock() 339 return sb.b.Write(p) 340} 341 342func (sb *synchronizedBuffer) Len() int { 343 sb.mu.Lock() 344 defer sb.mu.Unlock() 345 return sb.b.Len() 346} 347 348func (sb *synchronizedBuffer) String() string { 349 sb.mu.Lock() 350 defer sb.mu.Unlock() 351 return sb.b.String() 352} 353 354// pauseFileName if present in IndexDir will stop index jobs from 355// running. This is to make it possible to experiment with the content of the 356// IndexDir without the indexserver writing to it. 357const ( 358 pauseFileName = "PAUSE" 359 pauseEnvVar = "SRC_PAUSE_INDEXING" 360) 361 362// isIndexingPaused checks if indexing should be paused based on either: 363// 1. The presence of a PAUSE file in the index directory 364// 2. 
The SRC_PAUSE_INDEXING environment variable being set to true 365func isIndexingPaused(indexDir string) (bool, string) { 366 // Check for PAUSE file first 367 if b, err := os.ReadFile(filepath.Join(indexDir, pauseFileName)); err == nil { 368 return true, fmt.Sprintf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 369 } 370 371 // Then check environment variable 372 if getEnvWithDefaultBool(pauseEnvVar, false) { 373 return true, fmt.Sprintf("indexserver paused via %s environment variable", pauseEnvVar) 374 } 375 376 return false, "" 377} 378 379// Run the sync loop. This blocks forever. 380func (s *Server) Run() { 381 removeIncompleteShards(s.IndexDir) 382 383 // Start a goroutine which updates the queue with commits to index. 384 go func() { 385 // We update the list of indexed repos every Interval. To speed up manual 386 // testing we also listen for SIGUSR1 to trigger updates. 387 // "pkill -SIGUSR1 zoekt-sourcegra" 388 for range jitterTicker(s.Interval, unix.SIGUSR1) { 389 if paused, msg := isIndexingPaused(s.IndexDir); paused { 390 infoLog.Printf(msg) 391 continue 392 } 393 394 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 395 if err != nil { 396 errorLog.Printf("error listing repos: %s", err) 397 continue 398 } 399 400 debugLog.Printf("updating index queue with %d repositories", len(repos.IDs)) 401 402 // Stop indexing repos we don't need to track anymore 403 removed := s.queue.MaybeRemoveMissing(repos.IDs) 404 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 405 if len(removed) > 0 { 406 infoLog.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 407 } 408 409 cleanupDone := make(chan struct{}) 410 go func() { 411 defer close(cleanupDone) 412 s.muIndexDir.Global(func() { 413 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 414 }) 415 }() 416 417 repos.IterateIndexOptions(s.queue.AddOrUpdate) 418 419 // IterateIndexOptions will only iterate 
over repositories that have 420 // changed since we last called list. However, we want to add all IDs 421 // back onto the queue just to check that what is on disk is still 422 // correct. This will use the last IndexOptions we stored in the 423 // queue. The repositories not on the queue (missing) need a forced 424 // fetch of IndexOptions. 425 missing := s.queue.Bump(repos.IDs) 426 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 427 428 setShardsCounter(s.IndexDir) 429 430 <-cleanupDone 431 } 432 }() 433 434 go func() { 435 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 436 if s.shardMerging { 437 s.vacuum() 438 } 439 } 440 }() 441 442 go func() { 443 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 444 if s.shardMerging { 445 s.doMerge() 446 } 447 } 448 }() 449 450 for range s.IndexConcurrency { 451 go s.processQueue() 452 } 453 454 // block forever 455 select {} 456} 457 458// formatList returns a comma-separated list of the first min(len(v), m) items. 459func formatListUint32(v []uint32, m int) string { 460 if len(v) < m { 461 m = len(v) 462 } 463 464 sb := strings.Builder{} 465 for i := range m { 466 fmt.Fprintf(&sb, "%d, ", v[i]) 467 } 468 469 if len(v) > m { 470 sb.WriteString("...") 471 } 472 473 return strings.TrimRight(sb.String(), ", ") 474} 475 476func (s *Server) processQueue() { 477 for { 478 if paused, _ := isIndexingPaused(s.IndexDir); paused { 479 time.Sleep(time.Second) 480 continue 481 } 482 483 item, ok := s.queue.Pop() 484 if !ok { 485 time.Sleep(time.Second) 486 continue 487 } 488 489 opts := item.Opts 490 args := s.indexArgs(opts) 491 492 ran := s.muIndexDir.With(opts.Name, func() { 493 // only record time taken once we hold the lock. This avoids us 494 // recording time taken while merging/cleanup runs. 
495 start := time.Now() 496 497 state, err := s.index(context.Background(), args) 498 499 elapsed := time.Since(start) 500 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 501 502 indexDelay := time.Since(item.DateAddedToQueue) 503 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds()) 504 505 if err != nil { 506 errorLog.Printf("error indexing %s: %s", args.String(), err) 507 } 508 509 switch state { 510 case indexStateSuccess: 511 var branches []string 512 for _, b := range args.Branches { 513 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 514 } 515 s.logger.Info("updated index", 516 sglog.Int("tenant", args.TenantID), 517 sglog.String("repo", args.Name), 518 sglog.Uint32("id", args.RepoID), 519 sglog.Strings("branches", branches), 520 sglog.Duration("duration", elapsed), 521 sglog.Duration("index_delay", indexDelay), 522 ) 523 case indexStateSuccessMeta: 524 infoLog.Printf("updated meta %s in %v", args.String(), elapsed) 525 } 526 s.queue.SetIndexed(opts, state) 527 }) 528 529 if !ran { 530 // Someone else is processing the repository. We can just skip this job 531 // since the repository will be added back to the queue and we will 532 // converge to the correct behaviour. 533 debugLog.Printf("index job for repository already running: %s", args) 534 continue 535 } 536 } 537} 538 539// repoNameForMetric returns a normalized version of the given repository name that is 540// suitable for use with Prometheus metrics. 541func repoNameForMetric(repo string) string { 542 // Check to see if we want to be able to capture separate indexing metrics for this repository. 543 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 
544 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 545 return repo 546 } 547 548 return "" 549} 550 551func batched(slice []uint32, size int) <-chan []uint32 { 552 c := make(chan []uint32) 553 go func() { 554 for len(slice) > 0 { 555 if size > len(slice) { 556 size = len(slice) 557 } 558 c <- slice[:size] 559 slice = slice[size:] 560 } 561 close(c) 562 }() 563 return c 564} 565 566// jitterTicker returns a ticker which ticks with a jitter. Each tick is 567// uniformly selected from the range (d/2, d + d/2). It will tick on creation. 568// 569// sig is a list of signals which also cause the ticker to fire. This is a 570// convenience to allow manually triggering of the ticker. 571func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} { 572 ticker := make(chan struct{}) 573 574 go func() { 575 for { 576 ticker <- struct{}{} 577 ns := int64(d) 578 jitter := rand.Int63n(ns) 579 time.Sleep(time.Duration(ns/2 + jitter)) 580 } 581 }() 582 583 go func() { 584 if len(sig) == 0 { 585 return 586 } 587 588 c := make(chan os.Signal, 1) 589 signal.Notify(c, sig...) 590 for range c { 591 ticker <- struct{}{} 592 } 593 }() 594 595 return ticker 596} 597 598// Index starts an index job for repo name at commit. 599func (s *Server) index(ctx context.Context, args *indexArgs) (state indexState, err error) { 600 tr := trace.New("index", args.Name) 601 tr.SetMaxEvents(30) // Ensure we capture all indexing events 602 603 defer func() { 604 if err != nil { 605 tr.SetError() 606 tr.LazyPrintf("error: %v", err) 607 state = indexStateFail 608 metricFailingTotal.Inc() 609 } 610 tr.LazyPrintf("state: %s", state) 611 tr.Finish() 612 }() 613 614 // Sourcegraph should always provide a tenant ID. 
615 if args.TenantID < 1 { 616 return indexStateFail, tenant.ErrMissingTenant 617 } 618 619 tr.LazyPrintf("branches: %v", args.Branches) 620 621 if len(args.Branches) == 0 { 622 return indexStateEmpty, createEmptyShard(args) 623 } 624 625 repositoryName := args.Name 626 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok { 627 tr.LazyPrintf("marking this repository for delta build") 628 args.UseDelta = true 629 } 630 631 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold 632 633 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok { 634 tr.LazyPrintf("skipping symbols calculation") 635 args.Symbols = false 636 } 637 638 reason := "forced" 639 640 if args.Incremental { 641 bo := args.BuildOptions() 642 bo.SetDefaults() 643 incrementalState, fn := bo.IndexState() 644 reason = string(incrementalState) 645 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc() 646 647 switch incrementalState { 648 case index.IndexStateEqual: 649 debugLog.Printf("%s index already up to date. Shard=%s", args.String(), fn) 650 return indexStateNoop, nil 651 652 case index.IndexStateMeta: 653 infoLog.Printf("updating index.meta %s", args.String()) 654 655 // TODO(stefan) handle mergeMeta for tenant id. 
656 if err := mergeMeta(bo); err != nil { 657 errorLog.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err) 658 } else { 659 return indexStateSuccessMeta, nil 660 } 661 662 case index.IndexStateCorrupt: 663 infoLog.Printf("falling back to full update: corrupt index: %s", args.String()) 664 } 665 } 666 667 infoLog.Printf("updating index %s reason=%s", args.String(), reason) 668 669 metricIndexingTotal.Inc() 670 c := gitIndexConfig{ 671 runCmd: func(cmd *exec.Cmd) error { 672 return s.loggedRun(tr, cmd) 673 }, 674 675 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) { 676 return args.BuildOptions().FindRepositoryMetadata() 677 }, 678 timeout: s.timeout, 679 } 680 681 err = gitIndex(ctx, c, args, s.Sourcegraph, s.logger) 682 if err != nil { 683 return indexStateFail, err 684 } 685 686 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil { 687 s.logger.Error("failed to update index status", 688 sglog.String("repo", args.Name), 689 sglog.Uint32("id", args.RepoID), 690 sglogBranches("branches", args.Branches), 691 sglog.Error(err), 692 ) 693 } 694 695 return indexStateSuccess, nil 696} 697 698// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so 699// it can update the zoekt_repos table. 700func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error { 701 // We need to read from disk for IndexTime. 
702 _, metadata, ok, err := c.findRepositoryMetadata(args) 703 if err != nil { 704 return fmt.Errorf("failed to read metadata for new/updated index: %w", err) 705 } 706 if !ok { 707 return errors.New("failed to find metadata for new/updated index") 708 } 709 710 status := []indexStatus{{ 711 RepoID: args.RepoID, 712 Branches: args.Branches, 713 IndexTimeUnix: metadata.IndexTime.Unix(), 714 }} 715 if err := sg.UpdateIndexStatus(status); err != nil { 716 return fmt.Errorf("failed to update sourcegraph with status: %w", err) 717 } 718 719 return nil 720} 721 722func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field { 723 ss := make([]string, len(branches)) 724 for i, b := range branches { 725 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version) 726 } 727 return sglog.Strings(key, ss) 728} 729 730func (s *Server) indexArgs(opts IndexOptions) *indexArgs { 731 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0)) 732 return &indexArgs{ 733 IndexOptions: opts, 734 IndexDir: s.IndexDir, 735 Parallelism: parallelism, 736 Incremental: true, 737 FileLimit: MaxFileSize, 738 ShardMerging: s.shardMerging, 739 } 740} 741 742// parallelism consults both the server flags and index options to determine the number 743// of shards to index in parallel. If the CPUCount index option is provided, it always 744// overrides the server flag. 
745func (s *Server) parallelism(opts IndexOptions, maxProcs int) int { 746 var parallelism int 747 if opts.ShardConcurrency > 0 { 748 parallelism = int(opts.ShardConcurrency) 749 } else { 750 parallelism = s.CPUCount 751 } 752 753 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs 754 if parallelism > 4*maxProcs { 755 parallelism = 4 * maxProcs 756 } 757 758 // If index concurrency is set, then divide the parallelism by the number of 759 // repos we're indexing in parallel 760 if s.IndexConcurrency > 1 { 761 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency))) 762 } 763 764 return parallelism 765} 766 767func createEmptyShard(args *indexArgs) error { 768 bo := args.BuildOptions() 769 bo.SetDefaults() 770 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 771 772 if args.Incremental && bo.IncrementalSkipIndexing() { 773 return nil 774 } 775 776 builder, err := index.NewBuilder(*bo) 777 if err != nil { 778 return err 779 } 780 return builder.Finish() 781} 782 783// addDebugHandlers adds handlers specific to indexserver. 784func (s *Server) addDebugHandlers(mux *http.ServeMux) { 785 // Sourcegraph's site admin view requires indexserver to serve it's admin view 786 // on "/". 
787 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 788 789 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 790 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 791 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 792 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 793 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 794 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 795} 796 797func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 798 if r.Method != "GET" { 799 w.Header().Set("Allow", "GET") 800 w.WriteHeader(http.StatusMethodNotAllowed) 801 return 802 } 803 804 response := struct { 805 Hostname string 806 }{ 807 Hostname: s.hostname, 808 } 809 810 b, err := json.Marshal(response) 811 if err != nil { 812 http.Error(w, err.Error(), http.StatusInternalServerError) 813 return 814 } 815 816 w.Header().Set("Content-Type", "application/json; charset=utf-8") 817 w.Write(b) 818} 819 820var rootTmpl = template.Must(template.New("name").Parse(` 821<html> 822 <body> 823 <a href="debug">Debug</a><br /> 824 <a href="debug/requests">Traces</a><br /> 825 {{.IndexMsg}}<br /> 826 <br /> 827 <h3>Reindex</h3> 828 {{if .Repos}} 829 <a href="?show_repos=false">hide repos</a><br /> 830 <table style="margin-top: 20px"> 831 <th style="text-align:left">Name</th> 832 <th style="text-align:left">ID (click to reindex)</th> 833 {{range .Repos}} 834 <tr> 835 <td>{{.Name}}</td> 836 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 837 </tr> 838 {{end}} 839 </table> 840 {{else}} 841 <a href="?show_repos=true">show repos</a><br /> 842 {{end}} 843 </body> 844</html> 845`)) 846 847func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 848 if r.Method != "GET" { 849 w.Header().Set("Allow", "GET") 850 w.WriteHeader(http.StatusMethodNotAllowed) 851 return 852 } 853 854 values := r.URL.Query() 855 856 // ?id= 857 indexMsg := "" 858 if v := values.Get("id"); v != "" 
{ 859 id, err := strconv.Atoi(v) 860 if err != nil { 861 http.Error(w, err.Error(), http.StatusBadRequest) 862 return 863 } 864 indexMsg, _ = s.forceIndex(r.Context(), uint32(id)) 865 } 866 867 // ?show_repos= 868 showRepos := false 869 if v := values.Get("show_repos"); v != "" { 870 showRepos, _ = strconv.ParseBool(v) 871 } 872 873 type Repo struct { 874 ID uint32 875 Name string 876 } 877 var data struct { 878 Repos []Repo 879 IndexMsg string 880 } 881 882 data.IndexMsg = indexMsg 883 884 if showRepos { 885 s.queue.Iterate(func(opts *IndexOptions) { 886 data.Repos = append(data.Repos, Repo{ 887 ID: opts.RepoID, 888 Name: opts.Name, 889 }) 890 }) 891 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 892 } 893 894 _ = rootTmpl.Execute(w, data) 895} 896 897// handleReindex triggers a reindex asynocronously. If a reindex was triggered 898// the request returns with status 202. The caller can infer the new state of 899// the index by calling List. 
900func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 901 if r.Method != http.MethodPost { 902 w.Header().Set("Allow", http.MethodPost) 903 w.WriteHeader(http.StatusMethodNotAllowed) 904 return 905 } 906 907 err := r.ParseForm() 908 if err != nil { 909 http.Error(w, err.Error(), http.StatusBadRequest) 910 return 911 } 912 913 id, err := strconv.Atoi(r.Form.Get("repo")) 914 if err != nil { 915 http.Error(w, err.Error(), http.StatusBadRequest) 916 return 917 } 918 919 go func() { s.forceIndex(r.Context(), uint32(id)) }() 920 921 // 202 Accepted 922 w.WriteHeader(http.StatusAccepted) 923} 924 925func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 926 withIndexed := true 927 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 928 withIndexed = b 929 } 930 931 var indexed []uint32 932 if withIndexed { 933 indexed = listIndexed(s.IndexDir) 934 } 935 936 repos, err := s.Sourcegraph.List(r.Context(), indexed) 937 if err != nil { 938 http.Error(w, err.Error(), http.StatusInternalServerError) 939 return 940 } 941 942 bw := bytes.Buffer{} 943 944 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 945 946 _, err = fmt.Fprintf(tw, "ID\tName\n") 947 if err != nil { 948 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 949 return 950 } 951 952 s.queue.mu.Lock() 953 name := "" 954 for _, id := range repos.IDs { 955 if item := s.queue.get(id); item != nil { 956 name = item.opts.Name 957 } else { 958 name = "" 959 } 960 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 961 if err != nil { 962 debugLog.Printf("handleDebugList: %s\n", err.Error()) 963 } 964 } 965 s.queue.mu.Unlock() 966 967 if err != nil { 968 http.Error(w, err.Error(), http.StatusInternalServerError) 969 return 970 } 971 972 err = tw.Flush() 973 if err != nil { 974 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 975 return 976 } 977 978 w.Header().Set("Content-Length", 
strconv.Itoa(bw.Len())) 979 980 if _, err := io.Copy(w, &bw); err != nil { 981 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 982 return 983 } 984} 985 986// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 987// can run this command during periods of low usage (evenings, weekends) to 988// trigger an initial merge run. In the steady-state, merges happen rarely, even 989// on busy instances, and users can rely on automatic merging instead. 990func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 991 // A merge operation can take very long, depending on the number merges and the 992 // target size of the compound shards. We run the merge in the background and 993 // return immediately to the user. 994 // 995 // We track the status of the merge with metricShardMergingRunning. 996 go func() { 997 s.doMerge() 998 }() 999 _, _ = w.Write([]byte("merging enqueued\n")) 1000} 1001 1002func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 1003 indexed := listIndexed(s.IndexDir) 1004 1005 bw := bytes.Buffer{} 1006 1007 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 1008 1009 _, err := fmt.Fprintf(tw, "ID\tName\n") 1010 if err != nil { 1011 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 1012 return 1013 } 1014 1015 s.queue.mu.Lock() 1016 name := "" 1017 for _, id := range indexed { 1018 if item := s.queue.get(id); item != nil { 1019 name = item.opts.Name 1020 } else { 1021 name = "" 1022 } 1023 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 1024 if err != nil { 1025 debugLog.Printf("handleDebugIndexed: %s\n", err.Error()) 1026 } 1027 } 1028 s.queue.mu.Unlock() 1029 1030 if err != nil { 1031 http.Error(w, err.Error(), http.StatusInternalServerError) 1032 return 1033 } 1034 1035 err = tw.Flush() 1036 if err != nil { 1037 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), 
http.StatusInternalServerError) 1038 return 1039 } 1040 1041 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 1042 1043 if _, err := io.Copy(w, &bw); err != nil { 1044 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 1045 return 1046 } 1047} 1048 1049// forceIndex will run the index job for repo name now. It will return always 1050// return a string explaining what it did, even if it failed. 1051func (s *Server) forceIndex(ctx context.Context, id uint32) (string, error) { 1052 var opts IndexOptions 1053 var err error 1054 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 1055 opts = o 1056 }, func(_ uint32, e error) { 1057 err = e 1058 }, id) 1059 if err != nil { 1060 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 1061 } 1062 1063 args := s.indexArgs(opts) 1064 args.Incremental = false // force re-index 1065 1066 var state indexState 1067 ran := s.muIndexDir.With(opts.Name, func() { 1068 state, err = s.index(ctx, args) 1069 }) 1070 if !ran { 1071 return fmt.Sprintf("index job for repository already running: %s", args), nil 1072 } 1073 if err != nil { 1074 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 1075 } 1076 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 1077} 1078 1079// Index implements the gRPC method for indexing a repository. If the repository 1080// exists in the trash, it will attempt to recover it before indexing. The 1081// method returns metadata about the indexed repository including branches and 1082// and the time of indexing. The metadata might be empty if we fail to read the 1083// metadata after the index or if the repository is empty. 
1084// 1085// Possible GRPC error codes: 1086// - InvalidArgument: returned when the index options are missing or invalid 1087// - AlreadyExists: returned when the repository is already being indexed by another process 1088// - Internal: returned when indexing fails 1089// - Canceled: returned when the context is canceled while waiting for an index slot 1090func (s *Server) Index(ctx context.Context, req *indexserverv1.IndexRequest) (*indexserverv1.IndexResponse, error) { 1091 return s.indexGRPC(ctx, req, s.index) 1092} 1093 1094// indexGRPC takes an index func which is used for testing. For production GRPC calls, use 1095// Server.Index instead 1096func (s *Server) indexGRPC(ctx context.Context, req *indexserverv1.IndexRequest, indexFunc func(ctx context.Context, args *indexArgs) (indexState, error)) (*indexserverv1.IndexResponse, error) { 1097 // Validate request 1098 if req.Options == nil { 1099 return nil, status.Error(codes.InvalidArgument, "index options are required") 1100 } 1101 1102 // Try to acquire semaphore, but respect context cancellation 1103 select { 1104 case s.indexSemaphore <- struct{}{}: 1105 defer func() { <-s.indexSemaphore }() 1106 case <-ctx.Done(): 1107 return nil, status.Error(codes.Canceled, "context canceled while waiting for index slot") 1108 case <-time.After(s.timeout): 1109 return nil, status.Error(codes.DeadlineExceeded, "timed out while waiting for index slot") 1110 } 1111 1112 var opts IndexOptions 1113 opts.FromProto(req.Options) 1114 opts.CloneURL = s.rootURL.ResolveReference(&url.URL{Path: path.Join("/.internal/git", req.Options.Name)}).String() 1115 args := s.indexArgs(opts) 1116 1117 // Recover the index from the trash if possible. First we have to make sure 1118 // the shard doesn't exist in the index dir, because it might potentially be 1119 // overwritten if we recover a shard from the trash. If we can find a shard 1120 // in the index dir, we skip recovery. 
In both cases we continue and let the 1121 // index process figure out whether the shard is up-to-date or requires a 1122 // reindex. 1123 if _, _, ok, err := args.BuildOptions().FindRepositoryMetadata(); err == nil && !ok { 1124 if s.recoverFromTrash(opts.RepoID) { 1125 infoLog.Printf("restored repository %d from trash", opts.RepoID) 1126 } 1127 } 1128 1129 // Index the repository. 1130 var indexErr error 1131 var indexTimeUnix int64 1132 var zoektRepo *zoekt.Repository 1133 1134 ran := s.muIndexDir.With(opts.Name, func() { 1135 // only record time taken once we hold the lock. This avoids us 1136 // recording time taken while merging/cleanup runs. 1137 start := time.Now() 1138 1139 var state indexState 1140 state, indexErr = indexFunc(ctx, args) 1141 elapsed := time.Since(start) 1142 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 1143 1144 if indexErr != nil { 1145 errorLog.Printf("error indexing %s: %s", args.String(), indexErr) 1146 indexErr = status.Errorf(codes.Internal, "failed to index repository: %v", indexErr) 1147 return 1148 } 1149 1150 readMetadata := func(args *indexArgs) (*zoekt.Repository, int64, error) { 1151 repo, metadata, ok, err := args.BuildOptions().FindRepositoryMetadata() 1152 if err != nil || !ok { 1153 return nil, 0, fmt.Errorf("failed to read metadata for %s: %w", args.String(), err) 1154 } 1155 return repo, metadata.IndexTime.Unix(), nil 1156 } 1157 1158 switch state { 1159 case indexStateSuccess: 1160 zoektRepo, indexTimeUnix, indexErr = readMetadata(args) 1161 if indexErr != nil { 1162 return 1163 } 1164 var branches []string 1165 for _, b := range zoektRepo.Branches { 1166 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 1167 } 1168 s.logger.Info("updated index", 1169 sglog.Int("tenant", args.TenantID), 1170 sglog.String("repo", args.Name), 1171 sglog.Uint32("id", args.RepoID), 1172 sglog.Strings("branches", branches), 1173 sglog.Duration("duration", 
elapsed), 1174 ) 1175 case indexStateSuccessMeta: 1176 zoektRepo, indexTimeUnix, indexErr = readMetadata(args) 1177 if indexErr != nil { 1178 return 1179 } 1180 infoLog.Printf("updated meta %s in %v", args.String(), elapsed) 1181 case indexStateNoop: 1182 zoektRepo, indexTimeUnix, indexErr = readMetadata(args) 1183 if indexErr != nil { 1184 return 1185 } 1186 case indexStateEmpty: 1187 zoektRepo = &zoekt.Repository{ 1188 ID: args.RepoID, 1189 } 1190 case indexStateFail: 1191 // This should never happen, because indexStateFail implies 1192 // indexErr!=nil and we exit early, but we'll handle it gracefully 1193 // just in case. 1194 indexErr = status.Error(codes.Internal, "failed to index repository") 1195 // Repository exists but is empty, return OK with empty response 1196 default: 1197 indexErr = status.Errorf(codes.Internal, "unknown index state: %s", state) 1198 } 1199 }) 1200 1201 if !ran { 1202 // Someone else is processing the repository. 1203 debugLog.Printf("index job for repository already running: %s", args) 1204 return nil, status.Error(codes.AlreadyExists, "repository is already being indexed") 1205 } 1206 1207 if indexErr != nil { 1208 return nil, indexErr 1209 } 1210 1211 if zoektRepo == nil { 1212 // This should never happen, because zoektRepo=nil means we either 1213 // failed to index or failed to read the metadata, both of which yield 1214 // indexErr!=nil. 
1215 return nil, status.Error(codes.Internal, fmt.Sprintf("unexpected error: zoektRepo is nil for options %+v", opts)) 1216 } 1217 1218 // Convert branches to proto format 1219 repoBranchesProto := make([]*configv1.ZoektRepositoryBranch, len(zoektRepo.Branches)) 1220 for i, b := range zoektRepo.Branches { 1221 repoBranchesProto[i] = &configv1.ZoektRepositoryBranch{ 1222 Name: b.Name, 1223 Version: b.Version, 1224 } 1225 } 1226 1227 return &indexserverv1.IndexResponse{ 1228 RepoId: zoektRepo.ID, 1229 Branches: repoBranchesProto, 1230 IndexTimeUnix: indexTimeUnix, 1231 }, nil 1232} 1233 1234// recoverFromTrash attempts to recover an index from the trash or tombstones. 1235// It returns true if the repository was recovered. 1236func (s *Server) recoverFromTrash(repoID uint32) bool { 1237 trashShards := getShards(filepath.Join(s.IndexDir, ".trash")) 1238 if shards, ok := trashShards[repoID]; ok { 1239 moveAll(s.IndexDir, shards) 1240 return true 1241 } 1242 1243 tombstones := getTombstonedRepos(s.IndexDir) 1244 if tombstone, ok := tombstones[repoID]; ok { 1245 if err := index.UnsetTombstone(tombstone.Path, repoID); err == nil { 1246 return true 1247 } 1248 } 1249 1250 return false 1251} 1252 1253// Delete implements the gRPC method for deleting a repository. It moves the 1254// simple shards to the trash dir and tombstones repos in compound shards. 1255func (s *Server) Delete(ctx context.Context, req *indexserverv1.DeleteRequest) (*indexserverv1.DeleteResponse, error) { 1256 var err error 1257 1258 // Run the delete operation in a goroutine to be able to respond to context 1259 // cancellation while waiting for the lock. 
1260 done := make(chan struct{}) 1261 go func() { 1262 defer close(done) 1263 s.muIndexDir.Global(func() { 1264 indexShards := getShards(s.IndexDir) 1265 for _, repoID := range req.RepoIds { 1266 if ctx.Err() != nil { 1267 err = status.Error(codes.Canceled, "context canceled") 1268 return 1269 } 1270 1271 if shards, ok := indexShards[repoID]; ok { 1272 simple := shards[:0] 1273 for _, shardItem := range shards { 1274 if s.shardMerging && maybeSetTombstone([]shard{shardItem}, repoID) { 1275 continue 1276 } 1277 1278 simple = append(simple, shardItem) 1279 } 1280 1281 if len(simple) == 0 { 1282 continue 1283 } 1284 1285 moveAll(filepath.Join(s.IndexDir, ".trash"), simple) 1286 } 1287 } 1288 }) 1289 }() 1290 1291 select { 1292 case <-done: 1293 if err != nil { 1294 return nil, err 1295 } 1296 case <-ctx.Done(): 1297 return nil, status.Error(codes.Canceled, "context canceled") 1298 case <-time.After(s.timeout): 1299 return nil, status.Error(codes.DeadlineExceeded, "timed out while waiting for delete operation to complete") 1300 } 1301 1302 return &indexserverv1.DeleteResponse{}, nil 1303} 1304 1305// DeleteAllData deletes all shards in the index and trash dir belonging to the 1306// tenant associated with the request. The deletion is best-effort, which means 1307// we will delete as much as possible. If no error is returned, the caller can 1308// be certain that all data has been deleted. 1309func (s *Server) DeleteAllData(ctx context.Context, _ *indexserverv1.DeleteAllDataRequest) (*indexserverv1.DeleteAllDataResponse, error) { 1310 tnt, err := tenant.FromContext(ctx) 1311 if err != nil { 1312 return nil, err 1313 } 1314 s.logger.Warn("DeleteAllData", sglog.Int("tenant_id", tnt.ID())) 1315 1316 var merr error 1317 s.muIndexDir.Global(func() { 1318 // First, explode all compound shards that have repos from the tenant in 1319 // question. Because we hold the global lock, we can be sure that no new 1320 // merges start while we do this. 
1321 if err := s.explodeTenantCompoundShards(ctx, func(path string) error { 1322 // We call explode in a separate process to protect indexserver. 1323 cmd := defaultExplodeCmd(path) 1324 1325 stdoutBuf := &bytes.Buffer{} 1326 stderrBuf := &bytes.Buffer{} 1327 cmd.Stdout = stdoutBuf 1328 cmd.Stderr = stderrBuf 1329 1330 err := cmd.Run() 1331 if err != nil { 1332 errorLog.Printf("explode failed: %v (stderr: %s)", err, stderrBuf.String()) 1333 return err 1334 } 1335 1336 infoLog.Printf("exploded shard: %s", stdoutBuf.String()) 1337 1338 return nil 1339 }); err != nil { 1340 merr = multierr.Append(merr, err) 1341 } 1342 1343 // Invariant: all shards from the tenant are simple shards. 1344 1345 if err := purgeTenantShards(ctx, s.IndexDir); err != nil { 1346 merr = multierr.Append(merr, err) 1347 } 1348 if err := purgeTenantShards(ctx, filepath.Join(s.IndexDir, ".trash")); err != nil { 1349 merr = multierr.Append(merr, err) 1350 } 1351 }) 1352 1353 return &indexserverv1.DeleteAllDataResponse{}, merr 1354} 1355 1356func listIndexed(indexDir string) []uint32 { 1357 index := getShards(indexDir) 1358 metricNumIndexed.Set(float64(len(index))) 1359 repoIDs := make([]uint32, 0, len(index)) 1360 for id := range index { 1361 repoIDs = append(repoIDs, id) 1362 } 1363 slices.Sort(repoIDs) 1364 return repoIDs 1365} 1366 1367// setupTmpDir sets up a temporary directory on the same volume as the 1368// indexes. 1369// 1370// If main is true we will delete older temp directories left around. main is 1371// false when this is a debug command. 1372func setupTmpDir(logger sglog.Logger, main bool, index string) error { 1373 // change the target tmp directory depending on if it's our main daemon or a 1374 // debug sub command. 
1375 dir := ".indexserver.debug.tmp" 1376 if main { 1377 dir = ".indexserver.tmp" 1378 } 1379 1380 tmpRoot := filepath.Join(index, dir) 1381 1382 if main { 1383 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot)) 1384 err := os.RemoveAll(tmpRoot) 1385 if err != nil { 1386 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err)) 1387 } 1388 } 1389 1390 if err := os.MkdirAll(tmpRoot, 0o755); err != nil { 1391 return err 1392 } 1393 1394 return os.Setenv("TMPDIR", tmpRoot) 1395} 1396 1397func printMetaData(fn string) error { 1398 repo, indexMeta, err := index.ReadMetadataPath(fn) 1399 if err != nil { 1400 return err 1401 } 1402 1403 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 1404 if err != nil { 1405 return err 1406 } 1407 1408 err = json.NewEncoder(os.Stdout).Encode(repo) 1409 if err != nil { 1410 return err 1411 } 1412 return nil 1413} 1414 1415func printShardStats(fn string) error { 1416 f, err := os.Open(fn) 1417 if err != nil { 1418 return err 1419 } 1420 1421 iFile, err := index.NewIndexFile(f) 1422 if err != nil { 1423 return err 1424 } 1425 1426 return index.PrintNgramStats(iFile) 1427} 1428 1429func srcLogLevelIsDebug() bool { 1430 lvl := os.Getenv(sglog.EnvLogLevel) 1431 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1432} 1433 1434func getEnvWithDefaultBool(k string, defaultVal bool) bool { 1435 v := os.Getenv(k) 1436 if v == "" { 1437 return defaultVal 1438 } 1439 b, err := strconv.ParseBool(v) 1440 if err != nil { 1441 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1442 } 1443 return b 1444} 1445 1446func getEnvWithDefaultInt64(k string, defaultVal int64) int64 { 1447 v := os.Getenv(k) 1448 if v == "" { 1449 return defaultVal 1450 } 1451 i, err := strconv.ParseInt(v, 10, 64) 1452 if err != nil { 1453 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1454 } 1455 return i 1456} 1457 1458func getEnvWithDefaultInt(k string, defaultVal int) int { 1459 v := 
os.Getenv(k) 1460 if v == "" { 1461 return defaultVal 1462 } 1463 i, err := strconv.Atoi(k) 1464 if err != nil { 1465 log.Fatalf("error parsing ENV %s to int: %s", k, err) 1466 } 1467 return i 1468} 1469 1470func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 { 1471 v := os.Getenv(k) 1472 if v == "" { 1473 return defaultVal 1474 } 1475 i, err := strconv.ParseUint(v, 10, 64) 1476 if err != nil { 1477 log.Fatalf("error parsing ENV %s to uint64: %s", k, err) 1478 } 1479 return i 1480} 1481 1482func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1483 v := os.Getenv(k) 1484 if v == "" { 1485 return defaultVal 1486 } 1487 f, err := strconv.ParseFloat(v, 64) 1488 if err != nil { 1489 log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1490 } 1491 return f 1492} 1493 1494func getEnvWithDefaultString(k string, defaultVal string) string { 1495 v := os.Getenv(k) 1496 if v == "" { 1497 return defaultVal 1498 } 1499 return v 1500} 1501 1502func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration { 1503 v := os.Getenv(k) 1504 if v == "" { 1505 return defaultVal 1506 } 1507 1508 d, err := time.ParseDuration(v) 1509 if err != nil { 1510 log.Fatalf("error parsing ENV %s to duration: %s", k, err) 1511 } 1512 return d 1513} 1514 1515func getEnvWithDefaultEmptySet(k string) map[string]struct{} { 1516 set := map[string]struct{}{} 1517 for _, v := range strings.Split(os.Getenv(k), ",") { 1518 v = strings.TrimSpace(v) 1519 if v != "" { 1520 set[v] = struct{}{} 1521 } 1522 } 1523 return set 1524} 1525 1526func joinStringSet(set map[string]struct{}, sep string) string { 1527 var xs []string 1528 for x := range set { 1529 xs = append(xs, x) 1530 } 1531 1532 return strings.Join(xs, sep) 1533} 1534 1535func setShardsCounter(indexDir string) { 1536 fns, err := filepath.Glob(filepath.Join(indexDir, "*.zoekt")) 1537 if err != nil { 1538 errorLog.Printf("setShardsCounter: %s\n", err) 1539 return 1540 } 1541 
metricNumberShards.Set(float64(len(fns))) 1542 1543 compoundFns := make([]string, 0, len(fns)) 1544 for _, fn := range fns { 1545 if strings.HasPrefix(filepath.Base(fn), "compound-") { 1546 compoundFns = append(compoundFns, fn) 1547 } 1548 } 1549 metricNumberCompoundShards.Set(float64(len(compoundFns))) 1550} 1551 1552func rootCmd() *ffcli.Command { 1553 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1554 conf := rootConfig{ 1555 Main: true, 1556 } 1557 conf.registerRootFlags(rootFs) 1558 1559 return &ffcli.Command{ 1560 FlagSet: rootFs, 1561 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1562 Subcommands: []*ffcli.Command{debugCmd()}, 1563 Exec: func(ctx context.Context, args []string) error { 1564 return startServer(conf) 1565 }, 1566 } 1567} 1568 1569type rootConfig struct { 1570 // Main is true if this rootConfig is for our main long running command (the 1571 // indexserver). Debug commands should not set this value. This is used to 1572 // determine if we need to run tmpfriend. 1573 Main bool 1574 1575 root string 1576 interval time.Duration 1577 index string 1578 indexConcurrency int64 1579 listen string 1580 hostname string 1581 cpuFraction float64 1582 1583 // config values related to shard merging 1584 disableShardMerging bool 1585 vacuumInterval time.Duration 1586 mergeInterval time.Duration 1587 targetSize int64 1588 minSize int64 1589 minAgeDays int 1590 1591 // config values related to backoff indexing repos with one or more consecutive failures 1592 backoffDuration time.Duration 1593 maxBackoffDuration time.Duration 1594} 1595 1596func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1597 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. 
If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1598 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1599 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently") 1600 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", index.DefaultDir), "set index directory to use") 1601 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") 1602 fs.StringVar(&rc.hostname, "hostname", index.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") 1603 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1604 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.") 1605 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. 
A negative value disables indexing backoff.") 1606 1607 // flags related to shard merging 1608 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging") 1609 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1610 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1611 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 1000), "the target size of compound shards in MiB") 1612 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 800), "the minimum size of a compound shard in MiB") 1613 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.") 1614} 1615 1616func startServer(conf rootConfig) error { 1617 s, err := newServer(conf) 1618 if err != nil { 1619 return err 1620 } 1621 1622 profiler.Init("zoekt-sourcegraph-indexserver") 1623 setShardsCounter(s.IndexDir) 1624 1625 if conf.listen != "" { 1626 1627 mux := http.NewServeMux() 1628 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1629 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1630 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1631 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"}, 1632 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1633 }...) 
1634 s.addDebugHandlers(mux) 1635 1636 go func() { 1637 debugLog.Printf("serving HTTP on %s", conf.listen) 1638 mux := grpcutil.MultiplexGRPC(newGRPCServer(sglog.Scoped("indexserver"), s), mux) 1639 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1640 }() 1641 1642 // Serve mux on a unix domain socket on a best-effort-basis so that 1643 // webserver can call the endpoints via the shared filesystem. 1644 // 1645 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1646 // on the socket due to permission errors. See 1647 // https://github.com/docker/for-mac/issues/6239 1648 go func() { 1649 serveHTTPOverSocket := func() error { 1650 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1651 // We cannot bind a socket to an existing pathname. 1652 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1653 return fmt.Errorf("error removing socket file: %s", socket) 1654 } 1655 // The "unix" network corresponds to stream sockets. (cf. unixgram, 1656 // unixpacket). 1657 l, err := net.Listen("unix", socket) 1658 if err != nil { 1659 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1660 } 1661 // Indexserver (root) and webserver (Sourcegraph) run with 1662 // different users. Per default, the socket is created with 1663 // permission 755 (root root), which doesn't let webserver write to 1664 // it. 1665 // 1666 // See https://github.com/golang/go/issues/11822 for more context. 
1667 if err := os.Chmod(socket, 0o777); err != nil { 1668 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1669 } 1670 debugLog.Printf("serving HTTP on %s", socket) 1671 return http.Serve(l, mux) 1672 } 1673 debugLog.Print(serveHTTPOverSocket()) 1674 }() 1675 } 1676 1677 oc := &ownerChecker{ 1678 Path: filepath.Join(conf.index, "owner.txt"), 1679 Hostname: conf.hostname, 1680 } 1681 go oc.Run() 1682 1683 logger := sglog.Scoped("metricsRegistration") 1684 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1685 1686 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1687 prometheus.DefaultRegisterer.MustRegister(c) 1688 1689 s.Run() 1690 return nil 1691} 1692 1693func newServer(conf rootConfig) (*Server, error) { 1694 logger := sglog.Scoped("server") 1695 1696 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1697 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1698 } 1699 if conf.index == "" { 1700 return nil, fmt.Errorf("must set -index") 1701 } 1702 if conf.root == "" { 1703 return nil, fmt.Errorf("must set -sourcegraph_url") 1704 } 1705 rootURL, err := url.Parse(conf.root) 1706 if err != nil { 1707 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1708 } 1709 1710 rootURL = addDefaultPort(rootURL) 1711 1712 // Tune GOMAXPROCS to match Linux container CPU quota. 1713 _, _ = maxprocs.Set() 1714 1715 // Automatically prepend our own path at the front, to minimize 1716 // required configuration. 
1717 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1718 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1719 } 1720 1721 if _, err := os.Stat(conf.index); err != nil { 1722 if err := os.MkdirAll(conf.index, 0o755); err != nil { 1723 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1724 } 1725 } 1726 1727 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil { 1728 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1729 } 1730 1731 if srcLogLevelIsDebug() { 1732 debugLog.SetOutput(os.Stderr) 1733 } 1734 1735 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1736 if len(reposWithSeparateIndexingMetrics) > 0 { 1737 debugLog.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1738 } 1739 1740 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1741 if len(deltaBuildRepositoriesAllowList) > 0 { 1742 debugLog.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1743 } 1744 1745 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1746 if deltaShardNumberFallbackThreshold > 0 { 1747 debugLog.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1748 } else { 1749 debugLog.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1750 } 1751 1752 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1753 if len(reposShouldSkipSymbolsCalculation) > 0 { 1754 debugLog.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1755 } 1756 1757 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout) 1758 if indexingTimeout != 
defaultIndexingTimeout { 1759 debugLog.Printf("using configured indexing timeout: %s", indexingTimeout) 1760 } 1761 1762 var sg Sourcegraph 1763 if rootURL.IsAbs() { 1764 var batchSize int 1765 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { 1766 batchSize, err = strconv.Atoi(v) 1767 if err != nil { 1768 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1769 } 1770 } 1771 1772 opts := []SourcegraphClientOption{ 1773 WithBatchSize(batchSize), 1774 } 1775 1776 logger := sglog.Scoped("zoektConfigurationGRPCClient") 1777 client, err := dialGRPCClient(rootURL.Host, logger) 1778 if err != nil { 1779 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1780 } 1781 1782 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...) 1783 1784 } else { 1785 sg = sourcegraphFake{ 1786 RootDir: rootURL.String(), 1787 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1788 } 1789 } 1790 1791 cpuCount := max(int(math.Round(float64(runtime.GOMAXPROCS(0))*(conf.cpuFraction))), 1) 1792 1793 if conf.indexConcurrency < 1 { 1794 conf.indexConcurrency = 1 1795 } else if conf.indexConcurrency > int64(cpuCount) { 1796 conf.indexConcurrency = int64(cpuCount) 1797 } 1798 1799 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1800 1801 return &Server{ 1802 logger: logger, 1803 rootURL: rootURL, 1804 Sourcegraph: sg, 1805 IndexDir: conf.index, 1806 IndexConcurrency: int(conf.indexConcurrency), 1807 Interval: conf.interval, 1808 CPUCount: cpuCount, 1809 queue: *q, 1810 shardMerging: !conf.disableShardMerging, 1811 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1812 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1813 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1814 hostname: conf.hostname, 1815 mergeOpts: mergeOpts{ 1816 vacuumInterval: conf.vacuumInterval, 1817 mergeInterval: conf.mergeInterval, 1818 
targetSizeBytes: conf.targetSize * 1024 * 1024, 1819 minSizeBytes: conf.minSize * 1024 * 1024, 1820 minAgeDays: conf.minAgeDays, 1821 }, 1822 timeout: indexingTimeout, 1823 indexSemaphore: make(chan struct{}, int(conf.indexConcurrency)), 1824 }, err 1825} 1826 1827func newGRPCServer(logger sglog.Logger, s *Server, additionalOpts ...grpc.ServerOption) *grpc.Server { 1828 grpcServer := defaults.NewServer(logger, additionalOpts...) 1829 indexserverv1.RegisterSourcegraphIndexserverServiceServer(grpcServer, s) 1830 return grpcServer 1831} 1832 1833// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration 1834// for the indexed-search-configuration gRPC service. 1835// 1836// The default backoff strategy is modeled after the default settings used by 1837// retryablehttp.DefaultClient. 1838// 1839// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1840// - Unavailable 1841// - Aborted 1842// 1843//go:embed default_grpc_service_configuration.json 1844var defaultGRPCServiceConfigurationJSON string 1845 1846func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1847 return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1848 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1849 return invoker(ctx, method, req, reply, cc, opts...) 1850 } 1851} 1852 1853func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1854 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1855 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1856 return streamer(ctx, desc, cc, method, opts...) 1857 } 1858} 1859 1860// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPCs servers and clients are allowed to process. 
1861// This can be overridden by providing custom Server/Dial options. 1862const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB 1863 1864func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (configv1.ZoektConfigurationServiceClient, error) { 1865 metrics := clientMetricsOnce() 1866 1867 // If the service seems to be unavailable, this 1868 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1 1869 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s]) 1870 retryOpts := []retry.CallOption{ 1871 retry.WithMax(5), 1872 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)), 1873 retry.WithCodes(codes.Unavailable), 1874 } 1875 1876 opts := []grpc.DialOption{ 1877 grpc.WithTransportCredentials(insecure.NewCredentials()), 1878 grpc.WithChainStreamInterceptor( 1879 metrics.StreamClientInterceptor(), 1880 messagesize.StreamClientInterceptor, 1881 internalActorStreamInterceptor(), 1882 internalerrs.LoggingStreamClientInterceptor(logger), 1883 internalerrs.PrometheusStreamClientInterceptor, 1884 retry.StreamClientInterceptor(retryOpts...), 1885 ), 1886 grpc.WithChainUnaryInterceptor( 1887 metrics.UnaryClientInterceptor(), 1888 messagesize.UnaryClientInterceptor, 1889 internalActorUnaryInterceptor(), 1890 internalerrs.LoggingUnaryClientInterceptor(logger), 1891 internalerrs.PrometheusUnaryClientInterceptor, 1892 retry.UnaryClientInterceptor(retryOpts...), 1893 ), 1894 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON), 1895 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)), 1896 } 1897 1898 opts = append(opts, additionalOpts...) 1899 1900 // Ensure that the message size options are set last, so they override any other 1901 // client-specific options that tweak the message size. 1902 // 1903 // The message size options are only provided if the environment variable is set. 
These options serve as an escape hatch, so they 1904 // take precedence over everything else with a uniform size setting that's easy to reason about. 1905 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...) 1906 1907 // This dialer is used to connect via gRPC to the Sourcegraph instance. 1908 // This is done lazily, so we can provide the client to use regardless of 1909 // whether we enabled gRPC or not initially. 1910 cc, err := grpc.Dial(addr, opts...) 1911 if err != nil { 1912 return nil, fmt.Errorf("dialing %q: %w", addr, err) 1913 } 1914 1915 client := configv1.NewZoektConfigurationServiceClient(cc) 1916 return client, nil 1917} 1918 1919// addDefaultPort adds a default port to a URL if one is not specified. 1920// 1921// If the URL scheme is "http" and no port is specified, "80" is used. 1922// If the scheme is "https", "443" is used. 1923// 1924// The original URL is not mutated. A copy is modified and returned. 1925func addDefaultPort(original *url.URL) *url.URL { 1926 if original == nil { 1927 return nil // don't panic 1928 } 1929 1930 if !original.IsAbs() { 1931 return original // don't do anything if the URL doesn't look like a remote URL 1932 } 1933 1934 if original.Scheme == "http" && original.Port() == "" { 1935 u := cloneURL(original) 1936 u.Host = net.JoinHostPort(u.Host, "80") 1937 return u 1938 } 1939 1940 if original.Scheme == "https" && original.Port() == "" { 1941 u := cloneURL(original) 1942 u.Host = net.JoinHostPort(u.Host, "443") 1943 return u 1944 } 1945 1946 return original 1947} 1948 1949// cloneURL returns a copy of the URL. It is safe to mutate the returned URL. 
1950// This is copied from net/http/clone.go 1951func cloneURL(u *url.URL) *url.URL { 1952 if u == nil { 1953 return nil 1954 } 1955 u2 := new(url.URL) 1956 *u2 = *u 1957 if u.User != nil { 1958 u2.User = new(url.Userinfo) 1959 *u2.User = *u.User 1960 } 1961 return u2 1962} 1963 1964func main() { 1965 liblog := sglog.Init(sglog.Resource{ 1966 Name: "zoekt-indexserver", 1967 Version: index.Version, 1968 InstanceID: index.HostnameBestEffort(), 1969 }) 1970 defer liblog.Sync() 1971 1972 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1973 log.Fatal(err) 1974 } 1975} 1976 1977// getBoolFromEnvironmentVariables returns the boolean defined by the first environment 1978// variable listed in envVarNames that is set in the current process environment, or the defaultBool if none are set. 1979// 1980// An error is returned of the provided environment variables fails to parse as a boolean. 1981func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) { 1982 for _, envVar := range envVarNames { 1983 v := os.Getenv(envVar) 1984 if v == "" { 1985 continue 1986 } 1987 1988 b, err := strconv.ParseBool(v) 1989 if err != nil { 1990 return false, fmt.Errorf("parsing environment variable %q to boolean: %v", envVar, err) 1991 } 1992 1993 return b, nil 1994 } 1995 1996 return defaultBool, nil 1997}