// Fork of https://github.com/sourcegraph/zoekt
1package main
2
import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"hash/crc32"
	"log"
	"math/rand"
	"net/url"
	"os"
	"os/exec"
	"path"
	"path/filepath"
	"slices"
	"strconv"
	"strings"
	"time"

	"github.com/go-git/go-git/v5"
	"github.com/sourcegraph/zoekt/internal/ctags"
	"golang.org/x/net/trace"

	"github.com/sourcegraph/zoekt"
	proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
)
27
// SourcegraphListResult is the return value of Sourcegraph.List. It is its
// own type since internally we batch the calculation of index options. This
// is exposed via IterateIndexOptions.
//
// This type has state and is coupled to the Sourcegraph implementation.
type SourcegraphListResult struct {
	// IDs is the set of Sourcegraph repository IDs that this replica needs
	// to index.
	IDs []uint32

	// IterateIndexOptions best-effort resolves the IndexOptions for IDs. If
	// any repository fails, the failure is recorded internally (on a trace
	// or a logger, depending on the implementation) and the repository is
	// skipped. It uses the "config fingerprint" to reduce the amount of work
	// done. This means we only resolve options for repositories which have
	// been mutated since the last Sourcegraph.List call.
	//
	// Note: this has a side effect of setting the "config fingerprint". The
	// config fingerprint means we only calculate index options for repositories
	// that have changed since the last call to IterateIndexOptions. If you want
	// to force calculation of index options use
	// Sourcegraph.ForceIterateIndexOptions.
	//
	// Note: This should not be called concurrently with the Sourcegraph client.
	IterateIndexOptions func(func(IndexOptions))
}
53
// Sourcegraph represents the Sourcegraph service. It informs the indexserver
// what to index and which options to use.
type Sourcegraph interface {
	// List returns a list of repository IDs to index as well as a facility to
	// fetch the indexing options.
	//
	// Note: The return value is not safe to use concurrently with future calls
	// to List.
	List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error)

	// ForceIterateIndexOptions will best-effort calculate the index options for
	// all repos. For each repo it will call either onSuccess or onError. This
	// is the forced version of IterateIndexOptions, so will always calculate
	// options for each id in repos.
	//
	// NOTE(review): the sourcegraphClient and sourcegraphFake implementations
	// skip an error item that carries no RepoID, so such a repo receives
	// neither callback.
	ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32)

	// UpdateIndexStatus sends a request to Sourcegraph to confirm that the
	// given repositories have been indexed.
	UpdateIndexStatus(repositories []indexStatus) error
}
74
// SourcegraphClientOption mutates a sourcegraphClient while it is being
// constructed by newSourcegraphClient.
type SourcegraphClientOption func(*sourcegraphClient)
76
77// WithBatchSize controls how many repository configurations we request a time.
78// If BatchSize is 0, we default to requesting 10,000 repositories at once.
79func WithBatchSize(batchSize int) SourcegraphClientOption {
80 return func(c *sourcegraphClient) {
81 c.BatchSize = batchSize
82 }
83}
84
85func newSourcegraphClient(rootURL *url.URL, hostname string, grpcClient proto.ZoektConfigurationServiceClient, opts ...SourcegraphClientOption) *sourcegraphClient {
86 client := &sourcegraphClient{
87 Root: rootURL,
88 Hostname: hostname,
89 BatchSize: 0,
90 grpcClient: grpcClient,
91 }
92
93 for _, opt := range opts {
94 opt(client)
95 }
96
97 return client
98}
99
// sourcegraphClient contains methods which interact with the Sourcegraph API
// over gRPC.
type sourcegraphClient struct {
	// Root is the base URL for the Sourcegraph instance to index. Normally
	// http://sourcegraph-frontend-internal or http://localhost:3090. It is
	// also the base for repository clone URLs (see getCloneURL).
	Root *url.URL

	// Hostname is the name we advertise to Sourcegraph when asking for the
	// list of repositories to index.
	Hostname string

	// BatchSize is how many repository configurations we request at once. If
	// zero a value of 10000 is used.
	BatchSize int

	// grpcClient is used for every request to the Sourcegraph instance.
	grpcClient proto.ZoektConfigurationServiceClient

	// configFingerprintProto is the last config fingerprint returned from
	// Sourcegraph. It is sent on future calls to the configuration endpoint
	// so that only repositories changed since then need recalculating. nil
	// means no fingerprint, i.e. calculate everything.
	configFingerprintProto *proto.Fingerprint

	// configFingerprintReset tracks when we should zero out
	// configFingerprintProto. We want to periodically do this just in case our
	// fingerprint logic is faulty. When it is cleared out, we fall back to
	// calculating everything.
	configFingerprintReset time.Time
}
131
// List asks Sourcegraph (via listRepoIDs) which repositories this replica
// should index. The IDs are returned together with an IterateIndexOptions
// closure that fetches their index options lazily, in batches of s.BatchSize
// (10,000 when zero).
//
// Side effects on s: once configFingerprintReset has passed, List schedules
// the next reset and clears configFingerprintProto, so the following
// IterateIndexOptions calculates options for every repository. The delay is
// one minute per 500 entries of indexed, with a minimum of 5 minutes, plus
// up to 25% random jitter. Running IterateIndexOptions may in turn replace
// configFingerprintProto.
//
// NOTE(review): IterateIndexOptions reuses ctx. If callers cancel ctx once
// List returns, the later option fetches presumably fail. Confirm that
// callers keep it alive.
func (s *sourcegraphClient) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) {
	repos, err := s.listRepoIDs(ctx, indexed)
	if err != nil {
		return nil, fmt.Errorf("listRepoIDs: %w", err)
	}

	batchSize := s.BatchSize
	if batchSize == 0 {
		batchSize = 10_000
	}

	// Check if we should recalculate everything.
	if time.Now().After(s.configFingerprintReset) {
		// for every 500 repos we wait a minute. 2021-12-15 on sourcegraph.com
		// this works out to every 100 minutes.
		next := time.Duration(len(indexed)) * time.Minute / 500
		if min := 5 * time.Minute; next < min {
			next = min
		}
		// next is at least 5 minutes here, so Int63n always gets a positive
		// bound; the jitter lies in [0, next/4).
		next += time.Duration(rand.Int63n(int64(next) / 4)) // jitter
		s.configFingerprintReset = time.Now().Add(next)

		// Without a fingerprint the next fetch calculates options for every
		// repository.
		s.configFingerprintProto = nil
	}

	// getIndexOptionsFunc is a function that can be used to get the index
	// options for a set of repos (while properly handling any configuration fingerprint
	// changes).
	//
	// In general, this function provides a consistent fingerprint for each batch call,
	// and updates the server state with the new fingerprint. If any of the batch calls
	// fail, the old fingerprint is restored.
	type getIndexOptionsFunc func(repos ...uint32) ([]indexOptionsItem, error)

	mkGetIndexOptionsFunc := func(tr trace.Trace) getIndexOptionsFunc {
		// Every batch is requested relative to the fingerprint that was
		// current when this iteration started.
		startingFingerPrint := s.configFingerprintProto
		tr.LazyPrintf("fingerprint: %s", startingFingerPrint.String())

		// first is true until the first batch call returns. Only a successful
		// first call advances s.configFingerprintProto. A failed call restores
		// the starting fingerprint. Because first is already false by then, no
		// later call can advance it again during this iteration.
		first := true
		return func(repos ...uint32) ([]indexOptionsItem, error) {
			options, nextFingerPrint, err := s.getIndexOptions(ctx, startingFingerPrint, repos)
			if err != nil {
				first = false
				s.configFingerprintProto = startingFingerPrint

				return nil, err
			}

			if first {
				first = false
				s.configFingerprintProto = nextFingerPrint
				tr.LazyPrintf("new fingerprint: %s", nextFingerPrint.String())
			}

			return options, nil
		}
	}

	iterate := func(f func(IndexOptions)) {
		start := time.Now()
		tr := trace.New("getIndexOptions", "")
		tr.LazyPrintf("getting index options for %d repos", len(repos))

		defer func() {
			metricResolveRevisionsDuration.Observe(time.Since(start).Seconds())
			tr.Finish()
		}()

		getIndexOptions := mkGetIndexOptionsFunc(tr)

		// We ask the frontend to get index options in batches.
		for repos := range batched(repos, batchSize) {
			start := time.Now()
			options, err := getIndexOptions(repos...)
			duration := time.Since(start)

			if err != nil {
				// Best effort: a failed batch is recorded on the trace and in
				// the metric, and its repositories are skipped.
				metricResolveRevisionDuration.WithLabelValues("false").Observe(duration.Seconds())
				tr.LazyPrintf("failed fetching options batch: %v", err)
				tr.SetError()

				continue
			}

			metricResolveRevisionDuration.WithLabelValues("true").Observe(duration.Seconds())

			for _, o := range options {
				metricGetIndexOptions.Inc()

				if o.Error != "" {
					// Per-repository failure reported by Sourcegraph: count
					// it, trace it and skip it without calling f.
					metricGetIndexOptionsError.Inc()
					tr.LazyPrintf("failed fetching options for %v: %v", o.Name, o.Error)
					tr.SetError()

					continue
				}
				f(o.IndexOptions)
			}
		}
	}

	return &SourcegraphListResult{
		IDs:                 repos,
		IterateIndexOptions: iterate,
	}, nil
}
238
239func (s *sourcegraphClient) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) {
240 batchSize := s.BatchSize
241 if batchSize == 0 {
242 batchSize = 10_000
243 }
244
245 for repos := range batched(repos, batchSize) {
246 opts, _, err := s.getIndexOptions(context.Background(), nil, repos)
247 if err != nil {
248 for _, id := range repos {
249 onError(id, err)
250 }
251 continue
252 }
253 for _, o := range opts {
254 if o.RepoID > 0 && o.Error != "" {
255 onError(o.RepoID, errors.New(o.Error))
256 }
257 if o.Error == "" {
258 onSuccess(o.IndexOptions)
259 }
260 }
261 }
262}
263
// indexOptionsItem wraps IndexOptions to also include an error returned by
// the API.
type indexOptionsItem struct {
	IndexOptions
	// Error is the per-repository error message. Callers only use
	// IndexOptions when Error is empty.
	Error string
}
270
271func (o *indexOptionsItem) FromProto(x *proto.ZoektIndexOptions) {
272 branches := make([]zoekt.RepositoryBranch, 0, len(x.Branches))
273 for _, b := range x.GetBranches() {
274 branches = append(branches, zoekt.RepositoryBranch{
275 Name: b.GetName(),
276 Version: b.GetVersion(),
277 })
278 }
279
280 item := indexOptionsItem{}
281 languageMap := make(map[string]ctags.CTagsParserType)
282
283 for _, lang := range x.GetLanguageMap() {
284 languageMap[lang.GetLanguage()] = ctags.CTagsParserType(lang.GetCtags().Number())
285 }
286
287 item.IndexOptions = IndexOptions{
288 RepoID: uint32(x.GetRepoId()),
289 LargeFiles: x.GetLargeFiles(),
290 Symbols: x.GetSymbols(),
291 Branches: branches,
292 Name: x.GetName(),
293
294 Priority: x.GetPriority(),
295
296 Public: x.GetPublic(),
297 Fork: x.GetFork(),
298 Archived: x.GetArchived(),
299
300 LanguageMap: languageMap,
301 ShardConcurrency: x.GetShardConcurrency(),
302
303 TenantID: int(x.TenantId),
304 }
305
306 item.Error = x.GetError()
307
308 *o = item
309}
310
311func (o *indexOptionsItem) ToProto() *proto.ZoektIndexOptions {
312 branches := make([]*proto.ZoektRepositoryBranch, 0, len(o.Branches))
313 for _, b := range o.Branches {
314 branches = append(branches, &proto.ZoektRepositoryBranch{
315 Name: b.Name,
316 Version: b.Version,
317 })
318 }
319
320 languageMap := make([]*proto.LanguageMapping, 0, len(o.LanguageMap))
321
322 for lang, parser := range o.LanguageMap {
323 languageMap = append(languageMap, &proto.LanguageMapping{
324 Language: lang,
325 Ctags: proto.CTagsParserType(parser),
326 })
327 }
328
329 return &proto.ZoektIndexOptions{
330 RepoId: int32(o.RepoID),
331 LargeFiles: o.LargeFiles,
332 Symbols: o.Symbols,
333 Branches: branches,
334 Name: o.Name,
335
336 Priority: o.Priority,
337
338 Public: o.Public,
339 Fork: o.Fork,
340 Archived: o.Archived,
341
342 Error: o.Error,
343
344 LanguageMap: languageMap,
345 ShardConcurrency: o.ShardConcurrency,
346
347 TenantId: int64(o.TenantID),
348 }
349}
350
351func (s *sourcegraphClient) getIndexOptions(ctx context.Context, fingerprint *proto.Fingerprint, repos []uint32) ([]indexOptionsItem, *proto.Fingerprint, error) {
352 repoIDs := make([]int32, 0, len(repos))
353 for _, id := range repos {
354 repoIDs = append(repoIDs, int32(id))
355 }
356
357 req := proto.SearchConfigurationRequest{
358 RepoIds: repoIDs,
359 Fingerprint: fingerprint,
360 }
361
362 response, err := s.grpcClient.SearchConfiguration(ctx, &req)
363 if err != nil {
364 return nil, nil, err
365 }
366
367 protoItems := response.GetUpdatedOptions()
368 items := make([]indexOptionsItem, 0, len(protoItems))
369 for _, x := range protoItems {
370 var item indexOptionsItem
371 item.FromProto(x)
372 item.IndexOptions.CloneURL = s.getCloneURL(item.Name)
373
374 items = append(items, item)
375 }
376
377 return items, response.GetFingerprint(), nil
378}
379
380func (s *sourcegraphClient) getCloneURL(name string) string {
381 return s.Root.ResolveReference(&url.URL{Path: path.Join("/.internal/git", name)}).String()
382}
383
384func (s *sourcegraphClient) listRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) {
385 var request proto.ListRequest
386 request.Hostname = s.Hostname
387 request.IndexedIds = make([]int32, 0, len(indexed))
388 for _, id := range indexed {
389 request.IndexedIds = append(request.IndexedIds, int32(id))
390 }
391
392 response, err := s.grpcClient.List(ctx, &request)
393 if err != nil {
394 return nil, err
395 }
396
397 repoIDs := make([]uint32, 0, len(response.RepoIds))
398 for _, id := range response.RepoIds {
399 repoIDs = append(repoIDs, uint32(id))
400 }
401
402 return repoIDs, nil
403}
404
// indexStatus describes which branches (and versions) of a repository were
// indexed, and when. A slice of these is what UpdateIndexStatus reports.
type indexStatus struct {
	RepoID   uint32
	Branches []zoekt.RepositoryBranch
	// NOTE(review): presumably seconds since the Unix epoch — TODO confirm.
	IndexTimeUnix int64
}
410
// updateIndexStatusRequest is the Go-side counterpart of
// proto.UpdateIndexStatusRequest; see its ToProto and FromProto methods.
type updateIndexStatusRequest struct {
	Repositories []indexStatus
}
414
415func (u *updateIndexStatusRequest) ToProto() *proto.UpdateIndexStatusRequest {
416 repositories := make([]*proto.UpdateIndexStatusRequest_Repository, 0, len(u.Repositories))
417
418 for _, repo := range u.Repositories {
419 branches := make([]*proto.ZoektRepositoryBranch, 0, len(repo.Branches))
420
421 for _, branch := range repo.Branches {
422 branches = append(branches, &proto.ZoektRepositoryBranch{
423 Name: branch.Name,
424 Version: branch.Version,
425 })
426 }
427
428 repositories = append(repositories, &proto.UpdateIndexStatusRequest_Repository{
429 RepoId: repo.RepoID,
430 Branches: branches,
431 IndexTimeUnix: repo.IndexTimeUnix,
432 })
433 }
434
435 return &proto.UpdateIndexStatusRequest{
436 Repositories: repositories,
437 }
438}
439
440func (u *updateIndexStatusRequest) FromProto(x *proto.UpdateIndexStatusRequest) {
441 protoRepositories := x.GetRepositories()
442 repositories := make([]indexStatus, 0, len(protoRepositories))
443
444 for _, repo := range x.GetRepositories() {
445 protoBranches := repo.GetBranches()
446 branches := make([]zoekt.RepositoryBranch, 0, len(protoBranches))
447
448 for _, branch := range repo.GetBranches() {
449 branches = append(branches, zoekt.RepositoryBranch{
450 Name: branch.GetName(),
451 Version: branch.GetVersion(),
452 })
453 }
454
455 repositories = append(repositories, indexStatus{
456 RepoID: repo.GetRepoId(),
457 Branches: branches,
458 IndexTimeUnix: repo.GetIndexTimeUnix(),
459 })
460 }
461
462 *u = updateIndexStatusRequest{
463 Repositories: repositories,
464 }
465}
466
467// UpdateIndexStatus sends a request to Sourcegraph to confirm that the given
468// repositories have been indexed.
469func (s *sourcegraphClient) UpdateIndexStatus(repositories []indexStatus) error {
470 r := updateIndexStatusRequest{Repositories: repositories}
471
472 request := r.ToProto()
473 _, err := s.grpcClient.UpdateIndexStatus(context.Background(), request)
474 if err != nil {
475 return fmt.Errorf("failed to update index status: %w", err)
476 }
477
478 return nil
479}
480
// sourcegraphFake implements the Sourcegraph methods from a local directory
// tree instead of a Sourcegraph instance. Every directory under RootDir that
// contains a .git directory is treated as a repository, named by its
// slash-separated path relative to RootDir.
type sourcegraphFake struct {
	// RootDir is the directory that is scanned for git repositories.
	RootDir string
	// Log receives warnings about paths and repositories that could not be
	// read.
	Log *log.Logger
}
485
486func (sf sourcegraphFake) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) {
487 repos, err := sf.ListRepoIDs(ctx, indexed)
488 if err != nil {
489 return nil, err
490 }
491
492 iterate := func(f func(IndexOptions)) {
493 opts, err := sf.GetIndexOptions(repos...)
494 if err != nil {
495 sf.Log.Printf("WARN: ignoring GetIndexOptions error: %v", err)
496 }
497 for _, opt := range opts {
498 if opt.Error != "" {
499 sf.Log.Printf("WARN: ignoring GetIndexOptions error for %s: %v", opt.Name, opt.Error)
500 continue
501 }
502 f(opt.IndexOptions)
503 }
504 }
505
506 return &SourcegraphListResult{
507 IDs: repos,
508 IterateIndexOptions: iterate,
509 }, nil
510}
511
512func (sf sourcegraphFake) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) {
513 opts, err := sf.GetIndexOptions(repos...)
514 if err != nil {
515 for _, id := range repos {
516 onError(id, err)
517 }
518 return
519 }
520 for _, o := range opts {
521 if o.RepoID > 0 && o.Error != "" {
522 onError(o.RepoID, errors.New(o.Error))
523 }
524 if o.Error == "" {
525 onSuccess(o.IndexOptions)
526 }
527 }
528}
529
530func (sf sourcegraphFake) GetIndexOptions(repos ...uint32) ([]indexOptionsItem, error) {
531 reposIdx := map[uint32]int{}
532 for i, id := range repos {
533 reposIdx[id] = i
534 }
535
536 items := make([]indexOptionsItem, len(repos))
537 err := sf.visitRepos(func(name string) {
538 idx, ok := reposIdx[sf.id(name)]
539 if !ok {
540 return
541 }
542 opts, err := sf.getIndexOptions(name)
543 if err != nil {
544 items[idx] = indexOptionsItem{Error: err.Error()}
545 } else {
546 items[idx] = indexOptionsItem{IndexOptions: opts}
547 }
548 })
549 if err != nil {
550 return nil, err
551 }
552
553 for i := range items {
554 if items[i].Error == "" && items[i].RepoID == 0 {
555 items[i].Error = "not found"
556 }
557 }
558
559 return items, nil
560}
561
562func (sf sourcegraphFake) getIndexOptions(name string) (IndexOptions, error) {
563 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name))
564 exists := func(p string) bool {
565 _, err := os.Stat(filepath.Join(dir, p))
566 return err == nil
567 }
568 float := func(p string) float64 {
569 b, _ := os.ReadFile(filepath.Join(dir, p))
570 f, _ := strconv.ParseFloat(string(bytes.TrimSpace(b)), 64)
571 return f
572 }
573
574 opts := IndexOptions{
575 RepoID: sf.id(name),
576 Name: name,
577 CloneURL: sf.getCloneURL(name),
578 Symbols: true,
579
580 Public: !exists("SG_PRIVATE"),
581 Fork: exists("SG_FORK"),
582 Archived: exists("SG_ARCHIVED"),
583
584 Priority: float("SG_PRIORITY"),
585 }
586
587 branches, err := sf.getBranches(name)
588 if err != nil {
589 return opts, err
590 }
591 opts.Branches = branches
592
593 return opts, nil
594}
595
596func (sf sourcegraphFake) getBranches(name string) ([]zoekt.RepositoryBranch, error) {
597 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name))
598 repo, err := git.PlainOpen(dir)
599 if err != nil {
600 return nil, err
601 }
602
603 cfg, err := repo.Config()
604 if err != nil {
605 return nil, err
606 }
607
608 sec := cfg.Raw.Section("zoekt")
609 branches := sec.Options.GetAll("branch")
610 if len(branches) == 0 {
611 branches = append(branches, "HEAD")
612 }
613
614 rBranches := make([]zoekt.RepositoryBranch, 0, len(branches))
615 for _, branch := range branches {
616 cmd := exec.Command("git", "rev-parse", branch)
617 cmd.Dir = dir
618 if b, err := cmd.Output(); err != nil {
619 sf.Log.Printf("WARN: Could not get branch %s/%s", name, branch)
620 } else {
621 version := string(bytes.TrimSpace(b))
622 rBranches = append(rBranches, zoekt.RepositoryBranch{
623 Name: branch,
624 Version: version,
625 })
626 }
627 }
628
629 if len(rBranches) == 0 {
630 return nil, fmt.Errorf("WARN: Could not get any branch revisions for repo %s", name)
631 }
632
633 return rBranches, nil
634}
635
636func (sf sourcegraphFake) id(name string) uint32 {
637 // allow overriding the ID.
638 idPath := filepath.Join(sf.RootDir, filepath.FromSlash(name), "SG_ID")
639 if b, _ := os.ReadFile(idPath); len(b) > 0 {
640 id, err := strconv.Atoi(strings.TrimSpace(string(b)))
641 if err == nil {
642 return uint32(id)
643 }
644 }
645 return fakeID(name)
646}
647
648func (sf sourcegraphFake) getCloneURL(name string) string {
649 return filepath.Join(sf.RootDir, filepath.FromSlash(name))
650}
651
652func (sf sourcegraphFake) ListRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) {
653 var repos []uint32
654 err := sf.visitRepos(func(name string) {
655 repos = append(repos, sf.id(name))
656 })
657 return repos, err
658}
659
660func (sf sourcegraphFake) visitRepos(visit func(name string)) error {
661 return filepath.Walk(sf.RootDir, func(path string, fi os.FileInfo, fileErr error) error {
662 if fileErr != nil {
663 sf.Log.Printf("WARN: ignoring error searching %s: %v", path, fileErr)
664 return nil
665 }
666 if !fi.IsDir() {
667 return nil
668 }
669
670 gitdir := filepath.Join(path, ".git")
671 if fi, err := os.Stat(gitdir); err != nil || !fi.IsDir() {
672 return nil
673 }
674
675 subpath, err := filepath.Rel(sf.RootDir, path)
676 if err != nil {
677 // According to WalkFunc docs, path is always filepath.Join(root,
678 // subpath). So Rel should always work.
679 return fmt.Errorf("filepath.Walk returned %s which is not relative to %s: %w", path, sf.RootDir, err)
680 }
681
682 name := filepath.ToSlash(subpath)
683 visit(name)
684
685 return filepath.SkipDir
686 })
687}
688
689func (s sourcegraphFake) UpdateIndexStatus(repositories []indexStatus) error {
690 // noop
691 return nil
692}
693
// fakeID returns a deterministic ID based on name. Used for fakes and tests.
// The result always lies in [1, 2^31-1], so it is never zero and stays
// positive when converted to int32.
func fakeID(name string) uint32 {
	const maxInt32 = 1<<31 - 1
	checksum := crc32.ChecksumIEEE([]byte(name))
	// Fold into [0, maxInt32) and shift by one so zero is never returned.
	return checksum%maxInt32 + 1
}
699
700type sourcegraphNop struct{}
701
702func (s sourcegraphNop) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) {
703 return nil, nil
704}
705
706func (s sourcegraphNop) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) {
707}
708
709func (s sourcegraphNop) UpdateIndexStatus(repositories []indexStatus) error {
710 return nil
711}