From 36dce50b6428f2a9be144afaa8f28ee14880d1fb Mon Sep 17 00:00:00 2001 From: omigamedev Date: Wed, 4 Mar 2026 14:41:15 +0100 Subject: [PATCH] Add Device/Video models, signaling WebSocket, device and content routes - Prisma: Device model (Quest/Phone, online status, battery, storage, game/streaming/cortex state), Video model with likes, VideoLike - Signaling WebSocket for SDP/ICE relay and device presence - Device routes: list, status, delete - Content routes: video CRUD with range-support streaming - SignalingManager service for device socket registry and heartbeat --- prisma/schema.prisma | 54 ++++++ src/app.ts | 13 +- src/index.ts | 1 + src/routes/content/videos.ts | 164 +++++++++++++++++ src/routes/devices/status.ts | 73 ++++++++ src/routes/signaling/websocket.ts | 33 ++++ src/services/signaling-manager.service.ts | 204 ++++++++++++++++++++++ 7 files changed, 541 insertions(+), 1 deletion(-) create mode 100644 src/routes/content/videos.ts create mode 100644 src/routes/devices/status.ts create mode 100644 src/routes/signaling/websocket.ts create mode 100644 src/services/signaling-manager.service.ts diff --git a/prisma/schema.prisma b/prisma/schema.prisma index d8029b6..15c769e 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -25,6 +25,9 @@ model User { followers Follow[] @relation("following") following Follow[] @relation("follower") likes Like[] + devices Device[] + videos Video[] + videoLikes VideoLike[] } model PairingCode { @@ -130,3 +133,54 @@ model Like { @@unique([userId, planId]) @@index([planId]) } + +model Device { + id String @id @default(uuid()) + userId String + user User @relation(fields: [userId], references: [id], onDelete: Cascade) + deviceId String + deviceName String @default("") + deviceType String @default("QUEST") // "QUEST" or "PHONE" + isOnline Boolean @default(false) + lastSeen DateTime @default(now()) + batteryLevel Int? + storageAvailable Int? + runningGame String? + streamingState String? + cortexState String? + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@unique([userId, deviceId]) + @@index([userId]) +} + +model Video { + id String @id @default(uuid()) + userId String + user User @relation(fields: [userId], references: [id], onDelete: Cascade) + title String + description String @default("") + duration Int @default(0) + thumbnailUrl String? + videoUrl String + fileSize Int? + isPublic Boolean @default(true) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + likes VideoLike[] + + @@index([userId]) +} + +model VideoLike { + id String @id @default(uuid()) + userId String + user User @relation(fields: [userId], references: [id], onDelete: Cascade) + videoId String + video Video @relation(fields: [videoId], references: [id], onDelete: Cascade) + createdAt DateTime @default(now()) + + @@unique([userId, videoId]) + @@index([videoId]) +} diff --git a/src/app.ts b/src/app.ts index aaffa39..9580b4c 100644 --- a/src/app.ts +++ b/src/app.ts @@ -21,7 +21,11 @@ import followingRoutes from './routes/social/following.js'; import feedRoutes from './routes/social/feed.js'; import likesRoutes from './routes/social/likes.js'; import { createChatRoutes } from './routes/chat/websocket.js'; +import { createSignalingRoutes } from './routes/signaling/websocket.js'; +import deviceRoutes from './routes/devices/status.js'; +import videoRoutes from './routes/content/videos.js'; import { ChatManager } from './services/chat-manager.service.js'; +import { SignalingManager } from './services/signaling-manager.service.js'; import { config } from './config.js'; export async function buildApp() { @@ -47,8 +51,12 @@ export async function buildApp() { await app.register(websocket); await app.register(multipart); - // Chat manager (instantiated after prisma is available) + // Managers (instantiated after prisma is available) const chatManager = new ChatManager(app.prisma, app.log); + const signalingManager = new SignalingManager(app.prisma, app.log); + + // Expose signalingManager for cleanup + app.decorate('signalingManager', signalingManager); // Routes await app.register(pageRoutes); @@ -66,6 +74,9 @@ export async function buildApp() { await app.register(feedRoutes); await app.register(likesRoutes); await app.register(createChatRoutes(chatManager)); + await app.register(createSignalingRoutes(signalingManager)); + await app.register(deviceRoutes); + await app.register(videoRoutes); return app; } diff --git a/src/index.ts b/src/index.ts index 419e899..397a78d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -17,6 +17,7 @@ async function main() { process.on(signal, async () => { app.log.info(`Received ${signal}, shutting down...`); stopTokenRefreshScheduler(); + (app as any).signalingManager?.cleanup(); await app.close(); process.exit(0); }); diff --git a/src/routes/content/videos.ts b/src/routes/content/videos.ts new file mode 100644 index 0000000..78d4aaa --- /dev/null +++ b/src/routes/content/videos.ts @@ -0,0 +1,164 @@ +import { FastifyPluginAsync } from 'fastify'; +import { requireAuth } from '../../middleware/require-auth.js'; +import { optionalAuth } from '../../middleware/require-auth.js'; +import { createReadStream, existsSync, mkdirSync, statSync } from 'fs'; +import { join } from 'path'; +import { pipeline } from 'stream/promises'; +import { createWriteStream } from 'fs'; + +const VIDEOS_DIR = join(process.cwd(), 'data', 'videos'); + +// Ensure videos directory exists +if (!existsSync(VIDEOS_DIR)) { + mkdirSync(VIDEOS_DIR, { recursive: true }); +} + +const videoRoutes: FastifyPluginAsync = async (fastify) => { + // GET /content/videos — list videos (public + own) + fastify.get<{ Querystring: { cursor?: string; limit?: string } }>('/content/videos', { + preHandler: [optionalAuth], + }, async (request) => { + const limit = Math.min(parseInt(request.query.limit || '20', 10), 50); + const cursorId = request.query.cursor; + + const where: any = { + OR: [ + { isPublic: true }, + ...(request.userId ? [{ userId: request.userId }] : []), + ], + }; + + const videos = await fastify.prisma.video.findMany({ + where, + include: { + user: { select: { id: true, displayName: true, avatarUrl: true } }, + _count: { select: { likes: true } }, + }, + orderBy: { createdAt: 'desc' }, + take: limit + 1, + ...(cursorId ? { cursor: { id: cursorId }, skip: 1 } : {}), + }); + + const hasMore = videos.length > limit; + const items = videos.slice(0, limit); + + return { + items: items.map(v => ({ + id: v.id, + title: v.title, + description: v.description, + duration: v.duration, + thumbnailUrl: v.thumbnailUrl, + videoUrl: `/content/videos/${v.id}/stream`, + fileSize: v.fileSize, + isPublic: v.isPublic, + createdAt: v.createdAt.toISOString(), + likeCount: v._count.likes, + user: v.user, + })), + nextCursor: hasMore ? items[items.length - 1].id : null, + }; + }); + + // POST /content/videos — upload a video (multipart) + fastify.post('/content/videos', { + preHandler: [requireAuth], + }, async (request, reply) => { + const data = await request.file(); + if (!data) { + return reply.status(400).send({ error: 'No file uploaded' }); + } + + const title = (data.fields.title as any)?.value || 'Untitled'; + const description = (data.fields.description as any)?.value || ''; + const duration = parseInt((data.fields.duration as any)?.value || '0', 10); + const isPublic = (data.fields.isPublic as any)?.value !== 'false'; + + // Create DB record first + const video = await fastify.prisma.video.create({ + data: { + userId: request.userId, + title, + description, + duration, + videoUrl: '', // will update after save + isPublic, + }, + }); + + // Save file + const filename = `${video.id}.mp4`; + const filePath = join(VIDEOS_DIR, filename); + await pipeline(data.file, createWriteStream(filePath)); + + const stats = statSync(filePath); + + // Update with actual URL and size + const updated = await fastify.prisma.video.update({ + where: { id: video.id }, + data: { + videoUrl: `/content/videos/${video.id}/stream`, + fileSize: stats.size, + }, + }); + + return { + id: updated.id, + title: updated.title, + description: updated.description, + duration: updated.duration, + thumbnailUrl: updated.thumbnailUrl, + videoUrl: updated.videoUrl, + fileSize: updated.fileSize, + isPublic: updated.isPublic, + createdAt: updated.createdAt.toISOString(), + }; + }); + + // GET /content/videos/:id/stream — stream video with range support + fastify.get<{ Params: { id: string } }>('/content/videos/:id/stream', async (request, reply) => { + const video = await fastify.prisma.video.findUnique({ + where: { id: request.params.id }, + }); + + if (!video) { + return reply.status(404).send({ error: 'Video not found' }); + } + + const filePath = join(VIDEOS_DIR, `${video.id}.mp4`); + if (!existsSync(filePath)) { + return reply.status(404).send({ error: 'Video file not found' }); + } + + const stat = statSync(filePath); + const fileSize = stat.size; + const range = request.headers.range; + + if (range) { + const parts = range.replace(/bytes=/, '').split('-'); + const start = parseInt(parts[0], 10); + const end = parts[1] ? parseInt(parts[1], 10) : fileSize - 1; + const chunkSize = end - start + 1; + + reply + .status(206) + .header('Content-Range', `bytes ${start}-${end}/${fileSize}`) + .header('Accept-Ranges', 'bytes') + .header('Content-Length', chunkSize) + .header('Content-Type', 'video/mp4') + .header('Cache-Control', 'public, max-age=30'); + + return reply.send(createReadStream(filePath, { start, end })); + } + + reply + .header('Content-Length', fileSize) + .header('Content-Type', 'video/mp4') + .header('Accept-Ranges', 'bytes') + .header('Cache-Control', 'public, max-age=30'); + + return reply.send(createReadStream(filePath)); + }); +}; + +export default videoRoutes; diff --git a/src/routes/devices/status.ts b/src/routes/devices/status.ts new file mode 100644 index 0000000..8dd187f --- /dev/null +++ b/src/routes/devices/status.ts @@ -0,0 +1,73 @@ +import { FastifyPluginAsync } from 'fastify'; +import { requireAuth } from '../../middleware/require-auth.js'; + +const deviceRoutes: FastifyPluginAsync = async (fastify) => { + // GET /devices — list user's registered devices + fastify.get('/devices', { + preHandler: [requireAuth], + }, async (request) => { + const devices = await fastify.prisma.device.findMany({ + where: { userId: request.userId }, + orderBy: { lastSeen: 'desc' }, + }); + + return devices.map(d => ({ + id: d.id, + deviceId: d.deviceId, + deviceName: d.deviceName, + deviceType: d.deviceType, + isOnline: d.isOnline, + lastSeen: d.lastSeen.toISOString(), + batteryLevel: d.batteryLevel, + storageAvailable: d.storageAvailable, + runningGame: d.runningGame, + streamingState: d.streamingState, + cortexState: d.cortexState, + })); + }); + + // GET /devices/:id/status — get specific device status + fastify.get<{ Params: { id: string } }>('/devices/:id/status', { + preHandler: [requireAuth], + }, async (request, reply) => { + const device = await fastify.prisma.device.findFirst({ + where: { id: request.params.id, userId: request.userId }, + }); + + if (!device) { + return reply.status(404).send({ error: 'Device not found' }); + } + + return { + id: device.id, + deviceId: device.deviceId, + deviceName: device.deviceName, + deviceType: device.deviceType, + isOnline: device.isOnline, + lastSeen: device.lastSeen.toISOString(), + batteryLevel: device.batteryLevel, + storageAvailable: device.storageAvailable, + runningGame: device.runningGame, + streamingState: device.streamingState, + cortexState: device.cortexState, + }; + }); + + // DELETE /devices/:id — remove a device + fastify.delete<{ Params: { id: string } }>('/devices/:id', { + preHandler: [requireAuth], + }, async (request, reply) => { + const device = await fastify.prisma.device.findFirst({ + where: { id: request.params.id, userId: request.userId }, + }); + + if (!device) { + return reply.status(404).send({ error: 'Device not found' }); + } + + await fastify.prisma.device.delete({ where: { id: device.id } }); + return { success: true }; + }); +}; + +export default deviceRoutes; diff --git a/src/routes/signaling/websocket.ts b/src/routes/signaling/websocket.ts new file mode 100644 index 0000000..dc22d11 --- /dev/null +++ b/src/routes/signaling/websocket.ts @@ -0,0 +1,33 @@ +import { FastifyPluginAsync } from 'fastify'; +import { verifyAccessToken } from '../../plugins/auth.js'; +import { SignalingManager } from '../../services/signaling-manager.service.js'; + +export function createSignalingRoutes(signalingManager: SignalingManager): FastifyPluginAsync { + const signalingRoutes: FastifyPluginAsync = async (fastify) => { + // GET /signaling/ws?token= + fastify.get('/signaling/ws', { websocket: true }, async (socket, request) => { + const url = new URL(request.url, `http://${request.headers.host}`); + const token = url.searchParams.get('token'); + + if (!token) { + socket.send(JSON.stringify({ type: 'error', error: 'Missing token' })); + socket.close(); + return; + } + + let userId: string; + try { + userId = await verifyAccessToken(token); + } catch { + socket.send(JSON.stringify({ type: 'error', error: 'Invalid or expired token' })); + socket.close(); + return; + } + + request.log.info({ userId }, 'Signaling WebSocket connected'); + await signalingManager.handleConnection(userId, socket); + }); + }; + + return signalingRoutes; +} diff --git a/src/services/signaling-manager.service.ts b/src/services/signaling-manager.service.ts new file mode 100644 index 0000000..d927650 --- /dev/null +++ b/src/services/signaling-manager.service.ts @@ -0,0 +1,204 @@ +import { PrismaClient } from '@prisma/client'; +import { WebSocket } from 'ws'; +import type { FastifyBaseLogger } from 'fastify'; + +interface DeviceSocket { + userId: string; + deviceId: string; + deviceType: string; + socket: WebSocket; + lastHeartbeat: number; +} + +interface SignalingWsMessage { + type: 'register_device' | 'list_devices' | 'offer' | 'answer' | 'ice_candidate' | 'heartbeat'; + deviceId?: string; + deviceType?: string; + deviceName?: string; + to?: string; + sdp?: string; + sdpType?: string; + candidate?: { sdpMid: string; sdpMLineIndex: number; sdp: string }; +} + +export class SignalingManager { + private sockets = new Map(); // key = `${userId}:${deviceId}` + private heartbeatInterval: NodeJS.Timeout | null = null; + + constructor( + private prisma: PrismaClient, + private log: FastifyBaseLogger, + ) { + // Heartbeat check every 30s + this.heartbeatInterval = setInterval(() => this.checkHeartbeats(), 30_000); + } + + async handleConnection(userId: string, socket: WebSocket) { + socket.on('message', async (data: Buffer) => { + try { + const msg: SignalingWsMessage = JSON.parse(data.toString()); + await this.handleMessage(userId, socket, msg); + } catch (err) { + this.log.error({ err }, 'Signaling message error'); + socket.send(JSON.stringify({ type: 'error', error: 'Invalid message' })); + } + }); + + socket.on('close', () => { + this.removeSocket(userId, socket); + }); + + socket.on('error', () => { + this.removeSocket(userId, socket); + }); + } + + private async handleMessage(userId: string, socket: WebSocket, msg: SignalingWsMessage) { + switch (msg.type) { + case 'register_device': { + if (!msg.deviceId) return; + const key = `${userId}:${msg.deviceId}`; + this.sockets.set(key, { + userId, + deviceId: msg.deviceId, + deviceType: msg.deviceType || 'QUEST', + socket, + lastHeartbeat: Date.now(), + }); + + // Upsert device in DB + await this.prisma.device.upsert({ + where: { userId_deviceId: { userId, deviceId: msg.deviceId } }, + update: { + isOnline: true, + lastSeen: new Date(), + deviceName: msg.deviceName || '', + deviceType: msg.deviceType || 'QUEST', + }, + create: { + userId, + deviceId: msg.deviceId, + deviceName: msg.deviceName || '', + deviceType: msg.deviceType || 'QUEST', + isOnline: true, + }, + }); + + socket.send(JSON.stringify({ type: 'registered', deviceId: msg.deviceId })); + this.log.info({ userId, deviceId: msg.deviceId }, 'Device registered'); + break; + } + + case 'list_devices': { + // Return all online devices for this user + const devices: { deviceId: string; deviceName: string; userId: string; isOnline: boolean }[] = []; + for (const [, ds] of this.sockets) { + if (ds.userId === userId) { + devices.push({ + deviceId: ds.deviceId, + deviceName: '', + userId: ds.userId, + isOnline: true, + }); + } + } + socket.send(JSON.stringify({ type: 'device_list', devices })); + break; + } + + case 'offer': + case 'answer': + case 'ice_candidate': { + if (!msg.to) return; + // Find target socket + const targetKey = `${userId}:${msg.to}`; + const target = this.sockets.get(targetKey); + // Also check if `to` is a deviceId belonging to the same user + if (target && target.socket.readyState === WebSocket.OPEN) { + target.socket.send(JSON.stringify({ + type: msg.type, + from: this.getDeviceIdForSocket(userId, socket), + sdp: msg.sdp, + sdpType: msg.sdpType, + candidate: msg.candidate, + })); + } else { + // Try to find by just deviceId across all users (for same-user cross-device) + for (const [, ds] of this.sockets) { + if (ds.deviceId === msg.to && ds.userId === userId && ds.socket !== socket) { + if (ds.socket.readyState === WebSocket.OPEN) { + ds.socket.send(JSON.stringify({ + type: msg.type, + from: this.getDeviceIdForSocket(userId, socket), + sdp: msg.sdp, + sdpType: msg.sdpType, + candidate: msg.candidate, + })); + } + return; + } + } + socket.send(JSON.stringify({ type: 'error', error: 'Target device not found' })); + } + break; + } + + case 'heartbeat': { + const deviceId = msg.deviceId || this.getDeviceIdForSocket(userId, socket); + if (deviceId) { + const key = `${userId}:${deviceId}`; + const ds = this.sockets.get(key); + if (ds) ds.lastHeartbeat = Date.now(); + } + socket.send(JSON.stringify({ type: 'heartbeat_ack' })); + break; + } + } + } + + private getDeviceIdForSocket(userId: string, socket: WebSocket): string | undefined { + for (const [, ds] of this.sockets) { + if (ds.userId === userId && ds.socket === socket) { + return ds.deviceId; + } + } + return undefined; + } + + private removeSocket(userId: string, socket: WebSocket) { + for (const [key, ds] of this.sockets) { + if (ds.userId === userId && ds.socket === socket) { + this.sockets.delete(key); + // Mark device offline + this.prisma.device.updateMany({ + where: { userId, deviceId: ds.deviceId }, + data: { isOnline: false, lastSeen: new Date() }, + }).catch(() => {}); + this.log.info({ userId, deviceId: ds.deviceId }, 'Device disconnected'); + break; + } + } + } + + private checkHeartbeats() { + const timeout = 60_000; // 60s + const now = Date.now(); + for (const [key, ds] of this.sockets) { + if (now - ds.lastHeartbeat > timeout) { + this.log.info({ userId: ds.userId, deviceId: ds.deviceId }, 'Heartbeat timeout'); + ds.socket.close(); + this.sockets.delete(key); + } + } + } + + cleanup() { + if (this.heartbeatInterval) { + clearInterval(this.heartbeatInterval); + } + for (const [, ds] of this.sockets) { + ds.socket.close(); + } + this.sockets.clear(); + } +}