通化市网站建设_网站建设公司_Redis_seo优化
2026/1/20 10:23:08 网站建设 项目流程

Python操作Mongo一次性加载海量数据的问题

先说问题

实际中我写脚本查询mongo数据做了一个空间换时间的优化,就是将一个满足查询条件的mongo的set的数据一次性全部加载到内存中(只返回id,不返回其他字段),这样在接下来的逻辑中不需要频繁的查mongo数据库了,减少了网络的IO。

但是发现,如果mongo数据库的数据量特别大的话,查询mongo会报这样的错误:

raise CursorNotFound(errmsg, code, response, max_wire_version)
pymongo.errors.CursorNotFound: cursor id 335798838154 not found, full error: {'ok': 0.0, 'errmsg': 'cursor id 335798838154 not found', 'code': 43, 'codeName': 'CursorNotFound'}

报错的原因是,在默认情况下MongoDB的游标(cursor)在服务端用有生命周期限制,默认10分钟内数据未安全被读取完的话游标会被MongoDB自动关闭。

修改的方案是:改用分批(分页)加载数据,每批大概10000条,游标的生命周期就是这10000条一批的周期,理论上查10000条数据(只返回id)不会超过游标的默认生命周期的!

原生pymongo脚本的改造方案

# Mongo 配置
# Notice gbm mongo URL
# 测试环境
gbm_mongo_url = 'mongodb://xxx'
gbm_db_str = 'xxx_ddd'# Mongo 初始化
gbm_mongo_client = pymongo.MongoClient(gbm_mongo_url)
gbm_db = gbm_mongo_client[gbm_db_str]print('mongo connected success')def get_all_my_custom_info_custom_id_list():print('开始查询已经做过信息采集的客户:>>>>>>>>>>>>>>>>>')ret_set = set()  # 使用 set 避免重复,效率更高batch_size = 10000  # 每次取 1w 条skip = 0while True:cursor = gbm_db["数据量特别大的mongo的set的名字"].find({},{"custom_id": 1} # 只返回 custom_id 这个字段).skip(skip).limit(batch_size)batch = list(cursor)if not batch:breakfor row in batch:custom_id = row.get('custom_id')if custom_id:ret_set.add(str(custom_id))skip += batch_sizeprint(f'已加载 {len(ret_set)} 个 custom_id...')return list(ret_set)

使用ORM操作MongoDB的方案

def get_all_info_ref_custom_id_list():ret_set = set()# 每次取 1w 条batch_size = 10000skip = 0while True:# 使用 ORM 方式分批查询,只获取 custom_id 字段# MyCustomInfo 是对应的mongo模型类的定义batch = MyCustomInfo.objects().only('custom_id').skip(skip).limit(batch_size)batch_count = 0for row in batch:custom_id = row.custom_idif custom_id:ret_set.add(str(custom_id))batch_count += 1if batch_count < batch_size:  # 如果返回的数据少于批次大小,说明已经到达末尾breakskip += batch_sizeprint(f'已加载 {len(ret_set)} 个 custom_id...')return list(ret_set)

~~~

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询