简介
上面介绍了常见的mq中间件rabbitmq rocketmq kafka。
由于引入一个多余的中间件会产生很多额外成本,所以很多项目考虑用redis实现mq的功能。
mq的主要功能就是一个队列,一端生产数据push到队列,另一端消费数据pull到本地。
只是后续衍生了很多mq附加功能,这些有 指定组和分区消费,手动ack和自动应答ack,备份机制,多节点签收响应机制,重试机制,超时机制,优先级队列,死信队列,延时队列等等。
一般消费端消费消息是有两种形式,一种是主动pull拉,另一种是消费端实时push推。
redis做mq的演变和问题
Redis List
Redis lists
所以最开始使用redis当做mq时,用到了redis的list结构,使用redis的指令lpush和rpop两个指令完成。
list相关数据可以通过配置redis实现持久化保证redis重启时消息不丢失,但是有三个显著的问题:
- 消费端消费消息时,需要不停的自旋,一直调用rpop命令尝试去list拿到消费数据,如果拿到的是nil就不消费如果不是nil就消费,这样导致cpu空转,是一种无谓的损耗。当然也可以用bpop(blockpop)阻塞式等待,直到有数据时阻塞解除,但是当多线程调用bpop或者redis自动断联时会遇到其他异常。
- list里的数据,无法多个分组消费,rpop一次就自动清楚掉数据了,如果pop到本地没消费完成程序就宕掉了,pop就永久丢失了,并没有一个有效的ack反馈机制。
- list无法作为延时队列,没办法设置延时消费,只有拿到消息时才知道消息的属性。

上面第一个问题和第三个问题,是redis的list无法回避的问题,但是针对第二个,list结构可以优化补偿。
当消费者从list中pop到数据后,消费前将消息备份到挂起列表,等消费成功后ack,即将挂起列表里的消息备份删除,消费失败消息就会滞留在挂起列表中,等待后续重新分配或者其他补偿策略。分组消费可以通过redis事务控制,或者加分布式锁处理,但是也会引入新问题。
这种对list数据结构本身的改造,有中通类似的公司实践过,将list改造为RedisReliableQueue。这种改造会脱离业务,让系统后续的业务需求变更成本大大集中在这些非业务工具中,并且不一定适配后续业务需求,并不是好的选择。
Redis Zset
Redis sorted sets
为了解决第三个问题,有人采用了redis的zset结构(zset底层数据结构是压缩链表和跳表),原本的score改为存时间戳,时间戳的有序性保证了消息的有序性,set保证了消息的唯一性。zset通过ZADD添加元素到zset中,通过ZRANGE消费zset内消息,然后ZREM删除消息。
除了上述相对于list的优势外,zset面对的问题和list几乎一样。
Redis Pub/Sub
Redis Pub/Sub
为了解决redis list做mq的第一个问题,redis提供了 pub/sub机制,顾名思义就是发布/订阅。多个消费者绑定channel,当消息publish到channel后,subscribe会阻塞式监听channel并消费消息。
pub/sub解决了list分组消费的问题,pub/sub其实就是广播机制。pub/sub仍然有两个显著的问题:
- 缺乏ack确认机制,和list一样没有消费确认机制,消息消费错误,或者服务器宕机,消息就自然丢失了,如果publish到channel后channel绑定的某一个subscribe没有上线,那么对于这个没有上线的subscribe就相当于从没有接触到这个消息。
- pub/sub用到的channel是一个缓冲区buffer,这个buffer相当脆弱并且只有8m,超过可能导致订阅断开,消息就会丢失,这对消费者的能力要求很高。只要是网络问题,redis连接问题,一定会导致消息无法持久化丢失。
pub/sub更适合实时性的交互,适合消息处理速度很快,不会造成积压的场景。
redis做mq最佳实践
Redis Streams

