简介

上面介绍了常见的mq中间件rabbitmq rocketmq kafka。
由于引入一个多余的中间件会产生很多额外成本,所以很多项目考虑用redis实现mq的功能。
mq的主要功能就是一个队列,一端生产数据push到队列,另一端消费数据pull到本地。
只是后续衍生了很多mq附加功能,这些有 指定组和分区消费,手动ack和自动应答ack,备份机制,多节点签收响应机制,重试机制,超时机制,优先级队列,死信队列,延时队列等等。
一般消费端消费消息是有两种形式,一种是主动pull拉,另一种是消费端实时push推。

redis做mq的演变和问题

Redis List

Redis lists
所以最开始使用redis当做mq时,用到了redis的list结构,使用redis的指令lpush和rpop两个指令完成。
list相关数据可以通过配置redis实现持久化保证redis重启时消息不丢失,但是有三个显著的问题:

  1. 消费端消费消息时,需要不停的自旋,一直调用rpop命令尝试去list拿到消费数据,如果拿到的是nil就不消费如果不是nil就消费,这样导致cpu空转,是一种无谓的损耗。当然也可以用bpop(blockpop)阻塞式等待,直到有数据时阻塞解除,但是当多线程调用bpop或者redis自动断联时会遇到其他异常。
  2. list里的数据,无法多个分组消费,rpop一次就自动清楚掉数据了,如果pop到本地没消费完成程序就宕掉了,pop就永久丢失了,并没有一个有效的ack反馈机制。
  3. list无法作为延时队列,没办法设置延时消费,只有拿到消息时才知道消息的属性。

image-1748178762789
上面第一个问题和第三个问题,是redis的list无法回避的问题,但是针对第二个,list结构可以优化补偿。
当消费者从list中pop到数据后,消费前将消息备份到挂起列表,等消费成功后ack,即将挂起列表里的消息备份删除,消费失败消息就会滞留在挂起列表中,等待后续重新分配或者其他补偿策略。分组消费可以通过redis事务控制,或者加分布式锁处理,但是也会引入新问题。
这种对list数据结构本身的改造,有中通类似的公司实践过,将list改造为RedisReliableQueue。这种改造会脱离业务,让系统后续的业务需求变更成本大大集中在这些非业务工具中,并且不一定适配后续业务需求,并不是好的选择。

Redis Zset

Redis sorted sets
为了解决第三个问题,有人采用了redis的zset结构(zset底层数据结构是压缩链表和跳表),原本的score改为存时间戳,时间戳的有序性保证了消息的有序性,set保证了消息的唯一性。zset通过ZADD添加元素到zset中,通过ZRANGE消费zset内消息,然后ZREM删除消息。
除了上述相对于list的优势外,zset面对的问题和list几乎一样。

Redis Pub/Sub

Redis Pub/Sub
为了解决redis list做mq的第一个问题,redis提供了 pub/sub机制,顾名思义就是发布/订阅。多个消费者绑定channel,当消息publish到channel后,subscribe会阻塞式监听channel并消费消息。
pub/sub解决了list分组消费的问题,pub/sub其实就是广播机制。pub/sub仍然有两个显著的问题:

  1. 缺乏ack确认机制,和list一样没有消费确认机制,消息消费错误,或者服务器宕机,消息就自然丢失了,如果publish到channel后channel绑定的某一个subscribe没有上线,那么对于这个没有上线的subscribe就相当于从没有接触到这个消息。
  2. pub/sub用到的channel是一个缓冲区buffer,这个buffer相当脆弱并且只有8m,超过可能导致订阅断开,消息就会丢失,这对消费者的能力要求很高。只要是网络问题,redis连接问题,一定会导致消息无法持久化丢失。
    pub/sub更适合实时性的交互,适合消息处理速度很快,不会造成积压的场景。

redis做mq最佳实践

Redis Streams
image-1748178868831
redis5.0加入了新的数据类型 stream。
stream涉及了几个新的操作指令 :
XADD(发布消息到队列)

XADD asrStreamKey * name flit sex nan

XREAD(消费消息)

XREAD STREAM asrStreamKey 1703137389561-0

XREADGROUP(分组消费)

XREADGROUP GROUP asrStreamGroup consumer-a 
STREAMS asrStreamKey 1703137389561-0

