mirror your GitHub repos to tangled.org automatically
1import { sql } from 'drizzle-orm'
2import { userIdentity } from '../db/schema'
3import { useOAuthClient } from './atproto-oauth'
4import { useDb } from './db'
5import { installationOctokit } from './github-app'
6import type { JobEnvelope } from './queue'
7import { enqueue } from './queue'
8import { syncPush, type PushPayload } from './sync-push'
9import { generateAndPublishKey } from './tangled-pubkey'
10import { enrollRepo } from './tangled-repo'
11
12/**
13 * Map of job kind → handler. Each commit fills in its slice:
14 * - 'github.push' → this commit (sync push events)
15 * - 'github.create' / 'github.delete' → commit 13 (branch/tag ref ops)
16 * - 'github.repository' → commit 14/15 (description, lifecycle)
17 * - 'github.installation_repositories' → commit 10 (fan-out enrolment)
18 * - 'tangled.backfill-installation' → commit 10 (paginate + fan-out)
19 * - 'tangled.create-repo' → commit 10 (per-repo enrolment)
20 * - 'atproto.publish-pubkey' → commit 9
21 *
22 * Unknown kinds throw so they surface as job failures rather than silent
23 * acknowledgement.
24 */
25const KNOWN_KINDS = new Set([
26 'github.push',
27 'github.create',
28 'github.delete',
29 'github.repository',
30 'github.installation_repositories',
31 'tangled.backfill-installation',
32 'tangled.create-repo',
33 'atproto.publish-pubkey',
34])
35
36const BACKFILL_PAGE_SIZE = 100
37
38interface PublishPubkeyPayload {
39 did: string
40 installationId: number
41}
42
43interface CreateRepoPayload {
44 installationId: number
45 githubRepoId: number
46}
47
48interface InstallationRepositoriesPayload {
49 installationId: number
50 action: 'added' | 'removed'
51 addedRepoIds: number[]
52 removedRepoIds: number[]
53}
54
55interface BackfillInstallationPayload {
56 installationId: number
57 page: number
58}
59
60function asObject(value: unknown): Record<string, unknown> {
61 if (value === null || typeof value !== 'object') {
62 throw new TypeError(`expected object payload, got ${typeof value}`)
63 }
64 return { ...value }
65}
66
67function publishPubkeyPayload(value: unknown): PublishPubkeyPayload {
68 const o = asObject(value)
69 if (typeof o.did !== 'string' || typeof o.installationId !== 'number') {
70 throw new TypeError('invalid atproto.publish-pubkey payload')
71 }
72 return { did: o.did, installationId: o.installationId }
73}
74
75function createRepoPayload(value: unknown): CreateRepoPayload {
76 const o = asObject(value)
77 if (typeof o.installationId !== 'number' || typeof o.githubRepoId !== 'number') {
78 throw new TypeError('invalid tangled.create-repo payload')
79 }
80 return { installationId: o.installationId, githubRepoId: o.githubRepoId }
81}
82
83function backfillInstallationPayload(value: unknown): BackfillInstallationPayload {
84 const o = asObject(value)
85 if (typeof o.installationId !== 'number' || typeof o.page !== 'number') {
86 throw new TypeError('invalid tangled.backfill-installation payload')
87 }
88 return { installationId: o.installationId, page: o.page }
89}
90
91function installationRepositoriesPayload(value: unknown): InstallationRepositoriesPayload {
92 const o = asObject(value)
93 if (
94 typeof o.installationId !== 'number'
95 || (o.action !== 'added' && o.action !== 'removed')
96 || !Array.isArray(o.addedRepoIds)
97 || !Array.isArray(o.removedRepoIds)
98 ) {
99 throw new TypeError('invalid github.installation_repositories payload')
100 }
101 return {
102 installationId: o.installationId,
103 action: o.action,
104 addedRepoIds: o.addedRepoIds.filter((id): id is number => typeof id === 'number'),
105 removedRepoIds: o.removedRepoIds.filter((id): id is number => typeof id === 'number'),
106 }
107}
108
109export async function dispatch(envelope: JobEnvelope): Promise<void> {
110 if (!KNOWN_KINDS.has(envelope.kind)) {
111 throw new Error(`unknown job kind: ${envelope.kind}`)
112 }
113
114 if (envelope.kind === 'github.push') {
115 await syncPush(envelope.payload as PushPayload)
116 return
117 }
118
119 if (envelope.kind === 'atproto.publish-pubkey') {
120 const { did, installationId } = publishPubkeyPayload(envelope.payload)
121 const client = await useOAuthClient()
122 const session = await client.restore(did)
123 await generateAndPublishKey({ oauthSession: session, installationId })
124 return
125 }
126
127 if (envelope.kind === 'tangled.create-repo') {
128 const { installationId, githubRepoId } = createRepoPayload(envelope.payload)
129
130 // Find the user identity bound to this install. If OAuth hasn't completed
131 // yet, drop this job silently \u2014 OAuth callback re-enqueues for all
132 // accessible repos at completion time, so we'll get a fresh trigger.
133 const db = useDb()
134 const identity = await db.select({ did: userIdentity.did })
135 .from(userIdentity)
136 .where(sql`${userIdentity.installationId} = ${installationId}`)
137 if (identity.length === 0) return
138
139 const client = await useOAuthClient()
140 const session = await client.restore(identity[0]!.did)
141 await enrollRepo({ oauthSession: session, installationId, githubRepoId })
142 return
143 }
144
145 if (envelope.kind === 'tangled.backfill-installation') {
146 const { installationId, page } = backfillInstallationPayload(envelope.payload)
147 const octokit = await installationOctokit(installationId)
148 const { data } = await octokit.request('GET /installation/repositories', {
149 per_page: BACKFILL_PAGE_SIZE,
150 page,
151 })
152
153 // Fan out one tangled.create-repo job per repo on this page.
154 for (const repo of data.repositories) {
155 // eslint-disable-next-line no-await-in-loop -- enqueue is sequential by design
156 await enqueue('tangled.create-repo', { installationId, githubRepoId: repo.id })
157 }
158
159 // If there are more pages, re-queue ourselves for the next one. This
160 // keeps each tick small and bounded; an install with thousands of repos
161 // walks through over many minutes rather than blocking one worker.
162 const seenSoFar = (page - 1) * BACKFILL_PAGE_SIZE + data.repositories.length
163 if (seenSoFar < data.total_count && data.repositories.length > 0) {
164 await enqueue('tangled.backfill-installation', { installationId, page: page + 1 })
165 }
166 return
167 }
168
169 if (envelope.kind === 'github.installation_repositories') {
170 const { installationId, action, addedRepoIds } = installationRepositoriesPayload(envelope.payload)
171 if (action !== 'added') return
172
173 // Fan out one tangled.create-repo job per added repo. The fan-out keeps
174 // each unit small enough to fit comfortably in the per-job lease, lets
175 // failures retry independently, and runs the OAuth precondition check
176 // per repo (an install can outlive a tangled identity disconnection).
177 for (const id of addedRepoIds) {
178 // eslint-disable-next-line no-await-in-loop -- fan-out enqueue is sequential by design
179 await enqueue('tangled.create-repo', { installationId, githubRepoId: id })
180 }
181 return
182 }
183
184 // Other kinds: still no-op until handlers land in their commits.
185}