Meteor Scaling - Redis Oplog [Status: Prod ready]

@imagio A bit off-topic, but sure. Here’s an involved reactive join-type example custom publication.

Say you have a reddit style message board and you want to publish all the Users including name and email who commented on a given Thread. Just for fun and to illustrate the flexibility and power, let’s also publish the count of each user’s Comments on that Thread. Let’s say each Comment contains the userId and the threadId. But a Thread document does not contain any knowledge of which Users commented or how many Comments the User created. So we’ll have to query to gather all that up.

If you used normal pub-sub, you would break reactivity because logically you’ll need to do a find on Comments passing a threadId, then you’d need to only get the unique userIds from those Comments, query the Users for their name and email, somehow tally up a comment-count, and then pass that all back via a reactive cursor. Bad news… none of that can be reactive. Hence your use of reactive-publish.

However, if we used observe on the Comments collection and break all this up using .added, .changed, and .removed, and think about it as Comments get added, updated, and removed, we can pull this off easily without any extra packages and have it be totally reactive. Anytime you can eliminate relying on a package, it’s usually more optimal/efficient.

The publication looks like this:

Meteor.publish("usersAndCommentCountsByThread", function(threadId) {
   check(threadId, Match.OneOf(String, null));
   
   let self = this;

   //  Check the thread for existence
   let thread = Threads.findOne(threadId, {fields: {_id: 1}});
   if(thread) {
      //  Create an object to track userIds that are already published and their comment counts on the thread
      //  NOTE: There's a lot of ways to do this part depending on what you want to do in you pub
      let usersAndCommentCounts = {};

      //  Query and observe comments since that is what will affect users and their comments count
      let commentsHandle = Comments.find({threadId: threadId}, {fields: {userId: 1}}).observe({
         //  This code fires any time a new comment is added that matches the above query INCLUDING
         //  any EXISTING comments that match this query
         added: function(addedComment) {
            //  Check if this comment's user is already in the users object, if not, add it
            if(!usersAndCommentCounts[addedComment.userId]) {
               //  If this is the user's first comment, add the user's id to object of users and set their 
               //  comment-count to one
               usersAndCommentCounts[addedComment.userId] = 1;
               //  Query the user getting their name and email
               let user = Users.findOne(addedComment.userId, {fields: {name: 1, email: 1}});
               //  Load the count onto the queried user
               user.commentCount = usersAndCommentCounts[addedComment.userId];
               //  Now publish the user and their comment-count.  Note that we're publishing the comment's
               //  userId since we're actually publishing users in this pub
               self.added("usersCommentCounts", addedComment.userId, user);
            }
            else {
               //  Else, simply increment this user's comment-count
               usersAndCommentCounts[addedComment.userId]++;

               //  Publish the changed comment-count
               let updatedFields = {commentCount: usersAndCommentCounts[addedComment.userId]};
               self.changed("usersCommentCounts", addedComment.userId, updatedFields);
            }
         },
         //  This code fires any time a comment is updated that matches the above query
         changed: function(changedComment, priorComment) {
            //  There's nothing to add here because if a comment is only changed/updated it doesn't affect
            //  the list of users or their comment-count.  So this whole block can be commented out.  I just left
            //  it in to demonstrate.  If we were also doing something like counting "total words" across 
            //  all comments per user, then we would want to use this to update a word-count if they changed 
            //  one of their comments.  If you were doing something here, it would look like this.
 
            //  Do your logic for updating your counts, etc. and create an object of updated fields
            //  let updatedFieldsObject = {wordCount: ...};

            //  Call this.changed with the existing document id and updated fields object
            //  this.changed("usersCommentCounts", changedComment.userId, updatedFieldsObject);
         },
         //  This code fires any time an existing comment is removed that matches the above query
         removed: function(removedComment) {
            //  Check if this user has any additional comments (could query actual DB here for comments by this
            //  User and do a count OR rely on our own tracked count)
            if(usersAndCommentCounts[removedComment.userId] === 1) {
               //  Since the user has no more comments on this thread, remove the user and
               //  comment-count from our object
               delete usersAndCommentCounts[removedComment.userId];

               //  Remove the User from the publication since they don't have any more comments on this thread
               //  Note that we're passing the userId of the comment since we're publishing users in this pub
               self.removed("usersCommentCounts", removedComment.userId);
            }
            else {
               //  Decrement the user's comment-count
               usersAndCommentCounts[removedComment.userId]--;
               
               //  Since the user's comment-count updated, but the user is still being published, we need to
               //  only update the publication with the new data
               let updatedFields = {commentCount: usersAndCommentCounts[removedComment.userId]};
               self.changed("usersCommentCounts", removedComment.userId, updatedFields);
            }
         }
      });

      //  Mark the subscription as ready
      self.ready();
      
      // Stop observing the cursor when client unsubscribes
      self.onStop(function () {
         commentsHandle.stop();
      });
   }
   else {
      self.ready();  // If no thread, mark publication as ready
   }
});

