import type { PrismaClient } from '@prisma/client'; import type { FastifyBaseLogger } from 'fastify'; import type { WebSocket } from 'ws'; import { decrypt, encrypt } from './crypto.service.js'; import { refreshYouTubeToken } from './youtube.service.js'; import { refreshTwitchToken } from './twitch.service.js'; import { getYouTubeLiveChatId, pollYouTubeChatMessages, sendYouTubeChatMessage, } from './youtube-chat.service.js'; import { TwitchChatClient } from './twitch-chat.service.js'; interface ChatSession { planId: string; userId: string; socket: WebSocket; youtubePollers: Map; liveChatId: string; pageToken: string }>; twitchClients: Map; } async function getDecryptedToken( prisma: PrismaClient, userId: string, linkedAccountId: string, ): Promise<{ account: any; accessToken: string }> { const account = await (prisma as any).linkedAccount.findFirst({ where: { id: linkedAccountId, userId }, }); if (!account) throw new Error(`Linked account ${linkedAccountId} not found`); // Lazy refresh if token expires within 60s if (account.tokenExpiresAt < new Date(Date.now() + 60 * 1000)) { const refreshToken = decrypt(account.refreshTokenEnc, account.refreshTokenIv); let newAccess: string; let newRefresh: string | undefined; let expiresIn: number; if (account.serviceId === 'YOUTUBE') { const result = await refreshYouTubeToken(refreshToken); newAccess = result.accessToken; expiresIn = result.expiresIn; } else { const result = await refreshTwitchToken(refreshToken); newAccess = result.accessToken; newRefresh = result.refreshToken; expiresIn = result.expiresIn; } const accessEnc = encrypt(newAccess); const updateData: any = { accessTokenEnc: accessEnc.ciphertext, accessTokenIv: accessEnc.iv, tokenExpiresAt: new Date(Date.now() + expiresIn * 1000), }; if (newRefresh) { const refreshEnc = encrypt(newRefresh); updateData.refreshTokenEnc = refreshEnc.ciphertext; updateData.refreshTokenIv = refreshEnc.iv; } await (prisma as any).linkedAccount.update({ where: { id: account.id }, data: updateData, }); return { account, accessToken: newAccess }; } return { account, accessToken: decrypt(account.accessTokenEnc, account.accessTokenIv), }; } interface PortalSubscriber { userId: string; displayName: string; avatarUrl: string | null; socket: WebSocket; } export class ChatManager { private sessions = new Map(); // key: `${userId}:${planId}` private portalSubscribers = new Map>(); // key: planId private prisma: PrismaClient; private logger: FastifyBaseLogger; constructor(prisma: PrismaClient, logger: FastifyBaseLogger) { this.prisma = prisma; this.logger = logger; } async startChat(planId: string, userId: string, socket: WebSocket): Promise { const sessionKey = `${userId}:${planId}`; this.logger.info({ planId, userId, sessionKey }, 'startChat called'); // Stop existing session for this plan await this.stopChat(planId, userId); const plan = await (this.prisma as any).streamPlan.findFirst({ where: { id: planId, userId }, include: { destinations: true }, }); if (!plan) { this.logger.warn({ planId, userId }, 'startChat: plan not found'); this.sendToSocket(socket, { type: 'chat_status', planId, error: 'Plan not found' }); return; } this.logger.info({ planId, destCount: plan.destinations.length, destServices: plan.destinations.map((d: any) => d.serviceId) }, 'startChat: plan loaded'); const session: ChatSession = { planId, userId, socket, youtubePollers: new Map(), twitchClients: new Map(), }; this.sessions.set(sessionKey, session); const chatPromises: Promise[] = []; for (const dest of plan.destinations) { if (dest.serviceId === 'YOUTUBE' && dest.linkedAccountId) { chatPromises.push(this.startYouTubeChat(session, dest)); } else if (dest.serviceId === 'TWITCH' && dest.linkedAccountId) { chatPromises.push(this.startTwitchChat(session, dest)); } } await Promise.allSettled(chatPromises); } async handleSendMessage( planId: string, userId: string, destinationId: string, text: string, ): Promise { const sessionKey = `${userId}:${planId}`; const session = this.sessions.get(sessionKey); if (!session) return; // Check if it's a YouTube destination const ytPoller = session.youtubePollers.get(destinationId); if (ytPoller) { try { const { accessToken } = await getDecryptedToken(this.prisma, userId, destinationId); await sendYouTubeChatMessage(accessToken, ytPoller.liveChatId, text); } catch (err) { this.logger.error({ err, destinationId }, 'Failed to send YouTube chat message'); } return; } // Check if it's a Twitch destination const twitchClient = session.twitchClients.get(destinationId); if (twitchClient) { twitchClient.sendMessage(text); // Echo sent message back to app (Twitch IRC doesn't echo your own PRIVMSGs) try { const { account } = await getDecryptedToken(this.prisma, userId, destinationId); this.sendEcho(session, 'TWITCH', destinationId, account.displayName, text); } catch { // Still echo with fallback name this.sendEcho(session, 'TWITCH', destinationId, 'You', text); } } } private sendEcho( session: ChatSession, service: string, destinationId: string, authorName: string, text: string, ): void { this.sendToSocket(session.socket, { type: 'chat_message', planId: session.planId, service, destinationId, message: { id: `echo-${Date.now()}-${Math.random().toString(36).substring(2, 8)}`, authorName, authorImageUrl: null, text, timestamp: Date.now(), isModerator: false, isBroadcaster: true, color: null, }, }); } async subscribePortalChat(planId: string, userId: string, socket: WebSocket): Promise { const user = await (this.prisma as any).user.findUnique({ where: { id: userId } }); if (!user) return; if (!this.portalSubscribers.has(planId)) { this.portalSubscribers.set(planId, new Set()); } // Remove existing subscription for this user+plan const subs = this.portalSubscribers.get(planId)!; for (const sub of subs) { if (sub.userId === userId) { subs.delete(sub); break; } } subs.add({ userId, displayName: user.displayName, avatarUrl: user.avatarUrl, socket, }); this.logger.info({ planId, userId }, 'Portal chat subscribed'); } async handlePortalComment(planId: string, userId: string, text: string): Promise { const user = await (this.prisma as any).user.findUnique({ where: { id: userId } }); if (!user) return; const message = { id: `portal-${Date.now()}-${Math.random().toString(36).substring(2, 8)}`, authorName: user.displayName, authorImageUrl: user.avatarUrl, text, timestamp: Date.now(), isModerator: false, isBroadcaster: false, color: '#00BCD4', }; // Broadcast to all portal subscribers watching this plan const subs = this.portalSubscribers.get(planId); if (subs) { for (const sub of subs) { this.sendToSocket(sub.socket, { type: 'chat_message', planId, service: 'PORTAL', destinationId: 'portal', message, }); } } // Also send to the plan owner's chat session (so it shows up in their Android app) const plan = await (this.prisma as any).streamPlan.findUnique({ where: { id: planId } }); if (plan) { const ownerSession = this.sessions.get(`${plan.userId}:${planId}`); if (ownerSession) { this.sendToSocket(ownerSession.socket, { type: 'chat_message', planId, service: 'PORTAL', destinationId: 'portal', message, }); } } } async handleLike(planId: string, userId: string): Promise { // Toggle like in DB const existing = await (this.prisma as any).like.findUnique({ where: { userId_planId: { userId, planId } }, }); if (existing) { await (this.prisma as any).like.delete({ where: { id: existing.id } }); } else { await (this.prisma as any).like.create({ data: { userId, planId } }); } const count = await (this.prisma as any).like.count({ where: { planId } }); // Broadcast like update to all portal subscribers const subs = this.portalSubscribers.get(planId); if (subs) { for (const sub of subs) { this.sendToSocket(sub.socket, { type: 'like_update', planId, count, isLiked: sub.userId === userId ? !existing : undefined, }); } } } async stopChat(planId: string, userId: string): Promise { const sessionKey = `${userId}:${planId}`; const session = this.sessions.get(sessionKey); if (!session) return; // Stop YouTube pollers for (const [, poller] of session.youtubePollers) { clearTimeout(poller.timer); } session.youtubePollers.clear(); // Disconnect Twitch clients for (const [, client] of session.twitchClients) { client.disconnect(); } session.twitchClients.clear(); this.sessions.delete(sessionKey); } stopAllForSocket(socket: WebSocket): void { for (const [key, session] of this.sessions) { if (session.socket === socket) { // Stop YouTube pollers for (const [, poller] of session.youtubePollers) { clearTimeout(poller.timer); } session.youtubePollers.clear(); // Disconnect Twitch clients for (const [, client] of session.twitchClients) { client.disconnect(); } session.twitchClients.clear(); this.sessions.delete(key); } } // Clean up portal subscriptions for this socket for (const [planId, subs] of this.portalSubscribers) { for (const sub of subs) { if (sub.socket === socket) { subs.delete(sub); } } if (subs.size === 0) { this.portalSubscribers.delete(planId); } } } private async startYouTubeChat(session: ChatSession, dest: any): Promise { try { const { accessToken } = await getDecryptedToken( this.prisma, session.userId, dest.linkedAccountId, ); // Need broadcastId to get liveChatId if (!dest.broadcastId) { this.sendToSocket(session.socket, { type: 'chat_status', planId: session.planId, service: 'YOUTUBE', destinationId: dest.linkedAccountId, connected: false, error: 'No broadcast ID', }); return; } // Retry getting liveChatId — YouTube may still be transitioning to live let liveChatId: string | null = null; const MAX_RETRIES = 12; // ~60s total (12 * 5s) for (let attempt = 0; attempt < MAX_RETRIES; attempt++) { // Re-check session is still alive if (!this.sessions.has(`${session.userId}:${session.planId}`)) return; const { accessToken: freshToken } = await getDecryptedToken( this.prisma, session.userId, dest.linkedAccountId, ); liveChatId = await getYouTubeLiveChatId(freshToken, dest.broadcastId); if (liveChatId) break; this.logger.info( { planId: session.planId, broadcastId: dest.broadcastId, attempt: attempt + 1 }, 'YouTube liveChatId not yet available, retrying...', ); if (attempt === 0) { this.sendToSocket(session.socket, { type: 'chat_status', planId: session.planId, service: 'YOUTUBE', destinationId: dest.linkedAccountId, connected: false, error: 'Waiting for broadcast to go live...', }); } await new Promise(resolve => setTimeout(resolve, 5000)); } if (!liveChatId) { this.sendToSocket(session.socket, { type: 'chat_status', planId: session.planId, service: 'YOUTUBE', destinationId: dest.linkedAccountId, connected: false, error: 'No active live chat after retries', }); return; } this.sendToSocket(session.socket, { type: 'chat_status', planId: session.planId, service: 'YOUTUBE', destinationId: dest.linkedAccountId, connected: true, }); // Start polling loop const pollerState = { timer: setTimeout(() => {}, 0), liveChatId, pageToken: '' }; session.youtubePollers.set(dest.linkedAccountId, pollerState); const poll = async () => { // Verify session is still alive if (!session.youtubePollers.has(dest.linkedAccountId)) { this.logger.info({ planId: session.planId }, 'YouTube poll skipped: poller removed'); return; } try { this.logger.info({ planId: session.planId, liveChatId }, 'YouTube poll executing'); const { accessToken: token } = await getDecryptedToken( this.prisma, session.userId, dest.linkedAccountId, ); const result = await pollYouTubeChatMessages( token, liveChatId, pollerState.pageToken || undefined, ); this.logger.info({ planId: session.planId, messageCount: result.messages.length, nextInterval: result.pollingIntervalMillis }, 'YouTube poll result'); pollerState.pageToken = result.nextPageToken; for (const msg of result.messages) { this.sendToSocket(session.socket, { type: 'chat_message', planId: session.planId, service: 'YOUTUBE', destinationId: dest.linkedAccountId, message: { id: msg.id, authorName: msg.authorName, authorImageUrl: msg.authorImageUrl, text: msg.text, timestamp: new Date(msg.publishedAt).getTime(), isModerator: msg.isModerator, isBroadcaster: msg.isChatOwner, color: null, }, }); } // Schedule next poll respecting API interval const interval = Math.max(result.pollingIntervalMillis, 5000); pollerState.timer = setTimeout(poll, interval); } catch (err) { this.logger.error({ err, planId: session.planId }, 'YouTube chat poll error'); this.sendToSocket(session.socket, { type: 'chat_status', planId: session.planId, service: 'YOUTUBE', destinationId: dest.linkedAccountId, connected: false, error: 'Poll failed', }); // Retry after 10s pollerState.timer = setTimeout(poll, 10_000); } }; // First poll immediately clearTimeout(pollerState.timer); pollerState.timer = setTimeout(poll, 0); } catch (err) { this.logger.error({ err, planId: session.planId }, 'Failed to start YouTube chat'); this.sendToSocket(session.socket, { type: 'chat_status', planId: session.planId, service: 'YOUTUBE', destinationId: dest.linkedAccountId, connected: false, error: 'Failed to initialize', }); } } private async startTwitchChat(session: ChatSession, dest: any): Promise { this.logger.info({ planId: session.planId, destId: dest.linkedAccountId }, 'startTwitchChat called'); try { const { account, accessToken } = await getDecryptedToken( this.prisma, session.userId, dest.linkedAccountId, ); const channel = account.displayName; const client = new TwitchChatClient(channel, accessToken, account.displayName); client.on('connected', () => { this.sendToSocket(session.socket, { type: 'chat_status', planId: session.planId, service: 'TWITCH', destinationId: dest.linkedAccountId, connected: true, }); }); client.on('message', (msg) => { this.sendToSocket(session.socket, { type: 'chat_message', planId: session.planId, service: 'TWITCH', destinationId: dest.linkedAccountId, message: { id: msg.id, authorName: msg.authorName, authorImageUrl: null, text: msg.text, timestamp: msg.timestamp, isModerator: msg.isModerator, isBroadcaster: msg.isBroadcaster, color: msg.color || null, }, }); }); client.on('disconnected', () => { this.sendToSocket(session.socket, { type: 'chat_status', planId: session.planId, service: 'TWITCH', destinationId: dest.linkedAccountId, connected: false, }); }); client.on('error', (err: Error) => { this.logger.error({ err, planId: session.planId }, 'Twitch chat error'); this.sendToSocket(session.socket, { type: 'chat_status', planId: session.planId, service: 'TWITCH', destinationId: dest.linkedAccountId, connected: false, error: err.message, }); }); session.twitchClients.set(dest.linkedAccountId, client); client.connect(); } catch (err) { this.logger.error({ err, planId: session.planId }, 'Failed to start Twitch chat'); this.sendToSocket(session.socket, { type: 'chat_status', planId: session.planId, service: 'TWITCH', destinationId: dest.linkedAccountId, connected: false, error: 'Failed to initialize', }); } } private sendToSocket(socket: WebSocket, data: Record): void { try { if (socket.readyState === 1) { // WebSocket.OPEN socket.send(JSON.stringify(data)); } } catch (err) { this.logger.error({ err }, 'Failed to send to WebSocket'); } } }