fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

1package main 2 3import ( 4 "bufio" 5 "bytes" 6 "context" 7 "encoding/json" 8 "errors" 9 "fmt" 10 "hash/crc32" 11 "io" 12 "log" 13 "math/rand" 14 "net/http" 15 "net/url" 16 "os" 17 "os/exec" 18 "path" 19 "path/filepath" 20 "strconv" 21 "strings" 22 "time" 23 24 "github.com/go-git/go-git/v5" 25 retryablehttp "github.com/hashicorp/go-retryablehttp" 26 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1" 27 "golang.org/x/net/trace" 28 "google.golang.org/grpc" 29 30 "github.com/sourcegraph/zoekt" 31) 32 33// SourcegraphListResult is the return value of Sourcegraph.List. It is its 34// own type since internally we batch the calculation of index options. This 35// is exposed via IterateIndexOptions. 36// 37// This type has state and is coupled to the Sourcegraph implementation. 38type SourcegraphListResult struct { 39 // IDs is the set of Sourcegraph repository IDs that this replica needs 40 // to index. 41 IDs []uint32 42 43 // IterateIndexOptions best effort resolves the IndexOptions for RepoIDs. If 44 // any repository fails it internally logs. It uses the "config fingerprint" 45 // to reduce the amount of work done. This means we only resolve options for 46 // repositories which have been mutated since the last Sourcegraph.List 47 // call. 48 // 49 // Note: this has a side-effect of setting a the "config fingerprint". The 50 // config fingerprint means we only calculate index options for repositories 51 // that have changed since the last call to IterateIndexOptions. If you want 52 // to force calculation of index options use 53 // Sourcegraph.ForceIterateIndexOptions. 54 // 55 // Note: This should not be called concurrently with the Sourcegraph client. 56 IterateIndexOptions func(func(IndexOptions)) 57} 58 59// Sourcegraph represents the Sourcegraph service. It informs the indexserver 60// what to index and which options to use. 61type Sourcegraph interface { 62 // List returns a list of repository IDs to index as well as a facility to 63 // fetch the indexing options. 64 // 65 // Note: The return value is not safe to use concurrently with future calls 66 // to List. 67 List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) 68 69 // ForceIterateIndexOptions will best-effort calculate the index options for 70 // all repos. For each repo it will call either onSuccess or onError. This 71 // is the forced version of IterateIndexOptions, so will always calculate 72 // options for each id in repos. 73 ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) 74 75 // GetDocumentRanks returns a map from paths within the given repo to their 76 // rank vectors. Paths are assumed to be ordered by each pairwise component of 77 // the resulting vector, higher ranks coming earlier 78 GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error) 79 80 // UpdateIndexStatus sends a request to Sourcegraph to confirm that the 81 // given repositories have been indexed. 82 UpdateIndexStatus(repositories []indexStatus) error 83} 84 85type SourcegraphClientOption func(*sourcegraphClient) 86 87// WithBatchSize controls how many repository configurations we request a time. 88// If BatchSize is 0, we default to requesting 10,000 repositories at once. 89func WithBatchSize(batchSize int) SourcegraphClientOption { 90 return func(c *sourcegraphClient) { 91 c.BatchSize = batchSize 92 } 93} 94 95// WithShouldUseGRPC enables or disables the use of gRPC for communicating with Sourcegraph. 96func WithShouldUseGRPC(useGRPC bool) SourcegraphClientOption { 97 return func(c *sourcegraphClient) { 98 c.useGRPC = useGRPC 99 } 100} 101 102// WithGRPCClient sets the gRPC client to use for communicating with Sourcegraph. 103func WithGRPCClient(client proto.ZoektConfigurationServiceClient) SourcegraphClientOption { 104 return func(c *sourcegraphClient) { 105 c.grpcClient = client 106 } 107} 108 109func newSourcegraphClient(rootURL *url.URL, hostname string, opts ...SourcegraphClientOption) *sourcegraphClient { 110 httpClient := retryablehttp.NewClient() 111 httpClient.Logger = debug 112 113 // Sourcegraph might return an error message in the body if StatusCode==500. The 114 // default behavior of the go-retryablehttp restClient is to drain the body and not 115 // to propagate the error. Hence, we call ErrorPropagatedRetryPolicy instead of 116 // DefaultRetryPolicy and augment the error with the response body if possible. 117 httpClient.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) { 118 shouldRetry, checkErr := retryablehttp.ErrorPropagatedRetryPolicy(ctx, resp, err) 119 120 if resp != nil && resp.StatusCode == http.StatusInternalServerError { 121 if b, e := io.ReadAll(resp.Body); e == nil { 122 checkErr = fmt.Errorf("%w: body=%q", checkErr, string(b)) 123 } 124 } 125 126 return shouldRetry, checkErr 127 } 128 129 client := &sourcegraphClient{ 130 Root: rootURL, 131 restClient: httpClient, 132 Hostname: hostname, 133 BatchSize: 0, 134 grpcClient: noopGRPCClient{}, 135 useGRPC: false, // disable gRPC by default 136 } 137 138 for _, opt := range opts { 139 opt(client) 140 } 141 142 return client 143 144} 145 146// sourcegraphClient contains methods which interact with the sourcegraph API. 147type sourcegraphClient struct { 148 // Root is the base URL for the Sourcegraph instance to index. Normally 149 // http://sourcegraph-frontend-internal or http://localhost:3090. 150 Root *url.URL 151 152 // Hostname is the name we advertise to Sourcegraph when asking for the 153 // list of repositories to index. 154 Hostname string 155 156 // BatchSize is how many repository configurations we request at once. If 157 // zero a value of 10000 is used. 158 BatchSize int 159 160 // restClient is used to make requests to the Sourcegraph instance. Prefer to 161 // use .doRequest() to ensure the appropriate headers are set. 162 restClient *retryablehttp.Client 163 164 // grpcClient is used to make requests to the Sourcegraph instance if gRPC is enabled. 165 grpcClient proto.ZoektConfigurationServiceClient 166 167 // configFingerprint is the last config fingerprint returned from 168 // Sourcegraph. It can be used for future calls to the configuration 169 // endpoint. 170 // 171 // configFingerprint is mutually exclusive with configFingerprintProto - this field 172 // will only be used if gRPC is disabled. 173 configFingerprint string 174 175 // configFingerprintProto is the last config fingerprint (as GRPC) returned from 176 // Sourcegraph. It can be used for future calls to the configuration 177 // endpoint. 178 // 179 // configFingerprintProto is mutually exclusive with configFingerprint - this field 180 // will only be used if gRPC is enabled. 181 configFingerprintProto *proto.Fingerprint 182 183 // configFingerprintReset tracks when we should zero out the 184 // configFingerprint. We want to periodically do this just in case our 185 // configFingerprint logic is faulty. When it is cleared out, we fallback to 186 // calculating everything. 187 configFingerprintReset time.Time 188 189 // useGRPC indicates whether we should use a gRPC client to communicate with Sourcegraph. 190 useGRPC bool 191} 192 193// GetDocumentRanks asks Sourcegraph for a mapping of file paths to rank 194// vectors. 195func (s *sourcegraphClient) GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error) { 196 if s.useGRPC { 197 return s.getDocumentRanksGRPC(ctx, repoName) 198 } 199 200 return s.getDocumentRanksREST(ctx, repoName) 201} 202 203func (s *sourcegraphClient) getDocumentRanksGRPC(ctx context.Context, repoName string) (RepoPathRanks, error) { 204 resp, err := s.grpcClient.DocumentRanks(ctx, &proto.DocumentRanksRequest{Repository: repoName}) 205 if err != nil { 206 return RepoPathRanks{}, err 207 } 208 209 var out RepoPathRanks 210 out.FromProto(resp) 211 212 return out, nil 213} 214 215func (s *sourcegraphClient) getDocumentRanksREST(ctx context.Context, repoName string) (RepoPathRanks, error) { 216 u := s.Root.ResolveReference(&url.URL{ 217 Path: "/.internal/ranks/" + strings.Trim(repoName, "/") + "/documents", 218 }) 219 220 b, err := s.get(ctx, u) 221 if err != nil { 222 return RepoPathRanks{}, err 223 } 224 225 ranks := RepoPathRanks{} 226 err = json.Unmarshal(b, &ranks) 227 if err != nil { 228 return RepoPathRanks{}, err 229 } 230 231 return ranks, nil 232} 233 234func (s *sourcegraphClient) get(ctx context.Context, u *url.URL) ([]byte, error) { 235 req, err := retryablehttp.NewRequestWithContext(ctx, "GET", u.String(), nil) 236 if err != nil { 237 return nil, err 238 } 239 240 resp, err := s.doRequest(req) 241 if err != nil { 242 return nil, err 243 } 244 defer resp.Body.Close() 245 246 if resp.StatusCode != http.StatusOK { 247 b, err := io.ReadAll(io.LimitReader(resp.Body, 1024)) 248 _ = resp.Body.Close() 249 if err != nil { 250 return nil, err 251 } 252 return nil, &url.Error{ 253 Op: "Get", 254 URL: u.String(), 255 Err: fmt.Errorf("%s: %s", resp.Status, string(b)), 256 } 257 } 258 259 b, err := io.ReadAll(resp.Body) 260 if err != nil { 261 return nil, err 262 } 263 264 return b, nil 265} 266 267func (s *sourcegraphClient) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) { 268 repos, err := s.listRepoIDs(ctx, indexed) 269 if err != nil { 270 return nil, fmt.Errorf("listRepoIDs: %w", err) 271 } 272 273 batchSize := s.BatchSize 274 if batchSize == 0 { 275 batchSize = 10_000 276 } 277 278 // Check if we should recalculate everything. 279 if time.Now().After(s.configFingerprintReset) { 280 // for every 500 repos we wait a minute. 2021-12-15 on sourcegraph.com 281 // this works out to every 100 minutes. 282 next := time.Duration(len(indexed)) * time.Minute / 500 283 if min := 5 * time.Minute; next < min { 284 next = min 285 } 286 next += time.Duration(rand.Int63n(int64(next) / 4)) // jitter 287 s.configFingerprintReset = time.Now().Add(next) 288 289 s.configFingerprintProto = nil 290 s.configFingerprint = "" 291 } 292 293 // getIndexOptionsFunc is a function that can be used to get the index 294 // options for a set of repos (while properly handling any configuration fingerprint 295 // changes). 296 // 297 // In general, this function provides a consistent fingerprint for each batch call, 298 // and updates the server state with the new fingerprint. If any of the batch calls 299 // fail, the old fingerprint is restored. 300 type getIndexOptionsFunc func(repos ...uint32) ([]indexOptionsItem, error) 301 302 // default to REST 303 mkGetIndexOptionsFunc := func(tr trace.Trace) getIndexOptionsFunc { 304 startingFingerPrint := s.configFingerprint 305 tr.LazyPrintf("fingerprint: %s", startingFingerPrint) 306 307 first := true 308 return func(repos ...uint32) ([]indexOptionsItem, error) { 309 options, nextFingerPrint, err := s.getIndexOptionsREST(startingFingerPrint, repos...) 310 if err != nil { 311 first = false 312 s.configFingerprint = startingFingerPrint 313 314 return nil, err 315 } 316 317 if first { 318 first = false 319 s.configFingerprint = nextFingerPrint 320 321 tr.LazyPrintf("new fingerprint: %s", nextFingerPrint) 322 } 323 324 return options, nil 325 } 326 } 327 328 // If we enabled GRPC, use our gRPC client instead. 329 if s.useGRPC { 330 mkGetIndexOptionsFunc = func(tr trace.Trace) getIndexOptionsFunc { 331 startingFingerPrint := s.configFingerprintProto 332 tr.LazyPrintf("fingerprint: %s", startingFingerPrint.String()) 333 334 first := true 335 return func(repos ...uint32) ([]indexOptionsItem, error) { 336 options, nextFingerPrint, err := s.getIndexOptionsGRPC(ctx, startingFingerPrint, repos) 337 if err != nil { 338 first = false 339 s.configFingerprintProto = startingFingerPrint 340 341 return nil, err 342 } 343 344 if first { 345 first = false 346 s.configFingerprintProto = nextFingerPrint 347 tr.LazyPrintf("new fingerprint: %s", nextFingerPrint.String()) 348 } 349 350 return options, nil 351 } 352 } 353 } 354 355 iterate := func(f func(IndexOptions)) { 356 start := time.Now() 357 tr := trace.New("getIndexOptions", "") 358 tr.LazyPrintf("getting index options for %d repos", len(repos)) 359 360 defer func() { 361 metricResolveRevisionsDuration.Observe(time.Since(start).Seconds()) 362 tr.Finish() 363 }() 364 365 getIndexOptions := mkGetIndexOptionsFunc(tr) 366 367 // We ask the frontend to get index options in batches. 368 for repos := range batched(repos, batchSize) { 369 start := time.Now() 370 options, err := getIndexOptions(repos...) 371 duration := time.Since(start) 372 373 if err != nil { 374 metricResolveRevisionDuration.WithLabelValues("false").Observe(duration.Seconds()) 375 tr.LazyPrintf("failed fetching options batch: %v", err) 376 tr.SetError() 377 378 continue 379 } 380 381 metricResolveRevisionDuration.WithLabelValues("true").Observe(duration.Seconds()) 382 383 for _, o := range options { 384 metricGetIndexOptions.Inc() 385 386 if o.Error != "" { 387 metricGetIndexOptionsError.Inc() 388 tr.LazyPrintf("failed fetching options for %v: %v", o.Name, o.Error) 389 tr.SetError() 390 391 continue 392 } 393 f(o.IndexOptions) 394 } 395 } 396 } 397 398 return &SourcegraphListResult{ 399 IDs: repos, 400 IterateIndexOptions: iterate, 401 }, nil 402} 403 404func (s *sourcegraphClient) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) { 405 batchSize := s.BatchSize 406 if batchSize == 0 { 407 batchSize = 10_000 408 } 409 410 getIndexOptions := func(repos ...uint32) ([]indexOptionsItem, error) { 411 opts, _, err := s.getIndexOptionsREST("", repos...) 412 return opts, err 413 } 414 415 if s.useGRPC { 416 getIndexOptions = func(repos ...uint32) ([]indexOptionsItem, error) { 417 opts, _, err := s.getIndexOptionsGRPC(context.Background(), nil, repos) 418 return opts, err 419 } 420 } 421 422 for repos := range batched(repos, batchSize) { 423 opts, err := getIndexOptions(repos...) 424 if err != nil { 425 for _, id := range repos { 426 onError(id, err) 427 } 428 continue 429 } 430 for _, o := range opts { 431 if o.RepoID > 0 && o.Error != "" { 432 onError(o.RepoID, errors.New(o.Error)) 433 } 434 if o.Error == "" { 435 onSuccess(o.IndexOptions) 436 } 437 } 438 } 439} 440 441// indexOptionsItem wraps IndexOptions to also include an error returned by 442// the API. 443type indexOptionsItem struct { 444 IndexOptions 445 Error string 446} 447 448func (o *indexOptionsItem) FromProto(x *proto.ZoektIndexOptions) { 449 branches := make([]zoekt.RepositoryBranch, 0, len(x.Branches)) 450 for _, b := range x.GetBranches() { 451 branches = append(branches, zoekt.RepositoryBranch{ 452 Name: b.GetName(), 453 Version: b.GetVersion(), 454 }) 455 } 456 457 item := indexOptionsItem{} 458 languageMap := make(map[string]uint8) 459 460 for _, lang := range x.GetLanguageMap() { 461 languageMap[lang.GetLanguage()] = uint8(lang.GetCtags().Number()) 462 } 463 464 item.IndexOptions = IndexOptions{ 465 RepoID: uint32(x.GetRepoId()), 466 LargeFiles: x.GetLargeFiles(), 467 Symbols: x.GetSymbols(), 468 Branches: branches, 469 Name: x.GetName(), 470 471 Priority: x.GetPriority(), 472 473 DocumentRanksVersion: x.GetDocumentRanksVersion(), 474 475 Public: x.GetPublic(), 476 Fork: x.GetFork(), 477 Archived: x.GetArchived(), 478 479 LanguageMap: languageMap, 480 } 481 482 item.Error = x.GetError() 483 484 *o = item 485} 486 487func (o *indexOptionsItem) ToProto() *proto.ZoektIndexOptions { 488 branches := make([]*proto.ZoektRepositoryBranch, 0, len(o.Branches)) 489 for _, b := range o.Branches { 490 branches = append(branches, &proto.ZoektRepositoryBranch{ 491 Name: b.Name, 492 Version: b.Version, 493 }) 494 } 495 496 languageMap := make([]*proto.LanguageMapping, 0, len(o.LanguageMap)) 497 498 for lang, parser := range o.LanguageMap { 499 languageMap = append(languageMap, &proto.LanguageMapping{ 500 Language: lang, 501 Ctags: proto.CTagsParserType(parser), 502 }) 503 } 504 505 return &proto.ZoektIndexOptions{ 506 RepoId: int32(o.RepoID), 507 LargeFiles: o.LargeFiles, 508 Symbols: o.Symbols, 509 Branches: branches, 510 Name: o.Name, 511 512 Priority: o.Priority, 513 514 DocumentRanksVersion: o.DocumentRanksVersion, 515 516 Public: o.Public, 517 Fork: o.Fork, 518 Archived: o.Archived, 519 520 Error: o.Error, 521 522 LanguageMap: languageMap, 523 } 524} 525 526func (s *sourcegraphClient) getIndexOptionsGRPC(ctx context.Context, fingerprint *proto.Fingerprint, repos []uint32) ([]indexOptionsItem, *proto.Fingerprint, error) { 527 repoIDs := make([]int32, 0, len(repos)) 528 for _, id := range repos { 529 repoIDs = append(repoIDs, int32(id)) 530 } 531 532 req := proto.SearchConfigurationRequest{ 533 RepoIds: repoIDs, 534 Fingerprint: fingerprint, 535 } 536 537 response, err := s.grpcClient.SearchConfiguration(ctx, &req) 538 if err != nil { 539 return nil, nil, err 540 } 541 542 protoItems := response.GetUpdatedOptions() 543 items := make([]indexOptionsItem, 0, len(protoItems)) 544 for _, x := range protoItems { 545 var item indexOptionsItem 546 item.FromProto(x) 547 item.IndexOptions.CloneURL = s.getCloneURL(item.Name) 548 549 items = append(items, item) 550 } 551 552 return items, response.GetFingerprint(), nil 553} 554 555const fingerprintHeader = "X-Sourcegraph-Config-Fingerprint" 556 557func (s *sourcegraphClient) getIndexOptionsREST(fingerprint string, repos ...uint32) ([]indexOptionsItem, string, error) { 558 u := s.Root.ResolveReference(&url.URL{ 559 Path: "/.internal/search/configuration", 560 }) 561 562 repoIDs := make([]string, len(repos)) 563 for i, id := range repos { 564 repoIDs[i] = strconv.Itoa(int(id)) 565 } 566 data := url.Values{"repoID": repoIDs} 567 req, err := retryablehttp.NewRequest("POST", u.String(), []byte(data.Encode())) 568 if err != nil { 569 return nil, "", err 570 } 571 req.Header.Set("Content-Type", "application/x-www-form-urlencoded") 572 if fingerprint != "" { 573 req.Header.Set(fingerprintHeader, fingerprint) 574 } 575 576 resp, err := s.doRequest(req) 577 if err != nil { 578 return nil, "", err 579 } 580 defer resp.Body.Close() 581 582 if resp.StatusCode != http.StatusOK { 583 b, err := io.ReadAll(io.LimitReader(resp.Body, 1024)) 584 _ = resp.Body.Close() 585 if err != nil { 586 return nil, "", err 587 } 588 return nil, "", &url.Error{ 589 Op: "Get", 590 URL: u.String(), 591 Err: fmt.Errorf("%s: %s", resp.Status, string(b)), 592 } 593 } 594 595 dec := json.NewDecoder(resp.Body) 596 var opts []indexOptionsItem 597 for { 598 var opt indexOptionsItem 599 err := dec.Decode(&opt) 600 if err == io.EOF { 601 break 602 } 603 if err != nil { 604 return nil, "", fmt.Errorf("error decoding body: %w", err) 605 } 606 opt.CloneURL = s.getCloneURL(opt.Name) 607 opts = append(opts, opt) 608 } 609 610 return opts, resp.Header.Get(fingerprintHeader), nil 611} 612 613func (s *sourcegraphClient) getCloneURL(name string) string { 614 return s.Root.ResolveReference(&url.URL{Path: path.Join("/.internal/git", name)}).String() 615} 616 617func (s *sourcegraphClient) listRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) { 618 if s.useGRPC { 619 return s.listRepoIDsGRPC(ctx, indexed) 620 } 621 622 return s.listRepoIDsREST(ctx, indexed) 623} 624 625func (s *sourcegraphClient) listRepoIDsGRPC(ctx context.Context, indexed []uint32) ([]uint32, error) { 626 var request proto.ListRequest 627 request.Hostname = s.Hostname 628 request.IndexedIds = make([]int32, 0, len(indexed)) 629 for _, id := range indexed { 630 request.IndexedIds = append(request.IndexedIds, int32(id)) 631 } 632 633 response, err := s.grpcClient.List(ctx, &request) 634 if err != nil { 635 return nil, err 636 } 637 638 repoIDs := make([]uint32, 0, len(response.RepoIds)) 639 for _, id := range response.RepoIds { 640 repoIDs = append(repoIDs, uint32(id)) 641 } 642 643 return repoIDs, nil 644} 645 646func (s *sourcegraphClient) listRepoIDsREST(_ context.Context, indexed []uint32) ([]uint32, error) { 647 body, err := json.Marshal(&struct { 648 Hostname string 649 IndexedIDs []uint32 650 }{ 651 Hostname: s.Hostname, 652 IndexedIDs: indexed, 653 }) 654 if err != nil { 655 return nil, err 656 } 657 658 u := s.Root.ResolveReference(&url.URL{Path: "/.internal/repos/index"}) 659 req, err := retryablehttp.NewRequest(http.MethodPost, u.String(), bytes.NewReader(body)) 660 if err != nil { 661 return nil, err 662 } 663 req.Header.Set("Content-Type", "application/json; charset=utf-8") 664 665 resp, err := s.doRequest(req) 666 if err != nil { 667 return nil, err 668 } 669 defer resp.Body.Close() 670 671 if resp.StatusCode != http.StatusOK { 672 return nil, fmt.Errorf("failed to list repositories: status %s", resp.Status) 673 } 674 675 var data struct { 676 RepoIDs []uint32 677 } 678 err = json.NewDecoder(resp.Body).Decode(&data) 679 if err != nil { 680 return nil, err 681 } 682 683 return data.RepoIDs, nil 684} 685 686type indexStatus struct { 687 RepoID uint32 688 Branches []zoekt.RepositoryBranch 689 IndexTimeUnix int64 690} 691 692type updateIndexStatusRequest struct { 693 Repositories []indexStatus 694} 695 696func (u *updateIndexStatusRequest) ToProto() *proto.UpdateIndexStatusRequest { 697 repositories := make([]*proto.UpdateIndexStatusRequest_Repository, 0, len(u.Repositories)) 698 699 for _, repo := range u.Repositories { 700 branches := make([]*proto.ZoektRepositoryBranch, 0, len(repo.Branches)) 701 702 for _, branch := range repo.Branches { 703 branches = append(branches, &proto.ZoektRepositoryBranch{ 704 Name: branch.Name, 705 Version: branch.Version, 706 }) 707 } 708 709 repositories = append(repositories, &proto.UpdateIndexStatusRequest_Repository{ 710 RepoId: repo.RepoID, 711 Branches: branches, 712 IndexTimeUnix: repo.IndexTimeUnix, 713 }) 714 } 715 716 return &proto.UpdateIndexStatusRequest{ 717 Repositories: repositories, 718 } 719} 720 721func (u *updateIndexStatusRequest) FromProto(x *proto.UpdateIndexStatusRequest) { 722 protoRepositories := x.GetRepositories() 723 repositories := make([]indexStatus, 0, len(protoRepositories)) 724 725 for _, repo := range x.GetRepositories() { 726 protoBranches := repo.GetBranches() 727 branches := make([]zoekt.RepositoryBranch, 0, len(protoBranches)) 728 729 for _, branch := range repo.GetBranches() { 730 branches = append(branches, zoekt.RepositoryBranch{ 731 Name: branch.GetName(), 732 Version: branch.GetVersion(), 733 }) 734 } 735 736 repositories = append(repositories, indexStatus{ 737 RepoID: repo.GetRepoId(), 738 Branches: branches, 739 IndexTimeUnix: repo.GetIndexTimeUnix(), 740 }) 741 } 742 743 *u = updateIndexStatusRequest{ 744 Repositories: repositories, 745 } 746} 747 748// UpdateIndexStatus sends a request to Sourcegraph to confirm that the given 749// repositories have been indexed. 750func (s *sourcegraphClient) UpdateIndexStatus(repositories []indexStatus) error { 751 r := updateIndexStatusRequest{Repositories: repositories} 752 753 if s.useGRPC { 754 return s.updateIndexStatusGRPC(r) 755 } 756 757 return s.updateIndexStatusREST(r) 758} 759 760func (s *sourcegraphClient) updateIndexStatusGRPC(r updateIndexStatusRequest) error { 761 request := r.ToProto() 762 _, err := s.grpcClient.UpdateIndexStatus(context.Background(), request) 763 764 if err != nil { 765 return fmt.Errorf("failed to update index status: %w", err) 766 } 767 768 return nil 769} 770 771func (s *sourcegraphClient) updateIndexStatusREST(r updateIndexStatusRequest) error { 772 payload, err := json.Marshal(r) 773 if err != nil { 774 return err 775 } 776 777 u := s.Root.ResolveReference(&url.URL{Path: "/.internal/search/index-status"}) 778 req, err := retryablehttp.NewRequest(http.MethodPost, u.String(), bytes.NewReader(payload)) 779 if err != nil { 780 return err 781 } 782 req.Header.Set("Content-Type", "application/json; charset=utf-8") 783 784 resp, err := s.doRequest(req) 785 if err != nil { 786 return err 787 } 788 defer resp.Body.Close() 789 790 if resp.StatusCode != http.StatusOK { 791 return fmt.Errorf("failed to update index status: status %s", resp.Status) 792 } 793 794 return nil 795} 796 797// doRequest executes the provided request after adding the appropriate headers 798// for interacting with a Sourcegraph instance. 799func (s *sourcegraphClient) doRequest(req *retryablehttp.Request) (*http.Response, error) { 800 // Make all requests as an internal user. 801 // 802 // Should match github.com/sourcegraph/sourcegraph/internal/actor.headerKeyActorUID 803 // and github.com/sourcegraph/sourcegraph/internal/actor.headerValueInternalActor 804 req.Header.Set("X-Sourcegraph-Actor-UID", "internal") 805 return s.restClient.Do(req) 806} 807 808type sourcegraphFake struct { 809 RootDir string 810 Log *log.Logger 811} 812 813// GetDocumentRanks expects a file where each line has the following format: 814// path<tab>rank... where rank is a float64. 815func (sf sourcegraphFake) GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error) { 816 dir := filepath.Join(sf.RootDir, filepath.FromSlash(repoName)) 817 818 fd, err := os.Open(filepath.Join(dir, "SG_DOCUMENT_RANKS")) 819 if err != nil { 820 return RepoPathRanks{}, err 821 } 822 823 ranks := RepoPathRanks{} 824 825 sum := 0.0 826 count := 0 827 scanner := bufio.NewScanner(fd) 828 for scanner.Scan() { 829 s := scanner.Text() 830 pathRanks := strings.Split(s, "\t") 831 if rank, err := strconv.ParseFloat(pathRanks[1], 64); err == nil { 832 ranks.Paths[pathRanks[0]] = rank 833 sum += rank 834 count++ 835 } 836 } 837 838 if err := scanner.Err(); err != nil { 839 return RepoPathRanks{}, err 840 } 841 842 ranks.MeanRank = sum / float64(count) 843 return ranks, nil 844} 845 846func floats64(s string) []float64 { 847 parts := strings.Split(s, ",") 848 849 var r []float64 850 for _, rank := range parts { 851 f, err := strconv.ParseFloat(rank, 64) 852 if err != nil { 853 return nil 854 } 855 r = append(r, f) 856 } 857 858 return r 859} 860 861func (sf sourcegraphFake) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) { 862 repos, err := sf.ListRepoIDs(ctx, indexed) 863 if err != nil { 864 return nil, err 865 } 866 867 iterate := func(f func(IndexOptions)) { 868 opts, err := sf.GetIndexOptions(repos...) 869 if err != nil { 870 sf.Log.Printf("WARN: ignoring GetIndexOptions error: %v", err) 871 } 872 for _, opt := range opts { 873 if opt.Error != "" { 874 sf.Log.Printf("WARN: ignoring GetIndexOptions error for %s: %v", opt.Name, opt.Error) 875 continue 876 } 877 f(opt.IndexOptions) 878 } 879 } 880 881 return &SourcegraphListResult{ 882 IDs: repos, 883 IterateIndexOptions: iterate, 884 }, nil 885} 886 887func (sf sourcegraphFake) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) { 888 opts, err := sf.GetIndexOptions(repos...) 889 if err != nil { 890 for _, id := range repos { 891 onError(id, err) 892 } 893 return 894 } 895 for _, o := range opts { 896 if o.RepoID > 0 && o.Error != "" { 897 onError(o.RepoID, errors.New(o.Error)) 898 } 899 if o.Error == "" { 900 onSuccess(o.IndexOptions) 901 } 902 } 903} 904 905func (sf sourcegraphFake) GetIndexOptions(repos ...uint32) ([]indexOptionsItem, error) { 906 reposIdx := map[uint32]int{} 907 for i, id := range repos { 908 reposIdx[id] = i 909 } 910 911 items := make([]indexOptionsItem, len(repos)) 912 err := sf.visitRepos(func(name string) { 913 idx, ok := reposIdx[sf.id(name)] 914 if !ok { 915 return 916 } 917 opts, err := sf.getIndexOptions(name) 918 if err != nil { 919 items[idx] = indexOptionsItem{Error: err.Error()} 920 } else { 921 items[idx] = indexOptionsItem{IndexOptions: opts} 922 } 923 }) 924 925 if err != nil { 926 return nil, err 927 } 928 929 for i := range items { 930 if items[i].Error == "" && items[i].RepoID == 0 { 931 items[i].Error = "not found" 932 } 933 } 934 935 return items, nil 936} 937 938func (sf sourcegraphFake) getIndexOptions(name string) (IndexOptions, error) { 939 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name)) 940 exists := func(p string) bool { 941 _, err := os.Stat(filepath.Join(dir, p)) 942 return err == nil 943 } 944 float := func(p string) float64 { 945 b, _ := os.ReadFile(filepath.Join(dir, p)) 946 f, _ := strconv.ParseFloat(string(bytes.TrimSpace(b)), 64) 947 return f 948 } 949 950 opts := IndexOptions{ 951 RepoID: sf.id(name), 952 Name: name, 953 CloneURL: sf.getCloneURL(name), 954 Symbols: true, 955 956 Public: !exists("SG_PRIVATE"), 957 Fork: exists("SG_FORK"), 958 Archived: exists("SG_ARCHIVED"), 959 960 Priority: float("SG_PRIORITY"), 961 } 962 963 if stat, err := os.Stat(filepath.Join(dir, "SG_DOCUMENT_RANKS")); err == nil { 964 opts.DocumentRanksVersion = stat.ModTime().String() 965 } 966 967 branches, err := sf.getBranches(name) 968 if err != nil { 969 return opts, err 970 } 971 opts.Branches = branches 972 973 return opts, nil 974} 975 976func (sf sourcegraphFake) getBranches(name string) ([]zoekt.RepositoryBranch, error) { 977 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name)) 978 repo, err := git.PlainOpen(dir) 979 if err != nil { 980 return nil, err 981 } 982 983 cfg, err := repo.Config() 984 if err != nil { 985 return nil, err 986 } 987 988 sec := cfg.Raw.Section("zoekt") 989 branches := sec.Options.GetAll("branch") 990 if len(branches) == 0 { 991 branches = append(branches, "HEAD") 992 } 993 994 rBranches := make([]zoekt.RepositoryBranch, 0, len(branches)) 995 for _, branch := range branches { 996 cmd := exec.Command("git", "rev-parse", branch) 997 cmd.Dir = dir 998 if b, err := cmd.Output(); err != nil { 999 sf.Log.Printf("WARN: Could not get branch %s/%s", name, branch) 1000 } else { 1001 version := string(bytes.TrimSpace(b)) 1002 rBranches = append(rBranches, zoekt.RepositoryBranch{ 1003 Name: branch, 1004 Version: version, 1005 }) 1006 } 1007 } 1008 1009 if len(rBranches) == 0 { 1010 return nil, fmt.Errorf("WARN: Could not get any branch revisions for repo %s", name) 1011 } 1012 1013 return rBranches, nil 1014} 1015 1016func (sf sourcegraphFake) id(name string) uint32 { 1017 // allow overriding the ID. 1018 idPath := filepath.Join(sf.RootDir, filepath.FromSlash(name), "SG_ID") 1019 if b, _ := os.ReadFile(idPath); len(b) > 0 { 1020 id, err := strconv.Atoi(strings.TrimSpace(string(b))) 1021 if err == nil { 1022 return uint32(id) 1023 } 1024 } 1025 return fakeID(name) 1026} 1027 1028func (sf sourcegraphFake) getCloneURL(name string) string { 1029 return filepath.Join(sf.RootDir, filepath.FromSlash(name)) 1030} 1031 1032func (sf sourcegraphFake) ListRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) { 1033 var repos []uint32 1034 err := sf.visitRepos(func(name string) { 1035 repos = append(repos, sf.id(name)) 1036 }) 1037 return repos, err 1038} 1039 1040func (sf sourcegraphFake) visitRepos(visit func(name string)) error { 1041 return filepath.Walk(sf.RootDir, func(path string, fi os.FileInfo, fileErr error) error { 1042 if fileErr != nil { 1043 sf.Log.Printf("WARN: ignoring error searching %s: %v", path, fileErr) 1044 return nil 1045 } 1046 if !fi.IsDir() { 1047 return nil 1048 } 1049 1050 gitdir := filepath.Join(path, ".git") 1051 if fi, err := os.Stat(gitdir); err != nil || !fi.IsDir() { 1052 return nil 1053 } 1054 1055 subpath, err := filepath.Rel(sf.RootDir, path) 1056 if err != nil { 1057 // According to WalkFunc docs, path is always filepath.Join(root, 1058 // subpath). So Rel should always work. 1059 return fmt.Errorf("filepath.Walk returned %s which is not relative to %s: %w", path, sf.RootDir, err) 1060 } 1061 1062 name := filepath.ToSlash(subpath) 1063 visit(name) 1064 1065 return filepath.SkipDir 1066 }) 1067} 1068 1069func (s sourcegraphFake) UpdateIndexStatus(repositories []indexStatus) error { 1070 // noop 1071 return nil 1072} 1073 1074// fakeID returns a deterministic ID based on name. Used for fakes and tests. 1075func fakeID(name string) uint32 { 1076 // magic at the end is to ensure we get a positive number when casting. 1077 return uint32(crc32.ChecksumIEEE([]byte(name))%(1<<31-1) + 1) 1078} 1079 1080type sourcegraphNop struct{} 1081 1082func (s sourcegraphNop) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) { 1083 return nil, nil 1084} 1085 1086func (s sourcegraphNop) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) { 1087 return 1088} 1089 1090func (s sourcegraphNop) GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error) { 1091 return RepoPathRanks{}, nil 1092} 1093 1094func (s sourcegraphNop) UpdateIndexStatus(repositories []indexStatus) error { 1095 return nil 1096} 1097 1098type RepoPathRanks struct { 1099 MeanRank float64 `json:"mean_reference_count"` 1100 Paths map[string]float64 `json:"paths"` 1101} 1102 1103func (r *RepoPathRanks) FromProto(x *proto.DocumentRanksResponse) { 1104 protoPaths := x.GetPaths() 1105 ranks := make(map[string]float64, len(protoPaths)) 1106 for filePath, rank := range protoPaths { 1107 ranks[filePath] = rank 1108 } 1109 1110 *r = RepoPathRanks{ 1111 MeanRank: x.GetMeanRank(), 1112 Paths: ranks, 1113 } 1114} 1115 1116func (r *RepoPathRanks) ToProto() *proto.DocumentRanksResponse { 1117 paths := make(map[string]float64, len(r.Paths)) 1118 for filePath, rank := range r.Paths { 1119 paths[filePath] = rank 1120 } 1121 1122 return &proto.DocumentRanksResponse{ 1123 MeanRank: r.MeanRank, 1124 Paths: paths, 1125 } 1126} 1127 1128type noopGRPCClient struct{} 1129 1130func (n noopGRPCClient) SearchConfiguration(ctx context.Context, in *proto.SearchConfigurationRequest, opts ...grpc.CallOption) (*proto.SearchConfigurationResponse, error) { 1131 return nil, fmt.Errorf("grpc client not enabled") 1132} 1133 1134func (n noopGRPCClient) List(ctx context.Context, in *proto.ListRequest, opts ...grpc.CallOption) (*proto.ListResponse, error) { 1135 return nil, fmt.Errorf("grpc client not enabled") 1136} 1137 1138func (n noopGRPCClient) DocumentRanks(ctx context.Context, in *proto.DocumentRanksRequest, opts ...grpc.CallOption) (*proto.DocumentRanksResponse, error) { 1139 return nil, fmt.Errorf("grpc client not enabled") 1140} 1141 1142func (n noopGRPCClient) UpdateIndexStatus(ctx context.Context, in *proto.UpdateIndexStatusRequest, opts ...grpc.CallOption) (*proto.UpdateIndexStatusResponse, error) { 1143 return nil, fmt.Errorf("grpc client not enabled") 1144} 1145 1146var _ proto.ZoektConfigurationServiceClient = noopGRPCClient{}