消息队列
消息队列,即 Message Queue(MQ),是一种应用程序对应用程序的通信方法,。应用程序通过读写出入队列的消息来通信,而无须专用连接来连接它们。
消息队列是是典型的生产者、消费者模型。生产者不断生成消息添加到队列中,消费者不断地从队列中获取消息。因为消息的生产和消费都是异步的,并且消息队列只关注消息的发送与接收,并没有业务逻辑的侵入,这样就实现了生产者和消费者之间的解耦。
消息传递是指程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信。
使用消息队列的好处就是将一些无需即时返回且耗时的操作提取出来,进行了异步处理,大大节省了服务器的请求响应时间,从而提高了系统的吞吐量。
为什么使用消息队列
主要在三个方面:异步、解耦、削峰
异步处理
例如,用户注册模块,需要发送注册邮件和注册短信,那么传统的方式有:
- 串行的方式:将注册信息直接写入数据库后,然后先发送注册邮件,再发送注册短信,以上三个任务都完成了,才返回给客户端;这种方式会让用户一直等待,假如每个阶段要消耗 50ms,那么用户就需要等待 150ms;
- 并行的方式:将注册的信息写入数据库后,同时发送邮件和短信,这样就让邮件和短信的阶段并行操作,节省了时间,用户只需要等待 100ms;
然而,如果使用消息队列,就更能够高效地处理。引入消息队列后,可以把发送邮件、短信等操作当作不是必须的业务逻辑来异步处理。假设,写入消息队列的时间是 5ms,那么用户总共只需要等待写入数据库的时间加上写入消息队列的时间,总共是 55ms。
应用解耦
例如,对于一个购物系统而言,用户下单后,订单系统需要通知库存系统,那么最普通的做法就是「订单系统直接调用库存系统的接口去通知并更改」。
这种做法的缺点就是:
- 当库存系统出现故障时,那么订单会失败
- 订单系统和库存系统直接联系过于紧密,高度耦合
那么,如果引入消息队列呢?
用户下单后,订单系统完成持久化处理后,将消息写入消息队列中,直接返回用户订单下单成功;库存系统通过订阅订单系统的信息,获取下单消息,进行库存管理的操作。
这样,即使库存系统出现了故障,消息队列里存储的消息至少保证了消息的可靠传递,不会导致消息丢失。
流量削峰
例如,一个系统每天大部分时间的请求只有每秒 50 个,但是高峰期期间却会突增到每秒 10000 个请求,而系统最高只能处理 1000 个请求。所以,这样直接访问肯定会导致系统崩溃的。即低峰期无压力,高峰期扛不住。那么,如果使用了消息队列,把所有的请求都先写入消息队列,系统再从消息队列里慢慢拉取请求,只要拉取并处理的速度不超过系统自己能够处理的最大能力即可。
消息队列的缺点
- 系统的可用性降低了,因为引入的外部依赖越过,系统就越复杂
- 使用消息队列,需要保证消息不能重复消费,消息不能丢失,已经消息传递的顺序等等问题
- 当生产者生产消息并添加到消息队列中就会直接返回请求成功,但是必须要确保已添加到消息队列的消息不会堆积,处理也不会出现问题,不然数据的一致性就会出现问题
AMQP
AMQP,即 Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是为面向消息的中间件设计的。基于此协议的客户端与消息中间件可传递消息,并不受不同产品或者不同编程语言等条件的限制。
AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全等。
JMS
JMS(Java Message Service)是 Sun 公司最早提出的消息标准,是为了 Java 应用提供统一的消息操作,它与 AMQP 有以下的不同:
- JSM 定义了同一个接口;AMQP 是通过规定协议来统一数据交互的格式;
- JMS 只能在 Java 语言中使用;而 AMQP 只是协议,是跨语言的;
- JMS 规定了两种消息模型;而 AMQP 的消息模型则更加丰富;
常见的 AMQP
- ActiveMQ:基于 JMS
- RabbitMQ:基于 AMQP 协议,稳定性好
- RocketMQ:基于 JMS,是阿里开发的,由 Apache 维护
- Kafka:分布式的消息系统,高吞吐量
消息队列的应用场景
一个大型的软件系统,会有很多的组件或者模块或者子系统,如果将这些模块进行通信呢?传统的 IPC 是很多都在单一系统上,模块耦合性很大,不适合拓展;如果使用 Socket 进行通信,那么又需要考虑到以下一些问题:
- 信息的发送者和接收者如何维持这个连接,如果一方连接中断,丢失的数据怎么办
- 如何降低发送者和接收者的耦合度
- 如何让 Priority 高的接收者更先接受到数据
- 如果做到负载均衡?
- 如何做到可拓展,甚至可以将该通信模块发送到集群 cluster 上?
- 如何保证接收者接受到了完整、正确的数据
AMQP 协议就解决了以上问题,而 RabbitMQ 就是基于 AMQP 实现的。
RabbitMQ 简介
RabbitMQ is the most widely deployed open source message broker.
RabbitMQ 就是在 AMQP 的基础上实现的,可复用的企业消息系统。它可以用于大型软件系统各个模块之间的高效通信,支持高并发,支持可拓展。
常用命令
- 启动 rabbitmq:
rabbitmq-service start
- 关闭 rabbitmq:
rabbitmq-service stop
- 启动监控管理器:
rabbitmq-plugins enable rabbitmq_management
- 关闭监控管理器:
rabbitmq-plugins disable rabbitmq_management
- 关闭应用:
rabbitmqctl stop_app
- 启动应用:
rabbitmqctl start_app
概念介绍
- Broker:消息队列服务器实体
- Exchange:消息交换机,指定消息按什么原则,路由由哪个队列(用于转发消息,但是不会被存储)
- Queue:消息队列载体,每个消息都会被投入到一个或者多个队列中
- Binding:将 exchange 和 queue 按照路由的规则绑定起来
- Rounting Key:路由的关键字,exchange 就是根据这个关键字进行消息投递
- vhost:虚拟主机,一个 broker 里可以开启多个 vhost,用作不同用户的权限分离
- producer:消息生产者,就是投递消息的程序
- consumer:消息消费者,就是接受消息的程序
- channel:消息通道,在客户端的每个连接里,可以建立多个 channel,每个 channel 代表一个任务
四种交换机 Exchange
Direct Exchange
其行为是「先匹配,再投送」。绑定时会设定一个 rounting key,只有消息的 rounting key 匹配时,才会被交换机投送到绑定的队列中去。这是 RabbitMQ 默认的交换机模式,也是最简单的模式,是根据 key 全文匹配去寻找队列。
Topic Exchange
按规则转发消息主要是根据通配符。在这种交换机下,队列和交换机的绑定会定义一种路由模式。通配符需要在这种路由模式和路由键之间匹配后,交换机才能转发消息。
路由键必须是一串字符,用句号(.)隔开。
路由模式必须包含一个星号(*),主要是用于匹配路由键指定位置的一个单词,(#)表示相当于一个或者多个单词
Headers Exchange
设置 header attribute 参数类型的交换机,headers 是一个自定义匹配规则的类型。在队列与交换器绑定时,会设定一组键值对规则,消息中也包括了一组键值对属性,当有一对或者全部匹配时,消息就会被投送到对应的队列中。
Fanout Exchange
转发消息到所有绑定队列中,消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列。
使用流程
消息在 Producer 中产生,发送到消息队列的 Exchange 上,Exchange 根据配置的路由方式 Rounting Key,发送到并绑定 Queue。Queue 将消息通过 push 或者 pull 的方式传递给 Consumer。
- 客户端连接到消息队列的服务器,开启一个 channel
- 客户端声明一个 exchange,并设置相关属性
- 客户端声明一个 queue,并设置相关属性
- 客户端使用 rounting key,在 exchange 和 queue 之间建立好绑定关系
- 客户端投递信息到 exchange
- exchange 收到信息后,根据 rounting key 和 binding 关系,将消息传递到 queue 中