mirror your GitHub repos to tangled.org automatically
1import type { OAuthSession } from '@atproto/oauth-client-node'
2import { and, eq, sql } from 'drizzle-orm'
3import { repoMapping, userIdentity } from '../db/schema'
4import { useOAuthClient } from './atproto-oauth'
5import { useDb } from './db'
6import { installationOctokit } from './github-app'
7import type { JobEnvelope } from './queue'
8import { enqueue } from './queue'
9import { type CreateRefPayload, type DeleteRefPayload, syncCreateRef, syncDeleteRef } from './sync-ref'
10import { syncPush, type PushPayload } from './sync-push'
11import { generateAndPublishKey, rotateKey } from './tangled-pubkey'
12import { enrollRepo, syncRepoMetadata } from './tangled-repo'
13
/**
 * Every job kind `dispatch` accepts. An envelope whose `kind` is missing from
 * this set makes `dispatch` throw, which surfaces as a job failure.
 */
const KNOWN_KINDS = new Set<string>([
  'github.push',
  'github.create',
  'github.delete',
  'github.repository',
  'github.installation_repositories',
  'tangled.backfill-installation',
  'tangled.create-repo',
  'atproto.publish-pubkey',
])

/**
 * `per_page` sent to `GET /installation/repositories` by the backfill job.
 * Also the stride used to work out how many repos earlier pages covered.
 */
const BACKFILL_PAGE_SIZE = 100
27
/** Validated payload of an `atproto.publish-pubkey` job. */
interface PublishPubkeyPayload {
  /** DID whose OAuth session is restored before any key work happens. */
  did: string
  /** Installation id forwarded to `rotateKey` / `generateAndPublishKey`. */
  installationId: number
  /**
   * Set by the dashboard's "Rotate SSH key" action. When true the handler
   * calls `rotateKey()` (delete old PDS record + DB row, then re-publish);
   * otherwise it calls `generateAndPublishKey()`, the no-op-if-exists path
   * used at signup.
   */
  force?: boolean
}
38
/** Validated payload of a `tangled.create-repo` job. */
interface CreateRepoPayload {
  /** Installation whose bound identity the handler restores a session for. */
  installationId: number
  /** GitHub repository id handed to `enrollRepo`. */
  githubRepoId: number
  /** Set by the dashboard's "Resync now" action; `enrollRepo` defines what it does. */
  force?: boolean
}
45
/**
 * Validated payload of a `github.installation_repositories` job. `action`
 * decides which id list the handler reads.
 */
interface InstallationRepositoriesPayload {
  installationId: number
  /** `added` fans out enrolment jobs; `removed` disables the matching mappings. */
  action: 'added' | 'removed'
  /** Repo ids to enrol; only read when `action` is `added`. */
  addedRepoIds: number[]
  /** Repo ids to disable; only read when `action` is `removed`. */
  removedRepoIds: number[]
}
52
/** Validated payload of a `tangled.backfill-installation` job (one page of work). */
interface BackfillInstallationPayload {
  installationId: number
  /**
   * Page sent to `GET /installation/repositories`. The handler's progress
   * arithmetic, `(page - 1) * BACKFILL_PAGE_SIZE`, treats the first page as 1.
   */
  page: number
}
57
/**
 * Actions accepted on a `github.repository` job. How `handleRepositoryEvent`
 * treats each:
 * - `privatized` / `transferred` / `deleted`: disable the mapping.
 * - `renamed`: refresh the stored full name.
 * - `publicized`: re-enable the mapping, then enrol or resync metadata.
 * - `edited`: resync metadata.
 * - `created` / `archived` / `unarchived`: ignored.
 */
type RepositoryAction =
  | 'created'
  | 'edited'
  | 'renamed'
  | 'transferred'
  | 'deleted'
  | 'privatized'
  | 'publicized'
  | 'archived'
  | 'unarchived'
