How to set up mongodb/local with change streams for a publish method

This is with Meteor 1.9.

I thought I’d introduce a little reactivity into my app, so I started experimenting with MongoDB change streams. However, I don’t get any notifications when running against the local database started by Meteor.

The docHandler callback is never invoked even when I make manual changes to the document that the $match step matches.

Here’s my code - I stripped out some stuff that wasn’t interesting:

// Inspired by https://github.com/kschingiz/meteor-publish-change-streams/blob/master/publish-change-streams.js

Meteor.publish("Candidate.completionState", function(this, candidateId: any) {
  check(candidateId, String);
  const sub = this;
  const userId = sub.userId;

  const selector = {
    _id: candidateId
  };
  const pipeline: AggregationStep[] = [
    {
      $match: selector
    }
  ];

  const initial = (
    collection.findOne(selector)
  ).await();
  if (!initial) {
    throw new Meteor.Error("not-found");
  }
  sub.added("CandidateCompletions", candidateId, initial);
  sub.ready();
  console.log("ready");
  const changeStream = collection.watch(pipeline, {
    fullDocument: "updateLookup"
  });

  const publishRunner = () => {
    const docHandler = (doc: any) => {
      const { fullDocument, operationType } = doc;
      console.log("change happened");
      console.log(JSON.stringify(doc));
    };
    console.log("waiting for changestream");
    changeStream.on("change", docHandler);
  };

  publishRunner();
  this.onStop(() => {
    console.log("closing changestream");
    changeStream.close();
  });
});

From what I can see, the local Meteor database is set up as a replica set, so change streams should be able to work. The initial synchronous findOne query also works fine, so the connection to the collection is working.

What am I missing?

Oh, I did get this to work the classic Meteor way (returning the cursor from Collection.find(xxx) from the publish method), but I would prefer change streams because they let me send out exactly the processed data I want, under the collection name I want.
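To make that goal concrete, here is a rough sketch of how a change event could be forwarded to the subscription under a chosen collection name. It is not taken from the code above: forwardChange is a hypothetical helper, and the two interfaces are minimal stand-ins for Meteor's publish handle and a MongoDB change event (operationType, documentKey, fullDocument). It assumes string _id values and fullDocument: "updateLookup", and it does not clear fields that were removed from the document.

```typescript
// Minimal stand-in for the publish handle (`this` inside Meteor.publish).
interface Subscription {
  added(collection: string, id: string, fields: Record<string, unknown>): void;
  changed(collection: string, id: string, fields: Record<string, unknown>): void;
  removed(collection: string, id: string): void;
}

// Minimal stand-in for a MongoDB change event; only the fields used here.
interface ChangeEvent {
  operationType: string; // "insert", "update", "replace", "delete", ...
  documentKey: { _id: string }; // assumption: string ids
  fullDocument?: Record<string, unknown>;
}

// Hypothetical helper: forward one change event to the subscription
// under the published collection name `name`.
function forwardChange(sub: Subscription, name: string, event: ChangeEvent): void {
  const id = event.documentKey._id;

  if (event.operationType === "delete") {
    sub.removed(name, id);
    return;
  }

  // fullDocument can be missing, e.g. if the document was deleted
  // before the update lookup happened.
  if (!event.fullDocument) {
    return;
  }

  // The id is passed separately, so drop it from the published fields.
  const fields: Record<string, unknown> = { ...event.fullDocument };
  delete fields._id;

  if (event.operationType === "insert") {
    sub.added(name, id, fields);
  } else if (event.operationType === "update" || event.operationType === "replace") {
    // Sends every current field; fields removed from the document are not cleared.
    sub.changed(name, id, fields);
  }
}
```

In the publish function above, docHandler could then call something like forwardChange(sub, "CandidateCompletions", doc) instead of only logging.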

Another update: I got the functionality I wanted by using the observeChanges method on the cursor returned from find instead.
It let me both specify the published collection name and transform the collection data into the published data.
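
For anyone who wants to see the shape of it, here is a minimal sketch of that pattern. It is not my exact code: publishTransformed is a hypothetical helper name, and the interfaces are small stand-ins for Meteor's publish handle (added, changed, removed, ready, onStop) and for a cursor with observeChanges, so the snippet type-checks on its own. Note that the changed callback only receives the fields that changed, so the transform has to cope with partial documents.

```typescript
type Fields = Record<string, unknown>;

// Minimal stand-in for the publish handle (`this` inside Meteor.publish).
interface Subscription {
  added(collection: string, id: string, fields: Fields): void;
  changed(collection: string, id: string, fields: Fields): void;
  removed(collection: string, id: string): void;
  ready(): void;
  onStop(callback: () => void): void;
}

// Minimal stand-in for a Meteor cursor; only observeChanges is needed.
interface ObservableCursor {
  observeChanges(callbacks: {
    added(id: string, fields: Fields): void;
    changed(id: string, fields: Fields): void;
    removed(id: string): void;
  }): { stop(): void };
}

// Hypothetical helper: publish the cursor's documents under `name`,
// passing every set of fields through `transform` first.
function publishTransformed(
  sub: Subscription,
  cursor: ObservableCursor,
  name: string,
  transform: (fields: Fields) => Fields
): void {
  const handle = cursor.observeChanges({
    added: (id, fields) => sub.added(name, id, transform(fields)),
    // Only the changed fields arrive here, not the whole document.
    changed: (id, fields) => sub.changed(name, id, transform(fields)),
    removed: (id) => sub.removed(name, id),
  });

  // The initial documents have been delivered through `added` by now.
  sub.ready();

  // Stop observing when the client unsubscribes.
  sub.onStop(() => handle.stop());
}
```

Inside a publish function this would be called as something like publishTransformed(this, collection.find(selector), "CandidateCompletions", fields => fields), where collection is a regular Mongo.Collection.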

It’s a shame that the main Meteor docs page doesn’t include this as an example: it takes only a few more lines of code than returning a cursor from the publish callback, but gives much more control.

An example can be found at http://richsilv.github.io/meteor/meteor-low-level-publications/

Now I don’t see any advantage in using change streams anymore, unless there is something else that I’m missing.

The main advantage of using change streams is avoiding oplog tailing. Oplog tailing performs poorly, and it gets worse when your MongoDB is a replica set and you have multiple Meteor instances running.

Would be great if Meteor implemented support for MongoDB change streams.