Python操作Mongo一次性加载海量数据的问题
先说问题
实际中我写脚本查询mongo数据做了一个空间换时间的优化,就是将一个满足查询条件的mongo的set的数据一次性全部加载到内存中(只返回id,不返回其他字段),这样在接下来的逻辑中不需要频繁的查mongo数据库了,减少了网络的IO。
但是发现,如果mongo数据库的数据量特别大的话,查询mongo会报这样的错误:
raise CursorNotFound(errmsg, code, response, max_wire_version) pymongo.errors.CursorNotFound: cursor id 335798838154 not found, full error: {'ok': 0.0, 'errmsg': 'cursor id 335798838154 not found', 'code': 43, 'codeName': 'CursorNotFound'}
报错的原因是,在默认情况下MongoDB的游标(cursor)在服务端用有生命周期限制,默认10分钟内数据未安全被读取完的话游标会被MongoDB自动关闭。
修改的方案是:改用分批(分页)加载数据,每批大概10000条,游标的生命周期就是这10000条一批的周期,理论上查10000条数据(只返回id)不会超过游标的默认生命周期的!
原生pymongo脚本的改造方案
# Mongo 配置 # Notice gbm mongo URL # 测试环境 gbm_mongo_url = 'mongodb://xxx' gbm_db_str = 'xxx_ddd'# Mongo 初始化 gbm_mongo_client = pymongo.MongoClient(gbm_mongo_url) gbm_db = gbm_mongo_client[gbm_db_str]print('mongo connected success')def get_all_my_custom_info_custom_id_list():print('开始查询已经做过信息采集的客户:>>>>>>>>>>>>>>>>>')ret_set = set() # 使用 set 避免重复,效率更高batch_size = 10000 # 每次取 1w 条skip = 0while True:cursor = gbm_db["数据量特别大的mongo的set的名字"].find({},{"custom_id": 1} # 只返回 custom_id 这个字段).skip(skip).limit(batch_size)batch = list(cursor)if not batch:breakfor row in batch:custom_id = row.get('custom_id')if custom_id:ret_set.add(str(custom_id))skip += batch_sizeprint(f'已加载 {len(ret_set)} 个 custom_id...')return list(ret_set)
使用ORM操作MongoDB的方案
def get_all_info_ref_custom_id_list():ret_set = set()# 每次取 1w 条batch_size = 10000skip = 0while True:# 使用 ORM 方式分批查询,只获取 custom_id 字段# MyCustomInfo 是对应的mongo模型类的定义batch = MyCustomInfo.objects().only('custom_id').skip(skip).limit(batch_size)batch_count = 0for row in batch:custom_id = row.custom_idif custom_id:ret_set.add(str(custom_id))batch_count += 1if batch_count < batch_size: # 如果返回的数据少于批次大小,说明已经到达末尾breakskip += batch_sizeprint(f'已加载 {len(ret_set)} 个 custom_id...')return list(ret_set)
~~~