// Mirror your GitHub repos to tangled.org automatically.
1import { sql } from 'drizzle-orm'
2import { userIdentity } from '../db/schema'
3import { useOAuthClient } from './atproto-oauth'
4import { useDb } from './db'
5import { installationOctokit } from './github-app'
6import type { JobEnvelope } from './queue'
7import { enqueue } from './queue'
8import { type CreateRefPayload, type DeleteRefPayload, syncCreateRef, syncDeleteRef } from './sync-ref'
9import { syncPush, type PushPayload } from './sync-push'
10import { generateAndPublishKey } from './tangled-pubkey'
11import { enrollRepo } from './tangled-repo'
12
/**
 * Every job kind the dispatcher recognises:
 *
 * - `github.push`: sync push events
 * - `github.create` / `github.delete`: branch/tag ref operations
 * - `github.repository`: description and lifecycle changes
 * - `github.installation_repositories`: fan-out enrolment of added repos
 * - `tangled.backfill-installation`: paginate an installation and fan out
 * - `tangled.create-repo`: per-repo enrolment
 * - `atproto.publish-pubkey`: generate and publish a key
 *
 * A kind that is not listed here makes `dispatch` throw, so an unrecognised
 * job surfaces as a failure rather than being silently acknowledged.
 */
const KNOWN_KINDS: ReadonlySet<string> = new Set<string>([
  'github.push',
  'github.create',
  'github.delete',
  'github.repository',
  'github.installation_repositories',
  'tangled.backfill-installation',
  'tangled.create-repo',
  'atproto.publish-pubkey',
])
36
/**
 * Page size used by the installation backfill: sent as `per_page` when
 * listing an installation's repositories, and used to work out how many
 * repositories earlier pages have already covered.
 */
const BACKFILL_PAGE_SIZE = 100
38
/** Payload of an `atproto.publish-pubkey` job. */
interface PublishPubkeyPayload {
  /** DID whose OAuth session is restored before the key is published. */
  readonly did: string
  /** Installation id forwarded to the key generation/publish step. */
  readonly installationId: number
}
43
/** Payload of a `tangled.create-repo` job: one repository to enrol. */
interface CreateRepoPayload {
  /** Installation the repository belongs to; also used to look up the bound user identity. */
  readonly installationId: number
  /** GitHub id of the repository to enrol. */
  readonly githubRepoId: number
}
48
/** Payload of a `github.installation_repositories` job. */
interface InstallationRepositoriesPayload {
  /** Installation whose repository selection changed. */
  readonly installationId: number
  /** Whether repositories were added to or removed from the installation. */
  readonly action: 'added' | 'removed'
  /** Ids of the repositories that were added. */
  readonly addedRepoIds: readonly number[]
  /** Ids of the repositories that were removed. */
  readonly removedRepoIds: readonly number[]
}
55
/** Payload of a `tangled.backfill-installation` job: one page of the walk. */
interface BackfillInstallationPayload {
  /** Installation whose repositories are being listed. */
  readonly installationId: number
  /** Page of the repository listing to fetch; the first page is 1. */
  readonly page: number
}
60
/**
 * Narrow an untyped job payload to a plain key/value record.
 *
 * Arrays are rejected: they are objects at runtime, but spreading one would
 * yield an index-keyed record that no payload validator can accept, so it is
 * reported here with an accurate message instead.
 *
 * @param value - Raw payload taken from a job envelope.
 * @returns A shallow copy of the payload's own enumerable properties.
 * @throws TypeError if `value` is a primitive, `null`, or an array.
 */
function asObject(value: unknown): Record<string, unknown> {
  if (value === null || typeof value !== 'object' || Array.isArray(value)) {
    // `typeof null` and `typeof []` are both 'object'; name them explicitly
    // so the message does not read "expected object payload, got object".
    const actual = value === null ? 'null' : Array.isArray(value) ? 'array' : typeof value
    throw new TypeError(`expected object payload, got ${actual}`)
  }
  return { ...value }
}
67
/**
 * Validate the payload of an `atproto.publish-pubkey` job.
 *
 * @param value - Raw payload from the job envelope.
 * @returns The payload reduced to its `did` and `installationId` fields.
 * @throws TypeError if `value` is not an object, `did` is not a string, or
 *   `installationId` is not a number.
 */
