简介

mq是message queue简称,作为项目上常用的开源mq只有rabbitmq,rocketmq,kafka这三个。其中rabbitmq和kafka最为知名,rocketmq为阿里开源消息队列。
从性能上来说rabbitmq的qps在万数量级,rocketmq的qps在十万级,kafka的qps超百万级,当然三个mq的重量也是依次递增。

rabbitmq

RabbitMQ Tutorials

概览

image
rabbitmq是AMQP协议的标准实现,AMQP几乎就是如图这样定义mq的。MQTT
这里Connection可以认为是一个通道协议,Channel是通道协议中虚拟连接,Broker是消息中间件的一个实例,Virtual Host是虚拟主机,是逻辑上的隔离,隔离了账号权限和资源,Exchange是交换机负责消息接收和转发,Queue是消息缓冲队列,消息会被持久化到队列中,遵循FIFO先进先出。
生产者producer可以通过channel初始化队列queue,交换机exchange和绑定关系binding。
当producer通过channel投递消息到broker里的交换机时,交换机会根据路由键的绑定关系分发到对应的队列queue,然后下发给消费者consumer。
同队列下的消费者是竞争关系,只能消费一次该消息。

rabbitmq消息

消息积压:
rabbitmq通过消息的预取策略进行prefetch限制消费端一次拿取的消息数量。
消息过期:
消息本身可以设置存活时间TTL,但是如果消息在队列里过期,队列并不会对消息操作什么,而是等消息交付到消费者时,进行判断,过期则删除,当然等拿到这个消息时,可能过期很久了,只是没有被清除。整个队列可以设置一个过期时间,队列里的消息共享此过期时间,如果消息和队列同时设置过期时间,则以最少时间为准。
过期的消息会被放在死信队列。
消息幂等:
手动ack时消息id唯一,避免重复消费。
消息事务:
rabbitmq支持开启事务,但是官方不推荐开启事务,我们可以通过rabbitmq的可靠消费来达成最终一致性,如果开启事务会大大降低rabbitmq的性能。消费端本地事务由自己处理,发送端和消费端的事务组合相当于分布式事务,很显然这种分布式事务,rabbitmq可以达成期望,所以rabbitmq可做分布式事务的最终一致性。

rabbitmq交换机

RabbitMQ Tutorials
Exchange通过消息的路由键去匹配队列,交换机的种类很多,除了最基础的一对一(direct)和 广播(fanout)模式还有常用的topic交换机和不常用的head交换机(类似rocketmq的标签,这样就不会使用路由键了)和RPC。
direct交换机:
exchange和队列直接绑定,只有消息键和绑定键完全匹配才会转发到队列。
image-1748176266591
fanout交换机:
exchange无视消息键和绑定键关系,直接将消息广播给所有队列。
image-1748176286965
topic交换机:
exchange根据消息键和绑定键进行模糊匹配,使用*(代表任意一个单词)和#(代表0个或任意个单词)做匹配,举例 如果消息键是hello.nihao.hello或者lazy.lazy会被投递到Q2队列,如果是hello.orange.hello会被投递到Q1队列。
image-1748176310372
rpc:
image-1748176318238
rpc是双向同步消息,rabbitmq做了双向消息传递,那么这里的client和server既是消费者也是生产者。

rabbitmq的可靠消费

Reliability Guide
从rabbitmq可以看出,有四个节点可能导致消息丢失,分别是

  1. producer投递消息到broker这一段
  2. exchange把消息转发到queue这一段
  3. consumer拿到消息消费未完成,消费者宕机
  4. broker宕机
    针对第一个问题,rabbitmq提供了publisher-confirm,当生产者消息成功投递到broker时返回给生产者ack,否则返回nack。
    针对第二个问题,rabbitmq提供了publisher-return,当交换机转发消息到队列时返回ack,否则返回nack。
    针对第三个问题,rabbitmq提供了acknowledge-mode,配置manual手动ack,当消费者消费完成后,手动ack通知rabbitmq而不是一旦投递给消费者就自动ack删除消息。
    针对第四个问题,rabbitmq提供了高可用模式。

