fork of https://github.com/sourcegraph/zoekt
0

Configure Feed

Select the types of activity you want to include in your feed.

zoekt-archive-index: Enforce flowrate on the socket (#35)

flowrate is a Sourcegraph specific feature to limit the amount of traffic we
generate on the networks. The way it is implemented is it keeps conflicting
with upstream zoekt merges. This rewrites it so we instead enforce it in the
DefaultTransport. This should more reliably enforce flowrate as well as
prevent difficult merge conflicts.

+82 -34
+3 -14
cmd/zoekt-archive-index/archive.go
··· 11 11 "net/url" 12 12 "os" 13 13 "strings" 14 - 15 - "github.com/mxk/go-flowrate/flowrate" 16 14 ) 17 15 18 16 type Archive interface { ··· 73 71 74 72 // openArchive opens the tar at the URL or filepath u. Also supported is tgz 75 73 // files over http. 76 - // 77 - // If non-zero, limitMbps is used to limit the download speed of archives to 78 - // the specified amount in megabits per second. 79 - func openArchive(u string, limitMbps int64) (Archive, error) { 74 + func openArchive(u string) (Archive, error) { 80 75 var ( 81 76 r io.Reader 82 77 closer io.Closer ··· 87 82 if err != nil { 88 83 return nil, err 89 84 } 90 - body := resp.Body 91 - if limitMbps != 0 { 92 - const megabit = 1000 * 1000 93 - body = flowrate.NewReader(body, (limitMbps*megabit)/8) 94 - } 95 - 96 85 if resp.StatusCode < 200 || resp.StatusCode >= 300 { 97 - b, err := ioutil.ReadAll(io.LimitReader(body, 1024)) 86 + b, err := ioutil.ReadAll(io.LimitReader(resp.Body, 1024)) 98 87 resp.Body.Close() 99 88 if err != nil { 100 89 return nil, err ··· 106 95 } 107 96 } 108 97 closer = resp.Body 109 - r = body 98 + r = resp.Body 110 99 } else if u == "-" { 111 100 r = os.Stdin 112 101 } else {
+57
cmd/zoekt-archive-index/flowrate.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "io" 6 + "net" 7 + "net/http" 8 + 9 + "github.com/mxk/go-flowrate/flowrate" 10 + ) 11 + 12 + type connReadWriter struct { 13 + net.Conn 14 + 15 + Reader io.Reader 16 + Writer io.Writer 17 + } 18 + 19 + func (c *connReadWriter) Read(b []byte) (int, error) { 20 + return c.Reader.Read(b) 21 + } 22 + 23 + func (c *connReadWriter) Write(b []byte) (int, error) { 24 + return c.Writer.Write(b) 25 + } 26 + 27 + type dial func(ctx context.Context, network, addr string) (net.Conn, error) 28 + 29 + func limitDial(d dial, limit int64) dial { 30 + if limit <= 0 { 31 + return d 32 + } 33 + 34 + return func(ctx context.Context, network, addr string) (net.Conn, error) { 35 + conn, err := d(ctx, network, addr) 36 + if err != nil { 37 + return nil, err 38 + } 39 + return &connReadWriter{ 40 + Conn: conn, 41 + Reader: flowrate.NewReader(conn, limit), 42 + Writer: flowrate.NewWriter(conn, limit), 43 + }, nil 44 + } 45 + } 46 + 47 + func limitHTTPDefaultClient(limitMbps int64) { 48 + if limitMbps <= 0 { 49 + return 50 + } 51 + 52 + const megabit = 1000 * 1000 53 + limit := (limitMbps * megabit) / 8 54 + 55 + t := http.DefaultTransport.(*http.Transport) 56 + t.DialContext = limitDial(t.DialContext, limit) 57 + }
+22 -20
cmd/zoekt-archive-index/main.go
··· 58 58 type Options struct { 59 59 Incremental bool 60 60 61 - Archive string 62 - DownloadLimitMbps int64 63 - Name string 64 - RepoURL string 65 - Branch string 66 - Commit string 67 - Strip int 61 + Archive string 62 + Name string 63 + RepoURL string 64 + Branch string 65 + Commit string 66 + Strip int 68 67 } 69 68 70 69 func (o *Options) SetDefaults() { ··· 151 150 } 152 151 } 153 152 154 - a, err := openArchive(opts.Archive, opts.DownloadLimitMbps) 153 + a, err := openArchive(opts.Archive) 155 154 if err != nil { 156 155 return err 157 156 } ··· 203 202 var ( 204 203 incremental = flag.Bool("incremental", true, "only index changed repositories") 205 204 206 - name = flag.String("name", "", "The repository name for the archive") 207 - urlRaw = flag.String("url", "", "The repository URL for the archive") 208 - branch = flag.String("branch", "", "The branch name for the archive") 209 - commit = flag.String("commit", "", "The commit sha for the archive. If incremental this will avoid updating shards already at commit") 210 - strip = flag.Int("strip_components", 0, "Remove the specified number of leading path elements. Pathnames with fewer elements will be silently skipped.") 205 + name = flag.String("name", "", "The repository name for the archive") 206 + urlRaw = flag.String("url", "", "The repository URL for the archive") 207 + branch = flag.String("branch", "", "The branch name for the archive") 208 + commit = flag.String("commit", "", "The commit sha for the archive. If incremental this will avoid updating shards already at commit") 209 + strip = flag.Int("strip_components", 0, "Remove the specified number of leading path elements. Pathnames with fewer elements will be silently skipped.") 210 + 211 211 downloadLimitMbps = flag.Int64("download-limit-mbps", 0, "If non-zero, limit archive downloads to specified amount in megabits per second") 212 212 ) 213 213 flag.Parse() ··· 225 225 opts := Options{ 226 226 Incremental: *incremental, 227 227 228 - Archive: archive, 229 - DownloadLimitMbps: *downloadLimitMbps, 230 - Name: *name, 231 - RepoURL: *urlRaw, 232 - Branch: *branch, 233 - Commit: *commit, 234 - Strip: *strip, 228 + Archive: archive, 229 + Name: *name, 230 + RepoURL: *urlRaw, 231 + Branch: *branch, 232 + Commit: *commit, 233 + Strip: *strip, 235 234 } 235 + 236 + // Sourcegraph specific: Limit HTTP traffic 237 + limitHTTPDefaultClient(*downloadLimitMbps) 236 238 237 239 if err := do(opts, *bopts); err != nil { 238 240 log.Fatal(err)