187 lines
7.3 KiB
Markdown
187 lines
7.3 KiB
Markdown
|
|
# 桌面端与后端实时交互、断线续连功能整理(yy-admin ↔ jeecg-boot)
|
|||
|
|
|
|||
|
|
## 1. 范围与目标
|
|||
|
|
|
|||
|
|
本文整理当前项目中“桌面端(yy-admin)与后端(jeecg-boot)”的实时交互链路,重点覆盖:
|
|||
|
|
- 实时消息通道(STOMP/WebSocket)
|
|||
|
|
- 用户变更同步(后端→桌面)
|
|||
|
|
- 本地变更回传(桌面→后端)
|
|||
|
|
- 断线检测、自动重连、离线补偿(Outbox)
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 2. 总体架构(当前实现)
|
|||
|
|
|
|||
|
|
### 2.1 后端 → 桌面(实时触发,REST 拉取落地)
|
|||
|
|
|
|||
|
|
1) jeecg 用户变更时,后端广播到 STOMP 主题 `/topic/sync/jeecg-users`。
|
|||
|
|
2) 桌面端统一 STOMP 客户端订阅该主题,收到消息后发布本地事件 `RemoteCommandReceivedEvent`。
|
|||
|
|
3) `JeecgUserSyncCoordinator` 识别 `SCADA_USER_CHANGED/SCADA_USERS_CHANGED` 后,不直接写库,而是“入 Outbox”。
|
|||
|
|
4) Outbox 消费器执行 `TryBackgroundSyncJeecgUsersAsync`,通过 SCADA 用户列表接口拉取并写入本地镜像表 `jeecg_sys_user`。
|
|||
|
|
|
|||
|
|
> 这条链路是“消息触发 + REST 拉全量/增量”的组合,避免只靠实时包体做复杂幂等。
|
|||
|
|
|
|||
|
|
### 2.2 桌面 → 后端(本地变更回传)
|
|||
|
|
|
|||
|
|
1) 桌面用户 CRUD(`SysUserService`)成功后调用 `IUserSyncOutbox` 入队。
|
|||
|
|
2) Outbox 在线时实时发送、离线时持久化。
|
|||
|
|
3) 回传通过 `/sys/sync/batch` 批量提交,后端按 `aggregateType/eventType` 路由处理,并写幂等日志表避免重复消费。
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 3. 关键代码位置
|
|||
|
|
|
|||
|
|
## 3.1 后端(jeecg-boot)
|
|||
|
|
|
|||
|
|
- STOMP 配置与心跳:
|
|||
|
|
`jeecg-module-device-sync/src/main/java/org/jeecg/modules/device/sync/config/WebSocketConfig.java`
|
|||
|
|
- 设备消息与应用层 PING/PONG:
|
|||
|
|
`jeecg-module-device-sync/src/main/java/org/jeecg/modules/device/sync/controller/DeviceWebSocketController.java`
|
|||
|
|
- 批量同步入口(桌面回传):
|
|||
|
|
`jeecg-module-device-sync/src/main/java/org/jeecg/modules/device/sync/controller/SyncController.java`
|
|||
|
|
- 用户变更触发 STOMP 广播:
|
|||
|
|
`jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/system/controller/SysUserController.java`
|
|||
|
|
|
|||
|
|
### 3.2 桌面端(yy-admin-master)
|
|||
|
|
|
|||
|
|
- STOMP 统一连接、心跳、看门狗重连、网络恢复重连:
|
|||
|
|
`YY.Admin/Infrastructure/Hubs/StompWebSocketService.cs`
|
|||
|
|
- STOMP 信号识别并入镜像拉取 Outbox:
|
|||
|
|
`YY.Admin.Services/Service/Jeecg/JeecgUserSyncCoordinator.cs`
|
|||
|
|
- 镜像拉取入队:
|
|||
|
|
`YY.Admin/Infrastructure/Sync/JeecgUserMirrorPullOutbox.cs`
|
|||
|
|
- Outbox 核心(持久化、消费、失败重试、上线刷队):
|
|||
|
|
`YY.Admin/Infrastructure/Sync/OutboxProcessor.cs`
|
|||
|
|
- 网络探活与状态事件:
|
|||
|
|
`YY.Admin/Infrastructure/Network/NetworkMonitor.cs`
|
|||
|
|
- 本地用户变更入队接口与实现:
|
|||
|
|
`YY.Admin.Core/Core/Services/IUserSyncOutbox.cs`
|
|||
|
|
`YY.Admin/Infrastructure/Sync/UserSyncOutbox.cs`
|
|||
|
|
- 本地用户 CRUD 完成后回传入队:
|
|||
|
|
`YY.Admin.Services/Service/User/SysUserService.cs`
|
|||
|
|
- 模块启动装配(网络监测 / Outbox / STOMP):
|
|||
|
|
`YY.Admin/Module/SyncModule.cs`
|
|||
|
|
- 主窗口启动同步协调器:
|
|||
|
|
`YY.Admin/ViewModels/MainWindowViewModel.cs`
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 4. 实时链路详细说明
|
|||
|
|
|
|||
|
|
### 4.1 STOMP 连接与订阅(桌面端)
|
|||
|
|
|
|||
|
|
`StompWebSocketService` 当前行为:
|
|||
|
|
|
|||
|
|
- 连接地址:由 `JeecgIntegration:BaseUrl` 推导为 `ws(s)://<host>/ws/device`(纯 WebSocket)。
|
|||
|
|
- CONNECT 心跳声明:`heart-beat:10000,10000`。
|
|||
|
|
- 默认订阅:
|
|||
|
|
- `/topic/sync/jeecg-users`(用户变更信号)
|
|||
|
|
- `/topic/device/{deviceId}/pong`(应用层保活回应)
|
|||
|
|
- 认证模式额外订阅:
|
|||
|
|
- `/user/{deviceId}/queue/command`
|
|||
|
|
|
|||
|
|
### 4.2 后端广播(jeecg 用户变更)
|
|||
|
|
|
|||
|
|
`SysUserController` 在新增/编辑/删除/状态变更等场景调用 `notifyScadaUserChanged(...)`,并通过 `SimpMessagingTemplate` 向 `/topic/sync/jeecg-users` 广播 JSON 负载,关键 cmd 为:
|
|||
|
|
- `SCADA_USER_CHANGED`
|
|||
|
|
|
|||
|
|
### 4.3 桌面端信号消费策略
|
|||
|
|
|
|||
|
|
`JeecgUserSyncCoordinator` 不直接依赖推送包体做写库,而是:
|
|||
|
|
|
|||
|
|
- 解析收到的 JSON(支持 message/commandJson 嵌套)
|
|||
|
|
- 判断 `cmd` 是否为 `SCADA_USER_CHANGED` / `SCADA_USERS_CHANGED`
|
|||
|
|
- 命中后入队 `JeecgUserMirror` 类型 Outbox
|
|||
|
|
- 由 Outbox 消费阶段调用 `TryBackgroundSyncJeecgUsersAsync` 做实际拉取与落库
|
|||
|
|
|
|||
|
|
该策略提升了一致性与容错性(尤其在断线/消息乱序时)。
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 5. 断线检测与自动续连
|
|||
|
|
|
|||
|
|
## 5.1 连接失败重试
|
|||
|
|
|
|||
|
|
`ConnectUnifiedDeviceChannelAsync` 内置重试退避(秒):`0, 2, 5, 10, 30`。
|
|||
|
|
|
|||
|
|
### 5.2 协议层心跳 + 应用层 PING
|
|||
|
|
|
|||
|
|
- 协议层:每 10s 发送 STOMP `\n` 心跳帧。
|
|||
|
|
- 应用层:每 10s 发送 `{"cmd":"PING_DEVICE"...}` 到 `/app/device/ping`。
|
|||
|
|
- 后端 `handlePing` 回复到 `/topic/device/{deviceId}/pong`。
|
|||
|
|
|
|||
|
|
### 5.3 看门狗“假在线”检测
|
|||
|
|
|
|||
|
|
- 客户端记录 `_lastReceivedTick`。
|
|||
|
|
- 若 30s 内没有任何帧(含心跳或 MESSAGE)则判定连接僵死,主动重连。
|
|||
|
|
|
|||
|
|
### 5.4 网络恢复触发重连
|
|||
|
|
|
|||
|
|
- `NetworkMonitor` 每 10s 探活(优先 SCADA 免登录接口,失败降级 health)。
|
|||
|
|
- 状态变更发布 `NetworkStatusChangedEvent`。
|
|||
|
|
- STOMP 服务在 `isOnline=true` 且 socket 非 Open 时主动重连。
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 6. 离线补偿与幂等
|
|||
|
|
|
|||
|
|
## 6.1 Outbox 持久化
|
|||
|
|
|
|||
|
|
`OutboxProcessor.EnqueueAsync` 先落 SQLite Outbox(`OutboxMessage`),再按在线状态决定是否立即入内存通道消费。
|
|||
|
|
|
|||
|
|
### 6.2 消费与失败重试
|
|||
|
|
|
|||
|
|
- 消费失败进入 `MarkFailedAsync`:
|
|||
|
|
- `RetryCount + 1`
|
|||
|
|
- 指数退避 `2^n` 秒(上限按实现控制)
|
|||
|
|
- 最多 5 次,超过标记 `Status=2`
|
|||
|
|
- 在线恢复时 `FlushPendingAsync` 会补刷所有待发送消息。
|
|||
|
|
|
|||
|
|
### 6.3 双通道消费分流
|
|||
|
|
|
|||
|
|
`OutboxProcessor` 对消息分两类处理:
|
|||
|
|
|
|||
|
|
1) `JeecgUserMirror`:调用 `_mirrorPullHandler.ExecutePullAsync()`(即拉取镜像)。
|
|||
|
|
2) 其他业务消息(如 `SYS_USER`):走 `_httpSyncClient.SendBatchAsync()` 发 `/sys/sync/batch`。
|
|||
|
|
|
|||
|
|
### 6.4 后端幂等
|
|||
|
|
|
|||
|
|
`SyncController.batch` 通过 `messageId` 检查幂等日志(`sync_idempotent_log`)避免重复消费;未处理过才路由执行。
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 7. 配置项(当前关键项)
|
|||
|
|
|
|||
|
|
配置文件:`YY.Admin.Services/Configuration/appsettings.json`,`JeecgIntegration` 节点。
|
|||
|
|
|
|||
|
|
建议重点关注:
|
|||
|
|
- `Enabled`
|
|||
|
|
- `BaseUrl`
|
|||
|
|
- `AnonymousMode`
|
|||
|
|
- `UserListPath`(当前是 `/sys/user/scada/queryUser`)
|
|||
|
|
- `SyncAllUsersOnJeecgLogin`
|
|||
|
|
- `BackgroundSyncIntervalMinutes`
|
|||
|
|
|
|||
|
|
> 实时通道默认使用 `/ws/device` 统一 STOMP 端点;`WebSocketPath` 仍保留在配置中,但当前同步主链路已切到统一设备通道实现。
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 8. 启动时序(简版)
|
|||
|
|
|
|||
|
|
1) `SyncModule.OnInitialized` 启动:
|
|||
|
|
- `NetworkMonitor.StartAsync`
|
|||
|
|
- `OutboxProcessor.StartConsumerAsync`
|
|||
|
|
- `ISignalRService.ConnectUnifiedDeviceChannelAsync`
|
|||
|
|
2) 主窗口 `MainWindowViewModel` 构造中调用 `JeecgUserSyncCoordinator.Start()`:
|
|||
|
|
- 订阅 `RemoteCommandReceivedEvent`
|
|||
|
|
- 延迟 3 秒入队一次 `EventBoot` 拉取
|
|||
|
|
3) 后续由“实时消息触发 + 定时/后台同步 + Outbox 补偿”共同维持一致性。
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 9. 现状结论
|
|||
|
|
|
|||
|
|
1) 当前实现已形成“统一 STOMP 通道 + 心跳 + 看门狗 + 网络恢复重连 + Outbox 补偿 + 后端幂等”的完整闭环。
|
|||
|
|
2) 用户同步策略是“实时信号触发拉取”而非“直接信号落库”,在工业网络抖动场景更稳健。
|
|||
|
|
3) 本地 CRUD 回传与后端推送下行均已纳入 Outbox/幂等体系,具备断线续连后的最终一致性基础。
|