mirror your GitHub repos to tangled.org automatically
1import { sql } from 'drizzle-orm'
2import { afterEach, beforeEach, describe, expect, it } from 'vitest'
3import { job } from '../../server/db/schema'
4import { clearDb, setDb, useDb } from '../../server/utils/db'
5import { claim, complete, enqueue, fail } from '../../server/utils/queue'
6import { createTestDb } from '../utils/db'
7
8describe('queue', () => {
9 beforeEach(async () => {
10 setDb(await createTestDb())
11 })
12
13 afterEach(() => {
14 clearDb()
15 })
16
17 it('enqueues and claims a job', async () => {
18 const queued = await enqueue('github.push', { foo: 'bar' })
19 expect(queued.id).toBeGreaterThan(0)
20
21 const claimed = await claim('worker-1', 60_000)
22 expect(claimed?.kind).toBe('github.push')
23 expect(claimed?.payload).toEqual({ foo: 'bar' })
24 expect(claimed?.attempts).toBe(1)
25 })
26
27 it('returns null when the queue is empty', async () => {
28 const claimed = await claim('worker-1', 60_000)
29 expect(claimed).toBeNull()
30 })
31
32 it('does not claim the same job twice', async () => {
33 await enqueue('github.push', { foo: 'bar' })
34
35 const a = await claim('worker-1', 60_000)
36 const b = await claim('worker-2', 60_000)
37
38 expect(a).not.toBeNull()
39 expect(b).toBeNull()
40 })
41
42 it('respects run_after', async () => {
43 const future = new Date(Date.now() + 60_000)
44 await enqueue('github.push', {}, { runAfter: future })
45
46 const claimed = await claim('worker-1', 60_000)
47 expect(claimed).toBeNull()
48 })
49
50 it('reclaims a job whose lease has expired', async () => {
51 await enqueue('github.push', {})
52
53 const first = await claim('worker-1', 60_000)
54 expect(first).not.toBeNull()
55
56 // Force the lease into the past.
57 const db = useDb()
58 await db.execute(sql`UPDATE ${job} SET locked_until = now() - interval '1 minute'`)
59
60 const second = await claim('worker-2', 60_000)
61 expect(second?.id).toBe(first!.id)
62 expect(second?.attempts).toBe(2) // attempts increments on each claim
63 })
64
65 it('marks a job as done on complete', async () => {
66 const queued = await enqueue('github.push', {})
67 const claimed = await claim('worker-1', 60_000)
68 await complete(claimed!.id)
69
70 const db = useDb()
71 const rows = await db.select().from(job).where(sql`${job.id} = ${queued.id}`)
72 expect(rows[0]?.status).toBe('done')
73 expect(rows[0]?.lockedBy).toBeNull()
74 })
75
76 it('re-queues with backoff on fail before maxAttempts', async () => {
77 await enqueue('github.push', {})
78 const claimed = await claim('worker-1', 60_000)
79 await fail(claimed!.id, claimed!.attempts, new Error('boom'))
80
81 const db = useDb()
82 const rows = await db.select().from(job).where(sql`${job.id} = ${claimed!.id}`)
83 expect(rows[0]?.status).toBe('queued')
84 expect(rows[0]?.lastError).toBe('boom')
85 expect(new Date(rows[0]?.runAfter ?? 0).getTime()).toBeGreaterThan(Date.now())
86 })
87
88 it('marks failed once attempts >= maxAttempts', async () => {
89 await enqueue('github.push', {})
90 const claimed = await claim('worker-1', 60_000)
91 await fail(claimed!.id, 5, new Error('terminal'), 5)
92
93 const db = useDb()
94 const rows = await db.select().from(job).where(sql`${job.id} = ${claimed!.id}`)
95 expect(rows[0]?.status).toBe('failed')
96 expect(rows[0]?.lastError).toBe('terminal')
97 })
98})