Fix streaming pipeline: timestamps, buffer release, resolution, orientation

- Fix encoder PTS: use wall-clock relative timestamps to prevent backward
  jumps when transitioning from standby to game frames (MediaCodec drops)
- Suppress standby frames while game is active (500ms timeout) to prevent
  flickering between game video and color bars
- Remove standby color bar pattern, use plain dark background
- Fix vertical flip and BGR→RGB swizzle in composition base pass shader
- Pass buffer index through native pipeline for pool slot release callback
- Start engine for already-LIVE plans when APP_STREAMING mode is active
- Use texture pool dimensions for encoder resolution instead of hardcoded 1920x1080
This commit is contained in:
2026-03-01 14:33:57 +01:00
parent c632e22033
commit ef221ca132
7 changed files with 181 additions and 50 deletions

View File

@@ -22,6 +22,8 @@ void main() {
)";
// Base pass: renders game frame (OES texture) full-screen to FBO
// - Flips V coordinate: Vulkan render targets have origin at top-left, GL at bottom-left
// - Swizzles BGR→RGB: Vulkan RGBA8 maps to BGRA in some AHardwareBuffer formats
static const char* BASE_FRAGMENT_SHADER = R"(#version 300 es
#extension GL_OES_EGL_image_external_essl3 : require
precision mediump float;
@@ -29,7 +31,9 @@ in vec2 vTexCoord;
out vec4 fragColor;
uniform samplerExternalOES uTexture;
void main() {
fragColor = texture(uTexture, vTexCoord);
vec2 flippedCoord = vec2(vTexCoord.x, 1.0 - vTexCoord.y);
vec4 color = texture(uTexture, flippedCoord);
fragColor = vec4(color.b, color.g, color.r, color.a);
}
)";
@@ -226,11 +230,9 @@ void CompositionPipeline::Compose(GLuint srcOesTexture) {
glClearColor(0.05f, 0.05f, 0.05f, 1.0f);
glClear(GL_COLOR_BUFFER_BIT);
// Base pass: render game frame, or standby pattern if no game input
// Base pass: render game frame (or leave cleared dark background if no game input)
if (srcOesTexture != 0) {
RenderBasePass(srcOesTexture);
} else {
RenderStandbyPattern();
}
// Overlay pass: render layers sorted by zOrder

View File

@@ -105,7 +105,7 @@ Java_com_omixlab_lckcontrol_streaming_NativeStreamingEngine_nativeStart(
JNIEXPORT void JNICALL
Java_com_omixlab_lckcontrol_streaming_NativeStreamingEngine_nativeSubmitVideoFrame(
JNIEnv* env, jobject thiz, jlong ptr,
jobject hardwareBuffer, jlong timestampNs, jint fenceFd) {
jobject hardwareBuffer, jlong timestampNs, jint fenceFd, jint bufferIndex) {
auto* engine = reinterpret_cast<StreamingEngine*>(ptr);
if (!engine) return;
@@ -115,7 +115,7 @@ Java_com_omixlab_lckcontrol_streaming_NativeStreamingEngine_nativeSubmitVideoFra
return;
}
engine->SubmitVideoFrame(buffer, timestampNs, fenceFd);
engine->SubmitVideoFrame(buffer, timestampNs, fenceFd, bufferIndex);
}
JNIEXPORT void JNICALL

View File

@@ -277,6 +277,10 @@ bool StreamingEngine::Start() {
firstVideoFrame = true;
startTimestampNs = 0;
lastComposeTimeNs = 0;
lastGameVideoFrameNs = 0;
hasExternalAudioSource.store(false);
audioPcmBuffer.clear();
audioBufferSamplesWritten = 0;
statsVideoBytes = 0;
statsAudioBytes = 0;
statsFrameCount = 0;
@@ -361,18 +365,28 @@ void StreamingEngine::EncoderThreadFunc() {
{
std::lock_guard<std::mutex> lock(videoMutex);
hadVideoFrames = !videoQueue.empty();
if (hadVideoFrames && statsFrameCount % 30 == 0) {
LOGI("Processing %zu game video frames", videoQueue.size());
}
for (auto& frame : videoQueue) {
ProcessVideoFrame(frame);
// Release buffer back to the game's pool
if (bufferReleasedCallback) {
bufferReleasedCallback(frame.bufferIndex);
}
}
videoQueue.clear();
}
// Generate standby frames when no game input arrives
// Generate standby frames only when no game input has arrived for STANDBY_TIMEOUT_NS.
// This prevents standby frames from being interleaved with game frames.
if (!hadVideoFrames && compositionPipeline.IsInitialized()) {
auto now = std::chrono::steady_clock::now().time_since_epoch();
int64_t nowNs = std::chrono::duration_cast<std::chrono::nanoseconds>(now).count();
int64_t frameIntervalNs = 1000000000LL / framerate;
if (nowNs - lastComposeTimeNs >= frameIntervalNs) {
bool gameActive = lastGameVideoFrameNs > 0 &&
(nowNs - lastGameVideoFrameNs) < STANDBY_TIMEOUT_NS;
if (!gameActive && nowNs - lastComposeTimeNs >= frameIntervalNs) {
// Compose standby frame (dark background + overlays, no game texture)
compositionPipeline.Compose(0);
GLuint composedTex = compositionPipeline.GetComposedTexture();
@@ -395,22 +409,11 @@ void StreamingEngine::EncoderThreadFunc() {
eglContext.MakeEncoderCurrent();
}
// Generate silence audio to keep the audio track alive
if (audioEncoder) {
// 1 video frame at 30fps = 1/30s ≈ 1600 samples at 48kHz
int samplesPerFrame = sampleRate / framerate;
int bytesPerFrame = samplesPerFrame * channels * 2; // 16-bit PCM
AudioFrame silenceFrame;
silenceFrame.pcmData.resize(bytesPerFrame, 0);
silenceFrame.timestampNs = nowNs;
ProcessAudioFrame(silenceFrame);
}
lastComposeTimeNs = nowNs;
}
}
// Process audio frames
// Process audio frames from external sources (accumulate into PCM buffer)
{
std::lock_guard<std::mutex> lock(audioMutex);
for (auto& frame : audioQueue) {
@@ -419,12 +422,17 @@ void StreamingEngine::EncoderThreadFunc() {
audioQueue.clear();
}
// Drain encoders
// Drain encoders — always drain both regardless of pending input
DrainVideoEncoder();
if (audioEncoder) {
DrainAudioEncoder();
}
// Flush accumulated audio in AAC-aligned chunks
if (audioEncoder) {
FlushAudioBuffer();
}
// Update stats every second regardless of frame output
UpdateStats();
@@ -465,11 +473,21 @@ void StreamingEngine::EncoderThreadFunc() {
void StreamingEngine::ProcessVideoFrame(const VideoFrame& frame) {
if (!frame.buffer) return;
// Use wall-clock relative timestamps (matching standby frames) to ensure
// monotonically increasing PTS. The game's own timestamp is relative to
// AppStreamingTime which starts at 0, but standby frames may have already
// advanced the encoder's PTS — backward timestamps cause MediaCodec to drop frames.
auto now = std::chrono::steady_clock::now().time_since_epoch();
int64_t nowNs = std::chrono::duration_cast<std::chrono::nanoseconds>(now).count();
if (firstVideoFrame) {
startTimestampNs = frame.timestampNs;
startTimestampNs = nowNs;
firstVideoFrame = false;
}
int64_t presentationNs = nowNs - startTimestampNs;
lastGameVideoFrameNs = nowNs;
// Wait on GPU fence
eglContext.WaitFence(frame.fenceFd);
@@ -480,6 +498,13 @@ void StreamingEngine::ProcessVideoFrame(const VideoFrame& frame) {
return;
}
static int sProcessCount = 0;
if (++sProcessCount <= 3 || sProcessCount % 300 == 0) {
LOGI("ProcessVideoFrame: #%d tex=%u pts=%lldms buf=%p idx=%d",
sProcessCount, texture, (long long)(presentationNs / 1000000),
frame.buffer, frame.bufferIndex);
}
// Compose: game frame + overlay layers → FBO
compositionPipeline.Compose(texture);
GLuint composedTex = compositionPipeline.GetComposedTexture();
@@ -487,7 +512,7 @@ void StreamingEngine::ProcessVideoFrame(const VideoFrame& frame) {
// Blit composed texture → encoder surface
eglContext.MakeEncoderCurrent();
BlitComposedToSurface(composedTex, width, height);
eglContext.SetPresentationTime(frame.timestampNs);
eglContext.SetPresentationTime(presentationNs);
eglContext.SwapBuffers();
// Blit composed texture → preview surface (if active)
@@ -503,8 +528,7 @@ void StreamingEngine::ProcessVideoFrame(const VideoFrame& frame) {
glDeleteTextures(1, &texture);
// Track compose time so standby frames don't overlap
auto now = std::chrono::steady_clock::now().time_since_epoch();
lastComposeTimeNs = std::chrono::duration_cast<std::chrono::nanoseconds>(now).count();
lastComposeTimeNs = nowNs;
}
void StreamingEngine::BlitComposedToSurface(GLuint composedTex, int viewportW, int viewportH) {
@@ -523,22 +547,48 @@ void StreamingEngine::BlitComposedToSurface(GLuint composedTex, int viewportW, i
void StreamingEngine::ProcessAudioFrame(const AudioFrame& frame) {
if (!audioEncoder || frame.pcmData.empty()) return;
ssize_t inputIndex = AMediaCodec_dequeueInputBuffer(audioEncoder, 0);
if (inputIndex < 0) {
LOGW("No audio input buffer available");
return;
// Accumulate PCM data; FlushAudioBuffer will submit in AAC-aligned chunks
audioPcmBuffer.insert(audioPcmBuffer.end(), frame.pcmData.begin(), frame.pcmData.end());
}
void StreamingEngine::FlushAudioBuffer() {
if (!audioEncoder || audioPcmBuffer.empty()) return;
// Bytes per AAC frame: 1024 samples * channels * 2 bytes (16-bit PCM)
const size_t aacFrameBytes = AAC_FRAME_SAMPLES * channels * 2;
while (audioPcmBuffer.size() >= aacFrameBytes) {
ssize_t inputIndex = AMediaCodec_dequeueInputBuffer(audioEncoder, 5000);
if (inputIndex < 0) {
// No buffer available — leave data for next loop iteration
break;
}
size_t bufferSize;
uint8_t* inputBuffer = AMediaCodec_getInputBuffer(audioEncoder, inputIndex, &bufferSize);
if (!inputBuffer) break;
size_t copySize = std::min(aacFrameBytes, bufferSize);
memcpy(inputBuffer, audioPcmBuffer.data(), copySize);
// Timestamp based on total samples submitted (continuous, no jitter)
int64_t timestampUs = audioBufferSamplesWritten * 1000000LL / sampleRate;
AMediaCodec_queueInputBuffer(audioEncoder, inputIndex, 0, copySize,
timestampUs, 0);
audioBufferSamplesWritten += AAC_FRAME_SAMPLES;
// Remove consumed data from front
audioPcmBuffer.erase(audioPcmBuffer.begin(), audioPcmBuffer.begin() + copySize);
}
size_t bufferSize;
uint8_t* inputBuffer = AMediaCodec_getInputBuffer(audioEncoder, inputIndex, &bufferSize);
if (!inputBuffer) return;
size_t copySize = std::min(frame.pcmData.size(), bufferSize);
memcpy(inputBuffer, frame.pcmData.data(), copySize);
int64_t relativeTs = frame.timestampNs - startTimestampNs;
AMediaCodec_queueInputBuffer(audioEncoder, inputIndex, 0, copySize,
relativeTs / 1000, 0);
// Prevent unbounded accumulation if encoder is stuck
const size_t maxBufferBytes = aacFrameBytes * 16; // ~340ms of audio
if (audioPcmBuffer.size() > maxBufferBytes) {
size_t excess = audioPcmBuffer.size() - maxBufferBytes;
audioPcmBuffer.erase(audioPcmBuffer.begin(), audioPcmBuffer.begin() + excess);
LOGW("Audio buffer overflow, dropped %zu bytes", excess);
}
}
void StreamingEngine::DrainVideoEncoder() {
@@ -648,16 +698,25 @@ void StreamingEngine::UpdateStats() {
}
}
void StreamingEngine::SubmitVideoFrame(AHardwareBuffer* buffer, int64_t timestampNs, int fenceFd) {
static int sVideoSubmitCount = 0;
void StreamingEngine::SubmitVideoFrame(AHardwareBuffer* buffer, int64_t timestampNs, int fenceFd, int bufferIndex) {
if (!running.load()) {
if (fenceFd >= 0) close(fenceFd);
// Release buffer immediately if not running
if (bufferReleasedCallback) bufferReleasedCallback(bufferIndex);
return;
}
if (++sVideoSubmitCount % 30 == 1) {
LOGI("SubmitVideoFrame: frame #%d idx=%d buffer=%p ts=%lld fence=%d",
sVideoSubmitCount, bufferIndex, buffer, (long long)timestampNs, fenceFd);
}
VideoFrame frame;
frame.buffer = buffer;
frame.timestampNs = timestampNs;
frame.fenceFd = fenceFd;
frame.bufferIndex = bufferIndex;
std::lock_guard<std::mutex> lock(videoMutex);
videoQueue.push_back(frame);
@@ -666,6 +725,8 @@ void StreamingEngine::SubmitVideoFrame(AHardwareBuffer* buffer, int64_t timestam
void StreamingEngine::SubmitAudioFrame(const uint8_t* pcmData, size_t pcmSize, int64_t timestampNs) {
if (!running.load()) return;
hasExternalAudioSource.store(true);
AudioFrame frame;
frame.pcmData.assign(pcmData, pcmData + pcmSize);
frame.timestampNs = timestampNs;

View File

@@ -22,6 +22,7 @@ struct VideoFrame {
AHardwareBuffer* buffer;
int64_t timestampNs;
int fenceFd; // -1 if no fence
int bufferIndex; // pool slot index for release callback
};
struct AudioFrame {
@@ -62,7 +63,7 @@ public:
bool Start();
/** Submit a video frame from HardwareBuffer. Non-blocking. */
void SubmitVideoFrame(AHardwareBuffer* buffer, int64_t timestampNs, int fenceFd);
void SubmitVideoFrame(AHardwareBuffer* buffer, int64_t timestampNs, int fenceFd, int bufferIndex);
/** Submit audio PCM data. Non-blocking. */
void SubmitAudioFrame(const uint8_t* pcmData, size_t pcmSize, int64_t timestampNs);
@@ -139,6 +140,12 @@ private:
// Audio encoder
AMediaCodec* audioEncoder = nullptr;
// Audio accumulation buffer — collects PCM and submits in 1024-sample AAC frames
std::vector<uint8_t> audioPcmBuffer;
int64_t audioBufferSamplesWritten = 0; // total samples submitted to encoder
static constexpr int AAC_FRAME_SAMPLES = 1024;
void FlushAudioBuffer();
// RTMP sinks (one per destination)
std::vector<RtmpSink*> sinks;
@@ -195,6 +202,14 @@ private:
// Standby frame timing
int64_t lastComposeTimeNs = 0;
// Tracks when the last game video frame was received.
// Standby frames are only generated after a timeout with no game input.
int64_t lastGameVideoFrameNs = 0;
static constexpr int64_t STANDBY_TIMEOUT_NS = 500000000LL; // 500ms
// Tracks whether external audio has arrived (set by SubmitAudioFrame)
std::atomic<bool> hasExternalAudioSource{false};
// Callbacks
StatsCallback statsCallback;
ErrorCallback errorCallback;

View File

@@ -155,7 +155,19 @@ class LckControlService : Service() {
override fun startStreamPlan(planId: String): Boolean = runBlocking {
val plan = streamPlanRepository.getPlan(planId) ?: return@runBlocking false
if (plan.status == "LIVE") return@runBlocking true
if (plan.status == "LIVE") {
// Plan already LIVE — ensure streaming engine is running for APP_STREAMING
if (plan.executionMode == "APP_STREAMING" && !streamingManager.isStreaming()) {
Log.d(TAG, "startStreamPlan: plan already LIVE but engine not running, starting engine")
streamingManager.startStreaming(
plan = plan,
config = StreamingConfig(),
width = 1920,
height = 1080,
)
}
return@runBlocking true
}
if (plan.status != "READY") return@runBlocking false
try {
streamPlanRepository.startPlan(planId)
@@ -278,6 +290,11 @@ class LckControlService : Service() {
)
}
}
// Forward buffer release events to AIDL callbacks
streamingManager.onBufferReleased = { bufferIndex ->
streamingServiceImpl?.broadcastBufferReleased(bufferIndex)
}
}
override fun onBind(intent: Intent?): IBinder? {

View File

@@ -51,9 +51,9 @@ class NativeStreamingEngine {
return nativeStart(nativePtr)
}
fun submitVideoFrame(hardwareBuffer: HardwareBuffer, timestampNs: Long, fenceFd: Int) {
fun submitVideoFrame(hardwareBuffer: HardwareBuffer, timestampNs: Long, fenceFd: Int, bufferIndex: Int) {
if (nativePtr == 0L) return
nativeSubmitVideoFrame(nativePtr, hardwareBuffer, timestampNs, fenceFd)
nativeSubmitVideoFrame(nativePtr, hardwareBuffer, timestampNs, fenceFd, bufferIndex)
}
fun submitAudioFrame(pcmData: ByteArray, timestampNs: Long) {
@@ -151,7 +151,7 @@ class NativeStreamingEngine {
private external fun nativeAddDestination(ptr: Long, rtmpUrl: String): Int
private external fun nativeStart(ptr: Long): Boolean
private external fun nativeSubmitVideoFrame(ptr: Long, hardwareBuffer: HardwareBuffer, timestampNs: Long, fenceFd: Int)
private external fun nativeSubmitVideoFrame(ptr: Long, hardwareBuffer: HardwareBuffer, timestampNs: Long, fenceFd: Int, bufferIndex: Int)
private external fun nativeSubmitAudioFrame(ptr: Long, pcmData: ByteArray, timestampNs: Long)
private external fun nativeStop(ptr: Long)
private external fun nativeDestroy(ptr: Long)

View File

@@ -31,6 +31,8 @@ class StreamingManager @Inject constructor() {
private var engine: NativeStreamingEngine? = null
private var texturePoolBuffers: Array<HardwareBuffer>? = null
private var texturePoolWidth: Int = 0
private var texturePoolHeight: Int = 0
private val _state = MutableStateFlow(StreamingState.IDLE)
val state: StateFlow<StreamingState> = _state.asStateFlow()
@@ -61,14 +63,19 @@ class StreamingManager @Inject constructor() {
return
}
// Use texture pool dimensions if available, otherwise use caller-provided defaults
val actualWidth = if (texturePoolWidth > 0) texturePoolWidth else width
val actualHeight = if (texturePoolHeight > 0) texturePoolHeight else height
Log.d(TAG, "Starting streaming at ${actualWidth}x${actualHeight} (pool=${texturePoolWidth}x${texturePoolHeight}, requested=${width}x${height})")
_state.value = StreamingState.STARTING
_error.value = null
try {
val eng = NativeStreamingEngine()
eng.create(
width = width,
height = height,
width = actualWidth,
height = actualHeight,
videoBitrate = config.videoBitrate,
audioBitrate = config.audioBitrate,
sampleRate = config.audioSampleRate,
@@ -93,6 +100,10 @@ class StreamingManager @Inject constructor() {
_state.value = StreamingState.ERROR
}
eng.onBufferReleased = { index ->
onBufferReleased?.invoke(index)
}
if (eng.start()) {
engine = eng
_state.value = StreamingState.LIVE
@@ -116,19 +127,43 @@ class StreamingManager @Inject constructor() {
*/
fun registerTexturePool(buffers: Array<HardwareBuffer>, width: Int, height: Int, format: Int) {
texturePoolBuffers = buffers
texturePoolWidth = width
texturePoolHeight = height
Log.d(TAG, "Texture pool registered: ${buffers.size} buffers, ${width}x${height}")
}
fun unregisterTexturePool() {
texturePoolBuffers = null
texturePoolWidth = 0
texturePoolHeight = 0
Log.d(TAG, "Texture pool unregistered")
}
private var videoFrameCount = 0
/** Callback when a buffer is released after processing. */
var onBufferReleased: ((Int) -> Unit)? = null
/** Forward a video frame from the game to the native engine. */
fun submitVideoFrame(bufferIndex: Int, timestampNs: Long, fenceFd: Int) {
val buffers = texturePoolBuffers ?: return
if (bufferIndex < 0 || bufferIndex >= buffers.size) return
engine?.submitVideoFrame(buffers[bufferIndex], timestampNs, fenceFd)
val buffers = texturePoolBuffers
if (buffers == null) {
if (videoFrameCount++ % 30 == 0) Log.w(TAG, "submitVideoFrame: no texture pool")
return
}
if (bufferIndex < 0 || bufferIndex >= buffers.size) {
if (videoFrameCount++ % 30 == 0) Log.w(TAG, "submitVideoFrame: index $bufferIndex out of range [0,${buffers.size})")
return
}
val eng = engine
if (eng == null) {
if (videoFrameCount++ % 30 == 0) Log.w(TAG, "submitVideoFrame: engine is null (state=${_state.value})")
return
}
eng.submitVideoFrame(buffers[bufferIndex], timestampNs, fenceFd, bufferIndex)
if (++videoFrameCount % 30 == 0) {
Log.d(TAG, "submitVideoFrame: forwarded frame #$videoFrameCount idx=$bufferIndex")
}
}
/** Forward audio PCM from the game to the native engine. */
@@ -138,6 +173,7 @@ class StreamingManager @Inject constructor() {
/** Stop streaming and release all resources. */
fun stopStreaming() {
Log.w(TAG, "stopStreaming() called from state=${_state.value}", Exception("Caller trace"))
if (_state.value != StreamingState.LIVE && _state.value != StreamingState.ERROR) {
return
}