diff --git a/prisma/schema.prisma b/prisma/schema.prisma index a30a8f8..05c31b1 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -49,7 +49,7 @@ model LinkedAccount { createdAt DateTime @default(now()) updatedAt DateTime @updatedAt - @@unique([userId, serviceId]) + @@unique([userId, serviceId, accountId]) @@index([userId]) } @@ -67,19 +67,20 @@ model StreamPlan { } model StreamDestination { - id String @id @default(uuid()) - planId String - plan StreamPlan @relation(fields: [planId], references: [id], onDelete: Cascade) - serviceId String - title String - description String @default("") - privacyStatus String @default("public") - gameId String @default("") - tags String @default("") - rtmpUrl String @default("") - streamKey String @default("") - broadcastId String @default("") - status String @default("PENDING") + id String @id @default(uuid()) + planId String + plan StreamPlan @relation(fields: [planId], references: [id], onDelete: Cascade) + serviceId String + linkedAccountId String @default("") + title String + description String @default("") + privacyStatus String @default("public") + gameId String @default("") + tags String @default("") + rtmpUrl String @default("") + streamKey String @default("") + broadcastId String @default("") + status String @default("PENDING") @@index([planId]) } diff --git a/src/routes/providers/accounts.ts b/src/routes/providers/accounts.ts index 0b30d9a..4fb58cc 100644 --- a/src/routes/providers/accounts.ts +++ b/src/routes/providers/accounts.ts @@ -26,27 +26,23 @@ const accountRoutes: FastifyPluginAsync = async (fastify) => { return response; }); - // DELETE /providers/:serviceId — revoke tokens and unlink - fastify.delete<{ Params: { serviceId: string } }>('/providers/:serviceId', { + // DELETE /providers/accounts/:id — revoke tokens and unlink by account ID + fastify.delete<{ Params: { id: string } }>('/providers/accounts/:id', { preHandler: [requireAuth], schema: { params: { type: 'object', - required: ['serviceId'], + required: ['id'], properties: { - serviceId: { type: 'string', enum: ['YOUTUBE', 'TWITCH'] }, + id: { type: 'string' }, }, }, }, }, async (request, reply) => { - const { serviceId } = request.params; - - const account = await fastify.prisma.linkedAccount.findUnique({ + const account = await fastify.prisma.linkedAccount.findFirst({ where: { - userId_serviceId: { - userId: request.userId, - serviceId, - }, + id: request.params.id, + userId: request.userId, }, }); @@ -57,7 +53,7 @@ const accountRoutes: FastifyPluginAsync = async (fastify) => { // Best-effort revoke tokens at the provider try { const accessToken = decrypt(account.accessTokenEnc, account.accessTokenIv); - if (serviceId === 'YOUTUBE') { + if (account.serviceId === 'YOUTUBE') { await revokeYouTubeToken(accessToken); } else { await revokeTwitchToken(accessToken); diff --git a/src/routes/providers/twitch.ts b/src/routes/providers/twitch.ts index 674d767..cfb9243 100644 --- a/src/routes/providers/twitch.ts +++ b/src/routes/providers/twitch.ts @@ -90,17 +90,19 @@ const twitchRoutes: FastifyPluginAsync = async (fastify) => { const accessEnc = encrypt(tokens.accessToken); const refreshEnc = encrypt(tokens.refreshToken); - // Upsert linked account + // Upsert linked account — keyed on (userId, serviceId, accountId) so + // re-linking the same Twitch account updates it, while linking a + // different Twitch account creates a new record. const account = await fastify.prisma.linkedAccount.upsert({ where: { - userId_serviceId: { + userId_serviceId_accountId: { userId: request.userId, serviceId: 'TWITCH', + accountId: profile.accountId, }, }, update: { displayName: profile.displayName, - accountId: profile.accountId, avatarUrl: profile.avatarUrl, accessTokenEnc: accessEnc.ciphertext, refreshTokenEnc: refreshEnc.ciphertext, diff --git a/src/routes/providers/youtube.ts b/src/routes/providers/youtube.ts index 64d42af..7cd0ae4 100644 --- a/src/routes/providers/youtube.ts +++ b/src/routes/providers/youtube.ts @@ -91,17 +91,19 @@ const youtubeRoutes: FastifyPluginAsync = async (fastify) => { const accessEnc = encrypt(tokens.accessToken); const refreshEnc = encrypt(tokens.refreshToken); - // Upsert linked account + // Upsert linked account — keyed on (userId, serviceId, accountId) so + // re-linking the same Google account updates it, while linking a + // different Google account creates a new record. const account = await fastify.prisma.linkedAccount.upsert({ where: { - userId_serviceId: { + userId_serviceId_accountId: { userId: request.userId, serviceId: 'YOUTUBE', + accountId: profile.accountId, }, }, update: { displayName: profile.displayName, - accountId: profile.accountId, avatarUrl: profile.avatarUrl, accessTokenEnc: accessEnc.ciphertext, refreshTokenEnc: refreshEnc.ciphertext, diff --git a/src/routes/streams/lifecycle.ts b/src/routes/streams/lifecycle.ts index 37a465c..92fec73 100644 --- a/src/routes/streams/lifecycle.ts +++ b/src/routes/streams/lifecycle.ts @@ -16,15 +16,15 @@ import type { PrepareResponse, PreparedDestination } from '../../types/api.js'; const TWITCH_RTMP_URL = 'rtmp://live.twitch.tv/app'; -async function getDecryptedToken( +async function getDecryptedTokenByAccountId( prisma: any, userId: string, - serviceId: string, + linkedAccountId: string, ): Promise<{ account: any; accessToken: string }> { - const account = await prisma.linkedAccount.findUnique({ - where: { userId_serviceId: { userId, serviceId } }, + const account = await prisma.linkedAccount.findFirst({ + where: { id: linkedAccountId, userId }, }); - if (!account) throw new AppError(400, `No ${serviceId} account linked`); + if (!account) throw new AppError(400, `Linked account ${linkedAccountId} not found`); // Lazy refresh if token is expired or about to expire if (account.tokenExpiresAt < new Date(Date.now() + 60 * 1000)) { @@ -33,7 +33,7 @@ async function getDecryptedToken( let newRefresh: string | undefined; let expiresIn: number; - if (serviceId === 'YOUTUBE') { + if (account.serviceId === 'YOUTUBE') { const result = await refreshYouTubeToken(refreshToken); newAccess = result.accessToken; expiresIn = result.expiresIn; @@ -87,6 +87,22 @@ const lifecycleRoutes: FastifyPluginAsync = async (fastify) => { include: { destinations: true }, }); if (!plan) throw new AppError(404, 'Stream plan not found'); + + // If already READY, return the existing prepared data + if (plan.status === 'READY') { + const response: PrepareResponse = { + planId: plan.id, + destinations: plan.destinations.map((dest) => ({ + id: dest.id, + serviceId: dest.serviceId as 'YOUTUBE' | 'TWITCH', + rtmpUrl: dest.rtmpUrl || '', + streamKey: dest.streamKey || '', + broadcastId: dest.broadcastId || '', + })), + }; + return response; + } + if (plan.status !== 'DRAFT') { throw new AppError(400, `Plan is already ${plan.status}`); } @@ -94,10 +110,10 @@ const lifecycleRoutes: FastifyPluginAsync = async (fastify) => { const prepared: PreparedDestination[] = []; for (const dest of plan.destinations) { - const { account, accessToken } = await getDecryptedToken( + const { account, accessToken } = await getDecryptedTokenByAccountId( fastify.prisma, request.userId, - dest.serviceId, + dest.linkedAccountId, ); if (dest.serviceId === 'YOUTUBE') { @@ -119,6 +135,7 @@ const lifecycleRoutes: FastifyPluginAsync = async (fastify) => { }); prepared.push({ + id: dest.id, serviceId: 'YOUTUBE', rtmpUrl: broadcast.rtmpUrl, streamKey: broadcast.streamKey, @@ -149,6 +166,7 @@ const lifecycleRoutes: FastifyPluginAsync = async (fastify) => { }); prepared.push({ + id: dest.id, serviceId: 'TWITCH', rtmpUrl: TWITCH_RTMP_URL, streamKey, @@ -185,18 +203,34 @@ const lifecycleRoutes: FastifyPluginAsync = async (fastify) => { include: { destinations: true }, }); if (!plan) throw new AppError(404, 'Stream plan not found'); + + // Idempotent: already LIVE is fine + if (plan.status === 'LIVE') { + return { success: true, status: 'LIVE' }; + } + if (plan.status !== 'READY') { throw new AppError(400, `Plan must be READY to start, currently ${plan.status}`); } for (const dest of plan.destinations) { if (dest.serviceId === 'YOUTUBE' && dest.broadcastId) { - const { accessToken } = await getDecryptedToken( - fastify.prisma, - request.userId, - 'YOUTUBE', - ); - await transitionYouTubeBroadcast(accessToken, dest.broadcastId, 'live'); + // Broadcast was created with enableAutoStart: true, so YouTube + // auto-transitions to 'live' when RTMP data arrives. A manual + // transition can fail if the broadcast is still transitioning + // (e.g. in 'testing' state). Wrap in try-catch so the plan + // status always gets updated to LIVE. + try { + const { accessToken } = await getDecryptedTokenByAccountId( + fastify.prisma, + request.userId, + dest.linkedAccountId, + ); + await transitionYouTubeBroadcast(accessToken, dest.broadcastId, 'live'); + } catch (err) { + fastify.log.warn({ err, broadcastId: dest.broadcastId }, + 'YouTube live transition failed (autoStart may have handled it)'); + } } // Twitch goes live automatically when RTMP stream is received @@ -230,21 +264,33 @@ const lifecycleRoutes: FastifyPluginAsync = async (fastify) => { include: { destinations: true }, }); if (!plan) throw new AppError(404, 'Stream plan not found'); - if (plan.status !== 'LIVE') { - throw new AppError(400, `Plan must be LIVE to end, currently ${plan.status}`); + + // Idempotent: already ENDED is fine + if (plan.status === 'ENDED') { + return { success: true, status: 'ENDED' }; + } + + if (plan.status !== 'LIVE' && plan.status !== 'READY') { + throw new AppError(400, `Plan must be LIVE or READY to end, currently ${plan.status}`); } for (const dest of plan.destinations) { if (dest.serviceId === 'YOUTUBE' && dest.broadcastId) { - const { accessToken } = await getDecryptedToken( - fastify.prisma, - request.userId, - 'YOUTUBE', - ); + // Always try to end the YouTube broadcast regardless of plan status. + // The broadcast may be live via enableAutoStart even if our DB status + // is still READY (e.g. if the /start transition failed). try { + const { accessToken } = await getDecryptedTokenByAccountId( + fastify.prisma, + request.userId, + dest.linkedAccountId, + ); await transitionYouTubeBroadcast(accessToken, dest.broadcastId, 'complete'); - } catch { - // Non-fatal — stream may already be ended + } catch (err) { + // Non-fatal — broadcast may already be complete or still transitioning. + // enableAutoStop will handle it when RTMP disconnects. + fastify.log.warn({ err, broadcastId: dest.broadcastId }, + 'YouTube complete transition failed'); } } // Twitch ends when RTMP stream stops diff --git a/src/routes/streams/plans.ts b/src/routes/streams/plans.ts index a5a7a38..b71e368 100644 --- a/src/routes/streams/plans.ts +++ b/src/routes/streams/plans.ts @@ -1,6 +1,8 @@ import { FastifyPluginAsync } from 'fastify'; import { requireAuth } from '../../middleware/require-auth.js'; import { AppError } from '../../plugins/error-handler.js'; +import { getYouTubeBroadcastStatus, refreshYouTubeToken } from '../../services/youtube.service.js'; +import { decrypt, encrypt } from '../../services/crypto.service.js'; import type { CreateStreamPlanBody, StreamPlanResponse, StreamDestinationResponse } from '../../types/api.js'; function formatPlan(plan: any): StreamPlanResponse { @@ -13,6 +15,7 @@ function formatPlan(plan: any): StreamPlanResponse { destinations: (plan.destinations ?? []).map((d: any): StreamDestinationResponse => ({ id: d.id, serviceId: d.serviceId, + linkedAccountId: d.linkedAccountId, title: d.title, description: d.description, privacyStatus: d.privacyStatus, @@ -27,6 +30,65 @@ function formatPlan(plan: any): StreamPlanResponse { } const planRoutes: FastifyPluginAsync = async (fastify) => { + + /** Check YouTube broadcast status and auto-end plans that are over */ + async function autoDetectEndedPlans(plans: any[], userId: string) { + // Cache decrypted tokens by linkedAccountId to avoid redundant decrypts + const tokenCache = new Map(); + + for (const plan of plans) { + if (plan.status !== 'LIVE' && plan.status !== 'READY') continue; + const ytDest = plan.destinations.find( + (d: any) => d.serviceId === 'YOUTUBE' && d.broadcastId, + ); + if (!ytDest) continue; + + try { + let ytAccessToken = tokenCache.get(ytDest.linkedAccountId); + if (!ytAccessToken) { + const account = await fastify.prisma.linkedAccount.findFirst({ + where: { id: ytDest.linkedAccountId, userId }, + }); + if (!account) continue; + + ytAccessToken = decrypt(account.accessTokenEnc, account.accessTokenIv); + if (account.tokenExpiresAt < new Date(Date.now() + 60_000)) { + const refreshToken = decrypt(account.refreshTokenEnc, account.refreshTokenIv); + const result = await refreshYouTubeToken(refreshToken); + ytAccessToken = result.accessToken; + const accessEnc = encrypt(ytAccessToken); + await fastify.prisma.linkedAccount.update({ + where: { id: account.id }, + data: { + accessTokenEnc: accessEnc.ciphertext, + accessTokenIv: accessEnc.iv, + tokenExpiresAt: new Date(Date.now() + result.expiresIn * 1000), + }, + }); + } + tokenCache.set(ytDest.linkedAccountId, ytAccessToken); + } + + const ytStatus = await getYouTubeBroadcastStatus(ytAccessToken, ytDest.broadcastId!); + if (ytStatus === 'complete' || ytStatus === 'revoked') { + for (const dest of plan.destinations) { + await fastify.prisma.streamDestination.update({ + where: { id: dest.id }, + data: { status: 'ENDED' }, + }); + } + await fastify.prisma.streamPlan.update({ + where: { id: plan.id }, + data: { status: 'ENDED' }, + }); + plan.status = 'ENDED'; + } + } catch { + // Non-fatal + } + } + } + // GET /streams/plans — list plans fastify.get('/streams/plans', { preHandler: [requireAuth], @@ -36,6 +98,9 @@ const planRoutes: FastifyPluginAsync = async (fastify) => { include: { destinations: true }, orderBy: { createdAt: 'desc' }, }); + + await autoDetectEndedPlans(plans, request.userId); + return plans.map(formatPlan); }); @@ -50,13 +115,12 @@ const planRoutes: FastifyPluginAsync = async (fastify) => { name: { type: 'string', minLength: 1, maxLength: 200 }, destinations: { type: 'array', - minItems: 1, maxItems: 10, items: { type: 'object', - required: ['serviceId', 'title'], + required: ['linkedAccountId', 'title'], properties: { - serviceId: { type: 'string', enum: ['YOUTUBE', 'TWITCH'] }, + linkedAccountId: { type: 'string', minLength: 1 }, title: { type: 'string', minLength: 1, maxLength: 200 }, description: { type: 'string', maxLength: 5000 }, privacyStatus: { type: 'string', enum: ['public', 'unlisted', 'private'] }, @@ -77,11 +141,28 @@ const planRoutes: FastifyPluginAsync = async (fastify) => { const linkedAccounts = await fastify.prisma.linkedAccount.findMany({ where: { userId: request.userId }, }); - const linkedServices = new Set(linkedAccounts.map((a) => a.serviceId)); + const linkedAccountMap = new Map(linkedAccounts.map((a) => [a.id, a])); - for (const dest of destinations) { - if (!linkedServices.has(dest.serviceId)) { - throw new AppError(400, `No ${dest.serviceId} account linked`); + // If no destinations provided, auto-create one per linked account + const resolvedDestinations = destinations.length > 0 + ? destinations + : linkedAccounts.map((a) => ({ + linkedAccountId: a.id, + title: name, + description: '', + privacyStatus: 'unlisted' as const, + gameId: '', + tags: '', + })); + + if (resolvedDestinations.length === 0) { + throw new AppError(400, 'No destinations and no linked accounts available'); + } + + for (const dest of resolvedDestinations) { + const account = linkedAccountMap.get(dest.linkedAccountId); + if (!account) { + throw new AppError(400, `Linked account ${dest.linkedAccountId} not found`); } } @@ -90,14 +171,18 @@ const planRoutes: FastifyPluginAsync = async (fastify) => { userId: request.userId, name, destinations: { - create: destinations.map((d) => ({ - serviceId: d.serviceId, - title: d.title, - description: d.description ?? '', - privacyStatus: d.privacyStatus ?? 'public', - gameId: d.gameId ?? '', - tags: d.tags ?? '', - })), + create: resolvedDestinations.map((d) => { + const account = linkedAccountMap.get(d.linkedAccountId)!; + return { + serviceId: account.serviceId, + linkedAccountId: d.linkedAccountId, + title: d.title, + description: d.description ?? '', + privacyStatus: d.privacyStatus ?? 'unlisted', + gameId: d.gameId ?? '', + tags: d.tags ?? '', + }; + }), }, }, include: { destinations: true }, @@ -122,6 +207,9 @@ const planRoutes: FastifyPluginAsync = async (fastify) => { include: { destinations: true }, }); if (!plan) throw new AppError(404, 'Stream plan not found'); + + await autoDetectEndedPlans([plan], request.userId); + return formatPlan(plan); }); diff --git a/src/types/api.ts b/src/types/api.ts index 168ff7d..2153ecd 100644 --- a/src/types/api.ts +++ b/src/types/api.ts @@ -48,7 +48,7 @@ export interface CreateStreamPlanBody { } export interface CreateDestinationBody { - serviceId: string; + linkedAccountId: string; title: string; description?: string; privacyStatus?: string; @@ -68,6 +68,7 @@ export interface StreamPlanResponse { export interface StreamDestinationResponse { id: string; serviceId: string; + linkedAccountId: string; title: string; description: string; privacyStatus: string; @@ -85,6 +86,7 @@ export interface PrepareResponse { } export interface PreparedDestination { + id: string; serviceId: string; rtmpUrl: string; streamKey: string;