From b4ab9c6cf972bafcd5a6a4fa6cf904ff26fa9afd Mon Sep 17 00:00:00 2001 From: omigamedev Date: Wed, 4 Mar 2026 09:51:04 +0100 Subject: [PATCH] Auto-detect ended Twitch streams via Helix API polling --- src/routes/streams/plans.ts | 123 ++++++++++++++++++++++----------- src/services/twitch.service.ts | 22 ++++++ 2 files changed, 105 insertions(+), 40 deletions(-) diff --git a/src/routes/streams/plans.ts b/src/routes/streams/plans.ts index faa926a..a72b53d 100644 --- a/src/routes/streams/plans.ts +++ b/src/routes/streams/plans.ts @@ -4,6 +4,7 @@ import { join } from 'path'; import { requireAuth } from '../../middleware/require-auth.js'; import { AppError } from '../../plugins/error-handler.js'; import { getYouTubeBroadcastStatus, refreshYouTubeToken } from '../../services/youtube.service.js'; +import { isTwitchStreamLive, refreshTwitchToken } from '../../services/twitch.service.js'; import { decrypt, encrypt } from '../../services/crypto.service.js'; import { PREVIEWS_DIR } from './preview.js'; import type { CreateStreamPlanBody, CreateDestinationBody, UpdateStreamPlanBody, StreamPlanResponse, StreamDestinationResponse } from '../../types/api.js'; @@ -37,60 +38,102 @@ export function formatPlan(plan: any): StreamPlanResponse { const planRoutes: FastifyPluginAsync = async (fastify) => { - /** Check YouTube broadcast status and auto-end plans that are over */ + /** Check service-side broadcast status and auto-end plans that are over */ async function autoDetectEndedPlans(plans: any[], userId: string) { // Cache decrypted tokens by linkedAccountId to avoid redundant decrypts const tokenCache = new Map(); + async function getAccessToken(linkedAccountId: string, service: 'YOUTUBE' | 'TWITCH'): Promise { + const cached = tokenCache.get(linkedAccountId); + if (cached) return cached; + + const account = await fastify.prisma.linkedAccount.findFirst({ + where: { id: linkedAccountId, userId }, + }); + if (!account) return null; + + let accessToken = decrypt(account.accessTokenEnc, account.accessTokenIv); + + if (account.tokenExpiresAt < new Date(Date.now() + 60_000)) { + const refreshToken = decrypt(account.refreshTokenEnc, account.refreshTokenIv); + const result = service === 'YOUTUBE' + ? await refreshYouTubeToken(refreshToken) + : await refreshTwitchToken(refreshToken); + accessToken = result.accessToken; + const accessEnc = encrypt(accessToken); + const updateData: any = { + accessTokenEnc: accessEnc.ciphertext, + accessTokenIv: accessEnc.iv, + tokenExpiresAt: new Date(Date.now() + result.expiresIn * 1000), + }; + if (service === 'TWITCH' && 'refreshToken' in result) { + const refreshEnc = encrypt(result.refreshToken as string); + updateData.refreshTokenEnc = refreshEnc.ciphertext; + updateData.refreshTokenIv = refreshEnc.iv; + } + await fastify.prisma.linkedAccount.update({ + where: { id: account.id }, + data: updateData, + }); + } + + tokenCache.set(linkedAccountId, accessToken); + return accessToken; + } + + async function markPlanEnded(plan: any) { + for (const dest of plan.destinations) { + await fastify.prisma.streamDestination.update({ + where: { id: dest.id }, + data: { status: 'ENDED' }, + }); + } + await fastify.prisma.streamPlan.update({ + where: { id: plan.id }, + data: { status: 'ENDED' }, + }); + plan.status = 'ENDED'; + } + for (const plan of plans) { if (plan.status !== 'LIVE' && plan.status !== 'READY') continue; + + // Check YouTube broadcast status const ytDest = plan.destinations.find( (d: any) => d.serviceId === 'YOUTUBE' && d.broadcastId, ); - if (!ytDest) continue; - - try { - let ytAccessToken = tokenCache.get(ytDest.linkedAccountId); - if (!ytAccessToken) { - const account = await fastify.prisma.linkedAccount.findFirst({ - where: { id: ytDest.linkedAccountId, userId }, - }); - if (!account) continue; - - ytAccessToken = decrypt(account.accessTokenEnc, account.accessTokenIv); - if (account.tokenExpiresAt < new Date(Date.now() + 60_000)) { - const refreshToken = decrypt(account.refreshTokenEnc, account.refreshTokenIv); - const result = await refreshYouTubeToken(refreshToken); - ytAccessToken = result.accessToken; - const accessEnc = encrypt(ytAccessToken); - await fastify.prisma.linkedAccount.update({ - where: { id: account.id }, - data: { - accessTokenEnc: accessEnc.ciphertext, - accessTokenIv: accessEnc.iv, - tokenExpiresAt: new Date(Date.now() + result.expiresIn * 1000), - }, - }); + if (ytDest) { + try { + const token = await getAccessToken(ytDest.linkedAccountId, 'YOUTUBE'); + if (token) { + const ytStatus = await getYouTubeBroadcastStatus(token, ytDest.broadcastId!); + if (ytStatus === 'complete' || ytStatus === 'revoked') { + await markPlanEnded(plan); + continue; + } } - tokenCache.set(ytDest.linkedAccountId, ytAccessToken); + } catch { + // Non-fatal } + } - const ytStatus = await getYouTubeBroadcastStatus(ytAccessToken, ytDest.broadcastId!); - if (ytStatus === 'complete' || ytStatus === 'revoked') { - for (const dest of plan.destinations) { - await fastify.prisma.streamDestination.update({ - where: { id: dest.id }, - data: { status: 'ENDED' }, - }); + // Check Twitch stream status + const twitchDest = plan.destinations.find( + (d: any) => d.serviceId === 'TWITCH' && d.broadcastId, + ); + if (twitchDest) { + try { + const token = await getAccessToken(twitchDest.linkedAccountId, 'TWITCH'); + if (token) { + const live = await isTwitchStreamLive(token, twitchDest.broadcastId!); + if (!live) { + await markPlanEnded(plan); + continue; + } } - await fastify.prisma.streamPlan.update({ - where: { id: plan.id }, - data: { status: 'ENDED' }, - }); - plan.status = 'ENDED'; + } catch { + // Non-fatal } - } catch { - // Non-fatal } } } diff --git a/src/services/twitch.service.ts b/src/services/twitch.service.ts index 0079b99..d386d31 100644 --- a/src/services/twitch.service.ts +++ b/src/services/twitch.service.ts @@ -127,6 +127,28 @@ export async function revokeTwitchToken(accessToken: string): Promise { // ── Twitch Helix Stream Management ────────────────────── +/** Check if a Twitch channel is currently live. Returns true if live, false if offline. */ +export async function isTwitchStreamLive( + accessToken: string, + broadcasterId: string, +): Promise { + const res = await fetch( + `https://api.twitch.tv/helix/streams?user_id=${broadcasterId}`, + { + headers: { + Authorization: `Bearer ${accessToken}`, + 'Client-Id': config.twitch.clientId, + }, + }, + ); + if (!res.ok) { + const body = await res.text(); + throw new Error(`Twitch get streams failed: ${res.status} ${body}`); + } + const data = (await res.json()) as { data: { type: string }[] }; + return data.data.length > 0 && data.data[0].type === 'live'; +} + export async function getTwitchStreamKey( accessToken: string, broadcasterId: string,