事务机制
注:事务机制是确认生产者是否成功发送消息到交换机
RabbitMQ
客户端中与事务机制相关的方法有3个:channel.txSelect,channel.txCommit,channel.txRollback
。
channel.txSelect
用于开启事务;
channel.txCommit
用于提交事务;
channel.txRollback
用于回滚事务。
在通过 channel.txSelect
方法开启事务之后,我们便可以发送消息给 RabbitMQ
了,如果事务提交成功,则消息一定到达了 RabbitMQ
中,如果在事务提交执行之前由于 RabbitMQ
异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行 channel.txRollback
方法来实现事务回滚。
提交事务
public static void main(String[] args) throws IOException {
Connection conn = RabbitMQUtil.createConn();
Channel channel = conn.createChannel();
String exchange = "exchange-1";
String key = "key-1";
// 创建交换机
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC,true);
//开启事务
channel.txSelect();
try{
// 发送消息到交换机
channel.basicPublish(exchange,key,null,"发送路由key为 = key-1 的消息".getBytes());
//提交事务
channel.txCommit();
System.out.println("发送成功");
}catch (Exception e){
System.out.println("发送失败,进行日志记录");
//回滚事务
channel.txRollback();
}
}
复制代码
运行 main
方法,输出 发送成功
。这是由于交换机已经存在了。 根据上图可以看出开启事务机制与未开启事务机制(直接发送)多了四个步骤:
- 1、客户端发送 Tx.Select ,将信道置为事务模式。
- 2、 Broker 回复 Tx.Select-Ok ,确认己将信道置为事务模式。
- 3、在发送完消息之后,客户端发送 Tx.Commit 提交事务。
- 4、 Broker回复 Tx.Commit.Ok ,确认事务提交。
事务回滚
下面来看一下事务回滚,上代码。将 exchange
的值修改为 exchange-122
,并且将创建交换机的代码注释。
public static void main(String[] args) throws IOException {
Connection conn = RabbitMQUtil.createConn();
Channel channel = conn.createChannel();
String exchange = "exchange-122";
String key = "key-1";
// 创建交换机
//channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC,true);
//开启事务
channel.txSelect();
try{
// 发送消息到交换机
channel.basicPublish(exchange,key,null,"发送路由key为 = key-1 的消息".getBytes());
//提交事务
channel.txCommit();
System.out.println("发送成功");
}catch (Exception e){
System.out.println("发送失败,进行日志记录");
//回滚事务
channel.txRollback();
}
}
复制代码
运行 main
方法,输出结果:发送失败,进行日志记录
。 流程步骤为:
- 1、客户端发送 Tx.Select ,将信道置为事务模式。
- 2、Broker 回复 Tx.Select-Ok ,确认己将信道置为事务模式。
- 3、在发送完消息之后,发现异常,客户端发送 Tx.Rollback 回滚事务。
- 4、Broker 回复 Tx.Rollback.Ok ,确认事务回滚。
批量事务
如果要发送多条消息,则将 channel.basicPublish
,channel.txCommit
等方法包裹进循环内即可。
示例:
发送信息到 exchange-1
交换机,该交换机是已经存在了的,但是在发送消息之后发生了异常,这种也会进入回滚事务操作。
String exchange = "exchange-1";
//开启事务
channel.txSelect();
for (int a = 0; a < 10; a++) {
try{
channel.basicPublish(exchange,key,null,"发送路由key为 = key-1 的消息".getBytes());
int i = 1/0;
//提交事务
channel.txCommit();
System.out.println("发送成功");
}catch (Exception e){
System.out.println("发送失败,进行日志记录");
//回滚事务
channel.txRollback();
}
}
复制代码
事务确实能够解决消息发送方和 RabbitMQ
之间消息确认的问题,只有消息成功被RabbitMQ
接收,事务才能提交成功,否则便可在捕获异常之后进行事务回滚,与此同时可以进行消息重发。但是使用事务机制会降低RabbitMQ
的性能,那么有没有更好的方法既能保证消息发送方确认消息已经正确送达,又能基本上不带来性能上的损失呢? 下面就来介绍 RabbitMQ
提供另外一种方式:发送方确认机制。
发送方确认机制
注:发送方确认机制是确认生产者是否成功发送消息到交换机
原理
生产者通过调用channel.confirmSelect
方法将channel
设置成confirm
模式,一旦channel
进入confirm
模式,所有在该channel
上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所匹配的队列之后,broker
就会发送一个确认(Basic.Ack)
给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker
回传给生产者的确认消息中的deliver-tag
包含了确认消息的序号,此外broker
也可以设置basic.ack
的multiple
参数,表示到这个序号之前的所有消息都已经得到了处理。
事务机制在一条消息发送之后会使发送端阻塞,以等待 RabbitMQ
的回应,之后才能继续发送下一条消息。confirm
模式最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等channel
返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ
因为自身内部错误导致消息丢失,就会发送一条nack(Basic.Nack)
命令,生产者应用程序同样可以在回调方法中处理该nack
消息。
在channel
被设置成 confirm
模式之后,所有被发送的后续消息都将被 ack
或者被nack
一次。,不会出现一条消息既被 ack
又被 nack
情况,并且 RabbitMQ
没有对消息被 confirm
的快慢做任何保证。
原生api
普通confirm
每发送一条消息后就调用 channe.waitForConfirms
方法,等待服务端的确认,这实际上是一种串行同步等待的方式。和事务机制一样。也就是慢。
public static void main(String[] args) throws Exception {
Connection conn = RabbitMQUtil.createConn();
Channel channel = conn.createChannel();
String quequ = "queue-2";
String exchange = "exchange-2";
String key = "key-2";
//创建交换机
channel.exchangeDeclare(exchange,
BuiltinExchangeType.TOPIC, true);
//创建队列
channel.queueDeclare(quequ, true, false, false, null);
//队列与交换机绑定
channel.queueBind(quequ, exchange, key);
//将信道置为 publisher confirm 模式
channel.confirmSelect();
String message = "发送路由key为 = "+ key + "的消息";
channel.basicPublish(exchange,key,null,
message.getBytes());
boolean b = channel.waitForConfirms();
System.out.println("发送成功 = " + b);
}
结果:发送成功 = true
复制代码
将路由key 修改为:key-22121
,创建交换机、创建队列、队列与交换机绑定进行注释,观察结果是否成功。
public static void main(String[] args) throws Exception {
Connection conn = RabbitMQUtil.createConn();
Channel channel = conn.createChannel();
String quequ = "queue-2";
String exchange = "exchange-2";
String key = "key-22121";
//创建交换机
//channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true);
//创建队列
// channel.queueDeclare(quequ, true, false, false, null);
//队列与交换机绑定
// channel.queueBind(quequ, exchange, key);
//将信道置为 publisher confirm 模式
channel.confirmSelect();
String message = "发送路由key为 = "+ key + "的消息";
channel.basicPublish(exchange,key,null,message.getBytes());
boolean b = channel.waitForConfirms();
System.out.println("发送成功 = " + b);
}
结果:发送成功 = true
复制代码
可以看到发送结果是成功的。那再次修改代码,将 exchange 的值修改为exchange-2222
,其余代码不动,观察结果。
启动直接报错!
如果发送多条消息,只需要将 channel.basicPublish
、channel.waitForConfirms
方法包裹在循环里面即可。但还是每发送一条消息后就调用 channe.waitForConfirms
方法,等待服务端的确认。
channel.confirmSelect();
for (int i = 1; i < 10; i++) {
String message = "发送路由key为 = "+ key + "的消息";
channel.basicPublish(exchange,key,null,message.getBytes());
boolean b = channel.waitForConfirms();
System.out.println("发送成功" + b);
}
复制代码
批量confirm
每发送一批消息后,调用 channel.waitForConfirms
方法,等待服务器的确认返回(也是同步的,只是一次发送多条信息,然后统一确定)。
channel.confirmSelect();
for (int i = 1; i < 10; i++) {
String message = "发送路由key为 = "+ key + "的消息";
channel.basicPublish(exchange,key,null,message.getBytes());
}
//批量确认信息,发送的消息中,如果有失败的,不知道是哪一条失败了
boolean b = channel.waitForConfirms();
System.out.println("发送成功" + b);
复制代码
异步confirm
异步 confirm
方法的编程实现最为复杂,也是最高效的。在客户端 Channel
接口中提供的 addConfirmListener
方法可以添加 ConfirmListener
这个回调接口,这个 ConfirmListener
接口包含两个方法: handleAck
、handleNack
,分别用来处理 RabbitMQ
回传的 Basic.Ack
、Basic.Nack
。在这两个方法中都包含有两个参数 deliveryTag(标记消息的唯一有序序号)
、multiple(是否批量confirm true代表是)
String quequ = "queue-2";
String exchange = "exchange-2";
String key = "key-2";
//创建交换机
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true);
//创建队列
channel.queueDeclare(quequ, true, false, false, null);
//队列与交换机绑定
channel.queueBind(quequ, exchange, key);
channel.confirmSelect();
final ConcurrentSkipListMap<Long,String> map = new ConcurrentSkipListMap();
// 添加一个异步确认的监听器
channel.addConfirmListener(new ConfirmListener() {
//参数一:deliveryTag: 消息的编号
//参数二:multiple:是否批量confirm true 是
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("map 数据:" + map.size());
if (multiple) {
//如果是批量确认 返回的是小于等于当前序列号的消息 是一个 map
ConcurrentNavigableMap<Long, String> confirmed =
map.headMap(deliveryTag, true);
//清除该部分未确认消息
confirmed.clear();
System.out.println("批量确认清楚 map 数据:" + map.size());
}else{
//只清除当前序列号的消息
map.remove(deliveryTag);
System.out.println("只清除当前序列号的消息 map 数据:" + map.size());
}
System.out.println("消息发送到交换机成功,deliveryTag: " + deliveryTag + ", multiple: " + multiple);
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送到交换机失败, deliveryTag: " + deliveryTag + ", multiple: " + multiple);
String message = map.get(deliveryTag);
System.out.println("消息发送到交换机失败,发布的消息:"+message+"未被确认,序列号为:"+deliveryTag);
//拿到了未确认的信息,可以进行其他逻辑,比如添加处理消息重发
}
});
for (int i = 1; i < 6; i++) {
String message = "发送路由key为 = "+ key + "的消息";
// channel.getNextPublishSeqNo()获取下一个消息的序列号
map.put(channel.getNextPublishSeqNo(),message);
channel.basicPublish(exchange,key,null,message.getBytes());
}
System.out.println("其他逻辑");
复制代码
总结
普通confirm:同步等待确认,简单,但吞吐量非常有限。
批量普通confirm:批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是哪条消息出现了问题。
异步普通confirm:最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微有些麻烦。
Boot方式
在yml中配置是否需要消息确认
spring:
application:
name: info-config-boot
rabbitmq:
host: 47.105.198.54
port: 5672
virtual-host: /test-1
username: test
password: 123456
publisher-confirm-type: correlated
publisher-returns: true
复制代码
publisher-confirm-type
有三个选项:
- NONE:禁用发布确认模式,是默认值
- CORRELATED:发布消息成功到交换器后会触发回调方法
- SIMPLE:经测试有两种效果,其一效果和
CORRELATED
值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate
调用waitForConfirms
或waitForConfirmsOrDie
方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie
方法如果返回false
则会关闭channel
,则接下来无法发送消息到broker
。
编码
实现ConfirmCallback
@Component
public class InfoConfirm implements RabbitTemplate.ConfirmCallback {
Logger logger = LoggerFactory.getLogger(InfoConfirm.class);
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 需要给ConfirmCallback赋值 不然不会走回调方法,默认是null
*/
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
}
/**
* 此方法用于监听消息是否发送到交换机
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
logger.info("消息成功发送到交换机");
logger.info("id = {} ",correlationData.getId());
byte[] body = correlationData.getReturnedMessage().getBody();
logger.info("message = {}",new String(body));
}else {
logger.info("消息发送到交换机失败");
logger.info("cause = {}",cause);
logger.info("id = {} ",correlationData.getId());
byte[] body = correlationData.getReturnedMessage().getBody();
logger.info("message = {}",new String(body));
}
}
}
复制代码
实现接口 ConfirmCallback
,重写其confirm()
方法,方法内有三个参数correlationData
、ack
、cause
。
- correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
- ack:消息投递到broker 的状态,true表示成功。
- cause:表示投递失败的原因。
对外提供发送方法
@GetMapping("/send")
public void send(){
CorrelationData correlation = new CorrelationData("设置:" + UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("exchange-1","key-55","发送消息",correlation);
}