redis5.0加入了新的数据类型 stream。
stream涉及了几个新的操作指令 :
XADD(发布消息到队列)
XADD asrStreamKey * name flit sex nan
XREAD(消费消息)
XREAD STREAM asrStreamKey 1703137389561-0
XREADGROUP(分组消费)
XREADGROUP GROUP asrStreamGroup consumer-a
STREAMS asrStreamKey 1703137389561-0
XGROUP(创建分组)
XGROUP asrStreamGroup
XACK(消息确认)
XACK asrStreamKey asrStreamGroup 1703137389561-0
XPENDING(由于消费者宕机已经被消费者拿到但是未ack的消息)
XPENDING asrStreamKey asrStreamGroup - + 10 consumer-a
XDEL(删除消息)
XCLIAM(消息转移)
Redis stream几乎解决list pub/sub 产生的相关问题。通过几个指令可以看出 stream支持基本的消费,ack机制,也支持分组消费。

同时 stream考虑了消息id唯一性,采用时间戳+序号,同时考虑了时钟回拨问题。如果消费者崩溃,对于消费未完成的记录会加入到pending列表,通过XPENDING获取,如果某个消费者彻底宕机,还支持消费转移 XCLAIM。
如果pending并反复在各个消费者中无法消费掉,这种会被stream标记为死信,死信通过XDEL删除或者持久化存储到本地数据库有专人手动操作。
代码流程设计

