feat(device): 实现下位机 JSON 协议(data model 对齐)
按 docs/下位机交互数据模型.md 重构串口协议层: 协议层 - 新增 DeviceMessage 模型,对应 message_id/type/ack/need_ack/data - 新增 JsonProtocolService,4 字节大端长度前缀 + UTF-8 JSON 帧 - 删除原二进制协议(serial_protocol.dart) 服务层 - 新增 DeviceMessageService,集中收发并按 type 分发 - 重写 SerialRunner 为 JsonSerialRunner,使用 create_task/control 消息 数据模型 - DeviceState 增加 doorStatus/lightStatus/taskStatus/lastInfoAt - 新增 DeviceInfoNotifier 订阅 device_info 上行 - 灯光按钮接通 light_control 消息 测试 - 新增 device_protocol_test.dart(14 用例) - 修复 models_test.dart 残留的 Step mixSpeed/blowSpeed 错误
This commit is contained in:
147
lib/features/device/services/device_message_service.dart
Normal file
147
lib/features/device/services/device_message_service.dart
Normal file
@@ -0,0 +1,147 @@
|
||||
import 'dart:async';
|
||||
import 'dart:typed_data';
|
||||
|
||||
import 'serial_port_service.dart';
|
||||
import 'device_message.dart';
|
||||
import 'json_protocol.dart';
|
||||
|
||||
/// 设备消息分发器
|
||||
///
|
||||
/// 集中处理与下位机之间的 JSON 消息收发:
|
||||
/// 1. 接收:监听串口原始字节,调用 [JsonProtocolService] 解码为 [DeviceMessage],
|
||||
/// 按 type 分发到对应订阅者;
|
||||
/// 2. 发送:把 [DeviceMessage] 编码为字节流写入串口。
|
||||
///
|
||||
/// 各业务方(SerialRunner / DeviceInfoNotifier / LightControl)只需关心
|
||||
/// 自己订阅/发送的消息类型,不必直接接触串口。
|
||||
class DeviceMessageService {
|
||||
final SerialPortService _serial;
|
||||
final JsonProtocolService _protocol;
|
||||
|
||||
StreamSubscription<Uint8List>? _rxSub;
|
||||
StreamSubscription<String>? _errorSub;
|
||||
StreamSubscription<SerialConnectionState>? _stateSub;
|
||||
|
||||
final Map<DeviceMessageType, List<void Function(DeviceMessage)>> _handlers = {};
|
||||
|
||||
/// 错误信息流(解码错误、串口异常等)
|
||||
final _errorCtrl = StreamController<String>.broadcast();
|
||||
|
||||
/// 任何消息流的广播(用于调试 / 日志)
|
||||
final _messageCtrl = StreamController<DeviceMessage>.broadcast();
|
||||
|
||||
DeviceMessageService({
|
||||
required SerialPortService serial,
|
||||
JsonProtocolService? protocol,
|
||||
}) : _serial = serial,
|
||||
_protocol = protocol ?? JsonProtocolService() {
|
||||
_subscribe();
|
||||
}
|
||||
|
||||
Stream<String> get onError => _errorCtrl.stream;
|
||||
Stream<DeviceMessage> get onMessage => _messageCtrl.stream;
|
||||
|
||||
/// 串口连接状态变更(透传自 [SerialPortService])
|
||||
Stream<SerialConnectionState> get connectionStateChanges =>
|
||||
_serial.connectionStateChanges;
|
||||
|
||||
/// 是否可发送消息(已连接且已初始化)
|
||||
bool get canSend => _serial.isConnected;
|
||||
|
||||
/// 当前连接状态
|
||||
SerialConnectionState get connectionState => _serial.state;
|
||||
|
||||
/// 生成下一个消息 ID
|
||||
String nextId() => _protocol.nextId();
|
||||
|
||||
/// 订阅指定 [type] 的消息;返回取消订阅的函数
|
||||
void Function() subscribe(DeviceMessageType type, void Function(DeviceMessage) handler) {
|
||||
_handlers.putIfAbsent(type, () => <void Function(DeviceMessage)>[]);
|
||||
_handlers[type]!.add(handler);
|
||||
return () {
|
||||
final list = _handlers[type];
|
||||
if (list != null) list.remove(handler);
|
||||
};
|
||||
}
|
||||
|
||||
/// 发送消息到下位机;返回是否成功写入
|
||||
Future<bool> send(DeviceMessage message) async {
|
||||
if (!_serial.isConnected) {
|
||||
_emitError('串口未连接,无法发送 ${message.type.wireName}');
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
final bytes = _protocol.encode(message);
|
||||
final written = await _serial.write(bytes);
|
||||
if (written == 0) {
|
||||
_emitError('写入失败:${message.type.wireName}');
|
||||
return false;
|
||||
}
|
||||
_messageCtrl.add(message);
|
||||
return true;
|
||||
} catch (e) {
|
||||
_emitError('编码异常: $e');
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/// 释放资源
|
||||
Future<void> dispose() async {
|
||||
await _rxSub?.cancel();
|
||||
await _errorSub?.cancel();
|
||||
await _stateSub?.cancel();
|
||||
_rxSub = _errorSub = _stateSub = null;
|
||||
_handlers.clear();
|
||||
await _errorCtrl.close();
|
||||
await _messageCtrl.close();
|
||||
}
|
||||
|
||||
// -- 私有方法 ---------------------------------------------------------
|
||||
|
||||
void _subscribe() {
|
||||
_rxSub?.cancel();
|
||||
_errorSub?.cancel();
|
||||
_stateSub?.cancel();
|
||||
|
||||
_rxSub = _serial.onData.listen(_onData);
|
||||
_errorSub = _serial.onError.listen(_emitError);
|
||||
_stateSub = _serial.connectionStateChanges.listen((s) {
|
||||
if (s == SerialConnectionState.disconnected) {
|
||||
_protocol.reset();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void _onData(Uint8List data) {
|
||||
if (data.isEmpty) return;
|
||||
// 反复解析直到缓冲区无法形成完整帧
|
||||
while (true) {
|
||||
final (msg, consumed) = _protocol.tryDecode(data);
|
||||
if (msg == null) {
|
||||
if (consumed > 0 && data.length >= consumed) {
|
||||
data = Uint8List.sublistView(data, consumed);
|
||||
}
|
||||
return;
|
||||
}
|
||||
data = Uint8List.sublistView(data, consumed);
|
||||
_dispatch(msg);
|
||||
}
|
||||
}
|
||||
|
||||
void _dispatch(DeviceMessage msg) {
|
||||
_messageCtrl.add(msg);
|
||||
final list = _handlers[msg.type];
|
||||
if (list == null) return;
|
||||
for (final handler in List.of(list)) {
|
||||
try {
|
||||
handler(msg);
|
||||
} catch (e) {
|
||||
_emitError('消息处理异常 (${msg.type.wireName}): $e');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void _emitError(String message) {
|
||||
if (!_errorCtrl.isClosed) _errorCtrl.add(message);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user