Extract autoDetectEndedPlans to shared service, add to feed endpoint
This commit is contained in:
@@ -3,6 +3,7 @@ import { existsSync } from 'fs';
|
||||
import { join } from 'path';
|
||||
import { optionalAuth } from '../../middleware/require-auth.js';
|
||||
import { PREVIEWS_DIR } from '../streams/preview.js';
|
||||
import { autoDetectEndedPlans } from '../../services/stream-status.service.js';
|
||||
import type { FeedResponse, FeedItemResponse } from '../../types/api.js';
|
||||
|
||||
const feedRoutes: FastifyPluginAsync = async (fastify) => {
|
||||
@@ -55,6 +56,12 @@ const feedRoutes: FastifyPluginAsync = async (fastify) => {
|
||||
...(cursorId ? { cursor: { id: cursorId }, skip: 1 } : {}),
|
||||
});
|
||||
|
||||
// Auto-detect streams that ended on the service side
|
||||
const livePlans = plans.filter(p => p.status === 'LIVE' || p.status === 'READY');
|
||||
if (livePlans.length > 0) {
|
||||
await autoDetectEndedPlans(fastify.prisma, livePlans);
|
||||
}
|
||||
|
||||
const hasMore = plans.length > limit;
|
||||
const items = plans.slice(0, limit);
|
||||
|
||||
|
||||
@@ -3,8 +3,7 @@ import { existsSync, unlinkSync } from 'fs';
|
||||
import { join } from 'path';
|
||||
import { requireAuth } from '../../middleware/require-auth.js';
|
||||
import { AppError } from '../../plugins/error-handler.js';
|
||||
import { getYouTubeBroadcastStatus, refreshYouTubeToken } from '../../services/youtube.service.js';
|
||||
import { isTwitchStreamLive, refreshTwitchToken } from '../../services/twitch.service.js';
|
||||
import { autoDetectEndedPlans } from '../../services/stream-status.service.js';
|
||||
import { decrypt, encrypt } from '../../services/crypto.service.js';
|
||||
import { PREVIEWS_DIR } from './preview.js';
|
||||
import type { CreateStreamPlanBody, CreateDestinationBody, UpdateStreamPlanBody, StreamPlanResponse, StreamDestinationResponse } from '../../types/api.js';
|
||||
@@ -38,106 +37,6 @@ export function formatPlan(plan: any): StreamPlanResponse {
|
||||
|
||||
const planRoutes: FastifyPluginAsync = async (fastify) => {
|
||||
|
||||
/** Check service-side broadcast status and auto-end plans that are over */
|
||||
async function autoDetectEndedPlans(plans: any[], userId: string) {
|
||||
// Cache decrypted tokens by linkedAccountId to avoid redundant decrypts
|
||||
const tokenCache = new Map<string, string>();
|
||||
|
||||
async function getAccessToken(linkedAccountId: string, service: 'YOUTUBE' | 'TWITCH'): Promise<string | null> {
|
||||
const cached = tokenCache.get(linkedAccountId);
|
||||
if (cached) return cached;
|
||||
|
||||
const account = await fastify.prisma.linkedAccount.findFirst({
|
||||
where: { id: linkedAccountId, userId },
|
||||
});
|
||||
if (!account) return null;
|
||||
|
||||
let accessToken = decrypt(account.accessTokenEnc, account.accessTokenIv);
|
||||
|
||||
if (account.tokenExpiresAt < new Date(Date.now() + 60_000)) {
|
||||
const refreshToken = decrypt(account.refreshTokenEnc, account.refreshTokenIv);
|
||||
const result = service === 'YOUTUBE'
|
||||
? await refreshYouTubeToken(refreshToken)
|
||||
: await refreshTwitchToken(refreshToken);
|
||||
accessToken = result.accessToken;
|
||||
const accessEnc = encrypt(accessToken);
|
||||
const updateData: any = {
|
||||
accessTokenEnc: accessEnc.ciphertext,
|
||||
accessTokenIv: accessEnc.iv,
|
||||
tokenExpiresAt: new Date(Date.now() + result.expiresIn * 1000),
|
||||
};
|
||||
if (service === 'TWITCH' && 'refreshToken' in result) {
|
||||
const refreshEnc = encrypt(result.refreshToken as string);
|
||||
updateData.refreshTokenEnc = refreshEnc.ciphertext;
|
||||
updateData.refreshTokenIv = refreshEnc.iv;
|
||||
}
|
||||
await fastify.prisma.linkedAccount.update({
|
||||
where: { id: account.id },
|
||||
data: updateData,
|
||||
});
|
||||
}
|
||||
|
||||
tokenCache.set(linkedAccountId, accessToken);
|
||||
return accessToken;
|
||||
}
|
||||
|
||||
async function markPlanEnded(plan: any) {
|
||||
for (const dest of plan.destinations) {
|
||||
await fastify.prisma.streamDestination.update({
|
||||
where: { id: dest.id },
|
||||
data: { status: 'ENDED' },
|
||||
});
|
||||
}
|
||||
await fastify.prisma.streamPlan.update({
|
||||
where: { id: plan.id },
|
||||
data: { status: 'ENDED' },
|
||||
});
|
||||
plan.status = 'ENDED';
|
||||
}
|
||||
|
||||
for (const plan of plans) {
|
||||
if (plan.status !== 'LIVE' && plan.status !== 'READY') continue;
|
||||
|
||||
// Check YouTube broadcast status
|
||||
const ytDest = plan.destinations.find(
|
||||
(d: any) => d.serviceId === 'YOUTUBE' && d.broadcastId,
|
||||
);
|
||||
if (ytDest) {
|
||||
try {
|
||||
const token = await getAccessToken(ytDest.linkedAccountId, 'YOUTUBE');
|
||||
if (token) {
|
||||
const ytStatus = await getYouTubeBroadcastStatus(token, ytDest.broadcastId!);
|
||||
if (ytStatus === 'complete' || ytStatus === 'revoked') {
|
||||
await markPlanEnded(plan);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Non-fatal
|
||||
}
|
||||
}
|
||||
|
||||
// Check Twitch stream status
|
||||
const twitchDest = plan.destinations.find(
|
||||
(d: any) => d.serviceId === 'TWITCH' && d.broadcastId,
|
||||
);
|
||||
if (twitchDest) {
|
||||
try {
|
||||
const token = await getAccessToken(twitchDest.linkedAccountId, 'TWITCH');
|
||||
if (token) {
|
||||
const live = await isTwitchStreamLive(token, twitchDest.broadcastId!);
|
||||
if (!live) {
|
||||
await markPlanEnded(plan);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Non-fatal
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GET /streams/plans — list plans
|
||||
fastify.get('/streams/plans', {
|
||||
preHandler: [requireAuth],
|
||||
@@ -152,7 +51,7 @@ const planRoutes: FastifyPluginAsync = async (fastify) => {
|
||||
const totalPlans = await fastify.prisma.streamPlan.count();
|
||||
request.log.info({ userId: request.userId, userPlans: plans.length, totalPlans }, 'List plans');
|
||||
|
||||
await autoDetectEndedPlans(plans, request.userId);
|
||||
await autoDetectEndedPlans(fastify.prisma, plans);
|
||||
|
||||
return plans.map(formatPlan);
|
||||
});
|
||||
@@ -303,7 +202,7 @@ const planRoutes: FastifyPluginAsync = async (fastify) => {
|
||||
});
|
||||
if (!plan) throw new AppError(404, 'Stream plan not found');
|
||||
|
||||
await autoDetectEndedPlans([plan], request.userId);
|
||||
await autoDetectEndedPlans(fastify.prisma, [plan]);
|
||||
|
||||
return formatPlan(plan);
|
||||
});
|
||||
|
||||
109
src/services/stream-status.service.ts
Normal file
109
src/services/stream-status.service.ts
Normal file
@@ -0,0 +1,109 @@
|
||||
import { PrismaClient } from '@prisma/client';
|
||||
import { getYouTubeBroadcastStatus, refreshYouTubeToken } from './youtube.service.js';
|
||||
import { isTwitchStreamLive, refreshTwitchToken } from './twitch.service.js';
|
||||
import { decrypt, encrypt } from './crypto.service.js';
|
||||
|
||||
/**
|
||||
* Check service-side broadcast status for LIVE/READY plans and auto-end
|
||||
* plans whose streams have finished on the platform side.
|
||||
*
|
||||
* Works without a userId filter so it can be called from any context
|
||||
* (owner's plan list, public feed, etc.).
|
||||
*/
|
||||
export async function autoDetectEndedPlans(prisma: PrismaClient, plans: any[]) {
|
||||
const tokenCache = new Map<string, string>();
|
||||
|
||||
async function getAccessToken(linkedAccountId: string, service: 'YOUTUBE' | 'TWITCH'): Promise<string | null> {
|
||||
const cached = tokenCache.get(linkedAccountId);
|
||||
if (cached) return cached;
|
||||
|
||||
const account = await prisma.linkedAccount.findFirst({
|
||||
where: { id: linkedAccountId },
|
||||
});
|
||||
if (!account) return null;
|
||||
|
||||
let accessToken = decrypt(account.accessTokenEnc, account.accessTokenIv);
|
||||
|
||||
if (account.tokenExpiresAt < new Date(Date.now() + 60_000)) {
|
||||
const refreshToken = decrypt(account.refreshTokenEnc, account.refreshTokenIv);
|
||||
const result = service === 'YOUTUBE'
|
||||
? await refreshYouTubeToken(refreshToken)
|
||||
: await refreshTwitchToken(refreshToken);
|
||||
accessToken = result.accessToken;
|
||||
const accessEnc = encrypt(accessToken);
|
||||
const updateData: any = {
|
||||
accessTokenEnc: accessEnc.ciphertext,
|
||||
accessTokenIv: accessEnc.iv,
|
||||
tokenExpiresAt: new Date(Date.now() + result.expiresIn * 1000),
|
||||
};
|
||||
if (service === 'TWITCH' && 'refreshToken' in result) {
|
||||
const refreshEnc = encrypt(result.refreshToken as string);
|
||||
updateData.refreshTokenEnc = refreshEnc.ciphertext;
|
||||
updateData.refreshTokenIv = refreshEnc.iv;
|
||||
}
|
||||
await prisma.linkedAccount.update({
|
||||
where: { id: account.id },
|
||||
data: updateData,
|
||||
});
|
||||
}
|
||||
|
||||
tokenCache.set(linkedAccountId, accessToken);
|
||||
return accessToken;
|
||||
}
|
||||
|
||||
async function markPlanEnded(plan: any) {
|
||||
for (const dest of plan.destinations) {
|
||||
await prisma.streamDestination.update({
|
||||
where: { id: dest.id },
|
||||
data: { status: 'ENDED' },
|
||||
});
|
||||
}
|
||||
await prisma.streamPlan.update({
|
||||
where: { id: plan.id },
|
||||
data: { status: 'ENDED' },
|
||||
});
|
||||
plan.status = 'ENDED';
|
||||
}
|
||||
|
||||
for (const plan of plans) {
|
||||
if (plan.status !== 'LIVE' && plan.status !== 'READY') continue;
|
||||
|
||||
// Check YouTube broadcast status
|
||||
const ytDest = plan.destinations.find(
|
||||
(d: any) => d.serviceId === 'YOUTUBE' && d.broadcastId,
|
||||
);
|
||||
if (ytDest) {
|
||||
try {
|
||||
const token = await getAccessToken(ytDest.linkedAccountId, 'YOUTUBE');
|
||||
if (token) {
|
||||
const ytStatus = await getYouTubeBroadcastStatus(token, ytDest.broadcastId!);
|
||||
if (ytStatus === 'complete' || ytStatus === 'revoked') {
|
||||
await markPlanEnded(plan);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Non-fatal
|
||||
}
|
||||
}
|
||||
|
||||
// Check Twitch stream status
|
||||
const twitchDest = plan.destinations.find(
|
||||
(d: any) => d.serviceId === 'TWITCH' && d.broadcastId,
|
||||
);
|
||||
if (twitchDest) {
|
||||
try {
|
||||
const token = await getAccessToken(twitchDest.linkedAccountId, 'TWITCH');
|
||||
if (token) {
|
||||
const live = await isTwitchStreamLive(token, twitchDest.broadcastId!);
|
||||
if (!live) {
|
||||
await markPlanEnded(plan);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Non-fatal
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user