mirror your GitHub repos to tangled.org automatically
1import { and, eq, sql } from 'drizzle-orm'
2import { repoMapping, userIdentity } from '../db/schema'
3import { useOAuthClient } from './atproto-oauth'
4import { useDb } from './db'
5import { installationOctokit } from './github-app'
6import type { JobEnvelope } from './queue'
7import { enqueue } from './queue'
8import { type CreateRefPayload, type DeleteRefPayload, syncCreateRef, syncDeleteRef } from './sync-ref'
9import { syncPush, type PushPayload } from './sync-push'
10import { generateAndPublishKey, rotateKey } from './tangled-pubkey'
11import { enrollRepo, syncRepoMetadata } from './tangled-repo'
12
/**
 * Job kinds that `dispatch` accepts, and what each one triggers:
 *
 * - 'github.push'                      → sync push events
 * - 'github.create' / 'github.delete'  → branch/tag ref operations
 * - 'github.repository'                → metadata sync + lifecycle (edited,
 *                                        renamed, privatized, publicized,
 *                                        transferred, deleted)
 * - 'github.installation_repositories' → fan-out enrolment
 * - 'tangled.backfill-installation'    → paginate + fan-out
 * - 'tangled.create-repo'              → per-repo enrolment
 * - 'atproto.publish-pubkey'           → publish or rotate the SSH public key
 *
 * A kind that is not listed here makes `dispatch` throw, so it surfaces as a
 * job failure rather than a silent acknowledgement.
 */
const KNOWN_KINDS = new Set<string>([
  'github.push',
  'github.create',
  'github.delete',
  'github.repository',
  'github.installation_repositories',
  'tangled.backfill-installation',
  'tangled.create-repo',
  'atproto.publish-pubkey',
])
38
/**
 * Number of repositories requested per page (`per_page`) when backfilling an
 * installation. Also used to work out how many repositories earlier pages
 * have already covered.
 */
const BACKFILL_PAGE_SIZE = 100
40
/** Validated payload of an 'atproto.publish-pubkey' job. */
interface PublishPubkeyPayload {
  /** DID whose OAuth session is restored before the key is published. */
  did: string
  /** Installation id forwarded to `rotateKey` / `generateAndPublishKey`. */
  installationId: number
  /**
   * Dashboard "Rotate SSH key" sets this. Causes the handler to call
   * `rotateKey()` (delete old PDS record + DB row, then re-publish) rather
   * than the no-op-if-exists `generateAndPublishKey()` used at signup.
   */
  force?: boolean
}
51
/** Validated payload of a 'tangled.create-repo' job (one repository). */
interface CreateRepoPayload {
  /** Installation id; also used to look up the bound user identity. */
  installationId: number
  /** GitHub repository id passed on to `enrollRepo`. */
  githubRepoId: number
  /** Dashboard "Resync now" sets this; see `enrollRepo` for semantics. */
  force?: boolean
}
58
/** Validated payload of a 'github.installation_repositories' job. */
interface InstallationRepositoriesPayload {
  /** Installation whose repository selection changed. */
  installationId: number
  /** 'added' fans out enrolment jobs; 'removed' disables the local mappings. */
  action: 'added' | 'removed'
  /** Repository ids enrolled when `action` is 'added'. */
  addedRepoIds: number[]
  /** Repository ids whose mappings are disabled when `action` is 'removed'. */
  removedRepoIds: number[]
}
65
/** Validated payload of a 'tangled.backfill-installation' job (one page). */
interface BackfillInstallationPayload {
  /** Installation whose accessible repositories are being listed. */
  installationId: number
  /**
   * Page of the repository listing to fetch. The handler's progress maths
   * (`(page - 1) * BACKFILL_PAGE_SIZE`) treats the first page as 1.
   */
  page: number
}
70
/**
 * Actions accepted on a 'github.repository' job. `handleRepositoryEvent`
 * acts on 'edited', 'renamed', 'transferred', 'deleted', 'privatized' and
 * 'publicized'; 'created', 'archived' and 'unarchived' are accepted but
 * ignored.
 */
type RepositoryAction =
  | 'created'
  | 'edited'
  | 'renamed'
  | 'transferred'
  | 'deleted'
  | 'privatized'
  | 'publicized'
  | 'archived'
  | 'unarchived'
