- Add POST /providers/accounts/custom-rtmp endpoint for saved RTMP servers
- Encrypt rtmpUrl/streamKey in accessTokenEnc/refreshTokenEnc fields
- Decrypt and return rtmpUrl/streamKey in GET /providers/accounts for CUSTOM_RTMP
- Skip token revocation on DELETE for CUSTOM_RTMP accounts
- Decrypt CUSTOM_RTMP credentials into CUSTOM destinations on plan create/update
- Handle CUSTOM destinations in prepare lifecycle (already READY, skip provider auth)
- Add debug logging for plan operations and user upsert
338 lines
11 KiB
TypeScript
338 lines
11 KiB
TypeScript
import { FastifyPluginAsync } from 'fastify';
|
|
import { requireAuth } from '../../middleware/require-auth.js';
|
|
import { decrypt, encrypt } from '../../services/crypto.service.js';
|
|
import {
|
|
createYouTubeBroadcast,
|
|
transitionYouTubeBroadcast,
|
|
refreshYouTubeToken,
|
|
} from '../../services/youtube.service.js';
|
|
import {
|
|
getTwitchStreamKey,
|
|
updateTwitchChannel,
|
|
refreshTwitchToken,
|
|
} from '../../services/twitch.service.js';
|
|
import { AppError } from '../../plugins/error-handler.js';
|
|
import type { PrepareResponse, PreparedDestination } from '../../types/api.js';
|
|
|
|
const TWITCH_RTMP_URL = 'rtmp://live.twitch.tv/app';
|
|
|
|
/**
 * Loads a linked account owned by `userId` and returns it together with its
 * decrypted access token.
 *
 * When the stored `tokenExpiresAt` falls within the next 60 seconds, the token
 * is refreshed first: through `refreshYouTubeToken` when `serviceId` is
 * `'YOUTUBE'`, through `refreshTwitchToken` otherwise. The new access token
 * (and, for the non-YouTube branch, the rotated refresh token when one is
 * returned) is re-encrypted and written back to the `linkedAccount` row.
 *
 * Accounts whose `serviceId` is `'CUSTOM_RTMP'` are never refreshed, so their
 * stored secrets are not forwarded to an OAuth token endpoint.
 * NOTE(review): the literal `'CUSTOM_RTMP'` comes from the change description
 * that accompanies this file — confirm it against the schema.
 *
 * @param prisma - Prisma client (untyped here); only `linkedAccount.findFirst`
 *   and `linkedAccount.update` are used.
 * @param userId - Owner the account must belong to.
 * @param linkedAccountId - Id of the linked account to load.
 * @returns The account row and its decrypted access token. After a refresh the
 *   returned row already carries the newly persisted token fields.
 * @throws AppError (400) when no such account exists for this user.
 */
