Resilient prepare, Twitch chat echo, parallel chat startup

- Prepare endpoint wraps each destination in try/catch; partial success
  if at least one destination is ready (e.g., Twitch works when YouTube
  is rate-limited)
- Echo sent Twitch messages back to app WebSocket (IRC doesn't echo
  your own PRIVMSGs)
- Start YouTube and Twitch chat clients in parallel via Promise.allSettled
- Fix Twitch auth failure detection (Login unsuccessful + Login
  authentication failed)
- Add Twitch IRC debug logging
This commit is contained in:
2026-03-02 09:40:15 +01:00
parent cc8ab2320b
commit 6931670a1f
3 changed files with 107 additions and 56 deletions

View File

@@ -114,6 +114,7 @@ const lifecycleRoutes: FastifyPluginAsync = async (fastify) => {
} }
const prepared: PreparedDestination[] = []; const prepared: PreparedDestination[] = [];
const errors: string[] = [];
for (const dest of plan.destinations) { for (const dest of plan.destinations) {
// CUSTOM destinations are already READY with rtmpUrl/streamKey set at creation // CUSTOM destinations are already READY with rtmpUrl/streamKey set at creation
@@ -134,71 +135,81 @@ const lifecycleRoutes: FastifyPluginAsync = async (fastify) => {
continue; continue;
} }
const { account, accessToken } = await getDecryptedTokenByAccountId( try {
fastify.prisma, const { account, accessToken } = await getDecryptedTokenByAccountId(
request.userId, fastify.prisma,
dest.linkedAccountId, request.userId,
); dest.linkedAccountId,
if (dest.serviceId === 'YOUTUBE') {
const broadcast = await createYouTubeBroadcast(
accessToken,
dest.title,
dest.description,
dest.privacyStatus,
); );
await fastify.prisma.streamDestination.update({ if (dest.serviceId === 'YOUTUBE') {
where: { id: dest.id }, const broadcast = await createYouTubeBroadcast(
data: { accessToken,
dest.title,
dest.description,
dest.privacyStatus,
);
await fastify.prisma.streamDestination.update({
where: { id: dest.id },
data: {
rtmpUrl: broadcast.rtmpUrl,
streamKey: broadcast.streamKey,
broadcastId: broadcast.id,
status: 'READY',
},
});
prepared.push({
id: dest.id,
serviceId: 'YOUTUBE',
rtmpUrl: broadcast.rtmpUrl, rtmpUrl: broadcast.rtmpUrl,
streamKey: broadcast.streamKey, streamKey: broadcast.streamKey,
broadcastId: broadcast.id, broadcastId: broadcast.id,
status: 'READY', });
}, } else if (dest.serviceId === 'TWITCH') {
}); // Update channel info
const tags = dest.tags ? dest.tags.split(',').map((t) => t.trim()).filter(Boolean) : [];
await updateTwitchChannel(
accessToken,
account.accountId,
dest.title,
dest.gameId,
tags,
);
prepared.push({ // Get stream key
id: dest.id, const streamKey = await getTwitchStreamKey(accessToken, account.accountId);
serviceId: 'YOUTUBE',
rtmpUrl: broadcast.rtmpUrl,
streamKey: broadcast.streamKey,
broadcastId: broadcast.id,
});
} else if (dest.serviceId === 'TWITCH') {
// Update channel info
const tags = dest.tags ? dest.tags.split(',').map((t) => t.trim()).filter(Boolean) : [];
await updateTwitchChannel(
accessToken,
account.accountId,
dest.title,
dest.gameId,
tags,
);
// Get stream key await fastify.prisma.streamDestination.update({
const streamKey = await getTwitchStreamKey(accessToken, account.accountId); where: { id: dest.id },
data: {
rtmpUrl: TWITCH_RTMP_URL,
streamKey,
broadcastId: account.accountId,
status: 'READY',
},
});
await fastify.prisma.streamDestination.update({ prepared.push({
where: { id: dest.id }, id: dest.id,
data: { serviceId: 'TWITCH',
rtmpUrl: TWITCH_RTMP_URL, rtmpUrl: TWITCH_RTMP_URL,
streamKey, streamKey,
broadcastId: account.accountId, broadcastId: account.accountId,
status: 'READY', });
}, }
}); } catch (err: any) {
request.log.error({ err, destId: dest.id, service: dest.serviceId }, `Failed to prepare ${dest.serviceId} destination`);
prepared.push({ errors.push(`${dest.serviceId}: ${err.message}`);
id: dest.id,
serviceId: 'TWITCH',
rtmpUrl: TWITCH_RTMP_URL,
streamKey,
broadcastId: account.accountId,
});
} }
} }
// At least one destination must be ready
if (prepared.length === 0) {
throw new AppError(500, `All destinations failed to prepare: ${errors.join('; ')}`);
}
await fastify.prisma.streamPlan.update({ await fastify.prisma.streamPlan.update({
where: { id: plan.id }, where: { id: plan.id },
data: { status: 'READY' }, data: { status: 'READY' },

View File

@@ -111,13 +111,15 @@ export class ChatManager {
this.sessions.set(sessionKey, session); this.sessions.set(sessionKey, session);
const chatPromises: Promise<void>[] = [];
for (const dest of plan.destinations) { for (const dest of plan.destinations) {
if (dest.serviceId === 'YOUTUBE' && dest.linkedAccountId) { if (dest.serviceId === 'YOUTUBE' && dest.linkedAccountId) {
await this.startYouTubeChat(session, dest); chatPromises.push(this.startYouTubeChat(session, dest));
} else if (dest.serviceId === 'TWITCH' && dest.linkedAccountId) { } else if (dest.serviceId === 'TWITCH' && dest.linkedAccountId) {
await this.startTwitchChat(session, dest); chatPromises.push(this.startTwitchChat(session, dest));
} }
} }
await Promise.allSettled(chatPromises);
} }
async handleSendMessage( async handleSendMessage(
@@ -146,9 +148,42 @@ export class ChatManager {
const twitchClient = session.twitchClients.get(destinationId); const twitchClient = session.twitchClients.get(destinationId);
if (twitchClient) { if (twitchClient) {
twitchClient.sendMessage(text); twitchClient.sendMessage(text);
// Echo sent message back to app (Twitch IRC doesn't echo your own PRIVMSGs)
try {
const { account } = await getDecryptedToken(this.prisma, userId, destinationId);
this.sendEcho(session, 'TWITCH', destinationId, account.displayName, text);
} catch {
// Still echo with fallback name
this.sendEcho(session, 'TWITCH', destinationId, 'You', text);
}
} }
} }
private sendEcho(
session: ChatSession,
service: string,
destinationId: string,
authorName: string,
text: string,
): void {
this.sendToSocket(session.socket, {
type: 'chat_message',
planId: session.planId,
service,
destinationId,
message: {
id: `echo-${Date.now()}-${Math.random().toString(36).substring(2, 8)}`,
authorName,
authorImageUrl: null,
text,
timestamp: Date.now(),
isModerator: false,
isBroadcaster: true,
color: null,
},
});
}
async stopChat(planId: string, userId: string): Promise<void> { async stopChat(planId: string, userId: string): Promise<void> {
const sessionKey = `${userId}:${planId}`; const sessionKey = `${userId}:${planId}`;
const session = this.sessions.get(sessionKey); const session = this.sessions.get(sessionKey);
@@ -345,6 +380,7 @@ export class ChatManager {
} }
private async startTwitchChat(session: ChatSession, dest: any): Promise<void> { private async startTwitchChat(session: ChatSession, dest: any): Promise<void> {
this.logger.info({ planId: session.planId, destId: dest.linkedAccountId }, 'startTwitchChat called');
try { try {
const { account, accessToken } = await getDecryptedToken( const { account, accessToken } = await getDecryptedToken(
this.prisma, this.prisma,

View File

@@ -33,6 +33,7 @@ export class TwitchChatClient extends EventEmitter {
this.ws = new WebSocket('wss://irc-ws.chat.twitch.tv:443'); this.ws = new WebSocket('wss://irc-ws.chat.twitch.tv:443');
this.ws.on('open', () => { this.ws.on('open', () => {
console.log(`[Twitch-IRC] WebSocket open for #${this.channel}`);
if (!this.ws) return; if (!this.ws) return;
// Request capabilities // Request capabilities
this.ws.send('CAP REQ :twitch.tv/tags twitch.tv/commands'); this.ws.send('CAP REQ :twitch.tv/tags twitch.tv/commands');
@@ -45,19 +46,22 @@ export class TwitchChatClient extends EventEmitter {
this.ws.on('message', (data: Buffer) => { this.ws.on('message', (data: Buffer) => {
const raw = data.toString(); const raw = data.toString();
console.log(`[Twitch-IRC] #${this.channel} << ${raw.trim().substring(0, 200)}`);
const lines = raw.split('\r\n').filter(Boolean); const lines = raw.split('\r\n').filter(Boolean);
for (const line of lines) { for (const line of lines) {
this.handleLine(line); this.handleLine(line);
} }
}); });
this.ws.on('close', () => { this.ws.on('close', (code: number, reason: Buffer) => {
console.log(`[Twitch-IRC] #${this.channel} closed: ${code} ${reason.toString()}`);
this.connected = false; this.connected = false;
this.cleanup(); this.cleanup();
this.emit('disconnected'); this.emit('disconnected');
}); });
this.ws.on('error', (err: Error) => { this.ws.on('error', (err: Error) => {
console.log(`[Twitch-IRC] #${this.channel} error: ${err.message}`);
this.emit('error', err); this.emit('error', err);
}); });
@@ -106,8 +110,8 @@ export class TwitchChatClient extends EventEmitter {
} }
// Handle auth failure // Handle auth failure
if (line.includes('NOTICE') && line.includes('Login authentication failed')) { if (line.includes('NOTICE') && (line.includes('Login unsuccessful') || line.includes('Login authentication failed'))) {
this.emit('error', new Error('missing_scopes')); this.emit('error', new Error('Twitch login failed — token may be expired or missing chat scopes'));
this.disconnect(); this.disconnect();
return; return;
} }