Pub/Sub with Mongodb Change Stream

The idea is using built-in Mongodb Change Stream to create publications. Hopefully by not using oplog we can have some performance boost.

const handleChangeStreamEvents = <TSchema extends { [key: string]: any }>(
  self: Subscription,
  collectionName: string,
  doc: ChangeEvent<TSchema>
) => {
  console.log("changed", doc);
  switch (doc.operationType) {
    case "replace":
      console.log("call changed");
      doc.fullDocument &&
        self.changed(collectionName, doc.documentKey._id, doc.fullDocument);
      break;
    case "insert":
      console.log("call added");
      doc.fullDocument &&
        self.added(collectionName, doc.documentKey._id, doc.fullDocument);
      break;
    case "delete":
      self.removed(collectionName, doc.documentKey._id);
      break;
    case "update":
      const fields: Partial<TSchema> = {};
      doc.updateDescription.removedFields.map((item) => {
        fields[item] = undefined;
      });
      Object.keys(doc.updateDescription.updatedFields).map(
        (item: keyof TSchema) => {
          fields[item] = doc.updateDescription.updatedFields[item];
        }
      );
      self.changed(collectionName, doc.documentKey._id, fields);
      break;
    case "drop":
    case "dropDatabase":
    case "rename":
    case "invalidate":
      self.stop();
      break;
    default:
      break;
  }
};

Meteor.publish("links.all", function () {
  const collectionName = LinksCollection.rawCollection().collectionName;

  const links = LinksCollection.find();
  links.map((link) => {
    this.added(collectionName, link._id, link);
  });

  const changeStream = LinksCollection.rawCollection().watch();
  changeStream.on("change", (doc) => {
    handleChangeStreamEvents<Link>(this, collectionName, doc);
  });

  this.onStop(() => {
    changeStream.close();
  });

  this.ready();
});

My test repo: https://github.com/minhna/meteor-change-stream-publication
More on Meteor low level API: Publications and Data Loading | Meteor Guide

5 Likes

This is really cool - I looked into the same a while ago and found that mongo doesn’t do great when there are a lot of change streams registered (this was in version 3.2, so could easily have changed) - so it was generally OK if you had one change stream per collection, but one change stream per unique selector (or even with the same concept of “channel” that redis-oplog has) wasn’t viable.

Yeah, I’m not sure how it handle lots of connections. I think redis-oplog is better solution. But the cultofcoders:redis-oplog package is abandoned. That would be great if Meteor brings it to the core.

1 Like

@minhna Out of curiosity, how/why did you come up with the idea?

The idea of using MongoDB Change Stream has already come up in some discussions, but we would need to look into it very carefully. Thanks for sharing your repositories with us @minhna.
cc @renanccastro

Explore bringing Redis-oplog to the core.

This is an item that will be in our next roadmap update. So it’s something we’re really considering.

1 Like

@aureooms Just curiosity :slight_smile: tailing oplog by default gives you not a very good performance. It’s a reason people switch to other system because they thought it will has problem with gigantic system. Ofcouse most of systems are not and Meteor works very well. But we always think far ahead right? and sometime it’s over thinking. haha.

@minhna Any idea what would be the proper way to avoid missing events happening in between those operations? Maybe using startAtOperationTime (see also https://docs.mongodb.com/upcoming/changeStreams/#footnote-start-time). Could also use resumeAfter/startAfter to restart a broken stream.

@aureooms, handling that kind of issue is hard. I think the best way is once the connection was lost, the socket closed, then when it was restored and you fetch all the data again. I think that’s way meteor does right now. It sounds like ineffective but it’s simple and reliable.

Update on this. redis-oplog has been picked up by Community Packages. I’m getting oriented there right now, so please give me a month or two.

6 Likes

@minhna Ok I agree that the second point is just a question of resource usage and not correctness. But for the first point, I think you need proper logic, let me rephrase the question: Between the initializing find request and the watch request, a lot of things could happen to the database, i.e. you could miss change events. If you want to be correct you need a way to tell the watch request to start watching at the same database time as the one you are reading from in the find request (which is what implicitly happen when nothing happens to the database in between those requests). I see in the documentation that there seems to be a mechanism to do just that. I just wonder what would be the correct way to implement it.

The Change Streams docs I linked to says :

Starting in MongoDB 4.0, you can specify a startAtOperationTime to open the cursor at a particular point in time. If the specified starting point is in the past, it must be in the time range of the oplog.

So it is possible as long as the oplog contains sufficient information to start from that point in time.

1 Like

@aureooms in this example, you can move the watch() block code above the find() command. It may solve the issue. I’m just working with normal data then it doesn’t really matter if something happended in the windows of couple miliseconds

@storyteller awesome. It will be a huge boost. Thank you.

@minhna OK. I believe the generic solution would be to run the find request inside a clientSession with read level snapshot and some fixed operationTime, then run the watch request with the same startAtOperationTime. How the most recent relevant OperationTime can be computed is still to be discovered.

Note that there is a fork of redis-oplog that is more actively maintained. Although, as I understand it, it is not a direct replacement because of the changes done. We are currently using the original package but it is in my pipeline to explore this one

@ramez might give more light into this although this seems like hijacking this thread

2 Likes

Thankd @rjdavid for the mention

@storyteller

If you could start with my fork, it has at least a year or two of developments ahead of the cult-of-coders one. It implements the key element of edge caching which reduces db hits substantially and optimizes the application n-fold! We would be out of business if it wasn’t for this fork.

Also, the cult-of-coders one uses extra memory and has quite a few places where it is less than optimal. After doing my code review I couldn’t in good conscious use it in our applications.

Generally, it is a drop-in replacement. Happy to work on harmonizing it further.

We need to look to the future …

2 Likes

Even though it may be more performant, I find the lack of support for oplogtoredis (oplogtoredis support · Issue #17 · ramezrafla/redis-oplog · GitHub) an instant blocker for most of the apps I’ve worked with. We have way too many places with bulk updates that’d require a lot of extra work to notify Redis about.

This is an amazing fork (one I didn’t know existed and could be useful for a lot of stuff I work on), but I don’t think it should become the “default” - as your docs say, it looks excellent for read heavy workloads, but it seems like it would have fairly bad performance on write heavy workloads where the fields are large (particularly in the case where the application isn’t using specific channels for updates, I’ve not dug into it to determine if this is true or not) - I suspect if the behaviour of redis-oplog were to change in such a dramatic way, a lot of applications would see some unusual behaviour, and possibly a performance degradation.

Hey @radekmie,

The challenge is that oplogtoredis only shared changed fields, not their values. So you need an extra hit to the DB to get the values (which negates a big part of the value of this new redis-oplog). It’s easy to add, but I don’t use oplogtoredis so can’t test.

The right way is to augment oplogtoredis to share values too.

@znewsham

The only real place where you can optimize the original redis-oplog is when it comes to reading as it is predictable (if the data has not changed and you are reading it for the 2nd time … well …)

Write-heavy apps wouldn’t suffer greatly. In fact, they would not suffer at all in well-behaved cases. A case where I see an issue is with really large collections (and I mean REALLY large – to the point I would affix the label “bad design” – use files instead) that they drain redis CPU / memory.

However, you can disable redis-oplog at mutation time with option {pushToRedis:false} and you will get original performance as-is.

I’ll be looking into acquiring oplogtoredis for MCP soonish.

1 Like