using Microsoft.Extensions.Configuration;
using Prism.Events;
using System.Text.Json;
using YY.Admin.Core;
using YY.Admin.Core.Events;
using YY.Admin.Core.Services;
using YY.Admin.Core.Sync;
namespace YY.Admin.Services.Service.Jeecg;
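/// <summary>
/// User mirror synchronization: follows the standard device-sync route (STOMP receives the signal → Outbox → REST pull from SCADA)
/// and no longer runs a separate receive loop on the native Jeecg WebSocket.
/// </summary>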
public class JeecgUserSyncCoordinator : IJeecgUserSyncCoordinator, ISingletonDependency
{
    /// <summary>STOMP endpoint path; only reported in the startup log line.</summary>
    private const string StompPath = "/ws/device/websocket";

    /// <summary>Delay, in milliseconds, between <see cref="Start"/> and the boot-time pull being enqueued.</summary>
    private const int BootPullDelayMilliseconds = 3000;

    /// <summary>
    /// Envelope properties that may carry the real command as an embedded JSON string,
    /// probed in this order by <see cref="ShouldTriggerUserSync"/>.
    /// </summary>
    private static readonly string[] WrapperPropertyNames = { "commandJson", "message" };

    private readonly IConfiguration _configuration;
    private readonly IEventAggregator _eventAggregator;
    private readonly IJeecgUserMirrorPullOutbox _mirrorOutbox;
    private readonly ILoggerService _logger;

    /// <summary>Guards <see cref="_cts"/> and both subscription tokens.</summary>
    private readonly object _lifecycleLock = new();

    private CancellationTokenSource? _cts;
    private SubscriptionToken? _remoteCommandSubscription;
    private SubscriptionToken? _networkStatusSubscription;

    /// <summary>Creates the coordinator; nothing is subscribed or scheduled until <see cref="Start"/> is called.</summary>
    /// <param name="configuration">Source of the <c>JeecgIntegration:*</c> settings.</param>
    /// <param name="eventAggregator">Event aggregator used to receive remote-command and network-status events.</param>
    /// <param name="mirrorOutbox">Outbox that user-mirror pull requests are enqueued into.</param>
    /// <param name="logger">Logger for lifecycle and failure messages.</param>
    public JeecgUserSyncCoordinator(
        IConfiguration configuration,
        IEventAggregator eventAggregator,
        IJeecgUserMirrorPullOutbox mirrorOutbox,
        ILoggerService logger)
    {
        _configuration = configuration;
        _eventAggregator = eventAggregator;
        _mirrorOutbox = mirrorOutbox;
        _logger = logger;
    }

    /// <summary>
    /// Starts the coordinator: (re)subscribes to the remote-command and network-status events and
    /// schedules one delayed boot pull. Does nothing except log when
    /// <c>JeecgIntegration:Enabled</c> is false. Calling it again cancels the previous run first.
    /// </summary>
    public void Start()
    {
        var enabled = _configuration.GetValue("JeecgIntegration:Enabled", false);
        // The string indexer yields the raw configured value (null when absent).
        var baseUrl = _configuration["JeecgIntegration:BaseUrl"] ?? string.Empty;
        _logger.Information($"Jeecg用户同步协调器启动(统一设备通道),Enabled={enabled}, BaseUrl={baseUrl}, Stomp={StompPath}");
        if (!enabled)
        {
            _logger.Warning("Jeecg用户同步协调器未启动:JeecgIntegration:Enabled=false");
            return;
        }

        CancellationToken token;
        lock (_lifecycleLock)
        {
            // Tear down any previous run so a repeated Start never leaves duplicate subscriptions.
            CancelAndDisposeCts();
            UnsubscribeRemoteCommand();
            UnsubscribeNetworkStatus();

            // NOTE(review): GetEvent is called without a type argument throughout this class; the
            // event types appear to be missing from this copy of the file — confirm and restore them.
            _remoteCommandSubscription = _eventAggregator.GetEvent().Subscribe(OnRemoteCommand, ThreadOption.BackgroundThread);
            _networkStatusSubscription = _eventAggregator.GetEvent().Subscribe(OnNetworkStatusChanged, ThreadOption.BackgroundThread);

            _cts = new CancellationTokenSource();
            token = _cts.Token;
        }

        // Enqueue one full pull shortly after start-up rather than immediately, so it does not
        // compete for bandwidth with the login-time sync.
        _ = Task.Run(() => EnqueueBootPullAsync(token), token);
    }

    /// <summary>Stops the coordinator: removes both event subscriptions and cancels the pending boot pull.</summary>
    public void Stop()
    {
        lock (_lifecycleLock)
        {
            UnsubscribeRemoteCommand();
            UnsubscribeNetworkStatus();
            CancelAndDisposeCts();
        }
    }

    /// <summary>
    /// Waits <see cref="BootPullDelayMilliseconds"/> and then enqueues the boot pull.
    /// Cancellation is ignored silently; any other failure is logged as a warning.
    /// </summary>
    /// <param name="token">Token of the current run; cancelled by <see cref="Stop"/> or a repeated <see cref="Start"/>.</param>
    private async Task EnqueueBootPullAsync(CancellationToken token)
    {
        try
        {
            await Task.Delay(BootPullDelayMilliseconds, token).ConfigureAwait(false);
            await _mirrorOutbox.EnqueuePullAsync(JeecgUserMirrorOutbox.EventBoot, null, token).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Stopped or restarted before the pull was enqueued; nothing to do.
        }
        catch (Exception ex)
        {
            _logger.Warning($"Jeecg 启动后入队同步失败: {ex.Message}");
        }
    }

    /// <summary>Event callback for remote commands; hands off to <see cref="HandleRemoteCommandAsync"/>.</summary>
    /// <param name="payload">The received remote-command payload.</param>
    private void OnRemoteCommand(RemoteCommandPayload payload)
    {
        // The returned task handles its own failures, so it is safe to discard.
        _ = HandleRemoteCommandAsync(payload);
    }

    /// <summary>
    /// Enqueues a signal-triggered pull when the command JSON announces a user change.
    /// The enqueue is awaited here so that asynchronous failures are logged too, not only
    /// exceptions thrown before the first await.
    /// </summary>
    /// <param name="payload">The received remote-command payload.</param>
    private async Task HandleRemoteCommandAsync(RemoteCommandPayload payload)
    {
        try
        {
            var json = payload.CommandJson ?? string.Empty;
            if (!ShouldTriggerUserSync(json))
            {
                return;
            }

            _logger.Information($"收到设备统一通道(STOMP)用户变更信号,入队 Outbox: {json}");
            await _mirrorOutbox
                .EnqueuePullAsync(JeecgUserMirrorOutbox.EventSignal, json, CancellationToken.None)
                .ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            _logger.Warning($"处理 STOMP 用户变更信号失败: {ex.Message}");
        }
    }

    /// <summary>Removes the remote-command subscription if one is active.</summary>
    private void UnsubscribeRemoteCommand()
    {
        if (_remoteCommandSubscription is null)
        {
            return;
        }

        _eventAggregator.GetEvent().Unsubscribe(_remoteCommandSubscription);
        _remoteCommandSubscription = null;
    }

    /// <summary>Removes the network-status subscription if one is active.</summary>
    private void UnsubscribeNetworkStatus()
    {
        if (_networkStatusSubscription is null)
        {
            return;
        }

        _eventAggregator.GetEvent().Unsubscribe(_networkStatusSubscription);
        _networkStatusSubscription = null;
    }

    /// <summary>
    /// Event callback for network status changes; a transition to online enqueues a compensating
    /// full pull. Null payloads and offline notifications are ignored.
    /// </summary>
    /// <param name="payload">The network status notification.</param>
    private void OnNetworkStatusChanged(NetworkStatusChangedPayload payload)
    {
        if (payload is null || !payload.IsOnline)
        {
            return;
        }

        // The returned task handles its own failures, so it is safe to discard.
        _ = EnqueueReconnectPullAsync();
    }

    /// <summary>
    /// Logs the reconnect and enqueues a boot-style pull tagged with the reconnect reason.
    /// Both synchronous and asynchronous failures are logged as a warning.
    /// </summary>
    private async Task EnqueueReconnectPullAsync()
    {
        try
        {
            _logger.Information("检测到网络恢复,入队一次用户全量拉取(断线重连补偿)。");
            await _mirrorOutbox
                .EnqueuePullAsync(
                    JeecgUserMirrorOutbox.EventBoot,
                    "{\"reason\":\"network-reconnected\"}",
                    CancellationToken.None)
                .ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            _logger.Warning($"网络恢复后入队用户同步失败: {ex.Message}");
        }
    }

    /// <summary>
    /// Cancels and disposes the current cancellation source, if any. Both call sites
    /// (<see cref="Start"/> and <see cref="Stop"/>) invoke this while holding <see cref="_lifecycleLock"/>.
    /// </summary>
    private void CancelAndDisposeCts()
    {
        var cts = _cts;
        if (cts is null)
        {
            return;
        }

        _cts = null;
        try
        {
            cts.Cancel();
        }
        catch (Exception ex)
        {
            // Best effort: a failing cancellation callback must not prevent shutdown or restart.
            _logger.Warning($"取消 Jeecg 用户同步后台任务失败: {ex.Message}");
        }
        finally
        {
            cts.Dispose();
        }
    }

    /// <summary>
    /// Decides whether a command JSON announces a user change. The root object is checked first;
    /// otherwise the first wrapper property (see <see cref="WrapperPropertyNames"/>) that holds a
    /// non-blank string is parsed as JSON and decides the result.
    /// </summary>
    /// <param name="message">Raw command JSON; may be empty.</param>
    /// <returns>
    /// True when a recognised user-change command is found; false for blank input, malformed JSON,
    /// a non-object root, or no match.
    /// </returns>
    private static bool ShouldTriggerUserSync(string message)
    {
        if (string.IsNullOrWhiteSpace(message))
        {
            return false;
        }

        try
        {
            using var doc = JsonDocument.Parse(message);
            var root = doc.RootElement;

            // TryGetProperty throws on anything but an object, so reject other roots explicitly.
            if (root.ValueKind != JsonValueKind.Object)
            {
                return false;
            }

            if (TryMatchCmd(root))
            {
                return true;
            }

            // "commandJson" is the wrapper used by the device module's REST dispatch; "message" is
            // the other envelope shape probed.
            foreach (var wrapperName in WrapperPropertyNames)
            {
                if (!root.TryGetProperty(wrapperName, out var wrapped)
                    || wrapped.ValueKind != JsonValueKind.String)
                {
                    continue;
                }

                var rawInner = wrapped.GetString();
                if (string.IsNullOrWhiteSpace(rawInner))
                {
                    continue;
                }

                using var innerDoc = JsonDocument.Parse(rawInner);
                return TryMatchCmd(innerDoc.RootElement);
            }

            return false;
        }
        catch (JsonException)
        {
            // Malformed outer or inner JSON is treated as "not a user-change signal".
            return false;
        }
    }

    /// <summary>Checks whether a JSON object carries a string <c>cmd</c> naming a user-change command.</summary>
    /// <param name="element">Element to inspect; anything other than an object never matches.</param>
    /// <returns>
    /// True when <c>cmd</c> equals <c>SCADA_USER_CHANGED</c> or <c>SCADA_USERS_CHANGED</c>, ignoring case.
    /// </returns>
    private static bool TryMatchCmd(JsonElement element)
    {
        // GetString throws for non-string kinds, so a numeric or object "cmd" is rejected up front.
        if (element.ValueKind != JsonValueKind.Object
            || !element.TryGetProperty("cmd", out var cmd)
            || cmd.ValueKind != JsonValueKind.String)
        {
            return false;
        }

        var cmdValue = cmd.GetString();
        return string.Equals(cmdValue, "SCADA_USER_CHANGED", StringComparison.OrdinalIgnoreCase)
            || string.Equals(cmdValue, "SCADA_USERS_CHANGED", StringComparison.OrdinalIgnoreCase);
    }
}