From 7e99a053da748246f7899b8a29d3d7e7bd403239 Mon Sep 17 00:00:00 2001 From: omigamedev Date: Wed, 4 Mar 2026 10:50:52 +0100 Subject: [PATCH] Extract autoDetectEndedPlans to shared service, add to feed endpoint --- src/routes/social/feed.ts | 7 ++ src/routes/streams/plans.ts | 107 +------------------------ src/services/stream-status.service.ts | 109 ++++++++++++++++++++++++++ 3 files changed, 119 insertions(+), 104 deletions(-) create mode 100644 src/services/stream-status.service.ts diff --git a/src/routes/social/feed.ts b/src/routes/social/feed.ts index a2f229b..3198e79 100644 --- a/src/routes/social/feed.ts +++ b/src/routes/social/feed.ts @@ -3,6 +3,7 @@ import { existsSync } from 'fs'; import { join } from 'path'; import { optionalAuth } from '../../middleware/require-auth.js'; import { PREVIEWS_DIR } from '../streams/preview.js'; +import { autoDetectEndedPlans } from '../../services/stream-status.service.js'; import type { FeedResponse, FeedItemResponse } from '../../types/api.js'; const feedRoutes: FastifyPluginAsync = async (fastify) => { @@ -55,6 +56,12 @@ const feedRoutes: FastifyPluginAsync = async (fastify) => { ...(cursorId ? { cursor: { id: cursorId }, skip: 1 } : {}), }); + // Auto-detect streams that ended on the service side + const livePlans = plans.filter(p => p.status === 'LIVE' || p.status === 'READY'); + if (livePlans.length > 0) { + await autoDetectEndedPlans(fastify.prisma, livePlans); + } + const hasMore = plans.length > limit; const items = plans.slice(0, limit); diff --git a/src/routes/streams/plans.ts b/src/routes/streams/plans.ts index a72b53d..b5f8392 100644 --- a/src/routes/streams/plans.ts +++ b/src/routes/streams/plans.ts @@ -3,8 +3,7 @@ import { existsSync, unlinkSync } from 'fs'; import { join } from 'path'; import { requireAuth } from '../../middleware/require-auth.js'; import { AppError } from '../../plugins/error-handler.js'; -import { getYouTubeBroadcastStatus, refreshYouTubeToken } from '../../services/youtube.service.js'; -import { isTwitchStreamLive, refreshTwitchToken } from '../../services/twitch.service.js'; +import { autoDetectEndedPlans } from '../../services/stream-status.service.js'; import { decrypt, encrypt } from '../../services/crypto.service.js'; import { PREVIEWS_DIR } from './preview.js'; import type { CreateStreamPlanBody, CreateDestinationBody, UpdateStreamPlanBody, StreamPlanResponse, StreamDestinationResponse } from '../../types/api.js'; @@ -38,106 +37,6 @@ export function formatPlan(plan: any): StreamPlanResponse { const planRoutes: FastifyPluginAsync = async (fastify) => { - /** Check service-side broadcast status and auto-end plans that are over */ - async function autoDetectEndedPlans(plans: any[], userId: string) { - // Cache decrypted tokens by linkedAccountId to avoid redundant decrypts - const tokenCache = new Map(); - - async function getAccessToken(linkedAccountId: string, service: 'YOUTUBE' | 'TWITCH'): Promise { - const cached = tokenCache.get(linkedAccountId); - if (cached) return cached; - - const account = await fastify.prisma.linkedAccount.findFirst({ - where: { id: linkedAccountId, userId }, - }); - if (!account) return null; - - let accessToken = decrypt(account.accessTokenEnc, account.accessTokenIv); - - if (account.tokenExpiresAt < new Date(Date.now() + 60_000)) { - const refreshToken = decrypt(account.refreshTokenEnc, account.refreshTokenIv); - const result = service === 'YOUTUBE' - ? await refreshYouTubeToken(refreshToken) - : await refreshTwitchToken(refreshToken); - accessToken = result.accessToken; - const accessEnc = encrypt(accessToken); - const updateData: any = { - accessTokenEnc: accessEnc.ciphertext, - accessTokenIv: accessEnc.iv, - tokenExpiresAt: new Date(Date.now() + result.expiresIn * 1000), - }; - if (service === 'TWITCH' && 'refreshToken' in result) { - const refreshEnc = encrypt(result.refreshToken as string); - updateData.refreshTokenEnc = refreshEnc.ciphertext; - updateData.refreshTokenIv = refreshEnc.iv; - } - await fastify.prisma.linkedAccount.update({ - where: { id: account.id }, - data: updateData, - }); - } - - tokenCache.set(linkedAccountId, accessToken); - return accessToken; - } - - async function markPlanEnded(plan: any) { - for (const dest of plan.destinations) { - await fastify.prisma.streamDestination.update({ - where: { id: dest.id }, - data: { status: 'ENDED' }, - }); - } - await fastify.prisma.streamPlan.update({ - where: { id: plan.id }, - data: { status: 'ENDED' }, - }); - plan.status = 'ENDED'; - } - - for (const plan of plans) { - if (plan.status !== 'LIVE' && plan.status !== 'READY') continue; - - // Check YouTube broadcast status - const ytDest = plan.destinations.find( - (d: any) => d.serviceId === 'YOUTUBE' && d.broadcastId, - ); - if (ytDest) { - try { - const token = await getAccessToken(ytDest.linkedAccountId, 'YOUTUBE'); - if (token) { - const ytStatus = await getYouTubeBroadcastStatus(token, ytDest.broadcastId!); - if (ytStatus === 'complete' || ytStatus === 'revoked') { - await markPlanEnded(plan); - continue; - } - } - } catch { - // Non-fatal - } - } - - // Check Twitch stream status - const twitchDest = plan.destinations.find( - (d: any) => d.serviceId === 'TWITCH' && d.broadcastId, - ); - if (twitchDest) { - try { - const token = await getAccessToken(twitchDest.linkedAccountId, 'TWITCH'); - if (token) { - const live = await isTwitchStreamLive(token, twitchDest.broadcastId!); - if (!live) { - await markPlanEnded(plan); - continue; - } - } - } catch { - // Non-fatal - } - } - } - } - // GET /streams/plans — list plans fastify.get('/streams/plans', { preHandler: [requireAuth], @@ -152,7 +51,7 @@ const planRoutes: FastifyPluginAsync = async (fastify) => { const totalPlans = await fastify.prisma.streamPlan.count(); request.log.info({ userId: request.userId, userPlans: plans.length, totalPlans }, 'List plans'); - await autoDetectEndedPlans(plans, request.userId); + await autoDetectEndedPlans(fastify.prisma, plans); return plans.map(formatPlan); }); @@ -303,7 +202,7 @@ const planRoutes: FastifyPluginAsync = async (fastify) => { }); if (!plan) throw new AppError(404, 'Stream plan not found'); - await autoDetectEndedPlans([plan], request.userId); + await autoDetectEndedPlans(fastify.prisma, [plan]); return formatPlan(plan); }); diff --git a/src/services/stream-status.service.ts b/src/services/stream-status.service.ts new file mode 100644 index 0000000..dc45430 --- /dev/null +++ b/src/services/stream-status.service.ts @@ -0,0 +1,109 @@ +import { PrismaClient } from '@prisma/client'; +import { getYouTubeBroadcastStatus, refreshYouTubeToken } from './youtube.service.js'; +import { isTwitchStreamLive, refreshTwitchToken } from './twitch.service.js'; +import { decrypt, encrypt } from './crypto.service.js'; + +/** + * Check service-side broadcast status for LIVE/READY plans and auto-end + * plans whose streams have finished on the platform side. + * + * Works without a userId filter so it can be called from any context + * (owner's plan list, public feed, etc.). + */ +export async function autoDetectEndedPlans(prisma: PrismaClient, plans: any[]) { + const tokenCache = new Map(); + + async function getAccessToken(linkedAccountId: string, service: 'YOUTUBE' | 'TWITCH'): Promise { + const cached = tokenCache.get(linkedAccountId); + if (cached) return cached; + + const account = await prisma.linkedAccount.findFirst({ + where: { id: linkedAccountId }, + }); + if (!account) return null; + + let accessToken = decrypt(account.accessTokenEnc, account.accessTokenIv); + + if (account.tokenExpiresAt < new Date(Date.now() + 60_000)) { + const refreshToken = decrypt(account.refreshTokenEnc, account.refreshTokenIv); + const result = service === 'YOUTUBE' + ? await refreshYouTubeToken(refreshToken) + : await refreshTwitchToken(refreshToken); + accessToken = result.accessToken; + const accessEnc = encrypt(accessToken); + const updateData: any = { + accessTokenEnc: accessEnc.ciphertext, + accessTokenIv: accessEnc.iv, + tokenExpiresAt: new Date(Date.now() + result.expiresIn * 1000), + }; + if (service === 'TWITCH' && 'refreshToken' in result) { + const refreshEnc = encrypt(result.refreshToken as string); + updateData.refreshTokenEnc = refreshEnc.ciphertext; + updateData.refreshTokenIv = refreshEnc.iv; + } + await prisma.linkedAccount.update({ + where: { id: account.id }, + data: updateData, + }); + } + + tokenCache.set(linkedAccountId, accessToken); + return accessToken; + } + + async function markPlanEnded(plan: any) { + for (const dest of plan.destinations) { + await prisma.streamDestination.update({ + where: { id: dest.id }, + data: { status: 'ENDED' }, + }); + } + await prisma.streamPlan.update({ + where: { id: plan.id }, + data: { status: 'ENDED' }, + }); + plan.status = 'ENDED'; + } + + for (const plan of plans) { + if (plan.status !== 'LIVE' && plan.status !== 'READY') continue; + + // Check YouTube broadcast status + const ytDest = plan.destinations.find( + (d: any) => d.serviceId === 'YOUTUBE' && d.broadcastId, + ); + if (ytDest) { + try { + const token = await getAccessToken(ytDest.linkedAccountId, 'YOUTUBE'); + if (token) { + const ytStatus = await getYouTubeBroadcastStatus(token, ytDest.broadcastId!); + if (ytStatus === 'complete' || ytStatus === 'revoked') { + await markPlanEnded(plan); + continue; + } + } + } catch { + // Non-fatal + } + } + + // Check Twitch stream status + const twitchDest = plan.destinations.find( + (d: any) => d.serviceId === 'TWITCH' && d.broadcastId, + ); + if (twitchDest) { + try { + const token = await getAccessToken(twitchDest.linkedAccountId, 'TWITCH'); + if (token) { + const live = await isTwitchStreamLive(token, twitchDest.broadcastId!); + if (!live) { + await markPlanEnded(plan); + continue; + } + } + } catch { + // Non-fatal + } + } + } +}