Phases 2-4: Auth, providers, stream management

This commit is contained in:
2026-02-23 15:32:24 +01:00
parent 8ea3279c3b
commit 538c24c58f
14 changed files with 1530 additions and 0 deletions

View File

@@ -4,6 +4,13 @@ import prismaPlugin from './plugins/prisma.js';
import errorHandlerPlugin from './plugins/error-handler.js';
import authPlugin from './plugins/auth.js';
import healthRoutes from './routes/health.js';
import metaAuthRoutes from './routes/auth/meta.js';
import sessionRoutes from './routes/auth/session.js';
import accountRoutes from './routes/providers/accounts.js';
import youtubeRoutes from './routes/providers/youtube.js';
import twitchRoutes from './routes/providers/twitch.js';
import planRoutes from './routes/streams/plans.js';
import lifecycleRoutes from './routes/streams/lifecycle.js';
import { config } from './config.js';
export async function buildApp() {
@@ -21,6 +28,13 @@ export async function buildApp() {
// Routes
await app.register(healthRoutes);
await app.register(metaAuthRoutes);
await app.register(sessionRoutes);
await app.register(accountRoutes);
await app.register(youtubeRoutes);
await app.register(twitchRoutes);
await app.register(planRoutes);
await app.register(lifecycleRoutes);
return app;
}

View File

@@ -1,5 +1,6 @@
import { buildApp } from './app.js';
import { config } from './config.js';
import { startTokenRefreshScheduler, stopTokenRefreshScheduler } from './services/token-refresh.service.js';
async function main() {
const app = await buildApp();
@@ -7,6 +8,19 @@ async function main() {
try {
await app.listen({ port: config.port, host: config.host });
app.log.info(`Server listening on ${config.host}:${config.port}`);
// Start background token refresh
startTokenRefreshScheduler(app.prisma, app.log);
// Graceful shutdown
for (const signal of ['SIGINT', 'SIGTERM'] as const) {
process.on(signal, async () => {
app.log.info(`Received ${signal}, shutting down...`);
stopTokenRefreshScheduler();
await app.close();
process.exit(0);
});
}
} catch (err) {
app.log.error(err);
process.exit(1);

74
src/routes/auth/meta.ts Normal file
View File

@@ -0,0 +1,74 @@
import { FastifyPluginAsync } from 'fastify';
import { randomUUID } from 'node:crypto';
import { exchangeMetaCode, fetchMetaProfile } from '../../services/meta-auth.service.js';
import { signAccessToken } from '../../plugins/auth.js';
import { hashToken } from '../../services/crypto.service.js';
import { config } from '../../config.js';
import { AppError } from '../../plugins/error-handler.js';
import type { MetaCallbackBody, AuthTokensResponse } from '../../types/api.js';
const metaAuthRoutes: FastifyPluginAsync = async (fastify) => {
fastify.post<{ Body: MetaCallbackBody }>('/auth/meta/callback', {
schema: {
body: {
type: 'object',
required: ['code'],
properties: {
code: { type: 'string', minLength: 1 },
deviceInfo: { type: 'string' },
},
additionalProperties: false,
},
},
}, async (request, reply) => {
const { code, deviceInfo } = request.body;
// Exchange code for Meta access token
const { accessToken: metaToken } = await exchangeMetaCode(code);
// Fetch user profile
const profile = await fetchMetaProfile(metaToken);
// Upsert user
const user = await fastify.prisma.user.upsert({
where: { metaId: profile.metaId },
update: {
displayName: profile.displayName,
email: profile.email,
avatarUrl: profile.avatarUrl,
},
create: {
metaId: profile.metaId,
displayName: profile.displayName,
email: profile.email,
avatarUrl: profile.avatarUrl,
},
});
// Create session with hashed refresh token
const refreshToken = randomUUID();
const expiresAt = new Date(Date.now() + config.jwt.refreshTtl * 1000);
await fastify.prisma.session.create({
data: {
userId: user.id,
refreshToken: hashToken(refreshToken),
expiresAt,
deviceInfo: deviceInfo ?? null,
},
});
// Sign JWT
const accessToken = await signAccessToken(user.id);
const response: AuthTokensResponse = {
accessToken,
refreshToken,
expiresIn: config.jwt.accessTtl,
};
reply.status(200).send(response);
});
};
export default metaAuthRoutes;

101
src/routes/auth/session.ts Normal file
View File

@@ -0,0 +1,101 @@
import { FastifyPluginAsync } from 'fastify';
import { randomUUID } from 'node:crypto';
import { signAccessToken } from '../../plugins/auth.js';
import { hashToken } from '../../services/crypto.service.js';
import { requireAuth } from '../../middleware/require-auth.js';
import { config } from '../../config.js';
import { AppError } from '../../plugins/error-handler.js';
import type { RefreshBody, AuthTokensResponse, UserProfileResponse } from '../../types/api.js';
const sessionRoutes: FastifyPluginAsync = async (fastify) => {
// POST /auth/refresh — rotate refresh token, issue new JWT
fastify.post<{ Body: RefreshBody }>('/auth/refresh', {
schema: {
body: {
type: 'object',
required: ['refreshToken'],
properties: {
refreshToken: { type: 'string', minLength: 1 },
},
additionalProperties: false,
},
},
}, async (request, reply) => {
const { refreshToken } = request.body;
const hashed = hashToken(refreshToken);
// Find and validate session
const session = await fastify.prisma.session.findUnique({
where: { refreshToken: hashed },
include: { user: true },
});
if (!session || session.expiresAt < new Date()) {
// Delete expired session if it exists
if (session) {
await fastify.prisma.session.delete({ where: { id: session.id } });
}
throw new AppError(401, 'Invalid or expired refresh token');
}
// Rotate: delete old session, create new one
const newRefreshToken = randomUUID();
const expiresAt = new Date(Date.now() + config.jwt.refreshTtl * 1000);
await fastify.prisma.session.delete({ where: { id: session.id } });
await fastify.prisma.session.create({
data: {
userId: session.userId,
refreshToken: hashToken(newRefreshToken),
expiresAt,
deviceInfo: session.deviceInfo,
},
});
const accessToken = await signAccessToken(session.userId);
const response: AuthTokensResponse = {
accessToken,
refreshToken: newRefreshToken,
expiresIn: config.jwt.accessTtl,
};
reply.status(200).send(response);
});
// GET /auth/me — current user profile
fastify.get('/auth/me', {
preHandler: [requireAuth],
}, async (request, reply) => {
const user = await fastify.prisma.user.findUnique({
where: { id: request.userId },
});
if (!user) {
throw new AppError(404, 'User not found');
}
const response: UserProfileResponse = {
id: user.id,
displayName: user.displayName,
email: user.email,
avatarUrl: user.avatarUrl,
};
reply.status(200).send(response);
});
// POST /auth/logout — invalidate session
fastify.post('/auth/logout', {
preHandler: [requireAuth],
}, async (request, reply) => {
// Delete all sessions for this user (full logout)
await fastify.prisma.session.deleteMany({
where: { userId: request.userId },
});
reply.status(200).send({ success: true });
});
};
export default sessionRoutes;

View File

@@ -0,0 +1,77 @@
import { FastifyPluginAsync } from 'fastify';
import { requireAuth } from '../../middleware/require-auth.js';
import { decrypt } from '../../services/crypto.service.js';
import { revokeYouTubeToken } from '../../services/youtube.service.js';
import { revokeTwitchToken } from '../../services/twitch.service.js';
import { AppError } from '../../plugins/error-handler.js';
import type { LinkedAccountResponse } from '../../types/api.js';
const accountRoutes: FastifyPluginAsync = async (fastify) => {
// GET /providers/accounts — list linked accounts (no tokens)
fastify.get('/providers/accounts', {
preHandler: [requireAuth],
}, async (request) => {
const accounts = await fastify.prisma.linkedAccount.findMany({
where: { userId: request.userId },
});
const response: LinkedAccountResponse[] = accounts.map((a) => ({
id: a.id,
serviceId: a.serviceId,
displayName: a.displayName,
accountId: a.accountId,
avatarUrl: a.avatarUrl,
}));
return response;
});
// DELETE /providers/:serviceId — revoke tokens and unlink
fastify.delete<{ Params: { serviceId: string } }>('/providers/:serviceId', {
preHandler: [requireAuth],
schema: {
params: {
type: 'object',
required: ['serviceId'],
properties: {
serviceId: { type: 'string', enum: ['YOUTUBE', 'TWITCH'] },
},
},
},
}, async (request, reply) => {
const { serviceId } = request.params;
const account = await fastify.prisma.linkedAccount.findUnique({
where: {
userId_serviceId: {
userId: request.userId,
serviceId,
},
},
});
if (!account) {
throw new AppError(404, 'Account not linked');
}
// Best-effort revoke tokens at the provider
try {
const accessToken = decrypt(account.accessTokenEnc, account.accessTokenIv);
if (serviceId === 'YOUTUBE') {
await revokeYouTubeToken(accessToken);
} else {
await revokeTwitchToken(accessToken);
}
} catch {
// Revocation failure is non-fatal
}
await fastify.prisma.linkedAccount.delete({
where: { id: account.id },
});
reply.status(200).send({ success: true });
});
};
export default accountRoutes;

View File

@@ -0,0 +1,136 @@
import { FastifyPluginAsync } from 'fastify';
import { randomUUID } from 'node:crypto';
import { requireAuth } from '../../middleware/require-auth.js';
import { encrypt } from '../../services/crypto.service.js';
import {
getTwitchAuthUrl,
exchangeTwitchCode,
fetchTwitchProfile,
} from '../../services/twitch.service.js';
import { config } from '../../config.js';
import { AppError } from '../../plugins/error-handler.js';
import type { AuthUrlResponse, ProviderCallbackBody, LinkedAccountResponse } from '../../types/api.js';
// In-memory CSRF state store (state → { userId, expiresAt })
const pendingStates = new Map<string, { userId: string; expiresAt: number }>();
setInterval(() => {
const now = Date.now();
for (const [key, val] of pendingStates) {
if (val.expiresAt < now) pendingStates.delete(key);
}
}, 5 * 60 * 1000);
const twitchRoutes: FastifyPluginAsync = async (fastify) => {
// GET /providers/twitch/auth-url — get OAuth URL with CSRF state
fastify.get('/providers/twitch/auth-url', {
preHandler: [requireAuth],
}, async (request) => {
const state = randomUUID();
pendingStates.set(state, {
userId: request.userId,
expiresAt: Date.now() + 10 * 60 * 1000,
});
const response: AuthUrlResponse = {
url: getTwitchAuthUrl(state),
state,
};
return response;
});
// GET /providers/twitch/callback-redirect — Twitch redirects here → 302 to app deep link
fastify.get<{ Querystring: { code?: string; state?: string; error?: string } }>(
'/providers/twitch/callback-redirect',
async (request, reply) => {
const { code, state, error } = request.query;
if (error || !code || !state) {
reply.status(302).redirect(
`${config.appScheme}://twitch/callback?error=${error || 'missing_params'}`,
);
return;
}
reply.status(302).redirect(
`${config.appScheme}://twitch/callback?code=${encodeURIComponent(code)}&state=${encodeURIComponent(state)}`,
);
},
);
// POST /providers/twitch/callback — app sends code+state, backend exchanges
fastify.post<{ Body: ProviderCallbackBody }>('/providers/twitch/callback', {
preHandler: [requireAuth],
schema: {
body: {
type: 'object',
required: ['code', 'state'],
properties: {
code: { type: 'string', minLength: 1 },
state: { type: 'string', minLength: 1 },
},
additionalProperties: false,
},
},
}, async (request) => {
const { code, state } = request.body;
// Validate CSRF state
const pending = pendingStates.get(state);
if (!pending || pending.userId !== request.userId || pending.expiresAt < Date.now()) {
pendingStates.delete(state);
throw new AppError(400, 'Invalid or expired state parameter');
}
pendingStates.delete(state);
// Exchange code for tokens
const tokens = await exchangeTwitchCode(code);
const profile = await fetchTwitchProfile(tokens.accessToken);
// Encrypt tokens
const accessEnc = encrypt(tokens.accessToken);
const refreshEnc = encrypt(tokens.refreshToken);
// Upsert linked account
const account = await fastify.prisma.linkedAccount.upsert({
where: {
userId_serviceId: {
userId: request.userId,
serviceId: 'TWITCH',
},
},
update: {
displayName: profile.displayName,
accountId: profile.accountId,
avatarUrl: profile.avatarUrl,
accessTokenEnc: accessEnc.ciphertext,
refreshTokenEnc: refreshEnc.ciphertext,
accessTokenIv: accessEnc.iv,
refreshTokenIv: refreshEnc.iv,
tokenExpiresAt: new Date(Date.now() + tokens.expiresIn * 1000),
},
create: {
userId: request.userId,
serviceId: 'TWITCH',
displayName: profile.displayName,
accountId: profile.accountId,
avatarUrl: profile.avatarUrl,
accessTokenEnc: accessEnc.ciphertext,
refreshTokenEnc: refreshEnc.ciphertext,
accessTokenIv: accessEnc.iv,
refreshTokenIv: refreshEnc.iv,
tokenExpiresAt: new Date(Date.now() + tokens.expiresIn * 1000),
},
});
const response: LinkedAccountResponse = {
id: account.id,
serviceId: account.serviceId,
displayName: account.displayName,
accountId: account.accountId,
avatarUrl: account.avatarUrl,
};
return response;
});
};
export default twitchRoutes;

View File

@@ -0,0 +1,137 @@
import { FastifyPluginAsync } from 'fastify';
import { randomUUID } from 'node:crypto';
import { requireAuth } from '../../middleware/require-auth.js';
import { encrypt } from '../../services/crypto.service.js';
import {
getYouTubeAuthUrl,
exchangeYouTubeCode,
fetchYouTubeProfile,
} from '../../services/youtube.service.js';
import { config } from '../../config.js';
import { AppError } from '../../plugins/error-handler.js';
import type { AuthUrlResponse, ProviderCallbackBody, LinkedAccountResponse } from '../../types/api.js';
// In-memory CSRF state store (state → { userId, expiresAt })
const pendingStates = new Map<string, { userId: string; expiresAt: number }>();
// Clean expired states every 5 minutes
setInterval(() => {
const now = Date.now();
for (const [key, val] of pendingStates) {
if (val.expiresAt < now) pendingStates.delete(key);
}
}, 5 * 60 * 1000);
const youtubeRoutes: FastifyPluginAsync = async (fastify) => {
// GET /providers/youtube/auth-url — get OAuth URL with CSRF state
fastify.get('/providers/youtube/auth-url', {
preHandler: [requireAuth],
}, async (request) => {
const state = randomUUID();
pendingStates.set(state, {
userId: request.userId,
expiresAt: Date.now() + 10 * 60 * 1000, // 10 min
});
const response: AuthUrlResponse = {
url: getYouTubeAuthUrl(state),
state,
};
return response;
});
// GET /providers/youtube/callback-redirect — Google redirects here → 302 to app deep link
fastify.get<{ Querystring: { code?: string; state?: string; error?: string } }>(
'/providers/youtube/callback-redirect',
async (request, reply) => {
const { code, state, error } = request.query;
if (error || !code || !state) {
reply.status(302).redirect(
`${config.appScheme}://youtube/callback?error=${error || 'missing_params'}`,
);
return;
}
reply.status(302).redirect(
`${config.appScheme}://youtube/callback?code=${encodeURIComponent(code)}&state=${encodeURIComponent(state)}`,
);
},
);
// POST /providers/youtube/callback — app sends code+state, backend exchanges
fastify.post<{ Body: ProviderCallbackBody }>('/providers/youtube/callback', {
preHandler: [requireAuth],
schema: {
body: {
type: 'object',
required: ['code', 'state'],
properties: {
code: { type: 'string', minLength: 1 },
state: { type: 'string', minLength: 1 },
},
additionalProperties: false,
},
},
}, async (request) => {
const { code, state } = request.body;
// Validate CSRF state
const pending = pendingStates.get(state);
if (!pending || pending.userId !== request.userId || pending.expiresAt < Date.now()) {
pendingStates.delete(state);
throw new AppError(400, 'Invalid or expired state parameter');
}
pendingStates.delete(state);
// Exchange code for tokens
const tokens = await exchangeYouTubeCode(code);
const profile = await fetchYouTubeProfile(tokens.accessToken);
// Encrypt tokens
const accessEnc = encrypt(tokens.accessToken);
const refreshEnc = encrypt(tokens.refreshToken);
// Upsert linked account
const account = await fastify.prisma.linkedAccount.upsert({
where: {
userId_serviceId: {
userId: request.userId,
serviceId: 'YOUTUBE',
},
},
update: {
displayName: profile.displayName,
accountId: profile.accountId,
avatarUrl: profile.avatarUrl,
accessTokenEnc: accessEnc.ciphertext,
refreshTokenEnc: refreshEnc.ciphertext,
accessTokenIv: accessEnc.iv,
refreshTokenIv: refreshEnc.iv,
tokenExpiresAt: new Date(Date.now() + tokens.expiresIn * 1000),
},
create: {
userId: request.userId,
serviceId: 'YOUTUBE',
displayName: profile.displayName,
accountId: profile.accountId,
avatarUrl: profile.avatarUrl,
accessTokenEnc: accessEnc.ciphertext,
refreshTokenEnc: refreshEnc.ciphertext,
accessTokenIv: accessEnc.iv,
refreshTokenIv: refreshEnc.iv,
tokenExpiresAt: new Date(Date.now() + tokens.expiresIn * 1000),
},
});
const response: LinkedAccountResponse = {
id: account.id,
serviceId: account.serviceId,
displayName: account.displayName,
accountId: account.accountId,
avatarUrl: account.avatarUrl,
};
return response;
});
};
export default youtubeRoutes;

View File

@@ -0,0 +1,267 @@
import { FastifyPluginAsync } from 'fastify';
import { requireAuth } from '../../middleware/require-auth.js';
import { decrypt, encrypt } from '../../services/crypto.service.js';
import {
createYouTubeBroadcast,
transitionYouTubeBroadcast,
refreshYouTubeToken,
} from '../../services/youtube.service.js';
import {
getTwitchStreamKey,
updateTwitchChannel,
refreshTwitchToken,
} from '../../services/twitch.service.js';
import { AppError } from '../../plugins/error-handler.js';
import type { PrepareResponse, PreparedDestination } from '../../types/api.js';
const TWITCH_RTMP_URL = 'rtmp://live.twitch.tv/app';
async function getDecryptedToken(
prisma: any,
userId: string,
serviceId: string,
): Promise<{ account: any; accessToken: string }> {
const account = await prisma.linkedAccount.findUnique({
where: { userId_serviceId: { userId, serviceId } },
});
if (!account) throw new AppError(400, `No ${serviceId} account linked`);
// Lazy refresh if token is expired or about to expire
if (account.tokenExpiresAt < new Date(Date.now() + 60 * 1000)) {
const refreshToken = decrypt(account.refreshTokenEnc, account.refreshTokenIv);
let newAccess: string;
let newRefresh: string | undefined;
let expiresIn: number;
if (serviceId === 'YOUTUBE') {
const result = await refreshYouTubeToken(refreshToken);
newAccess = result.accessToken;
expiresIn = result.expiresIn;
} else {
const result = await refreshTwitchToken(refreshToken);
newAccess = result.accessToken;
newRefresh = result.refreshToken;
expiresIn = result.expiresIn;
}
const accessEnc = encrypt(newAccess);
const updateData: any = {
accessTokenEnc: accessEnc.ciphertext,
accessTokenIv: accessEnc.iv,
tokenExpiresAt: new Date(Date.now() + expiresIn * 1000),
};
if (newRefresh) {
const refreshEnc = encrypt(newRefresh);
updateData.refreshTokenEnc = refreshEnc.ciphertext;
updateData.refreshTokenIv = refreshEnc.iv;
}
await prisma.linkedAccount.update({
where: { id: account.id },
data: updateData,
});
return { account, accessToken: newAccess };
}
return {
account,
accessToken: decrypt(account.accessTokenEnc, account.accessTokenIv),
};
}
const lifecycleRoutes: FastifyPluginAsync = async (fastify) => {
// POST /streams/plans/:id/prepare — create broadcasts, get RTMP info
fastify.post<{ Params: { id: string } }>('/streams/plans/:id/prepare', {
preHandler: [requireAuth],
schema: {
params: {
type: 'object',
required: ['id'],
properties: { id: { type: 'string' } },
},
},
}, async (request) => {
const plan = await fastify.prisma.streamPlan.findFirst({
where: { id: request.params.id, userId: request.userId },
include: { destinations: true },
});
if (!plan) throw new AppError(404, 'Stream plan not found');
if (plan.status !== 'DRAFT') {
throw new AppError(400, `Plan is already ${plan.status}`);
}
const prepared: PreparedDestination[] = [];
for (const dest of plan.destinations) {
const { account, accessToken } = await getDecryptedToken(
fastify.prisma,
request.userId,
dest.serviceId,
);
if (dest.serviceId === 'YOUTUBE') {
const broadcast = await createYouTubeBroadcast(
accessToken,
dest.title,
dest.description,
dest.privacyStatus,
);
await fastify.prisma.streamDestination.update({
where: { id: dest.id },
data: {
rtmpUrl: broadcast.rtmpUrl,
streamKey: broadcast.streamKey,
broadcastId: broadcast.id,
status: 'READY',
},
});
prepared.push({
serviceId: 'YOUTUBE',
rtmpUrl: broadcast.rtmpUrl,
streamKey: broadcast.streamKey,
broadcastId: broadcast.id,
});
} else if (dest.serviceId === 'TWITCH') {
// Update channel info
const tags = dest.tags ? dest.tags.split(',').map((t) => t.trim()).filter(Boolean) : [];
await updateTwitchChannel(
accessToken,
account.accountId,
dest.title,
dest.gameId,
tags,
);
// Get stream key
const streamKey = await getTwitchStreamKey(accessToken, account.accountId);
await fastify.prisma.streamDestination.update({
where: { id: dest.id },
data: {
rtmpUrl: TWITCH_RTMP_URL,
streamKey,
broadcastId: account.accountId,
status: 'READY',
},
});
prepared.push({
serviceId: 'TWITCH',
rtmpUrl: TWITCH_RTMP_URL,
streamKey,
broadcastId: account.accountId,
});
}
}
await fastify.prisma.streamPlan.update({
where: { id: plan.id },
data: { status: 'READY' },
});
const response: PrepareResponse = {
planId: plan.id,
destinations: prepared,
};
return response;
});
// POST /streams/plans/:id/start — transition to LIVE
fastify.post<{ Params: { id: string } }>('/streams/plans/:id/start', {
preHandler: [requireAuth],
schema: {
params: {
type: 'object',
required: ['id'],
properties: { id: { type: 'string' } },
},
},
}, async (request) => {
const plan = await fastify.prisma.streamPlan.findFirst({
where: { id: request.params.id, userId: request.userId },
include: { destinations: true },
});
if (!plan) throw new AppError(404, 'Stream plan not found');
if (plan.status !== 'READY') {
throw new AppError(400, `Plan must be READY to start, currently ${plan.status}`);
}
for (const dest of plan.destinations) {
if (dest.serviceId === 'YOUTUBE' && dest.broadcastId) {
const { accessToken } = await getDecryptedToken(
fastify.prisma,
request.userId,
'YOUTUBE',
);
await transitionYouTubeBroadcast(accessToken, dest.broadcastId, 'live');
}
// Twitch goes live automatically when RTMP stream is received
await fastify.prisma.streamDestination.update({
where: { id: dest.id },
data: { status: 'LIVE' },
});
}
await fastify.prisma.streamPlan.update({
where: { id: plan.id },
data: { status: 'LIVE' },
});
return { success: true, status: 'LIVE' };
});
// POST /streams/plans/:id/end — end stream
fastify.post<{ Params: { id: string } }>('/streams/plans/:id/end', {
preHandler: [requireAuth],
schema: {
params: {
type: 'object',
required: ['id'],
properties: { id: { type: 'string' } },
},
},
}, async (request) => {
const plan = await fastify.prisma.streamPlan.findFirst({
where: { id: request.params.id, userId: request.userId },
include: { destinations: true },
});
if (!plan) throw new AppError(404, 'Stream plan not found');
if (plan.status !== 'LIVE') {
throw new AppError(400, `Plan must be LIVE to end, currently ${plan.status}`);
}
for (const dest of plan.destinations) {
if (dest.serviceId === 'YOUTUBE' && dest.broadcastId) {
const { accessToken } = await getDecryptedToken(
fastify.prisma,
request.userId,
'YOUTUBE',
);
try {
await transitionYouTubeBroadcast(accessToken, dest.broadcastId, 'complete');
} catch {
// Non-fatal — stream may already be ended
}
}
// Twitch ends when RTMP stream stops
await fastify.prisma.streamDestination.update({
where: { id: dest.id },
data: { status: 'ENDED' },
});
}
await fastify.prisma.streamPlan.update({
where: { id: plan.id },
data: { status: 'ENDED' },
});
return { success: true, status: 'ENDED' };
});
};
export default lifecycleRoutes;

149
src/routes/streams/plans.ts Normal file
View File

@@ -0,0 +1,149 @@
import { FastifyPluginAsync } from 'fastify';
import { requireAuth } from '../../middleware/require-auth.js';
import { AppError } from '../../plugins/error-handler.js';
import type { CreateStreamPlanBody, StreamPlanResponse, StreamDestinationResponse } from '../../types/api.js';
function formatPlan(plan: any): StreamPlanResponse {
return {
id: plan.id,
name: plan.name,
status: plan.status,
createdAt: plan.createdAt.toISOString(),
updatedAt: plan.updatedAt.toISOString(),
destinations: (plan.destinations ?? []).map((d: any): StreamDestinationResponse => ({
id: d.id,
serviceId: d.serviceId,
title: d.title,
description: d.description,
privacyStatus: d.privacyStatus,
gameId: d.gameId,
tags: d.tags,
rtmpUrl: d.rtmpUrl,
streamKey: d.streamKey,
broadcastId: d.broadcastId,
status: d.status,
})),
};
}
const planRoutes: FastifyPluginAsync = async (fastify) => {
// GET /streams/plans — list plans
fastify.get('/streams/plans', {
preHandler: [requireAuth],
}, async (request) => {
const plans = await fastify.prisma.streamPlan.findMany({
where: { userId: request.userId },
include: { destinations: true },
orderBy: { createdAt: 'desc' },
});
return plans.map(formatPlan);
});
// POST /streams/plans — create plan with destinations
fastify.post<{ Body: CreateStreamPlanBody }>('/streams/plans', {
preHandler: [requireAuth],
schema: {
body: {
type: 'object',
required: ['name', 'destinations'],
properties: {
name: { type: 'string', minLength: 1, maxLength: 200 },
destinations: {
type: 'array',
minItems: 1,
maxItems: 10,
items: {
type: 'object',
required: ['serviceId', 'title'],
properties: {
serviceId: { type: 'string', enum: ['YOUTUBE', 'TWITCH'] },
title: { type: 'string', minLength: 1, maxLength: 200 },
description: { type: 'string', maxLength: 5000 },
privacyStatus: { type: 'string', enum: ['public', 'unlisted', 'private'] },
gameId: { type: 'string', maxLength: 100 },
tags: { type: 'string', maxLength: 500 },
},
additionalProperties: false,
},
},
},
additionalProperties: false,
},
},
}, async (request, reply) => {
const { name, destinations } = request.body;
// Verify user has the required linked accounts
const linkedAccounts = await fastify.prisma.linkedAccount.findMany({
where: { userId: request.userId },
});
const linkedServices = new Set(linkedAccounts.map((a) => a.serviceId));
for (const dest of destinations) {
if (!linkedServices.has(dest.serviceId)) {
throw new AppError(400, `No ${dest.serviceId} account linked`);
}
}
const plan = await fastify.prisma.streamPlan.create({
data: {
userId: request.userId,
name,
destinations: {
create: destinations.map((d) => ({
serviceId: d.serviceId,
title: d.title,
description: d.description ?? '',
privacyStatus: d.privacyStatus ?? 'public',
gameId: d.gameId ?? '',
tags: d.tags ?? '',
})),
},
},
include: { destinations: true },
});
reply.status(201).send(formatPlan(plan));
});
// GET /streams/plans/:id — get plan detail
fastify.get<{ Params: { id: string } }>('/streams/plans/:id', {
preHandler: [requireAuth],
schema: {
params: {
type: 'object',
required: ['id'],
properties: { id: { type: 'string' } },
},
},
}, async (request) => {
const plan = await fastify.prisma.streamPlan.findFirst({
where: { id: request.params.id, userId: request.userId },
include: { destinations: true },
});
if (!plan) throw new AppError(404, 'Stream plan not found');
return formatPlan(plan);
});
// DELETE /streams/plans/:id — delete plan
fastify.delete<{ Params: { id: string } }>('/streams/plans/:id', {
preHandler: [requireAuth],
schema: {
params: {
type: 'object',
required: ['id'],
properties: { id: { type: 'string' } },
},
},
}, async (request, reply) => {
const plan = await fastify.prisma.streamPlan.findFirst({
where: { id: request.params.id, userId: request.userId },
});
if (!plan) throw new AppError(404, 'Stream plan not found');
await fastify.prisma.streamPlan.delete({ where: { id: plan.id } });
reply.status(200).send({ success: true });
});
};
export default planRoutes;

View File

@@ -0,0 +1,37 @@
import { createCipheriv, createDecipheriv, randomBytes, createHash } from 'node:crypto';
import { config } from '../config.js';
const ALGORITHM = 'aes-256-gcm';
const IV_LENGTH = 12;
const AUTH_TAG_LENGTH = 16;
function getKey(): Buffer {
return Buffer.from(config.tokenEncryptionKey, 'hex');
}
export function encrypt(plaintext: string): { ciphertext: string; iv: string } {
const key = getKey();
const iv = randomBytes(IV_LENGTH);
const cipher = createCipheriv(ALGORITHM, key, iv, { authTagLength: AUTH_TAG_LENGTH });
const encrypted = Buffer.concat([cipher.update(plaintext, 'utf8'), cipher.final()]);
const authTag = cipher.getAuthTag();
return {
ciphertext: Buffer.concat([encrypted, authTag]).toString('base64'),
iv: iv.toString('base64'),
};
}
export function decrypt(ciphertext: string, iv: string): string {
const key = getKey();
const ivBuf = Buffer.from(iv, 'base64');
const data = Buffer.from(ciphertext, 'base64');
const authTag = data.subarray(data.length - AUTH_TAG_LENGTH);
const encrypted = data.subarray(0, data.length - AUTH_TAG_LENGTH);
const decipher = createDecipheriv(ALGORITHM, key, ivBuf, { authTagLength: AUTH_TAG_LENGTH });
decipher.setAuthTag(authTag);
return decipher.update(encrypted) + decipher.final('utf8');
}
export function hashToken(token: string): string {
return createHash('sha256').update(token).digest('hex');
}

View File

@@ -0,0 +1,55 @@
import { config } from '../config.js';
interface MetaTokenResponse {
access_token: string;
token_type: string;
expires_in: number;
}
interface MetaProfile {
id: string;
name: string;
email?: string;
picture?: { data?: { url?: string } };
}
export async function exchangeMetaCode(code: string): Promise<{ accessToken: string }> {
const params = new URLSearchParams({
client_id: config.meta.appId,
client_secret: config.meta.appSecret,
redirect_uri: config.meta.redirectUri,
code,
});
const res = await fetch(
`https://graph.facebook.com/v19.0/oauth/access_token?${params}`,
);
if (!res.ok) {
const body = await res.text();
throw new Error(`Meta token exchange failed: ${res.status} ${body}`);
}
const data = (await res.json()) as MetaTokenResponse;
return { accessToken: data.access_token };
}
export async function fetchMetaProfile(accessToken: string): Promise<{
metaId: string;
displayName: string;
email: string | null;
avatarUrl: string | null;
}> {
const res = await fetch(
`https://graph.facebook.com/v19.0/me?fields=id,name,email,picture.type(large)&access_token=${accessToken}`,
);
if (!res.ok) {
const body = await res.text();
throw new Error(`Meta profile fetch failed: ${res.status} ${body}`);
}
const data = (await res.json()) as MetaProfile;
return {
metaId: data.id,
displayName: data.name,
email: data.email ?? null,
avatarUrl: data.picture?.data?.url ?? null,
};
}

View File

@@ -0,0 +1,70 @@
import { PrismaClient } from '@prisma/client';
import { decrypt, encrypt } from './crypto.service.js';
import { refreshYouTubeToken } from './youtube.service.js';
import { refreshTwitchToken } from './twitch.service.js';
const REFRESH_INTERVAL = 10 * 60 * 1000; // 10 minutes
const REFRESH_THRESHOLD = 15 * 60 * 1000; // Refresh tokens expiring within 15 minutes
let timer: ReturnType<typeof setInterval> | null = null;
export function startTokenRefreshScheduler(prisma: PrismaClient, logger: { info: (...args: any[]) => void; error: (...args: any[]) => void }) {
if (timer) return;
timer = setInterval(async () => {
try {
const threshold = new Date(Date.now() + REFRESH_THRESHOLD);
const accounts = await prisma.linkedAccount.findMany({
where: { tokenExpiresAt: { lt: threshold } },
});
for (const account of accounts) {
try {
const refreshToken = decrypt(account.refreshTokenEnc, account.refreshTokenIv);
if (account.serviceId === 'YOUTUBE') {
const result = await refreshYouTubeToken(refreshToken);
const accessEnc = encrypt(result.accessToken);
await prisma.linkedAccount.update({
where: { id: account.id },
data: {
accessTokenEnc: accessEnc.ciphertext,
accessTokenIv: accessEnc.iv,
tokenExpiresAt: new Date(Date.now() + result.expiresIn * 1000),
},
});
} else if (account.serviceId === 'TWITCH') {
const result = await refreshTwitchToken(refreshToken);
const accessEnc = encrypt(result.accessToken);
const refreshEnc = encrypt(result.refreshToken);
await prisma.linkedAccount.update({
where: { id: account.id },
data: {
accessTokenEnc: accessEnc.ciphertext,
accessTokenIv: accessEnc.iv,
refreshTokenEnc: refreshEnc.ciphertext,
refreshTokenIv: refreshEnc.iv,
tokenExpiresAt: new Date(Date.now() + result.expiresIn * 1000),
},
});
}
logger.info(`Refreshed ${account.serviceId} token for account ${account.id}`);
} catch (err) {
logger.error(`Failed to refresh ${account.serviceId} token for account ${account.id}: ${err}`);
}
}
} catch (err) {
logger.error(`Token refresh scheduler error: ${err}`);
}
}, REFRESH_INTERVAL);
logger.info('Token refresh scheduler started');
}
export function stopTokenRefreshScheduler() {
if (timer) {
clearInterval(timer);
timer = null;
}
}

View File

@@ -0,0 +1,178 @@
import { config } from '../config.js';
interface TwitchTokenResponse {
access_token: string;
refresh_token: string;
expires_in: number;
token_type: string;
}
interface TwitchUser {
id: string;
login: string;
display_name: string;
profile_image_url: string;
}
const SCOPES = [
'channel:manage:broadcast',
'channel:read:stream_key',
'user:read:email',
].join(' ');
export function getTwitchAuthUrl(state: string): string {
const params = new URLSearchParams({
client_id: config.twitch.clientId,
redirect_uri: config.twitch.redirectUri,
response_type: 'code',
scope: SCOPES,
force_verify: 'true',
state,
});
return `https://id.twitch.tv/oauth2/authorize?${params}`;
}
export async function exchangeTwitchCode(code: string): Promise<{
accessToken: string;
refreshToken: string;
expiresIn: number;
}> {
const res = await fetch('https://id.twitch.tv/oauth2/token', {
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
client_id: config.twitch.clientId,
client_secret: config.twitch.clientSecret,
code,
grant_type: 'authorization_code',
redirect_uri: config.twitch.redirectUri,
}),
});
if (!res.ok) {
const body = await res.text();
throw new Error(`Twitch token exchange failed: ${res.status} ${body}`);
}
const data = (await res.json()) as TwitchTokenResponse;
return {
accessToken: data.access_token,
refreshToken: data.refresh_token,
expiresIn: data.expires_in,
};
}
export async function refreshTwitchToken(refreshToken: string): Promise<{
accessToken: string;
refreshToken: string;
expiresIn: number;
}> {
const res = await fetch('https://id.twitch.tv/oauth2/token', {
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
client_id: config.twitch.clientId,
client_secret: config.twitch.clientSecret,
refresh_token: refreshToken,
grant_type: 'refresh_token',
}),
});
if (!res.ok) {
const body = await res.text();
throw new Error(`Twitch token refresh failed: ${res.status} ${body}`);
}
const data = (await res.json()) as TwitchTokenResponse;
return {
accessToken: data.access_token,
refreshToken: data.refresh_token,
expiresIn: data.expires_in,
};
}
export async function fetchTwitchProfile(accessToken: string): Promise<{
accountId: string;
displayName: string;
avatarUrl: string | null;
}> {
const res = await fetch('https://api.twitch.tv/helix/users', {
headers: {
Authorization: `Bearer ${accessToken}`,
'Client-Id': config.twitch.clientId,
},
});
if (!res.ok) {
const body = await res.text();
throw new Error(`Twitch profile fetch failed: ${res.status} ${body}`);
}
const data = (await res.json()) as { data: TwitchUser[] };
const user = data.data[0];
if (!user) throw new Error('Twitch returned no user data');
return {
accountId: user.id,
displayName: user.display_name,
avatarUrl: user.profile_image_url ?? null,
};
}
export async function revokeTwitchToken(accessToken: string): Promise<void> {
await fetch('https://id.twitch.tv/oauth2/revoke', {
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
client_id: config.twitch.clientId,
token: accessToken,
}),
});
}
// ── Twitch Helix Stream Management ──────────────────────
export async function getTwitchStreamKey(
accessToken: string,
broadcasterId: string,
): Promise<string> {
const res = await fetch(
`https://api.twitch.tv/helix/streams/key?broadcaster_id=${broadcasterId}`,
{
headers: {
Authorization: `Bearer ${accessToken}`,
'Client-Id': config.twitch.clientId,
},
},
);
if (!res.ok) {
const body = await res.text();
throw new Error(`Twitch get stream key failed: ${res.status} ${body}`);
}
const data = (await res.json()) as { data: [{ stream_key: string }] };
return data.data[0].stream_key;
}
export async function updateTwitchChannel(
accessToken: string,
broadcasterId: string,
title: string,
gameId: string,
tags: string[],
): Promise<void> {
const body: Record<string, unknown> = {
title,
};
if (gameId) body.game_id = gameId;
if (tags.length > 0) body.tags = tags;
const res = await fetch(
`https://api.twitch.tv/helix/channels?broadcaster_id=${broadcasterId}`,
{
method: 'PATCH',
headers: {
Authorization: `Bearer ${accessToken}`,
'Client-Id': config.twitch.clientId,
'Content-Type': 'application/json',
},
body: JSON.stringify(body),
},
);
if (!res.ok) {
const body = await res.text();
throw new Error(`Twitch update channel failed: ${res.status} ${body}`);
}
}

