mirror your GitHub repos to tangled.org automatically
1

Configure Feed

Select the types of activity you want to include in your feed.

at main 9.3 kB View raw
1import { PassThrough, Readable } from 'node:stream' 2import { Client } from 'ssh2' 3import { classifyNgReason, classifySshStderr, WireError } from './errors' 4import { encodePktLine, flushPkt, lineToString, PktLineReader } from './pkt-line' 5import { type Advertisement, parseAdvertisement } from './refs' 6 7const AGENT = 'synchub.to' 8const STDERR_CAP = 16 * 1024 9/** 10 * Whole-session budget. receive-pack blocks indefinitely waiting for commands, 11 * so without this a knot that accepts the connection but stalls mid-protocol 12 * would hang a worker until its job lease expires. On expiry we SIGKILL the 13 * child; the in-flight read sees the stream end and throws, surfacing as a 14 * transient failure the queue retries. 15 */ 16const SESSION_TIMEOUT_MS = 120_000 17 18export interface RefUpdate { 19 ref: string 20 /** Current value on the knot, or the zero SHA to create. The compare-and-swap. */ 21 old: string 22 /** New value, or the zero SHA to delete. */ 23 next: string 24} 25 26/** 27 * A process exposing `git-receive-pack`'s stdio. The default factory runs an 28 * in-process `ssh2` channel to the knot; tests inject a factory that spawns the 29 * binary against a local bare repo, so the stdio protocol is identical either 30 * way. 31 */ 32export interface ReceivePackProcess { 33 stdin: NodeJS.WritableStream 34 stdout: AsyncIterable<Buffer> 35 /** Last bytes of stderr, for diagnostics (we don't request side-band). */ 36 stderr(): string 37 kill(): void 38 /** Resolves with the exit code once the process ends. */ 39 done: Promise<number | null> 40} 41 42export type ReceivePackFactory = () => ReceivePackProcess 43 44export interface SshTarget { 45 host: string 46 port?: number 47 repoPath: string 48 /** Decrypted OpenSSH-format private key for this install. */ 49 privateKey: string 50} 51 52/** 53 * Default transport: open an in-process `ssh2` connection to the knot and run 54 * its `git-receive-pack`. No `ssh` binary (the Vercel runtime has none); the 55 * key stays in memory. 56 * 57 * Host keys: tangled knots are addressed by hostname over TLS-fronted DNS, and 58 * v1 has no pinned host keys, so `hostVerifier` accepts any (TOFU-equivalent to 59 * the previous `StrictHostKeyChecking=accept-new`). Pin once the canonical knot 60 * keys are known. 61 */ 62export function ssh2ReceivePackFactory(target: SshTarget): ReceivePackFactory { 63 return () => { 64 // The knot resolves repos by the leading-slash path, single-quoted. 65 const remoteCmd = `git-receive-pack '${target.repoPath}'` 66 const client = new Client() 67 68 const stdin = new PassThrough() 69 const stdout = new PassThrough() 70 let stderrBuf = Buffer.alloc(0) 71 let connError: Error | null = null 72 let killed = false 73 74 const appendStderr = (chunk: Buffer) => { 75 stderrBuf = Buffer.concat([stderrBuf, chunk]).subarray(-STDERR_CAP) 76 } 77 78 const done = new Promise<number | null>(resolve => { 79 let settled = false 80 const settle = (code: number | null) => { 81 if (settled) return 82 settled = true 83 resolve(code) 84 } 85 86 client.on('ready', () => { 87 client.exec(remoteCmd, (err, channel) => { 88 if (err) { 89 connError = err 90 stdout.end() 91 client.end() 92 settle(null) 93 return 94 } 95 stdin.pipe(channel) 96 channel.pipe(stdout) 97 channel.stderr.on('data', appendStderr) 98 channel.on('exit', code => settle(typeof code === 'number' ? code : null)) 99 channel.on('close', () => { client.end(); stdout.end() }) 100 }) 101 }) 102 103 // A connection / auth failure surfaces here. Capturing it (rather than 104 // leaving 'error' unhandled, which crashes the process) folds the message 105 // into the stderr band so open() reports a WireError the job handler 106 // catches. Ending stdout unblocks the advertisement read. 107 client.on('error', err => { 108 if (!killed) connError = err 109 stdout.end() 110 settle(null) 111 }) 112 113 // Always-fires backstop: `client.end()` (from kill(), a channel close, or 114 // an exec error) emits 'close', so `done` resolves even if the channel 115 // already exited and won't emit another event. 116 client.on('close', () => { 117 stdout.end() 118 settle(null) 119 }) 120 }) 121 122 // stdin EPIPE-style errors once the channel goes away are expected. 123 stdin.on('error', () => {}) 124 125 client.connect({ 126 host: target.host, 127 port: target.port ?? 22, 128 username: 'git', 129 privateKey: target.privateKey, 130 readyTimeout: 15_000, 131 hostVerifier: () => true, 132 }) 133 134 return { 135 stdin, 136 stdout, 137 stderr: () => { 138 const captured = stderrBuf.toString('utf8') 139 if (connError) return `${captured}${captured ? '\n' : ''}ssh error: ${connError.message}`.trim() 140 return captured 141 }, 142 kill: () => { 143 killed = true 144 client.end() 145 stdout.end() 146 }, 147 done, 148 } 149 } 150} 151 152/** 153 * An open receive-pack session. Read `tips` after construction to learn the 154 * knot's current refs (needed as haves and as the compare-and-swap base), 155 * then call `push` once with the commands and packfile. 156 */ 157export class ReceivePackSession { 158 readonly tips: Map<string, string> 159 readonly capabilities: Set<string> 160 161 private readonly watchdog: NodeJS.Timeout 162 163 private constructor( 164 private readonly proc: ReceivePackProcess, 165 private readonly reader: PktLineReader, 166 adv: Advertisement, 167 watchdog: NodeJS.Timeout, 168 ) { 169 this.tips = adv.refs 170 this.capabilities = adv.capabilities 171 this.watchdog = watchdog 172 } 173 174 /** Open the session and read the advertisement. */ 175 static async open(factory: ReceivePackFactory, timeoutMs = SESSION_TIMEOUT_MS): Promise<ReceivePackSession> { 176 const proc = factory() 177 const watchdog = setTimeout(() => proc.kill(), timeoutMs) 178 try { 179 const reader = new PktLineReader(proc.stdout) 180 const advLines = await reader.readUntilFlush() 181 if (advLines === null) { 182 const err = classifySshStderr(proc.stderr()) 183 throw err ?? new WireError(`receive-pack: no advertisement (stderr: ${proc.stderr().trim() || 'empty'})`) 184 } 185 const adv = parseAdvertisement(advLines) 186 assertCapabilities(adv) 187 return new ReceivePackSession(proc, reader, adv, watchdog) 188 } 189 catch (err) { 190 clearTimeout(watchdog) 191 proc.kill() 192 await proc.done.catch(() => null) 193 throw err 194 } 195 } 196 197 /** 198 * Send the ref update commands plus (for non-deletions) the packfile, then 199 * read and validate report-status. The packfile is streamed straight from 200 * `packStream` into stdin; nothing is buffered. Pass `null` for pure 201 * deletions. 202 */ 203 async push(updates: RefUpdate[], packStream: AsyncIterable<Buffer> | null): Promise<void> { 204 if (updates.length === 0) throw new WireError('receive-pack: no updates') 205 try { 206 await writeAll(this.proc.stdin, buildCommandList(updates)) 207 if (packStream) await pipePack(this.proc.stdin, packStream) 208 else this.proc.stdin.end() 209 210 const report = await this.reader.readUntilFlush() 211 parseReportStatus(report ?? [], updates, this.proc.stderr()) 212 } 213 finally { 214 clearTimeout(this.watchdog) 215 this.proc.kill() 216 await this.proc.done.catch(() => null) 217 } 218 } 219 220 /** Close the session without pushing (advertisement-only use). */ 221 async close(): Promise<void> { 222 clearTimeout(this.watchdog) 223 this.proc.stdin.end() 224 this.proc.kill() 225 await this.proc.done.catch(() => null) 226 } 227} 228 229function assertCapabilities(adv: Advertisement): void { 230 if (!adv.capabilities.has('report-status')) { 231 throw new WireError('knot receive-pack does not advertise report-status') 232 } 233} 234 235function buildCommandList(updates: RefUpdate[]): Buffer { 236 const parts: Buffer[] = [] 237 updates.forEach((u, i) => { 238 const caps = i === 0 ? `\0report-status agent=${AGENT}/1` : '' 239 parts.push(encodePktLine(`${u.old} ${u.next} ${u.ref}${caps}\n`)) 240 }) 241 parts.push(flushPkt) 242 return Buffer.concat(parts) 243} 244 245function parseReportStatus(lines: Buffer[], updates: RefUpdate[], stderr: string): void { 246 if (lines.length === 0) { 247 const err = classifySshStderr(stderr) 248 throw err ?? new WireError(`receive-pack: empty report-status (stderr: ${stderr.trim() || 'empty'})`) 249 } 250 const unpack = lineToString(lines[0]!) 251 if (unpack !== 'unpack ok') { 252 throw new WireError(`receive-pack: ${unpack}`) 253 } 254 for (const raw of lines.slice(1)) { 255 const line = lineToString(raw) 256 if (line.startsWith('ng ')) { 257 // `ng <ref> <reason>` 258 const rest = line.slice(3) 259 const sp = rest.indexOf(' ') 260 const reason = sp === -1 ? rest : rest.slice(sp + 1) 261 throw classifyNgReason(reason) 262 } 263 } 264} 265 266async function writeAll(stream: NodeJS.WritableStream, data: Buffer): Promise<void> { 267 await new Promise<void>((resolve, reject) => { 268 stream.write(data, err => (err ? reject(err) : resolve())) 269 }) 270} 271 272async function pipePack(stdin: NodeJS.WritableStream, packStream: AsyncIterable<Buffer>): Promise<void> { 273 const src = Readable.from(packStream) 274 await new Promise<void>((resolve, reject) => { 275 src.on('error', reject) 276 stdin.on('error', reject) 277 src.pipe(stdin, { end: true }) 278 stdin.on('finish', resolve) 279 stdin.on('close', resolve) 280 }) 281}