fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

package main

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"hash/crc32"
	"io"
	"log"
	"math/rand"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"path"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/go-git/go-git/v5"
	retryablehttp "github.com/hashicorp/go-retryablehttp"
	"golang.org/x/net/trace"
	"google.golang.org/grpc"

	"github.com/sourcegraph/zoekt"
	proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
)

// SourcegraphListResult is the return value of Sourcegraph.List. It is its
// own type since internally we batch the calculation of index options. This
// is exposed via IterateIndexOptions.
//
// This type has state and is coupled to the Sourcegraph implementation.
type SourcegraphListResult struct {
	// IDs is the set of Sourcegraph repository IDs that this replica needs
	// to index.
	IDs []uint32

	// IterateIndexOptions best-effort resolves the IndexOptions for IDs and
	// passes each successfully resolved value to the supplied callback. If
	// any repository fails it is logged internally and skipped. It uses the
	// "config fingerprint" to reduce the amount of work done. This means we
	// only resolve options for repositories which have been mutated since the
	// last Sourcegraph.List call.
	//
	// Note: this has a side-effect of setting the "config fingerprint". The
	// config fingerprint means we only calculate index options for
	// repositories that have changed since the last call to
	// IterateIndexOptions. If you want to force calculation of index options
	// use Sourcegraph.ForceIterateIndexOptions.
	//
	// Note: This should not be called concurrently with the Sourcegraph client.
	IterateIndexOptions func(func(IndexOptions))
}

// Sourcegraph represents the Sourcegraph service. It informs the indexserver
// what to index and which options to use.
type Sourcegraph interface {
	// List returns a list of repository IDs to index as well as a facility to
	// fetch the indexing options.
	//
	// Note: The return value is not safe to use concurrently with future calls
	// to List.
	List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error)

	// ForceIterateIndexOptions will best-effort calculate the index options for
	// all repos. For each repo it will call either onSuccess or onError. This
	// is the forced version of IterateIndexOptions, so will always calculate
	// options for each id in repos.
	ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32)

	// GetRepoRank returns a rank vector for the given repository. Repositories are
	// assumed to be ordered by each pairwise component of the resulting vector,
	// higher ranks coming earlier.
	GetRepoRank(ctx context.Context, repoName string) ([]float64, error)

	// GetDocumentRanks returns a map from paths within the given repo to their
	// rank vectors. Paths are assumed to be ordered by each pairwise component of
	// the resulting vector, higher ranks coming earlier.
	GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error)

	// UpdateIndexStatus sends a request to Sourcegraph to confirm that the
	// given repositories have been indexed.
	UpdateIndexStatus(repositories []indexStatus) error
}

// SourcegraphClientOption is a functional option that adjusts a
// sourcegraphClient in place.
type SourcegraphClientOption func(*sourcegraphClient)

// WithBatchSize controls how many repository configurations we request at a
// time. If BatchSize is 0, we default to requesting 10,000 repositories at once.
func WithBatchSize(batchSize int) SourcegraphClientOption {
	return func(c *sourcegraphClient) {
		c.BatchSize = batchSize
	}
}

