MongoDB Change Streams support in Meteor – feedback wanted!

Hey everyone! :wave:

I’ve been working on a long-awaited feature: support for MongoDB Change Streams in Meteor!

This new capability allows you to listen to real-time events directly from MongoDB (like inserts, updates, and deletes), using either a new watchChangeStream method or by enabling it inside the existing observeChanges API. All of this happens without relying on polling :tada:

I just opened a PR in our GitHub repository.

:white_check_mark: What’s working so far

Mongo.Collection.watchChangeStream

You can now open a MongoDB Change Stream directly on a server-side collection:

// server.js 
const changeStream = MyCollection.watchChangeStream([
  { $match: { operationType: 'insert' } }
]);

changeStream.on('change', (change) => {
  console.log('Detected change event:', change);
});
changeStream.on('error', (err) => {
  console.error('Change stream error:', err);
});

observeChanges

With easy migration in mind, you will be able to enable Change Streams by passing the changeStreams: true option. Events will be automatically mapped to the usual added, changed, and removed callbacks:

// server.js 
const handle = MyCollection.find().observeChanges({
  changeStreams: true, // <-----NEW FLAG
  added: (id, fields) => {
    console.log('Added', id, fields);
  },
  changed: (id, fields) => {
    console.log('Changed', id, fields);
  },
  removed: (id) => {
    console.log('Removed', id);
  },
  onError: (err) => {
    console.error('observeChanges error:', err);
  }
});
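To make the mapping above concrete, here is a minimal sketch of how a single change event could be translated into the three callbacks. It is not taken from the PR: dispatchChangeEvent is a hypothetical helper, and it only assumes the standard MongoDB change event fields (operationType, documentKey, fullDocument, updateDescription). Nested field paths such as 'a.b' in updatedFields are passed through unchanged, which a real implementation would probably need to handle.

```javascript
// Hypothetical helper, for illustration only (not the PR's implementation).
// Translates one MongoDB change event into an observeChanges-style callback.
function dispatchChangeEvent(change, callbacks) {
  // Collection-level events such as 'drop' carry no documentKey.
  const id = change.documentKey && change.documentKey._id;

  switch (change.operationType) {
    case 'insert': {
      // observeChanges passes the fields without _id.
      const { _id, ...fields } = change.fullDocument;
      if (callbacks.added) callbacks.added(id, fields);
      break;
    }
    case 'update': {
      const { updatedFields = {}, removedFields = [] } =
        change.updateDescription || {};
      // Meteor reports a removed field as `undefined` in `changed`.
      const fields = { ...updatedFields };
      for (const name of removedFields) fields[name] = undefined;
      if (callbacks.changed) callbacks.changed(id, fields);
      break;
    }
    case 'delete':
      if (callbacks.removed) callbacks.removed(id);
      break;
    default:
      // Other event types (replace, drop, invalidate, ...) are ignored here.
      break;
  }
}
```

Keeping the translation in a pure function like this would also make it easy to unit-test without a running replica set.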

:warning: Still a Work In Progress

This is the first working implementation, and while it’s already functional, I’d love to hear your feedback on the developer experience (DX), naming, ergonomics, and potential edge cases.

Let me know your thoughts, suggestions, or use cases you’d like this to support!

Thanks :pray:


Great work!

I assume you meant to write observeChangesAsync in the example?


Looks interesting! Does find() take in a query in the const handle example?

I imagine it wouldn’t be too hard to whip up a similar feature on the client using Tracker.

Amazing news! We’ve been using our own homebrew method for a while, and while it held well in production, I would be more comfortable using something already baked into Meteor.

One thing I made sure of from the start was to avoid opening too many change streams (we have tens of collections), because that can become a performance bottleneck in specific situations. So we open one stream, something like this:

const WATCHABLE_COLLECTION_NAMES = ['CollectionA', 'CollectionB']
const db = this.client.db();
const pipeline = [{
    $match: {
        'ns.coll': { $in: WATCHABLE_COLLECTION_NAMES }
    }
}];
this.changeStream = db.watch(pipeline);
//... then separate logic for filtering and processing the events
// try { const change = await this.changeStream.next(); } catch (err) {}
// ...

The approach makes sense in our scenario, where we have to watch most documents and the number of events is not huge.
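The "separate logic for filtering and processing" step could look roughly like the sketch below. It is an assumption for illustration, not our production code: createChangeRouter is a hypothetical name, and the only thing it relies on is the ns.coll field that change events carry.

```javascript
// Hypothetical router for events coming from one database-level stream.
function createChangeRouter() {
  const handlers = new Map(); // collection name -> array of handler functions

  return {
    // Register a handler for one collection; returns an unsubscribe function.
    on(collectionName, handler) {
      if (!handlers.has(collectionName)) handlers.set(collectionName, []);
      handlers.get(collectionName).push(handler);
      return () => {
        const list = handlers.get(collectionName) || [];
        const index = list.indexOf(handler);
        if (index !== -1) list.splice(index, 1);
      };
    },

    // Forward one change event to the handlers of its collection.
    // Returns the number of handlers that were called.
    dispatch(change) {
      const name = change.ns && change.ns.coll;
      // Copy the list so a handler can unsubscribe while we iterate.
      const snapshot = [...(handlers.get(name) || [])];
      for (const handler of snapshot) handler(change);
      return snapshot.length;
    },
  };
}
```

Each event returned by await this.changeStream.next() would then be passed to router.dispatch(change), so every module registers its own handler while the app still holds a single stream.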

How does it work in your example above, would something like this open two streams?

const changeStream = MyCollection.watchChangeStream([
  { $match: { operationType: 'insert' } }
]);
//...
// then later on, maybe in a separate module or package
const changeStream2 = MyCollection.watchChangeStream([
  { $match: { operationType: 'delete' } }
]);

Nice, this will be great to have in Meteor!

Some initial thoughts:

  1. I’m not sure how I feel about the changeStreams flag in observeChanges. It might be easy to miss. Also, as radekmie mentioned in the PR comments, I think they represent two different things. Maybe something like .find().watchChanges() is more appropriate. What other options did you consider?
  2. Why not just use Collection.watch instead of Collection.watchChangeStream? This would align more closely with the Node.js MongoDB driver.

I hope that the Meteor team will steal the ideas and functionality from jam:pub-sub. :slight_smile: I don’t think it gets easier from a DX perspective than Meteor.publish.stream. See here for more info on how it uses change streams: GitHub - jamauro/pub-sub: Publish / subscribe using a Method and/or Change Streams, and cache subscriptions for Meteor apps


Thank you for your suggestions @jam , I’ll definitely take a look at your package!

To be honest, the .find().watchChanges() method is only included for backward compatibility purposes. I’m not sure if I’ll keep it; it will depend on the feedback from the community here.

As for the current MyCollection.watchChangeStream, I agree — I’ll rename it to MyCollection.watch. Thanks for the suggestion!

That’s a great setup, @illustreets. I really like your scenario!

It makes total sense to avoid opening multiple change streams when you’re dealing with many collections. Thanks for sharing that pattern; it’s definitely valuable.

And yes, in the current implementation, calling MyCollection.watchChangeStream() multiple times like in your example would open separate streams. But I agree with your approach — it’s much more efficient to centralize and filter events on a single stream when possible.

I’ll incorporate this idea into the next push for sure! :raised_hands:
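One possible shape for that centralization, sketched under my own assumptions and not taken from the PR: a small reference-counted registry that opens at most one stream per key (for example, per collection name) and closes it when the last caller releases it. createStreamRegistry and openStream are hypothetical names; openStream is supplied by the caller and only needs to return an object with a close() method.

```javascript
// Hypothetical registry: opens at most one stream per key and shares it.
// `openStream(key)` is supplied by the caller and must return an object
// that has a close() method.
function createStreamRegistry(openStream) {
  const entries = new Map(); // key -> { stream, refCount }

  return {
    acquire(key) {
      let entry = entries.get(key);
      if (!entry) {
        entry = { stream: openStream(key), refCount: 0 };
        entries.set(key, entry);
      }
      entry.refCount += 1;

      let released = false;
      return {
        stream: entry.stream,
        // Close the underlying stream only when the last user releases it.
        release() {
          if (released) return; // releasing twice is a no-op
          released = true;
          entry.refCount -= 1;
          if (entry.refCount === 0) {
            entries.delete(key);
            entry.stream.close();
          }
        },
      };
    },
  };
}
```

With something like this, two calls for the same collection would share one stream, and per-caller filters such as operationType would be applied in application code rather than in separate pipelines.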

@permb I haven’t worked on observeChangesAsync yet; I wanted to hear the community’s opinion first.

@msavin I don’t think so. Do you have any scenario in mind?

Hurr durr

I hope I’m not missing the mark, but I’m trying to look at this from the perspective of someone who’s entirely new to Meteor (which I am, since I’m constantly learning new things about Meteor every day!).

How does this impact the current state of Meteor? Do Meteor applications scale better now? Do we get performance improvements? If so, by how much? What about backwards compatibility? What migration paths are we talking about here for old applications to reap the benefits of this new feature?

Also, why wasn’t @jam’s package integrated into the core? I feel its DX is way better, and it offers other cool benefits, mainly subs caching, which I feel is a huge missing part of pub-sub in Meteor.

The efforts to improve pub-sub are split into the following categories:

  • Observing changes: how Meteor learns about changes (polling, redis-oplog and change streams).
  • Splitting responsibility: does the server calculate which documents changed and which didn’t, or is that the client’s responsibility? (mergebox and publication strategies)
  • Subs-caching: how often the client re-fetches those changes, which packages like meteorhacks:subs-manager and ccorcos:subs-cache took a stab at.
  • WebSockets: the transport used to communicate those changes (sockjs). I read somewhere that ws is more performant. This also affects DX, which feathersjs comments on in comparison with Meteor.

@italojs, you integrated one of @zodern’s packages into the core, so what prevented it this time?


After this PR gets merged, will Meteor.publish use change streams instead of polling? Or will I have to enable it explicitly by tapping into these weird APIs, either watchChangeStream or setting changeStreams: true on observeChanges?

In a nutshell, how would this code be affected by this update, in behavior or performance?

import { Meteor } from 'meteor/meteor';
import { TasksCollection } from '/imports/db/TasksCollection';

Meteor.publish('tasks', function publishTasks() {
  return TasksCollection.find({ userId: this.userId });
});