59 lines
2.0 KiB
C#
59 lines
2.0 KiB
C#
|
|
using System.Net.Http.Json;
|
||
|
|
using System.Net.Http;
|
||
|
|
using YY.Admin.Core.Models;
|
||
|
|
using YY.Admin.Infrastructure.Storage;
|
||
|
|
|
||
|
|
namespace YY.Admin.Infrastructure.Sync;
|
||
|
|
|
||
|
|
/// <summary>
/// Posts batches of <see cref="OutboxMessage"/> entries to the remote sync endpoint
/// using a named <see cref="HttpClient"/> obtained from <see cref="IHttpClientFactory"/>.
/// </summary>
public class HttpSyncClient
{
    // Name of the HttpClient registration requested from the factory.
    private const string HttpClientName = "JeecgApi";

    // Request URI the batch is posted to (resolved by the named client's configuration).
    private const string BatchEndpoint = "/sys/sync/batch";

    // Response header that may carry a replacement token.
    private const string RefreshTokenHeader = "X-Refresh-Token";

    private readonly IHttpClientFactory _httpClientFactory;
    private readonly TokenStore _tokenStore;

    /// <summary>
    /// Wire shape of a single entry in the JSON array sent to the batch endpoint.
    /// Property names are part of the serialized payload.
    /// </summary>
    private sealed class SyncBatchItem
    {
        public string MessageId { get; set; } = string.Empty;
        public string AggregateType { get; set; } = string.Empty;
        public string AggregateId { get; set; } = string.Empty;
        public string EventType { get; set; } = string.Empty;
        public string Payload { get; set; } = string.Empty;
        public DateTime OccurredAt { get; set; }
    }

    /// <summary>
    /// Creates a new <see cref="HttpSyncClient"/>.
    /// </summary>
    /// <param name="httpClientFactory">Factory used to create the named HTTP client for each batch.</param>
    /// <param name="tokenStore">Store that receives a refreshed token when the server returns one.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="httpClientFactory"/> or <paramref name="tokenStore"/> is <c>null</c>.
    /// </exception>
    public HttpSyncClient(IHttpClientFactory httpClientFactory, TokenStore tokenStore)
    {
        // Fail fast at construction instead of with a NullReferenceException on first send.
        _httpClientFactory = httpClientFactory ?? throw new ArgumentNullException(nameof(httpClientFactory));
        _tokenStore = tokenStore ?? throw new ArgumentNullException(nameof(tokenStore));
    }

    /// <summary>
    /// Sends the given messages as one JSON array in a single POST request.
    /// </summary>
    /// <param name="messages">Messages to send. An empty collection results in no HTTP call.</param>
    /// <param name="cancellationToken">Token used to cancel the request and the token-store update.</param>
    /// <returns>
    /// <c>true</c> when <paramref name="messages"/> is empty or the response has a success status code;
    /// otherwise <c>false</c>.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="messages"/> is <c>null</c>.</exception>
    public async Task<bool> SendBatchAsync(IReadOnlyCollection<OutboxMessage> messages, CancellationToken cancellationToken)
    {
        if (messages is null)
        {
            throw new ArgumentNullException(nameof(messages));
        }

        // Nothing to send: report success without touching the network.
        if (messages.Count == 0)
        {
            return true;
        }

        var client = _httpClientFactory.CreateClient(HttpClientName);

        // Project each outbox message onto the wire DTO; CreatedAt is sent as OccurredAt.
        var body = messages
            .Select(m => new SyncBatchItem
            {
                MessageId = m.Id,
                AggregateType = m.AggregateType,
                AggregateId = m.AggregateId,
                EventType = m.EventType,
                Payload = m.Payload,
                OccurredAt = m.CreatedAt
            })
            .ToList();

        using var response = await client
            .PostAsJsonAsync(BatchEndpoint, body, cancellationToken)
            .ConfigureAwait(false);

        // The refresh header is honored regardless of the response status code.
        await StoreRefreshedTokenAsync(response, cancellationToken).ConfigureAwait(false);

        return response.IsSuccessStatusCode;
    }

    /// <summary>
    /// Forwards the first non-blank <c>X-Refresh-Token</c> response header value, if any, to the token store.
    /// </summary>
    private async Task StoreRefreshedTokenAsync(HttpResponseMessage response, CancellationToken cancellationToken)
    {
        if (!response.Headers.TryGetValues(RefreshTokenHeader, out var values))
        {
            return;
        }

        var refreshed = values.FirstOrDefault();
        if (string.IsNullOrWhiteSpace(refreshed))
        {
            return;
        }

        await _tokenStore.UpdateTokenAsync(refreshed, cancellationToken).ConfigureAwait(false);
    }
}
|