Nico
Published on 2025-12-26 / 1 Visits
0
0

MqttClient 架构与接口文档

MqttClient 架构与接口文档

(Calibration Service – Factory Side, No Local)


1. MQTT 通信原理说明

1.1 MQTT 通信模型与机制

MQTT(Message Queuing Telemetry Transport)是一种基于 发布/订阅(Pub/Sub) 模式的轻量级消息传输协议,广泛应用于物联网场景,尤其适用于:

  • 低带宽
  • 高延迟
  • 不稳定网络环境

在本系统中,客户端 A、B、C 都 既可以作为发布者(Publisher),也可以作为订阅者(Subscriber),角色由其所使用的 Topic 和业务逻辑决定。

1.1.1 客户端-服务器模型

  • Broker(消息代理服务器)
    • 所有 MQTT 客户端均与 Broker 建立连接。
    • 负责维护客户端连接状态、订阅关系,并进行消息路由与分发。
  • Publisher(发布者)
    • 将消息发布到某个主题(Topic)。
  • Subscriber(订阅者)
    • 订阅一个或多个主题,从 Broker 接收对应消息。

在本架构中:

  • Factory Side 的 MqttClient 既会:
    • 订阅管理侧下发的控制指令(如 /command/update/command/export 等),
    • 又会发布设备状态、文件传输相关消息(如 /state/file/transfer/* 等)。

1.1.2 工作流程概述

  1. 连接建立
    • 客户端(Factory Side / 管理侧)通过 TCP/IP 或 SSL/TLS 与 Broker 建立连接。
  2. 订阅主题
    • 客户端向 Broker 发送 SUBSCRIBE 报文,订阅对应 Topic(如:{device}/command/export)。
  3. 发布消息
    • 客户端向 Broker 发布 PUBLISH 报文到某 Topic(如:{device}/file/transfer/chunk)。
  4. 消息分发
    • Broker 将收到的消息转发给 所有订阅了该 Topic 的客户端
  5. 断线与重连
    • 利用 MQTT 的持久会话及遗言(Last Will)机制实现断线检测和状态同步。

1.1.3 协议特性(与本项目相关)

  • 低开销
    • MQTT 报文头部非常小(最小仅 2 字节),适合频繁状态上报及小文件分片传输。
  • QoS 支持(消息质量等级)
    • QoS 0(最多一次)
      • 不保证送达,不做重发,适合可丢失的状态广播。
    • QoS 1(至少一次)
      • 保证消息至少送达一次,可能出现重复,适合一般业务指令和文件分片传输。
    • QoS 2(恰好一次)
      • 保证消息到达且不重复,开销最大,通常用在对重复极度敏感的场景。
  • 持久会话
    • 客户端重连后可以恢复之前的订阅。
  • 遗言(Last Will)
    • 客户端异常断开时 Broker 可代为发布一条预先配置好的消息,用于通知设备离线、任务中断等。

1.2 Topic(主题)说明

MQTT 的 Topic 是消息的 路由标识符,采用类似文件路径的分层结构,例如:

{device}/state
{device}/command/export
{device}/file/transfer/chunk
  • device 常为设备 ID 或设备唯一标识 _currentDevice
  • / 为层级分隔符,用于构建清晰的业务结构:

典型分层设计:

  • /{device}/state:设备状态相关
  • /{device}/command/...:控制指令(更新、导出、重启等)
  • /{device}/file/transfer/...:文件传输业务(元数据、分片、进度、完成等)
  • /{device}/sync/{device}/UploadReady:状态同步和上传准备相关

2. Calibration Service – MqttClient 主题设计(Factory Side,无本地界面)

本章节描述 工厂侧(Factory Side)Calibration Service 中的 MqttClient 主题约定
其中 _currentDevice 表示当前设备的标识,如 Line1-Mixer01 等。

private readonly string _currentDevice;

所有 Topic 均以 _currentDevice 为前缀。


2.1 发布主题(Publish Topics – Factory Side)

Factory Side MqttClient 对外发布的主题如下:

public string Publish_State_Topic              => $"{_currentDevice}/state";
public string Publish_File_Metadata_Topic      => $"{_currentDevice}/file/transfer/matedata";
public string Publish_File_Chunk_Topic         => $"{_currentDevice}/file/transfer/chunk";
public string Publish_File_Progress_Topic      => $"{_currentDevice}/file/transfer/progress";
public string Publish_File_Complete_Topic      => $"{_currentDevice}/file/transfer/complete";

2.1.1 设备状态发布 – Publish_State_Topic

  • Topic{device}/state
  • 方向:Factory Side → 管理侧
  • 用途
    • 上报设备当前状态,例如:
      • 空闲 / 正在导出 / 正在上传 / 出错
      • 文件是否就绪(导出完成,等待上传)
  • 建议 Payload 示例(JSON)
{
  "deviceId": "Line1-Mixer01",
  "timestamp": "2025-12-26T07:30:00Z",
  "status": "ExportCompleted",
  "fileReady": true,
  "remark": "Export and compression finished"
}

2.1.2 文件元数据发布 – Publish_File_Metadata_Topic

  • Topic{device}/file/transfer/matedata

  • 方向:Factory Side → 管理侧

  • 用途

    • 在实际文件上传前,告知管理侧即将上传的文件信息(元数据)。
  • 典型内容

    • 文件名(含扩展名,如 .zip
    • 文件总大小(字节数)
    • 分片大小(每块字节数)
    • 预计分片总数
    • 文件校验信息(如 MD5)
  • 建议 Payload 示例(JSON)

{
  "deviceId": "Line1-Mixer01",
  "fileName": "calibration_export_20251226_073000.zip",
  "fileSize": 1048576,
  "chunkSize": 4096,
  "totalChunks": 256,
  "md5": "a1b2c3d4e5f60123456789abcdef0000",
  "timestamp": "2025-12-26T07:31:00Z"
}

注:键名如有既定规范,请统一定义并遵守。


2.1.3 文件分片数据发布 – Publish_File_Chunk_Topic

  • Topic{device}/file/transfer/chunk

  • 方向:Factory Side → 管理侧

  • 用途

    • 以分片形式传输压缩后的导出文件数据。
  • 典型内容

    • 当前分片序号(从 0 或 1 开始)
    • 总分片数
    • 当前分片的二进制内容(Base64 编码)
  • 建议 Payload 示例(JSON + Base64)

{
  "deviceId": "Line1-Mixer01",
  "fileName": "calibration_export_20251226_073000.zip",
  "chunkIndex": 1,
  "totalChunks": 256,
  "data": "BASE64_ENCODED_BYTES",
  "timestamp": "2025-12-26T07:31:10Z"
}

2.1.4 文件传输进度发布 – Publish_File_Progress_Topic

  • Topic{device}/file/transfer/progress

  • 方向:Factory Side → 管理侧

  • 用途

    • 上报文件上传进度,便于管理侧展示进度或做超时监控。
  • 典型内容

    • 已上传分片数 / 总分片数
    • 百分比
    • 当前状态(传输中 / 成功 / 失败)
  • 建议 Payload 示例(JSON)

{
  "deviceId": "Line1-Mixer01",
  "fileName": "calibration_export_20251226_073000.zip",
  "uploadedChunks": 128,
  "totalChunks": 256,
  "progress": 50,
  "status": "Transferring",
  "timestamp": "2025-12-26T07:32:00Z"
}

2.1.5 文件传输完成通知 – Publish_File_Complete_Topic

  • Topic{device}/file/transfer/complete

  • 方向:Factory Side → 管理侧

  • 用途

    • 文件全部分片上传完成后发送该消息,作为管理侧进行校验及后续处理的触发点。
  • 典型内容

    • 文件名
    • 总分片数
    • 最终校验值(校验完整性)
    • 结果状态
  • 建议 Payload 示例(JSON)

{
  "deviceId": "Line1-Mixer01",
  "fileName": "calibration_export_20251226_073000.zip",
  "totalChunks": 256,
  "md5": "a1b2c3d4e5f60123456789abcdef0000",
  "status": "Completed",
  "timestamp": "2025-12-26T07:33:30Z"
}

2.2 订阅主题(Subscribe Topics – Factory Side)

Factory Side MqttClient 从以下主题接收管理侧控制指令及同步信息:

public string Subscribe_Update_Recipe_Topic            => $"{_currentDevice}/command/update";
public string Subscribe_Restart_Ctl_Topic              => $"{_currentDevice}/command/restart";
public string Subscribe_Export_Topic                   => $"{_currentDevice}/command/export";
public string Subscribe_State_Sync_Topic               => $"{_currentDevice}/sync";
public string Subscribe_State_Sync_UploadReady         => $"{_currentDevice}/UploadReady";
public string Subscribe_File_Recieve_MateData_Topic    => $"{_currentDevice}/file/transfer/matedata";
public string Subscribe_File_Recieve_Chunk_Topic       => $"{_currentDevice}/file/transfer/chunk";
public string Subscribe_File_Recieve_Progress_Topic    => $"{_currentDevice}/file/transfer/progress";
public string Subscribe_File_Recieve_Complete_Topic    => $"{_currentDevice}/file/transfer/complete";

说明:你原文中 Subscribe_File_Recieve_Complete_Topic 尚未补全,这里按发布 Topic 对称,定义为
"{_currentDevice}/file/transfer/complete"

2.2.1 配方更新指令 – Subscribe_Update_Recipe_Topic

  • Topic{device}/command/update
  • 方向:管理侧 → Factory Side
  • 用途
    • 通知设备执行 配方更新 操作(可能伴随文件下行或参数同步)。
  • 建议 Payload 示例
{
  "deviceId": "Line1-Mixer01",
  "command": "UpdateRecipe",
  "recipeVersion": "v1.2.3",
  "source": "CentralServer",
  "timestamp": "2025-12-26T07:20:00Z"
}

接收到该指令后,Controller 内部应调用对应的 “更新配方” 接口方法。


2.2.2 设备重启控制指令 – Subscribe_Restart_Ctl_Topic

  • Topic{device}/command/restart
  • 方向:管理侧 → Factory Side
  • 用途
    • 远程控制设备或控制服务重启(需严格权限与安全控制)。
  • 建议 Payload 示例
{
  "deviceId": "Line1-Mixer01",
  "command": "Restart",
  "reason": "ApplyNewConfig",
  "delaySeconds": 10,
  "timestamp": "2025-12-26T07:21:00Z"
}

2.2.3 导出指令 – Subscribe_Export_Topic

  • Topic{device}/command/export
  • 方向:管理侧 → Factory Side
  • 用途
    • 触发设备执行 导出配方/校准数据 操作,并在本地生成 CSV 文件,随后进行压缩。
  • 建议 Payload 示例
{
  "deviceId": "Line1-Mixer01",
  "command": "Export",
  "exportType": "CalibrationData",
  "dateFrom": "2025-12-01",
  "dateTo": "2025-12-26",
  "timestamp": "2025-12-26T07:22:00Z"
}

收到此指令后,将触发 Controller 中 “导出” 接口方法,下文在 3.0 中详细说明流程。


2.2.4 状态同步 – Subscribe_State_Sync_Topic

  • Topic{device}/sync
  • 方向:管理侧 → Factory Side
  • 用途
    • 用于状态同步、心跳或服务启动时的初始同步指令。

示例用途:

  • 管理侧下发 “请上报当前状态” 命令;
  • 校正设备与管理端的状态机(防止长时间脱离同步)。

2.2.5 上传准备状态 – Subscribe_State_Sync_UploadReady

  • Topic{device}/UploadReady
  • 方向:管理侧 → Factory Side
  • 用途
    • 通知 Factory Side “管理侧已经准备好接收文件上传”,或者确认设备文件已就绪可进行上传。
  • 典型业务含义
    • 在导出完成后,设备可能先通过 state 发布“文件已就绪”,
      管理端准备完成上传通道后,通过此 Topic 告知设备可以开始进行文件传输。

2.2.6 文件接收 – 元数据/分片/进度/完成

以下主题用于 Factory Side 作为接收方(Subscriber)时的文件接收场景,例如未来支持从管理侧下发文件(下行):

  • 元数据Subscribe_File_Recieve_MateData_Topic
    "{device}/file/transfer/matedata"
  • 文件分片Subscribe_File_Recieve_Chunk_Topic
    "{device}/file/transfer/chunk"
  • 传输进度Subscribe_File_Recieve_Progress_Topic
    "{device}/file/transfer/progress"
  • 传输完成Subscribe_File_Recieve_Complete_Topic
    "{device}/file/transfer/complete"

Payload 结构可参照 2.1 中的发布格式,只是方向相反(管理侧 → Factory Side)。


3. 整体流程说明(Factory Side Controller 设计)

本节描述如何通过一个独立的 Controller 管理 Factory Side MqttClient,并将已有 MYRG 项目中的逻辑(导出配方、更新配方、重启控制器等)封装成接口方法,在 MQTT 消息驱动下调用。

3.1 架构概述

  1. 独立 MqttController

    • 负责:
      • MQTT 客户端初始化、连接管理;
      • 订阅所有 Factory Side 相关主题;
      • 收到消息后进行解析并分发给内部业务接口。
  2. 业务接口(Service 层)
    可以将 MYRG 中原有逻辑抽象为服务接口,例如:

    public interface ICalibrationExportService
    {
        Task<string> ExportCalibrationDataAsync(DateTime from, DateTime to);
        // 返回导出后 CSV 文件路径
    }
    
    public interface IFileCompressionService
    {
        Task<string> CompressFileAsync(string csvFilePath);
        // 返回压缩后的 zip 文件路径
    }
    
    public interface IDeviceControlService
    {
        Task RestartAsync(int delaySeconds);
        Task UpdateRecipeAsync(string recipeVersion);
    }
    
  3. MQTT 消息 → 业务方法调用

    • 例如,当收到 {device}/command/export 消息时,解析时间范围、导出类型等参数,然后调用 ExportCalibrationDataAsyncCompressFileAsync,再走上传流程。

3.2 command/export 导出 & 上传流程示例

以你提出的核心流程为例,步骤如下:

  1. 接收导出指令

    • Factory Side 订阅:
      Subscribe_Export_Topic => $"{_currentDevice}/command/export";
      
    • 收到管理侧消息后:
      • 解析 Payload(导出类型、时间范围等参数)。
      • 调用:ICalibrationExportService.ExportCalibrationDataAsync(...) 生成 CSV。
      • 调用:IFileCompressionService.CompressFileAsync(...) 将 CSV 压缩为 ZIP。
  2. 更新设备状态:文件就绪

    • 导出 + 压缩完成后:
      • 通过 Publish_State_Topic 上报设备状态,例如:

        {
          "deviceId": "Line1-Mixer01",
          "status": "ExportCompleted",
          "fileReady": true,
          "fileName": "calibration_export_20251226_073000.zip"
        }
        
  3. 等待上传指令/确认

    • 设备可以:
      • 直接在导出完成后自主开始上传,
      • 等待管理侧通过 Subscribe_State_Sync_UploadReady{device}/UploadReady)确认后再开始上传。
  4. 发送文件元数据

    • 调用 Publish:
      Publish_File_Metadata_Topic => $"{_currentDevice}/file/transfer/matedata";
      
    • 告知管理侧:
      • 文件名、大小、分片大小、分片总数、校验 MD5 等。
  5. 分片上传文件

    • 按分片读取压缩文件内容;
    • 对每个分片:
      • Base64 编码;
      • 通过 Publish_File_Chunk_Topic 发布;
      • 可选:每 N 个分片更新一次 Publish_File_Progress_Topic
  6. 上传进度上报

    • 周期性或按比例触发:
      • 调用 Publish_File_Progress_Topic,带上 uploadedChunkstotalChunksprogress 等字段。
  7. 上传完成通知

    • 所有分片发送成功后:
      • 调用 Publish_File_Complete_Topic 发布最终完成消息;
      • 管理侧收到后可进行 MD5 校验、解压、入库等后续处理。
    • 同时更新 state
      • 表明设备已恢复空闲或进入下一阶段状态。

3.3 典型 Controller 伪代码结构(简化示例)

public class MqttCalibrationController
{
    private readonly IMqttClient _mqttClient;
    private readonly ICalibrationExportService _exportService;
    private readonly IFileCompressionService _compressionService;
    private readonly IDeviceControlService _deviceControlService;
    private readonly string _currentDevice;

    public MqttCalibrationController(
        IMqttClient mqttClient,
        ICalibrationExportService exportService,
        IFileCompressionService compressionService,
        IDeviceControlService deviceControlService,
        string currentDevice)
    {
        _mqttClient = mqttClient;
        _exportService = exportService;
        _compressionService = compressionService;
        _deviceControlService = deviceControlService;
        _currentDevice = currentDevice;
    }

    public async Task InitializeAsync()
    {
        // 1. 连接到 Broker
        // 2. 订阅所有需要的 Topic
        await _mqttClient.SubscribeAsync($"{_currentDevice}/command/export");
        await _mqttClient.SubscribeAsync($"{_currentDevice}/command/update");
        await _mqttClient.SubscribeAsync($"{_currentDevice}/command/restart");
        await _mqttClient.SubscribeAsync($"{_currentDevice}/UploadReady");
        // ...其它订阅

        _mqttClient.ApplicationMessageReceivedAsync += HandleMessageAsync;
    }

    private async Task HandleMessageAsync(MqttApplicationMessageReceivedEventArgs e)
    {
        var topic = e.ApplicationMessage.Topic;
        var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);

        if (topic == $"{_currentDevice}/command/export")
        {
            await HandleExportCommandAsync(payload);
        }
        else if (topic == $"{_currentDevice}/command/update")
        {
            await HandleUpdateCommandAsync(payload);
        }
        else if (topic == $"{_currentDevice}/command/restart")
        {
            await HandleRestartCommandAsync(payload);
        }
        else if (topic == $"{_currentDevice}/UploadReady")
        {
            await HandleUploadReadyAsync(payload);
        }
        // ...其他处理
    }

    private async Task HandleExportCommandAsync(string payload)
    {
        // 解析 payload 获取导出区间等参数
        // 1. 导出 CSV
        var csvPath = await _exportService.ExportCalibrationDataAsync(DateTime.MinValue, DateTime.MaxValue);

        // 2. 压缩 ZIP
        var zipPath = await _compressionService.CompressFileAsync(csvPath);

        // 3. 更新 state:文件已就绪
        await PublishStateAsync("ExportCompleted", zipPath);

        // (可选)根据策略立刻开始上传,或等待 UploadReady
    }

    private Task PublishStateAsync(string status, string filePath = null)
    {
        // 序列化状态为 JSON,通过 Publish_State_Topic 发布
        // ...
        return Task.CompletedTask;
    }

    // 其它 HandleXXXAsync 方法...
}

如果你需要,我可以在下一步帮你:

  • 进一步细化各 Topic 的 QoS 建议Retain 使用建议
  • 设计完整的 payload JSON schema
  • 或写一个更接近可直接落地的 C# 实现(包括压缩、分片发送的具体代码框架)。

Comment