7.3 KiB
7.3 KiB
桌面端与后端实时交互、断线续连功能整理(yy-admin ↔ jeecg-boot)
1. 范围与目标
本文整理当前项目中“桌面端(yy-admin)与后端(jeecg-boot)”的实时交互链路,重点覆盖:
- 实时消息通道(STOMP/WebSocket)
- 用户变更同步(后端→桌面)
- 本地变更回传(桌面→后端)
- 断线检测、自动重连、离线补偿(Outbox)
2. 总体架构(当前实现)
2.1 后端 → 桌面(实时触发,REST 拉取落地)
- jeecg 用户变更时,后端广播到 STOMP 主题
/topic/sync/jeecg-users。 - 桌面端统一 STOMP 客户端订阅该主题,收到消息后发布本地事件
RemoteCommandReceivedEvent。 JeecgUserSyncCoordinator识别SCADA_USER_CHANGED/SCADA_USERS_CHANGED后,不直接写库,而是“入 Outbox”。- Outbox 消费器执行
TryBackgroundSyncJeecgUsersAsync,通过 SCADA 用户列表接口拉取并写入本地镜像表jeecg_sys_user。
这条链路是“消息触发 + REST 拉全量/增量”的组合,避免只靠实时包体做复杂幂等。
2.2 桌面 → 后端(本地变更回传)
- 桌面用户 CRUD(
SysUserService)成功后调用IUserSyncOutbox入队。 - Outbox 在线时实时发送、离线时持久化。
- 回传通过
/sys/sync/batch批量提交,后端按aggregateType/eventType路由处理,并写幂等日志表避免重复消费。
3. 关键代码位置
3.1 后端(jeecg-boot)
- STOMP 配置与心跳:
jeecg-module-device-sync/src/main/java/org/jeecg/modules/device/sync/config/WebSocketConfig.java - 设备消息与应用层 PING/PONG:
jeecg-module-device-sync/src/main/java/org/jeecg/modules/device/sync/controller/DeviceWebSocketController.java - 批量同步入口(桌面回传):
jeecg-module-device-sync/src/main/java/org/jeecg/modules/device/sync/controller/SyncController.java - 用户变更触发 STOMP 广播:
jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/system/controller/SysUserController.java
3.2 桌面端(yy-admin-master)
- STOMP 统一连接、心跳、看门狗重连、网络恢复重连:
YY.Admin/Infrastructure/Hubs/StompWebSocketService.cs - STOMP 信号识别并入镜像拉取 Outbox:
YY.Admin.Services/Service/Jeecg/JeecgUserSyncCoordinator.cs - 镜像拉取入队:
YY.Admin/Infrastructure/Sync/JeecgUserMirrorPullOutbox.cs - Outbox 核心(持久化、消费、失败重试、上线刷队):
YY.Admin/Infrastructure/Sync/OutboxProcessor.cs - 网络探活与状态事件:
YY.Admin/Infrastructure/Network/NetworkMonitor.cs - 本地用户变更入队接口与实现:
YY.Admin.Core/Core/Services/IUserSyncOutbox.cs
YY.Admin/Infrastructure/Sync/UserSyncOutbox.cs - 本地用户 CRUD 完成后回传入队:
YY.Admin.Services/Service/User/SysUserService.cs - 模块启动装配(网络监测 / Outbox / STOMP):
YY.Admin/Module/SyncModule.cs - 主窗口启动同步协调器:
YY.Admin/ViewModels/MainWindowViewModel.cs
4. 实时链路详细说明
4.1 STOMP 连接与订阅(桌面端)
StompWebSocketService 当前行为:
- 连接地址:由
JeecgIntegration:BaseUrl推导为ws(s)://<host>/ws/device(纯 WebSocket)。 - CONNECT 心跳声明:
heart-beat:10000,10000。 - 默认订阅:
/topic/sync/jeecg-users(用户变更信号)/topic/device/{deviceId}/pong(应用层保活回应)
- 认证模式额外订阅:
/user/{deviceId}/queue/command
4.2 后端广播(jeecg 用户变更)
SysUserController 在新增/编辑/删除/状态变更等场景调用 notifyScadaUserChanged(...),并通过 SimpMessagingTemplate 向 /topic/sync/jeecg-users 广播 JSON 负载,关键 cmd 为:
SCADA_USER_CHANGED
4.3 桌面端信号消费策略
JeecgUserSyncCoordinator 不直接依赖推送包体做写库,而是:
- 解析收到的 JSON(支持 message/commandJson 嵌套)
- 判断
cmd是否为SCADA_USER_CHANGED/SCADA_USERS_CHANGED - 命中后入队
JeecgUserMirror类型 Outbox - 由 Outbox 消费阶段调用
TryBackgroundSyncJeecgUsersAsync做实际拉取与落库
该策略提升了一致性与容错性(尤其在断线/消息乱序时)。
5. 断线检测与自动续连
5.1 连接失败重试
ConnectUnifiedDeviceChannelAsync 内置重试退避(秒):0, 2, 5, 10, 30。
5.2 协议层心跳 + 应用层 PING
- 协议层:每 10s 发送 STOMP
\n心跳帧。 - 应用层:每 10s 发送
{"cmd":"PING_DEVICE"...}到/app/device/ping。 - 后端
handlePing回复到/topic/device/{deviceId}/pong。
5.3 看门狗“假在线”检测
- 客户端记录
_lastReceivedTick。 - 若 30s 内没有任何帧(含心跳或 MESSAGE)则判定连接僵死,主动重连。
5.4 网络恢复触发重连
NetworkMonitor每 10s 探活(优先 SCADA 免登录接口,失败降级 health)。- 状态变更发布
NetworkStatusChangedEvent。 - STOMP 服务在
isOnline=true且 socket 非 Open 时主动重连。
6. 离线补偿与幂等
6.1 Outbox 持久化
OutboxProcessor.EnqueueAsync 先落 SQLite Outbox(OutboxMessage),再按在线状态决定是否立即入内存通道消费。
6.2 消费与失败重试
- 消费失败进入
MarkFailedAsync:RetryCount + 1- 指数退避
2^n秒(上限按实现控制) - 最多 5 次,超过标记
Status=2
- 在线恢复时
FlushPendingAsync会补刷所有待发送消息。
6.3 双通道消费分流
OutboxProcessor 对消息分两类处理:
JeecgUserMirror:调用_mirrorPullHandler.ExecutePullAsync()(即拉取镜像)。- 其他业务消息(如
SYS_USER):走_httpSyncClient.SendBatchAsync()发/sys/sync/batch。
6.4 后端幂等
SyncController.batch 通过 messageId 检查幂等日志(sync_idempotent_log)避免重复消费;未处理过才路由执行。
7. 配置项(当前关键项)
配置文件:YY.Admin.Services/Configuration/appsettings.json,JeecgIntegration 节点。
建议重点关注:
EnabledBaseUrlAnonymousModeUserListPath(当前是/sys/user/scada/queryUser)SyncAllUsersOnJeecgLoginBackgroundSyncIntervalMinutes
实时通道默认使用
/ws/device统一 STOMP 端点;WebSocketPath仍保留在配置中,但当前同步主链路已切到统一设备通道实现。
8. 启动时序(简版)
SyncModule.OnInitialized启动:NetworkMonitor.StartAsyncOutboxProcessor.StartConsumerAsyncISignalRService.ConnectUnifiedDeviceChannelAsync
- 主窗口
MainWindowViewModel构造中调用JeecgUserSyncCoordinator.Start():- 订阅
RemoteCommandReceivedEvent - 延迟 3 秒入队一次
EventBoot拉取
- 订阅
- 后续由“实时消息触发 + 定时/后台同步 + Outbox 补偿”共同维持一致性。
9. 现状结论
- 当前实现已形成“统一 STOMP 通道 + 心跳 + 看门狗 + 网络恢复重连 + Outbox 补偿 + 后端幂等”的完整闭环。
- 用户同步策略是“实时信号触发拉取”而非“直接信号落库”,在工业网络抖动场景更稳健。
- 本地 CRUD 回传与后端推送下行均已纳入 Outbox/幂等体系,具备断线续连后的最终一致性基础。