mirror your GitHub repos to tangled.org automatically
1

Configure Feed

Select the types of activity you want to include in your feed.

import { sql } from 'drizzle-orm'
import { job } from '../db/schema'
import { useDb } from './db'

/** The columns of a job row that `claim` hands back to the worker. */
export interface JobEnvelope {
  id: number
  kind: string
  payload: unknown
  attempts: number
}

export interface EnqueueOptions {
  /** Run no earlier than this time. Default: now. */
  runAfter?: Date
}

/** Delay before the first retry, in milliseconds (30s). */
const BACKOFF_BASE_MS = 30_000

/** Upper bound on the pre-jitter retry delay, in milliseconds (1h). */
const BACKOFF_CAP_MS = 60 * 60_000

/**
 * Compute the retry delay for a job that has been attempted `attempts` times.
 *
 * Exponential backoff with ±20% jitter: `base * 2^(attempts - 1)`, capped at
 * `BACKOFF_CAP_MS`. The cap is applied before the jitter, so the returned
 * delay can exceed the cap by up to 20%.
 */
function backoffDelayMs(attempts: number): number {
  // Clamp the exponent so attempts <= 0 still waits at least the base delay
  // instead of producing a fractional multiplier.
  const exponent = Math.max(attempts - 1, 0)
  const backoff = Math.min(BACKOFF_BASE_MS * (2 ** exponent), BACKOFF_CAP_MS)
  return backoff * (0.8 + Math.random() * 0.4)
}

/**
 * Push a job onto the queue. `payload` must be a JSON-serialisable object — keep
 * it small (an envelope of identifiers, not a webhook body); see PLAN.md.
 *
 * @param kind - Job type discriminator stored on the row.
 * @param payload - Data stored alongside the job.
 * @param opts - Optional scheduling; `runAfter` defaults to the current time.
 * @returns The first row returned by the insert, carrying the new job's `id`.
 */
export async function enqueue(kind: string, payload: object, opts: EnqueueOptions = {}) {
  const db = useDb()
  const [row] = await db.insert(job).values({
    kind,
    payload,
    runAfter: opts.runAfter ?? new Date(),
  }).returning({ id: job.id })
  return row
}

/**
 * Atomically claim the oldest runnable job. Uses
 * `UPDATE ... WHERE id = (SELECT ... FOR UPDATE SKIP LOCKED LIMIT 1)`
 * so multiple workers don't race for the same row.
 *
 * Claiming sets the row to `running`, records `workerId` and the lease expiry,
 * and increments `attempts`.
 *
 * NOTE(review): reclaiming an expired lease also increments `attempts`, and
 * there is no attempts ceiling in this query — a job whose worker always dies
 * before calling `fail` would be reclaimed indefinitely. Confirm whether that
 * is intended.
 *
 * @param workerId - Identifier written to `locked_by`.
 * @param leaseMs - Lease duration in milliseconds, measured from this process's clock.
 * @returns The claimed job, or null when the queue is empty (or all rows are
 *   locked / not yet due).
 */
export async function claim(workerId: string, leaseMs: number): Promise<JobEnvelope | null> {
  const db = useDb()
  const leaseUntil = new Date(Date.now() + leaseMs)

  // Two conditions for a job to be claimable:
  //   1. status='queued' AND run_after <= now()
  //   2. status='running' AND locked_until < now()  (lease expired)
  const result = await db.execute(sql`
    UPDATE ${job}
    SET
      status = 'running',
      locked_by = ${workerId},
      locked_until = ${leaseUntil.toISOString()},
      attempts = ${job.attempts} + 1,
      updated_at = now()
    WHERE ${job.id} = (
      SELECT ${job.id} FROM ${job}
      WHERE
        (${job.status} = 'queued' AND ${job.runAfter} <= now())
        OR
        (${job.status} = 'running' AND ${job.lockedUntil} < now())
      ORDER BY ${job.runAfter} ASC
      FOR UPDATE SKIP LOCKED
      LIMIT 1
    )
    RETURNING id, kind, payload, attempts
  `)

  // drizzle's neon-http execute returns rows on `.rows`; pglite's returns directly.
  // Normalise.
  const rows = (Array.isArray(result) ? result : (result as { rows?: unknown[] }).rows) ?? []
  if (rows.length === 0) return null
  return rows[0] as JobEnvelope
}

/**
 * Mark a job as completed: status becomes `done` and the lock and last error
 * are cleared.
 *
 * The update matches on `id` only; it does not check which worker currently
 * holds the lease.
 */
export async function complete(id: number): Promise<void> {
  const db = useDb()
  await db.update(job)
    .set({ status: 'done', lockedBy: null, lockedUntil: null, lastError: null, updatedAt: new Date() })
    .where(sql`${job.id} = ${id}`)
}

/**
 * Record a failure. Re-queues with exponential backoff until `maxAttempts`,
 * after which the job is marked `failed` and stays put for inspection.
 *
 * The update matches on `id` only; it does not check which worker currently
 * holds the lease.
 *
 * @param id - Job to update.
 * @param attempts - Number of attempts made so far (as returned by `claim`).
 * @param err - The failure; its message (or string form) is stored in `lastError`.
 * @param maxAttempts - Attempt count at which the job is marked `failed` instead of re-queued.
 */
export async function fail(id: number, attempts: number, err: unknown, maxAttempts = 5): Promise<void> {
  const db = useDb()
  const message = err instanceof Error ? err.message : String(err)

  if (attempts >= maxAttempts) {
    await db.update(job)
      .set({ status: 'failed', lastError: message, lockedBy: null, lockedUntil: null, updatedAt: new Date() })
      .where(sql`${job.id} = ${id}`)
    return
  }

  // Release the lock and schedule the retry after a jittered backoff delay.
  const runAfter = new Date(Date.now() + backoffDelayMs(attempts))

  await db.update(job)
    .set({
      status: 'queued',
      lockedBy: null,
      lockedUntil: null,
      runAfter,
      lastError: message,
      updatedAt: new Date(),
    })
    .where(sql`${job.id} = ${id}`)
}