Files
qhmes/yy-admin-master/doc/桌面端与后端实时交互及断线续连整理.md

7.3 KiB
Raw Blame History

桌面端与后端实时交互、断线续连功能整理yy-admin ↔ jeecg-boot

1. 范围与目标

本文整理当前项目中“桌面端yy-admin与后端jeecg-boot”的实时交互链路重点覆盖

  • 实时消息通道STOMP/WebSocket
  • 用户变更同步(后端→桌面)
  • 本地变更回传(桌面→后端)
  • 断线检测、自动重连、离线补偿Outbox

2. 总体架构(当前实现)

2.1 后端 → 桌面实时触发REST 拉取落地)

  1. jeecg 用户变更时,后端广播到 STOMP 主题 /topic/sync/jeecg-users
  2. 桌面端统一 STOMP 客户端订阅该主题,收到消息后发布本地事件 RemoteCommandReceivedEvent
  3. JeecgUserSyncCoordinator 识别 SCADA_USER_CHANGED/SCADA_USERS_CHANGED 后,不直接写库,而是“入 Outbox”。
  4. Outbox 消费器执行 TryBackgroundSyncJeecgUsersAsync,通过 SCADA 用户列表接口拉取并写入本地镜像表 jeecg_sys_user

这条链路是“消息触发 + REST 拉全量/增量”的组合,避免只靠实时包体做复杂幂等。

2.2 桌面 → 后端(本地变更回传)

  1. 桌面用户 CRUDSysUserService)成功后调用 IUserSyncOutbox 入队。
  2. Outbox 在线时实时发送、离线时持久化。
  3. 回传通过 /sys/sync/batch 批量提交,后端按 aggregateType/eventType 路由处理,并写幂等日志表避免重复消费。

3. 关键代码位置

3.1 后端jeecg-boot

  • STOMP 配置与心跳:
    jeecg-module-device-sync/src/main/java/org/jeecg/modules/device/sync/config/WebSocketConfig.java
  • 设备消息与应用层 PING/PONG
    jeecg-module-device-sync/src/main/java/org/jeecg/modules/device/sync/controller/DeviceWebSocketController.java
  • 批量同步入口(桌面回传):
    jeecg-module-device-sync/src/main/java/org/jeecg/modules/device/sync/controller/SyncController.java
  • 用户变更触发 STOMP 广播:
    jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/system/controller/SysUserController.java

3.2 桌面端yy-admin-master

  • STOMP 统一连接、心跳、看门狗重连、网络恢复重连:
    YY.Admin/Infrastructure/Hubs/StompWebSocketService.cs
  • STOMP 信号识别并入镜像拉取 Outbox
    YY.Admin.Services/Service/Jeecg/JeecgUserSyncCoordinator.cs
  • 镜像拉取入队:
    YY.Admin/Infrastructure/Sync/JeecgUserMirrorPullOutbox.cs
  • Outbox 核心(持久化、消费、失败重试、上线刷队):
    YY.Admin/Infrastructure/Sync/OutboxProcessor.cs
  • 网络探活与状态事件:
    YY.Admin/Infrastructure/Network/NetworkMonitor.cs
  • 本地用户变更入队接口与实现:
    YY.Admin.Core/Core/Services/IUserSyncOutbox.cs
    YY.Admin/Infrastructure/Sync/UserSyncOutbox.cs
  • 本地用户 CRUD 完成后回传入队:
    YY.Admin.Services/Service/User/SysUserService.cs
  • 模块启动装配(网络监测 / Outbox / STOMP
    YY.Admin/Module/SyncModule.cs
  • 主窗口启动同步协调器:
    YY.Admin/ViewModels/MainWindowViewModel.cs

4. 实时链路详细说明

4.1 STOMP 连接与订阅(桌面端)

StompWebSocketService 当前行为:

  • 连接地址:由 JeecgIntegration:BaseUrl 推导为 ws(s)://<host>/ws/device(纯 WebSocket
  • CONNECT 心跳声明:heart-beat:10000,10000
  • 默认订阅:
    • /topic/sync/jeecg-users(用户变更信号)
    • /topic/device/{deviceId}/pong(应用层保活回应)
  • 认证模式额外订阅:
    • /user/{deviceId}/queue/command

4.2 后端广播jeecg 用户变更)

