fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

package main

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"hash/crc32"
	"io"
	"log"
	"math/rand"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"path"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/go-git/go-git/v5"
	retryablehttp "github.com/hashicorp/go-retryablehttp"
	proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
	"github.com/sourcegraph/zoekt/ctags"
	"golang.org/x/net/trace"
	"google.golang.org/grpc"

	"github.com/sourcegraph/zoekt"
)

// SourcegraphListResult is the return value of Sourcegraph.List. It is its
// own type since internally we batch the calculation of index options. This
// is exposed via IterateIndexOptions.
//
// This type has state and is coupled to the Sourcegraph implementation.
type SourcegraphListResult struct {
	// IDs is the set of Sourcegraph repository IDs that this replica needs
	// to index.
	IDs []uint32

	// IterateIndexOptions best-effort resolves the IndexOptions for IDs and
	// calls the supplied function once per successfully resolved repository.
	// Failures are not returned; they are recorded by the implementation (a
	// trace for the real client, a log line for the fake). It uses the
	// "config fingerprint" to reduce the amount of work done. This means we
	// only resolve options for repositories which have been mutated since the
	// last Sourcegraph.List call.
	//
	// Note: this has a side-effect of setting the "config fingerprint". The
	// config fingerprint means we only calculate index options for repositories
	// that have changed since the last call to IterateIndexOptions. If you want
	// to force calculation of index options use
	// Sourcegraph.ForceIterateIndexOptions.
	//
	// Note: This should not be called concurrently with the Sourcegraph client.
	IterateIndexOptions func(func(IndexOptions))
}

