fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Command zoekt-sourcegraph-indexserver periodically reindexes repositories
// from a Sourcegraph instance. It uses a "pull-based" design, where it periodically
// reaches out to the Sourcegraph instance for the list of repositories to reindex.
package main

import (
	"bytes"
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"html/template"
	"io"
	"io/fs"
	"log"
	"math"
	"math/rand"
	"net"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"text/tabwriter"
	"time"

	grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
	"github.com/peterbourgon/ff/v3/ffcli"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	sglog "github.com/sourcegraph/log"
	"github.com/sourcegraph/mountinfo"

	"github.com/sourcegraph/zoekt"
	configv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/sourcegraph/zoekt/configuration/v1"
	indexserverv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/zoekt/indexserver/v1"
	"github.com/sourcegraph/zoekt/grpc/defaults"
	"github.com/sourcegraph/zoekt/grpc/grpcutil"
	"github.com/sourcegraph/zoekt/grpc/internalerrs"
	"github.com/sourcegraph/zoekt/grpc/messagesize"
	"github.com/sourcegraph/zoekt/index"
	"github.com/sourcegraph/zoekt/internal/debugserver"
	"github.com/sourcegraph/zoekt/internal/profiler"
	"github.com/sourcegraph/zoekt/internal/tenant"

	"go.uber.org/automaxprocs/maxprocs"
	"go.uber.org/multierr"
	"golang.org/x/net/trace"
	"golang.org/x/sys/unix"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/metadata"
)

// Prometheus metrics exported by the indexserver. They are created with
// promauto, so they are registered as a side effect of package initialization.
var (
	metricResolveRevisionsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "resolve_revisions_seconds",
		Help:    "A histogram of latencies for resolving all repository revisions.",
		Buckets: prometheus.ExponentialBuckets(1, 10, 6), // 1s -> 100000s (~27.8h)
	})

	metricResolveRevisionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "resolve_revision_seconds",
		Help:    "A histogram of latencies for resolving a repository revision.",
		Buckets: prometheus.ExponentialBuckets(.25, 2, 4), // 250ms -> 2s
	}, []string{"success"}) // success=true|false

	metricGetIndexOptions = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_total",
		Help: "The total number of times we tried to get index options for a repository. Includes errors.",
	})
	metricGetIndexOptionsError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "get_index_options_error_total",
		Help: "The total number of times we failed to get index options for a repository.",
	})

	metricIndexDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_repo_seconds",
		Help: "A histogram of latencies for indexing a repository.",
		Buckets: prometheus.ExponentialBucketsRange(
			(100 * time.Millisecond).Seconds(),
			(40*time.Minute + defaultIndexingTimeout).Seconds(), // add an extra 40 minutes to account for the time it takes to clone the repo
			20),
	}, []string{
		"state", // state is an indexState
		"name",  // name of the repository that was indexed
	})

	metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_indexing_delay_seconds",
		Help:    "A histogram of durations from when an index job is added to the queue, to the time it completes.",
		Buckets: prometheus.ExponentialBuckets(60, 2, 14), // 1 minute -> 491520s (~5.7 days)
	}, []string{
		"state", // state is an indexState
		"name",  // the name of the repository that was indexed
	})

	metricFetchDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "index_fetch_seconds",
		Help:    "A histogram of latencies for fetching a repository.",
		Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30, 60, 180, 300, 600}, // 50ms -> 10 minutes
	}, []string{
		"success", // true|false
		"name",    // the name of the repository that the commits were fetched from
	})

	metricIndexIncrementalIndexState = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "index_incremental_index_state",
		Help: "A count of the state on disk vs what we want to build. See zoekt/build.IndexState.",
	}, []string{"state"}) // state is index.IndexState

	metricNumIndexed = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "index_num_indexed",
		Help: "Number of indexed repos by code host",
	})

	metricFailingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_failing_total",
		Help: "Counts failures to index (indexing activity, should be used with rate())",
	})

	metricIndexingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_indexing_total",
		Help: "Counts indexings (indexing activity, should be used with rate())",
	})

	metricNumStoppedTrackingTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "index_num_stopped_tracking_total",
		Help: "Counts the number of repos we stopped tracking.",
	})

	// clientMetricsOnce returns a singleton instance of the client metrics
	// that are shared across all gRPC clients that this process creates.
	//
	// This function panics if the metrics cannot be registered with the default
	// Prometheus registry.
	clientMetricsOnce = sync.OnceValue(func() *grpcprom.ClientMetrics {
		clientMetrics := grpcprom.NewClientMetrics(
			grpcprom.WithClientCounterOptions(),
			grpcprom.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
			grpcprom.WithClientStreamRecvHistogram(),   // record how long it takes for a client to receive a message during a streaming RPC
			grpcprom.WithClientStreamSendHistogram(),   // record how long it takes for a client to send a message during a streaming RPC
		)
		prometheus.DefaultRegisterer.MustRegister(clientMetrics)
		return clientMetrics
	})
)

