Files
qhmes/yy-admin-master/doc/桌面端与后端实时交互及断线续连整理.md

187 lines
7.3 KiB
Markdown
Raw Normal View History

# 桌面端与后端实时交互、断线续连功能整理yy-admin ↔ jeecg-boot
## 1. 范围与目标
本文整理当前项目中“桌面端yy-admin与后端jeecg-boot”的实时交互链路重点覆盖
- 实时消息通道STOMP/WebSocket
- 用户变更同步(后端→桌面)
- 本地变更回传(桌面→后端)
- 断线检测、自动重连、离线补偿Outbox
---
## 2. 总体架构(当前实现)
### 2.1 后端 → 桌面实时触发REST 拉取落地)
1) jeecg 用户变更时,后端广播到 STOMP 主题 `/topic/sync/jeecg-users`
2) 桌面端统一 STOMP 客户端订阅该主题,收到消息后发布本地事件 `RemoteCommandReceivedEvent`
3) `JeecgUserSyncCoordinator` 识别 `SCADA_USER_CHANGED/SCADA_USERS_CHANGED` 后,不直接写库,而是“入 Outbox”。
4) Outbox 消费器执行 `TryBackgroundSyncJeecgUsersAsync`,通过 SCADA 用户列表接口拉取并写入本地镜像表 `jeecg_sys_user`
> 这条链路是“消息触发 + REST 拉全量/增量”的组合,避免只靠实时包体做复杂幂等。
### 2.2 桌面 → 后端(本地变更回传)
1) 桌面用户 CRUD`SysUserService`)成功后调用 `IUserSyncOutbox` 入队。
2) Outbox 在线时实时发送、离线时持久化。
3) 回传通过 `/sys/sync/batch` 批量提交,后端按 `aggregateType/eventType` 路由处理,并写幂等日志表避免重复消费。
---
## 3. 关键代码位置
## 3.1 后端jeecg-boot
- STOMP 配置与心跳:
`jeecg-module-device-sync/src/main/java/org/jeecg/modules/device/sync/config/WebSocketConfig.java`
- 设备消息与应用层 PING/PONG
`jeecg-module-device-sync/src/main/java/org/jeecg/modules/device/sync/controller/DeviceWebSocketController.java`
- 批量同步入口(桌面回传):
`jeecg-module-device-sync/src/main/java/org/jeecg/modules/device/sync/controller/SyncController.java`
- 用户变更触发 STOMP 广播:
`jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/system/controller/SysUserController.java`
### 3.2 桌面端yy-admin-master
- STOMP 统一连接、心跳、看门狗重连、网络恢复重连:
`YY.Admin/Infrastructure/Hubs/StompWebSocketService.cs`
- STOMP 信号识别并入镜像拉取 Outbox
`YY.Admin.Services/Service/Jeecg/JeecgUserSyncCoordinator.cs`
- 镜像拉取入队:
`YY.Admin/Infrastructure/Sync/JeecgUserMirrorPullOutbox.cs`
- Outbox 核心(持久化、消费、失败重试、上线刷队):
`YY.Admin/Infrastructure/Sync/OutboxProcessor.cs`
- 网络探活与状态事件:
`YY.Admin/Infrastructure/Network/NetworkMonitor.cs`
- 本地用户变更入队接口与实现:
`YY.Admin.Core/Core/Services/IUserSyncOutbox.cs`
`YY.Admin/Infrastructure/Sync/UserSyncOutbox.cs`
- 本地用户 CRUD 完成后回传入队:
`YY.Admin.Services/Service/User/SysUserService.cs`
- 模块启动装配(网络监测 / Outbox / STOMP
`YY.Admin/Module/SyncModule.cs`
- 主窗口启动同步协调器:
`YY.Admin/ViewModels/MainWindowViewModel.cs`
---
## 4. 实时链路详细说明
### 4.1 STOMP 连接与订阅(桌面端)
`StompWebSocketService` 当前行为:
- 连接地址:由 `JeecgIntegration:BaseUrl` 推导为 `ws(s)://<host>/ws/device`(纯 WebSocket
- CONNECT 心跳声明:`heart-beat:10000,10000`
- 默认订阅:
- `/topic/sync/jeecg-users`(用户变更信号)
- `/topic/device/{deviceId}/pong`(应用层保活回应)
- 认证模式额外订阅:
- `/user/{deviceId}/queue/command`
### 4.2 后端广播jeecg 用户变更)
`SysUserController` 在新增/编辑/删除/状态变更等场景调用 `notifyScadaUserChanged(...)`,并通过 `SimpMessagingTemplate``/topic/sync/jeecg-users` 广播 JSON 负载,关键 cmd 为:
- `SCADA_USER_CHANGED`
### 4.3 桌面端信号消费策略
`JeecgUserSyncCoordinator` 不直接依赖推送包体做写库,而是:
- 解析收到的 JSON支持 message/commandJson 嵌套)
- 判断 `cmd` 是否为 `SCADA_USER_CHANGED` / `SCADA_USERS_CHANGED`
- 命中后入队 `JeecgUserMirror` 类型 Outbox
- 由 Outbox 消费阶段调用 `TryBackgroundSyncJeecgUsersAsync` 做实际拉取与落库
该策略提升了一致性与容错性(尤其在断线/消息乱序时)。
---
## 5. 断线检测与自动续连
## 5.1 连接失败重试
`ConnectUnifiedDeviceChannelAsync` 内置重试退避(秒):`0, 2, 5, 10, 30`
### 5.2 协议层心跳 + 应用层 PING
- 协议层:每 10s 发送 STOMP `\n` 心跳帧。
- 应用层:每 10s 发送 `{"cmd":"PING_DEVICE"...}``/app/device/ping`
- 后端 `handlePing` 回复到 `/topic/device/{deviceId}/pong`
### 5.3 看门狗“假在线”检测
- 客户端记录 `_lastReceivedTick`
- 若 30s 内没有任何帧(含心跳或 MESSAGE则判定连接僵死主动重连。
### 5.4 网络恢复触发重连
- `NetworkMonitor` 每 10s 探活(优先 SCADA 免登录接口,失败降级 health
- 状态变更发布 `NetworkStatusChangedEvent`
- STOMP 服务在 `isOnline=true` 且 socket 非 Open 时主动重连。
---
## 6. 离线补偿与幂等
## 6.1 Outbox 持久化
`OutboxProcessor.EnqueueAsync` 先落 SQLite Outbox`OutboxMessage`),再按在线状态决定是否立即入内存通道消费。
### 6.2 消费与失败重试
- 消费失败进入 `MarkFailedAsync`
- `RetryCount + 1`
- 指数退避 `2^n` 秒(上限按实现控制)
- 最多 5 次,超过标记 `Status=2`
- 在线恢复时 `FlushPendingAsync` 会补刷所有待发送消息。
### 6.3 双通道消费分流
`OutboxProcessor` 对消息分两类处理:
1) `JeecgUserMirror`:调用 `_mirrorPullHandler.ExecutePullAsync()`(即拉取镜像)。
2) 其他业务消息(如 `SYS_USER`):走 `_httpSyncClient.SendBatchAsync()``/sys/sync/batch`
### 6.4 后端幂等
`SyncController.batch` 通过 `messageId` 检查幂等日志(`sync_idempotent_log`)避免重复消费;未处理过才路由执行。
---
## 7. 配置项(当前关键项)
配置文件:`YY.Admin.Services/Configuration/appsettings.json``JeecgIntegration` 节点。
建议重点关注:
- `Enabled`
- `BaseUrl`
- `AnonymousMode`
- `UserListPath`(当前是 `/sys/user/scada/queryUser`
- `SyncAllUsersOnJeecgLogin`
- `BackgroundSyncIntervalMinutes`
> 实时通道默认使用 `/ws/device` 统一 STOMP 端点;`WebSocketPath` 仍保留在配置中,但当前同步主链路已切到统一设备通道实现。
---
## 8. 启动时序(简版)
1) `SyncModule.OnInitialized` 启动:
- `NetworkMonitor.StartAsync`
- `OutboxProcessor.StartConsumerAsync`
- `ISignalRService.ConnectUnifiedDeviceChannelAsync`
2) 主窗口 `MainWindowViewModel` 构造中调用 `JeecgUserSyncCoordinator.Start()`
- 订阅 `RemoteCommandReceivedEvent`
- 延迟 3 秒入队一次 `EventBoot` 拉取
3) 后续由“实时消息触发 + 定时/后台同步 + Outbox 补偿”共同维持一致性。
---
## 9. 现状结论
1) 当前实现已形成“统一 STOMP 通道 + 心跳 + 看门狗 + 网络恢复重连 + Outbox 补偿 + 后端幂等”的完整闭环。
2) 用户同步策略是“实时信号触发拉取”而非“直接信号落库”,在工业网络抖动场景更稳健。
3) 本地 CRUD 回传与后端推送下行均已纳入 Outbox/幂等体系,具备断线续连后的最终一致性基础。