Reactive joins without using any packages


#1

Edit: This technique resulted in a memory leak that I was unable to track down. I’ve decided to remove all reactive publication from my project. It seems to be a bad idea in general that doesn’t play well with Meteor.

This thread is to continue a discussion I was having with @evolross over on the redis oplog thread. He posted the following over there:

Say you have a reddit style message board and you want to publish all the Users including name and email who commented on a given Thread . Just for fun and to illustrate the flexibility and power, let’s also publish the count of each user’s Comments on that Thread . Let’s say each Comment contains the userId and the threadId . But a Thread document does not contain any knowledge of which Users commented or how many Comments the User created. So we’ll have to query to gather all that up.

If you used normal pub-sub, you would break reactivity because logically you’ll need to do a find on Comments passing a threadId , then you’d need to only get the unique userIds from those Comments , query the Users for their name and email , somehow tally up a comment-count, and then pass that all back via a reactive cursor. Bad news… none of that can be reactive. Hence your use of reactive-publish .

However, if we used observe on the Comments collection and break all this up using .added , .changed , and .removed , and think about it as Comments get added, updated, and removed, we can pull this off easily without any extra packages and have it be totally reactive. Anytime you can eliminate relying on a package, it’s usually more optimal/efficient.

The publication looks like this:

//  Publishes one document per user who has commented on the given thread into the
//  "usersCommentCounts" collection. Each published document carries the user's name and
//  email plus a live `commentCount` for that thread. Only the Comments collection is
//  observed, so later edits to a user's name/email are NOT pushed by this publication.
Meteor.publish("usersAndCommentCountsByThread", function(threadId) {
   check(threadId, Match.OneOf(String, null));

   const self = this;

   //  If the thread doesn't exist there is nothing to publish; just mark the publication as ready
   if(!Threads.findOne(threadId, {fields: {_id: 1}})) {
      self.ready();
      return;
   }

   //  userId -> number of that user's comments on this thread. A user is published exactly
   //  while they have an entry in this map.
   const commentCounts = new Map();

   //  Record one more comment for the user, publishing the user on their first comment
   const countComment = function(userId) {
      //  A comment without a userId cannot be attributed to anyone, so it is not counted
      if(!userId) {
         return;
      }

      const commentCount = (commentCounts.get(userId) || 0) + 1;
      commentCounts.set(userId, commentCount);

      if(commentCount > 1) {
         //  The user is already published, so only send the changed field
         self.changed("usersCommentCounts", userId, {commentCount: commentCount});
         return;
      }

      //  First comment by this user: query their name and email. The user document may no
      //  longer exist (findOne returns undefined), in which case only the count is published.
      const user = Users.findOne(userId, {fields: {name: 1, email: 1}}) || {};
      //  Note that we're publishing under the comment's userId since this pub publishes users
      self.added("usersCommentCounts", userId, Object.assign({}, user, {commentCount: commentCount}));
   };

   //  Record one comment fewer for the user, un-publishing the user when none are left
   const uncountComment = function(userId) {
      const previousCount = commentCounts.get(userId);
      //  Never counted (e.g. a comment without a userId): nothing was published for it
      if(!previousCount) {
         return;
      }

      if(previousCount === 1) {
         commentCounts.delete(userId);
         self.removed("usersCommentCounts", userId);
      }
      else {
         commentCounts.set(userId, previousCount - 1);
         self.changed("usersCommentCounts", userId, {commentCount: previousCount - 1});
      }
   };

   //  Query and observe comments since that is what affects the users and their comment counts
   const commentsHandle = Comments.find({threadId: threadId}, {fields: {userId: 1}}).observe({
      //  Fires for every comment matching the query, INCLUDING the ones that already exist
      //  when the observer is created
      added: function(addedComment) {
         countComment(addedComment.userId);
      },
      //  Fires when a matching comment is updated. Only `userId` is fetched, so the only change
      //  that matters here is a comment moving to another user. If the publication also tracked
      //  something like a per-user word count, this is where it would be recomputed and sent
      //  with self.changed("usersCommentCounts", changedComment.userId, {...}).
      changed: function(changedComment, priorComment) {
         if(changedComment.userId !== priorComment.userId) {
            countComment(changedComment.userId);
            uncountComment(priorComment.userId);
         }
      },
      //  Fires when a matching comment is removed
      removed: function(removedComment) {
         uncountComment(removedComment.userId);
      }
   });

   //  Stop observing the cursor when the client unsubscribes
   self.onStop(function() {
      commentsHandle.stop();
   });

   //  The initial `added` callbacks have already run, so the initial data set is complete
   self.ready();
});

