不断的学习,我们才能不断的前进
一个好的程序员是那种过单行线马路都要往两边看的人

消息队列之 RabbitMQ

RabbitMQ 消息队列默认采用FIFO的消息,如果需要提前处理某些消息就可以设置成优先级队列,根据优先级消费消息。
RabbitMQ 的消息是可以进行持久化的,对于交换机(exchange)与队列(queue)的持久化只需要将durable属性设置为true即可,当重启RabbitMQ服务后,交换机和队列都会恢复,但是当只有队列的durable属性设置为true时,重启后会造成消息丢失。
专业术语

  • 通道(Channel):一个管道连接,是tcp连接内的连接(broker),使用现有的TCP连接进行数据传输;
  • 交换器(Exchange):消息路由,生产者发送的消息并不是直接发送到队列中而是先到指定的路由中,然后由路由根据路由key绑定的队列发送到指定队列中;
  • 绑定(Binding):建立路由和队列容器的绑定关系;
  • 消息(Message):生产者和消费者需要的消息数据;
  • 连接(Connection):一个tcp连接;
  • 生产(Producing):发送消息的程序就是生产者(Producer),用P表示
  • 队列(Queue):消息在RabbitMQ和应用程序之间传递,但他们也能被存在队列中。队列没有大小限制,你可以存储任意多的消息进队列到无限缓存中,可以多个生产者发送消息到同一队列,也可以多个消费者从一个队列接收消息。
  • 消费(Consuming):等待接收消息的程序。

RabiitMQ官网教程

Docker 安装RabbitMQ

# 获取rabbitmq景象
$ docker pull rabbitmq:management

# 创建并运行容器
# --di分别表示创建并运行容器	--hostname指定主机容器名	--name指定容器名称  -p将rabbitmq端口映射到本地 并设置用户名和密码
$ docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5671:5671 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

docker logs -f myrabbit # 查看日志
docker ps -a # 查看容器信息
docker start id # 输入容器id, 启动容器

http://localhost:15672/

RabbitMQ 消息确认机制

消息确认机制是为了保证消息不丢失,可靠抵达,可以使用AMQP事务消息,但是性能下降严重,所以使用消息确认机制。

  • 生产者生产消息要保证成功发送到broker;发送成功后消息队列,调用confirmCallback 确认模式。
  • broker收到消息后,要把消息投递到消息队列里面:returnCallback ,消息未投递到queue 就会调用退回模式。
  • 消费者消费消息队列里面的消息后,要通知消息队列,成功的消费消息:ack应答机制

发送端的消息确认

生产者生产消息到broker 的消息确认机制 和 broker 投递消息到消息队列的确认机制
yml 配置文件

# 修改yml文件 开启消息确认
spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    host: localhost
    port: 5672 # 端口必须开放

    # 使用自定义 RabbitTemplate 的setReturnCallback、setConfirmCallback
    publisher-confirm-type: correlated  # 开启服务器接收到消息的确认回调
    publisher-returns: true # 开启服务端消息 抵达队列的确认,消息没有投递到队列,会调用这个回调
    template:
      mandatory: true # 只要抵达队列,以异步的方式优先回调returnCallback

Java Demo 代码

@Configuration
public class DirectRabbitMQConfig {

	@Resource
	private AmqpAdmin amqpAdmin;// 用来创建交换机、队列、绑定关系

	@Resource
	private RabbitTemplate rabbitTemplate; // 用来接收和发送消息

	@PostConstruct
	public void initRabbitTemplate(){
		rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
			/**
			 * 消息成功发送到broker服务器会触发这个回调
			 * @param correlationData 消息的ID,在发送消息的时候 需要传入,关联数据库之后就可以知道哪些消息没有到达
			 * @param ack :true 表示成功,false表示失败
			 * @param cause
			 */
			@Override
			public void confirm(CorrelationData correlationData, boolean ack, String cause) {
				System.out.println("消息发送到broker成功");
			}
		});

		// 消息抵达队列的确认回调
		rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){
			/**
			 * 失败回调,消息没有投递给指定的队列
			 * @param message 投递失败的消息
			 * @param replyCode 回复的状态码
			 * @param replyText 回复的文本内
			 * @param exchange  消息发送的交换机
			 * @param routingKey 消息发送时指定的路由
			 */
			@Override
			public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
				System.out.println("消息投递给队列失败"+message);
			}
		});
  }
}

消费端的消息确认

消费端默认是自动确认的(只要消费者连上消息队列会全部自动回复),只要消息接收到,服务端就会移除消息。存在问题的:如果客户端收到了很多消息,但是只处理了一个消息就宕机了,剩下的消息都会被确认,服务端会删除所有的消息,所以应该确定一个删除一个。手动ACK确认模式,只要不回复确认,消息就不会被删除.

