Multi-account support and streaming fixes

- Change LinkedAccount unique constraint to (userId, serviceId, accountId)
- Add linkedAccountId to StreamDestination for per-account targeting
- OAuth callbacks upsert by accountId so different accounts create new rows
- Delete endpoint changed to /providers/accounts/:id
- getDecryptedToken resolves tokens by linkedAccountId instead of serviceId
- /start transition wrapped in try-catch (enableAutoStart compatibility)
- /end always attempts YouTube complete transition regardless of plan status
- autoDetectEndedPlans loads tokens per-destination
This commit is contained in:
2026-02-26 19:06:05 +01:00
parent 7351003c6b
commit cff7cdc58a
7 changed files with 208 additions and 71 deletions

View File

@@ -49,7 +49,7 @@ model LinkedAccount {
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@unique([userId, serviceId])
@@unique([userId, serviceId, accountId])
@@index([userId])
}
@@ -67,19 +67,20 @@ model StreamPlan {
}
model StreamDestination {
id String @id @default(uuid())
planId String
plan StreamPlan @relation(fields: [planId], references: [id], onDelete: Cascade)
serviceId String
title String
description String @default("")
privacyStatus String @default("public")
gameId String @default("")
tags String @default("")
rtmpUrl String @default("")
streamKey String @default("")
broadcastId String @default("")
status String @default("PENDING")
id String @id @default(uuid())
planId String
plan StreamPlan @relation(fields: [planId], references: [id], onDelete: Cascade)
serviceId String
linkedAccountId String @default("")
title String
description String @default("")
privacyStatus String @default("public")
gameId String @default("")
tags String @default("")
rtmpUrl String @default("")
streamKey String @default("")
broadcastId String @default("")
status String @default("PENDING")
@@index([planId])
}

View File

