170 lines
5.1 KiB
Dart
170 lines
5.1 KiB
Dart
import 'dart:async';
|
||
import 'dart:typed_data';
|
||
|
||
import 'serial_port_service.dart';
|
||
import 'device_log.dart';
|
||
import 'device_message.dart';
|
||
import 'json_protocol.dart';
|
||
|
||
/// Dispatches JSON messages exchanged with the device.
///
/// Centralizes both directions of traffic:
///
/// 1. Receive: listens to the raw bytes emitted by [SerialPortService], asks
///    [JsonProtocolService] to decode them into [DeviceMessage]s, and forwards
///    each message to the handlers subscribed to its type.
/// 2. Send: encodes a [DeviceMessage] into bytes and writes them to the serial
///    port.
///
/// Business-level callers (SerialRunner / DeviceInfoNotifier / LightControl)
/// only deal with the message types they subscribe to or send, and never touch
/// the serial port directly.
class DeviceMessageService {
  final SerialPortService _serial;
  final JsonProtocolService _protocol;

  StreamSubscription<Uint8List>? _rxSub;
  StreamSubscription<String>? _errorSub;
  StreamSubscription<SerialConnectionState>? _stateSub;

  /// Handlers registered through [subscribe], grouped by message type.
  final Map<DeviceMessageType, List<void Function(DeviceMessage)>> _handlers =
      {};

  /// Backs [onError]: error texts reported through [_emitError].
  final _errorCtrl = StreamController<String>.broadcast();

  /// Backs [onMessage]: every sent and received message (debugging / logging).
  final _messageCtrl = StreamController<DeviceMessage>.broadcast();

  /// Creates a service bound to [serial] and immediately starts listening to
  /// its data, error and connection-state streams.
  ///
  /// When [protocol] is omitted, a new [JsonProtocolService] is created.
  DeviceMessageService({
    required SerialPortService serial,
    JsonProtocolService? protocol,
  })  : _serial = serial,
        _protocol = protocol ?? JsonProtocolService() {
    _subscribe();
  }

  /// Broadcast stream of error texts (send failures, forwarded serial errors,
  /// handler exceptions).
  Stream<String> get onError => _errorCtrl.stream;

  /// Broadcast stream of every message successfully sent or received.
  Stream<DeviceMessage> get onMessage => _messageCtrl.stream;

  /// Connection state changes, passed through from [SerialPortService].
  Stream<SerialConnectionState> get connectionStateChanges =>
      _serial.connectionStateChanges;

  /// Whether a message can currently be sent.
  ///
  /// Mirrors the serial port's `isConnected` flag; no other condition is
  /// checked here.
  bool get canSend => _serial.isConnected;

  /// The current connection state of the underlying serial port.
  SerialConnectionState get connectionState => _serial.state;

  /// Returns the next message id, as produced by the protocol service.
  String nextId() => _protocol.nextId();

  /// Registers [handler] for messages of the given [type].
  ///
  /// Returns a function that removes the registration again. Calling it after
  /// [dispose], or more than once, is harmless.
  void Function() subscribe(
    DeviceMessageType type,
    void Function(DeviceMessage) handler,
  ) {
    (_handlers[type] ??= <void Function(DeviceMessage)>[]).add(handler);
    return () {
      final registered = _handlers[type];
      if (registered == null) return;
      registered.remove(handler);
      // Drop the entry once its last handler is gone so the map does not
      // accumulate empty lists over the lifetime of the service.
      if (registered.isEmpty) _handlers.remove(type);
    };
  }

