集群问题处理

This commit is contained in:
geht
2026-06-09 18:26:31 +08:00
parent 5b8bd2797a
commit de48bd2324
19 changed files with 626 additions and 17 deletions

View File

@@ -859,6 +859,26 @@ jeecgboot-vue3/src/views/xslmes/approval/integration/components/DingApprovalFore
jeecgboot-vue3/src/views/xslmes/approval/integration/components/MesXslApprovalTraceDrawer.vue
jeecgboot-vue3/src/views/xslmes/approval/integration/MesXslApprovalTrace.data.ts
-- author:GHT---date:20260609--for: 【钉钉Stream开发】第三方配置页可视化Stream接收节点 -----
jeecg-boot/jeecg-module-system/jeecg-system-start/src/main/resources/flyway/sql/mysql/V3.9.2_145__sys_third_app_config_stream_node.sql
jeecg-boot/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/system/entity/SysThirdAppConfig.java
jeecg-boot/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/system/service/impl/ThirdAppDingtalkServiceImpl.java
jeecg-boot/jeecg-boot-module/jeecg-module-xslmes/src/main/java/org/jeecg/modules/xslmes/dingtalk/stream/DingTalkStreamNodeConfigService.java
jeecg-boot/jeecg-boot-module/jeecg-module-xslmes/src/main/java/org/jeecg/modules/xslmes/dingtalk/controller/DingTalkStreamConfigController.java
jeecgboot-vue3/src/views/system/appconfig/ThirdApp.data.ts
jeecgboot-vue3/src/views/system/appconfig/ThirdApp.api.ts
jeecgboot-vue3/src/views/system/appconfig/ThirdAppConfigModal.vue
jeecgboot-vue3/src/views/system/appconfig/ThirdAppDingTalkConfigForm.vue
-- author:GHT---date:20260609--for: 【钉钉Stream开发】本机白名单仅指定电脑接收回调 -----
jeecg-boot/jeecg-boot-module/jeecg-module-xslmes/src/main/java/org/jeecg/modules/xslmes/dingtalk/stream/DingTalkStreamProperties.java
jeecg-boot/jeecg-boot-module/jeecg-module-xslmes/src/main/java/org/jeecg/modules/xslmes/dingtalk/stream/DingTalkStreamClient.java
jeecg-boot/jeecg-boot-module/jeecg-module-xslmes/src/main/java/org/jeecg/modules/xslmes/dingtalk/stream/DingApprovalReconcileScheduler.java
jeecg-boot/jeecg-boot-module/jeecg-module-xslmes/src/main/java/org/jeecg/modules/xslmes/dingtalk/stream/DingTalkStreamHealthMonitor.java
jeecg-boot/jeecg-module-system/jeecg-system-start/src/main/resources/application-dev.yml
jeecg-boot/jeecg-module-system/jeecg-system-start/src/main/resources/application-dev-local.yml.example
jeecg-boot/.gitignore
-- author:GHT---date:20260609--for: 【钉钉Stream集群】Redis选主单节点建连+存活监控 -----
jeecg-boot/jeecg-boot-module/jeecg-module-xslmes/src/main/java/org/jeecg/modules/xslmes/dingtalk/stream/DingTalkStreamProperties.java
jeecg-boot/jeecg-boot-module/jeecg-module-xslmes/src/main/java/org/jeecg/modules/xslmes/dingtalk/stream/DingTalkStreamLeaderElection.java

View File

@@ -0,0 +1,39 @@
package org.jeecg.modules.xslmes.dingtalk.controller;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.jeecg.common.api.vo.Result;
import org.jeecg.modules.xslmes.dingtalk.stream.DingTalkStreamNodeConfigService;
import org.jeecg.modules.xslmes.dingtalk.stream.DingTalkStreamSdkRunner;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* 钉钉 Stream 节点配置辅助接口(供「第三方配置-钉钉集成」页面展示本机信息)。
*/
@Tag(name = "钉钉Stream配置")
@RestController
@RequestMapping("/xslmes/dingtalk/stream")
public class DingTalkStreamConfigController {
@Autowired
private DingTalkStreamNodeConfigService nodeConfigService;
//update-begin---author:GHT ---date:20260609 for【钉钉Stream开发】第三方配置页展示本机节点信息-----------
@Operation(summary = "获取本机 Stream 节点信息")
@GetMapping("/nodeInfo")
public Result<Map<String, Object>> nodeInfo() {
Map<String, Object> data = new LinkedHashMap<>(nodeConfigService.buildNodeInfoSnapshot());
DingTalkStreamSdkRunner.ConnectionSnapshot snap = DingTalkStreamSdkRunner.snapshot();
data.put("streamRunning", snap.streamRunning());
data.put("totalEvents", snap.totalEventCount());
data.put("reconnectCount", snap.reconnectCount());
return Result.OK(data);
}
//update-end---author:GHT ---date:20260609 for【钉钉Stream开发】第三方配置页展示本机节点信息-----------
}

