// Fork of https://github.com/sourcegraph/zoekt
1package main
2
3import (
4 "bufio"
5 "bytes"
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "hash/crc32"
11 "io"
12 "log"
13 "math/rand"
14 "net/http"
15 "net/url"
16 "os"
17 "os/exec"
18 "path"
19 "path/filepath"
20 "strconv"
21 "strings"
22 "time"
23
24 "github.com/go-git/go-git/v5"
25 retryablehttp "github.com/hashicorp/go-retryablehttp"
26 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
27 "github.com/sourcegraph/zoekt/ctags"
28 "golang.org/x/net/trace"
29 "google.golang.org/grpc"
30
31 "github.com/sourcegraph/zoekt"
32)
33
34// SourcegraphListResult is the return value of Sourcegraph.List. It is its
35// own type since internally we batch the calculation of index options. This
36// is exposed via IterateIndexOptions.
37//
38// This type has state and is coupled to the Sourcegraph implementation.
39type SourcegraphListResult struct {
40 // IDs is the set of Sourcegraph repository IDs that this replica needs
41 // to index.
42 IDs []uint32
43
44 // IterateIndexOptions best effort resolves the IndexOptions for RepoIDs. If
45 // any repository fails it internally logs. It uses the "config fingerprint"
46 // to reduce the amount of work done. This means we only resolve options for
47 // repositories which have been mutated since the last Sourcegraph.List
48 // call.
49 //
50 // Note: this has a side-effect of setting a the "config fingerprint". The
51 // config fingerprint means we only calculate index options for repositories
52 // that have changed since the last call to IterateIndexOptions. If you want
53 // to force calculation of index options use
54 // Sourcegraph.ForceIterateIndexOptions.
55 //
56 // Note: This should not be called concurrently with the Sourcegraph client.
57 IterateIndexOptions func(func(IndexOptions))
58}
59
60// Sourcegraph represents the Sourcegraph service. It informs the indexserver
61// what to index and which options to use.
62type Sourcegraph interface {
63 // List returns a list of repository IDs to index as well as a facility to
64 // fetch the indexing options.
65 //
66 // Note: The return value is not safe to use concurrently with future calls
67 // to List.
68 List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error)
69
70 // ForceIterateIndexOptions will best-effort calculate the index options for
71 // all repos. For each repo it will call either onSuccess or onError. This
72 // is the forced version of IterateIndexOptions, so will always calculate
73 // options for each id in repos.
74 ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32)
75
76 // GetDocumentRanks returns a map from paths within the given repo to their
77 // rank vectors. Paths are assumed to be ordered by each pairwise component of
78 // the resulting vector, higher ranks coming earlier
79 GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error)
80
81 // UpdateIndexStatus sends a request to Sourcegraph to confirm that the
82 // given repositories have been indexed.
83 UpdateIndexStatus(repositories []indexStatus) error
84}
85
86type SourcegraphClientOption func(*sourcegraphClient)
87
88// WithBatchSize controls how many repository configurations we request a time.
89// If BatchSize is 0, we default to requesting 10,000 repositories at once.
90func WithBatchSize(batchSize int) SourcegraphClientOption {
91 return func(c *sourcegraphClient) {
92 c.BatchSize = batchSize
93 }
94}
95
96// WithShouldUseGRPC enables or disables the use of gRPC for communicating with Sourcegraph.
97func WithShouldUseGRPC(useGRPC bool) SourcegraphClientOption {
98 return func(c *sourcegraphClient) {
99 c.useGRPC = useGRPC
100 }
101}
102
103// WithGRPCClient sets the gRPC client to use for communicating with Sourcegraph.
104func WithGRPCClient(client proto.ZoektConfigurationServiceClient) SourcegraphClientOption {
105 return func(c *sourcegraphClient) {
106 c.grpcClient = client
107 }
108}
109
110func newSourcegraphClient(rootURL *url.URL, hostname string, opts ...SourcegraphClientOption) *sourcegraphClient {
111 httpClient := retryablehttp.NewClient()
112 httpClient.Logger = debug
113
114 // Sourcegraph might return an error message in the body if StatusCode==500. The
115 // default behavior of the go-retryablehttp restClient is to drain the body and not
116 // to propagate the error. Hence, we call ErrorPropagatedRetryPolicy instead of
117 // DefaultRetryPolicy and augment the error with the response body if possible.
118 httpClient.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) {
119 shouldRetry, checkErr := retryablehttp.ErrorPropagatedRetryPolicy(ctx, resp, err)
120
121 if resp != nil && resp.StatusCode == http.StatusInternalServerError {
122 if b, e := io.ReadAll(resp.Body); e == nil {
123 checkErr = fmt.Errorf("%w: body=%q", checkErr, string(b))
124 }
125 }
126
127 return shouldRetry, checkErr
128 }
129
130 client := &sourcegraphClient{
131 Root: rootURL,
132 restClient: httpClient,
133 Hostname: hostname,
134 BatchSize: 0,
135 grpcClient: noopGRPCClient{},
136 useGRPC: false, // disable gRPC by default
137 }
138
139 for _, opt := range opts {
140 opt(client)
141 }
142
143 return client
144
145}
146
147// sourcegraphClient contains methods which interact with the sourcegraph API.
148type sourcegraphClient struct {
149 // Root is the base URL for the Sourcegraph instance to index. Normally
150 // http://sourcegraph-frontend-internal or http://localhost:3090.
151 Root *url.URL
152
153 // Hostname is the name we advertise to Sourcegraph when asking for the
154 // list of repositories to index.
155 Hostname string
156
157 // BatchSize is how many repository configurations we request at once. If
158 // zero a value of 10000 is used.
159 BatchSize int
160
161 // restClient is used to make requests to the Sourcegraph instance. Prefer to
162 // use .doRequest() to ensure the appropriate headers are set.
163 restClient *retryablehttp.Client
164
165 // grpcClient is used to make requests to the Sourcegraph instance if gRPC is enabled.
166 grpcClient proto.ZoektConfigurationServiceClient
167
168 // configFingerprint is the last config fingerprint returned from
169 // Sourcegraph. It can be used for future calls to the configuration
170 // endpoint.
171 //
172 // configFingerprint is mutually exclusive with configFingerprintProto - this field
173 // will only be used if gRPC is disabled.
174 configFingerprint string
175
176 // configFingerprintProto is the last config fingerprint (as GRPC) returned from
177 // Sourcegraph. It can be used for future calls to the configuration
178 // endpoint.
179 //
180 // configFingerprintProto is mutually exclusive with configFingerprint - this field
181 // will only be used if gRPC is enabled.
182 configFingerprintProto *proto.Fingerprint
183
184 // configFingerprintReset tracks when we should zero out the
185 // configFingerprint. We want to periodically do this just in case our
186 // configFingerprint logic is faulty. When it is cleared out, we fallback to
187 // calculating everything.
188 configFingerprintReset time.Time
189
190 // useGRPC indicates whether we should use a gRPC client to communicate with Sourcegraph.
191 useGRPC bool
192}
193
194// GetDocumentRanks asks Sourcegraph for a mapping of file paths to rank
195// vectors.
196func (s *sourcegraphClient) GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error) {
197 if s.useGRPC {
198 return s.getDocumentRanksGRPC(ctx, repoName)
199 }
200
201 return s.getDocumentRanksREST(ctx, repoName)
202}
203
204func (s *sourcegraphClient) getDocumentRanksGRPC(ctx context.Context, repoName string) (RepoPathRanks, error) {
205 resp, err := s.grpcClient.DocumentRanks(ctx, &proto.DocumentRanksRequest{Repository: repoName})
206 if err != nil {
207 return RepoPathRanks{}, err
208 }
209
210 var out RepoPathRanks
211 out.FromProto(resp)
212
213 return out, nil
214}
215
216func (s *sourcegraphClient) getDocumentRanksREST(ctx context.Context, repoName string) (RepoPathRanks, error) {
217 u := s.Root.ResolveReference(&url.URL{
218 Path: "/.internal/ranks/" + strings.Trim(repoName, "/") + "/documents",
219 })
220
221 b, err := s.get(ctx, u)
222 if err != nil {
223 return RepoPathRanks{}, err
224 }
225
226 ranks := RepoPathRanks{}
227 err = json.Unmarshal(b, &ranks)
228 if err != nil {
229 return RepoPathRanks{}, err
230 }
231
232 return ranks, nil
233}
234
235func (s *sourcegraphClient) get(ctx context.Context, u *url.URL) ([]byte, error) {
236 req, err := retryablehttp.NewRequestWithContext(ctx, "GET", u.String(), nil)
237 if err != nil {
238 return nil, err
239 }
240
241 resp, err := s.doRequest(req)
242 if err != nil {
243 return nil, err
244 }
245 defer resp.Body.Close()
246
247 if resp.StatusCode != http.StatusOK {
248 b, err := io.ReadAll(io.LimitReader(resp.Body, 1024))
249 _ = resp.Body.Close()
250 if err != nil {
251 return nil, err
252 }
253 return nil, &url.Error{
254 Op: "Get",
255 URL: u.String(),
256 Err: fmt.Errorf("%s: %s", resp.Status, string(b)),
257 }
258 }
259
260 b, err := io.ReadAll(resp.Body)
261 if err != nil {
262 return nil, err
263 }
264
265 return b, nil
266}
267
268func (s *sourcegraphClient) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) {
269 repos, err := s.listRepoIDs(ctx, indexed)
270 if err != nil {
271 return nil, fmt.Errorf("listRepoIDs: %w", err)
272 }
273
274 batchSize := s.BatchSize
275 if batchSize == 0 {
276 batchSize = 10_000
277 }
278
279 // Check if we should recalculate everything.
280 if time.Now().After(s.configFingerprintReset) {
281 // for every 500 repos we wait a minute. 2021-12-15 on sourcegraph.com
282 // this works out to every 100 minutes.
283 next := time.Duration(len(indexed)) * time.Minute / 500
284 if min := 5 * time.Minute; next < min {
285 next = min
286 }
287 next += time.Duration(rand.Int63n(int64(next) / 4)) // jitter
288 s.configFingerprintReset = time.Now().Add(next)
289
290 s.configFingerprintProto = nil
291 s.configFingerprint = ""
292 }
293
294 // getIndexOptionsFunc is a function that can be used to get the index
295 // options for a set of repos (while properly handling any configuration fingerprint
296 // changes).
297 //
298 // In general, this function provides a consistent fingerprint for each batch call,
299 // and updates the server state with the new fingerprint. If any of the batch calls
300 // fail, the old fingerprint is restored.
301 type getIndexOptionsFunc func(repos ...uint32) ([]indexOptionsItem, error)
302
303 // default to REST
304 mkGetIndexOptionsFunc := func(tr trace.Trace) getIndexOptionsFunc {
305 startingFingerPrint := s.configFingerprint
306 tr.LazyPrintf("fingerprint: %s", startingFingerPrint)
307
308 first := true
309 return func(repos ...uint32) ([]indexOptionsItem, error) {
310 options, nextFingerPrint, err := s.getIndexOptionsREST(startingFingerPrint, repos...)
311 if err != nil {
312 first = false
313 s.configFingerprint = startingFingerPrint
314
315 return nil, err
316 }
317
318 if first {
319 first = false
320 s.configFingerprint = nextFingerPrint
321
322 tr.LazyPrintf("new fingerprint: %s", nextFingerPrint)
323 }
324
325 return options, nil
326 }
327 }
328
329 // If we enabled GRPC, use our gRPC client instead.
330 if s.useGRPC {
331 mkGetIndexOptionsFunc = func(tr trace.Trace) getIndexOptionsFunc {
332 startingFingerPrint := s.configFingerprintProto
333 tr.LazyPrintf("fingerprint: %s", startingFingerPrint.String())
334
335 first := true
336 return func(repos ...uint32) ([]indexOptionsItem, error) {
337 options, nextFingerPrint, err := s.getIndexOptionsGRPC(ctx, startingFingerPrint, repos)
338 if err != nil {
339 first = false
340 s.configFingerprintProto = startingFingerPrint
341
342 return nil, err
343 }
344
345 if first {
346 first = false
347 s.configFingerprintProto = nextFingerPrint
348 tr.LazyPrintf("new fingerprint: %s", nextFingerPrint.String())
349 }
350
351 return options, nil
352 }
353 }
354 }
355
356 iterate := func(f func(IndexOptions)) {
357 start := time.Now()
358 tr := trace.New("getIndexOptions", "")
359 tr.LazyPrintf("getting index options for %d repos", len(repos))
360
361 defer func() {
362 metricResolveRevisionsDuration.Observe(time.Since(start).Seconds())
363 tr.Finish()
364 }()
365
366 getIndexOptions := mkGetIndexOptionsFunc(tr)
367
368 // We ask the frontend to get index options in batches.
369 for repos := range batched(repos, batchSize) {
370 start := time.Now()
371 options, err := getIndexOptions(repos...)
372 duration := time.Since(start)
373
374 if err != nil {
375 metricResolveRevisionDuration.WithLabelValues("false").Observe(duration.Seconds())
376 tr.LazyPrintf("failed fetching options batch: %v", err)
377 tr.SetError()
378
379 continue
380 }
381
382 metricResolveRevisionDuration.WithLabelValues("true").Observe(duration.Seconds())
383
384 for _, o := range options {
385 metricGetIndexOptions.Inc()
386
387 if o.Error != "" {
388 metricGetIndexOptionsError.Inc()
389 tr.LazyPrintf("failed fetching options for %v: %v", o.Name, o.Error)
390 tr.SetError()
391
392 continue
393 }
394 f(o.IndexOptions)
395 }
396 }
397 }
398
399 return &SourcegraphListResult{
400 IDs: repos,
401 IterateIndexOptions: iterate,
402 }, nil
403}
404
405func (s *sourcegraphClient) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) {
406 batchSize := s.BatchSize
407 if batchSize == 0 {
408 batchSize = 10_000
409 }
410
411 getIndexOptions := func(repos ...uint32) ([]indexOptionsItem, error) {
412 opts, _, err := s.getIndexOptionsREST("", repos...)
413 return opts, err
414 }
415
416 if s.useGRPC {
417 getIndexOptions = func(repos ...uint32) ([]indexOptionsItem, error) {
418 opts, _, err := s.getIndexOptionsGRPC(context.Background(), nil, repos)
419 return opts, err
420 }
421 }
422
423 for repos := range batched(repos, batchSize) {
424 opts, err := getIndexOptions(repos...)
425 if err != nil {
426 for _, id := range repos {
427 onError(id, err)
428 }
429 continue
430 }
431 for _, o := range opts {
432 if o.RepoID > 0 && o.Error != "" {
433 onError(o.RepoID, errors.New(o.Error))
434 }
435 if o.Error == "" {
436 onSuccess(o.IndexOptions)
437 }
438 }
439 }
440}
441
442// indexOptionsItem wraps IndexOptions to also include an error returned by
443// the API.
444type indexOptionsItem struct {
445 IndexOptions
446 Error string
447}
448
449func (o *indexOptionsItem) FromProto(x *proto.ZoektIndexOptions) {
450 branches := make([]zoekt.RepositoryBranch, 0, len(x.Branches))
451 for _, b := range x.GetBranches() {
452 branches = append(branches, zoekt.RepositoryBranch{
453 Name: b.GetName(),
454 Version: b.GetVersion(),
455 })
456 }
457
458 item := indexOptionsItem{}
459 languageMap := make(map[string]ctags.CTagsParserType)
460
461 for _, lang := range x.GetLanguageMap() {
462 languageMap[lang.GetLanguage()] = ctags.CTagsParserType(lang.GetCtags().Number())
463 }
464
465 item.IndexOptions = IndexOptions{
466 RepoID: uint32(x.GetRepoId()),
467 LargeFiles: x.GetLargeFiles(),
468 Symbols: x.GetSymbols(),
469 Branches: branches,
470 Name: x.GetName(),
471
472 Priority: x.GetPriority(),
473
474 DocumentRanksVersion: x.GetDocumentRanksVersion(),
475
476 Public: x.GetPublic(),
477 Fork: x.GetFork(),
478 Archived: x.GetArchived(),
479
480 LanguageMap: languageMap,
481 ShardConcurrency: x.GetShardConcurrency(),
482 }
483
484 item.Error = x.GetError()
485
486 *o = item
487}
488
489func (o *indexOptionsItem) ToProto() *proto.ZoektIndexOptions {
490 branches := make([]*proto.ZoektRepositoryBranch, 0, len(o.Branches))
491 for _, b := range o.Branches {
492 branches = append(branches, &proto.ZoektRepositoryBranch{
493 Name: b.Name,
494 Version: b.Version,
495 })
496 }
497
498 languageMap := make([]*proto.LanguageMapping, 0, len(o.LanguageMap))
499
500 for lang, parser := range o.LanguageMap {
501 languageMap = append(languageMap, &proto.LanguageMapping{
502 Language: lang,
503 Ctags: proto.CTagsParserType(parser),
504 })
505 }
506
507 return &proto.ZoektIndexOptions{
508 RepoId: int32(o.RepoID),
509 LargeFiles: o.LargeFiles,
510 Symbols: o.Symbols,
511 Branches: branches,
512 Name: o.Name,
513
514 Priority: o.Priority,
515
516 DocumentRanksVersion: o.DocumentRanksVersion,
517
518 Public: o.Public,
519 Fork: o.Fork,
520 Archived: o.Archived,
521
522 Error: o.Error,
523
524 LanguageMap: languageMap,
525 ShardConcurrency: o.ShardConcurrency,
526 }
527}
528
529func (s *sourcegraphClient) getIndexOptionsGRPC(ctx context.Context, fingerprint *proto.Fingerprint, repos []uint32) ([]indexOptionsItem, *proto.Fingerprint, error) {
530 repoIDs := make([]int32, 0, len(repos))
531 for _, id := range repos {
532 repoIDs = append(repoIDs, int32(id))
533 }
534
535 req := proto.SearchConfigurationRequest{
536 RepoIds: repoIDs,
537 Fingerprint: fingerprint,
538 }
539
540 response, err := s.grpcClient.SearchConfiguration(ctx, &req)
541 if err != nil {
542 return nil, nil, err
543 }
544
545 protoItems := response.GetUpdatedOptions()
546 items := make([]indexOptionsItem, 0, len(protoItems))
547 for _, x := range protoItems {
548 var item indexOptionsItem
549 item.FromProto(x)
550 item.IndexOptions.CloneURL = s.getCloneURL(item.Name)
551
552 items = append(items, item)
553 }
554
555 return items, response.GetFingerprint(), nil
556}
557
558const fingerprintHeader = "X-Sourcegraph-Config-Fingerprint"
559
560func (s *sourcegraphClient) getIndexOptionsREST(fingerprint string, repos ...uint32) ([]indexOptionsItem, string, error) {
561 u := s.Root.ResolveReference(&url.URL{
562 Path: "/.internal/search/configuration",
563 })
564
565 repoIDs := make([]string, len(repos))
566 for i, id := range repos {
567 repoIDs[i] = strconv.Itoa(int(id))
568 }
569 data := url.Values{"repoID": repoIDs}
570 req, err := retryablehttp.NewRequest("POST", u.String(), []byte(data.Encode()))
571 if err != nil {
572 return nil, "", err
573 }
574 req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
575 if fingerprint != "" {
576 req.Header.Set(fingerprintHeader, fingerprint)
577 }
578
579 resp, err := s.doRequest(req)
580 if err != nil {
581 return nil, "", err
582 }
583 defer resp.Body.Close()
584
585 if resp.StatusCode != http.StatusOK {
586 b, err := io.ReadAll(io.LimitReader(resp.Body, 1024))
587 _ = resp.Body.Close()
588 if err != nil {
589 return nil, "", err
590 }
591 return nil, "", &url.Error{
592 Op: "Get",
593 URL: u.String(),
594 Err: fmt.Errorf("%s: %s", resp.Status, string(b)),
595 }
596 }
597
598 dec := json.NewDecoder(resp.Body)
599 var opts []indexOptionsItem
600 for {
601 var opt indexOptionsItem
602 err := dec.Decode(&opt)
603 if err == io.EOF {
604 break
605 }
606 if err != nil {
607 return nil, "", fmt.Errorf("error decoding body: %w", err)
608 }
609 opt.CloneURL = s.getCloneURL(opt.Name)
610 opts = append(opts, opt)
611 }
612
613 return opts, resp.Header.Get(fingerprintHeader), nil
614}
615
616func (s *sourcegraphClient) getCloneURL(name string) string {
617 return s.Root.ResolveReference(&url.URL{Path: path.Join("/.internal/git", name)}).String()
618}
619
620func (s *sourcegraphClient) listRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) {
621 if s.useGRPC {
622 return s.listRepoIDsGRPC(ctx, indexed)
623 }
624
625 return s.listRepoIDsREST(ctx, indexed)
626}
627
628func (s *sourcegraphClient) listRepoIDsGRPC(ctx context.Context, indexed []uint32) ([]uint32, error) {
629 var request proto.ListRequest
630 request.Hostname = s.Hostname
631 request.IndexedIds = make([]int32, 0, len(indexed))
632 for _, id := range indexed {
633 request.IndexedIds = append(request.IndexedIds, int32(id))
634 }
635
636 response, err := s.grpcClient.List(ctx, &request)
637 if err != nil {
638 return nil, err
639 }
640
641 repoIDs := make([]uint32, 0, len(response.RepoIds))
642 for _, id := range response.RepoIds {
643 repoIDs = append(repoIDs, uint32(id))
644 }
645
646 return repoIDs, nil
647}
648
649func (s *sourcegraphClient) listRepoIDsREST(_ context.Context, indexed []uint32) ([]uint32, error) {
650 body, err := json.Marshal(&struct {
651 Hostname string
652 IndexedIDs []uint32
653 }{
654 Hostname: s.Hostname,
655 IndexedIDs: indexed,
656 })
657 if err != nil {
658 return nil, err
659 }
660
661 u := s.Root.ResolveReference(&url.URL{Path: "/.internal/repos/index"})
662 req, err := retryablehttp.NewRequest(http.MethodPost, u.String(), bytes.NewReader(body))
663 if err != nil {
664 return nil, err
665 }
666 req.Header.Set("Content-Type", "application/json; charset=utf-8")
667
668 resp, err := s.doRequest(req)
669 if err != nil {
670 return nil, err
671 }
672 defer resp.Body.Close()
673
674 if resp.StatusCode != http.StatusOK {
675 return nil, fmt.Errorf("failed to list repositories: status %s", resp.Status)
676 }
677
678 var data struct {
679 RepoIDs []uint32
680 }
681 err = json.NewDecoder(resp.Body).Decode(&data)
682 if err != nil {
683 return nil, err
684 }
685
686 return data.RepoIDs, nil
687}
688
689type indexStatus struct {
690 RepoID uint32
691 Branches []zoekt.RepositoryBranch
692 IndexTimeUnix int64
693}
694
695type updateIndexStatusRequest struct {
696 Repositories []indexStatus
697}
698
699func (u *updateIndexStatusRequest) ToProto() *proto.UpdateIndexStatusRequest {
700 repositories := make([]*proto.UpdateIndexStatusRequest_Repository, 0, len(u.Repositories))
701
702 for _, repo := range u.Repositories {
703 branches := make([]*proto.ZoektRepositoryBranch, 0, len(repo.Branches))
704
705 for _, branch := range repo.Branches {
706 branches = append(branches, &proto.ZoektRepositoryBranch{
707 Name: branch.Name,
708 Version: branch.Version,
709 })
710 }
711
712 repositories = append(repositories, &proto.UpdateIndexStatusRequest_Repository{
713 RepoId: repo.RepoID,
714 Branches: branches,
715 IndexTimeUnix: repo.IndexTimeUnix,
716 })
717 }
718
719 return &proto.UpdateIndexStatusRequest{
720 Repositories: repositories,
721 }
722}
723
724func (u *updateIndexStatusRequest) FromProto(x *proto.UpdateIndexStatusRequest) {
725 protoRepositories := x.GetRepositories()
726 repositories := make([]indexStatus, 0, len(protoRepositories))
727
728 for _, repo := range x.GetRepositories() {
729 protoBranches := repo.GetBranches()
730 branches := make([]zoekt.RepositoryBranch, 0, len(protoBranches))
731
732 for _, branch := range repo.GetBranches() {
733 branches = append(branches, zoekt.RepositoryBranch{
734 Name: branch.GetName(),
735 Version: branch.GetVersion(),
736 })
737 }
738
739 repositories = append(repositories, indexStatus{
740 RepoID: repo.GetRepoId(),
741 Branches: branches,
742 IndexTimeUnix: repo.GetIndexTimeUnix(),
743 })
744 }
745
746 *u = updateIndexStatusRequest{
747 Repositories: repositories,
748 }
749}
750
751// UpdateIndexStatus sends a request to Sourcegraph to confirm that the given
752// repositories have been indexed.
753func (s *sourcegraphClient) UpdateIndexStatus(repositories []indexStatus) error {
754 r := updateIndexStatusRequest{Repositories: repositories}
755
756 if s.useGRPC {
757 return s.updateIndexStatusGRPC(r)
758 }
759
760 return s.updateIndexStatusREST(r)
761}
762
763func (s *sourcegraphClient) updateIndexStatusGRPC(r updateIndexStatusRequest) error {
764 request := r.ToProto()
765 _, err := s.grpcClient.UpdateIndexStatus(context.Background(), request)
766
767 if err != nil {
768 return fmt.Errorf("failed to update index status: %w", err)
769 }
770
771 return nil
772}
773
774func (s *sourcegraphClient) updateIndexStatusREST(r updateIndexStatusRequest) error {
775 payload, err := json.Marshal(r)
776 if err != nil {
777 return err
778 }
779
780 u := s.Root.ResolveReference(&url.URL{Path: "/.internal/search/index-status"})
781 req, err := retryablehttp.NewRequest(http.MethodPost, u.String(), bytes.NewReader(payload))
782 if err != nil {
783 return err
784 }
785 req.Header.Set("Content-Type", "application/json; charset=utf-8")
786
787 resp, err := s.doRequest(req)
788 if err != nil {
789 return err
790 }
791 defer resp.Body.Close()
792
793 if resp.StatusCode != http.StatusOK {
794 return fmt.Errorf("failed to update index status: status %s", resp.Status)
795 }
796
797 return nil
798}
799
800// doRequest executes the provided request after adding the appropriate headers
801// for interacting with a Sourcegraph instance.
802func (s *sourcegraphClient) doRequest(req *retryablehttp.Request) (*http.Response, error) {
803 // Make all requests as an internal user.
804 //
805 // Should match github.com/sourcegraph/sourcegraph/internal/actor.headerKeyActorUID
806 // and github.com/sourcegraph/sourcegraph/internal/actor.headerValueInternalActor
807 req.Header.Set("X-Sourcegraph-Actor-UID", "internal")
808 return s.restClient.Do(req)
809}
810
// sourcegraphFake implements Sourcegraph from a local directory tree: each
// directory under RootDir that contains a .git directory is treated as a
// repository.
type sourcegraphFake struct {
	// RootDir is the directory scanned for repositories.
	RootDir string
	// Log receives warnings about skipped repositories, branches or errors.
	Log *log.Logger
}
815
816// GetDocumentRanks expects a file where each line has the following format:
817// path<tab>rank... where rank is a float64.
818func (sf sourcegraphFake) GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error) {
819 dir := filepath.Join(sf.RootDir, filepath.FromSlash(repoName))
820
821 fd, err := os.Open(filepath.Join(dir, "SG_DOCUMENT_RANKS"))
822 if err != nil {
823 return RepoPathRanks{}, err
824 }
825
826 ranks := RepoPathRanks{}
827
828 sum := 0.0
829 count := 0
830 scanner := bufio.NewScanner(fd)
831 for scanner.Scan() {
832 s := scanner.Text()
833 pathRanks := strings.Split(s, "\t")
834 if rank, err := strconv.ParseFloat(pathRanks[1], 64); err == nil {
835 ranks.Paths[pathRanks[0]] = rank
836 sum += rank
837 count++
838 }
839 }
840
841 if err := scanner.Err(); err != nil {
842 return RepoPathRanks{}, err
843 }
844
845 ranks.MeanRank = sum / float64(count)
846 return ranks, nil
847}
848
// floats64 parses a comma-separated list of floats. It returns nil if any
// element (including an empty one) fails to parse.
func floats64(s string) []float64 {
	var out []float64
	for _, field := range strings.Split(s, ",") {
		v, err := strconv.ParseFloat(field, 64)
		if err != nil {
			return nil
		}
		out = append(out, v)
	}
	return out
}
863
864func (sf sourcegraphFake) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) {
865 repos, err := sf.ListRepoIDs(ctx, indexed)
866 if err != nil {
867 return nil, err
868 }
869
870 iterate := func(f func(IndexOptions)) {
871 opts, err := sf.GetIndexOptions(repos...)
872 if err != nil {
873 sf.Log.Printf("WARN: ignoring GetIndexOptions error: %v", err)
874 }
875 for _, opt := range opts {
876 if opt.Error != "" {
877 sf.Log.Printf("WARN: ignoring GetIndexOptions error for %s: %v", opt.Name, opt.Error)
878 continue
879 }
880 f(opt.IndexOptions)
881 }
882 }
883
884 return &SourcegraphListResult{
885 IDs: repos,
886 IterateIndexOptions: iterate,
887 }, nil
888}
889
890func (sf sourcegraphFake) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) {
891 opts, err := sf.GetIndexOptions(repos...)
892 if err != nil {
893 for _, id := range repos {
894 onError(id, err)
895 }
896 return
897 }
898 for _, o := range opts {
899 if o.RepoID > 0 && o.Error != "" {
900 onError(o.RepoID, errors.New(o.Error))
901 }
902 if o.Error == "" {
903 onSuccess(o.IndexOptions)
904 }
905 }
906}
907
908func (sf sourcegraphFake) GetIndexOptions(repos ...uint32) ([]indexOptionsItem, error) {
909 reposIdx := map[uint32]int{}
910 for i, id := range repos {
911 reposIdx[id] = i
912 }
913
914 items := make([]indexOptionsItem, len(repos))
915 err := sf.visitRepos(func(name string) {
916 idx, ok := reposIdx[sf.id(name)]
917 if !ok {
918 return
919 }
920 opts, err := sf.getIndexOptions(name)
921 if err != nil {
922 items[idx] = indexOptionsItem{Error: err.Error()}
923 } else {
924 items[idx] = indexOptionsItem{IndexOptions: opts}
925 }
926 })
927
928 if err != nil {
929 return nil, err
930 }
931
932 for i := range items {
933 if items[i].Error == "" && items[i].RepoID == 0 {
934 items[i].Error = "not found"
935 }
936 }
937
938 return items, nil
939}
940
941func (sf sourcegraphFake) getIndexOptions(name string) (IndexOptions, error) {
942 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name))
943 exists := func(p string) bool {
944 _, err := os.Stat(filepath.Join(dir, p))
945 return err == nil
946 }
947 float := func(p string) float64 {
948 b, _ := os.ReadFile(filepath.Join(dir, p))
949 f, _ := strconv.ParseFloat(string(bytes.TrimSpace(b)), 64)
950 return f
951 }
952
953 opts := IndexOptions{
954 RepoID: sf.id(name),
955 Name: name,
956 CloneURL: sf.getCloneURL(name),
957 Symbols: true,
958
959 Public: !exists("SG_PRIVATE"),
960 Fork: exists("SG_FORK"),
961 Archived: exists("SG_ARCHIVED"),
962
963 Priority: float("SG_PRIORITY"),
964 }
965
966 if stat, err := os.Stat(filepath.Join(dir, "SG_DOCUMENT_RANKS")); err == nil {
967 opts.DocumentRanksVersion = stat.ModTime().String()
968 }
969
970 branches, err := sf.getBranches(name)
971 if err != nil {
972 return opts, err
973 }
974 opts.Branches = branches
975
976 return opts, nil
977}
978
979func (sf sourcegraphFake) getBranches(name string) ([]zoekt.RepositoryBranch, error) {
980 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name))
981 repo, err := git.PlainOpen(dir)
982 if err != nil {
983 return nil, err
984 }
985
986 cfg, err := repo.Config()
987 if err != nil {
988 return nil, err
989 }
990
991 sec := cfg.Raw.Section("zoekt")
992 branches := sec.Options.GetAll("branch")
993 if len(branches) == 0 {
994 branches = append(branches, "HEAD")
995 }
996
997 rBranches := make([]zoekt.RepositoryBranch, 0, len(branches))
998 for _, branch := range branches {
999 cmd := exec.Command("git", "rev-parse", branch)
1000 cmd.Dir = dir
1001 if b, err := cmd.Output(); err != nil {
1002 sf.Log.Printf("WARN: Could not get branch %s/%s", name, branch)
1003 } else {
1004 version := string(bytes.TrimSpace(b))
1005 rBranches = append(rBranches, zoekt.RepositoryBranch{
1006 Name: branch,
1007 Version: version,
1008 })
1009 }
1010 }
1011
1012 if len(rBranches) == 0 {
1013 return nil, fmt.Errorf("WARN: Could not get any branch revisions for repo %s", name)
1014 }
1015
1016 return rBranches, nil
1017}
1018
1019func (sf sourcegraphFake) id(name string) uint32 {
1020 // allow overriding the ID.
1021 idPath := filepath.Join(sf.RootDir, filepath.FromSlash(name), "SG_ID")
1022 if b, _ := os.ReadFile(idPath); len(b) > 0 {
1023 id, err := strconv.Atoi(strings.TrimSpace(string(b)))
1024 if err == nil {
1025 return uint32(id)
1026 }
1027 }
1028 return fakeID(name)
1029}
1030
1031func (sf sourcegraphFake) getCloneURL(name string) string {
1032 return filepath.Join(sf.RootDir, filepath.FromSlash(name))
1033}
1034
1035func (sf sourcegraphFake) ListRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) {
1036 var repos []uint32
1037 err := sf.visitRepos(func(name string) {
1038 repos = append(repos, sf.id(name))
1039 })
1040 return repos, err
1041}
1042
1043func (sf sourcegraphFake) visitRepos(visit func(name string)) error {
1044 return filepath.Walk(sf.RootDir, func(path string, fi os.FileInfo, fileErr error) error {
1045 if fileErr != nil {
1046 sf.Log.Printf("WARN: ignoring error searching %s: %v", path, fileErr)
1047 return nil
1048 }
1049 if !fi.IsDir() {
1050 return nil
1051 }
1052
1053 gitdir := filepath.Join(path, ".git")
1054 if fi, err := os.Stat(gitdir); err != nil || !fi.IsDir() {
1055 return nil
1056 }
1057
1058 subpath, err := filepath.Rel(sf.RootDir, path)
1059 if err != nil {
1060 // According to WalkFunc docs, path is always filepath.Join(root,
1061 // subpath). So Rel should always work.
1062 return fmt.Errorf("filepath.Walk returned %s which is not relative to %s: %w", path, sf.RootDir, err)
1063 }
1064
1065 name := filepath.ToSlash(subpath)
1066 visit(name)
1067
1068 return filepath.SkipDir
1069 })
1070}
1071
1072func (s sourcegraphFake) UpdateIndexStatus(repositories []indexStatus) error {
1073 // noop
1074 return nil
1075}
1076
// fakeID derives a deterministic repository ID from name for fakes and
// tests. The CRC-32 (IEEE) checksum is folded into [1, 2^31-1], so the ID is
// never 0 and stays positive when converted to int32.
func fakeID(name string) uint32 {
	const modulus = 1<<31 - 1
	sum := crc32.ChecksumIEEE([]byte(name))
	return sum%modulus + 1
}
1082
1083type sourcegraphNop struct{}
1084
1085func (s sourcegraphNop) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) {
1086 return nil, nil
1087}
1088
1089func (s sourcegraphNop) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) {
1090 return
1091}
1092
1093func (s sourcegraphNop) GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error) {
1094 return RepoPathRanks{}, nil
1095}
1096
1097func (s sourcegraphNop) UpdateIndexStatus(repositories []indexStatus) error {
1098 return nil
1099}
1100
1101type RepoPathRanks struct {
1102 MeanRank float64 `json:"mean_reference_count"`
1103 Paths map[string]float64 `json:"paths"`
1104}
1105
1106func (r *RepoPathRanks) FromProto(x *proto.DocumentRanksResponse) {
1107 protoPaths := x.GetPaths()
1108 ranks := make(map[string]float64, len(protoPaths))
1109 for filePath, rank := range protoPaths {
1110 ranks[filePath] = rank
1111 }
1112
1113 *r = RepoPathRanks{
1114 MeanRank: x.GetMeanRank(),
1115 Paths: ranks,
1116 }
1117}
1118
1119func (r *RepoPathRanks) ToProto() *proto.DocumentRanksResponse {
1120 paths := make(map[string]float64, len(r.Paths))
1121 for filePath, rank := range r.Paths {
1122 paths[filePath] = rank
1123 }
1124
1125 return &proto.DocumentRanksResponse{
1126 MeanRank: r.MeanRank,
1127 Paths: paths,
1128 }
1129}
1130
1131type noopGRPCClient struct{}
1132
1133func (n noopGRPCClient) SearchConfiguration(ctx context.Context, in *proto.SearchConfigurationRequest, opts ...grpc.CallOption) (*proto.SearchConfigurationResponse, error) {
1134 return nil, fmt.Errorf("grpc client not enabled")
1135}
1136
1137func (n noopGRPCClient) List(ctx context.Context, in *proto.ListRequest, opts ...grpc.CallOption) (*proto.ListResponse, error) {
1138 return nil, fmt.Errorf("grpc client not enabled")
1139}
1140
1141func (n noopGRPCClient) DocumentRanks(ctx context.Context, in *proto.DocumentRanksRequest, opts ...grpc.CallOption) (*proto.DocumentRanksResponse, error) {
1142 return nil, fmt.Errorf("grpc client not enabled")
1143}
1144
1145func (n noopGRPCClient) UpdateIndexStatus(ctx context.Context, in *proto.UpdateIndexStatusRequest, opts ...grpc.CallOption) (*proto.UpdateIndexStatusResponse, error) {
1146 return nil, fmt.Errorf("grpc client not enabled")
1147}
1148
1149var _ proto.ZoektConfigurationServiceClient = noopGRPCClient{}