// Mirror your GitHub repos to tangled.org automatically.
1import { sql } from 'drizzle-orm'
2import { job } from '../db/schema'
3import { useDb } from './db'
4
/**
 * The slice of a `job` row that `claim()` hands to a worker.
 *
 * Extends `Record<string, unknown>` so it can be used as the row type in
 * `db.execute<JobEnvelope>()` (presumably needed to satisfy that generic's
 * constraint — confirm against the drizzle typings).
 */
export interface JobEnvelope extends Record<string, unknown> {
  /** Value of the row's `id` column. */
  id: number
  /** Job kind, as given to `enqueue()`. */
  kind: string
  /** Payload stored with the job. Untyped here, so narrow it before use. */
  payload: unknown
  /** Claim counter. `claim()` increments it, so it includes the current run. */
  attempts: number
}
11
/** Optional scheduling controls accepted by `enqueue()`. */
export interface EnqueueOptions {
  /** Earliest time the job may run. When omitted, `enqueue()` uses the current time. */
  runAfter?: Date
}
16
/**
 * Push a job onto the queue.
 *
 * @param kind - Job kind stored on the row.
 * @param payload - Must be a JSON-serialisable object. Keep it small (an
 *   envelope of identifiers, not a webhook body); see PLAN.md.
 * @param opts - Scheduling options; `opts.runAfter` defaults to the current time.
 * @returns An object holding the `id` of the inserted job row.
 * @throws Error if the insert resolves without returning a row.
 */
export async function enqueue(kind: string, payload: object, opts: EnqueueOptions = {}) {
  const db = useDb()
  const [row] = await db
    .insert(job)
    .values({
      kind,
      payload,
      runAfter: opts.runAfter ?? new Date(),
    })
    .returning({ id: job.id })

  // INSERT ... RETURNING should always yield the new row. Check explicitly
  // rather than asserting with `!`, so a misbehaving driver fails here with a
  // clear message instead of leaking `undefined` to the caller.
  if (!row) {
    throw new Error(`enqueue: insert of '${kind}' job returned no row`)
  }
  return row
}
30
/**
 * Claim the oldest runnable job for `workerId` in a single statement.
 *
 * The row is chosen by a `SELECT ... FOR UPDATE SKIP LOCKED LIMIT 1` subquery
 * and switched to `running` by the enclosing `UPDATE`, so several workers
 * don't race for the same row.
 *
 * @param workerId - Written to `locked_by` on the claimed row.
 * @param leaseMs - Lease length in milliseconds; `locked_until` is set to
 *   `Date.now() + leaseMs`.
 * @returns The claimed job (its `attempts` already incremented), or `null`
 *   when the queue is empty or every row is locked / not yet due.
 */
export async function claim(workerId: string, leaseMs: number): Promise<JobEnvelope | null> {
  const db = useDb()
  const lockedUntilIso = new Date(Date.now() + leaseMs).toISOString()

  // A row is claimable when either:
  //   - status='queued'  AND run_after <= now()
  //   - status='running' AND locked_until < now()   (lease expired)
  const claimQuery = sql`
    UPDATE ${job}
    SET
      status = 'running',
      locked_by = ${workerId},
      locked_until = ${lockedUntilIso},
      attempts = ${job.attempts} + 1,
      updated_at = now()
    WHERE ${job.id} = (
      SELECT ${job.id} FROM ${job}
      WHERE
        (${job.status} = 'queued' AND ${job.runAfter} <= now())
        OR
        (${job.status} = 'running' AND ${job.lockedUntil} < now())
      ORDER BY ${job.runAfter} ASC
      FOR UPDATE SKIP LOCKED
      LIMIT 1
    )
    RETURNING id, kind, payload, attempts
  `
  const result = await db.execute<JobEnvelope>(claimQuery)

  // Result shape depends on the driver: drizzle's neon-http execute puts the
  // rows on `.rows`, pglite's returns the row array directly.
  let claimed: JobEnvelope[]
  if (Array.isArray(result)) {
    claimed = result
  } else {
    claimed = (result as { rows?: JobEnvelope[] }).rows ?? []
  }

  return claimed[0] ?? null
}
74
/**
 * Mark job `id` as done: sets `status` to `'done'`, releases the lock
 * (`lockedBy` / `lockedUntil`) and clears `lastError`.
 *
 * The update matches on `id` alone.
 */
export async function complete(id: number) {
  const matchesId = sql`${job.id} = ${id}`

  await useDb()
    .update(job)
    .set({
      status: 'done',
      lockedBy: null,
      lockedUntil: null,
      lastError: null,
      updatedAt: new Date(),
    })
    .where(matchesId)
}
82
/** Render a thrown value as text for the job's `lastError` field. */
function describeError(err: unknown): string {
  if (err instanceof Error) return err.message
  const text = String(err)
  if (text !== '[object Object]') return text
  // A plain object stringifies to the useless "[object Object]"; JSON keeps
  // whatever detail it carries.
  try {
    return JSON.stringify(err) ?? text
  } catch {
    return text // e.g. circular structure
  }
}

/**
 * Delay in milliseconds before the next retry: `base * 2^(attempts - 1)`,
 * capped at one hour, then scaled by a random factor in [0.8, 1.2).
 * Because the jitter is applied after the cap, the result can exceed the cap
 * by up to 20%.
 */
function retryDelayMs(attempts: number): number {
  const base = 30_000 // 30s
  const cap = 60 * 60_000 // 1h
  // The first failure (attempts = 1) waits `base`. Anything that is not > 1
  // (0, negatives, NaN) is treated the same, so the delay never drops below
  // `base` and never becomes NaN.
  const exponent = attempts > 1 ? attempts - 1 : 0
  const backoff = Math.min(base * 2 ** exponent, cap)
  return backoff * (0.8 + Math.random() * 0.4)
}

/**
 * Record a failure. Re-queues with exponential backoff until `maxAttempts`,
 * after which the job is marked `failed` and stays put for inspection.
 *
 * Both paths release the lock (`lockedBy` / `lockedUntil`) and store the error
 * text in `lastError`. The update matches on `id` alone.
 *
 * @param id - Job row to update.
 * @param attempts - Attempts made so far, counting the run that just failed.
 *   `claim()` returns the already-incremented counter, which is presumably
 *   what callers pass here.
 * @param err - The thrown value. An `Error` contributes its `message`; other
 *   values are stringified, and plain objects are JSON-encoded.
 * @param maxAttempts - Once `attempts` reaches this, the job is marked `failed`.
 */
export async function fail(id: number, attempts: number, err: unknown, maxAttempts = 5) {
  const db = useDb()
  const message = describeError(err)

  if (attempts >= maxAttempts) {
    // Out of retries: park the job as `failed`.
    await db.update(job)
      .set({ status: 'failed', lastError: message, lockedBy: null, lockedUntil: null, updatedAt: new Date() })
      .where(sql`${job.id} = ${id}`)
    return
  }

  const runAfter = new Date(Date.now() + retryDelayMs(attempts))

  await db.update(job)
    .set({
      status: 'queued',
      lockedBy: null,
      lockedUntil: null,
      runAfter,
      lastError: message,
      updatedAt: new Date(),
    })
    .where(sql`${job.id} = ${id}`)
}
115}