2026-04-28 10:23:58 +08:00
|
|
|
|
using Microsoft.Extensions.Configuration;
|
|
|
|
|
|
using Prism.Events;
|
|
|
|
|
|
using System.Text.Json;
|
|
|
|
|
|
using YY.Admin.Core;
|
|
|
|
|
|
using YY.Admin.Core.Events;
|
|
|
|
|
|
using YY.Admin.Core.Services;
|
|
|
|
|
|
using YY.Admin.Core.Sync;
|
|
|
|
|
|
|
|
|
|
|
|
namespace YY.Admin.Services.Service.Jeecg;
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 用户镜像同步:统一走设备同步规范线路(STOMP 收信号 → Outbox → REST 拉取 SCADA),不再使用独立 Jeecg 原生 WebSocket 收包循环。
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
public class JeecgUserSyncCoordinator : IJeecgUserSyncCoordinator, ISingletonDependency
|
|
|
|
|
|
{
|
|
|
|
|
|
private readonly IConfiguration _configuration;
|
|
|
|
|
|
private readonly IEventAggregator _eventAggregator;
|
|
|
|
|
|
private readonly IJeecgUserMirrorPullOutbox _mirrorOutbox;
|
|
|
|
|
|
private readonly ILoggerService _logger;
|
|
|
|
|
|
private CancellationTokenSource? _cts;
|
|
|
|
|
|
private readonly object _lifecycleLock = new();
|
|
|
|
|
|
private SubscriptionToken? _remoteCommandSubscription;
|
2026-04-30 15:28:20 +08:00
|
|
|
|
private SubscriptionToken? _networkStatusSubscription;
|
2026-04-28 10:23:58 +08:00
|
|
|
|
|
|
|
|
|
|
public JeecgUserSyncCoordinator(
|
|
|
|
|
|
IConfiguration configuration,
|
|
|
|
|
|
IEventAggregator eventAggregator,
|
|
|
|
|
|
IJeecgUserMirrorPullOutbox mirrorOutbox,
|
|
|
|
|
|
ILoggerService logger)
|
|
|
|
|
|
{
|
|
|
|
|
|
_configuration = configuration;
|
|
|
|
|
|
_eventAggregator = eventAggregator;
|
|
|
|
|
|
_mirrorOutbox = mirrorOutbox;
|
|
|
|
|
|
_logger = logger;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <inheritdoc />
|
|
|
|
|
|
public void Start()
|
|
|
|
|
|
{
|
|
|
|
|
|
var enabled = _configuration.GetValue("JeecgIntegration:Enabled", false);
|
|
|
|
|
|
var baseUrl = _configuration.GetValue<string>("JeecgIntegration:BaseUrl") ?? string.Empty;
|
|
|
|
|
|
var stompPath = "/ws/device/websocket";
|
|
|
|
|
|
_logger.Information($"Jeecg用户同步协调器启动(统一设备通道),Enabled={enabled}, BaseUrl={baseUrl}, Stomp={stompPath}");
|
|
|
|
|
|
|
|
|
|
|
|
if (!enabled)
|
|
|
|
|
|
{
|
|
|
|
|
|
_logger.Warning("Jeecg用户同步协调器未启动:JeecgIntegration:Enabled=false");
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
CancellationToken token;
|
|
|
|
|
|
lock (_lifecycleLock)
|
|
|
|
|
|
{
|
|
|
|
|
|
CancelAndDisposeCts();
|
|
|
|
|
|
UnsubscribeRemoteCommand();
|
2026-04-30 15:28:20 +08:00
|
|
|
|
UnsubscribeNetworkStatus();
|
2026-04-28 10:23:58 +08:00
|
|
|
|
_remoteCommandSubscription = _eventAggregator.GetEvent<RemoteCommandReceivedEvent>().Subscribe(OnRemoteCommand, ThreadOption.BackgroundThread);
|
2026-04-30 15:28:20 +08:00
|
|
|
|
_networkStatusSubscription = _eventAggregator.GetEvent<NetworkStatusChangedEvent>().Subscribe(OnNetworkStatusChanged, ThreadOption.BackgroundThread);
|
2026-04-28 10:23:58 +08:00
|
|
|
|
|
|
|
|
|
|
_cts = new CancellationTokenSource();
|
|
|
|
|
|
token = _cts.Token;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 进入主窗口后稍延迟再入队一次全量拉取,避免与登录同步抢带宽
|
|
|
|
|
|
_ = Task.Run(async () =>
|
|
|
|
|
|
{
|
|
|
|
|
|
try
|
|
|
|
|
|
{
|
|
|
|
|
|
await Task.Delay(3000, token).ConfigureAwait(false);
|
|
|
|
|
|
await _mirrorOutbox.EnqueuePullAsync(JeecgUserMirrorOutbox.EventBoot, null, token).ConfigureAwait(false);
|
|
|
|
|
|
}
|
|
|
|
|
|
catch (OperationCanceledException)
|
|
|
|
|
|
{
|
|
|
|
|
|
// 忽略
|
|
|
|
|
|
}
|
|
|
|
|
|
catch (Exception ex)
|
|
|
|
|
|
{
|
|
|
|
|
|
_logger.Warning($"Jeecg 启动后入队同步失败: {ex.Message}");
|
|
|
|
|
|
}
|
|
|
|
|
|
}, token);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <inheritdoc />
|
|
|
|
|
|
public void Stop()
|
|
|
|
|
|
{
|
|
|
|
|
|
lock (_lifecycleLock)
|
|
|
|
|
|
{
|
|
|
|
|
|
UnsubscribeRemoteCommand();
|
2026-04-30 15:28:20 +08:00
|
|
|
|
UnsubscribeNetworkStatus();
|
2026-04-28 10:23:58 +08:00
|
|
|
|
CancelAndDisposeCts();
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private void OnRemoteCommand(RemoteCommandPayload payload)
|
|
|
|
|
|
{
|
|
|
|
|
|
try
|
|
|
|
|
|
{
|
|
|
|
|
|
var json = payload.CommandJson ?? string.Empty;
|
|
|
|
|
|
if (!ShouldTriggerUserSync(json))
|
|
|
|
|
|
{
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
_logger.Information($"收到设备统一通道(STOMP)用户变更信号,入队 Outbox: {json}");
|
|
|
|
|
|
_ = _mirrorOutbox.EnqueuePullAsync(JeecgUserMirrorOutbox.EventSignal, json, CancellationToken.None);
|
|
|
|
|
|
}
|
|
|
|
|
|
catch (Exception ex)
|
|
|
|
|
|
{
|
|
|
|
|
|
_logger.Warning($"处理 STOMP 用户变更信号失败: {ex.Message}");
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private void UnsubscribeRemoteCommand()
|
|
|
|
|
|
{
|
|
|
|
|
|
if (_remoteCommandSubscription != null)
|
|
|
|
|
|
{
|
|
|
|
|
|
_eventAggregator.GetEvent<RemoteCommandReceivedEvent>().Unsubscribe(_remoteCommandSubscription);
|
|
|
|
|
|
_remoteCommandSubscription = null;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-30 15:28:20 +08:00
|
|
|
|
private void UnsubscribeNetworkStatus()
|
|
|
|
|
|
{
|
|
|
|
|
|
if (_networkStatusSubscription != null)
|
|
|
|
|
|
{
|
|
|
|
|
|
_eventAggregator.GetEvent<NetworkStatusChangedEvent>().Unsubscribe(_networkStatusSubscription);
|
|
|
|
|
|
_networkStatusSubscription = null;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private void OnNetworkStatusChanged(NetworkStatusChangedPayload payload)
|
|
|
|
|
|
{
|
|
|
|
|
|
if (payload is null || !payload.IsOnline)
|
|
|
|
|
|
{
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
try
|
|
|
|
|
|
{
|
|
|
|
|
|
_logger.Information("检测到网络恢复,入队一次用户全量拉取(断线重连补偿)。");
|
|
|
|
|
|
_ = _mirrorOutbox.EnqueuePullAsync(
|
|
|
|
|
|
JeecgUserMirrorOutbox.EventBoot,
|
|
|
|
|
|
"{\"reason\":\"network-reconnected\"}",
|
|
|
|
|
|
CancellationToken.None);
|
|
|
|
|
|
}
|
|
|
|
|
|
catch (Exception ex)
|
|
|
|
|
|
{
|
|
|
|
|
|
_logger.Warning($"网络恢复后入队用户同步失败: {ex.Message}");
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-28 10:23:58 +08:00
|
|
|
|
private void CancelAndDisposeCts()
|
|
|
|
|
|
{
|
|
|
|
|
|
try
|
|
|
|
|
|
{
|
|
|
|
|
|
_cts?.Cancel();
|
|
|
|
|
|
}
|
|
|
|
|
|
catch
|
|
|
|
|
|
{
|
|
|
|
|
|
// 忽略
|
|
|
|
|
|
}
|
|
|
|
|
|
finally
|
|
|
|
|
|
{
|
|
|
|
|
|
_cts?.Dispose();
|
|
|
|
|
|
_cts = null;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private static bool ShouldTriggerUserSync(string message)
|
|
|
|
|
|
{
|
|
|
|
|
|
if (string.IsNullOrWhiteSpace(message))
|
|
|
|
|
|
{
|
|
|
|
|
|
return false;
|
|
|
|
|
|
}
|
|
|
|
|
|
try
|
|
|
|
|
|
{
|
|
|
|
|
|
using var doc = JsonDocument.Parse(message);
|
|
|
|
|
|
var root = doc.RootElement;
|
|
|
|
|
|
if (TryMatchCmd(root))
|
|
|
|
|
|
{
|
|
|
|
|
|
return true;
|
|
|
|
|
|
}
|
|
|
|
|
|
// 设备模块 REST 下发的 commandJson 包裹
|
|
|
|
|
|
if (root.TryGetProperty("commandJson", out var innerEl) && innerEl.ValueKind == JsonValueKind.String)
|
|
|
|
|
|
{
|
|
|
|
|
|
var rawInner = innerEl.GetString();
|
|
|
|
|
|
if (!string.IsNullOrWhiteSpace(rawInner))
|
|
|
|
|
|
{
|
|
|
|
|
|
using var innerDoc = JsonDocument.Parse(rawInner);
|
|
|
|
|
|
return TryMatchCmd(innerDoc.RootElement);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
if (root.TryGetProperty("message", out var innerMessage)
|
|
|
|
|
|
&& innerMessage.ValueKind == JsonValueKind.String)
|
|
|
|
|
|
{
|
|
|
|
|
|
var rawInner = innerMessage.GetString();
|
|
|
|
|
|
if (!string.IsNullOrWhiteSpace(rawInner))
|
|
|
|
|
|
{
|
|
|
|
|
|
using var innerDoc = JsonDocument.Parse(rawInner);
|
|
|
|
|
|
return TryMatchCmd(innerDoc.RootElement);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
return false;
|
|
|
|
|
|
}
|
|
|
|
|
|
catch
|
|
|
|
|
|
{
|
|
|
|
|
|
return false;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private static bool TryMatchCmd(JsonElement element)
|
|
|
|
|
|
{
|
|
|
|
|
|
if (element.ValueKind != JsonValueKind.Object)
|
|
|
|
|
|
{
|
|
|
|
|
|
return false;
|
|
|
|
|
|
}
|
|
|
|
|
|
if (!element.TryGetProperty("cmd", out var cmd))
|
|
|
|
|
|
{
|
|
|
|
|
|
return false;
|
|
|
|
|
|
}
|
|
|
|
|
|
var cmdValue = cmd.GetString();
|
|
|
|
|
|
return string.Equals(cmdValue, "SCADA_USER_CHANGED", StringComparison.OrdinalIgnoreCase)
|
|
|
|
|
|
|| string.Equals(cmdValue, "SCADA_USERS_CHANGED", StringComparison.OrdinalIgnoreCase);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|