diff --git a/package-lock.json b/package-lock.json index 1106c3d..b118360 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11,6 +11,7 @@ "@fastify/cookie": "^11.0.1", "@fastify/cors": "^10.0.1", "@fastify/rate-limit": "^10.2.1", + "@fastify/websocket": "^11.2.0", "@prisma/client": "^6.4.1", "fastify": "^5.2.1", "fastify-plugin": "^5.0.1", @@ -19,6 +20,7 @@ }, "devDependencies": { "@types/node": "^22.13.4", + "@types/ws": "^8.18.1", "prisma": "^6.4.1", "tsx": "^4.19.3", "typescript": "^5.7.3", @@ -639,6 +641,27 @@ "toad-cache": "^3.7.0" } }, + "node_modules/@fastify/websocket": { + "version": "11.2.0", + "resolved": "https://registry.npmjs.org/@fastify/websocket/-/websocket-11.2.0.tgz", + "integrity": "sha512-3HrDPbAG1CzUCqnslgJxppvzaAZffieOVbLp1DAy1huCSynUWPifSvfdEDUR8HlJLp3sp1A36uOM2tJogADS8w==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/fastify" + }, + { + "type": "opencollective", + "url": "https://opencollective.com/fastify" + } + ], + "license": "MIT", + "dependencies": { + "duplexify": "^4.1.3", + "fastify-plugin": "^5.0.0", + "ws": "^8.16.0" + } + }, "node_modules/@jridgewell/sourcemap-codec": { "version": "1.5.5", "resolved": "https://registry.npmjs.org/@jridgewell/sourcemap-codec/-/sourcemap-codec-1.5.5.tgz", @@ -1138,6 +1161,16 @@ "undici-types": "~6.21.0" } }, + "node_modules/@types/ws": { + "version": "8.18.1", + "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.18.1.tgz", + "integrity": "sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@vitest/expect": { "version": "3.2.4", "resolved": "https://registry.npmjs.org/@vitest/expect/-/expect-3.2.4.tgz", @@ -1527,6 +1560,18 @@ "url": "https://dotenvx.com" } }, + "node_modules/duplexify": { + "version": "4.1.3", + "resolved": "https://registry.npmjs.org/duplexify/-/duplexify-4.1.3.tgz", + "integrity": "sha512-M3BmBhwJRZsSx38lZyhE53Csddgzl5R7xGJNk7CVddZD6CcmwMCH8J+7AprIrQKH7TonKxaCjcv27Qmf+sQ+oA==", + "license": "MIT", + "dependencies": { + "end-of-stream": "^1.4.1", + "inherits": "^2.0.3", + "readable-stream": "^3.1.1", + "stream-shift": "^1.0.2" + } + }, "node_modules/effect": { "version": "3.18.4", "resolved": "https://registry.npmjs.org/effect/-/effect-3.18.4.tgz", @@ -1548,6 +1593,15 @@ "node": ">=14" } }, + "node_modules/end-of-stream": { + "version": "1.4.5", + "resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.5.tgz", + "integrity": "sha512-ooEGc6HP26xXq/N+GCGOT0JKCLDGrq2bQUZrQ7gyrJiZANJ/8YDTxTpQBXGMn+WbIQXNVpyWymm7KYVICQnyOg==", + "license": "MIT", + "dependencies": { + "once": "^1.4.0" + } + }, "node_modules/es-module-lexer": { "version": "1.7.0", "resolved": "https://registry.npmjs.org/es-module-lexer/-/es-module-lexer-1.7.0.tgz", @@ -1844,6 +1898,12 @@ "giget": "dist/cli.mjs" } }, + "node_modules/inherits": { + "version": "2.0.4", + "resolved": "https://registry.npmjs.org/inherits/-/inherits-2.0.4.tgz", + "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==", + "license": "ISC" + }, "node_modules/ipaddr.js": { "version": "2.3.0", "resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-2.3.0.tgz", @@ -2047,6 +2107,15 @@ "node": ">=14.0.0" } }, + "node_modules/once": { + "version": "1.4.0", + "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", + "integrity": "sha512-lNaJgI+2Q5URQBkccEKHTQOPaXdUxnZZElQTZY0MFUAuaEqe1E+Nyvgdz/aIyNi6Z9MzO5dv1H8n58/GELp3+w==", + "license": "ISC", + "dependencies": { + "wrappy": "1" + } + }, "node_modules/pathe": { "version": "2.0.3", "resolved": "https://registry.npmjs.org/pathe/-/pathe-2.0.3.tgz", @@ -2245,6 +2314,20 @@ "destr": "^2.0.3" } }, + "node_modules/readable-stream": { + "version": "3.6.2", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-3.6.2.tgz", + "integrity": "sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==", + "license": "MIT", + "dependencies": { + "inherits": "^2.0.3", + "string_decoder": "^1.1.1", + "util-deprecate": "^1.0.1" + }, + "engines": { + "node": ">= 6" + } + }, "node_modules/readdirp": { "version": "4.1.2", "resolved": "https://registry.npmjs.org/readdirp/-/readdirp-4.1.2.tgz", @@ -2357,6 +2440,26 @@ "fsevents": "~2.3.2" } }, + "node_modules/safe-buffer": { + "version": "5.2.1", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz", + "integrity": "sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "license": "MIT" + }, "node_modules/safe-regex2": { "version": "5.0.0", "resolved": "https://registry.npmjs.org/safe-regex2/-/safe-regex2-5.0.0.tgz", @@ -2468,6 +2571,21 @@ "dev": true, "license": "MIT" }, + "node_modules/stream-shift": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/stream-shift/-/stream-shift-1.0.3.tgz", + "integrity": "sha512-76ORR0DO1o1hlKwTbi/DM3EXWGf3ZJYO8cXX5RJwnul2DEg2oyoZyjLNoQM8WsvZiFKCRfC1O0J7iCvie3RZmQ==", + "license": "MIT" + }, + "node_modules/string_decoder": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz", + "integrity": "sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA==", + "license": "MIT", + "dependencies": { + "safe-buffer": "~5.2.0" + } + }, "node_modules/strip-literal": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/strip-literal/-/strip-literal-3.1.0.tgz", @@ -2616,6 +2734,12 @@ "dev": true, "license": "MIT" }, + "node_modules/util-deprecate": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", + "integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==", + "license": "MIT" + }, "node_modules/vite": { "version": "7.3.1", "resolved": "https://registry.npmjs.org/vite/-/vite-7.3.1.tgz", @@ -2810,6 +2934,33 @@ "engines": { "node": ">=8" } + }, + "node_modules/wrappy": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz", + "integrity": "sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==", + "license": "ISC" + }, + "node_modules/ws": { + "version": "8.19.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.19.0.tgz", + "integrity": "sha512-blAT2mjOEIi0ZzruJfIhb3nps74PRWTCz1IjglWEEpQl5XS/UNama6u2/rjFkDDouqr4L67ry+1aGIALViWjDg==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } } } } diff --git a/package.json b/package.json index 012d859..855e683 100644 --- a/package.json +++ b/package.json @@ -19,6 +19,7 @@ "@fastify/cookie": "^11.0.1", "@fastify/cors": "^10.0.1", "@fastify/rate-limit": "^10.2.1", + "@fastify/websocket": "^11.2.0", "@prisma/client": "^6.4.1", "fastify": "^5.2.1", "fastify-plugin": "^5.0.1", @@ -27,9 +28,10 @@ }, "devDependencies": { "@types/node": "^22.13.4", + "@types/ws": "^8.18.1", "prisma": "^6.4.1", "tsx": "^4.19.3", "typescript": "^5.7.3", "vitest": "^3.0.5" } -} \ No newline at end of file +} diff --git a/src/app.ts b/src/app.ts index 86c5660..fe56e37 100644 --- a/src/app.ts +++ b/src/app.ts @@ -1,6 +1,7 @@ import Fastify from 'fastify'; import cors from '@fastify/cors'; import rateLimit from '@fastify/rate-limit'; +import websocket from '@fastify/websocket'; import prismaPlugin from './plugins/prisma.js'; import errorHandlerPlugin from './plugins/error-handler.js'; import authPlugin from './plugins/auth.js'; @@ -12,6 +13,8 @@ import youtubeRoutes from './routes/providers/youtube.js'; import twitchRoutes from './routes/providers/twitch.js'; import planRoutes from './routes/streams/plans.js'; import lifecycleRoutes from './routes/streams/lifecycle.js'; +import { createChatRoutes } from './routes/chat/websocket.js'; +import { ChatManager } from './services/chat-manager.service.js'; import { config } from './config.js'; export async function buildApp() { @@ -31,6 +34,10 @@ export async function buildApp() { await app.register(errorHandlerPlugin); await app.register(prismaPlugin); await app.register(authPlugin); + await app.register(websocket); + + // Chat manager (instantiated after prisma is available) + const chatManager = new ChatManager(app.prisma, app.log); // Routes await app.register(healthRoutes); @@ -41,6 +48,7 @@ export async function buildApp() { await app.register(twitchRoutes); await app.register(planRoutes); await app.register(lifecycleRoutes); + await app.register(createChatRoutes(chatManager)); return app; } diff --git a/src/routes/chat/websocket.ts b/src/routes/chat/websocket.ts new file mode 100644 index 0000000..7ded865 --- /dev/null +++ b/src/routes/chat/websocket.ts @@ -0,0 +1,88 @@ +import { FastifyPluginAsync } from 'fastify'; +import { verifyAccessToken } from '../../plugins/auth.js'; +import { ChatManager } from '../../services/chat-manager.service.js'; + +interface ChatWsMessage { + type: 'subscribe' | 'unsubscribe' | 'send_message'; + planId?: string; + destinationId?: string; + text?: string; +} + +export function createChatRoutes(chatManager: ChatManager): FastifyPluginAsync { + const chatRoutes: FastifyPluginAsync = async (fastify) => { + fastify.get('/chat/ws', { websocket: true }, async (socket, request) => { + // Authenticate via query param + const url = new URL(request.url, `http://${request.headers.host}`); + const token = url.searchParams.get('token'); + + if (!token) { + socket.send(JSON.stringify({ type: 'error', error: 'Missing token' })); + socket.close(); + return; + } + + let userId: string; + try { + userId = await verifyAccessToken(token); + } catch { + socket.send(JSON.stringify({ type: 'error', error: 'Invalid or expired token' })); + socket.close(); + return; + } + + request.log.info({ userId }, 'Chat WebSocket connected'); + + socket.on('message', async (data: Buffer) => { + try { + const raw = data.toString(); + request.log.info({ userId, raw }, 'Chat WS message received'); + const msg: ChatWsMessage = JSON.parse(raw); + + switch (msg.type) { + case 'subscribe': + if (msg.planId) { + await chatManager.startChat(msg.planId, userId, socket); + } + break; + + case 'unsubscribe': + if (msg.planId) { + await chatManager.stopChat(msg.planId, userId); + } + break; + + case 'send_message': + if (msg.planId && msg.destinationId && msg.text) { + await chatManager.handleSendMessage( + msg.planId, + userId, + msg.destinationId, + msg.text, + ); + } + break; + + default: + socket.send(JSON.stringify({ type: 'error', error: `Unknown message type: ${msg.type}` })); + } + } catch (err) { + request.log.error({ err }, 'Chat WebSocket message handling error'); + socket.send(JSON.stringify({ type: 'error', error: 'Internal error' })); + } + }); + + socket.on('close', () => { + request.log.info({ userId }, 'Chat WebSocket disconnected'); + chatManager.stopAllForSocket(socket); + }); + + socket.on('error', (err: Error) => { + request.log.error({ err, userId }, 'Chat WebSocket error'); + chatManager.stopAllForSocket(socket); + }); + }); + }; + + return chatRoutes; +} diff --git a/src/services/chat-manager.service.ts b/src/services/chat-manager.service.ts new file mode 100644 index 0000000..4741a99 --- /dev/null +++ b/src/services/chat-manager.service.ts @@ -0,0 +1,433 @@ +import type { PrismaClient } from '@prisma/client'; +import type { FastifyBaseLogger } from 'fastify'; +import type { WebSocket } from 'ws'; +import { decrypt, encrypt } from './crypto.service.js'; +import { refreshYouTubeToken } from './youtube.service.js'; +import { refreshTwitchToken } from './twitch.service.js'; +import { + getYouTubeLiveChatId, + pollYouTubeChatMessages, + sendYouTubeChatMessage, +} from './youtube-chat.service.js'; +import { TwitchChatClient } from './twitch-chat.service.js'; + +interface ChatSession { + planId: string; + userId: string; + socket: WebSocket; + youtubePollers: Map; liveChatId: string; pageToken: string }>; + twitchClients: Map; +} + +async function getDecryptedToken( + prisma: PrismaClient, + userId: string, + linkedAccountId: string, +): Promise<{ account: any; accessToken: string }> { + const account = await (prisma as any).linkedAccount.findFirst({ + where: { id: linkedAccountId, userId }, + }); + if (!account) throw new Error(`Linked account ${linkedAccountId} not found`); + + // Lazy refresh if token expires within 60s + if (account.tokenExpiresAt < new Date(Date.now() + 60 * 1000)) { + const refreshToken = decrypt(account.refreshTokenEnc, account.refreshTokenIv); + let newAccess: string; + let newRefresh: string | undefined; + let expiresIn: number; + + if (account.serviceId === 'YOUTUBE') { + const result = await refreshYouTubeToken(refreshToken); + newAccess = result.accessToken; + expiresIn = result.expiresIn; + } else { + const result = await refreshTwitchToken(refreshToken); + newAccess = result.accessToken; + newRefresh = result.refreshToken; + expiresIn = result.expiresIn; + } + + const accessEnc = encrypt(newAccess); + const updateData: any = { + accessTokenEnc: accessEnc.ciphertext, + accessTokenIv: accessEnc.iv, + tokenExpiresAt: new Date(Date.now() + expiresIn * 1000), + }; + if (newRefresh) { + const refreshEnc = encrypt(newRefresh); + updateData.refreshTokenEnc = refreshEnc.ciphertext; + updateData.refreshTokenIv = refreshEnc.iv; + } + + await (prisma as any).linkedAccount.update({ + where: { id: account.id }, + data: updateData, + }); + + return { account, accessToken: newAccess }; + } + + return { + account, + accessToken: decrypt(account.accessTokenEnc, account.accessTokenIv), + }; +} + +export class ChatManager { + private sessions = new Map(); // key: `${userId}:${planId}` + private prisma: PrismaClient; + private logger: FastifyBaseLogger; + + constructor(prisma: PrismaClient, logger: FastifyBaseLogger) { + this.prisma = prisma; + this.logger = logger; + } + + async startChat(planId: string, userId: string, socket: WebSocket): Promise { + const sessionKey = `${userId}:${planId}`; + this.logger.info({ planId, userId, sessionKey }, 'startChat called'); + + // Stop existing session for this plan + await this.stopChat(planId, userId); + + const plan = await (this.prisma as any).streamPlan.findFirst({ + where: { id: planId, userId }, + include: { destinations: true }, + }); + if (!plan) { + this.logger.warn({ planId, userId }, 'startChat: plan not found'); + this.sendToSocket(socket, { type: 'chat_status', planId, error: 'Plan not found' }); + return; + } + this.logger.info({ planId, destCount: plan.destinations.length, destServices: plan.destinations.map((d: any) => d.serviceId) }, 'startChat: plan loaded'); + + const session: ChatSession = { + planId, + userId, + socket, + youtubePollers: new Map(), + twitchClients: new Map(), + }; + + this.sessions.set(sessionKey, session); + + for (const dest of plan.destinations) { + if (dest.serviceId === 'YOUTUBE' && dest.linkedAccountId) { + await this.startYouTubeChat(session, dest); + } else if (dest.serviceId === 'TWITCH' && dest.linkedAccountId) { + await this.startTwitchChat(session, dest); + } + } + } + + async handleSendMessage( + planId: string, + userId: string, + destinationId: string, + text: string, + ): Promise { + const sessionKey = `${userId}:${planId}`; + const session = this.sessions.get(sessionKey); + if (!session) return; + + // Check if it's a YouTube destination + const ytPoller = session.youtubePollers.get(destinationId); + if (ytPoller) { + try { + const { accessToken } = await getDecryptedToken(this.prisma, userId, destinationId); + await sendYouTubeChatMessage(accessToken, ytPoller.liveChatId, text); + } catch (err) { + this.logger.error({ err, destinationId }, 'Failed to send YouTube chat message'); + } + return; + } + + // Check if it's a Twitch destination + const twitchClient = session.twitchClients.get(destinationId); + if (twitchClient) { + twitchClient.sendMessage(text); + } + } + + async stopChat(planId: string, userId: string): Promise { + const sessionKey = `${userId}:${planId}`; + const session = this.sessions.get(sessionKey); + if (!session) return; + + // Stop YouTube pollers + for (const [, poller] of session.youtubePollers) { + clearTimeout(poller.timer); + } + session.youtubePollers.clear(); + + // Disconnect Twitch clients + for (const [, client] of session.twitchClients) { + client.disconnect(); + } + session.twitchClients.clear(); + + this.sessions.delete(sessionKey); + } + + stopAllForSocket(socket: WebSocket): void { + for (const [key, session] of this.sessions) { + if (session.socket === socket) { + // Stop YouTube pollers + for (const [, poller] of session.youtubePollers) { + clearTimeout(poller.timer); + } + session.youtubePollers.clear(); + + // Disconnect Twitch clients + for (const [, client] of session.twitchClients) { + client.disconnect(); + } + session.twitchClients.clear(); + + this.sessions.delete(key); + } + } + } + + private async startYouTubeChat(session: ChatSession, dest: any): Promise { + try { + const { accessToken } = await getDecryptedToken( + this.prisma, + session.userId, + dest.linkedAccountId, + ); + + // Need broadcastId to get liveChatId + if (!dest.broadcastId) { + this.sendToSocket(session.socket, { + type: 'chat_status', + planId: session.planId, + service: 'YOUTUBE', + destinationId: dest.linkedAccountId, + connected: false, + error: 'No broadcast ID', + }); + return; + } + + // Retry getting liveChatId — YouTube may still be transitioning to live + let liveChatId: string | null = null; + const MAX_RETRIES = 12; // ~60s total (12 * 5s) + for (let attempt = 0; attempt < MAX_RETRIES; attempt++) { + // Re-check session is still alive + if (!this.sessions.has(`${session.userId}:${session.planId}`)) return; + + const { accessToken: freshToken } = await getDecryptedToken( + this.prisma, + session.userId, + dest.linkedAccountId, + ); + liveChatId = await getYouTubeLiveChatId(freshToken, dest.broadcastId); + if (liveChatId) break; + + this.logger.info( + { planId: session.planId, broadcastId: dest.broadcastId, attempt: attempt + 1 }, + 'YouTube liveChatId not yet available, retrying...', + ); + + if (attempt === 0) { + this.sendToSocket(session.socket, { + type: 'chat_status', + planId: session.planId, + service: 'YOUTUBE', + destinationId: dest.linkedAccountId, + connected: false, + error: 'Waiting for broadcast to go live...', + }); + } + + await new Promise(resolve => setTimeout(resolve, 5000)); + } + + if (!liveChatId) { + this.sendToSocket(session.socket, { + type: 'chat_status', + planId: session.planId, + service: 'YOUTUBE', + destinationId: dest.linkedAccountId, + connected: false, + error: 'No active live chat after retries', + }); + return; + } + + this.sendToSocket(session.socket, { + type: 'chat_status', + planId: session.planId, + service: 'YOUTUBE', + destinationId: dest.linkedAccountId, + connected: true, + }); + + // Start polling loop + const pollerState = { timer: setTimeout(() => {}, 0), liveChatId, pageToken: '' }; + session.youtubePollers.set(dest.linkedAccountId, pollerState); + + const poll = async () => { + // Verify session is still alive + if (!session.youtubePollers.has(dest.linkedAccountId)) { + this.logger.info({ planId: session.planId }, 'YouTube poll skipped: poller removed'); + return; + } + + try { + this.logger.info({ planId: session.planId, liveChatId }, 'YouTube poll executing'); + const { accessToken: token } = await getDecryptedToken( + this.prisma, + session.userId, + dest.linkedAccountId, + ); + const result = await pollYouTubeChatMessages( + token, + liveChatId, + pollerState.pageToken || undefined, + ); + + this.logger.info({ planId: session.planId, messageCount: result.messages.length, nextInterval: result.pollingIntervalMillis }, 'YouTube poll result'); + pollerState.pageToken = result.nextPageToken; + + for (const msg of result.messages) { + this.sendToSocket(session.socket, { + type: 'chat_message', + planId: session.planId, + service: 'YOUTUBE', + destinationId: dest.linkedAccountId, + message: { + id: msg.id, + authorName: msg.authorName, + authorImageUrl: msg.authorImageUrl, + text: msg.text, + timestamp: new Date(msg.publishedAt).getTime(), + isModerator: msg.isModerator, + isBroadcaster: msg.isChatOwner, + color: null, + }, + }); + } + + // Schedule next poll respecting API interval + const interval = Math.max(result.pollingIntervalMillis, 5000); + pollerState.timer = setTimeout(poll, interval); + } catch (err) { + this.logger.error({ err, planId: session.planId }, 'YouTube chat poll error'); + this.sendToSocket(session.socket, { + type: 'chat_status', + planId: session.planId, + service: 'YOUTUBE', + destinationId: dest.linkedAccountId, + connected: false, + error: 'Poll failed', + }); + // Retry after 10s + pollerState.timer = setTimeout(poll, 10_000); + } + }; + + // First poll immediately + clearTimeout(pollerState.timer); + pollerState.timer = setTimeout(poll, 0); + } catch (err) { + this.logger.error({ err, planId: session.planId }, 'Failed to start YouTube chat'); + this.sendToSocket(session.socket, { + type: 'chat_status', + planId: session.planId, + service: 'YOUTUBE', + destinationId: dest.linkedAccountId, + connected: false, + error: 'Failed to initialize', + }); + } + } + + private async startTwitchChat(session: ChatSession, dest: any): Promise { + try { + const { account, accessToken } = await getDecryptedToken( + this.prisma, + session.userId, + dest.linkedAccountId, + ); + + const channel = account.displayName; + const client = new TwitchChatClient(channel, accessToken, account.displayName); + + client.on('connected', () => { + this.sendToSocket(session.socket, { + type: 'chat_status', + planId: session.planId, + service: 'TWITCH', + destinationId: dest.linkedAccountId, + connected: true, + }); + }); + + client.on('message', (msg) => { + this.sendToSocket(session.socket, { + type: 'chat_message', + planId: session.planId, + service: 'TWITCH', + destinationId: dest.linkedAccountId, + message: { + id: msg.id, + authorName: msg.authorName, + authorImageUrl: null, + text: msg.text, + timestamp: msg.timestamp, + isModerator: msg.isModerator, + isBroadcaster: msg.isBroadcaster, + color: msg.color || null, + }, + }); + }); + + client.on('disconnected', () => { + this.sendToSocket(session.socket, { + type: 'chat_status', + planId: session.planId, + service: 'TWITCH', + destinationId: dest.linkedAccountId, + connected: false, + }); + }); + + client.on('error', (err: Error) => { + this.logger.error({ err, planId: session.planId }, 'Twitch chat error'); + this.sendToSocket(session.socket, { + type: 'chat_status', + planId: session.planId, + service: 'TWITCH', + destinationId: dest.linkedAccountId, + connected: false, + error: err.message, + }); + }); + + session.twitchClients.set(dest.linkedAccountId, client); + client.connect(); + } catch (err) { + this.logger.error({ err, planId: session.planId }, 'Failed to start Twitch chat'); + this.sendToSocket(session.socket, { + type: 'chat_status', + planId: session.planId, + service: 'TWITCH', + destinationId: dest.linkedAccountId, + connected: false, + error: 'Failed to initialize', + }); + } + } + + private sendToSocket(socket: WebSocket, data: Record): void { + try { + if (socket.readyState === 1) { // WebSocket.OPEN + socket.send(JSON.stringify(data)); + } + } catch (err) { + this.logger.error({ err }, 'Failed to send to WebSocket'); + } + } +} diff --git a/src/services/twitch-chat.service.ts b/src/services/twitch-chat.service.ts new file mode 100644 index 0000000..c5f4e86 --- /dev/null +++ b/src/services/twitch-chat.service.ts @@ -0,0 +1,172 @@ +import { EventEmitter } from 'events'; +import { WebSocket } from 'ws'; + +export interface TwitchChatMessage { + id: string; + authorName: string; + text: string; + timestamp: number; + isModerator: boolean; + isBroadcaster: boolean; + color: string; + badges: string; +} + +export class TwitchChatClient extends EventEmitter { + private ws: WebSocket | null = null; + private channel: string; + private token: string; + private nick: string; + private connected = false; + private pingTimer: ReturnType | null = null; + + constructor(channel: string, token: string, nick: string) { + super(); + this.channel = channel.toLowerCase(); + this.token = token; + this.nick = nick.toLowerCase(); + } + + connect(): void { + if (this.ws) return; + + this.ws = new WebSocket('wss://irc-ws.chat.twitch.tv:443'); + + this.ws.on('open', () => { + if (!this.ws) return; + // Request capabilities + this.ws.send('CAP REQ :twitch.tv/tags twitch.tv/commands'); + // Authenticate + this.ws.send(`PASS oauth:${this.token}`); + this.ws.send(`NICK ${this.nick}`); + // Join channel + this.ws.send(`JOIN #${this.channel}`); + }); + + this.ws.on('message', (data: Buffer) => { + const raw = data.toString(); + const lines = raw.split('\r\n').filter(Boolean); + for (const line of lines) { + this.handleLine(line); + } + }); + + this.ws.on('close', () => { + this.connected = false; + this.cleanup(); + this.emit('disconnected'); + }); + + this.ws.on('error', (err: Error) => { + this.emit('error', err); + }); + + // Keep-alive: send PING every 4 minutes + this.pingTimer = setInterval(() => { + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + this.ws.send('PING :tmi.twitch.tv'); + } + }, 240_000); + } + + sendMessage(text: string): void { + if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return; + this.ws.send(`PRIVMSG #${this.channel} :${text}`); + } + + disconnect(): void { + this.cleanup(); + if (this.ws) { + this.ws.close(); + this.ws = null; + } + } + + private cleanup(): void { + if (this.pingTimer) { + clearInterval(this.pingTimer); + this.pingTimer = null; + } + } + + private handleLine(line: string): void { + // Handle PING + if (line.startsWith('PING')) { + this.ws?.send('PONG :tmi.twitch.tv'); + return; + } + + // Handle successful join / connection + if (line.includes('366') || line.includes(':End of /NAMES list')) { + if (!this.connected) { + this.connected = true; + this.emit('connected'); + } + return; + } + + // Handle auth failure + if (line.includes('NOTICE') && line.includes('Login authentication failed')) { + this.emit('error', new Error('missing_scopes')); + this.disconnect(); + return; + } + + // Parse PRIVMSG with IRCv3 tags + if (line.includes('PRIVMSG')) { + const msg = this.parsePrivmsg(line); + if (msg) { + this.emit('message', msg); + } + } + } + + private parsePrivmsg(line: string): TwitchChatMessage | null { + // Format: @tags :user!user@user.tmi.twitch.tv PRIVMSG #channel :message + let tags: Record = {}; + + let rest = line; + if (rest.startsWith('@')) { + const spaceIdx = rest.indexOf(' '); + if (spaceIdx === -1) return null; + const tagStr = rest.substring(1, spaceIdx); + rest = rest.substring(spaceIdx + 1); + + for (const part of tagStr.split(';')) { + const eq = part.indexOf('='); + if (eq !== -1) { + tags[part.substring(0, eq)] = part.substring(eq + 1); + } + } + } + + // Find message text after "PRIVMSG #channel :" + const privmsgIdx = rest.indexOf('PRIVMSG'); + if (privmsgIdx === -1) return null; + const afterPrivmsg = rest.substring(privmsgIdx); + const colonIdx = afterPrivmsg.indexOf(' :'); + if (colonIdx === -1) return null; + const text = afterPrivmsg.substring(colonIdx + 2); + + const displayName = tags['display-name'] || this.extractNick(rest); + const timestamp = tags['tmi-sent-ts'] ? parseInt(tags['tmi-sent-ts'], 10) : Date.now(); + const badges = tags['badges'] || ''; + + return { + id: tags['id'] || `twitch-${Date.now()}-${Math.random().toString(36).substring(2, 8)}`, + authorName: displayName, + text, + timestamp, + isModerator: badges.includes('moderator') || tags['mod'] === '1', + isBroadcaster: badges.includes('broadcaster'), + color: tags['color'] || '', + badges, + }; + } + + private extractNick(line: string): string { + // :nick!nick@nick.tmi.twitch.tv PRIVMSG ... + const match = line.match(/^:(\w+)!/); + return match ? match[1] : 'unknown'; + } +} diff --git a/src/services/twitch.service.ts b/src/services/twitch.service.ts index 2a1a5f5..0079b99 100644 --- a/src/services/twitch.service.ts +++ b/src/services/twitch.service.ts @@ -18,6 +18,8 @@ const SCOPES = [ 'channel:manage:broadcast', 'channel:read:stream_key', 'user:read:email', + 'chat:read', + 'chat:edit', ].join(' '); export function getTwitchAuthUrl(state: string): string { diff --git a/src/services/youtube-chat.service.ts b/src/services/youtube-chat.service.ts new file mode 100644 index 0000000..306bf45 --- /dev/null +++ b/src/services/youtube-chat.service.ts @@ -0,0 +1,108 @@ +export interface YouTubeChatMessage { + id: string; + authorName: string; + authorImageUrl: string; + text: string; + publishedAt: string; + isModerator: boolean; + isChatOwner: boolean; +} + +export interface YouTubeChatPollResult { + messages: YouTubeChatMessage[]; + nextPageToken: string; + pollingIntervalMillis: number; +} + +export async function getYouTubeLiveChatId( + accessToken: string, + broadcastId: string, +): Promise { + const res = await fetch( + `https://www.googleapis.com/youtube/v3/liveBroadcasts?part=snippet,status&id=${broadcastId}`, + { headers: { Authorization: `Bearer ${accessToken}` } }, + ); + if (!res.ok) { + const body = await res.text(); + console.log(`[YT-Chat] liveBroadcasts API error: ${res.status} ${body}`); + return null; + } + const data = (await res.json()) as any; + const items = data.items ?? []; + if (items.length === 0) { + console.log(`[YT-Chat] No broadcast found for id=${broadcastId}`); + return null; + } + const broadcast = items[0]; + const lifeCycleStatus = broadcast.status?.lifeCycleStatus; + const liveChatId = broadcast.snippet?.liveChatId ?? null; + console.log(`[YT-Chat] broadcast=${broadcastId} lifeCycleStatus=${lifeCycleStatus} liveChatId=${liveChatId}`); + return liveChatId; +} + +export async function pollYouTubeChatMessages( + accessToken: string, + liveChatId: string, + pageToken?: string, +): Promise { + const params = new URLSearchParams({ + liveChatId, + part: 'snippet,authorDetails', + maxResults: '200', + }); + if (pageToken) params.set('pageToken', pageToken); + + const res = await fetch( + `https://www.googleapis.com/youtube/v3/liveChat/messages?${params}`, + { headers: { Authorization: `Bearer ${accessToken}` } }, + ); + if (!res.ok) { + const body = await res.text(); + throw new Error(`YouTube chat poll failed: ${res.status} ${body}`); + } + const data = (await res.json()) as any; + + const messages: YouTubeChatMessage[] = (data.items ?? []).map((item: any) => ({ + id: item.id, + authorName: item.authorDetails?.displayName ?? 'Unknown', + authorImageUrl: item.authorDetails?.profileImageUrl ?? '', + text: item.snippet?.displayMessage ?? '', + publishedAt: item.snippet?.publishedAt ?? new Date().toISOString(), + isModerator: item.authorDetails?.isChatModerator ?? false, + isChatOwner: item.authorDetails?.isChatOwner ?? false, + })); + + return { + messages, + nextPageToken: data.nextPageToken ?? '', + pollingIntervalMillis: data.pollingIntervalMillis ?? 5000, + }; +} + +export async function sendYouTubeChatMessage( + accessToken: string, + liveChatId: string, + messageText: string, +): Promise { + const res = await fetch( + 'https://www.googleapis.com/youtube/v3/liveChat/messages?part=snippet', + { + method: 'POST', + headers: { + Authorization: `Bearer ${accessToken}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + snippet: { + liveChatId, + type: 'textMessageEvent', + textMessageDetails: { messageText }, + }, + }), + }, + ); + if (!res.ok) { + const body = await res.text(); + throw new Error(`YouTube send chat message failed: ${res.status} ${body}`); + } +}