fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3import ( 4 "bytes" 5 "context" 6 "errors" 7 "fmt" 8 "hash/crc32" 9 "log" 10 "math/rand" 11 "net/url" 12 "os" 13 "os/exec" 14 "path" 15 "path/filepath" 16 "strconv" 17 "strings" 18 "time" 19 20 "github.com/go-git/go-git/v5" 21 "golang.org/x/net/trace" 22 23 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1" 24 "github.com/sourcegraph/zoekt/ctags" 25 26 "github.com/sourcegraph/zoekt" 27) 28 29// SourcegraphListResult is the return value of Sourcegraph.List. It is its 30// own type since internally we batch the calculation of index options. This 31// is exposed via IterateIndexOptions. 32// 33// This type has state and is coupled to the Sourcegraph implementation. 34type SourcegraphListResult struct { 35 // IDs is the set of Sourcegraph repository IDs that this replica needs 36 // to index. 37 IDs []uint32 38 39 // IterateIndexOptions best effort resolves the IndexOptions for RepoIDs. If 40 // any repository fails it internally logs. It uses the "config fingerprint" 41 // to reduce the amount of work done. This means we only resolve options for 42 // repositories which have been mutated since the last Sourcegraph.List 43 // call. 44 // 45 // Note: this has a side-effect of setting a the "config fingerprint". The 46 // config fingerprint means we only calculate index options for repositories 47 // that have changed since the last call to IterateIndexOptions. If you want 48 // to force calculation of index options use 49 // Sourcegraph.ForceIterateIndexOptions. 50 // 51 // Note: This should not be called concurrently with the Sourcegraph client. 52 IterateIndexOptions func(func(IndexOptions)) 53} 54 55// Sourcegraph represents the Sourcegraph service. It informs the indexserver 56// what to index and which options to use. 57type Sourcegraph interface { 58 // List returns a list of repository IDs to index as well as a facility to 59 // fetch the indexing options. 60 // 61 // Note: The return value is not safe to use concurrently with future calls 62 // to List. 63 List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) 64 65 // ForceIterateIndexOptions will best-effort calculate the index options for 66 // all repos. For each repo it will call either onSuccess or onError. This 67 // is the forced version of IterateIndexOptions, so will always calculate 68 // options for each id in repos. 69 ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) 70 71 // UpdateIndexStatus sends a request to Sourcegraph to confirm that the 72 // given repositories have been indexed. 73 UpdateIndexStatus(repositories []indexStatus) error 74} 75 76type SourcegraphClientOption func(*sourcegraphClient) 77 78// WithBatchSize controls how many repository configurations we request a time. 79// If BatchSize is 0, we default to requesting 10,000 repositories at once. 80func WithBatchSize(batchSize int) SourcegraphClientOption { 81 return func(c *sourcegraphClient) { 82 c.BatchSize = batchSize 83 } 84} 85 86func newSourcegraphClient(rootURL *url.URL, hostname string, grpcClient proto.ZoektConfigurationServiceClient, opts ...SourcegraphClientOption) *sourcegraphClient { 87 client := &sourcegraphClient{ 88 Root: rootURL, 89 Hostname: hostname, 90 BatchSize: 0, 91 grpcClient: grpcClient, 92 } 93 94 for _, opt := range opts { 95 opt(client) 96 } 97 98 return client 99} 100 101// sourcegraphClient contains methods which interact with the sourcegraph API. 102type sourcegraphClient struct { 103 // Root is the base URL for the Sourcegraph instance to index. Normally 104 // http://sourcegraph-frontend-internal or http://localhost:3090. 105 Root *url.URL 106 107 // Hostname is the name we advertise to Sourcegraph when asking for the 108 // list of repositories to index. 109 Hostname string 110 111 // BatchSize is how many repository configurations we request at once. If 112 // zero a value of 10000 is used. 113 BatchSize int 114 115 // grpcClient is used to make requests to the Sourcegraph instance if gRPC is enabled. 116 grpcClient proto.ZoektConfigurationServiceClient 117 118 // configFingerprintProto is the last config fingerprint (as GRPC) returned from 119 // Sourcegraph. It can be used for future calls to the configuration 120 // endpoint. 121 // 122 // configFingerprintProto is mutually exclusive with configFingerprint - this field 123 // will only be used if gRPC is enabled. 124 configFingerprintProto *proto.Fingerprint 125 126 // configFingerprintReset tracks when we should zero out the 127 // configFingerprint. We want to periodically do this just in case our 128 // configFingerprint logic is faulty. When it is cleared out, we fallback to 129 // calculating everything. 130 configFingerprintReset time.Time 131} 132 133func (s *sourcegraphClient) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) { 134 repos, err := s.listRepoIDs(ctx, indexed) 135 if err != nil { 136 return nil, fmt.Errorf("listRepoIDs: %w", err) 137 } 138 139 batchSize := s.BatchSize 140 if batchSize == 0 { 141 batchSize = 10_000 142 } 143 144 // Check if we should recalculate everything. 145 if time.Now().After(s.configFingerprintReset) { 146 // for every 500 repos we wait a minute. 2021-12-15 on sourcegraph.com 147 // this works out to every 100 minutes. 148 next := time.Duration(len(indexed)) * time.Minute / 500 149 if min := 5 * time.Minute; next < min { 150 next = min 151 } 152 next += time.Duration(rand.Int63n(int64(next) / 4)) // jitter 153 s.configFingerprintReset = time.Now().Add(next) 154 155 s.configFingerprintProto = nil 156 } 157 158 // getIndexOptionsFunc is a function that can be used to get the index 159 // options for a set of repos (while properly handling any configuration fingerprint 160 // changes). 161 // 162 // In general, this function provides a consistent fingerprint for each batch call, 163 // and updates the server state with the new fingerprint. If any of the batch calls 164 // fail, the old fingerprint is restored. 165 type getIndexOptionsFunc func(repos ...uint32) ([]indexOptionsItem, error) 166 167 mkGetIndexOptionsFunc := func(tr trace.Trace) getIndexOptionsFunc { 168 startingFingerPrint := s.configFingerprintProto 169 tr.LazyPrintf("fingerprint: %s", startingFingerPrint.String()) 170 171 first := true 172 return func(repos ...uint32) ([]indexOptionsItem, error) { 173 options, nextFingerPrint, err := s.getIndexOptions(ctx, startingFingerPrint, repos) 174 if err != nil { 175 first = false 176 s.configFingerprintProto = startingFingerPrint 177 178 return nil, err 179 } 180 181 if first { 182 first = false 183 s.configFingerprintProto = nextFingerPrint 184 tr.LazyPrintf("new fingerprint: %s", nextFingerPrint.String()) 185 } 186 187 return options, nil 188 } 189 } 190 191 iterate := func(f func(IndexOptions)) { 192 start := time.Now() 193 tr := trace.New("getIndexOptions", "") 194 tr.LazyPrintf("getting index options for %d repos", len(repos)) 195 196 defer func() { 197 metricResolveRevisionsDuration.Observe(time.Since(start).Seconds()) 198 tr.Finish() 199 }() 200 201 getIndexOptions := mkGetIndexOptionsFunc(tr) 202 203 // We ask the frontend to get index options in batches. 204 for repos := range batched(repos, batchSize) { 205 start := time.Now() 206 options, err := getIndexOptions(repos...) 207 duration := time.Since(start) 208 209 if err != nil { 210 metricResolveRevisionDuration.WithLabelValues("false").Observe(duration.Seconds()) 211 tr.LazyPrintf("failed fetching options batch: %v", err) 212 tr.SetError() 213 214 continue 215 } 216 217 metricResolveRevisionDuration.WithLabelValues("true").Observe(duration.Seconds()) 218 219 for _, o := range options { 220 metricGetIndexOptions.Inc() 221 222 if o.Error != "" { 223 metricGetIndexOptionsError.Inc() 224 tr.LazyPrintf("failed fetching options for %v: %v", o.Name, o.Error) 225 tr.SetError() 226 227 continue 228 } 229 f(o.IndexOptions) 230 } 231 } 232 } 233 234 return &SourcegraphListResult{ 235 IDs: repos, 236 IterateIndexOptions: iterate, 237 }, nil 238} 239 240func (s *sourcegraphClient) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) { 241 batchSize := s.BatchSize 242 if batchSize == 0 { 243 batchSize = 10_000 244 } 245 246 for repos := range batched(repos, batchSize) { 247 opts, _, err := s.getIndexOptions(context.Background(), nil, repos) 248 if err != nil { 249 for _, id := range repos { 250 onError(id, err) 251 } 252 continue 253 } 254 for _, o := range opts { 255 if o.RepoID > 0 && o.Error != "" { 256 onError(o.RepoID, errors.New(o.Error)) 257 } 258 if o.Error == "" { 259 onSuccess(o.IndexOptions) 260 } 261 } 262 } 263} 264 265// indexOptionsItem wraps IndexOptions to also include an error returned by 266// the API. 267type indexOptionsItem struct { 268 IndexOptions 269 Error string 270} 271 272func (o *indexOptionsItem) FromProto(x *proto.ZoektIndexOptions) { 273 branches := make([]zoekt.RepositoryBranch, 0, len(x.Branches)) 274 for _, b := range x.GetBranches() { 275 branches = append(branches, zoekt.RepositoryBranch{ 276 Name: b.GetName(), 277 Version: b.GetVersion(), 278 }) 279 } 280 281 item := indexOptionsItem{} 282 languageMap := make(map[string]ctags.CTagsParserType) 283 284 for _, lang := range x.GetLanguageMap() { 285 languageMap[lang.GetLanguage()] = ctags.CTagsParserType(lang.GetCtags().Number()) 286 } 287 288 item.IndexOptions = IndexOptions{ 289 RepoID: uint32(x.GetRepoId()), 290 LargeFiles: x.GetLargeFiles(), 291 Symbols: x.GetSymbols(), 292 Branches: branches, 293 Name: x.GetName(), 294 295 Priority: x.GetPriority(), 296 297 Public: x.GetPublic(), 298 Fork: x.GetFork(), 299 Archived: x.GetArchived(), 300 301 LanguageMap: languageMap, 302 ShardConcurrency: x.GetShardConcurrency(), 303 304 TenantID: int(x.TenantId), 305 } 306 307 item.Error = x.GetError() 308 309 *o = item 310} 311 312func (o *indexOptionsItem) ToProto() *proto.ZoektIndexOptions { 313 branches := make([]*proto.ZoektRepositoryBranch, 0, len(o.Branches)) 314 for _, b := range o.Branches { 315 branches = append(branches, &proto.ZoektRepositoryBranch{ 316 Name: b.Name, 317 Version: b.Version, 318 }) 319 } 320 321 languageMap := make([]*proto.LanguageMapping, 0, len(o.LanguageMap)) 322 323 for lang, parser := range o.LanguageMap { 324 languageMap = append(languageMap, &proto.LanguageMapping{ 325 Language: lang, 326 Ctags: proto.CTagsParserType(parser), 327 }) 328 } 329 330 return &proto.ZoektIndexOptions{ 331 RepoId: int32(o.RepoID), 332 LargeFiles: o.LargeFiles, 333 Symbols: o.Symbols, 334 Branches: branches, 335 Name: o.Name, 336 337 Priority: o.Priority, 338 339 Public: o.Public, 340 Fork: o.Fork, 341 Archived: o.Archived, 342 343 Error: o.Error, 344 345 LanguageMap: languageMap, 346 ShardConcurrency: o.ShardConcurrency, 347 348 TenantId: int64(o.TenantID), 349 } 350} 351 352func (s *sourcegraphClient) getIndexOptions(ctx context.Context, fingerprint *proto.Fingerprint, repos []uint32) ([]indexOptionsItem, *proto.Fingerprint, error) { 353 repoIDs := make([]int32, 0, len(repos)) 354 for _, id := range repos { 355 repoIDs = append(repoIDs, int32(id)) 356 } 357 358 req := proto.SearchConfigurationRequest{ 359 RepoIds: repoIDs, 360 Fingerprint: fingerprint, 361 } 362 363 response, err := s.grpcClient.SearchConfiguration(ctx, &req) 364 if err != nil { 365 return nil, nil, err 366 } 367 368 protoItems := response.GetUpdatedOptions() 369 items := make([]indexOptionsItem, 0, len(protoItems)) 370 for _, x := range protoItems { 371 var item indexOptionsItem 372 item.FromProto(x) 373 item.IndexOptions.CloneURL = s.getCloneURL(item.Name) 374 375 items = append(items, item) 376 } 377 378 return items, response.GetFingerprint(), nil 379} 380 381func (s *sourcegraphClient) getCloneURL(name string) string { 382 return s.Root.ResolveReference(&url.URL{Path: path.Join("/.internal/git", name)}).String() 383} 384 385func (s *sourcegraphClient) listRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) { 386 var request proto.ListRequest 387 request.Hostname = s.Hostname 388 request.IndexedIds = make([]int32, 0, len(indexed)) 389 for _, id := range indexed { 390 request.IndexedIds = append(request.IndexedIds, int32(id)) 391 } 392 393 response, err := s.grpcClient.List(ctx, &request) 394 if err != nil { 395 return nil, err 396 } 397 398 repoIDs := make([]uint32, 0, len(response.RepoIds)) 399 for _, id := range response.RepoIds { 400 repoIDs = append(repoIDs, uint32(id)) 401 } 402 403 return repoIDs, nil 404} 405 406type indexStatus struct { 407 RepoID uint32 408 Branches []zoekt.RepositoryBranch 409 IndexTimeUnix int64 410} 411 412type updateIndexStatusRequest struct { 413 Repositories []indexStatus 414} 415 416func (u *updateIndexStatusRequest) ToProto() *proto.UpdateIndexStatusRequest { 417 repositories := make([]*proto.UpdateIndexStatusRequest_Repository, 0, len(u.Repositories)) 418 419 for _, repo := range u.Repositories { 420 branches := make([]*proto.ZoektRepositoryBranch, 0, len(repo.Branches)) 421 422 for _, branch := range repo.Branches { 423 branches = append(branches, &proto.ZoektRepositoryBranch{ 424 Name: branch.Name, 425 Version: branch.Version, 426 }) 427 } 428 429 repositories = append(repositories, &proto.UpdateIndexStatusRequest_Repository{ 430 RepoId: repo.RepoID, 431 Branches: branches, 432 IndexTimeUnix: repo.IndexTimeUnix, 433 }) 434 } 435 436 return &proto.UpdateIndexStatusRequest{ 437 Repositories: repositories, 438 } 439} 440 441func (u *updateIndexStatusRequest) FromProto(x *proto.UpdateIndexStatusRequest) { 442 protoRepositories := x.GetRepositories() 443 repositories := make([]indexStatus, 0, len(protoRepositories)) 444 445 for _, repo := range x.GetRepositories() { 446 protoBranches := repo.GetBranches() 447 branches := make([]zoekt.RepositoryBranch, 0, len(protoBranches)) 448 449 for _, branch := range repo.GetBranches() { 450 branches = append(branches, zoekt.RepositoryBranch{ 451 Name: branch.GetName(), 452 Version: branch.GetVersion(), 453 }) 454 } 455 456 repositories = append(repositories, indexStatus{ 457 RepoID: repo.GetRepoId(), 458 Branches: branches, 459 IndexTimeUnix: repo.GetIndexTimeUnix(), 460 }) 461 } 462 463 *u = updateIndexStatusRequest{ 464 Repositories: repositories, 465 } 466} 467 468// UpdateIndexStatus sends a request to Sourcegraph to confirm that the given 469// repositories have been indexed. 470func (s *sourcegraphClient) UpdateIndexStatus(repositories []indexStatus) error { 471 r := updateIndexStatusRequest{Repositories: repositories} 472 473 request := r.ToProto() 474 _, err := s.grpcClient.UpdateIndexStatus(context.Background(), request) 475 if err != nil { 476 return fmt.Errorf("failed to update index status: %w", err) 477 } 478 479 return nil 480} 481 482type sourcegraphFake struct { 483 RootDir string 484 Log *log.Logger 485} 486 487func (sf sourcegraphFake) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) { 488 repos, err := sf.ListRepoIDs(ctx, indexed) 489 if err != nil { 490 return nil, err 491 } 492 493 iterate := func(f func(IndexOptions)) { 494 opts, err := sf.GetIndexOptions(repos...) 495 if err != nil { 496 sf.Log.Printf("WARN: ignoring GetIndexOptions error: %v", err) 497 } 498 for _, opt := range opts { 499 if opt.Error != "" { 500 sf.Log.Printf("WARN: ignoring GetIndexOptions error for %s: %v", opt.Name, opt.Error) 501 continue 502 } 503 f(opt.IndexOptions) 504 } 505 } 506 507 return &SourcegraphListResult{ 508 IDs: repos, 509 IterateIndexOptions: iterate, 510 }, nil 511} 512 513func (sf sourcegraphFake) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) { 514 opts, err := sf.GetIndexOptions(repos...) 515 if err != nil { 516 for _, id := range repos { 517 onError(id, err) 518 } 519 return 520 } 521 for _, o := range opts { 522 if o.RepoID > 0 && o.Error != "" { 523 onError(o.RepoID, errors.New(o.Error)) 524 } 525 if o.Error == "" { 526 onSuccess(o.IndexOptions) 527 } 528 } 529} 530 531func (sf sourcegraphFake) GetIndexOptions(repos ...uint32) ([]indexOptionsItem, error) { 532 reposIdx := map[uint32]int{} 533 for i, id := range repos { 534 reposIdx[id] = i 535 } 536 537 items := make([]indexOptionsItem, len(repos)) 538 err := sf.visitRepos(func(name string) { 539 idx, ok := reposIdx[sf.id(name)] 540 if !ok { 541 return 542 } 543 opts, err := sf.getIndexOptions(name) 544 if err != nil { 545 items[idx] = indexOptionsItem{Error: err.Error()} 546 } else { 547 items[idx] = indexOptionsItem{IndexOptions: opts} 548 } 549 }) 550 if err != nil { 551 return nil, err 552 } 553 554 for i := range items { 555 if items[i].Error == "" && items[i].RepoID == 0 { 556 items[i].Error = "not found" 557 } 558 } 559 560 return items, nil 561} 562 563func (sf sourcegraphFake) getIndexOptions(name string) (IndexOptions, error) { 564 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name)) 565 exists := func(p string) bool { 566 _, err := os.Stat(filepath.Join(dir, p)) 567 return err == nil 568 } 569 float := func(p string) float64 { 570 b, _ := os.ReadFile(filepath.Join(dir, p)) 571 f, _ := strconv.ParseFloat(string(bytes.TrimSpace(b)), 64) 572 return f 573 } 574 575 opts := IndexOptions{ 576 RepoID: sf.id(name), 577 Name: name, 578 CloneURL: sf.getCloneURL(name), 579 Symbols: true, 580 581 Public: !exists("SG_PRIVATE"), 582 Fork: exists("SG_FORK"), 583 Archived: exists("SG_ARCHIVED"), 584 585 Priority: float("SG_PRIORITY"), 586 } 587 588 branches, err := sf.getBranches(name) 589 if err != nil { 590 return opts, err 591 } 592 opts.Branches = branches 593 594 return opts, nil 595} 596 597func (sf sourcegraphFake) getBranches(name string) ([]zoekt.RepositoryBranch, error) { 598 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name)) 599 repo, err := git.PlainOpen(dir) 600 if err != nil { 601 return nil, err 602 } 603 604 cfg, err := repo.Config() 605 if err != nil { 606 return nil, err 607 } 608 609 sec := cfg.Raw.Section("zoekt") 610 branches := sec.Options.GetAll("branch") 611 if len(branches) == 0 { 612 branches = append(branches, "HEAD") 613 } 614 615 rBranches := make([]zoekt.RepositoryBranch, 0, len(branches)) 616 for _, branch := range branches { 617 cmd := exec.Command("git", "rev-parse", branch) 618 cmd.Dir = dir 619 if b, err := cmd.Output(); err != nil { 620 sf.Log.Printf("WARN: Could not get branch %s/%s", name, branch) 621 } else { 622 version := string(bytes.TrimSpace(b)) 623 rBranches = append(rBranches, zoekt.RepositoryBranch{ 624 Name: branch, 625 Version: version, 626 }) 627 } 628 } 629 630 if len(rBranches) == 0 { 631 return nil, fmt.Errorf("WARN: Could not get any branch revisions for repo %s", name) 632 } 633 634 return rBranches, nil 635} 636 637func (sf sourcegraphFake) id(name string) uint32 { 638 // allow overriding the ID. 639 idPath := filepath.Join(sf.RootDir, filepath.FromSlash(name), "SG_ID") 640 if b, _ := os.ReadFile(idPath); len(b) > 0 { 641 id, err := strconv.Atoi(strings.TrimSpace(string(b))) 642 if err == nil { 643 return uint32(id) 644 } 645 } 646 return fakeID(name) 647} 648 649func (sf sourcegraphFake) getCloneURL(name string) string { 650 return filepath.Join(sf.RootDir, filepath.FromSlash(name)) 651} 652 653func (sf sourcegraphFake) ListRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) { 654 var repos []uint32 655 err := sf.visitRepos(func(name string) { 656 repos = append(repos, sf.id(name)) 657 }) 658 return repos, err 659} 660 661func (sf sourcegraphFake) visitRepos(visit func(name string)) error { 662 return filepath.Walk(sf.RootDir, func(path string, fi os.FileInfo, fileErr error) error { 663 if fileErr != nil { 664 sf.Log.Printf("WARN: ignoring error searching %s: %v", path, fileErr) 665 return nil 666 } 667 if !fi.IsDir() { 668 return nil 669 } 670 671 gitdir := filepath.Join(path, ".git") 672 if fi, err := os.Stat(gitdir); err != nil || !fi.IsDir() { 673 return nil 674 } 675 676 subpath, err := filepath.Rel(sf.RootDir, path) 677 if err != nil { 678 // According to WalkFunc docs, path is always filepath.Join(root, 679 // subpath). So Rel should always work. 680 return fmt.Errorf("filepath.Walk returned %s which is not relative to %s: %w", path, sf.RootDir, err) 681 } 682 683 name := filepath.ToSlash(subpath) 684 visit(name) 685 686 return filepath.SkipDir 687 }) 688} 689 690func (s sourcegraphFake) UpdateIndexStatus(repositories []indexStatus) error { 691 // noop 692 return nil 693} 694 695// fakeID returns a deterministic ID based on name. Used for fakes and tests. 696func fakeID(name string) uint32 { 697 // magic at the end is to ensure we get a positive number when casting. 698 return uint32(crc32.ChecksumIEEE([]byte(name))%(1<<31-1) + 1) 699} 700 701type sourcegraphNop struct{} 702 703func (s sourcegraphNop) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) { 704 return nil, nil 705} 706 707func (s sourcegraphNop) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) { 708} 709 710func (s sourcegraphNop) UpdateIndexStatus(repositories []indexStatus) error { 711 return nil 712}