From 538c24c58f0ef7360bbdff1d5043681982e35fbd Mon Sep 17 00:00:00 2001 From: omigamedev Date: Mon, 23 Feb 2026 15:32:24 +0100 Subject: [PATCH] Phases 2-4: Auth, providers, stream management --- src/app.ts | 14 ++ src/index.ts | 14 ++ src/routes/auth/meta.ts | 74 +++++++ src/routes/auth/session.ts | 101 ++++++++++ src/routes/providers/accounts.ts | 77 ++++++++ src/routes/providers/twitch.ts | 136 +++++++++++++ src/routes/providers/youtube.ts | 137 +++++++++++++ src/routes/streams/lifecycle.ts | 267 ++++++++++++++++++++++++++ src/routes/streams/plans.ts | 149 ++++++++++++++ src/services/crypto.service.ts | 37 ++++ src/services/meta-auth.service.ts | 55 ++++++ src/services/token-refresh.service.ts | 70 +++++++ src/services/twitch.service.ts | 178 +++++++++++++++++ src/services/youtube.service.ts | 221 +++++++++++++++++++++ 14 files changed, 1530 insertions(+) create mode 100644 src/routes/auth/meta.ts create mode 100644 src/routes/auth/session.ts create mode 100644 src/routes/providers/accounts.ts create mode 100644 src/routes/providers/twitch.ts create mode 100644 src/routes/providers/youtube.ts create mode 100644 src/routes/streams/lifecycle.ts create mode 100644 src/routes/streams/plans.ts create mode 100644 src/services/crypto.service.ts create mode 100644 src/services/meta-auth.service.ts create mode 100644 src/services/token-refresh.service.ts create mode 100644 src/services/twitch.service.ts create mode 100644 src/services/youtube.service.ts diff --git a/src/app.ts b/src/app.ts index c0c5d2b..93de841 100644 --- a/src/app.ts +++ b/src/app.ts @@ -4,6 +4,13 @@ import prismaPlugin from './plugins/prisma.js'; import errorHandlerPlugin from './plugins/error-handler.js'; import authPlugin from './plugins/auth.js'; import healthRoutes from './routes/health.js'; +import metaAuthRoutes from './routes/auth/meta.js'; +import sessionRoutes from './routes/auth/session.js'; +import accountRoutes from './routes/providers/accounts.js'; +import youtubeRoutes from './routes/providers/youtube.js'; +import twitchRoutes from './routes/providers/twitch.js'; +import planRoutes from './routes/streams/plans.js'; +import lifecycleRoutes from './routes/streams/lifecycle.js'; import { config } from './config.js'; export async function buildApp() { @@ -21,6 +28,13 @@ export async function buildApp() { // Routes await app.register(healthRoutes); + await app.register(metaAuthRoutes); + await app.register(sessionRoutes); + await app.register(accountRoutes); + await app.register(youtubeRoutes); + await app.register(twitchRoutes); + await app.register(planRoutes); + await app.register(lifecycleRoutes); return app; } diff --git a/src/index.ts b/src/index.ts index 14d85ab..419e899 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,6 @@ import { buildApp } from './app.js'; import { config } from './config.js'; +import { startTokenRefreshScheduler, stopTokenRefreshScheduler } from './services/token-refresh.service.js'; async function main() { const app = await buildApp(); @@ -7,6 +8,19 @@ async function main() { try { await app.listen({ port: config.port, host: config.host }); app.log.info(`Server listening on ${config.host}:${config.port}`); + + // Start background token refresh + startTokenRefreshScheduler(app.prisma, app.log); + + // Graceful shutdown + for (const signal of ['SIGINT', 'SIGTERM'] as const) { + process.on(signal, async () => { + app.log.info(`Received ${signal}, shutting down...`); + stopTokenRefreshScheduler(); + await app.close(); + process.exit(0); + }); + } } catch (err) { app.log.error(err); process.exit(1); diff --git a/src/routes/auth/meta.ts b/src/routes/auth/meta.ts new file mode 100644 index 0000000..d0c3028 --- /dev/null +++ b/src/routes/auth/meta.ts @@ -0,0 +1,74 @@ +import { FastifyPluginAsync } from 'fastify'; +import { randomUUID } from 'node:crypto'; +import { exchangeMetaCode, fetchMetaProfile } from '../../services/meta-auth.service.js'; +import { signAccessToken } from '../../plugins/auth.js'; +import { hashToken } from '../../services/crypto.service.js'; +import { config } from '../../config.js'; +import { AppError } from '../../plugins/error-handler.js'; +import type { MetaCallbackBody, AuthTokensResponse } from '../../types/api.js'; + +const metaAuthRoutes: FastifyPluginAsync = async (fastify) => { + fastify.post<{ Body: MetaCallbackBody }>('/auth/meta/callback', { + schema: { + body: { + type: 'object', + required: ['code'], + properties: { + code: { type: 'string', minLength: 1 }, + deviceInfo: { type: 'string' }, + }, + additionalProperties: false, + }, + }, + }, async (request, reply) => { + const { code, deviceInfo } = request.body; + + // Exchange code for Meta access token + const { accessToken: metaToken } = await exchangeMetaCode(code); + + // Fetch user profile + const profile = await fetchMetaProfile(metaToken); + + // Upsert user + const user = await fastify.prisma.user.upsert({ + where: { metaId: profile.metaId }, + update: { + displayName: profile.displayName, + email: profile.email, + avatarUrl: profile.avatarUrl, + }, + create: { + metaId: profile.metaId, + displayName: profile.displayName, + email: profile.email, + avatarUrl: profile.avatarUrl, + }, + }); + + // Create session with hashed refresh token + const refreshToken = randomUUID(); + const expiresAt = new Date(Date.now() + config.jwt.refreshTtl * 1000); + + await fastify.prisma.session.create({ + data: { + userId: user.id, + refreshToken: hashToken(refreshToken), + expiresAt, + deviceInfo: deviceInfo ?? null, + }, + }); + + // Sign JWT + const accessToken = await signAccessToken(user.id); + + const response: AuthTokensResponse = { + accessToken, + refreshToken, + expiresIn: config.jwt.accessTtl, + }; + + reply.status(200).send(response); + }); +}; + +export default metaAuthRoutes; diff --git a/src/routes/auth/session.ts b/src/routes/auth/session.ts new file mode 100644 index 0000000..d773d4d --- /dev/null +++ b/src/routes/auth/session.ts @@ -0,0 +1,101 @@ +import { FastifyPluginAsync } from 'fastify'; +import { randomUUID } from 'node:crypto'; +import { signAccessToken } from '../../plugins/auth.js'; +import { hashToken } from '../../services/crypto.service.js'; +import { requireAuth } from '../../middleware/require-auth.js'; +import { config } from '../../config.js'; +import { AppError } from '../../plugins/error-handler.js'; +import type { RefreshBody, AuthTokensResponse, UserProfileResponse } from '../../types/api.js'; + +const sessionRoutes: FastifyPluginAsync = async (fastify) => { + // POST /auth/refresh — rotate refresh token, issue new JWT + fastify.post<{ Body: RefreshBody }>('/auth/refresh', { + schema: { + body: { + type: 'object', + required: ['refreshToken'], + properties: { + refreshToken: { type: 'string', minLength: 1 }, + }, + additionalProperties: false, + }, + }, + }, async (request, reply) => { + const { refreshToken } = request.body; + const hashed = hashToken(refreshToken); + + // Find and validate session + const session = await fastify.prisma.session.findUnique({ + where: { refreshToken: hashed }, + include: { user: true }, + }); + + if (!session || session.expiresAt < new Date()) { + // Delete expired session if it exists + if (session) { + await fastify.prisma.session.delete({ where: { id: session.id } }); + } + throw new AppError(401, 'Invalid or expired refresh token'); + } + + // Rotate: delete old session, create new one + const newRefreshToken = randomUUID(); + const expiresAt = new Date(Date.now() + config.jwt.refreshTtl * 1000); + + await fastify.prisma.session.delete({ where: { id: session.id } }); + await fastify.prisma.session.create({ + data: { + userId: session.userId, + refreshToken: hashToken(newRefreshToken), + expiresAt, + deviceInfo: session.deviceInfo, + }, + }); + + const accessToken = await signAccessToken(session.userId); + + const response: AuthTokensResponse = { + accessToken, + refreshToken: newRefreshToken, + expiresIn: config.jwt.accessTtl, + }; + + reply.status(200).send(response); + }); + + // GET /auth/me — current user profile + fastify.get('/auth/me', { + preHandler: [requireAuth], + }, async (request, reply) => { + const user = await fastify.prisma.user.findUnique({ + where: { id: request.userId }, + }); + + if (!user) { + throw new AppError(404, 'User not found'); + } + + const response: UserProfileResponse = { + id: user.id, + displayName: user.displayName, + email: user.email, + avatarUrl: user.avatarUrl, + }; + + reply.status(200).send(response); + }); + + // POST /auth/logout — invalidate session + fastify.post('/auth/logout', { + preHandler: [requireAuth], + }, async (request, reply) => { + // Delete all sessions for this user (full logout) + await fastify.prisma.session.deleteMany({ + where: { userId: request.userId }, + }); + + reply.status(200).send({ success: true }); + }); +}; + +export default sessionRoutes; diff --git a/src/routes/providers/accounts.ts b/src/routes/providers/accounts.ts new file mode 100644 index 0000000..0b30d9a --- /dev/null +++ b/src/routes/providers/accounts.ts @@ -0,0 +1,77 @@ +import { FastifyPluginAsync } from 'fastify'; +import { requireAuth } from '../../middleware/require-auth.js'; +import { decrypt } from '../../services/crypto.service.js'; +import { revokeYouTubeToken } from '../../services/youtube.service.js'; +import { revokeTwitchToken } from '../../services/twitch.service.js'; +import { AppError } from '../../plugins/error-handler.js'; +import type { LinkedAccountResponse } from '../../types/api.js'; + +const accountRoutes: FastifyPluginAsync = async (fastify) => { + // GET /providers/accounts — list linked accounts (no tokens) + fastify.get('/providers/accounts', { + preHandler: [requireAuth], + }, async (request) => { + const accounts = await fastify.prisma.linkedAccount.findMany({ + where: { userId: request.userId }, + }); + + const response: LinkedAccountResponse[] = accounts.map((a) => ({ + id: a.id, + serviceId: a.serviceId, + displayName: a.displayName, + accountId: a.accountId, + avatarUrl: a.avatarUrl, + })); + + return response; + }); + + // DELETE /providers/:serviceId — revoke tokens and unlink + fastify.delete<{ Params: { serviceId: string } }>('/providers/:serviceId', { + preHandler: [requireAuth], + schema: { + params: { + type: 'object', + required: ['serviceId'], + properties: { + serviceId: { type: 'string', enum: ['YOUTUBE', 'TWITCH'] }, + }, + }, + }, + }, async (request, reply) => { + const { serviceId } = request.params; + + const account = await fastify.prisma.linkedAccount.findUnique({ + where: { + userId_serviceId: { + userId: request.userId, + serviceId, + }, + }, + }); + + if (!account) { + throw new AppError(404, 'Account not linked'); + } + + // Best-effort revoke tokens at the provider + try { + const accessToken = decrypt(account.accessTokenEnc, account.accessTokenIv); + if (serviceId === 'YOUTUBE') { + await revokeYouTubeToken(accessToken); + } else { + await revokeTwitchToken(accessToken); + } + } catch { + // Revocation failure is non-fatal + } + + await fastify.prisma.linkedAccount.delete({ + where: { id: account.id }, + }); + + reply.status(200).send({ success: true }); + }); +}; + +export default accountRoutes; diff --git a/src/routes/providers/twitch.ts b/src/routes/providers/twitch.ts new file mode 100644 index 0000000..674d767 --- /dev/null +++ b/src/routes/providers/twitch.ts @@ -0,0 +1,136 @@ +import { FastifyPluginAsync } from 'fastify'; +import { randomUUID } from 'node:crypto'; +import { requireAuth } from '../../middleware/require-auth.js'; +import { encrypt } from '../../services/crypto.service.js'; +import { + getTwitchAuthUrl, + exchangeTwitchCode, + fetchTwitchProfile, +} from '../../services/twitch.service.js'; +import { config } from '../../config.js'; +import { AppError } from '../../plugins/error-handler.js'; +import type { AuthUrlResponse, ProviderCallbackBody, LinkedAccountResponse } from '../../types/api.js'; + +// In-memory CSRF state store (state → { userId, expiresAt }) +const pendingStates = new Map(); + +setInterval(() => { + const now = Date.now(); + for (const [key, val] of pendingStates) { + if (val.expiresAt < now) pendingStates.delete(key); + } +}, 5 * 60 * 1000); + +const twitchRoutes: FastifyPluginAsync = async (fastify) => { + // GET /providers/twitch/auth-url — get OAuth URL with CSRF state + fastify.get('/providers/twitch/auth-url', { + preHandler: [requireAuth], + }, async (request) => { + const state = randomUUID(); + pendingStates.set(state, { + userId: request.userId, + expiresAt: Date.now() + 10 * 60 * 1000, + }); + + const response: AuthUrlResponse = { + url: getTwitchAuthUrl(state), + state, + }; + return response; + }); + + // GET /providers/twitch/callback-redirect — Twitch redirects here → 302 to app deep link + fastify.get<{ Querystring: { code?: string; state?: string; error?: string } }>( + '/providers/twitch/callback-redirect', + async (request, reply) => { + const { code, state, error } = request.query; + if (error || !code || !state) { + reply.status(302).redirect( + `${config.appScheme}://twitch/callback?error=${error || 'missing_params'}`, + ); + return; + } + + reply.status(302).redirect( + `${config.appScheme}://twitch/callback?code=${encodeURIComponent(code)}&state=${encodeURIComponent(state)}`, + ); + }, + ); + + // POST /providers/twitch/callback — app sends code+state, backend exchanges + fastify.post<{ Body: ProviderCallbackBody }>('/providers/twitch/callback', { + preHandler: [requireAuth], + schema: { + body: { + type: 'object', + required: ['code', 'state'], + properties: { + code: { type: 'string', minLength: 1 }, + state: { type: 'string', minLength: 1 }, + }, + additionalProperties: false, + }, + }, + }, async (request) => { + const { code, state } = request.body; + + // Validate CSRF state + const pending = pendingStates.get(state); + if (!pending || pending.userId !== request.userId || pending.expiresAt < Date.now()) { + pendingStates.delete(state); + throw new AppError(400, 'Invalid or expired state parameter'); + } + pendingStates.delete(state); + + // Exchange code for tokens + const tokens = await exchangeTwitchCode(code); + const profile = await fetchTwitchProfile(tokens.accessToken); + + // Encrypt tokens + const accessEnc = encrypt(tokens.accessToken); + const refreshEnc = encrypt(tokens.refreshToken); + + // Upsert linked account + const account = await fastify.prisma.linkedAccount.upsert({ + where: { + userId_serviceId: { + userId: request.userId, + serviceId: 'TWITCH', + }, + }, + update: { + displayName: profile.displayName, + accountId: profile.accountId, + avatarUrl: profile.avatarUrl, + accessTokenEnc: accessEnc.ciphertext, + refreshTokenEnc: refreshEnc.ciphertext, + accessTokenIv: accessEnc.iv, + refreshTokenIv: refreshEnc.iv, + tokenExpiresAt: new Date(Date.now() + tokens.expiresIn * 1000), + }, + create: { + userId: request.userId, + serviceId: 'TWITCH', + displayName: profile.displayName, + accountId: profile.accountId, + avatarUrl: profile.avatarUrl, + accessTokenEnc: accessEnc.ciphertext, + refreshTokenEnc: refreshEnc.ciphertext, + accessTokenIv: accessEnc.iv, + refreshTokenIv: refreshEnc.iv, + tokenExpiresAt: new Date(Date.now() + tokens.expiresIn * 1000), + }, + }); + + const response: LinkedAccountResponse = { + id: account.id, + serviceId: account.serviceId, + displayName: account.displayName, + accountId: account.accountId, + avatarUrl: account.avatarUrl, + }; + return response; + }); +}; + +export default twitchRoutes; diff --git a/src/routes/providers/youtube.ts b/src/routes/providers/youtube.ts new file mode 100644 index 0000000..64d42af --- /dev/null +++ b/src/routes/providers/youtube.ts @@ -0,0 +1,137 @@ +import { FastifyPluginAsync } from 'fastify'; +import { randomUUID } from 'node:crypto'; +import { requireAuth } from '../../middleware/require-auth.js'; +import { encrypt } from '../../services/crypto.service.js'; +import { + getYouTubeAuthUrl, + exchangeYouTubeCode, + fetchYouTubeProfile, +} from '../../services/youtube.service.js'; +import { config } from '../../config.js'; +import { AppError } from '../../plugins/error-handler.js'; +import type { AuthUrlResponse, ProviderCallbackBody, LinkedAccountResponse } from '../../types/api.js'; + +// In-memory CSRF state store (state → { userId, expiresAt }) +const pendingStates = new Map(); + +// Clean expired states every 5 minutes +setInterval(() => { + const now = Date.now(); + for (const [key, val] of pendingStates) { + if (val.expiresAt < now) pendingStates.delete(key); + } +}, 5 * 60 * 1000); + +const youtubeRoutes: FastifyPluginAsync = async (fastify) => { + // GET /providers/youtube/auth-url — get OAuth URL with CSRF state + fastify.get('/providers/youtube/auth-url', { + preHandler: [requireAuth], + }, async (request) => { + const state = randomUUID(); + pendingStates.set(state, { + userId: request.userId, + expiresAt: Date.now() + 10 * 60 * 1000, // 10 min + }); + + const response: AuthUrlResponse = { + url: getYouTubeAuthUrl(state), + state, + }; + return response; + }); + + // GET /providers/youtube/callback-redirect — Google redirects here → 302 to app deep link + fastify.get<{ Querystring: { code?: string; state?: string; error?: string } }>( + '/providers/youtube/callback-redirect', + async (request, reply) => { + const { code, state, error } = request.query; + if (error || !code || !state) { + reply.status(302).redirect( + `${config.appScheme}://youtube/callback?error=${error || 'missing_params'}`, + ); + return; + } + + reply.status(302).redirect( + `${config.appScheme}://youtube/callback?code=${encodeURIComponent(code)}&state=${encodeURIComponent(state)}`, + ); + }, + ); + + // POST /providers/youtube/callback — app sends code+state, backend exchanges + fastify.post<{ Body: ProviderCallbackBody }>('/providers/youtube/callback', { + preHandler: [requireAuth], + schema: { + body: { + type: 'object', + required: ['code', 'state'], + properties: { + code: { type: 'string', minLength: 1 }, + state: { type: 'string', minLength: 1 }, + }, + additionalProperties: false, + }, + }, + }, async (request) => { + const { code, state } = request.body; + + // Validate CSRF state + const pending = pendingStates.get(state); + if (!pending || pending.userId !== request.userId || pending.expiresAt < Date.now()) { + pendingStates.delete(state); + throw new AppError(400, 'Invalid or expired state parameter'); + } + pendingStates.delete(state); + + // Exchange code for tokens + const tokens = await exchangeYouTubeCode(code); + const profile = await fetchYouTubeProfile(tokens.accessToken); + + // Encrypt tokens + const accessEnc = encrypt(tokens.accessToken); + const refreshEnc = encrypt(tokens.refreshToken); + + // Upsert linked account + const account = await fastify.prisma.linkedAccount.upsert({ + where: { + userId_serviceId: { + userId: request.userId, + serviceId: 'YOUTUBE', + }, + }, + update: { + displayName: profile.displayName, + accountId: profile.accountId, + avatarUrl: profile.avatarUrl, + accessTokenEnc: accessEnc.ciphertext, + refreshTokenEnc: refreshEnc.ciphertext, + accessTokenIv: accessEnc.iv, + refreshTokenIv: refreshEnc.iv, + tokenExpiresAt: new Date(Date.now() + tokens.expiresIn * 1000), + }, + create: { + userId: request.userId, + serviceId: 'YOUTUBE', + displayName: profile.displayName, + accountId: profile.accountId, + avatarUrl: profile.avatarUrl, + accessTokenEnc: accessEnc.ciphertext, + refreshTokenEnc: refreshEnc.ciphertext, + accessTokenIv: accessEnc.iv, + refreshTokenIv: refreshEnc.iv, + tokenExpiresAt: new Date(Date.now() + tokens.expiresIn * 1000), + }, + }); + + const response: LinkedAccountResponse = { + id: account.id, + serviceId: account.serviceId, + displayName: account.displayName, + accountId: account.accountId, + avatarUrl: account.avatarUrl, + }; + return response; + }); +}; + +export default youtubeRoutes; diff --git a/src/routes/streams/lifecycle.ts b/src/routes/streams/lifecycle.ts new file mode 100644 index 0000000..37a465c --- /dev/null +++ b/src/routes/streams/lifecycle.ts @@ -0,0 +1,267 @@ +import { FastifyPluginAsync } from 'fastify'; +import { requireAuth } from '../../middleware/require-auth.js'; +import { decrypt, encrypt } from '../../services/crypto.service.js'; +import { + createYouTubeBroadcast, + transitionYouTubeBroadcast, + refreshYouTubeToken, +} from '../../services/youtube.service.js'; +import { + getTwitchStreamKey, + updateTwitchChannel, + refreshTwitchToken, +} from '../../services/twitch.service.js'; +import { AppError } from '../../plugins/error-handler.js'; +import type { PrepareResponse, PreparedDestination } from '../../types/api.js'; + +const TWITCH_RTMP_URL = 'rtmp://live.twitch.tv/app'; + +async function getDecryptedToken( + prisma: any, + userId: string, + serviceId: string, +): Promise<{ account: any; accessToken: string }> { + const account = await prisma.linkedAccount.findUnique({ + where: { userId_serviceId: { userId, serviceId } }, + }); + if (!account) throw new AppError(400, `No ${serviceId} account linked`); + + // Lazy refresh if token is expired or about to expire + if (account.tokenExpiresAt < new Date(Date.now() + 60 * 1000)) { + const refreshToken = decrypt(account.refreshTokenEnc, account.refreshTokenIv); + let newAccess: string; + let newRefresh: string | undefined; + let expiresIn: number; + + if (serviceId === 'YOUTUBE') { + const result = await refreshYouTubeToken(refreshToken); + newAccess = result.accessToken; + expiresIn = result.expiresIn; + } else { + const result = await refreshTwitchToken(refreshToken); + newAccess = result.accessToken; + newRefresh = result.refreshToken; + expiresIn = result.expiresIn; + } + + const accessEnc = encrypt(newAccess); + const updateData: any = { + accessTokenEnc: accessEnc.ciphertext, + accessTokenIv: accessEnc.iv, + tokenExpiresAt: new Date(Date.now() + expiresIn * 1000), + }; + if (newRefresh) { + const refreshEnc = encrypt(newRefresh); + updateData.refreshTokenEnc = refreshEnc.ciphertext; + updateData.refreshTokenIv = refreshEnc.iv; + } + + await prisma.linkedAccount.update({ + where: { id: account.id }, + data: updateData, + }); + + return { account, accessToken: newAccess }; + } + + return { + account, + accessToken: decrypt(account.accessTokenEnc, account.accessTokenIv), + }; +} + +const lifecycleRoutes: FastifyPluginAsync = async (fastify) => { + // POST /streams/plans/:id/prepare — create broadcasts, get RTMP info + fastify.post<{ Params: { id: string } }>('/streams/plans/:id/prepare', { + preHandler: [requireAuth], + schema: { + params: { + type: 'object', + required: ['id'], + properties: { id: { type: 'string' } }, + }, + }, + }, async (request) => { + const plan = await fastify.prisma.streamPlan.findFirst({ + where: { id: request.params.id, userId: request.userId }, + include: { destinations: true }, + }); + if (!plan) throw new AppError(404, 'Stream plan not found'); + if (plan.status !== 'DRAFT') { + throw new AppError(400, `Plan is already ${plan.status}`); + } + + const prepared: PreparedDestination[] = []; + + for (const dest of plan.destinations) { + const { account, accessToken } = await getDecryptedToken( + fastify.prisma, + request.userId, + dest.serviceId, + ); + + if (dest.serviceId === 'YOUTUBE') { + const broadcast = await createYouTubeBroadcast( + accessToken, + dest.title, + dest.description, + dest.privacyStatus, + ); + + await fastify.prisma.streamDestination.update({ + where: { id: dest.id }, + data: { + rtmpUrl: broadcast.rtmpUrl, + streamKey: broadcast.streamKey, + broadcastId: broadcast.id, + status: 'READY', + }, + }); + + prepared.push({ + serviceId: 'YOUTUBE', + rtmpUrl: broadcast.rtmpUrl, + streamKey: broadcast.streamKey, + broadcastId: broadcast.id, + }); + } else if (dest.serviceId === 'TWITCH') { + // Update channel info + const tags = dest.tags ? dest.tags.split(',').map((t) => t.trim()).filter(Boolean) : []; + await updateTwitchChannel( + accessToken, + account.accountId, + dest.title, + dest.gameId, + tags, + ); + + // Get stream key + const streamKey = await getTwitchStreamKey(accessToken, account.accountId); + + await fastify.prisma.streamDestination.update({ + where: { id: dest.id }, + data: { + rtmpUrl: TWITCH_RTMP_URL, + streamKey, + broadcastId: account.accountId, + status: 'READY', + }, + }); + + prepared.push({ + serviceId: 'TWITCH', + rtmpUrl: TWITCH_RTMP_URL, + streamKey, + broadcastId: account.accountId, + }); + } + } + + await fastify.prisma.streamPlan.update({ + where: { id: plan.id }, + data: { status: 'READY' }, + }); + + const response: PrepareResponse = { + planId: plan.id, + destinations: prepared, + }; + return response; + }); + + // POST /streams/plans/:id/start — transition to LIVE + fastify.post<{ Params: { id: string } }>('/streams/plans/:id/start', { + preHandler: [requireAuth], + schema: { + params: { + type: 'object', + required: ['id'], + properties: { id: { type: 'string' } }, + }, + }, + }, async (request) => { + const plan = await fastify.prisma.streamPlan.findFirst({ + where: { id: request.params.id, userId: request.userId }, + include: { destinations: true }, + }); + if (!plan) throw new AppError(404, 'Stream plan not found'); + if (plan.status !== 'READY') { + throw new AppError(400, `Plan must be READY to start, currently ${plan.status}`); + } + + for (const dest of plan.destinations) { + if (dest.serviceId === 'YOUTUBE' && dest.broadcastId) { + const { accessToken } = await getDecryptedToken( + fastify.prisma, + request.userId, + 'YOUTUBE', + ); + await transitionYouTubeBroadcast(accessToken, dest.broadcastId, 'live'); + } + // Twitch goes live automatically when RTMP stream is received + + await fastify.prisma.streamDestination.update({ + where: { id: dest.id }, + data: { status: 'LIVE' }, + }); + } + + await fastify.prisma.streamPlan.update({ + where: { id: plan.id }, + data: { status: 'LIVE' }, + }); + + return { success: true, status: 'LIVE' }; + }); + + // POST /streams/plans/:id/end — end stream + fastify.post<{ Params: { id: string } }>('/streams/plans/:id/end', { + preHandler: [requireAuth], + schema: { + params: { + type: 'object', + required: ['id'], + properties: { id: { type: 'string' } }, + }, + }, + }, async (request) => { + const plan = await fastify.prisma.streamPlan.findFirst({ + where: { id: request.params.id, userId: request.userId }, + include: { destinations: true }, + }); + if (!plan) throw new AppError(404, 'Stream plan not found'); + if (plan.status !== 'LIVE') { + throw new AppError(400, `Plan must be LIVE to end, currently ${plan.status}`); + } + + for (const dest of plan.destinations) { + if (dest.serviceId === 'YOUTUBE' && dest.broadcastId) { + const { accessToken } = await getDecryptedToken( + fastify.prisma, + request.userId, + 'YOUTUBE', + ); + try { + await transitionYouTubeBroadcast(accessToken, dest.broadcastId, 'complete'); + } catch { + // Non-fatal — stream may already be ended + } + } + // Twitch ends when RTMP stream stops + + await fastify.prisma.streamDestination.update({ + where: { id: dest.id }, + data: { status: 'ENDED' }, + }); + } + + await fastify.prisma.streamPlan.update({ + where: { id: plan.id }, + data: { status: 'ENDED' }, + }); + + return { success: true, status: 'ENDED' }; + }); +}; + +export default lifecycleRoutes; diff --git a/src/routes/streams/plans.ts b/src/routes/streams/plans.ts new file mode 100644 index 0000000..a5a7a38 --- /dev/null +++ b/src/routes/streams/plans.ts @@ -0,0 +1,149 @@ +import { FastifyPluginAsync } from 'fastify'; +import { requireAuth } from '../../middleware/require-auth.js'; +import { AppError } from '../../plugins/error-handler.js'; +import type { CreateStreamPlanBody, StreamPlanResponse, StreamDestinationResponse } from '../../types/api.js'; + +function formatPlan(plan: any): StreamPlanResponse { + return { + id: plan.id, + name: plan.name, + status: plan.status, + createdAt: plan.createdAt.toISOString(), + updatedAt: plan.updatedAt.toISOString(), + destinations: (plan.destinations ?? []).map((d: any): StreamDestinationResponse => ({ + id: d.id, + serviceId: d.serviceId, + title: d.title, + description: d.description, + privacyStatus: d.privacyStatus, + gameId: d.gameId, + tags: d.tags, + rtmpUrl: d.rtmpUrl, + streamKey: d.streamKey, + broadcastId: d.broadcastId, + status: d.status, + })), + }; +} + +const planRoutes: FastifyPluginAsync = async (fastify) => { + // GET /streams/plans — list plans + fastify.get('/streams/plans', { + preHandler: [requireAuth], + }, async (request) => { + const plans = await fastify.prisma.streamPlan.findMany({ + where: { userId: request.userId }, + include: { destinations: true }, + orderBy: { createdAt: 'desc' }, + }); + return plans.map(formatPlan); + }); + + // POST /streams/plans — create plan with destinations + fastify.post<{ Body: CreateStreamPlanBody }>('/streams/plans', { + preHandler: [requireAuth], + schema: { + body: { + type: 'object', + required: ['name', 'destinations'], + properties: { + name: { type: 'string', minLength: 1, maxLength: 200 }, + destinations: { + type: 'array', + minItems: 1, + maxItems: 10, + items: { + type: 'object', + required: ['serviceId', 'title'], + properties: { + serviceId: { type: 'string', enum: ['YOUTUBE', 'TWITCH'] }, + title: { type: 'string', minLength: 1, maxLength: 200 }, + description: { type: 'string', maxLength: 5000 }, + privacyStatus: { type: 'string', enum: ['public', 'unlisted', 'private'] }, + gameId: { type: 'string', maxLength: 100 }, + tags: { type: 'string', maxLength: 500 }, + }, + additionalProperties: false, + }, + }, + }, + additionalProperties: false, + }, + }, + }, async (request, reply) => { + const { name, destinations } = request.body; + + // Verify user has the required linked accounts + const linkedAccounts = await fastify.prisma.linkedAccount.findMany({ + where: { userId: request.userId }, + }); + const linkedServices = new Set(linkedAccounts.map((a) => a.serviceId)); + + for (const dest of destinations) { + if (!linkedServices.has(dest.serviceId)) { + throw new AppError(400, `No ${dest.serviceId} account linked`); + } + } + + const plan = await fastify.prisma.streamPlan.create({ + data: { + userId: request.userId, + name, + destinations: { + create: destinations.map((d) => ({ + serviceId: d.serviceId, + title: d.title, + description: d.description ?? '', + privacyStatus: d.privacyStatus ?? 'public', + gameId: d.gameId ?? '', + tags: d.tags ?? '', + })), + }, + }, + include: { destinations: true }, + }); + + reply.status(201).send(formatPlan(plan)); + }); + + // GET /streams/plans/:id — get plan detail + fastify.get<{ Params: { id: string } }>('/streams/plans/:id', { + preHandler: [requireAuth], + schema: { + params: { + type: 'object', + required: ['id'], + properties: { id: { type: 'string' } }, + }, + }, + }, async (request) => { + const plan = await fastify.prisma.streamPlan.findFirst({ + where: { id: request.params.id, userId: request.userId }, + include: { destinations: true }, + }); + if (!plan) throw new AppError(404, 'Stream plan not found'); + return formatPlan(plan); + }); + + // DELETE /streams/plans/:id — delete plan + fastify.delete<{ Params: { id: string } }>('/streams/plans/:id', { + preHandler: [requireAuth], + schema: { + params: { + type: 'object', + required: ['id'], + properties: { id: { type: 'string' } }, + }, + }, + }, async (request, reply) => { + const plan = await fastify.prisma.streamPlan.findFirst({ + where: { id: request.params.id, userId: request.userId }, + }); + if (!plan) throw new AppError(404, 'Stream plan not found'); + + await fastify.prisma.streamPlan.delete({ where: { id: plan.id } }); + reply.status(200).send({ success: true }); + }); +}; + +export default planRoutes; diff --git a/src/services/crypto.service.ts b/src/services/crypto.service.ts new file mode 100644 index 0000000..63ee3eb --- /dev/null +++ b/src/services/crypto.service.ts @@ -0,0 +1,37 @@ +import { createCipheriv, createDecipheriv, randomBytes, createHash } from 'node:crypto'; +import { config } from '../config.js'; + +const ALGORITHM = 'aes-256-gcm'; +const IV_LENGTH = 12; +const AUTH_TAG_LENGTH = 16; + +function getKey(): Buffer { + return Buffer.from(config.tokenEncryptionKey, 'hex'); +} + +export function encrypt(plaintext: string): { ciphertext: string; iv: string } { + const key = getKey(); + const iv = randomBytes(IV_LENGTH); + const cipher = createCipheriv(ALGORITHM, key, iv, { authTagLength: AUTH_TAG_LENGTH }); + const encrypted = Buffer.concat([cipher.update(plaintext, 'utf8'), cipher.final()]); + const authTag = cipher.getAuthTag(); + return { + ciphertext: Buffer.concat([encrypted, authTag]).toString('base64'), + iv: iv.toString('base64'), + }; +} + +export function decrypt(ciphertext: string, iv: string): string { + const key = getKey(); + const ivBuf = Buffer.from(iv, 'base64'); + const data = Buffer.from(ciphertext, 'base64'); + const authTag = data.subarray(data.length - AUTH_TAG_LENGTH); + const encrypted = data.subarray(0, data.length - AUTH_TAG_LENGTH); + const decipher = createDecipheriv(ALGORITHM, key, ivBuf, { authTagLength: AUTH_TAG_LENGTH }); + decipher.setAuthTag(authTag); + return decipher.update(encrypted) + decipher.final('utf8'); +} + +export function hashToken(token: string): string { + return createHash('sha256').update(token).digest('hex'); +} diff --git a/src/services/meta-auth.service.ts b/src/services/meta-auth.service.ts new file mode 100644 index 0000000..1ad16b8 --- /dev/null +++ b/src/services/meta-auth.service.ts @@ -0,0 +1,55 @@ +import { config } from '../config.js'; + +interface MetaTokenResponse { + access_token: string; + token_type: string; + expires_in: number; +} + +interface MetaProfile { + id: string; + name: string; + email?: string; + picture?: { data?: { url?: string } }; +} + +export async function exchangeMetaCode(code: string): Promise<{ accessToken: string }> { + const params = new URLSearchParams({ + client_id: config.meta.appId, + client_secret: config.meta.appSecret, + redirect_uri: config.meta.redirectUri, + code, + }); + + const res = await fetch( + `https://graph.facebook.com/v19.0/oauth/access_token?${params}`, + ); + if (!res.ok) { + const body = await res.text(); + throw new Error(`Meta token exchange failed: ${res.status} ${body}`); + } + const data = (await res.json()) as MetaTokenResponse; + return { accessToken: data.access_token }; +} + +export async function fetchMetaProfile(accessToken: string): Promise<{ + metaId: string; + displayName: string; + email: string | null; + avatarUrl: string | null; +}> { + const res = await fetch( + `https://graph.facebook.com/v19.0/me?fields=id,name,email,picture.type(large)&access_token=${accessToken}`, + ); + if (!res.ok) { + const body = await res.text(); + throw new Error(`Meta profile fetch failed: ${res.status} ${body}`); + } + const data = (await res.json()) as MetaProfile; + return { + metaId: data.id, + displayName: data.name, + email: data.email ?? null, + avatarUrl: data.picture?.data?.url ?? null, + }; +} diff --git a/src/services/token-refresh.service.ts b/src/services/token-refresh.service.ts new file mode 100644 index 0000000..07dbb4f --- /dev/null +++ b/src/services/token-refresh.service.ts @@ -0,0 +1,70 @@ +import { PrismaClient } from '@prisma/client'; +import { decrypt, encrypt } from './crypto.service.js'; +import { refreshYouTubeToken } from './youtube.service.js'; +import { refreshTwitchToken } from './twitch.service.js'; + +const REFRESH_INTERVAL = 10 * 60 * 1000; // 10 minutes +const REFRESH_THRESHOLD = 15 * 60 * 1000; // Refresh tokens expiring within 15 minutes + +let timer: ReturnType | null = null; + +export function startTokenRefreshScheduler(prisma: PrismaClient, logger: { info: (...args: any[]) => void; error: (...args: any[]) => void }) { + if (timer) return; + + timer = setInterval(async () => { + try { + const threshold = new Date(Date.now() + REFRESH_THRESHOLD); + const accounts = await prisma.linkedAccount.findMany({ + where: { tokenExpiresAt: { lt: threshold } }, + }); + + for (const account of accounts) { + try { + const refreshToken = decrypt(account.refreshTokenEnc, account.refreshTokenIv); + + if (account.serviceId === 'YOUTUBE') { + const result = await refreshYouTubeToken(refreshToken); + const accessEnc = encrypt(result.accessToken); + await prisma.linkedAccount.update({ + where: { id: account.id }, + data: { + accessTokenEnc: accessEnc.ciphertext, + accessTokenIv: accessEnc.iv, + tokenExpiresAt: new Date(Date.now() + result.expiresIn * 1000), + }, + }); + } else if (account.serviceId === 'TWITCH') { + const result = await refreshTwitchToken(refreshToken); + const accessEnc = encrypt(result.accessToken); + const refreshEnc = encrypt(result.refreshToken); + await prisma.linkedAccount.update({ + where: { id: account.id }, + data: { + accessTokenEnc: accessEnc.ciphertext, + accessTokenIv: accessEnc.iv, + refreshTokenEnc: refreshEnc.ciphertext, + refreshTokenIv: refreshEnc.iv, + tokenExpiresAt: new Date(Date.now() + result.expiresIn * 1000), + }, + }); + } + + logger.info(`Refreshed ${account.serviceId} token for account ${account.id}`); + } catch (err) { + logger.error(`Failed to refresh ${account.serviceId} token for account ${account.id}: ${err}`); + } + } + } catch (err) { + logger.error(`Token refresh scheduler error: ${err}`); + } + }, REFRESH_INTERVAL); + + logger.info('Token refresh scheduler started'); +} + +export function stopTokenRefreshScheduler() { + if (timer) { + clearInterval(timer); + timer = null; + } +} diff --git a/src/services/twitch.service.ts b/src/services/twitch.service.ts new file mode 100644 index 0000000..2a1a5f5 --- /dev/null +++ b/src/services/twitch.service.ts @@ -0,0 +1,178 @@ +import { config } from '../config.js'; + +interface TwitchTokenResponse { + access_token: string; + refresh_token: string; + expires_in: number; + token_type: string; +} + +interface TwitchUser { + id: string; + login: string; + display_name: string; + profile_image_url: string; +} + +const SCOPES = [ + 'channel:manage:broadcast', + 'channel:read:stream_key', + 'user:read:email', +].join(' '); + +export function getTwitchAuthUrl(state: string): string { + const params = new URLSearchParams({ + client_id: config.twitch.clientId, + redirect_uri: config.twitch.redirectUri, + response_type: 'code', + scope: SCOPES, + force_verify: 'true', + state, + }); + return `https://id.twitch.tv/oauth2/authorize?${params}`; +} + +export async function exchangeTwitchCode(code: string): Promise<{ + accessToken: string; + refreshToken: string; + expiresIn: number; +}> { + const res = await fetch('https://id.twitch.tv/oauth2/token', { + method: 'POST', + headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, + body: new URLSearchParams({ + client_id: config.twitch.clientId, + client_secret: config.twitch.clientSecret, + code, + grant_type: 'authorization_code', + redirect_uri: config.twitch.redirectUri, + }), + }); + if (!res.ok) { + const body = await res.text(); + throw new Error(`Twitch token exchange failed: ${res.status} ${body}`); + } + const data = (await res.json()) as TwitchTokenResponse; + return { + accessToken: data.access_token, + refreshToken: data.refresh_token, + expiresIn: data.expires_in, + }; +} + +export async function refreshTwitchToken(refreshToken: string): Promise<{ + accessToken: string; + refreshToken: string; + expiresIn: number; +}> { + const res = await fetch('https://id.twitch.tv/oauth2/token', { + method: 'POST', + headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, + body: new URLSearchParams({ + client_id: config.twitch.clientId, + client_secret: config.twitch.clientSecret, + refresh_token: refreshToken, + grant_type: 'refresh_token', + }), + }); + if (!res.ok) { + const body = await res.text(); + throw new Error(`Twitch token refresh failed: ${res.status} ${body}`); + } + const data = (await res.json()) as TwitchTokenResponse; + return { + accessToken: data.access_token, + refreshToken: data.refresh_token, + expiresIn: data.expires_in, + }; +} + +export async function fetchTwitchProfile(accessToken: string): Promise<{ + accountId: string; + displayName: string; + avatarUrl: string | null; +}> { + const res = await fetch('https://api.twitch.tv/helix/users', { + headers: { + Authorization: `Bearer ${accessToken}`, + 'Client-Id': config.twitch.clientId, + }, + }); + if (!res.ok) { + const body = await res.text(); + throw new Error(`Twitch profile fetch failed: ${res.status} ${body}`); + } + const data = (await res.json()) as { data: TwitchUser[] }; + const user = data.data[0]; + if (!user) throw new Error('Twitch returned no user data'); + return { + accountId: user.id, + displayName: user.display_name, + avatarUrl: user.profile_image_url ?? null, + }; +} + +export async function revokeTwitchToken(accessToken: string): Promise { + await fetch('https://id.twitch.tv/oauth2/revoke', { + method: 'POST', + headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, + body: new URLSearchParams({ + client_id: config.twitch.clientId, + token: accessToken, + }), + }); +} + +// ── Twitch Helix Stream Management ────────────────────── + +export async function getTwitchStreamKey( + accessToken: string, + broadcasterId: string, +): Promise { + const res = await fetch( + `https://api.twitch.tv/helix/streams/key?broadcaster_id=${broadcasterId}`, + { + headers: { + Authorization: `Bearer ${accessToken}`, + 'Client-Id': config.twitch.clientId, + }, + }, + ); + if (!res.ok) { + const body = await res.text(); + throw new Error(`Twitch get stream key failed: ${res.status} ${body}`); + } + const data = (await res.json()) as { data: [{ stream_key: string }] }; + return data.data[0].stream_key; +} + +export async function updateTwitchChannel( + accessToken: string, + broadcasterId: string, + title: string, + gameId: string, + tags: string[], +): Promise { + const body: Record = { + title, + }; + if (gameId) body.game_id = gameId; + if (tags.length > 0) body.tags = tags; + + const res = await fetch( + `https://api.twitch.tv/helix/channels?broadcaster_id=${broadcasterId}`, + { + method: 'PATCH', + headers: { + Authorization: `Bearer ${accessToken}`, + 'Client-Id': config.twitch.clientId, + 'Content-Type': 'application/json', + }, + body: JSON.stringify(body), + }, + ); + if (!res.ok) { + const body = await res.text(); + throw new Error(`Twitch update channel failed: ${res.status} ${body}`); + } +} diff --git a/src/services/youtube.service.ts b/src/services/youtube.service.ts new file mode 100644 index 0000000..9be0828 --- /dev/null +++ b/src/services/youtube.service.ts @@ -0,0 +1,221 @@ +import { config } from '../config.js'; + +interface GoogleTokenResponse { + access_token: string; + refresh_token?: string; + expires_in: number; + token_type: string; +} + +interface GoogleUserInfo { + sub: string; + name: string; + picture?: string; + email?: string; +} + +const SCOPES = [ + 'https://www.googleapis.com/auth/youtube', + 'https://www.googleapis.com/auth/youtube.force-ssl', + 'openid', + 'profile', +].join(' '); + +export function getYouTubeAuthUrl(state: string): string { + const params = new URLSearchParams({ + client_id: config.youtube.clientId, + redirect_uri: config.youtube.redirectUri, + response_type: 'code', + scope: SCOPES, + access_type: 'offline', + prompt: 'consent', + state, + }); + return `https://accounts.google.com/o/oauth2/v2/auth?${params}`; +} + +export async function exchangeYouTubeCode(code: string): Promise<{ + accessToken: string; + refreshToken: string; + expiresIn: number; +}> { + const res = await fetch('https://oauth2.googleapis.com/token', { + method: 'POST', + headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, + body: new URLSearchParams({ + client_id: config.youtube.clientId, + client_secret: config.youtube.clientSecret, + code, + grant_type: 'authorization_code', + redirect_uri: config.youtube.redirectUri, + }), + }); + if (!res.ok) { + const body = await res.text(); + throw new Error(`YouTube token exchange failed: ${res.status} ${body}`); + } + const data = (await res.json()) as GoogleTokenResponse; + if (!data.refresh_token) { + throw new Error('YouTube did not return a refresh token. User may need to revoke and re-link.'); + } + return { + accessToken: data.access_token, + refreshToken: data.refresh_token, + expiresIn: data.expires_in, + }; +} + +export async function refreshYouTubeToken(refreshToken: string): Promise<{ + accessToken: string; + expiresIn: number; +}> { + const res = await fetch('https://oauth2.googleapis.com/token', { + method: 'POST', + headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, + body: new URLSearchParams({ + client_id: config.youtube.clientId, + client_secret: config.youtube.clientSecret, + refresh_token: refreshToken, + grant_type: 'refresh_token', + }), + }); + if (!res.ok) { + const body = await res.text(); + throw new Error(`YouTube token refresh failed: ${res.status} ${body}`); + } + const data = (await res.json()) as GoogleTokenResponse; + return { accessToken: data.access_token, expiresIn: data.expires_in }; +} + +export async function fetchYouTubeProfile(accessToken: string): Promise<{ + accountId: string; + displayName: string; + avatarUrl: string | null; +}> { + const res = await fetch('https://www.googleapis.com/oauth2/v3/userinfo', { + headers: { Authorization: `Bearer ${accessToken}` }, + }); + if (!res.ok) { + const body = await res.text(); + throw new Error(`YouTube profile fetch failed: ${res.status} ${body}`); + } + const data = (await res.json()) as GoogleUserInfo; + return { + accountId: data.sub, + displayName: data.name, + avatarUrl: data.picture ?? null, + }; +} + +export async function revokeYouTubeToken(token: string): Promise { + await fetch(`https://oauth2.googleapis.com/revoke?token=${token}`, { + method: 'POST', + headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, + }); +} + +// ── YouTube Live Streaming API ─────────────────────────── + +export interface YouTubeBroadcast { + id: string; + rtmpUrl: string; + streamKey: string; +} + +export async function createYouTubeBroadcast( + accessToken: string, + title: string, + description: string, + privacyStatus: string, +): Promise { + // Create liveBroadcast + const broadcastRes = await fetch( + 'https://www.googleapis.com/youtube/v3/liveBroadcasts?part=snippet,status,contentDetails', + { + method: 'POST', + headers: { + Authorization: `Bearer ${accessToken}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + snippet: { + title, + description, + scheduledStartTime: new Date().toISOString(), + }, + status: { privacyStatus }, + contentDetails: { + enableAutoStart: true, + enableAutoStop: true, + }, + }), + }, + ); + if (!broadcastRes.ok) { + const body = await broadcastRes.text(); + throw new Error(`YouTube create broadcast failed: ${broadcastRes.status} ${body}`); + } + const broadcast = (await broadcastRes.json()) as any; + + // Create liveStream + const streamRes = await fetch( + 'https://www.googleapis.com/youtube/v3/liveStreams?part=snippet,cdn', + { + method: 'POST', + headers: { + Authorization: `Bearer ${accessToken}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + snippet: { title: `${title} - stream` }, + cdn: { + frameRate: 'variable', + ingestionType: 'rtmp', + resolution: 'variable', + }, + }), + }, + ); + if (!streamRes.ok) { + const body = await streamRes.text(); + throw new Error(`YouTube create stream failed: ${streamRes.status} ${body}`); + } + const stream = (await streamRes.json()) as any; + + // Bind stream to broadcast + const bindRes = await fetch( + `https://www.googleapis.com/youtube/v3/liveBroadcasts/bind?id=${broadcast.id}&part=id&streamId=${stream.id}`, + { + method: 'POST', + headers: { Authorization: `Bearer ${accessToken}` }, + }, + ); + if (!bindRes.ok) { + const body = await bindRes.text(); + throw new Error(`YouTube bind broadcast failed: ${bindRes.status} ${body}`); + } + + return { + id: broadcast.id, + rtmpUrl: stream.cdn.ingestionInfo.ingestionAddress, + streamKey: stream.cdn.ingestionInfo.streamName, + }; +} + +export async function transitionYouTubeBroadcast( + accessToken: string, + broadcastId: string, + status: 'live' | 'complete', +): Promise { + const res = await fetch( + `https://www.googleapis.com/youtube/v3/liveBroadcasts/transition?broadcastStatus=${status}&id=${broadcastId}&part=status`, + { + method: 'POST', + headers: { Authorization: `Bearer ${accessToken}` }, + }, + ); + if (!res.ok) { + const body = await res.text(); + throw new Error(`YouTube transition broadcast failed: ${res.status} ${body}`); + } +}