很多人对Redis的印象还停留在“缓存”二字实际上Redis 5.0之后已经具备了一流的消息队列能力。但超过60%的团队都用错了方案——要么丢消息要么内存爆炸要么回调地狱。本文将带你彻底搞懂Redis作为消息队列的三种姿势并给出生产级最佳实践。一、开篇当老板让你用Redis做MQ“小王公司预算有限不想再买一套RabbitMQ了你直接用Redis搞个消息队列吧不就push/pop嘛简单”—— 这是一个真实的故事也是一个“坑”的开始。很多人被这种需求折磨过用List做队列结果消费者重启后消息丢了用Pub/Sub发布者一断线消息全没了好不容易上Stream又发现消费者组配置复杂……Redis到底适不适合做消息队列适合哪些场景三种方案如何抉择本文将从底层原理、原子性、持久化、可靠性四个维度彻底讲透Redis消息队列的全部姿势并提供可直接拷贝的Java/Go代码。二、Redis做消息队列是与非2.1 为什么选择Redis做MQ零额外组件无需部署RabbitMQ/Kafka减少运维成本极高性能单机可达10万 QPS低延迟亚毫秒级数据结构丰富List、Pub/Sub、Stream三种模式应对不同场景渐进式过渡中小项目从缓存扩展到消息队列一套技术栈搞定2.2 Redis的致命短板必须知道对比项RedisRabbitMQ/RocketMQ消息持久化部分支持RDB/AOF完整磁盘存储消费确认Stream支持其他不支持内置ACK机制消息回溯仅Stream支持普遍支持死信队列需手动模拟原生支持海量堆积内存限制会OOM磁盘无界堆积结论Redis适合轻量级、实时性高、数据量可控的消息场景重型业务MQ请绕道。三、方案一List —— 最朴素的生产者-消费者队列3.1 基础模型生产者LPUSH key value消费者RPOP key这是最简单的队列先进先出FIFO。java// 生产者 public void produce(String queueName, String message) { jedis.lpush(queueName, message); } // 消费者轮询 public String consume(String queueName) { return jedis.rpop(queueName); }3.2 阻塞版本BRPOP轮询会空耗CPU。使用BRPOP阻塞等待消息超时机制防止永久阻塞。java// 阻塞5秒没有消息返回null ListString result jedis.brpop(5, queueName); if (result ! null) { String message result.get(1); // 处理消息 }3.3 致命缺陷无确认机制消费者从List中RPOP后消息立刻从队列移除。如果此时消费者宕机或处理失败消息永久丢失。工业级改进引入“两个队列” —— 主队列 处理中队列javapublic boolean reliableConsume(String queueName, String processingQueue, int timeoutSec) { // 1. 从主队列取消息原子移动到处理队列 String message jedis.rpoplpush(queueName, processingQueue); if (message null) return false; try { process(message); // 处理成功从处理队列删除 jedis.lrem(processingQueue, 1, message); return true; } catch (Exception e) { // 处理失败重新放回主队列或记录死信 jedis.lpush(queueName, message); jedis.lrem(processingQueue, 1, message); return false; } }RPOPLPUSH是原子命令能保证消息不会丢失在“取”和“移”之间。但是如果进程在process(message)之前崩溃消息会一直留在处理队列需要额外写一个监控线程扫描处理队列中的超时消息并重试。3.4 List模式适用场景✅ 消息可丢失如日志收集✅ 消费者处理逻辑幂等✅ 简单任务队列如发送邮件❌ 不适用金融级可靠场景四、方案二Pub/Sub —— 广播与实时推送4.1 发布订阅模型生产者发布消息到频道所有订阅该频道的消费者实时收到消息。java// 订阅者异步监听 JedisPubSub listener new JedisPubSub() { Override public void onMessage(String channel, String message) { System.out.println(收到: message); } }; jedis.subscribe(listener, news); // 发布者 jedis.publish(news, Redis 7.2 发布啦);4.2 最致命的两个坑坑一消息不持久化如果消息发布时没有消费者在线消息直接丢弃。Pub/Sub 是纯实时的没有消息存储。坑二消费者无法回溯新订阅者加入后只能收到之后的消息历史消息无法获取。4.3 生产技巧结合List做可靠发布java// 保证消息至少被一个消费者收到借助List兜底 public void reliablePublish(String channel, String queue, String message) { // 1. 先存入持久队列持久化 jedis.lpush(queue, message); // 2. 再发布实时通知唤醒消费者 jedis.publish(channel, queue); } // 消费者收到channel通知后主动去List拉取消息4.4 Pub/Sub适用场景✅ 实时推送聊天、在线状态、配置刷新✅ 多消费者广播一个消息分发给多个服务✅ 对丢消息不敏感的场景❌ 绝对不能用于订单、支付等核心链路五、方案三Stream —— 企业级消息队列的终极形态Redis 5.0 引入了Stream数据类型终于让Redis具备了媲美专业MQ的可靠性能力。5.1 Stream核心概念概念解释Stream消息队列本身每条消息有唯一IDConsumer Group消费者组组内多个消费者分摊消息last_delivered_id消费者组已消费的最新消息IDPending List已发给消费者但未ACK的消息XACK消费者确认消息处理完成这些机制实现了消息持久化、消费确认、故障恢复、消息回溯。5.2 基础操作增删改查bash# 添加消息* 表示自动生成ID XADD mystream * name Alice age 30 # 读取消息从0开始读所有消息 XRANGE mystream - # 读取新消息阻塞$代表最新ID XREAD BLOCK 0 STREAMS mystream $ # 创建消费者组 XGROUP CREATE mystream mygroup 0 MKSTREAM # 消费者组读取消息 XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream 5.3 Java实战可靠消息队列使用Lettuce推荐或Jedis需要升级版。以下是用Lettuce实现的完整生产消费ACK流程。java// 1. 生产者添加消息 public void produce(String streamKey, MapString, String body) { MapString, String msg new HashMap(); msg.put(id, UUID.randomUUID().toString()); msg.put(data, JSON.toJSONString(body)); String msgId redis.xadd(streamKey, msg); System.out.println(消息已发送: msgId); } // 2. 消费者组内消费 ACK public void consume(String streamKey, String group, String consumer) { while (true) { // 从消费者组读取一条消息阻塞10秒 ListStreamMessageString, String messages redis.xreadgroup( Consumer.from(group, consumer), XReadArgs.Builder.block(10000).count(1), StreamOffset.from(streamKey, ) ); if (messages null || messages.isEmpty()) continue; StreamMessageString, String msg messages.get(0); try { // 业务处理 processMessage(msg.getBody()); // 确认处理完成 redis.xack(streamKey, group, msg.getId()); } catch (Exception e) { // 不ACK消息会留在Pending列表后续可claim重试 log.error(处理失败等待重试, e); } } }5.4 故障恢复处理未ACK的消息如果消费者处理消息时宕机消息会停留在Pending列表。需要另一个监控线程执行XPENDING和XCLAIM。java// 监控线程扫描pending超过30s的消息并转移给其他消费者 public void claimPending(String streamKey, String group, String newConsumer) { // 获取pending摘要 ListObject pending redis.xpending(streamKey, group, Range.unbounded(), Limit.unlimited()); for (Object p : pending) { String msgId (String) ((List?) p).get(0); long idleMs (Long) ((List?) p).get(1); if (idleMs 30000) { // 闲置超过30秒 // 转移消息到新消费者 ListStreamMessageString, String claimed redis.xclaim( streamKey, group, newConsumer, Duration.ofSeconds(1), msgId ); // 重新处理 retry(claimed.get(0)); } } }5.5 消息回溯与限流回溯用XRANGE或XREAD指定起始ID可重新消费任何历史消息限流MAXLEN ~ 10000近似裁剪控制Stream内存占用bash# 保留最多10000条消息性能友好 XADD mystream MAXLEN ~ 10000 * field value六、三大方案决战对比特性ListPub/SubStream消息持久化是RDB/AOF否是消费确认需手动模拟无✅ ACK Pending消息回溯不支持不支持✅ XRANGE消费者组无无✅ 支持故障恢复手动无✅ XCLAIM多播单播✅ 广播✅ 消费者组内多播内存控制可限制长度无MAXLEN复杂度⭐⭐⭐⭐⭐⭐⭐七、Stream vs 专业MQ到底该怎么选对比维度Redis StreamRabbitMQKafka吞吐量万级QPS10万5万50万消息堆积受内存限制超限OOM磁盘几乎无限磁盘海量消息顺序严格按ID顺序保证队列内顺序分区内有序事务支持弱MULTI强弱运维成本低中高适用场景中小项目、实时性高、可靠要求中等企业级业务解耦大数据、日志、流处理选型建议小型项目/个人项目 → Redis Stream完全够用核心交易系统 → RabbitMQ / RocketMQ日志/埋点/大数据 → Kafka实时广播通知 → Redis Pub/Sub八、生产最佳实践避坑五条1️⃣ 内存爆炸Stream必须有MAXLENjava// 添加消息时限制长度防止无限堆积 redis.xadd(streamKey, XAddArgs.Builder.maxlen(10000), body);2️⃣ 消费组名与消费者名规范text组名业务域_版本如 order_v1 消费者名机器标识_线程池如 app01_worker33️⃣ 监控关键指标Stream长度 (XLEN)消费者组pending数量 (XPENDING)各消费者的空闲时间 (XPENDING详情)4️⃣ 幂等性设计消息可能被重复消费因重试、claim等。消费逻辑必须幂等或借助唯一ID去重。5️⃣ 优雅停机在应用关闭时调用XACK确认当前正在处理的消息避免被误判为超时pending。javaPreDestroy public void gracefulShutdown() { for (StreamMessage msg : processingMessages) { jedis.xack(streamKey, group, msg.getId()); } }九、实战案例基于Redis Stream的订单30分钟超时关闭需求用户下单后30分钟未支付自动取消订单恢复库存。设计订单创建后发送消息到order_stream消息体含orderId和expireTime消费者组监听按expireTime时间戳延时消费Stream本身不支持延时需要借助优先级队列或业务轮询实际生产常用时间轮 最小堆但Redis中可简单实现将expireTime作为score存入ZSet定时扫描并触发取消。这里提供一种混合方案Stream ZSet延时队列java// 1. 下单时同时写入ZSet延迟队列 public void createOrder(Order order) { saveOrderToDB(order); String orderId order.getId(); long expireTimestamp System.currentTimeMillis() 30*60*1000; jedis.zadd(delay:order_cancel, expireTimestamp, orderId); } // 2. 定时任务每分钟扫描ZSet public void scanExpiredOrders() { long now System.currentTimeMillis(); SetString expiredOrders jedis.zrangeByScore(delay:order_cancel, 0, now); for (String orderId : expiredOrders) { // 将任务投递到Stream由专门消费者处理取消逻辑 MapString, String msg new HashMap(); msg.put(orderId, orderId); jedis.xadd(stream:order_cancel, msg); jedis.zrem(delay:order_cancel, orderId); // 移除已处理 } } // 3. Stream消费者处理取消订单 public void cancelOrderConsumer() { ListStreamMessage msgs jedis.xreadgroup(...); for (StreamMessage msg : msgs) { String orderId msg.get(orderId); // 检查订单是否未支付 - 取消 恢复库存 cancelOrder(orderId); jedis.xack(...); } }十、总结与面试速记一句话总结List简单无脑但丢了消息别找我Pub/Sub实时广播离线消息不存在StreamRedis亲儿子轻量可靠中小项目首选面试必问三板斧1. Redis的Pub/Sub和Stream有什么区别Pub/Sub没有存储消息即发即收消费者离线则丢消息Stream持久化支持消费确认、回溯、消费者组更像专业MQ。2. Redis Stream如何保证消息不丢失消息持久化到RDB/AOF 消费者组ACK机制 Pending超时转移XCLAIM。3. Redis做消息队列的最大缺点是什么内存限制。大量堆积会导致Redis OOM不适合海量消息场景。最终建议日消息量 100万Redis Stream非常香需要可靠的大规模解耦请上RabbitMQ/Kafka永远不要用List做金融级异步任务如果这篇文章帮你避开了Redis消息队列的坑欢迎点赞、收藏、转发。下一期我们讲《Redis Cluster源码级剖析》敬请期待。评论区的三个问题欢迎讨论你在生产环境用过Stream吗遇到什么坑用List RPOPLPUSH实现可靠性会有什么并发问题你觉得Redis Stream未来能否替代RabbitMQ