Files
qhmes/yy-admin-master/YY.Admin.Services/Service/Jeecg/JeecgBackendGateway.cs

353 lines
13 KiB
C#

using Microsoft.Extensions.Configuration;
using System.Net.Http;
using System.Net.WebSockets;
using System.Text;
using System.IO;
namespace YY.Admin.Services.Service.Jeecg;
/// <summary>
/// Jeecg 后端网关实现。
/// </summary>
public class JeecgBackendGateway : IJeecgBackendGateway, ISingletonDependency
{
private readonly IConfiguration _configuration;
private readonly HttpClient _httpClient;
private readonly ILoggerService _logger;
private readonly SemaphoreSlim _wsSendLock = new(1, 1);
private readonly object _wsStateLock = new();
private ClientWebSocket? _activeWebSocket;
public JeecgBackendGateway(
IConfiguration configuration,
HttpClient httpClient,
ILoggerService logger)
{
_configuration = configuration;
_httpClient = httpClient;
_logger = logger;
}
public async Task<HttpResponseMessage> ExecuteGetAsync(
string relativeOrAbsoluteUrl,
Dictionary<string, string>? headers = null,
CancellationToken cancellationToken = default)
{
var requestUrl = BuildUrl(relativeOrAbsoluteUrl);
using var req = new HttpRequestMessage(HttpMethod.Get, requestUrl);
if (headers != null)
{
foreach (var kv in headers)
{
req.Headers.TryAddWithoutValidation(kv.Key, kv.Value);
}
}
return await _httpClient.SendAsync(req, cancellationToken);
}
public async Task<string?> ExecuteGetStringAsync(
string relativeOrAbsoluteUrl,
Dictionary<string, string>? headers = null,
CancellationToken cancellationToken = default)
{
using var resp = await ExecuteGetAsync(relativeOrAbsoluteUrl, headers, cancellationToken);
if (!resp.IsSuccessStatusCode)
{
return null;
}
return await resp.Content.ReadAsStringAsync(cancellationToken);
}
public async Task RunWebSocketLoopAsync(
Func<string, Task> onMessage,
CancellationToken cancellationToken)
{
var wsUrl = ResolveWebSocketUrl();
_logger.Information($"Jeecg WebSocket 解析地址: {wsUrl}");
if (string.IsNullOrWhiteSpace(wsUrl))
{
_logger.Warning("Jeecg WebSocket 未启动:解析地址为空");
return;
}
var backoffSeconds = 5;
var buffer = new byte[8192];
while (!cancellationToken.IsCancellationRequested)
{
using var ws = new ClientWebSocket();
try
{
ws.Options.KeepAliveInterval = TimeSpan.FromSeconds(10);
await ws.ConnectAsync(new Uri(wsUrl), cancellationToken);
lock (_wsStateLock)
{
_activeWebSocket = ws;
}
backoffSeconds = 5;
_logger.Information($"Jeecg WebSocket 已连接: {wsUrl}");
using var heartbeatCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
var lastReceiveTicks = DateTime.UtcNow.Ticks;
_ = Task.Run(() => HeartbeatLoopAsync(ws, heartbeatCts.Token), heartbeatCts.Token);
var inactivitySeconds = _configuration.GetValue("JeecgIntegration:WebSocketInactivityReconnectSeconds", 0);
CancellationTokenSource? inactivityCts = null;
if (inactivitySeconds > 0)
{
inactivityCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_ = Task.Run(() => InactivityReconnectLoopAsync(ws, () => Interlocked.Read(ref lastReceiveTicks), inactivityCts.Token), inactivityCts.Token);
}
while (ws.State == WebSocketState.Open && !cancellationToken.IsCancellationRequested)
{
using var ms = new MemoryStream();
WebSocketReceiveResult result;
do
{
var seg = new ArraySegment<byte>(buffer);
result = await ws.ReceiveAsync(seg, cancellationToken);
_logger.Information($"Jeecg WebSocket 收帧: type={result.MessageType}, count={result.Count}, end={result.EndOfMessage}, state={ws.State}");
if (result.MessageType == WebSocketMessageType.Close)
{
_logger.Warning($"Jeecg WebSocket 收到关闭帧: closeStatus={result.CloseStatus}, desc={result.CloseStatusDescription}");
break;
}
ms.Write(buffer, 0, result.Count);
} while (!result.EndOfMessage);
if (result.MessageType == WebSocketMessageType.Close)
{
_logger.Warning("Jeecg WebSocket 接收循环检测到关闭帧,准备重连");
break;
}
var payload = Encoding.UTF8.GetString(ms.ToArray());
Interlocked.Exchange(ref lastReceiveTicks, DateTime.UtcNow.Ticks);
_logger.Information($"Jeecg WebSocket 收到原始消息: {payload}");
await onMessage(payload);
}
heartbeatCts.Cancel();
inactivityCts?.Cancel();
_logger.Warning($"Jeecg WebSocket 接收循环退出,当前状态={ws.State}");
}
catch (OperationCanceledException)
{
_logger.Warning("Jeecg WebSocket 接收循环取消");
break;
}
catch (Exception ex)
{
_logger.Warning($"Jeecg WebSocket 断开,{backoffSeconds} 秒后重连。地址: {wsUrl},异常: {ex.Message}");
}
finally
{
lock (_wsStateLock)
{
if (ReferenceEquals(_activeWebSocket, ws))
{
_activeWebSocket = null;
}
}
}
try
{
await Task.Delay(TimeSpan.FromSeconds(Math.Min(backoffSeconds, 120)), cancellationToken);
}
catch (OperationCanceledException)
{
break;
}
backoffSeconds = Math.Min(backoffSeconds * 2, 120);
}
}
private async Task HeartbeatLoopAsync(ClientWebSocket ws, CancellationToken cancellationToken)
{
var payload = Encoding.UTF8.GetBytes("ping");
while (!cancellationToken.IsCancellationRequested && ws.State == WebSocketState.Open)
{
try
{
await Task.Delay(TimeSpan.FromSeconds(12), cancellationToken);
if (ws.State != WebSocketState.Open)
{
break;
}
await ws.SendAsync(new ArraySegment<byte>(payload), WebSocketMessageType.Text, true, cancellationToken);
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
_logger.Warning($"Jeecg WebSocket 心跳发送失败: {ex.Message}");
break;
}
}
}
private async Task InactivityReconnectLoopAsync(ClientWebSocket ws, Func<long> getLastReceiveTicks, CancellationToken cancellationToken)
{
var inactivitySeconds = _configuration.GetValue("JeecgIntegration:WebSocketInactivityReconnectSeconds", 0);
if (inactivitySeconds <= 0)
{
return;
}
while (!cancellationToken.IsCancellationRequested && ws.State == WebSocketState.Open)
{
try
{
await Task.Delay(TimeSpan.FromSeconds(2), cancellationToken);
if (ws.State != WebSocketState.Open)
{
break;
}
var lastReceiveUtc = new DateTime(getLastReceiveTicks(), DateTimeKind.Utc);
var idleSeconds = (DateTime.UtcNow - lastReceiveUtc).TotalSeconds;
if (idleSeconds < inactivitySeconds)
{
continue;
}
_logger.Warning($"Jeecg WebSocket 超过 {Math.Round(idleSeconds)} 秒未收到任何消息,主动重连");
ws.Abort();
break;
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
_logger.Warning($"Jeecg WebSocket 空闲检测异常: {ex.Message}");
break;
}
}
}
public async Task<bool> SendWebSocketMessageAsync(string message, CancellationToken cancellationToken = default)
{
ClientWebSocket? socket;
lock (_wsStateLock)
{
socket = _activeWebSocket;
}
if (socket == null || socket.State != WebSocketState.Open)
{
return false;
}
var bytes = Encoding.UTF8.GetBytes(message);
var seg = new ArraySegment<byte>(bytes);
await _wsSendLock.WaitAsync(cancellationToken);
try
{
await socket.SendAsync(seg, WebSocketMessageType.Text, true, cancellationToken);
return true;
}
catch
{
return false;
}
finally
{
_wsSendLock.Release();
}
}
public async Task<bool> SendWebSocketOneShotAsync(string message, CancellationToken cancellationToken = default)
{
var wsUrl = ResolveWebSocketUrl();
if (string.IsNullOrWhiteSpace(wsUrl))
{
return false;
}
using var ws = new ClientWebSocket();
try
{
await ws.ConnectAsync(new Uri(wsUrl), cancellationToken);
var bytes = Encoding.UTF8.GetBytes(message);
var seg = new ArraySegment<byte>(bytes);
await ws.SendAsync(seg, WebSocketMessageType.Text, true, cancellationToken);
await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "done", cancellationToken);
return true;
}
catch (Exception ex)
{
_logger.Warning($"Jeecg WebSocket 单次上报失败: {ex.Message}");
return false;
}
}
private string BuildUrl(string relativeOrAbsoluteUrl)
{
if (Uri.TryCreate(relativeOrAbsoluteUrl, UriKind.Absolute, out _))
{
return relativeOrAbsoluteUrl;
}
var baseUrl = _configuration.GetValue<string>("JeecgIntegration:BaseUrl")?.TrimEnd('/');
if (string.IsNullOrWhiteSpace(baseUrl))
{
return relativeOrAbsoluteUrl;
}
var path = relativeOrAbsoluteUrl.StartsWith("/") ? relativeOrAbsoluteUrl : "/" + relativeOrAbsoluteUrl;
return $"{baseUrl}{path}";
}
private string ResolveWebSocketUrl()
{
var anonymousMode = _configuration.GetValue("JeecgIntegration:AnonymousMode", true);
var configUrl = _configuration.GetValue<string>("JeecgIntegration:WebSocketUrl");
if (!string.IsNullOrWhiteSpace(configUrl))
{
return NormalizeWebSocketUrl(configUrl.Trim(), anonymousMode);
}
var baseUrl = _configuration.GetValue<string>("JeecgIntegration:BaseUrl")?.TrimEnd('/');
if (string.IsNullOrWhiteSpace(baseUrl))
{
return string.Empty;
}
var wsPath = _configuration.GetValue<string>("JeecgIntegration:WebSocketPath");
if (string.IsNullOrWhiteSpace(wsPath))
{
wsPath = "/websocket/scada-sync";
}
else if (!wsPath.StartsWith("/"))
{
wsPath = "/" + wsPath;
}
// 默认从 BaseUrl + WebSocketPath 推导地址,避免只连到根路径导致握手失败
if (baseUrl.StartsWith("https://", StringComparison.OrdinalIgnoreCase))
{
return NormalizeWebSocketUrl("wss://" + baseUrl["https://".Length..] + wsPath, anonymousMode);
}
if (baseUrl.StartsWith("http://", StringComparison.OrdinalIgnoreCase))
{
return NormalizeWebSocketUrl("ws://" + baseUrl["http://".Length..] + wsPath, anonymousMode);
}
return string.Empty;
}
private static string NormalizeWebSocketUrl(string wsUrl, bool anonymousMode)
{
if (!anonymousMode)
{
return wsUrl;
}
if (wsUrl.Contains("/ws/device/websocket", StringComparison.OrdinalIgnoreCase))
{
return wsUrl.Replace("/ws/device/websocket", "/websocket/scada-sync", StringComparison.OrdinalIgnoreCase);
}
return wsUrl;
}
}