// Mirror your GitHub repos to tangled.org automatically.
1import { sql } from 'drizzle-orm'
2import { job } from '../db/schema'
3import { useDb } from './db'
4
/** The subset of a job row that `claim` hands to a worker. */
export interface JobEnvelope {
  /** Identifier of the job row. */
  id: number
  /** Job kind, as supplied to `enqueue`. */
  kind: string
  /** The stored payload; untyped here, so consumers must narrow it before use. */
  payload: unknown
  /** Attempt counter as returned by the claiming statement (already incremented for this claim). */
  attempts: number
}
11
/** Optional scheduling settings accepted by `enqueue`. */
export interface EnqueueOptions {
  /** Earliest time at which the job may run. When omitted, `enqueue` uses the current time. */
  runAfter?: Date
}
16
/**
 * Add a job to the queue.
 *
 * `payload` must be a JSON-serialisable object. Keep it small: an envelope of
 * identifiers rather than a whole webhook body (see PLAN.md).
 *
 * @param kind - Job kind stored on the row.
 * @param payload - Data stored alongside the job.
 * @param opts - Scheduling options; `runAfter` defaults to the current time.
 * @returns The first row returned by the insert, carrying the new job's `id`.
 */
export async function enqueue(kind: string, payload: object, opts: EnqueueOptions = {}) {
  const inserted = await useDb()
    .insert(job)
    .values({
      kind,
      payload,
      runAfter: opts.runAfter ?? new Date(),
    })
    .returning({ id: job.id })
  return inserted[0]
}
30
/**
 * Claim the runnable job with the earliest `run_after` on behalf of `workerId`.
 *
 * Selection and marking happen in one statement,
 * `UPDATE ... WHERE id = (SELECT ... FOR UPDATE SKIP LOCKED LIMIT 1)`,
 * so concurrent workers skip rows another worker has already locked instead of
 * racing for them.
 *
 * A row is eligible when either
 *   - its status is 'queued' and `run_after` has passed, or
 *   - its status is 'running' but `locked_until` has passed (the lease expired).
 *
 * The claimed row is set to 'running', stamped with the worker id and lease
 * deadline, and its `attempts` counter is incremented. Note that the lease
 * deadline is computed from this process's clock (`Date.now()`), while
 * eligibility is judged against the database's `now()`.
 *
 * @param workerId - Value written to `locked_by`.
 * @param leaseMs - Lease length in milliseconds, added to the current time.
 * @returns The claimed job, or null when the statement returned no row
 *          (queue empty, everything locked, or nothing due yet).
 */
export async function claim(workerId: string, leaseMs: number): Promise<JobEnvelope | null> {
  const db = useDb()
  const leaseDeadline = new Date(Date.now() + leaseMs).toISOString()

  const outcome = await db.execute(sql`
    UPDATE ${job}
    SET
      status = 'running',
      locked_by = ${workerId},
      locked_until = ${leaseDeadline},
      attempts = ${job.attempts} + 1,
      updated_at = now()
    WHERE ${job.id} = (
      SELECT ${job.id} FROM ${job}
      WHERE
        (${job.status} = 'queued' AND ${job.runAfter} <= now())
        OR
        (${job.status} = 'running' AND ${job.lockedUntil} < now())
      ORDER BY ${job.runAfter} ASC
      FOR UPDATE SKIP LOCKED
      LIMIT 1
    )
    RETURNING id, kind, payload, attempts
  `)

  // Some drizzle drivers hand back the row array itself, others wrap it in an
  // object with a `rows` property; accept either shape.
  let claimed: unknown[]
  if (Array.isArray(outcome)) {
    claimed = outcome
  } else {
    claimed = (outcome as { rows?: unknown[] }).rows ?? []
  }

  return claimed.length === 0 ? null : (claimed[0] as JobEnvelope)
}
72
/**
 * Mark job `id` as finished: status becomes 'done', the lock fields and any
 * previously recorded error are cleared, and `updatedAt` is refreshed.
 */
export async function complete(id: number) {
  const db = useDb()
  const finished = db.update(job).set({
    status: 'done',
    lockedBy: null,
    lockedUntil: null,
    lastError: null,
    updatedAt: new Date(),
  })
  await finished.where(sql`${job.id} = ${id}`)
}
80
/**
 * Render an arbitrary thrown value as text for the `lastError` column.
 *
 * Errors contribute their `message`; primitives are stringified. Objects whose
 * default stringification would be the uninformative "[object Object]" are
 * serialised as JSON instead so the failure detail is not lost.
 */
function describeError(err: unknown): string {
  if (err instanceof Error) return err.message
  if (typeof err !== 'object' || err === null) return String(err)
  try {
    const text = String(err)
    if (text !== '[object Object]') return text
  } catch {
    // No usable toString (e.g. a null-prototype object); fall through to JSON.
  }
  try {
    return JSON.stringify(err) ?? '[object Object]'
  } catch {
    // Circular structure or BigInt member: nothing better to report.
    return '[object Object]'
  }
}

/**
 * Delay in milliseconds before retry number `attempts + 1`.
 *
 * The un-jittered delay is `30s * 2^(attempts - 1)` (30s after the first
 * attempt, 60s after the second, ...), capped at one hour. It is then scaled by
 * a factor in [0.8, 1.2), so the final value may exceed the cap by up to 20%.
 *
 * @param attempts - Attempts made so far; values below 1 or non-finite values
 *                   are treated as the first attempt.
 * @param random - Uniform sample in [0, 1) driving the jitter.
 */
function retryDelayMs(attempts: number, random: number = Math.random()): number {
  const baseMs = 30_000 // 30s
  const capMs = 60 * 60_000 // 1h
  // Clamp so a zero/negative/NaN counter cannot produce a sub-base or invalid delay.
  const exponent = Number.isFinite(attempts) ? Math.max(0, attempts - 1) : 0
  const backoff = Math.min(baseMs * 2 ** exponent, capMs)
  return backoff * (0.8 + random * 0.4)
}

/**
 * Record a failure for job `id`.
 *
 * While `attempts` is below `maxAttempts` the job is put back in the 'queued'
 * state with its lock cleared and `runAfter` pushed out by an exponential,
 * jittered backoff (see `retryDelayMs`). Once `attempts` reaches `maxAttempts`
 * the job is marked 'failed' and left in place for inspection.
 *
 * @param id - Identifier of the job row to update.
 * @param attempts - Number of attempts made so far (the value `claim` returned).
 * @param err - The thrown value; its description is stored in `lastError`.
 * @param maxAttempts - Attempt count at which the job is given up on. Default 5.
 */
export async function fail(id: number, attempts: number, err: unknown, maxAttempts = 5) {
  const db = useDb()
  const message = describeError(err)

  if (attempts >= maxAttempts) {
    await db.update(job)
      .set({ status: 'failed', lastError: message, lockedBy: null, lockedUntil: null, updatedAt: new Date() })
      .where(sql`${job.id} = ${id}`)
    return
  }

  const runAfter = new Date(Date.now() + retryDelayMs(attempts))

  await db.update(job)
    .set({
      status: 'queued',
      lockedBy: null,
      lockedUntil: null,
      runAfter,
      lastError: message,
      updatedAt: new Date(),
    })
    .where(sql`${job.id} = ${id}`)
}
113}