Real-time sync from MongoDB to Elasticsearch using change streams

Ashish Modi
Feb 8, 2020


In the world of microservices, using different technologies for different use cases is perfectly fine: for example, MongoDB as the primary data store and Elasticsearch to power advanced search, logging and analytics.

In this article we will learn how to leverage MongoDB change streams to set up real-time synchronization between MongoDB and Elasticsearch. I will use Node.js for the examples, but any language with a MongoDB driver that supports change streams can be used.

Prerequisites

You need basic knowledge of MongoDB change streams: what they are and how they work. You can read more in the article Power of MongoDb change streams. You also need basic knowledge of CRUD operations in Elasticsearch. I will be using the Node.js drivers for both MongoDB and Elasticsearch.

Step 1: Open the change streams

The first step is to start watching the collection whose data needs to be synced to Elasticsearch. We need to open two change streams:

  • One stream listens for insert, update and replace events.

This stream watches the collection and filters the change events down to the insert, update and replace operation types.

  • Another stream listens for delete events and reacts accordingly.

This stream watches the same collection and filters the change events down to the delete operation type.
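The two streams can be sketched in Node.js roughly as follows. The original code is not reproduced here, so treat this as a minimal illustration under assumptions: `collection` is an already-connected collection from the official `mongodb` driver, whose `collection.watch(pipeline, options)` opens a change stream, and the helper names `openUpsertStream` and `openDeleteStream` are hypothetical. The `fullDocument: "updateLookup"` option is an addition worth calling out: without it, update events only describe the changed fields, whereas the indexer needs the whole document.

```javascript
// Aggregation pipeline for the upsert stream: keep only events that
// create or modify a document.
const upsertPipeline = [
  { $match: { operationType: { $in: ["insert", "update", "replace"] } } },
];

// Aggregation pipeline for the delete stream.
const deletePipeline = [{ $match: { operationType: "delete" } }];

// `collection` is assumed to be a Collection from the `mongodb` driver
// (anything exposing watch(pipeline, options) works for this sketch).
function openUpsertStream(collection) {
  // "updateLookup" makes update events include the full current document,
  // not just the changed fields.
  return collection.watch(upsertPipeline, { fullDocument: "updateLookup" });
}

function openDeleteStream(collection) {
  // Delete events only carry documentKey; there is no document to look up.
  return collection.watch(deletePipeline);
}
```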

Step 2: Start listening to changes and react

In this step we listen to change events emitted by the upsert change stream created in step 1 and index the data into Elasticsearch.
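A minimal sketch of the upsert handler. It assumes `client` is a `Client` from the official `@elastic/elasticsearch` package and uses the version 7 style request, where the document goes in `body` (version 8 prefers `document`). The helper name `indexChange` and the default index name `products` are hypothetical. The MongoDB `_id` is moved out of the source and used as the Elasticsearch document id, because Elasticsearch rejects `_id` as a field inside the document; a side benefit is that indexing the same change twice just overwrites the same document.

```javascript
// Minimal sketch: turn a change event from the upsert stream into an
// Elasticsearch index request. `client` is assumed to be an
// @elastic/elasticsearch Client (v7-style `body` parameter).
async function indexChange(client, change, indexName = "products") {
  // With fullDocument: "updateLookup", insert/update/replace events carry
  // the full document. It can be null if the document was deleted before
  // the lookup ran, so skip that case.
  const doc = change.fullDocument;
  if (!doc) return null;

  // Elasticsearch rejects `_id` inside the source, so use it as the id.
  const { _id, ...source } = doc;

  return client.index({
    index: indexName,
    id: String(_id), // ObjectId -> hex string
    body: source,
  });
}

// Usage (sketch):
// upsertStream.on("change", (change) =>
//   indexChange(esClient, change).catch(console.error));
```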

Next, we listen to change events emitted by the delete change stream and delete the corresponding documents from Elasticsearch.
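The delete side can be sketched the same way, again with a hypothetical helper name (`deleteChange`) and index name. Delete events carry no `fullDocument`, only `documentKey._id`, so that is what identifies the Elasticsearch document. Treating a 404 as success is a design choice, not something the original requires: it keeps the stream running when a document was never indexed. The check assumes errors expose `meta.statusCode`, as the client's `ResponseError` does.

```javascript
// Minimal sketch: remove the Elasticsearch document that corresponds to a
// delete event. Delete events have no fullDocument, only documentKey._id.
async function deleteChange(client, change, indexName = "products") {
  const id = String(change.documentKey._id);
  try {
    return await client.delete({ index: indexName, id });
  } catch (err) {
    // If the document was never indexed, Elasticsearch answers 404;
    // treat that as "already deleted" so the stream keeps going.
    if (err && err.meta && err.meta.statusCode === 404) return null;
    throw err; // anything else is a real failure
  }
}
```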

As you can see, in just two steps you can set up real-time synchronization between MongoDB and Elasticsearch. This approach also gives you full control if you need to modify the data along the way: you can easily add your own transformation logic before sending the data to Elasticsearch.
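As an illustration of such a transformation step, here is a hypothetical `transform` function applied to `fullDocument` before indexing. The field names (`name`, `price`, `createdAt`, `nameLower`) are assumptions made up for the example; the point is that you decide exactly which fields reach Elasticsearch and in what shape.

```javascript
// Hypothetical transformation applied to fullDocument before indexing.
// All field names are illustrative only.
function transform(doc) {
  return {
    _id: doc._id, // keep the id so the indexer can use it as the ES id
    name: doc.name,
    nameLower: doc.name ? doc.name.toLowerCase() : undefined,
    price: Number(doc.price), // e.g. if prices are stored as strings
    createdAt:
      doc.createdAt instanceof Date ? doc.createdAt.toISOString() : doc.createdAt,
    // Fields not listed here (internal flags, etc.) are simply not indexed.
  };
}
```

In the upsert handler you would then index `transform(change.fullDocument)` instead of the raw document.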

How to make this more durable and fault tolerant?

The code above is fine for a local environment, but in production we need durability and fault tolerance in case something goes wrong with the overall system. That is where change stream resume tokens come into the picture.

Resume tokens are how MongoDB change streams identify each change event to the consumer. The consumer can save the token and later restart the change stream from where it left off. To understand this better, I highly recommend reading this article.

Let’s modify the code from the previous steps to add resume token logic and make it more durable.

This is how the updated code looks for the upsert change stream,

and this is how it looks for the delete change stream.
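A sketch of how both streams might be opened with resume tokens, assuming the same `mongodb` driver `collection.watch(pipeline, options)` call as before. `openStreams` and `watchOptions` are hypothetical helpers, and `upsertToken` / `deleteToken` stand for whatever was last saved (undefined on the very first run, in which case `resumeAfter` is simply left out). Each stream keeps its own token because each one has consumed a different subset of events.

```javascript
// Build watch() options, adding resumeAfter only when a token exists.
function watchOptions(resumeToken, extra = {}) {
  return resumeToken ? { ...extra, resumeAfter: resumeToken } : { ...extra };
}

// Open both streams; each resumes from its own previously saved token.
function openStreams(collection, upsertToken, deleteToken) {
  const upsertStream = collection.watch(
    [{ $match: { operationType: { $in: ["insert", "update", "replace"] } } }],
    watchOptions(upsertToken, { fullDocument: "updateLookup" })
  );
  const deleteStream = collection.watch(
    [{ $match: { operationType: "delete" } }],
    watchOptions(deleteToken)
  );
  return { upsertStream, deleteStream };
}
```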

The important thing to note here is the new option we pass when creating the upsert and delete change streams:

{"resumeAfter": resumeToken}

This tells the change stream where to start: it resumes with the first event after the one identified by the token.

Similarly, we need to modify the code that consumes the changes. We must make sure that, once everything else is done, we save the resume token, which each change event provides in its _id field. The code looks like this:
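A minimal sketch of that consumption order, with hypothetical names: `handle` performs the Elasticsearch write (for example the upsert or delete handler) and `saveToken` persists `change._id`. The changes are chained so they are processed one at a time; if handlers ran concurrently, a later token could be saved before an earlier event had finished, and a restart could then skip that event.

```javascript
// Sketch: run each change through `handle`, then save its resume token.
// Changes are chained so they are handled strictly one after another.
function makeSequentialConsumer(handle, saveToken) {
  let chain = Promise.resolve();
  return function onChange(change) {
    chain = chain
      .then(() => handle(change)) // 1. write to Elasticsearch
      .then(() => saveToken(change._id)); // 2. only then record progress
    // If any step rejects, the chain stays rejected: later changes are
    // skipped and no later token is saved.
    return chain;
  };
}

// e.g. upsertStream.on("change", (c) => onUpsert(c).catch(stopAndAlert));
// (onUpsert = makeSequentialConsumer(...); stopAndAlert is hypothetical)
```

Saving the token only after the write means a crash between the two steps replays that one event on restart (at-least-once delivery), which is safe because indexing and deleting by `_id` are idempotent.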

To manage the tokens better, we can create a separate file with functions for getting and saving resume tokens. The code looks like this:
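One possible shape for that token file, sketched in CommonJS Node.js with only the standard library. The names `createTokenStore`, `getResumeToken` and `saveResumeToken`, and the one-JSON-file-per-stream layout, are assumptions. It also assumes the token survives a JSON round trip, which holds for tokens of the form `{ _data: "<hex string>" }` produced by recent MongoDB versions; storing it as Extended JSON via the `bson` package would be the more general choice.

```javascript
const fs = require("fs");
const path = require("path");

// Hypothetical token store: one JSON file per stream name inside `dir`,
// e.g. tokens/upsert.json and tokens/delete.json.
function createTokenStore(dir) {
  fs.mkdirSync(dir, { recursive: true });
  const fileFor = (name) => path.join(dir, `${name}.json`);

  return {
    // Returns the saved token, or undefined on the very first run.
    getResumeToken(name) {
      try {
        return JSON.parse(fs.readFileSync(fileFor(name), "utf8"));
      } catch (err) {
        if (err.code === "ENOENT") return undefined;
        throw err;
      }
    },
    // Write to a temp file, then rename it over the old one, so a crash
    // mid-write does not leave a truncated token file (on POSIX filesystems).
    saveResumeToken(name, token) {
      const file = fileFor(name);
      const tmp = `${file}.tmp`;
      fs.writeFileSync(tmp, JSON.stringify(token));
      fs.renameSync(tmp, file);
    },
  };
}
```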

These small changes make your solution fault tolerant. If your change stream breaks because of a connectivity problem with Elasticsearch or something else, you can simply restart your app and it will resume from where it left off, provided the event referenced by the saved token is still in the oplog.
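To see why this resumes "from where you left it", here is a self-contained simulation with no MongoDB or Elasticsearch involved. Everything in it is hypothetical: an in-memory array stands in for the oplog, each event's `_id` plays the role of its resume token, `eventsAfter` emulates `resumeAfter`, and a `Map` stands in for the index. The first run crashes after writing event `t3` but before saving its token.

```javascript
// Simulated oplog: each event's _id acts as its resume token.
const oplog = [
  { _id: "t1", op: "insert", docId: "a", v: 1 },
  { _id: "t2", op: "update", docId: "a", v: 2 },
  { _id: "t3", op: "insert", docId: "b", v: 1 },
  { _id: "t4", op: "delete", docId: "a" },
];

// Emulates watch(..., { resumeAfter: token }): events after the token.
function eventsAfter(token) {
  if (!token) return oplog.slice();
  const i = oplog.findIndex((e) => e._id === token);
  if (i === -1) throw new Error("resume point no longer in oplog");
  return oplog.slice(i + 1);
}

// Applies events to `index`; `crashAfter` simulates the process dying
// after the write for that event but before its token is saved.
function runConsumer(store, index, crashAfter) {
  for (const event of eventsAfter(store.token)) {
    if (event.op === "delete") index.delete(event.docId);
    else index.set(event.docId, { v: event.v }); // idempotent upsert by id
    if (event._id === crashAfter) throw new Error("crash");
    store.token = event._id; // save progress only after the write
  }
}

const store = { token: undefined };
const index = new Map();
try {
  runConsumer(store, index, "t3"); // dies after writing t3
} catch (err) {
  console.log(`first run stopped: ${err.message}, saved token = ${store.token}`);
}
runConsumer(store, index); // restart: resumes after t2, replays t3
console.log(store.token, [...index.keys()]); // prints: t4 [ 'b' ]
```

On restart, `t3` is delivered a second time. That duplicate is harmless because the write is an upsert keyed by document id, which is why saving the token after the Elasticsearch write, never before it, is the safe order.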

Conclusion

MongoDB change streams are a very powerful feature that can be used to build many different styles of applications. Syncing to Elasticsearch is one of those use cases, and we have seen above how easy it is to create a real-time sync tool with zero third-party requirements beyond the official drivers. It can be done purely in your language of choice. I hope you found this post helpful. Feel free to comment if you have any questions or think it can be improved.

You can also look at serverless change notifier for more advanced use cases. That approach can be used to add serverless capabilities to real-time sync from MongoDB to Elasticsearch.

You can find the working code on my github repository.

Happy coding!


Written by Ashish Modi

Architect with 14+ years of experience in application development and architecture. Worked on Node.js, .NET, MongoDB, serverless, Elasticsearch and AWS.