68
/** Validated payload of a `github.repository` job. */
interface RepositoryPayload {
  installationId: number
  /** Together with `installationId`, selects the `repoMapping` row to act on. */
  githubRepoId: number
  /** Lifecycle action that `handleRepositoryEvent` branches on. */
  action: RepositoryAction
}
74
/**
 * Narrow an untyped job payload to a key/value record.
 *
 * @param value - Raw `payload` taken from a job envelope.
 * @returns A shallow copy of `value`'s own enumerable properties.
 * @throws {TypeError} When `value` is a primitive, `null`, or an array. The
 *   message names what was actually received; `typeof` alone would report
 *   both `null` and arrays as "object".
 */
function asObject(value: unknown): Record<string, unknown> {
  if (value === null || typeof value !== 'object' || Array.isArray(value)) {
    let received: string = typeof value
    if (value === null) received = 'null'
    else if (Array.isArray(value)) received = 'array'
    throw new TypeError(`expected object payload, got ${received}`)
  }
  return { ...value }
}
81
/**
 * Validate the payload of an `atproto.publish-pubkey` job.
 *
 * @returns A fresh object holding only the known fields. `force` is true only
 *   when the input carried the boolean `true`; anything else becomes false.
 * @throws {TypeError} When `did` is not a string or `installationId` is not a
 *   number (or the payload is not an object at all).
 */
function publishPubkeyPayload(value: unknown): PublishPubkeyPayload {
  const { did, installationId, force } = asObject(value)
  if (typeof did === 'string' && typeof installationId === 'number') {
    return { did, installationId, force: force === true }
  }
  throw new TypeError('invalid atproto.publish-pubkey payload')
}
89
/**
 * Validate the payload of a `tangled.create-repo` job.
 *
 * @returns A fresh object with the two ids and a strict-boolean `force`
 *   (true only when the input carried the boolean `true`).
 * @throws {TypeError} When either id is not a number.
 */
function createRepoPayload(value: unknown): CreateRepoPayload {
  const { installationId, githubRepoId, force } = asObject(value)
  if (typeof installationId !== 'number' || typeof githubRepoId !== 'number') {
    throw new TypeError('invalid tangled.create-repo payload')
  }
  return { installationId, githubRepoId, force: force === true }
}
101
/**
 * Validate the payload of a `tangled.backfill-installation` job.
 *
 * @returns A fresh `{ installationId, page }` object; other input keys are dropped.
 * @throws {TypeError} When `installationId` or `page` is not a number.
 */
function backfillInstallationPayload(value: unknown): BackfillInstallationPayload {
  const { installationId, page } = asObject(value)
  if (typeof installationId === 'number' && typeof page === 'number') {
    return { installationId, page }
  }
  throw new TypeError('invalid tangled.backfill-installation payload')
}
109
/**
 * Validate the payload shared by `github.create` and `github.delete` jobs.
 *
 * @param kind - Which of the two job kinds is being parsed; only used to name
 *   the kind in the error message.
 * @param value - Raw payload from the envelope.
 * @returns A fresh object with both ids, the ref name, and its type.
 * @throws {TypeError} When an id is not a number, `refType` is neither
 *   `'branch'` nor `'tag'`, or `ref` is not a string.
 */
function refPayload(kind: 'create' | 'delete', value: unknown): CreateRefPayload {
  const { installationId, githubRepoId, refType, ref } = asObject(value)
  if (
    typeof installationId === 'number'
    && typeof githubRepoId === 'number'
    && (refType === 'branch' || refType === 'tag')
    && typeof ref === 'string'
  ) {
    return { installationId, githubRepoId, refType, ref }
  }
  throw new TypeError(`invalid github.${kind} payload`)
}
127
/**
 * One entry per `RepositoryAction`. The `Record` annotation ties this table to
 * the union: a missing or misspelled key is a compile error, so the runtime
 * set below cannot drift from the type that `isRepositoryAction` narrows to.
 */
const REPOSITORY_ACTION_TABLE: Record<RepositoryAction, true> = {
  created: true,
  edited: true,
  renamed: true,
  transferred: true,
  deleted: true,
  privatized: true,
  publicized: true,
  archived: true,
  unarchived: true,
}

