消息队列之 RabbitMQ
RabbitMQ 消息队列默认采用FIFO的消息,如果需要提前处理某些消息就可以设置成优先级队列,根据优先级消费消息。
RabbitMQ 的消息是可以进行持久化的,对于交换机(exchange)与队列(queue)的持久化只需要将durable属性设置为true即可,当重启RabbitMQ服务后,交换机和队列都会恢复,但是当只有队列的durable属性设置为true时,重启后会造成消息丢失。
专业术语
- 通道(Channel):一个管道连接,是tcp连接内的连接(broker),使用现有的TCP连接进行数据传输;
- 交换器(Exchange):消息路由,生产者发送的消息并不是直接发送到队列中而是先到指定的路由中,然后由路由根据路由key绑定的队列发送到指定队列中;
- 绑定(Binding):建立路由和队列容器的绑定关系;
- 消息(Message):生产者和消费者需要的消息数据;
- 连接(Connection):一个tcp连接;
- 生产(Producing):发送消息的程序就是生产者(Producer),用P表示
- 队列(Queue):消息在RabbitMQ和应用程序之间传递,但他们也能被存在队列中。队列没有大小限制,你可以存储任意多的消息进队列到无限缓存中,可以多个生产者发送消息到同一队列,也可以多个消费者从一个队列接收消息。
- 消费(Consuming):等待接收消息的程序。
Docker 安装RabbitMQ
# 获取rabbitmq景象
$ docker pull rabbitmq:management
# 创建并运行容器
# --di分别表示创建并运行容器 --hostname指定主机容器名 --name指定容器名称 -p将rabbitmq端口映射到本地 并设置用户名和密码
$ docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5671:5671 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
docker logs -f myrabbit # 查看日志
docker ps -a # 查看容器信息
docker start id # 输入容器id, 启动容器
http://localhost:15672/
RabbitMQ 消息确认机制
消息确认机制是为了保证消息不丢失,可靠抵达,可以使用AMQP事务消息,但是性能下降严重,所以使用消息确认机制。
生产者生产消息要保证成功发送到broker
;发送成功后消息队列,调用confirmCallback 确认模式。broker收到消息后,要把消息投递到消息队列里面
:returnCallback ,消息未投递到queue 就会调用退回模式。消费者消费消息队列里面的消息后,要通知消息队列,成功的消费消息
:ack应答机制
发送端的消息确认
生产者生产消息到broker 的消息确认机制 和 broker 投递消息到消息队列的确认机制
yml 配置文件
# 修改yml文件 开启消息确认
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: localhost
port: 5672 # 端口必须开放
# 使用自定义 RabbitTemplate 的setReturnCallback、setConfirmCallback
publisher-confirm-type: correlated # 开启服务器接收到消息的确认回调
publisher-returns: true # 开启服务端消息 抵达队列的确认,消息没有投递到队列,会调用这个回调
template:
mandatory: true # 只要抵达队列,以异步的方式优先回调returnCallback
Java Demo 代码
@Configuration
public class DirectRabbitMQConfig {
@Resource
private AmqpAdmin amqpAdmin;// 用来创建交换机、队列、绑定关系
@Resource
private RabbitTemplate rabbitTemplate; // 用来接收和发送消息
@PostConstruct
public void initRabbitTemplate(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 消息成功发送到broker服务器会触发这个回调
* @param correlationData 消息的ID,在发送消息的时候 需要传入,关联数据库之后就可以知道哪些消息没有到达
* @param ack :true 表示成功,false表示失败
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("消息发送到broker成功");
}
});
// 消息抵达队列的确认回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){
/**
* 失败回调,消息没有投递给指定的队列
* @param message 投递失败的消息
* @param replyCode 回复的状态码
* @param replyText 回复的文本内
* @param exchange 消息发送的交换机
* @param routingKey 消息发送时指定的路由
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("消息投递给队列失败"+message);
}
});
}
}
消费端的消息确认
消费端默认是自动确认的
(只要消费者连上消息队列会全部自动回复),只要消息接收到,服务端就会移除消息。存在问题的
:如果客户端收到了很多消息,但是只处理了一个消息就宕机了,剩下的消息都会被确认,服务端会删除所有的消息,所以应该确定一个删除一个。手动ACK确认模式,只要不回复确认,消息就不会被删除
.
修改yml 配置文件,开启手动确认
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: localhost
port: 5672 # 端口必须开放
# 开启手动ACK确认,只要不回复确认,消息就不会被删除
listener:
simple:
retry:// 消息重试机制
# 开启消费者(程序出现异常)重试机制,默认开启并一直重试
enabled: true
# 最大重试次数
max-attempts: 5
# 重试间隔时间(毫秒)
initial-interval: 3000
# 开启手动ack
acknowledge-mode: manual
Java事例代码
@Service
@RabbitListener(queues = {"duanxin.fanout.queue"})
public class DuanxinConsumer {
@RabbitHandler
public void receiveMessage(Message message, Channel channel) {
System.out.println("duanxin fanout 接收到了订单消息是: ->"+ message);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// false 表示单个签收,true 表示批量签收,如果这里出错会出现重复消费
channel.basicAck(deliveryTag,false); // 向服务器回复,表示签收消息
// // 表示拒绝签收,第一个multiple=false表示单个拒绝;true表示批量拒绝
// // requeue=false,表示丢弃;true表示发回服务器,重新入队。
// channel.basicNack(deliveryTag,false,false);
} catch (IOException e) {
e.printStackTrace();
}
}
}
RabbitMQ 角色权限分类
-
none
- 不能访问management plugin
-
Management 查看自己相关结点信息
- 列出自己可以通过AMQP登入的虚拟机
- 查看自己的虚拟机结点 virtual hosts的queues、exchanges、binding信息
- 查看和关闭自己的channels和connections
- 查看有关自己的虚拟机结点virtual hosts的统计信息,包括其他用户在这个结点virtual hosts中的活动信息
-
Policymaker
- 包含managerment所有权限
- 查看和创建、删除自己的virtual hosts所属的policies和parameters信息
-
Monitoring
- 包含managements的所有权限
- 罗列出所有的virtual hosts,包括不能登陆的virtual hosts。
- 查看其他用户的connections和channels信息
- 查看结点级别的数据,如clustering和memory使用情况。
- 查看所有的virtual hosts的全局统计信息
-
Administrator
- 最高权限
- 可以创建和删除virtual hosts
- 可以查看和删除users
- 查看逛街permissions
RabbitMq的六种模式
- 简单模式:一个生产者一个消费者
- 工作队列模式:一个生产者,几个消费者(一个任务只能被一个消费者消费);任务分发可以采用轮询(默认),公平分发(需要设置为自动应答)。
发布订阅模式fanout
:一次向许多消费者发送消息,类似于redis的发布订阅,向订阅某个频道的所有消费者发送消息路由模式direct
:生产者发送消息给路由表,路由表根据设置的路由选择消费者; 在fanout模式上面增加了一个路由key主题模式topic
:路由模式的路由,是采用的模糊匹配- RPC模式:
连接和消费的过程
生产者生产消息的流程
- 建立链接(Connection)
- 在链接(Connection)上开启一个信道(Channel)
- 声明一个交换机(Exchange)
- 声明一个队列(Queue)
- 使用路由键(RoutingKey)将队列(Queue)和交换机(Exchange)绑定起来
- 根据路由键(RoutingKey)发送消息到交换机(Exchange)
- rabbitMQ 根据交换机(Exchange)和路由键(RoutingKey),将消息或存放到队列(Queue),或丢弃,或退回给生产者
- 关闭信道(Channel)
- 关闭链接(Connection
消费者消费消息
- 建立链接(Connection)
- 在链接(Connection)上开启一个信道(Channel)
- 请求消费指定队列(Queue)的消息,并设置回调函数(onMessage)
- RabbitMQ将消息推送给消费者
- 消费者发送消息确定(Ack)
- RabbitMQ删除被确认的消息
- 关闭信道(Channel)
- 关闭链接(Connection)
要保证消费者正确的消费消息,通过ack应答来确定的
为什么消息中间件不直接使用Http协议?
常用的消息中间件协议有OpenWire、AMQP、MQTT、Kafka、OpenMessage协议。
AMQP
高级消息队列协议:支持分布式事务、消息的持久化、高性能和高可靠的消息处理优势- MQTT:优点轻量、结构简单、传输快;没有持久化
- OpenMessage:结构简单、解析速度快、支持事务和持久化设计
- Kafka是基于TCP/IP的二进制协议,消息内部是通过长度来进行分割;特点是结构简单、解析速度快、无事务支持、有持久化
-
因为Http请求报文头和响应报文头是比较复杂的,包含了Cookie和加密解密,状态码,响应码等内容,但是对于一个消息而言,不需要这些功能,只需要负责消息的传递、存储、分发就行,追求的是高性能。
-
大部分Http协议都是短链接,在实 际交互过程中,一个请求到获取响应很有可能会中断,中断以后就不会进行持久化,就会造成请求的丢失,而消息中间件可能是一个长期获取消息的过程,出现问题和故障要对数据或消息进行持久化,目的是为了保证消息和数据的高可靠性和稳健的运行。
消息分发策略的机制
发布订阅、轮询分发、公平分发、重发、消息拉取
为什么rabbitMQ基于channel而不是connection?
无论是生产者还是消费者,都需要和 RabbitMQ Broker 建立连接,这个连接就是一条 TCP 连接,也就是 Connection。一旦 TCP 连接建立起来,客户端紧接着可以创建一个 AMQP 信道(Channel)
,每个信道都会被指派一个唯一的 ID。信道是建立在 Connection 之上的虚拟连接,RabbitMQ 处理的每条 AMQP 指令都是通过信道完成的。
如果不用信道,那应用程序就会用TCP连接到Rabbit服务器上,高峰时期大量的连接就会造成资源的浪费,而且底层的操作系统每秒处理Tcp的连接数量也是有限的。
因为connection连接是基于tcp连接,需要三次握手和四次挥手(三次握手、四次挥手开销比较大),如果多个线程都需要和rabbitmq进行通信的话,就需要建立和销毁多个tcp连接,并且会花费很昂贵的开销
,所以采用类似NIO的操作, 选择使用Channel来使得 TCP 连接复用
,不仅可以减少性能开销,同时也便于管理。
为什么选择使用rabbitMQ消息队列?
最开始的时候业务是采用的单体结构,就是所有的业务逻辑都是堆砌在一起的,学习消息队列之后,发现一些业务可以进行拆分,最后选择的是rabbitMQ,因为RabbitMQ是异步的、分发的、多线程的机制,可以提升网站的性能,使得业务之间可以进行解藕,让业务变得更加稳健
。就比如发送邮件功能,原来的逻辑是用户评论之后,就发送邮件,最后返回结果,如果中途有异常就会进行回滚。现在只要用户评论之后写入数据库就行,发送邮件功能可以由消息队列后面进行异步处理。应用场景:异步解藕、高内聚低耦合、流量削峰、日志监控。
RabbitMQ 生产者和消费者出现问题会怎么办?
消息的幂等性,避免重复消费消息?
消息队列的幂等性:一个请求(一条消息),不管重复来多少次,结果是不会改变的。就是避免消息被重复消费,比如在消息读写过程中,遇到意外kill进程或重启,导致consumer有些消息处理了,但没来得及提交offset,重启后少数消息会再被消费一次。解决方案有:
- 使用redis的进行写入,因为set具有天然幂等性
- 主键或者唯一的id,可以在消费前进行查询一下,是否被消费过
- 给消息加上全局的唯一ID
- 数据库的唯一字段,因为唯一字段,下次在插入同样的数据就会报错
RabbitMQ如何保证消息队列的顺序性
把消息都放到一个队列里面,然后保证一个消费者消费消息即可
RabbitMQ 消息积压的问题如何解决?
消息积压一般是消费者出故障或者消费慢导致的。
- 先修复consumer的问题,确保其恢复消费速度,然后将现有consumer都停掉;
- 新建⼀个topic,partition是原来的10倍,临时建⽴好原先10倍或者20倍的queue数量;
- 然后写⼀个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据;消费之后不做耗时的处理,直接均匀轮询写⼊临时建⽴好的10倍数量的queue;
- 接着临时征⽤10倍的机器来部署consumer,每⼀批consumer消费⼀个临时queue的数据;
- 这种做法相当于是临时将queue资源和consumer资源扩⼤10倍,以正常的10倍速度来消费数据;
- 等快速消费完积压数据之后,得恢复原先部署架构,重新⽤原先的consumer机器来消费消息。
RabbitMQ如何保证数据⼀致性?
- ⽣产者确认机制: 消息持久化后异步回调通知⽣产者,保证消息已经发出去;
- 消息持久化: 设置消息持久化;
- 消费者确认机制: 消费者成功消费消息之后,⼿动确认,保证消息已经消费。