fork of https://github.com/sourcegraph/zoekt
1package main
2
3import (
4 "bufio"
5 "bytes"
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "hash/crc32"
11 "io"
12 "log"
13 "math/rand"
14 "net/http"
15 "net/url"
16 "os"
17 "os/exec"
18 "path"
19 "path/filepath"
20 "strconv"
21 "strings"
22 "time"
23
24 "github.com/go-git/go-git/v5"
25 retryablehttp "github.com/hashicorp/go-retryablehttp"
26 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
27 "github.com/sourcegraph/zoekt/ctags"
28 "golang.org/x/net/trace"
29 "google.golang.org/grpc"
30
31 "github.com/sourcegraph/zoekt"
32)
33
// SourcegraphListResult is the return value of Sourcegraph.List. It is its
// own type since internally we batch the calculation of index options. This
// is exposed via IterateIndexOptions.
//
// This type has state and is coupled to the Sourcegraph implementation.
type SourcegraphListResult struct {
	// IDs is the set of Sourcegraph repository IDs that this replica needs
	// to index.
	IDs []uint32

	// IterateIndexOptions resolves, on a best-effort basis, the IndexOptions
	// for the repositories in IDs and passes each resolved value to the
	// supplied callback. If resolving a repository fails, the failure is
	// logged internally and that repository is skipped. It uses the "config
	// fingerprint" to reduce the amount of work done. This means we only
	// resolve options for repositories which have been mutated since the last
	// Sourcegraph.List call.
	//
	// Note: this has the side effect of setting the "config fingerprint". The
	// config fingerprint means we only calculate index options for
	// repositories that have changed since the last call to
	// IterateIndexOptions. If you want to force calculation of index options
	// use Sourcegraph.ForceIterateIndexOptions.
	//
	// Note: This should not be called concurrently with the Sourcegraph client.
	IterateIndexOptions func(func(IndexOptions))
}
59
// Sourcegraph represents the Sourcegraph service. It informs the indexserver
// what to index and which options to use.
type Sourcegraph interface {
	// List returns a list of repository IDs to index as well as a facility to
	// fetch the indexing options. indexed is the set of repository IDs the
	// caller reports as already indexed.
	//
	// Note: The return value is not safe to use concurrently with future calls
	// to List.
	List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error)

	// ForceIterateIndexOptions will best-effort calculate the index options for
	// all repos. For each repo it will call either onSuccess or onError. This
	// is the forced version of IterateIndexOptions, so it will always calculate
	// options for each id in repos.
	ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32)

	// GetDocumentRanks returns a map from paths within the given repo to their
	// rank vectors. Paths are assumed to be ordered by each pairwise component
	// of the resulting vector, with higher ranks coming earlier.
	GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error)

	// UpdateIndexStatus sends a request to Sourcegraph to confirm that the
	// given repositories have been indexed.
	UpdateIndexStatus(repositories []indexStatus) error
}
85
// SourcegraphClientOption is a functional option that mutates a
// sourcegraphClient while it is being constructed by newSourcegraphClient.
type SourcegraphClientOption func(*sourcegraphClient)
87
88// WithBatchSize controls how many repository configurations we request a time.
89// If BatchSize is 0, we default to requesting 10,000 repositories at once.
90func WithBatchSize(batchSize int) SourcegraphClientOption {
91 return func(c *sourcegraphClient) {
92 c.BatchSize = batchSize
93 }
94}
95
96// WithShouldUseGRPC enables or disables the use of gRPC for communicating with Sourcegraph.
97func WithShouldUseGRPC(useGRPC bool) SourcegraphClientOption {
98 return func(c *sourcegraphClient) {
99 c.useGRPC = useGRPC
100 }
101}
102
103// WithGRPCClient sets the gRPC client to use for communicating with Sourcegraph.
104func WithGRPCClient(client proto.ZoektConfigurationServiceClient) SourcegraphClientOption {
105 return func(c *sourcegraphClient) {
106 c.grpcClient = client
107 }
108}
109
110func newSourcegraphClient(rootURL *url.URL, hostname string, opts ...SourcegraphClientOption) *sourcegraphClient {
111 httpClient := retryablehttp.NewClient()
112 httpClient.Logger = debug
113
114 // Sourcegraph might return an error message in the body if StatusCode==500. The
115 // default behavior of the go-retryablehttp restClient is to drain the body and not
116 // to propagate the error. Hence, we call ErrorPropagatedRetryPolicy instead of
117 // DefaultRetryPolicy and augment the error with the response body if possible.
118 httpClient.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) {
119 shouldRetry, checkErr := retryablehttp.ErrorPropagatedRetryPolicy(ctx, resp, err)
120
121 if resp != nil && resp.StatusCode == http.StatusInternalServerError {
122 if b, e := io.ReadAll(resp.Body); e == nil {
123 checkErr = fmt.Errorf("%w: body=%q", checkErr, string(b))
124 }
125 }
126
127 return shouldRetry, checkErr
128 }
129
130 client := &sourcegraphClient{
131 Root: rootURL,
132 restClient: httpClient,
133 Hostname: hostname,
134 BatchSize: 0,
135 grpcClient: noopGRPCClient{},
136 useGRPC: false, // disable gRPC by default
137 }
138
139 for _, opt := range opts {
140 opt(client)
141 }
142
143 return client
144
145}
146
// sourcegraphClient contains methods which interact with the Sourcegraph API,
// either over REST (restClient) or over gRPC (grpcClient) depending on
// useGRPC.
type sourcegraphClient struct {
	// Root is the base URL for the Sourcegraph instance to index. Normally
	// http://sourcegraph-frontend-internal or http://localhost:3090.
	Root *url.URL

	// Hostname is the name we advertise to Sourcegraph when asking for the
	// list of repositories to index.
	Hostname string

	// BatchSize is how many repository configurations we request at once. If
	// zero a value of 10000 is used.
	BatchSize int

	// restClient is used to make requests to the Sourcegraph instance. Prefer to
	// use .doRequest() to ensure the appropriate headers are set.
	restClient *retryablehttp.Client

	// grpcClient is used to make requests to the Sourcegraph instance if gRPC is enabled.
	grpcClient proto.ZoektConfigurationServiceClient

	// configFingerprint is the last config fingerprint returned from
	// Sourcegraph. It can be used for future calls to the configuration
	// endpoint.
	//
	// configFingerprint is mutually exclusive with configFingerprintProto - this field
	// will only be used if gRPC is disabled.
	configFingerprint string

	// configFingerprintProto is the last config fingerprint (as a gRPC message)
	// returned from Sourcegraph. It can be used for future calls to the
	// configuration endpoint.
	//
	// configFingerprintProto is mutually exclusive with configFingerprint - this field
	// will only be used if gRPC is enabled.
	configFingerprintProto *proto.Fingerprint

	// configFingerprintReset tracks when we should zero out the
	// configFingerprint. We want to periodically do this just in case our
	// configFingerprint logic is faulty. When it is cleared out, we fall back
	// to calculating everything.
	configFingerprintReset time.Time

	// useGRPC indicates whether we should use a gRPC client to communicate with Sourcegraph.
	useGRPC bool
}
193
194// GetDocumentRanks asks Sourcegraph for a mapping of file paths to rank
195// vectors.
196func (s *sourcegraphClient) GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error) {
197 if s.useGRPC {
198 return s.getDocumentRanksGRPC(ctx, repoName)
199 }
200
201 return s.getDocumentRanksREST(ctx, repoName)
202}
203
204func (s *sourcegraphClient) getDocumentRanksGRPC(ctx context.Context, repoName string) (RepoPathRanks, error) {
205 resp, err := s.grpcClient.DocumentRanks(ctx, &proto.DocumentRanksRequest{Repository: repoName})
206 if err != nil {
207 return RepoPathRanks{}, err
208 }
209
210 var out RepoPathRanks
211 out.FromProto(resp)
212
213 return out, nil
214}
215
216func (s *sourcegraphClient) getDocumentRanksREST(ctx context.Context, repoName string) (RepoPathRanks, error) {
217 u := s.Root.ResolveReference(&url.URL{
218 Path: "/.internal/ranks/" + strings.Trim(repoName, "/") + "/documents",
219 })
220
221 b, err := s.get(ctx, u)
222 if err != nil {
223 return RepoPathRanks{}, err
224 }
225
226 ranks := RepoPathRanks{}
227 err = json.Unmarshal(b, &ranks)
228 if err != nil {
229 return RepoPathRanks{}, err
230 }
231
232 return ranks, nil
233}
234
235func (s *sourcegraphClient) get(ctx context.Context, u *url.URL) ([]byte, error) {
236 req, err := retryablehttp.NewRequestWithContext(ctx, "GET", u.String(), nil)
237 if err != nil {
238 return nil, err
239 }
240
241 resp, err := s.doRequest(req)
242 if err != nil {
243 return nil, err
244 }
245 defer resp.Body.Close()
246
247 if resp.StatusCode != http.StatusOK {
248 b, err := io.ReadAll(io.LimitReader(resp.Body, 1024))
249 _ = resp.Body.Close()
250 if err != nil {
251 return nil, err
252 }
253 return nil, &url.Error{
254 Op: "Get",
255 URL: u.String(),
256 Err: fmt.Errorf("%s: %s", resp.Status, string(b)),
257 }
258 }
259
260 b, err := io.ReadAll(resp.Body)
261 if err != nil {
262 return nil, err
263 }
264
265 return b, nil
266}
267
// List fetches the IDs of the repositories this replica should index (via
// listRepoIDs) and returns them together with an IterateIndexOptions closure
// that lazily resolves their index options in batches of BatchSize (10,000
// when BatchSize is zero).
//
// As a side effect, List clears both config fingerprints once
// configFingerprintReset has passed and schedules the next reset. The
// returned closure reads and updates the fingerprint on s when it is invoked,
// which is why it must not be used concurrently with other calls on s.
func (s *sourcegraphClient) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) {
	repos, err := s.listRepoIDs(ctx, indexed)
	if err != nil {
		return nil, fmt.Errorf("listRepoIDs: %w", err)
	}

	batchSize := s.BatchSize
	if batchSize == 0 {
		batchSize = 10_000
	}

	// Check if we should recalculate everything.
	if time.Now().After(s.configFingerprintReset) {
		// for every 500 repos we wait a minute. 2021-12-15 on sourcegraph.com
		// this works out to every 100 minutes.
		next := time.Duration(len(indexed)) * time.Minute / 500
		// Never reset more often than every five minutes. This lower bound
		// also guarantees the argument to rand.Int63n below is positive.
		if min := 5 * time.Minute; next < min {
			next = min
		}
		next += time.Duration(rand.Int63n(int64(next) / 4)) // jitter
		s.configFingerprintReset = time.Now().Add(next)

		// Dropping both fingerprints forces the next request to resolve
		// options for every repository.
		s.configFingerprintProto = nil
		s.configFingerprint = ""
	}

	// getIndexOptionsFunc is a function that can be used to get the index
	// options for a set of repos (while properly handling any configuration fingerprint
	// changes).
	//
	// In general, this function provides a consistent fingerprint for each batch call,
	// and updates the server state with the new fingerprint. If any of the batch calls
	// fail, the old fingerprint is restored.
	type getIndexOptionsFunc func(repos ...uint32) ([]indexOptionsItem, error)

	// default to REST
	mkGetIndexOptionsFunc := func(tr trace.Trace) getIndexOptionsFunc {
		// Every batch of one iteration is sent with the fingerprint that was
		// current when the iteration started.
		startingFingerPrint := s.configFingerprint
		tr.LazyPrintf("fingerprint: %s", startingFingerPrint)

		first := true
		return func(repos ...uint32) ([]indexOptionsItem, error) {
			options, nextFingerPrint, err := s.getIndexOptionsREST(startingFingerPrint, repos...)
			if err != nil {
				// Roll back to the starting fingerprint so the repositories
				// of the failed batch are requested again next time. Clearing
				// first also stops a later successful batch from advancing
				// the fingerprint.
				first = false
				s.configFingerprint = startingFingerPrint

				return nil, err
			}

			// Only the first successful batch (with no earlier failure)
			// records the new fingerprint.
			if first {
				first = false
				s.configFingerprint = nextFingerPrint

				tr.LazyPrintf("new fingerprint: %s", nextFingerPrint)
			}

			return options, nil
		}
	}

	// If we enabled GRPC, use our gRPC client instead.
	if s.useGRPC {
		// Same fingerprint handling as the REST variant above, but using
		// configFingerprintProto. Note that the ctx passed to List is captured
		// and used for every batch request made by the returned closure.
		mkGetIndexOptionsFunc = func(tr trace.Trace) getIndexOptionsFunc {
			startingFingerPrint := s.configFingerprintProto
			tr.LazyPrintf("fingerprint: %s", startingFingerPrint.String())

			first := true
			return func(repos ...uint32) ([]indexOptionsItem, error) {
				options, nextFingerPrint, err := s.getIndexOptionsGRPC(ctx, startingFingerPrint, repos)
				if err != nil {
					first = false
					s.configFingerprintProto = startingFingerPrint

					return nil, err
				}

				if first {
					first = false
					s.configFingerprintProto = nextFingerPrint
					tr.LazyPrintf("new fingerprint: %s", nextFingerPrint.String())
				}

				return options, nil
			}
		}
	}

	// iterate is handed to the caller as IterateIndexOptions. It calls f once
	// for every repository whose options were returned without an error.
	iterate := func(f func(IndexOptions)) {
		start := time.Now()
		tr := trace.New("getIndexOptions", "")
		tr.LazyPrintf("getting index options for %d repos", len(repos))

		defer func() {
			metricResolveRevisionsDuration.Observe(time.Since(start).Seconds())
			tr.Finish()
		}()

		getIndexOptions := mkGetIndexOptionsFunc(tr)

		// We ask the frontend to get index options in batches.
		for repos := range batched(repos, batchSize) {
			start := time.Now()
			options, err := getIndexOptions(repos...)
			duration := time.Since(start)

			if err != nil {
				// A failed batch is recorded on the trace and skipped; the
				// remaining batches are still attempted.
				metricResolveRevisionDuration.WithLabelValues("false").Observe(duration.Seconds())
				tr.LazyPrintf("failed fetching options batch: %v", err)
				tr.SetError()

				continue
			}

			metricResolveRevisionDuration.WithLabelValues("true").Observe(duration.Seconds())

			for _, o := range options {
				metricGetIndexOptions.Inc()

				// Per-repository errors are counted and traced, and the
				// repository is not passed to f.
				if o.Error != "" {
					metricGetIndexOptionsError.Inc()
					tr.LazyPrintf("failed fetching options for %v: %v", o.Name, o.Error)
					tr.SetError()

					continue
				}
				f(o.IndexOptions)
			}
		}
	}

	return &SourcegraphListResult{
		IDs:                 repos,
		IterateIndexOptions: iterate,
	}, nil
}
404
405func (s *sourcegraphClient) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) {
406 batchSize := s.BatchSize
407 if batchSize == 0 {
408 batchSize = 10_000
409 }
410
411 getIndexOptions := func(repos ...uint32) ([]indexOptionsItem, error) {
412 opts, _, err := s.getIndexOptionsREST("", repos...)
413 return opts, err
414 }
415
416 if s.useGRPC {
417 getIndexOptions = func(repos ...uint32) ([]indexOptionsItem, error) {
418 opts, _, err := s.getIndexOptionsGRPC(context.Background(), nil, repos)
419 return opts, err
420 }
421 }
422
423 for repos := range batched(repos, batchSize) {
424 opts, err := getIndexOptions(repos...)
425 if err != nil {
426 for _, id := range repos {
427 onError(id, err)
428 }
429 continue
430 }
431 for _, o := range opts {
432 if o.RepoID > 0 && o.Error != "" {
433 onError(o.RepoID, errors.New(o.Error))
434 }
435 if o.Error == "" {
436 onSuccess(o.IndexOptions)
437 }
438 }
439 }
440}
441
// indexOptionsItem wraps IndexOptions to also include an error returned by
// the API.
type indexOptionsItem struct {
	// IndexOptions holds the resolved options for one repository.
	IndexOptions
	// Error is the per-repository error message reported by the API. It is
	// empty when the options were resolved successfully.
	Error string
}
448
449func (o *indexOptionsItem) FromProto(x *proto.ZoektIndexOptions) {
450 branches := make([]zoekt.RepositoryBranch, 0, len(x.Branches))
451 for _, b := range x.GetBranches() {
452 branches = append(branches, zoekt.RepositoryBranch{
453 Name: b.GetName(),
454 Version: b.GetVersion(),
455 })
456 }
457
458 item := indexOptionsItem{}
459 languageMap := make(map[string]ctags.CTagsParserType)
460
461 for _, lang := range x.GetLanguageMap() {
462 languageMap[lang.GetLanguage()] = ctags.CTagsParserType(lang.GetCtags().Number())
463 }
464
465 item.IndexOptions = IndexOptions{
466 RepoID: uint32(x.GetRepoId()),
467 LargeFiles: x.GetLargeFiles(),
468 Symbols: x.GetSymbols(),
469 Branches: branches,
470 Name: x.GetName(),
471
472 Priority: x.GetPriority(),
473
474 DocumentRanksVersion: x.GetDocumentRanksVersion(),
475
476 Public: x.GetPublic(),
477 Fork: x.GetFork(),
478 Archived: x.GetArchived(),
479
480 LanguageMap: languageMap,
481 }
482
483 item.Error = x.GetError()
484
485 *o = item
486}
487
488func (o *indexOptionsItem) ToProto() *proto.ZoektIndexOptions {
489 branches := make([]*proto.ZoektRepositoryBranch, 0, len(o.Branches))
490 for _, b := range o.Branches {
491 branches = append(branches, &proto.ZoektRepositoryBranch{
492 Name: b.Name,
493 Version: b.Version,
494 })
495 }
496
497 languageMap := make([]*proto.LanguageMapping, 0, len(o.LanguageMap))
498
499 for lang, parser := range o.LanguageMap {
500 languageMap = append(languageMap, &proto.LanguageMapping{
501 Language: lang,
502 Ctags: proto.CTagsParserType(parser),
503 })
504 }
505
506 return &proto.ZoektIndexOptions{
507 RepoId: int32(o.RepoID),
508 LargeFiles: o.LargeFiles,
509 Symbols: o.Symbols,
510 Branches: branches,
511 Name: o.Name,
512
513 Priority: o.Priority,
514
515 DocumentRanksVersion: o.DocumentRanksVersion,
516
517 Public: o.Public,
518 Fork: o.Fork,
519 Archived: o.Archived,
520
521 Error: o.Error,
522
523 LanguageMap: languageMap,
524 }
525}
526
527func (s *sourcegraphClient) getIndexOptionsGRPC(ctx context.Context, fingerprint *proto.Fingerprint, repos []uint32) ([]indexOptionsItem, *proto.Fingerprint, error) {
528 repoIDs := make([]int32, 0, len(repos))
529 for _, id := range repos {
530 repoIDs = append(repoIDs, int32(id))
531 }
532
533 req := proto.SearchConfigurationRequest{
534 RepoIds: repoIDs,
535 Fingerprint: fingerprint,
536 }
537
538 response, err := s.grpcClient.SearchConfiguration(ctx, &req)
539 if err != nil {
540 return nil, nil, err
541 }
542
543 protoItems := response.GetUpdatedOptions()
544 items := make([]indexOptionsItem, 0, len(protoItems))
545 for _, x := range protoItems {
546 var item indexOptionsItem
547 item.FromProto(x)
548 item.IndexOptions.CloneURL = s.getCloneURL(item.Name)
549
550 items = append(items, item)
551 }
552
553 return items, response.GetFingerprint(), nil
554}
555
// fingerprintHeader is the HTTP header that carries the config fingerprint on
// REST search-configuration requests and responses.
const fingerprintHeader = "X-Sourcegraph-Config-Fingerprint"
557
558func (s *sourcegraphClient) getIndexOptionsREST(fingerprint string, repos ...uint32) ([]indexOptionsItem, string, error) {
559 u := s.Root.ResolveReference(&url.URL{
560 Path: "/.internal/search/configuration",
561 })
562
563 repoIDs := make([]string, len(repos))
564 for i, id := range repos {
565 repoIDs[i] = strconv.Itoa(int(id))
566 }
567 data := url.Values{"repoID": repoIDs}
568 req, err := retryablehttp.NewRequest("POST", u.String(), []byte(data.Encode()))
569 if err != nil {
570 return nil, "", err
571 }
572 req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
573 if fingerprint != "" {
574 req.Header.Set(fingerprintHeader, fingerprint)
575 }
576
577 resp, err := s.doRequest(req)
578 if err != nil {
579 return nil, "", err
580 }
581 defer resp.Body.Close()
582
583 if resp.StatusCode != http.StatusOK {
584 b, err := io.ReadAll(io.LimitReader(resp.Body, 1024))
585 _ = resp.Body.Close()
586 if err != nil {
587 return nil, "", err
588 }
589 return nil, "", &url.Error{
590 Op: "Get",
591 URL: u.String(),
592 Err: fmt.Errorf("%s: %s", resp.Status, string(b)),
593 }
594 }
595
596 dec := json.NewDecoder(resp.Body)
597 var opts []indexOptionsItem
598 for {
599 var opt indexOptionsItem
600 err := dec.Decode(&opt)
601 if err == io.EOF {
602 break
603 }
604 if err != nil {
605 return nil, "", fmt.Errorf("error decoding body: %w", err)
606 }
607 opt.CloneURL = s.getCloneURL(opt.Name)
608 opts = append(opts, opt)
609 }
610
611 return opts, resp.Header.Get(fingerprintHeader), nil
612}
613
614func (s *sourcegraphClient) getCloneURL(name string) string {
615 return s.Root.ResolveReference(&url.URL{Path: path.Join("/.internal/git", name)}).String()
616}
617
618func (s *sourcegraphClient) listRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) {
619 if s.useGRPC {
620 return s.listRepoIDsGRPC(ctx, indexed)
621 }
622
623 return s.listRepoIDsREST(ctx, indexed)
624}
625
626func (s *sourcegraphClient) listRepoIDsGRPC(ctx context.Context, indexed []uint32) ([]uint32, error) {
627 var request proto.ListRequest
628 request.Hostname = s.Hostname
629 request.IndexedIds = make([]int32, 0, len(indexed))
630 for _, id := range indexed {
631 request.IndexedIds = append(request.IndexedIds, int32(id))
632 }
633
634 response, err := s.grpcClient.List(ctx, &request)
635 if err != nil {
636 return nil, err
637 }
638
639 repoIDs := make([]uint32, 0, len(response.RepoIds))
640 for _, id := range response.RepoIds {
641 repoIDs = append(repoIDs, uint32(id))
642 }
643
644 return repoIDs, nil
645}
646
647func (s *sourcegraphClient) listRepoIDsREST(_ context.Context, indexed []uint32) ([]uint32, error) {
648 body, err := json.Marshal(&struct {
649 Hostname string
650 IndexedIDs []uint32
651 }{
652 Hostname: s.Hostname,
653 IndexedIDs: indexed,
654 })
655 if err != nil {
656 return nil, err
657 }
658
659 u := s.Root.ResolveReference(&url.URL{Path: "/.internal/repos/index"})
660 req, err := retryablehttp.NewRequest(http.MethodPost, u.String(), bytes.NewReader(body))
661 if err != nil {
662 return nil, err
663 }
664 req.Header.Set("Content-Type", "application/json; charset=utf-8")
665
666 resp, err := s.doRequest(req)
667 if err != nil {
668 return nil, err
669 }
670 defer resp.Body.Close()
671
672 if resp.StatusCode != http.StatusOK {
673 return nil, fmt.Errorf("failed to list repositories: status %s", resp.Status)
674 }
675
676 var data struct {
677 RepoIDs []uint32
678 }
679 err = json.NewDecoder(resp.Body).Decode(&data)
680 if err != nil {
681 return nil, err
682 }
683
684 return data.RepoIDs, nil
685}
686
// indexStatus describes one indexed repository as reported to Sourcegraph by
// UpdateIndexStatus. The REST transport marshals it to JSON with its default
// field names.
type indexStatus struct {
	// RepoID is the Sourcegraph repository ID.
	RepoID uint32
	// Branches lists the branches of the repository and their versions.
	Branches []zoekt.RepositoryBranch
	// IndexTimeUnix is the index time; presumably seconds since the Unix
	// epoch, judging by the name - TODO confirm against the producer.
	IndexTimeUnix int64
}
692
// updateIndexStatusRequest is the payload sent to Sourcegraph by
// UpdateIndexStatus, either marshalled to JSON (REST) or converted with
// ToProto (gRPC).
type updateIndexStatusRequest struct {
	// Repositories holds one entry per repository being reported.
	Repositories []indexStatus
}
696
697func (u *updateIndexStatusRequest) ToProto() *proto.UpdateIndexStatusRequest {
698 repositories := make([]*proto.UpdateIndexStatusRequest_Repository, 0, len(u.Repositories))
699
700 for _, repo := range u.Repositories {
701 branches := make([]*proto.ZoektRepositoryBranch, 0, len(repo.Branches))
702
703 for _, branch := range repo.Branches {
704 branches = append(branches, &proto.ZoektRepositoryBranch{
705 Name: branch.Name,
706 Version: branch.Version,
707 })
708 }
709
710 repositories = append(repositories, &proto.UpdateIndexStatusRequest_Repository{
711 RepoId: repo.RepoID,
712 Branches: branches,
713 IndexTimeUnix: repo.IndexTimeUnix,
714 })
715 }
716
717 return &proto.UpdateIndexStatusRequest{
718 Repositories: repositories,
719 }
720}
721
722func (u *updateIndexStatusRequest) FromProto(x *proto.UpdateIndexStatusRequest) {
723 protoRepositories := x.GetRepositories()
724 repositories := make([]indexStatus, 0, len(protoRepositories))
725
726 for _, repo := range x.GetRepositories() {
727 protoBranches := repo.GetBranches()
728 branches := make([]zoekt.RepositoryBranch, 0, len(protoBranches))
729
730 for _, branch := range repo.GetBranches() {
731 branches = append(branches, zoekt.RepositoryBranch{
732 Name: branch.GetName(),
733 Version: branch.GetVersion(),
734 })
735 }
736
737 repositories = append(repositories, indexStatus{
738 RepoID: repo.GetRepoId(),
739 Branches: branches,
740 IndexTimeUnix: repo.GetIndexTimeUnix(),
741 })
742 }
743
744 *u = updateIndexStatusRequest{
745 Repositories: repositories,
746 }
747}
748
749// UpdateIndexStatus sends a request to Sourcegraph to confirm that the given
750// repositories have been indexed.
751func (s *sourcegraphClient) UpdateIndexStatus(repositories []indexStatus) error {
752 r := updateIndexStatusRequest{Repositories: repositories}
753
754 if s.useGRPC {
755 return s.updateIndexStatusGRPC(r)
756 }
757
758 return s.updateIndexStatusREST(r)
759}
760
761func (s *sourcegraphClient) updateIndexStatusGRPC(r updateIndexStatusRequest) error {
762 request := r.ToProto()
763 _, err := s.grpcClient.UpdateIndexStatus(context.Background(), request)
764
765 if err != nil {
766 return fmt.Errorf("failed to update index status: %w", err)
767 }
768
769 return nil
770}
771
772func (s *sourcegraphClient) updateIndexStatusREST(r updateIndexStatusRequest) error {
773 payload, err := json.Marshal(r)
774 if err != nil {
775 return err
776 }
777
778 u := s.Root.ResolveReference(&url.URL{Path: "/.internal/search/index-status"})
779 req, err := retryablehttp.NewRequest(http.MethodPost, u.String(), bytes.NewReader(payload))
780 if err != nil {
781 return err
782 }
783 req.Header.Set("Content-Type", "application/json; charset=utf-8")
784
785 resp, err := s.doRequest(req)
786 if err != nil {
787 return err
788 }
789 defer resp.Body.Close()
790
791 if resp.StatusCode != http.StatusOK {
792 return fmt.Errorf("failed to update index status: status %s", resp.Status)
793 }
794
795 return nil
796}
797
798// doRequest executes the provided request after adding the appropriate headers
799// for interacting with a Sourcegraph instance.
800func (s *sourcegraphClient) doRequest(req *retryablehttp.Request) (*http.Response, error) {
801 // Make all requests as an internal user.
802 //
803 // Should match github.com/sourcegraph/sourcegraph/internal/actor.headerKeyActorUID
804 // and github.com/sourcegraph/sourcegraph/internal/actor.headerValueInternalActor
805 req.Header.Set("X-Sourcegraph-Actor-UID", "internal")
806 return s.restClient.Do(req)
807}
808
// sourcegraphFake is a Sourcegraph implementation backed by the local file
// system: every directory below RootDir that contains a .git directory is
// treated as a repository, and marker files such as SG_ID or SG_PRIVATE
// inside it adjust the reported options.
type sourcegraphFake struct {
	// RootDir is the directory that is walked to discover repositories.
	RootDir string
	// Log receives warnings about repositories or branches that could not be
	// read.
	Log *log.Logger
}
813
814// GetDocumentRanks expects a file where each line has the following format:
815// path<tab>rank... where rank is a float64.
816func (sf sourcegraphFake) GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error) {
817 dir := filepath.Join(sf.RootDir, filepath.FromSlash(repoName))
818
819 fd, err := os.Open(filepath.Join(dir, "SG_DOCUMENT_RANKS"))
820 if err != nil {
821 return RepoPathRanks{}, err
822 }
823
824 ranks := RepoPathRanks{}
825
826 sum := 0.0
827 count := 0
828 scanner := bufio.NewScanner(fd)
829 for scanner.Scan() {
830 s := scanner.Text()
831 pathRanks := strings.Split(s, "\t")
832 if rank, err := strconv.ParseFloat(pathRanks[1], 64); err == nil {
833 ranks.Paths[pathRanks[0]] = rank
834 sum += rank
835 count++
836 }
837 }
838
839 if err := scanner.Err(); err != nil {
840 return RepoPathRanks{}, err
841 }
842
843 ranks.MeanRank = sum / float64(count)
844 return ranks, nil
845}
846
// floats64 parses s as a comma separated list of float64 values. It returns
// nil as soon as any element fails to parse, which includes the empty string
// and elements with surrounding whitespace.
func floats64(s string) []float64 {
	fields := strings.Split(s, ",")
	values := make([]float64, 0, len(fields))

	for i := range fields {
		v, err := strconv.ParseFloat(fields[i], 64)
		if err != nil {
			return nil
		}
		values = append(values, v)
	}

	return values
}
861
862func (sf sourcegraphFake) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) {
863 repos, err := sf.ListRepoIDs(ctx, indexed)
864 if err != nil {
865 return nil, err
866 }
867
868 iterate := func(f func(IndexOptions)) {
869 opts, err := sf.GetIndexOptions(repos...)
870 if err != nil {
871 sf.Log.Printf("WARN: ignoring GetIndexOptions error: %v", err)
872 }
873 for _, opt := range opts {
874 if opt.Error != "" {
875 sf.Log.Printf("WARN: ignoring GetIndexOptions error for %s: %v", opt.Name, opt.Error)
876 continue
877 }
878 f(opt.IndexOptions)
879 }
880 }
881
882 return &SourcegraphListResult{
883 IDs: repos,
884 IterateIndexOptions: iterate,
885 }, nil
886}
887
888func (sf sourcegraphFake) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) {
889 opts, err := sf.GetIndexOptions(repos...)
890 if err != nil {
891 for _, id := range repos {
892 onError(id, err)
893 }
894 return
895 }
896 for _, o := range opts {
897 if o.RepoID > 0 && o.Error != "" {
898 onError(o.RepoID, errors.New(o.Error))
899 }
900 if o.Error == "" {
901 onSuccess(o.IndexOptions)
902 }
903 }
904}
905
906func (sf sourcegraphFake) GetIndexOptions(repos ...uint32) ([]indexOptionsItem, error) {
907 reposIdx := map[uint32]int{}
908 for i, id := range repos {
909 reposIdx[id] = i
910 }
911
912 items := make([]indexOptionsItem, len(repos))
913 err := sf.visitRepos(func(name string) {
914 idx, ok := reposIdx[sf.id(name)]
915 if !ok {
916 return
917 }
918 opts, err := sf.getIndexOptions(name)
919 if err != nil {
920 items[idx] = indexOptionsItem{Error: err.Error()}
921 } else {
922 items[idx] = indexOptionsItem{IndexOptions: opts}
923 }
924 })
925
926 if err != nil {
927 return nil, err
928 }
929
930 for i := range items {
931 if items[i].Error == "" && items[i].RepoID == 0 {
932 items[i].Error = "not found"
933 }
934 }
935
936 return items, nil
937}
938
939func (sf sourcegraphFake) getIndexOptions(name string) (IndexOptions, error) {
940 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name))
941 exists := func(p string) bool {
942 _, err := os.Stat(filepath.Join(dir, p))
943 return err == nil
944 }
945 float := func(p string) float64 {
946 b, _ := os.ReadFile(filepath.Join(dir, p))
947 f, _ := strconv.ParseFloat(string(bytes.TrimSpace(b)), 64)
948 return f
949 }
950
951 opts := IndexOptions{
952 RepoID: sf.id(name),
953 Name: name,
954 CloneURL: sf.getCloneURL(name),
955 Symbols: true,
956
957 Public: !exists("SG_PRIVATE"),
958 Fork: exists("SG_FORK"),
959 Archived: exists("SG_ARCHIVED"),
960
961 Priority: float("SG_PRIORITY"),
962 }
963
964 if stat, err := os.Stat(filepath.Join(dir, "SG_DOCUMENT_RANKS")); err == nil {
965 opts.DocumentRanksVersion = stat.ModTime().String()
966 }
967
968 branches, err := sf.getBranches(name)
969 if err != nil {
970 return opts, err
971 }
972 opts.Branches = branches
973
974 return opts, nil
975}
976
977func (sf sourcegraphFake) getBranches(name string) ([]zoekt.RepositoryBranch, error) {
978 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name))
979 repo, err := git.PlainOpen(dir)
980 if err != nil {
981 return nil, err
982 }
983
984 cfg, err := repo.Config()
985 if err != nil {
986 return nil, err
987 }
988
989 sec := cfg.Raw.Section("zoekt")
990 branches := sec.Options.GetAll("branch")
991 if len(branches) == 0 {
992 branches = append(branches, "HEAD")
993 }
994
995 rBranches := make([]zoekt.RepositoryBranch, 0, len(branches))
996 for _, branch := range branches {
997 cmd := exec.Command("git", "rev-parse", branch)
998 cmd.Dir = dir
999 if b, err := cmd.Output(); err != nil {
1000 sf.Log.Printf("WARN: Could not get branch %s/%s", name, branch)
1001 } else {
1002 version := string(bytes.TrimSpace(b))
1003 rBranches = append(rBranches, zoekt.RepositoryBranch{
1004 Name: branch,
1005 Version: version,
1006 })
1007 }
1008 }
1009
1010 if len(rBranches) == 0 {
1011 return nil, fmt.Errorf("WARN: Could not get any branch revisions for repo %s", name)
1012 }
1013
1014 return rBranches, nil
1015}
1016
1017func (sf sourcegraphFake) id(name string) uint32 {
1018 // allow overriding the ID.
1019 idPath := filepath.Join(sf.RootDir, filepath.FromSlash(name), "SG_ID")
1020 if b, _ := os.ReadFile(idPath); len(b) > 0 {
1021 id, err := strconv.Atoi(strings.TrimSpace(string(b)))
1022 if err == nil {
1023 return uint32(id)
1024 }
1025 }
1026 return fakeID(name)
1027}
1028
1029func (sf sourcegraphFake) getCloneURL(name string) string {
1030 return filepath.Join(sf.RootDir, filepath.FromSlash(name))
1031}
1032
1033func (sf sourcegraphFake) ListRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) {
1034 var repos []uint32
1035 err := sf.visitRepos(func(name string) {
1036 repos = append(repos, sf.id(name))
1037 })
1038 return repos, err
1039}
1040
1041func (sf sourcegraphFake) visitRepos(visit func(name string)) error {
1042 return filepath.Walk(sf.RootDir, func(path string, fi os.FileInfo, fileErr error) error {
1043 if fileErr != nil {
1044 sf.Log.Printf("WARN: ignoring error searching %s: %v", path, fileErr)
1045 return nil
1046 }
1047 if !fi.IsDir() {
1048 return nil
1049 }
1050
1051 gitdir := filepath.Join(path, ".git")
1052 if fi, err := os.Stat(gitdir); err != nil || !fi.IsDir() {
1053 return nil
1054 }
1055
1056 subpath, err := filepath.Rel(sf.RootDir, path)
1057 if err != nil {
1058 // According to WalkFunc docs, path is always filepath.Join(root,
1059 // subpath). So Rel should always work.
1060 return fmt.Errorf("filepath.Walk returned %s which is not relative to %s: %w", path, sf.RootDir, err)
1061 }
1062
1063 name := filepath.ToSlash(subpath)
1064 visit(name)
1065
1066 return filepath.SkipDir
1067 })
1068}
1069
1070func (s sourcegraphFake) UpdateIndexStatus(repositories []indexStatus) error {
1071 // noop
1072 return nil
1073}
1074
// fakeID returns a deterministic ID based on name. Used for fakes and tests.
// The result is the CRC-32 (IEEE) checksum of name reduced into the range
// [1, 1<<31-1].
func fakeID(name string) uint32 {
	// Reducing modulo 1<<31-1 and adding one keeps the value positive even
	// when it is later cast to a signed 32-bit integer.
	const modulus = 1<<31 - 1

	checksum := crc32.ChecksumIEEE([]byte(name))
	return checksum%modulus + 1
}
1080
1081type sourcegraphNop struct{}
1082
1083func (s sourcegraphNop) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) {
1084 return nil, nil
1085}
1086
1087func (s sourcegraphNop) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) {
1088 return
1089}
1090
1091func (s sourcegraphNop) GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error) {
1092 return RepoPathRanks{}, nil
1093}
1094
1095func (s sourcegraphNop) UpdateIndexStatus(repositories []indexStatus) error {
1096 return nil
1097}
1098
// RepoPathRanks holds the document ranks of one repository as returned by
// GetDocumentRanks.
type RepoPathRanks struct {
	// MeanRank is the mean rank of the repository's documents. It is
	// (de)serialised under the JSON key "mean_reference_count".
	MeanRank float64 `json:"mean_reference_count"`
	// Paths maps a file path within the repository to its rank.
	Paths map[string]float64 `json:"paths"`
}
1103
1104func (r *RepoPathRanks) FromProto(x *proto.DocumentRanksResponse) {
1105 protoPaths := x.GetPaths()
1106 ranks := make(map[string]float64, len(protoPaths))
1107 for filePath, rank := range protoPaths {
1108 ranks[filePath] = rank
1109 }
1110
1111 *r = RepoPathRanks{
1112 MeanRank: x.GetMeanRank(),
1113 Paths: ranks,
1114 }
1115}
1116
1117func (r *RepoPathRanks) ToProto() *proto.DocumentRanksResponse {
1118 paths := make(map[string]float64, len(r.Paths))
1119 for filePath, rank := range r.Paths {
1120 paths[filePath] = rank
1121 }
1122
1123 return &proto.DocumentRanksResponse{
1124 MeanRank: r.MeanRank,
1125 Paths: paths,
1126 }
1127}
1128
1129type noopGRPCClient struct{}
1130
1131func (n noopGRPCClient) SearchConfiguration(ctx context.Context, in *proto.SearchConfigurationRequest, opts ...grpc.CallOption) (*proto.SearchConfigurationResponse, error) {
1132 return nil, fmt.Errorf("grpc client not enabled")
1133}
1134
1135func (n noopGRPCClient) List(ctx context.Context, in *proto.ListRequest, opts ...grpc.CallOption) (*proto.ListResponse, error) {
1136 return nil, fmt.Errorf("grpc client not enabled")
1137}
1138
1139func (n noopGRPCClient) DocumentRanks(ctx context.Context, in *proto.DocumentRanksRequest, opts ...grpc.CallOption) (*proto.DocumentRanksResponse, error) {
1140 return nil, fmt.Errorf("grpc client not enabled")
1141}
1142
1143func (n noopGRPCClient) UpdateIndexStatus(ctx context.Context, in *proto.UpdateIndexStatusRequest, opts ...grpc.CallOption) (*proto.UpdateIndexStatusResponse, error) {
1144 return nil, fmt.Errorf("grpc client not enabled")
1145}
1146
1147var _ proto.ZoektConfigurationServiceClient = noopGRPCClient{}