// Sourcegraph represents the Sourcegraph service. It informs the indexserver
// what to index and which options to use.
62type Sourcegraph interface { 63 // List returns a list of repository IDs to index as well as a facility to 64 // fetch the indexing options. 65 // 66 // Note: The return value is not safe to use concurrently with future calls 67 // to List. 68 List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) 69 70 // ForceIterateIndexOptions will best-effort calculate the index options for 71 // all repos. For each repo it will call either onSuccess or onError. This 72 // is the forced version of IterateIndexOptions, so will always calculate 73 // options for each id in repos. 74 ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) 75 76 // GetDocumentRanks returns a map from paths within the given repo to their 77 // rank vectors. Paths are assumed to be ordered by each pairwise component of 78 // the resulting vector, higher ranks coming earlier 79 GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error) 80 81 // UpdateIndexStatus sends a request to Sourcegraph to confirm that the 82 // given repositories have been indexed. 83 UpdateIndexStatus(repositories []indexStatus) error 84} 85 86type SourcegraphClientOption func(*sourcegraphClient) 87 88// WithBatchSize controls how many repository configurations we request a time. 89// If BatchSize is 0, we default to requesting 10,000 repositories at once. 90func WithBatchSize(batchSize int) SourcegraphClientOption { 91 return func(c *sourcegraphClient) { 92 c.BatchSize = batchSize 93 } 94} 95 96// WithShouldUseGRPC enables or disables the use of gRPC for communicating with Sourcegraph. 97func WithShouldUseGRPC(useGRPC bool) SourcegraphClientOption { 98 return func(c *sourcegraphClient) { 99 c.useGRPC = useGRPC 100 } 101} 102 103// WithGRPCClient sets the gRPC client to use for communicating with Sourcegraph. 
104func WithGRPCClient(client proto.ZoektConfigurationServiceClient) SourcegraphClientOption { 105 return func(c *sourcegraphClient) { 106 c.grpcClient = client 107 } 108} 109 110func newSourcegraphClient(rootURL *url.URL, hostname string, opts ...SourcegraphClientOption) *sourcegraphClient { 111 httpClient := retryablehttp.NewClient() 112 httpClient.Logger = debug 113 114 // Sourcegraph might return an error message in the body if StatusCode==500. The 115 // default behavior of the go-retryablehttp restClient is to drain the body and not 116 // to propagate the error. Hence, we call ErrorPropagatedRetryPolicy instead of 117 // DefaultRetryPolicy and augment the error with the response body if possible. 118 httpClient.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) { 119 shouldRetry, checkErr := retryablehttp.ErrorPropagatedRetryPolicy(ctx, resp, err) 120 121 if resp != nil && resp.StatusCode == http.StatusInternalServerError { 122 if b, e := io.ReadAll(resp.Body); e == nil { 123 checkErr = fmt.Errorf("%w: body=%q", checkErr, string(b)) 124 } 125 } 126 127 return shouldRetry, checkErr 128 } 129 130 client := &sourcegraphClient{ 131 Root: rootURL, 132 restClient: httpClient, 133 Hostname: hostname, 134 BatchSize: 0, 135 grpcClient: noopGRPCClient{}, 136 useGRPC: false, // disable gRPC by default 137 } 138 139 for _, opt := range opts { 140 opt(client) 141 } 142 143 return client 144 145} 146 147// sourcegraphClient contains methods which interact with the sourcegraph API. 148type sourcegraphClient struct { 149 // Root is the base URL for the Sourcegraph instance to index. Normally 150 // http://sourcegraph-frontend-internal or http://localhost:3090. 151 Root *url.URL 152 153 // Hostname is the name we advertise to Sourcegraph when asking for the 154 // list of repositories to index. 155 Hostname string 156 157 // BatchSize is how many repository configurations we request at once. If 158 // zero a value of 10000 is used. 
159 BatchSize int 160 161 // restClient is used to make requests to the Sourcegraph instance. Prefer to 162 // use .doRequest() to ensure the appropriate headers are set. 163 restClient *retryablehttp.Client 164 165 // grpcClient is used to make requests to the Sourcegraph instance if gRPC is enabled. 166 grpcClient proto.ZoektConfigurationServiceClient 167 168 // configFingerprint is the last config fingerprint returned from 169 // Sourcegraph. It can be used for future calls to the configuration 170 // endpoint. 171 // 172 // configFingerprint is mutually exclusive with configFingerprintProto - this field 173 // will only be used if gRPC is disabled. 174 configFingerprint string 175 176 // configFingerprintProto is the last config fingerprint (as GRPC) returned from 177 // Sourcegraph. It can be used for future calls to the configuration 178 // endpoint. 179 // 180 // configFingerprintProto is mutually exclusive with configFingerprint - this field 181 // will only be used if gRPC is enabled. 182 configFingerprintProto *proto.Fingerprint 183 184 // configFingerprintReset tracks when we should zero out the 185 // configFingerprint. We want to periodically do this just in case our 186 // configFingerprint logic is faulty. When it is cleared out, we fallback to 187 // calculating everything. 188 configFingerprintReset time.Time 189 190 // useGRPC indicates whether we should use a gRPC client to communicate with Sourcegraph. 191 useGRPC bool 192} 193 194// GetDocumentRanks asks Sourcegraph for a mapping of file paths to rank 195// vectors. 
func (s *sourcegraphClient) GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error) {
	if s.useGRPC {
		return s.getDocumentRanksGRPC(ctx, repoName)
	}

	return s.getDocumentRanksREST(ctx, repoName)
}

// getDocumentRanksGRPC fetches the document ranks for repoName over gRPC and
// converts the response with RepoPathRanks.FromProto.
func (s *sourcegraphClient) getDocumentRanksGRPC(ctx context.Context, repoName string) (RepoPathRanks, error) {
	resp, err := s.grpcClient.DocumentRanks(ctx, &proto.DocumentRanksRequest{Repository: repoName})
	if err != nil {
		return RepoPathRanks{}, err
	}

	var out RepoPathRanks
	out.FromProto(resp)

	return out, nil
}

// getDocumentRanksREST fetches the document ranks for repoName from
// /.internal/ranks/<repoName>/documents and decodes the JSON body into a
// RepoPathRanks.
func (s *sourcegraphClient) getDocumentRanksREST(ctx context.Context, repoName string) (RepoPathRanks, error) {
	// Surrounding slashes are trimmed so the name sits cleanly between the
	// fixed path segments.
	u := s.Root.ResolveReference(&url.URL{
		Path: "/.internal/ranks/" + strings.Trim(repoName, "/") + "/documents",
	})

	b, err := s.get(ctx, u)
	if err != nil {
		return RepoPathRanks{}, err
	}

	ranks := RepoPathRanks{}
	err = json.Unmarshal(b, &ranks)
	if err != nil {
		return RepoPathRanks{}, err
	}

	return ranks, nil
}

// get issues a GET request for u through doRequest and returns the whole
// response body. A non-200 response is returned as a *url.Error carrying the
// status line and at most the first 1024 bytes of the body.
func (s *sourcegraphClient) get(ctx context.Context, u *url.URL) ([]byte, error) {
	req, err := retryablehttp.NewRequestWithContext(ctx, "GET", u.String(), nil)
	if err != nil {
		return nil, err
	}

	resp, err := s.doRequest(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Only a bounded prefix of the body is read so the error stays small.
		b, err := io.ReadAll(io.LimitReader(resp.Body, 1024))
		_ = resp.Body.Close()
		if err != nil {
			return nil, err
		}
		return nil, &url.Error{
			Op:  "Get",
			URL: u.String(),
			Err: fmt.Errorf("%s: %s", resp.Status, string(b)),
		}
	}

	b, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	return b, nil
}

// List asks Sourcegraph which repositories this replica should index, given
// the IDs it has already indexed. The returned IterateIndexOptions closure
// lazily resolves index options for those IDs in batches of s.BatchSize
// (10,000 when unset).
//
// Side effects on s:
//   - When configFingerprintReset has passed, both stored fingerprints are
//     cleared so the next iteration recomputes everything.
//   - A new reset deadline is scheduled at that point.
//   - Each iteration may update the stored fingerprint; see
//     getIndexOptionsFunc.
func (s *sourcegraphClient) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) {
	repos, err := s.listRepoIDs(ctx, indexed)
	if err != nil {
		return nil, fmt.Errorf("listRepoIDs: %w", err)
	}

	batchSize := s.BatchSize
	if batchSize == 0 {
		batchSize = 10_000
	}

	// Check if we should recalculate everything.
	if time.Now().After(s.configFingerprintReset) {
		// for every 500 repos we wait a minute. 2021-12-15 on sourcegraph.com
		// this works out to every 100 minutes.
		next := time.Duration(len(indexed)) * time.Minute / 500
		// Never schedule the next reset sooner than 5 minutes out.
		if min := 5 * time.Minute; next < min {
			next = min
		}
		// Add up to 25% random jitter. next >= 5m here, so the argument to
		// Int63n is always positive.
		next += time.Duration(rand.Int63n(int64(next) / 4)) // jitter
		s.configFingerprintReset = time.Now().Add(next)

		s.configFingerprintProto = nil
		s.configFingerprint = ""
	}

	// getIndexOptionsFunc is a function that can be used to get the index
	// options for a set of repos (while properly handling any configuration fingerprint
	// changes).
	//
	// In general, this function provides a consistent fingerprint for each batch call,
	// and updates the server state with the new fingerprint. If any of the batch calls
	// fail, the old fingerprint is restored.
	//
	// Concretely, every batch is sent with the fingerprint captured when the
	// function was made. The fingerprint returned by the very first batch is
	// stored only if that first batch succeeds. Any failed batch resets the
	// stored fingerprint back to the captured starting value.
	type getIndexOptionsFunc func(repos ...uint32) ([]indexOptionsItem, error)

	// default to REST
	mkGetIndexOptionsFunc := func(tr trace.Trace) getIndexOptionsFunc {
		startingFingerPrint := s.configFingerprint
		tr.LazyPrintf("fingerprint: %s", startingFingerPrint)

		first := true
		return func(repos ...uint32) ([]indexOptionsItem, error) {
			options, nextFingerPrint, err := s.getIndexOptionsREST(startingFingerPrint, repos...)
			if err != nil {
				first = false
				s.configFingerprint = startingFingerPrint

				return nil, err
			}

			if first {
				first = false
				s.configFingerprint = nextFingerPrint

				tr.LazyPrintf("new fingerprint: %s", nextFingerPrint)
			}

			return options, nil
		}
	}

	// If we enabled GRPC, use our gRPC client instead.
	if s.useGRPC {
		mkGetIndexOptionsFunc = func(tr trace.Trace) getIndexOptionsFunc {
			startingFingerPrint := s.configFingerprintProto
			tr.LazyPrintf("fingerprint: %s", startingFingerPrint.String())

			first := true
			return func(repos ...uint32) ([]indexOptionsItem, error) {
				// NOTE(review): this reuses List's ctx, but the closure runs
				// later, when IterateIndexOptions is called after List has
				// returned. If that ctx is cancelled by then, every batch fails.
				// The REST variant takes no context at all. Confirm callers
				// pass a long-lived ctx.
				options, nextFingerPrint, err := s.getIndexOptionsGRPC(ctx, startingFingerPrint, repos)
				if err != nil {
					first = false
					s.configFingerprintProto = startingFingerPrint

					return nil, err
				}

				if first {
					first = false
					s.configFingerprintProto = nextFingerPrint
					tr.LazyPrintf("new fingerprint: %s", nextFingerPrint.String())
				}

				return options, nil
			}
		}
	}

	// iterate calls f for every repository whose options resolved without
	// error. A failed batch, or a per-repository error item, is recorded in
	// the trace and skipped; iteration continues with the remaining batches.
	iterate := func(f func(IndexOptions)) {
		start := time.Now()
		tr := trace.New("getIndexOptions", "")
		tr.LazyPrintf("getting index options for %d repos", len(repos))

		defer func() {
			metricResolveRevisionsDuration.Observe(time.Since(start).Seconds())
			tr.Finish()
		}()

		getIndexOptions := mkGetIndexOptionsFunc(tr)

		// We ask the frontend to get index options in batches.
		// (batched is defined elsewhere; presumably it yields chunks of at
		// most batchSize IDs.)
		for repos := range batched(repos, batchSize) {
			start := time.Now()
			options, err := getIndexOptions(repos...)
			duration := time.Since(start)

			if err != nil {
				metricResolveRevisionDuration.WithLabelValues("false").Observe(duration.Seconds())
				tr.LazyPrintf("failed fetching options batch: %v", err)
				tr.SetError()

				continue
			}

			metricResolveRevisionDuration.WithLabelValues("true").Observe(duration.Seconds())

			for _, o := range options {
				metricGetIndexOptions.Inc()

				if o.Error != "" {
					metricGetIndexOptionsError.Inc()
					tr.LazyPrintf("failed fetching options for %v: %v", o.Name, o.Error)
					tr.SetError()

					continue
				}
				f(o.IndexOptions)
			}
		}
	}

	return &SourcegraphListResult{
		IDs:                 repos,
		IterateIndexOptions: iterate,
	}, nil
}

