mirror your GitHub repos to tangled.org automatically
1import { PassThrough, Readable } from 'node:stream'
2import { Client } from 'ssh2'
3import { classifyNgReason, classifySshStderr, WireError } from './errors'
4import { encodePktLine, flushPkt, lineToString, PktLineReader } from './pkt-line'
5import { type Advertisement, parseAdvertisement } from './refs'
6
7const AGENT = 'synchub.to'
8const STDERR_CAP = 16 * 1024
9/**
10 * Whole-session budget. receive-pack blocks indefinitely waiting for commands,
11 * so without this a knot that accepts the connection but stalls mid-protocol
12 * would hang a worker until its job lease expires. On expiry we SIGKILL the
13 * child; the in-flight read sees the stream end and throws, surfacing as a
14 * transient failure the queue retries.
15 */
16const SESSION_TIMEOUT_MS = 120_000
17
18export interface RefUpdate {
19 ref: string
20 /** Current value on the knot, or the zero SHA to create. The compare-and-swap. */
21 old: string
22 /** New value, or the zero SHA to delete. */
23 next: string
24}
25
26/**
27 * A process exposing `git-receive-pack`'s stdio. The default factory runs an
28 * in-process `ssh2` channel to the knot; tests inject a factory that spawns the
29 * binary against a local bare repo, so the stdio protocol is identical either
30 * way.
31 */
32export interface ReceivePackProcess {
33 stdin: NodeJS.WritableStream
34 stdout: AsyncIterable<Buffer>
35 /** Last bytes of stderr, for diagnostics (we don't request side-band). */
36 stderr(): string
37 kill(): void
38 /** Resolves with the exit code once the process ends. */
39 done: Promise<number | null>
40}
41
42export type ReceivePackFactory = () => ReceivePackProcess
43
44export interface SshTarget {
45 host: string
46 port?: number
47 repoPath: string
48 /** Decrypted OpenSSH-format private key for this install. */
49 privateKey: string
50}
51
52/**
53 * Default transport: open an in-process `ssh2` connection to the knot and run
54 * its `git-receive-pack`. No `ssh` binary (the Vercel runtime has none); the
55 * key stays in memory.
56 *
57 * Host keys: tangled knots are addressed by hostname over TLS-fronted DNS, and
58 * v1 has no pinned host keys, so `hostVerifier` accepts any (TOFU-equivalent to
59 * the previous `StrictHostKeyChecking=accept-new`). Pin once the canonical knot
60 * keys are known.
61 */
62export function ssh2ReceivePackFactory(target: SshTarget): ReceivePackFactory {
63 return () => {
64 // The knot resolves repos by the leading-slash path, single-quoted.
65 const remoteCmd = `git-receive-pack '${target.repoPath}'`
66 const client = new Client()
67
68 const stdin = new PassThrough()
69 const stdout = new PassThrough()
70 let stderrBuf = Buffer.alloc(0)
71 let connError: Error | null = null
72 let killed = false
73
74 const appendStderr = (chunk: Buffer) => {
75 stderrBuf = Buffer.concat([stderrBuf, chunk]).subarray(-STDERR_CAP)
76 }
77
78 const done = new Promise<number | null>(resolve => {
79 let settled = false
80 const settle = (code: number | null) => {
81 if (settled) return
82 settled = true
83 resolve(code)
84 }
85
86 client.on('ready', () => {
87 client.exec(remoteCmd, (err, channel) => {
88 if (err) {
89 connError = err
90 stdout.end()
91 client.end()
92 settle(null)
93 return
94 }
95 stdin.pipe(channel)
96 channel.pipe(stdout)
97 channel.stderr.on('data', appendStderr)
98 channel.on('exit', code => settle(typeof code === 'number' ? code : null))
99 channel.on('close', () => { client.end(); stdout.end() })
100 })
101 })
102
103 // A connection / auth failure surfaces here. Capturing it (rather than
104 // leaving 'error' unhandled, which crashes the process) folds the message
105 // into the stderr band so open() reports a WireError the job handler
106 // catches. Ending stdout unblocks the advertisement read.
107 client.on('error', err => {
108 if (!killed) connError = err
109 stdout.end()
110 settle(null)
111 })
112
113 // Always-fires backstop: `client.end()` (from kill(), a channel close, or
114 // an exec error) emits 'close', so `done` resolves even if the channel
115 // already exited and won't emit another event.
116 client.on('close', () => {
117 stdout.end()
118 settle(null)
119 })
120 })
121
122 // stdin EPIPE-style errors once the channel goes away are expected.
123 stdin.on('error', () => {})
124
125 client.connect({
126 host: target.host,
127 port: target.port ?? 22,
128 username: 'git',
129 privateKey: target.privateKey,
130 readyTimeout: 15_000,
131 hostVerifier: () => true,
132 })
133
134 return {
135 stdin,
136 stdout,
137 stderr: () => {
138 const captured = stderrBuf.toString('utf8')
139 if (connError) return `${captured}${captured ? '\n' : ''}ssh error: ${connError.message}`.trim()
140 return captured
141 },
142 kill: () => {
143 killed = true
144 client.end()
145 stdout.end()
146 },
147 done,
148 }
149 }
150}
151
152/**
153 * An open receive-pack session. Read `tips` after construction to learn the
154 * knot's current refs (needed as haves and as the compare-and-swap base),
155 * then call `push` once with the commands and packfile.
156 */
157export class ReceivePackSession {
158 readonly tips: Map<string, string>
159 readonly capabilities: Set<string>
160
161 private readonly watchdog: NodeJS.Timeout
162
163 private constructor(
164 private readonly proc: ReceivePackProcess,
165 private readonly reader: PktLineReader,
166 adv: Advertisement,
167 watchdog: NodeJS.Timeout,
168 ) {
169 this.tips = adv.refs
170 this.capabilities = adv.capabilities
171 this.watchdog = watchdog
172 }
173
174 /** Open the session and read the advertisement. */
175 static async open(factory: ReceivePackFactory, timeoutMs = SESSION_TIMEOUT_MS): Promise<ReceivePackSession> {
176 const proc = factory()
177 const watchdog = setTimeout(() => proc.kill(), timeoutMs)
178 try {
179 const reader = new PktLineReader(proc.stdout)
180 const advLines = await reader.readUntilFlush()
181 if (advLines === null) {
182 const err = classifySshStderr(proc.stderr())
183 throw err ?? new WireError(`receive-pack: no advertisement (stderr: ${proc.stderr().trim() || 'empty'})`)
184 }
185 const adv = parseAdvertisement(advLines)
186 assertCapabilities(adv)
187 return new ReceivePackSession(proc, reader, adv, watchdog)
188 }
189 catch (err) {
190 clearTimeout(watchdog)
191 proc.kill()
192 await proc.done.catch(() => null)
193 throw err
194 }
195 }
196
197 /**
198 * Send the ref update commands plus (for non-deletions) the packfile, then
199 * read and validate report-status. The packfile is streamed straight from
200 * `packStream` into stdin; nothing is buffered. Pass `null` for pure
201 * deletions.
202 */
203 async push(updates: RefUpdate[], packStream: AsyncIterable<Buffer> | null): Promise<void> {
204 if (updates.length === 0) throw new WireError('receive-pack: no updates')
205 try {
206 await writeAll(this.proc.stdin, buildCommandList(updates))
207 if (packStream) await pipePack(this.proc.stdin, packStream)
208 else this.proc.stdin.end()
209
210 const report = await this.reader.readUntilFlush()
211 parseReportStatus(report ?? [], updates, this.proc.stderr())
212 }
213 finally {
214 clearTimeout(this.watchdog)
215 this.proc.kill()
216 await this.proc.done.catch(() => null)
217 }
218 }
219
220 /** Close the session without pushing (advertisement-only use). */
221 async close(): Promise<void> {
222 clearTimeout(this.watchdog)
223 this.proc.stdin.end()
224 this.proc.kill()
225 await this.proc.done.catch(() => null)
226 }
227}
228
229function assertCapabilities(adv: Advertisement): void {
230 if (!adv.capabilities.has('report-status')) {
231 throw new WireError('knot receive-pack does not advertise report-status')
232 }
233}
234
235function buildCommandList(updates: RefUpdate[]): Buffer {
236 const parts: Buffer[] = []
237 updates.forEach((u, i) => {
238 const caps = i === 0 ? `\0report-status agent=${AGENT}/1` : ''
239 parts.push(encodePktLine(`${u.old} ${u.next} ${u.ref}${caps}\n`))
240 })
241 parts.push(flushPkt)
242 return Buffer.concat(parts)
243}
244
245function parseReportStatus(lines: Buffer[], updates: RefUpdate[], stderr: string): void {
246 if (lines.length === 0) {
247 const err = classifySshStderr(stderr)
248 throw err ?? new WireError(`receive-pack: empty report-status (stderr: ${stderr.trim() || 'empty'})`)
249 }
250 const unpack = lineToString(lines[0]!)
251 if (unpack !== 'unpack ok') {
252 throw new WireError(`receive-pack: ${unpack}`)
253 }
254 for (const raw of lines.slice(1)) {
255 const line = lineToString(raw)
256 if (line.startsWith('ng ')) {
257 // `ng <ref> <reason>`
258 const rest = line.slice(3)
259 const sp = rest.indexOf(' ')
260 const reason = sp === -1 ? rest : rest.slice(sp + 1)
261 throw classifyNgReason(reason)
262 }
263 }
264}
265
266async function writeAll(stream: NodeJS.WritableStream, data: Buffer): Promise<void> {
267 await new Promise<void>((resolve, reject) => {
268 stream.write(data, err => (err ? reject(err) : resolve()))
269 })
270}
271
272async function pipePack(stdin: NodeJS.WritableStream, packStream: AsyncIterable<Buffer>): Promise<void> {
273 const src = Readable.from(packStream)
274 await new Promise<void>((resolve, reject) => {
275 src.on('error', reject)
276 stdin.on('error', reject)
277 src.pipe(stdin, { end: true })
278 stdin.on('finish', resolve)
279 stdin.on('close', resolve)
280 })
281}