using Microsoft.Extensions.Configuration;
using System.Net.Http;
using System.Net.WebSockets;
using System.Text;
using System.IO;

namespace YY.Admin.Services.Service.Jeecg;

/// <summary>
/// Jeecg backend gateway implementation: HTTP GET helpers plus a WebSocket client
/// that reconnects with exponential backoff.
/// </summary>
// NOTE(review): generic type arguments in this file (Task<...>, Dictionary<string, string>,
// Func<...>, ArraySegment<byte>, GetValue<T>) were reconstructed from usage; confirm they
// match the declarations on IJeecgBackendGateway.
public class JeecgBackendGateway : IJeecgBackendGateway, ISingletonDependency
{
    private const int ReceiveBufferSize = 8192;
    private const int InitialBackoffSeconds = 5;
    private const int MaxBackoffSeconds = 120;
    private const string InactivityReconnectKey = "JeecgIntegration:WebSocketInactivityReconnectSeconds";
    private const string LegacyDevicePath = "/ws/device/websocket";
    private const string DefaultSyncPath = "/websocket/scada-sync";

    private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromSeconds(10);
    private static readonly TimeSpan HeartbeatInterval = TimeSpan.FromSeconds(12);
    private static readonly TimeSpan InactivityPollInterval = TimeSpan.FromSeconds(2);

    private readonly IConfiguration _configuration;
    private readonly HttpClient _httpClient;
    private readonly ILoggerService _logger;

    // Serializes every SendAsync on the shared socket (application messages and heartbeats).
    private readonly SemaphoreSlim _wsSendLock = new(1, 1);

    // Guards reads and writes of _activeWebSocket.
    private readonly object _wsStateLock = new();
    private ClientWebSocket? _activeWebSocket;

    public JeecgBackendGateway(
        IConfiguration configuration,
        HttpClient httpClient,
        ILoggerService logger)
    {
        _configuration = configuration;
        _httpClient = httpClient;
        _logger = logger;
    }

    /// <summary>
    /// Sends an HTTP GET to the given URL (resolved against the configured base URL when relative).
    /// </summary>
    /// <param name="relativeOrAbsoluteUrl">Absolute URL, or a path appended to <c>JeecgIntegration:BaseUrl</c>.</param>
    /// <param name="headers">Optional request headers, added without validation.</param>
    /// <param name="cancellationToken">Token used to cancel the request.</param>
    /// <returns>The raw response; the caller is responsible for disposing it.</returns>
    public async Task<HttpResponseMessage> ExecuteGetAsync(
        string relativeOrAbsoluteUrl,
        Dictionary<string, string>? headers = null,
        CancellationToken cancellationToken = default)
    {
        var requestUrl = BuildUrl(relativeOrAbsoluteUrl);
        using var req = new HttpRequestMessage(HttpMethod.Get, requestUrl);
        if (headers != null)
        {
            foreach (var kv in headers)
            {
                req.Headers.TryAddWithoutValidation(kv.Key, kv.Value);
            }
        }

        return await _httpClient.SendAsync(req, cancellationToken);
    }

    /// <summary>
    /// Sends an HTTP GET and returns the response body as a string.
    /// </summary>
    /// <param name="relativeOrAbsoluteUrl">Absolute URL, or a path appended to <c>JeecgIntegration:BaseUrl</c>.</param>
    /// <param name="headers">Optional request headers, added without validation.</param>
    /// <param name="cancellationToken">Token used to cancel the request.</param>
    /// <returns>The body text, or <c>null</c> when the status code is not a success code.</returns>
    public async Task<string?> ExecuteGetStringAsync(
        string relativeOrAbsoluteUrl,
        Dictionary<string, string>? headers = null,
        CancellationToken cancellationToken = default)
    {
        using var resp = await ExecuteGetAsync(relativeOrAbsoluteUrl, headers, cancellationToken);
        if (!resp.IsSuccessStatusCode)
        {
            return null;
        }

        return await resp.Content.ReadAsStringAsync(cancellationToken);
    }

