MongoDB and Elasticsearch sync module for Node.js (supports attachment sync)
Supports one-to-one and one-to-many relationships.
Chinese Documentation - 中文文档
- one-to-one - one MongoDB collection to one Elasticsearch index
- one-to-many - one MongoDB collection to many indexes on one Elasticsearch server, or one MongoDB collection to one index on each of many Elasticsearch servers
elasticsearch: v6.1.2
mongodb: v3.6.2
Nodejs: v8.9.3
The node-mongodb-es-connector package keeps your MongoDB collections and Elasticsearch cluster in sync. It does so by tailing the MongoDB oplog and replicating every CRUD operation to the Elasticsearch cluster without additional overhead. Please note that a replica set is required for the package to tail MongoDB. Attachment sync is supported.
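To make the oplog-tailing idea concrete, here is a minimal sketch of how a single oplog entry could be translated into Elasticsearch bulk-API lines. It is not the package's actual implementation: the function name `oplogToBulk` and its parameters are hypothetical, and only the common `i` (insert), `u` (update), and `d` (delete) operations are handled.

```javascript
// Sketch only: map one MongoDB oplog entry to Elasticsearch bulk-API lines.
// `index` and `type` are hypothetical targets (compare e_index / e_type).
function oplogToBulk(entry, index, type) {
  const meta = (id) => ({ _index: index, _type: type, _id: String(id) });

  switch (entry.op) {
    case 'i': { // insert: `o` is the full document
      const { _id, ...doc } = entry.o;
      return [{ index: meta(_id) }, doc];
    }
    case 'u': { // update: `o2` holds the _id, `o` holds modifiers or a replacement
      const id = entry.o2._id;
      const isModifier = Object.keys(entry.o).some((k) => k.startsWith('$'));
      if (isModifier) {
        // Only $set is mirrored in this sketch; $unset and others are ignored.
        return [{ update: meta(id) }, { doc: entry.o.$set || {} }];
      }
      const { _id, ...doc } = entry.o;
      return [{ index: meta(id) }, doc];
    }
    case 'd': // delete: `o` holds only the _id
      return [{ delete: meta(entry.o._id) }];
    default: // no-ops, commands, and anything else are skipped
      return [];
  }
}

// Usage: an insert into myTest.books becomes an index action plus its source.
const lines = oplogToBulk(
  { op: 'i', ns: 'myTest.books', o: { _id: 1, bName: 'Node in Action' } },
  'mybooks',
  'books'
);
console.log(lines.map((l) => JSON.stringify(l)).join('\n'));
```

Each returned array can be concatenated with others and sent as one bulk request, which is why the sketch returns lines rather than calling Elasticsearch directly.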
npm install es-mongodb-sync
or download it from GitHub.
Create a file in the crawlerData folder. The naming rule is ElasticSearchIndexName.json, but any name ending in .json works.
If you need more than one configuration, add each one as its own file in crawlerData.
For example:

    {
        "mongodb": {
            "m_database": "myTest",
            "m_collectionname": "books",
            "m_filterfilds": {
                "version" : "2.0"
            },
            "m_returnfilds": {
                "bName": 1,
                "bPrice": 1,
                "bImgSrc": 1
            },
            "m_extendfilds": {
                "bA": "this is a extend fild bA",
                "bB": "this is a extend fild bB"
            },
            "m_extendinit": {
                "m_comparefild": "_id",
                "m_comparefildType": "ObjectId",
                "m_startFrom": "2018-07-20 13:44:00",
                "m_endTo": "2018-07-20 13:46:59"
            },
            "m_connection": {
                "m_servers": [
                ],
                "m_authentication": {
                    "username": "UserAdmin",
                    "password": "pass1234"
                }
            },
            "m_documentsinbatch": 5000,
            "m_delaytime": 1000
        },
        "elasticsearch": {
            "e_index": "mybooks",
            "e_type": "books",
            "e_connection": {
                "e_server": "http://localhost1:9200,http://localhost2:9200,http://localhost3:9200",
                "e_httpauth": {
                    "username": "EsAdmin",
                    "password": "pass1234"
                }
            },
            "e_pipeline": "mypipeline",
            "e_iscontainattachment": true
        }
    }
- m_database - MongoDB database to watch. (required)
- m_collectionname - MongoDB collection to watch. (required)
- m_filterfilds - MongoDB filter query; supports simple filters. (required)
- m_returnfilds - MongoDB fields to return. (required)
- m_extendfilds - Extra fields added to each document; you can define your own keys and values. (optional)
- m_extendinit - Supplemental configuration for MongoDB initialization. (optional)
  - m_comparefild - MongoDB field to compare. (optional)
  - m_comparefildType - Type of the compare field. (optional)
  - m_startFrom - Start time, a DateTime. (optional)
  - m_endTo - End time, a DateTime. (optional)
- m_connection (required)
  - m_servers - MongoDB servers (array). (required)
  - m_authentication - Authentication settings; if you do not need authentication, leave this at its default value. (required)
    - username - MongoDB connection user name. (required)
    - password - MongoDB connection password. (required)
    - authsource - MongoDB user authentication source. (required)
    - replicaset - MongoDB replica set name. (required)
    - ssl - MongoDB SSL setting. (optional)
  - m_url - MongoDB connection URL, used as a replacement (either-or). (optional)
- m_documentsinbatch - An integer that specifies the number of documents sent to Elasticsearch in each batch (can be set to a very high number). (required)
- m_delaytime - Number of milliseconds between batches. (required)
- e_index - Elasticsearch index where documents from the watched collection are saved. (required)
- e_type - Elasticsearch type given to documents from the watched collection. (required)
- e_connection (required)
  - e_server - URL of a running Elasticsearch cluster. (required)
  - e_httpauth - HTTP authentication settings; if you do not need authentication, leave this at its default value. (optional)
    - username - Elasticsearch connection user name. (optional)
    - password - Elasticsearch connection password. (optional)
- e_pipeline - Elasticsearch pipeline name. (optional)
- e_iscontainattachment - Whether documents contain attachments. (optional)
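As a rough illustration of how these options fit together, the sketch below checks a configuration object for the keys marked required and then shapes a MongoDB document using m_filterfilds, m_returnfilds, and m_extendfilds. The helper names (`missingRequired`, `matchesFilter`, `toEsDocument`) are hypothetical, and the equality-only filter and the way extended fields are merged are assumptions; the package itself may behave differently.

```javascript
// Sketch only: key paths taken from the option list; helper names are hypothetical.
const REQUIRED_PATHS = [
  'mongodb.m_database',
  'mongodb.m_collectionname',
  'mongodb.m_filterfilds',
  'mongodb.m_returnfilds',
  'mongodb.m_connection',
  'mongodb.m_documentsinbatch',
  'mongodb.m_delaytime',
  'elasticsearch.e_index',
  'elasticsearch.e_type',
  'elasticsearch.e_connection.e_server',
];

// Walk a dotted path, returning undefined as soon as a level is missing.
function getPath(obj, path) {
  return path.split('.').reduce((o, key) => (o == null ? undefined : o[key]), obj);
}

// List every required path that is absent from the config.
function missingRequired(config) {
  return REQUIRED_PATHS.filter((p) => getPath(config, p) === undefined);
}

// Assumption: a "simple filter" means plain equality on top-level fields.
function matchesFilter(doc, filter) {
  return Object.entries(filter || {}).every(([k, v]) => doc[k] === v);
}

// Keep only the fields flagged in m_returnfilds, then add m_extendfilds.
function toEsDocument(doc, mongoCfg) {
  const out = {};
  for (const [field, include] of Object.entries(mongoCfg.m_returnfilds || {})) {
    if (include && field in doc) out[field] = doc[field];
  }
  return Object.assign(out, mongoCfg.m_extendfilds || {});
}

// Usage with a trimmed-down config and one book document.
const mongoCfg = {
  m_filterfilds: { version: '2.0' },
  m_returnfilds: { bName: 1, bPrice: 1 },
  m_extendfilds: { bA: 'this is a extend fild bA' },
};
const book = { _id: 7, version: '2.0', bName: 'Node in Action', bPrice: 30, bStock: 4 };
if (matchesFilter(book, mongoCfg.m_filterfilds)) {
  console.log(toEsDocument(book, mongoCfg));
}
console.log(missingRequired({ mongodb: mongoCfg, elasticsearch: {} }));
```

Running a check like `missingRequired` before starting a watcher gives an early, readable error instead of a failure in the middle of a sync.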
node app.js
index.js (only CRUD operations on config JSON files)
1. start() - must be called before any other API.
2. addWatcher() - add a config JSON.

| Name | Type |
| --- | --- |
| fileName | string |
| obj | jsonObject |

return: true or false
3. updateWatcher() - update a config JSON.

| Name | Type |
| --- | --- |
| fileName | string |
| obj | jsonObject |

return: true or false
4. deleteWatcher() - delete a config JSON.

| Name | Type |
| --- | --- |
| fileName | string |

return: true or false
5. isExistWatcher() - check whether a config JSON exists.

| Name | Type |
| --- | --- |
| fileName | string |

return: true or false
6. getInfoArray() - get the status of every config (waiting/initialling/running/stoped).
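The watcher methods above can be combined into an "add or update" helper. This is a hedged sketch: `connector` stands for whatever index.js exports (an assumption), start() is assumed to have been called already, and `createFakeConnector` is a hypothetical in-memory stand-in included only so the example runs without MongoDB or Elasticsearch.

```javascript
// Sketch only: `connector` is assumed to expose the methods listed above.
async function upsertWatcher(connector, fileName, config) {
  // `await` works whether the real methods return a boolean or a Promise.
  const exists = await connector.isExistWatcher(fileName);
  return exists
    ? connector.updateWatcher(fileName, config)
    : connector.addWatcher(fileName, config);
}

// Hypothetical in-memory stand-in; it is NOT part of the package.
function createFakeConnector() {
  const store = new Map();
  return {
    isExistWatcher: (fileName) => store.has(fileName),
    addWatcher: (fileName, obj) => { store.set(fileName, obj); return true; },
    updateWatcher: (fileName, obj) => { store.set(fileName, obj); return true; },
  };
}

// Usage: the first call adds the watcher, the second one updates it.
const demo = createFakeConnector();
upsertWatcher(demo, 'mybooks.json', { elasticsearch: { e_index: 'mybooks' } })
  .then(() => upsertWatcher(demo, 'mybooks.json', { elasticsearch: { e_index: 'mybooks2' } }))
  .then((ok) => console.log('watcher saved:', ok));
```

Passing the connector in as a parameter keeps the helper testable and avoids guessing at the module's require path.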
- v1.1.12 - update the promise plugin and reference the Bluebird plugin in the project. Real-time synchronization supports more than 1000 indexes. Message queues use promises.
- v2.0.0 - support Elasticsearch pipeline aggregations and attachments in Elasticsearch.
- v2.0.12 - add watch config file sync status.
- v2.0.18 - update logs directory.
- v2.1.1 - update init method (master doc->attachment).
- v2.1.8 - use promise queue (init and mongo-oplog).
- v2.1.9 - add
- v2.1.16 - add a timed task for watching MongoDB, add a timestamp for init data, cancel full data synchronization in init.
- v2.1.20 - support elasticsearch cluster synchronization.
- v2.1.21 - support configuration file encryption.
Install the Ingest Attachment Processor Plugin
More on Elasticsearch pipelines:
First, create a pipeline in Elasticsearch:
    PUT _ingest/pipeline/mypipeline
    {
      "description" : "Extract attachment information from arrays",
      "processors" : [
        {
          "foreach": {
            "field": "attachments",
            "processor": {
              "attachment": {
                "target_field": "_ingest._value.attachment",
                "field": ""
              }
            }
          }
        }
      ]
    }
- mongodbData
- elasticsearch
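For the pipeline to extract anything, each element of the `attachments` array needs to carry the file content as a base64 string. The sketch below shows one way to shape such a document in Node.js. The helper name `withAttachments` and the per-element field names `filename` and `data` are assumptions; in particular, the name of the content field must match whatever the pipeline's `field` setting points to.

```javascript
// Sketch only: build a document whose `attachments` array the foreach
// processor can iterate over. `filename` and `data` are assumed field names.
function withAttachments(doc, files) {
  return {
    ...doc,
    attachments: files.map(({ filename, content }) => ({
      filename,
      // The attachment processor expects base64-encoded content.
      data: Buffer.from(content).toString('base64'),
    })),
  };
}

// Usage: one book with a single small text attachment.
const indexed = withAttachments(
  { bName: 'Node in Action' },
  [{ filename: 'notes.txt', content: 'hello' }]
);
console.log(JSON.stringify(indexed, null, 2));
```

A document shaped like this would then be indexed with the pipeline named in e_pipeline so that the extracted text lands under each element's `attachment` key.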
The MIT License (MIT). Please see LICENSE for more information.