延时队列,顾名思义,是为了让一些任务不立即执行,放到队列里面等到特定时间后再执行。常用的场景有:
订单一直处于未支付状态,需要及时关闭订单,并退还库存
用户通过遥控设备控制智能设备在指定时间进行工作
eFuture 中未来邮件需要在用户指定的时间点发送
eFuture 是我最近刚完成的一个项目,主要目的是提供“未来邮件”的服务。其中,邮件需要被存储并且需要在特定时间点发送。于是我就需要一个延时队列来保存发信任务,并在指定时间由程序消费任务,从而发送邮件。
对于 eFuture 来说,有两件事是最重要的:
未来邮件在延时任务队列中保存,并且不会因为系统故障(如突发停机)而丢失
未来邮件需要在指定日期被取出并消费掉(发送出)
这两点需求意味着,提供延时任务的组件需要具备容灾能力,所以常用的 Java 内部的实现就不行了(虽说我也没用 Java 来写这个项目)。一番搜索之后,发现常用的实现有两个符合我的要求:RabbitMQ 与 Redis。
RabbitMQ RabbitMQ 本身并没有直接支持延时队列功能,但是我们可以通过 RabbitMQ 队列的特性模拟出延时队列的功能。
RabbitMQ 在创建 Queue 的时候可以指定一个 x-dead-letter-exchange
的选项,指定该选项后, Queue 中过期的消息都将自动转发到相应的 Exchange。我们只需要在对应的 Exchange 上绑定接收过期任务的队列即可。
Python 代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import loggingimport threadingimport pikaconnection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost' , port=5672 , virtual_host='/' , credentials=pika.PlainCredentials('user' , 'user' ) )) channel = connection.channel() logging.basicConfig( level=logging.INFO, format ='%(asctime)s [%(levelname)s] %(message)s' , datefmt='%H:%M:%S' ) channel.exchange_delete('msg.exchange' ) channel.queue_delete('msg.queue.dead' ) channel.queue_delete('msg.queue.task' ) channel.exchange_declare('msg.exchange' ) channel.queue_declare('msg.queue.dead' , arguments={'x-dead-letter-exchange' : 'msg.exchange' }) channel.queue_declare('msg.queue.task' ) channel.queue_bind(queue='msg.queue.task' , exchange='msg.exchange' , routing_key='msg.queue.dead' )
上面这段代码主要做了:
利用 pika 连接到 RabbitMQ
配置 logging
清除已有的同名 Exchange 与 Queue
创建 Exchange 与 Queue 并对其中一个队列设置 dead-letter-exchange
。
下面开始测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 def push (): for i in range (1 , 6 ): logging.info('push {}' .format (i)) channel.publish( exchange='' , routing_key='msg.queue.dead' , body="hello, {}" .format (i), properties=pika.spec.BasicProperties(content_type="text/plain" , expiration=str (1000 * i)) ) def pop (): for method, properties, body in channel.consume('msg.queue.task' ): logging.info('receive {}' .format (body)) if __name__ == '__main__' : tPush = threading.Thread(target=push) tPop = threading.Thread(target=pop) logging.info('start push...' ) tPush.start() logging.info('start pop...' ) tPop.start()
push
依次向 RabbitMQ 的 msg.queue.dead
中发布了 hello, 1
, hello, 2
, hello, 3
, hello, 4
, hello, 5
,其中 hello, i
的过期时间为 i
秒。
pop
监听 RabbitMQ 的 msg.queue.task
,一旦其中有数据就将其取出进行消费。
运行一下:
1 2 3 4 5 6 7 8 9 10 11 12 15:00:50 [INFO] start push... 15:00:50 [INFO] push 1 15:00:50 [INFO] start pop... 15:00:50 [INFO] push 2 15:00:50 [INFO] push 3 15:00:50 [INFO] push 4 15:00:50 [INFO] push 5 15:00:51 [INFO] receive b'hello, 1' 15:00:52 [INFO] receive b'hello, 2' 15:00:53 [INFO] receive b'hello, 3' 15:00:54 [INFO] receive b'hello, 4' 15:00:55 [INFO] receive b'hello, 5'
看起来我们似乎成功实现了延迟任务队列,在上面的测试用例下这个队列也工作的很好。但是,上面的实现中却有一个致命缺陷,我们把 push
函数稍稍修改一下:
1 2 3 4 5 6 7 8 9 10 def push (): for i in range (5 , 0 , -1 ): logging.info('push {}' .format (i)) channel.publish( exchange='' , routing_key='msg.queue.dead' , body="hello, {}" .format (i), properties=pika.spec.BasicProperties(content_type="text/plain" , expiration=str (1000 * i)) )
再次运行:
1 2 3 4 5 6 7 8 9 10 11 12 15:02:47 [INFO] start push... 15:02:47 [INFO] push 5 15:02:47 [INFO] start pop... 15:02:47 [INFO] push 4 15:02:47 [INFO] push 3 15:02:47 [INFO] push 2 15:02:47 [INFO] push 1 15:02:52 [INFO] receive b'hello, 5' 15:02:52 [INFO] receive b'hello, 4' 15:02:52 [INFO] receive b'hello, 3' 15:02:52 [INFO] receive b'hello, 2' 15:02:52 [INFO] receive b'hello, 1'
可以看到,我们只是把插入消息的顺序调整了一下,队列就失去了作用。在新的 push
函数中,我们把过期时间长的任务先插入队列 msg.queue.dead
,可以看到,只有在第一个消息过期并被转发到 msg.queue.task
后,后面的过期的消息才会被处理。
换言之,如果在 msg.queue.dead
中存在先后两个 task,第一个 task 过期时间 10s,第二个过期时间 1s,两个 task 是同时插入队列的。那么只有等第一个 task 过期从队列里移出后,第二个 task 才会被处理,尽管它在第一个 task 过期前就已经过期了 。从中我们也可以看出 RabbitMQ 对于声明了 x-dead-letter-exchange
的队列中过期消息的处理策略:只检测最前面一个消息是否过期,如果过期就转发到指定的 Exchange,如果没有过期就不作任何处理。
RabbitMQ 的这种特性显然与我们的需求是相悖的,我们需要的是队列头元素始终是最接近过期的,所以我又将目光转向了 Redis。
Redis Redis 内部也没有对延时队列做支持,我们需要使用 Redis 的 zset
来手动实现。ZSet 是 Redis 内置的数据结构之一,其特性是值依据对应的 score 排序,内部使用 HashMap 与 SkipList 来存储数据并保证有序,HashMap 中存放的是值到 score 的映射,SkipList 中存放的是所有值,排序依据是 score。
利用 Redis 实现延时队列主要是就利用 ZSet,将值设置为 task,而 score 设置为任务执行时间点的 timestamp。然后 通过比较当前时间戳与 zset 中第一个元素的 score 来判断其是否过期 。
Python 代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 import loggingimport datetimeimport redisQUEUE_KEY = 'delayed task queue' pool = redis.ConnectionPool(host='localhost' , port=6379 , db=0 , password='' ) connection = redis.Redis(connection_pool=pool) logging.basicConfig( level=logging.INFO, format ='%(asctime)s [%(levelname)s] %(message)s' , datefmt='%H:%M:%S' ) def push (message: str , date: datetime.datetime ): connection.zadd(QUEUE_KEY, message, date.timestamp()) def pop (): task = connection.zrange(QUEUE_KEY, 0 , 0 ) if not task: return False , '' message = task[0 ] timestamp = connection.zscore(QUEUE_KEY, message) now = datetime.datetime.now().timestamp() if timestamp < now or abs (timestamp - now) <= 1e-6 : connection.zrem(QUEUE_KEY, message) return True , message return False , '' if __name__ == '__main__' : now = datetime.datetime.now() logging.info('push hello 1' ) push('hello 1' , now + datetime.timedelta(seconds=3 )) logging.info('push hello 2' ) push('hello 2' , now + datetime.timedelta(seconds=7 )) while True : boolean, message = pop() if boolean: logging.info(message)
代码意思应该很明了,我们运行上述代码:
1 2 3 4 15:30:17 [INFO] push hello 1 15:30:17 [INFO] push hello 2 15:30:20 [INFO] b'hello 1' 15:30:24 [INFO] b'hello 2'
修改两个 task 的过期时间:
1 2 3 4 5 6 7 now = datetime.datetime.now() logging.info('push hello 1' ) push('hello 1' , now + datetime.timedelta(seconds=7 )) logging.info('push hello 2' ) push('hello 2' , now + datetime.timedelta(seconds=3 ))
然后再次运行:
1 2 3 4 15:33:12 [INFO] push hello 1 15:33:12 [INFO] push hello 2 15:33:15 [INFO] b'hello 2' 15:33:19 [INFO] b'hello 1'
可以看到,如我们预期正确的执行了。
在代码中 while True
无限循环尝试取出元素是对 CPU 资源的一种浪费,同时也给 Redis 产生了不小的压力,我们可以放缓速度,比如每秒检测一次:
1 2 3 while True : time.sleep(1 ) boolean, message = pop()
总结 除了文中提到的两种实现方式,延时队列还可以使用数据库定期轮询、DelayQueue、Timer、时间轮、Quartz 等方式实现,本文中没有讨论这些,等将来碰到相关使用场景后,或许会填这个坑 :)
使用 Redis 作为延时任务队列的实现,在具体应用的时候我们需要开启 Redis 的持久化,最好两种方式同时开启,同时定期备份 Redis 持久化文件,最大程度避免任务丢失。