新增MES模块,包含供应商、客户、车辆和地磅数据记录管理功能,支持免密接口和数据同步。更新相关控制器、实体、服务和数据库配置,优化权限管理和数据字典支持,确保系统的灵活性和可扩展性。

This commit is contained in:
geht
2026-04-30 15:28:20 +08:00
parent 142a0bdaba
commit b03cbeff9b
121 changed files with 10540 additions and 424 deletions

View File

@@ -3,29 +3,62 @@ using Prism.Events;
using System.IO;
using System.Net.WebSockets;
using System.Text;
using System.Timers;
using YY.Admin.Core.Events;
using YY.Admin.Core.Services;
using YY.Admin.Core.Session;
using YY.Admin.Helper;
using YY.Admin.Infrastructure.Storage;
namespace YY.Admin.Infrastructure.Hubs;
public class StompWebSocketService : ISignalRService
{
// STOMP heart-beat: send \n every 10 s, declare we want to receive every 10 s
private const int HeartbeatMs = 10_000;
// Watchdog: if nothing received in 3 × HeartbeatMs, treat connection as zombie
private const int WatchdogMs = 30_000;
private readonly IConfiguration _configuration;
private readonly IEventAggregator _eventAggregator;
private readonly TokenStore _tokenStore;
private readonly INetworkMonitor _networkMonitor;
private ClientWebSocket? _socket;
private string _deviceId = "default-device";
private string _token = string.Empty;
private CancellationTokenSource? _connectionCts;
private System.Timers.Timer? _heartbeatTimer;
private System.Timers.Timer? _watchdogTimer;
private volatile int _lastReceivedTick = Environment.TickCount;
public StompWebSocketService(
IConfiguration configuration,
IEventAggregator eventAggregator,
TokenStore tokenStore)
TokenStore tokenStore,
INetworkMonitor networkMonitor)
{
_configuration = configuration;
_eventAggregator = eventAggregator;
_tokenStore = tokenStore;
_networkMonitor = networkMonitor;
// When the network comes back online, reconnect the STOMP channel if it is down.
eventAggregator.GetEvent<NetworkStatusChangedEvent>()
.Subscribe(OnNetworkStatusChanged, ThreadOption.BackgroundThread);
}
private void OnNetworkStatusChanged(NetworkStatusChangedPayload payload)
{
if (!payload.IsOnline || IsDisconnectedByUser())
{
return;
}
if (_socket == null || _socket.State != WebSocketState.Open)
{
_ = Task.Run(() => ConnectUnifiedDeviceChannelAsync(CancellationToken.None));
}
}
/// <inheritdoc />
@@ -38,6 +71,12 @@ public class StompWebSocketService : ISignalRService
/// <inheritdoc />
public async Task ConnectUnifiedDeviceChannelAsync(CancellationToken cancellationToken = default)
{
if (IsDisconnectedByUser())
{
await DisconnectAsync(cancellationToken).ConfigureAwait(false);
return;
}
var anonymous = _configuration.GetValue("JeecgIntegration:AnonymousMode", true);
if (anonymous)
{
@@ -61,41 +100,70 @@ public class StompWebSocketService : ISignalRService
try
{
// Tear down previous session before opening a new one.
var oldCts = Interlocked.Exchange(ref _connectionCts, null);
oldCts?.Cancel();
oldCts?.Dispose();
StopTimers();
_socket?.Dispose();
_socket = new ClientWebSocket();
_socket.Options.AddSubProtocol("v12.stomp");
await _socket.ConnectAsync(new Uri(wsUrl), cancellationToken).ConfigureAwait(false);
var connectFrame = anonymous || string.IsNullOrWhiteSpace(_token)
? "CONNECT\naccept-version:1.2\nheart-beat:10000,10000\n\n\0"
: BuildConnectFrame(_token);
? BuildConnectFrame(null, _deviceId)
: BuildConnectFrame(_token, _deviceId);
await SendFrameAsync(connectFrame, cancellationToken).ConfigureAwait(false);
// 用户镜像变更:与后端 /topic/sync/jeecg-users 对齐(设备同步统一线路)
await SendFrameAsync(BuildSubscribeFrame("sub-jeecg-users", "/topic/sync/jeecg-users"), cancellationToken).ConfigureAwait(false);
// 用户镜像变更:订阅 /topic/sync/jeecg-users
await SendFrameAsync(
BuildSubscribeFrame("sub-jeecg-users", "/topic/sync/jeecg-users"),
cancellationToken).ConfigureAwait(false);
// 车辆数据变更:订阅 /topic/sync/mes-vehicles
await SendFrameAsync(
BuildSubscribeFrame("sub-mes-vehicles", "/topic/sync/mes-vehicles"),
cancellationToken).ConfigureAwait(false);
// 客户数据变更:订阅 /topic/sync/mes-customers
await SendFrameAsync(
BuildSubscribeFrame("sub-mes-customers", "/topic/sync/mes-customers"),
cancellationToken).ConfigureAwait(false);
// 供应商数据变更:订阅 /topic/sync/mes-suppliers
await SendFrameAsync(
BuildSubscribeFrame("sub-mes-suppliers", "/topic/sync/mes-suppliers"),
cancellationToken).ConfigureAwait(false);
// 订阅服务端 PONG 回复(应用层假在线检测)
await SendFrameAsync(
BuildSubscribeFrame("sub-device-pong", $"/topic/device/{_deviceId}/pong"),
cancellationToken).ConfigureAwait(false);
// 非免密时同时订阅设备点对点指令队列
if (!anonymous && !string.IsNullOrWhiteSpace(_token))
{
await SendFrameAsync(BuildSubscribeFrame("sub-device-command", $"/user/{_deviceId}/queue/command"), cancellationToken).ConfigureAwait(false);
await SendFrameAsync(
BuildSubscribeFrame("sub-device-command", $"/user/{_deviceId}/queue/command"),
cancellationToken).ConfigureAwait(false);
}
_eventAggregator.GetEvent<NetworkStatusChangedEvent>().Publish(new NetworkStatusChangedPayload
{
IsOnline = true,
ChangedAt = DateTime.UtcNow
});
// Reset watchdog baseline to now.
_lastReceivedTick = Environment.TickCount;
_ = Task.Run(() => ReceiveLoopAsync(cancellationToken), cancellationToken);
var cts = new CancellationTokenSource();
_connectionCts = cts;
StartHeartbeatTimer(cts.Token);
StartWatchdogTimer(cts.Token);
_networkMonitor.SetStompTransportOnline(true);
_ = Task.Run(() => ReceiveLoopAsync(cts.Token), cts.Token);
return;
}
catch
{
_eventAggregator.GetEvent<NetworkStatusChangedEvent>().Publish(new NetworkStatusChangedPayload
{
IsOnline = false,
ChangedAt = DateTime.UtcNow
});
_networkMonitor.SetStompTransportOnline(false);
}
}
}
@@ -103,19 +171,149 @@ public class StompWebSocketService : ISignalRService
/// <inheritdoc />
public async Task SendDeviceStatusAsync(object status, CancellationToken cancellationToken = default)
{
if (_socket == null || _socket.State != WebSocketState.Open)
if (IsDisconnectedByUser() || _socket == null || _socket.State != WebSocketState.Open)
{
return;
}
var json = System.Text.Json.JsonSerializer.Serialize(status);
var frame = $"SEND\n" +
$"destination:/app/device/status\n" +
$"content-type:application/json\n" +
var frame = "SEND\n" +
"destination:/app/device/status\n" +
"content-type:application/json\n" +
$"content-length:{Encoding.UTF8.GetByteCount(json)}\n\n" +
$"{json}\0";
await SendFrameAsync(frame, cancellationToken).ConfigureAwait(false);
}
public async Task DisconnectAsync(CancellationToken cancellationToken = default)
{
var cts = Interlocked.Exchange(ref _connectionCts, null);
cts?.Cancel();
cts?.Dispose();
StopTimers();
var socket = _socket;
_socket = null;
if (socket == null)
{
_networkMonitor.SetStompTransportOnline(false);
return;
}
try
{
if (socket.State == WebSocketState.Open || socket.State == WebSocketState.CloseReceived)
{
await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "manual disconnect", cancellationToken).ConfigureAwait(false);
}
}
catch
{
socket.Abort();
}
finally
{
socket.Dispose();
_networkMonitor.SetStompTransportOnline(false);
}
}
// ── Heartbeat ──────────────────────────────────────────────────────────
private void StartHeartbeatTimer(CancellationToken cancellationToken)
{
_heartbeatTimer = new System.Timers.Timer(HeartbeatMs) { AutoReset = true };
_heartbeatTimer.Elapsed += async (_, _) =>
{
if (cancellationToken.IsCancellationRequested)
{
return;
}
// STOMP protocol keepalive (\n frame)
await SendStompHeartbeatAsync(cancellationToken).ConfigureAwait(false);
// Application-layer PING: server replies to /topic/device/{id}/pong
await SendAppPingAsync(cancellationToken).ConfigureAwait(false);
};
_heartbeatTimer.Start();
}
private async Task SendStompHeartbeatAsync(CancellationToken cancellationToken)
{
if (_socket == null || _socket.State != WebSocketState.Open)
{
return;
}
try
{
// STOMP spec: a single LF (0x0A) constitutes a heartbeat frame.
await _socket.SendAsync(
new ArraySegment<byte>(new byte[] { 0x0A }),
WebSocketMessageType.Text,
true,
cancellationToken).ConfigureAwait(false);
}
catch
{
// Send failure will be caught by the watchdog.
}
}
private async Task SendAppPingAsync(CancellationToken cancellationToken)
{
if (_socket == null || _socket.State != WebSocketState.Open)
{
return;
}
try
{
var body = $"{{\"cmd\":\"PING_DEVICE\",\"deviceId\":\"{_deviceId}\",\"ts\":{DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}}}";
var frame = "SEND\n" +
"destination:/app/device/ping\n" +
"content-type:application/json\n" +
$"content-length:{Encoding.UTF8.GetByteCount(body)}\n\n" +
$"{body}\0";
await SendFrameAsync(frame, cancellationToken).ConfigureAwait(false);
}
catch
{
// Watchdog handles reconnect.
}
}
// ── Watchdog ───────────────────────────────────────────────────────────
private void StartWatchdogTimer(CancellationToken cancellationToken)
{
_watchdogTimer = new System.Timers.Timer(WatchdogMs) { AutoReset = true };
_watchdogTimer.Elapsed += (_, _) =>
{
if (cancellationToken.IsCancellationRequested)
{
return;
}
// TickCount wraps; unchecked subtraction handles it correctly.
var silenceMs = unchecked(Environment.TickCount - _lastReceivedTick);
if (silenceMs >= WatchdogMs)
{
// No frame (heartbeat or MESSAGE) received for WatchdogMs — zombie connection.
_ = Task.Run(() => ConnectUnifiedDeviceChannelAsync(CancellationToken.None));
}
};
_watchdogTimer.Start();
}
private void StopTimers()
{
_heartbeatTimer?.Stop();
_heartbeatTimer?.Dispose();
_heartbeatTimer = null;
_watchdogTimer?.Stop();
_watchdogTimer?.Dispose();
_watchdogTimer = null;
}
// ── Receive loop ───────────────────────────────────────────────────────
private async Task ReceiveLoopAsync(CancellationToken cancellationToken)
{
if (_socket == null)
@@ -132,17 +330,16 @@ public class StompWebSocketService : ISignalRService
result = await _socket.ReceiveAsync(new ArraySegment<byte>(buffer), cancellationToken).ConfigureAwait(false);
if (result.MessageType == WebSocketMessageType.Close)
{
_eventAggregator.GetEvent<NetworkStatusChangedEvent>().Publish(new NetworkStatusChangedPayload
{
IsOnline = false,
ChangedAt = DateTime.UtcNow
});
_networkMonitor.SetStompTransportOnline(false);
await ConnectUnifiedDeviceChannelAsync(cancellationToken).ConfigureAwait(false);
return;
}
ms.Write(buffer, 0, result.Count);
} while (!result.EndOfMessage);
// Any received frame (heartbeat \n or MESSAGE) resets the watchdog.
_lastReceivedTick = Environment.TickCount;
var text = Encoding.UTF8.GetString(ms.ToArray());
if (!text.StartsWith("MESSAGE", StringComparison.OrdinalIgnoreCase))
{
@@ -169,21 +366,27 @@ public class StompWebSocketService : ISignalRService
return;
}
var data = Encoding.UTF8.GetBytes(frame);
await _socket.SendAsync(new ArraySegment<byte>(data), WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false);
await _socket.SendAsync(
new ArraySegment<byte>(data),
WebSocketMessageType.Text,
true,
cancellationToken).ConfigureAwait(false);
}
// ── Helpers ────────────────────────────────────────────────────────────
private string ResolveWsUrl()
{
var baseUrl = _configuration.GetValue<string>("JeecgIntegration:BaseUrl")?.TrimEnd('/');
if (string.IsNullOrWhiteSpace(baseUrl))
{
return "ws://127.0.0.1:8080/jeecg-boot/ws/device/websocket";
return "ws://127.0.0.1:8080/jeecg-boot/ws/device";
}
if (baseUrl.StartsWith("https://", StringComparison.OrdinalIgnoreCase))
{
return "wss://" + baseUrl["https://".Length..] + "/ws/device/websocket";
return "wss://" + baseUrl["https://".Length..] + "/ws/device";
}
return "ws://" + baseUrl["http://".Length..] + "/ws/device/websocket";
return "ws://" + baseUrl["http://".Length..] + "/ws/device";
}
private static string ResolveDeviceId(string token)
@@ -215,12 +418,22 @@ public class StompWebSocketService : ISignalRService
}
}
private static string BuildConnectFrame(string token)
private static string BuildConnectFrame(string? token, string deviceId)
{
return "CONNECT\n" +
"accept-version:1.2\n" +
"heart-beat:10000,10000\n" +
$"Authorization:Bearer {token}\n\n\0";
var user = AppSession.CurrentUser;
var sb = new System.Text.StringBuilder();
sb.Append("CONNECT\n");
sb.Append("accept-version:1.2\n");
sb.Append($"heart-beat:{HeartbeatMs},{HeartbeatMs}\n");
if (!string.IsNullOrWhiteSpace(token))
sb.Append($"Authorization:Bearer {token}\n");
sb.Append("platform:desktop\n");
sb.Append($"hostName:{Environment.MachineName}\n");
sb.Append($"deviceId:{deviceId}\n");
sb.Append($"userName:{user?.Account ?? "unknown"}\n");
sb.Append($"realName:{user?.RealName ?? ""}\n");
sb.Append("\n\0");
return sb.ToString();
}
private static string BuildSubscribeFrame(string subscriptionId, string destination)
@@ -230,4 +443,16 @@ public class StompWebSocketService : ISignalRService
$"destination:{destination}\n" +
"ack:auto\n\n\0";
}
private static bool IsDisconnectedByUser()
{
try
{
return ServerSettingsStore.Load().DisconnectConnection;
}
catch
{
return false;
}
}
}

