钉钉回调事件处理

This commit is contained in:
geht
2026-06-09 17:52:33 +08:00
parent fd5205e33e
commit 5b8bd2797a
50 changed files with 2861 additions and 428 deletions

View File

@@ -859,6 +859,14 @@ jeecgboot-vue3/src/views/xslmes/approval/integration/components/DingApprovalFore
jeecgboot-vue3/src/views/xslmes/approval/integration/components/MesXslApprovalTraceDrawer.vue
jeecgboot-vue3/src/views/xslmes/approval/integration/MesXslApprovalTrace.data.ts
-- author:GHT---date:20260609--for: 【钉钉Stream集群】Redis选主单节点建连+存活监控 -----
jeecg-boot/jeecg-boot-module/jeecg-module-xslmes/src/main/java/org/jeecg/modules/xslmes/dingtalk/stream/DingTalkStreamProperties.java
jeecg-boot/jeecg-boot-module/jeecg-module-xslmes/src/main/java/org/jeecg/modules/xslmes/dingtalk/stream/DingTalkStreamLeaderElection.java
jeecg-boot/jeecg-boot-module/jeecg-module-xslmes/src/main/java/org/jeecg/modules/xslmes/dingtalk/stream/DingTalkStreamHealthMonitor.java
jeecg-boot/jeecg-boot-module/jeecg-module-xslmes/src/main/java/org/jeecg/modules/xslmes/dingtalk/stream/DingTalkStreamClient.java
jeecg-boot/jeecg-boot-module/jeecg-module-xslmes/src/main/java/org/jeecg/modules/xslmes/dingtalk/stream/DingTalkStreamSdkRunner.java
jeecg-boot/jeecg-module-system/jeecg-system-start/src/main/resources/application-dev.yml
-- author:cursor---date:20260608--for: 【XSLMES-20260608-A01】混炼示方新增状态字段及列表查询条件 -----
jeecg-boot/jeecg-module-system/jeecg-system-start/src/main/resources/flyway/sql/mysql/V3.9.2_141__mes_xsl_mixing_spec_status.sql
jeecg-boot/jeecg-boot-module/jeecg-module-xslmes/src/main/java/org/jeecg/modules/xslmes/entity/MesXslMixingSpec.java

View File

@@ -390,9 +390,10 @@ public class MesXslApprovalFlowController extends JeecgController<MesXslApproval
}
//update-begin---author:GHT ---date:20260605 for【XSLMES-20260605-K8R2】从审批注册中心解析启用环节-----
//update-begin---author:GHT ---date:20260609 for【审批注册中心】移除 byField 引用,操作人由痕迹表承载-----------
/**
* 从审批注册中心读取已启用环节,映射为流程设计器候选节点。
* 返回有序列表:[{stageKey, stageName, nodeType, field, fieldComment}]
* 返回有序列表:[{stageKey, stageName, nodeType}]
*/
private List<Map<String, Object>> parseRegistryStages(String table) {
List<Map<String, Object>> stages = new ArrayList<>();
@@ -402,25 +403,23 @@ public class MesXslApprovalFlowController extends JeecgController<MesXslApproval
}
java.util.Set<String> enabled = ApprovalStageResolver.parseEnabledStages(registry.getEnabledStages());
String[][] ordered = new String[][]{
{ApprovalStageResolver.STAGE_PROOFREAD, "校对", registry.getProofreadByField()},
{ApprovalStageResolver.STAGE_AUDIT, "审核", registry.getAuditByField()},
{ApprovalStageResolver.STAGE_APPROVE, "批准", registry.getApproveByField()},
{ApprovalStageResolver.STAGE_PROOFREAD, "校对"},
{ApprovalStageResolver.STAGE_AUDIT, "审核"},
{ApprovalStageResolver.STAGE_APPROVE, "批准"},
};
for (String[] item : ordered) {
String stageKey = item[0];
if (!enabled.contains(stageKey) || oConvertUtils.isEmpty(item[2])) {
if (!enabled.contains(item[0])) {
continue;
}
Map<String, Object> stage = new LinkedHashMap<>();
stage.put("stageKey", stageKey);
stage.put("stageKey", item[0]);
stage.put("stageName", item[1]);
stage.put("nodeType", "approver");
stage.put("field", item[2]);
stage.put("fieldComment", item[1] + "");
stages.add(stage);
}
return stages;
}
//update-end---author:GHT ---date:20260609 for【审批注册中心】移除 byField 引用,操作人由痕迹表承载-----------
//update-end---author:GHT ---date:20260605 for【XSLMES-20260605-K8R2】从审批注册中心解析启用环节-----
/** 按业务表+租户查找审批流(取最近一条) */

View File

@@ -0,0 +1,258 @@
package org.jeecg.modules.xslmes.approval.integration.advice;
import com.alibaba.fastjson2.JSON;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import jakarta.servlet.http.HttpServletRequest;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.util.oConvertUtils;
import org.jeecg.modules.xslmes.approval.integration.entity.MesXslApprovalTrace;
import org.jeecg.modules.xslmes.approval.integration.entity.MesXslBizDocRegistry;
import org.jeecg.modules.xslmes.approval.integration.service.IMesXslApprovalTraceService;
import org.jeecg.modules.xslmes.approval.integration.service.IMesXslBizDocRegistryService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.MethodParameter;
import org.springframework.http.MediaType;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyAdvice;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
* 审批痕迹自动注入增强器
*
* <p>当审批注册中心配置了 listApiPath 后,拦截匹配 URL 的列表响应,
* 自动 LEFT JOIN mes_xsl_approval_trace将痕迹字段traceProofreadBy 等)
* 注入到每条记录中,无需修改业务代码。
*
* @author GHT
* @date 2026-06-08 for【XSLMES-20260608-TRACE】审批痕迹响应自动注入
*/
//update-begin---author:GHT ---date:20260608 for【XSLMES-20260608-TRACE】审批痕迹响应自动注入-----------
@ControllerAdvice
@Slf4j
@SuppressWarnings({"unchecked", "rawtypes"})
public class ApprovalTraceResponseAdvice implements ResponseBodyAdvice<Object> {
@Autowired
private IMesXslBizDocRegistryService registryService;
@Autowired
private IMesXslApprovalTraceService traceService;
/** 路径缓存条目 */
private static class CacheEntry {
final String tableName;
/** enabledStages 集合,如 {"proofread","audit","approve"} */
final java.util.Set<String> enabledStages;
CacheEntry(String tableName, java.util.Set<String> enabledStages) {
this.tableName = tableName;
this.enabledStages = enabledStages;
}
}
/** path → CacheEntry 缓存1 分钟 TTL*/
private volatile Map<String, CacheEntry> pathToEntryCache = Collections.emptyMap();
private volatile long cacheLoadTime = 0L;
private static final long CACHE_TTL_MS = 60_000L;
@Override
public boolean supports(MethodParameter returnType,
Class<? extends HttpMessageConverter<?>> converterType) {
return Result.class.isAssignableFrom(returnType.getParameterType());
}
@Override
public Object beforeBodyWrite(Object body,
MethodParameter returnType,
MediaType selectedContentType,
Class<? extends HttpMessageConverter<?>> selectedConverterType,
ServerHttpRequest request,
ServerHttpResponse response) {
if (!(body instanceof Result)) {
return body;
}
String path = extractServletPath(request);
CacheEntry entry = resolveEntry(path);
if (entry == null) {
return body;
}
Result result = (Result) body;
Object data = result.getResult();
List<?> records = null;
IPage page = null;
if (data instanceof IPage) {
page = (IPage) data;
records = page.getRecords();
} else if (data instanceof List) {
records = (List<?>) data;
}
if (records == null || records.isEmpty()) {
return body;
}
List<String> ids = extractIds(records);
Map<String, MesXslApprovalTrace> traceMap = Collections.emptyMap();
if (!ids.isEmpty()) {
try {
traceMap = traceService.batchQueryByBizIds(entry.tableName, ids);
} catch (Exception e) {
log.warn("[审批痕迹注入] 批量查询失败 table={} path={}: {}", entry.tableName, path, e.getMessage());
}
}
List<Map<String, Object>> enriched = enrichRecords(records, traceMap, entry.enabledStages);
if (page != null) {
((Page) page).setRecords(enriched);
} else {
result.setResult(enriched);
}
return body;
}
private String extractServletPath(ServerHttpRequest request) {
if (request instanceof ServletServerHttpRequest) {
HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();
String path = servletRequest.getServletPath();
return oConvertUtils.isNotEmpty(path) ? path : request.getURI().getPath();
}
return request.getURI().getPath();
}
private CacheEntry resolveEntry(String path) {
if (oConvertUtils.isEmpty(path)) {
return null;
}
ensureCacheLoaded();
return pathToEntryCache.get(path);
}
private void ensureCacheLoaded() {
long now = System.currentTimeMillis();
if (now - cacheLoadTime > CACHE_TTL_MS) {
synchronized (this) {
if (now - cacheLoadTime > CACHE_TTL_MS) {
reloadCache();
cacheLoadTime = now;
}
}
}
}
private void reloadCache() {
try {
List<MesXslBizDocRegistry> registries = registryService.lambdaQuery()
.eq(MesXslBizDocRegistry::getEnabled, 1)
.isNotNull(MesXslBizDocRegistry::getListApiPath)
.list();
Map<String, CacheEntry> map = new HashMap<>();
for (MesXslBizDocRegistry reg : registries) {
if (oConvertUtils.isEmpty(reg.getListApiPath()) || oConvertUtils.isEmpty(reg.getTableName())) {
continue;
}
java.util.Set<String> stages = parseStages(reg.getEnabledStages());
CacheEntry entry = new CacheEntry(reg.getTableName(), stages);
for (String p : reg.getListApiPath().split(",")) {
String trimmed = p.trim();
if (oConvertUtils.isNotEmpty(trimmed)) {
map.put(trimmed, entry);
}
}
}
pathToEntryCache = map;
log.debug("[审批痕迹注入] 路径缓存已刷新,共 {} 条路径映射", map.size());
} catch (Exception e) {
log.warn("[审批痕迹注入] 路径缓存刷新失败: {}", e.getMessage());
}
}
private java.util.Set<String> parseStages(String enabledStages) {
java.util.Set<String> set = new java.util.LinkedHashSet<>();
if (oConvertUtils.isEmpty(enabledStages)) {
return set;
}
for (String s : enabledStages.split(",")) {
String t = s.trim();
if (oConvertUtils.isNotEmpty(t)) {
set.add(t);
}
}
return set;
}
private List<String> extractIds(List<?> records) {
List<String> ids = new ArrayList<>(records.size());
for (Object r : records) {
if (r == null) {
continue;
}
Object id = null;
if (r instanceof Map) {
id = ((Map<?, ?>) r).get("id");
} else {
try {
id = r.getClass().getMethod("getId").invoke(r);
} catch (Exception ignored) {
// 无 getId 方法时跳过
}
}
if (id != null) {
String idStr = String.valueOf(id);
if (oConvertUtils.isNotEmpty(idStr)) {
ids.add(idStr);
}
}
}
return ids;
}
private List<Map<String, Object>> enrichRecords(List<?> records,
Map<String, MesXslApprovalTrace> traceMap,
java.util.Set<String> enabledStages) {
List<Map<String, Object>> enriched = new ArrayList<>(records.size());
for (Object r : records) {
if (r == null) {
continue;
}
Map<String, Object> map;
if (r instanceof Map) {
map = new LinkedHashMap<>((Map<String, Object>) r);
} else {
// 实体类转 Map保留序列化配置如 @JsonFormat
map = new LinkedHashMap<>(JSON.parseObject(JSON.toJSONString(r), Map.class));
}
Object idObj = map.get("id");
MesXslApprovalTrace trace = (idObj != null) ? traceMap.get(String.valueOf(idObj)) : null;
// 对每个启用的环节,始终注入字段(无痕迹时为 null使前端能感知注册了哪些列
if (enabledStages.contains("proofread")) {
map.put("traceProofreadBy", trace != null ? trace.getProofreadBy() : null);
map.put("traceProofreadTime", trace != null ? trace.getProofreadTime() : null);
}
if (enabledStages.contains("audit")) {
map.put("traceAuditBy", trace != null ? trace.getAuditBy() : null);
map.put("traceAuditTime", trace != null ? trace.getAuditTime() : null);
}
if (enabledStages.contains("approve")) {
map.put("traceApproveBy", trace != null ? trace.getApproveBy() : null);
map.put("traceApproveTime", trace != null ? trace.getApproveTime() : null);
}
enriched.add(map);
}
return enriched;
}
}
//update-end---author:GHT ---date:20260608 for【XSLMES-20260608-TRACE】审批痕迹响应自动注入-----------

View File

