diff --git a/lib/talk/starChart/star_chart_manage.dart b/lib/talk/starChart/star_chart_manage.dart index 13228e01..ec465953 100644 --- a/lib/talk/starChart/star_chart_manage.dart +++ b/lib/talk/starChart/star_chart_manage.dart @@ -1,5 +1,6 @@ import 'dart:async'; import 'dart:io'; +import 'dart:isolate'; import 'dart:math'; import 'dart:typed_data'; @@ -140,6 +141,11 @@ class StartChartManage { return _avFrameLost / _avFrameTotal; } + // ====== Isolate相关成员变量 ====== + Isolate? _udpIsolate; + SendPort? _udpIsolateSendPort; + ReceivePort? _mainReceivePort; + // 星图服务初始化 Future init() async { if (F.isXHJ) { @@ -161,6 +167,8 @@ class StartChartManage { await _onlineRelayService(); // 上报 await reportInformation(); + // 初始化Isolate + await _initUdpIsolate(); } /// 客户端注册 @@ -1049,52 +1057,30 @@ class StartChartManage { // 接收返回的数据 void _onReceiveData(RawDatagramSocket socket, BuildContext context) { + // ====== Isolate初始化握手 ====== + if (_udpIsolateSendPort == null && _mainReceivePort != null) { + _mainReceivePort!.listen((dynamic message) { + if (message is SendPort) { + _udpIsolateSendPort = message; + } + }); + } socket.listen((RawSocketEvent event) { if (event == RawSocketEvent.read) { Datagram? dg; while ((dg = socket.receive()) != null) { try { if (dg?.data != null) { - final deserialize = ScpMessage.deserialize(dg!.data); - - // 音视频帧丢包统计:只统计PayloadType==talkData的数据包,结合分包信息 - if (deserialize != null && - deserialize.PayloadType == PayloadTypeConstant.talkData) { - int? msgId = deserialize.MessageId; - int spTotal = deserialize.SpTotal ?? 1; - int spIndex = deserialize.SpIndex ?? 1; - if (msgId != null) { - // 记录收到的分包 - _avFrameParts.putIfAbsent(msgId, () => {}); - _avFrameParts[msgId]!.add(spIndex); - // 如果收到最后一个分包,判断该帧是否完整 - if (spIndex == spTotal) { - _avFrameTotal++; - if (_avFrameParts[msgId]!.length < spTotal) { - _avFrameLost++; - // _log(text: '音视频丢包,丢失的messageId: $msgId'); - } - _avFrameParts.remove(msgId); - // 可选:每100帧打印一次丢包率 - if (_avFrameTotal % 100 == 0) { - _log( - text: - '音视频帧丢包率: ${(getAvFrameLossRate() * 100).toStringAsFixed(2)}%'); - } - } + // 直接将bytes发送到Isolate + if (_udpIsolateSendPort != null) { + _udpIsolateSendPort!.send(dg!.data); + } else { + // Fallback: 主线程处理(极端情况) + final deserialize = ScpMessage.deserialize(dg!.data); + if (deserialize != null) { + _handleUdpResultData(deserialize); } } - - if (deserialize != null) { - // 处理返回数据 - _handleUdpResultData(deserialize); - } - // if (deserialize.PayloadType != PayloadTypeConstant.heartbeat) { - // if (deserialize.Payload != null) { - // _log(text: 'Udp收到结构体数据---》$deserialize'); - // } - // _log(text: 'text---》${utf8.decode(deserialize.Payload)}'); - // } } } catch (e, stackTrace) { throw StartChartMessageException('$e\n,$stackTrace'); @@ -1110,12 +1096,12 @@ class StartChartManage { final int messageType = scpMessage.MessageType ?? 0; try { // 记录分包数据用于统计丢包率 - if (scpMessage.SpIndex != null && - scpMessage.SpTotal != null && - scpMessage.MessageId != null) { - PacketLossStatistics().recordPacket( - scpMessage.MessageId!, scpMessage.SpIndex!, scpMessage.SpTotal!); - } + // if (scpMessage.SpIndex != null && + // scpMessage.SpTotal != null && + // scpMessage.MessageId != null) { + // PacketLossStatistics().recordPacket( + // scpMessage.MessageId!, scpMessage.SpIndex!, scpMessage.SpTotal!); + // } final ScpMessageHandler handler = ScpMessageHandlerFactory.createHandler(payloadType); @@ -1317,6 +1303,8 @@ class StartChartManage { await Storage.removerStarChartRegisterNodeInfo(); // 关闭udp服务 closeUdpSocket(); + // 关闭Isolate + _disposeUdpIsolate(); } /// 重置数据 @@ -1325,4 +1313,68 @@ class StartChartManage { isOnlineStarChartServer = false; talkStatus.setUninitialized(); } + + // 初始化Isolate + Future _initUdpIsolate() async { + _mainReceivePort = ReceivePort(); + _mainReceivePort!.listen((dynamic message) { + // 目前不需要主线程回调,如需日志可在此处理 + // if (message is String) { + // _log(text: '[Isolate] $message'); + // } + }); + _udpIsolate = await Isolate.spawn<_UdpIsolateInitParams>( + _udpIsolateEntry, + _UdpIsolateInitParams( + _mainReceivePort!.sendPort, + _handleUdpResultDataStatic, + ), + debugName: 'UdpDataIsolate', + ); + } + + // 销毁Isolate + void _disposeUdpIsolate() { + _udpIsolateSendPort = null; + _mainReceivePort?.close(); + _mainReceivePort = null; + _udpIsolate?.kill(priority: Isolate.immediate); + _udpIsolate = null; + } + + // 静态代理,便于Isolate调用成员方法 + static void _handleUdpResultDataStatic(Map args) { + final instance = StartChartManage(); + final ScpMessage scpMessage = args['scpMessage'] as ScpMessage; + instance._handleUdpResultData(scpMessage); + } + + // Isolate入口函数 + static void _udpIsolateEntry(_UdpIsolateInitParams params) { + final SendPort mainSendPort = params.mainSendPort; + final Function handleUdpResultData = params.handleUdpResultData; + final ReceivePort isolateReceivePort = ReceivePort(); + // 首次将SendPort发回主线程 + mainSendPort.send(isolateReceivePort.sendPort); + isolateReceivePort.listen((dynamic message) { + try { + if (message is List) { + // 修正:List转Uint8List + final scpMessage = ScpMessage.deserialize(Uint8List.fromList(message)); + if (scpMessage != null) { + // 通过静态代理调用主线程的_handleUdpResultData + handleUdpResultData({'scpMessage': scpMessage}); + } + } + } catch (e, stack) { + // 可选:mainSendPort.send('[Isolate Error] $e\n$stack'); + } + }); + } +} + +class _UdpIsolateInitParams { + final SendPort mainSendPort; + final Function handleUdpResultData; + _UdpIsolateInitParams(this.mainSendPort, this.handleUdpResultData); }