Two subscriptions to the same publication

Hello

I have a problem with the following piece of code in Meteor.

To begin with, I have two publications over the same collection.

  • The first one publishes top 10 non-processed records.
  • The second one publishes all processed records.

The workflow is as follows:

  1. The user clicks one of the records in the top list.
  2. A Meteor server method is called, in which the record is updated and marked as processed.
  3. The client receives the reactive update. I expect the record that was just marked as processed to be removed from the first subscription and added to the second one. But in ~50% of cases it raises the Expected to find a document to change error in the code below:
update: function (msg) {
	var mongoId = MongoID.idParse(msg.id);
	var doc = self._collection.findOne(mongoId);

	// Is this a "replace the whole doc" message coming from the quiescence
	// of method writes to an object? (Note that 'undefined' is a valid
	// value meaning "remove it".)
	if (msg.msg === 'replace') {
	  var replace = msg.replace;
	  if (!replace) {
		if (doc)
		  self._collection.remove(mongoId);
	  } else if (!doc) {
		self._collection.insert(replace);
	  } else {
		// XXX check that replace has no $ ops
		self._collection.update(mongoId, replace);
	  }
	  return;
	} else if (msg.msg === 'added') {
	  if (doc) {
		throw new Error("Expected not to find a document already present for an add");
	  }
	  self._collection.insert(_.extend({_id: mongoId}, msg.fields));
	} else if (msg.msg === 'removed') {
	  if (!doc)
		throw new Error("Expected to find a document already present for removed");
	  self._collection.remove(mongoId);
	} else if (msg.msg === 'changed') {
	  if (!doc)
		throw new Error("Expected to find a document to change");
	  if (!_.isEmpty(msg.fields)) {
		var modifier = {};
		_.each(msg.fields, function (value, key) {
		  if (value === undefined) {
			if (!modifier.$unset)
			  modifier.$unset = {};
			modifier.$unset[key] = 1;
		  } else {
			if (!modifier.$set)
			  modifier.$set = {};
			modifier.$set[key] = value;
		  }
		});
		self._collection.update(mongoId, modifier);
	  }
	} else {
	  throw new Error("I don't know how to deal with this message");
	}
},

I found that after step 2 the client receives 3 messages. Sometimes they arrive in the correct order, sometimes not.

Correct order is:

  1. changed - record updated
  2. removed - record removed from the first subscription
  3. added - record added to another subscription

Incorrect order:

  1. removed - record removed from the first subscription
  2. changed - since the record was already removed by the previous “removed” message, it no longer exists, and Meteor throws the Expected to find a document to change error
  3. added - record added to another subscription

Sometimes I also receive the Expected to find a document already present for removed error, which is also caused by the wrong message order.
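
To make the two orderings concrete, here is a minimal stand-alone sketch in plain JavaScript. It is not Meteor's actual code: the Map-based store, the applyMessages helper, and the rec1 id are all made up for illustration, and only the three checks from the update function above are mimicked.

```javascript
// Simplified stand-in for the client-side store (hypothetical, not Meteor's code).
function applyMessages(messages) {
  const docs = new Map();
  // The record starts out present, as delivered by the first subscription.
  docs.set('rec1', { state: 2 });

  for (const msg of messages) {
    const doc = docs.get(msg.id);
    if (msg.msg === 'added') {
      if (doc) throw new Error('Expected not to find a document already present for an add');
      docs.set(msg.id, Object.assign({}, msg.fields));
    } else if (msg.msg === 'removed') {
      if (!doc) throw new Error('Expected to find a document already present for removed');
      docs.delete(msg.id);
    } else if (msg.msg === 'changed') {
      if (!doc) throw new Error('Expected to find a document to change');
      Object.assign(doc, msg.fields);
    }
  }
  return docs;
}

const changed = { msg: 'changed', id: 'rec1', fields: { state: 3 } };
const removed = { msg: 'removed', id: 'rec1' };
const added = { msg: 'added', id: 'rec1', fields: { state: 3 } };

// Correct order: the record ends up present again with the new state.
const ok = applyMessages([changed, removed, added]);
console.log(ok.get('rec1'));

// Incorrect order: 'changed' arrives after 'removed', so the lookup fails.
let failure = null;
try {
  applyMessages([removed, changed, added]);
} catch (e) {
  failure = e.message;
}
console.log(failure);
```

With the same three messages, only the order decides whether the store ends up consistent or throws.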

So my question is: is the order of these messages guaranteed by Meteor? It looks like a bug to me that I receive a changed message after removed. Is it a bug, or is there something I don’t understand?

Meteor does not guarantee message order, but I don’t quite understand why you’re using a watcher when all you’re doing is accepting the updates.

Is there a reason why you’re listening to update events instead of just using two find() with filters on “not processed” and “post processed”?

I don’t use the watcher myself. It throws errors, and I looked into it only for debugging purposes. As you said, I have two find() filters. All these messages are processed by the watcher in the background.

Error: Expected to find a document to change
    at Object.update (http://localhost:3000/packages/mongo.js?hash=adc10c38562bf65f25a42b2f193db2c0c97e2ae4:246:29)
    at Object.store.(anonymous function) [as update] (http://localhost:3000/packages/ddp-client.js?hash=27502404fad7fc072e57e8b0b6719f40d92709c7:3613:48)
    at http://localhost:3000/packages/ddp-client.js?hash=27502404fad7fc072e57e8b0b6719f40d92709c7:4441:19
    at Array.forEach (native)
    at Function._.each._.forEach (http://localhost:3000/packages/underscore.js?hash=27b3d669b418de8577518760446467e6ff429b1e:149:11)
    at http://localhost:3000/packages/ddp-client.js?hash=27502404fad7fc072e57e8b0b6719f40d92709c7:4440:13
    at Function._.each._.forEach (http://localhost:3000/packages/underscore.js?hash=27b3d669b418de8577518760446467e6ff429b1e:157:22)
    at Connection._performWrites (http://localhost:3000/packages/ddp-client.js?hash=27502404fad7fc072e57e8b0b6719f40d92709c7:4437:9)
    at Connection._flushBufferedWrites (http://localhost:3000/packages/ddp-client.js?hash=27502404fad7fc072e57e8b0b6719f40d92709c7:4423:10)
    at Connection._livedata_data (http://localhost:3000/packages/ddp-client.js?hash=27502404fad7fc072e57e8b0b6719f40d92709c7:4391:12)

Can you share your find() snippets? I’ve never seen that error as far as I can tell.

I slightly modified it to make it clearer. I use a query constructor to keep the filter identical on the server side and the client side. Here is all the code.

// My query constructor
Individuals.appliedJobsQuery = ({jobId, states, limit}) => {

    let query = {
        filter: {
            applications: {$elemMatch: {}}
        },
        options: {}
    };

    if (jobId)
        query.filter.applications.$elemMatch.jobId = jobId;

    if (_.isArray(states))
        query.filter.applications.$elemMatch.state = {$in: states };

    if (_.isNumber(limit))
        query.options.limit = limit;

    return query;
};

// My subscriptions:
// Top 10 individuals in status #2
this.subscribe('Publications.Individuals.appliedForJob', this.params._id, 10, [2]); 
// All individuals in status #3, #4 and #5
this.subscribe('Publications.Individuals.appliedForJob', this.params._id, null, [3, 4, 5]); 

// Publication. Yes, I use publishComposite
Meteor.publishComposite('Publications.Individuals.appliedForJob', function(jobId, limit, states) {
    return {
        find() {
            let query = Individuals.appliedJobsQuery({
                jobId: jobId,
                limit: limit,
                states: states
            });
            return Individuals.find(query.filter, query.options);
        },
        children: [
            /* ... */
        ]
    }
});

/* Top 10 unprocessed records */
let unprocessedQuery = Individuals.appliedJobsQuery({ jobId: job._id, states: [2], limit: Template.ShortlistTemplate.matchesLength });
let unprocessed = Individuals.find(unprocessedQuery.filter, unprocessedQuery.options);
	
/* All processed records */
let processedQuery = Individuals.appliedJobsQuery({ jobId: job._id, states: [3, 4, 5] });
let processed = Individuals.find(processedQuery.filter, processedQuery.options);
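
For reference, this is roughly what the query constructor returns for the two subscriptions. The sketch below is a stand-alone copy that uses Array.isArray and typeof in place of underscore (close to, but not exactly, _.isArray and _.isNumber), and 'job1' is a placeholder id, not a real one.

```javascript
// Stand-alone copy of the query constructor, without underscore or the collection.
function appliedJobsQuery({ jobId, states, limit }) {
  const query = {
    filter: { applications: { $elemMatch: {} } },
    options: {}
  };
  if (jobId) query.filter.applications.$elemMatch.jobId = jobId;
  if (Array.isArray(states)) query.filter.applications.$elemMatch.state = { $in: states };
  if (typeof limit === 'number') query.options.limit = limit;
  return query;
}

// Same arguments as the two subscriptions; 'job1' is a placeholder id.
const top = appliedJobsQuery({ jobId: 'job1', states: [2], limit: 10 });
const processedAll = appliedJobsQuery({ jobId: 'job1', states: [3, 4, 5], limit: null });

console.log(JSON.stringify(top));
console.log(JSON.stringify(processedAll));
```

The two filters differ only in the state list (and the limit), so a record whose application moves from state 2 to state 3 stops matching the first filter and starts matching the second.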

Maybe it’s because of Meteor.publishComposite? I just checked, and this package has not been updated since 2015. I’ll try replacing it with a plain Meteor.publish.