  /// Encodes [message] and writes it to the serial port.
  ///
  /// Returns `true` when the write reported a non-zero result, and `false`
  /// when the port is not connected, the write reported `0`, or an exception
  /// was thrown while encoding, writing or logging. Every failure is also
  /// reported on [onError].
  Future<bool> send(DeviceMessage message) async {
    if (!_serial.isConnected) {
      _emitError('串口未连接,无法发送 ${message.type.wireName}');
      return false;
    }
    try {
      final bytes = _protocol.encode(message);
      final written = await _serial.write(bytes);
      if (written == 0) {
        _emitError('写入失败:${message.type.wireName}');
        return false;
      }
      DeviceLog.info(
        'TX ${message.type.wireName} '
        'id=${message.messageId} '
        'ack=${message.ack ?? "-"} '
        'needAck=${message.needAck} '
        'data=${DeviceLog.summarizeData(message.data)}\n'
        ' json=${message.encode()}',
      );
      // dispose() may have run while the write was awaited. Adding to a closed
      // controller throws, which would turn a successful write into a
      // reported failure, so skip the broadcast in that case.
      if (!_messageCtrl.isClosed) _messageCtrl.add(message);
      return true;
    } catch (e) {
      // NOTE(review): this also catches exceptions thrown by the write itself,
      // although the message text only mentions encoding.
      _emitError('编码异常: $e', error: e);
      return false;
    }
  }

  /// Releases all resources: cancels the serial subscriptions, forgets every
  /// registered handler and closes the [onError] / [onMessage] streams.
  Future<void> dispose() async {
    await _rxSub?.cancel();
    await _errorSub?.cancel();
    await _stateSub?.cancel();
    _rxSub = null;
    _errorSub = null;
    _stateSub = null;
    _handlers.clear();
    await _errorCtrl.close();
    await _messageCtrl.close();
  }

  // -- Private helpers ------------------------------------------------------

  /// (Re)attaches the listeners on the serial port streams.
  void _subscribe() {
    // Cancellation is deliberately not awaited; any previous subscriptions are
    // simply replaced below.
    unawaited(_rxSub?.cancel());
    unawaited(_errorSub?.cancel());
    unawaited(_stateSub?.cancel());

    _rxSub = _serial.onData.listen(_onData);
    _errorSub = _serial.onError.listen((msg) => _emitError(msg));
    _stateSub = _serial.connectionStateChanges.listen((state) {
      // Reset the protocol state whenever the port reports a disconnect.
      if (state == SerialConnectionState.disconnected) {
        _protocol.reset();
      }
    });
  }

  /// Decodes as many messages as possible from [data] and dispatches each one.
  void _onData(Uint8List data) {
    if (data.isEmpty) return;
    // Keep decoding until the protocol can no longer produce a message.
    while (true) {
      final (msg, consumed) = _protocol.tryDecode(data);
      // No message: stop. Whatever is left in `data` is not kept by this
      // method.
      // NOTE(review): presumably the protocol service buffers partial frames
      // internally -- confirm against JsonProtocolService.tryDecode.
      if (msg == null) return;
      // NOTE(review): assumes 0 <= consumed <= data.length and that decoding
      // eventually stops returning messages; verify the tryDecode contract.
      data = Uint8List.sublistView(data, consumed);
      _dispatch(msg);
    }
  }

  /// Logs [msg], broadcasts it on [onMessage] and invokes every handler
  /// subscribed to its type.
  ///
  /// An exception thrown by one handler is reported and does not prevent the
  /// remaining handlers from running.
  void _dispatch(DeviceMessage msg) {
    DeviceLog.info(
      'RX ${msg.type.wireName} '
      'id=${msg.messageId} '
      'ack=${msg.ack ?? "-"} '
      'data=${DeviceLog.summarizeData(msg.data)}\n'
      ' json=${msg.encode()}',
    );
    if (!_messageCtrl.isClosed) _messageCtrl.add(msg);

    final registered = _handlers[msg.type];
    if (registered == null) return;
    // Iterate over a copy so handlers may subscribe or unsubscribe while being
    // called.
    for (final handler in List.of(registered)) {
      try {
        handler(msg);
      } catch (e) {
        _emitError('消息处理异常 (${msg.type.wireName}): $e', error: e);
        DeviceLog.severe('消息处理异常 (${msg.type.wireName})', error: e);
      }
    }
  }

  /// Logs [message] as a warning (with [error] attached when given) and
  /// publishes it on [onError] unless that stream is already closed.
  void _emitError(String message, {Object? error}) {
    if (error != null) {
      DeviceLog.warn(message, error: error);
    } else {
      DeviceLog.warn(message);
    }
    if (!_errorCtrl.isClosed) _errorCtrl.add(message);
  }
}
|