From ef221ca132bd7af8c9d0a5016dcbc5b85ee61245 Mon Sep 17 00:00:00 2001 From: omigamedev Date: Sun, 1 Mar 2026 14:33:57 +0100 Subject: [PATCH] Fix streaming pipeline: timestamps, buffer release, resolution, orientation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix encoder PTS: use wall-clock relative timestamps to prevent backward jumps when transitioning from standby to game frames (MediaCodec drops) - Suppress standby frames while game is active (500ms timeout) to prevent flickering between game video and color bars - Remove standby color bar pattern, use plain dark background - Fix vertical flip and BGR→RGB swizzle in composition base pass shader - Pass buffer index through native pipeline for pool slot release callback - Start engine for already-LIVE plans when APP_STREAMING mode is active - Use texture pool dimensions for encoder resolution instead of hardcoded 1920x1080 --- app/src/main/cpp/composition_pipeline.cpp | 10 +- app/src/main/cpp/jni_bridge.cpp | 4 +- app/src/main/cpp/streaming_engine.cpp | 129 +++++++++++++----- app/src/main/cpp/streaming_engine.h | 17 ++- .../lckcontrol/service/LckControlService.kt | 19 ++- .../streaming/NativeStreamingEngine.kt | 6 +- .../lckcontrol/streaming/StreamingManager.kt | 46 ++++++- 7 files changed, 181 insertions(+), 50 deletions(-) diff --git a/app/src/main/cpp/composition_pipeline.cpp b/app/src/main/cpp/composition_pipeline.cpp index 32b5e9d..97d0aa0 100644 --- a/app/src/main/cpp/composition_pipeline.cpp +++ b/app/src/main/cpp/composition_pipeline.cpp @@ -22,6 +22,8 @@ void main() { )"; // Base pass: renders game frame (OES texture) full-screen to FBO +// - Flips V coordinate: Vulkan render targets have origin at top-left, GL at bottom-left +// - Swizzles BGR→RGB: Vulkan RGBA8 maps to BGRA in some AHardwareBuffer formats static const char* BASE_FRAGMENT_SHADER = R"(#version 300 es #extension GL_OES_EGL_image_external_essl3 : require precision mediump float; @@ -29,7 +31,9 @@ in vec2 vTexCoord; out vec4 fragColor; uniform samplerExternalOES uTexture; void main() { - fragColor = texture(uTexture, vTexCoord); + vec2 flippedCoord = vec2(vTexCoord.x, 1.0 - vTexCoord.y); + vec4 color = texture(uTexture, flippedCoord); + fragColor = vec4(color.b, color.g, color.r, color.a); } )"; @@ -226,11 +230,9 @@ void CompositionPipeline::Compose(GLuint srcOesTexture) { glClearColor(0.05f, 0.05f, 0.05f, 1.0f); glClear(GL_COLOR_BUFFER_BIT); - // Base pass: render game frame, or standby pattern if no game input + // Base pass: render game frame (or leave cleared dark background if no game input) if (srcOesTexture != 0) { RenderBasePass(srcOesTexture); - } else { - RenderStandbyPattern(); } // Overlay pass: render layers sorted by zOrder diff --git a/app/src/main/cpp/jni_bridge.cpp b/app/src/main/cpp/jni_bridge.cpp index eb0220d..f825018 100644 --- a/app/src/main/cpp/jni_bridge.cpp +++ b/app/src/main/cpp/jni_bridge.cpp @@ -105,7 +105,7 @@ Java_com_omixlab_lckcontrol_streaming_NativeStreamingEngine_nativeStart( JNIEXPORT void JNICALL Java_com_omixlab_lckcontrol_streaming_NativeStreamingEngine_nativeSubmitVideoFrame( JNIEnv* env, jobject thiz, jlong ptr, - jobject hardwareBuffer, jlong timestampNs, jint fenceFd) { + jobject hardwareBuffer, jlong timestampNs, jint fenceFd, jint bufferIndex) { auto* engine = reinterpret_cast(ptr); if (!engine) return; @@ -115,7 +115,7 @@ Java_com_omixlab_lckcontrol_streaming_NativeStreamingEngine_nativeSubmitVideoFra return; } - engine->SubmitVideoFrame(buffer, timestampNs, fenceFd); + engine->SubmitVideoFrame(buffer, timestampNs, fenceFd, bufferIndex); } JNIEXPORT void JNICALL diff --git a/app/src/main/cpp/streaming_engine.cpp b/app/src/main/cpp/streaming_engine.cpp index 105be43..4496f6d 100644 --- a/app/src/main/cpp/streaming_engine.cpp +++ b/app/src/main/cpp/streaming_engine.cpp @@ -277,6 +277,10 @@ bool StreamingEngine::Start() { firstVideoFrame = true; startTimestampNs = 0; lastComposeTimeNs = 0; + lastGameVideoFrameNs = 0; + hasExternalAudioSource.store(false); + audioPcmBuffer.clear(); + audioBufferSamplesWritten = 0; statsVideoBytes = 0; statsAudioBytes = 0; statsFrameCount = 0; @@ -361,18 +365,28 @@ void StreamingEngine::EncoderThreadFunc() { { std::lock_guard lock(videoMutex); hadVideoFrames = !videoQueue.empty(); + if (hadVideoFrames && statsFrameCount % 30 == 0) { + LOGI("Processing %zu game video frames", videoQueue.size()); + } for (auto& frame : videoQueue) { ProcessVideoFrame(frame); + // Release buffer back to the game's pool + if (bufferReleasedCallback) { + bufferReleasedCallback(frame.bufferIndex); + } } videoQueue.clear(); } - // Generate standby frames when no game input arrives + // Generate standby frames only when no game input has arrived for STANDBY_TIMEOUT_NS. + // This prevents standby frames from being interleaved with game frames. if (!hadVideoFrames && compositionPipeline.IsInitialized()) { auto now = std::chrono::steady_clock::now().time_since_epoch(); int64_t nowNs = std::chrono::duration_cast(now).count(); int64_t frameIntervalNs = 1000000000LL / framerate; - if (nowNs - lastComposeTimeNs >= frameIntervalNs) { + bool gameActive = lastGameVideoFrameNs > 0 && + (nowNs - lastGameVideoFrameNs) < STANDBY_TIMEOUT_NS; + if (!gameActive && nowNs - lastComposeTimeNs >= frameIntervalNs) { // Compose standby frame (dark background + overlays, no game texture) compositionPipeline.Compose(0); GLuint composedTex = compositionPipeline.GetComposedTexture(); @@ -395,22 +409,11 @@ void StreamingEngine::EncoderThreadFunc() { eglContext.MakeEncoderCurrent(); } - // Generate silence audio to keep the audio track alive - if (audioEncoder) { - // 1 video frame at 30fps = 1/30s ≈ 1600 samples at 48kHz - int samplesPerFrame = sampleRate / framerate; - int bytesPerFrame = samplesPerFrame * channels * 2; // 16-bit PCM - AudioFrame silenceFrame; - silenceFrame.pcmData.resize(bytesPerFrame, 0); - silenceFrame.timestampNs = nowNs; - ProcessAudioFrame(silenceFrame); - } - lastComposeTimeNs = nowNs; } } - // Process audio frames + // Process audio frames from external sources (accumulate into PCM buffer) { std::lock_guard lock(audioMutex); for (auto& frame : audioQueue) { @@ -419,12 +422,17 @@ void StreamingEngine::EncoderThreadFunc() { audioQueue.clear(); } - // Drain encoders + // Drain encoders — always drain both regardless of pending input DrainVideoEncoder(); if (audioEncoder) { DrainAudioEncoder(); } + // Flush accumulated audio in AAC-aligned chunks + if (audioEncoder) { + FlushAudioBuffer(); + } + // Update stats every second regardless of frame output UpdateStats(); @@ -465,11 +473,21 @@ void StreamingEngine::EncoderThreadFunc() { void StreamingEngine::ProcessVideoFrame(const VideoFrame& frame) { if (!frame.buffer) return; + // Use wall-clock relative timestamps (matching standby frames) to ensure + // monotonically increasing PTS. The game's own timestamp is relative to + // AppStreamingTime which starts at 0, but standby frames may have already + // advanced the encoder's PTS — backward timestamps cause MediaCodec to drop frames. + auto now = std::chrono::steady_clock::now().time_since_epoch(); + int64_t nowNs = std::chrono::duration_cast(now).count(); + if (firstVideoFrame) { - startTimestampNs = frame.timestampNs; + startTimestampNs = nowNs; firstVideoFrame = false; } + int64_t presentationNs = nowNs - startTimestampNs; + lastGameVideoFrameNs = nowNs; + // Wait on GPU fence eglContext.WaitFence(frame.fenceFd); @@ -480,6 +498,13 @@ void StreamingEngine::ProcessVideoFrame(const VideoFrame& frame) { return; } + static int sProcessCount = 0; + if (++sProcessCount <= 3 || sProcessCount % 300 == 0) { + LOGI("ProcessVideoFrame: #%d tex=%u pts=%lldms buf=%p idx=%d", + sProcessCount, texture, (long long)(presentationNs / 1000000), + frame.buffer, frame.bufferIndex); + } + // Compose: game frame + overlay layers → FBO compositionPipeline.Compose(texture); GLuint composedTex = compositionPipeline.GetComposedTexture(); @@ -487,7 +512,7 @@ void StreamingEngine::ProcessVideoFrame(const VideoFrame& frame) { // Blit composed texture → encoder surface eglContext.MakeEncoderCurrent(); BlitComposedToSurface(composedTex, width, height); - eglContext.SetPresentationTime(frame.timestampNs); + eglContext.SetPresentationTime(presentationNs); eglContext.SwapBuffers(); // Blit composed texture → preview surface (if active) @@ -503,8 +528,7 @@ void StreamingEngine::ProcessVideoFrame(const VideoFrame& frame) { glDeleteTextures(1, &texture); // Track compose time so standby frames don't overlap - auto now = std::chrono::steady_clock::now().time_since_epoch(); - lastComposeTimeNs = std::chrono::duration_cast(now).count(); + lastComposeTimeNs = nowNs; } void StreamingEngine::BlitComposedToSurface(GLuint composedTex, int viewportW, int viewportH) { @@ -523,22 +547,48 @@ void StreamingEngine::BlitComposedToSurface(GLuint composedTex, int viewportW, i void StreamingEngine::ProcessAudioFrame(const AudioFrame& frame) { if (!audioEncoder || frame.pcmData.empty()) return; - ssize_t inputIndex = AMediaCodec_dequeueInputBuffer(audioEncoder, 0); - if (inputIndex < 0) { - LOGW("No audio input buffer available"); - return; + // Accumulate PCM data; FlushAudioBuffer will submit in AAC-aligned chunks + audioPcmBuffer.insert(audioPcmBuffer.end(), frame.pcmData.begin(), frame.pcmData.end()); +} + +void StreamingEngine::FlushAudioBuffer() { + if (!audioEncoder || audioPcmBuffer.empty()) return; + + // Bytes per AAC frame: 1024 samples * channels * 2 bytes (16-bit PCM) + const size_t aacFrameBytes = AAC_FRAME_SAMPLES * channels * 2; + + while (audioPcmBuffer.size() >= aacFrameBytes) { + ssize_t inputIndex = AMediaCodec_dequeueInputBuffer(audioEncoder, 5000); + if (inputIndex < 0) { + // No buffer available — leave data for next loop iteration + break; + } + + size_t bufferSize; + uint8_t* inputBuffer = AMediaCodec_getInputBuffer(audioEncoder, inputIndex, &bufferSize); + if (!inputBuffer) break; + + size_t copySize = std::min(aacFrameBytes, bufferSize); + memcpy(inputBuffer, audioPcmBuffer.data(), copySize); + + // Timestamp based on total samples submitted (continuous, no jitter) + int64_t timestampUs = audioBufferSamplesWritten * 1000000LL / sampleRate; + AMediaCodec_queueInputBuffer(audioEncoder, inputIndex, 0, copySize, + timestampUs, 0); + + audioBufferSamplesWritten += AAC_FRAME_SAMPLES; + + // Remove consumed data from front + audioPcmBuffer.erase(audioPcmBuffer.begin(), audioPcmBuffer.begin() + copySize); } - size_t bufferSize; - uint8_t* inputBuffer = AMediaCodec_getInputBuffer(audioEncoder, inputIndex, &bufferSize); - if (!inputBuffer) return; - - size_t copySize = std::min(frame.pcmData.size(), bufferSize); - memcpy(inputBuffer, frame.pcmData.data(), copySize); - - int64_t relativeTs = frame.timestampNs - startTimestampNs; - AMediaCodec_queueInputBuffer(audioEncoder, inputIndex, 0, copySize, - relativeTs / 1000, 0); + // Prevent unbounded accumulation if encoder is stuck + const size_t maxBufferBytes = aacFrameBytes * 16; // ~340ms of audio + if (audioPcmBuffer.size() > maxBufferBytes) { + size_t excess = audioPcmBuffer.size() - maxBufferBytes; + audioPcmBuffer.erase(audioPcmBuffer.begin(), audioPcmBuffer.begin() + excess); + LOGW("Audio buffer overflow, dropped %zu bytes", excess); + } } void StreamingEngine::DrainVideoEncoder() { @@ -648,16 +698,25 @@ void StreamingEngine::UpdateStats() { } } -void StreamingEngine::SubmitVideoFrame(AHardwareBuffer* buffer, int64_t timestampNs, int fenceFd) { +static int sVideoSubmitCount = 0; +void StreamingEngine::SubmitVideoFrame(AHardwareBuffer* buffer, int64_t timestampNs, int fenceFd, int bufferIndex) { if (!running.load()) { if (fenceFd >= 0) close(fenceFd); + // Release buffer immediately if not running + if (bufferReleasedCallback) bufferReleasedCallback(bufferIndex); return; } + if (++sVideoSubmitCount % 30 == 1) { + LOGI("SubmitVideoFrame: frame #%d idx=%d buffer=%p ts=%lld fence=%d", + sVideoSubmitCount, bufferIndex, buffer, (long long)timestampNs, fenceFd); + } + VideoFrame frame; frame.buffer = buffer; frame.timestampNs = timestampNs; frame.fenceFd = fenceFd; + frame.bufferIndex = bufferIndex; std::lock_guard lock(videoMutex); videoQueue.push_back(frame); @@ -666,6 +725,8 @@ void StreamingEngine::SubmitVideoFrame(AHardwareBuffer* buffer, int64_t timestam void StreamingEngine::SubmitAudioFrame(const uint8_t* pcmData, size_t pcmSize, int64_t timestampNs) { if (!running.load()) return; + hasExternalAudioSource.store(true); + AudioFrame frame; frame.pcmData.assign(pcmData, pcmData + pcmSize); frame.timestampNs = timestampNs; diff --git a/app/src/main/cpp/streaming_engine.h b/app/src/main/cpp/streaming_engine.h index 2ed6092..5cd9116 100644 --- a/app/src/main/cpp/streaming_engine.h +++ b/app/src/main/cpp/streaming_engine.h @@ -22,6 +22,7 @@ struct VideoFrame { AHardwareBuffer* buffer; int64_t timestampNs; int fenceFd; // -1 if no fence + int bufferIndex; // pool slot index for release callback }; struct AudioFrame { @@ -62,7 +63,7 @@ public: bool Start(); /** Submit a video frame from HardwareBuffer. Non-blocking. */ - void SubmitVideoFrame(AHardwareBuffer* buffer, int64_t timestampNs, int fenceFd); + void SubmitVideoFrame(AHardwareBuffer* buffer, int64_t timestampNs, int fenceFd, int bufferIndex); /** Submit audio PCM data. Non-blocking. */ void SubmitAudioFrame(const uint8_t* pcmData, size_t pcmSize, int64_t timestampNs); @@ -139,6 +140,12 @@ private: // Audio encoder AMediaCodec* audioEncoder = nullptr; + // Audio accumulation buffer — collects PCM and submits in 1024-sample AAC frames + std::vector audioPcmBuffer; + int64_t audioBufferSamplesWritten = 0; // total samples submitted to encoder + static constexpr int AAC_FRAME_SAMPLES = 1024; + void FlushAudioBuffer(); + // RTMP sinks (one per destination) std::vector sinks; @@ -195,6 +202,14 @@ private: // Standby frame timing int64_t lastComposeTimeNs = 0; + // Tracks when the last game video frame was received. + // Standby frames are only generated after a timeout with no game input. + int64_t lastGameVideoFrameNs = 0; + static constexpr int64_t STANDBY_TIMEOUT_NS = 500000000LL; // 500ms + + // Tracks whether external audio has arrived (set by SubmitAudioFrame) + std::atomic hasExternalAudioSource{false}; + // Callbacks StatsCallback statsCallback; ErrorCallback errorCallback; diff --git a/app/src/main/java/com/omixlab/lckcontrol/service/LckControlService.kt b/app/src/main/java/com/omixlab/lckcontrol/service/LckControlService.kt index 2bb7a4c..148de83 100644 --- a/app/src/main/java/com/omixlab/lckcontrol/service/LckControlService.kt +++ b/app/src/main/java/com/omixlab/lckcontrol/service/LckControlService.kt @@ -155,7 +155,19 @@ class LckControlService : Service() { override fun startStreamPlan(planId: String): Boolean = runBlocking { val plan = streamPlanRepository.getPlan(planId) ?: return@runBlocking false - if (plan.status == "LIVE") return@runBlocking true + if (plan.status == "LIVE") { + // Plan already LIVE — ensure streaming engine is running for APP_STREAMING + if (plan.executionMode == "APP_STREAMING" && !streamingManager.isStreaming()) { + Log.d(TAG, "startStreamPlan: plan already LIVE but engine not running, starting engine") + streamingManager.startStreaming( + plan = plan, + config = StreamingConfig(), + width = 1920, + height = 1080, + ) + } + return@runBlocking true + } if (plan.status != "READY") return@runBlocking false try { streamPlanRepository.startPlan(planId) @@ -278,6 +290,11 @@ class LckControlService : Service() { ) } } + + // Forward buffer release events to AIDL callbacks + streamingManager.onBufferReleased = { bufferIndex -> + streamingServiceImpl?.broadcastBufferReleased(bufferIndex) + } } override fun onBind(intent: Intent?): IBinder? { diff --git a/app/src/main/java/com/omixlab/lckcontrol/streaming/NativeStreamingEngine.kt b/app/src/main/java/com/omixlab/lckcontrol/streaming/NativeStreamingEngine.kt index bcc0c1f..cec7b4f 100644 --- a/app/src/main/java/com/omixlab/lckcontrol/streaming/NativeStreamingEngine.kt +++ b/app/src/main/java/com/omixlab/lckcontrol/streaming/NativeStreamingEngine.kt @@ -51,9 +51,9 @@ class NativeStreamingEngine { return nativeStart(nativePtr) } - fun submitVideoFrame(hardwareBuffer: HardwareBuffer, timestampNs: Long, fenceFd: Int) { + fun submitVideoFrame(hardwareBuffer: HardwareBuffer, timestampNs: Long, fenceFd: Int, bufferIndex: Int) { if (nativePtr == 0L) return - nativeSubmitVideoFrame(nativePtr, hardwareBuffer, timestampNs, fenceFd) + nativeSubmitVideoFrame(nativePtr, hardwareBuffer, timestampNs, fenceFd, bufferIndex) } fun submitAudioFrame(pcmData: ByteArray, timestampNs: Long) { @@ -151,7 +151,7 @@ class NativeStreamingEngine { private external fun nativeAddDestination(ptr: Long, rtmpUrl: String): Int private external fun nativeStart(ptr: Long): Boolean - private external fun nativeSubmitVideoFrame(ptr: Long, hardwareBuffer: HardwareBuffer, timestampNs: Long, fenceFd: Int) + private external fun nativeSubmitVideoFrame(ptr: Long, hardwareBuffer: HardwareBuffer, timestampNs: Long, fenceFd: Int, bufferIndex: Int) private external fun nativeSubmitAudioFrame(ptr: Long, pcmData: ByteArray, timestampNs: Long) private external fun nativeStop(ptr: Long) private external fun nativeDestroy(ptr: Long) diff --git a/app/src/main/java/com/omixlab/lckcontrol/streaming/StreamingManager.kt b/app/src/main/java/com/omixlab/lckcontrol/streaming/StreamingManager.kt index b3c6ba5..b2dc823 100644 --- a/app/src/main/java/com/omixlab/lckcontrol/streaming/StreamingManager.kt +++ b/app/src/main/java/com/omixlab/lckcontrol/streaming/StreamingManager.kt @@ -31,6 +31,8 @@ class StreamingManager @Inject constructor() { private var engine: NativeStreamingEngine? = null private var texturePoolBuffers: Array? = null + private var texturePoolWidth: Int = 0 + private var texturePoolHeight: Int = 0 private val _state = MutableStateFlow(StreamingState.IDLE) val state: StateFlow = _state.asStateFlow() @@ -61,14 +63,19 @@ class StreamingManager @Inject constructor() { return } + // Use texture pool dimensions if available, otherwise use caller-provided defaults + val actualWidth = if (texturePoolWidth > 0) texturePoolWidth else width + val actualHeight = if (texturePoolHeight > 0) texturePoolHeight else height + Log.d(TAG, "Starting streaming at ${actualWidth}x${actualHeight} (pool=${texturePoolWidth}x${texturePoolHeight}, requested=${width}x${height})") + _state.value = StreamingState.STARTING _error.value = null try { val eng = NativeStreamingEngine() eng.create( - width = width, - height = height, + width = actualWidth, + height = actualHeight, videoBitrate = config.videoBitrate, audioBitrate = config.audioBitrate, sampleRate = config.audioSampleRate, @@ -93,6 +100,10 @@ class StreamingManager @Inject constructor() { _state.value = StreamingState.ERROR } + eng.onBufferReleased = { index -> + onBufferReleased?.invoke(index) + } + if (eng.start()) { engine = eng _state.value = StreamingState.LIVE @@ -116,19 +127,43 @@ class StreamingManager @Inject constructor() { */ fun registerTexturePool(buffers: Array, width: Int, height: Int, format: Int) { texturePoolBuffers = buffers + texturePoolWidth = width + texturePoolHeight = height Log.d(TAG, "Texture pool registered: ${buffers.size} buffers, ${width}x${height}") } fun unregisterTexturePool() { texturePoolBuffers = null + texturePoolWidth = 0 + texturePoolHeight = 0 Log.d(TAG, "Texture pool unregistered") } + private var videoFrameCount = 0 + + /** Callback when a buffer is released after processing. */ + var onBufferReleased: ((Int) -> Unit)? = null + /** Forward a video frame from the game to the native engine. */ fun submitVideoFrame(bufferIndex: Int, timestampNs: Long, fenceFd: Int) { - val buffers = texturePoolBuffers ?: return - if (bufferIndex < 0 || bufferIndex >= buffers.size) return - engine?.submitVideoFrame(buffers[bufferIndex], timestampNs, fenceFd) + val buffers = texturePoolBuffers + if (buffers == null) { + if (videoFrameCount++ % 30 == 0) Log.w(TAG, "submitVideoFrame: no texture pool") + return + } + if (bufferIndex < 0 || bufferIndex >= buffers.size) { + if (videoFrameCount++ % 30 == 0) Log.w(TAG, "submitVideoFrame: index $bufferIndex out of range [0,${buffers.size})") + return + } + val eng = engine + if (eng == null) { + if (videoFrameCount++ % 30 == 0) Log.w(TAG, "submitVideoFrame: engine is null (state=${_state.value})") + return + } + eng.submitVideoFrame(buffers[bufferIndex], timestampNs, fenceFd, bufferIndex) + if (++videoFrameCount % 30 == 0) { + Log.d(TAG, "submitVideoFrame: forwarded frame #$videoFrameCount idx=$bufferIndex") + } } /** Forward audio PCM from the game to the native engine. */ @@ -138,6 +173,7 @@ class StreamingManager @Inject constructor() { /** Stop streaming and release all resources. */ fun stopStreaming() { + Log.w(TAG, "stopStreaming() called from state=${_state.value}", Exception("Caller trace")) if (_state.value != StreamingState.LIVE && _state.value != StreamingState.ERROR) { return }