其他
优先级队列,延时队列,死信队列,高可用,监控,扩展插件。

rocketmq

为什么选择RocketMQhttps://rocketmq.apache.org/zh/docs/

概览

image-1748176489050
rocketmq和kafka天生就是高可用架构,阿里系列的开源框架都有个特点就是中心化架构,从此图可以看出,Producer生产者发送消息时会先从NameServer 中拿到Broker的相关地址信息,然后发送消息到Broker。而Broker初始化时就需要将自己的相关信息注册到NameServer,当Broker拿到Producer的消息后会本地主从复制备份,数据同步,然后将消息发送给Consumer消费者。
rocketmq通过Topic主题做数据隔离和权限隔离,Topic主题是逻辑属性, 消息是面向主题的,Topic主题支持多种消息类型(Normal,FIFO顺序消息,Delay定时/延时消息,transaction事务消息),当然实际上还是面向队列操作的,Topic主题包含多个队列Queue。一个Topic推荐只支持一种类型的数据,并进行强校验。
rocketmq和rabbitmq的队列消息不同在于,rocketmq的消息在队列中的位置和顺序通过Offset点位标记,所以支持从任意点位读取任意数量的消息。rocketmq的队列性质上和kafka的分区Partition类似,应该就是借鉴的kafka。

rocketmq消息

消息积压:
rocketmq通过动态调整消费者的消费速率来控制消息积压。可以根据系统的负载情况和消息队列的堆积情况,动态调整消费者的并发消费线程数,以适应消息的处理需求。rocketmq有消息的流控策略,当到达触发条件时客户端会收到错误和异常,提醒增加资源或者其他处理策略。
消息过期:
rocketmq对消息的定义比rabbitmq更加丰富,比如定时消费或者延时等待一段时间消费,这把消费的粒度控制在了消息本身,rabbitmq能做的就是整个队列做延时,如果是某个消息设置有效时间,需要拿到这个消息时才知道是否过期。
消息幂等:
rocketmq同样是唯一标识符,ack机制,但是也要注意消费端重复消费的问题,可以对比数据库消费数据。
事务消息:
image-1748176640938
rocketmq提供了事务消息,在生产端开启事务,将消息发送到半消息队列,然后执行本地事务逻辑。事务执行成功后,通过CommitTransaction提交本地事务消息;若事务执行失败,通过RollbackTransaction回滚事务消息。在消费者端,需要消费者通过重试等机制保证消息一致性。

rocketmq主题和标签

image-1748176670904
rocketmq相对于rabbitmq多了分组的概念,按组进行消费。消费者通过Subscription获取消息,处理消息的规则和状态配置。Subscription订阅是消费组和Topic的绑定规则。
Topic把消息的Topic和Tag通过Subscription的消息过滤规则转发到对应的消费组,消费组监听对应的Topic,Consumer Group和Tag。如图所示Consumer Group可以同时监听多个Topic。
image-1748176690539
消息到消费组的过滤规则,rocketmq设置的比rabbitmq更加自由,可以通过Tag(发送端自定义字符串)进行全文过滤匹配,也可以通过SQL92 sql语法对消息属性进行过滤匹配。(vop 3.0 使用的是自定义tag)

rocketmq的可靠消费

消息发送重试和流控制机制
rocketmq同样要面对可靠投递以及可靠转发的问题:

  1. producer消息可靠投递到Broker
  2. 如何可靠交付到消费端
  3. 消费端如何可靠消费
    问题1,producer可以设置rocketmq事务并会通过不断的重试进行发送,直至重试超过预设次数,同步发送线程会一直阻塞直到最终成功或失败,异步线程调用线程不会阻塞。重试会采用随机指数避退策略进行延迟重试。
    问题2,消费端通过ConsumerOffset进行位点管理,当消费端下线后再上线,会严格按照服务端保存的消费进度继续处理消息。如果服务端保存的历史位点信息已过期被删除,此时消费位点向前移动至服务端存储的最小位点。
    问题3,消费端PushConsumer通过消费重试完成消费。
    image-1748176763873
  • Ready:已就绪状态。消息在Apache RocketMQ服务端已就绪,可以被消费者消费。
  • Inflight:处理中状态。消息被消费者客户端获取,处于消费中还未返回消费结果的状态。
  • WaitingRetry:待重试状态,PushConsumer独有的状态。当消费者消息处理失败或消费超时,会触发消费重试逻辑判断。如果当前重试次数未达到最大次数,则该消息变为待重试状态,经过重试间隔后,消息将重新变为已就绪状态可被重新消费。多次重试之间,可通过重试间隔进行延长,防止无效高频的失败。
  • Commit:提交状态。消费成功的状态,消费者返回成功响应即可结束消息的状态机。
  • DLQ:死信状态。消费逻辑的最终兜底机制,若消息一直处理失败并不断进行重试,直到超过最大重试次数还未成功,此时消息不会再重试,会被投递至死信队列。您可以通过消费死信队列的消息进行业务恢复。

