I’ve been working on a long-awaited feature: support for MongoDB Change Streams in Meteor!
This new capability allows you to listen to real-time events directly from MongoDB (like inserts, updates, and deletes), using a new watchChangeStream API.
I’m working on replacing the current Oplog Observe Driver with a Change Stream observe driver, which will make Meteor’s reactivity work via change streams instead of the oplog or polling.
This is the first working implementation, and while it’s already functional, I’d love to hear your feedback on the developer experience (DX), naming, ergonomics, and potential edge cases.
Let me know your thoughts, suggestions, or use cases you’d like this to support!
Amazing news! We’ve been using our own homebrew method for a while, and while it held well in production, I would be more comfortable using something already baked into Meteor.
One thing I made sure from the start was to prevent opening too many change streams (we have tens of collections) because that can become a performance bottleneck in specific situations. So we open one stream, something like this:
const WATCHABLE_COLLECTION_NAMES = ['CollectionA', 'CollectionB']
const db = this.client.db();
const pipeline = [{
  $match: {
    'ns.coll': { $in: WATCHABLE_COLLECTION_NAMES }
  }
}];
this.changeStream = db.watch(pipeline);
//... then separate logic for filtering and processing the events
// try { const change = await this.changeStream.next(); } catch (err) {}
// ...
The approach makes sense in our scenario, where we have to watch most documents and the number of events is not huge.
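For anyone curious about the "separate logic" part, here is a minimal sketch of how events from the single stream could be routed per collection. `createChangeRouter` and its methods are hypothetical names made up for illustration; they are not our production code and not part of Meteor or the MongoDB driver. The sketch only relies on the `ns.coll` field that change events carry.

```javascript
// Hypothetical helper: route events from ONE database-level change stream
// to per-collection handlers, keyed on the event's `ns.coll` field.
function createChangeRouter() {
  const handlers = new Map(); // collection name -> array of handler functions

  return {
    // Register a handler for a collection; returns an unsubscribe function.
    on(collectionName, handler) {
      if (!handlers.has(collectionName)) handlers.set(collectionName, []);
      handlers.get(collectionName).push(handler);
      return () => {
        const list = handlers.get(collectionName) || [];
        const index = list.indexOf(handler);
        if (index !== -1) list.splice(index, 1);
      };
    },

    // Call every handler registered for the event's collection.
    // Returns the number of handlers that were invoked.
    dispatch(change) {
      const name = change && change.ns && change.ns.coll;
      // Copy the list so a handler may unsubscribe itself while we iterate.
      const snapshot = [...(handlers.get(name) || [])];
      for (const handler of snapshot) handler(change);
      return snapshot.length;
    },
  };
}

// Usage idea (not executed here): inside the consuming loop, hand each
// event to the router instead of branching inline.
//   const change = await this.changeStream.next();
//   router.dispatch(change);
```

The point of the indirection is that modules can subscribe and unsubscribe without ever touching the stream itself, so the number of open streams stays at one.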
How does it work in your example above, would something like this open two streams?
const changeStream = MyCollection.watchChangeStream([
  { $match: { operationType: 'insert' } }
]);
//...
// then later on, maybe in a separate module or package
const changeStream2 = MyCollection.watchChangeStream([
  { $match: { operationType: 'delete' } }
]);
I’m not sure how I feel about the changeStreams flag in observeChanges. Might be easy to miss. Also as radekmie mentioned in the PR comments, I think they represent two different things. Maybe something like .find().watchChanges() is more appropriate. What were the other options that you considered?
Why not just use Collection.watch instead of Collection.watchChangeStream? This would align more closely with the nodejs mongo driver.
Thank you for your suggestions @jam , I’ll definitely take a look at your package!
To be honest, the .find().watchChanges() method is only included for backward compatibility purposes. I’m not sure if I’ll keep it; it will depend on the feedback from the community here.
As for the current MyCollection.watchChangeStream, I agree — I’ll rename it to MyCollection.watch. Thanks for the suggestion!
That’s a great setup, and I really like your scenario, @illustreets!
It makes total sense to avoid opening multiple change streams when you’re dealing with many collections. Thanks for sharing that pattern; it’s definitely valuable.
And yes, in the current implementation, calling MyCollection.watchChangeStream() multiple times like in your example would open separate streams. But I agree with your approach — it’s much more efficient to centralize and filter events on a single stream when possible.
I’ll incorporate this idea into the next push for sure!
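To make the idea concrete, here is a rough sketch of how two watchers like the ones in your example could share one stream: the pipeline matches the union of the requested operation types, and each event is then fanned out in memory. `buildSharedPipeline` and `fanOut` are hypothetical helpers for illustration only; this is not what the current PR does, and the final design may look quite different.

```javascript
// Hypothetical: build ONE pipeline that covers every operation type
// requested by the individual watchers.
function buildSharedPipeline(operationTypes) {
  const unique = [...new Set(operationTypes)].sort();
  return [{ $match: { operationType: { $in: unique } } }];
}

// Hypothetical: deliver an event from the shared stream to the listeners
// that asked for its operation type. Returns how many listeners were called.
// `listeners` is an array of { operationType, callback } objects.
function fanOut(change, listeners) {
  let delivered = 0;
  for (const listener of listeners) {
    if (listener.operationType === change.operationType) {
      listener.callback(change);
      delivered += 1;
    }
  }
  return delivered;
}

// Two logical watchers, one underlying pipeline.
const inserts = [];
const deletes = [];
const listeners = [
  { operationType: 'insert', callback: (c) => inserts.push(c) },
  { operationType: 'delete', callback: (c) => deletes.push(c) },
];
const sharedPipeline = buildSharedPipeline(listeners.map((l) => l.operationType));
```

The trade-off is that the server receives every event that any listener wants, so this only pays off when the listeners' filters overlap or the event volume is modest, as in your scenario.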
I hope I’m not missing the mark, but I’m trying to look at this from the perspective of someone who’s entirely new to Meteor (which I am, since I’m constantly learning new things about Meteor every day!).
How does this impact the current state of Meteor? Do Meteor applications scale better now? Do we get performance improvements? If so, by how much? What about backwards compatibility? What migration paths are we talking about here for old applications to reap the benefits of this new feature?
Also, why wasn’t @jam’s package integrated into the core? I feel its DX is way better and it offers other cool benefits, mainly subs caching, which I feel is a huge missing part of pub-sub in Meteor.
The efforts to improve pub-sub are split into the following categories:
Observing changes: how Meteor learns about changes (polling, redis-oplog, and change streams).
Splitting responsibility: does the server calculate which documents changed and which didn’t, or is that the client’s responsibility? (mergebox and publication strategies)
WebSockets: the method of communicating such changes (sockjs). I read somewhere that ws is more performant. This also affects DX, which FeathersJS comments on in comparison to Meteor.
You @italojs implemented one of @zodern’s packages into the core so what prevented it this time?
After this PR gets merged, will Meteor.publish use change streams instead of polling? Or will I have to explicitly enable it by tapping into these weird APIs, watchChangeStream or setting the flag to true on observeChanges?
In a nutshell, how would this code be affected, in behavior or performance, after this update?
import { Meteor } from 'meteor/meteor';
import { TasksCollection } from '/imports/db/TasksCollection';
Meteor.publish('tasks', function publishTasks() {
  return TasksCollection.find({ userId: this.userId });
});
Haha, don’t worry my friend, your questions are AWESOME. To be honest, it took me all this time to reply because I didn’t have the answers yet.
Change streams are more efficient than the traditional oplog tailing approach for several reasons:
With oplog tailing, Meteor must continuously tail and process the entire MongoDB oplog, filtering out the changes that are irrelevant to each observed query. This results in unnecessary CPU and memory usage, especially as the number of collections, users, or writes increases.
Change streams, on the other hand, allow Meteor to subscribe directly to the specific changes relevant to the application, collections, or even queries. This means the server receives only the events it actually needs, reducing the amount of data processed and minimizing resource consumption.
Change streams are natively supported by MongoDB and are designed for scalability, providing a more direct and efficient notification mechanism for data changes.
In practice, this leads to lower latency for data updates, less server overhead, and better scalability for large applications.
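As a rough illustration of "receiving only the events it actually needs", the sketch below turns a simple selector such as `{ userId }` into a `$match` stage that MongoDB applies before sending events. `selectorToChangeStreamMatch` is a hypothetical helper, not the code in my PR, and it assumes flat equality selectors on primitive values only. It also assumes events carry `fullDocument`: for updates that requires opening the stream with the `fullDocument: 'updateLookup'` option, and delete events do not include the document by default, so a real driver needs more than this.

```javascript
// Hypothetical: translate a flat equality selector into a change stream
// $match stage that filters on the event's `fullDocument`.
function selectorToChangeStreamMatch(selector) {
  const match = {};
  for (const [field, value] of Object.entries(selector)) {
    const isOperator = field.startsWith('$');
    const isNested = value !== null && typeof value === 'object';
    if (isOperator || isNested) {
      // Operators and nested values are out of scope for this sketch.
      throw new Error(`Unsupported selector part: ${field}`);
    }
    match[`fullDocument.${field}`] = value;
  }
  return [{ $match: match }];
}

// Example: the filter for a per-user query.
const pipeline = selectorToChangeStreamMatch({ userId: 'u1' });
// pipeline is [{ $match: { 'fullDocument.userId': 'u1' } }]
```

With oplog tailing the equivalent check runs in the Meteor process for every oplog entry; here the database does it and the unmatched events never leave MongoDB.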
As for the performance improvements, my latest run with our benchmark repo showed:
12–35% faster on critical web metrics
27% lower memory usage
14% faster user sessions
Additionally, change streams reduce the risk of missing events during high write loads and simplify the architecture for real-time data propagation, making Meteor more robust and future-proof for modern applications.
I will maintain the old oplog and polling implementations, so if you aren’t able to use change streams or just don’t want to, you can flag that and it SHOULD be transparent for you; no other changes will be required (I hope).
He did awesome work for sure, but I envision the change stream part differently: instead of adding change streams as a new feature with a new API, I want to modify Meteor’s current reactivity system, which uses the oplog, to use change streams as the default option. If you take a look at my implementation, you will see what I’m talking about (feel free to ask more questions).
The sub cache feature is really impressive, but here I’m trying to focus only on change streams. I see us bringing it in on another front or at another time.
So far, you just need to enable it in package.json, nothing else.