import 'dart:async';
import 'dart:typed_data';

import 'serial_port_service.dart';
import 'device_log.dart';
import 'device_message.dart';
import 'json_protocol.dart';

/// Dispatcher for JSON messages exchanged with the device.
///
/// Centralizes sending and receiving of JSON messages:
/// 1. Receive: listens to raw serial bytes, asks [JsonProtocolService] to
///    decode them into [DeviceMessage]s, and dispatches each message to the
///    handlers subscribed to its type.
/// 2. Send: encodes a [DeviceMessage] into bytes and writes them to the
///    serial port.
///
/// Consumers (SerialRunner / DeviceInfoNotifier / LightControl) only deal
/// with the message types they subscribe to or send; they never touch the
/// serial port directly.
class DeviceMessageService {
  final SerialPortService _serial;
  final JsonProtocolService _protocol;

  StreamSubscription<Object?>? _rxSub;
  StreamSubscription<Object?>? _errorSub;
  StreamSubscription<Object?>? _stateSub;

  /// Handlers registered through [subscribe], keyed by message type.
  final Map<DeviceMessageType, List<void Function(DeviceMessage)>> _handlers =
      {};

  /// Error text stream (decode errors, serial port failures, ...).
  final _errorCtrl = StreamController<String>.broadcast();

  /// Broadcast of every message sent or received (for debugging / logging).
  final _messageCtrl = StreamController<DeviceMessage>.broadcast();

  /// Creates the service and immediately starts listening to [serial].
  ///
  /// When [protocol] is omitted a fresh [JsonProtocolService] is used.
  DeviceMessageService({
    required SerialPortService serial,
    JsonProtocolService? protocol,
  })  : _serial = serial,
        _protocol = protocol ?? JsonProtocolService() {
    _subscribe();
  }

  /// Human-readable error messages emitted by this service.
  Stream<String> get onError => _errorCtrl.stream;

  /// Every message that was successfully sent or received.
  Stream<DeviceMessage> get onMessage => _messageCtrl.stream;

  /// Serial connection state changes, forwarded from [SerialPortService].
  Stream<SerialConnectionState> get connectionStateChanges =>
      _serial.connectionStateChanges;

  /// Whether a message can be sent right now.
  ///
  /// This only reflects `isConnected` of the underlying serial service.
  bool get canSend => _serial.isConnected;

  /// The current connection state of the underlying serial service.
  SerialConnectionState get connectionState => _serial.state;

  /// Returns the next message ID produced by the protocol service.
  String nextId() => _protocol.nextId();

  /// Registers [handler] for messages of the given [type].
  ///
  /// Returns a function that removes this registration when called.
  void Function() subscribe(
    DeviceMessageType type,
    void Function(DeviceMessage) handler,
  ) {
    (_handlers[type] ??= []).add(handler);
    return () {
      _handlers[type]?.remove(handler);
    };
  }

  /// Sends [message] to the device.
  ///
  /// Returns `true` when the bytes were written, `false` otherwise. Failures
  /// are reported on [onError] instead of being thrown.
  Future<bool> send(DeviceMessage message) async {
    final typeName = message.type.wireName;
    if (!_serial.isConnected) {
      _emitError('串口未连接,无法发送 $typeName');
      return false;
    }

    // Tracks which step is running so the error text names the real culprit
    // (encoding vs. writing) instead of always blaming the encoder.
    var stage = '编码';
    try {
      final bytes = _protocol.encode(message);
      stage = '写入';
      final written = await _serial.write(bytes);
      if (written == 0) {
        _emitError('写入失败:$typeName');
        return false;
      }

      DeviceLog.info(
        'TX ${message.type.wireName} '
        'id=${message.messageId} '
        'ack=${message.ack ?? "-"} '
        'needAck=${message.needAck} '
        'data=${DeviceLog.summarizeData(message.data)}\n'
        ' json=${message.encode()}',
      );

      // dispose() may have run while the write was pending; adding to a
      // closed controller would throw and turn a successful write into a
      // reported failure.
      if (!_messageCtrl.isClosed) _messageCtrl.add(message);
      return true;
    } catch (e) {
      _emitError('${stage}异常: $e', error: e);
      return false;
    }
  }

  /// Cancels all subscriptions, drops all handlers and closes the streams.
  Future<void> dispose() async {
    await _rxSub?.cancel();
    await _errorSub?.cancel();
    await _stateSub?.cancel();
    _rxSub = _errorSub = _stateSub = null;
    _handlers.clear();
    await _errorCtrl.close();
    await _messageCtrl.close();
  }

  // -- Private helpers ---------------------------------------------------

  /// (Re)attaches the listeners for serial data, errors and state changes.
  void _subscribe() {
    // Cancellation is fire-and-forget here; mark it explicitly.
    unawaited(_rxSub?.cancel());
    unawaited(_errorSub?.cancel());
    unawaited(_stateSub?.cancel());

    _rxSub = _serial.onData.listen(_onData);
    _errorSub = _serial.onError.listen((msg) => _emitError(msg));
    _stateSub = _serial.connectionStateChanges.listen((s) {
      // Reset protocol state whenever the link drops.
      if (s == SerialConnectionState.disconnected) {
        _protocol.reset();
      }
    });
  }

  /// Feeds received bytes to the protocol and dispatches every decoded frame.
  void _onData(Uint8List data) {
    if (data.isEmpty) return;

    var remaining = data;
    // Keep decoding until the protocol reports that no complete frame is
    // available. NOTE(review): termination relies on tryDecode eventually
    // returning a null message; no zero-progress guard is added because the
    // decoder may legitimately return frames while consuming no new input.
    while (true) {
      final decoded = _decodeNext(remaining);
      if (decoded == null) return;

      final (msg, consumed) = decoded;
      if (msg == null) return;

      // Clamp so an out-of-range count cannot raise a RangeError here.
      final offset = consumed.clamp(0, remaining.length).toInt();
      remaining = Uint8List.sublistView(remaining, offset);
      _dispatch(msg);
    }
  }

  /// Runs one decode step; reports a thrown error on [onError] and returns
  /// `null` so the receive loop stops for this chunk instead of leaking an
  /// uncaught error out of the stream listener.
  (DeviceMessage?, int)? _decodeNext(Uint8List data) {
    try {
      return _protocol.tryDecode(data);
    } catch (e) {
      _emitError('解码异常: $e', error: e);
      return null;
    }
  }

  /// Logs [msg], broadcasts it on [onMessage] and invokes its handlers.
  void _dispatch(DeviceMessage msg) {
    DeviceLog.info(
      'RX ${msg.type.wireName} '
      'id=${msg.messageId} '
      'ack=${msg.ack ?? "-"} '
      'data=${DeviceLog.summarizeData(msg.data)}\n'
      ' json=${msg.encode()}',
    );
    if (!_messageCtrl.isClosed) _messageCtrl.add(msg);

    final list = _handlers[msg.type];
    if (list == null) return;

    // Iterate over a copy so handlers may subscribe/unsubscribe while running.
    for (final handler in List.of(list)) {
      try {
        handler(msg);
      } catch (e) {
        // One failing handler must not prevent the others from running.
        _emitError('消息处理异常 (${msg.type.wireName}): $e', error: e);
        DeviceLog.severe('消息处理异常 (${msg.type.wireName})', error: e);
      }
    }
  }

  /// Logs [message] as a warning and publishes it on [onError] if still open.
  void _emitError(String message, {Object? error}) {
    if (error != null) {
      DeviceLog.warn(message, error: error);
    } else {
      DeviceLog.warn(message);
    }
    if (!_errorCtrl.isClosed) _errorCtrl.add(message);
  }
}