View File

@@ -0,0 +1,221 @@
import { config } from '../config.js';
interface GoogleTokenResponse {
access_token: string;
refresh_token?: string;
expires_in: number;
token_type: string;
}
interface GoogleUserInfo {
sub: string;
name: string;
picture?: string;
email?: string;
}
const SCOPES = [
'https://www.googleapis.com/auth/youtube',
'https://www.googleapis.com/auth/youtube.force-ssl',
'openid',
'profile',
].join(' ');
export function getYouTubeAuthUrl(state: string): string {
const params = new URLSearchParams({
client_id: config.youtube.clientId,
redirect_uri: config.youtube.redirectUri,
response_type: 'code',
scope: SCOPES,
access_type: 'offline',
prompt: 'consent',
state,
});
return `https://accounts.google.com/o/oauth2/v2/auth?${params}`;
}
export async function exchangeYouTubeCode(code: string): Promise<{
accessToken: string;
refreshToken: string;
expiresIn: number;
}> {
const res = await fetch('https://oauth2.googleapis.com/token', {
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
client_id: config.youtube.clientId,
client_secret: config.youtube.clientSecret,
code,
grant_type: 'authorization_code',
redirect_uri: config.youtube.redirectUri,
}),
});
if (!res.ok) {
const body = await res.text();
throw new Error(`YouTube token exchange failed: ${res.status} ${body}`);
}
const data = (await res.json()) as GoogleTokenResponse;
if (!data.refresh_token) {
throw new Error('YouTube did not return a refresh token. User may need to revoke and re-link.');
}
return {
accessToken: data.access_token,
refreshToken: data.refresh_token,
expiresIn: data.expires_in,
};
}
export async function refreshYouTubeToken(refreshToken: string): Promise<{
accessToken: string;
expiresIn: number;
}> {
const res = await fetch('https://oauth2.googleapis.com/token', {
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
client_id: config.youtube.clientId,
client_secret: config.youtube.clientSecret,
refresh_token: refreshToken,
grant_type: 'refresh_token',
}),
});
if (!res.ok) {
const body = await res.text();
throw new Error(`YouTube token refresh failed: ${res.status} ${body}`);
}
const data = (await res.json()) as GoogleTokenResponse;
return { accessToken: data.access_token, expiresIn: data.expires_in };
}
export async function fetchYouTubeProfile(accessToken: string): Promise<{
accountId: string;
displayName: string;
avatarUrl: string | null;
}> {
const res = await fetch('https://www.googleapis.com/oauth2/v3/userinfo', {
headers: { Authorization: `Bearer ${accessToken}` },
});
if (!res.ok) {
const body = await res.text();
throw new Error(`YouTube profile fetch failed: ${res.status} ${body}`);
}
const data = (await res.json()) as GoogleUserInfo;
return {
accountId: data.sub,
displayName: data.name,
avatarUrl: data.picture ?? null,
};
}
export async function revokeYouTubeToken(token: string): Promise<void> {
await fetch(`https://oauth2.googleapis.com/revoke?token=${token}`, {
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
});
}
// ── YouTube Live Streaming API ───────────────────────────
export interface YouTubeBroadcast {
id: string;
rtmpUrl: string;
streamKey: string;
}
export async function createYouTubeBroadcast(
accessToken: string,
title: string,
description: string,
privacyStatus: string,
): Promise<YouTubeBroadcast> {
// Create liveBroadcast
const broadcastRes = await fetch(
'https://www.googleapis.com/youtube/v3/liveBroadcasts?part=snippet,status,contentDetails',
{
method: 'POST',
headers: {
Authorization: `Bearer ${accessToken}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
snippet: {
title,
description,
scheduledStartTime: new Date().toISOString(),
},
status: { privacyStatus },
contentDetails: {
enableAutoStart: true,
enableAutoStop: true,
},
}),
},
);
if (!broadcastRes.ok) {
const body = await broadcastRes.text();
throw new Error(`YouTube create broadcast failed: ${broadcastRes.status} ${body}`);
}
const broadcast = (await broadcastRes.json()) as any;
// Create liveStream
const streamRes = await fetch(
'https://www.googleapis.com/youtube/v3/liveStreams?part=snippet,cdn',
{
method: 'POST',
headers: {
Authorization: `Bearer ${accessToken}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
snippet: { title: `${title} - stream` },
cdn: {
frameRate: 'variable',
ingestionType: 'rtmp',
resolution: 'variable',
},
}),
},
);
if (!streamRes.ok) {
const body = await streamRes.text();
throw new Error(`YouTube create stream failed: ${streamRes.status} ${body}`);
}
const stream = (await streamRes.json()) as any;
// Bind stream to broadcast
const bindRes = await fetch(
`https://www.googleapis.com/youtube/v3/liveBroadcasts/bind?id=${broadcast.id}&part=id&streamId=${stream.id}`,
{
method: 'POST',
headers: { Authorization: `Bearer ${accessToken}` },
},
);
if (!bindRes.ok) {
const body = await bindRes.text();
throw new Error(`YouTube bind broadcast failed: ${bindRes.status} ${body}`);
}
return {
id: broadcast.id,
rtmpUrl: stream.cdn.ingestionInfo.ingestionAddress,
streamKey: stream.cdn.ingestionInfo.streamName,
};
}
export async function transitionYouTubeBroadcast(
accessToken: string,
broadcastId: string,
status: 'live' | 'complete',
): Promise<void> {
const res = await fetch(
`https://www.googleapis.com/youtube/v3/liveBroadcasts/transition?broadcastStatus=${status}&id=${broadcastId}&part=status`,
{
method: 'POST',
headers: { Authorization: `Bearer ${accessToken}` },
},
);
if (!res.ok) {
const body = await res.text();
throw new Error(`YouTube transition broadcast failed: ${res.status} ${body}`);
}
}