81
/** Validated payload of a 'github.repository' job. */
interface RepositoryPayload {
  /** Installation the repository event belongs to. */
  installationId: number
  /** GitHub repository id the action applies to. */
  githubRepoId: number
  /** Lifecycle action that selects the branch in `handleRepositoryEvent`. */
  action: RepositoryAction
}
87
/**
 * Narrows an untyped job payload to a string-keyed record.
 *
 * The result is a shallow copy, so validators never hold on to the object
 * the caller passed in.
 *
 * @param value - Raw payload taken from a job envelope.
 * @returns A shallow copy of `value` typed as `Record<string, unknown>`.
 * @throws TypeError when `value` is `null`, an array, or not an object.
 */
function asObject(value: unknown): Record<string, unknown> {
  // `typeof` reports 'object' for both null and arrays. Name them explicitly
  // so the message states what was actually received, and reject arrays here
  // instead of spreading them into an index-keyed record.
  if (value === null || typeof value !== 'object' || Array.isArray(value)) {
    const received = value === null ? 'null' : Array.isArray(value) ? 'array' : typeof value
    throw new TypeError(`expected object payload, got ${received}`)
  }
  return { ...value }
}
94
/**
 * Validates the payload of an 'atproto.publish-pubkey' job.
 *
 * @param value - Raw payload from the job envelope.
 * @returns The typed payload; `force` is `true` only when the raw value is
 *   exactly `true`, otherwise `false`.
 * @throws TypeError when the payload is not an object, `did` is not a string,
 *   or `installationId` is not a number.
 */
function publishPubkeyPayload(value: unknown): PublishPubkeyPayload {
  const { did, installationId, force } = asObject(value)
  if (typeof did === 'string' && typeof installationId === 'number') {
    return { did, installationId, force: force === true }
  }
  throw new TypeError('invalid atproto.publish-pubkey payload')
}
102
/**
 * Validates the payload of a 'tangled.create-repo' job.
 *
 * @param value - Raw payload from the job envelope.
 * @returns The typed payload; `force` is `true` only when the raw value is
 *   exactly `true`, otherwise `false`.
 * @throws TypeError when the payload is not an object or either id is not a
 *   number.
 */
function createRepoPayload(value: unknown): CreateRepoPayload {
  const { installationId, githubRepoId, force } = asObject(value)
  if (typeof installationId === 'number' && typeof githubRepoId === 'number') {
    return { installationId, githubRepoId, force: force === true }
  }
  throw new TypeError('invalid tangled.create-repo payload')
}
114
/**
 * Validates the payload of a 'tangled.backfill-installation' job.
 *
 * @param value - Raw payload from the job envelope.
 * @returns The typed payload with `installationId` and `page`.
 * @throws TypeError when the payload is not an object or either field is not
 *   a number.
 */
function backfillInstallationPayload(value: unknown): BackfillInstallationPayload {
  const { installationId, page } = asObject(value)
  if (typeof installationId === 'number' && typeof page === 'number') {
    return { installationId, page }
  }
  throw new TypeError('invalid tangled.backfill-installation payload')
}
122
/**
 * Validates the payload shared by 'github.create' and 'github.delete' jobs.
 *
 * @param kind - Which of the two events is being parsed; only used to build
 *   the error message.
 * @param value - Raw payload from the job envelope.
 * @returns The typed ref payload (`installationId`, `githubRepoId`,
 *   `refType`, `ref`).
 * @throws TypeError when the payload is not an object, an id is not a number,
 *   `refType` is neither 'branch' nor 'tag', or `ref` is not a string.
 */
function refPayload(kind: 'create' | 'delete', value: unknown): CreateRefPayload {
  const { installationId, githubRepoId, refType, ref } = asObject(value)
  if (
    typeof installationId === 'number'
    && typeof githubRepoId === 'number'
    && (refType === 'branch' || refType === 'tag')
    && typeof ref === 'string'
  ) {
    return { installationId, githubRepoId, refType, ref }
  }
  throw new TypeError(`invalid github.${kind} payload`)
}
140
/**
 * Runtime counterpart of the `RepositoryAction` union, used to check an
 * incoming `action` string before it is trusted as that type.
 */
