mirror your GitHub repos to tangled.org automatically
1import { sql } from 'drizzle-orm'
2import { userIdentity } from '../db/schema'
3import { useOAuthClient } from './atproto-oauth'
4import { useDb } from './db'
5import { installationOctokit } from './github-app'
6import type { JobEnvelope } from './queue'
7import { enqueue } from './queue'
8import { generateAndPublishKey } from './tangled-pubkey'
9import { enrollRepo } from './tangled-repo'
10
/**
 * Allow-list of job kinds that `dispatch` accepts. Any kind not listed here
 * makes `dispatch` throw, so a stray job surfaces as a failure rather than
 * being silently acknowledged.
 *
 * Being listed does not imply a handler exists yet: the note beside each
 * entry records which commit supplies its handler.
 */
const KNOWN_KINDS = new Set<string>([
  'github.push', // commit 12: sync push events
  'github.create', // commit 13: branch/tag ref ops
  'github.delete', // commit 13: branch/tag ref ops
  'github.repository', // commit 14/15: description, lifecycle
  'github.installation_repositories', // this commit: fan-out enrolment
  'tangled.backfill-installation', // this commit: paginate + fan-out
  'tangled.create-repo', // this commit: per-repo enrolment
  'atproto.publish-pubkey', // commit 9
])
34
// `per_page` value sent when the backfill job lists an installation's
// repositories. The same number is used to compute how many repos have been
// seen so far, which decides whether the next page gets queued.
const BACKFILL_PAGE_SIZE = 100
36
/** Payload carried by `atproto.publish-pubkey` jobs. */
type PublishPubkeyPayload = {
  /** DID whose OAuth session is restored before the key is published. */
  did: string
  /** Installation id forwarded to `generateAndPublishKey`. */
  installationId: number
}
41
/** Payload carried by `tangled.create-repo` jobs (one job per repository). */
type CreateRepoPayload = {
  /** Installation id; used to look up the bound user identity. */
  installationId: number
  /** GitHub repository id forwarded to `enrollRepo`. */
  githubRepoId: number
}
46
/** Payload carried by `github.installation_repositories` jobs. */
type InstallationRepositoriesPayload = {
  /** Installation the repository change applies to. */
  installationId: number
  /** Which way the repository set changed; `dispatch` only acts on `'added'`. */
  action: 'added' | 'removed'
  /** Ids of repositories added to the installation; each one is fanned out. */
  addedRepoIds: number[]
  /** Ids of repositories removed; validated but not read by `dispatch`. */
  removedRepoIds: number[]
}
53
/** Payload carried by `tangled.backfill-installation` jobs. */
type BackfillInstallationPayload = {
  /** Installation whose repositories are being listed. */
  installationId: number
  /**
   * Page passed to `GET /installation/repositories`. The continuation
   * arithmetic in `dispatch` treats it as 1-based.
   */
  page: number
}
58
/**
 * Checks that an untyped job payload is a non-null, non-array object and
 * returns a shallow copy of it for field-by-field validation.
 *
 * @param value - Raw payload of unknown shape.
 * @returns A shallow copy of `value`'s own enumerable properties.
 * @throws TypeError when `value` is `null`, an array, or not an object. The
 *   message names what was actually received (`null` and `array` are reported
 *   explicitly, since `typeof` calls both of them `'object'`).
 */
function asObject(value: unknown): Record<string, unknown> {
  if (value === null || typeof value !== 'object' || Array.isArray(value)) {
    // `typeof null` and `typeof []` are both 'object', which would produce the
    // useless message "expected object payload, got object".
    let received: string = typeof value
    if (value === null) received = 'null'
    else if (Array.isArray(value)) received = 'array'
    throw new TypeError(`expected object payload, got ${received}`)
  }
  return { ...value }
}
65
/**
 * Narrows a raw job payload to a {@link PublishPubkeyPayload}.
 *
 * @param value - Untyped payload from the job envelope.
 * @returns A new object holding only `did` and `installationId`.
 * @throws TypeError if `value` is not an object, `did` is not a string, or
 *   `installationId` is not a number.
 */
function publishPubkeyPayload(value: unknown): PublishPubkeyPayload {
  const { did, installationId } = asObject(value)
  if (typeof did === 'string' && typeof installationId === 'number') {
    return { did, installationId }
  }
  throw new TypeError('invalid atproto.publish-pubkey payload')
}
73
/**
 * Narrows a raw job payload to a {@link CreateRepoPayload}.
 *
 * @param value - Untyped payload from the job envelope.
 * @returns A new object holding only `installationId` and `githubRepoId`.
 * @throws TypeError if `value` is not an object or either field is not a
 *   number.
 */
function createRepoPayload(value: unknown): CreateRepoPayload {
  const { installationId, githubRepoId } = asObject(value)
  if (typeof installationId === 'number' && typeof githubRepoId === 'number') {
    return { installationId, githubRepoId }
  }
  throw new TypeError('invalid tangled.create-repo payload')
}
81
/**
 * Narrows a raw job payload to a {@link BackfillInstallationPayload}.
 *
 * Only the runtime type is checked: any number is accepted for `page`,
 * including non-integers.
 *
 * @param value - Untyped payload from the job envelope.
 * @returns A new object holding only `installationId` and `page`.
 * @throws TypeError if `value` is not an object or either field is not a
 *   number.
 */
