using Microsoft.Extensions.Configuration; using Prism.Events; using System.IO; using System.Net.WebSockets; using System.Text; using System.Timers; using YY.Admin.Core.Events; using YY.Admin.Core.Services; using YY.Admin.Core.Session; using YY.Admin.Helper; using YY.Admin.Infrastructure.Storage; namespace YY.Admin.Infrastructure.Hubs; public class StompWebSocketService : ISignalRService { // STOMP heart-beat: send \n every 10 s, declare we want to receive every 10 s private const int HeartbeatMs = 10_000; // Watchdog: if nothing received in 3 × HeartbeatMs, treat connection as zombie private const int WatchdogMs = 30_000; private readonly IConfiguration _configuration; private readonly IEventAggregator _eventAggregator; private readonly TokenStore _tokenStore; private readonly INetworkMonitor _networkMonitor; private ClientWebSocket? _socket; private string _deviceId = "default-device"; private string _token = string.Empty; private CancellationTokenSource? _connectionCts; private System.Timers.Timer? _heartbeatTimer; private System.Timers.Timer? _watchdogTimer; private volatile int _lastReceivedTick = Environment.TickCount; public StompWebSocketService( IConfiguration configuration, IEventAggregator eventAggregator, TokenStore tokenStore, INetworkMonitor networkMonitor) { _configuration = configuration; _eventAggregator = eventAggregator; _tokenStore = tokenStore; _networkMonitor = networkMonitor; // When the network comes back online, reconnect the STOMP channel if it is down. eventAggregator.GetEvent() .Subscribe(OnNetworkStatusChanged, ThreadOption.BackgroundThread); } private void OnNetworkStatusChanged(NetworkStatusChangedPayload payload) { if (!payload.IsOnline || IsDisconnectedByUser()) { return; } if (_socket == null || _socket.State != WebSocketState.Open) { _ = Task.Run(() => ConnectUnifiedDeviceChannelAsync(CancellationToken.None)); } } /// public async Task ConnectAsync(string token, CancellationToken cancellationToken = default) { _token = token ?? string.Empty; await ConnectUnifiedDeviceChannelAsync(cancellationToken).ConfigureAwait(false); } /// public async Task ConnectUnifiedDeviceChannelAsync(CancellationToken cancellationToken = default) { if (IsDisconnectedByUser()) { await DisconnectAsync(cancellationToken).ConfigureAwait(false); return; } var anonymous = _configuration.GetValue("JeecgIntegration:AnonymousMode", true); if (anonymous) { _token = string.Empty; } else if (string.IsNullOrWhiteSpace(_token)) { _token = await _tokenStore.GetTokenAsync(cancellationToken).ConfigureAwait(false) ?? string.Empty; } _deviceId = ResolveDeviceId(_token); var wsUrl = ResolveWsUrl(); var retryDelays = new[] { 0, 2, 5, 10, 30 }; foreach (var delay in retryDelays) { if (delay > 0) { await Task.Delay(TimeSpan.FromSeconds(delay), cancellationToken).ConfigureAwait(false); } try { // Tear down previous session before opening a new one. var oldCts = Interlocked.Exchange(ref _connectionCts, null); oldCts?.Cancel(); oldCts?.Dispose(); StopTimers(); _socket?.Dispose(); _socket = new ClientWebSocket(); _socket.Options.AddSubProtocol("v12.stomp"); await _socket.ConnectAsync(new Uri(wsUrl), cancellationToken).ConfigureAwait(false); var connectFrame = anonymous || string.IsNullOrWhiteSpace(_token) ? BuildConnectFrame(null, _deviceId) : BuildConnectFrame(_token, _deviceId); await SendFrameAsync(connectFrame, cancellationToken).ConfigureAwait(false); // 用户镜像变更:订阅 /topic/sync/jeecg-users await SendFrameAsync( BuildSubscribeFrame("sub-jeecg-users", "/topic/sync/jeecg-users"), cancellationToken).ConfigureAwait(false); // 车辆数据变更:订阅 /topic/sync/mes-vehicles await SendFrameAsync( BuildSubscribeFrame("sub-mes-vehicles", "/topic/sync/mes-vehicles"), cancellationToken).ConfigureAwait(false); // 客户数据变更:订阅 /topic/sync/mes-customers await SendFrameAsync( BuildSubscribeFrame("sub-mes-customers", "/topic/sync/mes-customers"), cancellationToken).ConfigureAwait(false); // 供应商数据变更:订阅 /topic/sync/mes-suppliers await SendFrameAsync( BuildSubscribeFrame("sub-mes-suppliers", "/topic/sync/mes-suppliers"), cancellationToken).ConfigureAwait(false); // 磅单数据变更:订阅 /topic/sync/mes-weight-records await SendFrameAsync( BuildSubscribeFrame("sub-mes-weight-records", "/topic/sync/mes-weight-records"), cancellationToken).ConfigureAwait(false); // 订阅服务端 PONG 回复(应用层假在线检测) await SendFrameAsync( BuildSubscribeFrame("sub-device-pong", $"/topic/device/{_deviceId}/pong"), cancellationToken).ConfigureAwait(false); if (!anonymous && !string.IsNullOrWhiteSpace(_token)) { await SendFrameAsync( BuildSubscribeFrame("sub-device-command", $"/user/{_deviceId}/queue/command"), cancellationToken).ConfigureAwait(false); } // Reset watchdog baseline to now. _lastReceivedTick = Environment.TickCount; var cts = new CancellationTokenSource(); _connectionCts = cts; StartHeartbeatTimer(cts.Token); StartWatchdogTimer(cts.Token); _networkMonitor.SetStompTransportOnline(true); _ = Task.Run(() => ReceiveLoopAsync(cts.Token), cts.Token); return; } catch { _networkMonitor.SetStompTransportOnline(false); } } } /// public async Task SendDeviceStatusAsync(object status, CancellationToken cancellationToken = default) { if (IsDisconnectedByUser() || _socket == null || _socket.State != WebSocketState.Open) { return; } var json = System.Text.Json.JsonSerializer.Serialize(status); var frame = "SEND\n" + "destination:/app/device/status\n" + "content-type:application/json\n" + $"content-length:{Encoding.UTF8.GetByteCount(json)}\n\n" + $"{json}\0"; await SendFrameAsync(frame, cancellationToken).ConfigureAwait(false); } public async Task DisconnectAsync(CancellationToken cancellationToken = default) { var cts = Interlocked.Exchange(ref _connectionCts, null); cts?.Cancel(); cts?.Dispose(); StopTimers(); var socket = _socket; _socket = null; if (socket == null) { _networkMonitor.SetStompTransportOnline(false); return; } try { if (socket.State == WebSocketState.Open || socket.State == WebSocketState.CloseReceived) { await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "manual disconnect", cancellationToken).ConfigureAwait(false); } } catch { socket.Abort(); } finally { socket.Dispose(); _networkMonitor.SetStompTransportOnline(false); } } // ── Heartbeat ────────────────────────────────────────────────────────── private void StartHeartbeatTimer(CancellationToken cancellationToken) { _heartbeatTimer = new System.Timers.Timer(HeartbeatMs) { AutoReset = true }; _heartbeatTimer.Elapsed += async (_, _) => { if (cancellationToken.IsCancellationRequested) { return; } // STOMP protocol keepalive (\n frame) await SendStompHeartbeatAsync(cancellationToken).ConfigureAwait(false); // Application-layer PING: server replies to /topic/device/{id}/pong await SendAppPingAsync(cancellationToken).ConfigureAwait(false); }; _heartbeatTimer.Start(); } private async Task SendStompHeartbeatAsync(CancellationToken cancellationToken) { if (_socket == null || _socket.State != WebSocketState.Open) { return; } try { // STOMP spec: a single LF (0x0A) constitutes a heartbeat frame. await _socket.SendAsync( new ArraySegment(new byte[] { 0x0A }), WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false); } catch { // Send failure will be caught by the watchdog. } } private async Task SendAppPingAsync(CancellationToken cancellationToken) { if (_socket == null || _socket.State != WebSocketState.Open) { return; } try { var body = $"{{\"cmd\":\"PING_DEVICE\",\"deviceId\":\"{_deviceId}\",\"ts\":{DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}}}"; var frame = "SEND\n" + "destination:/app/device/ping\n" + "content-type:application/json\n" + $"content-length:{Encoding.UTF8.GetByteCount(body)}\n\n" + $"{body}\0"; await SendFrameAsync(frame, cancellationToken).ConfigureAwait(false); } catch { // Watchdog handles reconnect. } } // ── Watchdog ─────────────────────────────────────────────────────────── private void StartWatchdogTimer(CancellationToken cancellationToken) { _watchdogTimer = new System.Timers.Timer(WatchdogMs) { AutoReset = true }; _watchdogTimer.Elapsed += (_, _) => { if (cancellationToken.IsCancellationRequested) { return; } // TickCount wraps; unchecked subtraction handles it correctly. var silenceMs = unchecked(Environment.TickCount - _lastReceivedTick); if (silenceMs >= WatchdogMs) { // No frame (heartbeat or MESSAGE) received for WatchdogMs — zombie connection. _ = Task.Run(() => ConnectUnifiedDeviceChannelAsync(CancellationToken.None)); } }; _watchdogTimer.Start(); } private void StopTimers() { _heartbeatTimer?.Stop(); _heartbeatTimer?.Dispose(); _heartbeatTimer = null; _watchdogTimer?.Stop(); _watchdogTimer?.Dispose(); _watchdogTimer = null; } // ── Receive loop ─────────────────────────────────────────────────────── private async Task ReceiveLoopAsync(CancellationToken cancellationToken) { if (_socket == null) { return; } var buffer = new byte[8192]; while (_socket.State == WebSocketState.Open && !cancellationToken.IsCancellationRequested) { using var ms = new MemoryStream(); WebSocketReceiveResult result; do { result = await _socket.ReceiveAsync(new ArraySegment(buffer), cancellationToken).ConfigureAwait(false); if (result.MessageType == WebSocketMessageType.Close) { _networkMonitor.SetStompTransportOnline(false); await ConnectUnifiedDeviceChannelAsync(cancellationToken).ConfigureAwait(false); return; } ms.Write(buffer, 0, result.Count); } while (!result.EndOfMessage); // Any received frame (heartbeat \n or MESSAGE) resets the watchdog. _lastReceivedTick = Environment.TickCount; var text = Encoding.UTF8.GetString(ms.ToArray()); if (!text.StartsWith("MESSAGE", StringComparison.OrdinalIgnoreCase)) { continue; } var idx = text.IndexOf("\n\n", StringComparison.Ordinal); if (idx < 0) { continue; } var body = text[(idx + 2)..].TrimEnd('\0'); _eventAggregator.GetEvent().Publish(new RemoteCommandPayload { DeviceId = _deviceId, CommandJson = body }); } } private async Task SendFrameAsync(string frame, CancellationToken cancellationToken) { if (_socket == null || _socket.State != WebSocketState.Open) { return; } var data = Encoding.UTF8.GetBytes(frame); await _socket.SendAsync( new ArraySegment(data), WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false); } // ── Helpers ──────────────────────────────────────────────────────────── private string ResolveWsUrl() { var baseUrl = _configuration.GetValue("JeecgIntegration:BaseUrl")?.TrimEnd('/'); if (string.IsNullOrWhiteSpace(baseUrl)) { return "ws://127.0.0.1:8080/jeecg-boot/ws/device"; } if (baseUrl.StartsWith("https://", StringComparison.OrdinalIgnoreCase)) { return "wss://" + baseUrl["https://".Length..] + "/ws/device"; } return "ws://" + baseUrl["http://".Length..] + "/ws/device"; } private static string ResolveDeviceId(string token) { try { var parts = token.Split('.'); if (parts.Length < 2) { return "default-device"; } var payload = parts[1].Replace('-', '+').Replace('_', '/'); payload = payload.PadRight(payload.Length + (4 - payload.Length % 4) % 4, '='); var json = Encoding.UTF8.GetString(Convert.FromBase64String(payload)); using var doc = System.Text.Json.JsonDocument.Parse(json); if (doc.RootElement.TryGetProperty("deviceId", out var deviceId)) { return deviceId.GetString() ?? "default-device"; } if (doc.RootElement.TryGetProperty("username", out var username)) { return username.GetString() ?? "default-device"; } return "default-device"; } catch { return "default-device"; } } private static string BuildConnectFrame(string? token, string deviceId) { var user = AppSession.CurrentUser; var sb = new System.Text.StringBuilder(); sb.Append("CONNECT\n"); sb.Append("accept-version:1.2\n"); sb.Append($"heart-beat:{HeartbeatMs},{HeartbeatMs}\n"); if (!string.IsNullOrWhiteSpace(token)) sb.Append($"Authorization:Bearer {token}\n"); sb.Append("platform:desktop\n"); sb.Append($"hostName:{Environment.MachineName}\n"); sb.Append($"deviceId:{deviceId}\n"); sb.Append($"userName:{user?.Account ?? "unknown"}\n"); sb.Append($"realName:{user?.RealName ?? ""}\n"); sb.Append("\n\0"); return sb.ToString(); } private static string BuildSubscribeFrame(string subscriptionId, string destination) { return "SUBSCRIBE\n" + $"id:{subscriptionId}\n" + $"destination:{destination}\n" + "ack:auto\n\n\0"; } private static bool IsDisconnectedByUser() { try { return ServerSettingsStore.Load().DisconnectConnection; } catch { return false; } } }