using SqlSugar; using System.Threading.Channels; using Prism.Events; using YY.Admin.Core.Events; using YY.Admin.Core.Models; using YY.Admin.Core.Services; using YY.Admin.Core.Sync; namespace YY.Admin.Infrastructure.Sync; public class OutboxProcessor { private readonly INetworkMonitor _networkMonitor; private readonly HttpSyncClient _httpSyncClient; private readonly IJeecgUserMirrorPullHandler _mirrorPullHandler; private readonly ISqlSugarClient _db; private readonly IEventAggregator _eventAggregator; private readonly Channel _channel = Channel.CreateBounded(new BoundedChannelOptions(1000) { SingleReader = true, SingleWriter = false, FullMode = BoundedChannelFullMode.Wait }); private readonly SemaphoreSlim _flushLock = new(1, 1); public OutboxProcessor( INetworkMonitor networkMonitor, HttpSyncClient httpSyncClient, IJeecgUserMirrorPullHandler mirrorPullHandler, ISqlSugarClient db, IEventAggregator eventAggregator) { _networkMonitor = networkMonitor; _httpSyncClient = httpSyncClient; _mirrorPullHandler = mirrorPullHandler; _db = db.AsTenant().GetConnectionScope("Slave"); _eventAggregator = eventAggregator; _networkMonitor.StatusChanged += OnNetworkStatusChanged; } private static bool IsJeecgUserMirrorMessage(OutboxMessage m) => string.Equals(m.AggregateType, JeecgUserMirrorOutbox.AggregateType, StringComparison.OrdinalIgnoreCase); public async Task EnqueueAsync( string aggregateType, string aggregateId, string eventType, T payload, CancellationToken cancellationToken) { var now = DateTime.UtcNow; var message = new OutboxMessage { AggregateType = aggregateType, AggregateId = aggregateId, EventType = eventType, Payload = System.Text.Json.JsonSerializer.Serialize(payload), Status = 0, RetryCount = 0, CreatedAt = now }; await EnsureTableAsync(cancellationToken).ConfigureAwait(false); await _db.Insertable(message).ExecuteCommandAsync(cancellationToken).ConfigureAwait(false); if (_networkMonitor.IsOnline) { await _channel.Writer.WriteAsync(message, cancellationToken).ConfigureAwait(false); } } public async Task StartConsumerAsync(CancellationToken cancellationToken) { await EnsureTableAsync(cancellationToken).ConfigureAwait(false); _ = Task.Run(() => ConsumeLoopAsync(cancellationToken), cancellationToken); } public async Task FlushPendingAsync(CancellationToken cancellationToken) { if (!await _flushLock.WaitAsync(0, cancellationToken).ConfigureAwait(false)) { return; } try { var pending = await _db.Queryable() .Where(x => x.Status == 0 && x.RetryCount < 5) .OrderBy(x => x.CreatedAt) .ToListAsync(cancellationToken) .ConfigureAwait(false); if (pending.Count == 0) { return; } var mirror = pending.Where(IsJeecgUserMirrorMessage).ToList(); var serverBatch = pending.Where(m => !IsJeecgUserMirrorMessage(m)).ToList(); foreach (var item in mirror) { var ok = await _mirrorPullHandler.ExecutePullAsync(cancellationToken).ConfigureAwait(false); if (ok) { await MarkSentAsync(item, cancellationToken).ConfigureAwait(false); _eventAggregator.GetEvent().Publish(item.AggregateId); } else { await MarkFailedAsync(item, "Jeecg用户镜像拉取失败", cancellationToken).ConfigureAwait(false); } } if (serverBatch.Count == 0) { return; } var success = await _httpSyncClient.SendBatchAsync(serverBatch, cancellationToken).ConfigureAwait(false); if (success) { var ids = serverBatch.Select(x => x.Id).ToArray(); await _db.Updateable() .SetColumns(x => new OutboxMessage { Status = 1, SentAt = DateTime.UtcNow, LastTriedAt = DateTime.UtcNow, ErrorMessage = null }) .Where(x => ids.Contains(x.Id)) .ExecuteCommandAsync(cancellationToken) .ConfigureAwait(false); foreach (var item in serverBatch) { _eventAggregator.GetEvent().Publish(item.AggregateId); } return; } foreach (var item in serverBatch) { await MarkFailedAsync(item, "批量同步失败", cancellationToken).ConfigureAwait(false); } } finally { _flushLock.Release(); } } private async Task ConsumeLoopAsync(CancellationToken cancellationToken) { while (await _channel.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) { while (_channel.Reader.TryRead(out var message)) { var success = IsJeecgUserMirrorMessage(message) ? await _mirrorPullHandler.ExecutePullAsync(cancellationToken).ConfigureAwait(false) : await _httpSyncClient.SendBatchAsync(new[] { message }, cancellationToken).ConfigureAwait(false); if (success) { await MarkSentAsync(message, cancellationToken).ConfigureAwait(false); _eventAggregator.GetEvent().Publish(message.AggregateId); } else { await MarkFailedAsync(message, "实时同步失败", cancellationToken).ConfigureAwait(false); } } } } private async Task MarkSentAsync(OutboxMessage message, CancellationToken cancellationToken) { await _db.Updateable() .SetColumns(x => new OutboxMessage { Status = 1, SentAt = DateTime.UtcNow, LastTriedAt = DateTime.UtcNow, ErrorMessage = null }) .Where(x => x.Id == message.Id) .ExecuteCommandAsync(cancellationToken) .ConfigureAwait(false); } private async Task MarkFailedAsync(OutboxMessage message, string error, CancellationToken cancellationToken) { var nextRetry = message.RetryCount + 1; var backoff = (int)Math.Pow(2, Math.Min(nextRetry, 5)); await _db.Updateable() .SetColumns(x => new OutboxMessage { RetryCount = nextRetry, Status = nextRetry >= 5 ? 2 : 0, ErrorMessage = error, LastTriedAt = DateTime.UtcNow }) .Where(x => x.Id == message.Id) .ExecuteCommandAsync(cancellationToken) .ConfigureAwait(false); if (nextRetry < 5) { await Task.Delay(TimeSpan.FromSeconds(backoff), cancellationToken).ConfigureAwait(false); if (_networkMonitor.IsOnline) { var retryMessage = await _db.Queryable() .FirstAsync(x => x.Id == message.Id, cancellationToken) .ConfigureAwait(false); if (retryMessage != null && retryMessage.Status == 0) { await _channel.Writer.WriteAsync(retryMessage, cancellationToken).ConfigureAwait(false); } } } } private async Task EnsureTableAsync(CancellationToken cancellationToken) { _ = cancellationToken; await Task.Run(() => _db.CodeFirst.InitTables(), cancellationToken).ConfigureAwait(false); } private void OnNetworkStatusChanged(bool isOnline) { if (!isOnline) { return; } _ = FlushPendingAsync(default); } }