From 6931670a1f961ddb62e5ca0ca3031732da80d974 Mon Sep 17 00:00:00 2001 From: omigamedev Date: Mon, 2 Mar 2026 09:40:15 +0100 Subject: [PATCH] Resilient prepare, Twitch chat echo, parallel chat startup - Prepare endpoint wraps each destination in try/catch; partial success if at least one destination is ready (e.g., Twitch works when YouTube is rate-limited) - Echo sent Twitch messages back to app WebSocket (IRC doesn't echo your own PRIVMSGs) - Start YouTube and Twitch chat clients in parallel via Promise.allSettled - Fix Twitch auth failure detection (Login unsuccessful + Login authentication failed) - Add Twitch IRC debug logging --- src/routes/streams/lifecycle.ts | 113 +++++++++++++++------------ src/services/chat-manager.service.ts | 40 +++++++++- src/services/twitch-chat.service.ts | 10 ++- 3 files changed, 107 insertions(+), 56 deletions(-) diff --git a/src/routes/streams/lifecycle.ts b/src/routes/streams/lifecycle.ts index 7d754a8..edf42bf 100644 --- a/src/routes/streams/lifecycle.ts +++ b/src/routes/streams/lifecycle.ts @@ -114,6 +114,7 @@ const lifecycleRoutes: FastifyPluginAsync = async (fastify) => { } const prepared: PreparedDestination[] = []; + const errors: string[] = []; for (const dest of plan.destinations) { // CUSTOM destinations are already READY with rtmpUrl/streamKey set at creation @@ -134,71 +135,81 @@ const lifecycleRoutes: FastifyPluginAsync = async (fastify) => { continue; } - const { account, accessToken } = await getDecryptedTokenByAccountId( - fastify.prisma, - request.userId, - dest.linkedAccountId, - ); - - if (dest.serviceId === 'YOUTUBE') { - const broadcast = await createYouTubeBroadcast( - accessToken, - dest.title, - dest.description, - dest.privacyStatus, + try { + const { account, accessToken } = await getDecryptedTokenByAccountId( + fastify.prisma, + request.userId, + dest.linkedAccountId, ); - await fastify.prisma.streamDestination.update({ - where: { id: dest.id }, - data: { + if (dest.serviceId === 'YOUTUBE') { + const broadcast = await createYouTubeBroadcast( + accessToken, + dest.title, + dest.description, + dest.privacyStatus, + ); + + await fastify.prisma.streamDestination.update({ + where: { id: dest.id }, + data: { + rtmpUrl: broadcast.rtmpUrl, + streamKey: broadcast.streamKey, + broadcastId: broadcast.id, + status: 'READY', + }, + }); + + prepared.push({ + id: dest.id, + serviceId: 'YOUTUBE', rtmpUrl: broadcast.rtmpUrl, streamKey: broadcast.streamKey, broadcastId: broadcast.id, - status: 'READY', - }, - }); + }); + } else if (dest.serviceId === 'TWITCH') { + // Update channel info + const tags = dest.tags ? dest.tags.split(',').map((t) => t.trim()).filter(Boolean) : []; + await updateTwitchChannel( + accessToken, + account.accountId, + dest.title, + dest.gameId, + tags, + ); - prepared.push({ - id: dest.id, - serviceId: 'YOUTUBE', - rtmpUrl: broadcast.rtmpUrl, - streamKey: broadcast.streamKey, - broadcastId: broadcast.id, - }); - } else if (dest.serviceId === 'TWITCH') { - // Update channel info - const tags = dest.tags ? dest.tags.split(',').map((t) => t.trim()).filter(Boolean) : []; - await updateTwitchChannel( - accessToken, - account.accountId, - dest.title, - dest.gameId, - tags, - ); + // Get stream key + const streamKey = await getTwitchStreamKey(accessToken, account.accountId); - // Get stream key - const streamKey = await getTwitchStreamKey(accessToken, account.accountId); + await fastify.prisma.streamDestination.update({ + where: { id: dest.id }, + data: { + rtmpUrl: TWITCH_RTMP_URL, + streamKey, + broadcastId: account.accountId, + status: 'READY', + }, + }); - await fastify.prisma.streamDestination.update({ - where: { id: dest.id }, - data: { + prepared.push({ + id: dest.id, + serviceId: 'TWITCH', rtmpUrl: TWITCH_RTMP_URL, streamKey, broadcastId: account.accountId, - status: 'READY', - }, - }); - - prepared.push({ - id: dest.id, - serviceId: 'TWITCH', - rtmpUrl: TWITCH_RTMP_URL, - streamKey, - broadcastId: account.accountId, - }); + }); + } + } catch (err: any) { + request.log.error({ err, destId: dest.id, service: dest.serviceId }, `Failed to prepare ${dest.serviceId} destination`); + errors.push(`${dest.serviceId}: ${err.message}`); } } + // At least one destination must be ready + if (prepared.length === 0) { + throw new AppError(500, `All destinations failed to prepare: ${errors.join('; ')}`); + } + await fastify.prisma.streamPlan.update({ where: { id: plan.id }, data: { status: 'READY' }, diff --git a/src/services/chat-manager.service.ts b/src/services/chat-manager.service.ts index 4741a99..264f071 100644 --- a/src/services/chat-manager.service.ts +++ b/src/services/chat-manager.service.ts @@ -111,13 +111,15 @@ export class ChatManager { this.sessions.set(sessionKey, session); + const chatPromises: Promise[] = []; for (const dest of plan.destinations) { if (dest.serviceId === 'YOUTUBE' && dest.linkedAccountId) { - await this.startYouTubeChat(session, dest); + chatPromises.push(this.startYouTubeChat(session, dest)); } else if (dest.serviceId === 'TWITCH' && dest.linkedAccountId) { - await this.startTwitchChat(session, dest); + chatPromises.push(this.startTwitchChat(session, dest)); } } + await Promise.allSettled(chatPromises); } async handleSendMessage( @@ -146,9 +148,42 @@ export class ChatManager { const twitchClient = session.twitchClients.get(destinationId); if (twitchClient) { twitchClient.sendMessage(text); + // Echo sent message back to app (Twitch IRC doesn't echo your own PRIVMSGs) + try { + const { account } = await getDecryptedToken(this.prisma, userId, destinationId); + this.sendEcho(session, 'TWITCH', destinationId, account.displayName, text); + } catch { + // Still echo with fallback name + this.sendEcho(session, 'TWITCH', destinationId, 'You', text); + } } } + private sendEcho( + session: ChatSession, + service: string, + destinationId: string, + authorName: string, + text: string, + ): void { + this.sendToSocket(session.socket, { + type: 'chat_message', + planId: session.planId, + service, + destinationId, + message: { + id: `echo-${Date.now()}-${Math.random().toString(36).substring(2, 8)}`, + authorName, + authorImageUrl: null, + text, + timestamp: Date.now(), + isModerator: false, + isBroadcaster: true, + color: null, + }, + }); + } + async stopChat(planId: string, userId: string): Promise { const sessionKey = `${userId}:${planId}`; const session = this.sessions.get(sessionKey); @@ -345,6 +380,7 @@ export class ChatManager { } private async startTwitchChat(session: ChatSession, dest: any): Promise { + this.logger.info({ planId: session.planId, destId: dest.linkedAccountId }, 'startTwitchChat called'); try { const { account, accessToken } = await getDecryptedToken( this.prisma, diff --git a/src/services/twitch-chat.service.ts b/src/services/twitch-chat.service.ts index c5f4e86..e574193 100644 --- a/src/services/twitch-chat.service.ts +++ b/src/services/twitch-chat.service.ts @@ -33,6 +33,7 @@ export class TwitchChatClient extends EventEmitter { this.ws = new WebSocket('wss://irc-ws.chat.twitch.tv:443'); this.ws.on('open', () => { + console.log(`[Twitch-IRC] WebSocket open for #${this.channel}`); if (!this.ws) return; // Request capabilities this.ws.send('CAP REQ :twitch.tv/tags twitch.tv/commands'); @@ -45,19 +46,22 @@ export class TwitchChatClient extends EventEmitter { this.ws.on('message', (data: Buffer) => { const raw = data.toString(); + console.log(`[Twitch-IRC] #${this.channel} << ${raw.trim().substring(0, 200)}`); const lines = raw.split('\r\n').filter(Boolean); for (const line of lines) { this.handleLine(line); } }); - this.ws.on('close', () => { + this.ws.on('close', (code: number, reason: Buffer) => { + console.log(`[Twitch-IRC] #${this.channel} closed: ${code} ${reason.toString()}`); this.connected = false; this.cleanup(); this.emit('disconnected'); }); this.ws.on('error', (err: Error) => { + console.log(`[Twitch-IRC] #${this.channel} error: ${err.message}`); this.emit('error', err); }); @@ -106,8 +110,8 @@ export class TwitchChatClient extends EventEmitter { } // Handle auth failure - if (line.includes('NOTICE') && line.includes('Login authentication failed')) { - this.emit('error', new Error('missing_scopes')); + if (line.includes('NOTICE') && (line.includes('Login unsuccessful') || line.includes('Login authentication failed'))) { + this.emit('error', new Error('Twitch login failed — token may be expired or missing chat scopes')); this.disconnect(); return; }