集成rabbitMQ

/ 后端 / 没有评论 / 128浏览


注意:

1.当使用死信时,目前测试结果是,只有当消息为自动确认模式时,可以根据重试次数进入死信;

2.重试次数等只是针对于消息接收端;

一.支持重试次数,但是不支持自动进入死信的配置:

spring:
  rabbitmq:
    addresses: 192.168.134.132:5672
    username: admin
    password: 123456
      #虚拟主机地址
    virtual-host: /
    publisher-confirms: true
    publisher-returns: true
    template:
      mandatory: true
    listener:
      simple:
        acknowledge-mode: manual
        retry:
          max-attempts: 5 #最大重试次数
          initial-interval: 1000ms #重试间隔时间(单位毫秒)
          enabled: true #是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)
        default-requeue-rejected: false   #重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列)
@Service
public class TestSender {
<span class="hljs-type">Logger</span> <span class="hljs-variable">log</span> <span class="hljs-operator">=</span> LoggerFactory.getLogger(TestSender.class);

<span class="hljs-meta">@Autowired</span>
<span class="hljs-keyword">private</span> AmqpTemplate template;
<span class="hljs-meta">@Autowired</span>
RabbitTemplate rabbitTemplate;

<span class="hljs-keyword">final</span> RabbitTemplate.<span class="hljs-type">ConfirmCallback</span> <span class="hljs-variable">confirmCallback</span> <span class="hljs-operator">=</span> <span class="hljs-keyword">new</span> <span class="hljs-title class_">RabbitTemplate</span>.ConfirmCallback() {
    <span class="hljs-meta">@Override</span>
    <span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title function_">confirm</span><span class="hljs-params">(CorrelationData correlationData, <span class="hljs-type">boolean</span> ack, String cause)</span> {
        <span class="hljs-keyword">if</span> (ack) {
            log.info(<span class="hljs-string">"消息发送成功:correlationData({}),ack({}),cause({})"</span>, correlationData, ack, cause);
        } <span class="hljs-keyword">else</span> {
            log.info(<span class="hljs-string">"消息发送失败:correlationData({}),ack({}),cause({})"</span>, correlationData, ack, cause);
        }
    }
};


<span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title function_">testSender</span><span class="hljs-params">(String message)</span> {
    rabbitTemplate.setConfirmCallback(confirmCallback);

    rabbitTemplate.convertAndSend(RabbitConfig.ORDER_EXCHANGE, RabbitConfig.ORDER_KEY,
            message, <span class="hljs-keyword">new</span> <span class="hljs-title class_">CorrelationData</span>(UUID.randomUUID().toString()));
}

}

@Component
public class TestReceiver {
    @Autowired
    private TestSender testSender;

    @RabbitHandler
    @RabbitListener(queues = RabbitConfig.ORDER_QUEUE)
    public void processTopicMsg(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody());
        System.out.println("receive: " + msg + "《线程名:》" + Thread.currentThread().getName() + "《线程id:》" + Thread.currentThread().getId());
        if (msg.equals("fail")) {
            throw new Exception();
        }
        if (msg.equals("retry")) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            return;
        }
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

    }
}

@Configuration
public class RabbitConfig {

    public final static String DEAD_EXCHANGE = "dead_exchange";
    public final static String DEAD_QUEUE = "dead_queue";
    public final static String DEAD_KEY = "dead_key";


    public final static String ORDER_KEY = "order_key";
    public final static String ORDER_EXCHANGE = "order_exchange";
    public final static String ORDER_QUEUE = "order_queue";

    @Bean
    public Queue maintainQueue() {
        Map<String, Object> args = new HashMap<>();
        // 设置该Queue的死信的信箱
        args.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 设置死信routingKey
        args.put("x-dead-letter-routing-key", DEAD_KEY);
        return new Queue(ORDER_QUEUE, true, false, false, args);

    }

    @Bean
    public TopicExchange maintainExchange() {
        return new TopicExchange(ORDER_EXCHANGE, true, false);
    }


    @Bean
    public Binding maintainBinding() {
        return BindingBuilder.bind(maintainQueue()).to(maintainExchange())
                .with(ORDER_KEY);
    }