As I said, it’s a lot more code. But this is what is happening under the hood by Meteor when you don’t define your own .added , .changed , and .removed handlers. But when you do, you can see you have ultimate flexibility. You don’t have to do this with every publication, but when you need it, it’s a great tool.

The last thing you may be asking is, what the heck is that first argument "usersCommentCounts" that is passed into each publication function? That is simply the string name of the Collection that you’re publishing to. The cool thing about this is you can use any of your existing document Collections here. So if you have a Collection called Comments defined somewhere like Comments = new Meteor.Collection('comments') and you were doing a custom publication that was ultimately returning Comments, you can simply pass "comments" as this argument. I don’t know what the string name of Meteor.users is (only because I’ve never custom published to Meteor.users). But if you publish to an existing Collection like this, I don’t think you can mix your custom publication with another publication of the same document type. So, more often than not, you’ll just want to simply make a new client-only collection to hold your published documents, which is what I’m doing in the example above. So somewhere in client code you’ll simply have this:

//  Client-only collection that receives the documents sent by custom publications
//  under the name "usersCommentCounts"
Meteor.startup(() => {
   UsersCommentCounts = new Mongo.Collection("usersCommentCounts");
});

Now you can publish directly to this client-only Collection and never have to worry about it mixing with one of your client-server mongo Collection publications. On the client, to subscribe you’ll just have:

//  Client side: subscribe to the custom publication, passing the id of the thread
Meteor.subscribe("usersAndCommentCountsByThread", threadId);

and then of course to find the documents you’ll have:

//  Client side: query the documents that were published into the client-only collection
UsersCommentCounts.find();

With the above, you can publish all kinds of crazy stuff mixing DB documents/fields, custom counters, 3rd party API request data… all kinds of combinations. You might create a publication that doesn’t even publish any DB data, just stuff you generate on the fly based around DB document data.

Hopefully this opened your mind a little if you didn’t already know about this.

I can see how that technique works perfectly for some types of publications. I am going to apply it to some of mine in an effort to get rid of reactive publish so I have more insight into the performance of my publications.

There is a case I haven’t been able to figure out a performant solution for however. In your example your publication is reactive to changes in comments. What if you also need it to be reactive to changes in the users you are publishing?

The specific use case here is publishing a set of user documents based on the contents of a friendships collection. Users need to be added to and removed from the publication when friendships are added/removed/modified but the users returned also need to live update when they change. The only way I can think of to do this is to add an extra “inner” observe for each user object.

For example, this publication would do the trick but I highly doubt it would perform well, doing a separate query for each user.

// Body of a publish function: publishes the user documents of the current user's accepted
// friends and keeps them live by running one observer per friend.
const userId = this.userId
const thisPublication = this

// Live-query handles of the per-friend user observers, keyed by the friend's user id
const observedFriends: { [userId: string]: Meteor.LiveQueryHandle } = {}

// Ids of the user documents currently sent to the client. Stopping an observer does not emit
// `removed`, so removals are published explicitly, and only for ids that really are published.
const publishedFriendIds = new Set<string>()

// Removes a friend's user document from the client if it is currently published
const unpublishFriend = function (friendId: string) {
	if (publishedFriendIds.delete(friendId)) {
		// Meteor.users is backed by the "users" collection
		thisPublication.removed("users", friendId)
	}
}

// Starts a live query on a single friend's user document and forwards its changes to the client
const observeFriend = function (friendId: string) {
	return Meteor.users.find({ _id: friendId }, { fields: otherUserFields }).observeChanges({
		added: function (id, fields) {
			publishedFriendIds.add(id)
			thisPublication.added("users", id, fields)
		},
		changed: function (id, fields) {
			thisPublication.changed("users", id, fields)
		},
		removed: function (id) {
			unpublishFriend(id)
		}
	})
}

// The friend is whichever side of the friendship is not the current user
const friendIdOf = function (f: Friendship) {
	return f.sendingUser === userId ? f.receivingUser : f.sendingUser
}

