fork of https://github.com/sourcegraph/zoekt
1package main
2
3import (
4 "bufio"
5 "bytes"
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "hash/crc32"
11 "io"
12 "log"
13 "math/rand"
14 "net/http"
15 "net/url"
16 "os"
17 "os/exec"
18 "path"
19 "path/filepath"
20 "strconv"
21 "strings"
22 "time"
23
24 "github.com/go-git/go-git/v5"
25 retryablehttp "github.com/hashicorp/go-retryablehttp"
26 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
27 "golang.org/x/net/trace"
28 "google.golang.org/grpc"
29
30 "github.com/sourcegraph/zoekt"
31)
32
// SourcegraphListResult is the return value of Sourcegraph.List. It is its
// own type since internally we batch the calculation of index options. This
// is exposed via IterateIndexOptions.
//
// This type has state and is coupled to the Sourcegraph implementation.
type SourcegraphListResult struct {
	// IDs is the set of Sourcegraph repository IDs that this replica needs
	// to index.
	IDs []uint32

	// IterateIndexOptions best-effort resolves the IndexOptions for IDs,
	// invoking the supplied callback once per successfully resolved
	// repository. If any repository fails it is logged internally. It uses the
	// "config fingerprint" to reduce the amount of work done. This means we
	// only resolve options for repositories which have been mutated since the
	// last Sourcegraph.List call.
	//
	// Note: this has the side-effect of setting the "config fingerprint". The
	// config fingerprint means we only calculate index options for repositories
	// that have changed since the last call to IterateIndexOptions. If you want
	// to force calculation of index options use
	// Sourcegraph.ForceIterateIndexOptions.
	//
	// Note: This should not be called concurrently with the Sourcegraph client.
	IterateIndexOptions func(func(IndexOptions))
}
58
// Sourcegraph represents the Sourcegraph service. It informs the indexserver
// what to index and which options to use.
type Sourcegraph interface {
	// List returns a list of repository IDs to index as well as a facility to
	// fetch the indexing options. The indexed argument is the set of
	// repository IDs that are reported to Sourcegraph as already indexed.
	//
	// Note: The return value is not safe to use concurrently with future calls
	// to List.
	List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error)

	// ForceIterateIndexOptions will best-effort calculate the index options for
	// all repos. For each repo it will call either onSuccess or onError. This
	// is the forced version of IterateIndexOptions, so will always calculate
	// options for each id in repos.
	ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32)

	// GetDocumentRanks returns the ranks of paths within the given repo,
	// together with the mean rank, as a RepoPathRanks value.
	GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error)

	// UpdateIndexStatus sends a request to Sourcegraph to confirm that the
	// given repositories have been indexed.
	UpdateIndexStatus(repositories []indexStatus) error
}
84
85type SourcegraphClientOption func(*sourcegraphClient)
86
87// WithBatchSize controls how many repository configurations we request a time.
88// If BatchSize is 0, we default to requesting 10,000 repositories at once.
89func WithBatchSize(batchSize int) SourcegraphClientOption {
90 return func(c *sourcegraphClient) {
91 c.BatchSize = batchSize
92 }
93}
94
95// WithShouldUseGRPC enables or disables the use of gRPC for communicating with Sourcegraph.
96func WithShouldUseGRPC(useGRPC bool) SourcegraphClientOption {
97 return func(c *sourcegraphClient) {
98 c.useGRPC = useGRPC
99 }
100}
101
102// WithGRPCClient sets the gRPC client to use for communicating with Sourcegraph.
103func WithGRPCClient(client proto.ZoektConfigurationServiceClient) SourcegraphClientOption {
104 return func(c *sourcegraphClient) {
105 c.grpcClient = client
106 }
107}
108
109func newSourcegraphClient(rootURL *url.URL, hostname string, opts ...SourcegraphClientOption) *sourcegraphClient {
110 httpClient := retryablehttp.NewClient()
111 httpClient.Logger = debug
112
113 // Sourcegraph might return an error message in the body if StatusCode==500. The
114 // default behavior of the go-retryablehttp restClient is to drain the body and not
115 // to propagate the error. Hence, we call ErrorPropagatedRetryPolicy instead of
116 // DefaultRetryPolicy and augment the error with the response body if possible.
117 httpClient.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) {
118 shouldRetry, checkErr := retryablehttp.ErrorPropagatedRetryPolicy(ctx, resp, err)
119
120 if resp != nil && resp.StatusCode == http.StatusInternalServerError {
121 if b, e := io.ReadAll(resp.Body); e == nil {
122 checkErr = fmt.Errorf("%w: body=%q", checkErr, string(b))
123 }
124 }
125
126 return shouldRetry, checkErr
127 }
128
129 client := &sourcegraphClient{
130 Root: rootURL,
131 restClient: httpClient,
132 Hostname: hostname,
133 BatchSize: 0,
134 grpcClient: noopGRPCClient{},
135 useGRPC: false, // disable gRPC by default
136 }
137
138 for _, opt := range opts {
139 opt(client)
140 }
141
142 return client
143
144}
145
// sourcegraphClient contains methods which interact with the sourcegraph API.
// Depending on useGRPC it talks to Sourcegraph either through restClient or
// through grpcClient.
type sourcegraphClient struct {
	// Root is the base URL for the Sourcegraph instance to index. Normally
	// http://sourcegraph-frontend-internal or http://localhost:3090.
	Root *url.URL

	// Hostname is the name we advertise to Sourcegraph when asking for the
	// list of repositories to index.
	Hostname string

	// BatchSize is how many repository configurations we request at once. If
	// zero a value of 10000 is used.
	BatchSize int

	// restClient is used to make requests to the Sourcegraph instance. Prefer to
	// use .doRequest() to ensure the appropriate headers are set.
	restClient *retryablehttp.Client

	// grpcClient is used to make requests to the Sourcegraph instance if gRPC is enabled.
	grpcClient proto.ZoektConfigurationServiceClient

	// configFingerprint is the last config fingerprint returned from
	// Sourcegraph. It can be used for future calls to the configuration
	// endpoint.
	//
	// configFingerprint is mutually exclusive with configFingerprintProto - this field
	// will only be used if gRPC is disabled.
	configFingerprint string

	// configFingerprintProto is the last config fingerprint (as gRPC) returned from
	// Sourcegraph. It can be used for future calls to the configuration
	// endpoint.
	//
	// configFingerprintProto is mutually exclusive with configFingerprint - this field
	// will only be used if gRPC is enabled.
	configFingerprintProto *proto.Fingerprint

	// configFingerprintReset tracks when we should zero out the
	// configFingerprint. We want to periodically do this just in case our
	// configFingerprint logic is faulty. When it is cleared out, we fall back to
	// calculating everything.
	configFingerprintReset time.Time

	// useGRPC indicates whether we should use a gRPC client to communicate with Sourcegraph.
	useGRPC bool
}
192
193// GetDocumentRanks asks Sourcegraph for a mapping of file paths to rank
194// vectors.
195func (s *sourcegraphClient) GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error) {
196 if s.useGRPC {
197 return s.getDocumentRanksGRPC(ctx, repoName)
198 }
199
200 return s.getDocumentRanksREST(ctx, repoName)
201}
202
203func (s *sourcegraphClient) getDocumentRanksGRPC(ctx context.Context, repoName string) (RepoPathRanks, error) {
204 resp, err := s.grpcClient.DocumentRanks(ctx, &proto.DocumentRanksRequest{Repository: repoName})
205 if err != nil {
206 return RepoPathRanks{}, err
207 }
208
209 var out RepoPathRanks
210 out.FromProto(resp)
211
212 return out, nil
213}
214
215func (s *sourcegraphClient) getDocumentRanksREST(ctx context.Context, repoName string) (RepoPathRanks, error) {
216 u := s.Root.ResolveReference(&url.URL{
217 Path: "/.internal/ranks/" + strings.Trim(repoName, "/") + "/documents",
218 })
219
220 b, err := s.get(ctx, u)
221 if err != nil {
222 return RepoPathRanks{}, err
223 }
224
225 ranks := RepoPathRanks{}
226 err = json.Unmarshal(b, &ranks)
227 if err != nil {
228 return RepoPathRanks{}, err
229 }
230
231 return ranks, nil
232}
233
234func (s *sourcegraphClient) get(ctx context.Context, u *url.URL) ([]byte, error) {
235 req, err := retryablehttp.NewRequestWithContext(ctx, "GET", u.String(), nil)
236 if err != nil {
237 return nil, err
238 }
239
240 resp, err := s.doRequest(req)
241 if err != nil {
242 return nil, err
243 }
244 defer resp.Body.Close()
245
246 if resp.StatusCode != http.StatusOK {
247 b, err := io.ReadAll(io.LimitReader(resp.Body, 1024))
248 _ = resp.Body.Close()
249 if err != nil {
250 return nil, err
251 }
252 return nil, &url.Error{
253 Op: "Get",
254 URL: u.String(),
255 Err: fmt.Errorf("%s: %s", resp.Status, string(b)),
256 }
257 }
258
259 b, err := io.ReadAll(resp.Body)
260 if err != nil {
261 return nil, err
262 }
263
264 return b, nil
265}
266
// List asks Sourcegraph which repository IDs this replica should index
// (reporting indexed as the already-indexed set) and returns them together
// with an IterateIndexOptions function that lazily fetches the index options
// for those IDs in batches of BatchSize (10,000 when BatchSize is zero).
//
// As a side effect List periodically clears the stored config fingerprint so
// that the next iteration recalculates options for every repository.
func (s *sourcegraphClient) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) {
	repos, err := s.listRepoIDs(ctx, indexed)
	if err != nil {
		return nil, fmt.Errorf("listRepoIDs: %w", err)
	}

	batchSize := s.BatchSize
	if batchSize == 0 {
		batchSize = 10_000
	}

	// Check if we should recalculate everything.
	if time.Now().After(s.configFingerprintReset) {
		// for every 500 repos we wait a minute. 2021-12-15 on sourcegraph.com
		// this works out to every 100 minutes.
		next := time.Duration(len(indexed)) * time.Minute / 500
		// Never reset more often than every five minutes.
		if min := 5 * time.Minute; next < min {
			next = min
		}
		// Add a random extra of up to a quarter of the interval.
		next += time.Duration(rand.Int63n(int64(next) / 4)) // jitter
		s.configFingerprintReset = time.Now().Add(next)

		// Both fingerprints are cleared; only one of them is in use depending
		// on useGRPC.
		s.configFingerprintProto = nil
		s.configFingerprint = ""
	}

	// getIndexOptionsFunc is a function that can be used to get the index
	// options for a set of repos (while properly handling any configuration fingerprint
	// changes).
	//
	// In general, this function provides a consistent fingerprint for each batch call,
	// and updates the server state with the new fingerprint. If any of the batch calls
	// fail, the old fingerprint is restored.
	type getIndexOptionsFunc func(repos ...uint32) ([]indexOptionsItem, error)

	// default to REST
	mkGetIndexOptionsFunc := func(tr trace.Trace) getIndexOptionsFunc {
		// Every batch of one iteration is sent with the fingerprint that was
		// current when the iteration started.
		startingFingerPrint := s.configFingerprint
		tr.LazyPrintf("fingerprint: %s", startingFingerPrint)

		// first is true until the first batch call has completed; only a
		// successful first call stores the fingerprint returned by the server.
		first := true
		return func(repos ...uint32) ([]indexOptionsItem, error) {
			options, nextFingerPrint, err := s.getIndexOptionsREST(startingFingerPrint, repos...)
			if err != nil {
				// Roll back so that the next iteration retries with the old
				// fingerprint.
				first = false
				s.configFingerprint = startingFingerPrint

				return nil, err
			}

			if first {
				first = false
				s.configFingerprint = nextFingerPrint

				tr.LazyPrintf("new fingerprint: %s", nextFingerPrint)
			}

			return options, nil
		}
	}

	// If we enabled GRPC, use our gRPC client instead.
	if s.useGRPC {
		// Same bookkeeping as the REST variant above, but against
		// configFingerprintProto.
		//
		// NOTE(review): the returned closure captures List's ctx, so calling
		// IterateIndexOptions after that ctx is cancelled presumably fails -
		// confirm against callers.
		mkGetIndexOptionsFunc = func(tr trace.Trace) getIndexOptionsFunc {
			startingFingerPrint := s.configFingerprintProto
			tr.LazyPrintf("fingerprint: %s", startingFingerPrint.String())

			first := true
			return func(repos ...uint32) ([]indexOptionsItem, error) {
				options, nextFingerPrint, err := s.getIndexOptionsGRPC(ctx, startingFingerPrint, repos)
				if err != nil {
					first = false
					s.configFingerprintProto = startingFingerPrint

					return nil, err
				}

				if first {
					first = false
					s.configFingerprintProto = nextFingerPrint
					tr.LazyPrintf("new fingerprint: %s", nextFingerPrint.String())
				}

				return options, nil
			}
		}
	}

	// iterate is handed out as SourcegraphListResult.IterateIndexOptions. It
	// calls f for every repository whose options were resolved without error.
	iterate := func(f func(IndexOptions)) {
		start := time.Now()
		tr := trace.New("getIndexOptions", "")
		tr.LazyPrintf("getting index options for %d repos", len(repos))

		defer func() {
			metricResolveRevisionsDuration.Observe(time.Since(start).Seconds())
			tr.Finish()
		}()

		getIndexOptions := mkGetIndexOptionsFunc(tr)

		// We ask the frontend to get index options in batches.
		for repos := range batched(repos, batchSize) {
			start := time.Now()
			options, err := getIndexOptions(repos...)
			duration := time.Since(start)

			if err != nil {
				// A failed batch is recorded and skipped; the remaining
				// batches are still attempted.
				metricResolveRevisionDuration.WithLabelValues("false").Observe(duration.Seconds())
				tr.LazyPrintf("failed fetching options batch: %v", err)
				tr.SetError()

				continue
			}

			metricResolveRevisionDuration.WithLabelValues("true").Observe(duration.Seconds())

			for _, o := range options {
				metricGetIndexOptions.Inc()

				if o.Error != "" {
					// Per-repository errors are traced and counted, never
					// passed to f.
					metricGetIndexOptionsError.Inc()
					tr.LazyPrintf("failed fetching options for %v: %v", o.Name, o.Error)
					tr.SetError()

					continue
				}
				f(o.IndexOptions)
			}
		}
	}

	return &SourcegraphListResult{
		IDs:                 repos,
		IterateIndexOptions: iterate,
	}, nil
}
403
404func (s *sourcegraphClient) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) {
405 batchSize := s.BatchSize
406 if batchSize == 0 {
407 batchSize = 10_000
408 }
409
410 getIndexOptions := func(repos ...uint32) ([]indexOptionsItem, error) {
411 opts, _, err := s.getIndexOptionsREST("", repos...)
412 return opts, err
413 }
414
415 if s.useGRPC {
416 getIndexOptions = func(repos ...uint32) ([]indexOptionsItem, error) {
417 opts, _, err := s.getIndexOptionsGRPC(context.Background(), nil, repos)
418 return opts, err
419 }
420 }
421
422 for repos := range batched(repos, batchSize) {
423 opts, err := getIndexOptions(repos...)
424 if err != nil {
425 for _, id := range repos {
426 onError(id, err)
427 }
428 continue
429 }
430 for _, o := range opts {
431 if o.RepoID > 0 && o.Error != "" {
432 onError(o.RepoID, errors.New(o.Error))
433 }
434 if o.Error == "" {
435 onSuccess(o.IndexOptions)
436 }
437 }
438 }
439}
440
// indexOptionsItem wraps IndexOptions to also include an error returned by
// the API.
type indexOptionsItem struct {
	IndexOptions
	// Error is the per-repository error message; it is empty on success.
	Error string
}
447
448func (o *indexOptionsItem) FromProto(x *proto.ZoektIndexOptions) {
449 branches := make([]zoekt.RepositoryBranch, 0, len(x.Branches))
450 for _, b := range x.GetBranches() {
451 branches = append(branches, zoekt.RepositoryBranch{
452 Name: b.GetName(),
453 Version: b.GetVersion(),
454 })
455 }
456
457 item := indexOptionsItem{}
458 languageMap := make(map[string]uint8)
459
460 for _, lang := range x.GetLanguageMap() {
461 languageMap[lang.GetLanguage()] = uint8(lang.GetCtags().Number())
462 }
463
464 item.IndexOptions = IndexOptions{
465 RepoID: uint32(x.GetRepoId()),
466 LargeFiles: x.GetLargeFiles(),
467 Symbols: x.GetSymbols(),
468 Branches: branches,
469 Name: x.GetName(),
470
471 Priority: x.GetPriority(),
472
473 DocumentRanksVersion: x.GetDocumentRanksVersion(),
474
475 Public: x.GetPublic(),
476 Fork: x.GetFork(),
477 Archived: x.GetArchived(),
478
479 LanguageMap: languageMap,
480 }
481
482 item.Error = x.GetError()
483
484 *o = item
485}
486
487func (o *indexOptionsItem) ToProto() *proto.ZoektIndexOptions {
488 branches := make([]*proto.ZoektRepositoryBranch, 0, len(o.Branches))
489 for _, b := range o.Branches {
490 branches = append(branches, &proto.ZoektRepositoryBranch{
491 Name: b.Name,
492 Version: b.Version,
493 })
494 }
495
496 languageMap := make([]*proto.LanguageMapping, 0, len(o.LanguageMap))
497
498 for lang, parser := range o.LanguageMap {
499 languageMap = append(languageMap, &proto.LanguageMapping{
500 Language: lang,
501 Ctags: proto.CTagsParserType(parser),
502 })
503 }
504
505 return &proto.ZoektIndexOptions{
506 RepoId: int32(o.RepoID),
507 LargeFiles: o.LargeFiles,
508 Symbols: o.Symbols,
509 Branches: branches,
510 Name: o.Name,
511
512 Priority: o.Priority,
513
514 DocumentRanksVersion: o.DocumentRanksVersion,
515
516 Public: o.Public,
517 Fork: o.Fork,
518 Archived: o.Archived,
519
520 Error: o.Error,
521
522 LanguageMap: languageMap,
523 }
524}
525
526func (s *sourcegraphClient) getIndexOptionsGRPC(ctx context.Context, fingerprint *proto.Fingerprint, repos []uint32) ([]indexOptionsItem, *proto.Fingerprint, error) {
527 repoIDs := make([]int32, 0, len(repos))
528 for _, id := range repos {
529 repoIDs = append(repoIDs, int32(id))
530 }
531
532 req := proto.SearchConfigurationRequest{
533 RepoIds: repoIDs,
534 Fingerprint: fingerprint,
535 }
536
537 response, err := s.grpcClient.SearchConfiguration(ctx, &req)
538 if err != nil {
539 return nil, nil, err
540 }
541
542 protoItems := response.GetUpdatedOptions()
543 items := make([]indexOptionsItem, 0, len(protoItems))
544 for _, x := range protoItems {
545 var item indexOptionsItem
546 item.FromProto(x)
547 item.IndexOptions.CloneURL = s.getCloneURL(item.Name)
548
549 items = append(items, item)
550 }
551
552 return items, response.GetFingerprint(), nil
553}
554
555const fingerprintHeader = "X-Sourcegraph-Config-Fingerprint"
556
557func (s *sourcegraphClient) getIndexOptionsREST(fingerprint string, repos ...uint32) ([]indexOptionsItem, string, error) {
558 u := s.Root.ResolveReference(&url.URL{
559 Path: "/.internal/search/configuration",
560 })
561
562 repoIDs := make([]string, len(repos))
563 for i, id := range repos {
564 repoIDs[i] = strconv.Itoa(int(id))
565 }
566 data := url.Values{"repoID": repoIDs}
567 req, err := retryablehttp.NewRequest("POST", u.String(), []byte(data.Encode()))
568 if err != nil {
569 return nil, "", err
570 }
571 req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
572 if fingerprint != "" {
573 req.Header.Set(fingerprintHeader, fingerprint)
574 }
575
576 resp, err := s.doRequest(req)
577 if err != nil {
578 return nil, "", err
579 }
580 defer resp.Body.Close()
581
582 if resp.StatusCode != http.StatusOK {
583 b, err := io.ReadAll(io.LimitReader(resp.Body, 1024))
584 _ = resp.Body.Close()
585 if err != nil {
586 return nil, "", err
587 }
588 return nil, "", &url.Error{
589 Op: "Get",
590 URL: u.String(),
591 Err: fmt.Errorf("%s: %s", resp.Status, string(b)),
592 }
593 }
594
595 dec := json.NewDecoder(resp.Body)
596 var opts []indexOptionsItem
597 for {
598 var opt indexOptionsItem
599 err := dec.Decode(&opt)
600 if err == io.EOF {
601 break
602 }
603 if err != nil {
604 return nil, "", fmt.Errorf("error decoding body: %w", err)
605 }
606 opt.CloneURL = s.getCloneURL(opt.Name)
607 opts = append(opts, opt)
608 }
609
610 return opts, resp.Header.Get(fingerprintHeader), nil
611}
612
613func (s *sourcegraphClient) getCloneURL(name string) string {
614 return s.Root.ResolveReference(&url.URL{Path: path.Join("/.internal/git", name)}).String()
615}
616
617func (s *sourcegraphClient) listRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) {
618 if s.useGRPC {
619 return s.listRepoIDsGRPC(ctx, indexed)
620 }
621
622 return s.listRepoIDsREST(ctx, indexed)
623}
624
625func (s *sourcegraphClient) listRepoIDsGRPC(ctx context.Context, indexed []uint32) ([]uint32, error) {
626 var request proto.ListRequest
627 request.Hostname = s.Hostname
628 request.IndexedIds = make([]int32, 0, len(indexed))
629 for _, id := range indexed {
630 request.IndexedIds = append(request.IndexedIds, int32(id))
631 }
632
633 response, err := s.grpcClient.List(ctx, &request)
634 if err != nil {
635 return nil, err
636 }
637
638 repoIDs := make([]uint32, 0, len(response.RepoIds))
639 for _, id := range response.RepoIds {
640 repoIDs = append(repoIDs, uint32(id))
641 }
642
643 return repoIDs, nil
644}
645
646func (s *sourcegraphClient) listRepoIDsREST(_ context.Context, indexed []uint32) ([]uint32, error) {
647 body, err := json.Marshal(&struct {
648 Hostname string
649 IndexedIDs []uint32
650 }{
651 Hostname: s.Hostname,
652 IndexedIDs: indexed,
653 })
654 if err != nil {
655 return nil, err
656 }
657
658 u := s.Root.ResolveReference(&url.URL{Path: "/.internal/repos/index"})
659 req, err := retryablehttp.NewRequest(http.MethodPost, u.String(), bytes.NewReader(body))
660 if err != nil {
661 return nil, err
662 }
663 req.Header.Set("Content-Type", "application/json; charset=utf-8")
664
665 resp, err := s.doRequest(req)
666 if err != nil {
667 return nil, err
668 }
669 defer resp.Body.Close()
670
671 if resp.StatusCode != http.StatusOK {
672 return nil, fmt.Errorf("failed to list repositories: status %s", resp.Status)
673 }
674
675 var data struct {
676 RepoIDs []uint32
677 }
678 err = json.NewDecoder(resp.Body).Decode(&data)
679 if err != nil {
680 return nil, err
681 }
682
683 return data.RepoIDs, nil
684}
685
// indexStatus describes one repository in an index status update: its ID,
// its branches and an index time expressed as a Unix timestamp.
type indexStatus struct {
	RepoID        uint32
	Branches      []zoekt.RepositoryBranch
	IndexTimeUnix int64
}