    /// <summary>
    /// Connects to the resolved WebSocket URL and dispatches every complete message to
    /// <paramref name="onMessage"/>, reconnecting with exponential backoff (5s doubling up to 120s)
    /// until <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <param name="onMessage">Callback awaited for each received message, decoded as UTF-8.</param>
    /// <param name="cancellationToken">Token that stops the loop.</param>
    public async Task RunWebSocketLoopAsync(
        Func<string, Task> onMessage,
        CancellationToken cancellationToken)
    {
        var wsUrl = ResolveWebSocketUrl();
        _logger.Information($"Jeecg WebSocket 解析地址: {wsUrl}");
        if (string.IsNullOrWhiteSpace(wsUrl))
        {
            _logger.Warning("Jeecg WebSocket 未启动:解析地址为空");
            return;
        }

        var backoffSeconds = InitialBackoffSeconds;
        var buffer = new byte[ReceiveBufferSize];
        while (!cancellationToken.IsCancellationRequested)
        {
            using var ws = new ClientWebSocket();
            try
            {
                ws.Options.KeepAliveInterval = KeepAliveInterval;
                await ws.ConnectAsync(new Uri(wsUrl), cancellationToken);
                lock (_wsStateLock)
                {
                    _activeWebSocket = ws;
                }

                // A successful connection resets the backoff.
                backoffSeconds = InitialBackoffSeconds;
                _logger.Information($"Jeecg WebSocket 已连接: {wsUrl}");

                // One linked source per connection drives both background watchers.
                using var sessionCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

                // Capture the token now: reading sessionCts.Token lazily inside the lambdas
                // could throw ObjectDisposedException once the source has been disposed.
                var sessionToken = sessionCts.Token;
                try
                {
                    var lastReceiveTicks = DateTime.UtcNow.Ticks;
                    _ = Task.Run(() => HeartbeatLoopAsync(ws, sessionToken), sessionToken);

                    var inactivitySeconds = _configuration.GetValue<int>(InactivityReconnectKey, 0);
                    if (inactivitySeconds > 0)
                    {
                        _ = Task.Run(
                            () => InactivityReconnectLoopAsync(ws, () => Interlocked.Read(ref lastReceiveTicks), sessionToken),
                            sessionToken);
                    }

                    while (ws.State == WebSocketState.Open && !cancellationToken.IsCancellationRequested)
                    {
                        var payload = await ReceiveMessageAsync(ws, buffer, cancellationToken);
                        if (payload is null)
                        {
                            _logger.Warning("Jeecg WebSocket 接收循环检测到关闭帧,准备重连");
                            break;
                        }

                        Interlocked.Exchange(ref lastReceiveTicks, DateTime.UtcNow.Ticks);
                        _logger.Information($"Jeecg WebSocket 收到原始消息: {payload}");
                        await onMessage(payload);
                    }
                }
                finally
                {
                    // Stop the watchers on every exit path, including exceptions.
                    sessionCts.Cancel();
                }

                _logger.Warning($"Jeecg WebSocket 接收循环退出,当前状态={ws.State}");
            }
            catch (OperationCanceledException)
            {
                _logger.Warning("Jeecg WebSocket 接收循环取消");
                break;
            }
            catch (Exception ex)
            {
                _logger.Warning($"Jeecg WebSocket 断开,{backoffSeconds} 秒后重连。地址: {wsUrl},异常: {ex.Message}");
            }
            finally
            {
                lock (_wsStateLock)
                {
                    if (ReferenceEquals(_activeWebSocket, ws))
                    {
                        _activeWebSocket = null;
                    }
                }
            }

            try
            {
                await Task.Delay(TimeSpan.FromSeconds(Math.Min(backoffSeconds, MaxBackoffSeconds)), cancellationToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }

            backoffSeconds = Math.Min(backoffSeconds * 2, MaxBackoffSeconds);
        }
    }

    /// <summary>
    /// Receives frames until one complete message has been assembled and decodes it as UTF-8.
    /// </summary>
    /// <returns>The message text, or <c>null</c> when a close frame was received.</returns>
    private async Task<string?> ReceiveMessageAsync(
        ClientWebSocket ws,
        byte[] buffer,
        CancellationToken cancellationToken)
    {
        using var ms = new MemoryStream();
        WebSocketReceiveResult result;
        do
        {
            result = await ws.ReceiveAsync(new ArraySegment<byte>(buffer), cancellationToken);
            _logger.Information($"Jeecg WebSocket 收帧: type={result.MessageType}, count={result.Count}, end={result.EndOfMessage}, state={ws.State}");
            if (result.MessageType == WebSocketMessageType.Close)
            {
                _logger.Warning($"Jeecg WebSocket 收到关闭帧: closeStatus={result.CloseStatus}, desc={result.CloseStatusDescription}");
                return null;
            }

            ms.Write(buffer, 0, result.Count);
        }
        while (!result.EndOfMessage);

        // Decode only after the full message is buffered so multi-byte characters
        // split across frames are handled correctly.
        return Encoding.UTF8.GetString(ms.ToArray());
    }

    /// <summary>
    /// Sends a text "ping" every 12 seconds while the socket is open and the token is not cancelled.
    /// </summary>
    private async Task HeartbeatLoopAsync(ClientWebSocket ws, CancellationToken cancellationToken)
    {
        var payload = Encoding.UTF8.GetBytes("ping");
        while (!cancellationToken.IsCancellationRequested && ws.State == WebSocketState.Open)
        {
            try
            {
                await Task.Delay(HeartbeatInterval, cancellationToken);
                if (ws.State != WebSocketState.Open)
                {
                    break;
                }

                // ClientWebSocket allows only one outstanding send, so the heartbeat must
                // share the send lock with SendWebSocketMessageAsync.
                await _wsSendLock.WaitAsync(cancellationToken);
                try
                {
                    await ws.SendAsync(new ArraySegment<byte>(payload), WebSocketMessageType.Text, true, cancellationToken);
                }
                finally
                {
                    _wsSendLock.Release();
                }
            }
            catch (OperationCanceledException)
            {
                break;
            }
            catch (Exception ex)
            {
                _logger.Warning($"Jeecg WebSocket 心跳发送失败: {ex.Message}");
                break;
            }
        }
    }

    /// <summary>
    /// Polls every 2 seconds and aborts the socket once no message has been received for the
    /// configured number of seconds, which makes the receive loop fail and reconnect.
    /// </summary>
    /// <param name="ws">Socket to watch and abort.</param>
    /// <param name="getLastReceiveTicks">Returns the UTC ticks of the last received message.</param>
    /// <param name="cancellationToken">Token that stops the watcher.</param>
    private async Task InactivityReconnectLoopAsync(
        ClientWebSocket ws,
        Func<long> getLastReceiveTicks,
        CancellationToken cancellationToken)
    {
        var inactivitySeconds = _configuration.GetValue<int>(InactivityReconnectKey, 0);
        if (inactivitySeconds <= 0)
        {
            return;
        }

        while (!cancellationToken.IsCancellationRequested && ws.State == WebSocketState.Open)
        {
            try
            {
                await Task.Delay(InactivityPollInterval, cancellationToken);
                if (ws.State != WebSocketState.Open)
                {
                    break;
                }

                var lastReceiveUtc = new DateTime(getLastReceiveTicks(), DateTimeKind.Utc);
                var idleSeconds = (DateTime.UtcNow - lastReceiveUtc).TotalSeconds;
                if (idleSeconds < inactivitySeconds)
                {
                    continue;
                }

                _logger.Warning($"Jeecg WebSocket 超过 {Math.Round(idleSeconds)} 秒未收到任何消息,主动重连");
                ws.Abort();
                break;
            }
            catch (OperationCanceledException)
            {
                break;
            }
            catch (Exception ex)
            {
                _logger.Warning($"Jeecg WebSocket 空闲检测异常: {ex.Message}");
                break;
            }
        }
    }

    /// <summary>
    /// Sends a text message over the socket currently held by the receive loop.
    /// </summary>
    /// <param name="message">Text to send, encoded as UTF-8.</param>
    /// <param name="cancellationToken">Token used to cancel waiting for the send lock and the send itself.</param>
    /// <returns><c>true</c> when the message was sent; <c>false</c> when there is no open socket or the send failed.</returns>
    public async Task<bool> SendWebSocketMessageAsync(string message, CancellationToken cancellationToken = default)
    {
        ClientWebSocket? socket;
        lock (_wsStateLock)
        {
            socket = _activeWebSocket;
        }

        if (socket == null || socket.State != WebSocketState.Open)
        {
            return false;
        }

        var bytes = Encoding.UTF8.GetBytes(message);
        var seg = new ArraySegment<byte>(bytes);
        await _wsSendLock.WaitAsync(cancellationToken);
        try
        {
            await socket.SendAsync(seg, WebSocketMessageType.Text, true, cancellationToken);
            return true;
        }
        catch (Exception ex)
        {
            // Still best-effort (returns false), but no longer silent.
            _logger.Warning($"Jeecg WebSocket 消息发送失败: {ex.Message}");
            return false;
        }
        finally
        {
            _wsSendLock.Release();
        }
    }

    /// <summary>
    /// Opens a dedicated connection, sends a single text message, and closes the connection.
    /// </summary>
    /// <param name="message">Text to send, encoded as UTF-8.</param>
    /// <param name="cancellationToken">Token used to cancel connect, send and close.</param>
    /// <returns><c>true</c> on success; <c>false</c> when no URL is resolved or any step fails.</returns>
    public async Task<bool> SendWebSocketOneShotAsync(string message, CancellationToken cancellationToken = default)
    {
        var wsUrl = ResolveWebSocketUrl();
        if (string.IsNullOrWhiteSpace(wsUrl))
        {
            return false;
        }

        using var ws = new ClientWebSocket();
        try
        {
            await ws.ConnectAsync(new Uri(wsUrl), cancellationToken);
            var bytes = Encoding.UTF8.GetBytes(message);
            var seg = new ArraySegment<byte>(bytes);
            await ws.SendAsync(seg, WebSocketMessageType.Text, true, cancellationToken);
            await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "done", cancellationToken);
            return true;
        }
        catch (Exception ex)
        {
            _logger.Warning($"Jeecg WebSocket 单次上报失败: {ex.Message}");
            return false;
        }
    }

