分布式事务,简单说说

/ 后端 / 没有评论 / 159浏览

​分布式事务可以说是在分布式架构系统中,比较棘手的问题了.如果根据业务场景制定相应的事务,如利用消息的事务最终一致性解决方案,XA等2PC强一致方案等等.今天就列举一下相关解决方案;

1.根据同步结果保证事务

注:该种方式适用于单一的调用其他服务,利用业务执行顺序,根据同步结果保证事务

@Transactional
@PostMapping
public R test() {
    mapper.delete();
    //将服务调用放到业务执行的最后,根据响应结果选择事务的提交;
    feignClient.run();
    return R.ok();
}

这种方式也存在一定弊端,如虽然调用其他服务成功,但是最终本地事务提交不成功,造成不一致;调用服务超时,本地异常回滚,但是最终调用的服务成功提交了,造成不一致;还需要衡量使用;

2.使用XA等强一致事务

注:该种方式适用于对数据实时一致性要求比较高,对业务的并发量要求不高;


@Transactional
@PostMapping
public R test() {
	//将相关服务的数据库全部整合到本项目,利用JTA,Atomikos等事务框架结合数据库XA特性实现事务;
    mapperOne.delete();
    mapperTwo.delete();
    return R.ok();
}

3.使用Spring的链式事务

注:该种方式适用于对业务的并发量要求不高,数据实时性高,但是允许出现数据不一致的少数情况产生;其内部实现其实是将多个事务进行迭代事务提交,最大努力单阶段提交模式;

代码示例与上述一样,只是在事务管理器配置上使用ChainedTransactionManager

4.使用相关开源框架

例如使用阿里的SEATA框架,它提供了多种事务公式;

1.AT模式:

@GlobalTransaction
@PostMapping
public R test() {
    mapperOne.delete();
    feignClient1.run();
    feignClient2.run();
    return R.ok();
}

1)它对业务代码的侵入性很小,只需简单配置:如更改事务注解,修改数据库连接池为相关代理,数据库层面新建管理器需要的表和各服务需要的日志表;

2)它使用的是二阶段提交方式,一阶段提交业务和日志的本地事务,二阶段根据全局事务管理器进行全局回滚或提交;

3)它的原理大致是这样:

​ 首先根据事务管理器生成XID事务唯一编码,并保存在当前线程上下文中等待传递;

​ 在本地服务中,利用代理数据库连接池解析执行的语句,生成前后SQL镜像,及相应的行锁BranchID,本地事务提交保存,用来后面的回滚;

​ 在调用外部服务时,如使用Feign,则使用请求拦截器传递XID到其他服务,其他服务在收到请求时如Http协议则根据Header获取到XID也保存到当前线程上下文保存;

​ 当一阶段结束,则全局事务管理器执行二阶段提交,回滚则根据各服务日志进行回滚,提交则批量删除日志的前后镜像及行锁等信息,其中客户端资源管理器和独立的事务协调器使用Netty通信;

4)看以上能看出,它在一次事务中,会多次访问数据库;

2.TCC模式:

1)它对业务代码的侵入性就比较大了,类似于下面代码;它与AT模式的不同之处在于提交与回滚不依托于数据库,而是业务自定义;


//开启GlobalTransactionScanner,实现以下接口
public interface MyWork {
        @TwoPhaseBusinessAction(name = "test", commitMethod = "commit", rollbackMethod = "rollback")
        public boolean doSomethings(BusinessActionContext businessActionContext,
                                    @BusinessActionContextParameter(paramName = "amount") String amount);
        public boolean commit(BusinessActionContext businessActionContext);

        public boolean rollback(BusinessActionContext businessActionContext);
    }

3.Saga模式:

1)Saga模式是SEATA提供的长事务解决方案,在Saga模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者,一阶段正向服务和二阶段补偿服务都由业务开发实现。

5.使用消息队列框架实现最终一致性

注:该种方式设计得当有很高并发度,以及很高灵活度,但是也因为灵活度太高,也会造成业务侵入高,以及设计编写难度较高;

1.使用RocketMQ事务消息

1)设置本地事务状态,以及回查监听:

​ 当本地事务中调用发送消息后,首先会发送一个半消息到Broker,此时对消费端是隐藏的;当半消息发送成功后后,会回调本地事务方法,然后根据本地执行事务的成功与否,进行提交消息状态,当提交COMMIT_MESSAGE则消费端可以接收到消息;

​ 当本地事务结束,没有正常提交半消息状态,则Broker会根据半消息回查监听接口,确定半消息状态;默认第一次回查在本地结束后15s,往后一直每分钟回查一次;

阿里云的例子:

