更新项目配置,新增设备同步模块,优化WebSocket和Swagger配置,增强SCADA系统的免登录接口,支持数据字典项和登录日志的免登录查询与记录。调整Java编译设置,确保更好的开发体验。
This commit is contained in:
232
yy-admin-master/YY.Admin/Infrastructure/Sync/OutboxProcessor.cs
Normal file
232
yy-admin-master/YY.Admin/Infrastructure/Sync/OutboxProcessor.cs
Normal file
@@ -0,0 +1,232 @@
|
||||
using SqlSugar;
|
||||
using System.Threading.Channels;
|
||||
using Prism.Events;
|
||||
using YY.Admin.Core.Events;
|
||||
using YY.Admin.Core.Models;
|
||||
using YY.Admin.Core.Services;
|
||||
using YY.Admin.Core.Sync;
|
||||
|
||||
namespace YY.Admin.Infrastructure.Sync;
|
||||
|
||||
public class OutboxProcessor
|
||||
{
|
||||
private readonly INetworkMonitor _networkMonitor;
|
||||
private readonly HttpSyncClient _httpSyncClient;
|
||||
private readonly IJeecgUserMirrorPullHandler _mirrorPullHandler;
|
||||
private readonly ISqlSugarClient _db;
|
||||
private readonly IEventAggregator _eventAggregator;
|
||||
private readonly Channel<OutboxMessage> _channel = Channel.CreateBounded<OutboxMessage>(new BoundedChannelOptions(1000)
|
||||
{
|
||||
SingleReader = true,
|
||||
SingleWriter = false,
|
||||
FullMode = BoundedChannelFullMode.Wait
|
||||
});
|
||||
private readonly SemaphoreSlim _flushLock = new(1, 1);
|
||||
|
||||
public OutboxProcessor(
|
||||
INetworkMonitor networkMonitor,
|
||||
HttpSyncClient httpSyncClient,
|
||||
IJeecgUserMirrorPullHandler mirrorPullHandler,
|
||||
ISqlSugarClient db,
|
||||
IEventAggregator eventAggregator)
|
||||
{
|
||||
_networkMonitor = networkMonitor;
|
||||
_httpSyncClient = httpSyncClient;
|
||||
_mirrorPullHandler = mirrorPullHandler;
|
||||
_db = db.AsTenant().GetConnectionScope("Slave");
|
||||
_eventAggregator = eventAggregator;
|
||||
_networkMonitor.StatusChanged += OnNetworkStatusChanged;
|
||||
}
|
||||
|
||||
private static bool IsJeecgUserMirrorMessage(OutboxMessage m) =>
|
||||
string.Equals(m.AggregateType, JeecgUserMirrorOutbox.AggregateType, StringComparison.OrdinalIgnoreCase);
|
||||
|
||||
public async Task EnqueueAsync<T>(
|
||||
string aggregateType,
|
||||
string aggregateId,
|
||||
string eventType,
|
||||
T payload,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var now = DateTime.UtcNow;
|
||||
var message = new OutboxMessage
|
||||
{
|
||||
AggregateType = aggregateType,
|
||||
AggregateId = aggregateId,
|
||||
EventType = eventType,
|
||||
Payload = System.Text.Json.JsonSerializer.Serialize(payload),
|
||||
Status = 0,
|
||||
RetryCount = 0,
|
||||
CreatedAt = now
|
||||
};
|
||||
await EnsureTableAsync(cancellationToken).ConfigureAwait(false);
|
||||
await _db.Insertable(message).ExecuteCommandAsync(cancellationToken).ConfigureAwait(false);
|
||||
if (_networkMonitor.IsOnline)
|
||||
{
|
||||
await _channel.Writer.WriteAsync(message, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
public async Task StartConsumerAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
await EnsureTableAsync(cancellationToken).ConfigureAwait(false);
|
||||
_ = Task.Run(() => ConsumeLoopAsync(cancellationToken), cancellationToken);
|
||||
}
|
||||
|
||||
public async Task FlushPendingAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (!await _flushLock.WaitAsync(0, cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
return;
|
||||
}
|
||||
try
|
||||
{
|
||||
var pending = await _db.Queryable<OutboxMessage>()
|
||||
.Where(x => x.Status == 0 && x.RetryCount < 5)
|
||||
.OrderBy(x => x.CreatedAt)
|
||||
.ToListAsync(cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
if (pending.Count == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var mirror = pending.Where(IsJeecgUserMirrorMessage).ToList();
|
||||
var serverBatch = pending.Where(m => !IsJeecgUserMirrorMessage(m)).ToList();
|
||||
|
||||
foreach (var item in mirror)
|
||||
{
|
||||
var ok = await _mirrorPullHandler.ExecutePullAsync(cancellationToken).ConfigureAwait(false);
|
||||
if (ok)
|
||||
{
|
||||
await MarkSentAsync(item, cancellationToken).ConfigureAwait(false);
|
||||
_eventAggregator.GetEvent<SyncCompletedEvent>().Publish(item.AggregateId);
|
||||
}
|
||||
else
|
||||
{
|
||||
await MarkFailedAsync(item, "Jeecg用户镜像拉取失败", cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
if (serverBatch.Count == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var success = await _httpSyncClient.SendBatchAsync(serverBatch, cancellationToken).ConfigureAwait(false);
|
||||
if (success)
|
||||
{
|
||||
var ids = serverBatch.Select(x => x.Id).ToArray();
|
||||
await _db.Updateable<OutboxMessage>()
|
||||
.SetColumns(x => new OutboxMessage
|
||||
{
|
||||
Status = 1,
|
||||
SentAt = DateTime.UtcNow,
|
||||
LastTriedAt = DateTime.UtcNow,
|
||||
ErrorMessage = null
|
||||
})
|
||||
.Where(x => ids.Contains(x.Id))
|
||||
.ExecuteCommandAsync(cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
foreach (var item in serverBatch)
|
||||
{
|
||||
_eventAggregator.GetEvent<SyncCompletedEvent>().Publish(item.AggregateId);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
foreach (var item in serverBatch)
|
||||
{
|
||||
await MarkFailedAsync(item, "批量同步失败", cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_flushLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ConsumeLoopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
while (await _channel.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
while (_channel.Reader.TryRead(out var message))
|
||||
{
|
||||
var success = IsJeecgUserMirrorMessage(message)
|
||||
? await _mirrorPullHandler.ExecutePullAsync(cancellationToken).ConfigureAwait(false)
|
||||
: await _httpSyncClient.SendBatchAsync(new[] { message }, cancellationToken).ConfigureAwait(false);
|
||||
if (success)
|
||||
{
|
||||
await MarkSentAsync(message, cancellationToken).ConfigureAwait(false);
|
||||
_eventAggregator.GetEvent<SyncCompletedEvent>().Publish(message.AggregateId);
|
||||
}
|
||||
else
|
||||
{
|
||||
await MarkFailedAsync(message, "实时同步失败", cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task MarkSentAsync(OutboxMessage message, CancellationToken cancellationToken)
|
||||
{
|
||||
await _db.Updateable<OutboxMessage>()
|
||||
.SetColumns(x => new OutboxMessage
|
||||
{
|
||||
Status = 1,
|
||||
SentAt = DateTime.UtcNow,
|
||||
LastTriedAt = DateTime.UtcNow,
|
||||
ErrorMessage = null
|
||||
})
|
||||
.Where(x => x.Id == message.Id)
|
||||
.ExecuteCommandAsync(cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task MarkFailedAsync(OutboxMessage message, string error, CancellationToken cancellationToken)
|
||||
{
|
||||
var nextRetry = message.RetryCount + 1;
|
||||
var backoff = (int)Math.Pow(2, Math.Min(nextRetry, 5));
|
||||
await _db.Updateable<OutboxMessage>()
|
||||
.SetColumns(x => new OutboxMessage
|
||||
{
|
||||
RetryCount = nextRetry,
|
||||
Status = nextRetry >= 5 ? 2 : 0,
|
||||
ErrorMessage = error,
|
||||
LastTriedAt = DateTime.UtcNow
|
||||
})
|
||||
.Where(x => x.Id == message.Id)
|
||||
.ExecuteCommandAsync(cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
if (nextRetry < 5)
|
||||
{
|
||||
await Task.Delay(TimeSpan.FromSeconds(backoff), cancellationToken).ConfigureAwait(false);
|
||||
if (_networkMonitor.IsOnline)
|
||||
{
|
||||
var retryMessage = await _db.Queryable<OutboxMessage>()
|
||||
.FirstAsync(x => x.Id == message.Id, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
if (retryMessage != null && retryMessage.Status == 0)
|
||||
{
|
||||
await _channel.Writer.WriteAsync(retryMessage, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task EnsureTableAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_ = cancellationToken;
|
||||
await Task.Run(() => _db.CodeFirst.InitTables<OutboxMessage>(), cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private void OnNetworkStatusChanged(bool isOnline)
|
||||
{
|
||||
if (!isOnline)
|
||||
{
|
||||
return;
|
||||
}
|
||||
_ = FlushPendingAsync(default);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user