-
什么叫消息队列?
- 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
- 消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
-
为什么要用消息队列?
- 主要原因是由于在高并发环境下,由于来不及同步处理,请求往往会发生堵塞,比如说,大量的insert,update之类的请求同时到达MySQL,直接导致无数的行锁表锁,甚至最后请求会堆积过多,从而触发too many connections错误。通过使用消息队列,我们可以异步处理请求,从而缓解系统的压力。
-
RabbitMQ的特点
- 可靠性(Reliability)
- RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
- 灵活的路由(Flexible Routing)
- 在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
- 消息集群(Clustering)
- 多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
- 高可用(Highly Available Queues)
- 队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
- 多种协议(Multi-protocol)
- RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
- 多语言客户端(Many Clients)
- RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
- 管理界面(Management UI)
- RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
- 跟踪机制(Tracing)
- 如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
- 插件机制(Plugin System)
- RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
- 可靠性(Reliability)
-
概念的东西听的玄之又玄,好了,接下来我们从最简单的模式来开始我们的RabbitMQ学习之旅吧
简单队列模式(点对点模式)
-
新建个maven项目,大概目录结构如下
-
pom导入jar
-
<!--指定 jdk 编译版本--> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <!--rabbitmq 依赖客户端--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <!--操作文件流的一个依赖--> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> </dependencies> 复制代码
一、生产者
-
package com.dy.producer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 这是一个测试的生产者 *@author DingYongJun *@date 2021/8/1 */ public class DyProducerTest_01 { public static final String Queue_name = "dayu"; /** * 这里为了方便,我们使用main函数来测试 * 纯属看你个人选择 * @param args */ public static void main(String[] args) throws Exception{ //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("120.48.29.41"); factory.setUsername("admin"); factory.setPassword("111111"); //创建一个新连接 Connection connection = factory.newConnection(); //创建一个通道 channel 实现了自动 close 接口 自动关闭 不需要显示关闭 Channel channel = connection.createChannel(); /** * 生成一个队列 * 1.队列名称 * 2.队列里面的消息是否持久化 默认消息存储在内存中 * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费 * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除 * 5.其他参数 */ channel.queueDeclare(Queue_name,false,false,false,null); String message="我是生产者,我告诉你一个好消息!"; /** * 发送一个消息 * 1.发送到那个交换机 * 2.路由的 key 是哪个 * 3.其他的参数信息 * 4.发送消息的消息体 */ channel.basicPublish("",Queue_name,null,message.getBytes()); System.out.println("消息发送完毕"); } } 复制代码
-
执行一下看看是否能成功将消息发到队列中
-
查看RabbitMQ管理页面
-
证明生产者建立成功并成功发送了消息。
-
二、消费者
-
package com.dy.consumer; import com.rabbitmq.client.*; /** * 这是一个测试的消费者 *@author DingYongJun *@date 2021/8/1 */ public class DyConsumerTest_01 { public static final String Queue_name = "dayu"; public static void main(String[] args) throws Exception{ //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("120.48.29.41"); factory.setUsername("admin"); factory.setPassword("111111"); //建立连接 Connection connection = factory.newConnection(); //建立通道 Channel channel = connection.createChannel(); System.out.println("我是消费者,我在等待接收消息!"); DeliverCallback deliverCallback = (String var1, Delivery var2)->{ String message= new String(var2.getBody()); System.out.println(message); }; CancelCallback cancelCallback = (String var1)->{ System.out.println("消息消费被中断"); }; /** * 消费者消费消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答 * 3.消费者未成功消费的回调 */ channel.basicConsume(Queue_name,true,deliverCallback,cancelCallback); } } 复制代码
-
解释下,DeliverCallback和CancelCallback
-
看下需要哪些参数
-
看下DeliverCallback,这是典型的函数式接口,所以我们可以用lamda表达式来创建它的对象。
-
import java.io.IOException; @FunctionalInterface public interface DeliverCallback { void handle(String var1, Delivery var2) throws IOException; } 复制代码
-
-
CancelCallback同理
-
import java.io.IOException; @FunctionalInterface public interface CancelCallback { void handle(String var1) throws IOException; } 复制代码
-
-
-
执行消费者,看是否能拿到指定队列的消息。
- 成功消费消息。
-
查看RabbitMQ管理页面
- 消息为零,证明消息成功被消费掉了!
三、总结
-
最简单的工作队列,其中一个消息生产者,一个消息消费者,一个队列。也称为
点对点模式
。 -
生产者大致步骤
- 获取连接
- 创建通道
- 创建队列声明
- 发送消息
- 关闭队列
-
消费者大致步骤
- 获取连接
- 获取通道
- 监听队列
- 消费消息