How to speed up publication using find.observe

Hi,

I have an app that logs events.

It only adds documents to a collection on the server named ‘Messages’ (there are no updates or removes). Each event doc has a timestamp field, named ‘t’ and a topic field, called ‘topic’. The collection has a combined index (topic,t) to prevent full collection scans.

I would like to publish the latest doc for each topic to a client collection named ‘LatestMessages’. Below my publication code.

The publication has two parts:
First part: An initial scan of the latest docs for each topic, followed by a self.ready() to push the initial docs asap to the client.

Second part: After the self.ready() a find.observe query is set up. This find.observe query replaces any new documents in the ‘LatestMessages’ collection.

My ‘Messages’ collection presently has 7 million docs.

In terms of performance the following happens:

  1. The first part (initial scan) runs fast (1s or so) and publishes the initial docs to the client’s ‘LatestMessages’
  2. It takes 5s-8s for the find.observe query to initialize and console.log its handle.
  3. An event is written to the ‘Messages’ collection.
  4. It takes 5s-8s for the find.observe query to push the new doc to ‘LatestMessages’

The find.observe query arguments are quite important:

  1. sort is required to find the latest doc
  2. limit is also important. limit:1, ignores two or more events that occur at the same time, limit:100 takes up too much time.

Any advice as to how to make find.observe more responsive?

Meteor.publish('latestmessages', function() {
  const self = this;
  if (self.userId) {
    let initializing = true;

    // Seed the collection, find unique topics, find latest values
    const raw = Messages.rawCollection();
    const distinct = Meteor.wrapAsync(raw.distinct, raw);
    const topics = distinct('topic');
    topics.forEach((topic) => { // find the latest for each topic and add to db
      const doc = Messages.findOne({ topic }, { sort: { t: -1 } });
      self.added('LatestMessages', topic, doc);
    });
    self.ready();

    console.log('ready');
    // Observe changes to the cursor
    const collectionHandle = Messages
      .find({}, { sort: { t: -1 }, limit: 10 })
        .observe({
          // no changed or removed hooks, as the app only logs
          added(newDoc) {
            if (!initializing) {
              console.log('newDoc', newDoc);
              const { topic } = newDoc;
              if (topics.indexOf(topic) === -1) { // if new topic
                self.added('LatestMessages', topic, newDoc); // overwrite the existing doc
                topics.push(topic);
              } else self.changed('LatestMessages', topic, newDoc);
            }
          },
        });
    console.log('collectionHandle', collectionHandle);

    initializing = false;
    self.onStop(() => {
      collectionHandle.stop();
    });
  } else self.ready();
});