// updateIndexStatusRequest is the payload of an index status update. It is
// marshalled to JSON for REST and converted with ToProto for gRPC.
type updateIndexStatusRequest struct {
	Repositories []indexStatus
}
695
696func (u *updateIndexStatusRequest) ToProto() *proto.UpdateIndexStatusRequest {
697 repositories := make([]*proto.UpdateIndexStatusRequest_Repository, 0, len(u.Repositories))
698
699 for _, repo := range u.Repositories {
700 branches := make([]*proto.ZoektRepositoryBranch, 0, len(repo.Branches))
701
702 for _, branch := range repo.Branches {
703 branches = append(branches, &proto.ZoektRepositoryBranch{
704 Name: branch.Name,
705 Version: branch.Version,
706 })
707 }
708
709 repositories = append(repositories, &proto.UpdateIndexStatusRequest_Repository{
710 RepoId: repo.RepoID,
711 Branches: branches,
712 IndexTimeUnix: repo.IndexTimeUnix,
713 })
714 }
715
716 return &proto.UpdateIndexStatusRequest{
717 Repositories: repositories,
718 }
719}
720
721func (u *updateIndexStatusRequest) FromProto(x *proto.UpdateIndexStatusRequest) {
722 protoRepositories := x.GetRepositories()
723 repositories := make([]indexStatus, 0, len(protoRepositories))
724
725 for _, repo := range x.GetRepositories() {
726 protoBranches := repo.GetBranches()
727 branches := make([]zoekt.RepositoryBranch, 0, len(protoBranches))
728
729 for _, branch := range repo.GetBranches() {
730 branches = append(branches, zoekt.RepositoryBranch{
731 Name: branch.GetName(),
732 Version: branch.GetVersion(),
733 })
734 }
735
736 repositories = append(repositories, indexStatus{
737 RepoID: repo.GetRepoId(),
738 Branches: branches,
739 IndexTimeUnix: repo.GetIndexTimeUnix(),
740 })
741 }
742
743 *u = updateIndexStatusRequest{
744 Repositories: repositories,
745 }
746}
747
748// UpdateIndexStatus sends a request to Sourcegraph to confirm that the given
749// repositories have been indexed.
750func (s *sourcegraphClient) UpdateIndexStatus(repositories []indexStatus) error {
751 r := updateIndexStatusRequest{Repositories: repositories}
752
753 if s.useGRPC {
754 return s.updateIndexStatusGRPC(r)
755 }
756
757 return s.updateIndexStatusREST(r)
758}
759
760func (s *sourcegraphClient) updateIndexStatusGRPC(r updateIndexStatusRequest) error {
761 request := r.ToProto()
762 _, err := s.grpcClient.UpdateIndexStatus(context.Background(), request)
763
764 if err != nil {
765 return fmt.Errorf("failed to update index status: %w", err)
766 }
767
768 return nil
769}
770
771func (s *sourcegraphClient) updateIndexStatusREST(r updateIndexStatusRequest) error {
772 payload, err := json.Marshal(r)
773 if err != nil {
774 return err
775 }
776
777 u := s.Root.ResolveReference(&url.URL{Path: "/.internal/search/index-status"})
778 req, err := retryablehttp.NewRequest(http.MethodPost, u.String(), bytes.NewReader(payload))
779 if err != nil {
780 return err
781 }
782 req.Header.Set("Content-Type", "application/json; charset=utf-8")
783
784 resp, err := s.doRequest(req)
785 if err != nil {
786 return err
787 }
788 defer resp.Body.Close()
789
790 if resp.StatusCode != http.StatusOK {
791 return fmt.Errorf("failed to update index status: status %s", resp.Status)
792 }
793
794 return nil
795}
796
// doRequest executes the provided request after adding the appropriate headers
// for interacting with a Sourcegraph instance. The request is sent through
// restClient; the caller owns the returned response and must close its body.
func (s *sourcegraphClient) doRequest(req *retryablehttp.Request) (*http.Response, error) {
	// Make all requests as an internal user.
	//
	// Should match github.com/sourcegraph/sourcegraph/internal/actor.headerKeyActorUID
	// and github.com/sourcegraph/sourcegraph/internal/actor.headerValueInternalActor
	req.Header.Set("X-Sourcegraph-Actor-UID", "internal")
	return s.restClient.Do(req)
}
807
// sourcegraphFake is a Sourcegraph implementation backed by the local file
// system: git repositories found under RootDir are the repositories to index,
// and marker files such as SG_ID, SG_PRIVATE or SG_PRIORITY inside a
// repository directory control its options.
type sourcegraphFake struct {
	// RootDir is the directory that is walked to discover repositories.
	RootDir string
	// Log receives warnings about repositories that could not be processed.
	Log *log.Logger
}
812
813// GetDocumentRanks expects a file where each line has the following format:
814// path<tab>rank... where rank is a float64.
815func (sf sourcegraphFake) GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error) {
816 dir := filepath.Join(sf.RootDir, filepath.FromSlash(repoName))
817
818 fd, err := os.Open(filepath.Join(dir, "SG_DOCUMENT_RANKS"))
819 if err != nil {
820 return RepoPathRanks{}, err
821 }
822
823 ranks := RepoPathRanks{}
824
825 sum := 0.0
826 count := 0
827 scanner := bufio.NewScanner(fd)
828 for scanner.Scan() {
829 s := scanner.Text()
830 pathRanks := strings.Split(s, "\t")
831 if rank, err := strconv.ParseFloat(pathRanks[1], 64); err == nil {
832 ranks.Paths[pathRanks[0]] = rank
833 sum += rank
834 count++
835 }
836 }
837
838 if err := scanner.Err(); err != nil {
839 return RepoPathRanks{}, err
840 }
841
842 ranks.MeanRank = sum / float64(count)
843 return ranks, nil
844}
845
// floats64 parses s as a comma-separated list of float64 values. It returns
// nil as soon as any element fails to parse, which includes the empty string.
func floats64(s string) []float64 {
	var parsed []float64
	for _, field := range strings.Split(s, ",") {
		value, err := strconv.ParseFloat(field, 64)
		if err != nil {
			return nil
		}
		parsed = append(parsed, value)
	}
	return parsed
}
860
861func (sf sourcegraphFake) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) {
862 repos, err := sf.ListRepoIDs(ctx, indexed)
863 if err != nil {
864 return nil, err
865 }
866
867 iterate := func(f func(IndexOptions)) {
868 opts, err := sf.GetIndexOptions(repos...)
869 if err != nil {
870 sf.Log.Printf("WARN: ignoring GetIndexOptions error: %v", err)
871 }
872 for _, opt := range opts {
873 if opt.Error != "" {
874 sf.Log.Printf("WARN: ignoring GetIndexOptions error for %s: %v", opt.Name, opt.Error)
875 continue
876 }
877 f(opt.IndexOptions)
878 }
879 }
880
881 return &SourcegraphListResult{
882 IDs: repos,
883 IterateIndexOptions: iterate,
884 }, nil
885}
886
887func (sf sourcegraphFake) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) {
888 opts, err := sf.GetIndexOptions(repos...)
889 if err != nil {
890 for _, id := range repos {
891 onError(id, err)
892 }
893 return
894 }
895 for _, o := range opts {
896 if o.RepoID > 0 && o.Error != "" {
897 onError(o.RepoID, errors.New(o.Error))
898 }
899 if o.Error == "" {
900 onSuccess(o.IndexOptions)
901 }
902 }
903}
904
905func (sf sourcegraphFake) GetIndexOptions(repos ...uint32) ([]indexOptionsItem, error) {
906 reposIdx := map[uint32]int{}
907 for i, id := range repos {
908 reposIdx[id] = i
909 }
910
911 items := make([]indexOptionsItem, len(repos))
912 err := sf.visitRepos(func(name string) {
913 idx, ok := reposIdx[sf.id(name)]
914 if !ok {
915 return
916 }
917 opts, err := sf.getIndexOptions(name)
918 if err != nil {
919 items[idx] = indexOptionsItem{Error: err.Error()}
920 } else {
921 items[idx] = indexOptionsItem{IndexOptions: opts}
922 }
923 })
924
925 if err != nil {
926 return nil, err
927 }
928
929 for i := range items {
930 if items[i].Error == "" && items[i].RepoID == 0 {
931 items[i].Error = "not found"
932 }
933 }
934
935 return items, nil
936}
937
938func (sf sourcegraphFake) getIndexOptions(name string) (IndexOptions, error) {
939 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name))
940 exists := func(p string) bool {
941 _, err := os.Stat(filepath.Join(dir, p))
942 return err == nil
943 }
944 float := func(p string) float64 {
945 b, _ := os.ReadFile(filepath.Join(dir, p))
946 f, _ := strconv.ParseFloat(string(bytes.TrimSpace(b)), 64)
947 return f
948 }
949
950 opts := IndexOptions{
951 RepoID: sf.id(name),
952 Name: name,
953 CloneURL: sf.getCloneURL(name),
954 Symbols: true,
955
956 Public: !exists("SG_PRIVATE"),
957 Fork: exists("SG_FORK"),
958 Archived: exists("SG_ARCHIVED"),
959
960 Priority: float("SG_PRIORITY"),
961 }
962
963 if stat, err := os.Stat(filepath.Join(dir, "SG_DOCUMENT_RANKS")); err == nil {
964 opts.DocumentRanksVersion = stat.ModTime().String()
965 }
966
967 branches, err := sf.getBranches(name)
968 if err != nil {
969 return opts, err
970 }
971 opts.Branches = branches
972
973 return opts, nil
974}
975
976func (sf sourcegraphFake) getBranches(name string) ([]zoekt.RepositoryBranch, error) {
977 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name))
978 repo, err := git.PlainOpen(dir)
979 if err != nil {
980 return nil, err
981 }
982
983 cfg, err := repo.Config()
984 if err != nil {
985 return nil, err
986 }
987
988 sec := cfg.Raw.Section("zoekt")
989 branches := sec.Options.GetAll("branch")
990 if len(branches) == 0 {
991 branches = append(branches, "HEAD")
992 }
993
994 rBranches := make([]zoekt.RepositoryBranch, 0, len(branches))
995 for _, branch := range branches {
996 cmd := exec.Command("git", "rev-parse", branch)
997 cmd.Dir = dir
998 if b, err := cmd.Output(); err != nil {
999 sf.Log.Printf("WARN: Could not get branch %s/%s", name, branch)
1000 } else {
1001 version := string(bytes.TrimSpace(b))
1002 rBranches = append(rBranches, zoekt.RepositoryBranch{
1003 Name: branch,
1004 Version: version,
1005 })
1006 }
1007 }
1008
1009 if len(rBranches) == 0 {
1010 return nil, fmt.Errorf("WARN: Could not get any branch revisions for repo %s", name)
1011 }
1012
1013 return rBranches, nil
1014}
1015
1016func (sf sourcegraphFake) id(name string) uint32 {
1017 // allow overriding the ID.
1018 idPath := filepath.Join(sf.RootDir, filepath.FromSlash(name), "SG_ID")
1019 if b, _ := os.ReadFile(idPath); len(b) > 0 {
1020 id, err := strconv.Atoi(strings.TrimSpace(string(b)))
1021 if err == nil {
1022 return uint32(id)
1023 }
1024 }
1025 return fakeID(name)
1026}
1027
1028func (sf sourcegraphFake) getCloneURL(name string) string {
1029 return filepath.Join(sf.RootDir, filepath.FromSlash(name))
1030}
1031
1032func (sf sourcegraphFake) ListRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) {
1033 var repos []uint32
1034 err := sf.visitRepos(func(name string) {
1035 repos = append(repos, sf.id(name))
1036 })
1037 return repos, err
1038}
1039
1040func (sf sourcegraphFake) visitRepos(visit func(name string)) error {
1041 return filepath.Walk(sf.RootDir, func(path string, fi os.FileInfo, fileErr error) error {
1042 if fileErr != nil {
1043 sf.Log.Printf("WARN: ignoring error searching %s: %v", path, fileErr)
1044 return nil
1045 }
1046 if !fi.IsDir() {
1047 return nil
1048 }
1049
1050 gitdir := filepath.Join(path, ".git")
1051 if fi, err := os.Stat(gitdir); err != nil || !fi.IsDir() {
1052 return nil
1053 }
1054
1055 subpath, err := filepath.Rel(sf.RootDir, path)
1056 if err != nil {
1057 // According to WalkFunc docs, path is always filepath.Join(root,
1058 // subpath). So Rel should always work.
1059 return fmt.Errorf("filepath.Walk returned %s which is not relative to %s: %w", path, sf.RootDir, err)
1060 }
1061
1062 name := filepath.ToSlash(subpath)
1063 visit(name)
1064
1065 return filepath.SkipDir
1066 })
1067}
1068
1069func (s sourcegraphFake) UpdateIndexStatus(repositories []indexStatus) error {
1070 // noop
1071 return nil
1072}
1073
// fakeID returns a deterministic ID based on name. Used for fakes and tests.
// The result is derived from the CRC-32 (IEEE) checksum of name and always
// lies in the range [1, 1<<31-1].
func fakeID(name string) uint32 {
	// Reducing modulo 1<<31-1 and adding one keeps the ID positive even when
	// it is later converted to a signed 32-bit integer.
	const modulus = 1<<31 - 1

	checksum := crc32.ChecksumIEEE([]byte(name))
	return checksum%modulus + 1
}
1079
1080type sourcegraphNop struct{}
1081
1082func (s sourcegraphNop) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) {
1083 return nil, nil
1084}
1085
1086func (s sourcegraphNop) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) {
1087 return
1088}
1089
1090func (s sourcegraphNop) GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error) {
1091 return RepoPathRanks{}, nil
1092}
1093
1094func (s sourcegraphNop) UpdateIndexStatus(repositories []indexStatus) error {
1095 return nil
1096}
1097
// RepoPathRanks holds the document ranks of a single repository.
type RepoPathRanks struct {
	// MeanRank is encoded in JSON as "mean_reference_count".
	MeanRank float64 `json:"mean_reference_count"`
	// Paths maps a file path to its rank.
	Paths map[string]float64 `json:"paths"`
}
1102
1103func (r *RepoPathRanks) FromProto(x *proto.DocumentRanksResponse) {
1104 protoPaths := x.GetPaths()
1105 ranks := make(map[string]float64, len(protoPaths))
1106 for filePath, rank := range protoPaths {
1107 ranks[filePath] = rank
1108 }
1109
1110 *r = RepoPathRanks{
1111 MeanRank: x.GetMeanRank(),
1112 Paths: ranks,
1113 }
1114}
1115
1116func (r *RepoPathRanks) ToProto() *proto.DocumentRanksResponse {
1117 paths := make(map[string]float64, len(r.Paths))
1118 for filePath, rank := range r.Paths {
1119 paths[filePath] = rank
1120 }
1121
1122 return &proto.DocumentRanksResponse{
1123 MeanRank: r.MeanRank,
1124 Paths: paths,
1125 }
1126}
1127
// noopGRPCClient is the placeholder gRPC client installed by
// newSourcegraphClient. Every method fails with "grpc client not enabled".
type noopGRPCClient struct{}