//演示demo,模拟订单表查询服务,用来确认订单事务是否提交成功。
    private static boolean checkOrderById(String orderId) {
        return true;
    }

    //演示demo,模拟本地事务的执行结果。
    private static boolean doLocalTransaction() {
        return true;
    }

    public static void main(String[] args) throws ClientException {
        ClientServiceProvider provider = new ClientServiceProvider();
        MessageBuilder messageBuilder = new MessageBuilder();
        //构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。
        Producer producer = provider.newProducerBuilder()
                .setTransactionChecker(messageView -> {
                    /**
                     * 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚,此处以订单ID属性为例。
                     * 在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。
                     */
                    final String orderId = messageView.getProperties().get("OrderId");
                    if (Strings.isNullOrEmpty(orderId)) {
                        // 错误的消息,直接返回Rollback。
                        return TransactionResolution.ROLLBACK;
                    }
                    return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
                })
                .build();
        //开启事务分支。
        final Transaction transaction;
        try {
            transaction = producer.beginTransaction();
        } catch (ClientException e) {
            e.printStackTrace();
            //事务分支开启失败,直接退出。
            return;
        }
        Message message = messageBuilder.setTopic("topic")
                //设置消息索引键,可根据关键字精确查找某条消息。
                .setKeys("messageKey")
                //设置消息Tag,用于消费端根据指定Tag过滤消息。
                .setTag("messageTag")
                //一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验。
                .addProperty("OrderId", "xxx")
                //消息体。
                .setBody("messageBody".getBytes())
                .build();
        //发送半事务消息
        final SendReceipt sendReceipt;
        try {
            sendReceipt = producer.send(message, transaction);
        } catch (ClientException e) {
            //半事务消息发送失败,事务可以直接退出并回滚。
            return;
        }
        /**
         * 执行本地事务,并确定本地事务结果。
         * 1. 如果本地事务提交成功,则提交消息事务。
         * 2. 如果本地事务提交失败,则回滚消息事务。
         * 3. 如果本地事务未知异常,则不处理,等待事务消息回查。
         *
         */
        boolean localTransactionOk = doLocalTransaction();
        if (localTransactionOk) {
            try {
                transaction.commit();
            } catch (ClientException e) {
                // 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。
                e.printStackTrace();
            }
        } else {
            try {
                transaction.rollback();
            } catch (ClientException e) {
                // 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。
                e.printStackTrace();
            }
        }
    }

2)通过事务消息给我们提供的半消息以及回查机制,我们就可以灵活的控制分布式事务的提交;

2.使用RabbitMQ消息确认机制

1)开启Confirm模式后,当消息未能成功发送到Broker,则回调confirm监听接口告知生产者;

2)开启Return消息机制后,当消息发送到指定路由或队列失败,则回调告知生产者;

3)开启手动确认消息,保障消息能够成功发送到消费者;

4)当使用Spring提供的事务管理器时,RabbitMQ能够保证在本地事务提交成功后才发送消息,通过设置也可实现提交失败也发送消息;底层使用的Channel开启了事务信道的支持;经测试ActiveMQ等都支持此特性;

3.在使用消息队列保证事务一致性时,可以使用定时方法等协助消息的发送

例如:当使用RabbitMQ发送消息时在本地事务中增加唯一性消息日志,在confirm方法中更新发送状态;然后使用定时器扫描未发送消息进行重发;

4.设计保证幂等性的接口(不限于消费者,不同场景接口幂等处理方式)

  1. 查询状态,适用于并发很低的情况并且对重复处理有一定容忍度,比如(伪代码):
public void consume(Message message) {
       if ("该信息对应订单已消费") {
          return;
       }
       //执行
   }

2)可以使用消息日志方式,保存消息记录,进而保证幂等性,例如如下方式:

其实在insert的时候,由于在事务中,所以会触发数据库的排他锁,并发的线程会等待第一个线程事务提交结束;


public boolean checkMessage(Message message) {
       try {
           logMapper.insert(message.getTopic(), message.getQueue(), message.getId(), "processing");
           return true;
       } catch (DuplicateKeyException e) {
           //此处可以继续验证是否处理成功,如需要则此处需保证线程安全
           return false;
       }
   }
   
  	//处理业务
   public void consume(Message message) {
       if (checkMessage(message)) {
           //进行业务处理
           //此处可以将日志修改为成功
       }
   }

3)悲观锁,比如Java中的synchronized,简单粗暴;

4)乐观锁,再表中加入version等字段,每次修改数据库时进行判断提交时版本;该种方式可与第一种查询方法一起使用;

5)利用redis等存储一次性token,比如在提交页面时获取后端唯一token并存储,消费时删除token进行唯一页面提交;

6)状态机幂等,例如处理一个订单,会涉及不同状态,并且处理订单不同业务都受订单状态影响;

public void consume(Message message) {
     //判断订单状态是否处理
     //处理其他业务,更新时判断订单状态值(update 订单相关业务 where 订单状态=xx状态)
}

7)还是状态机幂等,加入方式2的日志表方式:

订单状态表: id,order_id,status order_id + status 唯一键索引 在同一事务下,订单处理插入状态表,通过唯一键保障该业务幂等;

8)尽可能将业务设计为不会出现幂等性的方式:

比如用户点赞行为,每人只能点赞一次,并统计该文章赞数; 1.用户点赞使用插入数据库并且唯一键的方式; 2.文章不使用更新点赞数的操作,而是统计用户点赞表操作; 9)尽量的避免程序中引入分布式事务问题 有时候不要跟风将系统拆分成类似互联网大公司的微服务,拆的非常细,在链路调用中会有成倍的问题问题出现,耗费非常大的精力解决;

可以结合业务场景,性能要求等,以业务流的方式拆分,例如:

以前下单这块涉及的模块想拆成:库存中心,订单中心,余额中心,分别至少三个库不同服务的调用;

现在根据业务流将以上三个合为一,充分使用数据库的本地事务来避免事务问题,一劳永逸;将其他非强一致事务或对一致性要求不高的业务拆分服务及数据库;然后使用上述分布式事务方式进行实践;

其实,在业务中,分布式事务终归是一个非常难以有统一解决方案的问题,只有最适合自己业务的方案;