From ba47b2731b639fdae3f17854c1f2f5e2ab251713 Mon Sep 17 00:00:00 2001 From: liyi Date: Thu, 19 Dec 2024 17:47:44 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E5=A2=9E=E5=8A=A0=E5=88=86=E5=8C=85?= =?UTF-8?q?=E5=8F=91=E9=80=81=E3=80=81=E6=8E=A5=E6=94=B6=E5=88=86=E5=8C=85?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=90=8E=E7=9A=84=E7=BB=84=E5=8C=85=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/talk/startChart/entity/scp_message.dart | 94 +++++++++++---------- lib/talk/startChart/start_chart_manage.dart | 28 +++--- 2 files changed, 63 insertions(+), 59 deletions(-) diff --git a/lib/talk/startChart/entity/scp_message.dart b/lib/talk/startChart/entity/scp_message.dart index a4ccb79d..5f876964 100644 --- a/lib/talk/startChart/entity/scp_message.dart +++ b/lib/talk/startChart/entity/scp_message.dart @@ -22,8 +22,8 @@ import 'package:star_lock/talk/startChart/proto/talk_request.pb.dart'; class ScpMessage { /// 分包缓冲区 - /// key:MessageId - static Map> _buffer = {}; + // 存储每个 messageId 对应的分包数据 + static Map>> _packetBuffer = {}; ScpMessage({ this.ProtocolFlag, @@ -154,7 +154,7 @@ class ScpMessage { // Payload (字符串,转换为字节) if (Payload != null && Payload is String) { bytes.addAll(utf8.encode(Payload!)); - } else { + } else if (Payload != null) { // 如果不是字符串则需要外部转为字节数组,这里直接添加 bytes.addAll(Payload); } @@ -302,8 +302,22 @@ class ScpMessage { return heartbeatResponse; case PayloadTypeConstant.echoTest: // 回声测试 - String payload = utf8.decode(byte); - return payload; + if (spTotal != null && + spTotal > 1 && + messageId != null && + spIndex != null) { + // 分包处理 + return _handleFragmentedPayload( + messageId: messageId, + spTotal: spTotal, + spIndex: spIndex, + byte: byte, + ); + } else { + // 如果 spTotal 为 1 或者没有分包信息,直接处理 byte 数据 + String payload = utf8.decode(byte); + return payload; + } case PayloadTypeConstant.gatewayReset: // 初始化网关 if (messageType == MessageTypeConstant.Resp) { @@ -428,22 +442,9 @@ class ScpMessage { final GenericResp genericResp = GenericResp.fromBuffer(byte); return genericResp; } else if (messageType == MessageTypeConstant.RealTimeData) { - if (spTotal != null && spTotal > 1) { - // 处理分包 - final List subPackageBytes = _subPackage( - spTotal: spTotal!, - spIndex: spIndex!, - bytes: byte, - messageId: messageId!, - ); - // 没有分包直接解析 - final TalkData talkData = TalkData.fromBuffer(subPackageBytes); - return talkData; - } else { - // 没有分包直接解析 - final TalkData talkData = TalkData.fromBuffer(byte); - return talkData; - } + // 没有分包直接解析 + final TalkData talkData = TalkData.fromBuffer(byte); + return talkData; } else { String payload = utf8.decode(byte); return payload; @@ -465,19 +466,11 @@ class ScpMessage { return payload; } } catch (e) { - // _log(text: '❌反序列化udp数据时遇到错误----》$e'); - - // // 尝试打印原始字节数组以供调试 + _log(text: '❌反序列化udp数据时遇到错误----》$e'); + // 尝试打印原始字节数组以供调试 _log( text: '原始字节数组: ${byte.sublist(0, 20).map((b) => b.toRadixString(16)).join(" ")}'); - // - // // 如果是Protobuf相关的异常,尝试提供更多信息 - // if (e is InvalidProtocolBufferException || e is FormatException) { - // _log( - // text: - // '反序列化失败的payloadType: $payloadType, messageType: $messageType'); - // } return ''; } } @@ -500,22 +493,37 @@ class ScpMessage { AppLog.log('=====${text}'); } - /// 处理分包情况 - static List _subPackage({ + /// 处理分包逻辑 + /// 如果没有收到所有包则返回null + static String? _handleFragmentedPayload({ required int messageId, required int spTotal, required int spIndex, - required List bytes, + required List byte, }) { - if (_buffer.containsKey(messageId)) { - // 存在这个key就追加 - final List bytesList = _buffer[messageId]!; - bytesList.addAll(bytes); - _buffer[messageId] = bytesList; - } else { - // 如果不存在这个key,就新增进去 - _buffer.putIfAbsent(messageId, () => bytes); + // 初始化分包列表 + if (!_packetBuffer.containsKey(messageId)) { + _packetBuffer[messageId] = List.filled(spTotal, []); + } + + // 存储当前分包 + _packetBuffer[messageId]![spIndex - 1] = byte; + + // 检查是否接收到所有分包 + if (_packetBuffer[messageId]!.every((packet) => packet.isNotEmpty)) { + // 重组所有分包 + List completePayload = + _packetBuffer[messageId]!.expand((packet) => packet).toList(); + + // 清除已重组的分包数据 + _packetBuffer.remove(messageId); + + // 解析完整的 payload + String payload = utf8.decode(completePayload); + return payload; + } else { + // 如果分包尚未接收完全,返回 null 或其他指示符 + return null; } - return []; } } diff --git a/lib/talk/startChart/start_chart_manage.dart b/lib/talk/startChart/start_chart_manage.dart index 7e33a9dd..2fb712e1 100644 --- a/lib/talk/startChart/start_chart_manage.dart +++ b/lib/talk/startChart/start_chart_manage.dart @@ -75,7 +75,7 @@ class StartChartManage { int talkDataIntervalTime = 10; // 通话数据的消息间隔(ms) Timer? talkDataTimer; // 发送通话数据消息定时器 - final int _maxPayloadSize = 50 * 1024; // 分包大小 + final int _maxPayloadSize = 8 * 1024; // 分包大小 // 默认通话的期望数据格式 TalkExpect defaultTalkExpect = TalkExpect( @@ -111,7 +111,7 @@ class StartChartManage { Future _clientRegister() async { final StarChartRegisterNodeEntity? registerNodeEntity = await Storage.getStarChartRegisterNodeInfo(); - if (registerNodeEntity != null) { + if (registerNodeEntity != null && registerNodeEntity.peer?.id != null) { _log(text: '获取到星图注册节点信息:$registerNodeEntity'); FromPeerId = registerNodeEntity.peer!.id ?? ''; } else { @@ -215,7 +215,7 @@ class StartChartManage { // 发送上线消息 Future _sendOnlineMessage() async { - _log(text: '发送上线消息'); + _log(text: '发送上线消息,是否已经上线:$isOnlineStartChartServer'); if (isOnlineStartChartServer) { _log(text: '星图已上线,请勿重复发送上线消息'); return; @@ -281,17 +281,21 @@ class StartChartManage { // 发送回声测试消息 void sendEchoMessage({required List payload}) async { - String toPeerId = '2vzXdjdzipJBpWpJxhiRzCFXrDKk54t3YJ7EjYPSRuij'; + String toPeerId = 'G7fzJkbS5MigMqnbTCQVk7VspcDsnGeikJpQwS8fbhim'; + // 计算需要分多少个包发送 final int totalPackets = (payload.length / _maxPayloadSize).ceil(); + // 循环遍历 for (int i = 0; i < totalPackets; i++) { int start = i * _maxPayloadSize; int end = (i + 1) * _maxPayloadSize; if (end > payload.length) { end = payload.length; } + // 取到分包数据 List packet = payload.sublist(start, end); + // 组装分包数据 final message = MessageCommand.echoMessage( ToPeerId: toPeerId, FromPeerId: FromPeerId, @@ -300,22 +304,14 @@ class StartChartManage { SpIndex: i + 1, MessageId: MessageCommand.getNextMessageId(toPeerId, increment: false), ); + // 发送消息 await _sendMessage(message: message); _log( text: - '发送回声测试消息=====SpTotal:$totalPackets,SpIndex:${i + 1},packet:${packet.length}'); + '发送回声测试分包消息=====SpTotal:$totalPackets,SpIndex:${i + 1},packet:${packet.length}'); } // 分包发送完了递增一下id MessageCommand.getNextMessageId(toPeerId); - // final message = MessageCommand.echoMessage( - // ToPeerId: echoPeerId, - // FromPeerId: FromPeerId, - // payload: [], - // SpIndex: 0, - // SpTotal: 0, - // ); - // await _sendMessage(message: message); - // _log(text: '发送回声测试消息'); } // 发送网关初始化消息 @@ -372,7 +368,7 @@ class StartChartManage { ToPeerId: ToPeerId, FromPeerId: FromPeerId, PayloadType: PayloadType, - MessageId: MessageCommand.getNextMessageId(ToPeerId, increment: true), + MessageId: MessageCommand.getNextMessageId(ToPeerId, increment: false), ); await _sendMessage(message: message); } @@ -386,7 +382,7 @@ class StartChartManage { ToPeerId: ToPeerId, FromPeerId: FromPeerId, PayloadType: PayloadType, - MessageId: MessageCommand.getNextMessageId(ToPeerId, increment: true), + MessageId: MessageCommand.getNextMessageId(ToPeerId, increment: false), ); await _sendMessage(message: message); }