mirror your GitHub repos to tangled.org automatically
1

Configure Feed

Select the types of activity you want to include in your feed.

import { sql } from 'drizzle-orm'
import { job } from '../db/schema'
import { useDb } from './db'

/**
 * The subset of a job row handed to a worker by `claim`.
 *
 * NOTE(review): the `Record<string, unknown>` base is presumably there to
 * satisfy the row-type constraint of `db.execute` — confirm before removing.
 */
export interface JobEnvelope extends Record<string, unknown> {
  id: number
  kind: string
  payload: unknown
  /** Claim count including the current claim (`claim` increments it). */
  attempts: number
}

export interface EnqueueOptions {
  /** Run no earlier than this time. Default: now. */
  runAfter?: Date
}

/**
 * Push a job onto the queue. `payload` must be a JSON-serialisable object — keep
 * it small (an envelope of identifiers, not a webhook body); see PLAN.md.
 *
 * @param kind    Job type identifier stored with the row.
 * @param payload Data stored with the job.
 * @param opts    Optional scheduling settings; see {@link EnqueueOptions}.
 * @returns The `{ id }` of the inserted job row.
 * @throws Error if the insert did not return a row.
 */
export async function enqueue(kind: string, payload: object, opts: EnqueueOptions = {}) {
  const db = useDb()
  const [row] = await db
    .insert(job)
    .values({
      kind,
      payload,
      runAfter: opts.runAfter ?? new Date(),
    })
    .returning({ id: job.id })

  // Fail loudly instead of letting `undefined` escape under a non-null type.
  if (!row) {
    throw new Error(`enqueue: insert of '${kind}' job returned no row`)
  }
  return row
}

/**
 * Atomically claim the oldest runnable job. Uses
 * `UPDATE ... WHERE id = (SELECT ... FOR UPDATE SKIP LOCKED LIMIT 1)`
 * so multiple workers don't race for the same row.
 *
 * Claiming marks the row `running`, records `workerId` and a lease that ends
 * `leaseMs` milliseconds from now, and increments `attempts`.
 *
 * @param workerId Identifier written to `locked_by`.
 * @param leaseMs  Lease duration in milliseconds, measured from this process's clock.
 * @returns The claimed job, or null when the queue is empty (or all rows are
 *          locked / not yet due).
 */
export async function claim(workerId: string, leaseMs: number): Promise<JobEnvelope | null> {
  const db = useDb()
  const leaseUntil = new Date(Date.now() + leaseMs)

  // Two conditions for a job to be claimable:
  //   1. status='queued' AND run_after <= now()
  //   2. status='running' AND locked_until < now() (lease expired)
  const result = await db.execute<JobEnvelope>(sql`
    UPDATE ${job}
    SET
      status = 'running',
      locked_by = ${workerId},
      locked_until = ${leaseUntil.toISOString()},
      attempts = ${job.attempts} + 1,
      updated_at = now()
    WHERE ${job.id} = (
      SELECT ${job.id} FROM ${job}
      WHERE
        (${job.status} = 'queued' AND ${job.runAfter} <= now())
        OR
        (${job.status} = 'running' AND ${job.lockedUntil} < now())
      ORDER BY ${job.runAfter} ASC
      FOR UPDATE SKIP LOCKED
      LIMIT 1
    )
    RETURNING id, kind, payload, attempts
  `)

  // drizzle's neon-http execute returns rows on `.rows`; pglite's returns directly.
  // Normalise.
  const rows: JobEnvelope[] = Array.isArray(result)
    ? result
    : ((result as { rows?: JobEnvelope[] }).rows ?? [])
  const envelope = rows[0]
  return envelope ?? null
}

/**
 * Mark a job as completed: sets status to `done` and clears the lock fields
 * and any previously recorded error.
 *
 * @param id Primary key of the job row.
 */
export async function complete(id: number) {
  const db = useDb()
  await db
    .update(job)
    .set({ status: 'done', lockedBy: null, lockedUntil: null, lastError: null, updatedAt: new Date() })
    .where(sql`${job.id} = ${id}`)
}

/** First retry delay, before jitter: 30s. */
const RETRY_BASE_MS = 30_000
/** Upper bound on the pre-jitter retry delay: 1h. */
const RETRY_CAP_MS = 60 * 60_000

/**
 * Render a thrown value as text for storage. `Error`s contribute their
 * `message`; other objects are JSON-serialised so they are not flattened to
 * `[object Object]`; everything else goes through `String()`.
 */
function errorMessage(err: unknown): string {
  if (err instanceof Error) return err.message
  if (typeof err === 'object' && err !== null) {
    try {
      // `JSON.stringify` can yield undefined at runtime (e.g. `toJSON` returning undefined).
      return JSON.stringify(err) ?? Object.prototype.toString.call(err)
    } catch {
      // Circular structures and similar: fall back to a tag that cannot throw.
      return Object.prototype.toString.call(err)
    }
  }
  return String(err)
}

/**
 * Delay before the next retry, in milliseconds: `base * 2^(attempts - 1)`,
 * capped at one hour, then scaled by a random factor in [0.8, 1.2). The cap is
 * applied before jitter, so the result can exceed the cap by up to 20%.
 */
function retryDelayMs(attempts: number): number {
  const backoff = Math.min(RETRY_BASE_MS * 2 ** (attempts - 1), RETRY_CAP_MS)
  return backoff * (0.8 + Math.random() * 0.4)
}

/**
 * Record a failure. Re-queues with exponential backoff until `maxAttempts`,
 * after which the job is marked `failed` and stays put for inspection.
 *
 * @param id          Primary key of the job row.
 * @param attempts    Attempt count for the job, compared against `maxAttempts`
 *                    and used to size the backoff.
 * @param err         The thrown value; its text is stored in `lastError`.
 * @param maxAttempts Attempt count at which the job is no longer retried. Default 5.
 */
export async function fail(id: number, attempts: number, err: unknown, maxAttempts = 5) {
  const db = useDb()
  const message = errorMessage(err)

  if (attempts >= maxAttempts) {
    await db
      .update(job)
      .set({ status: 'failed', lastError: message, lockedBy: null, lockedUntil: null, updatedAt: new Date() })
      .where(sql`${job.id} = ${id}`)
    return
  }

  // Exponential backoff with ±20% jitter; see retryDelayMs.
  const runAfter = new Date(Date.now() + retryDelayMs(attempts))

  await db
    .update(job)
    .set({
      status: 'queued',
      lockedBy: null,
      lockedUntil: null,
      runAfter,
      lastError: message,
      updatedAt: new Date(),
    })
    .where(sql`${job.id} = ${id}`)
}