function publishPubkeyPayload(value: unknown): PublishPubkeyPayload {
  const { did, installationId } = asObject(value)
  if (typeof did === 'string' && typeof installationId === 'number') {
    return { did, installationId }
  }
  throw new TypeError('invalid atproto.publish-pubkey payload')
}
75
/**
 * Validate the payload of a `tangled.create-repo` job.
 *
 * @param value - Raw payload from the job envelope.
 * @returns The payload reduced to `installationId` and `githubRepoId`.
 * @throws TypeError if `value` is not an object or either field is not a number.
 */
function createRepoPayload(value: unknown): CreateRepoPayload {
  const { installationId, githubRepoId } = asObject(value)
  if (typeof installationId === 'number' && typeof githubRepoId === 'number') {
    return { installationId, githubRepoId }
  }
  throw new TypeError('invalid tangled.create-repo payload')
}
83
/**
 * Validate the payload of a `tangled.backfill-installation` job.
 *
 * `page` must be a positive integer. The backfill handler derives how many
 * repositories earlier pages covered from `(page - 1) * BACKFILL_PAGE_SIZE`,
 * so NaN, Infinity, a fraction, zero or a negative page would corrupt that
 * arithmetic and the decision to enqueue a follow-up page.
 *
 * @param value - Raw payload from the job envelope.
 * @returns The payload reduced to `installationId` and `page`.
 * @throws TypeError if `value` is not an object, `installationId` is not a
 *   number, or `page` is not an integer greater than or equal to 1.
 */
function backfillInstallationPayload(value: unknown): BackfillInstallationPayload {
  const { installationId, page } = asObject(value)
  if (
    typeof installationId !== 'number'
    || typeof page !== 'number'
    // `typeof` alone lets NaN, Infinity, fractions and page <= 0 through.
    || !Number.isInteger(page)
    || page < 1
  ) {
    throw new TypeError('invalid tangled.backfill-installation payload')
  }
  return { installationId, page }
}
91
/**
 * Validate the payload of a `github.create` or `github.delete` job.
 *
 * Both kinds share one shape. The result is typed as `CreateRefPayload`, and
 * the delete path in `dispatch` re-asserts it as `DeleteRefPayload`.
 *
 * @param kind - Which event is being validated; only used in the error message.
 * @param value - Raw payload from the job envelope.
 * @returns The payload reduced to `installationId`, `githubRepoId`,
 *   `refType` and `ref`.
 * @throws TypeError if `value` is not an object, an id is not a number,
 *   `refType` is neither `'branch'` nor `'tag'`, or `ref` is not a string.
 */
function refPayload(kind: 'create' | 'delete', value: unknown): CreateRefPayload {
  const { installationId, githubRepoId, refType, ref } = asObject(value)
  if (
    typeof installationId === 'number'
    && typeof githubRepoId === 'number'
    && (refType === 'branch' || refType === 'tag')
    && typeof ref === 'string'
  ) {
    return { installationId, githubRepoId, refType, ref }
  }
  throw new TypeError(`invalid github.${kind} payload`)
}
109
/**
 * Validate the payload of a `github.installation_repositories` job.
 *
 * Entries of `addedRepoIds` / `removedRepoIds` that are not numbers are
 * dropped rather than rejected; only the containers themselves must be arrays.
 *
 * @param value - Raw payload from the job envelope.
 * @returns The validated payload with both id lists filtered to numbers.
 * @throws TypeError if `value` is not an object, `installationId` is not a
 *   number, `action` is neither `'added'` nor `'removed'`, or either id list
 *   is not an array.
 */
