Edit: This technique resulted in a memory leak that I was unable to track down. I’ve decided to remove all reactive publication from my project. It seems to be a bad idea in general that doesn’t play well with Meteor.
This thread is to continue a discussion I was having with @evolross over on the redis oplog thread. He posted the following over there:
Say you have a reddit style message board and you want to publish all the
Users
includingname
andThread
. Just for fun and to illustrate the flexibility and power, let’s also publish thecount
of each user’sComments
on thatThread
. Let’s say eachComment
contains theuserId
and thethreadId
. But aThread
document does not contain any knowledge of whichUsers
commented or how manyComments
theUser
created. So we’ll have to query to gather all that up.If you used normal pub-sub, you would break reactivity because logically you’ll need to do a
find
onComments
passing athreadId
, then you’d need to only get the uniqueuserIds
from thoseComments
, query theUsers
for theirname
andreactive-publish
.However, if we used
observe
on theComments
collection and break all this up using.added
,.changed
, and.removed
, and think about it asComments
get added, updated, and removed, we can pull this off easily without any extra packages and have it be totally reactive. Anytime you can eliminate relying on a package, it’s usually more optimal/efficient.The publication looks like this:
Meteor.publish("usersAndCommentCountsByThread", function(threadId) { check(threadId, Match.OneOf(String, null)); let self = this; // Check the thread for existence let thread = Threads.findOne(threadId, {fields: {_id: 1}}); if(thread) { // Create an object to track userIds that are already published and their comment counts on the thread // NOTE: There's a lot of ways to do this part depending on what you want to do in you pub let usersAndCommentCounts = {}; // Query and observe comments since that is what will affect users and their comments count let commentsHandle = Comments.find({threadId: threadId}, {fields: {userId: 1}}).observe({ // This code fires any time a new comment is added that matches the above query INCLUDING // any EXISTING comments that match this query added: function(addedComment) { // Check if this comment's user is already in the users object, if not, add it if(!usersAndCommentCounts[addedComment.userId]) { // If this is the user's first comment, add the user's id to object of users and set their // comment-count to one usersAndCommentCounts[addedComment.userId] = 1; // Query the user getting their name and email let user = Users.findOne(addedComment.userId, {fields: {name: 1, email: 1}}); // Load the count onto the queried user user.commentCount = usersAndCommentCounts[addedComment.userId]; // Now publish the user and their comment-count. Note that we're publishing the comment's // userId since we're actually publishing users in this pub self.added("usersCommentCounts", addedComment.userId, user); } else { // Else, simply increment this user's comment-count usersAndCommentCounts[addedComment.userId]++; // Publish the changed comment-count let updatedFields = {commentCount: usersAndCommentCounts[addedComment.userId]}; self.changed("usersCommentCounts", addedComment.userId, updatedFields); } }, // This code fires any time a comment is updated that matches the above query changed: function(changedComment, priorComment) { // There's nothing to add here because if a comment is only changed/updated it doesn't affect // the list of users or their comment-count. So this whole block can be commented out. I just left // it in to demonstrate. If we were also doing something like counting "total words" across // all comments per user, then we would want to use this to update a word-count if they changed // one of their comments. If you were doing something here, it would look like this. // Do your logic for updating your counts, etc. and create an object of updated fields // let updatedFieldsObject = {wordCount: ...}; // Call this.changed with the existing document id and updated fields object // this.changed("usersCommentCounts", changedComment.userId, updatedFieldsObject); }, // This code fires any time an existing comment is removed that matches the above query removed: function(removedComment) { // Check if this user has any additional comments (could query actual DB here for comments by this // User and do a count OR rely on our own tracked count) if(usersAndCommentCounts[removedComment.userId] === 1) { // Since the user has no more comments on this thread, remove the user and // comment-count from our object delete usersAndCommentCounts[removedComment.userId]; // Remove the User from the publication since they don't have any more comments on this thread // Note that we're passing the userId of the comment since we're publishing users in this pub self.removed("usersCommentCounts", removedComment.userId); } else { // Decrement the user's comment-count usersAndCommentCounts[removedComment.userId]--; // Since the user's comment-count updated, but the user is still being published, we need to // only update the publication with the new data let updatedFields = {commentCount: usersAndCommentCounts[removedComment.userId]}; self.changed("usersCommentCounts", removedComment.userId, updatedFields); } } }); // Mark the subscription as ready self.ready(); // Stop observing the cursor when client unsubscribes self.onStop(function () { commentsHandle.stop(); }); } else { self.ready(); // If no thread, mark publication as ready } });
As I said, it’s a lot more code. But this is what is happening under the hood by Meteor when you don’t define your own
.added
,.changed
, and.removed
handlers. But when you do, you can see you have ultimate flexibility. You don’t have to do this with every publication, but when you need it, it’s a great tool.The last thing you may be asking is, what the heck is that first argument
"usersCommentCounts"
that is passed into each publication function? That is simply the string name of the Collection that you’re publishing too. The cool thing about this is you can use any of your existing document Collections here. So if you have a Collection calledComments
defined somewhere likeComments = new Meteor.Collection('comments')
and you were doing a custom publication that was ultimately returningComments
, you can simply pass"comments"
as this argument. I don’t know what the string name ofMeteor.users
is (only because I’ve never custom published toMeteor.users
). But if you publish to an existing Collection like this, I don’t think you can mix your custom publication with another publication of the same document type. So , more often than not, you’ll just want to simply make a new client-only collection to hold your published documents, which is what I’m doing in the example above. So somewhere in client code you’ll simply have this:Meteor.startup(function() { // Create client-only collection to hold special custom publications UsersCommentCounts = new Mongo.Collection("usersCommentCounts"); });
Now you can publish directly to this client-only Collection and never have to worry about it mixing with one of your client-server mongo Collection publications. On the client, to subscribe you’ll just have:
Meteor.subscribe("usersAndCommentCountsByThread", threadId);
and then of course to find the documents you’ll have:
UsersCommentCounts.find();
With the above, you can publish all kinds of crazy stuff mixing DB documents/fields, custom counters, 3rd party API request data… all kinds of combinations. You might create a publication that doesn’t even publish any DB data, just stuff you generate on the fly based around DB document data.
Hopefully this opened your mind a little if you didn’t already know about this.
I can see how that technique works perfectly for some types of publications. I am going to apply it to some of mine in an effort to get rid of reactive publish so I have more insight into the performance of my publications.
There is a case I haven’t been able to figure out a performant solution for however. In your example your publication is reactive to changes in comments. What if you also need it to be reactive to changes in the users you are publishing?
The specific use case here is publishing a set of user documents based on the contents of a friendships collection. Users need to be added to and removed from the publication when friendships are added/removed/modified but the users returned also need to live update when they change. The only way I can think of to do this is to add an extra “inner” observe for each user object.
For example, this publication would do the trick but I highly doubt it would perform well, doing a separate query for each user.
const userId = this.userId
const thisPublication = this
let observedFriends: { [userId: string]: Meteor.LiveQueryHandle } = {}
const observeFriend = function (friendId: string) {
return Meteor.users.find({ _id: friendId }, { fields: otherUserFields }).observe({
//call thisPublication.added/removed/changed to update the client
})
}
let friendsCursor = Friendships.find({
$or:
[{ sendingUser: userId }, { receivingUser: userId }],
accepted: true,
type: FRIENDSHIP_TYPE.FRIEND
}, { fields: { sendingUser: 1, receivingUser: 1 } }).observe({
added: function (f: Friendship) {
let friendId = f.sendingUser == userId ? f.receivingUser : f.sendingUser
if (!observedFriends[friendId]) {
observedFriends[friendId] = observeFriend(friendId)
}
},
removed: function (f: Friendship) {
let friendId = f.sendingUser == userId ? f.receivingUser : f.sendingUser
if (observedFriends[friendId]) {
observedFriends[friendId].stop()
delete observedFriends[friendId]
}
}
})
this.onStop(function () {
friendsCursor.stop()
for (const userId of Object.keys(observedFriends)) {
observedFriends[userId].stop()
delete observedFriends[userId]
}
})
What I haven’t been able to figure out is how I could reactively build a list of userIds so I could observe only one query for all the users instead of observing each one individually. Thoughts?