As I said, it’s a lot more code. But this is what is happening under the hood by Meteor when you don’t define your own .added, .changed, and .removed handlers. But when you do, you can see you have ultimate flexibility. You don’t have to do this with every publication, but when you need it, it’s a great tool.

The last thing you may be asking is, what the heck is that first argument "usersCommentCounts" that is passed into each publication function? That is simply the string name of the Collection that you’re publishing too. The cool thing about this is you can use any of your existing document Collections here. So if you have a Collection called Comments defined somewhere like Comments = new Meteor.Collection('comments') and you were doing a custom publication that was ultimately returning Comments, you can simply pass "comments" as this argument. I don’t know what the string name of Meteor.users is (only because I’ve never custom published to Meteor.users). But if you publish to an existing Collection like this, I don’t think you can mix your custom publication with another publication of the same document type. So, more often than not, you’ll just want to simply make a new client-only collection to hold your published documents, which is what I’m doing in the example above. So somewhere in client code you’ll simply have this:

Meteor.startup(function() {
   //  Create client-only collection to hold special custom publications
   UsersCommentCounts = new Mongo.Collection("usersCommentCounts");
});

Now you can publish directly to this client-only Collection and never have to worry about it mixing with one of your client-server mongo Collection publications. On the client, to subscribe you’ll just have:

Meteor.subscribe("usersAndCommentCountsByThread", threadId);

and then of course to find the documents you’ll have:

UsersCommentCounts.find();

With the above, you can publish all kinds of crazy stuff mixing DB documents/fields, custom counters, 3rd party API request data… all kinds of combinations. You might create a publication that doesn’t even publish any DB data, just stuff you generate on the fly based around DB document data.

Hopefully this opened your mind a little if you didn’t already know about this.

6 Likes

@evolross Thanks so much for the detailed reply! That is definitely helpful and I think I can apply that technique to some of my publications. I was already doing something similar to publish counts for badges on my navigation.

So that I don’t pollute this thread with off topic things I have started a new thread to discuss this further. I would appreciate it if you could take a look at it! Reactive joins without using any packages

1 Like

Any future plans integrating Redis 5.0 and use Redis Streams for redis-oplog? It seems like it’s gonna solve Meteor scaling issues even further…

Guys, look, we have an ‘events.files’ collection, that has thousands of inserts per second.
MongoDB is being updated by Node.JS. We are manually sending a change to the Redis from an insert callback. The documents all have the same taskId field, which is a Mongo object.

We tried to do it the following way and we see the event in a debug mode of redis-oplog.
But the collection is not reactive and re-subscription doesn’t happen on Meteor side🙁

redis.publish('events.files', JSON.stringify({
 e: 'i',
 d: {task: taskId}
}));

What are we doing wrong? Please heeeeeeeelp!

@martineboh is there any difference in their pub/sub api ?

@dimanjet the document does not look like that. It contains the _id of the document as well! Look how we do it: https://github.com/cult-of-coders/redis-oplog/blob/master/lib/mongo/lib/dispatchers.js#L78 Sometimes the source-code is your best documentation! :smiley: Hope you solve it.

Thank you! It makes things a lot clearer.

I wonder if it is possible to make bulk insert and publish all of them by one common field? We don’t know the _id here yet, because we have an array of several thousands elements. I don’t think would be a good solution to make publish for the every document.