function installationRepositoriesPayload(value: unknown): InstallationRepositoriesPayload {
  const { installationId, action, addedRepoIds, removedRepoIds } = asObject(value)
  const isNumber = (id: unknown): id is number => typeof id === 'number'

  if (
    typeof installationId === 'number'
    && (action === 'added' || action === 'removed')
    && Array.isArray(addedRepoIds)
    && Array.isArray(removedRepoIds)
  ) {
    return {
      installationId,
      action,
      addedRepoIds: addedRepoIds.filter(isNumber),
      removedRepoIds: removedRepoIds.filter(isNumber),
    }
  }
  throw new TypeError('invalid github.installation_repositories payload')
}
127
/**
 * Route a queued job to the handler for its kind.
 *
 * Kinds outside `KNOWN_KINDS` are rejected. A known kind that has no case
 * below (currently `github.repository`) is accepted and does nothing.
 *
 * @param envelope - Job to run. `kind` selects the handler; `payload` is
 *   validated per kind before use, except for `github.push`, whose payload is
 *   only type-asserted.
 * @returns Resolves once the handler (and any follow-up enqueues) completes.
 * @throws Error if `envelope.kind` is not in `KNOWN_KINDS`.
 * @throws TypeError if a payload fails its kind-specific validation.
 */
export async function dispatch(envelope: JobEnvelope): Promise<void> {
  if (!KNOWN_KINDS.has(envelope.kind)) {
    throw new Error(`unknown job kind: ${envelope.kind}`)
  }

  switch (envelope.kind) {
    case 'github.push': {
      // NOTE(review): the push payload is asserted here, not validated.
      await syncPush(envelope.payload as PushPayload)
      return
    }

    case 'github.create': {
      await syncCreateRef(refPayload('create', envelope.payload))
      return
    }

    case 'github.delete': {
      await syncDeleteRef(refPayload('delete', envelope.payload) as DeleteRefPayload)
      return
    }

    case 'atproto.publish-pubkey': {
      const job = publishPubkeyPayload(envelope.payload)
      const oauth = await useOAuthClient()
      const oauthSession = await oauth.restore(job.did)
      await generateAndPublishKey({ oauthSession, installationId: job.installationId })
      return
    }

    case 'tangled.create-repo': {
      const { installationId, githubRepoId } = createRepoPayload(envelope.payload)

      // Look up the user identity bound to this install. When OAuth has not
      // completed yet there is none, and the job is dropped silently — the
      // OAuth callback re-enqueues every accessible repo once it completes,
      // so a fresh trigger will arrive.
      const rows = await useDb()
        .select({ did: userIdentity.did })
        .from(userIdentity)
        .where(sql`${userIdentity.installationId} = ${installationId}`)
      const owner = rows[0]
      if (owner === undefined) return

      const oauth = await useOAuthClient()
      const oauthSession = await oauth.restore(owner.did)
      await enrollRepo({ oauthSession, installationId, githubRepoId })
      return
    }

    case 'tangled.backfill-installation': {
      const { installationId, page } = backfillInstallationPayload(envelope.payload)
      const octokit = await installationOctokit(installationId)
      const listing = await octokit.request('GET /installation/repositories', {
        per_page: BACKFILL_PAGE_SIZE,
        page,
      })
      const { repositories, total_count: totalCount } = listing.data

      // One tangled.create-repo job per repository on this page.
      for (const repo of repositories) {
        // eslint-disable-next-line no-await-in-loop -- enqueue is sequential by design
        await enqueue('tangled.create-repo', { installationId, githubRepoId: repo.id })
      }

      // Re-queue ourselves for the next page while repositories remain. Each
      // tick stays small and bounded, so an install with thousands of repos
      // is walked over many minutes instead of blocking one worker. An empty
      // page always ends the walk, whatever the reported total says.
      const fetchedSoFar = (page - 1) * BACKFILL_PAGE_SIZE + repositories.length
      const hasMore = fetchedSoFar < totalCount && repositories.length > 0
      if (hasMore) {
        await enqueue('tangled.backfill-installation', { installationId, page: page + 1 })
      }
      return
    }

    case 'github.installation_repositories': {
      const change = installationRepositoriesPayload(envelope.payload)
      // Only additions trigger work; removals are acknowledged without action.
      if (change.action !== 'added') return

      // One tangled.create-repo job per added repo. The fan-out keeps each
      // unit small enough to fit in the per-job lease, lets failures retry
      // independently, and runs the OAuth precondition check per repo (an
      // install can outlive a tangled identity disconnection).
      for (const githubRepoId of change.addedRepoIds) {
        // eslint-disable-next-line no-await-in-loop -- fan-out enqueue is sequential by design
        await enqueue('tangled.create-repo', { installationId: change.installationId, githubRepoId })
      }
      return
    }

    default:
      // Remaining known kinds have no handler yet and are a no-op.
      return
  }
}