mirror your GitHub repos to tangled.org automatically
1import { type ChildProcessWithoutNullStreams, spawn } from 'node:child_process'
2import { Readable } from 'node:stream'
3import { classifyNgReason, classifySshStderr, RemoteRejectedError, WireError } from './errors'
4import { encodePktLine, flushPkt, lineToString, PktLineReader } from './pkt-line'
5import { type Advertisement, parseAdvertisement, ZERO_SHA } from './refs'
6
/** Client name announced to the knot via the `agent=` capability on the first command line. */
const AGENT = 'synchub.to'
/** Upper bound, in bytes, on how much trailing stderr output is retained for diagnostics (16 KiB). */
const STDERR_CAP = 16 * 1024
/**
 * Whole-session budget. receive-pack blocks indefinitely waiting for commands,
 * so without this a knot that accepts the connection but stalls mid-protocol
 * would hang a worker until its job lease expires. On expiry we SIGKILL the
 * child; the in-flight read sees the stream end and throws, surfacing as a
 * transient failure the queue retries.
 *
 * Expressed in milliseconds; this is the default for the `timeoutMs`
 * parameter of `ReceivePackSession.open`.
 */
const SESSION_TIMEOUT_MS = 120_000
17
/**
 * One ref update command of a push. Each update is sent to the knot as a
 * single `<old> <next> <ref>` command line.
 */
export interface RefUpdate {
  /** Name of the ref the command applies to. */
  ref: string
  /** Current value on the knot, or the zero SHA to create. The compare-and-swap. */
  old: string
  /** New value, or the zero SHA to delete. */
  next: string
}
25
/**
 * A spawned process exposing `git-receive-pack`'s stdio. The default factory
 * runs ssh to the knot; tests inject a factory that spawns the binary against
 * a local bare repo, so the stdio protocol is identical either way.
 */
export interface ReceivePackProcess {
  /** Write side: the command list and, for non-deletions, the packfile go here. */
  stdin: NodeJS.WritableStream
  /** Read side: raw bytes that the session parses as pkt-lines. */
  stdout: AsyncIterable<Buffer>
  /** Last bytes of stderr, for diagnostics (we don't request side-band). */
  stderr(): string
  /** Terminate the process; used both by the session watchdog and during teardown. */
  kill(): void
  /** Resolves with the exit code once the process ends. */
  done: Promise<number | null>
}
40
/** Starts a new receive-pack process; `ReceivePackSession.open` invokes it once per session. */
export type ReceivePackFactory = () => ReceivePackProcess
42
/** Connection parameters consumed by `sshReceivePackFactory`. */
export interface SshTarget {
  /** Host to connect to; the ssh destination is `git@<host>`. */
  host: string
  /** Optional port, passed to ssh as `-p <port>` when set. */
  port?: number
  /** Repository path handed, single-quoted, to the remote `git-receive-pack`. */
  repoPath: string
  /** Extra ssh arguments placed before the port and destination arguments. */
  sshArgs: string[]
}
49
/**
 * Default transport: ssh to the knot and invoke its `git-receive-pack`.
 *
 * Every call of the returned factory spawns a fresh `ssh` child with piped
 * stdio. Arguments are `target.sshArgs`, then `-p <port>` when a port is set,
 * then the destination `git@<host>`, then the remote command.
 *
 * @param target Host, optional port, repository path and extra ssh arguments.
 * @returns A factory that starts one wrapped ssh process per invocation.
 */
export function sshReceivePackFactory(target: SshTarget): ReceivePackFactory {
  return () => {
    const portArgs = target.port ? ['-p', String(target.port)] : []
    // ssh:// transports invoke the remote command with the path including its
    // leading slash, single-quoted. The knot resolves repos by that path.
    // ssh hands the command string to a shell on the remote side, so a quote
    // inside the path must be escaped or it would terminate the quoting and
    // let the rest of the path run as shell syntax.
    const remoteCmd = `git-receive-pack ${quoteForRemoteShell(target.repoPath)}`
    const child = spawn('ssh', [...target.sshArgs, ...portArgs, `git@${target.host}`, remoteCmd], {
      stdio: ['pipe', 'pipe', 'pipe'],
    })
    return wrapChild(child)
  }
}

/** Wrap `value` in single quotes for a POSIX shell, rewriting each embedded `'` as `'\''`. */
function quoteForRemoteShell(value: string): string {
  return `'${value.replace(/'/g, "'\\''")}'`
}
63
/**
 * Adapt a spawned child process to the `ReceivePackProcess` interface.
 *
 * Only the last `STDERR_CAP` bytes of stderr are kept, so a chatty remote
 * cannot grow the diagnostic buffer without bound.
 *
 * @param child A child process spawned with all three stdio streams piped.
 * @returns The child's stdio plus `stderr()`, `kill()` and `done` helpers.
 */