View File

@@ -0,0 +1,26 @@
using System.Net;
using System.Net.Http;
using YY.Admin.Helper;
namespace YY.Admin.Infrastructure.Network;
/// <summary>
/// 全局断开保护:用户勾选"断开连接"时,直接短路所有后端 HTTP 请求,
/// 返回 499 而不发起真实网络调用,各服务的 catch/IsSuccessStatusCode 分支自行降级。
/// </summary>
internal sealed class DisconnectGuardHandler : DelegatingHandler
{
protected override Task<HttpResponseMessage> SendAsync(
HttpRequestMessage request,
CancellationToken cancellationToken)
{
if (ServerSettingsStore.Load().DisconnectConnection)
{
return Task.FromResult(new HttpResponseMessage((HttpStatusCode)499)
{
ReasonPhrase = "User Disconnected"
});
}
return base.SendAsync(request, cancellationToken);
}
}

View File

@@ -3,6 +3,7 @@ using Prism.Events;
using System.Net.Http;
using YY.Admin.Core.Events;
using YY.Admin.Core.Services;
using YY.Admin.Helper;
namespace YY.Admin.Infrastructure.Network;
@@ -12,8 +13,11 @@ public class NetworkMonitor : INetworkMonitor
private readonly IConfiguration _configuration;
private readonly IEventAggregator _eventAggregator;
private readonly SemaphoreSlim _startLock = new(1, 1);
private readonly object _aggregateLock = new();
private Task? _loopTask;
private CancellationTokenSource? _cts;
private volatile bool _httpProbeOnline;
private volatile bool _stompTransportOnline;
private volatile bool _isOnline;
public NetworkMonitor(
@@ -30,6 +34,13 @@ public class NetworkMonitor : INetworkMonitor
public event Action<bool>? StatusChanged;
/// <inheritdoc />
public void SetStompTransportOnline(bool online)
{
_stompTransportOnline = online;
RecomputeAggregatedOnline();
}
public async Task StartAsync(CancellationToken cancellationToken = default)
{
await _startLock.WaitAsync(cancellationToken).ConfigureAwait(false);
@@ -51,27 +62,63 @@ public class NetworkMonitor : INetworkMonitor
private async Task MonitorLoopAsync(CancellationToken cancellationToken)
{
// 启动后立即探活一次,避免首屏 10 秒内 IsOnline 恒为 false
await RunHttpProbeAndRecomputeAsync(cancellationToken).ConfigureAwait(false);
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(10));
while (await timer.WaitForNextTickAsync(cancellationToken).ConfigureAwait(false))
{
var online = await ProbeAsync(cancellationToken).ConfigureAwait(false);
if (online == _isOnline)
await RunHttpProbeAndRecomputeAsync(cancellationToken).ConfigureAwait(false);
}
}
private async Task RunHttpProbeAndRecomputeAsync(CancellationToken cancellationToken)
{
var httpOk = await ProbeAsync(cancellationToken).ConfigureAwait(false);
_httpProbeOnline = httpOk;
RecomputeAggregatedOnline();
}
private void RecomputeAggregatedOnline()
{
var combined = ComputeCombinedOnline();
bool newValue;
lock (_aggregateLock)
{
if (combined == _isOnline)
{
continue;
return;
}
_isOnline = online;
StatusChanged?.Invoke(online);
_eventAggregator.GetEvent<NetworkStatusChangedEvent>().Publish(new NetworkStatusChangedPayload
{
IsOnline = online,
ChangedAt = DateTime.UtcNow
});
_isOnline = combined;
newValue = combined;
}
StatusChanged?.Invoke(newValue);
_eventAggregator.GetEvent<NetworkStatusChangedEvent>().Publish(new NetworkStatusChangedPayload
{
IsOnline = newValue,
ChangedAt = DateTime.UtcNow
});
}
private bool ComputeCombinedOnline()
{
if (IsDisconnectedByUser())
{
return false;
}
return _httpProbeOnline || _stompTransportOnline;
}
private async Task<bool> ProbeAsync(CancellationToken cancellationToken)
{
if (IsDisconnectedByUser())
{
return false;
}
var baseUrl = _configuration.GetValue<string>("JeecgIntegration:BaseUrl")?.TrimEnd('/');
if (string.IsNullOrWhiteSpace(baseUrl))
{
@@ -108,4 +155,16 @@ public class NetworkMonitor : INetworkMonitor
return false;
}
private static bool IsDisconnectedByUser()
{
try
{
return ServerSettingsStore.Load().DisconnectConnection;
}
catch
{
return false;
}
}
}

View File

@@ -0,0 +1,74 @@
using YY.Admin.Core.Services;
using YY.Admin.Core.Sync;
namespace YY.Admin.Infrastructure.Sync;
/// <summary>
/// 将桌面用户 CRUD 写入 Outbox通过 /sys/sync/batch 反同步到 Jeecg 后端。
/// 断网时自动持久化,联网后续传。
/// </summary>
public sealed class UserSyncOutbox : IUserSyncOutbox
{
private readonly OutboxProcessor _outboxProcessor;
public UserSyncOutbox(OutboxProcessor outboxProcessor)
{
_outboxProcessor = outboxProcessor;
}
public Task EnqueueCreateAsync(
string userId, string account, string? realName, int? sex, DateTime? birthday, string? phone, string? email, int status, string? updateBy,
CancellationToken cancellationToken = default)
{
return _outboxProcessor.EnqueueAsync(
SysUserSyncOutbox.AggregateType,
userId,
SysUserSyncOutbox.EventCreate,
new { userId, account, realName, sex, birthday, phone, email, status, updateBy },
cancellationToken);
}
public Task EnqueueUpdateAsync(
string userId, string account, string? realName, int? sex, DateTime? birthday, string? phone, string? email, int status, string? updateBy,
CancellationToken cancellationToken = default)
{
return _outboxProcessor.EnqueueAsync(
SysUserSyncOutbox.AggregateType,
userId,
SysUserSyncOutbox.EventUpdate,
new { userId, account, realName, sex, birthday, phone, email, status, updateBy },
cancellationToken);
}
public Task EnqueueToggleStatusAsync(
string userId, int status, string? updateBy,
CancellationToken cancellationToken = default)
{
return _outboxProcessor.EnqueueAsync(
SysUserSyncOutbox.AggregateType,
userId,
SysUserSyncOutbox.EventToggleStatus,
new { userId, status, updateBy },
cancellationToken);
}
public Task EnqueueDeleteAsync(string userId, CancellationToken cancellationToken = default)
{
return _outboxProcessor.EnqueueAsync(
SysUserSyncOutbox.AggregateType,
userId,
SysUserSyncOutbox.EventDelete,
new { userId },
cancellationToken);
}
public Task EnqueueBatchDeleteAsync(IReadOnlyList<string> userIds, CancellationToken cancellationToken = default)
{
return _outboxProcessor.EnqueueAsync(
SysUserSyncOutbox.AggregateType,
string.Join(",", userIds),
SysUserSyncOutbox.EventBatchDelete,
new { userIds },
cancellationToken);
}
}