const REPOSITORY_ACTIONS: Set<string> = new Set([
  'created',
  'edited',
  'renamed',
  'transferred',
  'deleted',
  'privatized',
  'publicized',
  'archived',
  'unarchived',
])
152
/**
 * Type guard: reports whether `action` is one of the repository actions
 * listed in `REPOSITORY_ACTIONS`.
 *
 * @param action - Action string taken from an incoming payload.
 * @returns `true` when the string is a recognised `RepositoryAction`.
 */
function isRepositoryAction(action: string): action is RepositoryAction {
  const recognised = REPOSITORY_ACTIONS.has(action)
  return recognised
}
156
/**
 * Validates the payload of a 'github.repository' job.
 *
 * @param value - Raw payload from the job envelope.
 * @returns The typed payload (`installationId`, `githubRepoId`, `action`).
 * @throws TypeError when the payload is not an object, an id is not a number,
 *   or `action` is not a recognised repository action.
 */
function repositoryPayload(value: unknown): RepositoryPayload {
  const { installationId, githubRepoId, action } = asObject(value)
  if (
    typeof installationId === 'number'
    && typeof githubRepoId === 'number'
    && typeof action === 'string'
    && isRepositoryAction(action)
  ) {
    return { installationId, githubRepoId, action }
  }
  throw new TypeError('invalid github.repository payload')
}
173
/**
 * Validates the payload of a 'github.installation_repositories' job.
 *
 * Entries of `addedRepoIds` / `removedRepoIds` that are not numbers are
 * dropped rather than rejected.
 *
 * @param value - Raw payload from the job envelope.
 * @returns The typed payload with both id lists reduced to their numeric
 *   entries.
 * @throws TypeError when the payload is not an object, `installationId` is
 *   not a number, `action` is neither 'added' nor 'removed', or either id
 *   list is not an array.
 */
function installationRepositoriesPayload(value: unknown): InstallationRepositoriesPayload {
  const { installationId, action, addedRepoIds, removedRepoIds } = asObject(value)
  const numericOnly = (ids: unknown[]): number[] =>
    ids.filter((id): id is number => typeof id === 'number')

  if (
    typeof installationId === 'number'
    && (action === 'added' || action === 'removed')
    && Array.isArray(addedRepoIds)
    && Array.isArray(removedRepoIds)
  ) {
    return {
      installationId,
      action,
      addedRepoIds: numericOnly(addedRepoIds),
      removedRepoIds: numericOnly(removedRepoIds),
    }
  }
  throw new TypeError('invalid github.installation_repositories payload')
}
191
/**
 * Routes one queued job to the handler for its kind.
 *
 * Every payload is validated by its per-kind parser before the handler runs,
 * except 'github.push', whose payload is handed to `syncPush` as-is.
 *
 * @param envelope - Job taken from the queue; `kind` selects the handler and
 *   `payload` carries its untyped arguments.
 * @throws Error when `envelope.kind` is not in `KNOWN_KINDS`, or is listed
 *   there but has no handler wired in below.
 * @throws TypeError when the payload fails validation for its kind.
 */
export async function dispatch(envelope: JobEnvelope): Promise<void> {
  if (!KNOWN_KINDS.has(envelope.kind)) {
    throw new Error(`unknown job kind: ${envelope.kind}`)
  }

  switch (envelope.kind) {
    case 'github.push':
      // NOTE(review): cast only, no runtime validation, unlike the other kinds.
      await syncPush(envelope.payload as PushPayload)
      return

    case 'github.create':
      await syncCreateRef(refPayload('create', envelope.payload))
      return

    case 'github.delete':
      await syncDeleteRef(refPayload('delete', envelope.payload) as DeleteRefPayload)
      return

    case 'github.repository':
      await handleRepositoryEvent(repositoryPayload(envelope.payload))
      return

    case 'github.installation_repositories':
      await handleInstallationRepositories(installationRepositoriesPayload(envelope.payload))
      return

    case 'tangled.backfill-installation':
      await handleBackfillInstallation(backfillInstallationPayload(envelope.payload))
      return

    case 'tangled.create-repo':
      await handleCreateRepo(createRepoPayload(envelope.payload))
      return

    case 'atproto.publish-pubkey':
      await handlePublishPubkey(publishPubkeyPayload(envelope.payload))
      return

    default:
      // A kind that is listed in KNOWN_KINDS but has no case above must fail
      // loudly; falling through would acknowledge the job without doing it.
      throw new Error(`no handler for job kind: ${String(envelope.kind)}`)
  }
}