/** Runtime membership set for `RepositoryAction`, derived from the table above. */
const REPOSITORY_ACTIONS: ReadonlySet<string> = new Set(Object.keys(REPOSITORY_ACTION_TABLE))

/**
 * Type guard: true when `action` is one of the `RepositoryAction` literals.
 * A `Set` lookup is used rather than `in` on the table, so inherited names
 * such as `constructor` are never accepted.
 */
function isRepositoryAction(action: string): action is RepositoryAction {
  return REPOSITORY_ACTIONS.has(action)
}
143
/**
 * Validate the payload of a `github.repository` job.
 *
 * @returns A fresh object with both ids and the narrowed `action`.
 * @throws {TypeError} When an id is not a number, or `action` is not a string
 *   that `isRepositoryAction` accepts.
 */
function repositoryPayload(value: unknown): RepositoryPayload {
  const { installationId, githubRepoId, action } = asObject(value)
  if (
    typeof installationId === 'number'
    && typeof githubRepoId === 'number'
    && typeof action === 'string'
    && isRepositoryAction(action)
  ) {
    return { installationId, githubRepoId, action }
  }
  throw new TypeError('invalid github.repository payload')
}
160
/**
 * Validate the payload of a `github.installation_repositories` job.
 *
 * Both id lists must be arrays, but their elements are filtered rather than
 * rejected: any entry that is not a number is silently dropped.
 *
 * @throws {TypeError} When `installationId` is not a number, `action` is not
 *   `'added'` / `'removed'`, or either id list is not an array.
 */
function installationRepositoriesPayload(value: unknown): InstallationRepositoriesPayload {
  const { installationId, action, addedRepoIds, removedRepoIds } = asObject(value)
  const numbersOnly = (ids: unknown[]): number[] =>
    ids.filter((id): id is number => typeof id === 'number')

  if (
    typeof installationId === 'number'
    && (action === 'added' || action === 'removed')
    && Array.isArray(addedRepoIds)
    && Array.isArray(removedRepoIds)
  ) {
    return {
      installationId,
      action,
      addedRepoIds: numbersOnly(addedRepoIds),
      removedRepoIds: numbersOnly(removedRepoIds),
    }
  }
  throw new TypeError('invalid github.installation_repositories payload')
}
178
/**
 * Route one queued job to the handler for its `kind`.
 *
 * Every payload goes through its kind's validator before any side effect
 * runs. `github.push` is the exception: its payload is handed to `syncPush`
 * unvalidated.
 *
 * @param envelope - Job pulled from the queue; only `kind` and `payload` are read.
 * @throws {Error} When `kind` is not in `KNOWN_KINDS`, or is listed there but
 *   has no case below. The second check makes a half-registered kind fail
 *   loudly instead of resolving as if the work had been done.
 * @throws {TypeError} When the payload fails its kind's validator.
 */
export async function dispatch(envelope: JobEnvelope): Promise<void> {
  const { kind, payload } = envelope
  if (!KNOWN_KINDS.has(kind)) {
    throw new Error(`unknown job kind: ${kind}`)
  }

  switch (kind) {
    case 'github.push':
      await syncPush(payload as PushPayload)
      return
    case 'github.create':
      await syncCreateRef(refPayload('create', payload))
      return
    case 'github.delete':
      await syncDeleteRef(refPayload('delete', payload) as DeleteRefPayload)
      return
    case 'atproto.publish-pubkey':
      await runPublishPubkey(publishPubkeyPayload(payload))
      return
    case 'tangled.create-repo':
      await runCreateRepo(createRepoPayload(payload))
      return
    case 'tangled.backfill-installation':
      await runBackfillPage(backfillInstallationPayload(payload))
      return
    case 'github.installation_repositories':
      await runInstallationRepositories(installationRepositoriesPayload(payload))
      return
    case 'github.repository':
      await handleRepositoryEvent(repositoryPayload(payload))
      return
    default:
      throw new Error(`no handler for job kind: ${kind}`)
  }
}