// NOTE(review): assumes at most one accepted friendship document per pair of users; with
// duplicates, removing one of them would un-publish a user who is still a friend — confirm.
const friendshipsHandle = Friendships.find({
	$or:
		[{ sendingUser: userId }, { receivingUser: userId }],
	accepted: true,
	type: FRIENDSHIP_TYPE.FRIEND
}, { fields: { sendingUser: 1, receivingUser: 1 } }).observe({
	added: function (f: Friendship) {
		const friendId = friendIdOf(f)
		if (!observedFriends[friendId]) {
			observedFriends[friendId] = observeFriend(friendId)
		}
	},

	removed: function (f: Friendship) {
		const friendId = friendIdOf(f)
		if (observedFriends[friendId]) {
			observedFriends[friendId].stop()
			delete observedFriends[friendId]
		}
		// The stopped observer will not report the removal, so tell the client here
		unpublishFriend(friendId)
	}
})

// When the publication stops, tear down the friendships observer and every per-friend observer
this.onStop(function () {
	friendshipsHandle.stop()
	for (const friendId of Object.keys(observedFriends)) {
		observedFriends[friendId].stop()
		delete observedFriends[friendId]
	}
})

What I haven’t been able to figure out is how I could reactively build a list of userIds so I could observe only one query for all the users instead of observing each one individually. Thoughts?


Meteor Scaling - Redis Oplog [Status: Prod ready]
#2

So I have devised an initial solution. It definitely needs more work but it seems to be OK for 1-1 reactive publications without using any packages or messing with meteor internals.


/**
 * A generalized reactive publish function that doesn't use meteor internal hacks like server
 * autorun and reactive-publish.
 *
 * Observes `collection1` and publishes, under `collectionName`, the `collection2` document that
 * each matching `collection1` document points at. All ids found on the first run are watched by a
 * single `$in` observer; ids that appear later get one small observer each. The initial observer
 * is never restarted: ids that drop out of it are recorded and its events for them are ignored.
 *
 * A `collection2` id stays published for as long as at least one `collection1` document
 * references it, and `pub.removed` is only ever sent for documents that were actually published.
 * Calls `pub.ready()` once the initial observer has delivered its documents and registers a
 * `pub.onStop` handler that stops every observer.
 *
 * @param collectionName name of the client collection the documents are published into
 * @param pub the subscription (the `this` of the publish function)
 * @param collection1 the driving collection
 * @param collection1Filter selector for the driving collection
 * @param collection1Fields field specifier for the driving collection; it has to include
 *        whatever `getCollection2IdFromCollection1` reads
 * @param getCollection2IdFromCollection1 maps a `collection1` document to the `_id` of the
 *        `collection2` document to publish
 * @param collection2 the dependent collection
 * @param collection2Filter extra selector merged into every `collection2` query
 * @param collection2Fields field specifier for the published documents
 */
