注意:
1.当使用死信时,目前测试结果是,只有当消息为自动确认模式时,可以根据重试次数进入死信;
2.重试次数等只是针对于消息接收端;
一.支持重试次数,但是不支持自动进入死信的配置:
二.支持自动进入死信:
(1)当设置为自动确认消息时,会根据重试次数然后自动进入死信;
(2)当设置为手动确认消息时,则需要根据重试后的报错进行手动移入死信队列,设置为不重新移入队列,放入死信中
(3)当关闭重试次数设置时,如果报错,则直接进入死信队列
(4)接收端只有抛出异常后,才算是没有接收成功;如果不处理,并且接收端设置为自动确认,则算是成功接收;
三.利用死信进行定时消息
首先创建一个队列和交换机绑定,并且设置绑定死信,然后不写消费端去接收消息.在发送消息时指定过期时间,或者直接将队列设置过期消息;并且尝试发现,设置消息为手动确认后,定时消息仍然可以到时间进入到死信队列;
四.事务消息
(1)需要关闭ack事务确认机制:
开启方式:
声明事务管理器:
使用声明式事务注解:
五.配置参数
- NONE:无应答,rabbitmq默认consumer正确处理所有请求。
- AUTO:consumer自动应答,处理成功(注意:此处的成功确认是没有发生异常)发出ack,处理失败发出nack。rabbitmq发出消息后会等待consumer端应答,只有收到
ack
确定信息后才会将消息在rabbitmq清除掉。收到nack
异常信息的处理方法由setDefaultRequeueReject()
方法设置,这种模式下,发送错误的消息可以恢复。 - MANUAL:基本等同于AUTO模式,区别是需要人为调用方法确认。
factory.setConcurrentConsumers(1);
每个MessageListenerContainer将会创建的Consumer的最小数量,默认是1个。factory.setMaxConcurrentConsumers(1);
设置每个MessageListenerContainer将会创建的Consumer的最大数量,默认等于最小数量。factory.setPrefetchCount(1);
设置每次请求发送给每个Consumer的消息数量。factory.setChannelTransacted(false);
设置Channel的事务。factory.setTxSize(1);
设置事务当中可以处理的消息数量。factory.setDefaultRequeueRejected(true);
设置rabbitmq收到nack/reject
消息时的处理方式,true
,重新放回到queue头部,设置为false
丢弃。factory.setErrorHandler();
实现ErrorHandler接口设置进去,所有未catch的异常都会由ErrorHandler处理。
@RabbitListener注解中指明binding
信息,就能自动创建queue
、exchange
并建立binding
关系。
(1)在2.0版本之后,可以指定多个routingkey
即key={"ord","con"}
。
(2)exchange属性中,可以使用type = ExchangeTypes.DIRECT
指定不同类型的交换机。
(3)arguments属性,可以用于指定headers类型的exchange。arguments = @Argument(name = "x-message-ttl", value = "10000", type= "java.lang.Integer")),
(4)queue属性中exclusive
,排他队列,只对创建这个queue
的Connection
可见,Connection
关闭,那么这个queue
删除。
(5)queue属性中的autoDelete
,若是这个consumer
下线,那么这个queue
队列将会删除。
bindings注意事项:
1. 对于(4)(5)这两种情况,durable=true
队列持久化是不起作用的。
2. 注意不能和queues
属性同时使用。
3. 特别注意:如果注解声明的queue
和xchange
以及binging
关系都存在的情况下,但是我们在bindings
属性中又进行配置,那么bindings
新增或者修改的参数都不会生效。但是queue
存在,exchange
存在但是没有binding
,那么应用程序启动后,会自动创建binding
关系。
处理监听端异常:
本文由 GY 创作,采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为:
2021/11/19 15:24