View File

@@ -61,9 +61,15 @@ public class DingApprovalReconcileScheduler {
@Autowired
private DingBpmsEventProcessor eventProcessor;
@Autowired
private DingTalkStreamNodeConfigService nodeConfigService;
//update-begin---author:GHT ---date:20260609 for【钉钉Stream补偿扫描】漏推回调自动修复-----
@Scheduled(fixedDelay = SWEEP_INTERVAL_MS)
public void reconcile() {
if (!nodeConfigService.isThisNodeReceiver()) {
return;
}
long sweepStart = System.currentTimeMillis();
Date cutoff = new Date(sweepStart - MIN_AGE_MS);

View File

@@ -40,7 +40,7 @@ public class DingTalkStreamClient implements SmartLifecycle {
private DingStreamCallbackLogHelper callbackLogHelper;
@Autowired
private DingTalkStreamProperties streamProperties;
private DingTalkStreamNodeConfigService nodeConfigService;
@Autowired
private DingTalkStreamLeaderElection leaderElection;
@@ -69,7 +69,7 @@ public class DingTalkStreamClient implements SmartLifecycle {
public void stop() {
running = false;
stopStreamClient();
if (streamProperties.isClusterMode()) {
if (nodeConfigService.isClusterMode()) {
leaderElection.release();
}
log.info("{} Stream 客户端已停止", LOG_TAG);
@@ -85,12 +85,21 @@ public class DingTalkStreamClient implements SmartLifecycle {
//update-begin---author:GHT ---date:20260609 for【钉钉Stream集群】Redis选主+心跳重连生命周期管理-----
private void initSdkClient() {
//update-begin---author:GHT ---date:20260609 for【钉钉Stream开发】非接收节点跳过建连-----------
if (!nodeConfigService.isThisNodeReceiver()) {
log.info("{} 本节点未启用钉钉 Stream 接收 host={} localIps={}"
+ "请在【系统管理-第三方配置-钉钉集成】配置 Stream 接收节点白名单",
LOG_TAG, nodeConfigService.resolveLocalHostName(), nodeConfigService.resolveLocalIpAddresses());
return;
}
//update-end---author:GHT ---date:20260609 for【钉钉Stream开发】非接收节点跳过建连-----------
String[] creds = resolveCredentials();
if (creds == null) {
return;
}
if (!streamProperties.isClusterMode()) {
if (!nodeConfigService.isClusterMode()) {
log.info("{} 单实例模式cluster-mode=false本节点直接建立 Stream 连接", LOG_TAG);
try {
startStreamClient(creds);
@@ -116,7 +125,7 @@ public class DingTalkStreamClient implements SmartLifecycle {
try {
if (streamActive) {
if (leaderElection.renew()) {
sleepQuietly(streamProperties.getLeaderRenewIntervalMs());
sleepQuietly(nodeConfigService.getLeaderRenewIntervalMs());
continue;
}
log.warn("{} Leader 锁续期失败,断开 Stream 并降级为 Follower instanceId={} currentLeader={}",
@@ -124,25 +133,25 @@ public class DingTalkStreamClient implements SmartLifecycle {
stopStreamClient();
streamActive = false;
leaderElection.release();
sleepQuietly(streamProperties.getFollowerRetryIntervalMs());
sleepQuietly(nodeConfigService.getFollowerRetryIntervalMs());
continue;
}
if (leaderElection.tryAcquire()) {
startStreamClient(creds);
streamActive = true;
sleepQuietly(streamProperties.getLeaderRenewIntervalMs());
sleepQuietly(nodeConfigService.getLeaderRenewIntervalMs());
} else {
log.debug("{} 本节点为 Follower等待 Leader 释放锁 instanceId={} currentLeader={}",
LOG_TAG, leaderElection.instanceId(), leaderElection.currentHolder());
sleepQuietly(streamProperties.getFollowerRetryIntervalMs());
sleepQuietly(nodeConfigService.getFollowerRetryIntervalMs());
}
} catch (Exception e) {
log.error("{} Stream 集群生命周期异常: {}", LOG_TAG, e.getMessage(), e);
stopStreamClient();
streamActive = false;
leaderElection.release();
sleepQuietly(streamProperties.getFollowerRetryIntervalMs());
sleepQuietly(nodeConfigService.getFollowerRetryIntervalMs());
}
}

View File

@@ -15,7 +15,7 @@ public class DingTalkStreamHealthMonitor {
private static final String LOG_TAG = DingTalkStreamSdkRunner.LOG_TAG;
@Autowired
private DingTalkStreamProperties properties;
private DingTalkStreamNodeConfigService nodeConfigService;
@Autowired
private DingTalkStreamLeaderElection leaderElection;
@@ -23,10 +23,15 @@ public class DingTalkStreamHealthMonitor {
//update-begin---author:GHT ---date:20260609 for【钉钉Stream监控】定时输出存活与心跳状态-----------
@Scheduled(fixedDelayString = "${jeecg.xslmes.dingtalk.stream.health-log-interval-ms:60000}")
public void reportHealth() {
if (!nodeConfigService.isThisNodeReceiver()) {
log.debug("{} Stream存活状态 role=DISABLED host={}(本节点未配置为钉钉回调接收机)",
LOG_TAG, nodeConfigService.resolveLocalHostName());
return;
}
DingTalkStreamSdkRunner.ConnectionSnapshot snap = DingTalkStreamSdkRunner.snapshot();
long now = System.currentTimeMillis();
boolean clusterMode = properties.isClusterMode();
boolean clusterMode = nodeConfigService.isClusterMode();
String role;
String leaderHolder;
if (!clusterMode) {
@@ -51,9 +56,9 @@ public class DingTalkStreamHealthMonitor {
if ("LEADER".equals(role) && snap.streamRunning()
&& snap.lastEventAtMs() > 0
&& (now - snap.lastEventAtMs()) > properties.getIdleWarnSeconds() * 1000L) {
&& (now - snap.lastEventAtMs()) > nodeConfigService.getIdleWarnSeconds() * 1000L) {
log.warn("{} Stream长时间无推送可能业务空闲或连接异常idleSec={} thresholdSec={}",
LOG_TAG, lastEventAgoSec, properties.getIdleWarnSeconds());
LOG_TAG, lastEventAgoSec, nodeConfigService.getIdleWarnSeconds());
}
}
//update-end---author:GHT ---date:20260609 for【钉钉Stream监控】定时输出存活与心跳状态-----------

View File

@@ -0,0 +1,192 @@
package org.jeecg.modules.xslmes.dingtalk.stream;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.util.oConvertUtils;
import org.jeecg.modules.system.entity.SysThirdAppConfig;
import org.jeecg.modules.system.service.impl.ThirdAppDingtalkServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
/**
* 钉钉 Stream 接收节点运行时配置DB 优先YAML 兜底)。
* <p>
* 配置来源:第三方应用「钉钉集成」页面保存的 {@link SysThirdAppConfig}stream_enabled=1 的记录)。
*/
@Slf4j
@Service
public class DingTalkStreamNodeConfigService {
private static final String LOG_TAG = DingTalkStreamSdkRunner.LOG_TAG;
@Autowired
private ThirdAppDingtalkServiceImpl dingtalkService;
@Autowired
private DingTalkStreamProperties yamlProperties;
private volatile ResolvedConfig cachedConfig;
private volatile long cachedAtMs;
//update-begin---author:GHT ---date:20260609 for【钉钉Stream开发】DB配置优先于YAML控制接收节点-----------
public boolean isThisNodeReceiver() {
ResolvedConfig config = resolveConfig();
if (!config.receiverEnabled) {
return false;
}
boolean hasHostRule = !config.designatedHosts.isEmpty();
boolean hasIpRule = !config.designatedIps.isEmpty();
if (!hasHostRule && !hasIpRule) {
return true;
}
if (hasHostRule && matchesHost(config.designatedHosts)) {
return true;
}
return hasIpRule && matchesIp(config.designatedIps);
}
public boolean isClusterMode() {
return resolveConfig().clusterMode;
}
public long getLeaderRenewIntervalMs() {
return yamlProperties.getLeaderRenewIntervalMs();
}
public long getFollowerRetryIntervalMs() {
return yamlProperties.getFollowerRetryIntervalMs();
}
public long getIdleWarnSeconds() {
return yamlProperties.getIdleWarnSeconds();
}
public String resolveLocalHostName() {
return yamlProperties.resolveLocalHostName();
}
public List<String> resolveLocalIpAddresses() {
return yamlProperties.resolveLocalIpAddresses();
}
/** 配置变更后由定时任务自动刷新;保存第三方配置后最多 30 秒内生效 */
@Scheduled(fixedDelay = 30_000L)
public void refreshCache() {
cachedConfig = null;
}
private ResolvedConfig resolveConfig() {
long now = System.currentTimeMillis();
if (cachedConfig != null && now - cachedAtMs < 25_000L) {
return cachedConfig;
}
synchronized (this) {
if (cachedConfig != null && now - cachedAtMs < 25_000L) {
return cachedConfig;
}
cachedConfig = loadFromDbAndYaml();
cachedAtMs = System.currentTimeMillis();
return cachedConfig;
}
}
private ResolvedConfig loadFromDbAndYaml() {
SysThirdAppConfig db = dingtalkService.getStreamMasterConfig();
boolean receiverEnabled = yamlProperties.isReceiverEnabled();
List<String> ips = new ArrayList<>(yamlProperties.getDesignatedIps());
List<String> hosts = new ArrayList<>(yamlProperties.getDesignatedHosts());
boolean clusterMode = yamlProperties.isClusterMode();
String source = "YAML";
if (db != null) {
if (db.getStreamReceiverEnabled() != null) {
receiverEnabled = db.getStreamReceiverEnabled() == 1;
}
// null 表示未在页面配置,沿用 YAML空字符串表示明确清空白名单
if (db.getStreamDesignatedIps() != null) {
ips = splitCsv(db.getStreamDesignatedIps());
}
if (db.getStreamDesignatedHosts() != null) {
hosts = splitCsv(db.getStreamDesignatedHosts());
}
if (db.getStreamClusterMode() != null) {
clusterMode = db.getStreamClusterMode() == 1;
}
source = "DB";
}
log.debug("{} Stream节点配置已加载 source={} receiverEnabled={} ips={} hosts={} clusterMode={}",
LOG_TAG, source, receiverEnabled, ips, hosts, clusterMode);
return new ResolvedConfig(receiverEnabled, ips, hosts, clusterMode);
}
private List<String> splitCsv(String raw) {
if (oConvertUtils.isEmpty(raw)) {
return Collections.emptyList();
}
return Arrays.stream(raw.split("[,;\\s]+"))
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());
}
private boolean matchesHost(List<String> allowedHosts) {
String host = resolveLocalHostName();
for (String allowed : allowedHosts) {
if (allowed.equalsIgnoreCase(host)) {
return true;
}
}
return false;
}
private boolean matchesIp(List<String> allowedIps) {
List<String> localIps = resolveLocalIpAddresses();
for (String allowed : allowedIps) {
for (String localIp : localIps) {
if (allowed.equals(localIp)) {
return true;
}
}
}
return false;
}
/**
* 供第三方配置页展示:本机网络信息与当前是否接收 Stream。
*/
public java.util.Map<String, Object> buildNodeInfoSnapshot() {
ResolvedConfig config = resolveConfig();
java.util.Map<String, Object> map = new java.util.LinkedHashMap<>();
map.put("hostName", resolveLocalHostName());
map.put("localIps", resolveLocalIpAddresses());
map.put("thisNodeReceiver", isThisNodeReceiver());
map.put("receiverEnabled", config.receiverEnabled);
map.put("designatedIps", config.designatedIps);
map.put("designatedHosts", config.designatedHosts);
map.put("clusterMode", config.clusterMode);
return map;
}
private static final class ResolvedConfig {
private final boolean receiverEnabled;
private final List<String> designatedIps;
private final List<String> designatedHosts;
private final boolean clusterMode;
private ResolvedConfig(boolean receiverEnabled, List<String> designatedIps,
List<String> designatedHosts, boolean clusterMode) {
this.receiverEnabled = receiverEnabled;
this.designatedIps = designatedIps != null ? designatedIps : Collections.emptyList();
this.designatedHosts = designatedHosts != null ? designatedHosts : Collections.emptyList();
this.clusterMode = clusterMode;
}
}
//update-end---author:GHT ---date:20260609 for【钉钉Stream开发】DB配置优先于YAML控制接收节点-----------
}

View File

@@ -1,9 +1,14 @@
package org.jeecg.modules.xslmes.dingtalk.stream;
import lombok.Data;
import org.jeecg.common.util.oConvertUtils;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
/**
* 钉钉 Stream 集群与监控配置。
*/
@@ -12,6 +17,24 @@ import org.springframework.stereotype.Component;
@ConfigurationProperties(prefix = "jeecg.xslmes.dingtalk.stream")
public class DingTalkStreamProperties {
/**
* 本节点是否参与钉钉 Stream 接收(含补偿扫描)。
* 共享 dev 环境建议默认 false开发者在本机 application-dev-local.yml 中设为 true。
*/
private boolean receiverEnabled = true;
/**
* 允许接收钉钉回调的主机名白名单(不区分大小写,可选)。
* 与 designated-ips 二选一或同时配置:满足任一即视为本机接收节点。
*/
private List<String> designatedHosts = new ArrayList<>();
/**
* 允许接收钉钉回调的本机 IP 白名单(推荐,比主机名更稳定)。
* 匹配本机网卡 IPv4/IPv6 地址,与 designated-hosts 满足其一即可。
*/
private List<String> designatedIps = new ArrayList<>();
/**
* 集群模式true 时通过 Redis 选主,仅 Leader 节点建立 Stream 长连接。
* 多实例生产环境务必保持 true单实例可设为 false 简化部署。
@@ -29,4 +52,88 @@ public class DingTalkStreamProperties {
/** 无事件空闲告警阈值Leader 且连接中超过该时间无推送则 warn */
private long idleWarnSeconds = 1800L;
//update-begin---author:GHT ---date:20260609 for【钉钉Stream开发】本机白名单仅指定电脑接收回调-----------
/**
* 当前节点是否应接收钉钉 Stream 推送并执行补偿扫描。
*/
public boolean isThisNodeReceiver() {
if (!receiverEnabled) {
return false;
}
boolean hasHostRule = designatedHosts != null && !designatedHosts.isEmpty();
boolean hasIpRule = designatedIps != null && !designatedIps.isEmpty();
if (!hasHostRule && !hasIpRule) {
return true;
}
if (hasHostRule && matchesDesignatedHost()) {
return true;
}
return hasIpRule && matchesDesignatedIp();
}
private boolean matchesDesignatedHost() {
String host = resolveLocalHostName();
for (String allowed : designatedHosts) {
if (oConvertUtils.isNotEmpty(allowed) && allowed.equalsIgnoreCase(host)) {
return true;
}
}
return false;
}
private boolean matchesDesignatedIp() {
List<String> localIps = resolveLocalIpAddresses();
for (String allowed : designatedIps) {
if (oConvertUtils.isEmpty(allowed)) {
continue;
}
for (String localIp : localIps) {
if (allowed.equals(localIp)) {
return true;
}
}
}
return false;
}
public String resolveLocalHostName() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (Exception e) {
return "unknown-host";
}
}
/**
* 采集本机所有网卡 IP含 127.0.0.1),供白名单匹配与启动日志展示。
*/
public List<String> resolveLocalIpAddresses() {
List<String> ips = new ArrayList<>();
try {
java.util.Enumeration<java.net.NetworkInterface> interfaces = java.net.NetworkInterface.getNetworkInterfaces();
while (interfaces != null && interfaces.hasMoreElements()) {
java.net.NetworkInterface ni = interfaces.nextElement();
java.util.Enumeration<InetAddress> addresses = ni.getInetAddresses();
while (addresses.hasMoreElements()) {
String ip = addresses.nextElement().getHostAddress();
if (oConvertUtils.isNotEmpty(ip) && !ips.contains(ip)) {
ips.add(ip);
}
}
}
} catch (Exception ignored) {
// 回退到 LocalHost
}
try {
String localIp = InetAddress.getLocalHost().getHostAddress();
if (oConvertUtils.isNotEmpty(localIp) && !ips.contains(localIp)) {
ips.add(localIp);
}
} catch (Exception ignored) {
// 忽略
}
return ips;
}
//update-end---author:GHT ---date:20260609 for【钉钉Stream开发】本机白名单仅指定电脑接收回调-----------
}