feat:v1版本实现(改为异步解码)

This commit is contained in:
liyi 2025-04-24 14:23:26 +08:00
parent a9f96b8139
commit 40c2865ad6
4 changed files with 552 additions and 75 deletions

View File

@ -153,7 +153,8 @@ class VideoDecodePlugin : FlutterPlugin, MethodCallHandler {
height = height,
codecType = codecType,
frameRate = frameRate,
isDebug = isDebug
isDebug = isDebug,
isAsync = call.argument<Boolean>("isAsync") ?: true
)
// 创建解码器

View File

@ -10,11 +10,16 @@ import android.util.Log
import android.view.Surface
import io.flutter.view.TextureRegistry
import java.nio.ByteBuffer
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
/**
* 简化版视频解码器
* 视频解码器
* 负责解码H264/H265视频数据并将其渲染到Surface上
* 支持同步和异步两种工作模式
*/
class VideoDecoder(
private val context: Context,
@ -34,6 +39,9 @@ class VideoDecoder(
private const val NAL_UNIT_TYPE_PPS = 8
private const val NAL_UNIT_TYPE_IDR = 5
private const val NAL_UNIT_TYPE_NON_IDR = 1 // P帧
// 队列相关常量
private const val INPUT_BUFFER_QUEUE_CAPACITY = 20
}
// 回调接口
@ -96,6 +104,150 @@ class VideoDecoder(
// 是否是调试模式
private val isDebugMode: Boolean = config.isDebug
// 是否是异步模式
private val isAsyncMode: Boolean = config.isAsync
// 异步模式下的帧缓冲队列
private val inputFrameQueue = LinkedBlockingQueue<FrameData>(INPUT_BUFFER_QUEUE_CAPACITY)
// 异步模式下的状态锁
private val codecLock = ReentrantLock()
// 异步模式下的状态监控
private var frameQueueHighWatermark = 0
private var inputProcessCount = 0
private var outputProcessCount = 0
private var decodingJitterMs = 0L // 解码抖动时间ms
private var keyFrameInterval = 0L // 关键帧间隔时间ms
private var lastDecodingErrorTime = 0L // 最后一次解码错误的时间
private var decodingErrorCount = 0 // 解码错误计数
// 异步模式下的帧数据类
private data class FrameData(val data: ByteArray, val isIFrame: Boolean) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
other as FrameData
if (!data.contentEquals(other.data)) return false
if (isIFrame != other.isIFrame) return false
return true
}
override fun hashCode(): Int {
var result = data.contentHashCode()
result = 31 * result + isIFrame.hashCode()
return result
}
}
// 异步回调类
private inner class MediaCodecCallback : MediaCodec.Callback() {
override fun onInputBufferAvailable(codec: MediaCodec, index: Int) {
if (!isRunning.get()) return
try {
inputProcessCount++
// 记录当前队列大小最高水位
val currentQueueSize = inputFrameQueue.size
if (currentQueueSize > frameQueueHighWatermark) {
frameQueueHighWatermark = currentQueueSize
}
// 尝试从队列获取帧数据(非阻塞)
val frameData = inputFrameQueue.poll()
if (frameData != null) {
val inputBuffer = codec.getInputBuffer(index)
if (inputBuffer != null) {
// 处理帧数据
processInputFrame(codec, index, inputBuffer, frameData.data, frameData.isIFrame)
} else {
logError("获取输入缓冲区失败")
// 将帧数据放回队列前端
inputFrameQueue.offerFirst(frameData)
}
} else {
// 没有可用的帧,释放输入缓冲区
codec.queueInputBuffer(index, 0, 0, 0, 0)
}
} catch (e: Exception) {
logError("处理输入缓冲区时出错", e)
lastDecodingErrorTime = System.currentTimeMillis()
decodingErrorCount++
}
}
override fun onOutputBufferAvailable(codec: MediaCodec, index: Int, info: MediaCodec.BufferInfo) {
if (!isRunning.get()) return
try {
outputProcessCount++
val render = info.size > 0 && (info.flags and MediaCodec.BUFFER_FLAG_CODEC_CONFIG) == 0
if (render) {
// 计算关键帧间隔和解码抖动
val currentTime = System.currentTimeMillis()
// 如果是关键帧,计算间隔
if ((info.flags and MediaCodec.BUFFER_FLAG_KEY_FRAME) != 0) {
if (lastIFrameTimeMs > 0) {
keyFrameInterval = currentTime - lastIFrameTimeMs
}
lastIFrameTimeMs = currentTime
}
// 计算解码抖动(与预期帧间隔的差异)
if (config.frameRate != null && config.frameRate!! > 0) {
val expectedFrameInterval = 1000 / config.frameRate!!.toFloat()
if (lastOutputTimeMs > 0) {
val actualInterval = currentTime - lastOutputTimeMs
decodingJitterMs = Math.abs(actualInterval - expectedFrameInterval).toLong()
}
}
// 释放并渲染
codec.releaseOutputBuffer(index, true)
renderedFrameCount++
lastOutputTimeMs = currentTime
logDebug("成功渲染帧 #$renderedFrameCount")
// 通知Flutter刷新纹理
mainHandler.post {
notifyFrameAvailable()
}
} else {
// 释放但不渲染
codec.releaseOutputBuffer(index, false)
}
// 检查解码错误
if ((info.flags and MediaCodec.BUFFER_FLAG_END_OF_STREAM) != 0) {
logWarning("收到结束流标志")
// 忽略错误,继续解码
}
} catch (e: Exception) {
logError("处理输出缓冲区时出错", e)
lastDecodingErrorTime = System.currentTimeMillis()
decodingErrorCount++
}
}
override fun onError(codec: MediaCodec, e: MediaCodec.CodecException) {
logError("MediaCodec错误", e)
lastDecodingErrorTime = System.currentTimeMillis()
decodingErrorCount++
callback?.onError("MediaCodec错误: ${e.message}")
}
override fun onOutputFormatChanged(codec: MediaCodec, format: MediaFormat) {
logDebug("输出格式变更: $format")
}
}
/**
* 输出调试日志 - 仅在调试模式下输出
@ -131,12 +283,24 @@ class VideoDecoder(
try {
// 设置SurfaceTexture的默认缓冲区大小
surfaceTexture.setDefaultBufferSize(config.width, config.height)
logDebug("初始化解码器: ${config.width}x${config.height}, 编码: ${config.codecType}")
logDebug("初始化解码器: ${config.width}x${config.height}, 编码: ${config.codecType}, 模式: ${if (isAsyncMode) "异步" else "同步"}")
// 初始化解码器
if (setupDecoder()) {
isRunning.set(true)
// 如果是异步模式,预热解码器
if (isAsyncMode) {
// 在后台线程中预热解码器
Thread {
try {
warmUpDecoder()
} catch (e: Exception) {
logError("预热解码器出错", e)
}
}.start()
}
// 通知初始帧可用让Flutter创建Texture View
logDebug("[预通知] 解码器初始化成功,发送初始帧通知(无实际视频数据)")
mainHandler.post {
@ -231,9 +395,24 @@ class VideoDecoder(
// 创建解码器实例
val decoder = MediaCodec.createDecoderByType(mime)
// 配置解码器
decoder.configure(format, surface, null, 0)
decoder.start()
if (isAsyncMode) {
// 异步模式
logDebug("使用异步模式配置解码器")
// 设置异步回调
decoder.setCallback(MediaCodecCallback())
// 配置解码器
decoder.configure(format, surface, null, 0)
decoder.start()
} else {
// 同步模式
logDebug("使用同步模式配置解码器")
// 配置解码器
decoder.configure(format, surface, null, 0)
decoder.start()
}
// 保存解码器实例
mediaCodec = decoder
@ -287,7 +466,7 @@ class VideoDecoder(
}
/**
* 解码视频帧 - 简化但严格
* 解码视频帧 - 支持同步和异步两种模式
*/
fun decodeFrame(frameData: ByteArray, isIFrame: Boolean): Boolean {
if (!isRunning.get() || !isDecoderConfigured.get() || frameData.isEmpty()) {
@ -298,75 +477,132 @@ class VideoDecoder(
val codec = mediaCodec ?: return false
try {
// 检查NAL类型
val nalType = checkNalType(frameData)
// 实际使用的NAL类型
val effectiveType = if (nalType != -1) nalType else if (isIFrame) NAL_UNIT_TYPE_IDR else NAL_UNIT_TYPE_NON_IDR
// 如果是SPS或PPS且在缓存中已有相同内容跳过
if (effectiveType == NAL_UNIT_TYPE_SPS) {
val hash = frameData.hashCode()
if (lastSPSHash == hash) return true
lastSPSHash = hash
hasSentSPS.set(true)
} else if (effectiveType == NAL_UNIT_TYPE_PPS) {
val hash = frameData.hashCode()
if (lastPPSHash == hash) return true
lastPPSHash = hash
hasSentPPS.set(true)
} else if (effectiveType == NAL_UNIT_TYPE_IDR) {
hasSentIDR.set(true)
val currentTime = System.currentTimeMillis()
lastDetectedIFrameTime = currentTime
lastIFrameTimeMs = currentTime
consecutivePFrameCount = 0
if (isAsyncMode) {
// 检查NAL类型以便优先处理关键帧
val nalType = checkNalType(frameData)
// 判断是否为关键参数集或I帧
val isKeyFrame = nalType == NAL_UNIT_TYPE_SPS ||
nalType == NAL_UNIT_TYPE_PPS ||
nalType == NAL_UNIT_TYPE_IDR ||
isIFrame
// 异步模式 - 将帧数据添加到队列
if (isKeyFrame) {
// 对于关键帧,使用较长的超时时间,确保能加入队列
if (!inputFrameQueue.offer(FrameData(frameData, isIFrame), 500, TimeUnit.MILLISECONDS)) {
// 队列已满,尝试清理后再添加
logWarning("队列已满且尝试添加关键帧,尝试清理非关键帧后再添加")
try {
// 尝试移除一个非关键帧
var removed = false
val iterator = inputFrameQueue.iterator()
while (iterator.hasNext()) {
val item = iterator.next()
if (!item.isIFrame) {
iterator.remove()
droppedFrameCount++
removed = true
break
}
}
if (removed) {
// 再次尝试添加
return inputFrameQueue.offer(FrameData(frameData, isIFrame), 100, TimeUnit.MILLISECONDS)
} else {
logWarning("无法移除非关键帧,丢弃当前帧")
droppedFrameCount++
return false
}
} catch (e: Exception) {
logError("清理队列失败", e)
droppedFrameCount++
return false
}
}
return true
} else {
// 对于非关键帧,使用较短的超时时间
if (!inputFrameQueue.offer(FrameData(frameData, isIFrame), 50, TimeUnit.MILLISECONDS)) {
// 队列已满,丢弃此帧
logWarning("输入队列已满,丢弃非关键帧")
droppedFrameCount++
return false
}
return true
}
} else {
// P帧处理
if (!hasSentIDR.get() && renderedFrameCount == 0) {
logWarning("丢弃P帧因为尚未收到I帧")
droppedFrameCount++
// 同步模式 - 直接处理帧
// 检查NAL类型
val nalType = checkNalType(frameData)
// 实际使用的NAL类型
val effectiveType = if (nalType != -1) nalType else if (isIFrame) NAL_UNIT_TYPE_IDR else NAL_UNIT_TYPE_NON_IDR
// 如果是SPS或PPS且在缓存中已有相同内容跳过
if (effectiveType == NAL_UNIT_TYPE_SPS) {
val hash = frameData.hashCode()
if (lastSPSHash == hash) return true
lastSPSHash = hash
hasSentSPS.set(true)
} else if (effectiveType == NAL_UNIT_TYPE_PPS) {
val hash = frameData.hashCode()
if (lastPPSHash == hash) return true
lastPPSHash = hash
hasSentPPS.set(true)
} else if (effectiveType == NAL_UNIT_TYPE_IDR) {
hasSentIDR.set(true)
val currentTime = System.currentTimeMillis()
lastDetectedIFrameTime = currentTime
lastIFrameTimeMs = currentTime
consecutivePFrameCount = 0
} else {
// P帧处理
if (!hasSentIDR.get() && renderedFrameCount == 0) {
logWarning("丢弃P帧因为尚未收到I帧")
droppedFrameCount++
return false
}
consecutivePFrameCount++
}
// 记录帧信息
frameCount++
// 解码帧
val inputBufferIndex = codec.dequeueInputBuffer(TIMEOUT_US)
if (inputBufferIndex < 0) {
logWarning("无法获取输入缓冲区,可能需要等待")
return false
}
consecutivePFrameCount++
// 获取输入缓冲区
val inputBuffer = codec.getInputBuffer(inputBufferIndex)
if (inputBuffer == null) {
logError("获取输入缓冲区失败")
return false
}
// 填充数据
inputBuffer.clear()
inputBuffer.put(frameData)
// 提交缓冲区
val flags = if (isIFrame) MediaCodec.BUFFER_FLAG_KEY_FRAME else 0
codec.queueInputBuffer(
inputBufferIndex,
0,
frameData.size,
System.nanoTime() / 1000L,
flags
)
// 处理输出
processOutputBuffers()
return true
}
// 记录帧信息
frameCount++
// 解码帧
val inputBufferIndex = codec.dequeueInputBuffer(TIMEOUT_US)
if (inputBufferIndex < 0) {
logWarning("无法获取输入缓冲区,可能需要等待")
return false
}
// 获取输入缓冲区
val inputBuffer = codec.getInputBuffer(inputBufferIndex)
if (inputBuffer == null) {
logError("获取输入缓冲区失败")
return false
}
// 填充数据
inputBuffer.clear()
inputBuffer.put(frameData)
// 提交缓冲区
val flags = if (isIFrame) MediaCodec.BUFFER_FLAG_KEY_FRAME else 0
codec.queueInputBuffer(
inputBufferIndex,
0,
frameData.size,
System.nanoTime() / 1000L,
flags
)
// 处理输出
processOutputBuffers()
return true
} catch (e: Exception) {
logError("解码帧失败", e)
return false
@ -449,10 +685,32 @@ class VideoDecoder(
isDecoderConfigured.set(false)
try {
// 清空帧队列
inputFrameQueue.clear()
// 释放MediaCodec
mediaCodec?.let { codec ->
try {
codec.stop()
codec.release()
// 尝试获取锁(如果在异步模式下有必要)
if (isAsyncMode) {
val lockAcquired = codecLock.tryLock(500, TimeUnit.MILLISECONDS)
if (lockAcquired) {
try {
codec.stop()
codec.release()
} finally {
codecLock.unlock()
}
} else {
// 无法获取锁,强制释放
codec.stop()
codec.release()
}
} else {
// 同步模式直接释放
codec.stop()
codec.release()
}
} catch (e: Exception) {
logError("释放MediaCodec失败", e)
}
@ -483,6 +741,68 @@ class VideoDecoder(
* 获取解码统计信息
*/
fun getStatistics(): Map<String, Any> {
// 使用线程安全方式读取统计信息
return try {
// 对于异步模式,获取锁的读取
if (isAsyncMode) {
val lockAcquired = codecLock.tryLock(100, TimeUnit.MILLISECONDS)
if (lockAcquired) {
try {
createStatisticsMap()
} finally {
codecLock.unlock()
}
} else {
// 无法获取锁,返回基本信息
mapOf(
"totalFrames" to frameCount,
"renderedFrames" to renderedFrameCount,
"droppedFrames" to droppedFrameCount,
"fps" to currentFps
)
}
} else {
// 同步模式直接返回
createStatisticsMap()
}
} catch (e: Exception) {
logError("获取统计信息失败", e)
mapOf("error" to "获取统计信息失败: ${e.message}")
}
}
/**
* 检查解码器健康状况 - 用于监控和诊断
*/
private fun checkDecoderHealth(): Boolean {
val currentTime = System.currentTimeMillis()
// 检查解码器是否长时间没有输出帧
if (renderedFrameCount > 0 &&
lastOutputTimeMs > 0 &&
currentTime - lastOutputTimeMs > 5000) {
logWarning("解码器可能卡住了,已有${(currentTime - lastOutputTimeMs) / 1000}秒没有输出帧")
return false
}
// 检查错误频率
if (decodingErrorCount > 10) {
logWarning("解码器错误次数过多: $decodingErrorCount")
return false
}
// 检查队列使用状况
if (isAsyncMode && inputFrameQueue.size >= INPUT_BUFFER_QUEUE_CAPACITY * 0.9) {
logWarning("输入队列即将满: ${inputFrameQueue.size}/$INPUT_BUFFER_QUEUE_CAPACITY")
}
return true
}
/**
* 创建统计信息映射
*/
private fun createStatisticsMap(): Map<String, Any> {
return mapOf(
"totalFrames" to frameCount,
"renderedFrames" to renderedFrameCount,
@ -494,7 +814,156 @@ class VideoDecoder(
"consecutivePFrames" to consecutivePFrameCount,
"targetWidth" to config.width,
"targetHeight" to config.height,
"frameRate" to (config.frameRate ?: 0)
"frameRate" to (config.frameRate ?: 0),
"isAsync" to isAsyncMode,
"queueSize" to inputFrameQueue.size,
"queueHighWatermark" to frameQueueHighWatermark,
"inputProcessCount" to inputProcessCount,
"outputProcessCount" to outputProcessCount,
"decodingJitterMs" to decodingJitterMs,
"keyFrameInterval" to keyFrameInterval,
"decodingErrorCount" to decodingErrorCount,
"lastDecodingErrorTime" to if (lastDecodingErrorTime > 0) lastDecodingErrorTime else 0,
"decoderHealthy" to checkDecoderHealth()
)
}
/**
* 处理输入帧 - 异步模式下的辅助方法
*/
private fun processInputFrame(codec: MediaCodec, index: Int, inputBuffer: ByteBuffer, frameData: ByteArray, isIFrame: Boolean) {
try {
// 检查NAL类型
val nalType = checkNalType(frameData)
// 实际使用的NAL类型
val effectiveType = if (nalType != -1) nalType else if (isIFrame) NAL_UNIT_TYPE_IDR else NAL_UNIT_TYPE_NON_IDR
// 如果是SPS或PPS且在缓存中已有相同内容跳过
if (effectiveType == NAL_UNIT_TYPE_SPS) {
val hash = frameData.hashCode()
if (lastSPSHash == hash) {
codec.queueInputBuffer(index, 0, 0, 0, 0)
return
}
lastSPSHash = hash
hasSentSPS.set(true)
} else if (effectiveType == NAL_UNIT_TYPE_PPS) {
val hash = frameData.hashCode()
if (lastPPSHash == hash) {
codec.queueInputBuffer(index, 0, 0, 0, 0)
return
}
lastPPSHash = hash
hasSentPPS.set(true)
} else if (effectiveType == NAL_UNIT_TYPE_IDR) {
hasSentIDR.set(true)
val currentTime = System.currentTimeMillis()
lastDetectedIFrameTime = currentTime
lastIFrameTimeMs = currentTime
consecutivePFrameCount = 0
} else {
// P帧处理
if (!hasSentIDR.get() && renderedFrameCount == 0) {
logWarning("丢弃P帧因为尚未收到I帧")
droppedFrameCount++
codec.queueInputBuffer(index, 0, 0, 0, 0)
return
}
consecutivePFrameCount++
}
// 记录帧信息
frameCount++
// 填充数据
inputBuffer.clear()
inputBuffer.put(frameData)
// 提交缓冲区
val flags = if (isIFrame) MediaCodec.BUFFER_FLAG_KEY_FRAME else 0
codec.queueInputBuffer(
index,
0,
frameData.size,
System.nanoTime() / 1000L,
flags
)
} catch (e: Exception) {
logError("处理输入帧失败", e)
// 释放输入缓冲区但不填充数据
codec.queueInputBuffer(index, 0, 0, 0, 0)
}
}
/**
* 预热解码器 - 发送一些静帧以稳定解码器状态
*/
private fun warmUpDecoder() {
if (!isRunning.get() || !isDecoderConfigured.get()) {
return
}
try {
logDebug("开始预热解码器...")
// 创建简单的SPS和PPS帧
val simpleSps = ByteArray(10) { 0 }
simpleSps[0] = 0x00
simpleSps[1] = 0x00
simpleSps[2] = 0x00
simpleSps[3] = 0x01
simpleSps[4] = 0x67 // NAL类型 = 7 (SPS)
val simplePps = ByteArray(10) { 0 }
simplePps[0] = 0x00
simplePps[1] = 0x00
simplePps[2] = 0x00
simplePps[3] = 0x01
simplePps[4] = 0x68 // NAL类型 = 8 (PPS)
// 创建简单的I帧
val simpleIdrFrame = ByteArray(100) { 0 }
simpleIdrFrame[0] = 0x00
simpleIdrFrame[1] = 0x00
simpleIdrFrame[2] = 0x00
simpleIdrFrame[3] = 0x01
simpleIdrFrame[4] = 0x65 // NAL类型 = 5 (IDR)
// 发送几个预热帧
for (i in 0 until 3) {
decodeFrame(simpleSps, true)
Thread.sleep(20)
decodeFrame(simplePps, true)
Thread.sleep(20)
decodeFrame(simpleIdrFrame, true)
Thread.sleep(50)
}
logDebug("预热解码器完成")
} catch (e: Exception) {
logError("预热解码器失败", e)
}
}
}
/**
* LinkedBlockingQueue扩展方法将元素放到队列前端
*/
private fun <T> LinkedBlockingQueue<T>.offerFirst(element: T): Boolean {
// 创建一个新的临时队列
val tempQueue = LinkedBlockingQueue<T>()
// 先添加新元素
tempQueue.offer(element)
// 然后添加所有现有元素
this.drainTo(tempQueue)
// 清空当前队列
this.clear()
// 将临时队列中的所有元素添加回当前队列
return this.addAll(tempQueue)
}

View File

@ -8,11 +8,13 @@ package top.skychip.video_decode_plugin
* @param codecType 编解码器类型默认为h264
* @param frameRate 帧率可为空
* @param isDebug 是否开启调试日志
* @param isAsync 是否使用异步解码模式默认为true
*/
data class VideoDecoderConfig(
val width: Int,
val height: Int,
val codecType: String = "h264",
val frameRate: Int? = null,
val isDebug: Boolean = false
val isDebug: Boolean = false,
val isAsync: Boolean = true
)

View File

@ -76,6 +76,9 @@ class VideoDecoderConfig {
/// false
final bool isDebug;
/// 使true
final bool isAsync;
///
VideoDecoderConfig({
this.width = 640,
@ -83,6 +86,7 @@ class VideoDecoderConfig {
this.frameRate,
this.codecType = CodecType.h264,
this.isDebug = false,
this.isAsync = true,
});
/// Map
@ -93,6 +97,7 @@ class VideoDecoderConfig {
'frameRate': frameRate,
'codecType': codecType.toString().split('.').last,
'isDebug': isDebug,
'isAsync': isAsync,
};
}
}