MqttClient 架构与接口文档
(Calibration Service – Factory Side, No Local)
1. MQTT 通信原理说明
1.1 MQTT 通信模型与机制
MQTT(Message Queuing Telemetry Transport)是一种基于 发布/订阅(Pub/Sub) 模式的轻量级消息传输协议,广泛应用于物联网场景,尤其适用于:
- 低带宽
- 高延迟
- 不稳定网络环境
在本系统中,客户端 A、B、C 都 既可以作为发布者(Publisher),也可以作为订阅者(Subscriber),角色由其所使用的 Topic 和业务逻辑决定。
1.1.1 客户端-服务器模型
- Broker(消息代理服务器)
- 所有 MQTT 客户端均与 Broker 建立连接。
- 负责维护客户端连接状态、订阅关系,并进行消息路由与分发。
- Publisher(发布者)
- 将消息发布到某个主题(Topic)。
- Subscriber(订阅者)
- 订阅一个或多个主题,从 Broker 接收对应消息。
在本架构中:
- Factory Side 的 MqttClient 既会:
- 订阅管理侧下发的控制指令(如
/command/update、/command/export等), - 又会发布设备状态、文件传输相关消息(如
/state、/file/transfer/*等)。
- 订阅管理侧下发的控制指令(如
1.1.2 工作流程概述
- 连接建立
- 客户端(Factory Side / 管理侧)通过 TCP/IP 或 SSL/TLS 与 Broker 建立连接。
- 订阅主题
- 客户端向 Broker 发送 SUBSCRIBE 报文,订阅对应 Topic(如:
{device}/command/export)。
- 客户端向 Broker 发送 SUBSCRIBE 报文,订阅对应 Topic(如:
- 发布消息
- 客户端向 Broker 发布 PUBLISH 报文到某 Topic(如:
{device}/file/transfer/chunk)。
- 客户端向 Broker 发布 PUBLISH 报文到某 Topic(如:
- 消息分发
- Broker 将收到的消息转发给 所有订阅了该 Topic 的客户端。
- 断线与重连
- 利用 MQTT 的持久会话及遗言(Last Will)机制实现断线检测和状态同步。
1.1.3 协议特性(与本项目相关)
- 低开销
- MQTT 报文头部非常小(最小仅 2 字节),适合频繁状态上报及小文件分片传输。
- QoS 支持(消息质量等级)
QoS 0(最多一次):- 不保证送达,不做重发,适合可丢失的状态广播。
QoS 1(至少一次):- 保证消息至少送达一次,可能出现重复,适合一般业务指令和文件分片传输。
QoS 2(恰好一次):- 保证消息到达且不重复,开销最大,通常用在对重复极度敏感的场景。
- 持久会话
- 客户端重连后可以恢复之前的订阅。
- 遗言(Last Will)
- 客户端异常断开时 Broker 可代为发布一条预先配置好的消息,用于通知设备离线、任务中断等。
1.2 Topic(主题)说明
MQTT 的 Topic 是消息的 路由标识符,采用类似文件路径的分层结构,例如:
{device}/state
{device}/command/export
{device}/file/transfer/chunk
device常为设备 ID 或设备唯一标识_currentDevice。/为层级分隔符,用于构建清晰的业务结构:
典型分层设计:
/{device}/state:设备状态相关/{device}/command/...:控制指令(更新、导出、重启等)/{device}/file/transfer/...:文件传输业务(元数据、分片、进度、完成等)/{device}/sync、/{device}/UploadReady:状态同步和上传准备相关
2. Calibration Service – MqttClient 主题设计(Factory Side,无本地界面)
本章节描述 工厂侧(Factory Side)Calibration Service 中的 MqttClient 主题约定。
其中 _currentDevice 表示当前设备的标识,如 Line1-Mixer01 等。
private readonly string _currentDevice;
所有 Topic 均以 _currentDevice 为前缀。
2.1 发布主题(Publish Topics – Factory Side)
Factory Side MqttClient 对外发布的主题如下:
public string Publish_State_Topic => $"{_currentDevice}/state";
public string Publish_File_Metadata_Topic => $"{_currentDevice}/file/transfer/matedata";
public string Publish_File_Chunk_Topic => $"{_currentDevice}/file/transfer/chunk";
public string Publish_File_Progress_Topic => $"{_currentDevice}/file/transfer/progress";
public string Publish_File_Complete_Topic => $"{_currentDevice}/file/transfer/complete";
2.1.1 设备状态发布 – Publish_State_Topic
- Topic:
{device}/state - 方向:Factory Side → 管理侧
- 用途:
- 上报设备当前状态,例如:
- 空闲 / 正在导出 / 正在上传 / 出错
- 文件是否就绪(导出完成,等待上传)
- 上报设备当前状态,例如:
- 建议 Payload 示例(JSON):
{
"deviceId": "Line1-Mixer01",
"timestamp": "2025-12-26T07:30:00Z",
"status": "ExportCompleted",
"fileReady": true,
"remark": "Export and compression finished"
}
2.1.2 文件元数据发布 – Publish_File_Metadata_Topic
-
Topic:
{device}/file/transfer/matedata -
方向:Factory Side → 管理侧
-
用途:
- 在实际文件上传前,告知管理侧即将上传的文件信息(元数据)。
-
典型内容:
- 文件名(含扩展名,如
.zip) - 文件总大小(字节数)
- 分片大小(每块字节数)
- 预计分片总数
- 文件校验信息(如 MD5)
- 文件名(含扩展名,如
-
建议 Payload 示例(JSON):
{
"deviceId": "Line1-Mixer01",
"fileName": "calibration_export_20251226_073000.zip",
"fileSize": 1048576,
"chunkSize": 4096,
"totalChunks": 256,
"md5": "a1b2c3d4e5f60123456789abcdef0000",
"timestamp": "2025-12-26T07:31:00Z"
}
注:键名如有既定规范,请统一定义并遵守。
2.1.3 文件分片数据发布 – Publish_File_Chunk_Topic
-
Topic:
{device}/file/transfer/chunk -
方向:Factory Side → 管理侧
-
用途:
- 以分片形式传输压缩后的导出文件数据。
-
典型内容:
- 当前分片序号(从 0 或 1 开始)
- 总分片数
- 当前分片的二进制内容(Base64 编码)
-
建议 Payload 示例(JSON + Base64):
{
"deviceId": "Line1-Mixer01",
"fileName": "calibration_export_20251226_073000.zip",
"chunkIndex": 1,
"totalChunks": 256,
"data": "BASE64_ENCODED_BYTES",
"timestamp": "2025-12-26T07:31:10Z"
}
2.1.4 文件传输进度发布 – Publish_File_Progress_Topic
-
Topic:
{device}/file/transfer/progress -
方向:Factory Side → 管理侧
-
用途:
- 上报文件上传进度,便于管理侧展示进度或做超时监控。
-
典型内容:
- 已上传分片数 / 总分片数
- 百分比
- 当前状态(传输中 / 成功 / 失败)
-
建议 Payload 示例(JSON):
{
"deviceId": "Line1-Mixer01",
"fileName": "calibration_export_20251226_073000.zip",
"uploadedChunks": 128,
"totalChunks": 256,
"progress": 50,
"status": "Transferring",
"timestamp": "2025-12-26T07:32:00Z"
}
2.1.5 文件传输完成通知 – Publish_File_Complete_Topic
-
Topic:
{device}/file/transfer/complete -
方向:Factory Side → 管理侧
-
用途:
- 文件全部分片上传完成后发送该消息,作为管理侧进行校验及后续处理的触发点。
-
典型内容:
- 文件名
- 总分片数
- 最终校验值(校验完整性)
- 结果状态
-
建议 Payload 示例(JSON):
{
"deviceId": "Line1-Mixer01",
"fileName": "calibration_export_20251226_073000.zip",
"totalChunks": 256,
"md5": "a1b2c3d4e5f60123456789abcdef0000",
"status": "Completed",
"timestamp": "2025-12-26T07:33:30Z"
}
2.2 订阅主题(Subscribe Topics – Factory Side)
Factory Side MqttClient 从以下主题接收管理侧控制指令及同步信息:
public string Subscribe_Update_Recipe_Topic => $"{_currentDevice}/command/update";
public string Subscribe_Restart_Ctl_Topic => $"{_currentDevice}/command/restart";
public string Subscribe_Export_Topic => $"{_currentDevice}/command/export";
public string Subscribe_State_Sync_Topic => $"{_currentDevice}/sync";
public string Subscribe_State_Sync_UploadReady => $"{_currentDevice}/UploadReady";
public string Subscribe_File_Recieve_MateData_Topic => $"{_currentDevice}/file/transfer/matedata";
public string Subscribe_File_Recieve_Chunk_Topic => $"{_currentDevice}/file/transfer/chunk";
public string Subscribe_File_Recieve_Progress_Topic => $"{_currentDevice}/file/transfer/progress";
public string Subscribe_File_Recieve_Complete_Topic => $"{_currentDevice}/file/transfer/complete";
说明:你原文中
Subscribe_File_Recieve_Complete_Topic尚未补全,这里按发布 Topic 对称,定义为
"{_currentDevice}/file/transfer/complete"。
2.2.1 配方更新指令 – Subscribe_Update_Recipe_Topic
- Topic:
{device}/command/update - 方向:管理侧 → Factory Side
- 用途:
- 通知设备执行 配方更新 操作(可能伴随文件下行或参数同步)。
- 建议 Payload 示例:
{
"deviceId": "Line1-Mixer01",
"command": "UpdateRecipe",
"recipeVersion": "v1.2.3",
"source": "CentralServer",
"timestamp": "2025-12-26T07:20:00Z"
}
接收到该指令后,Controller 内部应调用对应的 “更新配方” 接口方法。
2.2.2 设备重启控制指令 – Subscribe_Restart_Ctl_Topic
- Topic:
{device}/command/restart - 方向:管理侧 → Factory Side
- 用途:
- 远程控制设备或控制服务重启(需严格权限与安全控制)。
- 建议 Payload 示例:
{
"deviceId": "Line1-Mixer01",
"command": "Restart",
"reason": "ApplyNewConfig",
"delaySeconds": 10,
"timestamp": "2025-12-26T07:21:00Z"
}
2.2.3 导出指令 – Subscribe_Export_Topic
- Topic:
{device}/command/export - 方向:管理侧 → Factory Side
- 用途:
- 触发设备执行 导出配方/校准数据 操作,并在本地生成 CSV 文件,随后进行压缩。
- 建议 Payload 示例:
{
"deviceId": "Line1-Mixer01",
"command": "Export",
"exportType": "CalibrationData",
"dateFrom": "2025-12-01",
"dateTo": "2025-12-26",
"timestamp": "2025-12-26T07:22:00Z"
}
收到此指令后,将触发 Controller 中 “导出” 接口方法,下文在 3.0 中详细说明流程。
2.2.4 状态同步 – Subscribe_State_Sync_Topic
- Topic:
{device}/sync - 方向:管理侧 → Factory Side
- 用途:
- 用于状态同步、心跳或服务启动时的初始同步指令。
示例用途:
- 管理侧下发 “请上报当前状态” 命令;
- 校正设备与管理端的状态机(防止长时间脱离同步)。
2.2.5 上传准备状态 – Subscribe_State_Sync_UploadReady
- Topic:
{device}/UploadReady - 方向:管理侧 → Factory Side
- 用途:
- 通知 Factory Side “管理侧已经准备好接收文件上传”,或者确认设备文件已就绪可进行上传。
- 典型业务含义:
- 在导出完成后,设备可能先通过
state发布“文件已就绪”,
管理端准备完成上传通道后,通过此 Topic 告知设备可以开始进行文件传输。
- 在导出完成后,设备可能先通过
2.2.6 文件接收 – 元数据/分片/进度/完成
以下主题用于 Factory Side 作为接收方(Subscriber)时的文件接收场景,例如未来支持从管理侧下发文件(下行):
- 元数据:
Subscribe_File_Recieve_MateData_Topic
"{device}/file/transfer/matedata" - 文件分片:
Subscribe_File_Recieve_Chunk_Topic
"{device}/file/transfer/chunk" - 传输进度:
Subscribe_File_Recieve_Progress_Topic
"{device}/file/transfer/progress" - 传输完成:
Subscribe_File_Recieve_Complete_Topic
"{device}/file/transfer/complete"
Payload 结构可参照 2.1 中的发布格式,只是方向相反(管理侧 → Factory Side)。
3. 整体流程说明(Factory Side Controller 设计)
本节描述如何通过一个独立的 Controller 管理 Factory Side MqttClient,并将已有 MYRG 项目中的逻辑(导出配方、更新配方、重启控制器等)封装成接口方法,在 MQTT 消息驱动下调用。
3.1 架构概述
-
独立 MqttController:
- 负责:
- MQTT 客户端初始化、连接管理;
- 订阅所有 Factory Side 相关主题;
- 收到消息后进行解析并分发给内部业务接口。
- 负责:
-
业务接口(Service 层):
可以将 MYRG 中原有逻辑抽象为服务接口,例如:public interface ICalibrationExportService { Task<string> ExportCalibrationDataAsync(DateTime from, DateTime to); // 返回导出后 CSV 文件路径 } public interface IFileCompressionService { Task<string> CompressFileAsync(string csvFilePath); // 返回压缩后的 zip 文件路径 } public interface IDeviceControlService { Task RestartAsync(int delaySeconds); Task UpdateRecipeAsync(string recipeVersion); } -
MQTT 消息 → 业务方法调用:
- 例如,当收到
{device}/command/export消息时,解析时间范围、导出类型等参数,然后调用ExportCalibrationDataAsync和CompressFileAsync,再走上传流程。
- 例如,当收到
3.2 command/export 导出 & 上传流程示例
以你提出的核心流程为例,步骤如下:
-
接收导出指令
- Factory Side 订阅:
Subscribe_Export_Topic => $"{_currentDevice}/command/export"; - 收到管理侧消息后:
- 解析 Payload(导出类型、时间范围等参数)。
- 调用:
ICalibrationExportService.ExportCalibrationDataAsync(...)生成 CSV。 - 调用:
IFileCompressionService.CompressFileAsync(...)将 CSV 压缩为 ZIP。
- Factory Side 订阅:
-
更新设备状态:文件就绪
- 导出 + 压缩完成后:
-
通过
Publish_State_Topic上报设备状态,例如:{ "deviceId": "Line1-Mixer01", "status": "ExportCompleted", "fileReady": true, "fileName": "calibration_export_20251226_073000.zip" }
-
- 导出 + 压缩完成后:
-
等待上传指令/确认
- 设备可以:
- 直接在导出完成后自主开始上传,
或 - 等待管理侧通过
Subscribe_State_Sync_UploadReady({device}/UploadReady)确认后再开始上传。
- 直接在导出完成后自主开始上传,
- 设备可以:
-
发送文件元数据
- 调用 Publish:
Publish_File_Metadata_Topic => $"{_currentDevice}/file/transfer/matedata"; - 告知管理侧:
- 文件名、大小、分片大小、分片总数、校验 MD5 等。
- 调用 Publish:
-
分片上传文件
- 按分片读取压缩文件内容;
- 对每个分片:
- Base64 编码;
- 通过
Publish_File_Chunk_Topic发布; - 可选:每 N 个分片更新一次
Publish_File_Progress_Topic。
-
上传进度上报
- 周期性或按比例触发:
- 调用
Publish_File_Progress_Topic,带上uploadedChunks、totalChunks、progress等字段。
- 调用
- 周期性或按比例触发:
-
上传完成通知
- 所有分片发送成功后:
- 调用
Publish_File_Complete_Topic发布最终完成消息; - 管理侧收到后可进行 MD5 校验、解压、入库等后续处理。
- 调用
- 同时更新
state:- 表明设备已恢复空闲或进入下一阶段状态。
- 所有分片发送成功后:
3.3 典型 Controller 伪代码结构(简化示例)
public class MqttCalibrationController
{
private readonly IMqttClient _mqttClient;
private readonly ICalibrationExportService _exportService;
private readonly IFileCompressionService _compressionService;
private readonly IDeviceControlService _deviceControlService;
private readonly string _currentDevice;
public MqttCalibrationController(
IMqttClient mqttClient,
ICalibrationExportService exportService,
IFileCompressionService compressionService,
IDeviceControlService deviceControlService,
string currentDevice)
{
_mqttClient = mqttClient;
_exportService = exportService;
_compressionService = compressionService;
_deviceControlService = deviceControlService;
_currentDevice = currentDevice;
}
public async Task InitializeAsync()
{
// 1. 连接到 Broker
// 2. 订阅所有需要的 Topic
await _mqttClient.SubscribeAsync($"{_currentDevice}/command/export");
await _mqttClient.SubscribeAsync($"{_currentDevice}/command/update");
await _mqttClient.SubscribeAsync($"{_currentDevice}/command/restart");
await _mqttClient.SubscribeAsync($"{_currentDevice}/UploadReady");
// ...其它订阅
_mqttClient.ApplicationMessageReceivedAsync += HandleMessageAsync;
}
private async Task HandleMessageAsync(MqttApplicationMessageReceivedEventArgs e)
{
var topic = e.ApplicationMessage.Topic;
var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
if (topic == $"{_currentDevice}/command/export")
{
await HandleExportCommandAsync(payload);
}
else if (topic == $"{_currentDevice}/command/update")
{
await HandleUpdateCommandAsync(payload);
}
else if (topic == $"{_currentDevice}/command/restart")
{
await HandleRestartCommandAsync(payload);
}
else if (topic == $"{_currentDevice}/UploadReady")
{
await HandleUploadReadyAsync(payload);
}
// ...其他处理
}
private async Task HandleExportCommandAsync(string payload)
{
// 解析 payload 获取导出区间等参数
// 1. 导出 CSV
var csvPath = await _exportService.ExportCalibrationDataAsync(DateTime.MinValue, DateTime.MaxValue);
// 2. 压缩 ZIP
var zipPath = await _compressionService.CompressFileAsync(csvPath);
// 3. 更新 state:文件已就绪
await PublishStateAsync("ExportCompleted", zipPath);
// (可选)根据策略立刻开始上传,或等待 UploadReady
}
private Task PublishStateAsync(string status, string filePath = null)
{
// 序列化状态为 JSON,通过 Publish_State_Topic 发布
// ...
return Task.CompletedTask;
}
// 其它 HandleXXXAsync 方法...
}
如果你需要,我可以在下一步帮你:
- 进一步细化各 Topic 的 QoS 建议、Retain 使用建议;
- 设计完整的 payload JSON schema;
- 或写一个更接近可直接落地的 C# 实现(包括压缩、分片发送的具体代码框架)。