MongoDB Change Streams
MongoDB change streams are a powerful feature that fits very well with modern applications.

In today’s world everything is becoming real time, and everyone wants push notifications for every insert, update and delete. Streaming and events are the backbone of modern architecture: microservices subscribe to a particular category of events and react to them immediately. This is where MongoDB’s change streams fit in.
Defining Change Streams
Change streams are a real-time stream of the changes that occur on a collection, a database or an entire deployment. Whenever an insert, update or delete is made, MongoDB emits a change event with data that can be consumed by all the clients listening to it. Some of their characteristics are:
- Filterable and focused: they fit very well with the existing aggregation framework.
- They scale across different nodes.
- Resumable and retriable.
- Total ordering of changes.
Prerequisites
In order to use MongoDB change streams there are a few requirements:
- You should be running MongoDB 3.6+ (watching a whole database or deployment requires 4.0+).
- MongoDB must be deployed as a replica set or a sharded cluster.
For creating a replica set, follow this link, and for converting a standalone to a replica set, follow this link.
How Do Change Streams Work?
To understand how change streams work, let’s first understand two very important concepts.
Oplog
The oplog is MongoDB’s internal capped collection, which is used to replicate data from one server to another. Capped collections are a special type of collection that overwrites the oldest documents once the defined cap is reached.
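To make the overwrite behaviour concrete, here is a minimal sketch of a capped log. It is only an illustration: the CappedLog class and its seq field are made up for this example, and real capped collections are limited by size in bytes inside MongoDB rather than by an entry count in application code.

```javascript
// Illustrative only: real capped collections are limited by size in bytes
// and live inside MongoDB. CappedLog and `seq` are hypothetical names.
class CappedLog {
  constructor(maxEntries) {
    this.maxEntries = maxEntries;
    this.entries = [];
  }

  // Append an entry, discarding the oldest one once the cap is exceeded.
  append(entry) {
    this.entries.push(entry);
    if (this.entries.length > this.maxEntries) {
      this.entries.shift();
    }
  }

  // True if the entry with this sequence number is still retained.
  contains(seq) {
    return this.entries.some((e) => e.seq === seq);
  }
}

const log = new CappedLog(3);
for (let seq = 1; seq <= 5; seq++) {
  log.append({ seq, op: "insert" });
}
console.log(log.entries.map((e) => e.seq)); // [ 3, 4, 5 ]
console.log(log.contains(1)); // false
```

Once an entry has been overwritten, a consumer that still needs it can no longer read it from the log, which is why the size of the oplog matters for anything that reads from it.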
Replica Set
Replica sets are MongoDB’s way of providing durability and fault tolerance. At any point in time the data is available on multiple servers, so if one of the servers goes down, the rest of the servers can keep the database up. MongoDB uses the oplog to replicate the data.
Change streams use the oplog and the replica set to provide the changes that happened on a collection or database.
Why not use the Oplog?
I am sure this question comes to everyone’s mind: if change streams use the oplog, why not use it directly? You can tail the oplog to get the changes without using change streams. This is definitely possible, but there are several reasons why one shouldn’t do that:
- Tailing means keeping a cursor open indefinitely, which is neither clean nor a good idea.
- The format in which you get the changes is internal and subject to change.
- Since you are tailing the oplog, you receive every small detail, which makes it very noisy.
How to consume change streams?
Consuming change streams is very straightforward. Throughout this article I will be using the Node.js driver to give examples, but you can use any modern language driver. You can find the list of drivers here.
Creating a change stream is as easy as this:
const changeStream = someCollection.watch([])
And this is how you use it.
changeStream.on("change", data => {
  // do something with the change data here.
});
changeStream.on("error", error => {
  // log error here.
});
The watch method returns a ChangeStream object that emits events; in the Node.js driver, its stream() method gives you a Node.js readable stream that can be piped to any other stream. watch accepts an aggregation pipeline as a parameter, which is the most powerful feature of change streams. You can use the stages that work on one document at a time, such as $match, $project, $addFields, $replaceRoot and $redact. For example, you can say that you are only interested in inserts where id is greater than 100:
someCollection.watch([
  {
    "$match": {
      "operationType": {
        "$in": [
          "insert"
        ]
      },
      "fullDocument.id": {
        "$gt": 100
      }
    }
  }
])
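Pipelines like the one above are plain arrays, so they can be built by ordinary functions. The following sketch adds a $project stage to trim each event down to a few fields. The helper name insertsAbove and the name field are assumptions made for illustration; they are not part of the driver.

```javascript
// Hypothetical helper: builds a pipeline for inserts whose `id` field
// exceeds `minId`, returning only the listed document fields.
function insertsAbove(minId, fields) {
  const projection = { operationType: 1, documentKey: 1 };
  for (const field of fields) {
    projection[`fullDocument.${field}`] = 1;
  }
  return [
    { $match: { operationType: "insert", "fullDocument.id": { $gt: minId } } },
    // _id (the resume token) is kept by default in an inclusion projection.
    { $project: projection },
  ];
}

const pipeline = insertsAbove(100, ["id", "name"]);
console.log(JSON.stringify(pipeline, null, 2));
// Usage with a real collection: someCollection.watch(pipeline)
```

The projection deliberately leaves the event's _id alone, because that field carries the resume token the stream needs.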
If you are only interested in updates that set a particular field to a particular value, for example updates that change username to myUsername, this can be achieved like this:
someCollection.watch([
  {
    "$match": {
      "operationType": {
        "$in": [
          "update"
        ]
      },
      "updateDescription.updatedFields.username": "myUsername"
    }
  }
])
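Note that updateDescription.updatedFields only contains the fields that a given update modified, so the filter above fires when username itself is set to myUsername. To follow every update to documents that belong to a user, one option is the fullDocument: "updateLookup" option, which attaches the current version of the document to update events so that the pipeline can match on it. The sketch below assumes a username field on the documents; watchUserUpdates and fakeCollection are hypothetical names, and the fake collection only records its arguments so that the example runs without a server.

```javascript
// Hypothetical helper: watch every update to documents whose current
// `username` equals the given value.
function watchUserUpdates(collection, username) {
  const pipeline = [
    { $match: { operationType: "update", "fullDocument.username": username } },
  ];
  // "updateLookup" asks the server to attach the current version of the
  // updated document as fullDocument on update events.
  return collection.watch(pipeline, { fullDocument: "updateLookup" });
}

// Stand-in collection that records its arguments (not a real driver object).
const calls = [];
const fakeCollection = {
  watch(pipeline, options) {
    calls.push({ pipeline, options });
    return { pipeline, options };
  },
};

watchUserUpdates(fakeCollection, "myUsername");
console.log(JSON.stringify(calls[0], null, 2));
```

The looked-up document is the version at the time of the lookup, which may already include later changes, so treat it as "current state" rather than "state right after this update".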
You can watch insert, update, replace, delete, drop, rename, dropDatabase and invalidate events, among others. The full list can be found here.
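Since every event carries an operationType, a change handler usually ends up branching on it. Here is a small sketch of such a dispatcher. The describeChange function and the sample events are written by hand for illustration (they were not captured from a real server), but the field names operationType, documentKey, fullDocument and updateDescription are the ones change events use.

```javascript
// Returns a short, human-readable description of a change event.
function describeChange(event) {
  switch (event.operationType) {
    case "insert":
    case "replace":
      return `${event.operationType} ${JSON.stringify(event.fullDocument)}`;
    case "update":
      return `update ${JSON.stringify(event.documentKey)} set ` +
        JSON.stringify(event.updateDescription.updatedFields);
    case "delete":
      return `delete ${JSON.stringify(event.documentKey)}`;
    case "invalidate":
      return "stream invalidated";
    default:
      return `unhandled event type: ${event.operationType}`;
  }
}

// Hand-written sample events, for illustration only.
const samples = [
  { operationType: "insert", documentKey: { _id: 1 }, fullDocument: { _id: 1, id: 101 } },
  {
    operationType: "update",
    documentKey: { _id: 1 },
    updateDescription: { updatedFields: { id: 102 }, removedFields: [] },
  },
  { operationType: "delete", documentKey: { _id: 1 } },
  { operationType: "drop" },
];

for (const event of samples) {
  console.log(describeChange(event));
}
```

With a real stream this would be wired up as changeStream.on("change", event => console.log(describeChange(event))).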
What if my change stream stops in between?
As we have seen, using change streams is easy and straightforward, but in real-world scenarios things are not as simple. Things can go wrong: your database might go down or get corrupted, your application might crash or be stopped, and so on. It is therefore very important to have a resilient system that can restart the change stream from where it left off. That is where the capability of resuming change streams comes into the picture. Each event in the stream contains a resume token (_id), which can be used to restart the stream from a specific point. Here is how you could make a resumable change stream in a production environment:
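const resumeToken = await getResumeTokenFromDb();
// Only pass resumeAfter when a token was actually stored (e.g. not on the first run).
const options = resumeToken ? { resumeAfter: resumeToken } : {};
const changeStream = someCollection.watch([
  {
    "$match": {
      "operationType": { "$in": ["insert"] }
    }
  }
], options);
changeStream.on("change", async data => {
  // do something with the change data here.
  await saveYourResumeTokenInDb(data._id);
});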
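One detail worth thinking about is ordering: an async change handler does not block the next event, so tokens could be saved out of order or saved for an event that failed. The following is a sketch of one way to handle this, not the only one. It processes events one at a time through a promise chain and stops after the first failure. All names here (createMemoryTokenStore, watchWithResume, createFakeCollection) are hypothetical, the token store is in memory where a real one would persist to a database, and the fake collection is an EventEmitter stand-in so that the example runs without a MongoDB server.

```javascript
const { EventEmitter } = require("events");

// Hypothetical token store; a real one would persist the token somewhere durable.
function createMemoryTokenStore(initialToken = null) {
  let token = initialToken;
  return {
    async load() { return token; },
    async save(newToken) { token = newToken; },
  };
}

// Opens a change stream, resuming from the stored token when there is one.
// Events are handled one at a time, and a token is saved only after its
// event was handled successfully.
async function watchWithResume(collection, pipeline, tokenStore, handleChange, onError) {
  const token = await tokenStore.load();
  const options = token ? { resumeAfter: token } : {};
  const changeStream = collection.watch(pipeline, options);

  let queue = Promise.resolve();
  let failed = false;
  changeStream.on("change", (event) => {
    queue = queue.then(async () => {
      if (failed) return; // stop after the first failure so no event is skipped
      try {
        await handleChange(event);
        await tokenStore.save(event._id);
      } catch (err) {
        failed = true;
        onError(err);
      }
    });
  });
  changeStream.on("error", onError);

  // idle() resolves once every event received so far has been processed.
  return { changeStream, idle: () => queue };
}

// Stand-in for a collection (not a real driver object).
function createFakeCollection() {
  const stream = new EventEmitter();
  return {
    stream,
    lastOptions: null,
    watch(pipeline, options) {
      this.lastOptions = options;
      return stream;
    },
  };
}

async function demo() {
  const store = createMemoryTokenStore();
  const collection = createFakeCollection();
  const seen = [];
  const watcher = await watchWithResume(
    collection, [], store,
    async (event) => { seen.push(event.fullDocument); },
    (err) => console.error(err)
  );
  // Made-up events; real resume tokens are opaque values produced by the server.
  collection.stream.emit("change", { _id: { _data: "token-1" }, fullDocument: { id: 1 } });
  collection.stream.emit("change", { _id: { _data: "token-2" }, fullDocument: { id: 2 } });
  await watcher.idle();
  return { seen, savedToken: await store.load() };
}

demo().then((result) => console.log(result));
```

Saving the token after the handler succeeds means a crash between the two steps replays the last event on restart, which is one more reason to make the handler idempotent.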
There are other ways to resume a change stream, such as the startAfter and startAtOperationTime options.
Production Deployment Recommendations
There are a few things to keep in mind while using change streams in a production environment:
- Always add the capability to resume and restart the change stream. This helps to build a more durable, resilient and fault-tolerant system. See the previous section for more details.
- If significant downtime is expected, such as for an upgrade or a significant disaster, consider increasing the size of the oplog so that operations are retained for longer than the estimated downtime.
- Change streams best suit message-based or event-based systems, so always make the consumption of changes idempotent. That means that even if you apply the same change multiple times, it results in the same output.
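As an illustration of the idempotency point, here is a sketch of a consumer that maintains an in-memory view keyed by document _id. Every branch sets or deletes absolute values (as opposed to, say, incrementing a counter), so replaying events leaves the view unchanged. The applyChange function and the sample events are made up for this example, and it assumes that updates only touch top-level fields.

```javascript
// Applies a change event to an in-memory view keyed by the document _id.
// Assumption: updatedFields contains top-level field names only.
function applyChange(view, event) {
  const key = String(event.documentKey._id);
  switch (event.operationType) {
    case "insert":
    case "replace":
      view.set(key, { ...event.fullDocument });
      break;
    case "update": {
      const current = { ...(view.get(key) || {}) };
      Object.assign(current, event.updateDescription.updatedFields);
      for (const field of event.updateDescription.removedFields) {
        delete current[field];
      }
      view.set(key, current);
      break;
    }
    case "delete":
      view.delete(key);
      break;
  }
  return view;
}

// Hand-written sample events, for illustration only.
const events = [
  { operationType: "insert", documentKey: { _id: 1 }, fullDocument: { _id: 1, name: "a", tmp: true } },
  {
    operationType: "update",
    documentKey: { _id: 1 },
    updateDescription: { updatedFields: { name: "b" }, removedFields: ["tmp"] },
  },
  { operationType: "insert", documentKey: { _id: 2 }, fullDocument: { _id: 2, name: "c" } },
  { operationType: "delete", documentKey: { _id: 2 } },
];

const once = new Map();
for (const event of events) applyChange(once, event);

// Deliver every event twice in a row, as a retrying consumer might.
const twice = new Map();
for (const event of events) {
  applyChange(twice, event);
  applyChange(twice, event);
}

console.log([...once.entries()]);
console.log([...twice.entries()]);
```

Both maps end up with the same single entry for document 1, which is the property a consumer needs if events can be redelivered after a resume.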
Use cases
There are many use cases for change streams. Some of them are listed below; the list is definitely not exhaustive. Feel free to add a comment if you have used them for a purpose that is not listed here.
- Real-time notifications
- Real-time analytics
- Real-time logging, auditing, etc.
- Data pipelines, as part of ETL
- Microservices
MongoDB change streams provide some benefits over a message-based architecture, such as reduced infrastructure, a simpler application and real-time delivery. If you are looking for a real use case, see my post on real-time sync from MongoDB to Elasticsearch using change streams. You can also look at the serverless change notifier for an advanced use case.
I have added the code used in this post and some more examples to my GitHub repo. You can find them here.
Happy coding!