核心代码
每个机器需要配置消费者组和消费者成员信息以及自己信息
@Data
@Configuration
@ConfigurationProperties(prefix = "stream.consumers")
public class RedisStreamConsumersProperties {
/**
* 本机消费者
*/
private String ownConsumer;
/**
* 其他机器消费者
*/
private List<String> otherConsumers;
}
提供工具类,一共四种状态
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisStreamUtils {
@NonNull
private final ReactiveRedisTemplate<String, String> reactiveRedisTemplate;
@NonNull
private final RedisStreamConsumersProperties redisStreamConsumersProperties;
public <T> void xAdd(String streamKey, T t) {
reactiveRedisTemplate.opsForStream()
.add(StreamRecords.newRecord()
.in(streamKey)
.ofObject(t)
.withId(RecordId.autoGenerate())
)
.onErrorResume(e -> {
log.error("redis stream 发送消息失败:{}", e.getMessage(), e);
return Mono.empty();
}
)
.subscribe(a ->
log.info("redis stream 发送消息成功:{}", a.getValue())
);
}
public void xAck(String streamKey, String streamGroup, RecordId recordId) {
reactiveRedisTemplate
.opsForStream()
.acknowledge(streamKey, streamGroup, recordId)
.onErrorResume(e -> {
log.error("redis stream 手动签收消息失败:{}", e.getMessage(), e);
return Mono.empty();
})
.subscribe(c -> {
log.info("手动签收消息成功:{}", recordId);
reactiveRedisTemplate.opsForStream().delete(streamKey, recordId).subscribe(d -> log.info("删除消息成功"));
}
);
}
public void xPending(String streamKey, String streamGroup, String consumerName) {
reactiveRedisTemplate.opsForStream()
.pending(streamKey, Consumer.from(streamGroup, consumerName))
.onErrorResume(e -> {
log.error("redis stream pending消息查询失败:{}", e.getMessage(), e);
return Mono.empty();
}
)
.subscribe(p -> p.forEach(pd -> {
if (pd.getElapsedTimeSinceLastDelivery().compareTo(Duration.ofMinutes(30)) >= 0) {
//超过30分钟未签收 异常数据
log.info("{}超时pending", pd.toString());
//消息补偿处理 转移给其他同组消费者
xClaim(streamKey, streamGroup, getNewOwner(), pd.getId());
}
if (pd.getTotalDeliveryCount() >= 5 || pd.getElapsedTimeSinceLastDelivery().compareTo(Duration.ofDays(1)) >= 0) {
log.info("{}多次周转未投递成功{},异常记录",pd.toString());
//TODO 超过五次转移仍然异常 消息记录持久化到数据库 删除消息
xAck(streamKey, streamGroup, pd.getId());
}
}
)
);
}
public void xClaim(String streamKey, String streamGroup, String newOwner, RecordId recordId) {
reactiveRedisTemplate.opsForStream()
.claim(streamKey, streamGroup, newOwner, Duration.ofMinutes(30), recordId)
.onErrorResume(e -> {
log.error("redis stream 转移消息失败:{}", e.getMessage(), e);
return Mono.empty();
})
.subscribe(c -> log.info("消息转移成功:{}", recordId.getValue()));
}
private String getNewOwner(){
List<String> otherConsumers = redisStreamConsumersProperties.getOtherConsumers();
int random = RandomUtil.randomInt(0, otherConsumers.size() * 2);
return otherConsumers.get(random % otherConsumers.size());
}
}
监听消费关系:
@Slf4j
@RequiredArgsConstructor
@Configuration
public class RedisStreamConfig {
@Value("${stream.consumers.ownConsumer}")
private String consumer;
private final ReactiveRedisTemplate<String, String> reactiveRedisTemplate;
private final StreamAsrListener streamAsrListener;
private final StreamDataListener streamDataListener;
/**
* 绑定一个消费关系
*
* @param connectionFactory 连接工厂
* @return 订阅关系
*/
@Bean
public Subscription asrSubscription(LettuceConnectionFactory connectionFactory) {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, AsrTO>> options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.batchSize(1)
.executor(Executors.newSingleThreadExecutor())
.pollTimeout(Duration.ZERO) // 超时时间,如果为0表示不设置超时时间,如果为正数,一旦超时则会抛出异常
.targetType(AsrTO.class)
.errorHandler(e -> log.error("redis stream 异常:{}", e.getMessage(), e))
.build();
StreamMessageListenerContainer<String, ObjectRecord<String, AsrTO>> listenerContainer = StreamMessageListenerContainer.create(connectionFactory, options);
return prepareAsrStreamAndGroup(reactiveRedisTemplate.opsForStream(), ASR_STREAM_KEY, ASR_STREAM_GROUP)
.map(s -> {
Subscription subscription = listenerContainer.receive(
Consumer.from(ASR_STREAM_GROUP, consumer),
StreamOffset.create(ASR_STREAM_KEY, ReadOffset.lastConsumed()),
streamAsrListener);
listenerContainer.start();
return subscription;
}).block();
}
/**
* 绑定一个消费关系
*
* @param connectionFactory 连接工厂
* @return 订阅关系
*/
@Bean
public Subscription dataSubscription(LettuceConnectionFactory connectionFactory) {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, MessageTO>> options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.batchSize(1)
.executor(Executors.newSingleThreadExecutor())
.pollTimeout(Duration.ZERO) // 超时时间,如果为0表示不设置超时时间,如果为正数,一旦超时则会抛出异常
.targetType(MessageTO.class)
.errorHandler(e -> log.error("redis stream 异常:{}", e.getMessage(), e))
.build();
StreamMessageListenerContainer<String, ObjectRecord<String, MessageTO>> listenerContainer = StreamMessageListenerContainer.create(connectionFactory, options);
return prepareAsrStreamAndGroup(reactiveRedisTemplate.opsForStream(), DATA_STREAM_KEY, DATA_STREAM_GROUP)
.map(s -> {
Subscription subscription = listenerContainer.receive(
Consumer.from(DATA_STREAM_GROUP, consumer),
StreamOffset.create(DATA_STREAM_KEY, ReadOffset.lastConsumed()),
streamDataListener);
listenerContainer.start();
return subscription;
}).block();
}
/**
* 查找Stream信息,如果不存在,则创建Stream
*/
private Mono<StreamInfo.XInfoStream> prepareAsrStreamAndGroup(ReactiveStreamOperations<String, ?, ?> ops, String streamKey, String streamGroup) {
// info方法查询Stream信息,如果该Stream不存在,底层会报错,这时会调用onErrorResume方法。
return ops.info(streamKey)
.onErrorResume(err -> {
log.warn("获取stream信息异常:{}", err.getMessage());
//创建Stream
return ops.createGroup(streamKey, streamGroup)
.flatMap(s -> ops.info(streamKey));
});
}
}
Q.E.D.






Comments | 0 条评论