async function getDecryptedTokenByAccountId(
  prisma: any,
  userId: string,
  linkedAccountId: string,
): Promise<{ account: any; accessToken: string }> {
  const account = await prisma.linkedAccount.findFirst({
    where: { id: linkedAccountId, userId },
  });
  if (!account) throw new AppError(400, `Linked account ${linkedAccountId} not found`);

  // Refresh slightly ahead of the recorded expiry so the token cannot lapse
  // between this check and the provider call that follows.
  const refreshSkewMs = 60 * 1000;
  const expiresSoon = account.tokenExpiresAt < new Date(Date.now() + refreshSkewMs);

  // Only OAuth-backed accounts may enter the refresh path; see the doc comment.
  const hasOAuthTokens = account.serviceId !== 'CUSTOM_RTMP';

  if (!hasOAuthTokens || !expiresSoon) {
    return {
      account,
      accessToken: decrypt(account.accessTokenEnc, account.accessTokenIv),
    };
  }

  const refreshToken = decrypt(account.refreshTokenEnc, account.refreshTokenIv);
  let newAccess: string;
  let newRefresh: string | undefined;
  let expiresIn: number;

  if (account.serviceId === 'YOUTUBE') {
    // Only the access token and its lifetime are taken from the YouTube result.
    const result = await refreshYouTubeToken(refreshToken);
    newAccess = result.accessToken;
    expiresIn = result.expiresIn;
  } else {
    // The Twitch result may also carry a rotated refresh token, which is stored below.
    const result = await refreshTwitchToken(refreshToken);
    newAccess = result.accessToken;
    newRefresh = result.refreshToken;
    expiresIn = result.expiresIn;
  }

  const accessEnc = encrypt(newAccess);
  const updateData: Record<string, unknown> = {
    accessTokenEnc: accessEnc.ciphertext,
    accessTokenIv: accessEnc.iv,
    tokenExpiresAt: new Date(Date.now() + expiresIn * 1000),
  };
  if (newRefresh) {
    const refreshEnc = encrypt(newRefresh);
    updateData.refreshTokenEnc = refreshEnc.ciphertext;
    updateData.refreshTokenIv = refreshEnc.iv;
  }

  await prisma.linkedAccount.update({
    where: { id: account.id },
    data: updateData,
  });

  // Hand back the row as it now exists in the database, not the stale
  // pre-refresh snapshot.
  return { account: { ...account, ...updateData }, accessToken: newAccess };
}
|
|
|
|
/**
 * Fastify plugin that registers the stream-plan lifecycle endpoints.
 *
 * - `POST /streams/plans/:id/prepare` — DRAFT → READY; resolves RTMP URL,
 *   stream key and broadcast id for every destination.
 * - `POST /streams/plans/:id/start`   — READY → LIVE.
 * - `POST /streams/plans/:id/end`     — LIVE or READY → ENDED.
 *
 * Every route runs `requireAuth` and only operates on plans whose `userId`
 * matches `request.userId`; anything else yields a 404 `AppError`.
 */
