mirror your GitHub repos to tangled.org automatically
1

Configure Feed

Select the types of activity you want to include in your feed.

1import type { OAuthSession } from '@atproto/oauth-client-node' 2import { and, eq, sql } from 'drizzle-orm' 3import { repoMapping, userIdentity } from '../db/schema' 4import { useOAuthClient } from './atproto-oauth' 5import { useDb } from './db' 6import { installationOctokit } from './github-app' 7import type { JobEnvelope } from './queue' 8import { enqueue } from './queue' 9import { type CreateRefPayload, type DeleteRefPayload, syncCreateRef, syncDeleteRef } from './sync-ref' 10import { syncPush, type PushPayload } from './sync-push' 11import { generateAndPublishKey, rotateKey } from './tangled-pubkey' 12import { enrollRepo, syncRepoMetadata } from './tangled-repo' 13 14/** Job kinds `dispatch` understands; anything else throws as a job failure. */ 15const KNOWN_KINDS = new Set([ 16 'github.push', 17 'github.create', 18 'github.delete', 19 'github.repository', 20 'github.installation_repositories', 21 'tangled.backfill-installation', 22 'tangled.create-repo', 23 'atproto.publish-pubkey', 24]) 25 26const BACKFILL_PAGE_SIZE = 100 27 28interface PublishPubkeyPayload { 29 did: string 30 installationId: number 31 /** 32 * Dashboard "Rotate SSH key" sets this. Causes the handler to call 33 * `rotateKey()` (delete old PDS record + DB row, then re-publish) rather 34 * than the no-op-if-exists `generateAndPublishKey()` used at signup. 35 */ 36 force?: boolean 37} 38 39interface CreateRepoPayload { 40 installationId: number 41 githubRepoId: number 42 /** Dashboard "Resync now" sets this; see `enrollRepo` for semantics. */ 43 force?: boolean 44} 45 46interface InstallationRepositoriesPayload { 47 installationId: number 48 action: 'added' | 'removed' 49 addedRepoIds: number[] 50 removedRepoIds: number[] 51} 52 53interface BackfillInstallationPayload { 54 installationId: number 55 page: number 56} 57 58type RepositoryAction = 59 | 'created' 60 | 'edited' 61 | 'renamed' 62 | 'transferred' 63 | 'deleted' 64 | 'privatized' 65 | 'publicized' 66 | 'archived' 67 | 'unarchived' 68 69interface RepositoryPayload { 70 installationId: number 71 githubRepoId: number 72 action: RepositoryAction 73} 74 75function asObject(value: unknown): Record<string, unknown> { 76 if (value === null || typeof value !== 'object') { 77 throw new TypeError(`expected object payload, got ${typeof value}`) 78 } 79 return { ...value } 80} 81 82function publishPubkeyPayload(value: unknown): PublishPubkeyPayload { 83 const o = asObject(value) 84 if (typeof o.did !== 'string' || typeof o.installationId !== 'number') { 85 throw new TypeError('invalid atproto.publish-pubkey payload') 86 } 87 return { did: o.did, installationId: o.installationId, force: o.force === true } 88} 89 90function createRepoPayload(value: unknown): CreateRepoPayload { 91 const o = asObject(value) 92 if (typeof o.installationId !== 'number' || typeof o.githubRepoId !== 'number') { 93 throw new TypeError('invalid tangled.create-repo payload') 94 } 95 return { 96 installationId: o.installationId, 97 githubRepoId: o.githubRepoId, 98 force: o.force === true, 99 } 100} 101 102function backfillInstallationPayload(value: unknown): BackfillInstallationPayload { 103 const o = asObject(value) 104 if (typeof o.installationId !== 'number' || typeof o.page !== 'number') { 105 throw new TypeError('invalid tangled.backfill-installation payload') 106 } 107 return { installationId: o.installationId, page: o.page } 108} 109 110function refPayload(kind: 'create' | 'delete', value: unknown): CreateRefPayload { 111 const o = asObject(value) 112 if ( 113 typeof o.installationId !== 'number' 114 || typeof o.githubRepoId !== 'number' 115 || (o.refType !== 'branch' && o.refType !== 'tag') 116 || typeof o.ref !== 'string' 117 ) { 118 throw new TypeError(`invalid github.${kind} payload`) 119 } 120 return { 121 installationId: o.installationId, 122 githubRepoId: o.githubRepoId, 123 refType: o.refType, 124 ref: o.ref, 125 } 126} 127 128const REPOSITORY_ACTIONS = new Set<string>([ 129 'created', 130 'edited', 131 'renamed', 132 'transferred', 133 'deleted', 134 'privatized', 135 'publicized', 136 'archived', 137 'unarchived', 138]) 139 140function isRepositoryAction(action: string): action is RepositoryAction { 141 return REPOSITORY_ACTIONS.has(action) 142} 143 144function repositoryPayload(value: unknown): RepositoryPayload { 145 const o = asObject(value) 146 if ( 147 typeof o.installationId !== 'number' 148 || typeof o.githubRepoId !== 'number' 149 || typeof o.action !== 'string' 150 || !isRepositoryAction(o.action) 151 ) { 152 throw new TypeError('invalid github.repository payload') 153 } 154 return { 155 installationId: o.installationId, 156 githubRepoId: o.githubRepoId, 157 action: o.action, 158 } 159} 160 161function installationRepositoriesPayload(value: unknown): InstallationRepositoriesPayload { 162 const o = asObject(value) 163 if ( 164 typeof o.installationId !== 'number' 165 || (o.action !== 'added' && o.action !== 'removed') 166 || !Array.isArray(o.addedRepoIds) 167 || !Array.isArray(o.removedRepoIds) 168 ) { 169 throw new TypeError('invalid github.installation_repositories payload') 170 } 171 return { 172 installationId: o.installationId, 173 action: o.action, 174 addedRepoIds: o.addedRepoIds.filter((id): id is number => typeof id === 'number'), 175 removedRepoIds: o.removedRepoIds.filter((id): id is number => typeof id === 'number'), 176 } 177} 178 179export async function dispatch(envelope: JobEnvelope): Promise<void> { 180 if (!KNOWN_KINDS.has(envelope.kind)) { 181 throw new Error(`unknown job kind: ${envelope.kind}`) 182 } 183 184 if (envelope.kind === 'github.push') { 185 await syncPush(envelope.payload as PushPayload) 186 return 187 } 188 189 if (envelope.kind === 'github.create') { 190 await syncCreateRef(refPayload('create', envelope.payload)) 191 return 192 } 193 194 if (envelope.kind === 'github.delete') { 195 await syncDeleteRef(refPayload('delete', envelope.payload) as DeleteRefPayload) 196 return 197 } 198 199 if (envelope.kind === 'atproto.publish-pubkey') { 200 const { did, installationId, force } = publishPubkeyPayload(envelope.payload) 201 const client = await useOAuthClient() 202 const session = await client.restore(did) 203 if (force) await rotateKey({ oauthSession: session, installationId }) 204 else await generateAndPublishKey({ oauthSession: session, installationId }) 205 return 206 } 207 208 if (envelope.kind === 'tangled.create-repo') { 209 const { installationId, githubRepoId, force } = createRepoPayload(envelope.payload) 210 const session = await restoreSessionForInstallation(installationId) 211 if (!session) return 212 await enrollRepo({ oauthSession: session, installationId, githubRepoId, force }) 213 return 214 } 215 216 if (envelope.kind === 'tangled.backfill-installation') { 217 const { installationId, page } = backfillInstallationPayload(envelope.payload) 218 const octokit = await installationOctokit(installationId) 219 const { data } = await octokit.request('GET /installation/repositories', { 220 per_page: BACKFILL_PAGE_SIZE, 221 page, 222 }) 223 224 // Fan out one tangled.create-repo job per repo on this page. 225 for (const repo of data.repositories) { 226 // eslint-disable-next-line no-await-in-loop -- enqueue is sequential by design 227 await enqueue('tangled.create-repo', { installationId, githubRepoId: repo.id }) 228 } 229 230 // If there are more pages, re-queue ourselves for the next one. This 231 // keeps each tick small and bounded; an install with thousands of repos 232 // walks through over many minutes rather than blocking one worker. 233 const seenSoFar = (page - 1) * BACKFILL_PAGE_SIZE + data.repositories.length 234 if (seenSoFar < data.total_count && data.repositories.length > 0) { 235 await enqueue('tangled.backfill-installation', { installationId, page: page + 1 }) 236 } 237 return 238 } 239 240 if (envelope.kind === 'github.installation_repositories') { 241 const { installationId, action, addedRepoIds, removedRepoIds } = installationRepositoriesPayload(envelope.payload) 242 243 if (action === 'added') { 244 // Fan out one tangled.create-repo job per added repo. The fan-out keeps 245 // each unit small enough to fit comfortably in the per-job lease, lets 246 // failures retry independently, and runs the OAuth precondition check 247 // per repo (an install can outlive a tangled identity disconnection). 248 for (const id of addedRepoIds) { 249 // eslint-disable-next-line no-await-in-loop -- fan-out enqueue is sequential by design 250 await enqueue('tangled.create-repo', { installationId, githubRepoId: id }) 251 } 252 return 253 } 254 255 // 'removed': the install no longer has access to these repos. We can't 256 // see them via the install token any more, so syncing has to stop. Leave 257 // the tangled mirror in place (PLAN.md: "don't delete user data on our 258 // say-so"). The user can manually re-add the repo on GitHub to re-enable. 259 if (action === 'removed' && removedRepoIds.length > 0) { 260 const db = useDb() 261 await db.update(repoMapping) 262 .set({ disabledAt: new Date(), updatedAt: new Date() }) 263 .where(and( 264 eq(repoMapping.installationId, installationId), 265 sql`${repoMapping.githubRepoId} IN ${removedRepoIds}`, 266 )) 267 } 268 return 269 } 270 271 if (envelope.kind === 'github.repository') { 272 await handleRepositoryEvent(repositoryPayload(envelope.payload)) 273 return 274 } 275} 276 277async function handleRepositoryEvent(payload: RepositoryPayload): Promise<void> { 278 const { installationId, githubRepoId, action } = payload 279 const db = useDb() 280 281 // Most lifecycle actions only need to touch the local mapping, no PDS work. 282 // `edited` and `publicized` (when we already have a mapping) go through the 283 // OAuth-authed metadata sync helper. 284 if (action === 'privatized' || action === 'transferred' || action === 'deleted') { 285 await db.update(repoMapping) 286 .set({ disabledAt: new Date(), updatedAt: new Date() }) 287 .where(and( 288 eq(repoMapping.installationId, installationId), 289 eq(repoMapping.githubRepoId, githubRepoId), 290 )) 291 return 292 } 293 294 if (action === 'renamed') { 295 // Refetch the current full_name from GitHub; the webhook envelope is 296 // intentionally tiny. We do NOT rename on the tangled side — there's no 297 // procedure and a fresh-create-and-delete would lose stars/refs. Surface 298 // the change in `lastError` with an `info:` prefix so the dashboard can 299 // flag it without a schema change. 300 const rows = await db.select({ id: repoMapping.id, githubFullName: repoMapping.githubFullName }) 301 .from(repoMapping) 302 .where(and( 303 eq(repoMapping.installationId, installationId), 304 eq(repoMapping.githubRepoId, githubRepoId), 305 )) 306 .limit(1) 307 if (rows.length === 0) return 308 const row = rows[0]! 309 310 const octokit = await installationOctokit(installationId) 311 const { data: repo } = await octokit.request('GET /repositories/{repository_id}', { 312 repository_id: githubRepoId, 313 }) 314 if (repo.full_name === row.githubFullName) return 315 316 await db.update(repoMapping) 317 .set({ 318 githubFullName: repo.full_name, 319 lastError: `info: renamed on github from ${row.githubFullName} to ${repo.full_name}; tangled mirror name unchanged`, 320 updatedAt: new Date(), 321 }) 322 .where(eq(repoMapping.id, row.id)) 323 return 324 } 325 326 if (action === 'publicized') { 327 const rows = await db.select().from(repoMapping).where(and( 328 eq(repoMapping.installationId, installationId), 329 eq(repoMapping.githubRepoId, githubRepoId), 330 )).limit(1) 331 const row = rows[0] 332 333 if (!row) { 334 // Repo flipped public without ever having been enrolled (the install 335 // was added while it was private). Kick off a fresh enrolment. 336 await enqueue('tangled.create-repo', { installationId, githubRepoId }) 337 return 338 } 339 340 await db.update(repoMapping) 341 .set({ disabledAt: null, updatedAt: new Date() }) 342 .where(eq(repoMapping.id, row.id)) 343 344 if (!row.tangledRepoDid) { 345 await enqueue('tangled.create-repo', { installationId, githubRepoId }) 346 return 347 } 348 349 // Already mirrored, just refresh metadata in case description/topics 350 // changed while it was private. 351 await runMetadataSync(installationId, githubRepoId) 352 return 353 } 354 355 if (action === 'edited') { 356 await runMetadataSync(installationId, githubRepoId) 357 return 358 } 359 360 // 'created', 'archived', 'unarchived' — nothing for us to do. Repo creation 361 // surfaces via `installation_repositories.added`; archive state isn't part 362 // of the mirror surface in v1. 363} 364 365async function runMetadataSync(installationId: number, githubRepoId: number): Promise<void> { 366 const session = await restoreSessionForInstallation(installationId) 367 if (!session) return 368 await syncRepoMetadata({ oauthSession: session, installationId, githubRepoId }) 369} 370 371/** 372 * Restore the OAuth session for the user identity bound to `installationId`, 373 * or null if OAuth hasn't completed yet. The OAuth callback re-enqueues work 374 * for all accessible repos on completion, so a null here is a benign drop: a 375 * fresh trigger arrives once the identity exists. 376 */ 377async function restoreSessionForInstallation(installationId: number): Promise<OAuthSession | null> { 378 const db = useDb() 379 const identity = await db.select({ did: userIdentity.did }) 380 .from(userIdentity) 381 .where(sql`${userIdentity.installationId} = ${installationId}`) 382 if (identity.length === 0) return null 383 384 const client = await useOAuthClient() 385 return client.restore(identity[0]!.did) 386}