fork of https://github.com/sourcegraph/zoekt
1package main
2
3import (
4 "bytes"
5 "context"
6 "errors"
7 "fmt"
8 "hash/crc32"
9 "log"
10 "math/rand"
11 "net/url"
12 "os"
13 "os/exec"
14 "path"
15 "path/filepath"
16 "strconv"
17 "strings"
18 "time"
19
20 "github.com/go-git/go-git/v5"
21 "golang.org/x/net/trace"
22
23 proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
24 "github.com/sourcegraph/zoekt/ctags"
25
26 "github.com/sourcegraph/zoekt"
27)
28
29// SourcegraphListResult is the return value of Sourcegraph.List. It is its
30// own type since internally we batch the calculation of index options. This
31// is exposed via IterateIndexOptions.
32//
33// This type has state and is coupled to the Sourcegraph implementation.
34type SourcegraphListResult struct {
35 // IDs is the set of Sourcegraph repository IDs that this replica needs
36 // to index.
37 IDs []uint32
38
39 // IterateIndexOptions best effort resolves the IndexOptions for RepoIDs. If
40 // any repository fails it internally logs. It uses the "config fingerprint"
41 // to reduce the amount of work done. This means we only resolve options for
42 // repositories which have been mutated since the last Sourcegraph.List
43 // call.
44 //
45 // Note: this has a side-effect of setting a the "config fingerprint". The
46 // config fingerprint means we only calculate index options for repositories
47 // that have changed since the last call to IterateIndexOptions. If you want
48 // to force calculation of index options use
49 // Sourcegraph.ForceIterateIndexOptions.
50 //
51 // Note: This should not be called concurrently with the Sourcegraph client.
52 IterateIndexOptions func(func(IndexOptions))
53}
54
55// Sourcegraph represents the Sourcegraph service. It informs the indexserver
56// what to index and which options to use.
57type Sourcegraph interface {
58 // List returns a list of repository IDs to index as well as a facility to
59 // fetch the indexing options.
60 //
61 // Note: The return value is not safe to use concurrently with future calls
62 // to List.
63 List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error)
64
65 // ForceIterateIndexOptions will best-effort calculate the index options for
66 // all repos. For each repo it will call either onSuccess or onError. This
67 // is the forced version of IterateIndexOptions, so will always calculate
68 // options for each id in repos.
69 ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32)
70
71 // UpdateIndexStatus sends a request to Sourcegraph to confirm that the
72 // given repositories have been indexed.
73 UpdateIndexStatus(repositories []indexStatus) error
74}
75
76type SourcegraphClientOption func(*sourcegraphClient)
77
78// WithBatchSize controls how many repository configurations we request a time.
79// If BatchSize is 0, we default to requesting 10,000 repositories at once.
80func WithBatchSize(batchSize int) SourcegraphClientOption {
81 return func(c *sourcegraphClient) {
82 c.BatchSize = batchSize
83 }
84}
85
86func newSourcegraphClient(rootURL *url.URL, hostname string, grpcClient proto.ZoektConfigurationServiceClient, opts ...SourcegraphClientOption) *sourcegraphClient {
87 client := &sourcegraphClient{
88 Root: rootURL,
89 Hostname: hostname,
90 BatchSize: 0,
91 grpcClient: grpcClient,
92 }
93
94 for _, opt := range opts {
95 opt(client)
96 }
97
98 return client
99}
100
101// sourcegraphClient contains methods which interact with the sourcegraph API.
102type sourcegraphClient struct {
103 // Root is the base URL for the Sourcegraph instance to index. Normally
104 // http://sourcegraph-frontend-internal or http://localhost:3090.
105 Root *url.URL
106
107 // Hostname is the name we advertise to Sourcegraph when asking for the
108 // list of repositories to index.
109 Hostname string
110
111 // BatchSize is how many repository configurations we request at once. If
112 // zero a value of 10000 is used.
113 BatchSize int
114
115 // grpcClient is used to make requests to the Sourcegraph instance if gRPC is enabled.
116 grpcClient proto.ZoektConfigurationServiceClient
117
118 // configFingerprintProto is the last config fingerprint (as GRPC) returned from
119 // Sourcegraph. It can be used for future calls to the configuration
120 // endpoint.
121 //
122 // configFingerprintProto is mutually exclusive with configFingerprint - this field
123 // will only be used if gRPC is enabled.
124 configFingerprintProto *proto.Fingerprint
125
126 // configFingerprintReset tracks when we should zero out the
127 // configFingerprint. We want to periodically do this just in case our
128 // configFingerprint logic is faulty. When it is cleared out, we fallback to
129 // calculating everything.
130 configFingerprintReset time.Time
131}
132
133func (s *sourcegraphClient) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) {
134 repos, err := s.listRepoIDs(ctx, indexed)
135 if err != nil {
136 return nil, fmt.Errorf("listRepoIDs: %w", err)
137 }
138
139 batchSize := s.BatchSize
140 if batchSize == 0 {
141 batchSize = 10_000
142 }
143
144 // Check if we should recalculate everything.
145 if time.Now().After(s.configFingerprintReset) {
146 // for every 500 repos we wait a minute. 2021-12-15 on sourcegraph.com
147 // this works out to every 100 minutes.
148 next := time.Duration(len(indexed)) * time.Minute / 500
149 if min := 5 * time.Minute; next < min {
150 next = min
151 }
152 next += time.Duration(rand.Int63n(int64(next) / 4)) // jitter
153 s.configFingerprintReset = time.Now().Add(next)
154
155 s.configFingerprintProto = nil
156 }
157
158 // getIndexOptionsFunc is a function that can be used to get the index
159 // options for a set of repos (while properly handling any configuration fingerprint
160 // changes).
161 //
162 // In general, this function provides a consistent fingerprint for each batch call,
163 // and updates the server state with the new fingerprint. If any of the batch calls
164 // fail, the old fingerprint is restored.
165 type getIndexOptionsFunc func(repos ...uint32) ([]indexOptionsItem, error)
166
167 mkGetIndexOptionsFunc := func(tr trace.Trace) getIndexOptionsFunc {
168 startingFingerPrint := s.configFingerprintProto
169 tr.LazyPrintf("fingerprint: %s", startingFingerPrint.String())
170
171 first := true
172 return func(repos ...uint32) ([]indexOptionsItem, error) {
173 options, nextFingerPrint, err := s.getIndexOptions(ctx, startingFingerPrint, repos)
174 if err != nil {
175 first = false
176 s.configFingerprintProto = startingFingerPrint
177
178 return nil, err
179 }
180
181 if first {
182 first = false
183 s.configFingerprintProto = nextFingerPrint
184 tr.LazyPrintf("new fingerprint: %s", nextFingerPrint.String())
185 }
186
187 return options, nil
188 }
189 }
190
191 iterate := func(f func(IndexOptions)) {
192 start := time.Now()
193 tr := trace.New("getIndexOptions", "")
194 tr.LazyPrintf("getting index options for %d repos", len(repos))
195
196 defer func() {
197 metricResolveRevisionsDuration.Observe(time.Since(start).Seconds())
198 tr.Finish()
199 }()
200
201 getIndexOptions := mkGetIndexOptionsFunc(tr)
202
203 // We ask the frontend to get index options in batches.
204 for repos := range batched(repos, batchSize) {
205 start := time.Now()
206 options, err := getIndexOptions(repos...)
207 duration := time.Since(start)
208
209 if err != nil {
210 metricResolveRevisionDuration.WithLabelValues("false").Observe(duration.Seconds())
211 tr.LazyPrintf("failed fetching options batch: %v", err)
212 tr.SetError()
213
214 continue
215 }
216
217 metricResolveRevisionDuration.WithLabelValues("true").Observe(duration.Seconds())
218
219 for _, o := range options {
220 metricGetIndexOptions.Inc()
221
222 if o.Error != "" {
223 metricGetIndexOptionsError.Inc()
224 tr.LazyPrintf("failed fetching options for %v: %v", o.Name, o.Error)
225 tr.SetError()
226
227 continue
228 }
229 f(o.IndexOptions)
230 }
231 }
232 }
233
234 return &SourcegraphListResult{
235 IDs: repos,
236 IterateIndexOptions: iterate,
237 }, nil
238}
239
240func (s *sourcegraphClient) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) {
241 batchSize := s.BatchSize
242 if batchSize == 0 {
243 batchSize = 10_000
244 }
245
246 for repos := range batched(repos, batchSize) {
247 opts, _, err := s.getIndexOptions(context.Background(), nil, repos)
248 if err != nil {
249 for _, id := range repos {
250 onError(id, err)
251 }
252 continue
253 }
254 for _, o := range opts {
255 if o.RepoID > 0 && o.Error != "" {
256 onError(o.RepoID, errors.New(o.Error))
257 }
258 if o.Error == "" {
259 onSuccess(o.IndexOptions)
260 }
261 }
262 }
263}
264
265// indexOptionsItem wraps IndexOptions to also include an error returned by
266// the API.
267type indexOptionsItem struct {
268 IndexOptions
269 Error string
270}
271
272func (o *indexOptionsItem) FromProto(x *proto.ZoektIndexOptions) {
273 branches := make([]zoekt.RepositoryBranch, 0, len(x.Branches))
274 for _, b := range x.GetBranches() {
275 branches = append(branches, zoekt.RepositoryBranch{
276 Name: b.GetName(),
277 Version: b.GetVersion(),
278 })
279 }
280
281 item := indexOptionsItem{}
282 languageMap := make(map[string]ctags.CTagsParserType)
283
284 for _, lang := range x.GetLanguageMap() {
285 languageMap[lang.GetLanguage()] = ctags.CTagsParserType(lang.GetCtags().Number())
286 }
287
288 item.IndexOptions = IndexOptions{
289 RepoID: uint32(x.GetRepoId()),
290 LargeFiles: x.GetLargeFiles(),
291 Symbols: x.GetSymbols(),
292 Branches: branches,
293 Name: x.GetName(),
294
295 Priority: x.GetPriority(),
296
297 Public: x.GetPublic(),
298 Fork: x.GetFork(),
299 Archived: x.GetArchived(),
300
301 LanguageMap: languageMap,
302 ShardConcurrency: x.GetShardConcurrency(),
303
304 TenantID: int(x.TenantId),
305 }
306
307 item.Error = x.GetError()
308
309 *o = item
310}
311
312func (o *indexOptionsItem) ToProto() *proto.ZoektIndexOptions {
313 branches := make([]*proto.ZoektRepositoryBranch, 0, len(o.Branches))
314 for _, b := range o.Branches {
315 branches = append(branches, &proto.ZoektRepositoryBranch{
316 Name: b.Name,
317 Version: b.Version,
318 })
319 }
320
321 languageMap := make([]*proto.LanguageMapping, 0, len(o.LanguageMap))
322
323 for lang, parser := range o.LanguageMap {
324 languageMap = append(languageMap, &proto.LanguageMapping{
325 Language: lang,
326 Ctags: proto.CTagsParserType(parser),
327 })
328 }
329
330 return &proto.ZoektIndexOptions{
331 RepoId: int32(o.RepoID),
332 LargeFiles: o.LargeFiles,
333 Symbols: o.Symbols,
334 Branches: branches,
335 Name: o.Name,
336
337 Priority: o.Priority,
338
339 Public: o.Public,
340 Fork: o.Fork,
341 Archived: o.Archived,
342
343 Error: o.Error,
344
345 LanguageMap: languageMap,
346 ShardConcurrency: o.ShardConcurrency,
347
348 TenantId: int64(o.TenantID),
349 }
350}
351
352func (s *sourcegraphClient) getIndexOptions(ctx context.Context, fingerprint *proto.Fingerprint, repos []uint32) ([]indexOptionsItem, *proto.Fingerprint, error) {
353 repoIDs := make([]int32, 0, len(repos))
354 for _, id := range repos {
355 repoIDs = append(repoIDs, int32(id))
356 }
357
358 req := proto.SearchConfigurationRequest{
359 RepoIds: repoIDs,
360 Fingerprint: fingerprint,
361 }
362
363 response, err := s.grpcClient.SearchConfiguration(ctx, &req)
364 if err != nil {
365 return nil, nil, err
366 }
367
368 protoItems := response.GetUpdatedOptions()
369 items := make([]indexOptionsItem, 0, len(protoItems))
370 for _, x := range protoItems {
371 var item indexOptionsItem
372 item.FromProto(x)
373 item.IndexOptions.CloneURL = s.getCloneURL(item.Name)
374
375 items = append(items, item)
376 }
377
378 return items, response.GetFingerprint(), nil
379}
380
381func (s *sourcegraphClient) getCloneURL(name string) string {
382 return s.Root.ResolveReference(&url.URL{Path: path.Join("/.internal/git", name)}).String()
383}
384
385func (s *sourcegraphClient) listRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) {
386 var request proto.ListRequest
387 request.Hostname = s.Hostname
388 request.IndexedIds = make([]int32, 0, len(indexed))
389 for _, id := range indexed {
390 request.IndexedIds = append(request.IndexedIds, int32(id))
391 }
392
393 response, err := s.grpcClient.List(ctx, &request)
394 if err != nil {
395 return nil, err
396 }
397
398 repoIDs := make([]uint32, 0, len(response.RepoIds))
399 for _, id := range response.RepoIds {
400 repoIDs = append(repoIDs, uint32(id))
401 }
402
403 return repoIDs, nil
404}
405
406type indexStatus struct {
407 RepoID uint32
408 Branches []zoekt.RepositoryBranch
409 IndexTimeUnix int64
410}
411
412type updateIndexStatusRequest struct {
413 Repositories []indexStatus
414}
415
416func (u *updateIndexStatusRequest) ToProto() *proto.UpdateIndexStatusRequest {
417 repositories := make([]*proto.UpdateIndexStatusRequest_Repository, 0, len(u.Repositories))
418
419 for _, repo := range u.Repositories {
420 branches := make([]*proto.ZoektRepositoryBranch, 0, len(repo.Branches))
421
422 for _, branch := range repo.Branches {
423 branches = append(branches, &proto.ZoektRepositoryBranch{
424 Name: branch.Name,
425 Version: branch.Version,
426 })
427 }
428
429 repositories = append(repositories, &proto.UpdateIndexStatusRequest_Repository{
430 RepoId: repo.RepoID,
431 Branches: branches,
432 IndexTimeUnix: repo.IndexTimeUnix,
433 })
434 }
435
436 return &proto.UpdateIndexStatusRequest{
437 Repositories: repositories,
438 }
439}
440
441func (u *updateIndexStatusRequest) FromProto(x *proto.UpdateIndexStatusRequest) {
442 protoRepositories := x.GetRepositories()
443 repositories := make([]indexStatus, 0, len(protoRepositories))
444
445 for _, repo := range x.GetRepositories() {
446 protoBranches := repo.GetBranches()
447 branches := make([]zoekt.RepositoryBranch, 0, len(protoBranches))
448
449 for _, branch := range repo.GetBranches() {
450 branches = append(branches, zoekt.RepositoryBranch{
451 Name: branch.GetName(),
452 Version: branch.GetVersion(),
453 })
454 }
455
456 repositories = append(repositories, indexStatus{
457 RepoID: repo.GetRepoId(),
458 Branches: branches,
459 IndexTimeUnix: repo.GetIndexTimeUnix(),
460 })
461 }
462
463 *u = updateIndexStatusRequest{
464 Repositories: repositories,
465 }
466}
467
468// UpdateIndexStatus sends a request to Sourcegraph to confirm that the given
469// repositories have been indexed.
470func (s *sourcegraphClient) UpdateIndexStatus(repositories []indexStatus) error {
471 r := updateIndexStatusRequest{Repositories: repositories}
472
473 request := r.ToProto()
474 _, err := s.grpcClient.UpdateIndexStatus(context.Background(), request)
475 if err != nil {
476 return fmt.Errorf("failed to update index status: %w", err)
477 }
478
479 return nil
480}
481
// sourcegraphFake implements Sourcegraph without a Sourcegraph instance. It
// treats every directory under RootDir that contains a .git directory as a
// repository to index. Per-repository settings come from marker files
// (SG_ID, SG_PRIVATE, SG_FORK, SG_ARCHIVED, SG_PRIORITY) and from the
// [zoekt] section of the repository's git config.
type sourcegraphFake struct {
	// RootDir is the directory scanned for repositories.
	RootDir string
	// Log receives warnings about problems that are skipped.
	Log *log.Logger
}
486
487func (sf sourcegraphFake) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) {
488 repos, err := sf.ListRepoIDs(ctx, indexed)
489 if err != nil {
490 return nil, err
491 }
492
493 iterate := func(f func(IndexOptions)) {
494 opts, err := sf.GetIndexOptions(repos...)
495 if err != nil {
496 sf.Log.Printf("WARN: ignoring GetIndexOptions error: %v", err)
497 }
498 for _, opt := range opts {
499 if opt.Error != "" {
500 sf.Log.Printf("WARN: ignoring GetIndexOptions error for %s: %v", opt.Name, opt.Error)
501 continue
502 }
503 f(opt.IndexOptions)
504 }
505 }
506
507 return &SourcegraphListResult{
508 IDs: repos,
509 IterateIndexOptions: iterate,
510 }, nil
511}
512
513func (sf sourcegraphFake) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) {
514 opts, err := sf.GetIndexOptions(repos...)
515 if err != nil {
516 for _, id := range repos {
517 onError(id, err)
518 }
519 return
520 }
521 for _, o := range opts {
522 if o.RepoID > 0 && o.Error != "" {
523 onError(o.RepoID, errors.New(o.Error))
524 }
525 if o.Error == "" {
526 onSuccess(o.IndexOptions)
527 }
528 }
529}
530
531func (sf sourcegraphFake) GetIndexOptions(repos ...uint32) ([]indexOptionsItem, error) {
532 reposIdx := map[uint32]int{}
533 for i, id := range repos {
534 reposIdx[id] = i
535 }
536
537 items := make([]indexOptionsItem, len(repos))
538 err := sf.visitRepos(func(name string) {
539 idx, ok := reposIdx[sf.id(name)]
540 if !ok {
541 return
542 }
543 opts, err := sf.getIndexOptions(name)
544 if err != nil {
545 items[idx] = indexOptionsItem{Error: err.Error()}
546 } else {
547 items[idx] = indexOptionsItem{IndexOptions: opts}
548 }
549 })
550 if err != nil {
551 return nil, err
552 }
553
554 for i := range items {
555 if items[i].Error == "" && items[i].RepoID == 0 {
556 items[i].Error = "not found"
557 }
558 }
559
560 return items, nil
561}
562
563func (sf sourcegraphFake) getIndexOptions(name string) (IndexOptions, error) {
564 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name))
565 exists := func(p string) bool {
566 _, err := os.Stat(filepath.Join(dir, p))
567 return err == nil
568 }
569 float := func(p string) float64 {
570 b, _ := os.ReadFile(filepath.Join(dir, p))
571 f, _ := strconv.ParseFloat(string(bytes.TrimSpace(b)), 64)
572 return f
573 }
574
575 opts := IndexOptions{
576 RepoID: sf.id(name),
577 Name: name,
578 CloneURL: sf.getCloneURL(name),
579 Symbols: true,
580
581 Public: !exists("SG_PRIVATE"),
582 Fork: exists("SG_FORK"),
583 Archived: exists("SG_ARCHIVED"),
584
585 Priority: float("SG_PRIORITY"),
586 }
587
588 branches, err := sf.getBranches(name)
589 if err != nil {
590 return opts, err
591 }
592 opts.Branches = branches
593
594 return opts, nil
595}
596
597func (sf sourcegraphFake) getBranches(name string) ([]zoekt.RepositoryBranch, error) {
598 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name))
599 repo, err := git.PlainOpen(dir)
600 if err != nil {
601 return nil, err
602 }
603
604 cfg, err := repo.Config()
605 if err != nil {
606 return nil, err
607 }
608
609 sec := cfg.Raw.Section("zoekt")
610 branches := sec.Options.GetAll("branch")
611 if len(branches) == 0 {
612 branches = append(branches, "HEAD")
613 }
614
615 rBranches := make([]zoekt.RepositoryBranch, 0, len(branches))
616 for _, branch := range branches {
617 cmd := exec.Command("git", "rev-parse", branch)
618 cmd.Dir = dir
619 if b, err := cmd.Output(); err != nil {
620 sf.Log.Printf("WARN: Could not get branch %s/%s", name, branch)
621 } else {
622 version := string(bytes.TrimSpace(b))
623 rBranches = append(rBranches, zoekt.RepositoryBranch{
624 Name: branch,
625 Version: version,
626 })
627 }
628 }
629
630 if len(rBranches) == 0 {
631 return nil, fmt.Errorf("WARN: Could not get any branch revisions for repo %s", name)
632 }
633
634 return rBranches, nil
635}
636
637func (sf sourcegraphFake) id(name string) uint32 {
638 // allow overriding the ID.
639 idPath := filepath.Join(sf.RootDir, filepath.FromSlash(name), "SG_ID")
640 if b, _ := os.ReadFile(idPath); len(b) > 0 {
641 id, err := strconv.Atoi(strings.TrimSpace(string(b)))
642 if err == nil {
643 return uint32(id)
644 }
645 }
646 return fakeID(name)
647}
648
649func (sf sourcegraphFake) getCloneURL(name string) string {
650 return filepath.Join(sf.RootDir, filepath.FromSlash(name))
651}
652
653func (sf sourcegraphFake) ListRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) {
654 var repos []uint32
655 err := sf.visitRepos(func(name string) {
656 repos = append(repos, sf.id(name))
657 })
658 return repos, err
659}
660
661func (sf sourcegraphFake) visitRepos(visit func(name string)) error {
662 return filepath.Walk(sf.RootDir, func(path string, fi os.FileInfo, fileErr error) error {
663 if fileErr != nil {
664 sf.Log.Printf("WARN: ignoring error searching %s: %v", path, fileErr)
665 return nil
666 }
667 if !fi.IsDir() {
668 return nil
669 }
670
671 gitdir := filepath.Join(path, ".git")
672 if fi, err := os.Stat(gitdir); err != nil || !fi.IsDir() {
673 return nil
674 }
675
676 subpath, err := filepath.Rel(sf.RootDir, path)
677 if err != nil {
678 // According to WalkFunc docs, path is always filepath.Join(root,
679 // subpath). So Rel should always work.
680 return fmt.Errorf("filepath.Walk returned %s which is not relative to %s: %w", path, sf.RootDir, err)
681 }
682
683 name := filepath.ToSlash(subpath)
684 visit(name)
685
686 return filepath.SkipDir
687 })
688}
689
690func (s sourcegraphFake) UpdateIndexStatus(repositories []indexStatus) error {
691 // noop
692 return nil
693}
694
// fakeID derives a deterministic repository ID from name for fakes and
// tests. The CRC-32 (IEEE) checksum of name is reduced modulo 2^31-1 and
// then incremented. The result therefore lies in [1, 2^31-1], which is never
// zero and always fits in a positive int32.
func fakeID(name string) uint32 {
	const maxPositiveInt32 = 1<<31 - 1
	sum := crc32.ChecksumIEEE([]byte(name))
	return sum%maxPositiveInt32 + 1
}
700
701type sourcegraphNop struct{}
702
703func (s sourcegraphNop) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) {
704 return nil, nil
705}
706
707func (s sourcegraphNop) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) {
708}
709
710func (s sourcegraphNop) UpdateIndexStatus(repositories []indexStatus) error {
711 return nil
712}