    @Bean
    public Queue deadLetterQueue() {
        return new Queue(DEAD_QUEUE);
    }

    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_EXCHANGE, true, false);
    }

    @Bean
    public Binding deadLetterBindding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(DEAD_KEY);
    }

二.支持自动进入死信:

spring:
  rabbitmq:
    addresses: 192.168.134.132:5672
    username: admin
    password: 123456
      #虚拟主机地址
    virtual-host: /
    publisher-confirms: true
    publisher-returns: true
    template:
      //关闭手动确认,这里可以选择true,但是下面监听设置为auto,也会重试次数到达后放入死信
      mandatory: false
    listener:
      simple:
       //关闭手动确认
        acknowledge-mode: auto
        retry:
          max-attempts: 5 #最大重试次数
          initial-interval: 1000ms #重试间隔时间(单位毫秒)
          enabled: true #是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)
        default-requeue-rejected: false   #重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列)

(1)当设置为自动确认消息时,会根据重试次数然后自动进入死信;

(2)当设置为手动确认消息时,则需要根据重试后的报错进行手动移入死信队列,设置为不重新移入队列,放入死信中

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);

(3)当关闭重试次数设置时,如果报错,则直接进入死信队列

(4)接收端只有抛出异常后,才算是没有接收成功;如果不处理,并且接收端设置为自动确认,则算是成功接收;

三.利用死信进行定时消息

首先创建一个队列和交换机绑定,并且设置绑定死信,然后不写消费端去接收消息.在发送消息时指定过期时间,或者直接将队列设置过期消息;并且尝试发现,设置消息为手动确认后,定时消息仍然可以到时间进入到死信队列;

 MessageProperties properties = new MessageProperties();
        properties.setExpiration("10000");
        properties.setMessageId(UUID.randomUUID().toString());
        Message m = new Message(message.getBytes("utf-8"), properties);
        rabbitTemplate.send("timeExchange", "timekey",
                m, new CorrelationData(UUID.randomUUID().toString()));

四.事务消息

(1)需要关闭ack事务确认机制:

publisher-confirms: false

开启方式:

//设置开启事务
rabbitTemplate .setChannelTransacted(true);

声明事务管理器:

@Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }

使用声明式事务注解:

    @Transactional(rollbackFor = Exception.class)
    public void testSender(String message) throws Exception {
        rabbitTemplate.convertAndSend(RabbitConfig.ORDER_EXCHANGE, RabbitConfig.ORDER_KEY,
                message, new CorrelationData(UUID.randomUUID().toString()));
        throw new Exception();
    }

五.配置参数

@RabbitListener注解中指明binding信息,就能自动创建queueexchange并建立binding关系。
(1)在2.0版本之后,可以指定多个routingkeykey={"ord","con"}
(2)exchange属性中,可以使用type = ExchangeTypes.DIRECT指定不同类型的交换机。
(3)arguments属性,可以用于指定headers类型的exchange。arguments = @Argument(name = "x-message-ttl", value = "10000", type= "java.lang.Integer")),
(4)queue属性中exclusive排他队列,只对创建这个queueConnection可见,Connection关闭,那么这个queue删除。
(5)queue属性中的autoDelete,若是这个consumer下线,那么这个queue队列将会删除。
bindings注意事项:
1. 对于(4)(5)这两种情况,durable=true队列持久化是不起作用的。
2. 注意不能和queues属性同时使用。
3. 特别注意:如果注解声明的queuexchange以及binging关系都存在的情况下,但是我们在bindings属性中又进行配置,那么bindings新增或者修改的参数都不会生效。但是queue存在,exchange存在但是没有binding,那么应用程序启动后,会自动创建binding关系。


处理监听端异常:

  @RabbitHandler
    @RabbitListener(queues = RabbitConfig.ORDER_QUEUE,errorHandler = "rabbitListenerErrorHandler")
@Bean
    public RabbitListenerErrorHandler rabbitListenerErrorHandler(){
        return new RabbitListenerErrorHandler() {
            @Override
            public Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message, ListenerExecutionFailedException exception) throws Exception {
                System.out.println("-------------------------------------"+message);
                throw exception;
            }
        };
    }

文章正在审核中... - 简书