// WithShouldUseGRPC enables or disables the use of gRPC for communicating with Sourcegraph.
101func WithShouldUseGRPC(useGRPC bool) SourcegraphClientOption { 102 return func(c *sourcegraphClient) { 103 c.useGRPC = useGRPC 104 } 105} 106 107// WithGRPCClient sets the gRPC client to use for communicating with Sourcegraph. 108func WithGRPCClient(client proto.ZoektConfigurationServiceClient) SourcegraphClientOption { 109 return func(c *sourcegraphClient) { 110 c.grpcClient = client 111 } 112} 113 114func newSourcegraphClient(rootURL *url.URL, hostname string, opts ...SourcegraphClientOption) *sourcegraphClient { 115 httpClient := retryablehttp.NewClient() 116 httpClient.Logger = debug 117 118 // Sourcegraph might return an error message in the body if StatusCode==500. The 119 // default behavior of the go-retryablehttp restClient is to drain the body and not 120 // to propagate the error. Hence, we call ErrorPropagatedRetryPolicy instead of 121 // DefaultRetryPolicy and augment the error with the response body if possible. 122 httpClient.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) { 123 shouldRetry, checkErr := retryablehttp.ErrorPropagatedRetryPolicy(ctx, resp, err) 124 125 if resp != nil && resp.StatusCode == http.StatusInternalServerError { 126 if b, e := io.ReadAll(resp.Body); e == nil { 127 checkErr = fmt.Errorf("%w: body=%q", checkErr, string(b)) 128 } 129 } 130 131 return shouldRetry, checkErr 132 } 133 134 client := &sourcegraphClient{ 135 Root: rootURL, 136 restClient: httpClient, 137 Hostname: hostname, 138 BatchSize: 0, 139 grpcClient: noopGRPCClient{}, 140 useGRPC: false, // disable gRPC by default 141 } 142 143 for _, opt := range opts { 144 opt(client) 145 } 146 147 return client 148 149} 150 151// sourcegraphClient contains methods which interact with the sourcegraph API. 152type sourcegraphClient struct { 153 // Root is the base URL for the Sourcegraph instance to index. Normally 154 // http://sourcegraph-frontend-internal or http://localhost:3090. 
155 Root *url.URL 156 157 // Hostname is the name we advertise to Sourcegraph when asking for the 158 // list of repositories to index. 159 Hostname string 160 161 // BatchSize is how many repository configurations we request at once. If 162 // zero a value of 10000 is used. 163 BatchSize int 164 165 // restClient is used to make requests to the Sourcegraph instance. Prefer to 166 // use .doRequest() to ensure the appropriate headers are set. 167 restClient *retryablehttp.Client 168 169 // grpcClient is used to make requests to the Sourcegraph instance if gRPC is enabled. 170 grpcClient proto.ZoektConfigurationServiceClient 171 172 // configFingerprint is the last config fingerprint returned from 173 // Sourcegraph. It can be used for future calls to the configuration 174 // endpoint. 175 // 176 // configFingerprint is mutually exclusive with configFingerprintProto - this field 177 // will only be used if gRPC is disabled. 178 configFingerprint string 179 180 // configFingerprintProto is the last config fingerprint (as GRPC) returned from 181 // Sourcegraph. It can be used for future calls to the configuration 182 // endpoint. 183 // 184 // configFingerprintProto is mutually exclusive with configFingerprint - this field 185 // will only be used if gRPC is enabled. 186 configFingerprintProto *proto.Fingerprint 187 188 // configFingerprintReset tracks when we should zero out the 189 // configFingerprint. We want to periodically do this just in case our 190 // configFingerprint logic is faulty. When it is cleared out, we fallback to 191 // calculating everything. 192 configFingerprintReset time.Time 193 194 // useGRPC indicates whether we should use a gRPC client to communicate with Sourcegraph. 195 useGRPC bool 196} 197 198// GetRepoRank asks Sourcegraph for the rank vector of repoName. 
199func (s *sourcegraphClient) GetRepoRank(ctx context.Context, repoName string) ([]float64, error) { 200 if s.useGRPC { 201 return s.getRepoRankGRPC(ctx, repoName) 202 } 203 204 return s.getRepoRankREST(ctx, repoName) 205} 206 207func (s *sourcegraphClient) getRepoRankGRPC(ctx context.Context, repoName string) ([]float64, error) { 208 resp, err := s.grpcClient.RepositoryRank(ctx, &proto.RepositoryRankRequest{Repository: repoName}) 209 if err != nil { 210 return nil, err 211 } 212 213 return resp.GetRank(), nil 214} 215 216func (s *sourcegraphClient) getRepoRankREST(ctx context.Context, repoName string) ([]float64, error) { 217 u := s.Root.ResolveReference(&url.URL{ 218 Path: "/.internal/ranks/" + strings.Trim(repoName, "/"), 219 }) 220 221 b, err := s.get(ctx, u) 222 if err != nil { 223 return nil, err 224 } 225 226 var ranks []float64 227 err = json.Unmarshal(b, &ranks) 228 if err != nil { 229 return nil, err 230 } 231 232 return ranks, nil 233} 234 235// GetDocumentRanks asks Sourcegraph for a mapping of file paths to rank 236// vectors. 
237func (s *sourcegraphClient) GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error) { 238 if s.useGRPC { 239 return s.getDocumentRanksGRPC(ctx, repoName) 240 } 241 242 return s.getDocumentRanksREST(ctx, repoName) 243} 244 245func (s *sourcegraphClient) getDocumentRanksGRPC(ctx context.Context, repoName string) (RepoPathRanks, error) { 246 resp, err := s.grpcClient.DocumentRanks(ctx, &proto.DocumentRanksRequest{Repository: repoName}) 247 if err != nil { 248 return RepoPathRanks{}, err 249 } 250 251 var out RepoPathRanks 252 out.FromProto(resp) 253 254 return out, nil 255} 256 257func (s *sourcegraphClient) getDocumentRanksREST(ctx context.Context, repoName string) (RepoPathRanks, error) { 258 u := s.Root.ResolveReference(&url.URL{ 259 Path: "/.internal/ranks/" + strings.Trim(repoName, "/") + "/documents", 260 }) 261 262 b, err := s.get(ctx, u) 263 if err != nil { 264 return RepoPathRanks{}, err 265 } 266 267 ranks := RepoPathRanks{} 268 err = json.Unmarshal(b, &ranks) 269 if err != nil { 270 return RepoPathRanks{}, err 271 } 272 273 return ranks, nil 274} 275 276func (s *sourcegraphClient) get(ctx context.Context, u *url.URL) ([]byte, error) { 277 req, err := retryablehttp.NewRequestWithContext(ctx, "GET", u.String(), nil) 278 if err != nil { 279 return nil, err 280 } 281 282 resp, err := s.doRequest(req) 283 if err != nil { 284 return nil, err 285 } 286 defer resp.Body.Close() 287 288 if resp.StatusCode != http.StatusOK { 289 b, err := io.ReadAll(io.LimitReader(resp.Body, 1024)) 290 _ = resp.Body.Close() 291 if err != nil { 292 return nil, err 293 } 294 return nil, &url.Error{ 295 Op: "Get", 296 URL: u.String(), 297 Err: fmt.Errorf("%s: %s", resp.Status, string(b)), 298 } 299 } 300 301 b, err := io.ReadAll(resp.Body) 302 if err != nil { 303 return nil, err 304 } 305 306 return b, nil 307} 308 309func (s *sourcegraphClient) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) { 310 repos, err := s.listRepoIDs(ctx, 
indexed) 311 if err != nil { 312 return nil, fmt.Errorf("listRepoIDs: %w", err) 313 } 314 315 batchSize := s.BatchSize 316 if batchSize == 0 { 317 batchSize = 10_000 318 } 319 320 // Check if we should recalculate everything. 321 if time.Now().After(s.configFingerprintReset) { 322 // for every 500 repos we wait a minute. 2021-12-15 on sourcegraph.com 323 // this works out to every 100 minutes. 324 next := time.Duration(len(indexed)) * time.Minute / 500 325 if min := 5 * time.Minute; next < min { 326 next = min 327 } 328 next += time.Duration(rand.Int63n(int64(next) / 4)) // jitter 329 s.configFingerprintReset = time.Now().Add(next) 330 331 s.configFingerprintProto = nil 332 s.configFingerprint = "" 333 } 334 335 // getIndexOptionsFunc is a function that can be used to get the index 336 // options for a set of repos (while properly handling any configuration fingerprint 337 // changes). 338 // 339 // In general, this function provides a consistent fingerprint for each batch call, 340 // and updates the server state with the new fingerprint. If any of the batch calls 341 // fail, the old fingerprint is restored. 342 type getIndexOptionsFunc func(repos ...uint32) ([]indexOptionsItem, error) 343 344 // default to REST 345 mkGetIndexOptionsFunc := func(tr trace.Trace) getIndexOptionsFunc { 346 startingFingerPrint := s.configFingerprint 347 tr.LazyPrintf("fingerprint: %s", startingFingerPrint) 348 349 first := true 350 return func(repos ...uint32) ([]indexOptionsItem, error) { 351 options, nextFingerPrint, err := s.getIndexOptionsREST(startingFingerPrint, repos...) 352 if err != nil { 353 first = false 354 s.configFingerprint = startingFingerPrint 355 356 return nil, err 357 } 358 359 if first { 360 first = false 361 s.configFingerprint = nextFingerPrint 362 363 tr.LazyPrintf("new fingerprint: %s", nextFingerPrint) 364 } 365 366 return options, nil 367 } 368 } 369 370 // If we enabled GRPC, use our gRPC client instead. 
371 if s.useGRPC { 372 mkGetIndexOptionsFunc = func(tr trace.Trace) getIndexOptionsFunc { 373 startingFingerPrint := s.configFingerprintProto 374 tr.LazyPrintf("fingerprint: %s", startingFingerPrint.String()) 375 376 first := true 377 return func(repos ...uint32) ([]indexOptionsItem, error) { 378 options, nextFingerPrint, err := s.getIndexOptionsGRPC(ctx, startingFingerPrint, repos) 379 if err != nil { 380 first = false 381 s.configFingerprintProto = startingFingerPrint 382 383 return nil, err 384 } 385 386 if first { 387 first = false 388 s.configFingerprintProto = nextFingerPrint 389 tr.LazyPrintf("new fingerprint: %s", nextFingerPrint.String()) 390 } 391 392 return options, nil 393 } 394 } 395 } 396 397 iterate := func(f func(IndexOptions)) { 398 start := time.Now() 399 tr := trace.New("getIndexOptions", "") 400 tr.LazyPrintf("getting index options for %d repos", len(repos)) 401 402 defer func() { 403 metricResolveRevisionsDuration.Observe(time.Since(start).Seconds()) 404 tr.Finish() 405 }() 406 407 getIndexOptions := mkGetIndexOptionsFunc(tr) 408 409 // We ask the frontend to get index options in batches. 410 for repos := range batched(repos, batchSize) { 411 start := time.Now() 412 options, err := getIndexOptions(repos...) 
413 duration := time.Since(start) 414 415 if err != nil { 416 metricResolveRevisionDuration.WithLabelValues("false").Observe(duration.Seconds()) 417 tr.LazyPrintf("failed fetching options batch: %v", err) 418 tr.SetError() 419 420 continue 421 } 422 423 metricResolveRevisionDuration.WithLabelValues("true").Observe(duration.Seconds()) 424 425 for _, o := range options { 426 metricGetIndexOptions.Inc() 427 428 if o.Error != "" { 429 metricGetIndexOptionsError.Inc() 430 tr.LazyPrintf("failed fetching options for %v: %v", o.Name, o.Error) 431 tr.SetError() 432 433 continue 434 } 435 f(o.IndexOptions) 436 } 437 } 438 } 439 440 return &SourcegraphListResult{ 441 IDs: repos, 442 IterateIndexOptions: iterate, 443 }, nil 444} 445 446func (s *sourcegraphClient) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) { 447 batchSize := s.BatchSize 448 if batchSize == 0 { 449 batchSize = 10_000 450 } 451 452 getIndexOptions := func(repos ...uint32) ([]indexOptionsItem, error) { 453 opts, _, err := s.getIndexOptionsREST("", repos...) 454 return opts, err 455 } 456 457 if s.useGRPC { 458 getIndexOptions = func(repos ...uint32) ([]indexOptionsItem, error) { 459 opts, _, err := s.getIndexOptionsGRPC(context.Background(), nil, repos) 460 return opts, err 461 } 462 } 463 464 for repos := range batched(repos, batchSize) { 465 opts, err := getIndexOptions(repos...) 466 if err != nil { 467 for _, id := range repos { 468 onError(id, err) 469 } 470 continue 471 } 472 for _, o := range opts { 473 if o.RepoID > 0 && o.Error != "" { 474 onError(o.RepoID, errors.New(o.Error)) 475 } 476 if o.Error == "" { 477 onSuccess(o.IndexOptions) 478 } 479 } 480 } 481} 482 483// indexOptionsItem wraps IndexOptions to also include an error returned by 484// the API. 
485type indexOptionsItem struct { 486 IndexOptions 487 Error string 488} 489 490func (o *indexOptionsItem) FromProto(x *proto.ZoektIndexOptions) { 491 branches := make([]zoekt.RepositoryBranch, 0, len(x.Branches)) 492 for _, b := range x.GetBranches() { 493 branches = append(branches, zoekt.RepositoryBranch{ 494 Name: b.GetName(), 495 Version: b.GetVersion(), 496 }) 497 } 498 499 item := indexOptionsItem{} 500 languageMap := make(map[string]uint8) 501 502 for _, lang := range x.GetLanguageMap() { 503 languageMap[lang.GetLanguage()] = uint8(lang.GetCtags().Number()) 504 } 505 506 item.IndexOptions = IndexOptions{ 507 RepoID: uint32(x.GetRepoId()), 508 LargeFiles: x.GetLargeFiles(), 509 Symbols: x.GetSymbols(), 510 Branches: branches, 511 Name: x.GetName(), 512 513 Priority: x.GetPriority(), 514 515 DocumentRanksVersion: x.GetDocumentRanksVersion(), 516 517 Public: x.GetPublic(), 518 Fork: x.GetFork(), 519 Archived: x.GetArchived(), 520 521 LanguageMap: languageMap, 522 } 523 524 item.Error = x.GetError() 525 526 *o = item 527} 528 529func (o *indexOptionsItem) ToProto() *proto.ZoektIndexOptions { 530 branches := make([]*proto.ZoektRepositoryBranch, 0, len(o.Branches)) 531 for _, b := range o.Branches { 532 branches = append(branches, &proto.ZoektRepositoryBranch{ 533 Name: b.Name, 534 Version: b.Version, 535 }) 536 } 537 538 languageMap := make([]*proto.LanguageMapping, 0, len(o.LanguageMap)) 539 540 for lang, parser := range o.LanguageMap { 541 languageMap = append(languageMap, &proto.LanguageMapping{ 542 Language: lang, 543 Ctags: proto.CTagsParserType(parser), 544 }) 545 } 546 547 return &proto.ZoektIndexOptions{ 548 RepoId: int32(o.RepoID), 549 LargeFiles: o.LargeFiles, 550 Symbols: o.Symbols, 551 Branches: branches, 552 Name: o.Name, 553 554 Priority: o.Priority, 555 556 DocumentRanksVersion: o.DocumentRanksVersion, 557 558 Public: o.Public, 559 Fork: o.Fork, 560 Archived: o.Archived, 561 562 Error: o.Error, 563 564 LanguageMap: languageMap, 565 } 566} 567 
568func (s *sourcegraphClient) getIndexOptionsGRPC(ctx context.Context, fingerprint *proto.Fingerprint, repos []uint32) ([]indexOptionsItem, *proto.Fingerprint, error) { 569 repoIDs := make([]int32, 0, len(repos)) 570 for _, id := range repos { 571 repoIDs = append(repoIDs, int32(id)) 572 } 573 574 req := proto.SearchConfigurationRequest{ 575 RepoIds: repoIDs, 576 Fingerprint: fingerprint, 577 } 578 579 response, err := s.grpcClient.SearchConfiguration(ctx, &req) 580 if err != nil { 581 return nil, nil, err 582 } 583 584 protoItems := response.GetUpdatedOptions() 585 items := make([]indexOptionsItem, 0, len(protoItems)) 586 for _, x := range protoItems { 587 var item indexOptionsItem 588 item.FromProto(x) 589 item.IndexOptions.CloneURL = s.getCloneURL(item.Name) 590 591 items = append(items, item) 592 } 593 594 return items, response.GetFingerprint(), nil 595} 596 597const fingerprintHeader = "X-Sourcegraph-Config-Fingerprint" 598 599func (s *sourcegraphClient) getIndexOptionsREST(fingerprint string, repos ...uint32) ([]indexOptionsItem, string, error) { 600 u := s.Root.ResolveReference(&url.URL{ 601 Path: "/.internal/search/configuration", 602 }) 603 604 repoIDs := make([]string, len(repos)) 605 for i, id := range repos { 606 repoIDs[i] = strconv.Itoa(int(id)) 607 } 608 data := url.Values{"repoID": repoIDs} 609 req, err := retryablehttp.NewRequest("POST", u.String(), []byte(data.Encode())) 610 if err != nil { 611 return nil, "", err 612 } 613 req.Header.Set("Content-Type", "application/x-www-form-urlencoded") 614 if fingerprint != "" { 615 req.Header.Set(fingerprintHeader, fingerprint) 616 } 617 618 resp, err := s.doRequest(req) 619 if err != nil { 620 return nil, "", err 621 } 622 defer resp.Body.Close() 623 624 if resp.StatusCode != http.StatusOK { 625 b, err := io.ReadAll(io.LimitReader(resp.Body, 1024)) 626 _ = resp.Body.Close() 627 if err != nil { 628 return nil, "", err 629 } 630 return nil, "", &url.Error{ 631 Op: "Get", 632 URL: u.String(), 633 Err: 
fmt.Errorf("%s: %s", resp.Status, string(b)), 634 } 635 } 636 637 dec := json.NewDecoder(resp.Body) 638 var opts []indexOptionsItem 639 for { 640 var opt indexOptionsItem 641 err := dec.Decode(&opt) 642 if err == io.EOF { 643 break 644 } 645 if err != nil { 646 return nil, "", fmt.Errorf("error decoding body: %w", err) 647 } 648 opt.CloneURL = s.getCloneURL(opt.Name) 649 opts = append(opts, opt) 650 } 651 652 return opts, resp.Header.Get(fingerprintHeader), nil 653} 654 655func (s *sourcegraphClient) getCloneURL(name string) string { 656 return s.Root.ResolveReference(&url.URL{Path: path.Join("/.internal/git", name)}).String() 657} 658 659func (s *sourcegraphClient) listRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) { 660 if s.useGRPC { 661 return s.listRepoIDsGRPC(ctx, indexed) 662 } 663 664 return s.listRepoIDsREST(ctx, indexed) 665} 666 667func (s *sourcegraphClient) listRepoIDsGRPC(ctx context.Context, indexed []uint32) ([]uint32, error) { 668 var request proto.ListRequest 669 request.Hostname = s.Hostname 670 request.IndexedIds = make([]int32, 0, len(indexed)) 671 for _, id := range indexed { 672 request.IndexedIds = append(request.IndexedIds, int32(id)) 673 } 674 675 response, err := s.grpcClient.List(ctx, &request) 676 if err != nil { 677 return nil, err 678 } 679 680 repoIDs := make([]uint32, 0, len(response.RepoIds)) 681 for _, id := range response.RepoIds { 682 repoIDs = append(repoIDs, uint32(id)) 683 } 684 685 return repoIDs, nil 686} 687 688func (s *sourcegraphClient) listRepoIDsREST(_ context.Context, indexed []uint32) ([]uint32, error) { 689 body, err := json.Marshal(&struct { 690 Hostname string 691 IndexedIDs []uint32 692 }{ 693 Hostname: s.Hostname, 694 IndexedIDs: indexed, 695 }) 696 if err != nil { 697 return nil, err 698 } 699 700 u := s.Root.ResolveReference(&url.URL{Path: "/.internal/repos/index"}) 701 req, err := retryablehttp.NewRequest(http.MethodPost, u.String(), bytes.NewReader(body)) 702 if err != nil { 703 return nil, 
err 704 } 705 req.Header.Set("Content-Type", "application/json; charset=utf-8") 706 707 resp, err := s.doRequest(req) 708 if err != nil { 709 return nil, err 710 } 711 defer resp.Body.Close() 712 713 if resp.StatusCode != http.StatusOK { 714 return nil, fmt.Errorf("failed to list repositories: status %s", resp.Status) 715 } 716 717 var data struct { 718 RepoIDs []uint32 719 } 720 err = json.NewDecoder(resp.Body).Decode(&data) 721 if err != nil { 722 return nil, err 723 } 724 725 return data.RepoIDs, nil 726} 727 728type indexStatus struct { 729 RepoID uint32 730 Branches []zoekt.RepositoryBranch 731 IndexTimeUnix int64 732} 733 734type updateIndexStatusRequest struct { 735 Repositories []indexStatus 736} 737 738func (u *updateIndexStatusRequest) ToProto() *proto.UpdateIndexStatusRequest { 739 repositories := make([]*proto.UpdateIndexStatusRequest_Repository, 0, len(u.Repositories)) 740 741 for _, repo := range u.Repositories { 742 branches := make([]*proto.ZoektRepositoryBranch, 0, len(repo.Branches)) 743 744 for _, branch := range repo.Branches { 745 branches = append(branches, &proto.ZoektRepositoryBranch{ 746 Name: branch.Name, 747 Version: branch.Version, 748 }) 749 } 750 751 repositories = append(repositories, &proto.UpdateIndexStatusRequest_Repository{ 752 RepoId: repo.RepoID, 753 Branches: branches, 754 IndexTimeUnix: repo.IndexTimeUnix, 755 }) 756 } 757 758 return &proto.UpdateIndexStatusRequest{ 759 Repositories: repositories, 760 } 761} 762 763func (u *updateIndexStatusRequest) FromProto(x *proto.UpdateIndexStatusRequest) { 764 protoRepositories := x.GetRepositories() 765 repositories := make([]indexStatus, 0, len(protoRepositories)) 766 767 for _, repo := range x.GetRepositories() { 768 protoBranches := repo.GetBranches() 769 branches := make([]zoekt.RepositoryBranch, 0, len(protoBranches)) 770 771 for _, branch := range repo.GetBranches() { 772 branches = append(branches, zoekt.RepositoryBranch{ 773 Name: branch.GetName(), 774 Version: 
branch.GetVersion(), 775 }) 776 } 777 778 repositories = append(repositories, indexStatus{ 779 RepoID: repo.GetRepoId(), 780 Branches: branches, 781 IndexTimeUnix: repo.GetIndexTimeUnix(), 782 }) 783 } 784 785 *u = updateIndexStatusRequest{ 786 Repositories: repositories, 787 } 788} 789 790// UpdateIndexStatus sends a request to Sourcegraph to confirm that the given 791// repositories have been indexed. 792func (s *sourcegraphClient) UpdateIndexStatus(repositories []indexStatus) error { 793 r := updateIndexStatusRequest{Repositories: repositories} 794 795 if s.useGRPC { 796 return s.updateIndexStatusGRPC(r) 797 } 798 799 return s.updateIndexStatusREST(r) 800} 801 802func (s *sourcegraphClient) updateIndexStatusGRPC(r updateIndexStatusRequest) error { 803 request := r.ToProto() 804 _, err := s.grpcClient.UpdateIndexStatus(context.Background(), request) 805 806 if err != nil { 807 return fmt.Errorf("failed to update index status: %w", err) 808 } 809 810 return nil 811} 812 813func (s *sourcegraphClient) updateIndexStatusREST(r updateIndexStatusRequest) error { 814 payload, err := json.Marshal(r) 815 if err != nil { 816 return err 817 } 818 819 u := s.Root.ResolveReference(&url.URL{Path: "/.internal/search/index-status"}) 820 req, err := retryablehttp.NewRequest(http.MethodPost, u.String(), bytes.NewReader(payload)) 821 if err != nil { 822 return err 823 } 824 req.Header.Set("Content-Type", "application/json; charset=utf-8") 825 826 resp, err := s.doRequest(req) 827 if err != nil { 828 return err 829 } 830 defer resp.Body.Close() 831 832 if resp.StatusCode != http.StatusOK { 833 return fmt.Errorf("failed to update index status: status %s", resp.Status) 834 } 835 836 return nil 837} 838 839// doRequest executes the provided request after adding the appropriate headers 840// for interacting with a Sourcegraph instance. 841func (s *sourcegraphClient) doRequest(req *retryablehttp.Request) (*http.Response, error) { 842 // Make all requests as an internal user. 
843 // 844 // Should match github.com/sourcegraph/sourcegraph/internal/actor.headerKeyActorUID 845 // and github.com/sourcegraph/sourcegraph/internal/actor.headerValueInternalActor 846 req.Header.Set("X-Sourcegraph-Actor-UID", "internal") 847 return s.restClient.Do(req) 848} 849 850type sourcegraphFake struct { 851 RootDir string 852 Log *log.Logger 853} 854 855// GetRepoRank expects a file with exactly 1 line containing a comma separated 856// list of float64 as ranks. 857func (sf sourcegraphFake) GetRepoRank(ctx context.Context, repoName string) ([]float64, error) { 858 dir := filepath.Join(sf.RootDir, filepath.FromSlash(repoName)) 859 860 b, err := os.ReadFile(filepath.Join(dir, "SG_REPO_RANKS")) 861 if err != nil { 862 return nil, err 863 } 864 865 return floats64(string(b)), nil 866} 867 868// GetDocumentRanks expects a file where each line has the following format: 869// path<tab>rank... where rank is a float64. 870func (sf sourcegraphFake) GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error) { 871 dir := filepath.Join(sf.RootDir, filepath.FromSlash(repoName)) 872 873 fd, err := os.Open(filepath.Join(dir, "SG_DOCUMENT_RANKS")) 874 if err != nil { 875 return RepoPathRanks{}, err 876 } 877 878 ranks := RepoPathRanks{} 879 880 sum := 0.0 881 count := 0 882 scanner := bufio.NewScanner(fd) 883 for scanner.Scan() { 884 s := scanner.Text() 885 pathRanks := strings.Split(s, "\t") 886 if rank, err := strconv.ParseFloat(pathRanks[1], 64); err == nil { 887 ranks.Paths[pathRanks[0]] = rank 888 sum += rank 889 count++ 890 } 891 } 892 893 if err := scanner.Err(); err != nil { 894 return RepoPathRanks{}, err 895 } 896 897 ranks.MeanRank = sum / float64(count) 898 return ranks, nil 899} 900 901func floats64(s string) []float64 { 902 parts := strings.Split(s, ",") 903 904 var r []float64 905 for _, rank := range parts { 906 f, err := strconv.ParseFloat(rank, 64) 907 if err != nil { 908 return nil 909 } 910 r = append(r, f) 911 } 912 913 return r 914} 
915 916func (sf sourcegraphFake) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) { 917 repos, err := sf.ListRepoIDs(ctx, indexed) 918 if err != nil { 919 return nil, err 920 } 921 922 iterate := func(f func(IndexOptions)) { 923 opts, err := sf.GetIndexOptions(repos...) 924 if err != nil { 925 sf.Log.Printf("WARN: ignoring GetIndexOptions error: %v", err) 926 } 927 for _, opt := range opts { 928 if opt.Error != "" { 929 sf.Log.Printf("WARN: ignoring GetIndexOptions error for %s: %v", opt.Name, opt.Error) 930 continue 931 } 932 f(opt.IndexOptions) 933 } 934 } 935 936 return &SourcegraphListResult{ 937 IDs: repos, 938 IterateIndexOptions: iterate, 939 }, nil 940} 941 942func (sf sourcegraphFake) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) { 943 opts, err := sf.GetIndexOptions(repos...) 944 if err != nil { 945 for _, id := range repos { 946 onError(id, err) 947 } 948 return 949 } 950 for _, o := range opts { 951 if o.RepoID > 0 && o.Error != "" { 952 onError(o.RepoID, errors.New(o.Error)) 953 } 954 if o.Error == "" { 955 onSuccess(o.IndexOptions) 956 } 957 } 958} 959 960func (sf sourcegraphFake) GetIndexOptions(repos ...uint32) ([]indexOptionsItem, error) { 961 reposIdx := map[uint32]int{} 962 for i, id := range repos { 963 reposIdx[id] = i 964 } 965 966 items := make([]indexOptionsItem, len(repos)) 967 err := sf.visitRepos(func(name string) { 968 idx, ok := reposIdx[sf.id(name)] 969 if !ok { 970 return 971 } 972 opts, err := sf.getIndexOptions(name) 973 if err != nil { 974 items[idx] = indexOptionsItem{Error: err.Error()} 975 } else { 976 items[idx] = indexOptionsItem{IndexOptions: opts} 977 } 978 }) 979 980 if err != nil { 981 return nil, err 982 } 983 984 for i := range items { 985 if items[i].Error == "" && items[i].RepoID == 0 { 986 items[i].Error = "not found" 987 } 988 } 989 990 return items, nil 991} 992 993func (sf sourcegraphFake) getIndexOptions(name string) (IndexOptions, 
error) { 994 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name)) 995 exists := func(p string) bool { 996 _, err := os.Stat(filepath.Join(dir, p)) 997 return err == nil 998 } 999 float := func(p string) float64 { 1000 b, _ := os.ReadFile(filepath.Join(dir, p)) 1001 f, _ := strconv.ParseFloat(string(bytes.TrimSpace(b)), 64) 1002 return f 1003 } 1004 1005 opts := IndexOptions{ 1006 RepoID: sf.id(name), 1007 Name: name, 1008 CloneURL: sf.getCloneURL(name), 1009 Symbols: true, 1010 1011 Public: !exists("SG_PRIVATE"), 1012 Fork: exists("SG_FORK"), 1013 Archived: exists("SG_ARCHIVED"), 1014 1015 Priority: float("SG_PRIORITY"), 1016 } 1017 1018 if stat, err := os.Stat(filepath.Join(dir, "SG_DOCUMENT_RANKS")); err == nil { 1019 opts.DocumentRanksVersion = stat.ModTime().String() 1020 } 1021 1022 branches, err := sf.getBranches(name) 1023 if err != nil { 1024 return opts, err 1025 } 1026 opts.Branches = branches 1027 1028 return opts, nil 1029} 1030 1031func (sf sourcegraphFake) getBranches(name string) ([]zoekt.RepositoryBranch, error) { 1032 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name)) 1033 repo, err := git.PlainOpen(dir) 1034 if err != nil { 1035 return nil, err 1036 } 1037 1038 cfg, err := repo.Config() 1039 if err != nil { 1040 return nil, err 1041 } 1042 1043 sec := cfg.Raw.Section("zoekt") 1044 branches := sec.Options.GetAll("branch") 1045 if len(branches) == 0 { 1046 branches = append(branches, "HEAD") 1047 } 1048 1049 rBranches := make([]zoekt.RepositoryBranch, 0, len(branches)) 1050 for _, branch := range branches { 1051 cmd := exec.Command("git", "rev-parse", branch) 1052 cmd.Dir = dir 1053 if b, err := cmd.Output(); err != nil { 1054 sf.Log.Printf("WARN: Could not get branch %s/%s", name, branch) 1055 } else { 1056 version := string(bytes.TrimSpace(b)) 1057 rBranches = append(rBranches, zoekt.RepositoryBranch{ 1058 Name: branch, 1059 Version: version, 1060 }) 1061 } 1062 } 1063 1064 if len(rBranches) == 0 { 1065 return nil, fmt.Errorf("WARN: 
Could not get any branch revisions for repo %s", name) 1066 } 1067 1068 return rBranches, nil 1069} 1070 1071func (sf sourcegraphFake) id(name string) uint32 { 1072 // allow overriding the ID. 1073 idPath := filepath.Join(sf.RootDir, filepath.FromSlash(name), "SG_ID") 1074 if b, _ := os.ReadFile(idPath); len(b) > 0 { 1075 id, err := strconv.Atoi(strings.TrimSpace(string(b))) 1076 if err == nil { 1077 return uint32(id) 1078 } 1079 } 1080 return fakeID(name) 1081} 1082 1083func (sf sourcegraphFake) getCloneURL(name string) string { 1084 return filepath.Join(sf.RootDir, filepath.FromSlash(name)) 1085} 1086 1087func (sf sourcegraphFake) ListRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) { 1088 var repos []uint32 1089 err := sf.visitRepos(func(name string) { 1090 repos = append(repos, sf.id(name)) 1091 }) 1092 return repos, err 1093} 1094 1095func (sf sourcegraphFake) visitRepos(visit func(name string)) error { 1096 return filepath.Walk(sf.RootDir, func(path string, fi os.FileInfo, fileErr error) error { 1097 if fileErr != nil { 1098 sf.Log.Printf("WARN: ignoring error searching %s: %v", path, fileErr) 1099 return nil 1100 } 1101 if !fi.IsDir() { 1102 return nil 1103 } 1104 1105 gitdir := filepath.Join(path, ".git") 1106 if fi, err := os.Stat(gitdir); err != nil || !fi.IsDir() { 1107 return nil 1108 } 1109 1110 subpath, err := filepath.Rel(sf.RootDir, path) 1111 if err != nil { 1112 // According to WalkFunc docs, path is always filepath.Join(root, 1113 // subpath). So Rel should always work. 1114 return fmt.Errorf("filepath.Walk returned %s which is not relative to %s: %w", path, sf.RootDir, err) 1115 } 1116 1117 name := filepath.ToSlash(subpath) 1118 visit(name) 1119 1120 return filepath.SkipDir 1121 }) 1122} 1123 1124func (s sourcegraphFake) UpdateIndexStatus(repositories []indexStatus) error { 1125 // noop 1126 return nil 1127} 1128 1129// fakeID returns a deterministic ID based on name. Used for fakes and tests. 
1130func fakeID(name string) uint32 { 1131 // magic at the end is to ensure we get a positive number when casting. 1132 return uint32(crc32.ChecksumIEEE([]byte(name))%(1<<31-1) + 1) 1133} 1134 1135type sourcegraphNop struct{} 1136 1137func (s sourcegraphNop) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) { 1138 return nil, nil 1139} 1140 1141func (s sourcegraphNop) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) { 1142 return 1143} 1144 1145func (s sourcegraphNop) GetRepoRank(ctx context.Context, repoName string) ([]float64, error) { 1146 return nil, nil 1147} 1148 1149func (s sourcegraphNop) GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error) { 1150 return RepoPathRanks{}, nil 1151} 1152 1153func (s sourcegraphNop) UpdateIndexStatus(repositories []indexStatus) error { 1154 return nil 1155} 1156 1157type RepoPathRanks struct { 1158 MeanRank float64 `json:"mean_reference_count"` 1159 Paths map[string]float64 `json:"paths"` 1160} 1161 1162func (r *RepoPathRanks) FromProto(x *proto.DocumentRanksResponse) { 1163 protoPaths := x.GetPaths() 1164 ranks := make(map[string]float64, len(protoPaths)) 1165 for filePath, rank := range protoPaths { 1166 ranks[filePath] = rank 1167 } 1168 1169 *r = RepoPathRanks{ 1170 MeanRank: x.GetMeanRank(), 1171 Paths: ranks, 1172 } 1173} 1174 1175func (r *RepoPathRanks) ToProto() *proto.DocumentRanksResponse { 1176 paths := make(map[string]float64, len(r.Paths)) 1177 for filePath, rank := range r.Paths { 1178 paths[filePath] = rank 1179 } 1180 1181 return &proto.DocumentRanksResponse{ 1182 MeanRank: r.MeanRank, 1183 Paths: paths, 1184 } 1185} 1186 1187type noopGRPCClient struct{} 1188 1189func (n noopGRPCClient) SearchConfiguration(ctx context.Context, in *proto.SearchConfigurationRequest, opts ...grpc.CallOption) (*proto.SearchConfigurationResponse, error) { 1190 return nil, fmt.Errorf("grpc client not enabled") 1191} 1192 1193func (n 
noopGRPCClient) List(ctx context.Context, in *proto.ListRequest, opts ...grpc.CallOption) (*proto.ListResponse, error) { 1194 return nil, fmt.Errorf("grpc client not enabled") 1195} 1196 1197func (n noopGRPCClient) RepositoryRank(ctx context.Context, in *proto.RepositoryRankRequest, opts ...grpc.CallOption) (*proto.RepositoryRankResponse, error) { 1198 return nil, fmt.Errorf("grpc client not enabled") 1199} 1200 1201func (n noopGRPCClient) DocumentRanks(ctx context.Context, in *proto.DocumentRanksRequest, opts ...grpc.CallOption) (*proto.DocumentRanksResponse, error) { 1202 return nil, fmt.Errorf("grpc client not enabled") 1203} 1204 1205func (n noopGRPCClient) UpdateIndexStatus(ctx context.Context, in *proto.UpdateIndexStatusRequest, opts ...grpc.CallOption) (*proto.UpdateIndexStatusResponse, error) { 1206 return nil, fmt.Errorf("grpc client not enabled") 1207} 1208 1209var _ proto.ZoektConfigurationServiceClient = noopGRPCClient{}