Add Device/Video models, signaling WebSocket, device and content routes

- Prisma: Device model (Quest/Phone, online status, battery, storage,
  game/streaming/cortex state), Video model with likes, VideoLike
- Signaling WebSocket for SDP/ICE relay and device presence
- Device routes: list, status, delete
- Content routes: video CRUD with range-support streaming
- SignalingManager service for device socket registry and heartbeat
This commit is contained in:
2026-03-04 14:41:15 +01:00
parent 7e99a053da
commit 36dce50b64
7 changed files with 541 additions and 1 deletions

View File

@@ -25,6 +25,9 @@ model User {
followers Follow[] @relation("following")
following Follow[] @relation("follower")
likes Like[]
devices Device[]
videos Video[]
videoLikes VideoLike[]
}
model PairingCode {
@@ -130,3 +133,54 @@ model Like {
@@unique([userId, planId])
@@index([planId])
}
model Device {
id String @id @default(uuid())
userId String
user User @relation(fields: [userId], references: [id], onDelete: Cascade)
deviceId String
deviceName String @default("")
deviceType String @default("QUEST") // "QUEST" or "PHONE"
isOnline Boolean @default(false)
lastSeen DateTime @default(now())
batteryLevel Int?
storageAvailable Int?
runningGame String?
streamingState String?
cortexState String?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@unique([userId, deviceId])
@@index([userId])
}
model Video {
id String @id @default(uuid())
userId String
user User @relation(fields: [userId], references: [id], onDelete: Cascade)
title String
description String @default("")
duration Int @default(0)
thumbnailUrl String?
videoUrl String
fileSize Int?
isPublic Boolean @default(true)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
likes VideoLike[]
@@index([userId])
}
model VideoLike {
id String @id @default(uuid())
userId String
user User @relation(fields: [userId], references: [id], onDelete: Cascade)
videoId String
video Video @relation(fields: [videoId], references: [id], onDelete: Cascade)
createdAt DateTime @default(now())
@@unique([userId, videoId])
@@index([videoId])
}

View File

@@ -21,7 +21,11 @@ import followingRoutes from './routes/social/following.js';
import feedRoutes from './routes/social/feed.js';
import likesRoutes from './routes/social/likes.js';
import { createChatRoutes } from './routes/chat/websocket.js';
import { createSignalingRoutes } from './routes/signaling/websocket.js';
import deviceRoutes from './routes/devices/status.js';
import videoRoutes from './routes/content/videos.js';
import { ChatManager } from './services/chat-manager.service.js';
import { SignalingManager } from './services/signaling-manager.service.js';
import { config } from './config.js';
export async function buildApp() {
@@ -47,8 +51,12 @@ export async function buildApp() {
await app.register(websocket);
await app.register(multipart);
// Chat manager (instantiated after prisma is available)
// Managers (instantiated after prisma is available)
const chatManager = new ChatManager(app.prisma, app.log);
const signalingManager = new SignalingManager(app.prisma, app.log);
// Expose signalingManager for cleanup
app.decorate('signalingManager', signalingManager);
// Routes
await app.register(pageRoutes);
@@ -66,6 +74,9 @@ export async function buildApp() {
await app.register(feedRoutes);
await app.register(likesRoutes);
await app.register(createChatRoutes(chatManager));
await app.register(createSignalingRoutes(signalingManager));
await app.register(deviceRoutes);
await app.register(videoRoutes);
return app;
}

View File

@@ -17,6 +17,7 @@ async function main() {
process.on(signal, async () => {
app.log.info(`Received ${signal}, shutting down...`);
stopTokenRefreshScheduler();
(app as any).signalingManager?.cleanup();
await app.close();
process.exit(0);
});

View File

@@ -0,0 +1,164 @@
import { FastifyPluginAsync } from 'fastify';
import { requireAuth } from '../../middleware/require-auth.js';
import { optionalAuth } from '../../middleware/require-auth.js';
import { createReadStream, existsSync, mkdirSync, statSync } from 'fs';
import { join } from 'path';
import { pipeline } from 'stream/promises';
import { createWriteStream } from 'fs';
const VIDEOS_DIR = join(process.cwd(), 'data', 'videos');
// Ensure videos directory exists
if (!existsSync(VIDEOS_DIR)) {
mkdirSync(VIDEOS_DIR, { recursive: true });
}
const videoRoutes: FastifyPluginAsync = async (fastify) => {
// GET /content/videos — list videos (public + own)
fastify.get<{ Querystring: { cursor?: string; limit?: string } }>('/content/videos', {
preHandler: [optionalAuth],
}, async (request) => {
const limit = Math.min(parseInt(request.query.limit || '20', 10), 50);
const cursorId = request.query.cursor;
const where: any = {
OR: [
{ isPublic: true },
...(request.userId ? [{ userId: request.userId }] : []),
],
};
const videos = await fastify.prisma.video.findMany({
where,
include: {
user: { select: { id: true, displayName: true, avatarUrl: true } },
_count: { select: { likes: true } },
},
orderBy: { createdAt: 'desc' },
take: limit + 1,
...(cursorId ? { cursor: { id: cursorId }, skip: 1 } : {}),
});
const hasMore = videos.length > limit;
const items = videos.slice(0, limit);
return {
items: items.map(v => ({
id: v.id,
title: v.title,
description: v.description,
duration: v.duration,
thumbnailUrl: v.thumbnailUrl,
videoUrl: `/content/videos/${v.id}/stream`,
fileSize: v.fileSize,
isPublic: v.isPublic,
createdAt: v.createdAt.toISOString(),
likeCount: v._count.likes,
user: v.user,
})),
nextCursor: hasMore ? items[items.length - 1].id : null,
};
});
// POST /content/videos — upload a video (multipart)
fastify.post('/content/videos', {
preHandler: [requireAuth],
}, async (request, reply) => {
const data = await request.file();
if (!data) {
return reply.status(400).send({ error: 'No file uploaded' });
}
const title = (data.fields.title as any)?.value || 'Untitled';
const description = (data.fields.description as any)?.value || '';
const duration = parseInt((data.fields.duration as any)?.value || '0', 10);
const isPublic = (data.fields.isPublic as any)?.value !== 'false';
// Create DB record first
const video = await fastify.prisma.video.create({
data: {
userId: request.userId,
title,
description,
duration,
videoUrl: '', // will update after save
isPublic,
},
});
// Save file
const filename = `${video.id}.mp4`;
const filePath = join(VIDEOS_DIR, filename);
await pipeline(data.file, createWriteStream(filePath));
const stats = statSync(filePath);
// Update with actual URL and size
const updated = await fastify.prisma.video.update({
where: { id: video.id },
data: {
videoUrl: `/content/videos/${video.id}/stream`,
fileSize: stats.size,
},
});
return {
id: updated.id,
title: updated.title,
description: updated.description,
duration: updated.duration,
thumbnailUrl: updated.thumbnailUrl,
videoUrl: updated.videoUrl,
fileSize: updated.fileSize,
isPublic: updated.isPublic,
createdAt: updated.createdAt.toISOString(),
};
});
// GET /content/videos/:id/stream — stream video with range support
fastify.get<{ Params: { id: string } }>('/content/videos/:id/stream', async (request, reply) => {
const video = await fastify.prisma.video.findUnique({
where: { id: request.params.id },
});
if (!video) {
return reply.status(404).send({ error: 'Video not found' });
}
const filePath = join(VIDEOS_DIR, `${video.id}.mp4`);
if (!existsSync(filePath)) {
return reply.status(404).send({ error: 'Video file not found' });
}
const stat = statSync(filePath);
const fileSize = stat.size;
const range = request.headers.range;
if (range) {
const parts = range.replace(/bytes=/, '').split('-');
const start = parseInt(parts[0], 10);
const end = parts[1] ? parseInt(parts[1], 10) : fileSize - 1;
const chunkSize = end - start + 1;
reply
.status(206)
.header('Content-Range', `bytes ${start}-${end}/${fileSize}`)
.header('Accept-Ranges', 'bytes')
.header('Content-Length', chunkSize)
.header('Content-Type', 'video/mp4')
.header('Cache-Control', 'public, max-age=30');
return reply.send(createReadStream(filePath, { start, end }));
}
reply
.header('Content-Length', fileSize)
.header('Content-Type', 'video/mp4')
.header('Accept-Ranges', 'bytes')
.header('Cache-Control', 'public, max-age=30');
return reply.send(createReadStream(filePath));
});
};
export default videoRoutes;

View File

@@ -0,0 +1,73 @@
import { FastifyPluginAsync } from 'fastify';
import { requireAuth } from '../../middleware/require-auth.js';
const deviceRoutes: FastifyPluginAsync = async (fastify) => {
// GET /devices — list user's registered devices
fastify.get('/devices', {
preHandler: [requireAuth],
}, async (request) => {
const devices = await fastify.prisma.device.findMany({
where: { userId: request.userId },
orderBy: { lastSeen: 'desc' },
});
return devices.map(d => ({
id: d.id,
deviceId: d.deviceId,
deviceName: d.deviceName,
deviceType: d.deviceType,
isOnline: d.isOnline,
lastSeen: d.lastSeen.toISOString(),
batteryLevel: d.batteryLevel,
storageAvailable: d.storageAvailable,
runningGame: d.runningGame,
streamingState: d.streamingState,
cortexState: d.cortexState,
}));
});
// GET /devices/:id/status — get specific device status
fastify.get<{ Params: { id: string } }>('/devices/:id/status', {
preHandler: [requireAuth],
}, async (request, reply) => {
const device = await fastify.prisma.device.findFirst({
where: { id: request.params.id, userId: request.userId },
});
if (!device) {
return reply.status(404).send({ error: 'Device not found' });
}
return {
id: device.id,
deviceId: device.deviceId,
deviceName: device.deviceName,
deviceType: device.deviceType,
isOnline: device.isOnline,
lastSeen: device.lastSeen.toISOString(),
batteryLevel: device.batteryLevel,
storageAvailable: device.storageAvailable,
runningGame: device.runningGame,
streamingState: device.streamingState,
cortexState: device.cortexState,
};
});
// DELETE /devices/:id — remove a device
fastify.delete<{ Params: { id: string } }>('/devices/:id', {
preHandler: [requireAuth],
}, async (request, reply) => {
const device = await fastify.prisma.device.findFirst({
where: { id: request.params.id, userId: request.userId },
});
if (!device) {
return reply.status(404).send({ error: 'Device not found' });
}
await fastify.prisma.device.delete({ where: { id: device.id } });
return { success: true };
});
};
export default deviceRoutes;

View File

@@ -0,0 +1,33 @@
import { FastifyPluginAsync } from 'fastify';
import { verifyAccessToken } from '../../plugins/auth.js';
import { SignalingManager } from '../../services/signaling-manager.service.js';
export function createSignalingRoutes(signalingManager: SignalingManager): FastifyPluginAsync {
const signalingRoutes: FastifyPluginAsync = async (fastify) => {
// GET /signaling/ws?token=<accessToken>
fastify.get('/signaling/ws', { websocket: true }, async (socket, request) => {
const url = new URL(request.url, `http://${request.headers.host}`);
const token = url.searchParams.get('token');
if (!token) {
socket.send(JSON.stringify({ type: 'error', error: 'Missing token' }));
socket.close();
return;
}
let userId: string;
try {
userId = await verifyAccessToken(token);
} catch {
socket.send(JSON.stringify({ type: 'error', error: 'Invalid or expired token' }));
socket.close();
return;
}
request.log.info({ userId }, 'Signaling WebSocket connected');
await signalingManager.handleConnection(userId, socket);
});
};
return signalingRoutes;
}

View File

@@ -0,0 +1,204 @@
import { PrismaClient } from '@prisma/client';
import { WebSocket } from 'ws';
import type { FastifyBaseLogger } from 'fastify';
interface DeviceSocket {
userId: string;
deviceId: string;
deviceType: string;
socket: WebSocket;
lastHeartbeat: number;
}
interface SignalingWsMessage {
type: 'register_device' | 'list_devices' | 'offer' | 'answer' | 'ice_candidate' | 'heartbeat';
deviceId?: string;
deviceType?: string;
deviceName?: string;
to?: string;
sdp?: string;
sdpType?: string;
candidate?: { sdpMid: string; sdpMLineIndex: number; sdp: string };
}
export class SignalingManager {
private sockets = new Map<string, DeviceSocket>(); // key = `${userId}:${deviceId}`
private heartbeatInterval: NodeJS.Timeout | null = null;
constructor(
private prisma: PrismaClient,
private log: FastifyBaseLogger,
) {
// Heartbeat check every 30s
this.heartbeatInterval = setInterval(() => this.checkHeartbeats(), 30_000);
}
async handleConnection(userId: string, socket: WebSocket) {
socket.on('message', async (data: Buffer) => {
try {
const msg: SignalingWsMessage = JSON.parse(data.toString());
await this.handleMessage(userId, socket, msg);
} catch (err) {
this.log.error({ err }, 'Signaling message error');
socket.send(JSON.stringify({ type: 'error', error: 'Invalid message' }));
}
});
socket.on('close', () => {
this.removeSocket(userId, socket);
});
socket.on('error', () => {
this.removeSocket(userId, socket);
});
}
private async handleMessage(userId: string, socket: WebSocket, msg: SignalingWsMessage) {
switch (msg.type) {
case 'register_device': {
if (!msg.deviceId) return;
const key = `${userId}:${msg.deviceId}`;
this.sockets.set(key, {
userId,
deviceId: msg.deviceId,
deviceType: msg.deviceType || 'QUEST',
socket,
lastHeartbeat: Date.now(),
});
// Upsert device in DB
await this.prisma.device.upsert({
where: { userId_deviceId: { userId, deviceId: msg.deviceId } },
update: {
isOnline: true,
lastSeen: new Date(),
deviceName: msg.deviceName || '',
deviceType: msg.deviceType || 'QUEST',
},
create: {
userId,
deviceId: msg.deviceId,
deviceName: msg.deviceName || '',
deviceType: msg.deviceType || 'QUEST',
isOnline: true,
},
});
socket.send(JSON.stringify({ type: 'registered', deviceId: msg.deviceId }));
this.log.info({ userId, deviceId: msg.deviceId }, 'Device registered');
break;
}
case 'list_devices': {
// Return all online devices for this user
const devices: { deviceId: string; deviceName: string; userId: string; isOnline: boolean }[] = [];
for (const [, ds] of this.sockets) {
if (ds.userId === userId) {
devices.push({
deviceId: ds.deviceId,
deviceName: '',
userId: ds.userId,
isOnline: true,
});
}
}
socket.send(JSON.stringify({ type: 'device_list', devices }));
break;
}
case 'offer':
case 'answer':
case 'ice_candidate': {
if (!msg.to) return;
// Find target socket
const targetKey = `${userId}:${msg.to}`;
const target = this.sockets.get(targetKey);
// Also check if `to` is a deviceId belonging to the same user
if (target && target.socket.readyState === WebSocket.OPEN) {
target.socket.send(JSON.stringify({
type: msg.type,
from: this.getDeviceIdForSocket(userId, socket),
sdp: msg.sdp,
sdpType: msg.sdpType,
candidate: msg.candidate,
}));
} else {
// Try to find by just deviceId across all users (for same-user cross-device)
for (const [, ds] of this.sockets) {
if (ds.deviceId === msg.to && ds.userId === userId && ds.socket !== socket) {
if (ds.socket.readyState === WebSocket.OPEN) {
ds.socket.send(JSON.stringify({
type: msg.type,
from: this.getDeviceIdForSocket(userId, socket),
sdp: msg.sdp,
sdpType: msg.sdpType,
candidate: msg.candidate,
}));
}
return;
}
}
socket.send(JSON.stringify({ type: 'error', error: 'Target device not found' }));
}
break;
}
case 'heartbeat': {
const deviceId = msg.deviceId || this.getDeviceIdForSocket(userId, socket);
if (deviceId) {
const key = `${userId}:${deviceId}`;
const ds = this.sockets.get(key);
if (ds) ds.lastHeartbeat = Date.now();
}
socket.send(JSON.stringify({ type: 'heartbeat_ack' }));
break;
}
}
}
private getDeviceIdForSocket(userId: string, socket: WebSocket): string | undefined {
for (const [, ds] of this.sockets) {
if (ds.userId === userId && ds.socket === socket) {
return ds.deviceId;
}
}
return undefined;
}
private removeSocket(userId: string, socket: WebSocket) {
for (const [key, ds] of this.sockets) {
if (ds.userId === userId && ds.socket === socket) {
this.sockets.delete(key);
// Mark device offline
this.prisma.device.updateMany({
where: { userId, deviceId: ds.deviceId },
data: { isOnline: false, lastSeen: new Date() },
}).catch(() => {});
this.log.info({ userId, deviceId: ds.deviceId }, 'Device disconnected');
break;
}
}
}
private checkHeartbeats() {
const timeout = 60_000; // 60s
const now = Date.now();
for (const [key, ds] of this.sockets) {
if (now - ds.lastHeartbeat > timeout) {
this.log.info({ userId: ds.userId, deviceId: ds.deviceId }, 'Heartbeat timeout');
ds.socket.close();
this.sockets.delete(key);
}
}
}
cleanup() {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
}
for (const [, ds] of this.sockets) {
ds.socket.close();
}
this.sockets.clear();
}
}