// SearchConfiguration always returns a "grpc client not enabled" error.
func (n noopGRPCClient) SearchConfiguration(ctx context.Context, in *proto.SearchConfigurationRequest, opts ...grpc.CallOption) (*proto.SearchConfigurationResponse, error) {
	return nil, fmt.Errorf("grpc client not enabled")
}

// List always returns a "grpc client not enabled" error.
func (n noopGRPCClient) List(ctx context.Context, in *proto.ListRequest, opts ...grpc.CallOption) (*proto.ListResponse, error) {
	return nil, fmt.Errorf("grpc client not enabled")
}

// DocumentRanks always returns a "grpc client not enabled" error.
func (n noopGRPCClient) DocumentRanks(ctx context.Context, in *proto.DocumentRanksRequest, opts ...grpc.CallOption) (*proto.DocumentRanksResponse, error) {
	return nil, fmt.Errorf("grpc client not enabled")
}

// UpdateIndexStatus always returns a "grpc client not enabled" error.
func (n noopGRPCClient) UpdateIndexStatus(ctx context.Context, in *proto.UpdateIndexStatusRequest, opts ...grpc.CallOption) (*proto.UpdateIndexStatusResponse, error) {
	return nil, fmt.Errorf("grpc client not enabled")
}

// Compile-time check that noopGRPCClient implements the service client
// interface.
var _ proto.ZoektConfigurationServiceClient = noopGRPCClient{}