/** Publishes the SSH public key for `did`, rotating it when `force` is set. */
async function handlePublishPubkey({ did, installationId, force }: PublishPubkeyPayload): Promise<void> {
  const client = await useOAuthClient()
  const session = await client.restore(did)
  if (force) await rotateKey({ oauthSession: session, installationId })
  else await generateAndPublishKey({ oauthSession: session, installationId })
}

/** Enrols a single repository on behalf of the identity bound to the install. */
async function handleCreateRepo({ installationId, githubRepoId, force }: CreateRepoPayload): Promise<void> {
  // Find the user identity bound to this install. If OAuth hasn't completed
  // yet, drop this job silently — OAuth callback re-enqueues for all
  // accessible repos at completion time, so we'll get a fresh trigger.
  const db = useDb()
  const [identity] = await db.select({ did: userIdentity.did })
    .from(userIdentity)
    .where(sql`${userIdentity.installationId} = ${installationId}`)
  if (!identity) return

  const client = await useOAuthClient()
  const session = await client.restore(identity.did)
  await enrollRepo({ oauthSession: session, installationId, githubRepoId, force })
}

/** Enqueues enrolment for one page of an install's repos, then the next page. */
async function handleBackfillInstallation({ installationId, page }: BackfillInstallationPayload): Promise<void> {
  const octokit = await installationOctokit(installationId)
  const { data } = await octokit.request('GET /installation/repositories', {
    per_page: BACKFILL_PAGE_SIZE,
    page,
  })

  // Fan out one tangled.create-repo job per repo on this page.
  for (const repo of data.repositories) {
    // eslint-disable-next-line no-await-in-loop -- enqueue is sequential by design
    await enqueue('tangled.create-repo', { installationId, githubRepoId: repo.id })
  }

  // If there are more pages, re-queue ourselves for the next one. This
  // keeps each tick small and bounded; an install with thousands of repos
  // walks through over many minutes rather than blocking one worker.
  // The empty-page check stops the walk even if `total_count` is never reached.
  const seenSoFar = (page - 1) * BACKFILL_PAGE_SIZE + data.repositories.length
  if (seenSoFar < data.total_count && data.repositories.length > 0) {
    await enqueue('tangled.backfill-installation', { installationId, page: page + 1 })
  }
}

/** Enrols repos added to an install, or disables mappings for removed ones. */
async function handleInstallationRepositories(payload: InstallationRepositoriesPayload): Promise<void> {
  const { installationId, action, addedRepoIds, removedRepoIds } = payload

  if (action === 'added') {
    // Fan out one tangled.create-repo job per added repo. The fan-out keeps
    // each unit small enough to fit comfortably in the per-job lease, lets
    // failures retry independently, and runs the OAuth precondition check
    // per repo (an install can outlive a tangled identity disconnection).
    for (const id of addedRepoIds) {
      // eslint-disable-next-line no-await-in-loop -- fan-out enqueue is sequential by design
      await enqueue('tangled.create-repo', { installationId, githubRepoId: id })
    }
    return
  }

  // 'removed': the install no longer has access to these repos. We can't
  // see them via the install token any more, so syncing has to stop. Leave
  // the tangled mirror in place (PLAN.md: "don't delete user data on our
  // say-so"). The user can manually re-add the repo on GitHub to re-enable.
  // Nothing is written when the list is empty, so the IN list is never empty.
  if (action === 'removed' && removedRepoIds.length > 0) {
    const db = useDb()
    await db.update(repoMapping)
      .set({ disabledAt: new Date(), updatedAt: new Date() })
      .where(and(
        eq(repoMapping.installationId, installationId),
        sql`${repoMapping.githubRepoId} IN ${removedRepoIds}`,
      ))
  }
}
301
/**
 * Applies a GitHub `repository` lifecycle event to the local repo mapping.
 *
 * - 'privatized' / 'transferred' / 'deleted': mark the mapping disabled.
 * - 'renamed': refetch the name from GitHub and record the change.
 * - 'publicized': re-enable the mapping, then enrol or refresh metadata.
 * - 'edited': refresh metadata.
 * - 'created' / 'archived' / 'unarchived': ignored.
 *
 * @param payload - Validated 'github.repository' payload.
 */