SysUserController 在新增/编辑/删除/状态变更等场景调用 notifyScadaUserChanged(...),并通过 SimpMessagingTemplate/topic/sync/jeecg-users 广播 JSON 负载,关键 cmd 为:

  • SCADA_USER_CHANGED

4.3 桌面端信号消费策略

JeecgUserSyncCoordinator 不直接依赖推送包体做写库,而是:

  • 解析收到的 JSON支持 message/commandJson 嵌套)
  • 判断 cmd 是否为 SCADA_USER_CHANGED / SCADA_USERS_CHANGED
  • 命中后入队 JeecgUserMirror 类型 Outbox
  • 由 Outbox 消费阶段调用 TryBackgroundSyncJeecgUsersAsync 做实际拉取与落库

该策略提升了一致性与容错性(尤其在断线/消息乱序时)。


5. 断线检测与自动续连

5.1 连接失败重试

ConnectUnifiedDeviceChannelAsync 内置重试退避(秒):0, 2, 5, 10, 30

5.2 协议层心跳 + 应用层 PING

  • 协议层:每 10s 发送 STOMP \n 心跳帧。
  • 应用层:每 10s 发送 {"cmd":"PING_DEVICE"...}/app/device/ping
  • 后端 handlePing 回复到 /topic/device/{deviceId}/pong

5.3 看门狗“假在线”检测

  • 客户端记录 _lastReceivedTick
  • 若 30s 内没有任何帧(含心跳或 MESSAGE则判定连接僵死主动重连。

5.4 网络恢复触发重连

  • NetworkMonitor 每 10s 探活(优先 SCADA 免登录接口,失败降级 health
  • 状态变更发布 NetworkStatusChangedEvent
  • STOMP 服务在 isOnline=true 且 socket 非 Open 时主动重连。

6. 离线补偿与幂等

6.1 Outbox 持久化

OutboxProcessor.EnqueueAsync 先落 SQLite OutboxOutboxMessage),再按在线状态决定是否立即入内存通道消费。

6.2 消费与失败重试

  • 消费失败进入 MarkFailedAsync
    • RetryCount + 1
    • 指数退避 2^n 秒(上限按实现控制)
    • 最多 5 次,超过标记 Status=2
  • 在线恢复时 FlushPendingAsync 会补刷所有待发送消息。

6.3 双通道消费分流

OutboxProcessor 对消息分两类处理:

  1. JeecgUserMirror:调用 _mirrorPullHandler.ExecutePullAsync()(即拉取镜像)。
  2. 其他业务消息(如 SYS_USER):走 _httpSyncClient.SendBatchAsync()/sys/sync/batch

6.4 后端幂等

SyncController.batch 通过 messageId 检查幂等日志(sync_idempotent_log)避免重复消费;未处理过才路由执行。


7. 配置项(当前关键项)

配置文件:YY.Admin.Services/Configuration/appsettings.jsonJeecgIntegration 节点。

建议重点关注:

  • Enabled
  • BaseUrl
  • AnonymousMode
  • UserListPath(当前是 /sys/user/scada/queryUser
  • SyncAllUsersOnJeecgLogin
  • BackgroundSyncIntervalMinutes

实时通道默认使用 /ws/device 统一 STOMP 端点;WebSocketPath 仍保留在配置中,但当前同步主链路已切到统一设备通道实现。


8. 启动时序(简版)

  1. SyncModule.OnInitialized 启动:
    • NetworkMonitor.StartAsync
    • OutboxProcessor.StartConsumerAsync
    • ISignalRService.ConnectUnifiedDeviceChannelAsync
  2. 主窗口 MainWindowViewModel 构造中调用 JeecgUserSyncCoordinator.Start()
    • 订阅 RemoteCommandReceivedEvent
    • 延迟 3 秒入队一次 EventBoot 拉取
  3. 后续由“实时消息触发 + 定时/后台同步 + Outbox 补偿”共同维持一致性。

9. 现状结论

  1. 当前实现已形成“统一 STOMP 通道 + 心跳 + 看门狗 + 网络恢复重连 + Outbox 补偿 + 后端幂等”的完整闭环。
  2. 用户同步策略是“实时信号触发拉取”而非“直接信号落库”,在工业网络抖动场景更稳健。
  3. 本地 CRUD 回传与后端推送下行均已纳入 Outbox/幂等体系,具备断线续连后的最终一致性基础。