function createReactivePublisher<T,U>(
	collectionName: string,
	pub: SubscriptionMember,
	collection1: Mongo.Collection<T>,
	collection1Filter: Mongo.Query<T>,
	collection1Fields: {[P in keyof T]?: number},
	getCollection2IdFromCollection1: (doc: T) => string,
	collection2: Mongo.Collection<U>,
	collection2Filter: Mongo.Query<U>,
	collection2Fields: {[R in keyof U]?: number},
): void {
	let initializing = true
	let stopped = false

	// How many collection1 docs currently reference each collection2 id
	const refCounts = new Map<string, number>()
	// Ids captured on the first run; these are covered by the single initial `$in` observer
	const initialIds = new Set<string>()
	// Initial ids that were removed afterwards; the initial observer ignores events for them
	const suppressedInitialIds = new Set<string>()
	// Observers for ids that were (re-)added after the first run
	const handles = new Map<string, Meteor.LiveQueryHandle>()
	// Ids of the documents that are currently sent to the client
	const publishedIds = new Set<string>()

	// Remove a document from the publication, but only if it really is published
	const unpublish = (id: string) => {
		if (publishedIds.delete(id)) {
			pub.removed(collectionName, id)
		}
	}

	// observeChanges callbacks that forward collection2 events to the client.
	// `ignores` lets the initial observer skip ids it is no longer responsible for.
	const makeHandlers = (ignores: (id: string) => boolean) => ({
		added: function (id: string, doc: U) {
			if (ignores(id) || publishedIds.has(id)) { return }
			publishedIds.add(id)
			pub.added(collectionName, id, doc)
		},
		changed: function (id: string, doc: Partial<U>) {
			if (ignores(id) || !publishedIds.has(id)) { return }
			pub.changed(collectionName, id, doc)
		},
		removed: function (id: string) {
			if (!ignores(id)) { unpublish(id) }
		}
	})

	// Add an observer on a single id. Used after the initial run to monitor any docs added to the publication.
	const addObserver = (id: string) => {
		const handle = collection2
			.find(Object.assign({ _id: id }, collection2Filter), { fields: collection2Fields })
			.observeChanges(makeHandlers(() => false))
		if (stopped) {
			// The publication stopped while this observer was being created, so the stop
			// handler could not see it; stop it here instead of leaking it
			handle.stop()
			return
		}
		handles.set(id, handle)
	}

	// A collection1 doc started referencing `id`
	const retain = (id: string) => {
		const count = refCounts.get(id) || 0
		refCounts.set(id, count + 1)
		if (initializing) {
			// While initializing just collect the ids so they can be watched by one observer
			initialIds.add(id)
			return
		}
		// First reference after the initial run. This also covers an initial id that was
		// removed earlier: the initial observer ignores it now, so it needs its own observer.
		if (count === 0 && !handles.has(id)) {
			addObserver(id)
		}
	}

	// A collection1 doc stopped referencing `id`
	const release = (id: string) => {
		const count = refCounts.get(id) || 0
		if (count > 1) {
			// Still referenced by another collection1 doc: keep publishing
			refCounts.set(id, count - 1)
			return
		}
		refCounts.delete(id)

		const handle = handles.get(id)
		if (handle) {
			handle.stop()
			handles.delete(id)
		}
		// The initial observer keeps running, so make it ignore this id from now on
		if (initialIds.has(id)) {
			suppressedInitialIds.add(id)
		}
		unpublish(id)
	}

	const col1Handle = collection1.find(collection1Filter, {fields: collection1Fields}).observe({
		added: function (doc: T) {
			retain(getCollection2IdFromCollection1(doc))
		},

		changed: function (newDoc: T, oldDoc: T) {
			const newId = getCollection2IdFromCollection1(newDoc)
			const oldId = getCollection2IdFromCollection1(oldDoc)
			if (newId !== oldId) {
				retain(newId)
				release(oldId)
			}
		},

		removed: function (doc: T) {
			release(getCollection2IdFromCollection1(doc))
		}
	})

	// observe() delivers the initial `added` calls before it returns, so the id list is complete
	// here. Flip the flag now so a doc arriving while the initial observer is being set up gets
	// its own observer instead of being appended to a list that has already been queried.
	initializing = false

	// Observe all of the _ids we just got from the initial observer run
	const initialObserver = collection2
		.find(Object.assign({ _id: { $in: Array.from(initialIds) } }, collection2Filter), {fields: collection2Fields})
		.observeChanges(makeHandlers((id) => suppressedInitialIds.has(id)))
	pub.ready()

	// When the publication stops make sure to stop all of the observers
	pub.onStop(function () {
		stopped = true
		col1Handle.stop()
		initialObserver.stop()
		handles.forEach((handle) => handle.stop())
		handles.clear()
	})

}

#3

It seems like if you needed to observe two different collections (e.g. friendships changing and users changing) you would just have two queries, each with its own observe and added, changed, removed functions.

Then each one could call its own self.added, self.changed, and self.removed on the same publication of users. I think that’s fair game, though I’ve never done it.


#4

To answer the original question “why manual reactivity can be better than using the reactive-publish” package. Notice the “can be” here …

In our case, we have pubs that depend on changes in another collection. Say a user has access to new content, you need to publish that content. If a user loses access, you need to remove that content from the pub. Sometimes it gets more complex: we depend on yet another collection.

What we found is that reactive-publish is constantly stopping and creating new observers (@mitar can you confirm I am right). If you want fine-tuned reactivity you want to minimize stopping and starting new observers especially when there is a lot of interdependence.

What we just did was we add observers for NEWLY-added documents. In other words, when a user has access to new content, we start a new observer and keep the old one running.

If a user loses access to content, we keep the same observers, but stop pushing (i.e. this.stop and prevent this.changed)