// ForceIterateIndexOptions resolves index options for repos in batches of
// s.BatchSize (10,000 when unset) without sending any config fingerprint, so
// options are computed for every requested repository. It does not touch the
// stored fingerprints.
//
// Reporting:
//   - When a whole batch fails, onError receives that error for every ID in
//     the batch.
//   - A per-item error is reported via onError only when the item carries a
//     non-zero RepoID.
//   - Items without an error go to onSuccess.
func (s *sourcegraphClient) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) {
	batchSize := s.BatchSize
	if batchSize == 0 {
		batchSize = 10_000
	}

	getIndexOptions := func(repos ...uint32) ([]indexOptionsItem, error) {
		opts, _, err := s.getIndexOptionsREST("", repos...)
		return opts, err
	}

	if s.useGRPC {
		getIndexOptions = func(repos ...uint32) ([]indexOptionsItem, error) {
			opts, _, err := s.getIndexOptionsGRPC(context.Background(), nil, repos)
			return opts, err
		}
	}

	for repos := range batched(repos, batchSize) {
		opts, err := getIndexOptions(repos...)
		if err != nil {
			for _, id := range repos {
				onError(id, err)
			}
			continue
		}
		for _, o := range opts {
			if o.RepoID > 0 && o.Error != "" {
				onError(o.RepoID, errors.New(o.Error))
			}
			if o.Error == "" {
				onSuccess(o.IndexOptions)
			}
		}
	}
}

