济源市网站建设_网站建设公司_支付系统_seo优化
2025/12/17 11:22:49 网站建设 项目流程

当你已经用 MongoDB 承载业务数据,但又希望具备 Elasticsearch 的全文检索、聚合分析与近实时查询能力时,最经典的方案就是:

  • 业务写 MongoDB
  • 同步链路把变更实时投递到 Elasticsearch
  • 查询侧走 Elasticsearch(搜索/统计/多维筛选)

很多人一开始会想到:Kafka + Debezium、Logstash、或者自己写同步服务。但它们要么架构偏重,要么运维成本高,要么“断点续传/一致性/重试/死信队列”很难一次做对。

ElasticRelay 的思路是:单一二进制 + 纯配置驱动 + Go 低资源占用,同时把 CDC(Change Data Capture)常见的工程问题(快照/增量/Checkpoint/DLQ)都内置掉,让你用更轻的方式把链路跑起来。


1. ElasticRelay 的 MongoDB → ES 同步是怎么工作的

核心链路可以理解为四步:

  1. 初始快照(可选):启动时先把目标集合做一次全量扫表(snapshot),把现有数据写入 ES。
  2. 增量同步(Change Streams):开启 MongoDB Change Streams 监听insert/update/replace/delete
  3. Resume Token Checkpoint:每条变更事件都会带一个 resume token;ElasticRelay 会把它落盘(默认mongodb_checkpoints.json),用于断点续传。
  4. ES Bulk 写入:ElasticRelay 将事件转换成 JSON,按批次 bulk 写入 Elasticsearch。

