Auto-detect ended Twitch streams via Helix API polling
This commit is contained in:
@@ -4,6 +4,7 @@ import { join } from 'path';
|
|||||||
import { requireAuth } from '../../middleware/require-auth.js';
|
import { requireAuth } from '../../middleware/require-auth.js';
|
||||||
import { AppError } from '../../plugins/error-handler.js';
|
import { AppError } from '../../plugins/error-handler.js';
|
||||||
import { getYouTubeBroadcastStatus, refreshYouTubeToken } from '../../services/youtube.service.js';
|
import { getYouTubeBroadcastStatus, refreshYouTubeToken } from '../../services/youtube.service.js';
|
||||||
|
import { isTwitchStreamLive, refreshTwitchToken } from '../../services/twitch.service.js';
|
||||||
import { decrypt, encrypt } from '../../services/crypto.service.js';
|
import { decrypt, encrypt } from '../../services/crypto.service.js';
|
||||||
import { PREVIEWS_DIR } from './preview.js';
|
import { PREVIEWS_DIR } from './preview.js';
|
||||||
import type { CreateStreamPlanBody, CreateDestinationBody, UpdateStreamPlanBody, StreamPlanResponse, StreamDestinationResponse } from '../../types/api.js';
|
import type { CreateStreamPlanBody, CreateDestinationBody, UpdateStreamPlanBody, StreamPlanResponse, StreamDestinationResponse } from '../../types/api.js';
|
||||||
@@ -37,46 +38,50 @@ export function formatPlan(plan: any): StreamPlanResponse {
|
|||||||
|
|
||||||
const planRoutes: FastifyPluginAsync = async (fastify) => {
|
const planRoutes: FastifyPluginAsync = async (fastify) => {
|
||||||
|
|
||||||
/** Check YouTube broadcast status and auto-end plans that are over */
|
/** Check service-side broadcast status and auto-end plans that are over */
|
||||||
async function autoDetectEndedPlans(plans: any[], userId: string) {
|
async function autoDetectEndedPlans(plans: any[], userId: string) {
|
||||||
// Cache decrypted tokens by linkedAccountId to avoid redundant decrypts
|
// Cache decrypted tokens by linkedAccountId to avoid redundant decrypts
|
||||||
const tokenCache = new Map<string, string>();
|
const tokenCache = new Map<string, string>();
|
||||||
|
|
||||||
for (const plan of plans) {
|
async function getAccessToken(linkedAccountId: string, service: 'YOUTUBE' | 'TWITCH'): Promise<string | null> {
|
||||||
if (plan.status !== 'LIVE' && plan.status !== 'READY') continue;
|
const cached = tokenCache.get(linkedAccountId);
|
||||||
const ytDest = plan.destinations.find(
|
if (cached) return cached;
|
||||||
(d: any) => d.serviceId === 'YOUTUBE' && d.broadcastId,
|
|
||||||
);
|
|
||||||
if (!ytDest) continue;
|
|
||||||
|
|
||||||
try {
|
|
||||||
let ytAccessToken = tokenCache.get(ytDest.linkedAccountId);
|
|
||||||
if (!ytAccessToken) {
|
|
||||||
const account = await fastify.prisma.linkedAccount.findFirst({
|
const account = await fastify.prisma.linkedAccount.findFirst({
|
||||||
where: { id: ytDest.linkedAccountId, userId },
|
where: { id: linkedAccountId, userId },
|
||||||
});
|
});
|
||||||
if (!account) continue;
|
if (!account) return null;
|
||||||
|
|
||||||
|
let accessToken = decrypt(account.accessTokenEnc, account.accessTokenIv);
|
||||||
|
|
||||||
ytAccessToken = decrypt(account.accessTokenEnc, account.accessTokenIv);
|
|
||||||
if (account.tokenExpiresAt < new Date(Date.now() + 60_000)) {
|
if (account.tokenExpiresAt < new Date(Date.now() + 60_000)) {
|
||||||
const refreshToken = decrypt(account.refreshTokenEnc, account.refreshTokenIv);
|
const refreshToken = decrypt(account.refreshTokenEnc, account.refreshTokenIv);
|
||||||
const result = await refreshYouTubeToken(refreshToken);
|
const result = service === 'YOUTUBE'
|
||||||
ytAccessToken = result.accessToken;
|
? await refreshYouTubeToken(refreshToken)
|
||||||
const accessEnc = encrypt(ytAccessToken);
|
: await refreshTwitchToken(refreshToken);
|
||||||
await fastify.prisma.linkedAccount.update({
|
accessToken = result.accessToken;
|
||||||
where: { id: account.id },
|
const accessEnc = encrypt(accessToken);
|
||||||
data: {
|
const updateData: any = {
|
||||||
accessTokenEnc: accessEnc.ciphertext,
|
accessTokenEnc: accessEnc.ciphertext,
|
||||||
accessTokenIv: accessEnc.iv,
|
accessTokenIv: accessEnc.iv,
|
||||||
tokenExpiresAt: new Date(Date.now() + result.expiresIn * 1000),
|
tokenExpiresAt: new Date(Date.now() + result.expiresIn * 1000),
|
||||||
},
|
};
|
||||||
|
if (service === 'TWITCH' && 'refreshToken' in result) {
|
||||||
|
const refreshEnc = encrypt(result.refreshToken as string);
|
||||||
|
updateData.refreshTokenEnc = refreshEnc.ciphertext;
|
||||||
|
updateData.refreshTokenIv = refreshEnc.iv;
|
||||||
|
}
|
||||||
|
await fastify.prisma.linkedAccount.update({
|
||||||
|
where: { id: account.id },
|
||||||
|
data: updateData,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
tokenCache.set(ytDest.linkedAccountId, ytAccessToken);
|
|
||||||
|
tokenCache.set(linkedAccountId, accessToken);
|
||||||
|
return accessToken;
|
||||||
}
|
}
|
||||||
|
|
||||||
const ytStatus = await getYouTubeBroadcastStatus(ytAccessToken, ytDest.broadcastId!);
|
async function markPlanEnded(plan: any) {
|
||||||
if (ytStatus === 'complete' || ytStatus === 'revoked') {
|
|
||||||
for (const dest of plan.destinations) {
|
for (const dest of plan.destinations) {
|
||||||
await fastify.prisma.streamDestination.update({
|
await fastify.prisma.streamDestination.update({
|
||||||
where: { id: dest.id },
|
where: { id: dest.id },
|
||||||
@@ -89,10 +94,48 @@ const planRoutes: FastifyPluginAsync = async (fastify) => {
|
|||||||
});
|
});
|
||||||
plan.status = 'ENDED';
|
plan.status = 'ENDED';
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (const plan of plans) {
|
||||||
|
if (plan.status !== 'LIVE' && plan.status !== 'READY') continue;
|
||||||
|
|
||||||
|
// Check YouTube broadcast status
|
||||||
|
const ytDest = plan.destinations.find(
|
||||||
|
(d: any) => d.serviceId === 'YOUTUBE' && d.broadcastId,
|
||||||
|
);
|
||||||
|
if (ytDest) {
|
||||||
|
try {
|
||||||
|
const token = await getAccessToken(ytDest.linkedAccountId, 'YOUTUBE');
|
||||||
|
if (token) {
|
||||||
|
const ytStatus = await getYouTubeBroadcastStatus(token, ytDest.broadcastId!);
|
||||||
|
if (ytStatus === 'complete' || ytStatus === 'revoked') {
|
||||||
|
await markPlanEnded(plan);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
} catch {
|
} catch {
|
||||||
// Non-fatal
|
// Non-fatal
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check Twitch stream status
|
||||||
|
const twitchDest = plan.destinations.find(
|
||||||
|
(d: any) => d.serviceId === 'TWITCH' && d.broadcastId,
|
||||||
|
);
|
||||||
|
if (twitchDest) {
|
||||||
|
try {
|
||||||
|
const token = await getAccessToken(twitchDest.linkedAccountId, 'TWITCH');
|
||||||
|
if (token) {
|
||||||
|
const live = await isTwitchStreamLive(token, twitchDest.broadcastId!);
|
||||||
|
if (!live) {
|
||||||
|
await markPlanEnded(plan);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
// Non-fatal
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// GET /streams/plans — list plans
|
// GET /streams/plans — list plans
|
||||||
|
|||||||
@@ -127,6 +127,28 @@ export async function revokeTwitchToken(accessToken: string): Promise<void> {
|
|||||||
|
|
||||||
// ── Twitch Helix Stream Management ──────────────────────
|
// ── Twitch Helix Stream Management ──────────────────────
|
||||||
|
|
||||||
|
/** Check if a Twitch channel is currently live. Returns true if live, false if offline. */
|
||||||
|
export async function isTwitchStreamLive(
|
||||||
|
accessToken: string,
|
||||||
|
broadcasterId: string,
|
||||||
|
): Promise<boolean> {
|
||||||
|
const res = await fetch(
|
||||||
|
`https://api.twitch.tv/helix/streams?user_id=${broadcasterId}`,
|
||||||
|
{
|
||||||
|
headers: {
|
||||||
|
Authorization: `Bearer ${accessToken}`,
|
||||||
|
'Client-Id': config.twitch.clientId,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
);
|
||||||
|
if (!res.ok) {
|
||||||
|
const body = await res.text();
|
||||||
|
throw new Error(`Twitch get streams failed: ${res.status} ${body}`);
|
||||||
|
}
|
||||||
|
const data = (await res.json()) as { data: { type: string }[] };
|
||||||
|
return data.data.length > 0 && data.data[0].type === 'live';
|
||||||
|
}
|
||||||
|
|
||||||
export async function getTwitchStreamKey(
|
export async function getTwitchStreamKey(
|
||||||
accessToken: string,
|
accessToken: string,
|
||||||
broadcasterId: string,
|
broadcasterId: string,
|
||||||
|
|||||||
Reference in New Issue
Block a user