Files
lck-control/app/src/main/java/com/omixlab/lckcontrol/service/LckControlService.kt
omigamedev 870139b054 YouTube/Twitch live chat integration
- WebSocket chat client with auto-reconnect and re-subscribe on connect
- ChatScreen with message bubbles, mod/broadcaster badges, send capability
- Dashboard Live Chat section with unread badges per destination
- Chat notification manager for background notifications
- Chat navigation route and ViewModel
2026-03-01 22:19:17 +01:00

566 lines
22 KiB
Kotlin

package com.omixlab.lckcontrol.service
import android.app.Notification
import android.app.NotificationChannel
import android.app.NotificationManager
import android.app.Service
import android.content.Intent
import android.content.pm.ServiceInfo
import android.os.IBinder
import android.os.RemoteCallbackList
import android.util.Log
import com.meta.horizon.platform.ovr.Core
import com.meta.horizon.platform.ovr.requests.Request
import com.meta.horizon.platform.ovr.requests.Users
import com.omixlab.lckcontrol.R
import com.omixlab.lckcontrol.chat.ChatNotificationManager
import com.omixlab.lckcontrol.data.local.AppPreferences
import com.omixlab.lckcontrol.data.local.TokenStore
import com.omixlab.lckcontrol.data.remote.LckApiService
import com.omixlab.lckcontrol.data.remote.MetaCallbackRequest
import com.omixlab.lckcontrol.data.remote.RefreshRequest
import com.omixlab.lckcontrol.data.repository.AccountRepository
import com.omixlab.lckcontrol.data.repository.ChatRepository
import com.omixlab.lckcontrol.data.repository.StreamPlanRepository
import com.omixlab.lckcontrol.shared.ConnectedClientInfo
import com.omixlab.lckcontrol.shared.ILckControlCallback
import com.omixlab.lckcontrol.shared.ILckControlService
import com.omixlab.lckcontrol.shared.LinkedAccount
import com.omixlab.lckcontrol.shared.StreamDestination
import com.omixlab.lckcontrol.shared.StreamPlan
import com.omixlab.lckcontrol.shared.StreamPlanConfig
import com.omixlab.lckcontrol.shared.StreamingConfig
import com.omixlab.lckcontrol.streaming.StreamingManager
import com.omixlab.lckcontrol.streaming.StreamingState
import dagger.hilt.android.AndroidEntryPoint
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import javax.inject.Inject
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
@AndroidEntryPoint
class LckControlService : Service() {
companion object {
private const val TAG = "LckControlService"
private const val CHANNEL_ID = "lck_control_service"
private const val NOTIFICATION_ID = 1
private const val QUEST_APP_ID = "25653777174321448"
private const val TOKEN_REFRESH_INTERVAL_MS = 60_000L
private const val ACTION_BIND_STREAMING = "com.omixlab.lckcontrol.BIND_STREAMING"
}
@Inject lateinit var appPreferences: AppPreferences
@Inject lateinit var accountRepository: AccountRepository
@Inject lateinit var streamPlanRepository: StreamPlanRepository
@Inject lateinit var tokenStore: TokenStore
@Inject lateinit var apiService: LckApiService
@Inject lateinit var streamingManager: StreamingManager
@Inject lateinit var chatRepository: ChatRepository
@Inject lateinit var chatNotificationManager: ChatNotificationManager
private val serviceScope = CoroutineScope(SupervisorJob() + Dispatchers.Main)
private val clientTracker = ClientTracker()
private var streamingServiceImpl: LckStreamingServiceImpl? = null
private val callbacks = object : RemoteCallbackList<ILckControlCallback>() {
override fun onCallbackDied(callback: ILckControlCallback, cookie: Any?) {
val uid = cookie as? Int ?: return
serviceScope.launch {
val removed = clientTracker.unregisterByUid(uid)
for (client in removed) {
Log.d(TAG, "Auto-unregistered client ${client.clientId} (${client.packageName}) - process died")
broadcastClientUnregistered(client.clientId)
}
}
}
}
private val binder = object : ILckControlService.Stub() {
// ── Auth ────────────────────────────────────────────
override fun isAuthenticated(): Boolean = tokenStore.isLoggedIn()
override fun login() {
serviceScope.launch {
try {
doAutoLogin()
} catch (e: Exception) {
Log.e(TAG, "login() failed", e)
}
}
}
// ── Accounts ────────────────────────────────────────
override fun getLinkedAccounts(): List<LinkedAccount> = runBlocking {
accountRepository.getAccounts()
}
// ── Stream plans ────────────────────────────────────
override fun createStreamPlan(config: StreamPlanConfig): StreamPlan = runBlocking {
val plan = streamPlanRepository.createPlan(
name = config.name,
destinations = config.destinations,
executionMode = config.executionMode,
gameId = config.gameId,
)
broadcastPlansChanged()
plan
}
override fun createDefaultPlan(clientName: String): StreamPlan = runBlocking {
val accounts = accountRepository.getAccounts().filter { it.isEnabled }
val gameId = clientTracker.getAll()
.find { it.clientName == clientName }?.packageName ?: ""
val execMode = appPreferences.getDefaultExecutionMode()
Log.d(TAG, "createDefaultPlan: clientName=$clientName, executionMode=$execMode, accounts=${accounts.size}")
val destinations = accounts.map { account ->
StreamDestination(
service = account.serviceId,
linkedAccountId = account.id,
title = "Stream",
privacyStatus = "unlisted",
)
}
val plan = streamPlanRepository.createPlan(
name = "$clientName Stream",
destinations = destinations,
executionMode = execMode,
gameId = gameId,
)
Log.d(TAG, "createDefaultPlan: created plan ${plan.planId} with executionMode=${plan.executionMode}")
broadcastPlansChanged()
plan
}
override fun prepareStreamPlan(planId: String): StreamPlan = runBlocking {
streamPlanRepository.preparePlan(planId)
val plan = streamPlanRepository.getPlan(planId) ?: error("Plan not found after prepare")
broadcastPlanUpdated(plan)
plan
}
override fun getStreamPlans(): List<StreamPlan> = runBlocking {
streamPlanRepository.getPlans()
}
override fun getStreamPlan(planId: String): StreamPlan? = runBlocking {
streamPlanRepository.getPlan(planId)
}
override fun startStreamPlan(planId: String): Boolean = runBlocking {
val plan = streamPlanRepository.getPlan(planId) ?: return@runBlocking false
if (plan.status == "LIVE") {
// Plan already LIVE — ensure streaming engine is running for APP_STREAMING
if (plan.executionMode == "APP_STREAMING" && !streamingManager.isStreaming()) {
Log.d(TAG, "startStreamPlan: plan already LIVE but engine not running, starting engine")
streamingManager.startStreaming(
plan = plan,
config = StreamingConfig(),
width = 1920,
height = 1080,
)
}
return@runBlocking true
}
if (plan.status != "READY") return@runBlocking false
try {
streamPlanRepository.startPlan(planId)
val updated = streamPlanRepository.getPlan(planId)
// If APP_STREAMING mode, start the streaming engine
if (updated?.executionMode == "APP_STREAMING") {
streamingManager.startStreaming(
plan = updated,
config = StreamingConfig(),
width = 1920,
height = 1080,
)
}
if (updated != null) broadcastPlanUpdated(updated)
true
} catch (e: Throwable) {
Log.e(TAG, "startStreamPlan failed", e)
false
}
}
override fun endStreamPlan(planId: String): Boolean = runBlocking {
val plan = streamPlanRepository.getPlan(planId) ?: return@runBlocking false
if (plan.status == "ENDED") return@runBlocking true
if (plan.status != "LIVE" && plan.status != "READY") return@runBlocking false
try {
// Stop streaming engine if running
if (plan.executionMode == "APP_STREAMING") {
streamingManager.stopStreaming()
}
streamPlanRepository.endPlan(planId)
val updated = streamPlanRepository.getPlan(planId)
if (updated != null) broadcastPlanUpdated(updated)
true
} catch (_: Exception) { false }
}
// ── Clients ─────────────────────────────────────────
override fun registerClient(clientName: String, packageName: String): String {
val uid = android.os.Binder.getCallingUid()
val clientId = clientTracker.register(clientName, packageName, uid)
broadcastClientRegistered(clientId)
return clientId
}
override fun unregisterClient(clientId: String) {
clientTracker.unregister(clientId)
broadcastClientUnregistered(clientId)
}
override fun setClientActivePlan(clientId: String, planId: String) {
clientTracker.setActivePlan(clientId, planId)
}
override fun getConnectedClients(): List<ConnectedClientInfo> {
return clientTracker.getAll().map { client ->
ConnectedClientInfo(
clientId = client.clientId,
clientName = client.clientName,
packageName = client.packageName,
activePlanId = client.activePlanId,
)
}
}
// ── Callbacks ───────────────────────────────────────
override fun registerCallback(callback: ILckControlCallback) {
val uid = android.os.Binder.getCallingUid()
callbacks.register(callback, uid)
}
override fun unregisterCallback(callback: ILckControlCallback) {
callbacks.unregister(callback)
}
}
override fun onCreate() {
super.onCreate()
createNotificationChannel()
startForeground(
NOTIFICATION_ID,
buildNotification(),
ServiceInfo.FOREGROUND_SERVICE_TYPE_CONNECTED_DEVICE,
)
// Auto-login on service start
serviceScope.launch {
try {
doAutoLogin()
} catch (e: Exception) {
Log.e(TAG, "Auto-login on service start failed", e)
}
}
// Periodic token refresh
serviceScope.launch {
while (true) {
delay(TOKEN_REFRESH_INTERVAL_MS)
if (tokenStore.isLoggedIn()) {
tryRefreshToken()
}
}
}
// Forward streaming state changes to AIDL callbacks
serviceScope.launch {
streamingManager.state.collect { state ->
streamingServiceImpl?.broadcastStateChanged(state)
}
}
serviceScope.launch {
streamingManager.stats.collect { stats ->
streamingServiceImpl?.broadcastStats(
stats.videoBitrate, stats.audioBitrate, stats.fps, stats.droppedFrames,
)
}
}
// Forward buffer release events to AIDL callbacks
streamingManager.onBufferReleased = { bufferIndex ->
streamingServiceImpl?.broadcastBufferReleased(bufferIndex)
}
// Initialize chat notification manager and connect WebSocket
chatNotificationManager.init()
chatRepository.connect()
// Auto-subscribe/unsubscribe chat when plans go LIVE/ENDED
serviceScope.launch {
streamPlanRepository.observePlans().collect { plans ->
Log.d(TAG, "observePlans emitted ${plans.size} plans")
for (plan in plans) {
val destServices = plan.destinations.map { it.service }
Log.d(TAG, "Plan ${plan.planId} status=${plan.status} destinations=$destServices")
val hasChat = plan.destinations.any {
it.service == "YOUTUBE" || it.service == "TWITCH"
}
if (plan.status == "LIVE" && hasChat) {
Log.d(TAG, "Subscribing chat for plan ${plan.planId}")
chatRepository.subscribe(plan.planId)
} else if (plan.status == "ENDED") {
chatRepository.unsubscribe(plan.planId)
}
}
}
}
}
override fun onBind(intent: Intent?): IBinder? {
return when (intent?.action) {
ACTION_BIND_STREAMING -> {
if (streamingServiceImpl == null) {
streamingServiceImpl = LckStreamingServiceImpl(streamingManager)
}
streamingServiceImpl!!.asBinder()
}
else -> binder
}
}
override fun onDestroy() {
chatRepository.disconnect()
streamingManager.stopStreaming()
streamingServiceImpl?.kill()
serviceScope.cancel()
callbacks.kill()
super.onDestroy()
}
// ── Auth logic ──────────────────────────────────────────
private fun extractJwtSub(jwt: String): String? {
return try {
val parts = jwt.split(".")
if (parts.size < 2) return null
val payload = String(android.util.Base64.decode(parts[1], android.util.Base64.URL_SAFE or android.util.Base64.NO_WRAP))
org.json.JSONObject(payload).optString("sub", "").ifEmpty { null }
} catch (_: Exception) { null }
}
private suspend fun doAutoLogin() {
// Try token refresh first
val refreshToken = tokenStore.getRefreshToken()
val oldJwt = tokenStore.getJwt()
val oldSub = oldJwt?.let { extractJwtSub(it) }
Log.d(TAG, "doAutoLogin: hasRefreshToken=${refreshToken != null}, currentUserId=$oldSub")
if (refreshToken != null) {
Log.d(TAG, "Attempting token refresh...")
try {
val response = apiService.refreshSession(RefreshRequest(refreshToken))
val newSub = extractJwtSub(response.accessToken)
Log.d(TAG, "Token refresh successful, userId=$newSub (was $oldSub)")
tokenStore.saveSession(response.accessToken, response.refreshToken)
broadcastAuthStateChanged(true)
return
} catch (e: Exception) {
Log.w(TAG, "Token refresh failed, falling back to Quest SDK login", e)
tokenStore.clearSession()
}
}
// Full Quest SDK login
Log.d(TAG, "Starting Quest SDK login (previous userId=$oldSub)")
doQuestLogin()
}
private suspend fun doQuestLogin() {
if (!Core.isInitialized()) {
Log.d(TAG, "Initializing Platform SDK with appId=$QUEST_APP_ID")
val initResult = awaitWithPump { Core.asyncInitialize(QUEST_APP_ID, applicationContext) }
Log.d(TAG, "Platform SDK initialized: $initResult")
}
Log.d(TAG, "Requesting logged-in user...")
val user = awaitWithPump { Users.getLoggedInUser() }
val numericId = user.getID().toString()
val oculusId = user.oculusID
Log.d(TAG, "User: id=$numericId displayName=${user.displayName} oculusId=$oculusId")
val userId = if (user.getID() != 0L) numericId else oculusId
if (userId.isNullOrEmpty()) {
throw Exception("Platform SDK returned no user identifier")
}
var nonce = ""
try {
val proof = awaitWithPump { Users.getUserProof() }
nonce = proof.value
} catch (e: Exception) {
Log.w(TAG, "getUserProof failed (DUC pending?), proceeding without nonce", e)
}
Log.d(TAG, "Sending to backend: userId=$userId hasNonce=${nonce.isNotEmpty()}")
val response = apiService.metaCallback(
MetaCallbackRequest(
userId = userId,
nonce = nonce,
deviceInfo = android.os.Build.MODEL,
)
)
val loginSub = extractJwtSub(response.accessToken)
tokenStore.saveSession(response.accessToken, response.refreshToken)
Log.d(TAG, "Quest SDK login successful, userId=$loginSub")
broadcastAuthStateChanged(true)
}
private suspend fun tryRefreshToken() {
val refreshToken = tokenStore.getRefreshToken() ?: return
try {
val response = apiService.refreshSession(RefreshRequest(refreshToken))
tokenStore.saveSession(response.accessToken, response.refreshToken)
} catch (e: Exception) {
Log.w(TAG, "Periodic token refresh failed", e)
tokenStore.clearSession()
broadcastAuthStateChanged(false)
}
}
private suspend fun <T> awaitWithPump(
block: () -> Request<T>,
): T = suspendCoroutine { cont ->
var completed = false
block()
.onSuccess { result: T ->
if (!completed) {
completed = true
cont.resume(result)
}
}
.onError { error ->
if (!completed) {
completed = true
cont.resumeWithException(Exception("SDK error ${error.code}: ${error.message}"))
}
}
Thread {
val timeout = System.currentTimeMillis() + 15_000
while (!completed && System.currentTimeMillis() < timeout) {
try {
val msg = Core.popSDKMessage()
if (msg != null) Request.handleMessage(msg)
} catch (e: Exception) {
Log.w(TAG, "Message pump error", e)
}
Thread.sleep(50)
}
if (!completed) {
completed = true
cont.resumeWithException(Exception("Platform SDK request timed out"))
}
}.start()
}
// ── Notifications ───────────────────────────────────────
private fun createNotificationChannel() {
val channel = NotificationChannel(
CHANNEL_ID,
"LCK Control Service",
NotificationManager.IMPORTANCE_LOW,
).apply {
description = "Manages connected game clients and stream plans"
}
getSystemService(NotificationManager::class.java).createNotificationChannel(channel)
}
private fun buildNotification(): Notification =
Notification.Builder(this, CHANNEL_ID)
.setContentTitle("LCK Control")
.setContentText("Managing stream connections")
.setSmallIcon(R.drawable.ic_launcher_foreground)
.setOngoing(true)
.build()
// ── Broadcast helpers ───────────────────────────────────
private fun broadcastAuthStateChanged(authenticated: Boolean) {
val count = callbacks.beginBroadcast()
try {
for (i in 0 until count) {
try {
callbacks.getBroadcastItem(i).onAuthStateChanged(authenticated)
} catch (_: Exception) {}
}
} finally {
callbacks.finishBroadcast()
}
}
private fun broadcastPlansChanged() {
serviceScope.launch {
val plans = streamPlanRepository.getPlans()
val count = callbacks.beginBroadcast()
try {
for (i in 0 until count) {
try {
callbacks.getBroadcastItem(i).onStreamPlansChanged(plans)
} catch (_: Exception) {}
}
} finally {
callbacks.finishBroadcast()
}
}
}
private fun broadcastPlanUpdated(plan: StreamPlan) {
val count = callbacks.beginBroadcast()
try {
for (i in 0 until count) {
try {
callbacks.getBroadcastItem(i).onStreamPlanUpdated(plan)
} catch (_: Exception) {}
}
} finally {
callbacks.finishBroadcast()
}
}
private fun broadcastClientRegistered(clientId: String) {
val count = callbacks.beginBroadcast()
try {
for (i in 0 until count) {
try {
callbacks.getBroadcastItem(i).onClientRegistered(clientId)
} catch (_: Exception) {}
}
} finally {
callbacks.finishBroadcast()
}
}
private fun broadcastClientUnregistered(clientId: String) {
val count = callbacks.beginBroadcast()
try {
for (i in 0 until count) {
try {
callbacks.getBroadcastItem(i).onClientUnregistered(clientId)
} catch (_: Exception) {}
}
} finally {
callbacks.finishBroadcast()
}
}
}