async function handleRepositoryEvent(payload: RepositoryPayload): Promise<void> {
  const { installationId, githubRepoId, action } = payload
  const db = useDb()
  // Every branch that touches the mapping addresses it by (install, repo).
  const thisMapping = and(
    eq(repoMapping.installationId, installationId),
    eq(repoMapping.githubRepoId, githubRepoId),
  )

  switch (action) {
    // Most lifecycle actions only need to touch the local mapping, no PDS
    // work. `edited` and `publicized` (when we already have a mapping) go
    // through the OAuth-authed metadata sync helper.
    case 'privatized':
    case 'transferred':
    case 'deleted': {
      await db.update(repoMapping)
        .set({ disabledAt: new Date(), updatedAt: new Date() })
        .where(thisMapping)
      return
    }

    case 'renamed': {
      // Refetch the current full_name from GitHub; the webhook envelope is
      // intentionally tiny. We do NOT rename on the tangled side — there's no
      // procedure and a fresh-create-and-delete would lose stars/refs. Surface
      // the change in `lastError` with an `info:` prefix so the dashboard can
      // flag it without a schema change.
      const [existing] = await db
        .select({ id: repoMapping.id, githubFullName: repoMapping.githubFullName })
        .from(repoMapping)
        .where(thisMapping)
        .limit(1)
      if (!existing) return

      const octokit = await installationOctokit(installationId)
      const { data: repo } = await octokit.request('GET /repositories/{repository_id}', {
        repository_id: githubRepoId,
      })
      // Name already matches what we stored: nothing to record.
      if (repo.full_name === existing.githubFullName) return

      await db.update(repoMapping)
        .set({
          githubFullName: repo.full_name,
          lastError: `info: renamed on github from ${existing.githubFullName} to ${repo.full_name}; tangled mirror name unchanged`,
          updatedAt: new Date(),
        })
        .where(eq(repoMapping.id, existing.id))
      return
    }

    case 'publicized': {
      const [mapping] = await db.select().from(repoMapping).where(thisMapping).limit(1)

      if (mapping) {
        await db.update(repoMapping)
          .set({ disabledAt: null, updatedAt: new Date() })
          .where(eq(repoMapping.id, mapping.id))

        if (mapping.tangledRepoDid) {
          // Already mirrored, just refresh metadata in case description/topics
          // changed while it was private.
          await runMetadataSync(installationId, githubRepoId)
          return
        }
      }

      // Either the repo flipped public without ever having been enrolled (the
      // install was added while it was private), or a mapping exists without
      // a tangled repo behind it. Kick off a fresh enrolment in both cases.
      await enqueue('tangled.create-repo', { installationId, githubRepoId })
      return
    }

    case 'edited': {
      await runMetadataSync(installationId, githubRepoId)
      return
    }

    default:
      // 'created', 'archived', 'unarchived' — nothing for us to do. Repo
      // creation surfaces via `installation_repositories.added`; archive
      // state isn't part of the mirror surface in v1.
      return
  }
}
389
/**
 * Refreshes a repo's metadata using the OAuth session of the identity bound
 * to the installation. Does nothing when no identity row exists for it.
 *
 * @param installationId - Installation used to look up the identity.
 * @param githubRepoId - Repository whose metadata is synced.
 */
async function runMetadataSync(installationId: number, githubRepoId: number): Promise<void> {
  const [owner] = await useDb()
    .select({ did: userIdentity.did })
    .from(userIdentity)
    .where(sql`${userIdentity.installationId} = ${installationId}`)
  // No tangled identity yet — OAuth callback will backfill on completion.
  if (!owner) return

  const oauthClient = await useOAuthClient()
  const oauthSession = await oauthClient.restore(owner.did)
  await syncRepoMetadata({ oauthSession, installationId, githubRepoId })
}