// MaxFileSize is the file size limit (1 MB, i.e. 1 << 20 bytes) passed to index
// jobs; it matches https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
// NOTE: if you change this, you must also update gitIndex to use the same value when fetching the repo.
173const MaxFileSize = 1 << 20 174 175// set of repositories that we want to capture separate indexing metrics for 176var reposWithSeparateIndexingMetrics = make(map[string]struct{}) 177 178type indexState string 179 180const ( 181 indexStateFail indexState = "fail" 182 indexStateSuccess indexState = "success" 183 indexStateSuccessMeta indexState = "success_meta" // We only updated metadata 184 indexStateNoop indexState = "noop" // We didn't need to update index 185 indexStateEmpty indexState = "empty" // index is empty (empty repo) 186) 187 188// Server is the main functionality of zoekt-sourcegraph-indexserver. It 189// exists to conveniently use all the options passed in via func main. 190type Server struct { 191 logger sglog.Logger 192 193 Sourcegraph Sourcegraph 194 BatchSize int 195 196 // IndexDir is the index directory to use. 197 IndexDir string 198 199 // IndexConcurrency is the number of repositories we index at once. 200 IndexConcurrency int 201 202 // Interval is how often we sync with Sourcegraph. 203 Interval time.Duration 204 205 // CPUCount is the number of CPUs to use for indexing shards. 206 CPUCount int 207 208 queue Queue 209 210 // muIndexDir protects the index directory from concurrent access. 211 muIndexDir indexMutex 212 213 // If true, shard merging is enabled. 214 shardMerging bool 215 216 // deltaBuildRepositoriesAllowList is an allowlist for repositories that we 217 // use delta-builds for instead of normal builds 218 deltaBuildRepositoriesAllowList map[string]struct{} 219 220 // deltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist 221 // before attempting a delta build. 
222 deltaShardNumberFallbackThreshold uint64 223 224 // repositoriesSkipSymbolsCalculationAllowList is an allowlist for repositories that 225 // we skip calculating symbols metadata for during builds 226 repositoriesSkipSymbolsCalculationAllowList map[string]struct{} 227 228 hostname string 229 230 mergeOpts mergeOpts 231 232 // timeout defines how long the index server waits before killing an indexing job. 233 timeout time.Duration 234 235 indexserverv1.UnimplementedSourcegraphIndexserverServiceServer 236} 237 238var ( 239 debugLog = log.New(io.Discard, "[DEBUG] ", log.LstdFlags) 240 infoLog = log.New(os.Stderr, "[INFO] ", log.LstdFlags) 241 errorLog = log.New(os.Stderr, "[ERROR] ", log.LstdFlags) 242) 243 244// our index commands should output something every 100mb they process. 245// 246// 2020-11-24 Keegan. "This should be rather quick so 5m is more than enough 247// time." famous last words. A client was indexing a monorepo with 42 248// cores... 5m was not enough. 249const noOutputTimeout = 30 * time.Minute 250 251func (s *Server) loggedRun(tr trace.Trace, cmd *exec.Cmd) (err error) { 252 out := &synchronizedBuffer{} 253 cmd.Stdout = out 254 cmd.Stderr = out 255 256 tr.LazyPrintf("%s", cmd.Args) 257 258 defer func() { 259 if err != nil { 260 outS := out.String() 261 tr.LazyPrintf("failed: %v", err) 262 tr.LazyPrintf("output: %s", out) 263 tr.SetError() 264 err = fmt.Errorf("command %s failed: %v\nOUT: %s", cmd.Args, err, outS) 265 } 266 }() 267 268 s.logger.Debug("logged run", sglog.Strings("args", cmd.Args)) 269 270 if err := cmd.Start(); err != nil { 271 return err 272 } 273 274 errC := make(chan error) 275 go func() { 276 errC <- cmd.Wait() 277 }() 278 279 // This channel is set after we have sent sigquit. It allows us to follow up 280 // with a sigkill if the process doesn't quit after sigquit. 
281 kill := make(<-chan time.Time) 282 283 lastLen := 0 284 for { 285 select { 286 case <-time.After(noOutputTimeout): 287 // Periodically check if we have had output. If not kill the process. 288 if out.Len() != lastLen { 289 lastLen = out.Len() 290 infoLog.Printf("still running %s", cmd.Args) 291 } else { 292 // Send quit (C-\) first so we get a stack dump. 293 infoLog.Printf("no output for %s, quitting %s", noOutputTimeout, cmd.Args) 294 if err := cmd.Process.Signal(unix.SIGQUIT); err != nil { 295 errorLog.Println("quit failed:", err) 296 } 297 298 // send sigkill if still running in 10s 299 kill = time.After(10 * time.Second) 300 } 301 302 case <-kill: 303 infoLog.Printf("still running, killing %s", cmd.Args) 304 if err := cmd.Process.Kill(); err != nil { 305 errorLog.Println("kill failed:", err) 306 } 307 308 case err := <-errC: 309 if err != nil { 310 return err 311 } 312 313 tr.LazyPrintf("success") 314 return nil 315 } 316 } 317} 318 319// synchronizedBuffer wraps a strings.Builder with a mutex. Used so we can 320// monitor the buffer while it is being written to. 321type synchronizedBuffer struct { 322 mu sync.Mutex 323 b bytes.Buffer 324} 325 326func (sb *synchronizedBuffer) Write(p []byte) (int, error) { 327 sb.mu.Lock() 328 defer sb.mu.Unlock() 329 return sb.b.Write(p) 330} 331 332func (sb *synchronizedBuffer) Len() int { 333 sb.mu.Lock() 334 defer sb.mu.Unlock() 335 return sb.b.Len() 336} 337 338func (sb *synchronizedBuffer) String() string { 339 sb.mu.Lock() 340 defer sb.mu.Unlock() 341 return sb.b.String() 342} 343 344// pauseFileName if present in IndexDir will stop index jobs from 345// running. This is to make it possible to experiment with the content of the 346// IndexDir without the indexserver writing to it. 347const pauseFileName = "PAUSE" 348 349// Run the sync loop. This blocks forever. 350func (s *Server) Run() { 351 removeIncompleteShards(s.IndexDir) 352 353 // Start a goroutine which updates the queue with commits to index. 
354 go func() { 355 // We update the list of indexed repos every Interval. To speed up manual 356 // testing we also listen for SIGUSR1 to trigger updates. 357 // 358 // "pkill -SIGUSR1 zoekt-sourcegra" 359 for range jitterTicker(s.Interval, unix.SIGUSR1) { 360 if b, err := os.ReadFile(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 361 infoLog.Printf("indexserver manually paused via PAUSE file: %s", string(bytes.TrimSpace(b))) 362 continue 363 } 364 365 repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) 366 if err != nil { 367 errorLog.Printf("error listing repos: %s", err) 368 continue 369 } 370 371 debugLog.Printf("updating index queue with %d repositories", len(repos.IDs)) 372 373 // Stop indexing repos we don't need to track anymore 374 removed := s.queue.MaybeRemoveMissing(repos.IDs) 375 metricNumStoppedTrackingTotal.Add(float64(len(removed))) 376 if len(removed) > 0 { 377 infoLog.Printf("stopped tracking %d repositories: %s", len(removed), formatListUint32(removed, 5)) 378 } 379 380 cleanupDone := make(chan struct{}) 381 go func() { 382 defer close(cleanupDone) 383 s.muIndexDir.Global(func() { 384 cleanup(s.IndexDir, repos.IDs, time.Now(), s.shardMerging) 385 }) 386 }() 387 388 repos.IterateIndexOptions(s.queue.AddOrUpdate) 389 390 // IterateIndexOptions will only iterate over repositories that have 391 // changed since we last called list. However, we want to add all IDs 392 // back onto the queue just to check that what is on disk is still 393 // correct. This will use the last IndexOptions we stored in the 394 // queue. The repositories not on the queue (missing) need a forced 395 // fetch of IndexOptions. 396 missing := s.queue.Bump(repos.IDs) 397 s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, func(uint32, error) {}, missing...) 
398 399 setShardsCounter(s.IndexDir) 400 401 <-cleanupDone 402 } 403 }() 404 405 go func() { 406 for range jitterTicker(s.mergeOpts.vacuumInterval, unix.SIGUSR1) { 407 if s.shardMerging { 408 s.vacuum() 409 } 410 } 411 }() 412 413 go func() { 414 for range jitterTicker(s.mergeOpts.mergeInterval, unix.SIGUSR1) { 415 if s.shardMerging { 416 s.doMerge() 417 } 418 } 419 }() 420 421 for i := 0; i < s.IndexConcurrency; i++ { 422 go s.processQueue() 423 } 424 425 // block forever 426 select {} 427} 428 429// formatList returns a comma-separated list of the first min(len(v), m) items. 430func formatListUint32(v []uint32, m int) string { 431 if len(v) < m { 432 m = len(v) 433 } 434 435 sb := strings.Builder{} 436 for i := 0; i < m; i++ { 437 fmt.Fprintf(&sb, "%d, ", v[i]) 438 } 439 440 if len(v) > m { 441 sb.WriteString("...") 442 } 443 444 return strings.TrimRight(sb.String(), ", ") 445} 446 447func (s *Server) processQueue() { 448 for { 449 if _, err := os.Stat(filepath.Join(s.IndexDir, pauseFileName)); err == nil { 450 time.Sleep(time.Second) 451 continue 452 } 453 454 item, ok := s.queue.Pop() 455 if !ok { 456 time.Sleep(time.Second) 457 continue 458 } 459 460 opts := item.Opts 461 args := s.indexArgs(opts) 462 463 ran := s.muIndexDir.With(opts.Name, func() { 464 // only record time taken once we hold the lock. This avoids us 465 // recording time taken while merging/cleanup runs. 
466 start := time.Now() 467 468 state, err := s.Index(args) 469 470 elapsed := time.Since(start) 471 metricIndexDuration.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(elapsed.Seconds()) 472 473 indexDelay := time.Since(item.DateAddedToQueue) 474 metricIndexingDelay.WithLabelValues(string(state), repoNameForMetric(opts.Name)).Observe(indexDelay.Seconds()) 475 476 if err != nil { 477 errorLog.Printf("error indexing %s: %s", args.String(), err) 478 } 479 480 switch state { 481 case indexStateSuccess: 482 var branches []string 483 for _, b := range args.Branches { 484 branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version)) 485 } 486 s.logger.Info("updated index", 487 sglog.Int("tenant", args.TenantID), 488 sglog.String("repo", args.Name), 489 sglog.Uint32("id", args.RepoID), 490 sglog.Strings("branches", branches), 491 sglog.Duration("duration", elapsed), 492 sglog.Duration("index_delay", indexDelay), 493 ) 494 case indexStateSuccessMeta: 495 infoLog.Printf("updated meta %s in %v", args.String(), elapsed) 496 } 497 s.queue.SetIndexed(opts, state) 498 }) 499 500 if !ran { 501 // Someone else is processing the repository. We can just skip this job 502 // since the repository will be added back to the queue and we will 503 // converge to the correct behaviour. 504 debugLog.Printf("index job for repository already running: %s", args) 505 continue 506 } 507 } 508} 509 510// repoNameForMetric returns a normalized version of the given repository name that is 511// suitable for use with Prometheus metrics. 512func repoNameForMetric(repo string) string { 513 // Check to see if we want to be able to capture separate indexing metrics for this repository. 514 // If we don't, set to a default string to keep the cardinality for the Prometheus metric manageable. 
515 if _, ok := reposWithSeparateIndexingMetrics[repo]; ok { 516 return repo 517 } 518 519 return "" 520} 521 522func batched(slice []uint32, size int) <-chan []uint32 { 523 c := make(chan []uint32) 524 go func() { 525 for len(slice) > 0 { 526 if size > len(slice) { 527 size = len(slice) 528 } 529 c <- slice[:size] 530 slice = slice[size:] 531 } 532 close(c) 533 }() 534 return c 535} 536 537// jitterTicker returns a ticker which ticks with a jitter. Each tick is 538// uniformly selected from the range (d/2, d + d/2). It will tick on creation. 539// 540// sig is a list of signals which also cause the ticker to fire. This is a 541// convenience to allow manually triggering of the ticker. 542func jitterTicker(d time.Duration, sig ...os.Signal) <-chan struct{} { 543 ticker := make(chan struct{}) 544 545 go func() { 546 for { 547 ticker <- struct{}{} 548 ns := int64(d) 549 jitter := rand.Int63n(ns) 550 time.Sleep(time.Duration(ns/2 + jitter)) 551 } 552 }() 553 554 go func() { 555 if len(sig) == 0 { 556 return 557 } 558 559 c := make(chan os.Signal, 1) 560 signal.Notify(c, sig...) 561 for range c { 562 ticker <- struct{}{} 563 } 564 }() 565 566 return ticker 567} 568 569// Index starts an index job for repo name at commit. 570func (s *Server) Index(args *indexArgs) (state indexState, err error) { 571 tr := trace.New("index", args.Name) 572 tr.SetMaxEvents(30) // Ensure we capture all indexing events 573 574 defer func() { 575 if err != nil { 576 tr.SetError() 577 tr.LazyPrintf("error: %v", err) 578 state = indexStateFail 579 metricFailingTotal.Inc() 580 } 581 tr.LazyPrintf("state: %s", state) 582 tr.Finish() 583 }() 584 585 // Sourcegraph should always provide a tenant ID. 
586 if args.TenantID < 1 { 587 return indexStateFail, tenant.ErrMissingTenant 588 } 589 590 tr.LazyPrintf("branches: %v", args.Branches) 591 592 if len(args.Branches) == 0 { 593 return indexStateEmpty, createEmptyShard(args) 594 } 595 596 repositoryName := args.Name 597 if _, ok := s.deltaBuildRepositoriesAllowList[repositoryName]; ok { 598 tr.LazyPrintf("marking this repository for delta build") 599 args.UseDelta = true 600 } 601 602 args.DeltaShardNumberFallbackThreshold = s.deltaShardNumberFallbackThreshold 603 604 if _, ok := s.repositoriesSkipSymbolsCalculationAllowList[repositoryName]; ok { 605 tr.LazyPrintf("skipping symbols calculation") 606 args.Symbols = false 607 } 608 609 reason := "forced" 610 611 if args.Incremental { 612 bo := args.BuildOptions() 613 bo.SetDefaults() 614 incrementalState, fn := bo.IndexState() 615 reason = string(incrementalState) 616 metricIndexIncrementalIndexState.WithLabelValues(string(incrementalState)).Inc() 617 618 switch incrementalState { 619 case index.IndexStateEqual: 620 debugLog.Printf("%s index already up to date. Shard=%s", args.String(), fn) 621 return indexStateNoop, nil 622 623 case index.IndexStateMeta: 624 infoLog.Printf("updating index.meta %s", args.String()) 625 626 // TODO(stefan) handle mergeMeta for tenant id. 
627 if err := mergeMeta(bo); err != nil { 628 errorLog.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err) 629 } else { 630 return indexStateSuccessMeta, nil 631 } 632 633 case index.IndexStateCorrupt: 634 infoLog.Printf("falling back to full update: corrupt index: %s", args.String()) 635 } 636 } 637 638 infoLog.Printf("updating index %s reason=%s", args.String(), reason) 639 640 metricIndexingTotal.Inc() 641 c := gitIndexConfig{ 642 runCmd: func(cmd *exec.Cmd) error { 643 return s.loggedRun(tr, cmd) 644 }, 645 646 findRepositoryMetadata: func(args *indexArgs) (repository *zoekt.Repository, metadata *zoekt.IndexMetadata, ok bool, err error) { 647 return args.BuildOptions().FindRepositoryMetadata() 648 }, 649 timeout: s.timeout, 650 } 651 652 err = gitIndex(c, args, s.Sourcegraph, s.logger) 653 if err != nil { 654 return indexStateFail, err 655 } 656 657 if err := updateIndexStatusOnSourcegraph(c, args, s.Sourcegraph); err != nil { 658 s.logger.Error("failed to update index status", 659 sglog.String("repo", args.Name), 660 sglog.Uint32("id", args.RepoID), 661 sglogBranches("branches", args.Branches), 662 sglog.Error(err), 663 ) 664 } 665 666 return indexStateSuccess, nil 667} 668 669// updateIndexStatusOnSourcegraph pushes the current state to sourcegraph so 670// it can update the zoekt_repos table. 671func updateIndexStatusOnSourcegraph(c gitIndexConfig, args *indexArgs, sg Sourcegraph) error { 672 // We need to read from disk for IndexTime. 
673 _, metadata, ok, err := c.findRepositoryMetadata(args) 674 if err != nil { 675 return fmt.Errorf("failed to read metadata for new/updated index: %w", err) 676 } 677 if !ok { 678 return errors.New("failed to find metadata for new/updated index") 679 } 680 681 status := []indexStatus{{ 682 RepoID: args.RepoID, 683 Branches: args.Branches, 684 IndexTimeUnix: metadata.IndexTime.Unix(), 685 }} 686 if err := sg.UpdateIndexStatus(status); err != nil { 687 return fmt.Errorf("failed to update sourcegraph with status: %w", err) 688 } 689 690 return nil 691} 692 693func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field { 694 ss := make([]string, len(branches)) 695 for i, b := range branches { 696 ss[i] = fmt.Sprintf("%s=%s", b.Name, b.Version) 697 } 698 return sglog.Strings(key, ss) 699} 700 701func (s *Server) indexArgs(opts IndexOptions) *indexArgs { 702 parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0)) 703 return &indexArgs{ 704 IndexOptions: opts, 705 IndexDir: s.IndexDir, 706 Parallelism: parallelism, 707 Incremental: true, 708 FileLimit: MaxFileSize, 709 ShardMerging: s.shardMerging, 710 } 711} 712 713// parallelism consults both the server flags and index options to determine the number 714// of shards to index in parallel. If the CPUCount index option is provided, it always 715// overrides the server flag. 
716func (s *Server) parallelism(opts IndexOptions, maxProcs int) int { 717 var parallelism int 718 if opts.ShardConcurrency > 0 { 719 parallelism = int(opts.ShardConcurrency) 720 } else { 721 parallelism = s.CPUCount 722 } 723 724 // In case this was accidentally misconfigured, we cap the threads at 4 times the available CPUs 725 if parallelism > 4*maxProcs { 726 parallelism = 4 * maxProcs 727 } 728 729 // If index concurrency is set, then divide the parallelism by the number of 730 // repos we're indexing in parallel 731 if s.IndexConcurrency > 1 { 732 parallelism = int(math.Ceil(float64(parallelism) / float64(s.IndexConcurrency))) 733 } 734 735 return parallelism 736} 737 738func createEmptyShard(args *indexArgs) error { 739 bo := args.BuildOptions() 740 bo.SetDefaults() 741 bo.RepositoryDescription.Branches = []zoekt.RepositoryBranch{{Name: "HEAD", Version: "404aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} 742 743 if args.Incremental && bo.IncrementalSkipIndexing() { 744 return nil 745 } 746 747 builder, err := index.NewBuilder(*bo) 748 if err != nil { 749 return err 750 } 751 return builder.Finish() 752} 753 754// addDebugHandlers adds handlers specific to indexserver. 755func (s *Server) addDebugHandlers(mux *http.ServeMux) { 756 // Sourcegraph's site admin view requires indexserver to serve it's admin view 757 // on "/". 
758 mux.Handle("/", http.HandlerFunc(s.handleRoot)) 759 760 mux.Handle("/debug/reindex", http.HandlerFunc(s.handleReindex)) 761 mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) 762 mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) 763 mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) 764 mux.Handle("/debug/queue", http.HandlerFunc(s.queue.handleDebugQueue)) 765 mux.Handle("/debug/host", http.HandlerFunc(s.handleHost)) 766} 767 768func (s *Server) handleHost(w http.ResponseWriter, r *http.Request) { 769 if r.Method != "GET" { 770 w.Header().Set("Allow", "GET") 771 w.WriteHeader(http.StatusMethodNotAllowed) 772 return 773 } 774 775 response := struct { 776 Hostname string 777 }{ 778 Hostname: s.hostname, 779 } 780 781 b, err := json.Marshal(response) 782 if err != nil { 783 http.Error(w, err.Error(), http.StatusInternalServerError) 784 return 785 } 786 787 w.Header().Set("Content-Type", "application/json; charset=utf-8") 788 w.Write(b) 789} 790 791var rootTmpl = template.Must(template.New("name").Parse(` 792<html> 793 <body> 794 <a href="debug">Debug</a><br /> 795 <a href="debug/requests">Traces</a><br /> 796 {{.IndexMsg}}<br /> 797 <br /> 798 <h3>Reindex</h3> 799 {{if .Repos}} 800 <a href="?show_repos=false">hide repos</a><br /> 801 <table style="margin-top: 20px"> 802 <th style="text-align:left">Name</th> 803 <th style="text-align:left">ID (click to reindex)</th> 804 {{range .Repos}} 805 <tr> 806 <td>{{.Name}}</td> 807 <td><a href="?id={{.ID}}&show_repos=true">{{.ID}}</a></id> 808 </tr> 809 {{end}} 810 </table> 811 {{else}} 812 <a href="?show_repos=true">show repos</a><br /> 813 {{end}} 814 </body> 815</html> 816`)) 817 818func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { 819 if r.Method != "GET" { 820 w.Header().Set("Allow", "GET") 821 w.WriteHeader(http.StatusMethodNotAllowed) 822 return 823 } 824 825 values := r.URL.Query() 826 827 // ?id= 828 indexMsg := "" 829 if v := values.Get("id"); v != "" 
{ 830 id, err := strconv.Atoi(v) 831 if err != nil { 832 http.Error(w, err.Error(), http.StatusBadRequest) 833 return 834 } 835 indexMsg, _ = s.forceIndex(uint32(id)) 836 } 837 838 // ?show_repos= 839 showRepos := false 840 if v := values.Get("show_repos"); v != "" { 841 showRepos, _ = strconv.ParseBool(v) 842 } 843 844 type Repo struct { 845 ID uint32 846 Name string 847 } 848 var data struct { 849 Repos []Repo 850 IndexMsg string 851 } 852 853 data.IndexMsg = indexMsg 854 855 if showRepos { 856 s.queue.Iterate(func(opts *IndexOptions) { 857 data.Repos = append(data.Repos, Repo{ 858 ID: opts.RepoID, 859 Name: opts.Name, 860 }) 861 }) 862 sort.Slice(data.Repos, func(i, j int) bool { return data.Repos[i].Name < data.Repos[j].Name }) 863 } 864 865 _ = rootTmpl.Execute(w, data) 866} 867 868// handleReindex triggers a reindex asynocronously. If a reindex was triggered 869// the request returns with status 202. The caller can infer the new state of 870// the index by calling List. 871func (s *Server) handleReindex(w http.ResponseWriter, r *http.Request) { 872 if r.Method != http.MethodPost { 873 w.Header().Set("Allow", http.MethodPost) 874 w.WriteHeader(http.StatusMethodNotAllowed) 875 return 876 } 877 878 err := r.ParseForm() 879 if err != nil { 880 http.Error(w, err.Error(), http.StatusBadRequest) 881 return 882 } 883 884 id, err := strconv.Atoi(r.Form.Get("repo")) 885 if err != nil { 886 http.Error(w, err.Error(), http.StatusBadRequest) 887 return 888 } 889 890 go func() { s.forceIndex(uint32(id)) }() 891 892 // 202 Accepted 893 w.WriteHeader(http.StatusAccepted) 894} 895 896func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { 897 withIndexed := true 898 if b, err := strconv.ParseBool(r.URL.Query().Get("indexed")); err == nil { 899 withIndexed = b 900 } 901 902 var indexed []uint32 903 if withIndexed { 904 indexed = listIndexed(s.IndexDir) 905 } 906 907 repos, err := s.Sourcegraph.List(r.Context(), indexed) 908 if err != nil { 909 http.Error(w, 
err.Error(), http.StatusInternalServerError) 910 return 911 } 912 913 bw := bytes.Buffer{} 914 915 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 916 917 _, err = fmt.Fprintf(tw, "ID\tName\n") 918 if err != nil { 919 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 920 return 921 } 922 923 s.queue.mu.Lock() 924 name := "" 925 for _, id := range repos.IDs { 926 if item := s.queue.get(id); item != nil { 927 name = item.opts.Name 928 } else { 929 name = "" 930 } 931 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 932 if err != nil { 933 debugLog.Printf("handleDebugList: %s\n", err.Error()) 934 } 935 } 936 s.queue.mu.Unlock() 937 938 if err != nil { 939 http.Error(w, err.Error(), http.StatusInternalServerError) 940 return 941 } 942 943 err = tw.Flush() 944 if err != nil { 945 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 946 return 947 } 948 949 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 950 951 if _, err := io.Copy(w, &bw); err != nil { 952 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 953 return 954 } 955} 956 957// handleDebugMerge triggers a merge even if shard merging is not enabled. Users 958// can run this command during periods of low usage (evenings, weekends) to 959// trigger an initial merge run. In the steady-state, merges happen rarely, even 960// on busy instances, and users can rely on automatic merging instead. 961func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { 962 // A merge operation can take very long, depending on the number merges and the 963 // target size of the compound shards. We run the merge in the background and 964 // return immediately to the user. 965 // 966 // We track the status of the merge with metricShardMergingRunning. 
967 go func() { 968 s.doMerge() 969 }() 970 _, _ = w.Write([]byte("merging enqueued\n")) 971} 972 973func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { 974 indexed := listIndexed(s.IndexDir) 975 976 bw := bytes.Buffer{} 977 978 tw := tabwriter.NewWriter(&bw, 16, 8, 4, ' ', 0) 979 980 _, err := fmt.Fprintf(tw, "ID\tName\n") 981 if err != nil { 982 http.Error(w, fmt.Sprintf("writing column headers: %s", err), http.StatusInternalServerError) 983 return 984 } 985 986 s.queue.mu.Lock() 987 name := "" 988 for _, id := range indexed { 989 if item := s.queue.get(id); item != nil { 990 name = item.opts.Name 991 } else { 992 name = "" 993 } 994 _, err = fmt.Fprintf(tw, "%d\t%s\n", id, name) 995 if err != nil { 996 debugLog.Printf("handleDebugIndexed: %s\n", err.Error()) 997 } 998 } 999 s.queue.mu.Unlock() 1000 1001 if err != nil { 1002 http.Error(w, err.Error(), http.StatusInternalServerError) 1003 return 1004 } 1005 1006 err = tw.Flush() 1007 if err != nil { 1008 http.Error(w, fmt.Sprintf("flushing tabwriter: %s", err), http.StatusInternalServerError) 1009 return 1010 } 1011 1012 w.Header().Set("Content-Length", strconv.Itoa(bw.Len())) 1013 1014 if _, err := io.Copy(w, &bw); err != nil { 1015 http.Error(w, fmt.Sprintf("copying output to response writer: %s", err), http.StatusInternalServerError) 1016 return 1017 } 1018} 1019 1020// forceIndex will run the index job for repo name now. It will return always 1021// return a string explaining what it did, even if it failed. 
1022func (s *Server) forceIndex(id uint32) (string, error) { 1023 var opts IndexOptions 1024 var err error 1025 s.Sourcegraph.ForceIterateIndexOptions(func(o IndexOptions) { 1026 opts = o 1027 }, func(_ uint32, e error) { 1028 err = e 1029 }, id) 1030 if err != nil { 1031 return fmt.Sprintf("Indexing %d failed: %v", id, err), err 1032 } 1033 1034 args := s.indexArgs(opts) 1035 args.Incremental = false // force re-index 1036 1037 var state indexState 1038 ran := s.muIndexDir.With(opts.Name, func() { 1039 state, err = s.Index(args) 1040 }) 1041 if !ran { 1042 return fmt.Sprintf("index job for repository already running: %s", args), nil 1043 } 1044 if err != nil { 1045 return fmt.Sprintf("Indexing %s failed: %s", args.String(), err), err 1046 } 1047 return fmt.Sprintf("Indexed %s with state %s", args.String(), state), nil 1048} 1049 1050// DeleteAllData deletes all shards in the index and trash dir belonging to the 1051// tenant associated with the request. The deletion is best-effort, which means 1052// we will delete as much as possible. If no error is returned, the caller can 1053// be certain that all data has been deleted. 1054func (s *Server) DeleteAllData(ctx context.Context, _ *indexserverv1.DeleteAllDataRequest) (*indexserverv1.DeleteAllDataResponse, error) { 1055 tnt, err := tenant.FromContext(ctx) 1056 if err != nil { 1057 return nil, err 1058 } 1059 s.logger.Warn("DeleteAllData", sglog.Int("tenant_id", tnt.ID())) 1060 1061 var merr error 1062 s.muIndexDir.Global(func() { 1063 // First, explode all compound shards that have repos from the tenant in 1064 // question. Because we hold the global lock, we can be sure that no new 1065 // merges start while we do this. 1066 if err := s.explodeTenantCompoundShards(ctx, func(path string) error { 1067 // We call explode in a separate process to protect indexserver. 
1068 cmd := defaultExplodeCmd(path) 1069 1070 stdoutBuf := &bytes.Buffer{} 1071 stderrBuf := &bytes.Buffer{} 1072 cmd.Stdout = stdoutBuf 1073 cmd.Stderr = stderrBuf 1074 1075 err := cmd.Run() 1076 if err != nil { 1077 errorLog.Printf("explode failed: %v (stderr: %s)", err, stderrBuf.String()) 1078 return err 1079 } 1080 1081 infoLog.Printf("exploded shard: %s", stdoutBuf.String()) 1082 1083 return nil 1084 }); err != nil { 1085 merr = multierr.Append(merr, err) 1086 } 1087 1088 // Invariant: all shards from the tenant are simple shards. 1089 1090 if err := purgeTenantShards(ctx, s.IndexDir); err != nil { 1091 merr = multierr.Append(merr, err) 1092 } 1093 if err := purgeTenantShards(ctx, filepath.Join(s.IndexDir, ".trash")); err != nil { 1094 merr = multierr.Append(merr, err) 1095 } 1096 }) 1097 1098 return &indexserverv1.DeleteAllDataResponse{}, merr 1099} 1100 1101func listIndexed(indexDir string) []uint32 { 1102 index := getShards(indexDir) 1103 metricNumIndexed.Set(float64(len(index))) 1104 repoIDs := make([]uint32, 0, len(index)) 1105 for id := range index { 1106 repoIDs = append(repoIDs, id) 1107 } 1108 sort.Slice(repoIDs, func(i, j int) bool { 1109 return repoIDs[i] < repoIDs[j] 1110 }) 1111 return repoIDs 1112} 1113 1114// setupTmpDir sets up a temporary directory on the same volume as the 1115// indexes. 1116// 1117// If main is true we will delete older temp directories left around. main is 1118// false when this is a debug command. 1119func setupTmpDir(logger sglog.Logger, main bool, index string) error { 1120 // change the target tmp directory depending on if it's our main daemon or a 1121 // debug sub command. 
1122 dir := ".indexserver.debug.tmp" 1123 if main { 1124 dir = ".indexserver.tmp" 1125 } 1126 1127 tmpRoot := filepath.Join(index, dir) 1128 1129 if main { 1130 logger.Info("removing tmp dir", sglog.String("tmpRoot", tmpRoot)) 1131 err := os.RemoveAll(tmpRoot) 1132 if err != nil { 1133 logger.Error("failed to remove tmp dir", sglog.String("tmpRoot", tmpRoot), sglog.Error(err)) 1134 } 1135 } 1136 1137 if err := os.MkdirAll(tmpRoot, 0o755); err != nil { 1138 return err 1139 } 1140 1141 return os.Setenv("TMPDIR", tmpRoot) 1142} 1143 1144func printMetaData(fn string) error { 1145 repo, indexMeta, err := index.ReadMetadataPath(fn) 1146 if err != nil { 1147 return err 1148 } 1149 1150 err = json.NewEncoder(os.Stdout).Encode(indexMeta) 1151 if err != nil { 1152 return err 1153 } 1154 1155 err = json.NewEncoder(os.Stdout).Encode(repo) 1156 if err != nil { 1157 return err 1158 } 1159 return nil 1160} 1161 1162func printShardStats(fn string) error { 1163 f, err := os.Open(fn) 1164 if err != nil { 1165 return err 1166 } 1167 1168 iFile, err := index.NewIndexFile(f) 1169 if err != nil { 1170 return err 1171 } 1172 1173 return index.PrintNgramStats(iFile) 1174} 1175 1176func srcLogLevelIsDebug() bool { 1177 lvl := os.Getenv(sglog.EnvLogLevel) 1178 return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug") 1179} 1180 1181func getEnvWithDefaultBool(k string, defaultVal bool) bool { 1182 v := os.Getenv(k) 1183 if v == "" { 1184 return defaultVal 1185 } 1186 b, err := strconv.ParseBool(v) 1187 if err != nil { 1188 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1189 } 1190 return b 1191} 1192 1193func getEnvWithDefaultInt64(k string, defaultVal int64) int64 { 1194 v := os.Getenv(k) 1195 if v == "" { 1196 return defaultVal 1197 } 1198 i, err := strconv.ParseInt(v, 10, 64) 1199 if err != nil { 1200 log.Fatalf("error parsing ENV %s to int64: %s", k, err) 1201 } 1202 return i 1203} 1204 1205func getEnvWithDefaultInt(k string, defaultVal int) int { 1206 v := 
os.Getenv(k) 1207 if v == "" { 1208 return defaultVal 1209 } 1210 i, err := strconv.Atoi(k) 1211 if err != nil { 1212 log.Fatalf("error parsing ENV %s to int: %s", k, err) 1213 } 1214 return i 1215} 1216 1217func getEnvWithDefaultUint64(k string, defaultVal uint64) uint64 { 1218 v := os.Getenv(k) 1219 if v == "" { 1220 return defaultVal 1221 } 1222 i, err := strconv.ParseUint(v, 10, 64) 1223 if err != nil { 1224 log.Fatalf("error parsing ENV %s to uint64: %s", k, err) 1225 } 1226 return i 1227} 1228 1229func getEnvWithDefaultFloat64(k string, defaultVal float64) float64 { 1230 v := os.Getenv(k) 1231 if v == "" { 1232 return defaultVal 1233 } 1234 f, err := strconv.ParseFloat(v, 64) 1235 if err != nil { 1236 log.Fatalf("error parsing ENV %s to float64: %s", k, err) 1237 } 1238 return f 1239} 1240 1241func getEnvWithDefaultString(k string, defaultVal string) string { 1242 v := os.Getenv(k) 1243 if v == "" { 1244 return defaultVal 1245 } 1246 return v 1247} 1248 1249func getEnvWithDefaultDuration(k string, defaultVal time.Duration) time.Duration { 1250 v := os.Getenv(k) 1251 if v == "" { 1252 return defaultVal 1253 } 1254 1255 d, err := time.ParseDuration(v) 1256 if err != nil { 1257 log.Fatalf("error parsing ENV %s to duration: %s", k, err) 1258 } 1259 return d 1260} 1261 1262func getEnvWithDefaultEmptySet(k string) map[string]struct{} { 1263 set := map[string]struct{}{} 1264 for _, v := range strings.Split(os.Getenv(k), ",") { 1265 v = strings.TrimSpace(v) 1266 if v != "" { 1267 set[v] = struct{}{} 1268 } 1269 } 1270 return set 1271} 1272 1273func joinStringSet(set map[string]struct{}, sep string) string { 1274 var xs []string 1275 for x := range set { 1276 xs = append(xs, x) 1277 } 1278 1279 return strings.Join(xs, sep) 1280} 1281 1282func setShardsCounter(indexDir string) { 1283 fns, err := filepath.Glob(filepath.Join(indexDir, "*.zoekt")) 1284 if err != nil { 1285 errorLog.Printf("setShardsCounter: %s\n", err) 1286 return 1287 } 1288 
metricNumberShards.Set(float64(len(fns))) 1289 1290 compoundFns := make([]string, 0, len(fns)) 1291 for _, fn := range fns { 1292 if strings.HasPrefix(filepath.Base(fn), "compound-") { 1293 compoundFns = append(compoundFns, fn) 1294 } 1295 } 1296 metricNumberCompoundShards.Set(float64(len(compoundFns))) 1297} 1298 1299func rootCmd() *ffcli.Command { 1300 rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) 1301 conf := rootConfig{ 1302 Main: true, 1303 } 1304 conf.registerRootFlags(rootFs) 1305 1306 return &ffcli.Command{ 1307 FlagSet: rootFs, 1308 ShortUsage: "zoekt-sourcegraph-indexserver [flags] [<subcommand>]", 1309 Subcommands: []*ffcli.Command{debugCmd()}, 1310 Exec: func(ctx context.Context, args []string) error { 1311 return startServer(conf) 1312 }, 1313 } 1314} 1315 1316type rootConfig struct { 1317 // Main is true if this rootConfig is for our main long running command (the 1318 // indexserver). Debug commands should not set this value. This is used to 1319 // determine if we need to run tmpfriend. 1320 Main bool 1321 1322 root string 1323 interval time.Duration 1324 index string 1325 indexConcurrency int64 1326 listen string 1327 hostname string 1328 cpuFraction float64 1329 1330 // config values related to shard merging 1331 disableShardMerging bool 1332 vacuumInterval time.Duration 1333 mergeInterval time.Duration 1334 targetSize int64 1335 minSize int64 1336 minAgeDays int 1337 1338 // config values related to backoff indexing repos with one or more consecutive failures 1339 backoffDuration time.Duration 1340 maxBackoffDuration time.Duration 1341} 1342 1343func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { 1344 fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. 
If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") 1345 fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") 1346 fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently") 1347 fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", index.DefaultDir), "set index directory to use") 1348 fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") 1349 fs.StringVar(&rc.hostname, "hostname", index.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") 1350 fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") 1351 fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.") 1352 fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. 
A negative value disables indexing backoff.") 1353 1354 // flags related to shard merging 1355 fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging") 1356 fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often") 1357 fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often") 1358 fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 1000), "the target size of compound shards in MiB") 1359 fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 800), "the minimum size of a compound shard in MiB") 1360 fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.") 1361} 1362 1363func startServer(conf rootConfig) error { 1364 s, err := newServer(conf) 1365 if err != nil { 1366 return err 1367 } 1368 1369 profiler.Init("zoekt-sourcegraph-indexserver") 1370 setShardsCounter(s.IndexDir) 1371 1372 if conf.listen != "" { 1373 1374 mux := http.NewServeMux() 1375 debugserver.AddHandlers(mux, true, []debugserver.DebugPage{ 1376 {Href: "debug/indexed", Text: "Indexed", Description: "list of all indexed repositories"}, 1377 {Href: "debug/list?indexed=false", Text: "Assigned (this instance)", Description: "list of all repositories that are assigned to this instance"}, 1378 {Href: "debug/list?indexed=true", Text: "Assigned (all)", Description: "same as above, but includes repositories which this instance temporarily holds during re-balancing"}, 1379 {Href: "debug/queue", Text: "Indexing Queue State", Description: "list of all repositories in the indexing queue, sorted by descending priority"}, 1380 }...) 
1381 s.addDebugHandlers(mux) 1382 1383 go func() { 1384 debugLog.Printf("serving HTTP on %s", conf.listen) 1385 mux := grpcutil.MultiplexGRPC(newGRPCServer(sglog.Scoped("indexserver"), s), mux) 1386 log.Fatal(http.ListenAndServe(conf.listen, mux)) 1387 }() 1388 1389 // Serve mux on a unix domain socket on a best-effort-basis so that 1390 // webserver can call the endpoints via the shared filesystem. 1391 // 1392 // 2022-12-08: Docker for Mac with VirtioFS enabled will fail to listen 1393 // on the socket due to permission errors. See 1394 // https://github.com/docker/for-mac/issues/6239 1395 go func() { 1396 serveHTTPOverSocket := func() error { 1397 socket := filepath.Join(s.IndexDir, "indexserver.sock") 1398 // We cannot bind a socket to an existing pathname. 1399 if err := os.Remove(socket); err != nil && !errors.Is(err, fs.ErrNotExist) { 1400 return fmt.Errorf("error removing socket file: %s", socket) 1401 } 1402 // The "unix" network corresponds to stream sockets. (cf. unixgram, 1403 // unixpacket). 1404 l, err := net.Listen("unix", socket) 1405 if err != nil { 1406 return fmt.Errorf("failed to listen on socket %s: %w", socket, err) 1407 } 1408 // Indexserver (root) and webserver (Sourcegraph) run with 1409 // different users. Per default, the socket is created with 1410 // permission 755 (root root), which doesn't let webserver write to 1411 // it. 1412 // 1413 // See https://github.com/golang/go/issues/11822 for more context. 
1414 if err := os.Chmod(socket, 0o777); err != nil { 1415 return fmt.Errorf("failed to change permission of socket %s: %w", socket, err) 1416 } 1417 debugLog.Printf("serving HTTP on %s", socket) 1418 return http.Serve(l, mux) 1419 } 1420 debugLog.Print(serveHTTPOverSocket()) 1421 }() 1422 } 1423 1424 oc := &ownerChecker{ 1425 Path: filepath.Join(conf.index, "owner.txt"), 1426 Hostname: conf.hostname, 1427 } 1428 go oc.Run() 1429 1430 logger := sglog.Scoped("metricsRegistration") 1431 opts := mountinfo.CollectorOpts{Namespace: "zoekt_indexserver"} 1432 1433 c := mountinfo.NewCollector(logger, opts, map[string]string{"indexDir": conf.index}) 1434 prometheus.DefaultRegisterer.MustRegister(c) 1435 1436 s.Run() 1437 return nil 1438} 1439 1440func newServer(conf rootConfig) (*Server, error) { 1441 logger := sglog.Scoped("server") 1442 1443 if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { 1444 return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") 1445 } 1446 if conf.index == "" { 1447 return nil, fmt.Errorf("must set -index") 1448 } 1449 if conf.root == "" { 1450 return nil, fmt.Errorf("must set -sourcegraph_url") 1451 } 1452 rootURL, err := url.Parse(conf.root) 1453 if err != nil { 1454 return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) 1455 } 1456 1457 rootURL = addDefaultPort(rootURL) 1458 1459 // Tune GOMAXPROCS to match Linux container CPU quota. 1460 _, _ = maxprocs.Set() 1461 1462 // Automatically prepend our own path at the front, to minimize 1463 // required configuration. 
1464 if l, err := os.Readlink("/proc/self/exe"); err == nil { 1465 os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) 1466 } 1467 1468 if _, err := os.Stat(conf.index); err != nil { 1469 if err := os.MkdirAll(conf.index, 0o755); err != nil { 1470 return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) 1471 } 1472 } 1473 1474 if err := setupTmpDir(logger, conf.Main, conf.index); err != nil { 1475 return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) 1476 } 1477 1478 if srcLogLevelIsDebug() { 1479 debugLog.SetOutput(os.Stderr) 1480 } 1481 1482 reposWithSeparateIndexingMetrics = getEnvWithDefaultEmptySet("INDEXING_METRICS_REPOS_ALLOWLIST") 1483 if len(reposWithSeparateIndexingMetrics) > 0 { 1484 debugLog.Printf("capturing separate indexing metrics for: %s", joinStringSet(reposWithSeparateIndexingMetrics, ", ")) 1485 } 1486 1487 deltaBuildRepositoriesAllowList := getEnvWithDefaultEmptySet("DELTA_BUILD_REPOS_ALLOWLIST") 1488 if len(deltaBuildRepositoriesAllowList) > 0 { 1489 debugLog.Printf("using delta shard builds for: %s", joinStringSet(deltaBuildRepositoriesAllowList, ", ")) 1490 } 1491 1492 deltaShardNumberFallbackThreshold := getEnvWithDefaultUint64("DELTA_SHARD_NUMBER_FALLBACK_THRESHOLD", 150) 1493 if deltaShardNumberFallbackThreshold > 0 { 1494 debugLog.Printf("setting delta shard fallback threshold to %d shard(s)", deltaShardNumberFallbackThreshold) 1495 } else { 1496 debugLog.Printf("disabling delta build fallback behavior - delta builds will be performed regardless of the number of preexisting shards") 1497 } 1498 1499 reposShouldSkipSymbolsCalculation := getEnvWithDefaultEmptySet("SKIP_SYMBOLS_REPOS_ALLOWLIST") 1500 if len(reposShouldSkipSymbolsCalculation) > 0 { 1501 debugLog.Printf("skipping generating symbols metadata for: %s", joinStringSet(reposShouldSkipSymbolsCalculation, ", ")) 1502 } 1503 1504 indexingTimeout := getEnvWithDefaultDuration("INDEXING_TIMEOUT", defaultIndexingTimeout) 1505 if indexingTimeout != 
defaultIndexingTimeout { 1506 debugLog.Printf("using configured indexing timeout: %s", indexingTimeout) 1507 } 1508 1509 var sg Sourcegraph 1510 if rootURL.IsAbs() { 1511 var batchSize int 1512 if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { 1513 batchSize, err = strconv.Atoi(v) 1514 if err != nil { 1515 return nil, fmt.Errorf("invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") 1516 } 1517 } 1518 1519 opts := []SourcegraphClientOption{ 1520 WithBatchSize(batchSize), 1521 } 1522 1523 logger := sglog.Scoped("zoektConfigurationGRPCClient") 1524 client, err := dialGRPCClient(rootURL.Host, logger) 1525 if err != nil { 1526 return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err) 1527 } 1528 1529 sg = newSourcegraphClient(rootURL, conf.hostname, client, opts...) 1530 1531 } else { 1532 sg = sourcegraphFake{ 1533 RootDir: rootURL.String(), 1534 Log: log.New(os.Stderr, "sourcegraph: ", log.LstdFlags), 1535 } 1536 } 1537 1538 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction))) 1539 if cpuCount < 1 { 1540 cpuCount = 1 1541 } 1542 1543 if conf.indexConcurrency < 1 { 1544 conf.indexConcurrency = 1 1545 } else if conf.indexConcurrency > int64(cpuCount) { 1546 conf.indexConcurrency = int64(cpuCount) 1547 } 1548 1549 q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) 1550 1551 return &Server{ 1552 logger: logger, 1553 Sourcegraph: sg, 1554 IndexDir: conf.index, 1555 IndexConcurrency: int(conf.indexConcurrency), 1556 Interval: conf.interval, 1557 CPUCount: cpuCount, 1558 queue: *q, 1559 shardMerging: !conf.disableShardMerging, 1560 deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList, 1561 deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold, 1562 repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation, 1563 hostname: conf.hostname, 1564 mergeOpts: mergeOpts{ 1565 vacuumInterval: conf.vacuumInterval, 1566 mergeInterval: 
conf.mergeInterval, 1567 targetSizeBytes: conf.targetSize * 1024 * 1024, 1568 minSizeBytes: conf.minSize * 1024 * 1024, 1569 minAgeDays: conf.minAgeDays, 1570 }, 1571 timeout: indexingTimeout, 1572 }, err 1573} 1574 1575func newGRPCServer(logger sglog.Logger, s *Server, additionalOpts ...grpc.ServerOption) *grpc.Server { 1576 grpcServer := defaults.NewServer(logger, additionalOpts...) 1577 indexserverv1.RegisterSourcegraphIndexserverServiceServer(grpcServer, s) 1578 return grpcServer 1579} 1580 1581// defaultGRPCServiceConfigurationJSON is the default gRPC service configuration 1582// for the indexed-search-configuration gRPC service. 1583// 1584// The default backoff strategy is modeled after the default settings used by 1585// retryablehttp.DefaultClient. 1586// 1587// It retries on the following errors (see https://grpc.github.io/grpc/core/md_doc_statuscodes.html): 1588// - Unavailable 1589// - Aborted 1590// 1591//go:embed default_grpc_service_configuration.json 1592var defaultGRPCServiceConfigurationJSON string 1593 1594func internalActorUnaryInterceptor() grpc.UnaryClientInterceptor { 1595 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 1596 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1597 return invoker(ctx, method, req, reply, cc, opts...) 1598 } 1599} 1600 1601func internalActorStreamInterceptor() grpc.StreamClientInterceptor { 1602 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 1603 ctx = metadata.AppendToOutgoingContext(ctx, "X-Sourcegraph-Actor-UID", "internal") 1604 return streamer(ctx, desc, cc, method, opts...) 1605 } 1606} 1607 1608// defaultGRPCMessageReceiveSizeBytes is the default message size that gRPCs servers and clients are allowed to process. 
1609// This can be overridden by providing custom Server/Dial options. 1610const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB 1611 1612func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (configv1.ZoektConfigurationServiceClient, error) { 1613 metrics := clientMetricsOnce() 1614 1615 // If the service seems to be unavailable, this 1616 // will retry after [1s, 2s, 4s, 8s, 16s] with a jitterFraction of .1 1617 // Ex: (on the first retry attempt, we will wait between [.9s and 1.1s]) 1618 retryOpts := []retry.CallOption{ 1619 retry.WithMax(5), 1620 retry.WithBackoff(retry.BackoffExponentialWithJitter(1*time.Second, .1)), 1621 retry.WithCodes(codes.Unavailable), 1622 } 1623 1624 opts := []grpc.DialOption{ 1625 grpc.WithTransportCredentials(insecure.NewCredentials()), 1626 grpc.WithChainStreamInterceptor( 1627 metrics.StreamClientInterceptor(), 1628 messagesize.StreamClientInterceptor, 1629 internalActorStreamInterceptor(), 1630 internalerrs.LoggingStreamClientInterceptor(logger), 1631 internalerrs.PrometheusStreamClientInterceptor, 1632 retry.StreamClientInterceptor(retryOpts...), 1633 ), 1634 grpc.WithChainUnaryInterceptor( 1635 metrics.UnaryClientInterceptor(), 1636 messagesize.UnaryClientInterceptor, 1637 internalActorUnaryInterceptor(), 1638 internalerrs.LoggingUnaryClientInterceptor(logger), 1639 internalerrs.PrometheusUnaryClientInterceptor, 1640 retry.UnaryClientInterceptor(retryOpts...), 1641 ), 1642 grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON), 1643 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)), 1644 } 1645 1646 opts = append(opts, additionalOpts...) 1647 1648 // Ensure that the message size options are set last, so they override any other 1649 // client-specific options that tweak the message size. 1650 // 1651 // The message size options are only provided if the environment variable is set. 
These options serve as an escape hatch, so they 1652 // take precedence over everything else with a uniform size setting that's easy to reason about. 1653 opts = append(opts, messagesize.MustGetClientMessageSizeFromEnv()...) 1654 1655 // This dialer is used to connect via gRPC to the Sourcegraph instance. 1656 // This is done lazily, so we can provide the client to use regardless of 1657 // whether we enabled gRPC or not initially. 1658 cc, err := grpc.Dial(addr, opts...) 1659 if err != nil { 1660 return nil, fmt.Errorf("dialing %q: %w", addr, err) 1661 } 1662 1663 client := configv1.NewZoektConfigurationServiceClient(cc) 1664 return client, nil 1665} 1666 1667// addDefaultPort adds a default port to a URL if one is not specified. 1668// 1669// If the URL scheme is "http" and no port is specified, "80" is used. 1670// If the scheme is "https", "443" is used. 1671// 1672// The original URL is not mutated. A copy is modified and returned. 1673func addDefaultPort(original *url.URL) *url.URL { 1674 if original == nil { 1675 return nil // don't panic 1676 } 1677 1678 if !original.IsAbs() { 1679 return original // don't do anything if the URL doesn't look like a remote URL 1680 } 1681 1682 if original.Scheme == "http" && original.Port() == "" { 1683 u := cloneURL(original) 1684 u.Host = net.JoinHostPort(u.Host, "80") 1685 return u 1686 } 1687 1688 if original.Scheme == "https" && original.Port() == "" { 1689 u := cloneURL(original) 1690 u.Host = net.JoinHostPort(u.Host, "443") 1691 return u 1692 } 1693 1694 return original 1695} 1696 1697// cloneURL returns a copy of the URL. It is safe to mutate the returned URL. 
1698// This is copied from net/http/clone.go 1699func cloneURL(u *url.URL) *url.URL { 1700 if u == nil { 1701 return nil 1702 } 1703 u2 := new(url.URL) 1704 *u2 = *u 1705 if u.User != nil { 1706 u2.User = new(url.Userinfo) 1707 *u2.User = *u.User 1708 } 1709 return u2 1710} 1711 1712func main() { 1713 liblog := sglog.Init(sglog.Resource{ 1714 Name: "zoekt-indexserver", 1715 Version: index.Version, 1716 InstanceID: index.HostnameBestEffort(), 1717 }) 1718 defer liblog.Sync() 1719 1720 if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { 1721 log.Fatal(err) 1722 } 1723} 1724 1725// getBoolFromEnvironmentVariables returns the boolean defined by the first environment 1726// variable listed in envVarNames that is set in the current process environment, or the defaultBool if none are set. 1727// 1728// An error is returned of the provided environment variables fails to parse as a boolean. 1729func getBoolFromEnvironmentVariables(envVarNames []string, defaultBool bool) (bool, error) { 1730 for _, envVar := range envVarNames { 1731 v := os.Getenv(envVar) 1732 if v == "" { 1733 continue 1734 } 1735 1736 b, err := strconv.ParseBool(v) 1737 if err != nil { 1738 return false, fmt.Errorf("parsing environment variable %q to boolean: %v", envVar, err) 1739 } 1740 1741 return b, nil 1742 } 1743 1744 return defaultBool, nil 1745}