Redis实现延迟队列的方案
- 人工智能
- 2025-09-10 19:27:03

在分布式系统中,延迟队列是一种常见的需求,例如订单超时取消、任务定时执行等。Redis 作为高性能的内存数据库,提供了多种实现延迟队列的方案。本文将介绍几种不同的 Redis 方案,并分析其优缺点及适用场景。
Sorted Set利用 Redis 的 Sorted Set 数据结构,将消息 ID 作为成员,到期时间戳作为分数。通过 ZRANGEBYSCORE 命令获取已到期的消息。
const ( queueName = "delay_queue" ) type ZSetDelayQueue struct { rds *redis.Client scanInterval time.Duration } // PushToDelayQueue :将任务推到延迟队列中,任务会在指定的延迟时间后可用。 // val: 需要推送的值, score: 过期时间点 func (d *ZSetDelayQueue) PushToDelayQueue(ctx context.Context, val string, score float64) error { _, err := d.rds.ZAdd(ctx, queueName, redis.Z{Score: score, Member: val}).Result() if err != nil { return err } return nil } // PopFromDelayQueue :从延迟队列中取出过期的任务 func (d *ZSetDelayQueue) PopFromDelayQueue(ctx context.Context) ([]string, error) { nowStr := strconv.FormatInt(time.Now().Unix(), 10) var vals []string // 使用Redis事务保证原子性 err := d.rds.Watch(ctx, func(tx *redis.Tx) error { // 获取过期的任务 var err error vals, err = d.rds.ZRangeByScore(ctx, queueName, &redis.ZRangeBy{ Min: "0", Max: nowStr, }).Result() if err != nil { return err } // 删除过期的任务 if len(vals) == 0 { return nil } _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error { pipe.ZRemRangeByScore(ctx, queueName, "0", nowStr) return nil }) return nil }, queueName) if err != nil { return nil, err } return vals, nil }✅ 优点
实现简单,易于理解支持任意精度的延迟时间可以按时间顺序处理任务支持快速查找和删除特定任务❌缺点
需要轮询检查到期任务大量任务时性能可能下降没有内置的消费者竞争机制适用场景
任务量中等的系统对延迟精度要求不是特别高的场景需要支持任务优先级调整的场景 Key Expiration + Pub/Sub利用 Redis 的键过期通知功能,为每个延迟任务设置一个过期键,当键过期时通过 Pub/Sub 机制通知消费者。
type KeyExpDelayQueue struct { rdb *redis.Client } var ( expireKeyPrefix = "expire_key" ) func (q *KeyExpDelayQueue) AddTask(ctx context.Context, val string, delay time.Duration) error { // 设置过期键 expireKey := expireKeyPrefix + val // 设置过期时间 return q.rdb.Set(ctx, expireKey, val, delay).Err() } func (q *KeyExpDelayQueue) StartConsume(ctx context.Context, handler func(val string) error) error { // 确保开启了键空间通知 q.rdb.ConfigSet(ctx, "notify-keyspace-events", "Ex") // 订阅过期事件 pubsub := q.rdb.Subscribe(ctx, "__keyevent@0__:expired") defer pubsub.Close() // 监听处理过期事件 ch := pubsub.Channel() for msg := range ch { // 检查key是否为关注的过期键 if msg.Payload[:len(expireKeyPrefix)] != expireKeyPrefix { continue } // 处理过期事件 val := msg.Payload[len(expireKeyPrefix):] if err := handler(val); err == nil { q.rdb.Del(ctx, msg.Payload) } } return nil }✅优点
不需要轮询,事件驱动模式实现简单,占用资源少任务触发及时❌缺点
需要开启 Redis 键空间通知功能可靠性较低,如果消费者断开连接可能丢失通知无法查看待处理的任务列表无法实现任务优先级适用场景
对可靠性要求不高的场景任务量较小的系统需要实时触发的场景 Sorted Set方案进阶版为了解耦任务发现和任务处理、支持多消费者模型和可靠性保障,在原sorted set方案基础上引入新结构和处理机制
Stream+Consumer Group:List+Block: Stream+Consumer Group使用 Redis Stream 作为消息队列,结合有序集合存储延迟信息。定时将到期消息从有序集合移动到 Stream 中,由消费者组处理
Stream+Consumer Group方案支持多消费者组,那么多个消费者就可以进行并行处理,提升处理效率。并且Stream还提供了消息确认机制,确保任务能被处理
type streamDelayQueue struct { rdb *redis.Client delayKey, streamKey, groupName string } var ( moveTaskScript = ` local tasks = redis.call('ZRANGEBYSCORE', KEYS[1], '0', ARGV[1]) local count = 0 for _, task in ipairs(tasks) do redis.call('XADD', KEYS[2], '*', 'task', task) redis.call('ZREM', KEYS[1], task) count = count + 1 end return count ` ) // AddTask 添加任务到延迟队列 func (q *streamDelayQueue) AddTask(ctx context.Context, taskID string, delay time.Duration) error { // 1. 计算过期时间 expireAt := time.Now().Add(delay).Unix() // 2. 将任务推到有序集合 return q.rdb.ZAdd(ctx, q.delayKey, redis.Z{Score: float64(expireAt), Member: taskID}).Err() } // MoveReadyTasks 将准备好的任务移动到任务队列 func (q *streamDelayQueue) MoveReadyTasks(ctx context.Context) (int64, error) { return q.rdb.Eval(ctx, moveTaskScript, []string{q.delayKey, q.streamKey}, time.Now().Unix()).Int64() } // ConsumeTasks 消费任务 func (q *streamDelayQueue) ConsumeTasks(ctx context.Context, consumerName string, count int64) ([]redis.XStream, error) { return q.rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ Group: q.groupName, Consumer: consumerName, Streams: []string{q.streamKey, ">"}, Count: count, Block: 0, }).Result() } // AckTask 确认任务 func (q *streamDelayQueue) AckTask(ctx context.Context, taskID string) error { return q.rdb.XAck(ctx, q.streamKey, q.groupName, taskID).Err() } List+Block使用 Redis List 作为队列,结合定时任务将到期的延迟任务添加到队列中,消费者使用阻塞操作等待任务。
本方案则支持多消费者进行竞争
type listBlockDelayQueue struct { rdb *redis.Client delayKey, listKey string } // AddTask 添加任务到延迟队列 func (q *listBlockDelayQueue) AddTask(ctx context.Context, taskID string, delay time.Duration) error { // 计算执行时间 execTime := time.Now().Add(delay).Unix() // 添加到有序集合 return q.rdb.ZAdd(ctx, q.delayKey, redis.Z{ Score: float64(execTime), Member: taskID, }).Err() } // MoveReadyTasks 将到期任务移动到List func (q *listBlockDelayQueue) MoveReadyTasks(ctx context.Context) (int64, error) { now := time.Now().Unix() script := ` local tasks = redis.call('ZRANGEBYSCORE', KEYS[1], '0', ARGV[1]) local count = 0 for i, task in ipairs(tasks) do redis.call('LPUSH', KEYS[2], task) redis.call('ZREM', KEYS[1], task) count = count + 1 end return count ` result, err := q.rdb.Eval(ctx, script, []string{q.delayKey, q.listKey}, now).Int64() return result, err } // ConsumeTask 消费任务(堵塞操作) func (q *listBlockDelayQueue) ConsumeTask(ctx context.Context, timeout time.Duration) (string, error) { result, err := q.rdb.BRPop(ctx, timeout, q.listKey).Result() if err != nil { return "", err } return result[1], nil }Redis实现延迟队列的方案由讯客互联人工智能栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“Redis实现延迟队列的方案”