fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3import ( 4 "bytes" 5 "context" 6 "errors" 7 "fmt" 8 "hash/crc32" 9 "log" 10 "math/rand" 11 "net/url" 12 "os" 13 "os/exec" 14 "path" 15 "path/filepath" 16 "strconv" 17 "strings" 18 "time" 19 20 "github.com/go-git/go-git/v5" 21 "golang.org/x/net/trace" 22 23 "github.com/sourcegraph/zoekt" 24 configv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/sourcegraph/zoekt/configuration/v1" 25) 26 27// SourcegraphListResult is the return value of Sourcegraph.List. It is its 28// own type since internally we batch the calculation of index options. This 29// is exposed via IterateIndexOptions. 30// 31// This type has state and is coupled to the Sourcegraph implementation. 32type SourcegraphListResult struct { 33 // IDs is the set of Sourcegraph repository IDs that this replica needs 34 // to index. 35 IDs []uint32 36 37 // IterateIndexOptions best effort resolves the IndexOptions for RepoIDs. If 38 // any repository fails it internally logs. It uses the "config fingerprint" 39 // to reduce the amount of work done. This means we only resolve options for 40 // repositories which have been mutated since the last Sourcegraph.List 41 // call. 42 // 43 // Note: this has a side-effect of setting a the "config fingerprint". The 44 // config fingerprint means we only calculate index options for repositories 45 // that have changed since the last call to IterateIndexOptions. If you want 46 // to force calculation of index options use 47 // Sourcegraph.ForceIterateIndexOptions. 48 // 49 // Note: This should not be called concurrently with the Sourcegraph client. 50 IterateIndexOptions func(func(IndexOptions)) 51} 52 53// Sourcegraph represents the Sourcegraph service. It informs the indexserver 54// what to index and which options to use. 55type Sourcegraph interface { 56 // List returns a list of repository IDs to index as well as a facility to 57 // fetch the indexing options. 58 // 59 // Note: The return value is not safe to use concurrently with future calls 60 // to List. 61 List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) 62 63 // ForceIterateIndexOptions will best-effort calculate the index options for 64 // all repos. For each repo it will call either onSuccess or onError. This 65 // is the forced version of IterateIndexOptions, so will always calculate 66 // options for each id in repos. 67 ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) 68 69 // UpdateIndexStatus sends a request to Sourcegraph to confirm that the 70 // given repositories have been indexed. 71 UpdateIndexStatus(repositories []indexStatus) error 72} 73 74type SourcegraphClientOption func(*sourcegraphClient) 75 76// WithBatchSize controls how many repository configurations we request a time. 77// If BatchSize is 0, we default to requesting 10,000 repositories at once. 78func WithBatchSize(batchSize int) SourcegraphClientOption { 79 return func(c *sourcegraphClient) { 80 c.BatchSize = batchSize 81 } 82} 83 84func newSourcegraphClient(rootURL *url.URL, hostname string, grpcClient configv1.ZoektConfigurationServiceClient, opts ...SourcegraphClientOption) *sourcegraphClient { 85 client := &sourcegraphClient{ 86 Root: rootURL, 87 Hostname: hostname, 88 BatchSize: 0, 89 grpcClient: grpcClient, 90 } 91 92 for _, opt := range opts { 93 opt(client) 94 } 95 96 return client 97} 98 99// sourcegraphClient contains methods which interact with the sourcegraph API. 100type sourcegraphClient struct { 101 // Root is the base URL for the Sourcegraph instance to index. Normally 102 // http://sourcegraph-frontend-internal or http://localhost:3090. 103 Root *url.URL 104 105 // Hostname is the name we advertise to Sourcegraph when asking for the 106 // list of repositories to index. 107 Hostname string 108 109 // BatchSize is how many repository configurations we request at once. If 110 // zero a value of 10000 is used. 111 BatchSize int 112 113 // grpcClient is used to make requests to the Sourcegraph instance if gRPC is enabled. 114 grpcClient configv1.ZoektConfigurationServiceClient 115 116 // configFingerprintProto is the last config fingerprint (as GRPC) returned from 117 // Sourcegraph. It can be used for future calls to the configuration 118 // endpoint. 119 // 120 // configFingerprintProto is mutually exclusive with configFingerprint - this field 121 // will only be used if gRPC is enabled. 122 configFingerprintProto *configv1.Fingerprint 123 124 // configFingerprintReset tracks when we should zero out the 125 // configFingerprint. We want to periodically do this just in case our 126 // configFingerprint logic is faulty. When it is cleared out, we fallback to 127 // calculating everything. 128 configFingerprintReset time.Time 129} 130 131func (s *sourcegraphClient) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) { 132 repos, err := s.listRepoIDs(ctx, indexed) 133 if err != nil { 134 return nil, fmt.Errorf("listRepoIDs: %w", err) 135 } 136 137 batchSize := s.BatchSize 138 if batchSize == 0 { 139 batchSize = 10_000 140 } 141 142 // Check if we should recalculate everything. 143 if time.Now().After(s.configFingerprintReset) { 144 // for every 500 repos we wait a minute. 2021-12-15 on sourcegraph.com 145 // this works out to every 100 minutes. 146 next := time.Duration(len(indexed)) * time.Minute / 500 147 if min := 5 * time.Minute; next < min { 148 next = min 149 } 150 next += time.Duration(rand.Int63n(int64(next) / 4)) // jitter 151 s.configFingerprintReset = time.Now().Add(next) 152 153 s.configFingerprintProto = nil 154 } 155 156 // getIndexOptionsFunc is a function that can be used to get the index 157 // options for a set of repos (while properly handling any configuration fingerprint 158 // changes). 159 // 160 // In general, this function provides a consistent fingerprint for each batch call, 161 // and updates the server state with the new fingerprint. If any of the batch calls 162 // fail, the old fingerprint is restored. 163 type getIndexOptionsFunc func(repos ...uint32) ([]indexOptionsItem, error) 164 165 mkGetIndexOptionsFunc := func(tr trace.Trace) getIndexOptionsFunc { 166 startingFingerPrint := s.configFingerprintProto 167 tr.LazyPrintf("fingerprint: %s", startingFingerPrint.String()) 168 169 first := true 170 return func(repos ...uint32) ([]indexOptionsItem, error) { 171 options, nextFingerPrint, err := s.getIndexOptions(ctx, startingFingerPrint, repos) 172 if err != nil { 173 first = false 174 s.configFingerprintProto = startingFingerPrint 175 176 return nil, err 177 } 178 179 if first { 180 first = false 181 s.configFingerprintProto = nextFingerPrint 182 tr.LazyPrintf("new fingerprint: %s", nextFingerPrint.String()) 183 } 184 185 return options, nil 186 } 187 } 188 189 iterate := func(f func(IndexOptions)) { 190 start := time.Now() 191 tr := trace.New("getIndexOptions", "") 192 tr.LazyPrintf("getting index options for %d repos", len(repos)) 193 194 defer func() { 195 metricResolveRevisionsDuration.Observe(time.Since(start).Seconds()) 196 tr.Finish() 197 }() 198 199 getIndexOptions := mkGetIndexOptionsFunc(tr) 200 201 // We ask the frontend to get index options in batches. 202 for repos := range batched(repos, batchSize) { 203 start := time.Now() 204 options, err := getIndexOptions(repos...) 205 duration := time.Since(start) 206 207 if err != nil { 208 metricResolveRevisionDuration.WithLabelValues("false").Observe(duration.Seconds()) 209 tr.LazyPrintf("failed fetching options batch: %v", err) 210 tr.SetError() 211 212 continue 213 } 214 215 metricResolveRevisionDuration.WithLabelValues("true").Observe(duration.Seconds()) 216 217 for _, o := range options { 218 metricGetIndexOptions.Inc() 219 220 if o.Error != "" { 221 metricGetIndexOptionsError.Inc() 222 tr.LazyPrintf("failed fetching options for %v: %v", o.Name, o.Error) 223 tr.SetError() 224 225 continue 226 } 227 f(o.IndexOptions) 228 } 229 } 230 } 231 232 return &SourcegraphListResult{ 233 IDs: repos, 234 IterateIndexOptions: iterate, 235 }, nil 236} 237 238func (s *sourcegraphClient) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) { 239 batchSize := s.BatchSize 240 if batchSize == 0 { 241 batchSize = 10_000 242 } 243 244 for repos := range batched(repos, batchSize) { 245 opts, _, err := s.getIndexOptions(context.Background(), nil, repos) 246 if err != nil { 247 for _, id := range repos { 248 onError(id, err) 249 } 250 continue 251 } 252 for _, o := range opts { 253 if o.RepoID > 0 && o.Error != "" { 254 onError(o.RepoID, errors.New(o.Error)) 255 } 256 if o.Error == "" { 257 onSuccess(o.IndexOptions) 258 } 259 } 260 } 261} 262 263// indexOptionsItem wraps IndexOptions to also include an error returned by 264// the API. 265type indexOptionsItem struct { 266 IndexOptions 267 Error string 268} 269 270func (o *indexOptionsItem) FromProto(x *configv1.ZoektIndexOptions) { 271 item := indexOptionsItem{} 272 item.IndexOptions.FromProto(x) 273 item.Error = x.GetError() 274 *o = item 275} 276 277func (o *indexOptionsItem) ToProto() *configv1.ZoektIndexOptions { 278 branches := make([]*configv1.ZoektRepositoryBranch, 0, len(o.Branches)) 279 for _, b := range o.Branches { 280 branches = append(branches, &configv1.ZoektRepositoryBranch{ 281 Name: b.Name, 282 Version: b.Version, 283 }) 284 } 285 286 languageMap := make([]*configv1.LanguageMapping, 0, len(o.LanguageMap)) 287 288 for lang, parser := range o.LanguageMap { 289 languageMap = append(languageMap, &configv1.LanguageMapping{ 290 Language: lang, 291 Ctags: configv1.CTagsParserType(parser), 292 }) 293 } 294 295 return &configv1.ZoektIndexOptions{ 296 RepoId: int32(o.RepoID), 297 LargeFiles: o.LargeFiles, 298 Symbols: o.Symbols, 299 Branches: branches, 300 Name: o.Name, 301 302 Priority: o.Priority, 303 304 Public: o.Public, 305 Fork: o.Fork, 306 Archived: o.Archived, 307 308 Error: o.Error, 309 310 LanguageMap: languageMap, 311 ShardConcurrency: o.ShardConcurrency, 312 313 TenantId: int64(o.TenantID), 314 } 315} 316 317func (s *sourcegraphClient) getIndexOptions(ctx context.Context, fingerprint *configv1.Fingerprint, repos []uint32) ([]indexOptionsItem, *configv1.Fingerprint, error) { 318 repoIDs := make([]int32, 0, len(repos)) 319 for _, id := range repos { 320 repoIDs = append(repoIDs, int32(id)) 321 } 322 323 req := configv1.SearchConfigurationRequest{ 324 RepoIds: repoIDs, 325 Fingerprint: fingerprint, 326 } 327 328 response, err := s.grpcClient.SearchConfiguration(ctx, &req) 329 if err != nil { 330 return nil, nil, err 331 } 332 333 protoItems := response.GetUpdatedOptions() 334 items := make([]indexOptionsItem, 0, len(protoItems)) 335 for _, x := range protoItems { 336 var item indexOptionsItem 337 item.FromProto(x) 338 item.IndexOptions.CloneURL = s.getCloneURL(item.Name) 339 340 items = append(items, item) 341 } 342 343 return items, response.GetFingerprint(), nil 344} 345 346func (s *sourcegraphClient) getCloneURL(name string) string { 347 return s.Root.ResolveReference(&url.URL{Path: path.Join("/.internal/git", name)}).String() 348} 349 350func (s *sourcegraphClient) listRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) { 351 var request configv1.ListRequest 352 request.Hostname = s.Hostname 353 request.IndexedIds = make([]int32, 0, len(indexed)) 354 for _, id := range indexed { 355 request.IndexedIds = append(request.IndexedIds, int32(id)) 356 } 357 358 response, err := s.grpcClient.List(ctx, &request) 359 if err != nil { 360 return nil, err 361 } 362 363 repoIDs := make([]uint32, 0, len(response.RepoIds)) 364 for _, id := range response.RepoIds { 365 repoIDs = append(repoIDs, uint32(id)) 366 } 367 368 return repoIDs, nil 369} 370 371type indexStatus struct { 372 RepoID uint32 373 Branches []zoekt.RepositoryBranch 374 IndexTimeUnix int64 375} 376 377type updateIndexStatusRequest struct { 378 Repositories []indexStatus 379} 380 381func (u *updateIndexStatusRequest) ToProto() *configv1.UpdateIndexStatusRequest { 382 repositories := make([]*configv1.UpdateIndexStatusRequest_Repository, 0, len(u.Repositories)) 383 384 for _, repo := range u.Repositories { 385 branches := make([]*configv1.ZoektRepositoryBranch, 0, len(repo.Branches)) 386 387 for _, branch := range repo.Branches { 388 branches = append(branches, &configv1.ZoektRepositoryBranch{ 389 Name: branch.Name, 390 Version: branch.Version, 391 }) 392 } 393 394 repositories = append(repositories, &configv1.UpdateIndexStatusRequest_Repository{ 395 RepoId: repo.RepoID, 396 Branches: branches, 397 IndexTimeUnix: repo.IndexTimeUnix, 398 }) 399 } 400 401 return &configv1.UpdateIndexStatusRequest{ 402 Repositories: repositories, 403 } 404} 405 406func (u *updateIndexStatusRequest) FromProto(x *configv1.UpdateIndexStatusRequest) { 407 protoRepositories := x.GetRepositories() 408 repositories := make([]indexStatus, 0, len(protoRepositories)) 409 410 for _, repo := range x.GetRepositories() { 411 protoBranches := repo.GetBranches() 412 branches := make([]zoekt.RepositoryBranch, 0, len(protoBranches)) 413 414 for _, branch := range repo.GetBranches() { 415 branches = append(branches, zoekt.RepositoryBranch{ 416 Name: branch.GetName(), 417 Version: branch.GetVersion(), 418 }) 419 } 420 421 repositories = append(repositories, indexStatus{ 422 RepoID: repo.GetRepoId(), 423 Branches: branches, 424 IndexTimeUnix: repo.GetIndexTimeUnix(), 425 }) 426 } 427 428 *u = updateIndexStatusRequest{ 429 Repositories: repositories, 430 } 431} 432 433// UpdateIndexStatus sends a request to Sourcegraph to confirm that the given 434// repositories have been indexed. 435func (s *sourcegraphClient) UpdateIndexStatus(repositories []indexStatus) error { 436 r := updateIndexStatusRequest{Repositories: repositories} 437 438 request := r.ToProto() 439 _, err := s.grpcClient.UpdateIndexStatus(context.Background(), request) 440 if err != nil { 441 return fmt.Errorf("failed to update index status: %w", err) 442 } 443 444 return nil 445} 446 447type sourcegraphFake struct { 448 RootDir string 449 Log *log.Logger 450} 451 452func (sf sourcegraphFake) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) { 453 repos, err := sf.ListRepoIDs(ctx, indexed) 454 if err != nil { 455 return nil, err 456 } 457 458 iterate := func(f func(IndexOptions)) { 459 opts, err := sf.GetIndexOptions(repos...) 460 if err != nil { 461 sf.Log.Printf("WARN: ignoring GetIndexOptions error: %v", err) 462 } 463 for _, opt := range opts { 464 if opt.Error != "" { 465 sf.Log.Printf("WARN: ignoring GetIndexOptions error for %s: %v", opt.Name, opt.Error) 466 continue 467 } 468 f(opt.IndexOptions) 469 } 470 } 471 472 return &SourcegraphListResult{ 473 IDs: repos, 474 IterateIndexOptions: iterate, 475 }, nil 476} 477 478func (sf sourcegraphFake) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) { 479 opts, err := sf.GetIndexOptions(repos...) 480 if err != nil { 481 for _, id := range repos { 482 onError(id, err) 483 } 484 return 485 } 486 for _, o := range opts { 487 if o.RepoID > 0 && o.Error != "" { 488 onError(o.RepoID, errors.New(o.Error)) 489 } 490 if o.Error == "" { 491 onSuccess(o.IndexOptions) 492 } 493 } 494} 495 496func (sf sourcegraphFake) GetIndexOptions(repos ...uint32) ([]indexOptionsItem, error) { 497 reposIdx := map[uint32]int{} 498 for i, id := range repos { 499 reposIdx[id] = i 500 } 501 502 items := make([]indexOptionsItem, len(repos)) 503 err := sf.visitRepos(func(name string) { 504 idx, ok := reposIdx[sf.id(name)] 505 if !ok { 506 return 507 } 508 opts, err := sf.getIndexOptions(name) 509 if err != nil { 510 items[idx] = indexOptionsItem{Error: err.Error()} 511 } else { 512 items[idx] = indexOptionsItem{IndexOptions: opts} 513 } 514 }) 515 if err != nil { 516 return nil, err 517 } 518 519 for i := range items { 520 if items[i].Error == "" && items[i].RepoID == 0 { 521 items[i].Error = "not found" 522 } 523 } 524 525 return items, nil 526} 527 528func (sf sourcegraphFake) getIndexOptions(name string) (IndexOptions, error) { 529 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name)) 530 exists := func(p string) bool { 531 _, err := os.Stat(filepath.Join(dir, p)) 532 return err == nil 533 } 534 float := func(p string) float64 { 535 b, _ := os.ReadFile(filepath.Join(dir, p)) 536 f, _ := strconv.ParseFloat(string(bytes.TrimSpace(b)), 64) 537 return f 538 } 539 540 opts := IndexOptions{ 541 RepoID: sf.id(name), 542 Name: name, 543 CloneURL: sf.getCloneURL(name), 544 Symbols: true, 545 546 Public: !exists("SG_PRIVATE"), 547 Fork: exists("SG_FORK"), 548 Archived: exists("SG_ARCHIVED"), 549 550 Priority: float("SG_PRIORITY"), 551 } 552 553 branches, err := sf.getBranches(name) 554 if err != nil { 555 return opts, err 556 } 557 opts.Branches = branches 558 559 return opts, nil 560} 561 562func (sf sourcegraphFake) getBranches(name string) ([]zoekt.RepositoryBranch, error) { 563 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name)) 564 repo, err := git.PlainOpen(dir) 565 if err != nil { 566 return nil, err 567 } 568 569 cfg, err := repo.Config() 570 if err != nil { 571 return nil, err 572 } 573 574 sec := cfg.Raw.Section("zoekt") 575 branches := sec.Options.GetAll("branch") 576 if len(branches) == 0 { 577 branches = append(branches, "HEAD") 578 } 579 580 rBranches := make([]zoekt.RepositoryBranch, 0, len(branches)) 581 for _, branch := range branches { 582 cmd := exec.Command("git", "rev-parse", branch) 583 cmd.Dir = dir 584 if b, err := cmd.Output(); err != nil { 585 sf.Log.Printf("WARN: Could not get branch %s/%s", name, branch) 586 } else { 587 version := string(bytes.TrimSpace(b)) 588 rBranches = append(rBranches, zoekt.RepositoryBranch{ 589 Name: branch, 590 Version: version, 591 }) 592 } 593 } 594 595 if len(rBranches) == 0 { 596 return nil, fmt.Errorf("WARN: Could not get any branch revisions for repo %s", name) 597 } 598 599 return rBranches, nil 600} 601 602func (sf sourcegraphFake) id(name string) uint32 { 603 // allow overriding the ID. 604 idPath := filepath.Join(sf.RootDir, filepath.FromSlash(name), "SG_ID") 605 if b, _ := os.ReadFile(idPath); len(b) > 0 { 606 id, err := strconv.Atoi(strings.TrimSpace(string(b))) 607 if err == nil { 608 return uint32(id) 609 } 610 } 611 return fakeID(name) 612} 613 614func (sf sourcegraphFake) getCloneURL(name string) string { 615 return filepath.Join(sf.RootDir, filepath.FromSlash(name)) 616} 617 618func (sf sourcegraphFake) ListRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) { 619 var repos []uint32 620 err := sf.visitRepos(func(name string) { 621 repos = append(repos, sf.id(name)) 622 }) 623 return repos, err 624} 625 626func (sf sourcegraphFake) visitRepos(visit func(name string)) error { 627 return filepath.Walk(sf.RootDir, func(path string, fi os.FileInfo, fileErr error) error { 628 if fileErr != nil { 629 sf.Log.Printf("WARN: ignoring error searching %s: %v", path, fileErr) 630 return nil 631 } 632 if !fi.IsDir() { 633 return nil 634 } 635 636 gitdir := filepath.Join(path, ".git") 637 if fi, err := os.Stat(gitdir); err != nil || !fi.IsDir() { 638 return nil 639 } 640 641 subpath, err := filepath.Rel(sf.RootDir, path) 642 if err != nil { 643 // According to WalkFunc docs, path is always filepath.Join(root, 644 // subpath). So Rel should always work. 645 return fmt.Errorf("filepath.Walk returned %s which is not relative to %s: %w", path, sf.RootDir, err) 646 } 647 648 name := filepath.ToSlash(subpath) 649 visit(name) 650 651 return filepath.SkipDir 652 }) 653} 654 655func (s sourcegraphFake) UpdateIndexStatus(repositories []indexStatus) error { 656 // noop 657 return nil 658} 659 660// fakeID returns a deterministic ID based on name. Used for fakes and tests. 661func fakeID(name string) uint32 { 662 // magic at the end is to ensure we get a positive number when casting. 663 return uint32(crc32.ChecksumIEEE([]byte(name))%(1<<31-1) + 1) 664} 665 666type sourcegraphNop struct{} 667 668func (s sourcegraphNop) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) { 669 return nil, nil 670} 671 672func (s sourcegraphNop) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) { 673} 674 675func (s sourcegraphNop) UpdateIndexStatus(repositories []indexStatus) error { 676 return nil 677}