353 lines
13 KiB
C#
353 lines
13 KiB
C#
using Microsoft.Extensions.Configuration;
|
|
using System.Net.Http;
|
|
using System.Net.WebSockets;
|
|
using System.Text;
|
|
using System.IO;
|
|
|
|
namespace YY.Admin.Services.Service.Jeecg;
|
|
|
|
/// <summary>
|
|
/// Jeecg 后端网关实现。
|
|
/// </summary>
|
|
public class JeecgBackendGateway : IJeecgBackendGateway, ISingletonDependency
|
|
{
|
|
private readonly IConfiguration _configuration;
|
|
private readonly HttpClient _httpClient;
|
|
private readonly ILoggerService _logger;
|
|
private readonly SemaphoreSlim _wsSendLock = new(1, 1);
|
|
private readonly object _wsStateLock = new();
|
|
private ClientWebSocket? _activeWebSocket;
|
|
|
|
public JeecgBackendGateway(
|
|
IConfiguration configuration,
|
|
HttpClient httpClient,
|
|
ILoggerService logger)
|
|
{
|
|
_configuration = configuration;
|
|
_httpClient = httpClient;
|
|
_logger = logger;
|
|
}
|
|
|
|
public async Task<HttpResponseMessage> ExecuteGetAsync(
|
|
string relativeOrAbsoluteUrl,
|
|
Dictionary<string, string>? headers = null,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
var requestUrl = BuildUrl(relativeOrAbsoluteUrl);
|
|
using var req = new HttpRequestMessage(HttpMethod.Get, requestUrl);
|
|
if (headers != null)
|
|
{
|
|
foreach (var kv in headers)
|
|
{
|
|
req.Headers.TryAddWithoutValidation(kv.Key, kv.Value);
|
|
}
|
|
}
|
|
|
|
return await _httpClient.SendAsync(req, cancellationToken);
|
|
}
|
|
|
|
public async Task<string?> ExecuteGetStringAsync(
|
|
string relativeOrAbsoluteUrl,
|
|
Dictionary<string, string>? headers = null,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
using var resp = await ExecuteGetAsync(relativeOrAbsoluteUrl, headers, cancellationToken);
|
|
if (!resp.IsSuccessStatusCode)
|
|
{
|
|
return null;
|
|
}
|
|
return await resp.Content.ReadAsStringAsync(cancellationToken);
|
|
}
|
|
|
|
public async Task RunWebSocketLoopAsync(
|
|
Func<string, Task> onMessage,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
var wsUrl = ResolveWebSocketUrl();
|
|
_logger.Information($"Jeecg WebSocket 解析地址: {wsUrl}");
|
|
if (string.IsNullOrWhiteSpace(wsUrl))
|
|
{
|
|
_logger.Warning("Jeecg WebSocket 未启动:解析地址为空");
|
|
return;
|
|
}
|
|
|
|
var backoffSeconds = 5;
|
|
var buffer = new byte[8192];
|
|
|
|
while (!cancellationToken.IsCancellationRequested)
|
|
{
|
|
using var ws = new ClientWebSocket();
|
|
try
|
|
{
|
|
ws.Options.KeepAliveInterval = TimeSpan.FromSeconds(10);
|
|
await ws.ConnectAsync(new Uri(wsUrl), cancellationToken);
|
|
lock (_wsStateLock)
|
|
{
|
|
_activeWebSocket = ws;
|
|
}
|
|
backoffSeconds = 5;
|
|
_logger.Information($"Jeecg WebSocket 已连接: {wsUrl}");
|
|
using var heartbeatCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
|
var lastReceiveTicks = DateTime.UtcNow.Ticks;
|
|
_ = Task.Run(() => HeartbeatLoopAsync(ws, heartbeatCts.Token), heartbeatCts.Token);
|
|
var inactivitySeconds = _configuration.GetValue("JeecgIntegration:WebSocketInactivityReconnectSeconds", 0);
|
|
CancellationTokenSource? inactivityCts = null;
|
|
if (inactivitySeconds > 0)
|
|
{
|
|
inactivityCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
|
_ = Task.Run(() => InactivityReconnectLoopAsync(ws, () => Interlocked.Read(ref lastReceiveTicks), inactivityCts.Token), inactivityCts.Token);
|
|
}
|
|
|
|
while (ws.State == WebSocketState.Open && !cancellationToken.IsCancellationRequested)
|
|
{
|
|
using var ms = new MemoryStream();
|
|
WebSocketReceiveResult result;
|
|
do
|
|
{
|
|
var seg = new ArraySegment<byte>(buffer);
|
|
result = await ws.ReceiveAsync(seg, cancellationToken);
|
|
_logger.Information($"Jeecg WebSocket 收帧: type={result.MessageType}, count={result.Count}, end={result.EndOfMessage}, state={ws.State}");
|
|
if (result.MessageType == WebSocketMessageType.Close)
|
|
{
|
|
_logger.Warning($"Jeecg WebSocket 收到关闭帧: closeStatus={result.CloseStatus}, desc={result.CloseStatusDescription}");
|
|
break;
|
|
}
|
|
ms.Write(buffer, 0, result.Count);
|
|
} while (!result.EndOfMessage);
|
|
|
|
if (result.MessageType == WebSocketMessageType.Close)
|
|
{
|
|
_logger.Warning("Jeecg WebSocket 接收循环检测到关闭帧,准备重连");
|
|
break;
|
|
}
|
|
|
|
var payload = Encoding.UTF8.GetString(ms.ToArray());
|
|
Interlocked.Exchange(ref lastReceiveTicks, DateTime.UtcNow.Ticks);
|
|
_logger.Information($"Jeecg WebSocket 收到原始消息: {payload}");
|
|
await onMessage(payload);
|
|
}
|
|
heartbeatCts.Cancel();
|
|
inactivityCts?.Cancel();
|
|
_logger.Warning($"Jeecg WebSocket 接收循环退出,当前状态={ws.State}");
|
|
}
|
|
catch (OperationCanceledException)
|
|
{
|
|
_logger.Warning("Jeecg WebSocket 接收循环取消");
|
|
break;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.Warning($"Jeecg WebSocket 断开,{backoffSeconds} 秒后重连。地址: {wsUrl},异常: {ex.Message}");
|
|
}
|
|
finally
|
|
{
|
|
lock (_wsStateLock)
|
|
{
|
|
if (ReferenceEquals(_activeWebSocket, ws))
|
|
{
|
|
_activeWebSocket = null;
|
|
}
|
|
}
|
|
}
|
|
|
|
try
|
|
{
|
|
await Task.Delay(TimeSpan.FromSeconds(Math.Min(backoffSeconds, 120)), cancellationToken);
|
|
}
|
|
catch (OperationCanceledException)
|
|
{
|
|
break;
|
|
}
|
|
|
|
backoffSeconds = Math.Min(backoffSeconds * 2, 120);
|
|
}
|
|
}
|
|
|
|
private async Task HeartbeatLoopAsync(ClientWebSocket ws, CancellationToken cancellationToken)
|
|
{
|
|
var payload = Encoding.UTF8.GetBytes("ping");
|
|
while (!cancellationToken.IsCancellationRequested && ws.State == WebSocketState.Open)
|
|
{
|
|
try
|
|
{
|
|
await Task.Delay(TimeSpan.FromSeconds(12), cancellationToken);
|
|
if (ws.State != WebSocketState.Open)
|
|
{
|
|
break;
|
|
}
|
|
await ws.SendAsync(new ArraySegment<byte>(payload), WebSocketMessageType.Text, true, cancellationToken);
|
|
}
|
|
catch (OperationCanceledException)
|
|
{
|
|
break;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.Warning($"Jeecg WebSocket 心跳发送失败: {ex.Message}");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
private async Task InactivityReconnectLoopAsync(ClientWebSocket ws, Func<long> getLastReceiveTicks, CancellationToken cancellationToken)
|
|
{
|
|
var inactivitySeconds = _configuration.GetValue("JeecgIntegration:WebSocketInactivityReconnectSeconds", 0);
|
|
if (inactivitySeconds <= 0)
|
|
{
|
|
return;
|
|
}
|
|
while (!cancellationToken.IsCancellationRequested && ws.State == WebSocketState.Open)
|
|
{
|
|
try
|
|
{
|
|
await Task.Delay(TimeSpan.FromSeconds(2), cancellationToken);
|
|
if (ws.State != WebSocketState.Open)
|
|
{
|
|
break;
|
|
}
|
|
|
|
var lastReceiveUtc = new DateTime(getLastReceiveTicks(), DateTimeKind.Utc);
|
|
var idleSeconds = (DateTime.UtcNow - lastReceiveUtc).TotalSeconds;
|
|
if (idleSeconds < inactivitySeconds)
|
|
{
|
|
continue;
|
|
}
|
|
|
|
_logger.Warning($"Jeecg WebSocket 超过 {Math.Round(idleSeconds)} 秒未收到任何消息,主动重连");
|
|
ws.Abort();
|
|
break;
|
|
}
|
|
catch (OperationCanceledException)
|
|
{
|
|
break;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.Warning($"Jeecg WebSocket 空闲检测异常: {ex.Message}");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
public async Task<bool> SendWebSocketMessageAsync(string message, CancellationToken cancellationToken = default)
|
|
{
|
|
ClientWebSocket? socket;
|
|
lock (_wsStateLock)
|
|
{
|
|
socket = _activeWebSocket;
|
|
}
|
|
if (socket == null || socket.State != WebSocketState.Open)
|
|
{
|
|
return false;
|
|
}
|
|
|
|
var bytes = Encoding.UTF8.GetBytes(message);
|
|
var seg = new ArraySegment<byte>(bytes);
|
|
await _wsSendLock.WaitAsync(cancellationToken);
|
|
try
|
|
{
|
|
await socket.SendAsync(seg, WebSocketMessageType.Text, true, cancellationToken);
|
|
return true;
|
|
}
|
|
catch
|
|
{
|
|
return false;
|
|
}
|
|
finally
|
|
{
|
|
_wsSendLock.Release();
|
|
}
|
|
}
|
|
|
|
public async Task<bool> SendWebSocketOneShotAsync(string message, CancellationToken cancellationToken = default)
|
|
{
|
|
var wsUrl = ResolveWebSocketUrl();
|
|
if (string.IsNullOrWhiteSpace(wsUrl))
|
|
{
|
|
return false;
|
|
}
|
|
|
|
using var ws = new ClientWebSocket();
|
|
try
|
|
{
|
|
await ws.ConnectAsync(new Uri(wsUrl), cancellationToken);
|
|
var bytes = Encoding.UTF8.GetBytes(message);
|
|
var seg = new ArraySegment<byte>(bytes);
|
|
await ws.SendAsync(seg, WebSocketMessageType.Text, true, cancellationToken);
|
|
await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "done", cancellationToken);
|
|
return true;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.Warning($"Jeecg WebSocket 单次上报失败: {ex.Message}");
|
|
return false;
|
|
}
|
|
}
|
|
|
|
private string BuildUrl(string relativeOrAbsoluteUrl)
|
|
{
|
|
if (Uri.TryCreate(relativeOrAbsoluteUrl, UriKind.Absolute, out _))
|
|
{
|
|
return relativeOrAbsoluteUrl;
|
|
}
|
|
|
|
var baseUrl = _configuration.GetValue<string>("JeecgIntegration:BaseUrl")?.TrimEnd('/');
|
|
if (string.IsNullOrWhiteSpace(baseUrl))
|
|
{
|
|
return relativeOrAbsoluteUrl;
|
|
}
|
|
var path = relativeOrAbsoluteUrl.StartsWith("/") ? relativeOrAbsoluteUrl : "/" + relativeOrAbsoluteUrl;
|
|
return $"{baseUrl}{path}";
|
|
}
|
|
|
|
private string ResolveWebSocketUrl()
|
|
{
|
|
var anonymousMode = _configuration.GetValue("JeecgIntegration:AnonymousMode", true);
|
|
var configUrl = _configuration.GetValue<string>("JeecgIntegration:WebSocketUrl");
|
|
if (!string.IsNullOrWhiteSpace(configUrl))
|
|
{
|
|
return NormalizeWebSocketUrl(configUrl.Trim(), anonymousMode);
|
|
}
|
|
|
|
var baseUrl = _configuration.GetValue<string>("JeecgIntegration:BaseUrl")?.TrimEnd('/');
|
|
if (string.IsNullOrWhiteSpace(baseUrl))
|
|
{
|
|
return string.Empty;
|
|
}
|
|
|
|
var wsPath = _configuration.GetValue<string>("JeecgIntegration:WebSocketPath");
|
|
if (string.IsNullOrWhiteSpace(wsPath))
|
|
{
|
|
wsPath = "/websocket/scada-sync";
|
|
}
|
|
else if (!wsPath.StartsWith("/"))
|
|
{
|
|
wsPath = "/" + wsPath;
|
|
}
|
|
|
|
// 默认从 BaseUrl + WebSocketPath 推导地址,避免只连到根路径导致握手失败
|
|
if (baseUrl.StartsWith("https://", StringComparison.OrdinalIgnoreCase))
|
|
{
|
|
return NormalizeWebSocketUrl("wss://" + baseUrl["https://".Length..] + wsPath, anonymousMode);
|
|
}
|
|
if (baseUrl.StartsWith("http://", StringComparison.OrdinalIgnoreCase))
|
|
{
|
|
return NormalizeWebSocketUrl("ws://" + baseUrl["http://".Length..] + wsPath, anonymousMode);
|
|
}
|
|
return string.Empty;
|
|
}
|
|
|
|
private static string NormalizeWebSocketUrl(string wsUrl, bool anonymousMode)
|
|
{
|
|
if (!anonymousMode)
|
|
{
|
|
return wsUrl;
|
|
}
|
|
if (wsUrl.Contains("/ws/device/websocket", StringComparison.OrdinalIgnoreCase))
|
|
{
|
|
return wsUrl.Replace("/ws/device/websocket", "/websocket/scada-sync", StringComparison.OrdinalIgnoreCase);
|
|
}
|
|
return wsUrl;
|
|
}
|
|
}
|