XGROUP(创建分组)

XGROUP asrStreamGroup

XACK(消息确认)

XACK asrStreamKey asrStreamGroup 1703137389561-0

XPENDING(由于消费者宕机已经被消费者拿到但是未ack的消息)

XPENDING asrStreamKey asrStreamGroup - + 10 consumer-a

XDEL(删除消息)
XCLIAM(消息转移)
Redis stream几乎解决list pub/sub 产生的相关问题。通过几个指令可以看出 stream支持基本的消费,ack机制,也支持分组消费。
image-1748179083827
同时 stream考虑了消息id唯一性,采用时间戳+序号,同时考虑了时钟回拨问题。如果消费者崩溃,对于消费未完成的记录会加入到pending列表,通过XPENDING获取,如果某个消费者彻底宕机,还支持消费转移 XCLAIM。
如果pending并反复在各个消费者中无法消费掉,这种会被stream标记为死信,死信通过XDEL删除或者持久化存储到本地数据库有专人手动操作。
代码流程设计
image-1748179120422
核心代码
每个机器需要配置消费者组和消费者成员信息以及自己信息

@Data
@Configuration
@ConfigurationProperties(prefix = "stream.consumers")
public class RedisStreamConsumersProperties {

    /**
     * 本机消费者
     */
    private String ownConsumer;

    /**
     * 其他机器消费者
     */
    private List<String> otherConsumers;

}

提供工具类,一共四种状态

@Slf4j
@Component
@RequiredArgsConstructor
public class RedisStreamUtils {

    @NonNull
    private final ReactiveRedisTemplate<String, String> reactiveRedisTemplate;

    @NonNull
    private final RedisStreamConsumersProperties redisStreamConsumersProperties;

    public <T> void xAdd(String streamKey, T t) {
        reactiveRedisTemplate.opsForStream()
                .add(StreamRecords.newRecord()
                        .in(streamKey)
                        .ofObject(t)
                        .withId(RecordId.autoGenerate())
                )
                .onErrorResume(e -> {
                            log.error("redis stream 发送消息失败:{}", e.getMessage(), e);
                            return Mono.empty();
                        }
                )
                .subscribe(a ->
                        log.info("redis stream 发送消息成功:{}", a.getValue())
                );
    }

    public void xAck(String streamKey, String streamGroup, RecordId recordId) {
        reactiveRedisTemplate
                .opsForStream()
                .acknowledge(streamKey, streamGroup, recordId)
                .onErrorResume(e -> {
                    log.error("redis stream 手动签收消息失败:{}", e.getMessage(), e);
                    return Mono.empty();
                })
                .subscribe(c -> {
                            log.info("手动签收消息成功:{}", recordId);
                            reactiveRedisTemplate.opsForStream().delete(streamKey, recordId).subscribe(d -> log.info("删除消息成功"));
                        }
                );
    }

    public void xPending(String streamKey, String streamGroup, String consumerName) {
        reactiveRedisTemplate.opsForStream()
                .pending(streamKey, Consumer.from(streamGroup, consumerName))
                .onErrorResume(e -> {
                            log.error("redis stream pending消息查询失败:{}", e.getMessage(), e);
                            return Mono.empty();
                        }
                )
                .subscribe(p -> p.forEach(pd -> {
                                    if (pd.getElapsedTimeSinceLastDelivery().compareTo(Duration.ofMinutes(30)) >= 0) {
                                        //超过30分钟未签收 异常数据
                                        log.info("{}超时pending", pd.toString());
                                        //消息补偿处理 转移给其他同组消费者
                                        xClaim(streamKey, streamGroup, getNewOwner(), pd.getId());
                                    }
                                    if (pd.getTotalDeliveryCount() >= 5 || pd.getElapsedTimeSinceLastDelivery().compareTo(Duration.ofDays(1)) >= 0) {
                                        log.info("{}多次周转未投递成功{},异常记录",pd.toString());
                                        //TODO 超过五次转移仍然异常 消息记录持久化到数据库 删除消息
                                        xAck(streamKey, streamGroup, pd.getId());
                                    }
                                }
                        )
                );
    }