索引命名规则:

  • 默认<index_prefix>-<collection>(collection 会转小写)
  • MongoDB 事件里集合名字段_collection
  • 快照记录里集合名字段_table(ES sink 同时支持_table_collection

2. 前置条件(别跳过)

2.1 MongoDB 必须是 Replica Set 或 Sharded Cluster

Change Streams 的硬要求:

  • MongoDB 4.0+(推荐 6/7)
  • Replica SetSharded Cluster
  • 单机 standalone 不支持 Change Streams(会报 “Change Stream not supported”)

2.2 MongoDB 用户权限(最小化建议)

Change Streams 通常需要:

  • 业务库的readWrite(至少read
  • local库的read(用于 Change Streams/相关内部读取)

仓库自带的 Docker 初始化脚本已经帮你创建了用户:

  • 用户:elasticrelay_user
  • 密码:elasticrelay_pass

2.3 Elasticsearch

  • ES 7.x / 8.x 均可
  • 如果开启了安全认证,准备好user/password

3. 5 分钟跑通(Docker 起 MongoDB + ES,本机跑 ElasticRelay)

下面这套流程尽量复用仓库现成的 compose 与配置文件。

3.1 启动 MongoDB Replica Set(仓库已内置)

在项目根目录执行:

docker-compose up -d mongodb mongodb-init

它会拉起:

  • elasticrelay-mongodb:MongoDB 7.0(副本集 rs0)
  • elasticrelay-mongodb-init:一次性初始化副本集与测试数据

3.2 启动 Elasticsearch

仓库的docker-compose.yml里 Elasticsearch 默认是注释掉的。你可以选择:

  • 方式 A:取消docker-compose.ymlelasticsearch服务的注释,然后执行:
docker-compose up -d elasticsearch
  • 方式 B:使用你已有的 Elasticsearch(只要config/mongodb_config.jsonaddresses指向你的 ES 即可)。

3.3 构建 ElasticRelay

chmod+x scripts/build.sh ./scripts/build.sh

产物默认在bin/elasticrelay

3.4 配置 MongoDB → ES 同步任务

你可以直接使用仓库自带示例:config/mongodb_config.json

其中最关键的四块配置:

  • data_sources:MongoDB 连接信息、监听哪些集合(table_filters
  • sinks:Elasticsearch 地址与索引前缀(index_prefix
  • jobs:把 source 与 sink 绑起来,并配置是否先做全量(initial_sync
  • global:日志、metrics、DLQ 等

最小可跑版本示意(与仓库示例一致):

{"version":"2.0","data_sources":[{"id":"mongodb-primary","type":"mongodb","host":"localhost","port":27017,"user":"elasticrelay_user","password":"elasticrelay_pass","database":"elasticrelay","table_filters":["users","orders","products"]}],"sinks":[{"id":"es-primary","type":"elasticsearch","addresses":["http://localhost:9200"],"options":{"index_prefix":"elasticrelay_mongo"}}],"jobs":[{"id":"mongodb-to-es","source_id":"mongodb-primary","sink_id":"es-primary","enabled":true,"options":{"initial_sync":true}}],"global":{"log_level":"info"}}

3.5 启动 ElasticRelay

./bin/elasticrelay --config config/mongodb_config.json

启动后你会看到类似日志:

  • 连接 MongoDB 成功
  • 开始快照(如果initial_sync=true
  • Change Streams started
  • 写入 ES(自动创建索引)

4. 验证同步:插入/更新/删除都能实时反映到 ES

4.1 查看 ES 索引是否生成

如果index_prefix=elasticrelay_mongo,同步users集合会生成索引:

  • elasticrelay_mongo-users

执行:

curl-X GET"http://localhost:9200/_cat/indices?v"

4.2 查一条数据

curl-X GET"http://localhost:9200/elasticrelay_mongo-users/_search?pretty"\-H'Content-Type: application/json'\-d'{"query":{"match_all":{}},"size":10}'

4.3 触发增量:对 MongoDB 做写入

如果你用的是仓库自带 Docker MongoDB:

dockerexec-it elasticrelay-mongodb mongosh\-u elasticrelay_user -p elasticrelay_pass\--authenticationDatabase admin

然后执行:

use elasticrelay// insertdb.users.insertOne({name:'Test User',email:'test@example.com',age:30,created_at:newDate(),updated_at:newDate()})// updatedb.users.updateOne({email:'test@example.com'},{$set:{age:31,updated_at:newDate()}})// deletedb.users.deleteOne({email:'test@example.com'})

回到 ES 再查一次,你应该能看到变化实时出现(或删除生效)。


5. 运维要点:Checkpoint、重启恢复、DLQ

5.1 Checkpoint(断点续传)

MongoDB CDC 使用 Change Streams 的resume token做 checkpoint。

  • 默认落盘文件:mongodb_checkpoints.json
  • 进程重启:会先尝试加载该文件,从上次位置继续

常见注意事项:

  • 如果 resume token 因 oplog 窗口太短而失效,会出现类似 “Resume token not found / resume of change stream was not possible”。这时需要:
    • 增大 oplog window(生产推荐)
    • 或者清理 checkpoint 重新从当前/全量开始(谨慎)

5.2 DLQ(死信队列)

当 Elasticsearch 不可用、索引创建失败、bulk 写入失败时,ElasticRelay 可以把事件打到 DLQ(避免直接丢数据),并按策略重试。

你可以在配置里打开:

  • global.dlq_config.enabled=true
  • global.dlq_config.storage_path=./dlq

6. 常见坑与排障(高频)

  • Change Stream not supported:MongoDB 不是 replica set/sharded;按副本集方式部署。
  • 认证失败:核对用户名密码;注意authSource(仓库示例走admin)。
  • ES 写入 429/队列堆积:降低写入批次、加大 flush interval、或扩容 ES。
  • 字段映射混乱:MongoDB 文档结构变化大、字段类型不稳定时,建议提前准备 ES template/mapping(或至少保证关键字段类型稳定)。

想要试用 ElasticRelay?查看我们的 GitHub 仓库 或阅读我们的 入门指南。

有问题?加入我们的 社区讨论 或在 Twitter 上联系我们。

ElasticRelay 团队致力于构建更好的数据基础设施工具。关注我们的旅程,我们让实时数据同步变得简单、可靠,并且让每个开发者都能使用。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询