    /// <summary>
    /// Returns the input unchanged when it is an absolute (non-file) URL; otherwise prefixes it
    /// with <c>JeecgIntegration:BaseUrl</c>. Returns the input unchanged when no base URL is configured.
    /// </summary>
    private string BuildUrl(string relativeOrAbsoluteUrl)
    {
        // On Unix, Uri.TryCreate treats a rooted path such as "/api/x" as an absolute file URI.
        // Excluding file URIs keeps such paths relative so the base URL is applied.
        if (Uri.TryCreate(relativeOrAbsoluteUrl, UriKind.Absolute, out var absoluteUri) && !absoluteUri.IsFile)
        {
            return relativeOrAbsoluteUrl;
        }

        var baseUrl = _configuration.GetValue<string>("JeecgIntegration:BaseUrl")?.TrimEnd('/');
        if (string.IsNullOrWhiteSpace(baseUrl))
        {
            return relativeOrAbsoluteUrl;
        }

        var path = relativeOrAbsoluteUrl.StartsWith('/')
            ? relativeOrAbsoluteUrl
            : "/" + relativeOrAbsoluteUrl;
        return $"{baseUrl}{path}";
    }

    /// <summary>
    /// Resolves the WebSocket URL: an explicit <c>JeecgIntegration:WebSocketUrl</c> wins; otherwise
    /// the URL is derived from <c>BaseUrl</c> + <c>WebSocketPath</c> with http(s) mapped to ws(s).
    /// </summary>
    /// <returns>The resolved URL, or an empty string when it cannot be derived.</returns>
    private string ResolveWebSocketUrl()
    {
        var anonymousMode = _configuration.GetValue<bool>("JeecgIntegration:AnonymousMode", true);
        var configUrl = _configuration.GetValue<string>("JeecgIntegration:WebSocketUrl");
        if (!string.IsNullOrWhiteSpace(configUrl))
        {
            return NormalizeWebSocketUrl(configUrl.Trim(), anonymousMode);
        }

        var baseUrl = _configuration.GetValue<string>("JeecgIntegration:BaseUrl")?.TrimEnd('/');
        if (string.IsNullOrWhiteSpace(baseUrl))
        {
            return string.Empty;
        }

        var wsPath = _configuration.GetValue<string>("JeecgIntegration:WebSocketPath");
        if (string.IsNullOrWhiteSpace(wsPath))
        {
            wsPath = DefaultSyncPath;
        }
        else if (!wsPath.StartsWith('/'))
        {
            wsPath = "/" + wsPath;
        }

        // Derive the address from BaseUrl + WebSocketPath by default, so we do not connect
        // to the bare root path and fail the handshake.
        if (baseUrl.StartsWith("https://", StringComparison.OrdinalIgnoreCase))
        {
            return NormalizeWebSocketUrl("wss://" + baseUrl["https://".Length..] + wsPath, anonymousMode);
        }

        if (baseUrl.StartsWith("http://", StringComparison.OrdinalIgnoreCase))
        {
            return NormalizeWebSocketUrl("ws://" + baseUrl["http://".Length..] + wsPath, anonymousMode);
        }

        return string.Empty;
    }

    /// <summary>
    /// In anonymous mode, rewrites the <c>/ws/device/websocket</c> path to <c>/websocket/scada-sync</c>
    /// (case-insensitive); otherwise returns the URL unchanged.
    /// </summary>
    private static string NormalizeWebSocketUrl(string wsUrl, bool anonymousMode)
    {
        // Replace returns the input unchanged when the legacy path is absent,
        // so no separate Contains check is needed.
        return anonymousMode
            ? wsUrl.Replace(LegacyDevicePath, DefaultSyncPath, StringComparison.OrdinalIgnoreCase)
            : wsUrl;
    }
}