kafka

kafaka.apache.org

概览

image-1748176848246
可以看到kafka的架构和rocketmq很像,可以说rocketmq借鉴了很多kafka的设计思想。
kafka在3.0版本前强依赖Zookeeper,Zokeeper作为Broker的控制中心,管理Broker的上下线状态,主题分区,副本信息,存储Broker的ip等路由信息,协助故障转移,负载均衡等。kafka在3.0版本后就不强依赖Zookeeper了,而是在所有Broker中指定一个节点作为Controller节点,由这个节点做统一调度和分配任务,如果Controller节点宕机,会通过Raft协议选举一个新的Controller,所以为了避免脑裂,Raft协议推荐奇数作为Broker部署条件。
kafka中Topic和rocketmq的topic一样是个逻辑划分概念,kafka的Topic下叫Partition分区,相当于rocketmq的队列,是最小的存储单位,Partition分区实质上是一个日志文件,消息以追加的方式写入其中(零拷贝和顺序写保证高效),同样和rocketmq的消息一样,每个消息都会被分配给一个Offset偏移量,从上图可以看到,分区通过Replication复制到其他Broker机器中,实现了横向扩展和可靠冗余备份,分区副本中有一个是Leader负责为该分区接收和发送消息,其余分区的相同消息作为Followers追随者,Followers只负责数据和Leader保持同步,replication-factor复制因子数量就是Followers的数量。ISR(In-Sync Replicas) 同步复制集就是Followers中已经和Leader同步消息的数量,所有副本都确认他们已经将Record消息写入磁盘时,Kakfa才认为消息(record)被提交了。

kafka消息

消息积压:
kafka是可以通过分区动态横向扩展的,所以可以通过添加消费者并行消费消息,达到负载均衡。
消息过期:
kafka消息过期的粒度比较粗,只有Broker级别和Topic级别。实际过期时会给Segement打上delete标签,所以消息的过期最小维度是Segment级别的,不会精细到具体的某一个消息。
消息幂等:
Apache Kafka
kafka的消息幂等是说单个主题的单个分区内消息不重复,这种幂等只能是单会话上的幂等,跨会话以及多个topic-partition的情况,需要kafka的事务。
消息幂等开启后,会给Partition生成唯一PID,producer发来的每条消息都会携带Sequence number, 那么消息主键就是<PID,Partition,SeqNumber>来标记消息唯一性。
image-1748176925622
消息幂等配置enable.idempotence=true,同时要求acks设置为all/-1,并且重试次数大于零。
image-1748176948797
如果消费者消费完消息没来得及提交Offset就宕机了,kafka等待超时就会把消息重新负载到其他消费者身上进行消费,这样就可能重复消费。kakfa一样可以手动ack,ack有三种设置,分别是all/-1(默认的all,所有ISR消息同步后才ack),0(就是发送消息后,不关心消息是否被成功消费),1(主要是leader确认消息,不必所有follower全部确认,确认数由min.insync.replicas ISR指定)。如果broker宕机,只有在ISR列表中的复制才有资格成为leader。
image-1748176958248
当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
image-1748176969271
合理估算重试的时间间隔,可以避免无效的频繁重试。
image-1748176975929
max.in.flight.requests.per.connection < 5,客户端将在单个连接上发送的未确认请求的最大数量。重试会产生乱序问题,如果设置为1,单线程就不会乱序,但是牺牲了性能,一般默认5。
kafka的配置项很多,因为是开源的,大部分企业都是自己拿源码深度定制来满足业务需求。
事务消息:
Producer配置:

