fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3import ( 4 "bytes" 5 "context" 6 "errors" 7 "fmt" 8 "hash/crc32" 9 "log" 10 "math/rand" 11 "net/url" 12 "os" 13 "os/exec" 14 "path" 15 "path/filepath" 16 "strconv" 17 "strings" 18 "time" 19 20 "github.com/go-git/go-git/v5" 21 "golang.org/x/net/trace" 22 23 "github.com/sourcegraph/zoekt" 24 configv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/sourcegraph/zoekt/configuration/v1" 25) 26 27// SourcegraphListResult is the return value of Sourcegraph.List. It is its 28// own type since internally we batch the calculation of index options. This 29// is exposed via IterateIndexOptions. 30// 31// This type has state and is coupled to the Sourcegraph implementation. 32type SourcegraphListResult struct { 33 // IDs is the set of Sourcegraph repository IDs that this replica needs 34 // to index. 35 IDs []uint32 36 37 // IterateIndexOptions best effort resolves the IndexOptions for RepoIDs. If 38 // any repository fails it internally logs. It uses the "config fingerprint" 39 // to reduce the amount of work done. This means we only resolve options for 40 // repositories which have been mutated since the last Sourcegraph.List 41 // call. 42 // 43 // Note: this has a side-effect of setting a the "config fingerprint". The 44 // config fingerprint means we only calculate index options for repositories 45 // that have changed since the last call to IterateIndexOptions. If you want 46 // to force calculation of index options use 47 // Sourcegraph.ForceIterateIndexOptions. 48 // 49 // Note: This should not be called concurrently with the Sourcegraph client. 50 IterateIndexOptions func(func(IndexOptions)) 51} 52 53// Sourcegraph represents the Sourcegraph service. It informs the indexserver 54// what to index and which options to use. 55type Sourcegraph interface { 56 // List returns a list of repository IDs to index as well as a facility to 57 // fetch the indexing options. 58 // 59 // Note: The return value is not safe to use concurrently with future calls 60 // to List. 61 List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) 62 63 // ForceIterateIndexOptions will best-effort calculate the index options for 64 // all repos. For each repo it will call either onSuccess or onError. This 65 // is the forced version of IterateIndexOptions, so will always calculate 66 // options for each id in repos. 67 ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) 68 69 // UpdateIndexStatus sends a request to Sourcegraph to confirm that the 70 // given repositories have been indexed. 71 UpdateIndexStatus(repositories []indexStatus) error 72} 73 74type SourcegraphClientOption func(*sourcegraphClient) 75 76// WithBatchSize controls how many repository configurations we request a time. 77// If BatchSize is 0, we default to requesting 10,000 repositories at once. 78func WithBatchSize(batchSize int) SourcegraphClientOption { 79 return func(c *sourcegraphClient) { 80 c.BatchSize = batchSize 81 } 82} 83 84func newSourcegraphClient(rootURL *url.URL, hostname string, grpcClient configv1.ZoektConfigurationServiceClient, opts ...SourcegraphClientOption) *sourcegraphClient { 85 client := &sourcegraphClient{ 86 Root: rootURL, 87 Hostname: hostname, 88 BatchSize: 0, 89 grpcClient: grpcClient, 90 } 91 92 for _, opt := range opts { 93 opt(client) 94 } 95 96 return client 97} 98 99// sourcegraphClient contains methods which interact with the sourcegraph API. 100type sourcegraphClient struct { 101 // Root is the base URL for the Sourcegraph instance to index. Normally 102 // http://sourcegraph-frontend-internal or http://localhost:3090. 103 Root *url.URL 104 105 // Hostname is the name we advertise to Sourcegraph when asking for the 106 // list of repositories to index. 107 Hostname string 108 109 // BatchSize is how many repository configurations we request at once. If 110 // zero a value of 10000 is used. 111 BatchSize int 112 113 // grpcClient is used to make requests to the Sourcegraph instance if gRPC is enabled. 114 grpcClient configv1.ZoektConfigurationServiceClient 115 116 // configFingerprintProto is the last config fingerprint (as GRPC) returned from 117 // Sourcegraph. It can be used for future calls to the configuration 118 // endpoint. 119 // 120 // configFingerprintProto is mutually exclusive with configFingerprint - this field 121 // will only be used if gRPC is enabled. 122 configFingerprintProto *configv1.Fingerprint 123 124 // configFingerprintReset tracks when we should zero out the 125 // configFingerprint. We want to periodically do this just in case our 126 // configFingerprint logic is faulty. When it is cleared out, we fallback to 127 // calculating everything. 128 configFingerprintReset time.Time 129} 130 131func (s *sourcegraphClient) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) { 132 repos, err := s.listRepoIDs(ctx, indexed) 133 if err != nil { 134 return nil, fmt.Errorf("listRepoIDs: %w", err) 135 } 136 137 batchSize := s.BatchSize 138 if batchSize == 0 { 139 batchSize = 10_000 140 } 141 142 // Check if we should recalculate everything. 143 if time.Now().After(s.configFingerprintReset) { 144 // for every 500 repos we wait a minute. 2021-12-15 on sourcegraph.com 145 // this works out to every 100 minutes. 146 next := time.Duration(len(indexed)) * time.Minute / 500 147 if min := 5 * time.Minute; next < min { 148 next = min 149 } 150 next += time.Duration(rand.Int63n(int64(next) / 4)) // jitter 151 s.configFingerprintReset = time.Now().Add(next) 152 153 s.configFingerprintProto = nil 154 } 155 156 // getIndexOptionsFunc is a function that can be used to get the index 157 // options for a set of repos (while properly handling any configuration fingerprint 158 // changes). 159 // 160 // In general, this function provides a consistent fingerprint for each batch call, 161 // and updates the server state with the new fingerprint. If any of the batch calls 162 // fail, the old fingerprint is restored. 163 type getIndexOptionsFunc func(repos ...uint32) ([]indexOptionsItem, error) 164 165 mkGetIndexOptionsFunc := func(tr trace.Trace) getIndexOptionsFunc { 166 startingFingerPrint := s.configFingerprintProto 167 tr.LazyPrintf("fingerprint: %s", startingFingerPrint.String()) 168 169 first := true 170 return func(repos ...uint32) ([]indexOptionsItem, error) { 171 options, nextFingerPrint, err := s.getIndexOptions(ctx, startingFingerPrint, repos) 172 if err != nil { 173 first = false 174 s.configFingerprintProto = startingFingerPrint 175 176 return nil, err 177 } 178 179 if first { 180 first = false 181 s.configFingerprintProto = nextFingerPrint 182 tr.LazyPrintf("new fingerprint: %s", nextFingerPrint.String()) 183 } 184 185 return options, nil 186 } 187 } 188 189 iterate := func(f func(IndexOptions)) { 190 start := time.Now() 191 tr := trace.New("getIndexOptions", "") 192 tr.LazyPrintf("getting index options for %d repos", len(repos)) 193 194 defer func() { 195 metricResolveRevisionsDuration.Observe(time.Since(start).Seconds()) 196 tr.Finish() 197 }() 198 199 getIndexOptions := mkGetIndexOptionsFunc(tr) 200 201 // We ask the frontend to get index options in batches. 202 for repos := range batched(repos, batchSize) { 203 start := time.Now() 204 options, err := getIndexOptions(repos...) 205 duration := time.Since(start) 206 207 if err != nil { 208 metricResolveRevisionDuration.WithLabelValues("false").Observe(duration.Seconds()) 209 tr.LazyPrintf("failed fetching options batch: %v", err) 210 tr.SetError() 211 212 continue 213 } 214 215 metricResolveRevisionDuration.WithLabelValues("true").Observe(duration.Seconds()) 216 217 for _, o := range options { 218 metricGetIndexOptions.Inc() 219 220 if o.Error != "" { 221 metricGetIndexOptionsError.Inc() 222 tr.LazyPrintf("failed fetching options for %v: %v", o.Name, o.Error) 223 tr.SetError() 224 225 continue 226 } 227 f(o.IndexOptions) 228 } 229 } 230 } 231 232 return &SourcegraphListResult{ 233 IDs: repos, 234 IterateIndexOptions: iterate, 235 }, nil 236} 237 238func (s *sourcegraphClient) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) { 239 batchSize := s.BatchSize 240 if batchSize == 0 { 241 batchSize = 10_000 242 } 243 244 for repos := range batched(repos, batchSize) { 245 opts, _, err := s.getIndexOptions(context.Background(), nil, repos) 246 if err != nil { 247 for _, id := range repos { 248 onError(id, err) 249 } 250 continue 251 } 252 for _, o := range opts { 253 if o.RepoID > 0 && o.Error != "" { 254 onError(o.RepoID, errors.New(o.Error)) 255 } 256 if o.Error == "" { 257 onSuccess(o.IndexOptions) 258 } 259 } 260 } 261} 262 263// indexOptionsItem wraps IndexOptions to also include an error returned by 264// the API. 265type indexOptionsItem struct { 266 IndexOptions 267 Error string 268} 269 270func (o *indexOptionsItem) FromProto(x *configv1.ZoektIndexOptions) { 271 item := indexOptionsItem{} 272 item.IndexOptions.FromProto(x) 273 item.Error = x.GetError() 274 *o = item 275} 276 277func (o *indexOptionsItem) ToProto() *configv1.ZoektIndexOptions { 278 branches := make([]*configv1.ZoektRepositoryBranch, 0, len(o.Branches)) 279 for _, b := range o.Branches { 280 branches = append(branches, &configv1.ZoektRepositoryBranch{ 281 Name: b.Name, 282 Version: b.Version, 283 }) 284 } 285 286 languageMap := make([]*configv1.LanguageMapping, 0, len(o.LanguageMap)) 287 288 for lang, parser := range o.LanguageMap { 289 languageMap = append(languageMap, &configv1.LanguageMapping{ 290 Language: lang, 291 Ctags: configv1.CTagsParserType(parser), 292 }) 293 } 294 295 return &configv1.ZoektIndexOptions{ 296 RepoId: int32(o.RepoID), 297 LargeFiles: o.LargeFiles, 298 Symbols: o.Symbols, 299 Branches: branches, 300 Name: o.Name, 301 302 Priority: o.Priority, 303 304 Public: o.Public, 305 Fork: o.Fork, 306 Archived: o.Archived, 307 308 Error: o.Error, 309 310 LanguageMap: languageMap, 311 ShardConcurrency: o.ShardConcurrency, 312 313 TenantId: int64(o.TenantID), 314 } 315} 316 317func (s *sourcegraphClient) getIndexOptions(ctx context.Context, fingerprint *configv1.Fingerprint, repos []uint32) ([]indexOptionsItem, *configv1.Fingerprint, error) { 318 repoIDs := make([]int32, 0, len(repos)) 319 for _, id := range repos { 320 repoIDs = append(repoIDs, int32(id)) 321 } 322 323 req := configv1.SearchConfigurationRequest{ 324 RepoIds: repoIDs, 325 Fingerprint: fingerprint, 326 } 327 328 response, err := s.grpcClient.SearchConfiguration(ctx, &req) 329 if err != nil { 330 return nil, nil, err 331 } 332 333 protoItems := response.GetUpdatedOptions() 334 items := make([]indexOptionsItem, 0, len(protoItems)) 335 for _, x := range protoItems { 336 var item indexOptionsItem 337 item.FromProto(x) 338 item.IndexOptions.CloneURL = s.getCloneURL(item.Name) 339 340 items = append(items, item) 341 } 342 343 return items, response.GetFingerprint(), nil 344} 345 346func (s *sourcegraphClient) getCloneURL(name string) string { 347 return s.Root.ResolveReference(&url.URL{Path: path.Join("/.internal/git", name)}).String() 348} 349 350func (s *sourcegraphClient) listRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) { 351 var request configv1.ListRequest 352 request.Hostname = s.Hostname 353 request.IndexedIds = make([]int32, 0, len(indexed)) 354 for _, id := range indexed { 355 request.IndexedIds = append(request.IndexedIds, int32(id)) 356 } 357 358 response, err := s.grpcClient.List(ctx, &request) 359 if err != nil { 360 return nil, err 361 } 362 363 repoIDs := make([]uint32, 0, len(response.RepoIds)) 364 for _, id := range response.RepoIds { 365 repoIDs = append(repoIDs, uint32(id)) 366 } 367 368 return repoIDs, nil 369} 370 371type indexStatus struct { 372 RepoID uint32 373 Branches []zoekt.RepositoryBranch 374 IndexTimeUnix int64 375 State configv1.UpdateIndexStatusRequest_Repository_State 376 FailureMessage string 377} 378 379type updateIndexStatusRequest struct { 380 Repositories []indexStatus 381} 382 383func (u *updateIndexStatusRequest) ToProto() *configv1.UpdateIndexStatusRequest { 384 repositories := make([]*configv1.UpdateIndexStatusRequest_Repository, 0, len(u.Repositories)) 385 386 for _, repo := range u.Repositories { 387 branches := make([]*configv1.ZoektRepositoryBranch, 0, len(repo.Branches)) 388 389 for _, branch := range repo.Branches { 390 branches = append(branches, &configv1.ZoektRepositoryBranch{ 391 Name: branch.Name, 392 Version: branch.Version, 393 }) 394 } 395 396 repositories = append(repositories, &configv1.UpdateIndexStatusRequest_Repository{ 397 RepoId: repo.RepoID, 398 Branches: branches, 399 IndexTimeUnix: repo.IndexTimeUnix, 400 State: repo.State, 401 FailureMessage: repo.FailureMessage, 402 }) 403 } 404 405 return &configv1.UpdateIndexStatusRequest{ 406 Repositories: repositories, 407 } 408} 409 410func (u *updateIndexStatusRequest) FromProto(x *configv1.UpdateIndexStatusRequest) { 411 protoRepositories := x.GetRepositories() 412 repositories := make([]indexStatus, 0, len(protoRepositories)) 413 414 for _, repo := range x.GetRepositories() { 415 protoBranches := repo.GetBranches() 416 branches := make([]zoekt.RepositoryBranch, 0, len(protoBranches)) 417 418 for _, branch := range repo.GetBranches() { 419 branches = append(branches, zoekt.RepositoryBranch{ 420 Name: branch.GetName(), 421 Version: branch.GetVersion(), 422 }) 423 } 424 425 repositories = append(repositories, indexStatus{ 426 RepoID: repo.GetRepoId(), 427 Branches: branches, 428 IndexTimeUnix: repo.GetIndexTimeUnix(), 429 State: repo.GetState(), 430 FailureMessage: repo.GetFailureMessage(), 431 }) 432 } 433 434 *u = updateIndexStatusRequest{ 435 Repositories: repositories, 436 } 437} 438 439// UpdateIndexStatus sends a request to Sourcegraph to confirm that the given 440// repositories have been indexed. 441func (s *sourcegraphClient) UpdateIndexStatus(repositories []indexStatus) error { 442 r := updateIndexStatusRequest{Repositories: repositories} 443 444 request := r.ToProto() 445 _, err := s.grpcClient.UpdateIndexStatus(context.Background(), request) 446 if err != nil { 447 return fmt.Errorf("failed to update index status: %w", err) 448 } 449 450 return nil 451} 452 453type sourcegraphFake struct { 454 RootDir string 455 Log *log.Logger 456} 457 458func (sf sourcegraphFake) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) { 459 repos, err := sf.ListRepoIDs(ctx, indexed) 460 if err != nil { 461 return nil, err 462 } 463 464 iterate := func(f func(IndexOptions)) { 465 opts, err := sf.GetIndexOptions(repos...) 466 if err != nil { 467 sf.Log.Printf("WARN: ignoring GetIndexOptions error: %v", err) 468 } 469 for _, opt := range opts { 470 if opt.Error != "" { 471 sf.Log.Printf("WARN: ignoring GetIndexOptions error for %s: %v", opt.Name, opt.Error) 472 continue 473 } 474 f(opt.IndexOptions) 475 } 476 } 477 478 return &SourcegraphListResult{ 479 IDs: repos, 480 IterateIndexOptions: iterate, 481 }, nil 482} 483 484func (sf sourcegraphFake) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) { 485 opts, err := sf.GetIndexOptions(repos...) 486 if err != nil { 487 for _, id := range repos { 488 onError(id, err) 489 } 490 return 491 } 492 for _, o := range opts { 493 if o.RepoID > 0 && o.Error != "" { 494 onError(o.RepoID, errors.New(o.Error)) 495 } 496 if o.Error == "" { 497 onSuccess(o.IndexOptions) 498 } 499 } 500} 501 502func (sf sourcegraphFake) GetIndexOptions(repos ...uint32) ([]indexOptionsItem, error) { 503 reposIdx := map[uint32]int{} 504 for i, id := range repos { 505 reposIdx[id] = i 506 } 507 508 items := make([]indexOptionsItem, len(repos)) 509 err := sf.visitRepos(func(name string) { 510 idx, ok := reposIdx[sf.id(name)] 511 if !ok { 512 return 513 } 514 opts, err := sf.getIndexOptions(name) 515 if err != nil { 516 items[idx] = indexOptionsItem{Error: err.Error()} 517 } else { 518 items[idx] = indexOptionsItem{IndexOptions: opts} 519 } 520 }) 521 if err != nil { 522 return nil, err 523 } 524 525 for i := range items { 526 if items[i].Error == "" && items[i].RepoID == 0 { 527 items[i].Error = "not found" 528 } 529 } 530 531 return items, nil 532} 533 534func (sf sourcegraphFake) getIndexOptions(name string) (IndexOptions, error) { 535 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name)) 536 exists := func(p string) bool { 537 _, err := os.Stat(filepath.Join(dir, p)) 538 return err == nil 539 } 540 float := func(p string) float64 { 541 b, _ := os.ReadFile(filepath.Join(dir, p)) 542 f, _ := strconv.ParseFloat(string(bytes.TrimSpace(b)), 64) 543 return f 544 } 545 546 opts := IndexOptions{ 547 RepoID: sf.id(name), 548 Name: name, 549 CloneURL: sf.getCloneURL(name), 550 Symbols: true, 551 552 Public: !exists("SG_PRIVATE"), 553 Fork: exists("SG_FORK"), 554 Archived: exists("SG_ARCHIVED"), 555 556 Priority: float("SG_PRIORITY"), 557 } 558 559 branches, err := sf.getBranches(name) 560 if err != nil { 561 return opts, err 562 } 563 opts.Branches = branches 564 565 return opts, nil 566} 567 568func (sf sourcegraphFake) getBranches(name string) ([]zoekt.RepositoryBranch, error) { 569 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name)) 570 repo, err := git.PlainOpen(dir) 571 if err != nil { 572 return nil, err 573 } 574 575 cfg, err := repo.Config() 576 if err != nil { 577 return nil, err 578 } 579 580 sec := cfg.Raw.Section("zoekt") 581 branches := sec.Options.GetAll("branch") 582 if len(branches) == 0 { 583 branches = append(branches, "HEAD") 584 } 585 586 rBranches := make([]zoekt.RepositoryBranch, 0, len(branches)) 587 for _, branch := range branches { 588 cmd := exec.Command("git", "rev-parse", branch) 589 cmd.Dir = dir 590 if b, err := cmd.Output(); err != nil { 591 sf.Log.Printf("WARN: Could not get branch %s/%s", name, branch) 592 } else { 593 version := string(bytes.TrimSpace(b)) 594 rBranches = append(rBranches, zoekt.RepositoryBranch{ 595 Name: branch, 596 Version: version, 597 }) 598 } 599 } 600 601 if len(rBranches) == 0 { 602 return nil, fmt.Errorf("WARN: Could not get any branch revisions for repo %s", name) 603 } 604 605 return rBranches, nil 606} 607 608func (sf sourcegraphFake) id(name string) uint32 { 609 // allow overriding the ID. 610 idPath := filepath.Join(sf.RootDir, filepath.FromSlash(name), "SG_ID") 611 if b, _ := os.ReadFile(idPath); len(b) > 0 { 612 id, err := strconv.Atoi(strings.TrimSpace(string(b))) 613 if err == nil { 614 return uint32(id) 615 } 616 } 617 return fakeID(name) 618} 619 620func (sf sourcegraphFake) getCloneURL(name string) string { 621 return filepath.Join(sf.RootDir, filepath.FromSlash(name)) 622} 623 624func (sf sourcegraphFake) ListRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) { 625 var repos []uint32 626 err := sf.visitRepos(func(name string) { 627 repos = append(repos, sf.id(name)) 628 }) 629 return repos, err 630} 631 632func (sf sourcegraphFake) visitRepos(visit func(name string)) error { 633 return filepath.Walk(sf.RootDir, func(path string, fi os.FileInfo, fileErr error) error { 634 if fileErr != nil { 635 sf.Log.Printf("WARN: ignoring error searching %s: %v", path, fileErr) 636 return nil 637 } 638 if !fi.IsDir() { 639 return nil 640 } 641 642 gitdir := filepath.Join(path, ".git") 643 if fi, err := os.Stat(gitdir); err != nil || !fi.IsDir() { 644 return nil 645 } 646 647 subpath, err := filepath.Rel(sf.RootDir, path) 648 if err != nil { 649 // According to WalkFunc docs, path is always filepath.Join(root, 650 // subpath). So Rel should always work. 651 return fmt.Errorf("filepath.Walk returned %s which is not relative to %s: %w", path, sf.RootDir, err) 652 } 653 654 name := filepath.ToSlash(subpath) 655 visit(name) 656 657 return filepath.SkipDir 658 }) 659} 660 661func (s sourcegraphFake) UpdateIndexStatus(repositories []indexStatus) error { 662 // noop 663 return nil 664} 665 666// fakeID returns a deterministic ID based on name. Used for fakes and tests. 667func fakeID(name string) uint32 { 668 // magic at the end is to ensure we get a positive number when casting. 669 return uint32(crc32.ChecksumIEEE([]byte(name))%(1<<31-1) + 1) 670} 671 672type sourcegraphNop struct{} 673 674func (s sourcegraphNop) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) { 675 return nil, nil 676} 677 678func (s sourcegraphNop) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) { 679} 680 681func (s sourcegraphNop) UpdateIndexStatus(repositories []indexStatus) error { 682 return nil 683}