function wrapChild(child: ChildProcessWithoutNullStreams): ReceivePackProcess {
  let stderrBuf = Buffer.alloc(0)
  const appendDiagnostics = (chunk: Buffer): void => {
    // Negative start keeps the tail: the newest STDERR_CAP bytes survive.
    stderrBuf = Buffer.concat([stderrBuf, chunk]).subarray(-STDERR_CAP)
  }
  child.stderr.on('data', appendDiagnostics)

  const done = new Promise<number | null>((resolve) => {
    child.on('close', code => resolve(code))
    // An 'error' event with no listener is thrown as an uncaught exception.
    // It fires when the process cannot be spawned (e.g. `ssh` is not on PATH)
    // or cannot be killed. Record it with the stderr diagnostics so it shows
    // up in the error the session reports.
    child.on('error', (err) => {
      appendDiagnostics(Buffer.from(`${err.message}\n`))
      // No pid means the process never started; make sure `done` settles even
      // if no 'close' event follows.
      if (child.pid === undefined) resolve(null)
    })
  })

  // A write to a child that has already exited emits 'error' (EPIPE) on stdin.
  // The failure is still delivered to the writer through the write callback;
  // this listener only prevents the event from becoming an uncaught exception.
  child.stdin.on('error', () => {})

  return {
    stdin: child.stdin,
    stdout: child.stdout,
    stderr: () => stderrBuf.toString('utf8'),
    kill: () => child.kill('SIGKILL'),
    done,
  }
}
78
/**
 * A live receive-pack conversation with the knot. After `open` resolves,
 * `tips` holds the refs the knot advertised (used as haves and as the
 * compare-and-swap base) and `capabilities` the advertised capability set.
 * Finish the session with exactly one of `push` or `close`; both stop the
 * watchdog and kill the underlying process.
 */
export class ReceivePackSession {
  readonly tips: Map<string, string>
  readonly capabilities: Set<string>

  private constructor(
    private readonly proc: ReceivePackProcess,
    private readonly reader: PktLineReader,
    adv: Advertisement,
    private readonly watchdog: NodeJS.Timeout,
  ) {
    this.tips = adv.refs
    this.capabilities = adv.capabilities
  }

  /** Kill the process and wait for it to end, ignoring a rejected `done`. */
  private static async terminate(proc: ReceivePackProcess): Promise<void> {
    proc.kill()
    await proc.done.catch(() => null)
  }

  /**
   * Start a process via `factory`, arm the watchdog that kills it after
   * `timeoutMs`, and read the ref advertisement. On any failure the watchdog
   * is cleared and the process is killed before the error is rethrown.
   */
  static async open(factory: ReceivePackFactory, timeoutMs = SESSION_TIMEOUT_MS): Promise<ReceivePackSession> {
    const proc = factory()
    const watchdog = setTimeout(() => proc.kill(), timeoutMs)
    try {
      const reader = new PktLineReader(proc.stdout)
      const lines = await reader.readUntilFlush()
      if (lines !== null) {
        const adv = parseAdvertisement(lines)
        assertCapabilities(adv)
        return new ReceivePackSession(proc, reader, adv, watchdog)
      }
      // No advertisement at all: prefer a classified ssh failure, otherwise
      // report whatever stderr holds.
      const classified = classifySshStderr(proc.stderr())
      if (classified != null) throw classified
      const tail = proc.stderr().trim()
      throw new WireError(`receive-pack: no advertisement (stderr: ${tail || 'empty'})`)
    }
    catch (err) {
      clearTimeout(watchdog)
      await ReceivePackSession.terminate(proc)
      throw err
    }
  }

  /**
   * Write the ref update commands, then either stream `packStream` into stdin
   * or, when it is `null` (pure deletions), just end stdin. Afterwards read
   * report-status and validate it. The pack is piped through without being
   * buffered. The session is torn down whether or not the push succeeds.
   */
  async push(updates: RefUpdate[], packStream: AsyncIterable<Buffer> | null): Promise<void> {
    if (updates.length === 0) throw new WireError('receive-pack: no updates')
    try {
      const { stdin } = this.proc
      await writeAll(stdin, buildCommandList(updates))
      if (!packStream) stdin.end()
      else await pipePack(stdin, packStream)

      const report = (await this.reader.readUntilFlush()) ?? []
      parseReportStatus(report, updates, this.proc.stderr())
    }
    finally {
      clearTimeout(this.watchdog)
      await ReceivePackSession.terminate(this.proc)
    }
  }

  /** End the session without sending anything (advertisement-only use). */
  async close(): Promise<void> {
    clearTimeout(this.watchdog)
    this.proc.stdin.end()
    await ReceivePackSession.terminate(this.proc)
  }
}
155
/**
 * Ensure the knot advertised `report-status`, which the push path relies on
 * to learn the outcome of each command.
 *
 * @throws WireError when the capability is absent from the advertisement.
 */
