当你已经用 MongoDB 承载业务数据,但又希望具备 Elasticsearch 的全文检索、聚合分析与近实时查询能力时,最经典的方案就是:
- 业务写 MongoDB
- 同步链路把变更实时投递到 Elasticsearch
- 查询侧走 Elasticsearch(搜索/统计/多维筛选)
很多人一开始会想到:Kafka + Debezium、Logstash、或者自己写同步服务。但它们要么架构偏重,要么运维成本高,要么“断点续传/一致性/重试/死信队列”很难一次做对。
ElasticRelay 的思路是:单一二进制 + 纯配置驱动 + Go 低资源占用,同时把 CDC(Change Data Capture)常见的工程问题(快照/增量/Checkpoint/DLQ)都内置掉,让你用更轻的方式把链路跑起来。
1. ElasticRelay 的 MongoDB → ES 同步是怎么工作的
核心链路可以理解为四步:
- 初始快照(可选):启动时先把目标集合做一次全量扫表(snapshot),把现有数据写入 ES。
- 增量同步(Change Streams):开启 MongoDB Change Streams 监听
insert/update/replace/delete。 - Resume Token Checkpoint:每条变更事件都会带一个 resume token;ElasticRelay 会把它落盘(默认
mongodb_checkpoints.json),用于断点续传。 - ES Bulk 写入:ElasticRelay 将事件转换成 JSON,按批次 bulk 写入 Elasticsearch。
索引命名规则:
- 默认:
<index_prefix>-<collection>(collection 会转小写) - MongoDB 事件里集合名字段:
_collection - 快照记录里集合名字段:
_table(ES sink 同时支持_table与_collection)
2. 前置条件(别跳过)
2.1 MongoDB 必须是 Replica Set 或 Sharded Cluster
Change Streams 的硬要求:
- MongoDB 4.0+(推荐 6/7)
- Replica Set或Sharded Cluster
- 单机 standalone 不支持 Change Streams(会报 “Change Stream not supported”)
2.2 MongoDB 用户权限(最小化建议)
Change Streams 通常需要:
- 业务库的
readWrite(至少read) local库的read(用于 Change Streams/相关内部读取)
仓库自带的 Docker 初始化脚本已经帮你创建了用户:
- 用户:
elasticrelay_user - 密码:
elasticrelay_pass
2.3 Elasticsearch
- ES 7.x / 8.x 均可
- 如果开启了安全认证,准备好
user/password
3. 5 分钟跑通(Docker 起 MongoDB + ES,本机跑 ElasticRelay)
下面这套流程尽量复用仓库现成的 compose 与配置文件。
3.1 启动 MongoDB Replica Set(仓库已内置)
在项目根目录执行:
docker-compose up -d mongodb mongodb-init它会拉起:
elasticrelay-mongodb:MongoDB 7.0(副本集 rs0)elasticrelay-mongodb-init:一次性初始化副本集与测试数据
3.2 启动 Elasticsearch
仓库的docker-compose.yml里 Elasticsearch 默认是注释掉的。你可以选择:
- 方式 A:取消
docker-compose.yml里elasticsearch服务的注释,然后执行:
docker-compose up -d elasticsearch- 方式 B:使用你已有的 Elasticsearch(只要
config/mongodb_config.json里addresses指向你的 ES 即可)。
3.3 构建 ElasticRelay
chmod+x scripts/build.sh ./scripts/build.sh产物默认在bin/elasticrelay。
3.4 配置 MongoDB → ES 同步任务
你可以直接使用仓库自带示例:config/mongodb_config.json。
其中最关键的四块配置:
data_sources:MongoDB 连接信息、监听哪些集合(table_filters)sinks:Elasticsearch 地址与索引前缀(index_prefix)jobs:把 source 与 sink 绑起来,并配置是否先做全量(initial_sync)global:日志、metrics、DLQ 等
最小可跑版本示意(与仓库示例一致):
{"version":"2.0","data_sources":[{"id":"mongodb-primary","type":"mongodb","host":"localhost","port":27017,"user":"elasticrelay_user","password":"elasticrelay_pass","database":"elasticrelay","table_filters":["users","orders","products"]}],"sinks":[{"id":"es-primary","type":"elasticsearch","addresses":["http://localhost:9200"],"options":{"index_prefix":"elasticrelay_mongo"}}],"jobs":[{"id":"mongodb-to-es","source_id":"mongodb-primary","sink_id":"es-primary","enabled":true,"options":{"initial_sync":true}}],"global":{"log_level":"info"}}3.5 启动 ElasticRelay
./bin/elasticrelay --config config/mongodb_config.json启动后你会看到类似日志:
- 连接 MongoDB 成功
- 开始快照(如果
initial_sync=true) - Change Streams started
- 写入 ES(自动创建索引)
4. 验证同步:插入/更新/删除都能实时反映到 ES
4.1 查看 ES 索引是否生成
如果index_prefix=elasticrelay_mongo,同步users集合会生成索引:
elasticrelay_mongo-users
执行:
curl-X GET"http://localhost:9200/_cat/indices?v"4.2 查一条数据
curl-X GET"http://localhost:9200/elasticrelay_mongo-users/_search?pretty"\-H'Content-Type: application/json'\-d'{"query":{"match_all":{}},"size":10}'4.3 触发增量:对 MongoDB 做写入
如果你用的是仓库自带 Docker MongoDB:
dockerexec-it elasticrelay-mongodb mongosh\-u elasticrelay_user -p elasticrelay_pass\--authenticationDatabase admin然后执行:
use elasticrelay// insertdb.users.insertOne({name:'Test User',email:'test@example.com',age:30,created_at:newDate(),updated_at:newDate()})// updatedb.users.updateOne({email:'test@example.com'},{$set:{age:31,updated_at:newDate()}})// deletedb.users.deleteOne({email:'test@example.com'})回到 ES 再查一次,你应该能看到变化实时出现(或删除生效)。
5. 运维要点:Checkpoint、重启恢复、DLQ
5.1 Checkpoint(断点续传)
MongoDB CDC 使用 Change Streams 的resume token做 checkpoint。
- 默认落盘文件:
mongodb_checkpoints.json - 进程重启:会先尝试加载该文件,从上次位置继续
常见注意事项:
- 如果 resume token 因 oplog 窗口太短而失效,会出现类似 “Resume token not found / resume of change stream was not possible”。这时需要:
- 增大 oplog window(生产推荐)
- 或者清理 checkpoint 重新从当前/全量开始(谨慎)
5.2 DLQ(死信队列)
当 Elasticsearch 不可用、索引创建失败、bulk 写入失败时,ElasticRelay 可以把事件打到 DLQ(避免直接丢数据),并按策略重试。
你可以在配置里打开:
global.dlq_config.enabled=trueglobal.dlq_config.storage_path=./dlq
6. 常见坑与排障(高频)
- Change Stream not supported:MongoDB 不是 replica set/sharded;按副本集方式部署。
- 认证失败:核对用户名密码;注意
authSource(仓库示例走admin)。 - ES 写入 429/队列堆积:降低写入批次、加大 flush interval、或扩容 ES。
- 字段映射混乱:MongoDB 文档结构变化大、字段类型不稳定时,建议提前准备 ES template/mapping(或至少保证关键字段类型稳定)。
想要试用 ElasticRelay?查看我们的 GitHub 仓库 或阅读我们的 入门指南。
有问题?加入我们的 社区讨论 或在 Twitter 上联系我们。
ElasticRelay 团队致力于构建更好的数据基础设施工具。关注我们的旅程,我们让实时数据同步变得简单、可靠,并且让每个开发者都能使用。