fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Command zoekt-sourcegraph-indexserver periodically reindexes repositories
// from a Sourcegraph instance. It uses a "pull-based" design, where it periodically
// reaches out to the Sourcegraph instance for the list of repositories to reindex.
package main

import (
	"bytes"
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"html/template"
	"io"
	"io/fs"
	"log"
	"math"
	"math/rand"
	"net"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"text/tabwriter"
	"time"

	grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
	"github.com/peterbourgon/ff/v3/ffcli"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	sglog "github.com/sourcegraph/log"
	"github.com/sourcegraph/mountinfo"
	"github.com/sourcegraph/zoekt"
	proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
	"github.com/sourcegraph/zoekt/grpc/internalerrs"
	"github.com/sourcegraph/zoekt/grpc/messagesize"
	"github.com/sourcegraph/zoekt/index"
	"github.com/sourcegraph/zoekt/internal/debugserver"
	"github.com/sourcegraph/zoekt/internal/profiler"
	"github.com/sourcegraph/zoekt/internal/tenant"

	"go.uber.org/automaxprocs/maxprocs"
	"golang.org/x/net/trace"
	"golang.org/x/sys/unix"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/metadata"
)

// Prometheus metrics exported by the indexserver. All of them are registered
// with the default registry at package initialization via promauto.
var (
	// metricResolveRevisionsDuration has six decade-wide buckets:
	// 1s, 10s, 100s, 1000s, 10000s and 100000s.
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 100000s (~27.8h)
	})

	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	// metricIndexDuration spreads 20 exponentially sized buckets between 100ms
	// and the indexing timeout plus 40 minutes.
	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_indexing_delay_seconds",
		Help:    "A histogram of durations from when an index job is added to the queue, to the time it completes.",
		Buckets: prometheus.ExponentialBuckets(60, 2, 14), // 1 minute -> ~5.7 days (60s * 2^13)
	}, []string{
		"state", // state is an indexState
		"name",  // the name of the repository that was indexed
	})

	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is index.IndexState

	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})

	// clientMetricsOnce returns a singleton instance of the client metrics
	// that are shared across all gRPC clients that this process creates.
	//
	// This function panics if the metrics cannot be registered with the default
	// Prometheus registry.
	clientMetricsOnce = sync.OnceValue(func() *grpcprom.ClientMetrics {
		clientMetrics := grpcprom.NewClientMetrics(
			grpcprom.WithClientCounterOptions(),
			grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
			grpcprom.WithClientStreamRecvHistogram(),   // record how long it takes for a client to receive a message during a streaming RPC
			grpcprom.WithClientStreamSendHistogram(),   // record how long it takes for a client to send a message during a streaming RPC
		)
		prometheus.DefaultRegisterer.MustRegister(clientMetrics)
		return clientMetrics
	})
)

