Fork of https://github.com/sourcegraph/zoekt
1package main
2
3import (
4 "bufio"
5 "bytes"
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "hash/crc32"
11 "io"
12 "log"
13 "math/rand"
14 "net/http"
15 "net/url"
16 "os"
17 "os/exec"
18 "path"
19 "path/filepath"
20 "strconv"
21 "strings"
22 "time"
23
24 "github.com/go-git/go-git/v5"
25 retryablehttp "github.com/hashicorp/go-retryablehttp"
26 "golang.org/x/net/trace"
27 "google.golang.org/grpc"
28
29 "github.com/sourcegraph/zoekt"
30 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
31)
32
// SourcegraphListResult is the return value of Sourcegraph.List. It is its
// own type since internally we batch the calculation of index options. This
// is exposed via IterateIndexOptions.
//
// This type has state and is coupled to the Sourcegraph implementation.
type SourcegraphListResult struct {
	// IDs is the set of Sourcegraph repository IDs that this replica needs
	// to index.
	IDs []uint32

	// IterateIndexOptions best-effort resolves the IndexOptions for IDs and
	// calls the given function once per repository whose options were
	// resolved. If resolving a repository (or a whole batch) fails, the
	// failure is logged/traced internally and that repository is skipped. It
	// uses the "config fingerprint" to reduce the amount of work done. This
	// means we only resolve options for repositories which have been mutated
	// since the last Sourcegraph.List call.
	//
	// Note: this has a side effect of setting the "config fingerprint". The
	// config fingerprint means we only calculate index options for
	// repositories that have changed since the last call to
	// IterateIndexOptions. If you want to force calculation of index options
	// use Sourcegraph.ForceIterateIndexOptions.
	//
	// Note: This should not be called concurrently with the Sourcegraph client.
	IterateIndexOptions func(func(IndexOptions))
}
58
// Sourcegraph represents the Sourcegraph service. It informs the indexserver
// what to index and which options to use.
type Sourcegraph interface {
	// List returns the IDs of the repositories to index as well as a facility
	// to fetch their indexing options. indexed holds the IDs of the
	// repositories that are currently indexed.
	//
	// Note: The return value is not safe to use concurrently with future calls
	// to List.
	List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error)

	// ForceIterateIndexOptions will best-effort calculate the index options for
	// all repos. For each repo it will call either onSuccess or onError. This
	// is the forced version of IterateIndexOptions, so it always calculates
	// options for each id in repos, regardless of the config fingerprint.
	//
	// NOTE(review): the implementations in this file skip onError for error
	// items that carry no RepoID, so "either onSuccess or onError" is
	// best-effort.
	ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32)

	// GetRepoRank returns a rank vector for the given repository. Repositories
	// are assumed to be ordered by each pairwise component of the resulting
	// vector, higher ranks coming earlier.
	GetRepoRank(ctx context.Context, repoName string) ([]float64, error)

	// GetDocumentRanks returns a map from paths within the given repo to their
	// rank vectors. Paths are assumed to be ordered by each pairwise component
	// of the resulting vector, higher ranks coming earlier.
	GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error)

	// UpdateIndexStatus sends a request to Sourcegraph to confirm that the
	// given repositories have been indexed.
	UpdateIndexStatus(repositories []indexStatus) error
}
89
// SourcegraphClientOption configures a sourcegraphClient. newSourcegraphClient
// applies options in order, after the client's defaults have been set.
type SourcegraphClientOption func(*sourcegraphClient)
91
92// WithBatchSize controls how many repository configurations we request a time.
93// If BatchSize is 0, we default to requesting 10,000 repositories at once.
94func WithBatchSize(batchSize int) SourcegraphClientOption {
95 return func(c *sourcegraphClient) {
96 c.BatchSize = batchSize
97 }
98}
99
100// WithShouldUseGRPC enables or disables the use of gRPC for communicating with Sourcegraph.
101func WithShouldUseGRPC(useGRPC bool) SourcegraphClientOption {
102 return func(c *sourcegraphClient) {
103 c.useGRPC = useGRPC
104 }
105}
106
107// WithGRPCClient sets the gRPC client to use for communicating with Sourcegraph.
108func WithGRPCClient(client proto.ZoektConfigurationServiceClient) SourcegraphClientOption {
109 return func(c *sourcegraphClient) {
110 c.grpcClient = client
111 }
112}
113
114func newSourcegraphClient(rootURL *url.URL, hostname string, opts ...SourcegraphClientOption) *sourcegraphClient {
115 httpClient := retryablehttp.NewClient()
116 httpClient.Logger = debug
117
118 // Sourcegraph might return an error message in the body if StatusCode==500. The
119 // default behavior of the go-retryablehttp restClient is to drain the body and not
120 // to propagate the error. Hence, we call ErrorPropagatedRetryPolicy instead of
121 // DefaultRetryPolicy and augment the error with the response body if possible.
122 httpClient.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) {
123 shouldRetry, checkErr := retryablehttp.ErrorPropagatedRetryPolicy(ctx, resp, err)
124
125 if resp != nil && resp.StatusCode == http.StatusInternalServerError {
126 if b, e := io.ReadAll(resp.Body); e == nil {
127 checkErr = fmt.Errorf("%w: body=%q", checkErr, string(b))
128 }
129 }
130
131 return shouldRetry, checkErr
132 }
133
134 client := &sourcegraphClient{
135 Root: rootURL,
136 restClient: httpClient,
137 Hostname: hostname,
138 BatchSize: 0,
139 grpcClient: noopGRPCClient{},
140 useGRPC: false, // disable gRPC by default
141 }
142
143 for _, opt := range opts {
144 opt(client)
145 }
146
147 return client
148
149}
150
// sourcegraphClient contains methods which interact with the Sourcegraph API.
// Depending on useGRPC it talks to Sourcegraph either over gRPC (grpcClient)
// or over the REST API (restClient).
type sourcegraphClient struct {
	// Root is the base URL for the Sourcegraph instance to index. Normally
	// http://sourcegraph-frontend-internal or http://localhost:3090.
	Root *url.URL

	// Hostname is the name we advertise to Sourcegraph when asking for the
	// list of repositories to index.
	Hostname string

	// BatchSize is how many repository configurations we request at once. If
	// zero a value of 10000 is used.
	BatchSize int

	// restClient is used to make requests to the Sourcegraph instance. Prefer
	// to use .doRequest() to ensure the appropriate headers are set.
	restClient *retryablehttp.Client

	// grpcClient is used to make requests to the Sourcegraph instance if gRPC
	// is enabled.
	grpcClient proto.ZoektConfigurationServiceClient

	// configFingerprint is the last config fingerprint returned from
	// Sourcegraph. It can be used for future calls to the configuration
	// endpoint.
	//
	// configFingerprint is mutually exclusive with configFingerprintProto -
	// this field will only be used if gRPC is disabled.
	configFingerprint string

	// configFingerprintProto is the last config fingerprint (as gRPC) returned
	// from Sourcegraph. It can be used for future calls to the configuration
	// endpoint.
	//
	// configFingerprintProto is mutually exclusive with configFingerprint -
	// this field will only be used if gRPC is enabled.
	configFingerprintProto *proto.Fingerprint

	// configFingerprintReset tracks when we should zero out the config
	// fingerprint (both configFingerprint and configFingerprintProto). We want
	// to periodically do this just in case our fingerprint logic is faulty.
	// When it is cleared out, we fall back to calculating everything.
	configFingerprintReset time.Time

	// useGRPC indicates whether we should use a gRPC client to communicate
	// with Sourcegraph.
	useGRPC bool
}
197
198// GetRepoRank asks Sourcegraph for the rank vector of repoName.
199func (s *sourcegraphClient) GetRepoRank(ctx context.Context, repoName string) ([]float64, error) {
200 if s.useGRPC {
201 return s.getRepoRankGRPC(ctx, repoName)
202 }
203
204 return s.getRepoRankREST(ctx, repoName)
205}
206
207func (s *sourcegraphClient) getRepoRankGRPC(ctx context.Context, repoName string) ([]float64, error) {
208 resp, err := s.grpcClient.RepositoryRank(ctx, &proto.RepositoryRankRequest{Repository: repoName})
209 if err != nil {
210 return nil, err
211 }
212
213 return resp.GetRank(), nil
214}
215
216func (s *sourcegraphClient) getRepoRankREST(ctx context.Context, repoName string) ([]float64, error) {
217 u := s.Root.ResolveReference(&url.URL{
218 Path: "/.internal/ranks/" + strings.Trim(repoName, "/"),
219 })
220
221 b, err := s.get(ctx, u)
222 if err != nil {
223 return nil, err
224 }
225
226 var ranks []float64
227 err = json.Unmarshal(b, &ranks)
228 if err != nil {
229 return nil, err
230 }
231
232 return ranks, nil
233}
234
235// GetDocumentRanks asks Sourcegraph for a mapping of file paths to rank
236// vectors.
237func (s *sourcegraphClient) GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error) {
238 if s.useGRPC {
239 return s.getDocumentRanksGRPC(ctx, repoName)
240 }
241
242 return s.getDocumentRanksREST(ctx, repoName)
243}
244
245func (s *sourcegraphClient) getDocumentRanksGRPC(ctx context.Context, repoName string) (RepoPathRanks, error) {
246 resp, err := s.grpcClient.DocumentRanks(ctx, &proto.DocumentRanksRequest{Repository: repoName})
247 if err != nil {
248 return RepoPathRanks{}, err
249 }
250
251 var out RepoPathRanks
252 out.FromProto(resp)
253
254 return out, nil
255}
256
257func (s *sourcegraphClient) getDocumentRanksREST(ctx context.Context, repoName string) (RepoPathRanks, error) {
258 u := s.Root.ResolveReference(&url.URL{
259 Path: "/.internal/ranks/" + strings.Trim(repoName, "/") + "/documents",
260 })
261
262 b, err := s.get(ctx, u)
263 if err != nil {
264 return RepoPathRanks{}, err
265 }
266
267 ranks := RepoPathRanks{}
268 err = json.Unmarshal(b, &ranks)
269 if err != nil {
270 return RepoPathRanks{}, err
271 }
272
273 return ranks, nil
274}
275
276func (s *sourcegraphClient) get(ctx context.Context, u *url.URL) ([]byte, error) {
277 req, err := retryablehttp.NewRequestWithContext(ctx, "GET", u.String(), nil)
278 if err != nil {
279 return nil, err
280 }
281
282 resp, err := s.doRequest(req)
283 if err != nil {
284 return nil, err
285 }
286 defer resp.Body.Close()
287
288 if resp.StatusCode != http.StatusOK {
289 b, err := io.ReadAll(io.LimitReader(resp.Body, 1024))
290 _ = resp.Body.Close()
291 if err != nil {
292 return nil, err
293 }
294 return nil, &url.Error{
295 Op: "Get",
296 URL: u.String(),
297 Err: fmt.Errorf("%s: %s", resp.Status, string(b)),
298 }
299 }
300
301 b, err := io.ReadAll(resp.Body)
302 if err != nil {
303 return nil, err
304 }
305
306 return b, nil
307}
308
309func (s *sourcegraphClient) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) {
310 repos, err := s.listRepoIDs(ctx, indexed)
311 if err != nil {
312 return nil, fmt.Errorf("listRepoIDs: %w", err)
313 }
314
315 batchSize := s.BatchSize
316 if batchSize == 0 {
317 batchSize = 10_000
318 }
319
320 // Check if we should recalculate everything.
321 if time.Now().After(s.configFingerprintReset) {
322 // for every 500 repos we wait a minute. 2021-12-15 on sourcegraph.com
323 // this works out to every 100 minutes.
324 next := time.Duration(len(indexed)) * time.Minute / 500
325 if min := 5 * time.Minute; next < min {
326 next = min
327 }
328 next += time.Duration(rand.Int63n(int64(next) / 4)) // jitter
329 s.configFingerprintReset = time.Now().Add(next)
330
331 s.configFingerprintProto = nil
332 s.configFingerprint = ""
333 }
334
335 // getIndexOptionsFunc is a function that can be used to get the index
336 // options for a set of repos (while properly handling any configuration fingerprint
337 // changes).
338 //
339 // In general, this function provides a consistent fingerprint for each batch call,
340 // and updates the server state with the new fingerprint. If any of the batch calls
341 // fail, the old fingerprint is restored.
342 type getIndexOptionsFunc func(repos ...uint32) ([]indexOptionsItem, error)
343
344 // default to REST
345 mkGetIndexOptionsFunc := func(tr trace.Trace) getIndexOptionsFunc {
346 startingFingerPrint := s.configFingerprint
347 tr.LazyPrintf("fingerprint: %s", startingFingerPrint)
348
349 first := true
350 return func(repos ...uint32) ([]indexOptionsItem, error) {
351 options, nextFingerPrint, err := s.getIndexOptionsREST(startingFingerPrint, repos...)
352 if err != nil {
353 first = false
354 s.configFingerprint = startingFingerPrint
355
356 return nil, err
357 }
358
359 if first {
360 first = false
361 s.configFingerprint = nextFingerPrint
362
363 tr.LazyPrintf("new fingerprint: %s", nextFingerPrint)
364 }
365
366 return options, nil
367 }
368 }
369
370 // If we enabled GRPC, use our gRPC client instead.
371 if s.useGRPC {
372 mkGetIndexOptionsFunc = func(tr trace.Trace) getIndexOptionsFunc {
373 startingFingerPrint := s.configFingerprintProto
374 tr.LazyPrintf("fingerprint: %s", startingFingerPrint.String())
375
376 first := true
377 return func(repos ...uint32) ([]indexOptionsItem, error) {
378 options, nextFingerPrint, err := s.getIndexOptionsGRPC(ctx, startingFingerPrint, repos)
379 if err != nil {
380 first = false
381 s.configFingerprintProto = startingFingerPrint
382
383 return nil, err
384 }
385
386 if first {
387 first = false
388 s.configFingerprintProto = nextFingerPrint
389 tr.LazyPrintf("new fingerprint: %s", nextFingerPrint.String())
390 }
391
392 return options, nil
393 }
394 }
395 }
396
397 iterate := func(f func(IndexOptions)) {
398 start := time.Now()
399 tr := trace.New("getIndexOptions", "")
400 tr.LazyPrintf("getting index options for %d repos", len(repos))
401
402 defer func() {
403 metricResolveRevisionsDuration.Observe(time.Since(start).Seconds())
404 tr.Finish()
405 }()
406
407 getIndexOptions := mkGetIndexOptionsFunc(tr)
408
409 // We ask the frontend to get index options in batches.
410 for repos := range batched(repos, batchSize) {
411 start := time.Now()
412 options, err := getIndexOptions(repos...)
413 duration := time.Since(start)
414
415 if err != nil {
416 metricResolveRevisionDuration.WithLabelValues("false").Observe(duration.Seconds())
417 tr.LazyPrintf("failed fetching options batch: %v", err)
418 tr.SetError()
419
420 continue
421 }
422
423 metricResolveRevisionDuration.WithLabelValues("true").Observe(duration.Seconds())
424
425 for _, o := range options {
426 metricGetIndexOptions.Inc()
427
428 if o.Error != "" {
429 metricGetIndexOptionsError.Inc()
430 tr.LazyPrintf("failed fetching options for %v: %v", o.Name, o.Error)
431 tr.SetError()
432
433 continue
434 }
435 f(o.IndexOptions)
436 }
437 }
438 }
439
440 return &SourcegraphListResult{
441 IDs: repos,
442 IterateIndexOptions: iterate,
443 }, nil
444}
445
446func (s *sourcegraphClient) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) {
447 batchSize := s.BatchSize
448 if batchSize == 0 {
449 batchSize = 10_000
450 }
451
452 getIndexOptions := func(repos ...uint32) ([]indexOptionsItem, error) {
453 opts, _, err := s.getIndexOptionsREST("", repos...)
454 return opts, err
455 }
456
457 if s.useGRPC {
458 getIndexOptions = func(repos ...uint32) ([]indexOptionsItem, error) {
459 opts, _, err := s.getIndexOptionsGRPC(context.Background(), nil, repos)
460 return opts, err
461 }
462 }
463
464 for repos := range batched(repos, batchSize) {
465 opts, err := getIndexOptions(repos...)
466 if err != nil {
467 for _, id := range repos {
468 onError(id, err)
469 }
470 continue
471 }
472 for _, o := range opts {
473 if o.RepoID > 0 && o.Error != "" {
474 onError(o.RepoID, errors.New(o.Error))
475 }
476 if o.Error == "" {
477 onSuccess(o.IndexOptions)
478 }
479 }
480 }
481}
482
// indexOptionsItem wraps IndexOptions to also include an error returned by
// the API. A non-empty Error means the options for this repository could not
// be resolved; the iterators in this file skip such items instead of passing
// them on as successful results.
type indexOptionsItem struct {
	IndexOptions
	Error string
}
489
490func (o *indexOptionsItem) FromProto(x *proto.ZoektIndexOptions) {
491 branches := make([]zoekt.RepositoryBranch, 0, len(x.Branches))
492 for _, b := range x.GetBranches() {
493 branches = append(branches, zoekt.RepositoryBranch{
494 Name: b.GetName(),
495 Version: b.GetVersion(),
496 })
497 }
498
499 item := indexOptionsItem{}
500 languageMap := make(map[string]uint8)
501
502 for _, lang := range x.GetLanguageMap() {
503 languageMap[lang.GetLanguage()] = uint8(lang.GetCtags().Number())
504 }
505
506 item.IndexOptions = IndexOptions{
507 RepoID: uint32(x.GetRepoId()),
508 LargeFiles: x.GetLargeFiles(),
509 Symbols: x.GetSymbols(),
510 Branches: branches,
511 Name: x.GetName(),
512
513 Priority: x.GetPriority(),
514
515 DocumentRanksVersion: x.GetDocumentRanksVersion(),
516
517 Public: x.GetPublic(),
518 Fork: x.GetFork(),
519 Archived: x.GetArchived(),
520
521 LanguageMap: languageMap,
522 }
523
524 item.Error = x.GetError()
525
526 *o = item
527}
528
529func (o *indexOptionsItem) ToProto() *proto.ZoektIndexOptions {
530 branches := make([]*proto.ZoektRepositoryBranch, 0, len(o.Branches))
531 for _, b := range o.Branches {
532 branches = append(branches, &proto.ZoektRepositoryBranch{
533 Name: b.Name,
534 Version: b.Version,
535 })
536 }
537
538 languageMap := make([]*proto.LanguageMapping, 0, len(o.LanguageMap))
539
540 for lang, parser := range o.LanguageMap {
541 languageMap = append(languageMap, &proto.LanguageMapping{
542 Language: lang,
543 Ctags: proto.CTagsParserType(parser),
544 })
545 }
546
547 return &proto.ZoektIndexOptions{
548 RepoId: int32(o.RepoID),
549 LargeFiles: o.LargeFiles,
550 Symbols: o.Symbols,
551 Branches: branches,
552 Name: o.Name,
553
554 Priority: o.Priority,
555
556 DocumentRanksVersion: o.DocumentRanksVersion,
557
558 Public: o.Public,
559 Fork: o.Fork,
560 Archived: o.Archived,
561
562 Error: o.Error,
563
564 LanguageMap: languageMap,
565 }
566}
567
568func (s *sourcegraphClient) getIndexOptionsGRPC(ctx context.Context, fingerprint *proto.Fingerprint, repos []uint32) ([]indexOptionsItem, *proto.Fingerprint, error) {
569 repoIDs := make([]int32, 0, len(repos))
570 for _, id := range repos {
571 repoIDs = append(repoIDs, int32(id))
572 }
573
574 req := proto.SearchConfigurationRequest{
575 RepoIds: repoIDs,
576 Fingerprint: fingerprint,
577 }
578
579 response, err := s.grpcClient.SearchConfiguration(ctx, &req)
580 if err != nil {
581 return nil, nil, err
582 }
583
584 protoItems := response.GetUpdatedOptions()
585 items := make([]indexOptionsItem, 0, len(protoItems))
586 for _, x := range protoItems {
587 var item indexOptionsItem
588 item.FromProto(x)
589 item.IndexOptions.CloneURL = s.getCloneURL(item.Name)
590
591 items = append(items, item)
592 }
593
594 return items, response.GetFingerprint(), nil
595}
596
// fingerprintHeader is the HTTP header that carries the config fingerprint
// for the REST configuration endpoint. It is sent on requests when a
// fingerprint is known and read back from the response.
const fingerprintHeader = "X-Sourcegraph-Config-Fingerprint"
598
599func (s *sourcegraphClient) getIndexOptionsREST(fingerprint string, repos ...uint32) ([]indexOptionsItem, string, error) {
600 u := s.Root.ResolveReference(&url.URL{
601 Path: "/.internal/search/configuration",
602 })
603
604 repoIDs := make([]string, len(repos))
605 for i, id := range repos {
606 repoIDs[i] = strconv.Itoa(int(id))
607 }
608 data := url.Values{"repoID": repoIDs}
609 req, err := retryablehttp.NewRequest("POST", u.String(), []byte(data.Encode()))
610 if err != nil {
611 return nil, "", err
612 }
613 req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
614 if fingerprint != "" {
615 req.Header.Set(fingerprintHeader, fingerprint)
616 }
617
618 resp, err := s.doRequest(req)
619 if err != nil {
620 return nil, "", err
621 }
622 defer resp.Body.Close()
623
624 if resp.StatusCode != http.StatusOK {
625 b, err := io.ReadAll(io.LimitReader(resp.Body, 1024))
626 _ = resp.Body.Close()
627 if err != nil {
628 return nil, "", err
629 }
630 return nil, "", &url.Error{
631 Op: "Get",
632 URL: u.String(),
633 Err: fmt.Errorf("%s: %s", resp.Status, string(b)),
634 }
635 }
636
637 dec := json.NewDecoder(resp.Body)
638 var opts []indexOptionsItem
639 for {
640 var opt indexOptionsItem
641 err := dec.Decode(&opt)
642 if err == io.EOF {
643 break
644 }
645 if err != nil {
646 return nil, "", fmt.Errorf("error decoding body: %w", err)
647 }
648 opt.CloneURL = s.getCloneURL(opt.Name)
649 opts = append(opts, opt)
650 }
651
652 return opts, resp.Header.Get(fingerprintHeader), nil
653}
654
655func (s *sourcegraphClient) getCloneURL(name string) string {
656 return s.Root.ResolveReference(&url.URL{Path: path.Join("/.internal/git", name)}).String()
657}
658
659func (s *sourcegraphClient) listRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) {
660 if s.useGRPC {
661 return s.listRepoIDsGRPC(ctx, indexed)
662 }
663
664 return s.listRepoIDsREST(ctx, indexed)
665}
666
667func (s *sourcegraphClient) listRepoIDsGRPC(ctx context.Context, indexed []uint32) ([]uint32, error) {
668 var request proto.ListRequest
669 request.Hostname = s.Hostname
670 request.IndexedIds = make([]int32, 0, len(indexed))
671 for _, id := range indexed {
672 request.IndexedIds = append(request.IndexedIds, int32(id))
673 }
674
675 response, err := s.grpcClient.List(ctx, &request)
676 if err != nil {
677 return nil, err
678 }
679
680 repoIDs := make([]uint32, 0, len(response.RepoIds))
681 for _, id := range response.RepoIds {
682 repoIDs = append(repoIDs, uint32(id))
683 }
684
685 return repoIDs, nil
686}
687
688func (s *sourcegraphClient) listRepoIDsREST(_ context.Context, indexed []uint32) ([]uint32, error) {
689 body, err := json.Marshal(&struct {
690 Hostname string
691 IndexedIDs []uint32
692 }{
693 Hostname: s.Hostname,
694 IndexedIDs: indexed,
695 })
696 if err != nil {
697 return nil, err
698 }
699
700 u := s.Root.ResolveReference(&url.URL{Path: "/.internal/repos/index"})
701 req, err := retryablehttp.NewRequest(http.MethodPost, u.String(), bytes.NewReader(body))
702 if err != nil {
703 return nil, err
704 }
705 req.Header.Set("Content-Type", "application/json; charset=utf-8")
706
707 resp, err := s.doRequest(req)
708 if err != nil {
709 return nil, err
710 }
711 defer resp.Body.Close()
712
713 if resp.StatusCode != http.StatusOK {
714 return nil, fmt.Errorf("failed to list repositories: status %s", resp.Status)
715 }
716
717 var data struct {
718 RepoIDs []uint32
719 }
720 err = json.NewDecoder(resp.Body).Decode(&data)
721 if err != nil {
722 return nil, err
723 }
724
725 return data.RepoIDs, nil
726}
727
// indexStatus describes one indexed repository as reported to Sourcegraph by
// UpdateIndexStatus: which branch versions were indexed and when.
type indexStatus struct {
	RepoID   uint32
	Branches []zoekt.RepositoryBranch
	// IndexTimeUnix is presumably seconds since the Unix epoch (confirm with
	// the producer of this value).
	IndexTimeUnix int64
}
733
// updateIndexStatusRequest is the payload of an index-status update. The REST
// client sends it JSON-encoded. ToProto and FromProto convert it to and from
// the gRPC message.
type updateIndexStatusRequest struct {
	Repositories []indexStatus
}
737
738func (u *updateIndexStatusRequest) ToProto() *proto.UpdateIndexStatusRequest {
739 repositories := make([]*proto.UpdateIndexStatusRequest_Repository, 0, len(u.Repositories))
740
741 for _, repo := range u.Repositories {
742 branches := make([]*proto.ZoektRepositoryBranch, 0, len(repo.Branches))
743
744 for _, branch := range repo.Branches {
745 branches = append(branches, &proto.ZoektRepositoryBranch{
746 Name: branch.Name,
747 Version: branch.Version,
748 })
749 }
750
751 repositories = append(repositories, &proto.UpdateIndexStatusRequest_Repository{
752 RepoId: repo.RepoID,
753 Branches: branches,
754 IndexTimeUnix: repo.IndexTimeUnix,
755 })
756 }
757
758 return &proto.UpdateIndexStatusRequest{
759 Repositories: repositories,
760 }
761}
762
763func (u *updateIndexStatusRequest) FromProto(x *proto.UpdateIndexStatusRequest) {
764 protoRepositories := x.GetRepositories()
765 repositories := make([]indexStatus, 0, len(protoRepositories))
766
767 for _, repo := range x.GetRepositories() {
768 protoBranches := repo.GetBranches()
769 branches := make([]zoekt.RepositoryBranch, 0, len(protoBranches))
770
771 for _, branch := range repo.GetBranches() {
772 branches = append(branches, zoekt.RepositoryBranch{
773 Name: branch.GetName(),
774 Version: branch.GetVersion(),
775 })
776 }
777
778 repositories = append(repositories, indexStatus{
779 RepoID: repo.GetRepoId(),
780 Branches: branches,
781 IndexTimeUnix: repo.GetIndexTimeUnix(),
782 })
783 }
784
785 *u = updateIndexStatusRequest{
786 Repositories: repositories,
787 }
788}
789
790// UpdateIndexStatus sends a request to Sourcegraph to confirm that the given
791// repositories have been indexed.
792func (s *sourcegraphClient) UpdateIndexStatus(repositories []indexStatus) error {
793 r := updateIndexStatusRequest{Repositories: repositories}
794
795 if s.useGRPC {
796 return s.updateIndexStatusGRPC(r)
797 }
798
799 return s.updateIndexStatusREST(r)
800}
801
802func (s *sourcegraphClient) updateIndexStatusGRPC(r updateIndexStatusRequest) error {
803 request := r.ToProto()
804 _, err := s.grpcClient.UpdateIndexStatus(context.Background(), request)
805
806 if err != nil {
807 return fmt.Errorf("failed to update index status: %w", err)
808 }
809
810 return nil
811}
812
813func (s *sourcegraphClient) updateIndexStatusREST(r updateIndexStatusRequest) error {
814 payload, err := json.Marshal(r)
815 if err != nil {
816 return err
817 }
818
819 u := s.Root.ResolveReference(&url.URL{Path: "/.internal/search/index-status"})
820 req, err := retryablehttp.NewRequest(http.MethodPost, u.String(), bytes.NewReader(payload))
821 if err != nil {
822 return err
823 }
824 req.Header.Set("Content-Type", "application/json; charset=utf-8")
825
826 resp, err := s.doRequest(req)
827 if err != nil {
828 return err
829 }
830 defer resp.Body.Close()
831
832 if resp.StatusCode != http.StatusOK {
833 return fmt.Errorf("failed to update index status: status %s", resp.Status)
834 }
835
836 return nil
837}
838
839// doRequest executes the provided request after adding the appropriate headers
840// for interacting with a Sourcegraph instance.
841func (s *sourcegraphClient) doRequest(req *retryablehttp.Request) (*http.Response, error) {
842 // Make all requests as an internal user.
843 //
844 // Should match github.com/sourcegraph/sourcegraph/internal/actor.headerKeyActorUID
845 // and github.com/sourcegraph/sourcegraph/internal/actor.headerValueInternalActor
846 req.Header.Set("X-Sourcegraph-Actor-UID", "internal")
847 return s.restClient.Do(req)
848}
849
// sourcegraphFake implements Sourcegraph on top of a local directory tree.
// Every directory under RootDir that contains a .git directory is treated as
// a repository. Optional SG_* marker files inside it control the reported
// options.
type sourcegraphFake struct {
	// RootDir is the directory scanned for git repositories.
	RootDir string
	// Log receives warnings about ignored errors.
	Log *log.Logger
}
854
855// GetRepoRank expects a file with exactly 1 line containing a comma separated
856// list of float64 as ranks.
857func (sf sourcegraphFake) GetRepoRank(ctx context.Context, repoName string) ([]float64, error) {
858 dir := filepath.Join(sf.RootDir, filepath.FromSlash(repoName))
859
860 b, err := os.ReadFile(filepath.Join(dir, "SG_REPO_RANKS"))
861 if err != nil {
862 return nil, err
863 }
864
865 return floats64(string(b)), nil
866}
867
868// GetDocumentRanks expects a file where each line has the following format:
869// path<tab>rank... where rank is a float64.
870func (sf sourcegraphFake) GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error) {
871 dir := filepath.Join(sf.RootDir, filepath.FromSlash(repoName))
872
873 fd, err := os.Open(filepath.Join(dir, "SG_DOCUMENT_RANKS"))
874 if err != nil {
875 return RepoPathRanks{}, err
876 }
877
878 ranks := RepoPathRanks{}
879
880 sum := 0.0
881 count := 0
882 scanner := bufio.NewScanner(fd)
883 for scanner.Scan() {
884 s := scanner.Text()
885 pathRanks := strings.Split(s, "\t")
886 if rank, err := strconv.ParseFloat(pathRanks[1], 64); err == nil {
887 ranks.Paths[pathRanks[0]] = rank
888 sum += rank
889 count++
890 }
891 }
892
893 if err := scanner.Err(); err != nil {
894 return RepoPathRanks{}, err
895 }
896
897 ranks.MeanRank = sum / float64(count)
898 return ranks, nil
899}
900
// floats64 parses s as a comma-separated list of float64 values, e.g.
// "1,2.5,3". Whitespace around each value is ignored, including the trailing
// newline of a file read verbatim. If any value fails to parse (including an
// empty s), floats64 returns nil.
func floats64(s string) []float64 {
	parts := strings.Split(s, ",")

	r := make([]float64, 0, len(parts))
	for _, part := range parts {
		f, err := strconv.ParseFloat(strings.TrimSpace(part), 64)
		if err != nil {
			return nil
		}
		r = append(r, f)
	}

	return r
}
915
916func (sf sourcegraphFake) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) {
917 repos, err := sf.ListRepoIDs(ctx, indexed)
918 if err != nil {
919 return nil, err
920 }
921
922 iterate := func(f func(IndexOptions)) {
923 opts, err := sf.GetIndexOptions(repos...)
924 if err != nil {
925 sf.Log.Printf("WARN: ignoring GetIndexOptions error: %v", err)
926 }
927 for _, opt := range opts {
928 if opt.Error != "" {
929 sf.Log.Printf("WARN: ignoring GetIndexOptions error for %s: %v", opt.Name, opt.Error)
930 continue
931 }
932 f(opt.IndexOptions)
933 }
934 }
935
936 return &SourcegraphListResult{
937 IDs: repos,
938 IterateIndexOptions: iterate,
939 }, nil
940}
941
942func (sf sourcegraphFake) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) {
943 opts, err := sf.GetIndexOptions(repos...)
944 if err != nil {
945 for _, id := range repos {
946 onError(id, err)
947 }
948 return
949 }
950 for _, o := range opts {
951 if o.RepoID > 0 && o.Error != "" {
952 onError(o.RepoID, errors.New(o.Error))
953 }
954 if o.Error == "" {
955 onSuccess(o.IndexOptions)
956 }
957 }
958}
959
960func (sf sourcegraphFake) GetIndexOptions(repos ...uint32) ([]indexOptionsItem, error) {
961 reposIdx := map[uint32]int{}
962 for i, id := range repos {
963 reposIdx[id] = i
964 }
965
966 items := make([]indexOptionsItem, len(repos))
967 err := sf.visitRepos(func(name string) {
968 idx, ok := reposIdx[sf.id(name)]
969 if !ok {
970 return
971 }
972 opts, err := sf.getIndexOptions(name)
973 if err != nil {
974 items[idx] = indexOptionsItem{Error: err.Error()}
975 } else {
976 items[idx] = indexOptionsItem{IndexOptions: opts}
977 }
978 })
979
980 if err != nil {
981 return nil, err
982 }
983
984 for i := range items {
985 if items[i].Error == "" && items[i].RepoID == 0 {
986 items[i].Error = "not found"
987 }
988 }
989
990 return items, nil
991}
992
993func (sf sourcegraphFake) getIndexOptions(name string) (IndexOptions, error) {
994 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name))
995 exists := func(p string) bool {
996 _, err := os.Stat(filepath.Join(dir, p))
997 return err == nil
998 }
999 float := func(p string) float64 {
1000 b, _ := os.ReadFile(filepath.Join(dir, p))
1001 f, _ := strconv.ParseFloat(string(bytes.TrimSpace(b)), 64)
1002 return f
1003 }
1004
1005 opts := IndexOptions{
1006 RepoID: sf.id(name),
1007 Name: name,
1008 CloneURL: sf.getCloneURL(name),
1009 Symbols: true,
1010
1011 Public: !exists("SG_PRIVATE"),
1012 Fork: exists("SG_FORK"),
1013 Archived: exists("SG_ARCHIVED"),
1014
1015 Priority: float("SG_PRIORITY"),
1016 }
1017
1018 if stat, err := os.Stat(filepath.Join(dir, "SG_DOCUMENT_RANKS")); err == nil {
1019 opts.DocumentRanksVersion = stat.ModTime().String()
1020 }
1021
1022 branches, err := sf.getBranches(name)
1023 if err != nil {
1024 return opts, err
1025 }
1026 opts.Branches = branches
1027
1028 return opts, nil
1029}
1030
1031func (sf sourcegraphFake) getBranches(name string) ([]zoekt.RepositoryBranch, error) {
1032 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name))
1033 repo, err := git.PlainOpen(dir)
1034 if err != nil {
1035 return nil, err
1036 }
1037
1038 cfg, err := repo.Config()
1039 if err != nil {
1040 return nil, err
1041 }
1042
1043 sec := cfg.Raw.Section("zoekt")
1044 branches := sec.Options.GetAll("branch")
1045 if len(branches) == 0 {
1046 branches = append(branches, "HEAD")
1047 }
1048
1049 rBranches := make([]zoekt.RepositoryBranch, 0, len(branches))
1050 for _, branch := range branches {
1051 cmd := exec.Command("git", "rev-parse", branch)
1052 cmd.Dir = dir
1053 if b, err := cmd.Output(); err != nil {
1054 sf.Log.Printf("WARN: Could not get branch %s/%s", name, branch)
1055 } else {
1056 version := string(bytes.TrimSpace(b))
1057 rBranches = append(rBranches, zoekt.RepositoryBranch{
1058 Name: branch,
1059 Version: version,
1060 })
1061 }
1062 }
1063
1064 if len(rBranches) == 0 {
1065 return nil, fmt.Errorf("WARN: Could not get any branch revisions for repo %s", name)
1066 }
1067
1068 return rBranches, nil
1069}
1070
1071func (sf sourcegraphFake) id(name string) uint32 {
1072 // allow overriding the ID.
1073 idPath := filepath.Join(sf.RootDir, filepath.FromSlash(name), "SG_ID")
1074 if b, _ := os.ReadFile(idPath); len(b) > 0 {
1075 id, err := strconv.Atoi(strings.TrimSpace(string(b)))
1076 if err == nil {
1077 return uint32(id)
1078 }
1079 }
1080 return fakeID(name)
1081}
1082
1083func (sf sourcegraphFake) getCloneURL(name string) string {
1084 return filepath.Join(sf.RootDir, filepath.FromSlash(name))
1085}
1086
1087func (sf sourcegraphFake) ListRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) {
1088 var repos []uint32
1089 err := sf.visitRepos(func(name string) {
1090 repos = append(repos, sf.id(name))
1091 })
1092 return repos, err
1093}
1094
1095func (sf sourcegraphFake) visitRepos(visit func(name string)) error {
1096 return filepath.Walk(sf.RootDir, func(path string, fi os.FileInfo, fileErr error) error {
1097 if fileErr != nil {
1098 sf.Log.Printf("WARN: ignoring error searching %s: %v", path, fileErr)
1099 return nil
1100 }
1101 if !fi.IsDir() {
1102 return nil
1103 }
1104
1105 gitdir := filepath.Join(path, ".git")
1106 if fi, err := os.Stat(gitdir); err != nil || !fi.IsDir() {
1107 return nil
1108 }
1109
1110 subpath, err := filepath.Rel(sf.RootDir, path)
1111 if err != nil {
1112 // According to WalkFunc docs, path is always filepath.Join(root,
1113 // subpath). So Rel should always work.
1114 return fmt.Errorf("filepath.Walk returned %s which is not relative to %s: %w", path, sf.RootDir, err)
1115 }
1116
1117 name := filepath.ToSlash(subpath)
1118 visit(name)
1119
1120 return filepath.SkipDir
1121 })
1122}
1123
1124func (s sourcegraphFake) UpdateIndexStatus(repositories []indexStatus) error {
1125 // noop
1126 return nil
1127}
1128
1129// fakeID returns a deterministic ID based on name. Used for fakes and tests.
1130func fakeID(name string) uint32 {
1131 // magic at the end is to ensure we get a positive number when casting.
1132 return uint32(crc32.ChecksumIEEE([]byte(name))%(1<<31-1) + 1)
1133}
1134
1135type sourcegraphNop struct{}
1136
1137func (s sourcegraphNop) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) {
1138 return nil, nil
1139}
1140
1141func (s sourcegraphNop) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) {
1142 return
1143}
1144
1145func (s sourcegraphNop) GetRepoRank(ctx context.Context, repoName string) ([]float64, error) {
1146 return nil, nil
1147}
1148
1149func (s sourcegraphNop) GetDocumentRanks(ctx context.Context, repoName string) (RepoPathRanks, error) {
1150 return RepoPathRanks{}, nil
1151}
1152
1153func (s sourcegraphNop) UpdateIndexStatus(repositories []indexStatus) error {
1154 return nil
1155}
1156
1157type RepoPathRanks struct {
1158 MeanRank float64 `json:"mean_reference_count"`
1159 Paths map[string]float64 `json:"paths"`
1160}
1161
1162func (r *RepoPathRanks) FromProto(x *proto.DocumentRanksResponse) {
1163 protoPaths := x.GetPaths()
1164 ranks := make(map[string]float64, len(protoPaths))
1165 for filePath, rank := range protoPaths {
1166 ranks[filePath] = rank
1167 }
1168
1169 *r = RepoPathRanks{
1170 MeanRank: x.GetMeanRank(),
1171 Paths: ranks,
1172 }
1173}
1174
1175func (r *RepoPathRanks) ToProto() *proto.DocumentRanksResponse {
1176 paths := make(map[string]float64, len(r.Paths))
1177 for filePath, rank := range r.Paths {
1178 paths[filePath] = rank
1179 }
1180
1181 return &proto.DocumentRanksResponse{
1182 MeanRank: r.MeanRank,
1183 Paths: paths,
1184 }
1185}
1186
1187type noopGRPCClient struct{}
1188
1189func (n noopGRPCClient) SearchConfiguration(ctx context.Context, in *proto.SearchConfigurationRequest, opts ...grpc.CallOption) (*proto.SearchConfigurationResponse, error) {
1190 return nil, fmt.Errorf("grpc client not enabled")
1191}
1192
1193func (n noopGRPCClient) List(ctx context.Context, in *proto.ListRequest, opts ...grpc.CallOption) (*proto.ListResponse, error) {
1194 return nil, fmt.Errorf("grpc client not enabled")
1195}
1196
1197func (n noopGRPCClient) RepositoryRank(ctx context.Context, in *proto.RepositoryRankRequest, opts ...grpc.CallOption) (*proto.RepositoryRankResponse, error) {
1198 return nil, fmt.Errorf("grpc client not enabled")
1199}
1200
1201func (n noopGRPCClient) DocumentRanks(ctx context.Context, in *proto.DocumentRanksRequest, opts ...grpc.CallOption) (*proto.DocumentRanksResponse, error) {
1202 return nil, fmt.Errorf("grpc client not enabled")
1203}
1204
1205func (n noopGRPCClient) UpdateIndexStatus(ctx context.Context, in *proto.UpdateIndexStatusRequest, opts ...grpc.CallOption) (*proto.UpdateIndexStatusResponse, error) {
1206 return nil, fmt.Errorf("grpc client not enabled")
1207}
1208
1209var _ proto.ZoektConfigurationServiceClient = noopGRPCClient{}