@@ -26,27 +26,23 @@ const accountRoutes: FastifyPluginAsync = async (fastify) => {
return response;
});
// DELETE /providers/:serviceId — revoke tokens and unlink
fastify.delete<{ Params: { serviceId: string } }>('/providers/:serviceId', {
// DELETE /providers/accounts/:id — revoke tokens and unlink by account ID
fastify.delete<{ Params: { id: string } }>('/providers/accounts/:id', {
preHandler: [requireAuth],
schema: {
params: {
type: 'object',
required: ['serviceId'],
required: ['id'],
properties: {
serviceId: { type: 'string', enum: ['YOUTUBE', 'TWITCH'] },
id: { type: 'string' },
},
},
},
}, async (request, reply) => {
const { serviceId } = request.params;
const account = await fastify.prisma.linkedAccount.findUnique({
const account = await fastify.prisma.linkedAccount.findFirst({
where: {
userId_serviceId: {
userId: request.userId,
serviceId,
},
id: request.params.id,
userId: request.userId,
},
});
@@ -57,7 +53,7 @@ const accountRoutes: FastifyPluginAsync = async (fastify) => {
// Best-effort revoke tokens at the provider
try {
const accessToken = decrypt(account.accessTokenEnc, account.accessTokenIv);
if (serviceId === 'YOUTUBE') {
if (account.serviceId === 'YOUTUBE') {
await revokeYouTubeToken(accessToken);
} else {
await revokeTwitchToken(accessToken);

View File

@@ -90,17 +90,19 @@ const twitchRoutes: FastifyPluginAsync = async (fastify) => {
const accessEnc = encrypt(tokens.accessToken);
const refreshEnc = encrypt(tokens.refreshToken);
// Upsert linked account
// Upsert linked account — keyed on (userId, serviceId, accountId) so
// re-linking the same Twitch account updates it, while linking a
// different Twitch account creates a new record.
const account = await fastify.prisma.linkedAccount.upsert({
where: {
userId_serviceId: {
userId_serviceId_accountId: {
userId: request.userId,
serviceId: 'TWITCH',
accountId: profile.accountId,
},
},
update: {
displayName: profile.displayName,
accountId: profile.accountId,
avatarUrl: profile.avatarUrl,
accessTokenEnc: accessEnc.ciphertext,
refreshTokenEnc: refreshEnc.ciphertext,

View File

@@ -91,17 +91,19 @@ const youtubeRoutes: FastifyPluginAsync = async (fastify) => {
const accessEnc = encrypt(tokens.accessToken);
const refreshEnc = encrypt(tokens.refreshToken);
// Upsert linked account
// Upsert linked account — keyed on (userId, serviceId, accountId) so
// re-linking the same Google account updates it, while linking a
// different Google account creates a new record.
const account = await fastify.prisma.linkedAccount.upsert({
where: {
userId_serviceId: {
userId_serviceId_accountId: {
userId: request.userId,
serviceId: 'YOUTUBE',
accountId: profile.accountId,
},
},
update: {
displayName: profile.displayName,
accountId: profile.accountId,
avatarUrl: profile.avatarUrl,
accessTokenEnc: accessEnc.ciphertext,
refreshTokenEnc: refreshEnc.ciphertext,

View File

@@ -16,15 +16,15 @@ import type { PrepareResponse, PreparedDestination } from '../../types/api.js';
const TWITCH_RTMP_URL = 'rtmp://live.twitch.tv/app';
async function getDecryptedToken(
async function getDecryptedTokenByAccountId(
prisma: any,
userId: string,
serviceId: string,
linkedAccountId: string,
): Promise<{ account: any; accessToken: string }> {
const account = await prisma.linkedAccount.findUnique({
where: { userId_serviceId: { userId, serviceId } },
const account = await prisma.linkedAccount.findFirst({
where: { id: linkedAccountId, userId },
});
if (!account) throw new AppError(400, `No ${serviceId} account linked`);
if (!account) throw new AppError(400, `Linked account ${linkedAccountId} not found`);
// Lazy refresh if token is expired or about to expire
if (account.tokenExpiresAt < new Date(Date.now() + 60 * 1000)) {
@@ -33,7 +33,7 @@ async function getDecryptedToken(
let newRefresh: string | undefined;
let expiresIn: number;
if (serviceId === 'YOUTUBE') {
if (account.serviceId === 'YOUTUBE') {
const result = await refreshYouTubeToken(refreshToken);
newAccess = result.accessToken;
expiresIn = result.expiresIn;
@@ -87,6 +87,22 @@ const lifecycleRoutes: FastifyPluginAsync = async (fastify) => {
include: { destinations: true },
});
if (!plan) throw new AppError(404, 'Stream plan not found');
// If already READY, return the existing prepared data
if (plan.status === 'READY') {
const response: PrepareResponse = {
planId: plan.id,
destinations: plan.destinations.map((dest) => ({
id: dest.id,
serviceId: dest.serviceId as 'YOUTUBE' | 'TWITCH',
rtmpUrl: dest.rtmpUrl || '',
streamKey: dest.streamKey || '',
broadcastId: dest.broadcastId || '',
})),
};
return response;
}
if (plan.status !== 'DRAFT') {
throw new AppError(400, `Plan is already ${plan.status}`);
}
@@ -94,10 +110,10 @@ const lifecycleRoutes: FastifyPluginAsync = async (fastify) => {
const prepared: PreparedDestination[] = [];
for (const dest of plan.destinations) {
const { account, accessToken } = await getDecryptedToken(
const { account, accessToken } = await getDecryptedTokenByAccountId(
fastify.prisma,
request.userId,
dest.serviceId,
dest.linkedAccountId,
);
if (dest.serviceId === 'YOUTUBE') {
@@ -119,6 +135,7 @@ const lifecycleRoutes: FastifyPluginAsync = async (fastify) => {
});
prepared.push({
id: dest.id,
serviceId: 'YOUTUBE',
rtmpUrl: broadcast.rtmpUrl,
streamKey: broadcast.streamKey,
@@ -149,6 +166,7 @@ const lifecycleRoutes: FastifyPluginAsync = async (fastify) => {
});
prepared.push({
id: dest.id,
serviceId: 'TWITCH',
rtmpUrl: TWITCH_RTMP_URL,
streamKey,
@@ -185,18 +203,34 @@ const lifecycleRoutes: FastifyPluginAsync = async (fastify) => {
include: { destinations: true },
});
if (!plan) throw new AppError(404, 'Stream plan not found');
// Idempotent: already LIVE is fine
if (plan.status === 'LIVE') {
return { success: true, status: 'LIVE' };
}
if (plan.status !== 'READY') {
throw new AppError(400, `Plan must be READY to start, currently ${plan.status}`);
}
for (const dest of plan.destinations) {
if (dest.serviceId === 'YOUTUBE' && dest.broadcastId) {
const { accessToken } = await getDecryptedToken(
fastify.prisma,
request.userId,
'YOUTUBE',
);
await transitionYouTubeBroadcast(accessToken, dest.broadcastId, 'live');
// Broadcast was created with enableAutoStart: true, so YouTube
// auto-transitions to 'live' when RTMP data arrives. A manual
// transition can fail if the broadcast is still transitioning
// (e.g. in 'testing' state). Wrap in try-catch so the plan
// status always gets updated to LIVE.
try {
const { accessToken } = await getDecryptedTokenByAccountId(
fastify.prisma,
request.userId,
dest.linkedAccountId,
);
await transitionYouTubeBroadcast(accessToken, dest.broadcastId, 'live');
} catch (err) {
fastify.log.warn({ err, broadcastId: dest.broadcastId },
'YouTube live transition failed (autoStart may have handled it)');
}
}
// Twitch goes live automatically when RTMP stream is received
@@ -230,21 +264,33 @@ const lifecycleRoutes: FastifyPluginAsync = async (fastify) => {
include: { destinations: true },
});
if (!plan) throw new AppError(404, 'Stream plan not found');
if (plan.status !== 'LIVE') {
throw new AppError(400, `Plan must be LIVE to end, currently ${plan.status}`);
// Idempotent: already ENDED is fine
if (plan.status === 'ENDED') {
return { success: true, status: 'ENDED' };
}
if (plan.status !== 'LIVE' && plan.status !== 'READY') {
throw new AppError(400, `Plan must be LIVE or READY to end, currently ${plan.status}`);
}
for (const dest of plan.destinations) {
if (dest.serviceId === 'YOUTUBE' && dest.broadcastId) {
const { accessToken } = await getDecryptedToken(
fastify.prisma,
request.userId,
'YOUTUBE',
);
// Always try to end the YouTube broadcast regardless of plan status.
// The broadcast may be live via enableAutoStart even if our DB status
// is still READY (e.g. if the /start transition failed).
try {
const { accessToken } = await getDecryptedTokenByAccountId(
fastify.prisma,
request.userId,
dest.linkedAccountId,
);
await transitionYouTubeBroadcast(accessToken, dest.broadcastId, 'complete');
} catch {
// Non-fatal — stream may already be ended
} catch (err) {
// Non-fatal — broadcast may already be complete or still transitioning.
// enableAutoStop will handle it when RTMP disconnects.
fastify.log.warn({ err, broadcastId: dest.broadcastId },
'YouTube complete transition failed');
}
}
// Twitch ends when RTMP stream stops

View File

@@ -1,6 +1,8 @@
import { FastifyPluginAsync } from 'fastify';
import { requireAuth } from '../../middleware/require-auth.js';
import { AppError } from '../../plugins/error-handler.js';
import { getYouTubeBroadcastStatus, refreshYouTubeToken } from '../../services/youtube.service.js';
import { decrypt, encrypt } from '../../services/crypto.service.js';
import type { CreateStreamPlanBody, StreamPlanResponse, StreamDestinationResponse } from '../../types/api.js';
function formatPlan(plan: any): StreamPlanResponse {
@@ -13,6 +15,7 @@ function formatPlan(plan: any): StreamPlanResponse {
destinations: (plan.destinations ?? []).map((d: any): StreamDestinationResponse => ({
id: d.id,
serviceId: d.serviceId,
linkedAccountId: d.linkedAccountId,
title: d.title,
description: d.description,
privacyStatus: d.privacyStatus,
@@ -27,6 +30,65 @@ function formatPlan(plan: any): StreamPlanResponse {
}
const planRoutes: FastifyPluginAsync = async (fastify) => {
/** Check YouTube broadcast status and auto-end plans that are over */
async function autoDetectEndedPlans(plans: any[], userId: string) {
// Cache decrypted tokens by linkedAccountId to avoid redundant decrypts
const tokenCache = new Map<string, string>();
for (const plan of plans) {
if (plan.status !== 'LIVE' && plan.status !== 'READY') continue;
const ytDest = plan.destinations.find(
(d: any) => d.serviceId === 'YOUTUBE' && d.broadcastId,
);
if (!ytDest) continue;
try {
let ytAccessToken = tokenCache.get(ytDest.linkedAccountId);
if (!ytAccessToken) {
const account = await fastify.prisma.linkedAccount.findFirst({
where: { id: ytDest.linkedAccountId, userId },
});
if (!account) continue;
ytAccessToken = decrypt(account.accessTokenEnc, account.accessTokenIv);
if (account.tokenExpiresAt < new Date(Date.now() + 60_000)) {
const refreshToken = decrypt(account.refreshTokenEnc, account.refreshTokenIv);
const result = await refreshYouTubeToken(refreshToken);
ytAccessToken = result.accessToken;
const accessEnc = encrypt(ytAccessToken);
await fastify.prisma.linkedAccount.update({
where: { id: account.id },
data: {
accessTokenEnc: accessEnc.ciphertext,
accessTokenIv: accessEnc.iv,
tokenExpiresAt: new Date(Date.now() + result.expiresIn * 1000),
},
});
}
tokenCache.set(ytDest.linkedAccountId, ytAccessToken);
}
const ytStatus = await getYouTubeBroadcastStatus(ytAccessToken, ytDest.broadcastId!);
if (ytStatus === 'complete' || ytStatus === 'revoked') {
for (const dest of plan.destinations) {
await fastify.prisma.streamDestination.update({
where: { id: dest.id },
data: { status: 'ENDED' },
});
}
await fastify.prisma.streamPlan.update({
where: { id: plan.id },
data: { status: 'ENDED' },
});
plan.status = 'ENDED';
}
} catch {
// Non-fatal
}
}
}
// GET /streams/plans — list plans
fastify.get('/streams/plans', {
preHandler: [requireAuth],
@@ -36,6 +98,9 @@ const planRoutes: FastifyPluginAsync = async (fastify) => {
include: { destinations: true },
orderBy: { createdAt: 'desc' },
});
await autoDetectEndedPlans(plans, request.userId);
return plans.map(formatPlan);
});
@@ -50,13 +115,12 @@ const planRoutes: FastifyPluginAsync = async (fastify) => {
name: { type: 'string', minLength: 1, maxLength: 200 },
destinations: {
type: 'array',
minItems: 1,
maxItems: 10,
items: {
type: 'object',
required: ['serviceId', 'title'],
required: ['linkedAccountId', 'title'],
properties: {
serviceId: { type: 'string', enum: ['YOUTUBE', 'TWITCH'] },
linkedAccountId: { type: 'string', minLength: 1 },
title: { type: 'string', minLength: 1, maxLength: 200 },
description: { type: 'string', maxLength: 5000 },
privacyStatus: { type: 'string', enum: ['public', 'unlisted', 'private'] },
@@ -77,11 +141,28 @@ const planRoutes: FastifyPluginAsync = async (fastify) => {
const linkedAccounts = await fastify.prisma.linkedAccount.findMany({
where: { userId: request.userId },
});
const linkedServices = new Set(linkedAccounts.map((a) => a.serviceId));
const linkedAccountMap = new Map(linkedAccounts.map((a) => [a.id, a]));
for (const dest of destinations) {
if (!linkedServices.has(dest.serviceId)) {
throw new AppError(400, `No ${dest.serviceId} account linked`);
// If no destinations provided, auto-create one per linked account
const resolvedDestinations = destinations.length > 0
? destinations
: linkedAccounts.map((a) => ({
linkedAccountId: a.id,
title: name,
description: '',
privacyStatus: 'unlisted' as const,
gameId: '',
tags: '',
}));
if (resolvedDestinations.length === 0) {
throw new AppError(400, 'No destinations and no linked accounts available');
}
for (const dest of resolvedDestinations) {
const account = linkedAccountMap.get(dest.linkedAccountId);
if (!account) {
throw new AppError(400, `Linked account ${dest.linkedAccountId} not found`);
}
}
@@ -90,14 +171,18 @@ const planRoutes: FastifyPluginAsync = async (fastify) => {
userId: request.userId,
name,
destinations: {
create: destinations.map((d) => ({
serviceId: d.serviceId,
title: d.title,
description: d.description ?? '',
privacyStatus: d.privacyStatus ?? 'public',
gameId: d.gameId ?? '',
tags: d.tags ?? '',
})),
create: resolvedDestinations.map((d) => {
const account = linkedAccountMap.get(d.linkedAccountId)!;
return {
serviceId: account.serviceId,
linkedAccountId: d.linkedAccountId,
title: d.title,
description: d.description ?? '',
privacyStatus: d.privacyStatus ?? 'unlisted',
gameId: d.gameId ?? '',
tags: d.tags ?? '',
};
}),
},
},
include: { destinations: true },
@@ -122,6 +207,9 @@ const planRoutes: FastifyPluginAsync = async (fastify) => {
include: { destinations: true },
});
if (!plan) throw new AppError(404, 'Stream plan not found');
await autoDetectEndedPlans([plan], request.userId);
return formatPlan(plan);
});

View File

@@ -48,7 +48,7 @@ export interface CreateStreamPlanBody {
}
export interface CreateDestinationBody {
serviceId: string;
linkedAccountId: string;
title: string;
description?: string;
privacyStatus?: string;
@@ -68,6 +68,7 @@ export interface StreamPlanResponse {
export interface StreamDestinationResponse {
id: string;
serviceId: string;
linkedAccountId: string;
title: string;
description: string;
privacyStatus: string;
@@ -85,6 +86,7 @@ export interface PrepareResponse {
}
export interface PreparedDestination {
id: string;
serviceId: string;
rtmpUrl: string;
streamKey: string;