···11-import type { InstallationEvent } from '@octokit/webhooks-types'
11+import type {
22+ CreateEvent,
33+ DeleteEvent,
44+ InstallationEvent,
55+ InstallationRepositoriesEvent,
66+ PushEvent,
77+ RepositoryEvent,
88+} from '@octokit/webhooks-types'
29import { verify } from '@octokit/webhooks-methods'
310import { sql } from 'drizzle-orm'
411import { installation, webhookEvent } from '~~/server/db/schema'
1212+import { enqueue } from '~~/server/utils/queue'
513614const RECOGNISED_EVENTS = new Set([
715 'push',
···95103 }
96104 }
971059898- // TODO(commit 7): enqueue a job for recognised event types.
106106+ // Enqueue work for events we care about. Envelope shape is the minimum the
107107+ // handler needs to re-fetch fresh data via the GitHub API; we never persist
108108+ // the raw webhook body. See PLAN.md "Deferred / follow-ups".
99109 if (RECOGNISED_EVENTS.has(eventName)) {
100100- // Will become: await enqueue({ kind: eventName, payload: <envelope> })
110110+ await enqueueForEvent(event, eventName, deliveryId)
101111 }
102112103113 return { ok: true, deliveryId }
104114})
115115+116116+async function enqueueForEvent(event: Parameters<typeof readBody>[0], eventName: string, deliveryId: string) {
117117+ if (eventName === 'push') {
118118+ const body = await readBody<PushEvent>(event)
119119+ if (!body.installation) return
120120+ await enqueue('github.push', {
121121+ deliveryId,
122122+ installationId: body.installation.id,
123123+ githubRepoId: body.repository.id,
124124+ ref: body.ref,
125125+ before: body.before,
126126+ after: body.after,
127127+ })
128128+ }
129129+ else if (eventName === 'create') {
130130+ const body = await readBody<CreateEvent>(event)
131131+ if (!body.installation) return
132132+ await enqueue('github.create', {
133133+ deliveryId,
134134+ installationId: body.installation.id,
135135+ githubRepoId: body.repository.id,
136136+ refType: body.ref_type,
137137+ ref: body.ref,
138138+ })
139139+ }
140140+ else if (eventName === 'delete') {
141141+ const body = await readBody<DeleteEvent>(event)
142142+ if (!body.installation) return
143143+ await enqueue('github.delete', {
144144+ deliveryId,
145145+ installationId: body.installation.id,
146146+ githubRepoId: body.repository.id,
147147+ refType: body.ref_type,
148148+ ref: body.ref,
149149+ })
150150+ }
151151+ else if (eventName === 'repository') {
152152+ const body = await readBody<RepositoryEvent>(event)
153153+ if (!body.installation) return
154154+ await enqueue('github.repository', {
155155+ deliveryId,
156156+ installationId: body.installation.id,
157157+ githubRepoId: body.repository.id,
158158+ action: body.action,
159159+ })
160160+ }
161161+ else if (eventName === 'installation_repositories') {
162162+ const body = await readBody<InstallationRepositoriesEvent>(event)
163163+ await enqueue('github.installation_repositories', {
164164+ deliveryId,
165165+ installationId: body.installation.id,
166166+ action: body.action,
167167+ addedRepoIds: 'repositories_added' in body ? body.repositories_added.map(r => r.id) : [],
168168+ removedRepoIds: 'repositories_removed' in body ? body.repositories_removed.map(r => r.id) : [],
169169+ })
170170+ }
171171+ // 'installation' is handled inline above for bookkeeping; no job enqueued.
172172+}
+54
server/api/jobs/run.post.ts
···11+import crypto from 'node:crypto'
22+import { dispatch } from '~~/server/utils/job-handlers'
33+import { claim, complete, fail } from '~~/server/utils/queue'
44+55+const LEASE_MS = 5 * 60_000 // 5 min — generous for a sync job
66+const DEFAULT_BUDGET_MS = 25_000 // leave headroom under Vercel's 10s default; pro tiers can override
77+88+export default defineEventHandler(async event => {
99+ const config = useRuntimeConfig()
1010+ const cronSecret = config.cronSecret
1111+ if (!cronSecret) {
1212+ throw createError({ statusCode: 500, statusMessage: 'cron secret not configured' })
1313+ }
1414+1515+ const auth = getRequestHeader(event, 'authorization')
1616+ if (auth !== `Bearer ${cronSecret}`) {
1717+ throw createError({ statusCode: 401, statusMessage: 'unauthorized' })
1818+ }
1919+2020+ const workerId = `${process.env.VERCEL_DEPLOYMENT_ID ?? 'local'}:${crypto.randomUUID()}`
2121+ const budgetMs = Number(config.workerBudgetMs) || DEFAULT_BUDGET_MS
2222+ const deadline = Date.now() + budgetMs
2323+2424+ let processed = 0
2525+ let drained = false
2626+2727+ // Sequential by design: each iteration claims one job, runs it, records the
2828+ // outcome. We don't parallelise because each Vercel invocation is a single
2929+ // small worker; concurrency comes from cron firing multiple invocations.
3030+ // eslint-disable-next-line no-await-in-loop
3131+ while (Date.now() < deadline) {
3232+ // eslint-disable-next-line no-await-in-loop
3333+ const job = await claim(workerId, LEASE_MS)
3434+ if (!job) {
3535+ drained = true
3636+ break
3737+ }
3838+3939+ try {
4040+ // eslint-disable-next-line no-await-in-loop
4141+ await dispatch(job)
4242+ // eslint-disable-next-line no-await-in-loop
4343+ await complete(job.id)
4444+ }
4545+ catch (err) {
4646+ // eslint-disable-next-line no-await-in-loop
4747+ await fail(job.id, job.attempts, err)
4848+ }
4949+5050+ processed++
5151+ }
5252+5353+ return { ok: true, processed, drained, workerId }
5454+})
+19-4
server/utils/db.ts
···11import { neon } from '@neondatabase/serverless'
22-import { drizzle } from 'drizzle-orm/neon-http'
22+import { drizzle as drizzleNeon } from 'drizzle-orm/neon-http'
33import * as schema from '../db/schema'
4455-let _db: ReturnType<typeof drizzle<typeof schema>> | undefined
55+export type Db = ReturnType<typeof drizzleNeon<typeof schema>>
66+77+let _db: Db | undefined
88+99+/**
1010+ * Override the DB returned by `useDb()`. Tests inject a PGlite-backed Drizzle
1111+ * instance with the same schema. Production code never calls this.
1212+ */
1313+export function setDb(db: Db) {
1414+ _db = db
1515+}
1616+1717+/** Clear the cached DB so the next `useDb()` reconstructs from runtime config. */
1818+export function clearDb() {
1919+ _db = undefined
2020+}
62177-export function useDb() {
2222+export function useDb(): Db {
823 if (_db) return _db
924 const { databaseUrl } = useRuntimeConfig()
1025 if (!databaseUrl) {
1126 throw new Error('NUXT_DATABASE_URL is not set')
1227 }
1328 const client = neon(databaseUrl)
1414- _db = drizzle(client, { schema })
2929+ _db = drizzleNeon(client, { schema })
1530 return _db
1631}
1732
+30
server/utils/job-handlers.ts
···11+import type { JobEnvelope } from './queue'
22+33+/**
44+ * Map of job kind → handler. Handlers are filled in by later commits:
55+ * - 'github.push' → commit 11 (sync push events)
66+ * - 'github.create' / 'github.delete' → commit 12 (branch/tag ref ops)
77+ * - 'github.repository' → commit 13/14 (description, lifecycle)
88+ * - 'tangled.create-repo' → commit 10 (initial enrolment)
99+ * - 'atproto.publish-pubkey' → commit 9 (publish ssh public key)
1010+ *
1111+ * For now the dispatcher knows the recognised kinds but routes them all to a
1212+ * noop. An unknown kind throws so it surfaces as a job failure rather than
1313+ * silent acknowledgement.
1414+ */
1515+const KNOWN_KINDS = new Set([
1616+ 'github.push',
1717+ 'github.create',
1818+ 'github.delete',
1919+ 'github.repository',
2020+ 'github.installation_repositories',
2121+ 'tangled.create-repo',
2222+ 'atproto.publish-pubkey',
2323+])
2424+2525+export async function dispatch(envelope: JobEnvelope): Promise<void> {
2626+ if (!KNOWN_KINDS.has(envelope.kind)) {
2727+ throw new Error(`unknown job kind: ${envelope.kind}`)
2828+ }
2929+ // No-op until handlers land in later commits.
3030+}
+113
server/utils/queue.ts
···11+import { sql } from 'drizzle-orm'
22+import { job } from '../db/schema'
33+import { useDb } from './db'
44+55+export interface JobEnvelope {
66+ id: number
77+ kind: string
88+ payload: unknown
99+ attempts: number
1010+}
1111+1212+export interface EnqueueOptions {
1313+ /** Run no earlier than this time. Default: now. */
1414+ runAfter?: Date
1515+}
1616+1717+/**
1818+ * Push a job onto the queue. `payload` must be a JSON-serialisable object — keep
1919+ * it small (an envelope of identifiers, not a webhook body); see PLAN.md.
2020+ */
2121+export async function enqueue(kind: string, payload: object, opts: EnqueueOptions = {}) {
2222+ const db = useDb()
2323+ const [row] = await db.insert(job).values({
2424+ kind,
2525+ payload,
2626+ runAfter: opts.runAfter ?? new Date(),
2727+ }).returning({ id: job.id })
2828+ return row
2929+}
3030+3131+/**
3232+ * Atomically claim the oldest runnable job. Uses
3333+ * `UPDATE ... WHERE id = (SELECT ... FOR UPDATE SKIP LOCKED LIMIT 1)`
3434+ * so multiple workers don't race for the same row.
3535+ *
3636+ * Returns null when the queue is empty (or all rows are locked / not yet due).
3737+ */
3838+export async function claim(workerId: string, leaseMs: number): Promise<JobEnvelope | null> {
3939+ const db = useDb()
4040+ const leaseUntil = new Date(Date.now() + leaseMs)
4141+4242+ // Two conditions for a job to be claimable:
4343+ // 1. status='queued' AND run_after <= now()
4444+ // 2. status='running' AND locked_until < now() (lease expired)
4545+ const result = await db.execute(sql`
4646+ UPDATE ${job}
4747+ SET
4848+ status = 'running',
4949+ locked_by = ${workerId},
5050+ locked_until = ${leaseUntil.toISOString()},
5151+ attempts = ${job.attempts} + 1,
5252+ updated_at = now()
5353+ WHERE ${job.id} = (
5454+ SELECT ${job.id} FROM ${job}
5555+ WHERE
5656+ (${job.status} = 'queued' AND ${job.runAfter} <= now())
5757+ OR
5858+ (${job.status} = 'running' AND ${job.lockedUntil} < now())
5959+ ORDER BY ${job.runAfter} ASC
6060+ FOR UPDATE SKIP LOCKED
6161+ LIMIT 1
6262+ )
6363+ RETURNING id, kind, payload, attempts
6464+ `)
6565+6666+ // drizzle's neon-http execute returns rows on `.rows`; pglite's returns directly.
6767+ // Normalise.
6868+ const rows = (Array.isArray(result) ? result : (result as { rows?: unknown[] }).rows) ?? []
6969+ if (rows.length === 0) return null
7070+ return rows[0] as JobEnvelope
7171+}
7272+7373+/** Mark a job as completed. */
7474+export async function complete(id: number) {
7575+ const db = useDb()
7676+ await db.update(job)
7777+ .set({ status: 'done', lockedBy: null, lockedUntil: null, lastError: null, updatedAt: new Date() })
7878+ .where(sql`${job.id} = ${id}`)
7979+}
8080+8181+/**
8282+ * Record a failure. Re-queues with exponential backoff until `maxAttempts`,
8383+ * after which the job is marked `failed` and stays put for inspection.
8484+ */
8585+export async function fail(id: number, attempts: number, err: unknown, maxAttempts = 5) {
8686+ const db = useDb()
8787+ const message = err instanceof Error ? err.message : String(err)
8888+8989+ if (attempts >= maxAttempts) {
9090+ await db.update(job)
9191+ .set({ status: 'failed', lastError: message, lockedBy: null, lockedUntil: null, updatedAt: new Date() })
9292+ .where(sql`${job.id} = ${id}`)
9393+ return
9494+ }
9595+9696+ // Exponential backoff with ±20% jitter: base*2^attempts, capped.
9797+ const base = 30_000 // 30s
9898+ const cap = 60 * 60_000 // 1h
9999+ const backoff = Math.min(base * (2 ** (attempts - 1)), cap)
100100+ const jitter = backoff * (0.8 + Math.random() * 0.4)
101101+ const runAfter = new Date(Date.now() + jitter)
102102+103103+ await db.update(job)
104104+ .set({
105105+ status: 'queued',
106106+ lockedBy: null,
107107+ lockedUntil: null,
108108+ runAfter,
109109+ lastError: message,
110110+ updatedAt: new Date(),
111111+ })
112112+ .where(sql`${job.id} = ${id}`)
113113+}
···11+import { readFileSync } from 'node:fs'
22+import { fileURLToPath } from 'node:url'
33+import { PGlite } from '@electric-sql/pglite'
44+import { drizzle } from 'drizzle-orm/pglite'
55+import * as schema from '../../server/db/schema'
66+import type { Db } from '../../server/utils/db'
77+88+const migrationPath = fileURLToPath(
99+ new URL('../../server/db/migrations/0000_initial.sql', import.meta.url),
1010+)
1111+1212+/**
1313+ * Create a fresh in-memory Postgres (PGlite) for a single test, apply our
1414+ * schema, and return a Drizzle instance. Each call returns an isolated DB.
1515+ */
1616+export async function createTestDb(): Promise<Db> {
1717+ const pg = new PGlite()
1818+ const sql = readFileSync(migrationPath, 'utf8')
1919+2020+ // The drizzle-generated migration file uses `--> statement-breakpoint` between
2121+ // statements; PGlite's exec accepts the whole thing if we strip those markers.
2222+ // Sequential by design — DDL ordering matters.
2323+ for (const statement of sql.split('--> statement-breakpoint')) {
2424+ const trimmed = statement.trim()
2525+ // eslint-disable-next-line no-await-in-loop
2626+ if (trimmed) await pg.exec(trimmed)
2727+ }
2828+2929+ return drizzle(pg, { schema }) as unknown as Db
3030+}