fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3import ( 4 "bytes" 5 "context" 6 "errors" 7 "fmt" 8 "hash/crc32" 9 "log" 10 "math/rand" 11 "net/url" 12 "os" 13 "os/exec" 14 "path" 15 "path/filepath" 16 "strconv" 17 "strings" 18 "time" 19 20 "github.com/go-git/go-git/v5" 21 "golang.org/x/net/trace" 22 23 "github.com/sourcegraph/zoekt" 24 configv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/sourcegraph/zoekt/configuration/v1" 25 "github.com/sourcegraph/zoekt/internal/ctags" 26) 27 28// SourcegraphListResult is the return value of Sourcegraph.List. It is its 29// own type since internally we batch the calculation of index options. This 30// is exposed via IterateIndexOptions. 31// 32// This type has state and is coupled to the Sourcegraph implementation. 33type SourcegraphListResult struct { 34 // IDs is the set of Sourcegraph repository IDs that this replica needs 35 // to index. 36 IDs []uint32 37 38 // IterateIndexOptions best effort resolves the IndexOptions for RepoIDs. If 39 // any repository fails it internally logs. It uses the "config fingerprint" 40 // to reduce the amount of work done. This means we only resolve options for 41 // repositories which have been mutated since the last Sourcegraph.List 42 // call. 43 // 44 // Note: this has a side-effect of setting a the "config fingerprint". The 45 // config fingerprint means we only calculate index options for repositories 46 // that have changed since the last call to IterateIndexOptions. If you want 47 // to force calculation of index options use 48 // Sourcegraph.ForceIterateIndexOptions. 49 // 50 // Note: This should not be called concurrently with the Sourcegraph client. 51 IterateIndexOptions func(func(IndexOptions)) 52} 53 54// Sourcegraph represents the Sourcegraph service. It informs the indexserver 55// what to index and which options to use. 56type Sourcegraph interface { 57 // List returns a list of repository IDs to index as well as a facility to 58 // fetch the indexing options. 59 // 60 // Note: The return value is not safe to use concurrently with future calls 61 // to List. 62 List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) 63 64 // ForceIterateIndexOptions will best-effort calculate the index options for 65 // all repos. For each repo it will call either onSuccess or onError. This 66 // is the forced version of IterateIndexOptions, so will always calculate 67 // options for each id in repos. 68 ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) 69 70 // UpdateIndexStatus sends a request to Sourcegraph to confirm that the 71 // given repositories have been indexed. 72 UpdateIndexStatus(repositories []indexStatus) error 73} 74 75type SourcegraphClientOption func(*sourcegraphClient) 76 77// WithBatchSize controls how many repository configurations we request a time. 78// If BatchSize is 0, we default to requesting 10,000 repositories at once. 79func WithBatchSize(batchSize int) SourcegraphClientOption { 80 return func(c *sourcegraphClient) { 81 c.BatchSize = batchSize 82 } 83} 84 85func newSourcegraphClient(rootURL *url.URL, hostname string, grpcClient configv1.ZoektConfigurationServiceClient, opts ...SourcegraphClientOption) *sourcegraphClient { 86 client := &sourcegraphClient{ 87 Root: rootURL, 88 Hostname: hostname, 89 BatchSize: 0, 90 grpcClient: grpcClient, 91 } 92 93 for _, opt := range opts { 94 opt(client) 95 } 96 97 return client 98} 99 100// sourcegraphClient contains methods which interact with the sourcegraph API. 101type sourcegraphClient struct { 102 // Root is the base URL for the Sourcegraph instance to index. Normally 103 // http://sourcegraph-frontend-internal or http://localhost:3090. 104 Root *url.URL 105 106 // Hostname is the name we advertise to Sourcegraph when asking for the 107 // list of repositories to index. 108 Hostname string 109 110 // BatchSize is how many repository configurations we request at once. If 111 // zero a value of 10000 is used. 112 BatchSize int 113 114 // grpcClient is used to make requests to the Sourcegraph instance if gRPC is enabled. 115 grpcClient configv1.ZoektConfigurationServiceClient 116 117 // configFingerprintProto is the last config fingerprint (as GRPC) returned from 118 // Sourcegraph. It can be used for future calls to the configuration 119 // endpoint. 120 // 121 // configFingerprintProto is mutually exclusive with configFingerprint - this field 122 // will only be used if gRPC is enabled. 123 configFingerprintProto *configv1.Fingerprint 124 125 // configFingerprintReset tracks when we should zero out the 126 // configFingerprint. We want to periodically do this just in case our 127 // configFingerprint logic is faulty. When it is cleared out, we fallback to 128 // calculating everything. 129 configFingerprintReset time.Time 130} 131 132func (s *sourcegraphClient) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) { 133 repos, err := s.listRepoIDs(ctx, indexed) 134 if err != nil { 135 return nil, fmt.Errorf("listRepoIDs: %w", err) 136 } 137 138 batchSize := s.BatchSize 139 if batchSize == 0 { 140 batchSize = 10_000 141 } 142 143 // Check if we should recalculate everything. 144 if time.Now().After(s.configFingerprintReset) { 145 // for every 500 repos we wait a minute. 2021-12-15 on sourcegraph.com 146 // this works out to every 100 minutes. 147 next := time.Duration(len(indexed)) * time.Minute / 500 148 if min := 5 * time.Minute; next < min { 149 next = min 150 } 151 next += time.Duration(rand.Int63n(int64(next) / 4)) // jitter 152 s.configFingerprintReset = time.Now().Add(next) 153 154 s.configFingerprintProto = nil 155 } 156 157 // getIndexOptionsFunc is a function that can be used to get the index 158 // options for a set of repos (while properly handling any configuration fingerprint 159 // changes). 160 // 161 // In general, this function provides a consistent fingerprint for each batch call, 162 // and updates the server state with the new fingerprint. If any of the batch calls 163 // fail, the old fingerprint is restored. 164 type getIndexOptionsFunc func(repos ...uint32) ([]indexOptionsItem, error) 165 166 mkGetIndexOptionsFunc := func(tr trace.Trace) getIndexOptionsFunc { 167 startingFingerPrint := s.configFingerprintProto 168 tr.LazyPrintf("fingerprint: %s", startingFingerPrint.String()) 169 170 first := true 171 return func(repos ...uint32) ([]indexOptionsItem, error) { 172 options, nextFingerPrint, err := s.getIndexOptions(ctx, startingFingerPrint, repos) 173 if err != nil { 174 first = false 175 s.configFingerprintProto = startingFingerPrint 176 177 return nil, err 178 } 179 180 if first { 181 first = false 182 s.configFingerprintProto = nextFingerPrint 183 tr.LazyPrintf("new fingerprint: %s", nextFingerPrint.String()) 184 } 185 186 return options, nil 187 } 188 } 189 190 iterate := func(f func(IndexOptions)) { 191 start := time.Now() 192 tr := trace.New("getIndexOptions", "") 193 tr.LazyPrintf("getting index options for %d repos", len(repos)) 194 195 defer func() { 196 metricResolveRevisionsDuration.Observe(time.Since(start).Seconds()) 197 tr.Finish() 198 }() 199 200 getIndexOptions := mkGetIndexOptionsFunc(tr) 201 202 // We ask the frontend to get index options in batches. 203 for repos := range batched(repos, batchSize) { 204 start := time.Now() 205 options, err := getIndexOptions(repos...) 206 duration := time.Since(start) 207 208 if err != nil { 209 metricResolveRevisionDuration.WithLabelValues("false").Observe(duration.Seconds()) 210 tr.LazyPrintf("failed fetching options batch: %v", err) 211 tr.SetError() 212 213 continue 214 } 215 216 metricResolveRevisionDuration.WithLabelValues("true").Observe(duration.Seconds()) 217 218 for _, o := range options { 219 metricGetIndexOptions.Inc() 220 221 if o.Error != "" { 222 metricGetIndexOptionsError.Inc() 223 tr.LazyPrintf("failed fetching options for %v: %v", o.Name, o.Error) 224 tr.SetError() 225 226 continue 227 } 228 f(o.IndexOptions) 229 } 230 } 231 } 232 233 return &SourcegraphListResult{ 234 IDs: repos, 235 IterateIndexOptions: iterate, 236 }, nil 237} 238 239func (s *sourcegraphClient) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) { 240 batchSize := s.BatchSize 241 if batchSize == 0 { 242 batchSize = 10_000 243 } 244 245 for repos := range batched(repos, batchSize) { 246 opts, _, err := s.getIndexOptions(context.Background(), nil, repos) 247 if err != nil { 248 for _, id := range repos { 249 onError(id, err) 250 } 251 continue 252 } 253 for _, o := range opts { 254 if o.RepoID > 0 && o.Error != "" { 255 onError(o.RepoID, errors.New(o.Error)) 256 } 257 if o.Error == "" { 258 onSuccess(o.IndexOptions) 259 } 260 } 261 } 262} 263 264// indexOptionsItem wraps IndexOptions to also include an error returned by 265// the API. 266type indexOptionsItem struct { 267 IndexOptions 268 Error string 269} 270 271func (o *indexOptionsItem) FromProto(x *configv1.ZoektIndexOptions) { 272 branches := make([]zoekt.RepositoryBranch, 0, len(x.Branches)) 273 for _, b := range x.GetBranches() { 274 branches = append(branches, zoekt.RepositoryBranch{ 275 Name: b.GetName(), 276 Version: b.GetVersion(), 277 }) 278 } 279 280 item := indexOptionsItem{} 281 languageMap := make(map[string]ctags.CTagsParserType) 282 283 for _, lang := range x.GetLanguageMap() { 284 languageMap[lang.GetLanguage()] = ctags.CTagsParserType(lang.GetCtags().Number()) 285 } 286 287 item.IndexOptions = IndexOptions{ 288 RepoID: uint32(x.GetRepoId()), 289 LargeFiles: x.GetLargeFiles(), 290 Symbols: x.GetSymbols(), 291 Branches: branches, 292 Name: x.GetName(), 293 294 Priority: x.GetPriority(), 295 296 Public: x.GetPublic(), 297 Fork: x.GetFork(), 298 Archived: x.GetArchived(), 299 300 LanguageMap: languageMap, 301 ShardConcurrency: x.GetShardConcurrency(), 302 303 TenantID: int(x.TenantId), 304 } 305 306 item.Error = x.GetError() 307 308 *o = item 309} 310 311func (o *indexOptionsItem) ToProto() *configv1.ZoektIndexOptions { 312 branches := make([]*configv1.ZoektRepositoryBranch, 0, len(o.Branches)) 313 for _, b := range o.Branches { 314 branches = append(branches, &configv1.ZoektRepositoryBranch{ 315 Name: b.Name, 316 Version: b.Version, 317 }) 318 } 319 320 languageMap := make([]*configv1.LanguageMapping, 0, len(o.LanguageMap)) 321 322 for lang, parser := range o.LanguageMap { 323 languageMap = append(languageMap, &configv1.LanguageMapping{ 324 Language: lang, 325 Ctags: configv1.CTagsParserType(parser), 326 }) 327 } 328 329 return &configv1.ZoektIndexOptions{ 330 RepoId: int32(o.RepoID), 331 LargeFiles: o.LargeFiles, 332 Symbols: o.Symbols, 333 Branches: branches, 334 Name: o.Name, 335 336 Priority: o.Priority, 337 338 Public: o.Public, 339 Fork: o.Fork, 340 Archived: o.Archived, 341 342 Error: o.Error, 343 344 LanguageMap: languageMap, 345 ShardConcurrency: o.ShardConcurrency, 346 347 TenantId: int64(o.TenantID), 348 } 349} 350 351func (s *sourcegraphClient) getIndexOptions(ctx context.Context, fingerprint *configv1.Fingerprint, repos []uint32) ([]indexOptionsItem, *configv1.Fingerprint, error) { 352 repoIDs := make([]int32, 0, len(repos)) 353 for _, id := range repos { 354 repoIDs = append(repoIDs, int32(id)) 355 } 356 357 req := configv1.SearchConfigurationRequest{ 358 RepoIds: repoIDs, 359 Fingerprint: fingerprint, 360 } 361 362 response, err := s.grpcClient.SearchConfiguration(ctx, &req) 363 if err != nil { 364 return nil, nil, err 365 } 366 367 protoItems := response.GetUpdatedOptions() 368 items := make([]indexOptionsItem, 0, len(protoItems)) 369 for _, x := range protoItems { 370 var item indexOptionsItem 371 item.FromProto(x) 372 item.IndexOptions.CloneURL = s.getCloneURL(item.Name) 373 374 items = append(items, item) 375 } 376 377 return items, response.GetFingerprint(), nil 378} 379 380func (s *sourcegraphClient) getCloneURL(name string) string { 381 return s.Root.ResolveReference(&url.URL{Path: path.Join("/.internal/git", name)}).String() 382} 383 384func (s *sourcegraphClient) listRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) { 385 var request configv1.ListRequest 386 request.Hostname = s.Hostname 387 request.IndexedIds = make([]int32, 0, len(indexed)) 388 for _, id := range indexed { 389 request.IndexedIds = append(request.IndexedIds, int32(id)) 390 } 391 392 response, err := s.grpcClient.List(ctx, &request) 393 if err != nil { 394 return nil, err 395 } 396 397 repoIDs := make([]uint32, 0, len(response.RepoIds)) 398 for _, id := range response.RepoIds { 399 repoIDs = append(repoIDs, uint32(id)) 400 } 401 402 return repoIDs, nil 403} 404 405type indexStatus struct { 406 RepoID uint32 407 Branches []zoekt.RepositoryBranch 408 IndexTimeUnix int64 409} 410 411type updateIndexStatusRequest struct { 412 Repositories []indexStatus 413} 414 415func (u *updateIndexStatusRequest) ToProto() *configv1.UpdateIndexStatusRequest { 416 repositories := make([]*configv1.UpdateIndexStatusRequest_Repository, 0, len(u.Repositories)) 417 418 for _, repo := range u.Repositories { 419 branches := make([]*configv1.ZoektRepositoryBranch, 0, len(repo.Branches)) 420 421 for _, branch := range repo.Branches { 422 branches = append(branches, &configv1.ZoektRepositoryBranch{ 423 Name: branch.Name, 424 Version: branch.Version, 425 }) 426 } 427 428 repositories = append(repositories, &configv1.UpdateIndexStatusRequest_Repository{ 429 RepoId: repo.RepoID, 430 Branches: branches, 431 IndexTimeUnix: repo.IndexTimeUnix, 432 }) 433 } 434 435 return &configv1.UpdateIndexStatusRequest{ 436 Repositories: repositories, 437 } 438} 439 440func (u *updateIndexStatusRequest) FromProto(x *configv1.UpdateIndexStatusRequest) { 441 protoRepositories := x.GetRepositories() 442 repositories := make([]indexStatus, 0, len(protoRepositories)) 443 444 for _, repo := range x.GetRepositories() { 445 protoBranches := repo.GetBranches() 446 branches := make([]zoekt.RepositoryBranch, 0, len(protoBranches)) 447 448 for _, branch := range repo.GetBranches() { 449 branches = append(branches, zoekt.RepositoryBranch{ 450 Name: branch.GetName(), 451 Version: branch.GetVersion(), 452 }) 453 } 454 455 repositories = append(repositories, indexStatus{ 456 RepoID: repo.GetRepoId(), 457 Branches: branches, 458 IndexTimeUnix: repo.GetIndexTimeUnix(), 459 }) 460 } 461 462 *u = updateIndexStatusRequest{ 463 Repositories: repositories, 464 } 465} 466 467// UpdateIndexStatus sends a request to Sourcegraph to confirm that the given 468// repositories have been indexed. 469func (s *sourcegraphClient) UpdateIndexStatus(repositories []indexStatus) error { 470 r := updateIndexStatusRequest{Repositories: repositories} 471 472 request := r.ToProto() 473 _, err := s.grpcClient.UpdateIndexStatus(context.Background(), request) 474 if err != nil { 475 return fmt.Errorf("failed to update index status: %w", err) 476 } 477 478 return nil 479} 480 481type sourcegraphFake struct { 482 RootDir string 483 Log *log.Logger 484} 485 486func (sf sourcegraphFake) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) { 487 repos, err := sf.ListRepoIDs(ctx, indexed) 488 if err != nil { 489 return nil, err 490 } 491 492 iterate := func(f func(IndexOptions)) { 493 opts, err := sf.GetIndexOptions(repos...) 494 if err != nil { 495 sf.Log.Printf("WARN: ignoring GetIndexOptions error: %v", err) 496 } 497 for _, opt := range opts { 498 if opt.Error != "" { 499 sf.Log.Printf("WARN: ignoring GetIndexOptions error for %s: %v", opt.Name, opt.Error) 500 continue 501 } 502 f(opt.IndexOptions) 503 } 504 } 505 506 return &SourcegraphListResult{ 507 IDs: repos, 508 IterateIndexOptions: iterate, 509 }, nil 510} 511 512func (sf sourcegraphFake) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) { 513 opts, err := sf.GetIndexOptions(repos...) 514 if err != nil { 515 for _, id := range repos { 516 onError(id, err) 517 } 518 return 519 } 520 for _, o := range opts { 521 if o.RepoID > 0 && o.Error != "" { 522 onError(o.RepoID, errors.New(o.Error)) 523 } 524 if o.Error == "" { 525 onSuccess(o.IndexOptions) 526 } 527 } 528} 529 530func (sf sourcegraphFake) GetIndexOptions(repos ...uint32) ([]indexOptionsItem, error) { 531 reposIdx := map[uint32]int{} 532 for i, id := range repos { 533 reposIdx[id] = i 534 } 535 536 items := make([]indexOptionsItem, len(repos)) 537 err := sf.visitRepos(func(name string) { 538 idx, ok := reposIdx[sf.id(name)] 539 if !ok { 540 return 541 } 542 opts, err := sf.getIndexOptions(name) 543 if err != nil { 544 items[idx] = indexOptionsItem{Error: err.Error()} 545 } else { 546 items[idx] = indexOptionsItem{IndexOptions: opts} 547 } 548 }) 549 if err != nil { 550 return nil, err 551 } 552 553 for i := range items { 554 if items[i].Error == "" && items[i].RepoID == 0 { 555 items[i].Error = "not found" 556 } 557 } 558 559 return items, nil 560} 561 562func (sf sourcegraphFake) getIndexOptions(name string) (IndexOptions, error) { 563 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name)) 564 exists := func(p string) bool { 565 _, err := os.Stat(filepath.Join(dir, p)) 566 return err == nil 567 } 568 float := func(p string) float64 { 569 b, _ := os.ReadFile(filepath.Join(dir, p)) 570 f, _ := strconv.ParseFloat(string(bytes.TrimSpace(b)), 64) 571 return f 572 } 573 574 opts := IndexOptions{ 575 RepoID: sf.id(name), 576 Name: name, 577 CloneURL: sf.getCloneURL(name), 578 Symbols: true, 579 580 Public: !exists("SG_PRIVATE"), 581 Fork: exists("SG_FORK"), 582 Archived: exists("SG_ARCHIVED"), 583 584 Priority: float("SG_PRIORITY"), 585 } 586 587 branches, err := sf.getBranches(name) 588 if err != nil { 589 return opts, err 590 } 591 opts.Branches = branches 592 593 return opts, nil 594} 595 596func (sf sourcegraphFake) getBranches(name string) ([]zoekt.RepositoryBranch, error) { 597 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name)) 598 repo, err := git.PlainOpen(dir) 599 if err != nil { 600 return nil, err 601 } 602 603 cfg, err := repo.Config() 604 if err != nil { 605 return nil, err 606 } 607 608 sec := cfg.Raw.Section("zoekt") 609 branches := sec.Options.GetAll("branch") 610 if len(branches) == 0 { 611 branches = append(branches, "HEAD") 612 } 613 614 rBranches := make([]zoekt.RepositoryBranch, 0, len(branches)) 615 for _, branch := range branches { 616 cmd := exec.Command("git", "rev-parse", branch) 617 cmd.Dir = dir 618 if b, err := cmd.Output(); err != nil { 619 sf.Log.Printf("WARN: Could not get branch %s/%s", name, branch) 620 } else { 621 version := string(bytes.TrimSpace(b)) 622 rBranches = append(rBranches, zoekt.RepositoryBranch{ 623 Name: branch, 624 Version: version, 625 }) 626 } 627 } 628 629 if len(rBranches) == 0 { 630 return nil, fmt.Errorf("WARN: Could not get any branch revisions for repo %s", name) 631 } 632 633 return rBranches, nil 634} 635 636func (sf sourcegraphFake) id(name string) uint32 { 637 // allow overriding the ID. 638 idPath := filepath.Join(sf.RootDir, filepath.FromSlash(name), "SG_ID") 639 if b, _ := os.ReadFile(idPath); len(b) > 0 { 640 id, err := strconv.Atoi(strings.TrimSpace(string(b))) 641 if err == nil { 642 return uint32(id) 643 } 644 } 645 return fakeID(name) 646} 647 648func (sf sourcegraphFake) getCloneURL(name string) string { 649 return filepath.Join(sf.RootDir, filepath.FromSlash(name)) 650} 651 652func (sf sourcegraphFake) ListRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) { 653 var repos []uint32 654 err := sf.visitRepos(func(name string) { 655 repos = append(repos, sf.id(name)) 656 }) 657 return repos, err 658} 659 660func (sf sourcegraphFake) visitRepos(visit func(name string)) error { 661 return filepath.Walk(sf.RootDir, func(path string, fi os.FileInfo, fileErr error) error { 662 if fileErr != nil { 663 sf.Log.Printf("WARN: ignoring error searching %s: %v", path, fileErr) 664 return nil 665 } 666 if !fi.IsDir() { 667 return nil 668 } 669 670 gitdir := filepath.Join(path, ".git") 671 if fi, err := os.Stat(gitdir); err != nil || !fi.IsDir() { 672 return nil 673 } 674 675 subpath, err := filepath.Rel(sf.RootDir, path) 676 if err != nil { 677 // According to WalkFunc docs, path is always filepath.Join(root, 678 // subpath). So Rel should always work. 679 return fmt.Errorf("filepath.Walk returned %s which is not relative to %s: %w", path, sf.RootDir, err) 680 } 681 682 name := filepath.ToSlash(subpath) 683 visit(name) 684 685 return filepath.SkipDir 686 }) 687} 688 689func (s sourcegraphFake) UpdateIndexStatus(repositories []indexStatus) error { 690 // noop 691 return nil 692} 693 694// fakeID returns a deterministic ID based on name. Used for fakes and tests. 695func fakeID(name string) uint32 { 696 // magic at the end is to ensure we get a positive number when casting. 697 return uint32(crc32.ChecksumIEEE([]byte(name))%(1<<31-1) + 1) 698} 699 700type sourcegraphNop struct{} 701 702func (s sourcegraphNop) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) { 703 return nil, nil 704} 705 706func (s sourcegraphNop) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) { 707} 708 709func (s sourcegraphNop) UpdateIndexStatus(repositories []indexStatus) error { 710 return nil 711}