We’re also in active process of trying tulip/oplogtoredis solution. This program tails the oplog of a Mongo server, and publishes changes to Redis. But change events are not seen back in Meteor yet. Will try that too.

No support for bulk elements. A thousand pushes to redis per second is nothing in my humble opinion :smiley: But if you like you can implement that yourself, I can offer you some guidelines, and do a PR.

Oplogtoredis is genious but is good if you have lots of sources updating mongodb.

Our production app regularly does a thousand pushes through redis-oplog a second and I can confirm that it’s nothing - especially for Redis. It barely makes a bump on performance charts on Compose.io’s Redis product. How Meteor handles that action… that depends on what those one thousand pushes are doing.

1 Like

Security
I am still trying to comprehend the security implications with Redis Oplog. This is the Security Specs for Mongo Atlas (if the Oplog is used with them): Mongo Atlas
Is there anyone experienced with TLS and encryption within Redis? Has anyone used Redis with something other than naked http?
Tx.

On AWS, we used a redis instance (elasticache) in the same private network as the EC2 instances in EBS, so we had no concerns for naked http (and it’s less CPU intensive). If you have multi-region servers, you can enable in the AWS UI secure connection (i.e. over ssh tunnel). I think it’s the standard way of doing things with redis as it does not come with built-in encryption (at least not the versions we are using)

You can, of course, do it all yourself, but that can turn costly and risky quickly if you don’t have the expertise in-house.

It’s not encryption but you can add IP white-listing to your Redis deployments. You make a good point though… will have to look into securing Redis more.

I’m just curious if SyntheticMutator is working for anyone who is using Meteor 1.8. None of the methods work for me as per this issue.

How is the compatibility with Astronomy? Does it work out of the box or do you need to do a lot of manual hookups?

It’s just standard pub/sub @jorgeer it should work without any change just like normal pub/sub (This is what we mean by full BC). Unless astronomy is doing some weird hacks to the internal live processing of Meteor.

@diaconutheodor I just remember reading this https://github.com/cult-of-coders/redis-oplog/issues/235, which is closed but I wasn’t sure if it was solved or if there are outstanding problems that are not resolvable because of the way astronomy uses the collections.

I believe it’s solved in here: https://github.com/cult-of-coders/redis-oplog/pull/283 many people use this one. I have to roll it out as redis oplog v2, but didn’t get the chance. Please check it out.

After adding redis-oplog package, insert/update/remove callbacks in methods stopped working. Now I can only make it in a sync way like

let id = Collection.insert()

not

Collection.insert({}, (err, _id)=> {})

as it was before. May be you could help us please, if this ia a normal thing to happen?

@diaconutheodor, When I run my mongodb update scripts from mongo command line or ROBO Mongo, the updated db collections are not getting reflected in redis-oplog immediatley. What is the best solution for this ?
Is there any way to clear redis-oplog cache ?

1 Like

That’s because redis-oplog running inside of Meteor doesn’t know about any of the external updates you may be doing.

Please see the the section Outside Mutations of the redis-oplog docs for help about how to deal with outside mutations.

4 Likes

@evolross, Thanks,
@diaconutheodor
Actually I have a bulk update service where in which I need to push the bulk update at once to redis for that I configured for one collection pushtoRedis to false using ConfigureRedis oplog in main.js
Like this
Collection.configureRedisOplog({ mutation(options) { options.pushToRedis = false; } })
Now I am trying to push all updates to redis at the end
When I refer to outside mutations document
redis.publish('tasks', JSON.stringify({ [RedisPipe.EVENT]: Events.UPDATE, [RedisPipe.DOC]: {_id: taskId}, [RedisPipe.FIELDS]: ['status'] }))

getRedisPusher.publish('tasks', EJSON.stringify({ [RedisPipe.DOC]: {_id: taskId}, [RedisPipe.EVENT]: Events.UPDATE, [RedisPipe.FIELDS]: ['status'] });

I see the above two ways of doing it. What way I should follow and how do they differ ?
and in the first way of doing it how and from where do I import “redis” ??