// indexOptionsItem wraps IndexOptions to also include an error returned by
// the API.
444type indexOptionsItem struct { 445 IndexOptions 446 Error string 447} 448 449func (o *indexOptionsItem) FromProto(x *proto.ZoektIndexOptions) { 450 branches := make([]zoekt.RepositoryBranch, 0, len(x.Branches)) 451 for _, b := range x.GetBranches() { 452 branches = append(branches, zoekt.RepositoryBranch{ 453 Name: b.GetName(), 454 Version: b.GetVersion(), 455 }) 456 } 457 458 item := indexOptionsItem{} 459 languageMap := make(map[string]ctags.CTagsParserType) 460 461 for _, lang := range x.GetLanguageMap() { 462 languageMap[lang.GetLanguage()] = ctags.CTagsParserType(lang.GetCtags().Number()) 463 } 464 465 item.IndexOptions = IndexOptions{ 466 RepoID: uint32(x.GetRepoId()), 467 LargeFiles: x.GetLargeFiles(), 468 Symbols: x.GetSymbols(), 469 Branches: branches, 470 Name: x.GetName(), 471 472 Priority: x.GetPriority(), 473 474 DocumentRanksVersion: x.GetDocumentRanksVersion(), 475 476 Public: x.GetPublic(), 477 Fork: x.GetFork(), 478 Archived: x.GetArchived(), 479 480 LanguageMap: languageMap, 481 ShardConcurrency: x.GetShardConcurrency(), 482 } 483 484 item.Error = x.GetError() 485 486 *o = item 487} 488 489func (o *indexOptionsItem) ToProto() *proto.ZoektIndexOptions { 490 branches := make([]*proto.ZoektRepositoryBranch, 0, len(o.Branches)) 491 for _, b := range o.Branches { 492 branches = append(branches, &proto.ZoektRepositoryBranch{ 493 Name: b.Name, 494 Version: b.Version, 495 }) 496 } 497 498 languageMap := make([]*proto.LanguageMapping, 0, len(o.LanguageMap)) 499 500 for lang, parser := range o.LanguageMap { 501 languageMap = append(languageMap, &proto.LanguageMapping{ 502 Language: lang, 503 Ctags: proto.CTagsParserType(parser), 504 }) 505 } 506 507 return &proto.ZoektIndexOptions{ 508 RepoId: int32(o.RepoID), 509 LargeFiles: o.LargeFiles, 510 Symbols: o.Symbols, 511 Branches: branches, 512 Name: o.Name, 513 514 Priority: o.Priority, 515 516 DocumentRanksVersion: o.DocumentRanksVersion, 517 518 Public: o.Public, 519 Fork: o.Fork, 520 Archived: 
o.Archived, 521 522 Error: o.Error, 523 524 LanguageMap: languageMap, 525 ShardConcurrency: o.ShardConcurrency, 526 } 527} 528 529func (s *sourcegraphClient) getIndexOptionsGRPC(ctx context.Context, fingerprint *proto.Fingerprint, repos []uint32) ([]indexOptionsItem, *proto.Fingerprint, error) { 530 repoIDs := make([]int32, 0, len(repos)) 531 for _, id := range repos { 532 repoIDs = append(repoIDs, int32(id)) 533 } 534 535 req := proto.SearchConfigurationRequest{ 536 RepoIds: repoIDs, 537 Fingerprint: fingerprint, 538 } 539 540 response, err := s.grpcClient.SearchConfiguration(ctx, &req) 541 if err != nil { 542 return nil, nil, err 543 } 544 545 protoItems := response.GetUpdatedOptions() 546 items := make([]indexOptionsItem, 0, len(protoItems)) 547 for _, x := range protoItems { 548 var item indexOptionsItem 549 item.FromProto(x) 550 item.IndexOptions.CloneURL = s.getCloneURL(item.Name) 551 552 items = append(items, item) 553 } 554 555 return items, response.GetFingerprint(), nil 556} 557 558const fingerprintHeader = "X-Sourcegraph-Config-Fingerprint" 559 560func (s *sourcegraphClient) getIndexOptionsREST(fingerprint string, repos ...uint32) ([]indexOptionsItem, string, error) { 561 u := s.Root.ResolveReference(&url.URL{ 562 Path: "/.internal/search/configuration", 563 }) 564 565 repoIDs := make([]string, len(repos)) 566 for i, id := range repos { 567 repoIDs[i] = strconv.Itoa(int(id)) 568 } 569 data := url.Values{"repoID": repoIDs} 570 req, err := retryablehttp.NewRequest("POST", u.String(), []byte(data.Encode())) 571 if err != nil { 572 return nil, "", err 573 } 574 req.Header.Set("Content-Type", "application/x-www-form-urlencoded") 575 if fingerprint != "" { 576 req.Header.Set(fingerprintHeader, fingerprint) 577 } 578 579 resp, err := s.doRequest(req) 580 if err != nil { 581 return nil, "", err 582 } 583 defer resp.Body.Close() 584 585 if resp.StatusCode != http.StatusOK { 586 b, err := io.ReadAll(io.LimitReader(resp.Body, 1024)) 587 _ = resp.Body.Close() 588 if 
err != nil { 589 return nil, "", err 590 } 591 return nil, "", &url.Error{ 592 Op: "Get", 593 URL: u.String(), 594 Err: fmt.Errorf("%s: %s", resp.Status, string(b)), 595 } 596 } 597 598 dec := json.NewDecoder(resp.Body) 599 var opts []indexOptionsItem 600 for { 601 var opt indexOptionsItem 602 err := dec.Decode(&opt) 603 if err == io.EOF { 604 break 605 } 606 if err != nil { 607 return nil, "", fmt.Errorf("error decoding body: %w", err) 608 } 609 opt.CloneURL = s.getCloneURL(opt.Name) 610 opts = append(opts, opt) 611 } 612 613 return opts, resp.Header.Get(fingerprintHeader), nil 614} 615 616func (s *sourcegraphClient) getCloneURL(name string) string { 617 return s.Root.ResolveReference(&url.URL{Path: path.Join("/.internal/git", name)}).String() 618} 619 620func (s *sourcegraphClient) listRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) { 621 if s.useGRPC { 622 return s.listRepoIDsGRPC(ctx, indexed) 623 } 624 625 return s.listRepoIDsREST(ctx, indexed) 626} 627 628func (s *sourcegraphClient) listRepoIDsGRPC(ctx context.Context, indexed []uint32) ([]uint32, error) { 629 var request proto.ListRequest 630 request.Hostname = s.Hostname 631 request.IndexedIds = make([]int32, 0, len(indexed)) 632 for _, id := range indexed { 633 request.IndexedIds = append(request.IndexedIds, int32(id)) 634 } 635 636 response, err := s.grpcClient.List(ctx, &request) 637 if err != nil { 638 return nil, err 639 } 640 641 repoIDs := make([]uint32, 0, len(response.RepoIds)) 642 for _, id := range response.RepoIds { 643 repoIDs = append(repoIDs, uint32(id)) 644 } 645 646 return repoIDs, nil 647} 648 649func (s *sourcegraphClient) listRepoIDsREST(_ context.Context, indexed []uint32) ([]uint32, error) { 650 body, err := json.Marshal(&struct { 651 Hostname string 652 IndexedIDs []uint32 653 }{ 654 Hostname: s.Hostname, 655 IndexedIDs: indexed, 656 }) 657 if err != nil { 658 return nil, err 659 } 660 661 u := s.Root.ResolveReference(&url.URL{Path: "/.internal/repos/index"}) 662 req, 
err := retryablehttp.NewRequest(http.MethodPost, u.String(), bytes.NewReader(body)) 663 if err != nil { 664 return nil, err 665 } 666 req.Header.Set("Content-Type", "application/json; charset=utf-8") 667 668 resp, err := s.doRequest(req) 669 if err != nil { 670 return nil, err 671 } 672 defer resp.Body.Close() 673 674 if resp.StatusCode != http.StatusOK { 675 return nil, fmt.Errorf("failed to list repositories: status %s", resp.Status) 676 } 677 678 var data struct { 679 RepoIDs []uint32 680 } 681 err = json.NewDecoder(resp.Body).Decode(&data) 682 if err != nil { 683 return nil, err 684 } 685 686 return data.RepoIDs, nil 687} 688 689type indexStatus struct { 690 RepoID uint32 691 Branches []zoekt.RepositoryBranch 692 IndexTimeUnix int64 693} 694 695type updateIndexStatusRequest struct { 696 Repositories []indexStatus 697} 698 699func (u *updateIndexStatusRequest) ToProto() *proto.UpdateIndexStatusRequest { 700 repositories := make([]*proto.UpdateIndexStatusRequest_Repository, 0, len(u.Repositories)) 701 702 for _, repo := range u.Repositories { 703 branches := make([]*proto.ZoektRepositoryBranch, 0, len(repo.Branches)) 704 705 for _, branch := range repo.Branches { 706 branches = append(branches, &proto.ZoektRepositoryBranch{ 707 Name: branch.Name, 708 Version: branch.Version, 709 }) 710 } 711 712 repositories = append(repositories, &proto.UpdateIndexStatusRequest_Repository{ 713 RepoId: repo.RepoID, 714 Branches: branches, 715 IndexTimeUnix: repo.IndexTimeUnix, 716 }) 717 } 718 719 return &proto.UpdateIndexStatusRequest{ 720 Repositories: repositories, 721 } 722} 723 724func (u *updateIndexStatusRequest) FromProto(x *proto.UpdateIndexStatusRequest) { 725 protoRepositories := x.GetRepositories() 726 repositories := make([]indexStatus, 0, len(protoRepositories)) 727 728 for _, repo := range x.GetRepositories() { 729 protoBranches := repo.GetBranches() 730 branches := make([]zoekt.RepositoryBranch, 0, len(protoBranches)) 731 732 for _, branch := range 
repo.GetBranches() { 733 branches = append(branches, zoekt.RepositoryBranch{ 734 Name: branch.GetName(), 735 Version: branch.GetVersion(), 736 }) 737 } 738 739 repositories = append(repositories, indexStatus{ 740 RepoID: repo.GetRepoId(), 741 Branches: branches, 742 IndexTimeUnix: repo.GetIndexTimeUnix(), 743 }) 744 } 745 746 *u = updateIndexStatusRequest{ 747 Repositories: repositories, 748 } 749} 750 751// UpdateIndexStatus sends a request to Sourcegraph to confirm that the given 752// repositories have been indexed. 753func (s *sourcegraphClient) UpdateIndexStatus(repositories []indexStatus) error { 754 r := updateIndexStatusRequest{Repositories: repositories} 755 756 if s.useGRPC { 757 return s.updateIndexStatusGRPC(r) 758 } 759 760 return s.updateIndexStatusREST(r) 761} 762 763func (s *sourcegraphClient) updateIndexStatusGRPC(r updateIndexStatusRequest) error { 764 request := r.ToProto() 765 _, err := s.grpcClient.UpdateIndexStatus(context.Background(), request) 766 767 if err != nil { 768 return fmt.Errorf("failed to update index status: %w", err) 769 } 770 771 return nil 772} 773 774func (s *sourcegraphClient) updateIndexStatusREST(r updateIndexStatusRequest) error { 775 payload, err := json.Marshal(r) 776 if err != nil { 777 return err 778 } 779 780 u := s.Root.ResolveReference(&url.URL{Path: "/.internal/search/index-status"}) 781 req, err := retryablehttp.NewRequest(http.MethodPost, u.String(), bytes.NewReader(payload)) 782 if err != nil { 783 return err 784 } 785 req.Header.Set("Content-Type", "application/json; charset=utf-8") 786 787 resp, err := s.doRequest(req) 788 if err != nil { 789 return err 790 } 791 defer resp.Body.Close() 792 793 if resp.StatusCode != http.StatusOK { 794 return fmt.Errorf("failed to update index status: status %s", resp.Status) 795 } 796 797 return nil 798} 799 800// doRequest executes the provided request after adding the appropriate headers 801// for interacting with a Sourcegraph instance. 
802func (s *sourcegraphClient) doRequest(req *retryablehttp.Request) (*http.Response, error) { 803 // Make all requests as an internal user. 804 // 805 // Should match github.com/sourcegraph/sourcegraph/internal/actor.headerKeyActorUID 806 // and github.com/sourcegraph/sourcegraph/internal/actor.headerValueInternalActor 807 req.Header.Set("X-Sourcegraph-Actor-UID", "internal") 808 return s.restClient.Do(req) 809} 810 811type sourcegraphFake struct { 812 RootDir string 813 Log *log.Logger 814} 815 816// GetDocumentRanks expects a file where each line has the following format: 817// path<tab>rank... where rank is a float64. 818func (sf sourcegraphFake) GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error) { 819 dir := filepath.Join(sf.RootDir, filepath.FromSlash(repoName)) 820 821 fd, err := os.Open(filepath.Join(dir, "SG_DOCUMENT_RANKS")) 822 if err != nil { 823 return RepoPathRanks{}, err 824 } 825 826 ranks := RepoPathRanks{} 827 828 sum := 0.0 829 count := 0 830 scanner := bufio.NewScanner(fd) 831 for scanner.Scan() { 832 s := scanner.Text() 833 pathRanks := strings.Split(s, "\t") 834 if rank, err := strconv.ParseFloat(pathRanks[1], 64); err == nil { 835 ranks.Paths[pathRanks[0]] = rank 836 sum += rank 837 count++ 838 } 839 } 840 841 if err := scanner.Err(); err != nil { 842 return RepoPathRanks{}, err 843 } 844 845 ranks.MeanRank = sum / float64(count) 846 return ranks, nil 847} 848 849func floats64(s string) []float64 { 850 parts := strings.Split(s, ",") 851 852 var r []float64 853 for _, rank := range parts { 854 f, err := strconv.ParseFloat(rank, 64) 855 if err != nil { 856 return nil 857 } 858 r = append(r, f) 859 } 860 861 return r 862} 863 864func (sf sourcegraphFake) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) { 865 repos, err := sf.ListRepoIDs(ctx, indexed) 866 if err != nil { 867 return nil, err 868 } 869 870 iterate := func(f func(IndexOptions)) { 871 opts, err := sf.GetIndexOptions(repos...) 
872 if err != nil { 873 sf.Log.Printf("WARN: ignoring GetIndexOptions error: %v", err) 874 } 875 for _, opt := range opts { 876 if opt.Error != "" { 877 sf.Log.Printf("WARN: ignoring GetIndexOptions error for %s: %v", opt.Name, opt.Error) 878 continue 879 } 880 f(opt.IndexOptions) 881 } 882 } 883 884 return &SourcegraphListResult{ 885 IDs: repos, 886 IterateIndexOptions: iterate, 887 }, nil 888} 889 890func (sf sourcegraphFake) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) { 891 opts, err := sf.GetIndexOptions(repos...) 892 if err != nil { 893 for _, id := range repos { 894 onError(id, err) 895 } 896 return 897 } 898 for _, o := range opts { 899 if o.RepoID > 0 && o.Error != "" { 900 onError(o.RepoID, errors.New(o.Error)) 901 } 902 if o.Error == "" { 903 onSuccess(o.IndexOptions) 904 } 905 } 906} 907 908func (sf sourcegraphFake) GetIndexOptions(repos ...uint32) ([]indexOptionsItem, error) { 909 reposIdx := map[uint32]int{} 910 for i, id := range repos { 911 reposIdx[id] = i 912 } 913 914 items := make([]indexOptionsItem, len(repos)) 915 err := sf.visitRepos(func(name string) { 916 idx, ok := reposIdx[sf.id(name)] 917 if !ok { 918 return 919 } 920 opts, err := sf.getIndexOptions(name) 921 if err != nil { 922 items[idx] = indexOptionsItem{Error: err.Error()} 923 } else { 924 items[idx] = indexOptionsItem{IndexOptions: opts} 925 } 926 }) 927 928 if err != nil { 929 return nil, err 930 } 931 932 for i := range items { 933 if items[i].Error == "" && items[i].RepoID == 0 { 934 items[i].Error = "not found" 935 } 936 } 937 938 return items, nil 939} 940 941func (sf sourcegraphFake) getIndexOptions(name string) (IndexOptions, error) { 942 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name)) 943 exists := func(p string) bool { 944 _, err := os.Stat(filepath.Join(dir, p)) 945 return err == nil 946 } 947 float := func(p string) float64 { 948 b, _ := os.ReadFile(filepath.Join(dir, p)) 949 f, _ := 
strconv.ParseFloat(string(bytes.TrimSpace(b)), 64) 950 return f 951 } 952 953 opts := IndexOptions{ 954 RepoID: sf.id(name), 955 Name: name, 956 CloneURL: sf.getCloneURL(name), 957 Symbols: true, 958 959 Public: !exists("SG_PRIVATE"), 960 Fork: exists("SG_FORK"), 961 Archived: exists("SG_ARCHIVED"), 962 963 Priority: float("SG_PRIORITY"), 964 } 965 966 if stat, err := os.Stat(filepath.Join(dir, "SG_DOCUMENT_RANKS")); err == nil { 967 opts.DocumentRanksVersion = stat.ModTime().String() 968 } 969 970 branches, err := sf.getBranches(name) 971 if err != nil { 972 return opts, err 973 } 974 opts.Branches = branches 975 976 return opts, nil 977} 978 979func (sf sourcegraphFake) getBranches(name string) ([]zoekt.RepositoryBranch, error) { 980 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name)) 981 repo, err := git.PlainOpen(dir) 982 if err != nil { 983 return nil, err 984 } 985 986 cfg, err := repo.Config() 987 if err != nil { 988 return nil, err 989 } 990 991 sec := cfg.Raw.Section("zoekt") 992 branches := sec.Options.GetAll("branch") 993 if len(branches) == 0 { 994 branches = append(branches, "HEAD") 995 } 996 997 rBranches := make([]zoekt.RepositoryBranch, 0, len(branches)) 998 for _, branch := range branches { 999 cmd := exec.Command("git", "rev-parse", branch) 1000 cmd.Dir = dir 1001 if b, err := cmd.Output(); err != nil { 1002 sf.Log.Printf("WARN: Could not get branch %s/%s", name, branch) 1003 } else { 1004 version := string(bytes.TrimSpace(b)) 1005 rBranches = append(rBranches, zoekt.RepositoryBranch{ 1006 Name: branch, 1007 Version: version, 1008 }) 1009 } 1010 } 1011 1012 if len(rBranches) == 0 { 1013 return nil, fmt.Errorf("WARN: Could not get any branch revisions for repo %s", name) 1014 } 1015 1016 return rBranches, nil 1017} 1018 1019func (sf sourcegraphFake) id(name string) uint32 { 1020 // allow overriding the ID. 
1021 idPath := filepath.Join(sf.RootDir, filepath.FromSlash(name), "SG_ID") 1022 if b, _ := os.ReadFile(idPath); len(b) > 0 { 1023 id, err := strconv.Atoi(strings.TrimSpace(string(b))) 1024 if err == nil { 1025 return uint32(id) 1026 } 1027 } 1028 return fakeID(name) 1029} 1030 1031func (sf sourcegraphFake) getCloneURL(name string) string { 1032 return filepath.Join(sf.RootDir, filepath.FromSlash(name)) 1033} 1034 1035func (sf sourcegraphFake) ListRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) { 1036 var repos []uint32 1037 err := sf.visitRepos(func(name string) { 1038 repos = append(repos, sf.id(name)) 1039 }) 1040 return repos, err 1041} 1042 1043func (sf sourcegraphFake) visitRepos(visit func(name string)) error { 1044 return filepath.Walk(sf.RootDir, func(path string, fi os.FileInfo, fileErr error) error { 1045 if fileErr != nil { 1046 sf.Log.Printf("WARN: ignoring error searching %s: %v", path, fileErr) 1047 return nil 1048 } 1049 if !fi.IsDir() { 1050 return nil 1051 } 1052 1053 gitdir := filepath.Join(path, ".git") 1054 if fi, err := os.Stat(gitdir); err != nil || !fi.IsDir() { 1055 return nil 1056 } 1057 1058 subpath, err := filepath.Rel(sf.RootDir, path) 1059 if err != nil { 1060 // According to WalkFunc docs, path is always filepath.Join(root, 1061 // subpath). So Rel should always work. 1062 return fmt.Errorf("filepath.Walk returned %s which is not relative to %s: %w", path, sf.RootDir, err) 1063 } 1064 1065 name := filepath.ToSlash(subpath) 1066 visit(name) 1067 1068 return filepath.SkipDir 1069 }) 1070} 1071 1072func (s sourcegraphFake) UpdateIndexStatus(repositories []indexStatus) error { 1073 // noop 1074 return nil 1075} 1076 1077// fakeID returns a deterministic ID based on name. Used for fakes and tests. 1078func fakeID(name string) uint32 { 1079 // magic at the end is to ensure we get a positive number when casting. 
1080 return uint32(crc32.ChecksumIEEE([]byte(name))%(1<<31-1) + 1) 1081} 1082 1083type sourcegraphNop struct{} 1084 1085func (s sourcegraphNop) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) { 1086 return nil, nil 1087} 1088 1089func (s sourcegraphNop) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) { 1090 return 1091} 1092 1093func (s sourcegraphNop) GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error) { 1094 return RepoPathRanks{}, nil 1095} 1096 1097func (s sourcegraphNop) UpdateIndexStatus(repositories []indexStatus) error { 1098 return nil 1099} 1100 1101type RepoPathRanks struct { 1102 MeanRank float64 `json:"mean_reference_count"` 1103 Paths map[string]float64 `json:"paths"` 1104} 1105 1106func (r *RepoPathRanks) FromProto(x *proto.DocumentRanksResponse) { 1107 protoPaths := x.GetPaths() 1108 ranks := make(map[string]float64, len(protoPaths)) 1109 for filePath, rank := range protoPaths { 1110 ranks[filePath] = rank 1111 } 1112 1113 *r = RepoPathRanks{ 1114 MeanRank: x.GetMeanRank(), 1115 Paths: ranks, 1116 } 1117} 1118 1119func (r *RepoPathRanks) ToProto() *proto.DocumentRanksResponse { 1120 paths := make(map[string]float64, len(r.Paths)) 1121 for filePath, rank := range r.Paths { 1122 paths[filePath] = rank 1123 } 1124 1125 return &proto.DocumentRanksResponse{ 1126 MeanRank: r.MeanRank, 1127 Paths: paths, 1128 } 1129} 1130 1131type noopGRPCClient struct{} 1132 1133func (n noopGRPCClient) SearchConfiguration(ctx context.Context, in *proto.SearchConfigurationRequest, opts ...grpc.CallOption) (*proto.SearchConfigurationResponse, error) { 1134 return nil, fmt.Errorf("grpc client not enabled") 1135} 1136 1137func (n noopGRPCClient) List(ctx context.Context, in *proto.ListRequest, opts ...grpc.CallOption) (*proto.ListResponse, error) { 1138 return nil, fmt.Errorf("grpc client not enabled") 1139} 1140 1141func (n noopGRPCClient) DocumentRanks(ctx 
context.Context, in *proto.DocumentRanksRequest, opts ...grpc.CallOption) (*proto.DocumentRanksResponse, error) { 1142 return nil, fmt.Errorf("grpc client not enabled") 1143} 1144 1145func (n noopGRPCClient) UpdateIndexStatus(ctx context.Context, in *proto.UpdateIndexStatusRequest, opts ...grpc.CallOption) (*proto.UpdateIndexStatusResponse, error) { 1146 return nil, fmt.Errorf("grpc client not enabled") 1147} 1148 1149var _ proto.ZoektConfigurationServiceClient = noopGRPCClient{}