fork of https://github.com/sourcegraph/zoekt
1package main
2
3import (
4 "bytes"
5 "context"
6 "errors"
7 "fmt"
8 "hash/crc32"
9 "log"
10 "math/rand"
11 "net/url"
12 "os"
13 "os/exec"
14 "path"
15 "path/filepath"
16 "strconv"
17 "strings"
18 "time"
19
20 "github.com/go-git/go-git/v5"
21 "golang.org/x/net/trace"
22
23 "github.com/sourcegraph/zoekt"
24 configv1 "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/grpc/protos/sourcegraph/zoekt/configuration/v1"
25)
26
// SourcegraphListResult is the return value of Sourcegraph.List. It is its
// own type since internally we batch the calculation of index options. This
// is exposed via IterateIndexOptions.
//
// This type has state and is coupled to the Sourcegraph implementation.
type SourcegraphListResult struct {
	// IDs is the set of Sourcegraph repository IDs that this replica needs
	// to index.
	IDs []uint32

	// IterateIndexOptions best-effort resolves the IndexOptions for the
	// repositories in IDs and calls the supplied function once for every
	// repository whose options were resolved. If any repository fails it is
	// logged internally and skipped. It uses the "config fingerprint" to
	// reduce the amount of work done. This means we only resolve options for
	// repositories which have been mutated since the last Sourcegraph.List
	// call.
	//
	// Note: this has a side-effect of setting the "config fingerprint". The
	// config fingerprint means we only calculate index options for repositories
	// that have changed since the last call to IterateIndexOptions. If you want
	// to force calculation of index options use
	// Sourcegraph.ForceIterateIndexOptions.
	//
	// Note: This should not be called concurrently with the Sourcegraph client.
	IterateIndexOptions func(func(IndexOptions))
}
52
// Sourcegraph represents the Sourcegraph service. It informs the indexserver
// what to index and which options to use.
type Sourcegraph interface {
	// List returns the repository IDs to index as well as a facility to
	// fetch the indexing options. indexed is the set of repository IDs the
	// caller reports as already indexed.
	//
	// Note: The return value is not safe to use concurrently with future calls
	// to List.
	List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error)

	// ForceIterateIndexOptions will best-effort calculate the index options for
	// all repos. For each repo it will call either onSuccess or onError. This
	// is the forced version of IterateIndexOptions, so it will always
	// calculate options for each id in repos.
	ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32)

	// UpdateIndexStatus sends a request to Sourcegraph to confirm that the
	// given repositories have been indexed.
	UpdateIndexStatus(repositories []indexStatus) error
}
73
// SourcegraphClientOption customizes a sourcegraphClient. Options are applied
// in order by newSourcegraphClient after the client's defaults are set.
type SourcegraphClientOption func(*sourcegraphClient)
75
76// WithBatchSize controls how many repository configurations we request a time.
77// If BatchSize is 0, we default to requesting 10,000 repositories at once.
78func WithBatchSize(batchSize int) SourcegraphClientOption {
79 return func(c *sourcegraphClient) {
80 c.BatchSize = batchSize
81 }
82}
83
84func newSourcegraphClient(rootURL *url.URL, hostname string, grpcClient configv1.ZoektConfigurationServiceClient, opts ...SourcegraphClientOption) *sourcegraphClient {
85 client := &sourcegraphClient{
86 Root: rootURL,
87 Hostname: hostname,
88 BatchSize: 0,
89 grpcClient: grpcClient,
90 }
91
92 for _, opt := range opts {
93 opt(client)
94 }
95
96 return client
97}
98
// sourcegraphClient contains methods which interact with the sourcegraph API.
type sourcegraphClient struct {
	// Root is the base URL for the Sourcegraph instance to index. Normally
	// http://sourcegraph-frontend-internal or http://localhost:3090. Clone
	// URLs handed out by this client are resolved relative to it.
	Root *url.URL

	// Hostname is the name we advertise to Sourcegraph when asking for the
	// list of repositories to index.
	Hostname string

	// BatchSize is how many repository configurations we request at once. If
	// zero a value of 10000 is used.
	BatchSize int

	// grpcClient is used to make requests to the Sourcegraph instance.
	grpcClient configv1.ZoektConfigurationServiceClient

	// configFingerprintProto is the last config fingerprint returned from
	// Sourcegraph. It is sent along with future calls to the configuration
	// endpoint made via List's IterateIndexOptions.
	//
	// NOTE(review): earlier wording described this as mutually exclusive with
	// a "configFingerprint" field; no such field is declared on this struct.
	configFingerprintProto *configv1.Fingerprint

	// configFingerprintReset tracks when we should zero out
	// configFingerprintProto. We want to periodically do this just in case our
	// fingerprint logic is faulty. When it is cleared out, we fall back to
	// calculating everything.
	configFingerprintReset time.Time
}
130
131func (s *sourcegraphClient) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) {
132 repos, err := s.listRepoIDs(ctx, indexed)
133 if err != nil {
134 return nil, fmt.Errorf("listRepoIDs: %w", err)
135 }
136
137 batchSize := s.BatchSize
138 if batchSize == 0 {
139 batchSize = 10_000
140 }
141
142 // Check if we should recalculate everything.
143 if time.Now().After(s.configFingerprintReset) {
144 // for every 500 repos we wait a minute. 2021-12-15 on sourcegraph.com
145 // this works out to every 100 minutes.
146 next := time.Duration(len(indexed)) * time.Minute / 500
147 if min := 5 * time.Minute; next < min {
148 next = min
149 }
150 next += time.Duration(rand.Int63n(int64(next) / 4)) // jitter
151 s.configFingerprintReset = time.Now().Add(next)
152
153 s.configFingerprintProto = nil
154 }
155
156 // getIndexOptionsFunc is a function that can be used to get the index
157 // options for a set of repos (while properly handling any configuration fingerprint
158 // changes).
159 //
160 // In general, this function provides a consistent fingerprint for each batch call,
161 // and updates the server state with the new fingerprint. If any of the batch calls
162 // fail, the old fingerprint is restored.
163 type getIndexOptionsFunc func(repos ...uint32) ([]indexOptionsItem, error)
164
165 mkGetIndexOptionsFunc := func(tr trace.Trace) getIndexOptionsFunc {
166 startingFingerPrint := s.configFingerprintProto
167 tr.LazyPrintf("fingerprint: %s", startingFingerPrint.String())
168
169 first := true
170 return func(repos ...uint32) ([]indexOptionsItem, error) {
171 options, nextFingerPrint, err := s.getIndexOptions(ctx, startingFingerPrint, repos)
172 if err != nil {
173 first = false
174 s.configFingerprintProto = startingFingerPrint
175
176 return nil, err
177 }
178
179 if first {
180 first = false
181 s.configFingerprintProto = nextFingerPrint
182 tr.LazyPrintf("new fingerprint: %s", nextFingerPrint.String())
183 }
184
185 return options, nil
186 }
187 }
188
189 iterate := func(f func(IndexOptions)) {
190 start := time.Now()
191 tr := trace.New("getIndexOptions", "")
192 tr.LazyPrintf("getting index options for %d repos", len(repos))
193
194 defer func() {
195 metricResolveRevisionsDuration.Observe(time.Since(start).Seconds())
196 tr.Finish()
197 }()
198
199 getIndexOptions := mkGetIndexOptionsFunc(tr)
200
201 // We ask the frontend to get index options in batches.
202 for repos := range batched(repos, batchSize) {
203 start := time.Now()
204 options, err := getIndexOptions(repos...)
205 duration := time.Since(start)
206
207 if err != nil {
208 metricResolveRevisionDuration.WithLabelValues("false").Observe(duration.Seconds())
209 tr.LazyPrintf("failed fetching options batch: %v", err)
210 tr.SetError()
211
212 continue
213 }
214
215 metricResolveRevisionDuration.WithLabelValues("true").Observe(duration.Seconds())
216
217 for _, o := range options {
218 metricGetIndexOptions.Inc()
219
220 if o.Error != "" {
221 metricGetIndexOptionsError.Inc()
222 tr.LazyPrintf("failed fetching options for %v: %v", o.Name, o.Error)
223 tr.SetError()
224
225 continue
226 }
227 f(o.IndexOptions)
228 }
229 }
230 }
231
232 return &SourcegraphListResult{
233 IDs: repos,
234 IterateIndexOptions: iterate,
235 }, nil
236}
237
238func (s *sourcegraphClient) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) {
239 batchSize := s.BatchSize
240 if batchSize == 0 {
241 batchSize = 10_000
242 }
243
244 for repos := range batched(repos, batchSize) {
245 opts, _, err := s.getIndexOptions(context.Background(), nil, repos)
246 if err != nil {
247 for _, id := range repos {
248 onError(id, err)
249 }
250 continue
251 }
252 for _, o := range opts {
253 if o.RepoID > 0 && o.Error != "" {
254 onError(o.RepoID, errors.New(o.Error))
255 }
256 if o.Error == "" {
257 onSuccess(o.IndexOptions)
258 }
259 }
260 }
261}
262
// indexOptionsItem wraps IndexOptions to also include an error returned by
// the API.
type indexOptionsItem struct {
	IndexOptions

	// Error is the per-repository error message. An empty string means the
	// options were resolved successfully.
	Error string
}
269
270func (o *indexOptionsItem) FromProto(x *configv1.ZoektIndexOptions) {
271 item := indexOptionsItem{}
272 item.IndexOptions.FromProto(x)
273 item.Error = x.GetError()
274 *o = item
275}
276
277func (o *indexOptionsItem) ToProto() *configv1.ZoektIndexOptions {
278 branches := make([]*configv1.ZoektRepositoryBranch, 0, len(o.Branches))
279 for _, b := range o.Branches {
280 branches = append(branches, &configv1.ZoektRepositoryBranch{
281 Name: b.Name,
282 Version: b.Version,
283 })
284 }
285
286 languageMap := make([]*configv1.LanguageMapping, 0, len(o.LanguageMap))
287
288 for lang, parser := range o.LanguageMap {
289 languageMap = append(languageMap, &configv1.LanguageMapping{
290 Language: lang,
291 Ctags: configv1.CTagsParserType(parser),
292 })
293 }
294
295 return &configv1.ZoektIndexOptions{
296 RepoId: int32(o.RepoID),
297 LargeFiles: o.LargeFiles,
298 Symbols: o.Symbols,
299 Branches: branches,
300 Name: o.Name,
301
302 Priority: o.Priority,
303
304 Public: o.Public,
305 Fork: o.Fork,
306 Archived: o.Archived,
307
308 Error: o.Error,
309
310 LanguageMap: languageMap,
311 ShardConcurrency: o.ShardConcurrency,
312
313 TenantId: int64(o.TenantID),
314 }
315}
316
317func (s *sourcegraphClient) getIndexOptions(ctx context.Context, fingerprint *configv1.Fingerprint, repos []uint32) ([]indexOptionsItem, *configv1.Fingerprint, error) {
318 repoIDs := make([]int32, 0, len(repos))
319 for _, id := range repos {
320 repoIDs = append(repoIDs, int32(id))
321 }
322
323 req := configv1.SearchConfigurationRequest{
324 RepoIds: repoIDs,
325 Fingerprint: fingerprint,
326 }
327
328 response, err := s.grpcClient.SearchConfiguration(ctx, &req)
329 if err != nil {
330 return nil, nil, err
331 }
332
333 protoItems := response.GetUpdatedOptions()
334 items := make([]indexOptionsItem, 0, len(protoItems))
335 for _, x := range protoItems {
336 var item indexOptionsItem
337 item.FromProto(x)
338 item.IndexOptions.CloneURL = s.getCloneURL(item.Name)
339
340 items = append(items, item)
341 }
342
343 return items, response.GetFingerprint(), nil
344}
345
346func (s *sourcegraphClient) getCloneURL(name string) string {
347 return s.Root.ResolveReference(&url.URL{Path: path.Join("/.internal/git", name)}).String()
348}
349
350func (s *sourcegraphClient) listRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) {
351 var request configv1.ListRequest
352 request.Hostname = s.Hostname
353 request.IndexedIds = make([]int32, 0, len(indexed))
354 for _, id := range indexed {
355 request.IndexedIds = append(request.IndexedIds, int32(id))
356 }
357
358 response, err := s.grpcClient.List(ctx, &request)
359 if err != nil {
360 return nil, err
361 }
362
363 repoIDs := make([]uint32, 0, len(response.RepoIds))
364 for _, id := range response.RepoIds {
365 repoIDs = append(repoIDs, uint32(id))
366 }
367
368 return repoIDs, nil
369}
370
// indexStatus describes one repository in an UpdateIndexStatus request. Its
// fields map one-to-one onto configv1.UpdateIndexStatusRequest_Repository.
type indexStatus struct {
	// RepoID is the Sourcegraph repository ID.
	RepoID uint32
	// Branches lists the branch names and versions reported for the repository.
	Branches []zoekt.RepositoryBranch
	// IndexTimeUnix is presumably the time of indexing as a Unix timestamp;
	// the unit is not established in this file — confirm against the proto.
	IndexTimeUnix int64
	// State is the repository state reported to Sourcegraph.
	State configv1.UpdateIndexStatusRequest_Repository_State
	// FailureMessage is a free-form message sent alongside State.
	FailureMessage string
}
378
// updateIndexStatusRequest is the in-process form of
// configv1.UpdateIndexStatusRequest; see its ToProto and FromProto methods.
type updateIndexStatusRequest struct {
	// Repositories holds one status entry per repository being reported.
	Repositories []indexStatus
}
382
383func (u *updateIndexStatusRequest) ToProto() *configv1.UpdateIndexStatusRequest {
384 repositories := make([]*configv1.UpdateIndexStatusRequest_Repository, 0, len(u.Repositories))
385
386 for _, repo := range u.Repositories {
387 branches := make([]*configv1.ZoektRepositoryBranch, 0, len(repo.Branches))
388
389 for _, branch := range repo.Branches {
390 branches = append(branches, &configv1.ZoektRepositoryBranch{
391 Name: branch.Name,
392 Version: branch.Version,
393 })
394 }
395
396 repositories = append(repositories, &configv1.UpdateIndexStatusRequest_Repository{
397 RepoId: repo.RepoID,
398 Branches: branches,
399 IndexTimeUnix: repo.IndexTimeUnix,
400 State: repo.State,
401 FailureMessage: repo.FailureMessage,
402 })
403 }
404
405 return &configv1.UpdateIndexStatusRequest{
406 Repositories: repositories,
407 }
408}
409
410func (u *updateIndexStatusRequest) FromProto(x *configv1.UpdateIndexStatusRequest) {
411 protoRepositories := x.GetRepositories()
412 repositories := make([]indexStatus, 0, len(protoRepositories))
413
414 for _, repo := range x.GetRepositories() {
415 protoBranches := repo.GetBranches()
416 branches := make([]zoekt.RepositoryBranch, 0, len(protoBranches))
417
418 for _, branch := range repo.GetBranches() {
419 branches = append(branches, zoekt.RepositoryBranch{
420 Name: branch.GetName(),
421 Version: branch.GetVersion(),
422 })
423 }
424
425 repositories = append(repositories, indexStatus{
426 RepoID: repo.GetRepoId(),
427 Branches: branches,
428 IndexTimeUnix: repo.GetIndexTimeUnix(),
429 State: repo.GetState(),
430 FailureMessage: repo.GetFailureMessage(),
431 })
432 }
433
434 *u = updateIndexStatusRequest{
435 Repositories: repositories,
436 }
437}
438
439// UpdateIndexStatus sends a request to Sourcegraph to confirm that the given
440// repositories have been indexed.
441func (s *sourcegraphClient) UpdateIndexStatus(repositories []indexStatus) error {
442 r := updateIndexStatusRequest{Repositories: repositories}
443
444 request := r.ToProto()
445 _, err := s.grpcClient.UpdateIndexStatus(context.Background(), request)
446 if err != nil {
447 return fmt.Errorf("failed to update index status: %w", err)
448 }
449
450 return nil
451}
452
// sourcegraphFake serves repository lists and index options from the local
// filesystem instead of a Sourcegraph instance: every directory under RootDir
// that contains a ".git" directory is treated as a repository.
type sourcegraphFake struct {
	// RootDir is the directory that is walked to discover repositories.
	RootDir string
	// Log receives warnings about repositories and paths that are skipped.
	Log *log.Logger
}
457
458func (sf sourcegraphFake) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) {
459 repos, err := sf.ListRepoIDs(ctx, indexed)
460 if err != nil {
461 return nil, err
462 }
463
464 iterate := func(f func(IndexOptions)) {
465 opts, err := sf.GetIndexOptions(repos...)
466 if err != nil {
467 sf.Log.Printf("WARN: ignoring GetIndexOptions error: %v", err)
468 }
469 for _, opt := range opts {
470 if opt.Error != "" {
471 sf.Log.Printf("WARN: ignoring GetIndexOptions error for %s: %v", opt.Name, opt.Error)
472 continue
473 }
474 f(opt.IndexOptions)
475 }
476 }
477
478 return &SourcegraphListResult{
479 IDs: repos,
480 IterateIndexOptions: iterate,
481 }, nil
482}
483
484func (sf sourcegraphFake) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) {
485 opts, err := sf.GetIndexOptions(repos...)
486 if err != nil {
487 for _, id := range repos {
488 onError(id, err)
489 }
490 return
491 }
492 for _, o := range opts {
493 if o.RepoID > 0 && o.Error != "" {
494 onError(o.RepoID, errors.New(o.Error))
495 }
496 if o.Error == "" {
497 onSuccess(o.IndexOptions)
498 }
499 }
500}
501
502func (sf sourcegraphFake) GetIndexOptions(repos ...uint32) ([]indexOptionsItem, error) {
503 reposIdx := map[uint32]int{}
504 for i, id := range repos {
505 reposIdx[id] = i
506 }
507
508 items := make([]indexOptionsItem, len(repos))
509 err := sf.visitRepos(func(name string) {
510 idx, ok := reposIdx[sf.id(name)]
511 if !ok {
512 return
513 }
514 opts, err := sf.getIndexOptions(name)
515 if err != nil {
516 items[idx] = indexOptionsItem{Error: err.Error()}
517 } else {
518 items[idx] = indexOptionsItem{IndexOptions: opts}
519 }
520 })
521 if err != nil {
522 return nil, err
523 }
524
525 for i := range items {
526 if items[i].Error == "" && items[i].RepoID == 0 {
527 items[i].Error = "not found"
528 }
529 }
530
531 return items, nil
532}
533
534func (sf sourcegraphFake) getIndexOptions(name string) (IndexOptions, error) {
535 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name))
536 exists := func(p string) bool {
537 _, err := os.Stat(filepath.Join(dir, p))
538 return err == nil
539 }
540 float := func(p string) float64 {
541 b, _ := os.ReadFile(filepath.Join(dir, p))
542 f, _ := strconv.ParseFloat(string(bytes.TrimSpace(b)), 64)
543 return f
544 }
545
546 opts := IndexOptions{
547 RepoID: sf.id(name),
548 Name: name,
549 CloneURL: sf.getCloneURL(name),
550 Symbols: true,
551
552 Public: !exists("SG_PRIVATE"),
553 Fork: exists("SG_FORK"),
554 Archived: exists("SG_ARCHIVED"),
555
556 Priority: float("SG_PRIORITY"),
557 }
558
559 branches, err := sf.getBranches(name)
560 if err != nil {
561 return opts, err
562 }
563 opts.Branches = branches
564
565 return opts, nil
566}
567
568func (sf sourcegraphFake) getBranches(name string) ([]zoekt.RepositoryBranch, error) {
569 dir := filepath.Join(sf.RootDir, filepath.FromSlash(name))
570 repo, err := git.PlainOpen(dir)
571 if err != nil {
572 return nil, err
573 }
574
575 cfg, err := repo.Config()
576 if err != nil {
577 return nil, err
578 }
579
580 sec := cfg.Raw.Section("zoekt")
581 branches := sec.Options.GetAll("branch")
582 if len(branches) == 0 {
583 branches = append(branches, "HEAD")
584 }
585
586 rBranches := make([]zoekt.RepositoryBranch, 0, len(branches))
587 for _, branch := range branches {
588 cmd := exec.Command("git", "rev-parse", branch)
589 cmd.Dir = dir
590 if b, err := cmd.Output(); err != nil {
591 sf.Log.Printf("WARN: Could not get branch %s/%s", name, branch)
592 } else {
593 version := string(bytes.TrimSpace(b))
594 rBranches = append(rBranches, zoekt.RepositoryBranch{
595 Name: branch,
596 Version: version,
597 })
598 }
599 }
600
601 if len(rBranches) == 0 {
602 return nil, fmt.Errorf("WARN: Could not get any branch revisions for repo %s", name)
603 }
604
605 return rBranches, nil
606}
607
608func (sf sourcegraphFake) id(name string) uint32 {
609 // allow overriding the ID.
610 idPath := filepath.Join(sf.RootDir, filepath.FromSlash(name), "SG_ID")
611 if b, _ := os.ReadFile(idPath); len(b) > 0 {
612 id, err := strconv.Atoi(strings.TrimSpace(string(b)))
613 if err == nil {
614 return uint32(id)
615 }
616 }
617 return fakeID(name)
618}
619
620func (sf sourcegraphFake) getCloneURL(name string) string {
621 return filepath.Join(sf.RootDir, filepath.FromSlash(name))
622}
623
624func (sf sourcegraphFake) ListRepoIDs(ctx context.Context, indexed []uint32) ([]uint32, error) {
625 var repos []uint32
626 err := sf.visitRepos(func(name string) {
627 repos = append(repos, sf.id(name))
628 })
629 return repos, err
630}
631
632func (sf sourcegraphFake) visitRepos(visit func(name string)) error {
633 return filepath.Walk(sf.RootDir, func(path string, fi os.FileInfo, fileErr error) error {
634 if fileErr != nil {
635 sf.Log.Printf("WARN: ignoring error searching %s: %v", path, fileErr)
636 return nil
637 }
638 if !fi.IsDir() {
639 return nil
640 }
641
642 gitdir := filepath.Join(path, ".git")
643 if fi, err := os.Stat(gitdir); err != nil || !fi.IsDir() {
644 return nil
645 }
646
647 subpath, err := filepath.Rel(sf.RootDir, path)
648 if err != nil {
649 // According to WalkFunc docs, path is always filepath.Join(root,
650 // subpath). So Rel should always work.
651 return fmt.Errorf("filepath.Walk returned %s which is not relative to %s: %w", path, sf.RootDir, err)
652 }
653
654 name := filepath.ToSlash(subpath)
655 visit(name)
656
657 return filepath.SkipDir
658 })
659}
660
661func (s sourcegraphFake) UpdateIndexStatus(repositories []indexStatus) error {
662 // noop
663 return nil
664}
665
// fakeID returns a deterministic ID based on name. Used for fakes and tests.
// The ID is the IEEE CRC-32 of name reduced into the range [1, 2^31-1].
func fakeID(name string) uint32 {
	// Reducing modulo 2^31-1 and adding one guarantees a non-zero value that
	// is still positive when cast to a signed 32-bit integer.
	const modulus = 1<<31 - 1

	checksum := crc32.ChecksumIEEE([]byte(name))
	return checksum%modulus + 1
}
671
672type sourcegraphNop struct{}
673
674func (s sourcegraphNop) List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error) {
675 return nil, nil
676}
677
678func (s sourcegraphNop) ForceIterateIndexOptions(onSuccess func(IndexOptions), onError func(uint32, error), repos ...uint32) {
679}
680
681func (s sourcegraphNop) UpdateIndexStatus(repositories []indexStatus) error {
682 return nil
683}