假设有这样一个需求,你需要从 Redis 中持续不断读取数据,并把这些数据写入到 MongoDB 中。你可能会这样写代码:
import json import redis import pymongo client = redis.Redis() handler = pymongo.MongoClient().example.col while True: data_raw = client.blpop('data', timeout=300) if not data_raw: continue data = json.loads(data_raw[1].decode()) handler.insert_one(data)
但这样写有一个问题,就是每来一条数据都要连接一次 MongoDB,大量时间浪费在了网络 I/O上。
于是大家会把代码改成下面这样:
import json import redis import pymongo client = redis.Redis() handler = pymongo.MongoClient().example.col to_be_insert = [] while True: data_raw = client.blpop('data', timeout=300) if not data_raw: continue data = json.loads(data_raw[1].decode()) to_be_insert.append(data) if len(to_be_insert) >= 1000: handler.insert_many(to_be_insert) to_be_insert = []
每凑够1000条数据,批量写入到 MongoDB 中。
现在又面临另外一个问题。假设因为某种原因,我需要更新这个程序,于是我按下了键盘上的Ctrl + C强制关闭了这个程序。而此时to_be_insert列表里面有999条数据将会永久丢失——它们已经被从 Redis 中删除了,但又没有来得及写入 MongoDB 中。
我想实现,当我按下 Ctrl + C 时,程序不再从 Redis 中读取数据,但会先把to_be_insert中的数据(无论有几条)都插入 MongoDB 中。最后再关闭程序。
要实现这个需求,就必须在我们按下Ctrl + C时,程序还能继续运行一段代码。可问题是按下Ctrl + C时,程序就直接结束了,如何还能再运行一段代码?
实际上,当我们按下键盘上的Ctrl + C时,Python 收到一个名为SIGINT的信号。具体规则可以阅读官方文档。收到信号以后,Python 会调用一个信号回调函数。只不过默认的回调函数就是让程序抛出一个 KeyboardInterrupt异常导致程序关闭。现在,我们可以设法让 Python 使用我们自定义的一段函数来作为信号回调函数。
要使用信号,我们需用导入 Python 的signal库。然后自定义一个信号回调函数,当 Python 收到某个信号时,调用这个函数。
所以我们修改一下上面的代码:
import signal import json import redis import pymongo client = redis.Redis() handler = pymongo.MongoClient().example.col stop = False def keyboard_handler(signum, frame): global stop stop = True signal.signal(signal.SIGINT, keyboard_handler) to_be_insert = [] while not stop: data_raw = client.blpop('data', timeout=300) if not data_raw: continue data = json.loads(data_raw[1].decode()) to_be_insert.append(data) if len(to_be_insert) >= 1000: handler.insert_many(to_be_insert) to_be_insert = [] if to_be_insert: handler.insert_many(to_be_insert)
新闻热点
疑难解答