fix:将接收udp数据改为Isolate,主线程解耦udp数据接收

This commit is contained in:
liyi 2025-06-18 18:00:57 +08:00
parent c617d73bb4
commit 2093784f38

View File

@ -1,5 +1,6 @@
import 'dart:async';
import 'dart:io';
import 'dart:isolate';
import 'dart:math';
import 'dart:typed_data';
@ -140,6 +141,11 @@ class StartChartManage {
return _avFrameLost / _avFrameTotal;
}
// ====== Isolate相关成员变量 ======
Isolate? _udpIsolate;
SendPort? _udpIsolateSendPort;
ReceivePort? _mainReceivePort;
//
Future<void> init() async {
if (F.isXHJ) {
@ -161,6 +167,8 @@ class StartChartManage {
await _onlineRelayService();
//
await reportInformation();
// Isolate
await _initUdpIsolate();
}
///
@ -1049,52 +1057,30 @@ class StartChartManage {
//
void _onReceiveData(RawDatagramSocket socket, BuildContext context) {
// ====== Isolate初始化握手 ======
if (_udpIsolateSendPort == null && _mainReceivePort != null) {
_mainReceivePort!.listen((dynamic message) {
if (message is SendPort) {
_udpIsolateSendPort = message;
}
});
}
socket.listen((RawSocketEvent event) {
if (event == RawSocketEvent.read) {
Datagram? dg;
while ((dg = socket.receive()) != null) {
try {
if (dg?.data != null) {
final deserialize = ScpMessage.deserialize(dg!.data);
// PayloadType==talkData的数据包
if (deserialize != null &&
deserialize.PayloadType == PayloadTypeConstant.talkData) {
int? msgId = deserialize.MessageId;
int spTotal = deserialize.SpTotal ?? 1;
int spIndex = deserialize.SpIndex ?? 1;
if (msgId != null) {
//
_avFrameParts.putIfAbsent(msgId, () => <int>{});
_avFrameParts[msgId]!.add(spIndex);
//
if (spIndex == spTotal) {
_avFrameTotal++;
if (_avFrameParts[msgId]!.length < spTotal) {
_avFrameLost++;
// _log(text: '音视频丢包丢失的messageId: $msgId');
}
_avFrameParts.remove(msgId);
// 100
if (_avFrameTotal % 100 == 0) {
_log(
text:
'音视频帧丢包率: ${(getAvFrameLossRate() * 100).toStringAsFixed(2)}%');
}
}
// bytes发送到Isolate
if (_udpIsolateSendPort != null) {
_udpIsolateSendPort!.send(dg!.data);
} else {
// Fallback: 线
final deserialize = ScpMessage.deserialize(dg!.data);
if (deserialize != null) {
_handleUdpResultData(deserialize);
}
}
if (deserialize != null) {
//
_handleUdpResultData(deserialize);
}
// if (deserialize.PayloadType != PayloadTypeConstant.heartbeat) {
// if (deserialize.Payload != null) {
// _log(text: 'Udp收到结构体数据---》$deserialize');
// }
// _log(text: 'text---》${utf8.decode(deserialize.Payload)}');
// }
}
} catch (e, stackTrace) {
throw StartChartMessageException('$e\n,$stackTrace');
@ -1110,12 +1096,12 @@ class StartChartManage {
final int messageType = scpMessage.MessageType ?? 0;
try {
//
if (scpMessage.SpIndex != null &&
scpMessage.SpTotal != null &&
scpMessage.MessageId != null) {
PacketLossStatistics().recordPacket(
scpMessage.MessageId!, scpMessage.SpIndex!, scpMessage.SpTotal!);
}
// if (scpMessage.SpIndex != null &&
// scpMessage.SpTotal != null &&
// scpMessage.MessageId != null) {
// PacketLossStatistics().recordPacket(
// scpMessage.MessageId!, scpMessage.SpIndex!, scpMessage.SpTotal!);
// }
final ScpMessageHandler handler =
ScpMessageHandlerFactory.createHandler(payloadType);
@ -1317,6 +1303,8 @@ class StartChartManage {
await Storage.removerStarChartRegisterNodeInfo();
// udp服务
closeUdpSocket();
// Isolate
_disposeUdpIsolate();
}
///
@ -1325,4 +1313,68 @@ class StartChartManage {
isOnlineStarChartServer = false;
talkStatus.setUninitialized();
}
// Isolate
Future<void> _initUdpIsolate() async {
_mainReceivePort = ReceivePort();
_mainReceivePort!.listen((dynamic message) {
// 线
// if (message is String) {
// _log(text: '[Isolate] $message');
// }
});
_udpIsolate = await Isolate.spawn<_UdpIsolateInitParams>(
_udpIsolateEntry,
_UdpIsolateInitParams(
_mainReceivePort!.sendPort,
_handleUdpResultDataStatic,
),
debugName: 'UdpDataIsolate',
);
}
// Isolate
void _disposeUdpIsolate() {
_udpIsolateSendPort = null;
_mainReceivePort?.close();
_mainReceivePort = null;
_udpIsolate?.kill(priority: Isolate.immediate);
_udpIsolate = null;
}
// 便Isolate调用成员方法
static void _handleUdpResultDataStatic(Map<String, dynamic> args) {
final instance = StartChartManage();
final ScpMessage scpMessage = args['scpMessage'] as ScpMessage;
instance._handleUdpResultData(scpMessage);
}
// Isolate入口函数
static void _udpIsolateEntry(_UdpIsolateInitParams params) {
final SendPort mainSendPort = params.mainSendPort;
final Function handleUdpResultData = params.handleUdpResultData;
final ReceivePort isolateReceivePort = ReceivePort();
// SendPort发回主线程
mainSendPort.send(isolateReceivePort.sendPort);
isolateReceivePort.listen((dynamic message) {
try {
if (message is List<int>) {
// List<int>Uint8List
final scpMessage = ScpMessage.deserialize(Uint8List.fromList(message));
if (scpMessage != null) {
// 线_handleUdpResultData
handleUdpResultData({'scpMessage': scpMessage});
}
}
} catch (e, stack) {
// mainSendPort.send('[Isolate Error] $e\n$stack');
}
});
}
}
class _UdpIsolateInitParams {
final SendPort mainSendPort;
final Function handleUdpResultData;
_UdpIsolateInitParams(this.mainSendPort, this.handleUdpResultData);
}