fork of https://github.com/sourcegraph/zoekt
1package main
2
3import (
4 "bytes"
5 "context"
6 "errors"
7 "fmt"
8 "hash/crc32"
9 "log"
10 "math/rand"
11 "net/url"
12 "os"
13 "os/exec"
14 "path"
15 "path/filepath"
16 "strconv"
17 "strings"
18 "time"
19
20 "github.com/go-git/go-git/v5"
21 "golang.org/x/net/trace"
22
23 "github.com/sourcegraph/zoekt"
24 configv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/sourcegraph/zoekt/configuration/v1"
25)
26
// SourcegraphListResult is the return value of Sourcegraph.List. It is its
// own type since internally we batch the calculation of index options. This
// is exposed via IterateIndexOptions.
//
// This type has state and is coupled to the Sourcegraph implementation.
type SourcegraphListResult struct {
	// IDs is the set of Sourcegraph repository IDs that this replica needs
	// to index.
	IDs []uint32

	// IterateIndexOptions resolves, on a best-effort basis, the IndexOptions
	// for IDs and passes each resolved value to the supplied callback. If
	// any repository fails it is logged internally and the callback is not
	// invoked for it. It uses the "config fingerprint" to reduce the amount
	// of work done. This means we only resolve options for repositories
	// which have been mutated since the last Sourcegraph.List call.
	//
	// Note: this has a side-effect of setting the "config fingerprint". The
	// config fingerprint means we only calculate index options for
	// repositories that have changed since the last call to
	// IterateIndexOptions. If you want to force calculation of index options
	// use Sourcegraph.ForceIterateIndexOptions.
	//
	// Note: This should not be called concurrently with the Sourcegraph client.
	IterateIndexOptions func(func(IndexOptions))
}
52
// Sourcegraph represents the Sourcegraph service. It informs the indexserver
// what to index and which options to use.
type Sourcegraph interface {
	// List returns a list of repository IDs to index as well as a facility to
	// fetch the indexing options.
	//
	// NOTE(review): indexed is presumably the set of repository IDs this
	// replica has already indexed; confirm against the callers.
	//
	// Note: The return value is not safe to use concurrently with future calls
	// to List.
	List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error)

	// ForceIterateIndexOptions will best-effort calculate the index options for
	// all repos. For each repo it will call either onSuccess or onError. This
	// is the forced version of IterateIndexOptions, so will always calculate
	// options for each id in repos.
	ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32)

	// UpdateIndexStatus sends a request to Sourcegraph to confirm that the
	// given repositories have been indexed.
	UpdateIndexStatus(repositories []indexStatus) error
}
73
// SourcegraphClientOption customizes a sourcegraphClient while it is being
// constructed by newSourcegraphClient.
type SourcegraphClientOption func(*sourcegraphClient)
75
76// WithBatchSize controls how many repository configurations we request a time.
77// If BatchSize is 0, we default to requesting 10,000 repositories at once.
78func WithBatchSize(batchSize int) SourcegraphClientOption {
79 return func(c *sourcegraphClient) {
80 c.BatchSize = batchSize
81 }
82}
83
84func newSourcegraphClient(rootURL *url.URL, hostname string, grpcClient configv1.ZoektConfigurationServiceClient, opts ...SourcegraphClientOption) *sourcegraphClient {
85 client := &sourcegraphClient{
86 Root: rootURL,
87 Hostname: hostname,
88 BatchSize: 0,
89 grpcClient: grpcClient,
90 }
91
92 for _, opt := range opts {
93 opt(client)
94 }
95
96 return client
97}
98
// sourcegraphClient contains methods which interact with the sourcegraph API.
type sourcegraphClient struct {
	// Root is the base URL for the Sourcegraph instance to index. Normally
	// http://sourcegraph-frontend-internal or http://localhost:3090.
	Root *url.URL

	// Hostname is the name we advertise to Sourcegraph when asking for the
	// list of repositories to index.
	Hostname string

	// BatchSize is how many repository configurations we request at once. If
	// zero a value of 10000 is used.
	BatchSize int

	// grpcClient is used to make requests to the Sourcegraph instance.
	grpcClient configv1.ZoektConfigurationServiceClient

	// configFingerprintProto is the last config fingerprint (in its gRPC
	// representation) returned from Sourcegraph. It is sent along with future
	// calls to the configuration endpoint.
	//
	// NOTE(review): older comments described this field as mutually exclusive
	// with a separate configFingerprint field; no such field exists in this
	// struct.
	configFingerprintProto *configv1.Fingerprint

	// configFingerprintReset tracks when we should zero out
	// configFingerprintProto. We want to periodically do this just in case our
	// fingerprint logic is faulty. When it is cleared out, we fallback to
	// calculating everything.
	configFingerprintReset time.Time
}
130
131func (s *sourcegraphClient) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) {
132 repos, err := s.listRepoIDs(ctx, indexed)
133 if err != nil {
134 return nil, fmt.Errorf("listRepoIDs: %w", err)
135 }
136
137 batchSize := s.BatchSize
138 if batchSize == 0 {
139 batchSize = 10_000
140 }
141
142 // Check if we should recalculate everything.
143 if time.Now().After(s.configFingerprintReset) {
144 // for every 500 repos we wait a minute. 2021-12-15 on sourcegraph.com
145 // this works out to every 100 minutes.
146 next := time.Duration(len(indexed)) * time.Minute / 500
147 if min := 5 * time.Minute; next < min {
148 next = min
149 }
150 next += time.Duration(rand.Int63n(int64(next) / 4)) // jitter
151 s.configFingerprintReset = time.Now().Add(next)
152
153 s.configFingerprintProto = nil
154 }
155
156 // getIndexOptionsFunc is a function that can be used to get the index
157 // options for a set of repos (while properly handling any configuration fingerprint
158 // changes).
159 //
160 // In general, this function provides a consistent fingerprint for each batch call,
161 // and updates the server state with the new fingerprint. If any of the batch calls
162 // fail, the old fingerprint is restored.
163 type getIndexOptionsFunc func(repos ...uint32) ([]indexOptionsItem, error)
164
165 mkGetIndexOptionsFunc := func(tr trace.Trace) getIndexOptionsFunc {
166 startingFingerPrint := s.configFingerprintProto
167 tr.LazyPrintf("fingerprint: %s", startingFingerPrint.String())
168
169 first := true
170 return func(repos ...uint32) ([]indexOptionsItem, error) {
171 options, nextFingerPrint, err := s.getIndexOptions(ctx, startingFingerPrint, repos)
172 if err != nil {
173 first = false
174 s.configFingerprintProto = startingFingerPrint
175
176 return nil, err
177 }
178
179 if first {
180 first = false
181 s.configFingerprintProto = nextFingerPrint
182 tr.LazyPrintf("new fingerprint: %s", nextFingerPrint.String())
183 }
184
185 return options, nil
186 }
187 }
188
189 iterate := func(f func(IndexOptions)) {
190 start := time.Now()
191 tr := trace.New("getIndexOptions", "")
192 tr.LazyPrintf("getting index options for %d repos", len(repos))
193
194 defer func() {
195 metricResolveRevisionsDuration.Observe(time.Since(start).Seconds())
196 tr.Finish()
197 }()
198
199 getIndexOptions := mkGetIndexOptionsFunc(tr)
200
201 // We ask the frontend to get index options in batches.
202 for repos := range batched(repos, batchSize) {
203 start := time.Now()
204 options, err := getIndexOptions(repos...)
205 duration := time.Since(start)
206
207 if err != nil {
208 metricResolveRevisionDuration.WithLabelValues("false").Observe(duration.Seconds())
209 tr.LazyPrintf("failed fetching options batch: %v", err)
210 tr.SetError()
211
212 continue
213 }
214
215 metricResolveRevisionDuration.WithLabelValues("true").Observe(duration.Seconds())
216
217 for _, o := range options {
218 metricGetIndexOptions.Inc()
219
220 if o.Error != "" {
221 metricGetIndexOptionsError.Inc()
222 tr.LazyPrintf("failed fetching options for %v: %v", o.Name, o.Error)
223 tr.SetError()
224
225 continue
226 }
227 f(o.IndexOptions)
228 }
229 }
230 }
231
232 return &SourcegraphListResult{
233 IDs: repos,
234 IterateIndexOptions: iterate,
235 }, nil
236}
237
238func (s *sourcegraphClient) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) {
239 batchSize := s.BatchSize
240 if batchSize == 0 {
241 batchSize = 10_000
242 }
243
244 for repos := range batched(repos, batchSize) {
245 opts, _, err := s.getIndexOptions(context.Background(), nil, repos)
246 if err != nil {
247 for _, id := range repos {
248 onError(id, err)
249 }
250 continue
251 }
252 for _, o := range opts {
253 if o.RepoID > 0 && o.Error != "" {
254 onError(o.RepoID, errors.New(o.Error))
255 }
256 if o.Error == "" {
257 onSuccess(o.IndexOptions)
258 }
259 }
260 }
261}
262
// indexOptionsItem wraps IndexOptions to also include an error returned by
// the API.
type indexOptionsItem struct {
	// IndexOptions holds the options resolved for a single repository.
	IndexOptions
	// Error is the per-repository error message. The empty string means the
	// options were resolved successfully.
	Error string
}
269
270func (o *indexOptionsItem) FromProto(x *configv1.ZoektIndexOptions) {
271 item := indexOptionsItem{}
272 item.IndexOptions.FromProto(x)
273 item.Error = x.GetError()
274 *o = item
275}
276
277func (o *indexOptionsItem) ToProto() *configv1.ZoektIndexOptions {
278 branches := make([]*configv1.ZoektRepositoryBranch, 0, len(o.Branches))
279 for _, b := range o.Branches {
280 branches = append(branches, &configv1.ZoektRepositoryBranch{
281 Name: b.Name,
282 Version: b.Version,
283 })
284 }
285
286 languageMap := make([]*configv1.LanguageMapping, 0, len(o.LanguageMap))
287
288 for lang, parser := range o.LanguageMap {
289 languageMap = append(languageMap, &configv1.LanguageMapping{
290 Language: lang,
291 Ctags: configv1.CTagsParserType(parser),
292 })
293 }
294
295 return &configv1.ZoektIndexOptions{
296 RepoId: int32(o.RepoID),
297 LargeFiles: o.LargeFiles,
298 Symbols: o.Symbols,
299 Branches: branches,
300 Name: o.Name,
301
302 Priority: o.Priority,
303
304 Public: o.Public,
305 Fork: o.Fork,
306 Archived: o.Archived,
307
308 Error: o.Error,
309
310 LanguageMap: languageMap,
311 ShardConcurrency: o.ShardConcurrency,
312
313 TenantId: int64(o.TenantID),
314 }
315}
316
317func (s *sourcegraphClient) getIndexOptions(ctx context.Context, fingerprint *configv1.Fingerprint, repos []uint32) ([]indexOptionsItem, *configv1.Fingerprint, error) {
318 repoIDs := make([]int32, 0, len(repos))
319 for _, id := range repos {
320 repoIDs = append(repoIDs, int32(id))
321 }
322
323 req := configv1.SearchConfigurationRequest{
324 RepoIds: repoIDs,
325 Fingerprint: fingerprint,
326 }
327
328 response, err := s.grpcClient.SearchConfiguration(ctx, &req)
329 if err != nil {
330 return nil, nil, err
331 }
332
333 protoItems := response.GetUpdatedOptions()
334 items := make([]indexOptionsItem, 0, len(protoItems))
335 for _, x := range protoItems {
336 var item indexOptionsItem
337 item.FromProto(x)
338 item.IndexOptions.CloneURL = s.getCloneURL(item.Name)
339
340 items = append(items, item)
341 }
342
343 return items, response.GetFingerprint(), nil
344}
345
346func (s *sourcegraphClient) getCloneURL(name string) string {
347 return s.Root.ResolveReference(&url.URL{Path: path.Join("/.internal/git", name)}).String()
348}
349
350func (s *sourcegraphClient) listRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) {
351 var request configv1.ListRequest
352 request.Hostname = s.Hostname
353 request.IndexedIds = make([]int32, 0, len(indexed))
354 for _, id := range indexed {
355 request.IndexedIds = append(request.IndexedIds, int32(id))
356 }
357
358 response, err := s.grpcClient.List(ctx, &request)
359 if err != nil {
360 return nil, err
361 }
362
363 repoIDs := make([]uint32, 0, len(response.RepoIds))
364 for _, id := range response.RepoIds {
365 repoIDs = append(repoIDs, uint32(id))
366 }
367
368 return repoIDs, nil
369}
370
// indexStatus describes the indexed state of one repository as reported to
// Sourcegraph through UpdateIndexStatus.
type indexStatus struct {
	// RepoID is the Sourcegraph repository ID.
	RepoID uint32
	// Branches lists the branches (name and version) reported for the
	// repository.
	Branches []zoekt.RepositoryBranch
	// IndexTimeUnix is the index time as a Unix timestamp.
	// NOTE(review): presumably in seconds; confirm against the producer.
	IndexTimeUnix int64
}
376
// updateIndexStatusRequest is the native counterpart of the protobuf
// UpdateIndexStatusRequest message; see its ToProto and FromProto methods.
type updateIndexStatusRequest struct {
	// Repositories holds one status entry per repository being reported.
	Repositories []indexStatus
}
380
381func (u *updateIndexStatusRequest) ToProto() *configv1.UpdateIndexStatusRequest {
382 repositories := make([]*configv1.UpdateIndexStatusRequest_Repository, 0, len(u.Repositories))
383
384 for _, repo := range u.Repositories {
385 branches := make([]*configv1.ZoektRepositoryBranch, 0, len(repo.Branches))
386
387 for _, branch := range repo.Branches {
388 branches = append(branches, &configv1.ZoektRepositoryBranch{
389 Name: branch.Name,
390 Version: branch.Version,
391 })
392 }
393
394 repositories = append(repositories, &configv1.UpdateIndexStatusRequest_Repository{
395 RepoId: repo.RepoID,
396 Branches: branches,
397 IndexTimeUnix: repo.IndexTimeUnix,
398 })
399 }
400
401 return &configv1.UpdateIndexStatusRequest{
402 Repositories: repositories,
403 }
404}
405
406func (u *updateIndexStatusRequest) FromProto(x *configv1.UpdateIndexStatusRequest) {
407 protoRepositories := x.GetRepositories()
408 repositories := make([]indexStatus, 0, len(protoRepositories))
409
410 for _, repo := range x.GetRepositories() {
411 protoBranches := repo.GetBranches()
412 branches := make([]zoekt.RepositoryBranch, 0, len(protoBranches))
413
414 for _, branch := range repo.GetBranches() {
415 branches = append(branches, zoekt.RepositoryBranch{
416 Name: branch.GetName(),
417 Version: branch.GetVersion(),
418 })
419 }
420
421 repositories = append(repositories, indexStatus{
422 RepoID: repo.GetRepoId(),
423 Branches: branches,
424 IndexTimeUnix: repo.GetIndexTimeUnix(),
425 })
426 }
427
428 *u = updateIndexStatusRequest{
429 Repositories: repositories,
430 }
431}
432
433// UpdateIndexStatus sends a request to Sourcegraph to confirm that the given
434// repositories have been indexed.
435func (s *sourcegraphClient) UpdateIndexStatus(repositories []indexStatus) error {
436 r := updateIndexStatusRequest{Repositories: repositories}
437
438 request := r.ToProto()
439 _, err := s.grpcClient.UpdateIndexStatus(context.Background(), request)
440 if err != nil {
441 return fmt.Errorf("failed to update index status: %w", err)
442 }
443
444 return nil
445}
446
// sourcegraphFake is a Sourcegraph implementation backed by git repositories
// on the local filesystem instead of a Sourcegraph instance.
type sourcegraphFake struct {
	// RootDir is the directory that is walked to discover repositories. A
	// repository's name is its slash-separated path relative to RootDir.
	RootDir string
	// Log receives warnings about errors that are ignored.
	Log *log.Logger
}
451
452func (sf sourcegraphFake) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) {
453 repos, err := sf.ListRepoIDs(ctx, indexed)
454 if err != nil {
455 return nil, err
456 }
457
458 iterate := func(f func(IndexOptions)) {
459 opts, err := sf.GetIndexOptions(repos...)
460 if err != nil {
461 sf.Log.Printf("WARN: ignoring GetIndexOptions error: %v", err)
462 }
463 for _, opt := range opts {
464 if opt.Error != "" {
465 sf.Log.Printf("WARN: ignoring GetIndexOptions error for %s: %v", opt.Name, opt.Error)
466 continue
467 }
468 f(opt.IndexOptions)
469 }
470 }
471
472 return &SourcegraphListResult{
473 IDs: repos,
474 IterateIndexOptions: iterate,
475 }, nil
476}
477
478func (sf sourcegraphFake) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) {
479 opts, err := sf.GetIndexOptions(repos...)
480 if err != nil {
481 for _, id := range repos {
482 onError(id, err)
483 }
484 return
485 }
486 for _, o := range opts {
487 if o.RepoID > 0 && o.Error != "" {
488 onError(o.RepoID, errors.New(o.Error))
489 }
490 if o.Error == "" {
491 onSuccess(o.IndexOptions)
492 }
493 }
494}
495
496func (sf sourcegraphFake) GetIndexOptions(repos ...uint32) ([]indexOptionsItem, error) {
497 reposIdx := map[uint32]int{}
498 for i, id := range repos {
499 reposIdx[id] = i
500 }
501
502 items := make([]indexOptionsItem, len(repos))
503 err := sf.visitRepos(func(name string) {
504 idx, ok := reposIdx[sf.id(name)]
505 if !ok {
506 return
507 }
508 opts, err := sf.getIndexOptions(name)
509 if err != nil {
510 items[idx] = indexOptionsItem{Error: err.Error()}
511 } else {
512 items[idx] = indexOptionsItem{IndexOptions: opts}
513 }
514 })
515 if err != nil {
516 return nil, err
517 }
518
519 for i := range items {
520 if items[i].Error == "" && items[i].RepoID == 0 {
521 items[i].Error = "not found"
522 }
523 }
524
525 return items, nil
526}
527
528func (sf sourcegraphFake) getIndexOptions(name string) (IndexOptions, error) {
529 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name))
530 exists := func(p string) bool {
531 _, err := os.Stat(filepath.Join(dir, p))
532 return err == nil
533 }
534 float := func(p string) float64 {
535 b, _ := os.ReadFile(filepath.Join(dir, p))
536 f, _ := strconv.ParseFloat(string(bytes.TrimSpace(b)), 64)
537 return f
538 }
539
540 opts := IndexOptions{
541 RepoID: sf.id(name),
542 Name: name,
543 CloneURL: sf.getCloneURL(name),
544 Symbols: true,
545
546 Public: !exists("SG_PRIVATE"),
547 Fork: exists("SG_FORK"),
548 Archived: exists("SG_ARCHIVED"),
549
550 Priority: float("SG_PRIORITY"),
551 }
552
553 branches, err := sf.getBranches(name)
554 if err != nil {
555 return opts, err
556 }
557 opts.Branches = branches
558
559 return opts, nil
560}
561
562func (sf sourcegraphFake) getBranches(name string) ([]zoekt.RepositoryBranch, error) {
563 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name))
564 repo, err := git.PlainOpen(dir)
565 if err != nil {
566 return nil, err
567 }
568
569 cfg, err := repo.Config()
570 if err != nil {
571 return nil, err
572 }
573
574 sec := cfg.Raw.Section("zoekt")
575 branches := sec.Options.GetAll("branch")
576 if len(branches) == 0 {
577 branches = append(branches, "HEAD")
578 }
579
580 rBranches := make([]zoekt.RepositoryBranch, 0, len(branches))
581 for _, branch := range branches {
582 cmd := exec.Command("git", "rev-parse", branch)
583 cmd.Dir = dir
584 if b, err := cmd.Output(); err != nil {
585 sf.Log.Printf("WARN: Could not get branch %s/%s", name, branch)
586 } else {
587 version := string(bytes.TrimSpace(b))
588 rBranches = append(rBranches, zoekt.RepositoryBranch{
589 Name: branch,
590 Version: version,
591 })
592 }
593 }
594
595 if len(rBranches) == 0 {
596 return nil, fmt.Errorf("WARN: Could not get any branch revisions for repo %s", name)
597 }
598
599 return rBranches, nil
600}
601
602func (sf sourcegraphFake) id(name string) uint32 {
603 // allow overriding the ID.
604 idPath := filepath.Join(sf.RootDir, filepath.FromSlash(name), "SG_ID")
605 if b, _ := os.ReadFile(idPath); len(b) > 0 {
606 id, err := strconv.Atoi(strings.TrimSpace(string(b)))
607 if err == nil {
608 return uint32(id)
609 }
610 }
611 return fakeID(name)
612}
613
614func (sf sourcegraphFake) getCloneURL(name string) string {
615 return filepath.Join(sf.RootDir, filepath.FromSlash(name))
616}
617
618func (sf sourcegraphFake) ListRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) {
619 var repos []uint32
620 err := sf.visitRepos(func(name string) {
621 repos = append(repos, sf.id(name))
622 })
623 return repos, err
624}
625
626func (sf sourcegraphFake) visitRepos(visit func(name string)) error {
627 return filepath.Walk(sf.RootDir, func(path string, fi os.FileInfo, fileErr error) error {
628 if fileErr != nil {
629 sf.Log.Printf("WARN: ignoring error searching %s: %v", path, fileErr)
630 return nil
631 }
632 if !fi.IsDir() {
633 return nil
634 }
635
636 gitdir := filepath.Join(path, ".git")
637 if fi, err := os.Stat(gitdir); err != nil || !fi.IsDir() {
638 return nil
639 }
640
641 subpath, err := filepath.Rel(sf.RootDir, path)
642 if err != nil {
643 // According to WalkFunc docs, path is always filepath.Join(root,
644 // subpath). So Rel should always work.
645 return fmt.Errorf("filepath.Walk returned %s which is not relative to %s: %w", path, sf.RootDir, err)
646 }
647
648 name := filepath.ToSlash(subpath)
649 visit(name)
650
651 return filepath.SkipDir
652 })
653}
654
655func (s sourcegraphFake) UpdateIndexStatus(repositories []indexStatus) error {
656 // noop
657 return nil
658}
659
// fakeID derives a deterministic repository ID from name. Used for fakes and
// tests. The result is always in the range [1, 1<<31-1].
func fakeID(name string) uint32 {
	// Reducing modulo 1<<31-1 and adding one keeps the ID non-zero and
	// positive even when it is later converted to a signed 32-bit integer.
	const modulus = 1<<31 - 1

	checksum := crc32.ChecksumIEEE([]byte(name))

	return checksum%modulus + 1
}
665
// sourcegraphNop is a Sourcegraph implementation whose methods do nothing.
type sourcegraphNop struct{}

// List returns a nil result and a nil error; callers must not dereference
// the result.
func (s sourcegraphNop) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) {
	return nil, nil
}

// ForceIterateIndexOptions does nothing: neither onSuccess nor onError is
// called.
func (s sourcegraphNop) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) {
}

// UpdateIndexStatus does nothing and always returns nil.
func (s sourcegraphNop) UpdateIndexStatus(repositories []indexStatus) error {
	return nil
}