Merge branch 'develop_sky_liyi' into 'develop_sky'

Develop sky liyi

See merge request StarlockTeam/app-starlock!176
This commit is contained in:
李仪 2025-06-23 08:01:39 +00:00
commit 954cfc6276
9 changed files with 123 additions and 206 deletions

View File

@ -16,6 +16,11 @@ import 'package:star_lock/talk/starChart/proto/talk_data_h264_frame.pb.dart';
// implements ScpMessageHandler {
class UdpTalkDataHandler extends ScpMessageBaseHandle
implements ScpMessageHandler {
//
static final UdpTalkDataHandler instance = UdpTalkDataHandler();
UdpTalkDataHandler(); //
@override
void handleReq(ScpMessage scpMessage) {}
@ -32,7 +37,7 @@ class UdpTalkDataHandler extends ScpMessageBaseHandle
if (scpMessage.Payload != null) {
final TalkData talkData = scpMessage.Payload;
//
_handleTalkData(
talkData: talkData,
scpMessage: scpMessage,
@ -122,7 +127,9 @@ class UdpTalkDataHandler extends ScpMessageBaseHandle
/// Parses the raw [talkData] payload as a protobuf-encoded H.264 frame
/// and forwards it, together with its originating [scpMessage], to
/// [frameHandler] for buffering/decoding.
void _handleVideoH264(TalkData talkData, ScpMessage scpMessage) {
  final frame = TalkDataH264Frame()..mergeFromBuffer(talkData.content);
  frameHandler.handleFrame(frame, talkData, scpMessage);
}
///

View File

@ -15,6 +15,9 @@ class H264FrameHandler {
void handleFrame(
TalkDataH264Frame frame, TalkData talkData, ScpMessage scpMessage) {
// AppLog.log(
// '送入stream的帧数据: frameSeq=${frame.frameSeq},frameType=${frame
// .frameType},MessageId:${scpMessage.MessageId}');
onCompleteFrame(TalkDataModel(
talkData: talkData,
talkDataH264Frame: frame,

View File

@ -11,7 +11,7 @@ class TalkDataRepository {
onCancel: () {
_isListening = false;
},
sync: false, //
sync: true, //
);
}

View File

@ -181,40 +181,6 @@ class ScpMessageBaseHandle {
}
return null;
// if (!_packetBuffer.containsKey(key)) {
// _packetBuffer[key] = List.filled(spTotal, []);
// _startTimer(key);
// }
//
// //
// if (spIndex < 1 || spIndex > spTotal) {
// // print(
// // 'Invalid spTotal: $spTotal spIndex: $spIndex for messageId: $messageId');
// return null;
// }
//
// //
// _packetBuffer[key]![spIndex - 1] = byte;
//
// //
// if (_packetBuffer[key]!.every((packet) => packet.isNotEmpty)) {
// //
// Uint8List completePayload = Uint8List.fromList(
// _packetBuffer[key]!.expand((packet) => packet).toList());
// //
// _clearPacketData(key);
//
// // 使TalkData
// if (payloadType == PayloadTypeConstant.talkData) {
// final talkData = TalkData();
// talkData.mergeFromBuffer(completePayload);
// return talkData;
// }
// } else {
// // null
// return null;
// }
}
//

View File

@ -52,7 +52,7 @@ class ScpMessageHandlerFactory {
case PayloadTypeConstant.talkExpect:
return UdpTalkExpectHandler();
case PayloadTypeConstant.talkData:
return UdpTalkDataHandler();
return UdpTalkDataHandler.instance;
case PayloadTypeConstant.talkHangup:
return UdpTalkHangUpHandler();
case PayloadTypeConstant.RbcuInfo:

View File

@ -141,11 +141,6 @@ class StartChartManage {
return _avFrameLost / _avFrameTotal;
}
// ====== Isolate相关成员变量 ======
Isolate? _udpIsolate;
SendPort? _udpIsolateSendPort;
ReceivePort? _mainReceivePort;
//
Future<void> init() async {
if (F.isXHJ) {
@ -167,8 +162,6 @@ class StartChartManage {
await _onlineRelayService();
//
await reportInformation();
// Isolate
await _initUdpIsolate();
}
///
@ -248,21 +241,20 @@ class StartChartManage {
var addressIListenFrom = InternetAddress.anyIPv4;
RawDatagramSocket.bind(addressIListenFrom, localPort)
.then((RawDatagramSocket socket) {
// (SO_RCVBUF = 8)
socket.setRawOption(
RawSocketOption.fromInt(
RawSocketOption.levelSocket,
8, // SO_RCVBUF for Android/iOS
8, // SO_RCVBUF for Android/iOS
2 * 1024 * 1024, // 2MB receive buffer
),
);
// (SO_SNDBUF = 7)
socket.setRawOption(
RawSocketOption.fromInt(
RawSocketOption.levelSocket,
7, // SO_SNDBUF for Android/iOS
7, // SO_SNDBUF for Android/iOS
2 * 1024 * 1024, // 2MB send buffer
),
);
@ -1057,29 +1049,22 @@ class StartChartManage {
//
void _onReceiveData(RawDatagramSocket socket, BuildContext context) {
// ====== Isolate初始化握手 ======
if (_udpIsolateSendPort == null && _mainReceivePort != null) {
_mainReceivePort!.listen((dynamic message) {
if (message is SendPort) {
_udpIsolateSendPort = message;
}
});
}
socket.listen((RawSocketEvent event) {
if (event == RawSocketEvent.read) {
Datagram? dg;
while ((dg = socket.receive()) != null) {
try {
if (dg?.data != null) {
// bytes发送到Isolate
if (_udpIsolateSendPort != null) {
_udpIsolateSendPort!.send(dg!.data);
} else {
// Fallback: 线
final deserialize = ScpMessage.deserialize(dg!.data);
if (deserialize != null) {
_handleUdpResultData(deserialize);
}
// Fallback: 线
//
final deserialize = ScpMessage.deserialize(dg!.data);
// if (deserialize.PayloadType == PayloadTypeConstant.talkData) {
// _log(
// text: 'mesaageId:${deserialize.MessageId},'
// 'SpTotal:${deserialize.SpTotal},SpIndex:${deserialize.SpIndex}');
// }
if (deserialize != null) {
_handleUdpResultData(deserialize);
}
}
} catch (e, stackTrace) {
@ -1095,14 +1080,6 @@ class StartChartManage {
final int payloadType = scpMessage.PayloadType ?? 0;
final int messageType = scpMessage.MessageType ?? 0;
try {
//
// if (scpMessage.SpIndex != null &&
// scpMessage.SpTotal != null &&
// scpMessage.MessageId != null) {
// PacketLossStatistics().recordPacket(
// scpMessage.MessageId!, scpMessage.SpIndex!, scpMessage.SpTotal!);
// }
final ScpMessageHandler handler =
ScpMessageHandlerFactory.createHandler(payloadType);
if (messageType == MessageTypeConstant.Req) {
@ -1303,8 +1280,6 @@ class StartChartManage {
await Storage.removerStarChartRegisterNodeInfo();
// udp服务
closeUdpSocket();
// Isolate
_disposeUdpIsolate();
}
///
@ -1313,68 +1288,4 @@ class StartChartManage {
isOnlineStarChartServer = false;
talkStatus.setUninitialized();
}
// Isolate
/// Spawns the background isolate that deserializes incoming UDP
/// datagrams off the main thread.
///
/// NOTE(review): the listener installed here discards every message,
/// including the SendPort handshake the isolate sends back (see
/// [_udpIsolateEntry]); `_udpIsolateSendPort` is instead assigned by a
/// second `listen` on the same port in `_onReceiveData`. A plain
/// [ReceivePort] is single-subscription, so listening twice throws a
/// StateError — confirm which listener is actually meant to win.
Future<void> _initUdpIsolate() async {
_mainReceivePort = ReceivePort();
_mainReceivePort!.listen((dynamic message) {
// Debug channel from the isolate; log forwarding is currently disabled.
// if (message is String) {
// _log(text: '[Isolate] $message');
// }
});
_udpIsolate = await Isolate.spawn<_UdpIsolateInitParams>(
_udpIsolateEntry,
_UdpIsolateInitParams(
_mainReceivePort!.sendPort,
_handleUdpResultDataStatic,
),
debugName: 'UdpDataIsolate',
);
}
// Isolate
/// Tears down the UDP worker isolate and its communication ports,
/// nulling every related field so a later re-init starts clean.
void _disposeUdpIsolate() {
_udpIsolateSendPort = null;
_mainReceivePort?.close();
_mainReceivePort = null;
// Kill immediately rather than letting pending events drain.
_udpIsolate?.kill(priority: Isolate.immediate);
_udpIsolate = null;
}
// 便Isolate调用成员方法
/// Static adapter so isolate-side code can invoke the instance method
/// [_handleUdpResultData].
///
/// NOTE(review): `StartChartManage()` is presumably a singleton factory;
/// if it is not, this dispatches onto a fresh, unconfigured instance —
/// confirm. Also note that when invoked from [_udpIsolateEntry] this
/// runs inside the spawned isolate, so any state it mutates is
/// isolate-local, not the main isolate's — verify that is intended.
static void _handleUdpResultDataStatic(Map<String, dynamic> args) {
final instance = StartChartManage();
final ScpMessage scpMessage = args['scpMessage'] as ScpMessage;
instance._handleUdpResultData(scpMessage);
}
// Isolate入口函数
/// Entry point executed inside the spawned isolate: receives raw
/// datagram bytes, deserializes each into an [ScpMessage], and hands
/// successful results to the supplied callback.
static void _udpIsolateEntry(_UdpIsolateInitParams params) {
final SendPort mainSendPort = params.mainSendPort;
final Function handleUdpResultData = params.handleUdpResultData;
final ReceivePort isolateReceivePort = ReceivePort();
// Handshake: give the main isolate our SendPort so it can forward bytes.
mainSendPort.send(isolateReceivePort.sendPort);
isolateReceivePort.listen((dynamic message) {
try {
if (message is List<int>) {
// Normalize List<int> to Uint8List before protobuf deserialization.
final scpMessage = ScpMessage.deserialize(Uint8List.fromList(message));
if (scpMessage != null) {
// NOTE(review): this callback executes in THIS isolate; see the
// affinity caveat on _handleUdpResultDataStatic — confirm intended.
handleUdpResultData({'scpMessage': scpMessage});
}
}
} catch (e, stack) {
// Deserialization errors are swallowed; error forwarding to the
// main isolate is currently disabled.
// mainSendPort.send('[Isolate Error] $e\n$stack');
}
});
}
}
/// Parameter bundle handed to [_udpIsolateEntry] via [Isolate.spawn].
class _UdpIsolateInitParams {
// Port for sending the handshake SendPort (and debug messages) back
// to the main isolate.
final SendPort mainSendPort;
// Callback invoked for each successfully deserialized message.
// NOTE(review): only static/top-level function tear-offs can safely
// cross an isolate boundary — confirm this is never a closure.
final Function handleUdpResultData;
_UdpIsolateInitParams(this.mainSendPort, this.handleUdpResultData);
}

View File

@ -38,6 +38,7 @@ import 'package:star_lock/tools/G711Tool.dart';
import 'package:star_lock/tools/bugly/bugly_tool.dart';
import 'package:star_lock/tools/commonDataManage.dart';
import 'package:star_lock/tools/storage.dart';
import 'package:video_decode_plugin/nalu_utils.dart';
import 'package:video_decode_plugin/video_decode_plugin.dart';
import '../../../../tools/baseGetXController.dart';
@ -51,6 +52,15 @@ class TalkViewNativeDecodeLogic extends BaseGetXController {
int audioBufferSize = 2; // 2
// frameSeq较小时阈值也小
/// Threshold used to decide whether an I-frame arriving with a smaller
/// frameSeq means the sequence counter rolled over (i.e. a new stream).
///
/// Large sequences use a fixed margin of 1000; small ones scale to half
/// of the last observed sequence (rounded) so short sessions are not
/// misclassified.
int _getFrameSeqRolloverThreshold(int lastSeq) =>
    lastSeq > 2000 ? 1000 : (lastSeq / 2).round();
//
final List<int> _bufferedAudioFrames = <int>[];
@ -162,28 +172,35 @@ class TalkViewNativeDecodeLogic extends BaseGetXController {
int frameSeqI,
ScpMessage scpMessage,
) {
// frameSeq回绕I帧
// frameSeq较小时阈值也小
if (!_pendingStreamReset &&
_lastFrameSeq != null &&
frameType == TalkDataH264Frame_FrameTypeE.I &&
frameSeq < _lastFrameSeq!) {
// I帧loading并重置所有本地状态
AppLog.log(
'检测到新流I帧frameSeq回绕进入loading并重置: frameSeq=$frameSeq, lastFrameSeq=$_lastFrameSeq');
Future.microtask(() => state.isLoading.value = true);
_pendingStreamReset = true;
//
_stopFrameProcessTimer();
//
_resetDecoderForNewStream(_pendingResetWidth, _pendingResetHeight);
//
_lastFrameSeq = null;
_decodedIFrames.clear();
state.h264FrameBuffer.clear();
//
_startFrameProcessTimer();
// returnI帧初始化解码器并解码
//
int dynamicThreshold = _getFrameSeqRolloverThreshold(_lastFrameSeq!);
if ((_lastFrameSeq! - frameSeq) > dynamicThreshold) {
// I帧frameSeq大幅回绕loading并重置所有本地状态
AppLog.log('检测到新流I帧frameSeq大幅回绕进入loading并重置: frameSeq=$frameSeq, lastFrameSeq=$_lastFrameSeq, 阈值=$dynamicThreshold');
Future.microtask(() => state.isLoading.value = true);
_pendingStreamReset = true;
//
_stopFrameProcessTimer();
//
_resetDecoderForNewStream(_pendingResetWidth, _pendingResetHeight);
//
_lastFrameSeq = null;
_decodedIFrames.clear();
state.h264FrameBuffer.clear();
//
_startFrameProcessTimer();
// returnI帧初始化解码器并解码
//
} else {
//
AppLog.log('检测到I帧乱序未超过回绕阈值$dynamicThreshold),丢弃: frameSeq=$frameSeq, lastFrameSeq=$_lastFrameSeq');
return;
}
}
// pendingStreamResetI帧
if (_pendingStreamReset) {
@ -306,6 +323,9 @@ class TalkViewNativeDecodeLogic extends BaseGetXController {
lastDecodedIFrameSeq = minIFrameSeq;
// AppLog.log('送入解码器的P帧数据frameSeq:${frameSeq},frameSeqI:${frameSeqI},'
// 'frameType:${frameType},messageId:${scpMessage!.MessageId}');
// final spsData = NaluUtils.filterNalusByType(frameData, 7);
// final ppsData = NaluUtils.filterNalusByType(frameData, 8);
// AppLog.log('SPSDATA:${spsData}ppsData:${ppsData}');
await VideoDecodePlugin.sendFrame(
frameData: frameData,
frameType: 0,
@ -360,8 +380,9 @@ class TalkViewNativeDecodeLogic extends BaseGetXController {
state.isProcessingFrame = false;
return;
}
// AppLog.log('送入解码器的P帧数据frameSeq:${frameSeq},frameSeqI:${frameSeqI},'
// AppLog.log('送入解码器的I帧数据frameSeq:${frameSeq},frameSeqI:${frameSeqI},'
// 'frameType:${frameType},messageId:${scpMessage!.MessageId}');
await VideoDecodePlugin.sendFrame(
frameData: frameData,
frameType: 1,
@ -392,6 +413,11 @@ class TalkViewNativeDecodeLogic extends BaseGetXController {
//
void _startListenTalkData() {
//
if (_streamSubscription != null) {
_streamSubscription!.cancel();
_streamSubscription = null;
}
//
if (_isListening) {
AppLog.log("已经存在数据流监听,避免重复监听");
@ -403,39 +429,7 @@ class TalkViewNativeDecodeLogic extends BaseGetXController {
_streamSubscription = state.talkDataRepository.talkDataStream
.listen((TalkDataModel talkDataModel) async {
final talkData = talkDataModel.talkData;
final talkDataH264Frame = talkDataModel.talkDataH264Frame;
final scpMessage = talkDataModel.scpMessage;
final contentType = talkData!.contentType;
//
switch (contentType) {
case TalkData_ContentTypeE.G711:
if (state.audioBuffer.length >= audioBufferSize) {
state.audioBuffer.removeAt(0); //
}
state.audioBuffer.add(talkData); //
//
_playAudioFrames();
break;
case TalkData_ContentTypeE.H264:
// H264帧
if (state.textureId.value != null || true) {
if (talkDataH264Frame != null) {
_addFrameToBuffer(
talkData.content,
talkDataH264Frame.frameType,
talkData.durationMs,
talkDataH264Frame.frameSeq,
talkDataH264Frame.frameSeqI,
scpMessage!,
);
}
} else {
AppLog.log('无法处理H264帧textureId为空');
}
break;
}
_processFrame(talkDataModel);
});
}
@ -1372,7 +1366,7 @@ class TalkViewNativeDecodeLogic extends BaseGetXController {
try {
//
_stopFrameProcessTimer();
//
if (state.textureId.value != null) {
await VideoDecodePlugin.releaseDecoder();
@ -1381,7 +1375,7 @@ class TalkViewNativeDecodeLogic extends BaseGetXController {
//
await Future.delayed(Duration(milliseconds: 100));
//
final config = VideoDecoderConfig(
width: width,
@ -1394,7 +1388,7 @@ class TalkViewNativeDecodeLogic extends BaseGetXController {
if (textureId != null) {
state.textureId.value = textureId;
AppLog.log('frameSeq回绕后解码器初始化成功textureId=$textureId');
//
VideoDecodePlugin.setOnFrameRenderedListener((textureId) {
AppLog.log('已经开始渲染=======');
@ -1404,7 +1398,7 @@ class TalkViewNativeDecodeLogic extends BaseGetXController {
//
_startFrameProcessTimer();
//
_decodedIFrames.clear();
state.h264FrameBuffer.clear();
@ -1422,4 +1416,40 @@ class TalkViewNativeDecodeLogic extends BaseGetXController {
state.isLoading.value = false;
}
}
/// Dispatches one [TalkDataModel] from the talk-data stream to the
/// matching media pipeline, selected by the payload's content type.
void _processFrame(TalkDataModel talkDataModel) {
final talkData = talkDataModel.talkData;
final talkDataH264Frame = talkDataModel.talkDataH264Frame;
final scpMessage = talkDataModel.scpMessage;
final contentType = talkData!.contentType;
// Route by payload type; any content type other than G711/H264 is
// silently ignored — TODO confirm no other types arrive on this stream.
switch (contentType) {
case TalkData_ContentTypeE.G711:
if (state.audioBuffer.length >= audioBufferSize) {
state.audioBuffer.removeAt(0); // Buffer full: drop the oldest frame.
}
state.audioBuffer.add(talkData); // Queue the newly arrived audio frame.
// Kick off playback of the buffered audio frames.
_playAudioFrames();
break;
case TalkData_ContentTypeE.H264:
// Video frames can only be decoded once a render texture exists.
if (state.textureId.value != null) {
if (talkDataH264Frame != null) {
_addFrameToBuffer(
talkData.content,
talkDataH264Frame.frameType,
talkData.durationMs,
talkDataH264Frame.frameSeq,
talkDataH264Frame.frameSeqI,
scpMessage!,
);
}
} else {
AppLog.log('无法处理H264帧textureId为空');
}
break;
}
}
}

View File

@ -114,16 +114,16 @@ class _TalkViewNativeDecodePageState extends State<TalkViewNativeDecodePage>
key: state.globalKey,
child: SizedBox.expand(
child: RotatedBox(
// 使RotatedBox
quarterTurns: startChartManage.rotateAngle ~/ 90,
child: Platform.isIOS
? Transform.scale(
scale: 1.008, // iOS白边
child: Texture(
textureId: state.textureId.value!,
filterQuality: FilterQuality.medium,
),
)
// 使RotatedBox
quarterTurns: startChartManage.rotateAngle ~/ 90,
child: Platform.isIOS
? Transform.scale(
scale: 1.008, // iOS白边
child: Texture(
textureId: state.textureId.value!,
filterQuality: FilterQuality.medium,
),
)
: Texture(
textureId: state.textureId.value!,
filterQuality: FilterQuality.medium,

View File

@ -110,7 +110,7 @@ class TalkViewNativeDecodeState {
// H264帧缓冲区相关
final List<Map<String, dynamic>> h264FrameBuffer = <Map<String, dynamic>>[]; // H264帧缓冲区
final int maxFrameBufferSize = 150; //
final int maxFrameBufferSize = 50; //
final int targetFps = 60; // ,native的缓冲区
Timer? frameProcessTimer; //
bool isProcessingFrame = false; //