mirror your GitHub repos to tangled.org automatically
1

Configure Feed

Select the types of activity you want to include in your feed.

1import { type ChildProcessWithoutNullStreams, spawn } from 'node:child_process' 2import { Readable } from 'node:stream' 3import { classifyNgReason, classifySshStderr, WireError } from './errors' 4import { encodePktLine, flushPkt, lineToString, PktLineReader } from './pkt-line' 5import { type Advertisement, parseAdvertisement } from './refs' 6 7const AGENT = 'synchub.to' 8const STDERR_CAP = 16 * 1024 9/** 10 * Whole-session budget. receive-pack blocks indefinitely waiting for commands, 11 * so without this a knot that accepts the connection but stalls mid-protocol 12 * would hang a worker until its job lease expires. On expiry we SIGKILL the 13 * child; the in-flight read sees the stream end and throws, surfacing as a 14 * transient failure the queue retries. 15 */ 16const SESSION_TIMEOUT_MS = 120_000 17 18export interface RefUpdate { 19 ref: string 20 /** Current value on the knot, or the zero SHA to create. The compare-and-swap. */ 21 old: string 22 /** New value, or the zero SHA to delete. */ 23 next: string 24} 25 26/** 27 * A spawned process exposing `git-receive-pack`'s stdio. The default factory 28 * runs ssh to the knot; tests inject a factory that spawns the binary against 29 * a local bare repo, so the stdio protocol is identical either way. 30 */ 31export interface ReceivePackProcess { 32 stdin: NodeJS.WritableStream 33 stdout: AsyncIterable<Buffer> 34 /** Last bytes of stderr, for diagnostics (we don't request side-band). */ 35 stderr(): string 36 kill(): void 37 /** Resolves with the exit code once the process ends. */ 38 done: Promise<number | null> 39} 40 41export type ReceivePackFactory = () => ReceivePackProcess 42 43export interface SshTarget { 44 host: string 45 port?: number 46 repoPath: string 47 sshArgs: string[] 48} 49 50/** Default transport: ssh to the knot and invoke its `git-receive-pack`. */ 51export function sshReceivePackFactory(target: SshTarget): ReceivePackFactory { 52 return () => { 53 const portArgs = target.port ? ['-p', String(target.port)] : [] 54 // ssh:// transports invoke the remote command with the path including its 55 // leading slash, single-quoted. The knot resolves repos by that path. 56 const remoteCmd = `git-receive-pack '${target.repoPath}'` 57 const child = spawn('ssh', [...target.sshArgs, ...portArgs, `git@${target.host}`, remoteCmd], { 58 stdio: ['pipe', 'pipe', 'pipe'], 59 }) 60 return wrapChild(child) 61 } 62} 63 64function wrapChild(child: ChildProcessWithoutNullStreams): ReceivePackProcess { 65 let stderrBuf = Buffer.alloc(0) 66 child.stderr.on('data', (chunk: Buffer) => { 67 stderrBuf = Buffer.concat([stderrBuf, chunk]).subarray(-STDERR_CAP) 68 }) 69 const done = new Promise<number | null>(resolve => child.on('close', resolve)) 70 return { 71 stdin: child.stdin, 72 stdout: child.stdout, 73 stderr: () => stderrBuf.toString('utf8'), 74 kill: () => child.kill('SIGKILL'), 75 done, 76 } 77} 78 79/** 80 * An open receive-pack session. Read `tips` after construction to learn the 81 * knot's current refs (needed as haves and as the compare-and-swap base), 82 * then call `push` once with the commands and packfile. 83 */ 84export class ReceivePackSession { 85 readonly tips: Map<string, string> 86 readonly capabilities: Set<string> 87 88 private readonly watchdog: NodeJS.Timeout 89 90 private constructor( 91 private readonly proc: ReceivePackProcess, 92 private readonly reader: PktLineReader, 93 adv: Advertisement, 94 watchdog: NodeJS.Timeout, 95 ) { 96 this.tips = adv.refs 97 this.capabilities = adv.capabilities 98 this.watchdog = watchdog 99 } 100 101 /** Open the session and read the advertisement. */ 102 static async open(factory: ReceivePackFactory, timeoutMs = SESSION_TIMEOUT_MS): Promise<ReceivePackSession> { 103 const proc = factory() 104 const watchdog = setTimeout(() => proc.kill(), timeoutMs) 105 try { 106 const reader = new PktLineReader(proc.stdout) 107 const advLines = await reader.readUntilFlush() 108 if (advLines === null) { 109 const err = classifySshStderr(proc.stderr()) 110 throw err ?? new WireError(`receive-pack: no advertisement (stderr: ${proc.stderr().trim() || 'empty'})`) 111 } 112 const adv = parseAdvertisement(advLines) 113 assertCapabilities(adv) 114 return new ReceivePackSession(proc, reader, adv, watchdog) 115 } 116 catch (err) { 117 clearTimeout(watchdog) 118 proc.kill() 119 await proc.done.catch(() => null) 120 throw err 121 } 122 } 123 124 /** 125 * Send the ref update commands plus (for non-deletions) the packfile, then 126 * read and validate report-status. The packfile is streamed straight from 127 * `packStream` into stdin; nothing is buffered. Pass `null` for pure 128 * deletions. 129 */ 130 async push(updates: RefUpdate[], packStream: AsyncIterable<Buffer> | null): Promise<void> { 131 if (updates.length === 0) throw new WireError('receive-pack: no updates') 132 try { 133 await writeAll(this.proc.stdin, buildCommandList(updates)) 134 if (packStream) await pipePack(this.proc.stdin, packStream) 135 else this.proc.stdin.end() 136 137 const report = await this.reader.readUntilFlush() 138 parseReportStatus(report ?? [], updates, this.proc.stderr()) 139 } 140 finally { 141 clearTimeout(this.watchdog) 142 this.proc.kill() 143 await this.proc.done.catch(() => null) 144 } 145 } 146 147 /** Close the session without pushing (advertisement-only use). */ 148 async close(): Promise<void> { 149 clearTimeout(this.watchdog) 150 this.proc.stdin.end() 151 this.proc.kill() 152 await this.proc.done.catch(() => null) 153 } 154} 155 156function assertCapabilities(adv: Advertisement): void { 157 if (!adv.capabilities.has('report-status')) { 158 throw new WireError('knot receive-pack does not advertise report-status') 159 } 160} 161 162function buildCommandList(updates: RefUpdate[]): Buffer { 163 const parts: Buffer[] = [] 164 updates.forEach((u, i) => { 165 const caps = i === 0 ? `\0report-status agent=${AGENT}/1` : '' 166 parts.push(encodePktLine(`${u.old} ${u.next} ${u.ref}${caps}\n`)) 167 }) 168 parts.push(flushPkt) 169 return Buffer.concat(parts) 170} 171 172function parseReportStatus(lines: Buffer[], updates: RefUpdate[], stderr: string): void { 173 if (lines.length === 0) { 174 const err = classifySshStderr(stderr) 175 throw err ?? new WireError(`receive-pack: empty report-status (stderr: ${stderr.trim() || 'empty'})`) 176 } 177 const unpack = lineToString(lines[0]!) 178 if (unpack !== 'unpack ok') { 179 throw new WireError(`receive-pack: ${unpack}`) 180 } 181 for (const raw of lines.slice(1)) { 182 const line = lineToString(raw) 183 if (line.startsWith('ng ')) { 184 // `ng <ref> <reason>` 185 const rest = line.slice(3) 186 const sp = rest.indexOf(' ') 187 const reason = sp === -1 ? rest : rest.slice(sp + 1) 188 throw classifyNgReason(reason) 189 } 190 } 191} 192 193async function writeAll(stream: NodeJS.WritableStream, data: Buffer): Promise<void> { 194 await new Promise<void>((resolve, reject) => { 195 stream.write(data, err => (err ? reject(err) : resolve())) 196 }) 197} 198 199async function pipePack(stdin: NodeJS.WritableStream, packStream: AsyncIterable<Buffer>): Promise<void> { 200 const src = Readable.from(packStream) 201 await new Promise<void>((resolve, reject) => { 202 src.on('error', reject) 203 stdin.on('error', reject) 204 src.pipe(stdin, { end: true }) 205 stdin.on('finish', resolve) 206 stdin.on('close', resolve) 207 }) 208}