We found our servers behave better, at the expense of a lot more code and more testing. Often, we get warnings from AWS that we have CPU spikes, which we can relate directly to using the reactive-pubs package.

Don’t take me wrong, when starting out, you should minimize code-writing and reactive-pubs is great for that. Especially as we learn what users want from our app. But we are now at the point of maturity we can ill afford cpu spikes as more users log in at the same time (we sell to schools, so students log-in in batches) and start using our app.

EDIT: This all relates to controlling your reactivity. People complained that Blaze was re-computing all the time. It mostly has to do with ill-designed trackers. We need to know what we are monitoring for reactivity (e.g. don’t use collection.find in your template helpers without specifying fields).


#5

That’s exactly the technique I am trying to use in the code above. I add an initial observer on first run then never stop it. I add a new observer for docs that get added subsequently and I keep a list of docs to ignore for those that get removed from the initial observer.


#6

@ramez Will you share any other experiences or tips you have from scaling Meteor? How many clients are you able to run per instance (cpu core)? How many subscriptions do you run per client? Do you use Apollo or other data loading methods to avoid reactive data and reduce load?

We have just smacked headlong into a wall of bad performance as we have added more users. We pretty much ignored writing efficient code in the pursuit of rapid iteration and now it’s coming back to bite us. We’re running on a kubernetes cluster so scaling up and down is really easy but our app is so resource hungry and slow that our cost per customer is through the roof. We can only run ~40-60 sessions stably per instance at the moment with each instance having 1vcpu and 2gb ram.


#7

Our app has hundreds of connections, like 500+, on a single Galaxy Double container (2 vCPUs). Some tips for you:

  • Are you using a CDN? Especially to deliver your bundle? This is a must-have optimization that you must do in production.
  • Do you have indexes on Mongo Collections?
  • Are you over-using pub-sub when you could just deliver unchanging data via Meteor Methods? This is a big one too. pub-sub has a lot of overhead if you don’t need it. We stripped out a lot of pub-sub and went with Meteor Methods and redis-oplog Vent for very specific reactive updates.
  • redis-oplog of course for pub-sub
  • Caching? Data caching helped a lot in our use-case. When a lot of users need the same data. Don’t go back to the database if you don’t have to.

#8

Agreed on all - except we stopped using CDN for the bundle as we don’t use the web app (mobile and our own implementation of NWJS both come with bundle within) – and it failed with proxy / firewall for some clients

Also, look into using ServiceWorkers if you are a webapp: https://github.com/NitroBAY/meteor-service-worker

Our stats are simple at the moment, around 500 users per T3 instance on EBS. Kadira doesn’t show good stats for observes when using redis-oplog. Delays for methods and subs are in the tens of ms range. All good stats


#9
  • I am using a CDN
  • I’ve got indexes everywhere I need them
  • I am likely overusing pub-sub. All data in my app is via meteor publications. There are probably a few places I could remove it but unfortunately almost all of my app needs to be reactive. It would be a big refactoring task to replace some of that with something like redis vent. That may be the eventual solution however. My client side is mobx and react so I can really use any data transport I feel like as my client doesn’t heavily depend on minimongo.
  • I’m using redis-oplog
  • I don’t do any sort of special caching, but there isn’t a lot of shared data between my users.
  • I have a service worker for web push and client side caching of assets. After first load almost all of the assets are served from the worker. With some more effort I could probably have working offline support =)

I’m thinking that there must be some issues with publications that I haven’t figured out yet. I’m working on completely removing reactive-publish from the project. I’m replacing all the reactive publishers with the technique I outlined above. Hopefully that will net some performance improvements.


#10