/** Publish the SSH public key for `did`, rotating the existing one when `force` is set. */
async function runPublishPubkey({ did, installationId, force }: PublishPubkeyPayload): Promise<void> {
  const client = await useOAuthClient()
  const session = await client.restore(did)
  if (force) await rotateKey({ oauthSession: session, installationId })
  else await generateAndPublishKey({ oauthSession: session, installationId })
}

/** Enrol one repo, or drop the job when the installation has no identity yet. */
async function runCreateRepo({ installationId, githubRepoId, force }: CreateRepoPayload): Promise<void> {
  const session = await restoreSessionForInstallation(installationId)
  if (!session) return
  await enrollRepo({ oauthSession: session, installationId, githubRepoId, force })
}

/** Enqueue enrolment for one page of an installation's repos, then chain the next page. */
async function runBackfillPage({ installationId, page }: BackfillInstallationPayload): Promise<void> {
  const octokit = await installationOctokit(installationId)
  const { data } = await octokit.request('GET /installation/repositories', {
    per_page: BACKFILL_PAGE_SIZE,
    page,
  })

  // Fan out one tangled.create-repo job per repo on this page.
  for (const repo of data.repositories) {
    // eslint-disable-next-line no-await-in-loop -- enqueue is sequential by design
    await enqueue('tangled.create-repo', { installationId, githubRepoId: repo.id })
  }

  // If there are more pages, re-queue ourselves for the next one. This keeps
  // each tick small and bounded; an install with thousands of repos walks
  // through over many minutes rather than blocking one worker. The empty-page
  // check stops the chain even if `total_count` is never reached.
  const seenSoFar = (page - 1) * BACKFILL_PAGE_SIZE + data.repositories.length
  if (seenSoFar < data.total_count && data.repositories.length > 0) {
    await enqueue('tangled.backfill-installation', { installationId, page: page + 1 })
  }
}

/** Apply an installation's repo-access change: enrol added repos, disable removed ones. */
async function runInstallationRepositories(payload: InstallationRepositoriesPayload): Promise<void> {
  const { installationId, action, addedRepoIds, removedRepoIds } = payload

  if (action === 'added') {
    // Fan out one tangled.create-repo job per added repo. The fan-out keeps
    // each unit small enough to fit comfortably in the per-job lease, lets
    // failures retry independently, and runs the OAuth precondition check
    // per repo (an install can outlive a tangled identity disconnection).
    for (const githubRepoId of addedRepoIds) {
      // eslint-disable-next-line no-await-in-loop -- fan-out enqueue is sequential by design
      await enqueue('tangled.create-repo', { installationId, githubRepoId })
    }
    return
  }

  // 'removed': the install no longer has access to these repos. We can't see
  // them via the install token any more, so syncing has to stop. Leave the
  // tangled mirror in place (PLAN.md: "don't delete user data on our say-so").
  // The user can manually re-add the repo on GitHub to re-enable.
  if (removedRepoIds.length === 0) return

  // One timestamp so `disabledAt` and `updatedAt` record the same instant.
  const now = new Date()
  await useDb().update(repoMapping)
    .set({ disabledAt: now, updatedAt: now })
    .where(and(
      eq(repoMapping.installationId, installationId),
      sql`${repoMapping.githubRepoId} IN ${removedRepoIds}`,
    ))
}
276
/**
 * Apply a GitHub repository lifecycle event to the local mapping.
 *
 * Most actions only touch the `repoMapping` row and do no PDS work. `edited`,
 * and `publicized` on an already-mirrored repo, go through the OAuth-authed
 * metadata sync instead.
 *
 * @param payload - Validated `github.repository` payload.
 */