const lifecycleRoutes: FastifyPluginAsync = async (fastify) => {
  // POST /streams/plans/:id/prepare — create broadcasts, get RTMP info
  fastify.post<{ Params: { id: string } }>('/streams/plans/:id/prepare', {
    preHandler: [requireAuth],
    schema: {
      params: {
        type: 'object',
        required: ['id'],
        properties: { id: { type: 'string' } },
      },
    },
  }, async (request) => {
    request.log.info({ planId: request.params.id, userId: request.userId }, 'Prepare plan request');

    const plan = await fastify.prisma.streamPlan.findFirst({
      where: { id: request.params.id, userId: request.userId },
      include: { destinations: true },
    });
    if (!plan) {
      // Debug: check if plan exists under any user
      // NOTE(review): this costs an extra query and writes another user's id
      // to the logs — consider removing it once the investigation is over.
      const anyPlan = await fastify.prisma.streamPlan.findUnique({ where: { id: request.params.id } });
      request.log.warn({ planId: request.params.id, userId: request.userId, existsUnderOtherUser: !!anyPlan, otherUserId: anyPlan?.userId }, 'Plan not found for user');
      throw new AppError(404, 'Stream plan not found');
    }

    // Idempotent: a READY plan returns the data stored by the earlier prepare.
    if (plan.status === 'READY') {
      const response: PrepareResponse = {
        planId: plan.id,
        destinations: plan.destinations.map((dest) => ({
          id: dest.id,
          serviceId: dest.serviceId as PreparedDestination['serviceId'],
          rtmpUrl: dest.rtmpUrl ?? '',
          streamKey: dest.streamKey ?? '',
          broadcastId: dest.broadcastId ?? '',
        })),
      };
      return response;
    }

    if (plan.status !== 'DRAFT') {
      throw new AppError(400, `Plan is already ${plan.status}`);
    }

    const prepared: PreparedDestination[] = [];

    for (const dest of plan.destinations) {
      // CUSTOM destinations are already READY with rtmpUrl/streamKey set at creation
      if (dest.serviceId === 'CUSTOM') {
        if (dest.status !== 'READY') {
          await fastify.prisma.streamDestination.update({
            where: { id: dest.id },
            data: { status: 'READY' },
          });
        }
        prepared.push({
          id: dest.id,
          serviceId: 'CUSTOM',
          rtmpUrl: dest.rtmpUrl ?? '',
          streamKey: dest.streamKey ?? '',
          broadcastId: '',
        });
        continue;
      }

      // Retry safety: if an earlier prepare attempt failed part-way, the plan is
      // still DRAFT but this destination may already hold a created broadcast.
      // Reuse it instead of creating a second broadcast and orphaning the first.
      if (
        dest.serviceId === 'YOUTUBE' &&
        dest.status === 'READY' &&
        dest.broadcastId &&
        dest.rtmpUrl &&
        dest.streamKey
      ) {
        request.log.info(
          { destinationId: dest.id, broadcastId: dest.broadcastId },
          'Reusing previously prepared YouTube broadcast',
        );
        prepared.push({
          id: dest.id,
          serviceId: 'YOUTUBE',
          rtmpUrl: dest.rtmpUrl,
          streamKey: dest.streamKey,
          broadcastId: dest.broadcastId,
        });
        continue;
      }

      const { account, accessToken } = await getDecryptedTokenByAccountId(
        fastify.prisma,
        request.userId,
        dest.linkedAccountId,
      );

      if (dest.serviceId === 'YOUTUBE') {
        const broadcast = await createYouTubeBroadcast(
          accessToken,
          dest.title,
          dest.description,
          dest.privacyStatus,
        );

        await fastify.prisma.streamDestination.update({
          where: { id: dest.id },
          data: {
            rtmpUrl: broadcast.rtmpUrl,
            streamKey: broadcast.streamKey,
            broadcastId: broadcast.id,
            status: 'READY',
          },
        });

        prepared.push({
          id: dest.id,
          serviceId: 'YOUTUBE',
          rtmpUrl: broadcast.rtmpUrl,
          streamKey: broadcast.streamKey,
          broadcastId: broadcast.id,
        });
      } else if (dest.serviceId === 'TWITCH') {
        // Update channel info; tags are stored as a comma-separated string.
        const tags = dest.tags ? dest.tags.split(',').map((t) => t.trim()).filter(Boolean) : [];
        await updateTwitchChannel(
          accessToken,
          account.accountId,
          dest.title,
          dest.gameId,
          tags,
        );

        // Get stream key
        const streamKey = await getTwitchStreamKey(accessToken, account.accountId);

        // The linked account's id is stored in the broadcastId column for Twitch.
        await fastify.prisma.streamDestination.update({
          where: { id: dest.id },
          data: {
            rtmpUrl: TWITCH_RTMP_URL,
            streamKey,
            broadcastId: account.accountId,
            status: 'READY',
          },
        });

        prepared.push({
          id: dest.id,
          serviceId: 'TWITCH',
          rtmpUrl: TWITCH_RTMP_URL,
          streamKey,
          broadcastId: account.accountId,
        });
      } else {
        // Behaviour is unchanged (the destination is left out of the response),
        // but the omission is now visible in the logs.
        request.log.warn(
          { destinationId: dest.id, serviceId: dest.serviceId },
          'Skipping destination with unsupported service during prepare',
        );
      }
    }

    await fastify.prisma.streamPlan.update({
      where: { id: plan.id },
      data: { status: 'READY' },
    });

    const response: PrepareResponse = {
      planId: plan.id,
      destinations: prepared,
    };
    return response;
  });

  // POST /streams/plans/:id/start — transition to LIVE
  fastify.post<{ Params: { id: string } }>('/streams/plans/:id/start', {
    preHandler: [requireAuth],
    schema: {
      params: {
        type: 'object',
        required: ['id'],
        properties: { id: { type: 'string' } },
      },
    },
  }, async (request) => {
    const plan = await fastify.prisma.streamPlan.findFirst({
      where: { id: request.params.id, userId: request.userId },
      include: { destinations: true },
    });
    if (!plan) throw new AppError(404, 'Stream plan not found');

    // Idempotent: already LIVE is fine
    if (plan.status === 'LIVE') {
      return { success: true, status: 'LIVE' };
    }

    if (plan.status !== 'READY') {
      throw new AppError(400, `Plan must be READY to start, currently ${plan.status}`);
    }

    for (const dest of plan.destinations) {
      if (dest.serviceId === 'YOUTUBE' && dest.broadcastId) {
        // Broadcast was created with enableAutoStart: true, so YouTube
        // auto-transitions to 'live' when RTMP data arrives. A manual
        // transition can fail if the broadcast is still transitioning
        // (e.g. in 'testing' state). Wrap in try-catch so the plan
        // status always gets updated to LIVE.
        try {
          const { accessToken } = await getDecryptedTokenByAccountId(
            fastify.prisma,
            request.userId,
            dest.linkedAccountId,
          );
          await transitionYouTubeBroadcast(accessToken, dest.broadcastId, 'live');
        } catch (err) {
          // Logged through the request logger, matching the prepare route.
          request.log.warn({ err, broadcastId: dest.broadcastId },
            'YouTube live transition failed (autoStart may have handled it)');
        }
      }
      // Twitch goes live automatically when RTMP stream is received

      await fastify.prisma.streamDestination.update({
        where: { id: dest.id },
        data: { status: 'LIVE' },
      });
    }

    await fastify.prisma.streamPlan.update({
      where: { id: plan.id },
      data: { status: 'LIVE' },
    });

    return { success: true, status: 'LIVE' };
  });

  // POST /streams/plans/:id/end — end stream
  fastify.post<{ Params: { id: string } }>('/streams/plans/:id/end', {
    preHandler: [requireAuth],
    schema: {
      params: {
        type: 'object',
        required: ['id'],
        properties: { id: { type: 'string' } },
      },
    },
  }, async (request) => {
    const plan = await fastify.prisma.streamPlan.findFirst({
      where: { id: request.params.id, userId: request.userId },
      include: { destinations: true },
    });
    if (!plan) throw new AppError(404, 'Stream plan not found');

    // Idempotent: already ENDED is fine
    if (plan.status === 'ENDED') {
      return { success: true, status: 'ENDED' };
    }

    if (plan.status !== 'LIVE' && plan.status !== 'READY') {
      throw new AppError(400, `Plan must be LIVE or READY to end, currently ${plan.status}`);
    }

    for (const dest of plan.destinations) {
      if (dest.serviceId === 'YOUTUBE' && dest.broadcastId) {
        // Always try to end the YouTube broadcast regardless of plan status.
        // The broadcast may be live via enableAutoStart even if our DB status
        // is still READY (e.g. if the /start transition failed).
        try {
          const { accessToken } = await getDecryptedTokenByAccountId(
            fastify.prisma,
            request.userId,
            dest.linkedAccountId,
          );
          await transitionYouTubeBroadcast(accessToken, dest.broadcastId, 'complete');
        } catch (err) {
          // Non-fatal — broadcast may already be complete or still transitioning.
          // enableAutoStop will handle it when RTMP disconnects.
          request.log.warn({ err, broadcastId: dest.broadcastId },
            'YouTube complete transition failed');
        }
      }
      // Twitch ends when RTMP stream stops

      await fastify.prisma.streamDestination.update({
        where: { id: dest.id },
        data: { status: 'ENDED' },
      });
    }

    await fastify.prisma.streamPlan.update({
      where: { id: plan.id },
      data: { status: 'ENDED' },
    });

    return { success: true, status: 'ENDED' };
  });
};
|
|
|
|
export default lifecycleRoutes;
|