YouTube/Twitch live chat backend WebSocket proxy
- YouTube chat polling via liveBroadcasts + liveChat/messages APIs - Twitch IRC WebSocket client with IRCv3 tag parsing - ChatManager orchestrator with token refresh, retry logic - WebSocket endpoint at /chat/ws with JWT auth - Added chat:read, chat:edit to Twitch OAuth scopes
This commit is contained in:
433
src/services/chat-manager.service.ts
Normal file
433
src/services/chat-manager.service.ts
Normal file
@@ -0,0 +1,433 @@
|
||||
import type { PrismaClient } from '@prisma/client';
|
||||
import type { FastifyBaseLogger } from 'fastify';
|
||||
import type { WebSocket } from 'ws';
|
||||
import { decrypt, encrypt } from './crypto.service.js';
|
||||
import { refreshYouTubeToken } from './youtube.service.js';
|
||||
import { refreshTwitchToken } from './twitch.service.js';
|
||||
import {
|
||||
getYouTubeLiveChatId,
|
||||
pollYouTubeChatMessages,
|
||||
sendYouTubeChatMessage,
|
||||
} from './youtube-chat.service.js';
|
||||
import { TwitchChatClient } from './twitch-chat.service.js';
|
||||
|
||||
interface ChatSession {
|
||||
planId: string;
|
||||
userId: string;
|
||||
socket: WebSocket;
|
||||
youtubePollers: Map<string, { timer: ReturnType<typeof setTimeout>; liveChatId: string; pageToken: string }>;
|
||||
twitchClients: Map<string, TwitchChatClient>;
|
||||
}
|
||||
|
||||
async function getDecryptedToken(
|
||||
prisma: PrismaClient,
|
||||
userId: string,
|
||||
linkedAccountId: string,
|
||||
): Promise<{ account: any; accessToken: string }> {
|
||||
const account = await (prisma as any).linkedAccount.findFirst({
|
||||
where: { id: linkedAccountId, userId },
|
||||
});
|
||||
if (!account) throw new Error(`Linked account ${linkedAccountId} not found`);
|
||||
|
||||
// Lazy refresh if token expires within 60s
|
||||
if (account.tokenExpiresAt < new Date(Date.now() + 60 * 1000)) {
|
||||
const refreshToken = decrypt(account.refreshTokenEnc, account.refreshTokenIv);
|
||||
let newAccess: string;
|
||||
let newRefresh: string | undefined;
|
||||
let expiresIn: number;
|
||||
|
||||
if (account.serviceId === 'YOUTUBE') {
|
||||
const result = await refreshYouTubeToken(refreshToken);
|
||||
newAccess = result.accessToken;
|
||||
expiresIn = result.expiresIn;
|
||||
} else {
|
||||
const result = await refreshTwitchToken(refreshToken);
|
||||
newAccess = result.accessToken;
|
||||
newRefresh = result.refreshToken;
|
||||
expiresIn = result.expiresIn;
|
||||
}
|
||||
|
||||
const accessEnc = encrypt(newAccess);
|
||||
const updateData: any = {
|
||||
accessTokenEnc: accessEnc.ciphertext,
|
||||
accessTokenIv: accessEnc.iv,
|
||||
tokenExpiresAt: new Date(Date.now() + expiresIn * 1000),
|
||||
};
|
||||
if (newRefresh) {
|
||||
const refreshEnc = encrypt(newRefresh);
|
||||
updateData.refreshTokenEnc = refreshEnc.ciphertext;
|
||||
updateData.refreshTokenIv = refreshEnc.iv;
|
||||
}
|
||||
|
||||
await (prisma as any).linkedAccount.update({
|
||||
where: { id: account.id },
|
||||
data: updateData,
|
||||
});
|
||||
|
||||
return { account, accessToken: newAccess };
|
||||
}
|
||||
|
||||
return {
|
||||
account,
|
||||
accessToken: decrypt(account.accessTokenEnc, account.accessTokenIv),
|
||||
};
|
||||
}
|
||||
|
||||
export class ChatManager {
|
||||
private sessions = new Map<string, ChatSession>(); // key: `${userId}:${planId}`
|
||||
private prisma: PrismaClient;
|
||||
private logger: FastifyBaseLogger;
|
||||
|
||||
constructor(prisma: PrismaClient, logger: FastifyBaseLogger) {
|
||||
this.prisma = prisma;
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
async startChat(planId: string, userId: string, socket: WebSocket): Promise<void> {
|
||||
const sessionKey = `${userId}:${planId}`;
|
||||
this.logger.info({ planId, userId, sessionKey }, 'startChat called');
|
||||
|
||||
// Stop existing session for this plan
|
||||
await this.stopChat(planId, userId);
|
||||
|
||||
const plan = await (this.prisma as any).streamPlan.findFirst({
|
||||
where: { id: planId, userId },
|
||||
include: { destinations: true },
|
||||
});
|
||||
if (!plan) {
|
||||
this.logger.warn({ planId, userId }, 'startChat: plan not found');
|
||||
this.sendToSocket(socket, { type: 'chat_status', planId, error: 'Plan not found' });
|
||||
return;
|
||||
}
|
||||
this.logger.info({ planId, destCount: plan.destinations.length, destServices: plan.destinations.map((d: any) => d.serviceId) }, 'startChat: plan loaded');
|
||||
|
||||
const session: ChatSession = {
|
||||
planId,
|
||||
userId,
|
||||
socket,
|
||||
youtubePollers: new Map(),
|
||||
twitchClients: new Map(),
|
||||
};
|
||||
|
||||
this.sessions.set(sessionKey, session);
|
||||
|
||||
for (const dest of plan.destinations) {
|
||||
if (dest.serviceId === 'YOUTUBE' && dest.linkedAccountId) {
|
||||
await this.startYouTubeChat(session, dest);
|
||||
} else if (dest.serviceId === 'TWITCH' && dest.linkedAccountId) {
|
||||
await this.startTwitchChat(session, dest);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async handleSendMessage(
|
||||
planId: string,
|
||||
userId: string,
|
||||
destinationId: string,
|
||||
text: string,
|
||||
): Promise<void> {
|
||||
const sessionKey = `${userId}:${planId}`;
|
||||
const session = this.sessions.get(sessionKey);
|
||||
if (!session) return;
|
||||
|
||||
// Check if it's a YouTube destination
|
||||
const ytPoller = session.youtubePollers.get(destinationId);
|
||||
if (ytPoller) {
|
||||
try {
|
||||
const { accessToken } = await getDecryptedToken(this.prisma, userId, destinationId);
|
||||
await sendYouTubeChatMessage(accessToken, ytPoller.liveChatId, text);
|
||||
} catch (err) {
|
||||
this.logger.error({ err, destinationId }, 'Failed to send YouTube chat message');
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if it's a Twitch destination
|
||||
const twitchClient = session.twitchClients.get(destinationId);
|
||||
if (twitchClient) {
|
||||
twitchClient.sendMessage(text);
|
||||
}
|
||||
}
|
||||
|
||||
async stopChat(planId: string, userId: string): Promise<void> {
|
||||
const sessionKey = `${userId}:${planId}`;
|
||||
const session = this.sessions.get(sessionKey);
|
||||
if (!session) return;
|
||||
|
||||
// Stop YouTube pollers
|
||||
for (const [, poller] of session.youtubePollers) {
|
||||
clearTimeout(poller.timer);
|
||||
}
|
||||
session.youtubePollers.clear();
|
||||
|
||||
// Disconnect Twitch clients
|
||||
for (const [, client] of session.twitchClients) {
|
||||
client.disconnect();
|
||||
}
|
||||
session.twitchClients.clear();
|
||||
|
||||
this.sessions.delete(sessionKey);
|
||||
}
|
||||
|
||||
stopAllForSocket(socket: WebSocket): void {
|
||||
for (const [key, session] of this.sessions) {
|
||||
if (session.socket === socket) {
|
||||
// Stop YouTube pollers
|
||||
for (const [, poller] of session.youtubePollers) {
|
||||
clearTimeout(poller.timer);
|
||||
}
|
||||
session.youtubePollers.clear();
|
||||
|
||||
// Disconnect Twitch clients
|
||||
for (const [, client] of session.twitchClients) {
|
||||
client.disconnect();
|
||||
}
|
||||
session.twitchClients.clear();
|
||||
|
||||
this.sessions.delete(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async startYouTubeChat(session: ChatSession, dest: any): Promise<void> {
|
||||
try {
|
||||
const { accessToken } = await getDecryptedToken(
|
||||
this.prisma,
|
||||
session.userId,
|
||||
dest.linkedAccountId,
|
||||
);
|
||||
|
||||
// Need broadcastId to get liveChatId
|
||||
if (!dest.broadcastId) {
|
||||
this.sendToSocket(session.socket, {
|
||||
type: 'chat_status',
|
||||
planId: session.planId,
|
||||
service: 'YOUTUBE',
|
||||
destinationId: dest.linkedAccountId,
|
||||
connected: false,
|
||||
error: 'No broadcast ID',
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// Retry getting liveChatId — YouTube may still be transitioning to live
|
||||
let liveChatId: string | null = null;
|
||||
const MAX_RETRIES = 12; // ~60s total (12 * 5s)
|
||||
for (let attempt = 0; attempt < MAX_RETRIES; attempt++) {
|
||||
// Re-check session is still alive
|
||||
if (!this.sessions.has(`${session.userId}:${session.planId}`)) return;
|
||||
|
||||
const { accessToken: freshToken } = await getDecryptedToken(
|
||||
this.prisma,
|
||||
session.userId,
|
||||
dest.linkedAccountId,
|
||||
);
|
||||
liveChatId = await getYouTubeLiveChatId(freshToken, dest.broadcastId);
|
||||
if (liveChatId) break;
|
||||
|
||||
this.logger.info(
|
||||
{ planId: session.planId, broadcastId: dest.broadcastId, attempt: attempt + 1 },
|
||||
'YouTube liveChatId not yet available, retrying...',
|
||||
);
|
||||
|
||||
if (attempt === 0) {
|
||||
this.sendToSocket(session.socket, {
|
||||
type: 'chat_status',
|
||||
planId: session.planId,
|
||||
service: 'YOUTUBE',
|
||||
destinationId: dest.linkedAccountId,
|
||||
connected: false,
|
||||
error: 'Waiting for broadcast to go live...',
|
||||
});
|
||||
}
|
||||
|
||||
await new Promise(resolve => setTimeout(resolve, 5000));
|
||||
}
|
||||
|
||||
if (!liveChatId) {
|
||||
this.sendToSocket(session.socket, {
|
||||
type: 'chat_status',
|
||||
planId: session.planId,
|
||||
service: 'YOUTUBE',
|
||||
destinationId: dest.linkedAccountId,
|
||||
connected: false,
|
||||
error: 'No active live chat after retries',
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
this.sendToSocket(session.socket, {
|
||||
type: 'chat_status',
|
||||
planId: session.planId,
|
||||
service: 'YOUTUBE',
|
||||
destinationId: dest.linkedAccountId,
|
||||
connected: true,
|
||||
});
|
||||
|
||||
// Start polling loop
|
||||
const pollerState = { timer: setTimeout(() => {}, 0), liveChatId, pageToken: '' };
|
||||
session.youtubePollers.set(dest.linkedAccountId, pollerState);
|
||||
|
||||
const poll = async () => {
|
||||
// Verify session is still alive
|
||||
if (!session.youtubePollers.has(dest.linkedAccountId)) {
|
||||
this.logger.info({ planId: session.planId }, 'YouTube poll skipped: poller removed');
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
this.logger.info({ planId: session.planId, liveChatId }, 'YouTube poll executing');
|
||||
const { accessToken: token } = await getDecryptedToken(
|
||||
this.prisma,
|
||||
session.userId,
|
||||
dest.linkedAccountId,
|
||||
);
|
||||
const result = await pollYouTubeChatMessages(
|
||||
token,
|
||||
liveChatId,
|
||||
pollerState.pageToken || undefined,
|
||||
);
|
||||
|
||||
this.logger.info({ planId: session.planId, messageCount: result.messages.length, nextInterval: result.pollingIntervalMillis }, 'YouTube poll result');
|
||||
pollerState.pageToken = result.nextPageToken;
|
||||
|
||||
for (const msg of result.messages) {
|
||||
this.sendToSocket(session.socket, {
|
||||
type: 'chat_message',
|
||||
planId: session.planId,
|
||||
service: 'YOUTUBE',
|
||||
destinationId: dest.linkedAccountId,
|
||||
message: {
|
||||
id: msg.id,
|
||||
authorName: msg.authorName,
|
||||
authorImageUrl: msg.authorImageUrl,
|
||||
text: msg.text,
|
||||
timestamp: new Date(msg.publishedAt).getTime(),
|
||||
isModerator: msg.isModerator,
|
||||
isBroadcaster: msg.isChatOwner,
|
||||
color: null,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
// Schedule next poll respecting API interval
|
||||
const interval = Math.max(result.pollingIntervalMillis, 5000);
|
||||
pollerState.timer = setTimeout(poll, interval);
|
||||
} catch (err) {
|
||||
this.logger.error({ err, planId: session.planId }, 'YouTube chat poll error');
|
||||
this.sendToSocket(session.socket, {
|
||||
type: 'chat_status',
|
||||
planId: session.planId,
|
||||
service: 'YOUTUBE',
|
||||
destinationId: dest.linkedAccountId,
|
||||
connected: false,
|
||||
error: 'Poll failed',
|
||||
});
|
||||
// Retry after 10s
|
||||
pollerState.timer = setTimeout(poll, 10_000);
|
||||
}
|
||||
};
|
||||
|
||||
// First poll immediately
|
||||
clearTimeout(pollerState.timer);
|
||||
pollerState.timer = setTimeout(poll, 0);
|
||||
} catch (err) {
|
||||
this.logger.error({ err, planId: session.planId }, 'Failed to start YouTube chat');
|
||||
this.sendToSocket(session.socket, {
|
||||
type: 'chat_status',
|
||||
planId: session.planId,
|
||||
service: 'YOUTUBE',
|
||||
destinationId: dest.linkedAccountId,
|
||||
connected: false,
|
||||
error: 'Failed to initialize',
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private async startTwitchChat(session: ChatSession, dest: any): Promise<void> {
|
||||
try {
|
||||
const { account, accessToken } = await getDecryptedToken(
|
||||
this.prisma,
|
||||
session.userId,
|
||||
dest.linkedAccountId,
|
||||
);
|
||||
|
||||
const channel = account.displayName;
|
||||
const client = new TwitchChatClient(channel, accessToken, account.displayName);
|
||||
|
||||
client.on('connected', () => {
|
||||
this.sendToSocket(session.socket, {
|
||||
type: 'chat_status',
|
||||
planId: session.planId,
|
||||
service: 'TWITCH',
|
||||
destinationId: dest.linkedAccountId,
|
||||
connected: true,
|
||||
});
|
||||
});
|
||||
|
||||
client.on('message', (msg) => {
|
||||
this.sendToSocket(session.socket, {
|
||||
type: 'chat_message',
|
||||
planId: session.planId,
|
||||
service: 'TWITCH',
|
||||
destinationId: dest.linkedAccountId,
|
||||
message: {
|
||||
id: msg.id,
|
||||
authorName: msg.authorName,
|
||||
authorImageUrl: null,
|
||||
text: msg.text,
|
||||
timestamp: msg.timestamp,
|
||||
isModerator: msg.isModerator,
|
||||
isBroadcaster: msg.isBroadcaster,
|
||||
color: msg.color || null,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
client.on('disconnected', () => {
|
||||
this.sendToSocket(session.socket, {
|
||||
type: 'chat_status',
|
||||
planId: session.planId,
|
||||
service: 'TWITCH',
|
||||
destinationId: dest.linkedAccountId,
|
||||
connected: false,
|
||||
});
|
||||
});
|
||||
|
||||
client.on('error', (err: Error) => {
|
||||
this.logger.error({ err, planId: session.planId }, 'Twitch chat error');
|
||||
this.sendToSocket(session.socket, {
|
||||
type: 'chat_status',
|
||||
planId: session.planId,
|
||||
service: 'TWITCH',
|
||||
destinationId: dest.linkedAccountId,
|
||||
connected: false,
|
||||
error: err.message,
|
||||
});
|
||||
});
|
||||
|
||||
session.twitchClients.set(dest.linkedAccountId, client);
|
||||
client.connect();
|
||||
} catch (err) {
|
||||
this.logger.error({ err, planId: session.planId }, 'Failed to start Twitch chat');
|
||||
this.sendToSocket(session.socket, {
|
||||
type: 'chat_status',
|
||||
planId: session.planId,
|
||||
service: 'TWITCH',
|
||||
destinationId: dest.linkedAccountId,
|
||||
connected: false,
|
||||
error: 'Failed to initialize',
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private sendToSocket(socket: WebSocket, data: Record<string, unknown>): void {
|
||||
try {
|
||||
if (socket.readyState === 1) { // WebSocket.OPEN
|
||||
socket.send(JSON.stringify(data));
|
||||
}
|
||||
} catch (err) {
|
||||
this.logger.error({ err }, 'Failed to send to WebSocket');
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user