// 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
// NOTE: if you change this, you must also update gitIndex to use the same value when fetching the repo.
168const MaxFileSize = 1 << 20 169 170// set of repositories that we want to capture separate indexing metrics for 171var reposWithSeparateIndexingMetrics = make(map[string]struct{}) 172 173type indexState string 174 175const ( 176 indexStateFail indexState = "fail" 177 indexStateSuccess indexState = "success" 178 indexStateSuccessMeta indexState = "success_meta" // We only updated metadata 179 indexStateNoop indexState = "noop" // We didn't need to update index 180 indexStateEmpty indexState = "empty" // index is empty (empty repo) 181) 182 183// Server is the main functionality of zoekt-sourcegraph-indexserver. It 184// exists to conveniently use all the options passed in via func main. 185type Server struct { 186 logger sglog.Logger 187 188 Sourcegraph Sourcegraph 189 BatchSize int 190 191 // IndexDir is the index directory to use. 192 IndexDir string 193 194 // IndexConcurrency is the number of repositories we index at once. 195 IndexConcurrency int 196 197 // Interval is how often we sync with Sourcegraph. 198 Interval time.Duration 199 200 // CPUCount is the number of CPUs to use for indexing shards. 201 CPUCount int 202 203 queue Queue 204 205 // muIndexDir protects the index directory from concurrent access. 206 muIndexDir indexMutex 207 208 // If true, shard merging is enabled. 209 shardMerging bool 210 211 // deltaBuildRepositoriesAllowList is an allowlist for repositories that we 212 // use delta-builds for instead of normal builds 213 deltaBuildRepositoriesAllowList map[string]struct{} 214 215 // deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist 216 // before attempting a delta build. 
217 deltaShardNumberFallbackThreshold uint64 218 219 // repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that 220 // we skip calculating symbols metadata for during builds 221 repositoriesSkipSymbolsCalculationAllowList map[string]struct{} 222 223 hostname string 224 225 mergeOpts mergeOpts 226 227 // timeout defines how long the index server waits before killing an indexing job. 228 timeout time.Duration 229} 230 231var ( 232 debugLog = log.New(io.Discard, "[DEBUG] ", log.LstdFlags) 233 infoLog = log.New(os.Stderr, "[INFO] ", log.LstdFlags) 234 errorLog = log.New(os.Stderr, "[ERROR] ", log.LstdFlags) 235) 236 237// our index commands should output something every 100mb they process. 238// 239// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough 240// time." famous last words. A client was indexing a monorepo with 42 241// cores... 5m was not enough. 242const noOutputTimeout = 30 * time.Minute 243 244func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 245 out := &synchronizedBuffer{} 246 cmd.Stdout = out 247 cmd.Stderr = out 248 249 tr.LazyPrintf("%s", cmd.Args) 250 251 defer func() { 252 if err != nil { 253 outS := out.String() 254 tr.LazyPrintf("failed: %v", err) 255 tr.LazyPrintf("output: %s", out) 256 tr.SetError() 257 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 258 } 259 }() 260 261 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 262 263 if err := cmd.Start(); err != nil { 264 return err 265 } 266 267 errC := make(chan error) 268 go func() { 269 errC <- cmd.Wait() 270 }() 271 272 // This channel is set after we have sent sigquit. It allows us to follow up 273 // with a sigkill if the process doesn't quit after sigquit. 274 kill := make(<-chan time.Time) 275 276 lastLen := 0 277 for { 278 select { 279 case <-time.After(noOutputTimeout): 280 // Periodically check if we have had output. If not kill the process. 
281 if out.Len() != lastLen { 282 lastLen = out.Len() 283 infoLog.Printf("still running %s", cmd.Args) 284 } else { 285 // Send quit (C-\) first so we get a stack dump. 286 infoLog.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 287 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 288 errorLog.Println("quit failed:", err) 289 } 290 291 // send sigkill if still running in 10s 292 kill = time.After(10 * time.Second) 293 } 294 295 case <-kill: 296 infoLog.Printf("still running, killing %s", cmd.Args) 297 if err := cmd.Process.Kill(); err != nil { 298 errorLog.Println("kill failed:", err) 299 } 300 301 case err := <-errC: 302 if err != nil { 303 return err 304 } 305 306 tr.LazyPrintf("success") 307 return nil 308 } 309 } 310} 311 312// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 313// monitor the buffer while it is being written to. 314type synchronizedBuffer struct { 315 mu sync.Mutex 316 b bytes.Buffer 317} 318 319func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 320 sb.mu.Lock() 321 defer sb.mu.Unlock() 322 return sb.b.Write(p) 323} 324 325func (sb *synchronizedBuffer) Len() int { 326 sb.mu.Lock() 327 defer sb.mu.Unlock() 328 return sb.b.Len() 329} 330 331func (sb *synchronizedBuffer) String() string { 332 sb.mu.Lock() 333 defer sb.mu.Unlock() 334 return sb.b.String() 335} 336 337// pauseFileName if present in IndexDir will stop index jobs from 338// running. This is to make it possible to experiment with the content of the 339// IndexDir without the indexserver writing to it. 340const pauseFileName = "PAUSE" 341 342// Run the sync loop. This blocks forever. 343func (s *Server) Run() { 344 removeIncompleteShards(s.IndexDir) 345 346 // Start a goroutine which updates the queue with commits to index. 347 go func() { 348 // We update the list of indexed repos every Interval. To speed up manual 349 // testing we also listen for SIGUSR1 to trigger updates. 
350 // 351 // "pkill -SIGUSR1 zoekt-sourcegra" 352 for range jitterTicker(s.Interval, unix.SIGUSR1) { 353 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 354 infoLog.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 355 continue 356 } 357 358 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 359 if err != nil { 360 errorLog.Printf("error listing repos: %s", err) 361 continue 362 } 363 364 debugLog.Printf("updating index queue with %d repositories", len(repos.IDs)) 365 366 // Stop indexing repos we don't need to track anymore 367 removed := s.queue.MaybeRemoveMissing(repos.IDs) 368 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 369 if len(removed) > 0 { 370 infoLog.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 371 } 372 373 cleanupDone := make(chan struct{}) 374 go func() { 375 defer close(cleanupDone) 376 s.muIndexDir.Global(func() { 377 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 378 }) 379 }() 380 381 repos.IterateIndexOptions(s.queue.AddOrUpdate) 382 383 // IterateIndexOptions will only iterate over repositories that have 384 // changed since we last called list. However, we want to add all IDs 385 // back onto the queue just to check that what is on disk is still 386 // correct. This will use the last IndexOptions we stored in the 387 // queue. The repositories not on the queue (missing) need a forced 388 // fetch of IndexOptions. 389 missing := s.queue.Bump(repos.IDs) 390 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 
391 392 setShardsCounter(s.IndexDir) 393 394 <-cleanupDone 395 } 396 }() 397 398 go func() { 399 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 400 if s.shardMerging { 401 s.vacuum() 402 } 403 } 404 }() 405 406 go func() { 407 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 408 if s.shardMerging { 409 s.doMerge() 410 } 411 } 412 }() 413 414 for i := 0; i < s.IndexConcurrency; i++ { 415 go s.processQueue() 416 } 417 418 // block forever 419 select {} 420} 421 422// formatList returns a comma-separated list of the first min(len(v), m) items. 423func formatListUint32(v []uint32, m int) string { 424 if len(v) < m { 425 m = len(v) 426 } 427 428 sb := strings.Builder{} 429 for i := 0; i < m; i++ { 430 fmt.Fprintf(&sb, "%d, ", v[i]) 431 } 432 433 if len(v) > m { 434 sb.WriteString("...") 435 } 436 437 return strings.TrimRight(sb.String(), ", ") 438} 439 440func (s *Server) processQueue() { 441 for { 442 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 443 time.Sleep(time.Second) 444 continue 445 } 446 447 item, ok := s.queue.Pop() 448 if !ok { 449 time.Sleep(time.Second) 450 continue 451 } 452 453 opts := item.Opts 454 args := s.indexArgs(opts) 455 456 ran := s.muIndexDir.With(opts.Name, func() { 457 // only record time taken once we hold the lock. This avoids us 458 // recording time taken while merging/cleanup runs. 
459 start := time.Now() 460 461 state, err := s.Index(args) 462 463 elapsed := time.Since(start) 464 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 465 466 indexDelay := time.Since(item.DateAddedToQueue) 467 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds()) 468 469 if err != nil { 470 errorLog.Printf("error indexing %s: %s", args.String(), err) 471 } 472 473 switch state { 474 case indexStateSuccess: 475 var branches []string 476 for _, b := range args.Branches { 477 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 478 } 479 s.logger.Info("updated index", 480 sglog.Int("tenant", args.TenantID), 481 sglog.String("repo", args.Name), 482 sglog.Uint32("id", args.RepoID), 483 sglog.Strings("branches", branches), 484 sglog.Duration("duration", elapsed), 485 sglog.Duration("index_delay", indexDelay), 486 ) 487 case indexStateSuccessMeta: 488 infoLog.Printf("updated meta %s in %v", args.String(), elapsed) 489 } 490 s.queue.SetIndexed(opts, state) 491 }) 492 493 if !ran { 494 // Someone else is processing the repository. We can just skip this job 495 // since the repository will be added back to the queue and we will 496 // converge to the correct behaviour. 497 debugLog.Printf("index job for repository already running: %s", args) 498 continue 499 } 500 } 501} 502 503// repoNameForMetric returns a normalized version of the given repository name that is 504// suitable for use with Prometheus metrics. 505func repoNameForMetric(repo string) string { 506 // Check to see if we want to be able to capture separate indexing metrics for this repository. 507 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 
508 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 509 return repo 510 } 511 512 return "" 513} 514 515func batched(slice []uint32, size int) <-chan []uint32 { 516 c := make(chan []uint32) 517 go func() { 518 for len(slice) > 0 { 519 if size > len(slice) { 520 size = len(slice) 521 } 522 c <- slice[:size] 523 slice = slice[size:] 524 } 525 close(c) 526 }() 527 return c 528} 529 530// jitterTicker returns a ticker which ticks with a jitter. Each tick is 531// uniformly selected from the range (d/2, d + d/2). It will tick on creation. 532// 533// sig is a list of signals which also cause the ticker to fire. This is a 534// convenience to allow manually triggering of the ticker. 535func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} { 536 ticker := make(chan struct{}) 537 538 go func() { 539 for { 540 ticker <- struct{}{} 541 ns := int64(d) 542 jitter := rand.Int63n(ns) 543 time.Sleep(time.Duration(ns/2 + jitter)) 544 } 545 }() 546 547 go func() { 548 if len(sig) == 0 { 549 return 550 } 551 552 c := make(chan os.Signal, 1) 553 signal.Notify(c, sig...) 554 for range c { 555 ticker <- struct{}{} 556 } 557 }() 558 559 return ticker 560} 561 562// Index starts an index job for repo name at commit. 563func (s *Server) Index(args *indexArgs) (state indexState, err error) { 564 tr := trace.New("index", args.Name) 565 tr.SetMaxEvents(30) // Ensure we capture all indexing events 566 567 defer func() { 568 if err != nil { 569 tr.SetError() 570 tr.LazyPrintf("error: %v", err) 571 state = indexStateFail 572 metricFailingTotal.Inc() 573 } 574 tr.LazyPrintf("state: %s", state) 575 tr.Finish() 576 }() 577 578 // Sourcegraph should always provide a tenant ID. 
579 if args.TenantID < 1 { 580 return indexStateFail, tenant.ErrMissingTenant 581 } 582 583 tr.LazyPrintf("branches: %v", args.Branches) 584 585 if len(args.Branches) == 0 { 586 return indexStateEmpty, createEmptyShard(args) 587 } 588 589 repositoryName := args.Name 590 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok { 591 tr.LazyPrintf("marking this repository for delta build") 592 args.UseDelta = true 593 } 594 595 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold 596 597 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok { 598 tr.LazyPrintf("skipping symbols calculation") 599 args.Symbols = false 600 } 601 602 reason := "forced" 603 604 if args.Incremental { 605 bo := args.BuildOptions() 606 bo.SetDefaults() 607 incrementalState, fn := bo.IndexState() 608 reason = string(incrementalState) 609 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc() 610 611 switch incrementalState { 612 case index.IndexStateEqual: 613 debugLog.Printf("%s index already up to date. Shard=%s", args.String(), fn) 614 return indexStateNoop, nil 615 616 case index.IndexStateMeta: 617 infoLog.Printf("updating index.meta %s", args.String()) 618 619 // TODO(stefan) handle mergeMeta for tenant id. 
620 if err := mergeMeta(bo); err != nil { 621 errorLog.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err) 622 } else { 623 return indexStateSuccessMeta, nil 624 } 625 626 case index.IndexStateCorrupt: 627 infoLog.Printf("falling back to full update: corrupt index: %s", args.String()) 628 } 629 } 630 631 infoLog.Printf("updating index %s reason=%s", args.String(), reason) 632 633 metricIndexingTotal.Inc() 634 c := gitIndexConfig{ 635 runCmd: func(cmd *exec.Cmd) error { 636 return s.loggedRun(tr, cmd) 637 }, 638 639 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) { 640 return args.BuildOptions().FindRepositoryMetadata() 641 }, 642 timeout: s.timeout, 643 } 644 645 err = gitIndex(c, args, s.Sourcegraph, s.logger) 646 if err != nil { 647 return indexStateFail, err 648 } 649 650 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil { 651 s.logger.Error("failed to update index status", 652 sglog.String("repo", args.Name), 653 sglog.Uint32("id", args.RepoID), 654 sglogBranches("branches", args.Branches), 655 sglog.Error(err), 656 ) 657 } 658 659 return indexStateSuccess, nil 660} 661 662// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so 663// it can update the zoekt_repos table. 664func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error { 665 // We need to read from disk for IndexTime. 
666 _, metadata, ok, err := c.findRepositoryMetadata(args) 667 if err != nil { 668 return fmt.Errorf("failed to read metadata for new/updated index: %w", err) 669 } 670 if !ok { 671 return errors.New("failed to find metadata for new/updated index") 672 } 673 674 status := []indexStatus{{ 675 RepoID: args.RepoID, 676 Branches: args.Branches, 677 IndexTimeUnix: metadata.IndexTime.Unix(), 678 }} 679 if err := sg.UpdateIndexStatus(status); err != nil { 680 return fmt.Errorf("failed to update sourcegraph with status: %w", err) 681 } 682 683 return nil 684} 685 686func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field { 687 ss := make([]string, len(branches)) 688 for i, b := range branches { 689 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version) 690 } 691 return sglog.Strings(key, ss) 692} 693 694func (s *Server) indexArgs(opts IndexOptions) *indexArgs { 695 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0)) 696 return &indexArgs{ 697 IndexOptions: opts, 698 IndexDir: s.IndexDir, 699 Parallelism: parallelism, 700 Incremental: true, 701 FileLimit: MaxFileSize, 702 ShardMerging: s.shardMerging, 703 } 704} 705 706// parallelism consults both the server flags and index options to determine the number 707// of shards to index in parallel. If the CPUCount index option is provided, it always 708// overrides the server flag. 
709func (s *Server) parallelism(opts IndexOptions, maxProcs int) int { 710 var parallelism int 711 if opts.ShardConcurrency > 0 { 712 parallelism = int(opts.ShardConcurrency) 713 } else { 714 parallelism = s.CPUCount 715 } 716 717 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs 718 if parallelism > 4*maxProcs { 719 parallelism = 4 * maxProcs 720 } 721 722 // If index concurrency is set, then divide the parallelism by the number of 723 // repos we're indexing in parallel 724 if s.IndexConcurrency > 1 { 725 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency))) 726 } 727 728 return parallelism 729} 730 731func createEmptyShard(args *indexArgs) error { 732 bo := args.BuildOptions() 733 bo.SetDefaults() 734 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 735 736 if args.Incremental && bo.IncrementalSkipIndexing() { 737 return nil 738 } 739 740 builder, err := index.NewBuilder(*bo) 741 if err != nil { 742 return err 743 } 744 return builder.Finish() 745} 746 747// addDebugHandlers adds handlers specific to indexserver. 748func (s *Server) addDebugHandlers(mux *http.ServeMux) { 749 // Sourcegraph's site admin view requires indexserver to serve it's admin view 750 // on "/". 
751 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 752 753 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 754 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 755 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 756 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 757 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 758 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 759} 760 761func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 762 if r.Method != "GET" { 763 w.Header().Set("Allow", "GET") 764 w.WriteHeader(http.StatusMethodNotAllowed) 765 return 766 } 767 768 response := struct { 769 Hostname string 770 }{ 771 Hostname: s.hostname, 772 } 773 774 b, err := json.Marshal(response) 775 if err != nil { 776 http.Error(w, err.Error(), http.StatusInternalServerError) 777 return 778 } 779 780 w.Header().Set("Content-Type", "application/json; charset=utf-8") 781 w.Write(b) 782} 783 784var rootTmpl = template.Must(template.New("name").Parse(` 785<html> 786 <body> 787 <a href="debug">Debug</a><br /> 788 <a href="debug/requests">Traces</a><br /> 789 {{.IndexMsg}}<br /> 790 <br /> 791 <h3>Reindex</h3> 792 {{if .Repos}} 793 <a href="?show_repos=false">hide repos</a><br /> 794 <table style="margin-top: 20px"> 795 <th style="text-align:left">Name</th> 796 <th style="text-align:left">ID (click to reindex)</th> 797 {{range .Repos}} 798 <tr> 799 <td>{{.Name}}</td> 800 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 801 </tr> 802 {{end}} 803 </table> 804 {{else}} 805 <a href="?show_repos=true">show repos</a><br /> 806 {{end}} 807 </body> 808</html> 809`)) 810 811func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 812 if r.Method != "GET" { 813 w.Header().Set("Allow", "GET") 814 w.WriteHeader(http.StatusMethodNotAllowed) 815 return 816 } 817 818 values := r.URL.Query() 819 820 // ?id= 821 indexMsg := "" 822 if v := values.Get("id"); v != "" 
{ 823 id, err := strconv.Atoi(v) 824 if err != nil { 825 http.Error(w, err.Error(), http.StatusBadRequest) 826 return 827 } 828 indexMsg, _ = s.forceIndex(uint32(id)) 829 } 830 831 // ?show_repos= 832 showRepos := false 833 if v := values.Get("show_repos"); v != "" { 834 showRepos, _ = strconv.ParseBool(v) 835 } 836 837 type Repo struct { 838 ID uint32 839 Name string 840 } 841 var data struct { 842 Repos []Repo 843 IndexMsg string 844 } 845 846 data.IndexMsg = indexMsg 847 848 if showRepos { 849 s.queue.Iterate(func(opts *IndexOptions) { 850 data.Repos = append(data.Repos, Repo{ 851 ID: opts.RepoID, 852 Name: opts.Name, 853 }) 854 }) 855 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 856 } 857 858 _ = rootTmpl.Execute(w, data) 859} 860 861// handleReindex triggers a reindex asynocronously. If a reindex was triggered 862// the request returns with status 202. The caller can infer the new state of 863// the index by calling List. 864func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 865 if r.Method != http.MethodPost { 866 w.Header().Set("Allow", http.MethodPost) 867 w.WriteHeader(http.StatusMethodNotAllowed) 868 return 869 } 870 871 err := r.ParseForm() 872 if err != nil { 873 http.Error(w, err.Error(), http.StatusBadRequest) 874 return 875 } 876 877 id, err := strconv.Atoi(r.Form.Get("repo")) 878 if err != nil { 879 http.Error(w, err.Error(), http.StatusBadRequest) 880 return 881 } 882 883 go func() { s.forceIndex(uint32(id)) }() 884 885 // 202 Accepted 886 w.WriteHeader(http.StatusAccepted) 887} 888 889func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 890 withIndexed := true 891 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 892 withIndexed = b 893 } 894 895 var indexed []uint32 896 if withIndexed { 897 indexed = listIndexed(s.IndexDir) 898 } 899 900 repos, err := s.Sourcegraph.List(r.Context(), indexed) 901 if err != nil { 902 http.Error(w, 
err.Error(), http.StatusInternalServerError) 903 return 904 } 905 906 bw := bytes.Buffer{} 907 908 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 909 910 _, err = fmt.Fprintf(tw, "ID\tName\n") 911 if err != nil { 912 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 913 return 914 } 915 916 s.queue.mu.Lock() 917 name := "" 918 for _, id := range repos.IDs { 919 if item := s.queue.get(id); item != nil { 920 name = item.opts.Name 921 } else { 922 name = "" 923 } 924 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 925 if err != nil { 926 debugLog.Printf("handleDebugList: %s\n", err.Error()) 927 } 928 } 929 s.queue.mu.Unlock() 930 931 if err != nil { 932 http.Error(w, err.Error(), http.StatusInternalServerError) 933 return 934 } 935 936 err = tw.Flush() 937 if err != nil { 938 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 939 return 940 } 941 942 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 943 944 if _, err := io.Copy(w, &bw); err != nil { 945 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 946 return 947 } 948} 949 950// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 951// can run this command during periods of low usage (evenings, weekends) to 952// trigger an initial merge run. In the steady-state, merges happen rarely, even 953// on busy instances, and users can rely on automatic merging instead. 954func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 955 // A merge operation can take very long, depending on the number merges and the 956 // target size of the compound shards. We run the merge in the background and 957 // return immediately to the user. 958 // 959 // We track the status of the merge with metricShardMergingRunning. 
960 go func() { 961 s.doMerge() 962 }() 963 _, _ = w.Write([]byte("merging enqueued\n")) 964} 965 966func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 967 indexed := listIndexed(s.IndexDir) 968 969 bw := bytes.Buffer{} 970 971 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 972 973 _, err := fmt.Fprintf(tw, "ID\tName\n") 974 if err != nil { 975 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 976 return 977 } 978 979 s.queue.mu.Lock() 980 name := "" 981 for _, id := range indexed { 982 if item := s.queue.get(id); item != nil { 983 name = item.opts.Name 984 } else { 985 name = "" 986 } 987 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 988 if err != nil { 989 debugLog.Printf("handleDebugIndexed: %s\n", err.Error()) 990 } 991 } 992 s.queue.mu.Unlock() 993 994 if err != nil { 995 http.Error(w, err.Error(), http.StatusInternalServerError) 996 return 997 } 998 999 err = tw.Flush() 1000 if err != nil { 1001 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 1002 return 1003 } 1004 1005 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 1006 1007 if _, err := io.Copy(w, &bw); err != nil { 1008 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 1009 return 1010 } 1011} 1012 1013// forceIndex will run the index job for repo name now. It will return always 1014// return a string explaining what it did, even if it failed. 
1015func (s *Server) forceIndex(id uint32) (string, error) { 1016 var opts IndexOptions 1017 var err error 1018 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 1019 opts = o 1020 }, func(_ uint32, e error) { 1021 err = e 1022 }, id) 1023 if err != nil { 1024 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 1025 } 1026 1027 args := s.indexArgs(opts) 1028 args.Incremental = false // force re-index 1029 1030 var state indexState 1031 ran := s.muIndexDir.With(opts.Name, func() { 1032 state, err = s.Index(args) 1033 }) 1034 if !ran { 1035 return fmt.Sprintf("index job for repository already running: %s", args), nil 1036 } 1037 if err != nil { 1038 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 1039 } 1040 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 1041} 1042 1043func listIndexed(indexDir string) []uint32 { 1044 index := getShards(indexDir) 1045 metricNumIndexed.Set(float64(len(index))) 1046 repoIDs := make([]uint32, 0, len(index)) 1047 for id := range index { 1048 repoIDs = append(repoIDs, id) 1049 } 1050 sort.Slice(repoIDs, func(i, j int) bool { 1051 return repoIDs[i] < repoIDs[j] 1052 }) 1053 return repoIDs 1054} 1055 1056// setupTmpDir sets up a temporary directory on the same volume as the 1057// indexes. 1058// 1059// If main is true we will delete older temp directories left around. main is 1060// false when this is a debug command. 1061func setupTmpDir(logger sglog.Logger, main bool, index string) error { 1062 // change the target tmp directory depending on if it's our main daemon or a 1063 // debug sub command. 
1064 dir := ".indexserver.debug.tmp" 1065 if main { 1066 dir = ".indexserver.tmp" 1067 } 1068 1069 tmpRoot := filepath.Join(index, dir) 1070 1071 if main { 1072 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot)) 1073 err := os.RemoveAll(tmpRoot) 1074 if err != nil { 1075 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err)) 1076 } 1077 } 1078 1079 if err := os.MkdirAll(tmpRoot, 0o755); err != nil { 1080 return err 1081 } 1082 1083 return os.Setenv("TMPDIR", tmpRoot) 1084} 1085 1086func printMetaData(fn string) error { 1087 repo, indexMeta, err := index.ReadMetadataPath(fn) 1088 if err != nil { 1089 return err 1090 } 1091 1092 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 1093 if err != nil { 1094 return err 1095 } 1096 1097 err = json.NewEncoder(os.Stdout).Encode(repo) 1098 if err != nil { 1099 return err 1100 } 1101 return nil 1102} 1103 1104func printShardStats(fn string) error { 1105 f, err := os.Open(fn) 1106 if err != nil { 1107 return err 1108 } 1109 1110 iFile, err := index.NewIndexFile(f) 1111 if err != nil { 1112 return err 1113 } 1114 1115 return index.PrintNgramStats(iFile) 1116} 1117 1118func srcLogLevelIsDebug() bool { 1119 lvl := os.Getenv(sglog.EnvLogLevel) 1120 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1121} 1122 1123func getEnvWithDefaultBool(k string, defaultVal bool) bool { 1124 v := os.Getenv(k) 1125 if v == "" { 1126 return defaultVal 1127 } 1128 b, err := strconv.ParseBool(v) 1129 if err != nil { 1130 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1131 } 1132 return b 1133} 1134 1135func getEnvWithDefaultInt64(k string, defaultVal int64) int64 { 1136 v := os.Getenv(k) 1137 if v == "" { 1138 return defaultVal 1139 } 1140 i, err := strconv.ParseInt(v, 10, 64) 1141 if err != nil { 1142 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1143 } 1144 return i 1145} 1146 1147func getEnvWithDefaultInt(k string, defaultVal int) int { 1148 v := 
os.Getenv(k) 1149 if v == "" { 1150 return defaultVal 1151 } 1152 i, err := strconv.Atoi(k) 1153 if err != nil { 1154 log.Fatalf("error parsing ENV %s to int: %s", k, err) 1155 } 1156 return i 1157} 1158 1159func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 { 1160 v := os.Getenv(k) 1161 if v == "" { 1162 return defaultVal 1163 } 1164 i, err := strconv.ParseUint(v, 10, 64) 1165 if err != nil { 1166 log.Fatalf("error parsing ENV %s to uint64: %s", k, err) 1167 } 1168 return i 1169} 1170 1171func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1172 v := os.Getenv(k) 1173 if v == "" { 1174 return defaultVal 1175 } 1176 f, err := strconv.ParseFloat(v, 64) 1177 if err != nil { 1178 log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1179 } 1180 return f 1181} 1182 1183func getEnvWithDefaultString(k string, defaultVal string) string { 1184 v := os.Getenv(k) 1185 if v == "" { 1186 return defaultVal 1187 } 1188 return v 1189} 1190 1191func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration { 1192 v := os.Getenv(k) 1193 if v == "" { 1194 return defaultVal 1195 } 1196 1197 d, err := time.ParseDuration(v) 1198 if err != nil { 1199 log.Fatalf("error parsing ENV %s to duration: %s", k, err) 1200 } 1201 return d 1202} 1203 1204func getEnvWithDefaultEmptySet(k string) map[string]struct{} { 1205 set := map[string]struct{}{} 1206 for _, v := range strings.Split(os.Getenv(k), ",") { 1207 v = strings.TrimSpace(v) 1208 if v != "" { 1209 set[v] = struct{}{} 1210 } 1211 } 1212 return set 1213} 1214 1215func joinStringSet(set map[string]struct{}, sep string) string { 1216 var xs []string 1217 for x := range set { 1218 xs = append(xs, x) 1219 } 1220 1221 return strings.Join(xs, sep) 1222} 1223 1224func setShardsCounter(indexDir string) { 1225 fns, err := filepath.Glob(filepath.Join(indexDir, "*.zoekt")) 1226 if err != nil { 1227 errorLog.Printf("setShardsCounter: %s\n", err) 1228 return 1229 } 1230 
metricNumberShards.Set(float64(len(fns))) 1231 1232 compoundFns := make([]string, 0, len(fns)) 1233 for _, fn := range fns { 1234 if strings.HasPrefix(filepath.Base(fn), "compound-") { 1235 compoundFns = append(compoundFns, fn) 1236 } 1237 } 1238 metricNumberCompoundShards.Set(float64(len(fns))) 1239} 1240 1241func rootCmd() *ffcli.Command { 1242 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1243 conf := rootConfig{ 1244 Main: true, 1245 } 1246 conf.registerRootFlags(rootFs) 1247 1248 return &ffcli.Command{ 1249 FlagSet: rootFs, 1250 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1251 Subcommands: []*ffcli.Command{debugCmd()}, 1252 Exec: func(ctx context.Context, args []string) error { 1253 return startServer(conf) 1254 }, 1255 } 1256} 1257 1258type rootConfig struct { 1259 // Main is true if this rootConfig is for our main long running command (the 1260 // indexserver). Debug commands should not set this value. This is used to 1261 // determine if we need to run tmpfriend. 1262 Main bool 1263 1264 root string 1265 interval time.Duration 1266 index string 1267 indexConcurrency int64 1268 listen string 1269 hostname string 1270 cpuFraction float64 1271 1272 // config values related to shard merging 1273 disableShardMerging bool 1274 vacuumInterval time.Duration 1275 mergeInterval time.Duration 1276 targetSize int64 1277 minSize int64 1278 minAgeDays int 1279 1280 // config values related to backoff indexing repos with one or more consecutive failures 1281 backoffDuration time.Duration 1282 maxBackoffDuration time.Duration 1283} 1284 1285func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1286 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. 
If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1287 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1288 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently") 1289 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", index.DefaultDir), "set index directory to use") 1290 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") 1291 fs.StringVar(&rc.hostname, "hostname", index.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") 1292 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1293 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.") 1294 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. 
A negative value disables indexing backoff.") 1295 1296 // flags related to shard merging 1297 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging") 1298 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1299 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1300 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 1000), "the target size of compound shards in MiB") 1301 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 800), "the minimum size of a compound shard in MiB") 1302 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.") 1303} 1304 1305func startServer(conf rootConfig) error { 1306 s, err := newServer(conf) 1307 if err != nil { 1308 return err 1309 } 1310 1311 profiler.Init("zoekt-sourcegraph-indexserver") 1312 setShardsCounter(s.IndexDir) 1313 1314 if conf.listen != "" { 1315 1316 mux := http.NewServeMux() 1317 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1318 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1319 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1320 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"}, 1321 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1322 }...) 
1323 s.addDebugHandlers(mux) 1324 1325 go func() { 1326 debugLog.Printf("serving HTTP on %s", conf.listen) 1327 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1328 }() 1329 1330 // Serve mux on a unix domain socket on a best-effort-basis so that 1331 // webserver can call the endpoints via the shared filesystem. 1332 // 1333 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1334 // on the socket due to permission errors. See 1335 // https://github.com/docker/for-mac/issues/6239 1336 go func() { 1337 serveHTTPOverSocket := func() error { 1338 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1339 // We cannot bind a socket to an existing pathname. 1340 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1341 return fmt.Errorf("error removing socket file: %s", socket) 1342 } 1343 // The "unix" network corresponds to stream sockets. (cf. unixgram, 1344 // unixpacket). 1345 l, err := net.Listen("unix", socket) 1346 if err != nil { 1347 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1348 } 1349 // Indexserver (root) and webserver (Sourcegraph) run with 1350 // different users. Per default, the socket is created with 1351 // permission 755 (root root), which doesn't let webserver write to 1352 // it. 1353 // 1354 // See https://github.com/golang/go/issues/11822 for more context. 
1355 if err := os.Chmod(socket, 0o777); err != nil { 1356 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1357 } 1358 debugLog.Printf("serving HTTP on %s", socket) 1359 return http.Serve(l, mux) 1360 } 1361 debugLog.Print(serveHTTPOverSocket()) 1362 }() 1363 } 1364 1365 oc := &ownerChecker{ 1366 Path: filepath.Join(conf.index, "owner.txt"), 1367 Hostname: conf.hostname, 1368 } 1369 go oc.Run() 1370 1371 logger := sglog.Scoped("metricsRegistration") 1372 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1373 1374 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1375 prometheus.DefaultRegisterer.MustRegister(c) 1376 1377 s.Run() 1378 return nil 1379} 1380 1381func newServer(conf rootConfig) (*Server, error) { 1382 logger := sglog.Scoped("server") 1383 1384 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1385 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1386 } 1387 if conf.index == "" { 1388 return nil, fmt.Errorf("must set -index") 1389 } 1390 if conf.root == "" { 1391 return nil, fmt.Errorf("must set -sourcegraph_url") 1392 } 1393 rootURL, err := url.Parse(conf.root) 1394 if err != nil { 1395 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1396 } 1397 1398 rootURL = addDefaultPort(rootURL) 1399 1400 // Tune GOMAXPROCS to match Linux container CPU quota. 1401 _, _ = maxprocs.Set() 1402 1403 // Automatically prepend our own path at the front, to minimize 1404 // required configuration. 
1405 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1406 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1407 } 1408 1409 if _, err := os.Stat(conf.index); err != nil { 1410 if err := os.MkdirAll(conf.index, 0o755); err != nil { 1411 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1412 } 1413 } 1414 1415 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil { 1416 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1417 } 1418 1419 if srcLogLevelIsDebug() { 1420 debugLog.SetOutput(os.Stderr) 1421 } 1422 1423 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1424 if len(reposWithSeparateIndexingMetrics) > 0 { 1425 debugLog.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1426 } 1427 1428 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1429 if len(deltaBuildRepositoriesAllowList) > 0 { 1430 debugLog.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1431 } 1432 1433 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1434 if deltaShardNumberFallbackThreshold > 0 { 1435 debugLog.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1436 } else { 1437 debugLog.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1438 } 1439 1440 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1441 if len(reposShouldSkipSymbolsCalculation) > 0 { 1442 debugLog.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1443 } 1444 1445 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout) 1446 if indexingTimeout != 
defaultIndexingTimeout { 1447 debugLog.Printf("using configured indexing timeout: %s", indexingTimeout) 1448 } 1449 1450 var sg Sourcegraph 1451 if rootURL.IsAbs() { 1452 var batchSize int 1453 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { 1454 batchSize, err = strconv.Atoi(v) 1455 if err != nil { 1456 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1457 } 1458 } 1459 1460 opts := []SourcegraphClientOption{ 1461 WithBatchSize(batchSize), 1462 } 1463 1464 logger := sglog.Scoped("zoektConfigurationGRPCClient") 1465 client, err := dialGRPCClient(rootURL.Host, logger) 1466 if err != nil { 1467 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1468 } 1469 1470 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...) 1471 1472 } else { 1473 sg = sourcegraphFake{ 1474 RootDir: rootURL.String(), 1475 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1476 } 1477 } 1478 1479 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction))) 1480 if cpuCount < 1 { 1481 cpuCount = 1 1482 } 1483 1484 if conf.indexConcurrency < 1 { 1485 conf.indexConcurrency = 1 1486 } else if conf.indexConcurrency > int64(cpuCount) { 1487 conf.indexConcurrency = int64(cpuCount) 1488 } 1489 1490 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1491 1492 return &Server{ 1493 logger: logger, 1494 Sourcegraph: sg, 1495 IndexDir: conf.index, 1496 IndexConcurrency: int(conf.indexConcurrency), 1497 Interval: conf.interval, 1498 CPUCount: cpuCount, 1499 queue: *q, 1500 shardMerging: !conf.disableShardMerging, 1501 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1502 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1503 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1504 hostname: conf.hostname, 1505 mergeOpts: mergeOpts{ 1506 vacuumInterval: conf.vacuumInterval, 1507 mergeInterval: 
conf.mergeInterval, 1508 targetSizeBytes: conf.targetSize * 1024 * 1024, 1509 minSizeBytes: conf.minSize * 1024 * 1024, 1510 minAgeDays: conf.minAgeDays, 1511 }, 1512 timeout: indexingTimeout, 1513 }, err 1514} 1515 1516// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration 1517// for the indexed-search-configuration gRPC service. 1518// 1519// The default backoff strategy is modeled after the default settings used by 1520// retryablehttp.DefaultClient. 1521// 1522// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1523// - Unavailable 1524// - Aborted 1525// 1526//go:embed default_grpc_service_configuration.json 1527var defaultGRPCServiceConfigurationJSON string 1528 1529func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1530 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1531 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1532 return invoker(ctx, method, req, reply, cc, opts...) 1533 } 1534} 1535 1536func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1537 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1538 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1539 return streamer(ctx, desc, cc, method, opts...) 1540 } 1541} 1542 1543// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPCs servers and clients are allowed to process. 1544// This can be overridden by providing custom Server/Dial options. 
1545const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB 1546 1547func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) { 1548 metrics := clientMetricsOnce() 1549 1550 // If the service seems to be unavailable, this 1551 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1 1552 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s]) 1553 retryOpts := []retry.CallOption{ 1554 retry.WithMax(5), 1555 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)), 1556 retry.WithCodes(codes.Unavailable), 1557 } 1558 1559 opts := []grpc.DialOption{ 1560 grpc.WithTransportCredentials(insecure.NewCredentials()), 1561 grpc.WithChainStreamInterceptor( 1562 metrics.StreamClientInterceptor(), 1563 messagesize.StreamClientInterceptor, 1564 internalActorStreamInterceptor(), 1565 internalerrs.LoggingStreamClientInterceptor(logger), 1566 internalerrs.PrometheusStreamClientInterceptor, 1567 retry.StreamClientInterceptor(retryOpts...), 1568 ), 1569 grpc.WithChainUnaryInterceptor( 1570 metrics.UnaryClientInterceptor(), 1571 messagesize.UnaryClientInterceptor, 1572 internalActorUnaryInterceptor(), 1573 internalerrs.LoggingUnaryClientInterceptor(logger), 1574 internalerrs.PrometheusUnaryClientInterceptor, 1575 retry.UnaryClientInterceptor(retryOpts...), 1576 ), 1577 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON), 1578 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)), 1579 } 1580 1581 opts = append(opts, additionalOpts...) 1582 1583 // Ensure that the message size options are set last, so they override any other 1584 // client-specific options that tweak the message size. 1585 // 1586 // The message size options are only provided if the environment variable is set. 
These options serve as an escape hatch, so they 1587 // take precedence over everything else with a uniform size setting that's easy to reason about. 1588 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...) 1589 1590 // This dialer is used to connect via gRPC to the Sourcegraph instance. 1591 // This is done lazily, so we can provide the client to use regardless of 1592 // whether we enabled gRPC or not initially. 1593 cc, err := grpc.Dial(addr, opts...) 1594 if err != nil { 1595 return nil, fmt.Errorf("dialing %q: %w", addr, err) 1596 } 1597 1598 client := proto.NewZoektConfigurationServiceClient(cc) 1599 return client, nil 1600} 1601 1602// addDefaultPort adds a default port to a URL if one is not specified. 1603// 1604// If the URL scheme is "http" and no port is specified, "80" is used. 1605// If the scheme is "https", "443" is used. 1606// 1607// The original URL is not mutated. A copy is modified and returned. 1608func addDefaultPort(original *url.URL) *url.URL { 1609 if original == nil { 1610 return nil // don't panic 1611 } 1612 1613 if !original.IsAbs() { 1614 return original // don't do anything if the URL doesn't look like a remote URL 1615 } 1616 1617 if original.Scheme == "http" && original.Port() == "" { 1618 u := cloneURL(original) 1619 u.Host = net.JoinHostPort(u.Host, "80") 1620 return u 1621 } 1622 1623 if original.Scheme == "https" && original.Port() == "" { 1624 u := cloneURL(original) 1625 u.Host = net.JoinHostPort(u.Host, "443") 1626 return u 1627 } 1628 1629 return original 1630} 1631 1632// cloneURL returns a copy of the URL. It is safe to mutate the returned URL. 
1633// This is copied from net/http/clone.go 1634func cloneURL(u *url.URL) *url.URL { 1635 if u == nil { 1636 return nil 1637 } 1638 u2 := new(url.URL) 1639 *u2 = *u 1640 if u.User != nil { 1641 u2.User = new(url.Userinfo) 1642 *u2.User = *u.User 1643 } 1644 return u2 1645} 1646 1647func main() { 1648 liblog := sglog.Init(sglog.Resource{ 1649 Name: "zoekt-indexserver", 1650 Version: index.Version, 1651 InstanceID: index.HostnameBestEffort(), 1652 }) 1653 defer liblog.Sync() 1654 1655 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1656 log.Fatal(err) 1657 } 1658} 1659 1660// getBoolFromEnvironmentVariables returns the boolean defined by the first environment 1661// variable listed in envVarNames that is set in the current process environment, or the defaultBool if none are set. 1662// 1663// An error is returned of the provided environment variables fails to parse as a boolean. 1664func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) { 1665 for _, envVar := range envVarNames { 1666 v := os.Getenv(envVar) 1667 if v == "" { 1668 continue 1669 } 1670 1671 b, err := strconv.ParseBool(v) 1672 if err != nil { 1673 return false, fmt.Errorf("parsing environment variable %q to boolean: %v", envVar, err) 1674 } 1675 1676 return b, nil 1677 } 1678 1679 return defaultBool, nil 1680}