分布式事务可以说是在分布式架构系统中,比较棘手的问题了.如果根据业务场景制定相应的事务,如利用消息的事务最终一致性解决方案,XA等2PC强一致方案等等.今天就列举一下相关解决方案;
1.根据同步结果保证事务
注:该种方式适用于单一的调用其他服务,利用业务执行顺序,根据同步结果保证事务
@Transactional
@PostMapping
public R test() {
mapper.delete();
//将服务调用放到业务执行的最后,根据响应结果选择事务的提交;
feignClient.run();
return R.ok();
}
这种方式也存在一定弊端,如虽然调用其他服务成功,但是最终本地事务提交不成功,造成不一致;调用服务超时,本地异常回滚,但是最终调用的服务成功提交了,造成不一致;还需要衡量使用;
2.使用XA等强一致事务
注:该种方式适用于对数据实时一致性要求比较高,对业务的并发量要求不高;
@Transactional
@PostMapping
public R test() {
//将相关服务的数据库全部整合到本项目,利用JTA,Atomikos等事务框架结合数据库XA特性实现事务;
mapperOne.delete();
mapperTwo.delete();
return R.ok();
}
3.使用Spring的链式事务
注:该种方式适用于对业务的并发量要求不高,数据实时性高,但是允许出现数据不一致的少数情况产生;其内部实现其实是将多个事务进行迭代事务提交,最大努力单阶段提交模式;
代码示例与上述一样,只是在事务管理器配置上使用ChainedTransactionManager
4.使用相关开源框架
例如使用阿里的SEATA框架,它提供了多种事务公式;
1.AT模式:
@GlobalTransaction
@PostMapping
public R test() {
mapperOne.delete();
feignClient1.run();
feignClient2.run();
return R.ok();
}
1)它对业务代码的侵入性很小,只需简单配置:如更改事务注解,修改数据库连接池为相关代理,数据库层面新建管理器需要的表和各服务需要的日志表;
2)它使用的是二阶段提交方式,一阶段提交业务和日志的本地事务,二阶段根据全局事务管理器进行全局回滚或提交;
3)它的原理大致是这样:
首先根据事务管理器生成XID事务唯一编码,并保存在当前线程上下文中等待传递;
在本地服务中,利用代理数据库连接池解析执行的语句,生成前后SQL镜像,及相应的行锁BranchID,本地事务提交保存,用来后面的回滚;
在调用外部服务时,如使用Feign,则使用请求拦截器传递XID到其他服务,其他服务在收到请求时如Http协议则根据Header获取到XID也保存到当前线程上下文保存;
当一阶段结束,则全局事务管理器执行二阶段提交,回滚则根据各服务日志进行回滚,提交则批量删除日志的前后镜像及行锁等信息,其中客户端资源管理器和独立的事务协调器使用Netty通信;
4)看以上能看出,它在一次事务中,会多次访问数据库;
2.TCC模式:
1)它对业务代码的侵入性就比较大了,类似于下面代码;它与AT模式的不同之处在于提交与回滚不依托于数据库,而是业务自定义;
//开启GlobalTransactionScanner,实现以下接口
public interface MyWork {
@TwoPhaseBusinessAction(name = "test", commitMethod = "commit", rollbackMethod = "rollback")
public boolean doSomethings(BusinessActionContext businessActionContext,
@BusinessActionContextParameter(paramName = "amount") String amount);
public boolean commit(BusinessActionContext businessActionContext);
public boolean rollback(BusinessActionContext businessActionContext);
}
3.Saga模式:
1)Saga模式是SEATA提供的长事务解决方案,在Saga模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者,一阶段正向服务和二阶段补偿服务都由业务开发实现。
5.使用消息队列框架实现最终一致性
注:该种方式设计得当有很高并发度,以及很高灵活度,但是也因为灵活度太高,也会造成业务侵入高,以及设计编写难度较高;
1.使用RocketMQ事务消息
1)设置本地事务状态,以及回查监听:
当本地事务中调用发送消息后,首先会发送一个半消息到Broker,此时对消费端是隐藏的;当半消息发送成功后后,会回调本地事务方法,然后根据本地执行事务的成功与否,进行提交消息状态,当提交COMMIT_MESSAGE则消费端可以接收到消息;
当本地事务结束,没有正常提交半消息状态,则Broker会根据半消息回查监听接口,确定半消息状态;默认第一次回查在本地结束后15s,往后一直每分钟回查一次;
阿里云的例子:
//演示demo,模拟订单表查询服务,用来确认订单事务是否提交成功。
private static boolean checkOrderById(String orderId) {
return true;
}
//演示demo,模拟本地事务的执行结果。
private static boolean doLocalTransaction() {
return true;
}
public static void main(String[] args) throws ClientException {
ClientServiceProvider provider = new ClientServiceProvider();
MessageBuilder messageBuilder = new MessageBuilder();
//构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。
Producer producer = provider.newProducerBuilder()
.setTransactionChecker(messageView -> {
/**
* 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚,此处以订单ID属性为例。
* 在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。
*/
final String orderId = messageView.getProperties().get("OrderId");
if (Strings.isNullOrEmpty(orderId)) {
// 错误的消息,直接返回Rollback。
return TransactionResolution.ROLLBACK;
}
return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
})
.build();
//开启事务分支。
final Transaction transaction;
try {
transaction = producer.beginTransaction();
} catch (ClientException e) {
e.printStackTrace();
//事务分支开启失败,直接退出。
return;
}
Message message = messageBuilder.setTopic("topic")
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
//一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验。
.addProperty("OrderId", "xxx")
//消息体。
.setBody("messageBody".getBytes())
.build();
//发送半事务消息
final SendReceipt sendReceipt;
try {
sendReceipt = producer.send(message, transaction);
} catch (ClientException e) {
//半事务消息发送失败,事务可以直接退出并回滚。
return;
}
/**
* 执行本地事务,并确定本地事务结果。
* 1. 如果本地事务提交成功,则提交消息事务。
* 2. 如果本地事务提交失败,则回滚消息事务。
* 3. 如果本地事务未知异常,则不处理,等待事务消息回查。
*
*/
boolean localTransactionOk = doLocalTransaction();
if (localTransactionOk) {
try {
transaction.commit();
} catch (ClientException e) {
// 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。
e.printStackTrace();
}
} else {
try {
transaction.rollback();
} catch (ClientException e) {
// 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。
e.printStackTrace();
}
}
}
2)通过事务消息给我们提供的半消息以及回查机制,我们就可以灵活的控制分布式事务的提交;
2.使用RabbitMQ消息确认机制
1)开启Confirm模式后,当消息未能成功发送到Broker,则回调confirm监听接口告知生产者;
2)开启Return消息机制后,当消息发送到指定路由或队列失败,则回调告知生产者;
3)开启手动确认消息,保障消息能够成功发送到消费者;
4)当使用Spring提供的事务管理器时,RabbitMQ能够保证在本地事务提交成功后才发送消息,通过设置也可实现提交失败也发送消息;底层使用的Channel开启了事务信道的支持;经测试ActiveMQ等都支持此特性;
3.在使用消息队列保证事务一致性时,可以使用定时方法等协助消息的发送
例如:当使用RabbitMQ发送消息时在本地事务中增加唯一性消息日志,在confirm方法中更新发送状态;然后使用定时器扫描未发送消息进行重发;
4.设计保证幂等性的接口(不限于消费者,不同场景接口幂等处理方式)
- 查询状态,适用于并发很低的情况并且对重复处理有一定容忍度,比如(伪代码):
public void consume(Message message) {
if ("该信息对应订单已消费") {
return;
}
//执行
}
2)可以使用消息日志方式,保存消息记录,进而保证幂等性,例如如下方式:
其实在insert的时候,由于在事务中,所以会触发数据库的排他锁,并发的线程会等待第一个线程事务提交结束;
public boolean checkMessage(Message message) {
try {
logMapper.insert(message.getTopic(), message.getQueue(), message.getId(), "processing");
return true;
} catch (DuplicateKeyException e) {
//此处可以继续验证是否处理成功,如需要则此处需保证线程安全
return false;
}
}
//处理业务
public void consume(Message message) {
if (checkMessage(message)) {
//进行业务处理
//此处可以将日志修改为成功
}
}
3)悲观锁,比如Java中的synchronized,简单粗暴;
4)乐观锁,再表中加入version等字段,每次修改数据库时进行判断提交时版本;该种方式可与第一种查询方法一起使用;
5)利用redis等存储一次性token,比如在提交页面时获取后端唯一token并存储,消费时删除token进行唯一页面提交;
6)状态机幂等,例如处理一个订单,会涉及不同状态,并且处理订单不同业务都受订单状态影响;
public void consume(Message message) {
//判断订单状态是否处理
//处理其他业务,更新时判断订单状态值(update 订单相关业务 where 订单状态=xx状态)
}
7)还是状态机幂等,加入方式2的日志表方式:
订单状态表: id,order_id,status order_id + status 唯一键索引 在同一事务下,订单处理插入状态表,通过唯一键保障该业务幂等;
8)尽可能将业务设计为不会出现幂等性的方式:
比如用户点赞行为,每人只能点赞一次,并统计该文章赞数; 1.用户点赞使用插入数据库并且唯一键的方式; 2.文章不使用更新点赞数的操作,而是统计用户点赞表操作; 9)尽量的避免程序中引入分布式事务问题 有时候不要跟风将系统拆分成类似互联网大公司的微服务,拆的非常细,在链路调用中会有成倍的问题问题出现,耗费非常大的精力解决;
可以结合业务场景,性能要求等,以业务流的方式拆分,例如:
以前下单这块涉及的模块想拆成:库存中心,订单中心,余额中心,分别至少三个库不同服务的调用;
现在根据业务流将以上三个合为一,充分使用数据库的本地事务来避免事务问题,一劳永逸;将其他非强一致事务或对一致性要求不高的业务拆分服务及数据库;然后使用上述分布式事务方式进行实践;
其实,在业务中,分布式事务终归是一个非常难以有统一解决方案的问题,只有最适合自己业务的方案;
本文由 GY 创作,采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为:
2023/02/07 11:09