Files
qhmes/yy-admin-master/YY.Admin/Infrastructure/Hubs/StompWebSocketService.cs

499 lines
19 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using Microsoft.Extensions.Configuration;
using Prism.Events;
using System.IO;
using System.Net.WebSockets;
using System.Text;
using System.Timers;
using YY.Admin.Core.Events;
using YY.Admin.Core.Services;
using YY.Admin.Core.Session;
using YY.Admin.Helper;
using YY.Admin.Infrastructure.Storage;
namespace YY.Admin.Infrastructure.Hubs;
public class StompWebSocketService : ISignalRService
{
// STOMP heart-beat: send \n every 10 s, declare we want to receive every 10 s
private const int HeartbeatMs = 10_000;
// Watchdog: if nothing received in 3 × HeartbeatMs, treat connection as zombie
private const int WatchdogMs = 30_000;
private readonly IConfiguration _configuration;
private readonly IEventAggregator _eventAggregator;
private readonly TokenStore _tokenStore;
private readonly INetworkMonitor _networkMonitor;
private ClientWebSocket? _socket;
private string _deviceId = "default-device";
private string _token = string.Empty;
private CancellationTokenSource? _connectionCts;
private System.Timers.Timer? _heartbeatTimer;
private System.Timers.Timer? _watchdogTimer;
private volatile int _lastReceivedTick = Environment.TickCount;
public StompWebSocketService(
IConfiguration configuration,
IEventAggregator eventAggregator,
TokenStore tokenStore,
INetworkMonitor networkMonitor)
{
_configuration = configuration;
_eventAggregator = eventAggregator;
_tokenStore = tokenStore;
_networkMonitor = networkMonitor;
// When the network comes back online, reconnect the STOMP channel if it is down.
eventAggregator.GetEvent<NetworkStatusChangedEvent>()
.Subscribe(OnNetworkStatusChanged, ThreadOption.BackgroundThread);
}
private void OnNetworkStatusChanged(NetworkStatusChangedPayload payload)
{
if (!payload.IsOnline || IsDisconnectedByUser())
{
return;
}
if (_socket == null || _socket.State != WebSocketState.Open)
{
_ = Task.Run(() => ConnectUnifiedDeviceChannelAsync(CancellationToken.None));
}
}
/// <inheritdoc />
public async Task ConnectAsync(string token, CancellationToken cancellationToken = default)
{
_token = token ?? string.Empty;
await ConnectUnifiedDeviceChannelAsync(cancellationToken).ConfigureAwait(false);
}
/// <inheritdoc />
public async Task ConnectUnifiedDeviceChannelAsync(CancellationToken cancellationToken = default)
{
if (IsDisconnectedByUser())
{
await DisconnectAsync(cancellationToken).ConfigureAwait(false);
return;
}
var anonymous = _configuration.GetValue("JeecgIntegration:AnonymousMode", true);
if (anonymous)
{
_token = string.Empty;
}
else if (string.IsNullOrWhiteSpace(_token))
{
_token = await _tokenStore.GetTokenAsync(cancellationToken).ConfigureAwait(false) ?? string.Empty;
}
_deviceId = ResolveDeviceId(_token);
var wsUrl = ResolveWsUrl();
var retryDelays = new[] { 0, 2, 5, 10, 30 };
foreach (var delay in retryDelays)
{
if (delay > 0)
{
await Task.Delay(TimeSpan.FromSeconds(delay), cancellationToken).ConfigureAwait(false);
}
try
{
// Tear down previous session before opening a new one.
var oldCts = Interlocked.Exchange(ref _connectionCts, null);
oldCts?.Cancel();
oldCts?.Dispose();
StopTimers();
_socket?.Dispose();
_socket = new ClientWebSocket();
_socket.Options.AddSubProtocol("v12.stomp");
await _socket.ConnectAsync(new Uri(wsUrl), cancellationToken).ConfigureAwait(false);
var connectFrame = anonymous || string.IsNullOrWhiteSpace(_token)
? BuildConnectFrame(null, _deviceId)
: BuildConnectFrame(_token, _deviceId);
await SendFrameAsync(connectFrame, cancellationToken).ConfigureAwait(false);
// 用户镜像变更:订阅 /topic/sync/jeecg-users
await SendFrameAsync(
BuildSubscribeFrame("sub-jeecg-users", "/topic/sync/jeecg-users"),
cancellationToken).ConfigureAwait(false);
// 车辆数据变更:订阅 /topic/sync/mes-vehicles
await SendFrameAsync(
BuildSubscribeFrame("sub-mes-vehicles", "/topic/sync/mes-vehicles"),
cancellationToken).ConfigureAwait(false);
// 客户数据变更:订阅 /topic/sync/mes-customers
await SendFrameAsync(
BuildSubscribeFrame("sub-mes-customers", "/topic/sync/mes-customers"),
cancellationToken).ConfigureAwait(false);
// 供应商数据变更:订阅 /topic/sync/mes-suppliers
await SendFrameAsync(
BuildSubscribeFrame("sub-mes-suppliers", "/topic/sync/mes-suppliers"),
cancellationToken).ConfigureAwait(false);
// 磅单数据变更:订阅 /topic/sync/mes-weight-records
await SendFrameAsync(
BuildSubscribeFrame("sub-mes-weight-records", "/topic/sync/mes-weight-records"),
cancellationToken).ConfigureAwait(false);
// 密炼物料数据变更:订阅 /topic/sync/mes-mixer-materials
await SendFrameAsync(
BuildSubscribeFrame("sub-mes-mixer-material", "/topic/sync/mes-mixer-materials"),
cancellationToken).ConfigureAwait(false);
// 分类字典变更:订阅 /topic/sync/sys-categories
await SendFrameAsync(
BuildSubscribeFrame("sub-sys-categories", "/topic/sync/sys-categories"),
cancellationToken).ConfigureAwait(false);
// 数据字典变更:订阅 /topic/sync/sys-dicts
await SendFrameAsync(
BuildSubscribeFrame("sub-sys-dicts", "/topic/sync/sys-dicts"),
cancellationToken).ConfigureAwait(false);
// 原料入场记录变更:订阅 /topic/sync/mes-raw-material-entries
await SendFrameAsync(
BuildSubscribeFrame("sub-mes-raw-material-entries", "/topic/sync/mes-raw-material-entries"),
cancellationToken).ConfigureAwait(false);
// 原材料卡片变更:订阅 /topic/sync/mes-raw-material-cards
await SendFrameAsync(
BuildSubscribeFrame("sub-mes-raw-material-cards", "/topic/sync/mes-raw-material-cards"),
cancellationToken).ConfigureAwait(false);
// 库区数据变更:订阅 /topic/sync/mes-warehouse-areas
await SendFrameAsync(
BuildSubscribeFrame("sub-mes-warehouse-areas", "/topic/sync/mes-warehouse-areas"),
cancellationToken).ConfigureAwait(false);
// 密炼物料皮重策略变更:订阅 /topic/sync/mes-mixer-material-tare-strategies
await SendFrameAsync(
BuildSubscribeFrame("sub-mes-mixer-material-tare-strategies", "/topic/sync/mes-mixer-material-tare-strategies"),
cancellationToken).ConfigureAwait(false);
// 打印模板变更:订阅 /topic/sync/print-templates
await SendFrameAsync(
BuildSubscribeFrame("sub-print-templates", "/topic/sync/print-templates"),
cancellationToken).ConfigureAwait(false);
// 业务打印绑定变更:订阅 /topic/sync/print-biz-binds
await SendFrameAsync(
BuildSubscribeFrame("sub-print-biz-binds", "/topic/sync/print-biz-binds"),
cancellationToken).ConfigureAwait(false);
// 订阅服务端 PONG 回复(应用层假在线检测)
await SendFrameAsync(
BuildSubscribeFrame("sub-device-pong", $"/topic/device/{_deviceId}/pong"),
cancellationToken).ConfigureAwait(false);
if (!anonymous && !string.IsNullOrWhiteSpace(_token))
{
await SendFrameAsync(
BuildSubscribeFrame("sub-device-command", $"/user/{_deviceId}/queue/command"),
cancellationToken).ConfigureAwait(false);
}
// Reset watchdog baseline to now.
_lastReceivedTick = Environment.TickCount;
var cts = new CancellationTokenSource();
_connectionCts = cts;
StartHeartbeatTimer(cts.Token);
StartWatchdogTimer(cts.Token);
_networkMonitor.SetStompTransportOnline(true);
_ = Task.Run(() => ReceiveLoopAsync(cts.Token), cts.Token);
return;
}
catch
{
_networkMonitor.SetStompTransportOnline(false);
}
}
}
/// <inheritdoc />
public async Task SendDeviceStatusAsync(object status, CancellationToken cancellationToken = default)
{
if (IsDisconnectedByUser() || _socket == null || _socket.State != WebSocketState.Open)
{
return;
}
var json = System.Text.Json.JsonSerializer.Serialize(status);
var frame = "SEND\n" +
"destination:/app/device/status\n" +
"content-type:application/json\n" +
$"content-length:{Encoding.UTF8.GetByteCount(json)}\n\n" +
$"{json}\0";
await SendFrameAsync(frame, cancellationToken).ConfigureAwait(false);
}
public async Task DisconnectAsync(CancellationToken cancellationToken = default)
{
var cts = Interlocked.Exchange(ref _connectionCts, null);
cts?.Cancel();
cts?.Dispose();
StopTimers();
var socket = _socket;
_socket = null;
if (socket == null)
{
_networkMonitor.SetStompTransportOnline(false);
return;
}
try
{
if (socket.State == WebSocketState.Open || socket.State == WebSocketState.CloseReceived)
{
await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "manual disconnect", cancellationToken).ConfigureAwait(false);
}
}
catch
{
socket.Abort();
}
finally
{
socket.Dispose();
_networkMonitor.SetStompTransportOnline(false);
}
}
// ── Heartbeat ──────────────────────────────────────────────────────────
private void StartHeartbeatTimer(CancellationToken cancellationToken)
{
_heartbeatTimer = new System.Timers.Timer(HeartbeatMs) { AutoReset = true };
_heartbeatTimer.Elapsed += async (_, _) =>
{
if (cancellationToken.IsCancellationRequested)
{
return;
}
// STOMP protocol keepalive (\n frame)
await SendStompHeartbeatAsync(cancellationToken).ConfigureAwait(false);
// Application-layer PING: server replies to /topic/device/{id}/pong
await SendAppPingAsync(cancellationToken).ConfigureAwait(false);
};
_heartbeatTimer.Start();
}
private async Task SendStompHeartbeatAsync(CancellationToken cancellationToken)
{
if (_socket == null || _socket.State != WebSocketState.Open)
{
return;
}
try
{
// STOMP spec: a single LF (0x0A) constitutes a heartbeat frame.
await _socket.SendAsync(
new ArraySegment<byte>(new byte[] { 0x0A }),
WebSocketMessageType.Text,
true,
cancellationToken).ConfigureAwait(false);
}
catch
{
// Send failure will be caught by the watchdog.
}
}
private async Task SendAppPingAsync(CancellationToken cancellationToken)
{
if (_socket == null || _socket.State != WebSocketState.Open)
{
return;
}
try
{
var body = $"{{\"cmd\":\"PING_DEVICE\",\"deviceId\":\"{_deviceId}\",\"ts\":{DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}}}";
var frame = "SEND\n" +
"destination:/app/device/ping\n" +
"content-type:application/json\n" +
$"content-length:{Encoding.UTF8.GetByteCount(body)}\n\n" +
$"{body}\0";
await SendFrameAsync(frame, cancellationToken).ConfigureAwait(false);
}
catch
{
// Watchdog handles reconnect.
}
}
// ── Watchdog ───────────────────────────────────────────────────────────
private void StartWatchdogTimer(CancellationToken cancellationToken)
{
_watchdogTimer = new System.Timers.Timer(WatchdogMs) { AutoReset = true };
_watchdogTimer.Elapsed += (_, _) =>
{
if (cancellationToken.IsCancellationRequested)
{
return;
}
// TickCount wraps; unchecked subtraction handles it correctly.
var silenceMs = unchecked(Environment.TickCount - _lastReceivedTick);
if (silenceMs >= WatchdogMs)
{
// No frame (heartbeat or MESSAGE) received for WatchdogMs — zombie connection.
_ = Task.Run(() => ConnectUnifiedDeviceChannelAsync(CancellationToken.None));
}
};
_watchdogTimer.Start();
}
private void StopTimers()
{
_heartbeatTimer?.Stop();
_heartbeatTimer?.Dispose();
_heartbeatTimer = null;
_watchdogTimer?.Stop();
_watchdogTimer?.Dispose();
_watchdogTimer = null;
}
// ── Receive loop ───────────────────────────────────────────────────────
private async Task ReceiveLoopAsync(CancellationToken cancellationToken)
{
if (_socket == null)
{
return;
}
var buffer = new byte[8192];
while (_socket.State == WebSocketState.Open && !cancellationToken.IsCancellationRequested)
{
using var ms = new MemoryStream();
WebSocketReceiveResult result;
do
{
result = await _socket.ReceiveAsync(new ArraySegment<byte>(buffer), cancellationToken).ConfigureAwait(false);
if (result.MessageType == WebSocketMessageType.Close)
{
_networkMonitor.SetStompTransportOnline(false);
await ConnectUnifiedDeviceChannelAsync(cancellationToken).ConfigureAwait(false);
return;
}
ms.Write(buffer, 0, result.Count);
} while (!result.EndOfMessage);
// Any received frame (heartbeat \n or MESSAGE) resets the watchdog.
_lastReceivedTick = Environment.TickCount;
var text = Encoding.UTF8.GetString(ms.ToArray());
if (!text.StartsWith("MESSAGE", StringComparison.OrdinalIgnoreCase))
{
continue;
}
var idx = text.IndexOf("\n\n", StringComparison.Ordinal);
if (idx < 0)
{
continue;
}
var body = text[(idx + 2)..].TrimEnd('\0');
_eventAggregator.GetEvent<RemoteCommandReceivedEvent>().Publish(new RemoteCommandPayload
{
DeviceId = _deviceId,
CommandJson = body
});
}
}
private async Task SendFrameAsync(string frame, CancellationToken cancellationToken)
{
if (_socket == null || _socket.State != WebSocketState.Open)
{
return;
}
var data = Encoding.UTF8.GetBytes(frame);
await _socket.SendAsync(
new ArraySegment<byte>(data),
WebSocketMessageType.Text,
true,
cancellationToken).ConfigureAwait(false);
}
// ── Helpers ────────────────────────────────────────────────────────────
private string ResolveWsUrl()
{
var baseUrl = _configuration.GetValue<string>("JeecgIntegration:BaseUrl")?.TrimEnd('/');
if (string.IsNullOrWhiteSpace(baseUrl))
{
return "ws://127.0.0.1:8080/jeecg-boot/ws/device";
}
if (baseUrl.StartsWith("https://", StringComparison.OrdinalIgnoreCase))
{
return "wss://" + baseUrl["https://".Length..] + "/ws/device";
}
return "ws://" + baseUrl["http://".Length..] + "/ws/device";
}
private static string ResolveDeviceId(string token)
{
try
{
var parts = token.Split('.');
if (parts.Length < 2)
{
return "default-device";
}
var payload = parts[1].Replace('-', '+').Replace('_', '/');
payload = payload.PadRight(payload.Length + (4 - payload.Length % 4) % 4, '=');
var json = Encoding.UTF8.GetString(Convert.FromBase64String(payload));
using var doc = System.Text.Json.JsonDocument.Parse(json);
if (doc.RootElement.TryGetProperty("deviceId", out var deviceId))
{
return deviceId.GetString() ?? "default-device";
}
if (doc.RootElement.TryGetProperty("username", out var username))
{
return username.GetString() ?? "default-device";
}
return "default-device";
}
catch
{
return "default-device";
}
}
private static string BuildConnectFrame(string? token, string deviceId)
{
var user = AppSession.CurrentUser;
var sb = new System.Text.StringBuilder();
sb.Append("CONNECT\n");
sb.Append("accept-version:1.2\n");
sb.Append($"heart-beat:{HeartbeatMs},{HeartbeatMs}\n");
if (!string.IsNullOrWhiteSpace(token))
sb.Append($"Authorization:Bearer {token}\n");
sb.Append("platform:desktop\n");
sb.Append($"hostName:{Environment.MachineName}\n");
sb.Append($"deviceId:{deviceId}\n");
sb.Append($"userName:{user?.Account ?? "unknown"}\n");
sb.Append($"realName:{user?.RealName ?? ""}\n");
sb.Append("\n\0");
return sb.ToString();
}
private static string BuildSubscribeFrame(string subscriptionId, string destination)
{
return "SUBSCRIBE\n" +
$"id:{subscriptionId}\n" +
$"destination:{destination}\n" +
"ack:auto\n\n\0";
}
private static bool IsDisconnectedByUser()
{
try
{
return ServerSettingsStore.Load().DisconnectConnection;
}
catch
{
return false;
}
}
}