async function handleRepositoryEvent(payload: RepositoryPayload): Promise<void> {
  const { installationId, githubRepoId, action } = payload
  const db = useDb()
  // Predicate selecting this repo's mapping row under this installation.
  const thisMapping = () => and(
    eq(repoMapping.installationId, installationId),
    eq(repoMapping.githubRepoId, githubRepoId),
  )

  switch (action) {
    case 'privatized':
    case 'transferred':
    case 'deleted': {
      // Mirroring stops; the existing mapping row is only marked disabled.
      await db.update(repoMapping)
        .set({ disabledAt: new Date(), updatedAt: new Date() })
        .where(thisMapping())
      return
    }

    case 'renamed': {
      // The webhook envelope is intentionally tiny, so the current full_name
      // is refetched from GitHub. Nothing is renamed on the tangled side:
      // there's no procedure for it, and a fresh-create-and-delete would lose
      // stars/refs. The change is surfaced through `lastError` with an
      // `info:` prefix so the dashboard can flag it without a schema change.
      const [mapping] = await db
        .select({ id: repoMapping.id, githubFullName: repoMapping.githubFullName })
        .from(repoMapping)
        .where(thisMapping())
        .limit(1)
      if (!mapping) return

      const octokit = await installationOctokit(installationId)
      const response = await octokit.request('GET /repositories/{repository_id}', {
        repository_id: githubRepoId,
      })
      const currentFullName = response.data.full_name
      if (currentFullName === mapping.githubFullName) return

      await db.update(repoMapping)
        .set({
          githubFullName: currentFullName,
          lastError: `info: renamed on github from ${mapping.githubFullName} to ${currentFullName}; tangled mirror name unchanged`,
          updatedAt: new Date(),
        })
        .where(eq(repoMapping.id, mapping.id))
      return
    }

    case 'publicized': {
      const [mapping] = await db.select().from(repoMapping).where(thisMapping()).limit(1)

      if (!mapping) {
        // Flipped public without ever having been enrolled (the install was
        // added while the repo was private): start a fresh enrolment.
        await enqueue('tangled.create-repo', { installationId, githubRepoId })
        return
      }

      await db.update(repoMapping)
        .set({ disabledAt: null, updatedAt: new Date() })
        .where(eq(repoMapping.id, mapping.id))

      if (!mapping.tangledRepoDid) {
        await enqueue('tangled.create-repo', { installationId, githubRepoId })
        return
      }

      // Already mirrored: refresh metadata in case description/topics changed
      // while the repo was private.
      await runMetadataSync(installationId, githubRepoId)
      return
    }

    case 'edited': {
      await runMetadataSync(installationId, githubRepoId)
      return
    }

    default:
      // 'created', 'archived', 'unarchived': nothing to do. Repo creation
      // surfaces via `installation_repositories.added`; archive state isn't
      // part of the mirror surface in v1.
      return
  }
}
364
/**
 * Push a repo's current metadata to its mirror via `syncRepoMetadata`.
 * Does nothing when no OAuth session can be restored for the installation.
 */
async function runMetadataSync(installationId: number, githubRepoId: number): Promise<void> {
  const oauthSession = await restoreSessionForInstallation(installationId)
  if (oauthSession) {
    await syncRepoMetadata({ oauthSession, installationId, githubRepoId })
  }
}
370
/**
 * Restore the OAuth session for the user identity bound to `installationId`,
 * or null if OAuth hasn't completed yet. The OAuth callback re-enqueues work
 * for all accessible repos on completion, so a null here is a benign drop: a
 * fresh trigger arrives once the identity exists.
 *
 * Only the first matching identity row is used, so the lookup is capped at
 * one row like the other single-row reads in this file.
 *
 * @param installationId - Installation whose identity should be looked up.
 * @returns The restored session, or null when no identity row matches.
 *   Whatever `client.restore` throws propagates to the caller.
 */
async function restoreSessionForInstallation(installationId: number): Promise<OAuthSession | null> {
  const [identity] = await useDb()
    .select({ did: userIdentity.did })
    .from(userIdentity)
    .where(sql`${userIdentity.installationId} = ${installationId}`)
    .limit(1)
  if (!identity) return null

  const client = await useOAuthClient()
  return client.restore(identity.did)
}