fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

indexserver: move sourcegraph api functions into own struct (#125)

This PR is refactoring the interactions with the Sourcegraph API into its own
struct. The intention is to add a mode to indexserver based on the local
filesystem rather than just the Sourcegraph API. The benefit of that is it
avoids needing to run the whole Sourcegraph environment to test manually test
indexserver. This is so we can have a shorter feedback loop when making
changes to indexserver -> move towards integration tests.

+219 -182
+3 -51
cmd/zoekt-sourcegraph-indexserver/index.go
··· 4 4 "bytes" 5 5 "context" 6 6 "crypto/sha1" 7 - "encoding/json" 8 7 "errors" 9 8 "fmt" 10 9 "io" 11 - "io/ioutil" 12 10 "log" 13 - "net/http" 14 11 "net/url" 15 12 "os" 16 13 "os/exec" 17 - "path" 18 14 "path/filepath" 19 15 "sort" 20 16 "strconv" ··· 59 55 type indexArgs struct { 60 56 IndexOptions 61 57 62 - // Root is the base URL for the Sourcegraph instance to index. Normally 63 - // http://sourcegraph-frontend-internal or http://localhost:3090. 64 - Root *url.URL 58 + // CloneURL is the remote git URL of the repository for cloning. 59 + CloneURL string 65 60 66 61 // Name is the name of the repository. 67 62 Name string ··· 131 126 return s 132 127 } 133 128 134 - // indexOptionsItem wraps IndexOptions to also include an error returned by 135 - // the API. 136 - type indexOptionsItem struct { 137 - IndexOptions 138 - Error string 139 - } 140 - 141 - func getIndexOptions(root *url.URL, repos ...string) ([]indexOptionsItem, error) { 142 - u := root.ResolveReference(&url.URL{ 143 - Path: "/.internal/search/configuration", 144 - }) 145 - 146 - resp, err := client.PostForm(u.String(), url.Values{"repo": repos}) 147 - if err != nil { 148 - return nil, err 149 - } 150 - defer resp.Body.Close() 151 - 152 - if resp.StatusCode != http.StatusOK { 153 - b, err := ioutil.ReadAll(io.LimitReader(resp.Body, 1024)) 154 - _ = resp.Body.Close() 155 - if err != nil { 156 - return nil, err 157 - } 158 - return nil, &url.Error{ 159 - Op: "Get", 160 - URL: u.String(), 161 - Err: fmt.Errorf("%s: %s", resp.Status, string(b)), 162 - } 163 - } 164 - 165 - opts := make([]indexOptionsItem, len(repos)) 166 - dec := json.NewDecoder(resp.Body) 167 - for i := range opts { 168 - if err := dec.Decode(&opts[i]); err != nil { 169 - return nil, fmt.Errorf("error decoding body: %w", err) 170 - } 171 - } 172 - 173 - return opts, nil 174 - } 175 - 176 129 func gitIndex(o *indexArgs, runCmd func(*exec.Cmd) error) error { 177 130 if len(o.Branches) == 0 { 178 131 return errors.New("zoekt-git-index requires 1 or more branches") ··· 210 163 // We shallow fetch each commit specified in zoekt.Branches. This requires 211 164 // the server to have configured both uploadpack.allowAnySHA1InWant and 212 165 // uploadpack.allowFilter. (See gitservice.go in the Sourcegraph repository) 213 - cloneURL := o.Root.ResolveReference(&url.URL{Path: path.Join("/.internal/git", o.Name)}).String() 214 - fetchArgs := []string{"-C", gitDir, "-c", "protocol.version=2", "fetch", "--depth=1", cloneURL} 166 + fetchArgs := []string{"-C", gitDir, "-c", "protocol.version=2", "fetch", "--depth=1", o.CloneURL} 215 167 for _, b := range o.Branches { 216 168 fetchArgs = append(fetchArgs, b.Version) 217 169 }
+12 -11
cmd/zoekt-sourcegraph-indexserver/index_test.go
··· 15 15 "github.com/google/go-cmp/cmp" 16 16 "github.com/google/go-cmp/cmp/cmpopts" 17 17 "github.com/google/zoekt" 18 + "github.com/hashicorp/go-retryablehttp" 18 19 ) 19 20 20 21 func TestGetIndexOptions(t *testing.T) { ··· 39 40 u, err := url.Parse(server.URL) 40 41 if err != nil { 41 42 t.Fatal(err) 43 + } 44 + 45 + sg := &Sourcegraph{ 46 + Root: u, 47 + Client: retryablehttp.NewClient(), 42 48 } 43 49 44 50 cases := map[string]*IndexOptions{ ··· 67 73 for r, want := range cases { 68 74 response = []byte(r) 69 75 70 - got, err := getIndexOptions(u, "test/repo") 76 + got, err := sg.GetIndexOptions("test/repo") 71 77 if err != nil && want != nil { 72 78 t.Fatalf("unexpected error: %v", err) 73 79 } ··· 83 89 } 84 90 85 91 func TestIndex(t *testing.T) { 86 - root, err := url.Parse("http://api.test") 87 - if err != nil { 88 - t.Fatal(err) 89 - } 90 - 91 92 cases := []struct { 92 93 name string 93 94 args indexArgs ··· 95 96 }{{ 96 97 name: "minimal", 97 98 args: indexArgs{ 98 - Root: root, 99 - Name: "test/repo", 99 + CloneURL: "http://api.test/.internal/git/test/repo", 100 + Name: "test/repo", 100 101 IndexOptions: IndexOptions{ 101 102 Branches: []zoekt.RepositoryBranch{{Name: "HEAD", Version: "deadbeef"}}, 102 103 }, ··· 116 117 }, { 117 118 name: "minimal-id", 118 119 args: indexArgs{ 119 - Root: root, 120 - Name: "test/repo", 120 + CloneURL: "http://api.test/.internal/git/test/repo", 121 + Name: "test/repo", 121 122 IndexOptions: IndexOptions{ 122 123 Branches: []zoekt.RepositoryBranch{{Name: "HEAD", Version: "deadbeef"}}, 123 124 RepoID: 123, ··· 138 139 }, { 139 140 name: "all", 140 141 args: indexArgs{ 141 - Root: root, 142 + CloneURL: "http://api.test/.internal/git/test/repo", 142 143 Name: "test/repo", 143 144 Incremental: true, 144 145 IndexDir: "/data/index",
+22 -117
cmd/zoekt-sourcegraph-indexserver/main.go
··· 5 5 import ( 6 6 "bytes" 7 7 "context" 8 - "encoding/json" 9 8 "errors" 10 9 "flag" 11 10 "fmt" 12 11 "html/template" 13 - "io" 14 12 "io/ioutil" 15 13 "log" 16 14 "math" ··· 30 28 "cloud.google.com/go/profiler" 31 29 "github.com/google/zoekt" 32 30 "github.com/google/zoekt/debugserver" 31 + "github.com/hashicorp/go-retryablehttp" 33 32 "go.uber.org/automaxprocs/maxprocs" 34 33 "golang.org/x/net/trace" 35 34 36 35 "github.com/google/zoekt/build" 37 - retryablehttp "github.com/hashicorp/go-retryablehttp" 38 36 "github.com/keegancsmith/tmpfriend" 39 37 "github.com/prometheus/client_golang/prometheus" 40 38 "github.com/prometheus/client_golang/prometheus/promauto" ··· 108 106 // Server is the main functionality of zoekt-sourcegraph-indexserver. It 109 107 // exists to conveniently use all the options passed in via func main. 110 108 type Server struct { 111 - // Root is the base URL for the Sourcegraph instance to index. Normally 112 - // http://sourcegraph-frontend-internal or http://localhost:3090. 113 - Root *url.URL 109 + Sourcegraph *Sourcegraph 114 110 115 111 // IndexDir is the index directory to use. 116 112 IndexDir string ··· 118 114 // Interval is how often we sync with Sourcegraph. 119 115 Interval time.Duration 120 116 121 - // Hostname is the name we advertise to Sourcegraph when asking for the 122 - // list of repositories to index. 123 - Hostname string 124 - 125 117 // CPUCount is the amount of parallelism to use when indexing a 126 118 // repository. 127 119 CPUCount int ··· 130 122 lastListRepos []string 131 123 } 132 124 133 - var client = retryablehttp.NewClient() 134 125 var debug = log.New(ioutil.Discard, "", log.LstdFlags) 135 - 136 - func init() { 137 - client.Logger = debug 138 - } 139 126 140 127 // our index commands should output something every 100mb they process. 141 128 // ··· 254 241 // Run the sync loop. This blocks forever. 255 242 func (s *Server) Run(queue *Queue) { 256 243 removeIncompleteShards(s.IndexDir) 257 - waitForFrontend(s.Root) 244 + s.Sourcegraph.WaitForFrontend() 258 245 259 246 // Start a goroutine which updates the queue with commits to index. 260 247 go func() { 261 248 for range jitterTicker(s.Interval) { 262 - repos, err := listRepos(context.Background(), s.Hostname, s.Root, listIndexed(s.IndexDir)) 249 + repos, err := s.Sourcegraph.ListRepos(context.Background(), listIndexed(s.IndexDir)) 263 250 if err != nil { 264 251 log.Println(err) 265 252 continue ··· 290 277 // We ask the frontend to get index options in batches. 291 278 for repos := range batched(repos, 1000) { 292 279 start := time.Now() 293 - opts, err := getIndexOptions(s.Root, repos...) 280 + opts, err := s.Sourcegraph.GetIndexOptions(repos...) 294 281 if err != nil { 295 282 metricResolveRevisionDuration.WithLabelValues("false").Observe(time.Since(start).Seconds()) 296 283 tr.LazyPrintf("failed fetching options batch: %v", err) ··· 325 312 continue 326 313 } 327 314 start := time.Now() 328 - args := s.defaultArgs() 329 - args.Name = name 330 - args.IndexOptions = opts 315 + args := s.indexArgs(name, opts) 331 316 state, err := s.Index(args) 332 317 metricIndexDuration.WithLabelValues(string(state)).Observe(time.Since(start).Seconds()) 333 318 if err != nil { ··· 427 412 return indexStateSuccess, gitIndex(args, runCmd) 428 413 } 429 414 430 - func (s *Server) defaultArgs() *indexArgs { 415 + func (s *Server) indexArgs(name string, opts IndexOptions) *indexArgs { 431 416 return &indexArgs{ 432 - Root: s.Root, 417 + Name: name, 418 + CloneURL: s.Sourcegraph.GetCloneURL(name), 419 + IndexOptions: opts, 420 + 433 421 IndexDir: s.IndexDir, 434 422 Parallelism: s.CPUCount, 435 423 ··· 522 510 return 523 511 } 524 512 debug.Printf("enqueueRepoForIndex called with repo: %q", name) 525 - opts, err := getIndexOptions(s.Root, name) 513 + opts, err := s.Sourcegraph.GetIndexOptions(name) 526 514 if err != nil || opts[0].Error != "" { 527 515 http.Error(rw, "fetching index options", http.StatusInternalServerError) 528 516 return ··· 534 522 // forceIndex will run the index job for repo name now. It will return always 535 523 // return a string explaining what it did, even if it failed. 536 524 func (s *Server) forceIndex(name string) (string, error) { 537 - opts, err := getIndexOptions(s.Root, name) 525 + opts, err := s.Sourcegraph.GetIndexOptions(name) 538 526 if err != nil { 539 527 return fmt.Sprintf("Indexing %s failed: %v", name, err), err 540 528 } ··· 542 530 return fmt.Sprintf("Indexing %s failed: %s", name, errS), errors.New(errS) 543 531 } 544 532 545 - args := s.defaultArgs() 546 - args.Name = name 547 - args.IndexOptions = opts[0].IndexOptions 533 + args := s.indexArgs(name, opts[0].IndexOptions) 548 534 args.Incremental = false // force re-index 549 535 state, err := s.Index(args) 550 536 if err != nil { ··· 569 555 return repoNames 570 556 } 571 557 572 - func listRepos(ctx context.Context, hostname string, root *url.URL, indexed []string) ([]string, error) { 573 - body, err := json.Marshal(&struct { 574 - Hostname string 575 - Indexed []string 576 - }{ 577 - Hostname: hostname, 578 - Indexed: indexed, 579 - }) 580 - if err != nil { 581 - return nil, err 582 - } 583 - 584 - u := root.ResolveReference(&url.URL{Path: "/.internal/repos/index"}) 585 - resp, err := client.Post(u.String(), "application/json; charset=utf8", bytes.NewReader(body)) 586 - if err != nil { 587 - return nil, err 588 - } 589 - defer resp.Body.Close() 590 - 591 - if resp.StatusCode != http.StatusOK { 592 - return nil, fmt.Errorf("failed to list repositories: status %s", resp.Status) 593 - } 594 - 595 - var data struct { 596 - RepoNames []string 597 - } 598 - err = json.NewDecoder(resp.Body).Decode(&data) 599 - if err != nil { 600 - return nil, err 601 - } 602 - 603 - countsByHost := make(map[string]int) 604 - for _, name := range data.RepoNames { 605 - codeHost := codeHostFromName(name) 606 - countsByHost[codeHost] += 1 607 - } 608 - for codeHost, count := range countsByHost { 609 - metricNumAssigned.WithLabelValues(codeHost).Set(float64(count)) 610 - } 611 - return data.RepoNames, nil 612 - } 613 - 614 - func ping(root *url.URL) error { 615 - u := root.ResolveReference(&url.URL{Path: "/.internal/ping", RawQuery: "service=gitserver"}) 616 - resp, err := client.Get(u.String()) 617 - if err != nil { 618 - return err 619 - } 620 - 621 - defer resp.Body.Close() 622 - body, err := ioutil.ReadAll(io.LimitReader(resp.Body, 1024)) 623 - if err != nil { 624 - return err 625 - } 626 - if resp.StatusCode != http.StatusOK { 627 - return fmt.Errorf("ping: bad HTTP response status %d: %s", resp.StatusCode, string(body)) 628 - } 629 - if !bytes.Equal(body, []byte("pong")) { 630 - return fmt.Errorf("ping: did not receive pong: %s", string(body)) 631 - } 632 - return nil 633 - } 634 - 635 - func waitForFrontend(root *url.URL) { 636 - warned := false 637 - lastWarn := time.Now() 638 - for { 639 - err := ping(root) 640 - if err == nil { 641 - break 642 - } 643 - 644 - if time.Since(lastWarn) > 15*time.Second { 645 - warned = true 646 - lastWarn = time.Now() 647 - log.Printf("frontend or gitserver API not available, will try again: %s", err) 648 - } 649 - 650 - time.Sleep(250 * time.Millisecond) 651 - } 652 - 653 - if warned { 654 - log.Println("frontend API is now reachable. Starting indexing...") 655 - } 656 - } 657 - 658 558 func hostnameBestEffort() string { 659 559 if h := os.Getenv("NODE_NAME"); h != "" { 660 560 return h ··· 771 671 if *dbg || *debugList || *debugIndex != "" || *debugShard != "" { 772 672 debug = log.New(os.Stderr, "", log.LstdFlags) 773 673 } 674 + 675 + client := retryablehttp.NewClient() 774 676 client.Logger = debug 775 677 776 678 cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (*cpuFraction))) ··· 778 680 cpuCount = 1 779 681 } 780 682 s := &Server{ 781 - Root: rootURL, 683 + Sourcegraph: &Sourcegraph{ 684 + Root: rootURL, 685 + Client: client, 686 + Hostname: *hostname, 687 + }, 782 688 IndexDir: *index, 783 689 Interval: *interval, 784 690 CPUCount: cpuCount, 785 - Hostname: *hostname, 786 691 } 787 692 788 693 if *debugList { 789 - repos, err := listRepos(context.Background(), s.Hostname, s.Root, listIndexed(s.IndexDir)) 694 + repos, err := s.Sourcegraph.ListRepos(context.Background(), listIndexed(s.IndexDir)) 790 695 if err != nil { 791 696 log.Fatal(err) 792 697 }
+20 -3
cmd/zoekt-sourcegraph-indexserver/main_test.go
··· 12 12 "time" 13 13 14 14 "github.com/google/go-cmp/cmp" 15 + "github.com/hashicorp/go-retryablehttp" 15 16 ) 16 17 17 18 func TestServer_defaultArgs(t *testing.T) { 19 + root, err := url.Parse("http://api.test") 20 + if err != nil { 21 + t.Fatal(err) 22 + } 23 + 18 24 s := &Server{ 25 + Sourcegraph: &Sourcegraph{ 26 + Root: root, 27 + }, 19 28 IndexDir: "/testdata/index", 20 29 CPUCount: 6, 21 30 } 22 31 want := &indexArgs{ 32 + Name: "testName", 33 + CloneURL: "http://api.test/.internal/git/testName", 23 34 IndexDir: "/testdata/index", 24 35 Parallelism: 6, 25 36 Incremental: true, 26 37 FileLimit: 1 << 20, 27 38 DownloadLimitMBPS: "1000", 28 39 } 29 - got := s.defaultArgs() 40 + got := s.indexArgs("testName", IndexOptions{}) 30 41 if !cmp.Equal(got, want) { 31 42 t.Errorf("mismatch (-want +got):\n%s", cmp.Diff(want, got)) 32 43 } ··· 56 67 t.Fatal(err) 57 68 } 58 69 59 - gotRepos, err := listRepos(context.Background(), "test-indexed-search-1", u, []string{"foo", "bam"}) 70 + s := &Sourcegraph{ 71 + Root: u, 72 + Hostname: "test-indexed-search-1", 73 + Client: retryablehttp.NewClient(), 74 + } 75 + 76 + gotRepos, err := s.ListRepos(context.Background(), []string{"foo", "bam"}) 60 77 if err != nil { 61 78 t.Fatal(err) 62 79 } ··· 108 125 // We expect waitForFrontend to just work now 109 126 done := make(chan struct{}) 110 127 go func() { 111 - waitForFrontend(root) 128 + (&Sourcegraph{Root: root}).WaitForFrontend() 112 129 close(done) 113 130 }() 114 131
+162
cmd/zoekt-sourcegraph-indexserver/sg.go
··· 1 + package main 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "encoding/json" 7 + "fmt" 8 + "io" 9 + "io/ioutil" 10 + "log" 11 + "net/http" 12 + "net/url" 13 + "path" 14 + "time" 15 + 16 + retryablehttp "github.com/hashicorp/go-retryablehttp" 17 + ) 18 + 19 + // Sourcegraph contains methods which interact with the Sourcegraph API. 20 + type Sourcegraph struct { 21 + // Root is the base URL for the Sourcegraph instance to index. Normally 22 + // http://sourcegraph-frontend-internal or http://localhost:3090. 23 + Root *url.URL 24 + 25 + // Hostname is the name we advertise to Sourcegraph when asking for the 26 + // list of repositories to index. 27 + Hostname string 28 + 29 + Client *retryablehttp.Client 30 + } 31 + 32 + // indexOptionsItem wraps IndexOptions to also include an error returned by 33 + // the API. 34 + type indexOptionsItem struct { 35 + IndexOptions 36 + Error string 37 + } 38 + 39 + func (s *Sourcegraph) GetIndexOptions(repos ...string) ([]indexOptionsItem, error) { 40 + u := s.Root.ResolveReference(&url.URL{ 41 + Path: "/.internal/search/configuration", 42 + }) 43 + 44 + resp, err := s.Client.PostForm(u.String(), url.Values{"repo": repos}) 45 + if err != nil { 46 + return nil, err 47 + } 48 + defer resp.Body.Close() 49 + 50 + if resp.StatusCode != http.StatusOK { 51 + b, err := ioutil.ReadAll(io.LimitReader(resp.Body, 1024)) 52 + _ = resp.Body.Close() 53 + if err != nil { 54 + return nil, err 55 + } 56 + return nil, &url.Error{ 57 + Op: "Get", 58 + URL: u.String(), 59 + Err: fmt.Errorf("%s: %s", resp.Status, string(b)), 60 + } 61 + } 62 + 63 + opts := make([]indexOptionsItem, len(repos)) 64 + dec := json.NewDecoder(resp.Body) 65 + for i := range opts { 66 + if err := dec.Decode(&opts[i]); err != nil { 67 + return nil, fmt.Errorf("error decoding body: %w", err) 68 + } 69 + } 70 + 71 + return opts, nil 72 + } 73 + 74 + func (s *Sourcegraph) GetCloneURL(name string) string { 75 + return s.Root.ResolveReference(&url.URL{Path: path.Join("/.internal/git", name)}).String() 76 + } 77 + 78 + func (s *Sourcegraph) WaitForFrontend() { 79 + warned := false 80 + lastWarn := time.Now() 81 + for { 82 + err := ping(s.Root) 83 + if err == nil { 84 + break 85 + } 86 + 87 + if time.Since(lastWarn) > 15*time.Second { 88 + warned = true 89 + lastWarn = time.Now() 90 + log.Printf("frontend or gitserver API not available, will try again: %s", err) 91 + } 92 + 93 + time.Sleep(250 * time.Millisecond) 94 + } 95 + 96 + if warned { 97 + log.Println("frontend API is now reachable. Starting indexing...") 98 + } 99 + } 100 + 101 + func (s *Sourcegraph) ListRepos(ctx context.Context, indexed []string) ([]string, error) { 102 + body, err := json.Marshal(&struct { 103 + Hostname string 104 + Indexed []string 105 + }{ 106 + Hostname: s.Hostname, 107 + Indexed: indexed, 108 + }) 109 + if err != nil { 110 + return nil, err 111 + } 112 + 113 + u := s.Root.ResolveReference(&url.URL{Path: "/.internal/repos/index"}) 114 + resp, err := s.Client.Post(u.String(), "application/json; charset=utf8", bytes.NewReader(body)) 115 + if err != nil { 116 + return nil, err 117 + } 118 + defer resp.Body.Close() 119 + 120 + if resp.StatusCode != http.StatusOK { 121 + return nil, fmt.Errorf("failed to list repositories: status %s", resp.Status) 122 + } 123 + 124 + var data struct { 125 + RepoNames []string 126 + } 127 + err = json.NewDecoder(resp.Body).Decode(&data) 128 + if err != nil { 129 + return nil, err 130 + } 131 + 132 + countsByHost := make(map[string]int) 133 + for _, name := range data.RepoNames { 134 + codeHost := codeHostFromName(name) 135 + countsByHost[codeHost] += 1 136 + } 137 + for codeHost, count := range countsByHost { 138 + metricNumAssigned.WithLabelValues(codeHost).Set(float64(count)) 139 + } 140 + return data.RepoNames, nil 141 + } 142 + 143 + func ping(root *url.URL) error { 144 + u := root.ResolveReference(&url.URL{Path: "/.internal/ping", RawQuery: "service=gitserver"}) 145 + resp, err := http.Get(u.String()) 146 + if err != nil { 147 + return err 148 + } 149 + 150 + defer resp.Body.Close() 151 + body, err := ioutil.ReadAll(io.LimitReader(resp.Body, 1024)) 152 + if err != nil { 153 + return err 154 + } 155 + if resp.StatusCode != http.StatusOK { 156 + return fmt.Errorf("ping: bad HTTP response status %d: %s", resp.StatusCode, string(body)) 157 + } 158 + if !bytes.Equal(body, []byte("pong")) { 159 + return fmt.Errorf("ping: did not receive pong: %s", string(body)) 160 + } 161 + return nil 162 + }