@@ -20,10 +20,15 @@ import org.jeecg.modules.xslmes.approval.integration.vo.DingProcessForecastVO;
import org.jeecg.modules.xslmes.approval.integration.vo.DingProcessInstanceFlowVO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
import java.util.Map;
/**
* 审批痕迹明细
*
@@ -62,6 +67,20 @@ public class MesXslApprovalTraceController extends JeecgController<MesXslApprova
return entity != null ? Result.OK(entity) : Result.error("未找到对应数据");
}
//update-begin---author:GHT ---date:20260608 for【XSLMES-20260608-TRACE】批量查询痕迹供前端关联展示-----------
@Operation(summary = "审批痕迹-批量查询bizTable + 单据ID列表供前端或内部关联展示")
@RequiresPermissions("xslmes:mes_xsl_approval_trace:list")
@PostMapping("/batchByBizIds")
public Result<Map<String, MesXslApprovalTrace>> batchByBizIds(
@RequestParam String bizTable,
@RequestBody List<String> bizDataIds) {
if (oConvertUtils.isEmpty(bizTable) || bizDataIds == null || bizDataIds.isEmpty()) {
return Result.error("bizTable 与 bizDataIds 不能为空");
}
return Result.OK(traceService.batchQueryByBizIds(bizTable, bizDataIds));
}
//update-end---author:GHT ---date:20260608 for【XSLMES-20260608-TRACE】批量查询痕迹供前端关联展示-----------
@Operation(summary = "审批痕迹-按业务表与单据ID查询供业务页关联展示")
@RequiresPermissions("xslmes:mes_xsl_approval_trace:list")
@GetMapping("/queryByBiz")

View File

@@ -56,21 +56,54 @@ public final class IntegrationActionConfigHelper {
return RegistryStageFieldHelper.defaultExpectedFrom(stage);
}
//update-begin---author:GHT ---date:20260609 for【审批环节同步】通过后状态与审批环节解耦业务表状态由 statusAfter 控制-----------
/**
* 解析环节通过后业务表应写入的状态值。
* 未配置时回退为审批环节码(兼容旧数据)。
*/
public static String resolveStatusAfter(MesXslIntegrationAction action, String stage) {
if (action != null && oConvertUtils.isNotEmpty(action.getActionConfig())) {
try {
JSONObject cfg = JSONObject.parseObject(action.getActionConfig());
if (cfg.containsKey("statusAfter")) {
String v = cfg.getString("statusAfter");
return oConvertUtils.isEmpty(v) ? null : v.trim();
}
JSONObject registryStage = cfg.getJSONObject("registryStage");
if (registryStage != null && registryStage.containsKey("statusAfter")) {
String v = registryStage.getString("statusAfter");
return oConvertUtils.isEmpty(v) ? null : v.trim();
}
} catch (Exception ignored) {
// fallback
}
}
return oConvertUtils.isNotEmpty(stage) ? stage.trim() : null;
}
//update-end---author:GHT ---date:20260609 for【审批环节同步】通过后状态与审批环节解耦业务表状态由 statusAfter 控制-----------
//update-begin---author:GHT ---date:20260609 for【驳回回退】targetStage 按 containsKey 解析字典键值(含 0-----------
/**
* 解析驳回回退目标:取动作配置中「回退目标」下拉所选的字典 item_value原样写入业务表 status。
*/
public static String resolveTargetStage(MesXslIntegrationAction action) {
if (action != null && oConvertUtils.isNotEmpty(action.getActionConfig())) {
try {
JSONObject cfg = JSONObject.parseObject(action.getActionConfig());
if (oConvertUtils.isNotEmpty(cfg.getString("targetStage"))) {
return cfg.getString("targetStage").trim();
if (cfg.containsKey("targetStage")) {
String v = cfg.getString("targetStage");
return oConvertUtils.isEmpty(v) ? null : v.trim();
}
JSONObject registryStage = cfg.getJSONObject("registryStage");
if (registryStage != null && oConvertUtils.isNotEmpty(registryStage.getString("targetStage"))) {
return registryStage.getString("targetStage").trim();
if (registryStage != null && registryStage.containsKey("targetStage")) {
String v = registryStage.getString("targetStage");
return oConvertUtils.isEmpty(v) ? null : v.trim();
}
} catch (Exception ignored) {
// fallback compile
// fallback null
}
}
return "compile";
return null;
}
//update-end---author:GHT ---date:20260609 for【驳回回退】targetStage 按 containsKey 解析字典键值(含 0-----------
}

View File

@@ -66,6 +66,8 @@ public class IntegrationOrchestrator {
private JdbcTemplate jdbcTemplate;
@Autowired
private List<IIntegrationActionExecutor> executors;
@Autowired
private IntegrationRevertTargetResolver revertTargetResolver;
// ==================== 外部入口 ====================
@@ -398,7 +400,17 @@ public class IntegrationOrchestrator {
}
private String resolveRevertTargetStage(MesXslIntegrationAction action) {
return IntegrationActionConfigHelper.resolveTargetStage(action);
String target = IntegrationActionConfigHelper.resolveTargetStage(action);
if (oConvertUtils.isNotEmpty(target)) {
return target;
}
if (action != null && oConvertUtils.isNotEmpty(action.getPlanId())) {
MesXslIntegrationPlan plan = planService.getById(action.getPlanId());
if (plan != null && oConvertUtils.isNotEmpty(plan.getSourceTable())) {
return revertTargetResolver.resolveRevertTarget(plan.getSourceTable());
}
}
return "";
}
private String readSourceStatus(IntegrationContext ctx) {

View File

@@ -0,0 +1,180 @@
package org.jeecg.modules.xslmes.approval.integration.engine;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.util.oConvertUtils;
import org.jeecg.modules.xslmes.approval.integration.entity.MesXslBizDocRegistry;
import org.jeecg.modules.xslmes.approval.integration.entity.MesXslIntegrationAction;
import org.jeecg.modules.xslmes.approval.integration.entity.MesXslIntegrationPlan;
import org.jeecg.modules.xslmes.approval.integration.service.IMesXslBizDocRegistryService;
import org.jeecg.modules.xslmes.approval.integration.service.IMesXslIntegrationActionService;
import org.jeecg.modules.xslmes.approval.integration.service.IMesXslIntegrationPlanService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
/**
* 驳回回退目标解析:优先读取已发布 onReject 集成方案中的 REGISTRY_STAGE_REVERT 配置。
*/
@Slf4j
@Component
public class IntegrationRevertTargetResolver {
private static final Pattern DICT_IN_COMMENT = Pattern.compile("字典[:\\s]?([a-zA-Z][a-zA-Z0-9_]*)");
private static final Map<String, String> TABLE_STATUS_DICT_FALLBACK = Map.of(
"mes_xsl_mixer_ps_compile", "xslmes_mixer_ps_status",
"mes_xsl_formula_spec", "xslmes_formula_spec_status",
"mes_xsl_raw_material_entry", "xslmes_entry_status"
);
@Autowired
private IMesXslIntegrationPlanService planService;
@Autowired
private IMesXslIntegrationActionService actionService;
@Autowired
private IMesXslBizDocRegistryService registryService;
@Autowired
private JdbcTemplate jdbcTemplate;
//update-begin---author:GHT ---date:20260609 for【驳回回退】从已发布 onReject 集成方案解析回退目标-----------
/**
* 解析业务表驳回时应回退到的 status 值。
* 优先级:已发布 onReject 方案 REGISTRY_STAGE_REVERT.targetStage → 注册中心状态字典初始态 → compile。
*/
public String resolveRevertTarget(String sourceTable) {
if (oConvertUtils.isEmpty(sourceTable)) {
return "compile";
}
String fromPlan = resolveFromPublishedRejectPlan(sourceTable);
if (oConvertUtils.isNotEmpty(fromPlan)) {
return fromPlan;
}
String fromRegistry = resolveInitialStatusFromRegistry(sourceTable);
if (oConvertUtils.isNotEmpty(fromRegistry)) {
log.info("[集成引擎] 表 {} 未配置 onReject 回退目标,使用注册中心初始态={}", sourceTable, fromRegistry);
return fromRegistry;
}
log.warn("[集成引擎] 表 {} 未解析到回退目标,回退 compile", sourceTable);
return "compile";
}
private String resolveFromPublishedRejectPlan(String sourceTable) {
List<MesXslIntegrationPlan> plans = planService.lambdaQuery()
.eq(MesXslIntegrationPlan::getSourceTable, sourceTable)
.eq(MesXslIntegrationPlan::getTriggerPhase, "onReject")
.eq(MesXslIntegrationPlan::getStatus, "1")
.orderByAsc(MesXslIntegrationPlan::getCreateTime)
.list();
for (MesXslIntegrationPlan plan : plans) {
List<MesXslIntegrationAction> actions = actionService.listByPlanId(plan.getId());
for (MesXslIntegrationAction action : actions) {
if (!"REGISTRY_STAGE_REVERT".equals(action.getActionType())) {
continue;
}
String target = IntegrationActionConfigHelper.resolveTargetStage(action);
if (oConvertUtils.isNotEmpty(target)) {
return target;
}
}
}
return null;
}
private String resolveInitialStatusFromRegistry(String sourceTable) {
MesXslBizDocRegistry registry = registryService.findActiveByTableName(sourceTable);
if (registry == null) {
return null;
}
List<StatusDictItem> chain = loadStatusChain(registry);
if (chain.isEmpty()) {
return null;
}
List<String> enabledStages = orderedEnabledStages(registry.getEnabledStages());
return resolveInitialStatus(chain, enabledStages);
}
private List<String> orderedEnabledStages(String enabledStages) {
Set<String> enabled = ApprovalStageResolver.parseEnabledStages(enabledStages);
List<String> ordered = new ArrayList<>();
for (String key : new String[]{
ApprovalStageResolver.STAGE_PROOFREAD,
ApprovalStageResolver.STAGE_AUDIT,
ApprovalStageResolver.STAGE_APPROVE}) {
if (enabled.contains(key)) {
ordered.add(key);
}
}
return ordered;
}
private String resolveInitialStatus(List<StatusDictItem> chain, List<String> enabledStages) {
Set<String> enabledSet = new LinkedHashSet<>(enabledStages);
int firstStageIdx = -1;
for (int i = 0; i < chain.size(); i++) {
if (enabledSet.contains(chain.get(i).value)) {
firstStageIdx = i;
break;
}
}
if (firstStageIdx > 0) {
return chain.get(firstStageIdx - 1).value;
}
for (StatusDictItem item : chain) {
if (!enabledSet.contains(item.value)) {
return item.value;
}
}
return chain.get(0).value;
}
private List<StatusDictItem> loadStatusChain(MesXslBizDocRegistry registry) {
String dictCode = resolveStatusDictCode(registry);
if (oConvertUtils.isEmpty(dictCode)) {
return List.of();
}
List<Map<String, Object>> rows = jdbcTemplate.queryForList(
"SELECT item_value AS value, item_text AS label, sort_order AS sortOrder "
+ "FROM sys_dict_item WHERE dict_id=(SELECT id FROM sys_dict WHERE dict_code=?) "
+ "AND status=1 ORDER BY sort_order ASC, item_value ASC",
dictCode);
List<StatusDictItem> chain = new ArrayList<>();
for (Map<String, Object> row : rows) {
chain.add(new StatusDictItem(String.valueOf(row.get("value")), String.valueOf(row.get("label"))));
}
return chain;
}
private String resolveStatusDictCode(MesXslBizDocRegistry registry) {
String statusField = oConvertUtils.isEmpty(registry.getStatusField()) ? "status" : registry.getStatusField();
String table = registry.getTableName();
if (!table.matches("^[a-z][a-z0-9_]{0,63}$")) {
return TABLE_STATUS_DICT_FALLBACK.getOrDefault(table, null);
}
try {
List<String> comments = jdbcTemplate.queryForList(
"SELECT COLUMN_COMMENT FROM INFORMATION_SCHEMA.COLUMNS "
+ "WHERE TABLE_SCHEMA=DATABASE() AND TABLE_NAME=? AND COLUMN_NAME=?",
String.class, table, statusField);
if (!comments.isEmpty()) {
Matcher m = DICT_IN_COMMENT.matcher(comments.get(0));
if (m.find()) {
return m.group(1);
}
}
} catch (Exception e) {
log.warn("[集成引擎] 读取状态字典注释失败 table={} field={}", table, statusField, e);
}
return TABLE_STATUS_DICT_FALLBACK.getOrDefault(table, null);
}
private record StatusDictItem(String value, String label) {
}
//update-end---author:GHT ---date:20260609 for【驳回回退】从已发布 onReject 集成方案解析回退目标-----------
}

View File

@@ -15,38 +15,6 @@ public final class RegistryStageFieldHelper {
return oConvertUtils.isEmpty(registry.getStatusField()) ? "status" : registry.getStatusField();
}
public static String byField(MesXslBizDocRegistry registry, String stage) {
if (registry == null || oConvertUtils.isEmpty(stage)) {
return null;
}
switch (stage) {
case ApprovalStageResolver.STAGE_PROOFREAD:
return registry.getProofreadByField();
case ApprovalStageResolver.STAGE_AUDIT:
return registry.getAuditByField();
case ApprovalStageResolver.STAGE_APPROVE:
return registry.getApproveByField();
default:
return null;
}
}
public static String timeField(MesXslBizDocRegistry registry, String stage) {
if (registry == null || oConvertUtils.isEmpty(stage)) {
return null;
}
switch (stage) {
case ApprovalStageResolver.STAGE_PROOFREAD:
return registry.getProofreadTimeField();
case ApprovalStageResolver.STAGE_AUDIT:
return registry.getAuditTimeField();
case ApprovalStageResolver.STAGE_APPROVE:
return registry.getApproveTimeField();
default:
return null;
}
}
/** 环节默认前置状态proofread←compile, audit←proofread, approve←audit */
public static String defaultExpectedFrom(String stage) {
switch (stage) {

View File

@@ -14,7 +14,7 @@ import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
/**
* 审批驳回回退:按注册中心配置将源单 status 回退并清空环节痕迹(默认回 compile
* 审批驳回回退:按集成方案 targetStage 将源单 status 回退并清空环节痕迹。
*/
@Slf4j
@Component
@@ -26,7 +26,6 @@ public class RegistryStageRevertExecutor implements IIntegrationActionExecutor {
private IApprovalTraceSyncService approvalTraceSyncService;
@Autowired
private JdbcTemplate jdbcTemplate;
@Override
public String supportActionType() {
return "REGISTRY_STAGE_REVERT";
@@ -46,34 +45,23 @@ public class RegistryStageRevertExecutor implements IIntegrationActionExecutor {
throw new IllegalStateException("业务表未在审批注册中心启用: " + bizTable);
}
//update-begin---author:GHT ---date:20260609 for【驳回回退】仅使用动作配置中「回退目标」所选字典键值-----------
String targetStage = IntegrationActionConfigHelper.resolveTargetStage(action);
if (oConvertUtils.isEmpty(targetStage)) {
throw new IllegalStateException(
"驳回回退动作未配置「回退目标」请在集成方案动作编辑器中选择状态字典项并保存actionConfig.targetStage");
}
//update-end---author:GHT ---date:20260609 for【驳回回退】仅使用动作配置中「回退目标」所选字典键值-----------
String statusField = RegistryStageFieldHelper.statusField(registry);
RegistryStageFieldHelper.assertIdentifier(statusField);
RegistryStageFieldHelper.assertIdentifier(bizTable);
StringBuilder sql = new StringBuilder("UPDATE `").append(bizTable).append("` SET `")
.append(statusField).append("`=?");
java.util.List<Object> params = new java.util.ArrayList<>();
params.add(targetStage);
clearField(sql, params, registry.getProofreadByField());
clearField(sql, params, registry.getProofreadTimeField());
clearField(sql, params, registry.getAuditByField());
clearField(sql, params, registry.getAuditTimeField());
clearField(sql, params, registry.getApproveByField());
clearField(sql, params, registry.getApproveTimeField());
if ("compile".equals(targetStage)) {
// 已全部清空
} else if ("proofread".equals(targetStage)) {
// 保留 proofread清空 audit/approve — 上面已全清需按目标环节保留简化compile 场景为主)
}
sql.append(" WHERE id=?");
params.add(bizId);
int affected = jdbcTemplate.update(sql.toString(), params.toArray());
//update-begin---author:GHT ---date:20260609 for【审批注册中心】回退只重置业务表状态操作人/时间由痕迹表承载-----------
int affected = jdbcTemplate.update(
"UPDATE `" + bizTable + "` SET `" + statusField + "`=? WHERE id=?",
targetStage, bizId);
//update-end---author:GHT ---date:20260609 for【审批注册中心】回退只重置业务表状态操作人/时间由痕迹表承载-----------
if (affected == 0) {
throw new IllegalStateException("源单不存在或回退失败 id=" + bizId);
}
@@ -82,14 +70,5 @@ public class RegistryStageRevertExecutor implements IIntegrationActionExecutor {
log.info("[集成引擎][REGISTRY_STAGE_REVERT] table={} id={} targetStage={}", bizTable, bizId, targetStage);
return "环节回退成功: " + targetStage;
}
private void clearField(StringBuilder sql, java.util.List<Object> params, String field) {
if (oConvertUtils.isEmpty(field)) {
return;
}
RegistryStageFieldHelper.assertIdentifier(field);
sql.append(", `").append(field).append("`=?");
params.add(null);
}
//update-end---author:GHT ---date:20260605 for【XSLMES-20260605-K8R2】审批注册中心环节回退执行器-----------
}

View File

@@ -56,17 +56,16 @@ public class RegistryStageSyncExecutor implements IIntegrationActionExecutor {
throw new IllegalStateException(stageErr);
}
//update-begin---author:GHT ---date:20260609 for【审批环节同步】审批环节仅写痕迹业务表状态由 statusAfter 控制-----------
String statusAfter = resolveStatusAfter(action, stage);
if (oConvertUtils.isEmpty(statusAfter)) {
throw new IllegalArgumentException("动作未配置通过后状态(statusAfter),且无法从审批环节推断");
}
//update-end---author:GHT ---date:20260609 for【审批环节同步】审批环节仅写痕迹业务表状态由 statusAfter 控制-----------
String expectedFrom = resolveExpectedFrom(action, stage);
String statusField = RegistryStageFieldHelper.statusField(registry);
String byField = RegistryStageFieldHelper.byField(registry, stage);
String timeField = RegistryStageFieldHelper.timeField(registry, stage);
RegistryStageFieldHelper.assertIdentifier(statusField);
if (oConvertUtils.isNotEmpty(byField)) {
RegistryStageFieldHelper.assertIdentifier(byField);
}
if (oConvertUtils.isNotEmpty(timeField)) {
RegistryStageFieldHelper.assertIdentifier(timeField);
}
String operator = resolveOperator(ctx);
//update-begin---author:GHT ---date:20260608 for【审批注册中心】环节同步使用实例tasks最新完成时间-----------
@@ -83,20 +82,13 @@ public class RegistryStageSyncExecutor implements IIntegrationActionExecutor {
}
}
//update-begin---author:GHT ---date:20260609 for【审批注册中心】业务表只写状态操作人/时间统一由痕迹表承载-----------
StringBuilder sql = new StringBuilder("UPDATE `").append(bizTable).append("` SET `")
.append(statusField).append("`=?");
.append(statusField).append("`=? WHERE id=?");
java.util.List<Object> params = new java.util.ArrayList<>();
params.add(stage);
if (oConvertUtils.isNotEmpty(byField)) {
sql.append(", `").append(byField).append("`=?");
params.add(operator);
}
if (oConvertUtils.isNotEmpty(timeField)) {
sql.append(", `").append(timeField).append("`=?");
params.add(now);
}
sql.append(" WHERE id=?");
params.add(statusAfter);
params.add(bizId);
//update-end---author:GHT ---date:20260609 for【审批注册中心】业务表只写状态操作人/时间统一由痕迹表承载-----------
int affected = jdbcTemplate.update(sql.toString(), params.toArray());
if (affected == 0) {
@@ -104,9 +96,9 @@ public class RegistryStageSyncExecutor implements IIntegrationActionExecutor {
}
approvalTraceSyncService.syncStage(bizTable, bizId, stage, operator, now);
log.info("[集成引擎][REGISTRY_STAGE_SYNC] table={} id={} stage={} operator={}",
bizTable, bizId, stage, operator);
return "环节同步成功: " + ApprovalStageResolver.stageLabel(stage);
log.info("[集成引擎][REGISTRY_STAGE_SYNC] table={} id={} stage={} statusAfter={} operator={}",
bizTable, bizId, stage, statusAfter, operator);
return "环节同步成功: " + ApprovalStageResolver.stageLabel(stage) + " → 状态=" + statusAfter;
}
private String resolveStage(IntegrationContext ctx, MesXslIntegrationAction action) {
@@ -121,6 +113,10 @@ public class RegistryStageSyncExecutor implements IIntegrationActionExecutor {
return IntegrationActionConfigHelper.resolveExpectedFrom(action, stage);
}
private String resolveStatusAfter(MesXslIntegrationAction action, String stage) {
return IntegrationActionConfigHelper.resolveStatusAfter(action, stage);
}
private String resolveOperator(IntegrationContext ctx) {
ApprovalCallbackContext ac = ctx.getApprovalCtx();
if (ac != null && oConvertUtils.isNotEmpty(ac.getOperatorName())) {

View File

@@ -47,27 +47,17 @@ public class MesXslBizDocRegistry extends JeecgEntity implements Serializable {
@TableField(updateStrategy = FieldStrategy.ALWAYS)
private String enabledStages;
@Schema(description = "业务状态字段名")
@Schema(description = "业务状态字段名,默认 status")
private String statusField;
@Schema(description = "校对人字段名")
private String proofreadByField;
@Schema(description = "校对时间字段名")
private String proofreadTimeField;
@Schema(description = "审核人字段名")
private String auditByField;
@Schema(description = "审核时间字段名")
private String auditTimeField;
@Schema(description = "批准人字段名")
private String approveByField;
@Schema(description = "批准时间字段名")
private String approveTimeField;
//update-end---author:GHT ---date:20260605 for【XSLMES-20260605-K8R2】审批环节与字段映射配置-----------
//update-begin---author:GHT ---date:20260609 for【审批注册中心】移除操作人字段配置操作人/时间统一由痕迹表承载,业务表只需 statusField-----------
// proofreadByField / proofreadTimeField / auditByField / auditTimeField / approveByField / approveTimeField 已移除
//update-end---author:GHT ---date:20260609 for【审批注册中心】移除操作人字段配置操作人/时间统一由痕迹表承载,业务表只需 statusField-----------
//update-begin---author:GHT ---date:20260608 for【XSLMES-20260608-TRACE】列表接口路径配置后自动注入审批痕迹字段-----------
@Schema(description = "列表接口路径(多个逗号分隔),配置后自动注入审批痕迹字段到响应")
private String listApiPath;
//update-end---author:GHT ---date:20260608 for【XSLMES-20260608-TRACE】列表接口路径配置后自动注入审批痕迹字段-----------
@Schema(description = "备注")
private String remark;

View File

@@ -20,7 +20,7 @@ public interface IApprovalTraceSyncService {
/**
* 逆向回退时同步清空高于目标环节的痕迹字段
*
* @param targetStage compile / proofread / audit
* @param targetStage 审批环节码(compile/proofread/audit)或业务 status 字典值(如 0
*/
void revertToStage(String bizTable, String bizDataId, String targetStage);
@@ -33,7 +33,7 @@ public interface IApprovalTraceSyncService {
//update-begin---author:GHT ---date:20260608 for【审批注册中心】拒绝/终止时清空源单与痕迹操作人-----------
/**
* 驳回/终止后回退到编制态:清空源单操作人/时间字段并清空痕迹明细
* 驳回/终止后按 onReject 集成方案回退目标重置业务表 status 并清空痕迹(兼容旧方法名)
*/
void revertToCompile(String bizTable, String bizDataId);
//update-end---author:GHT ---date:20260608 for【审批注册中心】拒绝/终止时清空源单与痕迹操作人-----------

View File

@@ -9,6 +9,7 @@ import org.jeecg.modules.xslmes.approval.integration.vo.DingProcessForecastVO;
import org.jeecg.modules.xslmes.approval.integration.vo.DingProcessInstanceFlowVO;
import java.util.List;
import java.util.Map;
/**
* 审批痕迹明细
@@ -20,6 +21,13 @@ public interface IMesXslApprovalTraceService extends IService<MesXslApprovalTrac
*/
MesXslApprovalTrace getByBiz(String bizTable, String bizDataId);
//update-begin---author:GHT ---date:20260608 for【XSLMES-20260608-TRACE】批量查询痕迹供响应增强器注入-----------
/**
* 按业务表 + 批量单据ID 查询痕迹,返回 bizDataId → trace 映射(供 ResponseBodyAdvice 批量注入)
*/
Map<String, MesXslApprovalTrace> batchQueryByBizIds(String bizTable, List<String> bizDataIds);
//update-end---author:GHT ---date:20260608 for【XSLMES-20260608-TRACE】批量查询痕迹供响应增强器注入-----------
//update-begin---author:GHT ---date:20260608 for【审批注册中心】明细列表补充钉钉审批实例ID-----------
/**
* 分页查询并补充钉钉审批实例ID

View File

@@ -8,7 +8,6 @@ import org.jeecg.common.api.vo.Result;
import org.jeecg.common.util.oConvertUtils;
import org.jeecg.modules.xslmes.approval.entity.MesXslApprovalFlow;
import org.jeecg.modules.xslmes.approval.integration.engine.ApprovalStageResolver;
import org.jeecg.modules.xslmes.approval.integration.engine.RegistryStageFieldHelper;
import org.jeecg.modules.xslmes.approval.integration.entity.MesXslBizDocRegistry;
import org.jeecg.modules.xslmes.approval.integration.entity.MesXslIntegrationAction;
import org.jeecg.modules.xslmes.approval.integration.entity.MesXslIntegrationPlan;
@@ -213,6 +212,10 @@ public class IntegrationPlanGenerator {
node.put("triggerPhase", phase);
node.put("expectedFrom", b.expectedFrom);
node.put("expectedFromLabel", oConvertUtils.isNotEmpty(b.expectedFrom) ? labelOf(statusChain, b.expectedFrom) : "-");
//update-begin---author:GHT ---date:20260609 for【审批环节同步】预览与生成增加通过后状态-----------
node.put("statusAfter", b.statusAfter);
node.put("statusAfterLabel", oConvertUtils.isNotEmpty(b.statusAfter) ? labelOf(statusChain, b.statusAfter) : "-");
//update-end---author:GHT ---date:20260609 for【审批环节同步】预览与生成增加通过后状态-----------
if (!b.stageConfigured && oConvertUtils.isNotEmpty(b.unconfiguredReason)) {
node.put("unconfiguredReason", b.unconfiguredReason);
}
@@ -229,6 +232,9 @@ public class IntegrationPlanGenerator {
actionConfig.put("visualType", "REGISTRY_STAGE_SYNC");
actionConfig.put("stage", b.stage);
actionConfig.put("expectedFrom", b.expectedFrom);
if (oConvertUtils.isNotEmpty(b.statusAfter)) {
actionConfig.put("statusAfter", b.statusAfter);
}
Map<String, Object> action = new LinkedHashMap<>();
action.put("actionName", b.stageLabel + "环节同步");
@@ -430,18 +436,33 @@ public class IntegrationPlanGenerator {
}
}
bindings.add(new StageBinding(
node.name, node.nodeId, stage, stageLabel, null, configured, unconfiguredReason, suggestedStage));
node.name, node.nodeId, stage, stageLabel, null, null, configured, unconfiguredReason, suggestedStage));
}
for (int i = 0; i < bindings.size(); i++) {
StageBinding b = bindings.get(i);
String expectedFrom = b.stageConfigured
? resolveExpectedFromForBinding(bindings, i, statusChain, initialStatus)
: null;
bindings.set(i, b.withExpectedFrom(expectedFrom));
String statusAfter = b.stageConfigured
? resolveStatusAfterForBinding(b, statusChain)
: null;
bindings.set(i, b.withExpectedFrom(expectedFrom).withStatusAfter(statusAfter));
}
return bindings;
}
//update-begin---author:GHT ---date:20260609 for【审批环节同步】推断通过后业务状态字典含环节码时自动填充-----------
private String resolveStatusAfterForBinding(StageBinding binding, List<StatusDictItem> statusChain) {
if (oConvertUtils.isEmpty(binding.stage)) {
return null;
}
if (indexOfValue(statusChain, binding.stage) >= 0) {
return binding.stage;
}
return null;
}
//update-end---author:GHT ---date:20260609 for【审批环节同步】推断通过后业务状态字典含环节码时自动填充-----------
private String resolveStageFromNode(FlowNode node, MesXslBizDocRegistry registry,
List<String> enabledStages, int nodeIndex) {
JSONObject props = node.props;
@@ -465,19 +486,9 @@ public class IntegrationPlanGenerator {
return null;
}
//update-begin---author:GHT ---date:20260609 for【审批注册中心】移除 byField 引用,操作人由痕迹表承载-----------
private String mapFieldToStage(MesXslBizDocRegistry registry, String fieldName) {
if (oConvertUtils.isEmpty(fieldName) || registry == null) {
return null;
}
if (fieldName.equals(registry.getProofreadByField())) {
return ApprovalStageResolver.STAGE_PROOFREAD;
}
if (fieldName.equals(registry.getAuditByField())) {
return ApprovalStageResolver.STAGE_AUDIT;
}
if (fieldName.equals(registry.getApproveByField())) {
return ApprovalStageResolver.STAGE_APPROVE;
}
// byField 已移除,节点 fieldName 不再映射环节,由 stageKey 或节点名称推断
return null;
}
@@ -486,10 +497,7 @@ public class IntegrationPlanGenerator {
return false;
}
Set<String> enabled = ApprovalStageResolver.parseEnabledStages(registry.getEnabledStages());
if (!enabled.contains(stage)) {
return false;
}
return oConvertUtils.isNotEmpty(RegistryStageFieldHelper.byField(registry, stage));
return enabled.contains(stage);
}
private String buildUnconfiguredReason(MesXslBizDocRegistry registry, String stage, List<String> enabledStages) {
@@ -500,11 +508,9 @@ public class IntegrationPlanGenerator {
if (!enabled.contains(stage)) {
return "环节「" + ApprovalStageResolver.stageLabel(stage) + "」未在注册中心启用";
}
if (oConvertUtils.isEmpty(RegistryStageFieldHelper.byField(registry, stage))) {
return "环节「" + ApprovalStageResolver.stageLabel(stage) + "」未配置操作人字段";
}
return "环节未完整配置";
}
//update-end---author:GHT ---date:20260609 for【审批注册中心】移除 byField 引用,操作人由痕迹表承载-----------
private String resolveExpectedFromForBinding(List<StageBinding> bindings, int index,
List<StatusDictItem> statusChain, String initialStatus) {
@@ -648,10 +654,16 @@ public class IntegrationPlanGenerator {
}
private record StageBinding(String nodeName, String nodeId, String stage, String stageLabel,
String expectedFrom, boolean stageConfigured, String unconfiguredReason,
String suggestedStage) {
String expectedFrom, String statusAfter, boolean stageConfigured,
String unconfiguredReason, String suggestedStage) {
StageBinding withExpectedFrom(String expectedFrom) {
return new StageBinding(nodeName, nodeId, stage, stageLabel, expectedFrom, stageConfigured, unconfiguredReason, suggestedStage);
return new StageBinding(nodeName, nodeId, stage, stageLabel, expectedFrom, statusAfter,
stageConfigured, unconfiguredReason, suggestedStage);
}
StageBinding withStatusAfter(String statusAfter) {
return new StageBinding(nodeName, nodeId, stage, stageLabel, expectedFrom, statusAfter,
stageConfigured, unconfiguredReason, suggestedStage);
}
}
}

View File

@@ -13,6 +13,7 @@ import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.util.oConvertUtils;
import org.jeecg.modules.xslmes.approval.integration.engine.ApprovalInstanceStageExtractor;
import org.jeecg.modules.xslmes.approval.integration.engine.ApprovalInstanceStageExtractor.StageCompletion;
import org.jeecg.modules.xslmes.approval.integration.engine.IntegrationRevertTargetResolver;
import org.jeecg.modules.xslmes.approval.integration.engine.RegistryStageFieldHelper;
import org.jeecg.modules.xslmes.approval.integration.entity.MesXslApprovalTrace;
import org.jeecg.modules.xslmes.approval.integration.entity.MesXslBizDocRegistry;
@@ -54,6 +55,9 @@ public class ApprovalTraceSyncServiceImpl implements IApprovalTraceSyncService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private IntegrationRevertTargetResolver revertTargetResolver;
@Override
public String checkStageAllowed(String bizTable, String stage) {
MesXslBizDocRegistry registry = findActiveRegistry(bizTable);
@@ -109,7 +113,8 @@ public class ApprovalTraceSyncServiceImpl implements IApprovalTraceSyncService {
}
LambdaUpdateWrapper<MesXslApprovalTrace> wrapper = new LambdaUpdateWrapper<>();
wrapper.eq(MesXslApprovalTrace::getId, trace.getId());
if ("compile".equals(targetStage)) {
//update-begin---author:GHT ---date:20260609 for【驳回回退】业务字典回退目标如 0/待处理)清空全部环节痕迹-----------
if (isFullTraceClearTarget(targetStage)) {
wrapper.set(MesXslApprovalTrace::getProofreadBy, null)
.set(MesXslApprovalTrace::getProofreadTime, null)
.set(MesXslApprovalTrace::getAuditBy, null)
@@ -127,6 +132,7 @@ public class ApprovalTraceSyncServiceImpl implements IApprovalTraceSyncService {
} else {
return;
}
//update-end---author:GHT ---date:20260609 for【驳回回退】业务字典回退目标如 0/待处理)清空全部环节痕迹-----------
traceMapper.update(null, wrapper);
}
@@ -178,76 +184,31 @@ public class ApprovalTraceSyncServiceImpl implements IApprovalTraceSyncService {
completions.stream().map(StageCompletion::getStage).reduce((a, b) -> a + "," + b).orElse(""));
}
private void updateBizStageFields(MesXslBizDocRegistry registry, String bizTable, String bizDataId, StageCompletion completion) {
String stage = completion.getStage();
String statusField = RegistryStageFieldHelper.statusField(registry);
String byField = RegistryStageFieldHelper.byField(registry, stage);
String timeField = RegistryStageFieldHelper.timeField(registry, stage);
RegistryStageFieldHelper.assertIdentifier(statusField);
if (oConvertUtils.isNotEmpty(byField)) {
RegistryStageFieldHelper.assertIdentifier(byField);
}
if (oConvertUtils.isNotEmpty(timeField)) {
RegistryStageFieldHelper.assertIdentifier(timeField);
}
StringBuilder sql = new StringBuilder("UPDATE `").append(bizTable).append("` SET `")
.append(statusField).append("`=?");
List<Object> params = new ArrayList<>();
params.add(stage);
if (oConvertUtils.isNotEmpty(byField)) {
sql.append(", `").append(byField).append("`=?");
params.add(completion.getOperatorBy());
}
if (oConvertUtils.isNotEmpty(timeField)) {
sql.append(", `").append(timeField).append("`=?");
params.add(completion.getOperatorTime());
}
sql.append(" WHERE id=?");
params.add(bizDataId);
jdbcTemplate.update(sql.toString(), params.toArray());
}
//update-end---author:GHT ---date:20260608 for【审批注册中心】按实例tasks反写审批痕迹明细-----------
//update-begin---author:GHT ---date:20260608 for【审批注册中心】拒绝/终止时清空源单与痕迹操作人-----------
//update-begin---author:GHT ---date:20260609 for【审批注册中心】拒绝/终止只重置业务表状态,操作人/时间由痕迹表承载-----------
@Override
@Transactional(rollbackFor = Exception.class)
public void revertToCompile(String bizTable, String bizDataId) {
if (oConvertUtils.isEmpty(bizTable) || oConvertUtils.isEmpty(bizDataId)) {
return;
}
//update-begin---author:GHT ---date:20260609 for【驳回回退】补偿回退读取 onReject 集成方案 targetStage不写死 compile-----------
String targetStage = revertTargetResolver.resolveRevertTarget(bizTable);
MesXslBizDocRegistry registry = findActiveRegistry(bizTable);
if (registry == null) {
revertToStage(bizTable, bizDataId, "compile");
return;
if (registry != null) {
String statusField = RegistryStageFieldHelper.statusField(registry);
RegistryStageFieldHelper.assertIdentifier(statusField);
RegistryStageFieldHelper.assertIdentifier(bizTable);
jdbcTemplate.update(
"UPDATE `" + bizTable + "` SET `" + statusField + "`=? WHERE id=?",
targetStage, bizDataId);
}
String statusField = RegistryStageFieldHelper.statusField(registry);
RegistryStageFieldHelper.assertIdentifier(statusField);
RegistryStageFieldHelper.assertIdentifier(bizTable);
StringBuilder sql = new StringBuilder("UPDATE `").append(bizTable).append("` SET `")
.append(statusField).append("`=?");
List<Object> params = new ArrayList<>();
params.add("compile");
appendClearField(sql, params, registry.getProofreadByField());
appendClearField(sql, params, registry.getProofreadTimeField());
appendClearField(sql, params, registry.getAuditByField());
appendClearField(sql, params, registry.getAuditTimeField());
appendClearField(sql, params, registry.getApproveByField());
appendClearField(sql, params, registry.getApproveTimeField());
sql.append(" WHERE id=?");
params.add(bizDataId);
jdbcTemplate.update(sql.toString(), params.toArray());
revertToStage(bizTable, bizDataId, "compile");
revertToStage(bizTable, bizDataId, targetStage);
log.info("[审批痕迹回退] table={} id={} targetStage={}", bizTable, bizDataId, targetStage);
//update-end---author:GHT ---date:20260609 for【驳回回退】补偿回退读取 onReject 集成方案 targetStage不写死 compile-----------
}
private void appendClearField(StringBuilder sql, List<Object> params, String field) {
if (oConvertUtils.isEmpty(field)) {
return;
}
RegistryStageFieldHelper.assertIdentifier(field);
sql.append(", `").append(field).append("`=?");
params.add(null);
}
//update-end---author:GHT ---date:20260608 for【审批注册中心】拒绝/终止时清空源单与痕迹操作人-----------
//update-end---author:GHT ---date:20260609 for【审批注册中心】拒绝/终止只重置业务表状态,操作人/时间由痕迹表承载-----------
private MesXslApprovalTrace findTraceByBiz(String bizTable, String bizDataId) {
if (oConvertUtils.isEmpty(bizTable) || oConvertUtils.isEmpty(bizDataId)) {
@@ -298,4 +259,15 @@ public class ApprovalTraceSyncServiceImpl implements IApprovalTraceSyncService {
return stage;
}
}
/** 回退到编制态或业务字典初始态时,清空全部审批环节痕迹 */
private boolean isFullTraceClearTarget(String targetStage) {
if (oConvertUtils.isEmpty(targetStage)) {
return true;
}
return "compile".equals(targetStage)
|| (!STAGE_PROOFREAD.equals(targetStage)
&& !STAGE_AUDIT.equals(targetStage)
&& !STAGE_APPROVE.equals(targetStage));
}
}

View File

@@ -33,12 +33,14 @@ import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
@@ -73,6 +75,28 @@ public class MesXslApprovalTraceServiceImpl extends ServiceImpl<MesXslApprovalTr
@Autowired
private ApprovalInstanceStageExtractor instanceStageExtractor;
//update-begin---author:GHT ---date:20260608 for【XSLMES-20260608-TRACE】批量查询痕迹供响应增强器注入-----------
@Override
public Map<String, MesXslApprovalTrace> batchQueryByBizIds(String bizTable, List<String> bizDataIds) {
if (oConvertUtils.isEmpty(bizTable) || bizDataIds == null || bizDataIds.isEmpty()) {
return Collections.emptyMap();
}
List<String> ids = bizDataIds.stream()
.filter(oConvertUtils::isNotEmpty)
.distinct()
.collect(Collectors.toList());
if (ids.isEmpty()) {
return Collections.emptyMap();
}
List<MesXslApprovalTrace> traces = lambdaQuery()
.eq(MesXslApprovalTrace::getBizTable, bizTable)
.in(MesXslApprovalTrace::getBizDataId, ids)
.list();
return traces.stream().collect(
Collectors.toMap(MesXslApprovalTrace::getBizDataId, Function.identity(), (a, b) -> a));
}
//update-end---author:GHT ---date:20260608 for【XSLMES-20260608-TRACE】批量查询痕迹供响应增强器注入-----------
@Override
public MesXslApprovalTrace getByBiz(String bizTable, String bizDataId) {
if (oConvertUtils.isEmpty(bizTable) || oConvertUtils.isEmpty(bizDataId)) {

View File

@@ -25,12 +25,6 @@ public class MesXslBizDocRegistryServiceImpl extends ServiceImpl<MesXslBizDocReg
}
entity.setEnabledStages(normalizeStages(entity.getEnabledStages()));
entity.setStatusField(defaultField(entity.getStatusField(), "status"));
entity.setProofreadByField(defaultField(entity.getProofreadByField(), "proofread_by"));
entity.setProofreadTimeField(defaultField(entity.getProofreadTimeField(), "proofread_time"));
entity.setAuditByField(defaultField(entity.getAuditByField(), "audit_by"));
entity.setAuditTimeField(defaultField(entity.getAuditTimeField(), "audit_time"));
entity.setApproveByField(defaultField(entity.getApproveByField(), "approve_by"));
entity.setApproveTimeField(defaultField(entity.getApproveTimeField(), "approve_time"));
}
private String normalizeStages(String stages) {

View File

@@ -0,0 +1,133 @@
package org.jeecg.modules.xslmes.dingtalk.callback.controller;
import java.util.Arrays;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.system.query.QueryGenerator;
import org.jeecg.modules.xslmes.dingtalk.callback.entity.MesXslDingCallbackLog;
import org.jeecg.modules.xslmes.dingtalk.callback.service.IMesXslDingCallbackLogService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.system.base.controller.JeecgController;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.ModelAndView;
import io.swagger.v3.oas.annotations.tags.Tag;
import io.swagger.v3.oas.annotations.Operation;
import org.jeecg.common.aspect.annotation.AutoLog;
import org.apache.shiro.authz.annotation.RequiresPermissions;
/**
* @Description: 钉钉回调日志
* @Author: jeecg-boot
* @Date: 2026-06-09
* @Version: V1.0
*/
@Tag(name = "钉钉回调日志")
@RestController
@RequestMapping("/xslmes/mesXslDingCallbackLog")
@Slf4j
public class MesXslDingCallbackLogController extends JeecgController<MesXslDingCallbackLog, IMesXslDingCallbackLogService> {
@Autowired
private IMesXslDingCallbackLogService mesXslDingCallbackLogService;
/**
* 分页列表查询
*/
@Operation(summary = "钉钉回调日志-分页列表查询")
@GetMapping(value = "/list")
public Result<IPage<MesXslDingCallbackLog>> queryPageList(MesXslDingCallbackLog mesXslDingCallbackLog,
@RequestParam(name = "pageNo", defaultValue = "1") Integer pageNo,
@RequestParam(name = "pageSize", defaultValue = "10") Integer pageSize,
HttpServletRequest req) {
QueryWrapper<MesXslDingCallbackLog> queryWrapper = QueryGenerator.initQueryWrapper(mesXslDingCallbackLog, req.getParameterMap());
Page<MesXslDingCallbackLog> page = new Page<>(pageNo, pageSize);
IPage<MesXslDingCallbackLog> pageList = mesXslDingCallbackLogService.page(page, queryWrapper);
return Result.OK(pageList);
}
/**
* 添加
*/
@AutoLog(value = "钉钉回调日志-添加")
@Operation(summary = "钉钉回调日志-添加")
@RequiresPermissions("xslmes:mes_xsl_ding_callback_log:add")
@PostMapping(value = "/add")
public Result<String> add(@RequestBody MesXslDingCallbackLog mesXslDingCallbackLog) {
mesXslDingCallbackLogService.save(mesXslDingCallbackLog);
return Result.OK("添加成功!");
}
/**
* 编辑
*/
@AutoLog(value = "钉钉回调日志-编辑")
@Operation(summary = "钉钉回调日志-编辑")
@RequiresPermissions("xslmes:mes_xsl_ding_callback_log:edit")
@RequestMapping(value = "/edit", method = {RequestMethod.PUT, RequestMethod.POST})
public Result<String> edit(@RequestBody MesXslDingCallbackLog mesXslDingCallbackLog) {
mesXslDingCallbackLogService.updateById(mesXslDingCallbackLog);
return Result.OK("编辑成功!");
}
/**
* 通过id删除
*/
@AutoLog(value = "钉钉回调日志-通过id删除")
@Operation(summary = "钉钉回调日志-通过id删除")
@RequiresPermissions("xslmes:mes_xsl_ding_callback_log:delete")
@DeleteMapping(value = "/delete")
public Result<String> delete(@RequestParam(name = "id", required = true) String id) {
mesXslDingCallbackLogService.removeById(id);
return Result.OK("删除成功!");
}
/**
* 批量删除
*/
@AutoLog(value = "钉钉回调日志-批量删除")
@Operation(summary = "钉钉回调日志-批量删除")
@RequiresPermissions("xslmes:mes_xsl_ding_callback_log:deleteBatch")
@DeleteMapping(value = "/deleteBatch")
public Result<String> deleteBatch(@RequestParam(name = "ids", required = true) String ids) {
this.mesXslDingCallbackLogService.removeByIds(Arrays.asList(ids.split(",")));
return Result.OK("批量删除成功!");
}
/**
* 通过id查询
*/
@Operation(summary = "钉钉回调日志-通过id查询")
@GetMapping(value = "/queryById")
public Result<MesXslDingCallbackLog> queryById(@RequestParam(name = "id", required = true) String id) {
MesXslDingCallbackLog mesXslDingCallbackLog = mesXslDingCallbackLogService.getById(id);
if (mesXslDingCallbackLog == null) {
return Result.error("未找到对应数据");
}
return Result.OK(mesXslDingCallbackLog);
}
/**
* 导出excel
*/
@RequiresPermissions("xslmes:mes_xsl_ding_callback_log:exportXls")
@RequestMapping(value = "/exportXls")
public ModelAndView exportXls(HttpServletRequest request, MesXslDingCallbackLog mesXslDingCallbackLog) {
return super.exportXls(request, mesXslDingCallbackLog, MesXslDingCallbackLog.class, "钉钉回调日志");
}
/**
* 通过excel导入数据
*/
@RequiresPermissions("xslmes:mes_xsl_ding_callback_log:importExcel")
@RequestMapping(value = "/importExcel", method = RequestMethod.POST)
public Result<?> importExcel(HttpServletRequest request, HttpServletResponse response) {
return super.importExcel(request, response, MesXslDingCallbackLog.class);
}
}

View File

@@ -0,0 +1,111 @@
package org.jeecg.modules.xslmes.dingtalk.callback.entity;
import java.io.Serializable;
import java.util.Date;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableLogic;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import com.fasterxml.jackson.annotation.JsonFormat;
import org.springframework.format.annotation.DateTimeFormat;
import org.jeecgframework.poi.excel.annotation.Excel;
import org.jeecg.common.aspect.annotation.Dict;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
/**
* @Description: 钉钉回调日志
* @Author: jeecg-boot
* @Date: 2026-06-09
* @Version: V1.0
*/
@Data
@TableName("mes_xsl_ding_callback_log")
@Accessors(chain = true)
@EqualsAndHashCode(callSuper = false)
@Schema(description = "钉钉回调日志")
public class MesXslDingCallbackLog implements Serializable {
private static final long serialVersionUID = 1L;
@TableId(type = IdType.ASSIGN_ID)
@Schema(description = "主键")
private String id;
@Excel(name = "钉钉事件ID", width = 20)
@Schema(description = "钉钉事件ID")
private String eventId;
@Excel(name = "事件类型", width = 25)
@Schema(description = "事件类型(bpms_instance_change/bpms_task_change)")
private String eventType;
@Excel(name = "审批实例ID", width = 25)
@Schema(description = "审批实例ID")
private String processInstanceId;
@Excel(name = "原始推送数据", width = 50)
@Schema(description = "原始推送数据JSON")
private String rawData;
@Excel(name = "接收时间", width = 20, format = "yyyy-MM-dd HH:mm:ss")
@JsonFormat(timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@Schema(description = "接收时间")
private Date receivedTime;
@Excel(name = "是否已处理", width = 10, dicCode = "yn")
@Dict(dicCode = "yn")
@Schema(description = "是否已处理集成方案(0否1是)")
private Integer processed;
@Excel(name = "处理备注", width = 40)
@Schema(description = "处理备注")
private String processRemark;
@Excel(name = "关联业务表", width = 20)
@Schema(description = "关联业务表")
private String bizTable;
@Excel(name = "关联业务记录ID", width = 20)
@Schema(description = "关联业务记录ID")
private String bizDataId;
@Excel(name = "关联审批台账ID", width = 20)
@Schema(description = "关联审批台账ID")
private String recordId;
/**创建人*/
@Schema(description = "创建人")
private String createBy;
/**创建日期*/
@JsonFormat(timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@Schema(description = "创建日期")
private Date createTime;
/**更新人*/
@Schema(description = "更新人")
private String updateBy;
/**更新日期*/
@JsonFormat(timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@Schema(description = "更新日期")
private Date updateTime;
/**逻辑删除*/
@TableLogic
@Schema(description = "逻辑删除 0正常 1删除")
private Integer delFlag;
/**租户ID*/
@Schema(description = "租户ID")
private Integer tenantId;
/**所属部门*/
@Schema(description = "所属部门")
private String sysOrgCode;
}

View File

@@ -0,0 +1,13 @@
package org.jeecg.modules.xslmes.dingtalk.callback.mapper;
import org.jeecg.modules.xslmes.dingtalk.callback.entity.MesXslDingCallbackLog;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* @Description: 钉钉回调日志
* @Author: jeecg-boot
* @Date: 2026-06-09
* @Version: V1.0
*/
public interface MesXslDingCallbackLogMapper extends BaseMapper<MesXslDingCallbackLog> {
}

View File

@@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.jeecg.modules.xslmes.dingtalk.callback.mapper.MesXslDingCallbackLogMapper">
</mapper>

View File

@@ -0,0 +1,13 @@
package org.jeecg.modules.xslmes.dingtalk.callback.service;
import org.jeecg.modules.xslmes.dingtalk.callback.entity.MesXslDingCallbackLog;
import com.baomidou.mybatisplus.extension.service.IService;
/**
* @Description: 钉钉回调日志
* @Author: jeecg-boot
* @Date: 2026-06-09
* @Version: V1.0
*/
public interface IMesXslDingCallbackLogService extends IService<MesXslDingCallbackLog> {
}

View File

@@ -0,0 +1,17 @@
package org.jeecg.modules.xslmes.dingtalk.callback.service.impl;
import org.jeecg.modules.xslmes.dingtalk.callback.entity.MesXslDingCallbackLog;
import org.jeecg.modules.xslmes.dingtalk.callback.mapper.MesXslDingCallbackLogMapper;
import org.jeecg.modules.xslmes.dingtalk.callback.service.IMesXslDingCallbackLogService;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
/**
* @Description: 钉钉回调日志
* @Author: jeecg-boot
* @Date: 2026-06-09
* @Version: V1.0
*/
@Service
public class MesXslDingCallbackLogServiceImpl extends ServiceImpl<MesXslDingCallbackLogMapper, MesXslDingCallbackLog> implements IMesXslDingCallbackLogService {
}

View File

@@ -0,0 +1,194 @@
package org.jeecg.modules.xslmes.dingtalk.stream;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.util.oConvertUtils;
import org.jeecg.modules.xslmes.approval.constant.ApprovalRecordConstants;
import org.jeecg.modules.xslmes.approval.entity.MesXslApprovalRecord;
import org.jeecg.modules.xslmes.approval.service.IMesXslApprovalRecordService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;
/**
* 钉钉审批回调补偿扫描器。
* <p>
* Stream 模式仅保证"连接期间"的事件推送。服务重启、网络闪断或钉钉侧推送失败
* 均可能导致事件静默丢失,造成审批台账长期停留在 RUNNING 状态。
* <p>
* 本定时任务每 {@link #SWEEP_INTERVAL_MS} 毫秒扫描一次 RUNNING 的钉钉台账,
* 主动调用钉钉 API 拉取最新实例状态:
* <ul>
* <li>钉钉已终态COMPLETED/TERMINATED→ 构造合成事件调用 {@link DingBpmsEventProcessor#onInstanceChange}</li>
* <li>钉钉仍 RUNNING 但中间节点已同意、MES 集成未执行 → 调用 {@link DingBpmsEventProcessor#reconcileIntermediateNodes}</li>
* </ul>
* 处理器内部已有幂等保护,重复调用安全。
*
* @author GHT
* @date 2026-06-09 for【钉钉Stream补偿扫描】漏推回调自动修复
*/
@Slf4j
@Component
public class DingApprovalReconcileScheduler {
private static final String LOG_TAG = DingTalkStreamSdkRunner.LOG_TAG;
/** 每次扫描最多处理的台账条数(避免单次扫描耗时过长 + DingTalk API 限速) */
private static final int MAX_RECORDS_PER_SWEEP = 30;
/** 两次扫描之间的间隔毫秒fixedDelay 保证上次扫描完成后再计时 */
private static final long SWEEP_INTERVAL_MS = 3 * 60 * 1000L;
/**
* 发起审批后的最短等待时间(毫秒),防止与 Stream 事件正常到达竞争。
* 5 分钟内刚发起的审批不扫描,给 Stream 事件足够的到达时间。
*/
private static final long MIN_AGE_MS = 5 * 60 * 1000L;
/** DingTalk API 调用之间的间隔(毫秒),避免触发速率限制 */
private static final long API_CALL_INTERVAL_MS = 300L;
@Autowired
private IMesXslApprovalRecordService approvalRecordService;
@Autowired
private DingTalkWorkflowService workflowService;
@Autowired
private DingBpmsEventProcessor eventProcessor;
//update-begin---author:GHT ---date:20260609 for【钉钉Stream补偿扫描】漏推回调自动修复-----
@Scheduled(fixedDelay = SWEEP_INTERVAL_MS)
public void reconcile() {
long sweepStart = System.currentTimeMillis();
Date cutoff = new Date(sweepStart - MIN_AGE_MS);
List<MesXslApprovalRecord> pendingRecords = approvalRecordService.list(
new LambdaQueryWrapper<MesXslApprovalRecord>()
.eq(MesXslApprovalRecord::getStatus, ApprovalRecordConstants.STATUS_RUNNING)
.eq(MesXslApprovalRecord::getChannel, ApprovalRecordConstants.CHANNEL_DINGTALK)
.isNotNull(MesXslApprovalRecord::getExternalInstanceId)
.ne(MesXslApprovalRecord::getExternalInstanceId, "")
.lt(MesXslApprovalRecord::getApplyTime, cutoff)
.orderByAsc(MesXslApprovalRecord::getApplyTime)
.last("LIMIT " + MAX_RECORDS_PER_SWEEP));
if (pendingRecords.isEmpty()) {
log.debug("{} 补偿扫描:无待检查台账", LOG_TAG);
return;
}
log.info("{} 补偿扫描开始,待检台账数={}", LOG_TAG, pendingRecords.size());
int compensated = 0;
int nodeCompensated = 0;
int skipped = 0;
int failed = 0;
for (MesXslApprovalRecord record : pendingRecords) {
String instanceId = record.getExternalInstanceId();
try {
JSONObject instance = workflowService.getProcessInstance(instanceId);
if (instance == null) {
log.warn("{} 补偿扫描:拉取实例失败 instanceId={} recordId={}",
LOG_TAG, instanceId, record.getId());
failed++;
continue;
}
JSONObject syntheticEvent = buildSyntheticEvent(instance, instanceId);
if (syntheticEvent != null) {
log.info("{} 补偿扫描:检测到漏推终态事件,触发补偿 instanceId={} dingStatus={} dingResult={}",
LOG_TAG, instanceId,
instance.getString("status"), instance.getString("result"));
eventProcessor.onInstanceChange(syntheticEvent);
compensated++;
} else if ("RUNNING".equalsIgnoreCase(instance.getString("status"))) {
//update-begin---author:GHT ---date:20260609 for【钉钉Stream补偿扫描】RUNNING态补中间节点集成-----------
int nodes = eventProcessor.reconcileIntermediateNodes(record, instance);
if (nodes > 0) {
nodeCompensated += nodes;
log.info("{} 补偿扫描:中间节点已补偿 instanceId={} recordId={} nodes={}",
LOG_TAG, instanceId, record.getId(), nodes);
} else {
log.debug("{} 补偿扫描RUNNING且无待补中间节点 instanceId={}", LOG_TAG, instanceId);
skipped++;
}
//update-end---author:GHT ---date:20260609 for【钉钉Stream补偿扫描】RUNNING态补中间节点集成-----------
} else {
log.debug("{} 补偿扫描非RUNNING且无法映射终态跳过 instanceId={} dingStatus={}",
LOG_TAG, instanceId, instance.getString("status"));
skipped++;
}
} catch (Exception e) {
log.error("{} 补偿扫描:处理异常 instanceId={} recordId={}: {}",
LOG_TAG, instanceId, record.getId(), e.getMessage(), e);
failed++;
}
// 避免连续 API 调用触发 DingTalk 限速
sleepQuietly(API_CALL_INTERVAL_MS);
}
log.info("{} 补偿扫描完成 总数={} 终态补偿={} 中间节点补偿={} 仍跳过={} 失败={} costMs={}",
LOG_TAG, pendingRecords.size(), compensated, nodeCompensated, skipped, failed,
System.currentTimeMillis() - sweepStart);
}
/**
* 将钉钉实例状态映射为 {@code onInstanceChange} 所需的合成事件。
* <ul>
* <li>COMPLETED + agree → type=finish, result=agree</li>
* <li>COMPLETED + refuse → type=finish, result=refuse</li>
* <li>TERMINATED/CANCELED → type=terminate</li>
* <li>RUNNING 或未知 → null不补偿</li>
* </ul>
*/
private JSONObject buildSyntheticEvent(JSONObject instance, String processInstanceId) {
String dingStatus = instance.getString("status");
String dingResult = instance.getString("result");
if ("RUNNING".equalsIgnoreCase(dingStatus)) {
return null;
}
JSONObject event = new JSONObject();
event.put("processInstanceId", processInstanceId);
if ("COMPLETED".equalsIgnoreCase(dingStatus)) {
if ("agree".equalsIgnoreCase(dingResult)) {
event.put("type", "finish");
event.put("result", "agree");
} else if ("refuse".equalsIgnoreCase(dingResult)) {
event.put("type", "finish");
event.put("result", "refuse");
} else {
// redirect/转交等非终态结果跳过onInstanceChange 内部会跳过这类result
event.put("type", "finish");
event.put("result", oConvertUtils.isEmpty(dingResult) ? "unknown" : dingResult);
}
return event;
}
if ("TERMINATED".equalsIgnoreCase(dingStatus) || "CANCELED".equalsIgnoreCase(dingStatus)) {
event.put("type", "terminate");
return event;
}
log.info("{} 补偿扫描:未知钉钉状态 dingStatus={} instanceId={}", LOG_TAG, dingStatus, processInstanceId);
return null;
}
private void sleepQuietly(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
//update-end---author:GHT ---date:20260609 for【钉钉Stream补偿扫描】漏推回调自动修复-----
}

View File

@@ -14,7 +14,11 @@ import org.jeecg.modules.xslmes.approval.entity.MesXslApprovalRecord;
import org.jeecg.modules.xslmes.approval.service.IMesXslApprovalFlowService;
import org.jeecg.modules.xslmes.approval.service.IMesXslApprovalGateService;
import org.jeecg.modules.xslmes.approval.integration.engine.ApprovalInstanceStageExtractor;
import org.jeecg.modules.xslmes.approval.integration.engine.ApprovalInstanceStageExtractor.NodePair;
import org.jeecg.modules.xslmes.approval.integration.engine.ApprovalInstanceStageExtractor.NodeTaskDecision;
import org.jeecg.modules.xslmes.approval.integration.engine.ApprovalInstanceStageExtractor.StageCompletion;
import org.jeecg.modules.xslmes.approval.integration.entity.MesXslIntegrationLog;
import org.jeecg.modules.xslmes.approval.integration.service.IMesXslIntegrationLogService;
import org.jeecg.modules.xslmes.approval.service.IMesXslApprovalRecordService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
@@ -63,6 +67,12 @@ public class DingBpmsEventProcessor {
@Autowired
private ApprovalInstanceStageExtractor instanceStageExtractor;
@Autowired
private DingStreamCallbackLogHelper callbackLogHelper;
@Autowired
private IMesXslIntegrationLogService integrationLogService;
// ==================== bpms_instance_change ====================
//update-begin---author:GHT ---date:20260604 for【钉钉Stream回调】拉取实例详情后精准执行节点回调-----
@@ -112,20 +122,28 @@ public class DingBpmsEventProcessor {
log.info("{} bpms_instance_change 映射终态 instanceId={} mesStatus={} remark={}",
LOG_TAG, processInstanceId, status, remark);
//update-begin---author:GHT ---date:2026-06-04 for20260604】钉钉回调幂等去重finishByExternalInstance条件为status=RUNNING0行更新即终态已处理-----
//update-begin---author:GHT ---date:20260609 for驳回回退】台账已是终态时仍补偿触发业务回调,避免集成未执行-----------
try {
boolean updated = approvalGateService.finishByExternalInstance(
ApprovalRecordConstants.CHANNEL_DINGTALK, processInstanceId, status, remark);
if (!updated) {
log.info("{} bpms_instance_change 跳过:台账已是终态(重复事件) instanceId={}", LOG_TAG, processInstanceId);
return;
if (updated) {
log.info("{} 台账已更新 instanceId={} -> status={}", LOG_TAG, processInstanceId, status);
} else {
MesXslApprovalRecord existing = findRecord(processInstanceId);
if (existing != null && status.equals(existing.getStatus())) {
log.info("{} bpms_instance_change 台账已是终态({}),补偿触发业务/集成回调 instanceId={}",
LOG_TAG, status, processInstanceId);
} else {
log.info("{} bpms_instance_change 跳过:台账已是终态(状态不一致或重复事件) instanceId={}",
LOG_TAG, processInstanceId);
return;
}
}
log.info("{} 台账已更新 instanceId={} -> status={}", LOG_TAG, processInstanceId, status);
} catch (Exception e) {
log.error("{} 台账更新失败 instanceId={}: {}", LOG_TAG, processInstanceId, e.getMessage(), e);
return;
}
//update-end---author:GHT ---date:2026-06-04 for20260604】钉钉回调幂等去重finishByExternalInstance条件为status=RUNNING0行更新即终态已处理-----
//update-end---author:GHT ---date:20260609 for驳回回退】台账已是终态时仍补偿触发业务/集成回调,避免集成未执行-----------
//update-begin---author:GHT ---date:2026-06-08 for【风险修复-R5】TERMINATED时触发fireCancelled允许业务回滚中间态-----------
if (ApprovalRecordConstants.STATUS_CANCELLED.equals(status)) {
@@ -135,6 +153,9 @@ public class DingBpmsEventProcessor {
logCallbackDispatch("fireCancelled", cancelCtx);
try {
callbackDispatcher.fireCancelled(cancelCtx);
//update-begin---author:GHT ---date:20260609 for【钉钉回调日志】终止态回调已触发-----------
callbackLogHelper.markProcessed("审批终止,已触发 fireCancelled", cancelledRecord);
//update-end---author:GHT ---date:20260609 for【钉钉回调日志】终止态回调已触发-----------
} catch (Exception e) {
log.error("{} fireCancelled 失败 instanceId={}: {}", LOG_TAG, processInstanceId, e.getMessage(), e);
}
@@ -250,6 +271,13 @@ public class DingBpmsEventProcessor {
}
//update-end---author:GHT ---date:2026-06-08 for【缺陷修复-D2】用activityId替代mesNodes索引定位终态节点支持条件分支场景-----------
//update-begin---author:GHT ---date:20260609 for【钉钉回调日志】终态回调已触发-----------
String processRemark = ApprovalRecordConstants.STATUS_APPROVED.equals(status)
? "审批通过,已触发 fireNodeApproved/fireApproved"
: "审批驳回,已触发 fireRejected";
callbackLogHelper.markProcessed(processRemark, record);
//update-end---author:GHT ---date:20260609 for【钉钉回调日志】终态回调已触发-----------
log.info("{} bpms_instance_change 完成 instanceId={} bizTable={} bizDataId={} mesStatus={}",
LOG_TAG, processInstanceId, record.getBizTable(), record.getBizDataId(), status);
}
@@ -279,8 +307,19 @@ public class DingBpmsEventProcessor {
LOG_TAG, type, processInstanceId);
return;
}
//update-begin---author:GHT ---date:20260609 for【驳回回退】task_change拒绝时主动触发终态处理不依赖 instance_change-----------
if ("refuse".equals(result)) {
log.info("{} bpms_task_change 收到拒绝,转交 onInstanceChangeinstance_change 可能未推送) instanceId={}",
LOG_TAG, processInstanceId);
JSONObject instanceData = new JSONObject(data);
instanceData.put("type", "finish");
instanceData.put("result", "refuse");
onInstanceChange(instanceData);
return;
}
//update-end---author:GHT ---date:20260609 for【驳回回退】task_change拒绝时主动触发终态处理不依赖 instance_change-----------
if (!"agree".equals(result)) {
log.info("{} bpms_task_change 跳过result={} 非同意,refuse/redirect 由 bpms_instance_change 处理 instanceId={}",
log.info("{} bpms_task_change 跳过result={} 非同意redirect 由 bpms_instance_change 处理 instanceId={}",
LOG_TAG, result, processInstanceId);
return;
}
@@ -428,6 +467,9 @@ public class DingBpmsEventProcessor {
.setActivityId(activityId);
logCallbackDispatch("fireNodeApproved", ctx);
callbackDispatcher.fireNodeApproved(ctx);
//update-begin---author:GHT ---date:20260609 for【钉钉回调日志】节点通过回调已触发-----------
callbackLogHelper.markProcessed("节点审批通过,已触发 fireNodeApproved", record);
//update-end---author:GHT ---date:20260609 for【钉钉回调日志】节点通过回调已触发-----------
log.info("{} fireNodeApproved 成功 instanceId={} nodeName={}", LOG_TAG, processInstanceId, ctx.getNodeName());
} catch (Exception e) {
log.error("{} fireNodeApproved 失败 instanceId={} nodeName={}: {}",
@@ -440,6 +482,127 @@ public class DingBpmsEventProcessor {
}
//update-end---author:GHT ---date:20260604 for【钉钉Stream回调】节点通过时按operationRecords索引执行onNodeApprove-----
//update-begin---author:GHT ---date:20260609 for【钉钉Stream补偿扫描】RUNNING态补中间节点集成-----------
/**
* 补偿钉钉侧已同意、但 MES 集成未执行的中间节点(审批仍为 RUNNING 时由定时扫描调用)。
* <p>
* 以集成日志为准判断是否需要补偿,避免 processed_op_count 虚高导致漏补。
*
* @return 本次补偿触发的节点数
*/
public int reconcileIntermediateNodes(MesXslApprovalRecord record, JSONObject instance) {
if (record == null || instance == null) {
return 0;
}
if (!ApprovalRecordConstants.STATUS_RUNNING.equals(record.getStatus())) {
return 0;
}
MesXslApprovalFlow flow = loadFlow(record.getFlowId());
if (flow == null || oConvertUtils.isEmpty(flow.getFlowConfig())) {
return 0;
}
List<NodePair> pairs = instanceStageExtractor.alignMesNodesWithTasks(instance, flow.getFlowConfig());
if (pairs.isEmpty()) {
log.info("{} 中间节点补偿:无节点对齐 instanceId={} recordId={}",
LOG_TAG, record.getExternalInstanceId(), record.getId());
return 0;
}
int compensated = 0;
for (int nodeIndex = 0; nodeIndex < pairs.size(); nodeIndex++) {
NodePair pair = pairs.get(nodeIndex);
NodeTaskDecision decision = instanceStageExtractor.evaluateNodeTasks(
pair.getTaskList(), instanceStageExtractor.resolveApprovalMethod(pair.getMesNode()));
if (!decision.isAgreed()) {
continue;
}
String planId = resolveNodeIntegrationPlanId(pair.getMesNode());
if (oConvertUtils.isNotEmpty(planId) && hasIntegrationSuccess(record.getId(), planId)) {
continue;
}
if (oConvertUtils.isEmpty(planId)) {
approvalGateService.tryMarkNodeProcessed(record.getId(), nodeIndex);
continue;
}
JSONObject mesNode = pair.getMesNode();
String nodeName = mesNode != null ? mesNode.getString("name") : null;
log.info("{} 中间节点补偿:触发集成 instanceId={} recordId={} nodeIndex={} nodeName={} planId={}",
LOG_TAG, record.getExternalInstanceId(), record.getId(), nodeIndex, nodeName, planId);
try {
fireCompensatedNodeApproved(record, instance, pair, nodeIndex, decision);
compensated++;
} catch (Exception e) {
log.error("{} 中间节点补偿失败 instanceId={} nodeIndex={} nodeName={}: {}",
LOG_TAG, record.getExternalInstanceId(), nodeIndex, nodeName, e.getMessage(), e);
}
}
return compensated;
}
private void fireCompensatedNodeApproved(MesXslApprovalRecord record, JSONObject instance,
NodePair pair, int nodeIndex, NodeTaskDecision decision) {
JSONObject mesNode = pair.getMesNode();
String dtUserId = null;
if (decision.getActorUserIds() != null && !decision.getActorUserIds().isEmpty()) {
dtUserId = decision.getActorUserIds().get(decision.getActorUserIds().size() - 1);
}
String token = workflowService.generateTokenByDtUserId(dtUserId);
JSONObject nodeProps = mesNode != null ? mesNode.getJSONObject("props") : null;
String stageKey = nodeProps != null ? nodeProps.getString("stageKey") : null;
String actioner = "审批人";
StageCompletion completion = instanceStageExtractor.extractActivityCompletion(
instance, pair.getActivityId(), mesNode);
if (completion != null && oConvertUtils.isNotEmpty(completion.getOperatorBy())) {
actioner = completion.getOperatorBy();
}
ApprovalCallbackContext ctx = buildContext(record, "钉钉节点补偿(" + actioner + "", token)
.setOperatorName(actioner)
.setOperatorTime(decision.getOperatorTime())
.setNodeId(mesNode != null ? mesNode.getString("id") : null)
.setNodeName(mesNode != null ? mesNode.getString("name") : null)
.setStageKey(stageKey)
.setActivityId(pair.getActivityId());
logCallbackDispatch("fireNodeApproved (中间节点补偿)", ctx);
callbackDispatcher.fireNodeApproved(ctx);
approvalGateService.tryMarkNodeProcessed(record.getId(), nodeIndex);
log.info("{} 中间节点补偿完成 instanceId={} nodeIndex={} nodeName={}",
LOG_TAG, record.getExternalInstanceId(), nodeIndex, ctx.getNodeName());
}
private String resolveNodeIntegrationPlanId(JSONObject mesNode) {
if (mesNode == null) {
return null;
}
JSONObject props = mesNode.getJSONObject("props");
if (props == null) {
return null;
}
JSONObject plans = props.getJSONObject("integrationPlans");
if (plans == null) {
return null;
}
return plans.getString("onNodeApprove");
}
private boolean hasIntegrationSuccess(String recordId, String planId) {
if (oConvertUtils.isEmpty(recordId) || oConvertUtils.isEmpty(planId)) {
return false;
}
return integrationLogService.lambdaQuery()
.eq(MesXslIntegrationLog::getRecordId, recordId)
.eq(MesXslIntegrationLog::getPlanId, planId)
.eq(MesXslIntegrationLog::getStatus, "success")
.count() > 0;
}
//update-end---author:GHT ---date:20260609 for【钉钉Stream补偿扫描】RUNNING态补中间节点集成-----------
// ==================== 内部辅助 ====================
private MesXslApprovalRecord findRecord(String processInstanceId) {

View File

@@ -0,0 +1,152 @@
package org.jeecg.modules.xslmes.dingtalk.stream;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.util.oConvertUtils;
import org.jeecg.modules.xslmes.approval.entity.MesXslApprovalRecord;
import org.jeecg.modules.xslmes.dingtalk.callback.entity.MesXslDingCallbackLog;
import org.jeecg.modules.xslmes.dingtalk.callback.service.IMesXslDingCallbackLogService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 钉钉 Stream 回调日志落库辅助类。
* <p>
* 在 {@link DingTalkStreamSdkRunner} 入站时写入原始推送;在 {@link DingBpmsEventProcessor}
* 触发集成/业务回调后更新 processed 及关联业务字段。
*/
@Slf4j
@Component
public class DingStreamCallbackLogHelper {
private static final String LOG_TAG = DingTalkStreamSdkRunner.LOG_TAG;
@Autowired
private IMesXslDingCallbackLogService callbackLogService;
private final ThreadLocal<ProcessingContext> processingContext = new ThreadLocal<>();
//update-begin---author:GHT ---date:20260609 for【钉钉回调日志】Stream入站原始推送落库-----------
/**
* 记录 Stream 入站事件(所有事件类型均落库)。
*
* @return 日志主键,供后续更新处理结果
*/
public String recordInbound(String eventId, String eventType, JSONObject data) {
MesXslDingCallbackLog row = new MesXslDingCallbackLog();
row.setEventId(eventId);
row.setEventType(eventType);
row.setReceivedTime(new Date());
row.setProcessed(0);
if (data != null) {
row.setProcessInstanceId(data.getString("processInstanceId"));
row.setRawData(data.toJSONString());
}
try {
callbackLogService.save(row);
return row.getId();
} catch (Exception e) {
log.error("{} 回调日志落库失败 eventId={} eventType={}: {}",
LOG_TAG, eventId, eventType, e.getMessage(), e);
return null;
}
}
/**
* 绑定当前线程正在处理的日志 ID由 SdkRunner 在调用 Processor 前设置)。
*/
public void beginProcessing(String logId) {
if (oConvertUtils.isEmpty(logId)) {
processingContext.remove();
return;
}
processingContext.set(new ProcessingContext(logId));
}
/**
* 标记已触发集成/业务回调。
*/
public void markProcessed(String remark, MesXslApprovalRecord record) {
ProcessingContext ctx = processingContext.get();
if (ctx == null || ctx.marked) {
return;
}
ctx.marked = true;
updateLog(ctx.logId, 1, remark, record);
}
/**
* 标记明确跳过(非 BPMS、data 为空等 SdkRunner 层可判定场景)。
*/
public void markSkipped(String remark) {
ProcessingContext ctx = processingContext.get();
if (ctx == null) {
return;
}
if (!ctx.marked) {
ctx.marked = true;
updateLog(ctx.logId, 0, remark, null);
}
}
/**
* 处理结束:若 Processor 未显式标记,则记为「已接收但未触发集成处理」。
*/
public void finishProcessing() {
ProcessingContext ctx = processingContext.get();
if (ctx != null && !ctx.marked) {
updateLog(ctx.logId, 0, "已接收但未触发集成处理", null);
}
processingContext.remove();
}
/**
* 处理异常时更新备注。
*/
public void markError(String remark) {
ProcessingContext ctx = processingContext.get();
if (ctx != null) {
updateLog(ctx.logId, 0, remark, null);
}
processingContext.remove();
}
private void updateLog(String logId, int processed, String remark, MesXslApprovalRecord record) {
if (oConvertUtils.isEmpty(logId)) {
return;
}
try {
MesXslDingCallbackLog row = new MesXslDingCallbackLog();
row.setId(logId);
row.setProcessed(processed);
row.setProcessRemark(truncateRemark(remark));
if (record != null) {
row.setBizTable(record.getBizTable());
row.setBizDataId(record.getBizDataId());
row.setRecordId(record.getId());
}
callbackLogService.updateById(row);
} catch (Exception e) {
log.error("{} 回调日志更新失败 logId={}: {}", LOG_TAG, logId, e.getMessage(), e);
}
}
private static String truncateRemark(String remark) {
if (remark == null) {
return null;
}
return remark.length() <= 500 ? remark : remark.substring(0, 500);
}
private static final class ProcessingContext {
private final String logId;
private boolean marked;
private ProcessingContext(String logId) {
this.logId = logId;
}
}
//update-end---author:GHT ---date:20260609 for【钉钉回调日志】Stream入站原始推送落库-----------
}

View File

@@ -13,6 +13,9 @@ import org.springframework.stereotype.Component;
* 无需注册公网回调地址:应用主动建立长连接,钉钉通过该通道推送事件(如审批结果),
* 官方 SDK 内部自动维护重连与心跳。
* <p>
* 集群模式(默认开启):通过 {@link DingTalkStreamLeaderElection} Redis 选主,
* 仅 Leader 节点建连,避免多实例抢消息。
* <p>
* 启动时机:{@link SmartLifecycle}phase=MAX-100确保 Spring 上下文完全就绪后再建连。
* SDK 实际启动委托给 {@link DingTalkStreamSdkRunner},避免本类直接引用钉钉 SDK 类型。
*
@@ -23,7 +26,9 @@ import org.springframework.stereotype.Component;
@Component
public class DingTalkStreamClient implements SmartLifecycle {
private static final String LOG_TAG = DingTalkStreamSdkRunner.LOG_TAG;
private static final String SDK_RUNNER_CLASS = "org.jeecg.modules.xslmes.dingtalk.stream.DingTalkStreamSdkRunner";
private static final String SDK_CLIENT_CLASS = "com.dingtalk.open.app.api.OpenDingTalkClient";
@Autowired
private ThirdAppDingtalkServiceImpl dingtalkService;
@@ -31,7 +36,17 @@ public class DingTalkStreamClient implements SmartLifecycle {
@Autowired
private DingBpmsEventProcessor bpmsEventProcessor;
@Autowired
private DingStreamCallbackLogHelper callbackLogHelper;
@Autowired
private DingTalkStreamProperties streamProperties;
@Autowired
private DingTalkStreamLeaderElection leaderElection;
private volatile boolean running = false;
private volatile Object streamClientRef;
// ==================== SmartLifecycle ====================
@@ -45,7 +60,6 @@ public class DingTalkStreamClient implements SmartLifecycle {
@Override
public void start() {
running = true;
// 在后台线程初始化,避免阻塞 Spring 上下文刷新
Thread t = new Thread(this::initSdkClient, "ding-stream");
t.setDaemon(true);
t.start();
@@ -54,8 +68,11 @@ public class DingTalkStreamClient implements SmartLifecycle {
@Override
public void stop() {
running = false;
// SDK 内部使用 daemon 线程JVM 退出时自动终止
log.info("{} Stream 客户端已停止", DingTalkStreamSdkRunner.LOG_TAG);
stopStreamClient();
if (streamProperties.isClusterMode()) {
leaderElection.release();
}
log.info("{} Stream 客户端已停止", LOG_TAG);
}
@Override
@@ -66,27 +83,122 @@ public class DingTalkStreamClient implements SmartLifecycle {
// ==================== SDK 初始化(反射委托,避免 LiteFlow 扫描期加载钉钉类)====================
//update-begin---author:GHT ---date:2026-06-05 for【钉钉Stream回调】反射调用SDK启动器避免LiteFlow扫描触发DingTalkCredential加载失败-----
//update-begin---author:GHT ---date:20260609 for【钉钉Stream集群】Redis选主+心跳重连生命周期管理-----
private void initSdkClient() {
String[] creds = resolveCredentials();
if (creds == null) {
return;
}
if (!streamProperties.isClusterMode()) {
log.info("{} 单实例模式cluster-mode=false本节点直接建立 Stream 连接", LOG_TAG);
try {
startStreamClient(creds);
} catch (ClassNotFoundException e) {
log.warn("{} Stream SDK 未在 classpath 中dingtalk-stream连接未启动", LOG_TAG);
} catch (Exception e) {
log.error("{} SDK 启动失败: {}", LOG_TAG, e.getMessage(), e);
}
return;
}
if (!leaderElection.isRedisAvailable()) {
log.error("{} 集群模式已开启但 Redis 不可用Stream 未启动。"
+ "请检查 Redis 连接,或设置 jeecg.xslmes.dingtalk.stream.cluster-mode=false", LOG_TAG);
return;
}
log.info("{} 集群模式已开启,通过 Redis 选主建立 Stream 连接 instanceId={}",
LOG_TAG, leaderElection.instanceId());
boolean streamActive = false;
while (running) {
try {
if (streamActive) {
if (leaderElection.renew()) {
sleepQuietly(streamProperties.getLeaderRenewIntervalMs());
continue;
}
log.warn("{} Leader 锁续期失败,断开 Stream 并降级为 Follower instanceId={} currentLeader={}",
LOG_TAG, leaderElection.instanceId(), leaderElection.currentHolder());
stopStreamClient();
streamActive = false;
leaderElection.release();
sleepQuietly(streamProperties.getFollowerRetryIntervalMs());
continue;
}
if (leaderElection.tryAcquire()) {
startStreamClient(creds);
streamActive = true;
sleepQuietly(streamProperties.getLeaderRenewIntervalMs());
} else {
log.debug("{} 本节点为 Follower等待 Leader 释放锁 instanceId={} currentLeader={}",
LOG_TAG, leaderElection.instanceId(), leaderElection.currentHolder());
sleepQuietly(streamProperties.getFollowerRetryIntervalMs());
}
} catch (Exception e) {
log.error("{} Stream 集群生命周期异常: {}", LOG_TAG, e.getMessage(), e);
stopStreamClient();
streamActive = false;
leaderElection.release();
sleepQuietly(streamProperties.getFollowerRetryIntervalMs());
}
}
if (streamActive) {
stopStreamClient();
leaderElection.release();
}
}
private String[] resolveCredentials() {
try {
String[] creds = dingtalkService.getDingAppCredentials();
if (creds == null || oConvertUtils.isEmpty(creds[0]) || oConvertUtils.isEmpty(creds[1])) {
log.warn("{} AppKey/AppSecret 未配置Stream 连接未启动。"
+ "请在【系统配置-第三方应用】中完成钉钉应用配置后重启服务。", DingTalkStreamSdkRunner.LOG_TAG);
return;
+ "请在【系统配置-第三方应用】中完成钉钉应用配置后重启服务。", LOG_TAG);
return null;
}
Class<?> runnerClass = Class.forName(SDK_RUNNER_CLASS);
runnerClass
.getMethod("start", String.class, String.class, DingBpmsEventProcessor.class)
.invoke(null, creds[0], creds[1], bpmsEventProcessor);
} catch (ClassNotFoundException e) {
log.warn("{} Stream SDK 未在 classpath 中dingtalk-stream连接未启动。"
+ "请执行 Maven 刷新/重新编译后重试。", DingTalkStreamSdkRunner.LOG_TAG);
return creds;
} catch (Exception e) {
log.error("{} SDK 启动失败,请检查钉钉配置: {}", DingTalkStreamSdkRunner.LOG_TAG, e.getMessage(), e);
log.error("{} 读取钉钉凭证失败: {}", LOG_TAG, e.getMessage(), e);
return null;
}
}
//update-end---author:GHT ---date:2026-06-05 for【钉钉Stream回调】反射调用SDK启动器避免LiteFlow扫描触发DingTalkCredential加载失败-----
private void startStreamClient(String[] creds) throws Exception {
Class<?> runnerClass = Class.forName(SDK_RUNNER_CLASS);
Object client = runnerClass
.getMethod("start", String.class, String.class, DingBpmsEventProcessor.class,
DingStreamCallbackLogHelper.class)
.invoke(null, creds[0], creds[1], bpmsEventProcessor, callbackLogHelper);
streamClientRef = client;
}
private void stopStreamClient() {
if (streamClientRef == null) {
return;
}
try {
Class<?> runnerClass = Class.forName(SDK_RUNNER_CLASS);
Class<?> clientClass = Class.forName(SDK_CLIENT_CLASS);
runnerClass.getMethod("stop", clientClass).invoke(null, streamClientRef);
} catch (ClassNotFoundException e) {
log.warn("{} Stream SDK 未在 classpath 中,跳过断开", LOG_TAG);
} catch (Exception e) {
log.warn("{} Stream 断开失败: {}", LOG_TAG, e.getMessage(), e);
} finally {
streamClientRef = null;
}
}
private void sleepQuietly(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
//update-end---author:GHT ---date:20260609 for【钉钉Stream集群】Redis选主+心跳重连生命周期管理-----
}

View File

@@ -0,0 +1,60 @@
package org.jeecg.modules.xslmes.dingtalk.stream;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* 钉钉 Stream 连接存活状态监控(定时日志)。
*/
@Slf4j
@Component
public class DingTalkStreamHealthMonitor {
private static final String LOG_TAG = DingTalkStreamSdkRunner.LOG_TAG;
@Autowired
private DingTalkStreamProperties properties;
@Autowired
private DingTalkStreamLeaderElection leaderElection;
//update-begin---author:GHT ---date:20260609 for【钉钉Stream监控】定时输出存活与心跳状态-----------
@Scheduled(fixedDelayString = "${jeecg.xslmes.dingtalk.stream.health-log-interval-ms:60000}")
public void reportHealth() {
DingTalkStreamSdkRunner.ConnectionSnapshot snap = DingTalkStreamSdkRunner.snapshot();
long now = System.currentTimeMillis();
boolean clusterMode = properties.isClusterMode();
String role;
String leaderHolder;
if (!clusterMode) {
role = "STANDALONE";
leaderHolder = leaderElection.instanceId();
} else if (leaderElection.isLeader()) {
role = "LEADER";
leaderHolder = leaderElection.instanceId();
} else {
role = "FOLLOWER";
leaderHolder = leaderElection.currentHolder();
}
Long connectedSec = snap.connectedAtMs() > 0 ? (now - snap.connectedAtMs()) / 1000 : null;
Long lastEventAgoSec = snap.lastEventAtMs() > 0 ? (now - snap.lastEventAtMs()) / 1000 : null;
log.info("{} Stream存活状态 role={} instanceId={} leaderHolder={} streamRunning={} connectedSec={} "
+ "lastEventAgoSec={} totalEvents={} reconnectCount={}",
LOG_TAG, role, leaderElection.instanceId(), leaderHolder,
snap.streamRunning(), connectedSec, lastEventAgoSec,
snap.totalEventCount(), snap.reconnectCount());
if ("LEADER".equals(role) && snap.streamRunning()
&& snap.lastEventAtMs() > 0
&& (now - snap.lastEventAtMs()) > properties.getIdleWarnSeconds() * 1000L) {
log.warn("{} Stream长时间无推送可能业务空闲或连接异常idleSec={} thresholdSec={}",
LOG_TAG, lastEventAgoSec, properties.getIdleWarnSeconds());
}
}
//update-end---author:GHT ---date:20260609 for【钉钉Stream监控】定时输出存活与心跳状态-----------
}

View File

@@ -0,0 +1,111 @@
package org.jeecg.modules.xslmes.dingtalk.stream;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* 钉钉 Stream 集群 Leader 选举Redis 分布式锁)。
* <p>
* 同一 AppKey 在多个 JeecgBoot 实例上只能有一个活跃 Stream 连接,
* 避免消息被随机节点抢走且处理失败。
*/
@Slf4j
@Component
public class DingTalkStreamLeaderElection {
private static final String LOCK_KEY = "mes:xsl:dingtalk:stream:leader";
private static final long LOCK_TTL_SECONDS = 30L;
private final String instanceId;
private final RedisTemplate<String, Object> redisTemplate;
@Autowired
public DingTalkStreamLeaderElection(@Autowired(required = false) RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
this.instanceId = buildInstanceId();
}
public String instanceId() {
return instanceId;
}
public boolean isRedisAvailable() {
return redisTemplate != null;
}
//update-begin---author:GHT ---date:20260609 for【钉钉Stream集群】Redis选主仅单节点建连-----------
/**
* 尝试成为 Leader。
*/
public boolean tryAcquire() {
if (redisTemplate == null) {
return false;
}
Boolean acquired = redisTemplate.opsForValue()
.setIfAbsent(LOCK_KEY, instanceId, LOCK_TTL_SECONDS, TimeUnit.SECONDS);
boolean ok = Boolean.TRUE.equals(acquired);
if (ok) {
log.info("{} 获得 Stream Leader 锁 instanceId={}", DingTalkStreamSdkRunner.LOG_TAG, instanceId);
}
return ok;
}
/**
* 续期 Leader 锁。
*/
public boolean renew() {
if (redisTemplate == null) {
return false;
}
Object current = redisTemplate.opsForValue().get(LOCK_KEY);
if (!instanceId.equals(String.valueOf(current))) {
return false;
}
return Boolean.TRUE.equals(redisTemplate.expire(LOCK_KEY, LOCK_TTL_SECONDS, TimeUnit.SECONDS));
}
/**
* 释放 Leader 锁(仅当前持有者有效)。
*/
public void release() {
if (redisTemplate == null) {
return;
}
Object current = redisTemplate.opsForValue().get(LOCK_KEY);
if (instanceId.equals(String.valueOf(current))) {
redisTemplate.delete(LOCK_KEY);
log.info("{} 已释放 Stream Leader 锁 instanceId={}", DingTalkStreamSdkRunner.LOG_TAG, instanceId);
}
}
public String currentHolder() {
if (redisTemplate == null) {
return null;
}
Object holder = redisTemplate.opsForValue().get(LOCK_KEY);
return holder != null ? String.valueOf(holder) : null;
}
public boolean isLeader() {
return instanceId.equals(currentHolder());
}
//update-end---author:GHT ---date:20260609 for【钉钉Stream集群】Redis选主仅单节点建连-----------
private static String buildInstanceId() {
String host = "unknown-host";
try {
host = InetAddress.getLocalHost().getHostName();
} catch (Exception ignored) {
// 使用默认 host 标识
}
String pid = ManagementFactory.getRuntimeMXBean().getName();
return host + ":" + pid + "@" + UUID.randomUUID().toString().substring(0, 8);
}
}

View File

@@ -0,0 +1,32 @@
package org.jeecg.modules.xslmes.dingtalk.stream;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* 钉钉 Stream 集群与监控配置。
*/
@Data
@Component
@ConfigurationProperties(prefix = "jeecg.xslmes.dingtalk.stream")
public class DingTalkStreamProperties {
/**
* 集群模式true 时通过 Redis 选主,仅 Leader 节点建立 Stream 长连接。
* 多实例生产环境务必保持 true单实例可设为 false 简化部署。
*/
private boolean clusterMode = true;
/** Leader 锁续期间隔(毫秒) */
private long leaderRenewIntervalMs = 10_000L;
/** Follower 抢主重试间隔(毫秒) */
private long followerRetryIntervalMs = 15_000L;
/** 存活状态日志输出间隔(毫秒) */
private long healthLogIntervalMs = 60_000L;
/** 无事件空闲告警阈值Leader 且连接中超过该时间无推送则 warn */
private long idleWarnSeconds = 1800L;
}

View File

@@ -1,109 +1,242 @@
package org.jeecg.modules.xslmes.dingtalk.stream;
import com.dingtalk.open.app.api.GenericEventListener;
import com.dingtalk.open.app.api.OpenDingTalkStreamClientBuilder;
import com.dingtalk.open.app.api.message.GenericOpenDingTalkEvent;
import com.dingtalk.open.app.api.security.AuthClientCredential;
import com.dingtalk.open.app.stream.protocol.event.EventAckStatus;
import lombok.extern.slf4j.Slf4j;
/**
* 钉钉 Stream SDK 启动器(非 Spring Bean
* <p>
* 与 {@link DingTalkStreamClient} 分离LiteFlow 在上下文初始化早期会扫描所有 {@code @Component}
* 并调用 {@code getDeclaredMethods()},若 Bean 类字节码直接引用钉钉 SDK 类型,会提前加载
* {@code DingTalkCredential} 等类;本类不参与 Spring 扫描,仅在后台线程中按需加载。
*
* @author GHT
* @date 2026-06-05 for【钉钉Stream回调】隔离SDK类避免LiteFlow启动期加载失败
*/
@Slf4j
public final class DingTalkStreamSdkRunner {
/** 统一日志前缀,便于 grep钉钉回调 */
public static final String LOG_TAG = "[钉钉回调]";
private DingTalkStreamSdkRunner() {
}
//update-begin---author:GHT ---date:2026-06-05 for【钉钉Stream回调】将SDK启动逻辑从Spring Bean中剥离-----
/**
* 建立钉钉 Stream 长连接并开始接收事件。
*
* @param appKey 钉钉 AppKey
* @param appSecret 钉钉 AppSecret
* @param processor 审批事件处理器
*/
public static void start(String appKey, String appSecret, DingBpmsEventProcessor processor) throws Exception {
log.info("{} Stream 正在建连 AppKey={}", LOG_TAG, appKey);
OpenDingTalkStreamClientBuilder
.custom()
.credential(new AuthClientCredential(appKey, appSecret))
.registerAllEventListener(new GenericEventListener() {
@Override
public EventAckStatus onEvent(GenericOpenDingTalkEvent event) {
String eventType = event != null ? event.getEventType() : null;
String eventId = event != null ? event.getEventId() : null;
Long bornTime = event != null ? event.getEventBornTime() : null;
long startMs = System.currentTimeMillis();
try {
com.alibaba.fastjson2.JSONObject data = event != null ? toJsonObject(event.getData()) : null;
//update-begin---author:GHT ---date:20260605 for【XSLMES-20260605-K8R2】钉钉Stream入站全量日志-----------
log.info("{} Stream入站 eventId={} eventType={} bornTime={} data={}",
LOG_TAG, eventId, eventType, bornTime,
data != null ? data.toJSONString() : "null");
//update-end---author:GHT ---date:20260605 for【XSLMES-20260605-K8R2】钉钉Stream入站全量日志-----------
if (!"bpms_instance_change".equals(eventType) && !"bpms_task_change".equals(eventType)) {
log.info("{} 非审批BPMS事件已忽略 eventType={}", LOG_TAG, eventType);
return EventAckStatus.SUCCESS;
}
if (data == null) {
log.warn("{} 事件 data 为空,无法处理 eventType={} eventId={}", LOG_TAG, eventType, eventId);
return EventAckStatus.SUCCESS;
}
String instanceId = data.getString("processInstanceId");
log.info("{} 开始处理 eventType={} instanceId={}", LOG_TAG, eventType, instanceId);
if ("bpms_instance_change".equals(eventType)) {
processor.onInstanceChange(data);
} else {
processor.onTaskChange(data);
}
log.info("{} 处理完成 eventType={} instanceId={} costMs={}",
LOG_TAG, eventType, instanceId, System.currentTimeMillis() - startMs);
return EventAckStatus.SUCCESS;
} catch (Exception e) {
log.error("{} 事件处理异常 eventId={} eventType={} costMs={}: {}",
LOG_TAG, eventId, eventType, System.currentTimeMillis() - startMs,
e.getMessage(), e);
return EventAckStatus.LATER;
}
}
})
.build()
.start();
log.info("{} Stream 客户端已启动,等待审批事件推送", LOG_TAG);
}
//update-end---author:GHT ---date:2026-06-05 for【钉钉Stream回调】将SDK启动逻辑从Spring Bean中剥离-----
private static com.alibaba.fastjson2.JSONObject toJsonObject(Object raw) {
if (raw == null) {
return null;
}
if (raw instanceof com.alibaba.fastjson2.JSONObject) {
return (com.alibaba.fastjson2.JSONObject) raw;
}
try {
return com.alibaba.fastjson2.JSONObject.parseObject(String.valueOf(raw));
} catch (Exception e) {
return null;
}
}
}
package org.jeecg.modules.xslmes.dingtalk.stream;
import com.dingtalk.open.app.api.GenericEventListener;
import com.dingtalk.open.app.api.KeepAliveOption;
import com.dingtalk.open.app.api.OpenDingTalkClient;
import com.dingtalk.open.app.api.OpenDingTalkStreamClientBuilder;
import com.dingtalk.open.app.api.message.GenericOpenDingTalkEvent;
import com.dingtalk.open.app.api.security.AuthClientCredential;
import com.dingtalk.open.app.stream.protocol.event.EventAckStatus;
import lombok.extern.slf4j.Slf4j;
/**
* 钉钉 Stream SDK 启动器(非 Spring Bean
* <p>
* 与 {@link DingTalkStreamClient} 分离LiteFlow 在上下文初始化早期会扫描所有 {@code @Component}
* 并调用 {@code getDeclaredMethods()},若 Bean 类字节码直接引用钉钉 SDK 类型,会提前加载
* {@code DingTalkCredential} 等类;本类不参与 Spring 扫描,仅在后台线程中按需加载。
*
* @author GHT
* @date 2026-06-05 for【钉钉Stream回调】隔离SDK类避免LiteFlow启动期加载失败
*/
@Slf4j
public final class DingTalkStreamSdkRunner {
/** 统一日志前缀,便于 grep钉钉回调 */
public static final String LOG_TAG = "[钉钉回调]";
private static volatile OpenDingTalkClient activeClient;
private static volatile boolean streamRunning;
private static volatile long connectedAtMs;
private static volatile long lastEventAtMs;
private static volatile int totalEventCount;
private static volatile int reconnectCount;
private DingTalkStreamSdkRunner() {
}
//update-begin---author:GHT ---date:20260609 for【钉钉Stream监控】连接状态快照供存活日志使用-----------
public static ConnectionSnapshot snapshot() {
return new ConnectionSnapshot(streamRunning, connectedAtMs, lastEventAtMs, totalEventCount, reconnectCount);
}
public static final class ConnectionSnapshot {
private final boolean streamRunning;
private final long connectedAtMs;
private final long lastEventAtMs;
private final int totalEventCount;
private final int reconnectCount;
private ConnectionSnapshot(boolean streamRunning, long connectedAtMs, long lastEventAtMs,
int totalEventCount, int reconnectCount) {
this.streamRunning = streamRunning;
this.connectedAtMs = connectedAtMs;
this.lastEventAtMs = lastEventAtMs;
this.totalEventCount = totalEventCount;
this.reconnectCount = reconnectCount;
}
public boolean streamRunning() {
return streamRunning;
}
public long connectedAtMs() {
return connectedAtMs;
}
public long lastEventAtMs() {
return lastEventAtMs;
}
public int totalEventCount() {
return totalEventCount;
}
public int reconnectCount() {
return reconnectCount;
}
}
//update-end---author:GHT ---date:20260609 for【钉钉Stream监控】连接状态快照供存活日志使用-----------
//update-begin---author:GHT ---date:2026-06-05 for【钉钉Stream回调】将SDK启动逻辑从Spring Bean中剥离-----
/**
* 建立钉钉 Stream 长连接并开始接收事件。
*
* @param appKey 钉钉 AppKey
* @param appSecret 钉钉 AppSecret
* @param processor 审批事件处理器
* @param logHelper 回调日志落库辅助(可为 null仅不写库
* @return SDK 客户端实例,供集群 Leader 切换时主动 stop
*/
public static OpenDingTalkClient start(String appKey, String appSecret, DingBpmsEventProcessor processor,
DingStreamCallbackLogHelper logHelper) throws Exception {
stop(activeClient);
boolean isReconnect = reconnectCount > 0 || connectedAtMs > 0;
if (isReconnect) {
reconnectCount++;
log.info("{} Stream 正在重连 AppKey={} reconnectCount={}", LOG_TAG, appKey, reconnectCount);
} else {
log.info("{} Stream 正在建连 AppKey={}", LOG_TAG, appKey);
}
OpenDingTalkClient client = OpenDingTalkStreamClientBuilder
.custom()
.credential(new AuthClientCredential(appKey, appSecret))
// SDK 内部 WebSocket 心跳,默认 60s 空闲探测
.keepAlive(KeepAliveOption.create().withKeepAliveIdleMill(60_000L))
.registerAllEventListener(new GenericEventListener() {
@Override
public EventAckStatus onEvent(GenericOpenDingTalkEvent event) {
//update-begin---author:GHT ---date:20260609 for【钉钉Stream】推送监听日志-----------
lastEventAtMs = System.currentTimeMillis();
totalEventCount++;
log.info("{} 钉钉Stream推送监听 event={}", LOG_TAG, event);
//update-end---author:GHT ---date:20260609 for【钉钉Stream】推送监听日志-----------
String eventType = event != null ? event.getEventType() : null;
String eventId = event != null ? event.getEventId() : null;
Long bornTime = event != null ? event.getEventBornTime() : null;
long startMs = System.currentTimeMillis();
try {
com.alibaba.fastjson2.JSONObject data = event != null ? toJsonObject(event.getData()) : null;