Merge branch 'develop_sky_liyi' into 'develop_sky'

fix:将接收udp数据改为Isolate,主线程解耦udp数据接收

See merge request StarlockTeam/app-starlock!175
This commit is contained in:
李仪 2025-06-18 10:03:04 +00:00
commit dfccfa9089

View File

@ -1,5 +1,6 @@
import 'dart:async'; import 'dart:async';
import 'dart:io'; import 'dart:io';
import 'dart:isolate';
import 'dart:math'; import 'dart:math';
import 'dart:typed_data'; import 'dart:typed_data';
@ -140,6 +141,11 @@ class StartChartManage {
return _avFrameLost / _avFrameTotal; return _avFrameLost / _avFrameTotal;
} }
// ====== Isolate相关成员变量 ======
Isolate? _udpIsolate;
SendPort? _udpIsolateSendPort;
ReceivePort? _mainReceivePort;
// //
Future<void> init() async { Future<void> init() async {
if (F.isXHJ) { if (F.isXHJ) {
@ -161,6 +167,8 @@ class StartChartManage {
await _onlineRelayService(); await _onlineRelayService();
// //
await reportInformation(); await reportInformation();
// Isolate
await _initUdpIsolate();
} }
/// ///
@ -1049,52 +1057,30 @@ class StartChartManage {
// //
void _onReceiveData(RawDatagramSocket socket, BuildContext context) { void _onReceiveData(RawDatagramSocket socket, BuildContext context) {
// ====== Isolate初始化握手 ======
if (_udpIsolateSendPort == null && _mainReceivePort != null) {
_mainReceivePort!.listen((dynamic message) {
if (message is SendPort) {
_udpIsolateSendPort = message;
}
});
}
socket.listen((RawSocketEvent event) { socket.listen((RawSocketEvent event) {
if (event == RawSocketEvent.read) { if (event == RawSocketEvent.read) {
Datagram? dg; Datagram? dg;
while ((dg = socket.receive()) != null) { while ((dg = socket.receive()) != null) {
try { try {
if (dg?.data != null) { if (dg?.data != null) {
final deserialize = ScpMessage.deserialize(dg!.data); // bytes发送到Isolate
if (_udpIsolateSendPort != null) {
// PayloadType==talkData的数据包 _udpIsolateSendPort!.send(dg!.data);
if (deserialize != null && } else {
deserialize.PayloadType == PayloadTypeConstant.talkData) { // Fallback: 线
int? msgId = deserialize.MessageId; final deserialize = ScpMessage.deserialize(dg!.data);
int spTotal = deserialize.SpTotal ?? 1; if (deserialize != null) {
int spIndex = deserialize.SpIndex ?? 1; _handleUdpResultData(deserialize);
if (msgId != null) {
//
_avFrameParts.putIfAbsent(msgId, () => <int>{});
_avFrameParts[msgId]!.add(spIndex);
//
if (spIndex == spTotal) {
_avFrameTotal++;
if (_avFrameParts[msgId]!.length < spTotal) {
_avFrameLost++;
// _log(text: '音视频丢包丢失的messageId: $msgId');
}
_avFrameParts.remove(msgId);
// 100
if (_avFrameTotal % 100 == 0) {
_log(
text:
'音视频帧丢包率: ${(getAvFrameLossRate() * 100).toStringAsFixed(2)}%');
}
}
} }
} }
if (deserialize != null) {
//
_handleUdpResultData(deserialize);
}
// if (deserialize.PayloadType != PayloadTypeConstant.heartbeat) {
// if (deserialize.Payload != null) {
// _log(text: 'Udp收到结构体数据---》$deserialize');
// }
// _log(text: 'text---》${utf8.decode(deserialize.Payload)}');
// }
} }
} catch (e, stackTrace) { } catch (e, stackTrace) {
throw StartChartMessageException('$e\n,$stackTrace'); throw StartChartMessageException('$e\n,$stackTrace');
@ -1110,12 +1096,12 @@ class StartChartManage {
final int messageType = scpMessage.MessageType ?? 0; final int messageType = scpMessage.MessageType ?? 0;
try { try {
// //
if (scpMessage.SpIndex != null && // if (scpMessage.SpIndex != null &&
scpMessage.SpTotal != null && // scpMessage.SpTotal != null &&
scpMessage.MessageId != null) { // scpMessage.MessageId != null) {
PacketLossStatistics().recordPacket( // PacketLossStatistics().recordPacket(
scpMessage.MessageId!, scpMessage.SpIndex!, scpMessage.SpTotal!); // scpMessage.MessageId!, scpMessage.SpIndex!, scpMessage.SpTotal!);
} // }
final ScpMessageHandler handler = final ScpMessageHandler handler =
ScpMessageHandlerFactory.createHandler(payloadType); ScpMessageHandlerFactory.createHandler(payloadType);
@ -1317,6 +1303,8 @@ class StartChartManage {
await Storage.removerStarChartRegisterNodeInfo(); await Storage.removerStarChartRegisterNodeInfo();
// udp服务 // udp服务
closeUdpSocket(); closeUdpSocket();
// Isolate
_disposeUdpIsolate();
} }
/// ///
@ -1325,4 +1313,68 @@ class StartChartManage {
isOnlineStarChartServer = false; isOnlineStarChartServer = false;
talkStatus.setUninitialized(); talkStatus.setUninitialized();
} }
// Isolate
Future<void> _initUdpIsolate() async {
_mainReceivePort = ReceivePort();
_mainReceivePort!.listen((dynamic message) {
// 线
// if (message is String) {
// _log(text: '[Isolate] $message');
// }
});
_udpIsolate = await Isolate.spawn<_UdpIsolateInitParams>(
_udpIsolateEntry,
_UdpIsolateInitParams(
_mainReceivePort!.sendPort,
_handleUdpResultDataStatic,
),
debugName: 'UdpDataIsolate',
);
}
// Isolate
void _disposeUdpIsolate() {
_udpIsolateSendPort = null;
_mainReceivePort?.close();
_mainReceivePort = null;
_udpIsolate?.kill(priority: Isolate.immediate);
_udpIsolate = null;
}
// 便Isolate调用成员方法
static void _handleUdpResultDataStatic(Map<String, dynamic> args) {
final instance = StartChartManage();
final ScpMessage scpMessage = args['scpMessage'] as ScpMessage;
instance._handleUdpResultData(scpMessage);
}
// Isolate入口函数
static void _udpIsolateEntry(_UdpIsolateInitParams params) {
final SendPort mainSendPort = params.mainSendPort;
final Function handleUdpResultData = params.handleUdpResultData;
final ReceivePort isolateReceivePort = ReceivePort();
// SendPort发回主线程
mainSendPort.send(isolateReceivePort.sendPort);
isolateReceivePort.listen((dynamic message) {
try {
if (message is List<int>) {
// List<int>Uint8List
final scpMessage = ScpMessage.deserialize(Uint8List.fromList(message));
if (scpMessage != null) {
// 线_handleUdpResultData
handleUdpResultData({'scpMessage': scpMessage});
}
}
} catch (e, stack) {
// mainSendPort.send('[Isolate Error] $e\n$stack');
}
});
}
}
class _UdpIsolateInitParams {
final SendPort mainSendPort;
final Function handleUdpResultData;
_UdpIsolateInitParams(this.mainSendPort, this.handleUdpResultData);
} }