We are having issues with manual reactive pubs with redis-oplog where the data is not properly sent down in production – but works well in development (See https://github.com/cult-of-coders/redis-oplog/issues/299)

Anybody facing or has faced this? We have tried everything (We are also seeing sporadic effects with peerlibrary:reactive-publish, but not as bad as with manually reactive pubs).


#11

I haven’t seen anything like that with redis-oplog but I also haven’t had it in production for very long.

I rewrote all of my reactive publications and got reactive-publish out of my project. So far there has been a huge reduction in CPU usage and things seem to be faster than ever for my users!

This has led me to another issue however. I think I could probably support ~150+ users per instance now, but I can’t do that due to the initial start up spike. When I get 60+ users connecting to an instance I get a massive CPU spike as all of the observers are set up. This leads to the instance becoming non-responsive and being killed by my health checkers. It seems that when an instance gets into this 100% cpu state it becomes stuck there. If there are fewer people connecting the cpu will spike but will quickly drop after the initial observers are created. @ramez @evolross have you dealt with initial connection spikes like this? What techniques did you use to get around the problem?


#12

When you say instance - how big are you talking? As running on a micro or compact won’t have the CPU to handle much of anything. We get 500+ connections on a Galaxy Double container - that’s 2GB RAM and 2 ECUs. Additionally, on that particular component of the app we have absolutely zero pub/sub (aside from Meteor’s internal pub/sub on version, HCR, etc.). Everything is Meteor Methods and redis-oplog Vent.

And you should know, getting to that level of performance took TWO major rewrites of that part of the app - which took months. So a lot of time and energy. You can’t really expect to get blazing performance without putting in some work/energy on a detail-precision level of how your particular app works and what its bottlenecks are. Overall quick optimizations only get you so far.


#13

An instance in my case is 1vcpu and 2gb of ram in my kubernetes cluster on google kubernetes engine. I was mostly wondering if the initial cpu spike is a normal thing for meteor publications or if I might be doing something wrong there.


#14

I think I remember reading about the spikes a few years ago. A lot of people encountered this when restarting containers or when a hot-code reload happens because, like you say, every user’s subscriptions have to be re-set up all at the same time.

I’m not sure what the consensus was on the solution because we never had to deal with that issue. Search around the forums. There’s probably some useful threads.


#15

Another optimization I just thought of… anywhere you do a query, especially in publications, always limit fields to only what is needed. You mentioned coding quickly and this is typically something people ignore when working fast. Are you doing that?

If you use Galaxy, use Meteor APM to see if you can determine where your bottlenecks are. If not using Galaxy, fire up your own Kadira/Meteor APM (search around for tutorials).


#16

Yes I was sure to limit fields only to what is necessary. That is one of the things I discovered I had missed a few of when I enabled redis-oplog debug mode. I’m using a self hosted version of Kadira, but it is a little less useful when using redis-oplog.


#17

I have given up on this solution and simply refactored to remove reactive publication from my app. The technique I laid out resulted in a memory leak that I wasn’t able to track down. After a lot of fighting with it I’ve come to the conclusion that reactive publications are just generally a bad idea in Meteor, at least for more complex apps.


#18

@imagio, we had the exact opposite effect – pure bliss! Once it works, man does it work well. Less CPU, faster response, better control.

Why don’t you get some consulting time with @diaconutheodor to solve your problem? In an hour our app was blazing fast.


#19

Removing reactive publishing wasn’t all that tough for my app and it makes things a lot more predictable performance wise. I’m probably just going to start removing meteor publications in places where they aren’t strictly necessary and replace them with Apollo or just a meteor method. Publications are just too expensive performance wise for some of the data heavy parts of my app. My client is all reactive because I use mobx instead of depending on tracker/minimongo which gives me complete freedom to use any data transport.

For reference if others want to see the (flawed) solution I came up with and perhaps improve on it here is the code for my createReactivePublisher function

import { SubscriptionMember, Mongo, Meteor } from "meteor/accounts-base";
import logger from "../../universal/lib/logging";
import _ from "underscore";

/**
 * Shape of the options object that `createReactivePublisher` in this file passes as the
 * second argument to `collection.find(selector, options)`.
 * NOTE(review): the field names match Meteor's `find` options; the per-field notes below
 * assume those semantics — confirm against the Meteor typings in use.
 */
interface QueryOptions<T>{
    // presumably the sort order of the query results
    sort?: Mongo.SortSpecifier;
    // presumably the number of results to skip
    skip?: number;
    // presumably the maximum number of results
    limit?: number;
    // field specifier keyed by the document's own property names
    fields?: { [P in keyof T]?: number }
    // presumably Meteor's client-side reactivity switch
    reactive?: boolean;
    // presumably a per-document transform applied to results
    transform?: Function;
}

/**
 * A generalized reactive publish function that doesn't use meteor internal hacks like server
 * autorun and reactive-publish.
 *
 * Observes `collection1` and publishes, under `collectionName`, the `collection2` documents whose
 * ids `getCollection2IdsFromCollection1` extracts from each matching `collection1` document.
 * All ids found on the first run are watched by one observer; ids that appear later get one small
 * observer each. The initial observer is never restarted: ids that drop out of it are recorded
 * and its events for them are ignored.
 *
 * A `collection2` id stays published for as long as at least one `collection1` document
 * references it, and `pub.removed` is only sent for documents that were actually published.
 *
 * This function registers a `pub.onStop` handler that stops every observer. It does NOT call
 * `pub.ready()`; the calling publish function has to do that.
 *
 * NOTE(review): removal handling assumes the ids returned by `getCollection2IdsFromCollection1`
 * are the `_id`s of the documents matched by `getCollection2Filter` — confirm for custom filters.
 *
 * @param opts.collectionName name of the client collection the documents are published into
 * @param opts.pub the subscription (the `this` of the publish function)
 * @param opts.collection1 the driving collection
 * @param opts.collection1Filter selector for the driving collection
 * @param opts.collection1Options find options for the driving collection
 * @param opts.getCollection2IdsFromCollection1 maps a `collection1` document to the ids of the
 *        `collection2` documents to publish
 * @param opts.collection2 the dependent collection
 * @param opts.getCollection2Filter optional selector builder; defaults to matching on `_id`
 * @param opts.collection2Options find options for the dependent collection
 * @param opts.debug when true, logs the ids and selectors being observed
 */
export function createReactivePublisher<T,U>(opts: {
	collectionName: string,
	pub: SubscriptionMember,
	collection1: Mongo.Collection<T>,
	collection1Filter: Mongo.Query<T>,
	collection1Options: QueryOptions<T>,
	getCollection2IdsFromCollection1: (doc: T) => string[],
	collection2: Mongo.Collection<U>,
	getCollection2Filter?: (ids: string[]) => Mongo.Query<U>,
	collection2Options: QueryOptions<U>,
	debug?: boolean
}) {
	const { collectionName, pub, collection1, collection1Filter, collection1Options, debug,
		getCollection2IdsFromCollection1, collection2, collection2Options, getCollection2Filter } = opts

	const userId = pub.userId
	const l = logger.child({ module: "ReactivePublish", userId, collectionName })
	l.info("Starting...")
	let initializing = true
	let stopped = false

	// How many references collection1 docs currently hold on each collection2 id
	const refCounts = new Map<string, number>()

	// On first run we capture all of the related ids and observe them with a single query
	const initialIds = new Set<string>()

	// If an initial id is subsequently removed we note it here and then ignore any updates to it that
	// come through the initial observer. This prevents us from having to stop and restart that observer.
	const suppressedInitialIds = new Set<string>()

	// If related documents come in after the initial query we save handles to them here
	// so they can be stopped if they are removed or the publication stops
	const handles = new Map<string, Meteor.LiveQueryHandle>()

	// Ids of the documents that are currently sent to the client
	const publishedIds = new Set<string>()

	// Remove a document from the publication, but only if it really is published
	const unpublish = function THReactiveUnpublish(id: string) {
		if (publishedIds.delete(id)) {
			pub.removed(collectionName, id)
		}
	}

	// Handle the added/changed/removed of the dependent collection by forwarding them to the client.
	// `ignores` lets the initial observer skip ids it is no longer responsible for.
	const makeHandlers = (ignores: (id: string) => boolean) => ({
		added: function THReactiveSecondObserverChangeHandlerAdded(id: string, doc: U) {
			if (ignores(id) || publishedIds.has(id)) { return }
			publishedIds.add(id)
			pub.added(collectionName, id, doc)
		},
		changed: function THReactiveSecondObserverChangeHandlerChanged(id: string, doc: Partial<U>) {
			if (ignores(id) || !publishedIds.has(id)) { return }
			pub.changed(collectionName, id, doc)
		},
		removed: function THReactiveSecondObserverChangeHandlerRemoved(id: string) {
			if (!ignores(id)) { unpublish(id) }
		}
	})

	// Add an observer on a single id. Used after the initial run to monitor any docs added to the publication.
	const addObserver = function THReactiveAddObserver(id: string) {
		if (debug) { l.info({ id }, "Added collection2 observer after initial run") }
		const selector: any = getCollection2Filter ? getCollection2Filter([id]) : { _id: id }
		const handle = collection2.find(selector, collection2Options).observeChanges(makeHandlers(() => false))
		if (stopped) {
			// The publication stopped while this observer was being created, so the stop
			// handler could not see it; stop it here instead of leaking it
			handle.stop()
			return
		}
		handles.set(id, handle)
	}

	// When some change to the first collection causes documents of the dependent collection to be un-referenced.
	const handleRelatedRemoval = function THReactiveHandleRelatedRemoval(col2Ids: ReadonlyArray<string>) {
		for (const col2Id of col2Ids) {
			const count = refCounts.get(col2Id) || 0
			if (count > 1) {
				// Still referenced by another collection1 doc: keep publishing
				refCounts.set(col2Id, count - 1)
				continue
			}
			refCounts.delete(col2Id)

			// If the doc was published by an observer added after the initial run, stop that observer now
			const handle = handles.get(col2Id)
			if (handle) {
				handle.stop()
				handles.delete(col2Id)
			}

			// For perf we don't want to stop and restart the initial observer that likely contains
			// a lot of docs, so it is told to ignore this id instead
			if (initialIds.has(col2Id)) {
				suppressedInitialIds.add(col2Id)
			}

			// This actually removes the document from the publication
			unpublish(col2Id)
		}
	}

	// When some change to the first collection causes documents of the dependent collection to be referenced.
	const handleRelatedAdd = function THReactiveHandleRelatedAdd(col2Ids: ReadonlyArray<string>) {
		for (const col2Id of col2Ids) {
			const count = refCounts.get(col2Id) || 0
			refCounts.set(col2Id, count + 1)

			if (initializing) {
				// While initializing just keep track of all the ids so we can watch them in one initial observer.
				// This works because added is always synchronous on first run, it will be called for each doc
				// the query returns the first time it is run.
				initialIds.add(col2Id)
			} else if (count === 0 && !handles.has(col2Id)) {
				// First reference after initialization. This also covers an initial id that was removed
				// earlier: the initial observer ignores it now, so it needs its own observer.
				addObserver(col2Id)
			}
		}
	}

	// Observe the first collection. This is where we figure out what ids from the dependent collection we
	// want to publish. It also keeps track of future changes to the query to add/remove dependent docs
	// as needed.
	const col1Handle = collection1.find(collection1Filter, collection1Options).observe({
		added: function THReactiveFirstObserveAdded(doc: T) {
			// This is synchronous on first run -- run for each doc the query returns
			handleRelatedAdd(getCollection2IdsFromCollection1(doc))
		},

		changed: function THReactiveFirstObserveChanged(newDoc: T, oldDoc: T) {
			// Reference the new ids before releasing the old ones, so ids present in both lists never
			// drop to zero references and their observers are left untouched
			handleRelatedAdd(getCollection2IdsFromCollection1(newDoc))
			handleRelatedRemoval(getCollection2IdsFromCollection1(oldDoc))
		},

		removed: function THReactiveFirstObserveRemoved(doc: T) {
			handleRelatedRemoval(getCollection2IdsFromCollection1(doc))
		}
	})

	// The initial `added` calls have all been delivered by now. Flip the flag before creating the initial
	// observer so a doc arriving during its setup gets its own observer instead of being appended to a
	// list that has already been queried.
	initializing = false

	const cursor2InitialIds = Array.from(initialIds)
	if (debug) { l.info({ cursor2InitialIds }, "Got cursor2 initial ids from cursor1 observer") }

	// Observe all of the _ids we just got on first run
	const selector: any = getCollection2Filter
		? getCollection2Filter(cursor2InitialIds)
		: { _id: { $in: cursor2InitialIds } }
	if (debug) { l.info({ selector }, "Starting initial cursor2 observer") }

	// The initial observer ignores docs that were subsequently removed. This prevents updates about
	// removed documents going to the client because we don't alter the initial observer after we make it.
	const initialObserver = collection2.find(selector, collection2Options)
		.observeChanges(makeHandlers((id) => suppressedInitialIds.has(id)))

	l.info("Done initializing... marking pub ready!")

	// When the publication stops make sure to stop all of the observers
	pub.onStop(function () {
		l.info("Stopped...")
		stopped = true
		col1Handle.stop()
		initialObserver.stop()
		handles.forEach((handle) => handle.stop())
		handles.clear()
	})

}