    public void xClaim(String streamKey, String streamGroup, String newOwner, RecordId recordId) {
        reactiveRedisTemplate.opsForStream()
                .claim(streamKey, streamGroup, newOwner, Duration.ofMinutes(30), recordId)
                .onErrorResume(e -> {
                    log.error("redis stream 转移消息失败:{}", e.getMessage(), e);
                    return Mono.empty();
                })
                .subscribe(c -> log.info("消息转移成功:{}", recordId.getValue()));
    }

    private String getNewOwner(){
        List<String> otherConsumers = redisStreamConsumersProperties.getOtherConsumers();
        int random = RandomUtil.randomInt(0, otherConsumers.size() * 2);
        return otherConsumers.get(random % otherConsumers.size());
    }
}

监听消费关系:

@Slf4j
@RequiredArgsConstructor
@Configuration
public class RedisStreamConfig {

    @Value("${stream.consumers.ownConsumer}")
    private String consumer;

    private final ReactiveRedisTemplate<String, String> reactiveRedisTemplate;

    private final StreamAsrListener streamAsrListener;

    private final StreamDataListener streamDataListener;

    /**
     * 绑定一个消费关系
     *
     * @param connectionFactory 连接工厂
     * @return 订阅关系
     */
    @Bean
    public Subscription asrSubscription(LettuceConnectionFactory connectionFactory) {

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, AsrTO>> options = StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                .batchSize(1)
                .executor(Executors.newSingleThreadExecutor())
                .pollTimeout(Duration.ZERO) // 超时时间,如果为0表示不设置超时时间,如果为正数,一旦超时则会抛出异常
                .targetType(AsrTO.class)
                .errorHandler(e -> log.error("redis stream 异常:{}", e.getMessage(), e))
                .build();
        StreamMessageListenerContainer<String, ObjectRecord<String, AsrTO>> listenerContainer = StreamMessageListenerContainer.create(connectionFactory, options);

        return prepareAsrStreamAndGroup(reactiveRedisTemplate.opsForStream(), ASR_STREAM_KEY, ASR_STREAM_GROUP)
                .map(s -> {
                    Subscription subscription = listenerContainer.receive(
                            Consumer.from(ASR_STREAM_GROUP, consumer),
                            StreamOffset.create(ASR_STREAM_KEY, ReadOffset.lastConsumed()),
                            streamAsrListener);
                    listenerContainer.start();
                    return subscription;
                }).block();
    }

    /**
     * 绑定一个消费关系
     *
     * @param connectionFactory 连接工厂
     * @return 订阅关系
     */
    @Bean
    public Subscription dataSubscription(LettuceConnectionFactory connectionFactory) {

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, MessageTO>> options = StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                .batchSize(1)
                .executor(Executors.newSingleThreadExecutor())
                .pollTimeout(Duration.ZERO) // 超时时间,如果为0表示不设置超时时间,如果为正数,一旦超时则会抛出异常
                .targetType(MessageTO.class)
                .errorHandler(e -> log.error("redis stream 异常:{}", e.getMessage(), e))
                .build();
        StreamMessageListenerContainer<String, ObjectRecord<String, MessageTO>> listenerContainer = StreamMessageListenerContainer.create(connectionFactory, options);

        return prepareAsrStreamAndGroup(reactiveRedisTemplate.opsForStream(), DATA_STREAM_KEY, DATA_STREAM_GROUP)
                .map(s -> {
                    Subscription subscription = listenerContainer.receive(
                            Consumer.from(DATA_STREAM_GROUP, consumer),
                            StreamOffset.create(DATA_STREAM_KEY, ReadOffset.lastConsumed()),
                            streamDataListener);
                    listenerContainer.start();
                    return subscription;
                }).block();
    }

    /**
     * 查找Stream信息,如果不存在,则创建Stream
     */
    private Mono<StreamInfo.XInfoStream> prepareAsrStreamAndGroup(ReactiveStreamOperations<String, ?, ?> ops, String streamKey, String streamGroup) {
        // info方法查询Stream信息,如果该Stream不存在,底层会报错,这时会调用onErrorResume方法。
        return ops.info(streamKey)
                .onErrorResume(err -> {
                    log.warn("获取stream信息异常:{}", err.getMessage());
                    //创建Stream
                    return ops.createGroup(streamKey, streamGroup)
                            .flatMap(s -> ops.info(streamKey));
                });
    }
}

Q.E.D.

知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议