function assertCapabilities(adv: Advertisement): void {
  const supportsReport = adv.capabilities.has('report-status')
  if (supportsReport) return
  throw new WireError('knot receive-pack does not advertise report-status')
}
161
/**
 * Serialize the update commands as pkt-lines followed by a flush packet.
 * Each command is `<old> <next> <ref>\n`; only the first one carries the
 * NUL-separated capability list (`report-status` and our `agent=`).
 */
function buildCommandList(updates: RefUpdate[]): Buffer {
  const capabilitySuffix = `\0report-status agent=${AGENT}/1`
  const commands = updates.map(({ old, next, ref }, index) => {
    const suffix = index === 0 ? capabilitySuffix : ''
    return encodePktLine(`${old} ${next} ${ref}${suffix}\n`)
  })
  return Buffer.concat([...commands, flushPkt])
}
171
/**
 * Validate the knot's report-status response for a push.
 *
 * @param lines Pkt-line payloads read up to the flush; empty when nothing was received.
 * @param updates The commands that were sent; each must be acknowledged with `ok <ref>`.
 * @param stderr Captured stderr, used to explain an empty report.
 * @throws The classified ssh error or a WireError when the report is empty,
 *   a WireError when the unpack status is not `unpack ok`, the classified
 *   `ng` reason for the first rejected ref, or a WireError when a sent
 *   command received no status line at all.
 */
function parseReportStatus(lines: Buffer[], updates: RefUpdate[], stderr: string): void {
  const [first, ...statusLines] = lines
  if (first === undefined) {
    const err = classifySshStderr(stderr)
    throw err ?? new WireError(`receive-pack: empty report-status (stderr: ${stderr.trim() || 'empty'})`)
  }
  const unpack = lineToString(first)
  if (unpack !== 'unpack ok') {
    throw new WireError(`receive-pack: ${unpack}`)
  }

  const acknowledged = new Set<string>()
  for (const raw of statusLines) {
    const line = lineToString(raw)
    if (line.startsWith('ng ')) {
      // `ng <ref> <reason>`
      const rest = line.slice(3)
      const sp = rest.indexOf(' ')
      const reason = sp === -1 ? rest : rest.slice(sp + 1)
      throw classifyNgReason(reason)
    }
    // `ok <ref>`
    if (line.startsWith('ok ')) acknowledged.add(line.slice(3).trim())
  }

  // A report that stops short must not count as success: every command we
  // sent needs its own `ok` line, otherwise the ref's fate is unknown.
  const unreported = updates.filter(u => !acknowledged.has(u.ref)).map(u => u.ref)
  if (unreported.length > 0) {
    throw new WireError(`receive-pack: no status reported for ${unreported.join(', ')}`)
  }
}
192
/**
 * Write `data` to `stream` and wait until the write has been flushed.
 *
 * @param stream Destination stream; it is not ended.
 * @param data Bytes to write.
 * @throws The write error, delivered as a rejection.
 */
async function writeAll(stream: NodeJS.WritableStream, data: Buffer): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // A failed write calls the callback and then also emits 'error' on the
    // stream. With no listener that emit is thrown as an uncaught exception,
    // so the rejection below would never reach the caller.
    const onError = (err: Error): void => reject(err)
    stream.once('error', onError)
    stream.write(data, (err) => {
      if (err) {
        // Leave the listener in place: the 'error' event follows the callback.
        reject(err)
        return
      }
      stream.removeListener('error', onError)
      resolve()
    })
  })
}
198
/**
 * Stream the packfile into `stdin` and end `stdin` once the pack is exhausted.
 *
 * Resolves when `stdin` finishes or closes. A premature close resolves too,
 * so the caller goes on to read the report and surfaces the failure from
 * there. Rejects when either the pack source or `stdin` errors.
 *
 * @param stdin Destination stream; ended after the last pack byte.
 * @param packStream Source of packfile chunks.
 */
async function pipePack(stdin: NodeJS.WritableStream, packStream: AsyncIterable<Buffer>): Promise<void> {
  const src = Readable.from(packStream)
  try {
    await new Promise<void>((resolve, reject) => {
      src.on('error', reject)
      // Deliberately never removed: it also absorbs a late stdin error
      // (e.g. EPIPE when the process is killed during teardown).
      stdin.on('error', reject)
      stdin.on('finish', resolve)
      stdin.on('close', resolve)
      src.pipe(stdin, { end: true })
    })
  }
  finally {
    // If stdin failed or closed early the source is still mid-stream.
    // Destroying it finalizes the underlying iterator so whatever produces
    // the pack is released instead of staying suspended forever.
    if (!src.destroyed) src.destroy()
  }
}
208}