修改yml 配置文件,开启手动确认

spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    host: localhost
    port: 5672 # 端口必须开放
	
    # 开启手动ACK确认,只要不回复确认,消息就不会被删除
    listener:
      simple:
        retry:// 消息重试机制
          # 开启消费者(程序出现异常)重试机制,默认开启并一直重试
          enabled: true
          # 最大重试次数
          max-attempts: 5
          # 重试间隔时间(毫秒)
          initial-interval: 3000
        # 开启手动ack
        acknowledge-mode: manual

Java事例代码

@Service
@RabbitListener(queues = {"duanxin.fanout.queue"})
public class DuanxinConsumer {

	@RabbitHandler
	public void receiveMessage(Message message, Channel channel) {
		System.out.println("duanxin fanout 接收到了订单消息是: ->"+ message);
		
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

		try {
			// false 表示单个签收,true 表示批量签收,如果这里出错会出现重复消费
			channel.basicAck(deliveryTag,false); // 向服务器回复,表示签收消息
      
      // // 表示拒绝签收,第一个multiple=false表示单个拒绝;true表示批量拒绝
			// // requeue=false,表示丢弃;true表示发回服务器,重新入队。
			// channel.basicNack(deliveryTag,false,false);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

RabbitMQ 角色权限分类

  • none

    • 不能访问management plugin
  • Management 查看自己相关结点信息

    • 列出自己可以通过AMQP登入的虚拟机
    • 查看自己的虚拟机结点 virtual hosts的queues、exchanges、binding信息
    • 查看和关闭自己的channels和connections
    • 查看有关自己的虚拟机结点virtual hosts的统计信息,包括其他用户在这个结点virtual hosts中的活动信息
  • Policymaker

    • 包含managerment所有权限
    • 查看和创建、删除自己的virtual hosts所属的policies和parameters信息
  • Monitoring

    • 包含managements的所有权限
    • 罗列出所有的virtual hosts,包括不能登陆的virtual hosts。
    • 查看其他用户的connections和channels信息
    • 查看结点级别的数据,如clustering和memory使用情况。
    • 查看所有的virtual hosts的全局统计信息
  • Administrator

    • 最高权限
    • 可以创建和删除virtual hosts
    • 可以查看和删除users
    • 查看逛街permissions

RabbitMq的六种模式

  • 简单模式:一个生产者一个消费者
  • 工作队列模式:一个生产者,几个消费者(一个任务只能被一个消费者消费);任务分发可以采用轮询(默认),公平分发(需要设置为自动应答)。
  • 发布订阅模式fanout:一次向许多消费者发送消息,类似于redis的发布订阅,向订阅某个频道的所有消费者发送消息
  • 路由模式direct:生产者发送消息给路由表,路由表根据设置的路由选择消费者; 在fanout模式上面增加了一个路由key
  • 主题模式topic:路由模式的路由,是采用的模糊匹配
  • RPC模式:

连接和消费的过程

生产者生产消息的流程

  1. 建立链接(Connection)
  2. 在链接(Connection)上开启一个信道(Channel)
  3. 声明一个交换机(Exchange)
  4. 声明一个队列(Queue)
  5. 使用路由键(RoutingKey)将队列(Queue)和交换机(Exchange)绑定起来
  6. 根据路由键(RoutingKey)发送消息到交换机(Exchange)
  7. rabbitMQ 根据交换机(Exchange)和路由键(RoutingKey),将消息或存放到队列(Queue),或丢弃,或退回给生产者
  8. 关闭信道(Channel)
  9. 关闭链接(Connection

image-20210518140815110

消费者消费消息

  1. 建立链接(Connection)
  2. 在链接(Connection)上开启一个信道(Channel)
  3. 请求消费指定队列(Queue)的消息,并设置回调函数(onMessage)
  4. RabbitMQ将消息推送给消费者
  5. 消费者发送消息确定(Ack)
  6. RabbitMQ删除被确认的消息
  7. 关闭信道(Channel)
  8. 关闭链接(Connection)

image-20210518140731942

要保证消费者正确的消费消息,通过ack应答来确定的

为什么消息中间件不直接使用Http协议?

常用的消息中间件协议有OpenWire、AMQP、MQTT、Kafka、OpenMessage协议。

  • AMQP高级消息队列协议:支持分布式事务、消息的持久化、高性能和高可靠的消息处理优势
  • MQTT:优点轻量、结构简单、传输快;没有持久化
  • OpenMessage:结构简单、解析速度快、支持事务和持久化设计
  • Kafka是基于TCP/IP的二进制协议,消息内部是通过长度来进行分割;特点是结构简单、解析速度快、无事务支持、有持久化
  1. 因为Http请求报文头和响应报文头是比较复杂的,包含了Cookie和加密解密,状态码,响应码等内容,但是对于一个消息而言,不需要这些功能,只需要负责消息的传递、存储、分发就行,追求的是高性能。

  2. 大部分Http协议都是短链接,在实 际交互过程中,一个请求到获取响应很有可能会中断,中断以后就不会进行持久化,就会造成请求的丢失,而消息中间件可能是一个长期获取消息的过程,出现问题和故障要对数据或消息进行持久化,目的是为了保证消息和数据的高可靠性和稳健的运行。

消息分发策略的机制

发布订阅、轮询分发、公平分发、重发、消息拉取

为什么rabbitMQ基于channel而不是connection?

无论是生产者还是消费者,都需要和 RabbitMQ Broker 建立连接,这个连接就是一条 TCP 连接,也就是 Connection。一旦 TCP 连接建立起来,客户端紧接着可以创建一个 AMQP 信道(Channel),每个信道都会被指派一个唯一的 ID。信道是建立在 Connection 之上的虚拟连接,RabbitMQ 处理的每条 AMQP 指令都是通过信道完成的。

如果不用信道,那应用程序就会用TCP连接到Rabbit服务器上,高峰时期大量的连接就会造成资源的浪费,而且底层的操作系统每秒处理Tcp的连接数量也是有限的

因为connection连接是基于tcp连接,需要三次握手和四次挥手(三次握手、四次挥手开销比较大),如果多个线程都需要和rabbitmq进行通信的话,就需要建立和销毁多个tcp连接,并且会花费很昂贵的开销,所以采用类似NIO的操作, 选择使用Channel来使得 TCP 连接复用,不仅可以减少性能开销,同时也便于管理。

为什么选择使用rabbitMQ消息队列?

最开始的时候业务是采用的单体结构,就是所有的业务逻辑都是堆砌在一起的,学习消息队列之后,发现一些业务可以进行拆分,最后选择的是rabbitMQ,因为RabbitMQ是异步的、分发的、多线程的机制,可以提升网站的性能,使得业务之间可以进行解藕,让业务变得更加稳健。就比如发送邮件功能,原来的逻辑是用户评论之后,就发送邮件,最后返回结果,如果中途有异常就会进行回滚。现在只要用户评论之后写入数据库就行,发送邮件功能可以由消息队列后面进行异步处理。应用场景:异步解藕、高内聚低耦合、流量削峰、日志监控。

RabbitMQ 生产者和消费者出现问题会怎么办?

消息的幂等性,避免重复消费消息?

消息队列的幂等性:一个请求(一条消息),不管重复来多少次,结果是不会改变的。就是避免消息被重复消费,比如在消息读写过程中,遇到意外kill进程或重启,导致consumer有些消息处理了,但没来得及提交offset,重启后少数消息会再被消费一次。解决方案有:

  • 使用redis的进行写入,因为set具有天然幂等性
  • 主键或者唯一的id,可以在消费前进行查询一下,是否被消费过
  • 给消息加上全局的唯一ID
  • 数据库的唯一字段,因为唯一字段,下次在插入同样的数据就会报错

RabbitMQ如何保证消息队列的顺序性

把消息都放到一个队列里面,然后保证一个消费者消费消息即可

RabbitMQ 消息积压的问题如何解决?

消息积压一般是消费者出故障或者消费慢导致的。

  1. 先修复consumer的问题,确保其恢复消费速度,然后将现有consumer都停掉;
  2. 新建⼀个topic,partition是原来的10倍,临时建⽴好原先10倍或者20倍的queue数量;
  3. 然后写⼀个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据;消费之后不做耗时的处理,直接均匀轮询写⼊临时建⽴好的10倍数量的queue;
  4. 接着临时征⽤10倍的机器来部署consumer,每⼀批consumer消费⼀个临时queue的数据;
  5. 这种做法相当于是临时将queue资源和consumer资源扩⼤10倍,以正常的10倍速度来消费数据;
  6. 等快速消费完积压数据之后,得恢复原先部署架构,重新⽤原先的consumer机器来消费消息。

RabbitMQ如何保证数据⼀致性?

  1. ⽣产者确认机制: 消息持久化后异步回调通知⽣产者,保证消息已经发出去;
  2. 消息持久化: 设置消息持久化;
  3. 消费者确认机制: 消费者成功消费消息之后,⼿动确认,保证消息已经消费。

目录