transactional.id 事务ID,类型为String字符串,默认为空,客户端自定义,例如"inspector_asr"
enable.idempotence 消息幂等开关,true/false,默认为false,当配置了transactional.id,此项一定要设置为true,否则会抛出客户端配置异常
transaction.timeout.ms 事务超时时间,默认为10秒,最长为15分钟
acks 要求此配置项必须设置为all/-1
retries 因为幂等特性保证了数据不会重复,在需要强可靠性的前提下,需要用户设置的重试次数 > 0
max.in.flight.requests.per.connection 要求<=5,此项配置是表明在producer还未收到broker应答的最大消息批次数量。该值设置的越大,标识可允许的吞吐越高,同时也越容易造成消息乱序

Consumer配置:

isolation.level 事务隔离级别,默认为空,开启事务的话,需要将其设置为"read_committed"

幂等性不能实现多分区以及多会话上的消息无重复,而 Kafka 事务则可以弥补这个缺陷,Kafka 自 0.11 版本开始也提供了对事务的支持,目前主要是在 read committed 隔离级别上做事情。它能保证多条消息原子性地写入到目标分区,同时也能保证 Consumer 只能看到事务成功提交的消息。
事务包括producer端事务,投递消息失败就重试。对于consumer端来说,根据事务隔离级别isolation.level读取可消费的已提交事务数据。如果对消息的幂等和事务特别重视,数据库应该设置联合主键判断唯一性,这个也是rocketmq推荐做法。
事务的前提就是先完成上面幂等配置,然后增加如下配置。
image-1748177463705
设置 transactional.id,就支持了跨多个生产者会话的可靠性语义,因为它允许客户端在启动任何新事务之前确保使用相同 TransactionalId 的事务已经完成。
设置consumer端可读取事务的隔离级别。
image-1748177478350
isolation.level=read_committed 读已提交,这样Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然,它也能看到非事务型 Producer 写入的所有消息。

kafka消息消费策略

kafka可以指定消费者,Topic和Partition分区进行消费。指定消费组后,具体组内consumer消费哪一条消息,有消费组选出的leader决定,消费组leader指定分区消费策略partition.assignment.strategy默认策略是Range + CooperativeSticky,还有RoundRobin,Sticky等。

kafka的可靠消费

Apache Kafka
官方定义,kafka消息可靠消费有三个语义

  • At most once—Messages may be lost but are never redelivered.
  • At least once—Messages are never lost but may be redelivered.
  • Exactly once—this is what people actually want, each message is delivered once and only once.
类型 消息是否会重复 消息是否会丢失 优势 劣势 适用场景
最多一次 生产端发送消息后不用等待和处理服务端响应,消息发送速度会很快 网络或服务端有问题会造成消息的丢失 消息系统吞吐量大且对消息的丢失不敏感。例如:日志收集、用户行为等场景。
最少一次 生产端发送消息后需要等待和处理服务端响应,如果失败会重试。 吞吐量较低,有重复发送的消息。 消息系统吞吐量一般,但是绝不能丢消息,对于重复消息不敏感。
精确一次 消息不重复,消息不丢失,消息可靠性很好。 吞吐量较低 吞吐量较低

Exactly once 精确一次 = 幂等 + At least once (ack=-1/all + replication 分区副本数 >= 2 + ISR
最小副本数>=2)
看看要考虑三个问题,kafka究竟怎么处理的

  1. producer丢失数据
  2. broker丢失数据
  3. consumer丢失数据
    问题一,producer支持事务,支持唯一标识,支持重试。
    问题二,broker有副本因子设置 replication.factor >= 3,保证可靠副本数量,有min.insync.replicas > 1,保证ISR可靠消费,有宕机后Raft选举策略。
    问题三,消费端有acks,手动ack,acks=all/-1,配合ISR保证可靠消费。

Redis

Redis做mq方案

Q.E.D.

知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议