通用本地持久化RPC失败重试组件|SpringBoot + OpenHFT Chronicle-Queue
《金融支付架构实战指南技术、安全与合规》介绍了6种容错性其中重试性是其中一种本文介绍重试性的一种实现方法。一、前言当程序发布重启程序宕机kill -9) 执行一部分的数据但仍有一部分业务逻辑未执行支付敏感业务场景需要重试。痛点汇总所有微服务通用本地事务已落库RPC 调用下游订单 / 账务 / 清算失败同步抛异常回滚主事务影响业务若主事务已提交、RPC 失败 → 上下游数据不一致、资损临时内存重试JVM 宕机、进程重启待重试 RPC 全部丢失自建 MySQL 重试表大促高频 RPC 落库压垮 DB、事务锁、性能差Redis 重试队列Redis 集群宕机后整个补偿链路瘫痪设计目标做成通用组件一套代码统一支撑Dubbo RPC、OpenFeign HTTP、RestTemplate、Redis 消息发送四大场景调用失败持久化重试业务只需一行代码接入无需重复编写重试落盘逻辑定位RPC 调用失败兜底组件正常链路直接同步调用异常自动落地本地磁盘异步重试不侵入原有业务事务。二、整体通用架构业务执行入库完成 → 通用RPC调用模板发起远程调用↓调用成功 → 直接返回无落盘↓异常/超时/服务不可用 → 组装通用RPC参数落地OpenHFT磁盘后台定时任务轮询磁盘队列 → 反射/代理执行原RPC↓调用成功 → 不再落盘(等效删除消息)↓调用失败 → 更新重试次数阶梯间隔重新入磁盘队列↓超出最大重试次数 → 移入本地死信队列人工运维补发对账三、Maven 依赖xmldependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter/artifactId/dependency!-- OpenHFT持久队列 --dependencygroupIdnet.openhft/groupIdartifactIdchronicle-queue/artifactIdversion5.25ea10/version/dependency!-- JSON序列化 --dependencygroupIdcom.alibaba.fastjson2/groupIdartifactIdfastjson2/artifactIdversion2.0.48/version/dependency!-- Dubbo/Feign按需引入 --dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-openfeign/artifactId/dependencydependencygroupIdorg.apache.dubbo/groupIdartifactIddubbo-spring-boot-starter/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependency/dependencies四、通用 RPC 重试实体万能结构适配所有调用不绑定任何业务、不绑定 Dubbo/Feign通过「调用类型 全类名 方法名 入参数组」反射执行通用所有远程调用import lombok.Data;import java.io.Serializable;/*** 通用RPC/HTTP调用失败重试实体*/Datapublic class CommonRpcRetryMsg implements Serializable {// 全局唯一ID幂等标识private String uniqueId;// 业务单据号支付单号/订单号用于对账private String bizNo;/*** 调用类型DUBBO / FEIGN / REST / REDIS_MSG*/private String invokeType;// 接口全限定类名com.xxx.api.OrderRemoteApiprivate String interfaceClassName;// 目标方法名private String methodName;// 方法入参类型全类名数组private String[] paramClassNames;// 方法入参JSON数组private String[] paramJsonArr;// 重试调度字段private Integer retryCount;private Long nextRetryTime;private Long createTime;}五、全局常量配置public final class RpcRetryConfig {// 正常重试队列磁盘路径public static final String RETRY_QUEUE_PATH ./common/rpc/retry;// 死信队列路径public static final String DEAD_QUEUE_PATH ./common/rpc/dead;// 最大重试次数 金融建议12次public static final int MAX_RETRY_NUM 12;}六、通用反射 RPC 执行工具核心根据实体动态执行任意 RPCimport com.alibaba.fastjson2.JSON;import org.springframework.context.ApplicationContext;import javax.annotation.Resource;import java.lang.reflect.Method;public class RpcInvokeUtil {Resourceprivate ApplicationContext applicationContext;/*** 根据重试消息反射执行远程调用兼容Dubbo/Feign/普通Bean* return true成功 false失败*/public boolean invoke(CommonRpcRetryMsg msg) {try {// 1.获取接口ClassClass? interfaceCls Class.forName(msg.getInterfaceClassName());// 2.从Spring容器获取代理Bean(Dubbo/Feign代理对象都在Spring)Object proxyBean applicationContext.getBean(interfaceCls);// 3.拼装参数Class数组Class?[] paramClsArr new Class?[msg.getParamClassNames().length];for (int i 0; i msg.getParamClassNames().length; i) {paramClsArr[i] Class.forName(msg.getParamClassNames()[i]);}// 4.获取MethodMethod method interfaceCls.getMethod(msg.getMethodName(), paramClsArr);// 5.反序列化入参Object[] params new Object[msg.getParamJsonArr().length];for (int i 0; i params.length; i) {params[i] JSON.parseObject(msg.getParamJsonArr()[i], paramClsArr[i]);}// 6.执行RPCmethod.invoke(proxyBean, params);return true;} catch (Exception e) {return false;}}}七、OpenHFT 通用持久化重试管理器全局唯一组件javaimport com.alibaba.fastjson2.JSON;import net.openhft.chronicle.queue.ChronicleQueue;import net.openhft.chronicle.queue.ExcerptAppender;import net.openhft.chronicle.queue.ExcerptTailer;import net.openhft.chronicle.queue.RollCycle;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import javax.annotation.PreDestroy;import java.io.File;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;Componentpublic class CommonRpcRetryManager {private ChronicleQueue retryQueue;private ChronicleQueue deadQueue;private final RpcInvokeUtil rpcInvokeUtil;private final ScheduledExecutorService schedule Executors.newSingleThreadScheduledExecutor();public CommonRpcRetryManager(RpcInvokeUtil rpcInvokeUtil) {this.rpcInvokeUtil rpcInvokeUtil;}PostConstructpublic void init() {// 按日拆分队列文件避免超大文件retryQueue ChronicleQueue.singleBuilder(new File(RpcRetryConfig.RETRY_QUEUE_PATH)).rollCycle(RollCycle.DAILY).build();deadQueue ChronicleQueue.singleBuilder(new File(RpcRetryConfig.DEAD_QUEUE_PATH)).rollCycle(RollCycle.DAILY).build();// 每15s扫描重试schedule.scheduleWithFixedDelay(this::scanRetryTask, 3, 15, TimeUnit.SECONDS);}/*** 调用失败存入本地磁盘*/public void saveFailMsg(CommonRpcRetryMsg msg) {msg.setCreateTime(System.currentTimeMillis());msg.setRetryCount(0);msg.setNextRetryTime(System.currentTimeMillis());try (ExcerptAppender app retryQueue.createAppender()) {app.writeText(JSON.toJSONString(msg));}}/*** 定时扫描重试*/private void scanRetryTask() {ExcerptTailer tailer retryQueue.createTailer();String json;while ((json tailer.readText()) ! null) {CommonRpcRetryMsg retryMsg JSON.parseObject(json, CommonRpcRetryMsg.class);long now System.currentTimeMillis();// 未到重试时间跳过if (retryMsg.getNextRetryTime() now) continue;// 超限移入死信if (retryMsg.getRetryCount() RpcRetryConfig.MAX_RETRY_NUM) {moveToDead(retryMsg);continue;}// 反射执行RPCboolean success rpcInvokeUtil.invoke(retryMsg);if (success) {// 成功不重写删除消息continue;} else {// 失败更新次数、下次时间重新落盘retryMsg.setRetryCount(retryMsg.getRetryCount() 1);retryMsg.setNextRetryTime(getNextRetryTime(retryMsg.getRetryCount()));try (ExcerptAppender app retryQueue.createAppender()) {app.writeText(JSON.toJSONString(retryMsg));}}}}/*** 阶梯退避策略通用金融标准间隔*/private long getNextRetryTime(int count) {long now System.currentTimeMillis();return switch (count) {case 0 - now 10 * 1000;case 1 - now 30 * 1000;case 2 - now 60 * 1000;case 3 - now 3 * 60 * 1000;case 4 - now 5 * 60 * 1000;default - now 10 * 60 * 1000;};}private void moveToDead(CommonRpcRetryMsg msg) {try (ExcerptAppender app deadQueue.createAppender()) {app.writeText(JSON.toJSONString(msg));}}PreDestroypublic void close() {schedule.shutdown();retryQueue.close();deadQueue.close();}}八、通用 RPC 调用门面业务统一入口一行代码接入javaimport com.alibaba.fastjson2.JSON;import org.springframework.stereotype.Service;import javax.annotation.Resource;import java.lang.reflect.Method;import java.util.UUID;Servicepublic class RpcRetryTemplate {Resourceprivate CommonRpcRetryManager retryManager;/*** 通用RPC调用模板* param bizNo 业务单号* param invokeType DUBBO/FEIGN/REST* param targetBean 远程接口代理对象(Dubbo/Feign)* param method 要执行的方法* param args 入参*/public T Object callRpc(String bizNo, String invokeType, T targetBean, Method method, Object... args) {try {// 同步发起远程调用return method.invoke(targetBean, args);} catch (Exception e) {// 调用异常自动组装消息落盘CommonRpcRetryMsg msg buildMsg(bizNo, invokeType, targetBean.getClass().getInterfaces()[0].getName(), method, args);retryManager.saveFailMsg(msg);return null;}}private CommonRpcRetryMsg buildMsg(String bizNo, String invokeType, String interfaceName, Method method, Object[] args) {CommonRpcRetryMsg msg new CommonRpcRetryMsg();msg.setUniqueId(UUID.randomUUID().toString().replace(-, ));msg.setBizNo(bizNo);msg.setInvokeType(invokeType);msg.setInterfaceClassName(interfaceName);msg.setMethodName(method.getName());// 参数Class数组Class?[] paramTypes method.getParameterTypes();String[] clsArr new String[paramTypes.length];String[] jsonArr new String[paramTypes.length];for (int i 0; i paramTypes.length; i) {clsArr[i] paramTypes[i].getName();jsonArr[i] JSON.toJSONString(args[i]);}msg.setParamClassNames(clsArr);msg.setParamJsonArr(jsonArr);return msg;}}九、业务层使用示例Dubbo Feign 两种演示示例 1Dubbo 远程调用javaServicepublic class PayBizService {Resourceprivate OrderDubboApi orderDubboApi; // Dubbo远程接口Resourceprivate RpcRetryTemplate rpcTemplate;public void paySuccess(String payNo) throws Exception {// 1.本地数据库更新支付成功// payOrderMapper.updateSuccess(payNo);// 2.调用下游订单Dubbo失败自动落盘重试Method method OrderDubboApi.class.getDeclaredMethod(createOrder, String.class, Long.class);rpcTemplate.callRpc(payNo, DUBBO, orderDubboApi, method, payNo, 100L);}}示例 2OpenFeign 远程调用javaServicepublic class SettleBizService {Resourceprivate SettleFeignApi settleFeignApi; // Feign远程接口Resourceprivate RpcRetryTemplate rpcTemplate;public void createSettle(String settleNo) throws Exception {// 本地落库Method method SettleFeignApi.class.getDeclaredMethod(notifySettle, String.class);rpcTemplate.callRpc(settleNo, FEIGN, settleFeignApi, method, settleNo);}}十、扩展注解版进阶可选 AOP 自动拦截 RpcRetry 注解方法自定义注解RpcRetry(invokeType DUBBO)AOP 环绕拦截方法异常自动封装参数落盘业务连手动构建 Method 都不需要极简接入。javaTarget(ElementType.METHOD)Retention(RetentionPolicy.RUNTIME)public interface RpcRetry {String invokeType() default DUBBO;}十一、核心通用特性说明1、全场景兼容Dubbo 泛化调用、接口代理调用OpenFeign/HttpClient HTTP 远程调用Redis Stream/MQ 消息投递失败一套组件全部接管失败重试。2、宕机数据安全OpenHFT mmap 内存映射落磁盘进程 / 服务器宕机消息永久保存在本地文件重启自动加载全部未完成 RPC 继续重试金融杜绝资损。3、删除机制Chronicle 只追加不删除重试成功不再写入队列 逻辑删除失败更新重试参数重新入队无冗余数据堆积。4、死信运维重试耗尽转入死信目录可开发后台管理接口读取死信、手动重新入重试队列满足金融审计、人工补单。十二、四种重试方案横向对比方案通用 RPC 适配宕机不丢消息性能中间件依赖金融适配度内存重试全适配❌极高无禁止生产DB 重试表全适配✅低Mysql高并发不推荐Redis 队列全适配✅高Redis中间件故障不可用OpenHFT 通用组件✅DUBBO/FEIGN/REST✅超高无✅金融首选通用兜底十三、生产优化拓展目录监控监控./common/rpc目录磁盘容量、文件增长死信突增告警下游服务大面积宕机幂等控制uniqueId 全局唯一下游接口根据 ID 做幂等防止重复下单 / 重复入账多环境隔离测试 / 生产区分磁盘目录避免文件混杂超大流量分片多实例部署天然队列隔离每个实例维护自身本地磁盘。十四、总结本组件完全通用化不绑定任何业务、任何 RPC 框架新项目老项目无缝接入作为 RPC 调用最后兜底正常走同步远程调用异常自动降级本地磁盘异步重试解决上下游数据不一致替换项目中 DB 重试表、内存重试、Redis 重试队列金融级可靠性满足支付、账务、清算合规要求。