function backfillInstallationPayload(value: unknown): BackfillInstallationPayload {
  const { installationId, page } = asObject(value)
  if (typeof installationId === 'number' && typeof page === 'number') {
    return { installationId, page }
  }
  throw new TypeError('invalid tangled.backfill-installation payload')
}
89
/**
 * Narrows a raw job payload to an {@link InstallationRepositoriesPayload}.
 *
 * Both id lists must be arrays, but their elements are not individually
 * rejected: entries that are not numbers are dropped from the result.
 *
 * @param value - Untyped payload from the job envelope.
 * @returns A new payload object with number-only id lists.
 * @throws TypeError if `value` is not an object, `installationId` is not a
 *   number, `action` is neither `'added'` nor `'removed'`, or either id list
 *   is not an array.
 */
function installationRepositoriesPayload(value: unknown): InstallationRepositoriesPayload {
  const { installationId, action, addedRepoIds, removedRepoIds } = asObject(value)
  const isNumber = (id: unknown): id is number => typeof id === 'number'

  if (
    typeof installationId === 'number'
    && (action === 'added' || action === 'removed')
    && Array.isArray(addedRepoIds)
    && Array.isArray(removedRepoIds)
  ) {
    return {
      installationId,
      action,
      addedRepoIds: addedRepoIds.filter(isNumber),
      removedRepoIds: removedRepoIds.filter(isNumber),
    }
  }
  throw new TypeError('invalid github.installation_repositories payload')
}
107
/**
 * Handles `atproto.publish-pubkey`: restores the OAuth session for the
 * payload's DID and passes it, with the installation id, to
 * `generateAndPublishKey`.
 */
async function handlePublishPubkey(payload: unknown): Promise<void> {
  const { did, installationId } = publishPubkeyPayload(payload)
  const client = await useOAuthClient()
  const session = await client.restore(did)
  await generateAndPublishKey({ oauthSession: session, installationId })
}

/**
 * Handles `tangled.create-repo`: enrols a single GitHub repo via `enrollRepo`
 * using the OAuth session of the identity bound to the installation.
 */
async function handleCreateRepo(payload: unknown): Promise<void> {
  const { installationId, githubRepoId } = createRepoPayload(payload)

  // Find the user identity bound to this install. If OAuth hasn't completed
  // yet, drop this job silently — the OAuth callback re-enqueues for all
  // accessible repos at completion time, so we'll get a fresh trigger.
  const db = useDb()
  const [identity] = await db.select({ did: userIdentity.did })
    .from(userIdentity)
    .where(sql`${userIdentity.installationId} = ${installationId}`)
  if (!identity) return

  const client = await useOAuthClient()
  const session = await client.restore(identity.did)
  await enrollRepo({ oauthSession: session, installationId, githubRepoId })
}

/**
 * Handles `tangled.backfill-installation`: lists one page of the
 * installation's repositories, queues a `tangled.create-repo` job for each,
 * and re-queues itself for the next page while more remain.
 */
async function handleBackfillInstallation(payload: unknown): Promise<void> {
  const { installationId, page } = backfillInstallationPayload(payload)
  const octokit = await installationOctokit(installationId)
  const { data } = await octokit.request('GET /installation/repositories', {
    per_page: BACKFILL_PAGE_SIZE,
    page,
  })

  // Fan out one tangled.create-repo job per repo on this page.
  for (const repo of data.repositories) {
    // eslint-disable-next-line no-await-in-loop -- enqueue is sequential by design
    await enqueue('tangled.create-repo', { installationId, githubRepoId: repo.id })
  }

  // If there are more pages, re-queue ourselves for the next one. This
  // keeps each tick small and bounded; an install with thousands of repos
  // walks through over many minutes rather than blocking one worker.
  // The empty-page check stops the walk even if total_count never catches up.
  const seenSoFar = (page - 1) * BACKFILL_PAGE_SIZE + data.repositories.length
  if (seenSoFar < data.total_count && data.repositories.length > 0) {
    await enqueue('tangled.backfill-installation', { installationId, page: page + 1 })
  }
}

/**
 * Handles `github.installation_repositories`: for `added` events, queues one
 * `tangled.create-repo` job per added repo; other actions are ignored.
 */
async function handleInstallationRepositories(payload: unknown): Promise<void> {
  const { installationId, action, addedRepoIds } = installationRepositoriesPayload(payload)
  if (action !== 'added') return

  // Fan out one tangled.create-repo job per added repo. The fan-out keeps
  // each unit small enough to fit comfortably in the per-job lease, lets
  // failures retry independently, and runs the OAuth precondition check
  // per repo (an install can outlive a tangled identity disconnection).
  for (const id of addedRepoIds) {
    // eslint-disable-next-line no-await-in-loop -- fan-out enqueue is sequential by design
    await enqueue('tangled.create-repo', { installationId, githubRepoId: id })
  }
}

/**
 * Routes a queued job to the handler for its kind.
 *
 * @param envelope - The job to run; `kind` selects the handler and `payload`
 *   is validated by that handler before use.
 * @returns Resolves once the handler finishes. Known kinds that have no
 *   handler yet resolve immediately without doing anything.
 * @throws Error if `envelope.kind` is not in `KNOWN_KINDS`.
 * @throws TypeError if the payload fails the handler's validation. Errors
 *   raised by the handler's own awaited calls propagate unchanged.
 */
export async function dispatch(envelope: JobEnvelope): Promise<void> {
  if (!KNOWN_KINDS.has(envelope.kind)) {
    throw new Error(`unknown job kind: ${envelope.kind}`)
  }

  switch (envelope.kind) {
    case 'atproto.publish-pubkey':
      await handlePublishPubkey(envelope.payload)
      return
    case 'tangled.create-repo':
      await handleCreateRepo(envelope.payload)
      return
    case 'tangled.backfill-installation':
      await handleBackfillInstallation(envelope.payload)
      return
    case 'github.installation_repositories':
      await handleInstallationRepositories(envelope.payload)
      return
    default:
      // Other kinds: still no-op until handlers land in their commits.
      return
  }
}