fix:增加分包发送、接收分包数据后的组包逻辑

This commit is contained in:
liyi 2024-12-19 17:47:44 +08:00
parent d6322c21ad
commit ba47b2731b
2 changed files with 63 additions and 59 deletions

View File

@ -22,8 +22,8 @@ import 'package:star_lock/talk/startChart/proto/talk_request.pb.dart';
class ScpMessage { class ScpMessage {
/// ///
/// keyMessageId // messageId
static Map<int, List<int>> _buffer = {}; static Map<int, List<List<int>>> _packetBuffer = {};
ScpMessage({ ScpMessage({
this.ProtocolFlag, this.ProtocolFlag,
@ -154,7 +154,7 @@ class ScpMessage {
// Payload () // Payload ()
if (Payload != null && Payload is String) { if (Payload != null && Payload is String) {
bytes.addAll(utf8.encode(Payload!)); bytes.addAll(utf8.encode(Payload!));
} else { } else if (Payload != null) {
// //
bytes.addAll(Payload); bytes.addAll(Payload);
} }
@ -302,8 +302,22 @@ class ScpMessage {
return heartbeatResponse; return heartbeatResponse;
case PayloadTypeConstant.echoTest: case PayloadTypeConstant.echoTest:
// //
String payload = utf8.decode(byte); if (spTotal != null &&
return payload; spTotal > 1 &&
messageId != null &&
spIndex != null) {
//
return _handleFragmentedPayload(
messageId: messageId,
spTotal: spTotal,
spIndex: spIndex,
byte: byte,
);
} else {
// spTotal 1 byte
String payload = utf8.decode(byte);
return payload;
}
case PayloadTypeConstant.gatewayReset: case PayloadTypeConstant.gatewayReset:
// //
if (messageType == MessageTypeConstant.Resp) { if (messageType == MessageTypeConstant.Resp) {
@ -428,22 +442,9 @@ class ScpMessage {
final GenericResp genericResp = GenericResp.fromBuffer(byte); final GenericResp genericResp = GenericResp.fromBuffer(byte);
return genericResp; return genericResp;
} else if (messageType == MessageTypeConstant.RealTimeData) { } else if (messageType == MessageTypeConstant.RealTimeData) {
if (spTotal != null && spTotal > 1) { //
// final TalkData talkData = TalkData.fromBuffer(byte);
final List<int> subPackageBytes = _subPackage( return talkData;
spTotal: spTotal!,
spIndex: spIndex!,
bytes: byte,
messageId: messageId!,
);
//
final TalkData talkData = TalkData.fromBuffer(subPackageBytes);
return talkData;
} else {
//
final TalkData talkData = TalkData.fromBuffer(byte);
return talkData;
}
} else { } else {
String payload = utf8.decode(byte); String payload = utf8.decode(byte);
return payload; return payload;
@ -465,19 +466,11 @@ class ScpMessage {
return payload; return payload;
} }
} catch (e) { } catch (e) {
// _log(text: '❌反序列化udp数据时遇到错误----》$e'); _log(text: '❌反序列化udp数据时遇到错误----》$e');
//
// //
_log( _log(
text: text:
'原始字节数组: ${byte.sublist(0, 20).map((b) => b.toRadixString(16)).join(" ")}'); '原始字节数组: ${byte.sublist(0, 20).map((b) => b.toRadixString(16)).join(" ")}');
//
// // Protobuf相关的异常
// if (e is InvalidProtocolBufferException || e is FormatException) {
// _log(
// text:
// '反序列化失败的payloadType: $payloadType, messageType: $messageType');
// }
return ''; return '';
} }
} }
@ -500,22 +493,37 @@ class ScpMessage {
AppLog.log('=====${text}'); AppLog.log('=====${text}');
} }
/// ///
static List<int> _subPackage({ /// null
static String? _handleFragmentedPayload({
required int messageId, required int messageId,
required int spTotal, required int spTotal,
required int spIndex, required int spIndex,
required List<int> bytes, required List<int> byte,
}) { }) {
if (_buffer.containsKey(messageId)) { //
// key就追加 if (!_packetBuffer.containsKey(messageId)) {
final List<int> bytesList = _buffer[messageId]!; _packetBuffer[messageId] = List.filled(spTotal, []);
bytesList.addAll(bytes); }
_buffer[messageId] = bytesList;
} else { //
// key _packetBuffer[messageId]![spIndex - 1] = byte;
_buffer.putIfAbsent(messageId, () => bytes);
//
if (_packetBuffer[messageId]!.every((packet) => packet.isNotEmpty)) {
//
List<int> completePayload =
_packetBuffer[messageId]!.expand((packet) => packet).toList();
//
_packetBuffer.remove(messageId);
// payload
String payload = utf8.decode(completePayload);
return payload;
} else {
// null
return null;
} }
return [];
} }
} }

View File

@ -75,7 +75,7 @@ class StartChartManage {
int talkDataIntervalTime = 10; // (ms) int talkDataIntervalTime = 10; // (ms)
Timer? talkDataTimer; // Timer? talkDataTimer; //
final int _maxPayloadSize = 50 * 1024; // final int _maxPayloadSize = 8 * 1024; //
// //
TalkExpect defaultTalkExpect = TalkExpect( TalkExpect defaultTalkExpect = TalkExpect(
@ -111,7 +111,7 @@ class StartChartManage {
Future<void> _clientRegister() async { Future<void> _clientRegister() async {
final StarChartRegisterNodeEntity? registerNodeEntity = final StarChartRegisterNodeEntity? registerNodeEntity =
await Storage.getStarChartRegisterNodeInfo(); await Storage.getStarChartRegisterNodeInfo();
if (registerNodeEntity != null) { if (registerNodeEntity != null && registerNodeEntity.peer?.id != null) {
_log(text: '获取到星图注册节点信息:$registerNodeEntity'); _log(text: '获取到星图注册节点信息:$registerNodeEntity');
FromPeerId = registerNodeEntity.peer!.id ?? ''; FromPeerId = registerNodeEntity.peer!.id ?? '';
} else { } else {
@ -215,7 +215,7 @@ class StartChartManage {
// 线 // 线
Future<void> _sendOnlineMessage() async { Future<void> _sendOnlineMessage() async {
_log(text: '发送上线消息'); _log(text: '发送上线消息,是否已经上线:$isOnlineStartChartServer');
if (isOnlineStartChartServer) { if (isOnlineStartChartServer) {
_log(text: '星图已上线,请勿重复发送上线消息'); _log(text: '星图已上线,请勿重复发送上线消息');
return; return;
@ -281,17 +281,21 @@ class StartChartManage {
// //
void sendEchoMessage({required List<int> payload}) async { void sendEchoMessage({required List<int> payload}) async {
String toPeerId = '2vzXdjdzipJBpWpJxhiRzCFXrDKk54t3YJ7EjYPSRuij'; String toPeerId = 'G7fzJkbS5MigMqnbTCQVk7VspcDsnGeikJpQwS8fbhim';
//
final int totalPackets = (payload.length / _maxPayloadSize).ceil(); final int totalPackets = (payload.length / _maxPayloadSize).ceil();
//
for (int i = 0; i < totalPackets; i++) { for (int i = 0; i < totalPackets; i++) {
int start = i * _maxPayloadSize; int start = i * _maxPayloadSize;
int end = (i + 1) * _maxPayloadSize; int end = (i + 1) * _maxPayloadSize;
if (end > payload.length) { if (end > payload.length) {
end = payload.length; end = payload.length;
} }
//
List<int> packet = payload.sublist(start, end); List<int> packet = payload.sublist(start, end);
//
final message = MessageCommand.echoMessage( final message = MessageCommand.echoMessage(
ToPeerId: toPeerId, ToPeerId: toPeerId,
FromPeerId: FromPeerId, FromPeerId: FromPeerId,
@ -300,22 +304,14 @@ class StartChartManage {
SpIndex: i + 1, SpIndex: i + 1,
MessageId: MessageCommand.getNextMessageId(toPeerId, increment: false), MessageId: MessageCommand.getNextMessageId(toPeerId, increment: false),
); );
//
await _sendMessage(message: message); await _sendMessage(message: message);
_log( _log(
text: text:
'发送回声测试消息=====SpTotal:$totalPackets,SpIndex:${i + 1},packet:${packet.length}'); '发送回声测试分包消息=====SpTotal:$totalPackets,SpIndex:${i + 1},packet:${packet.length}');
} }
// id // id
MessageCommand.getNextMessageId(toPeerId); MessageCommand.getNextMessageId(toPeerId);
// final message = MessageCommand.echoMessage(
// ToPeerId: echoPeerId,
// FromPeerId: FromPeerId,
// payload: [],
// SpIndex: 0,
// SpTotal: 0,
// );
// await _sendMessage(message: message);
// _log(text: '发送回声测试消息');
} }
// //
@ -372,7 +368,7 @@ class StartChartManage {
ToPeerId: ToPeerId, ToPeerId: ToPeerId,
FromPeerId: FromPeerId, FromPeerId: FromPeerId,
PayloadType: PayloadType, PayloadType: PayloadType,
MessageId: MessageCommand.getNextMessageId(ToPeerId, increment: true), MessageId: MessageCommand.getNextMessageId(ToPeerId, increment: false),
); );
await _sendMessage(message: message); await _sendMessage(message: message);
} }
@ -386,7 +382,7 @@ class StartChartManage {
ToPeerId: ToPeerId, ToPeerId: ToPeerId,
FromPeerId: FromPeerId, FromPeerId: FromPeerId,
PayloadType: PayloadType, PayloadType: PayloadType,
MessageId: MessageCommand.getNextMessageId(ToPeerId, increment: true), MessageId: MessageCommand.getNextMessageId(ToPeerId, increment: false),
); );
await _sendMessage(message: message); await _sendMessage(message: message);
} }