Hi,
I have an app that logs events.
It only adds documents to a collection on the server named ‘Messages’ (there are no updates or removes). Each event doc has a timestamp field, named ‘t’ and a topic field, called ‘topic’. The collection has a combined index (topic,t) to prevent full collection scans.
I would like to publish the latest doc for each topic to a client collection named ‘LatestMessages’. Below my publication code.
The publication has two parts:
First part: An initial scan of the latest docs for each topic, followed by a self.ready() to push the initial docs asap to the client.
Second part: After the self.ready() a find.observe query is set up. This find.observe query replaces any new documents in the ‘LatestMessages’ collection.
My ‘Messages’ collection presently has 7 million docs.
In terms of performance the following happens:
- The first part (initial scan) runs fast (1s or so) and publishes the initial docs to the client’s ‘LatestMessages’
- It takes 5s-8s for the find.observe query to initialize and console.log its handle.
- An event is written to the ‘Messages’ collection.
- It takes 5s-8s for the find.observe query to push the new doc to ‘LatestMessages’
The find.observe query arguments are quite important:
- sort is required to find the latest doc
- limit is also important. limit:1, ignores two or more events that occur at the same time, limit:100 takes up too much time.
Any advice as to how to make find.observe more responsive?
Meteor.publish('latestmessages', function() {
const self = this;
if (self.userId) {
let initializing = true;
// Seed the collection, find unique topics, find latest values
const raw = Messages.rawCollection();
const distinct = Meteor.wrapAsync(raw.distinct, raw);
const topics = distinct('topic');
topics.forEach((topic) => { // find the latest for each topic and add to db
const doc = Messages.findOne({ topic }, { sort: { t: -1 } });
self.added('LatestMessages', topic, doc);
});
self.ready();
console.log('ready');
// Observe changes to the cursor
const collectionHandle = Messages
.find({}, { sort: { t: -1 }, limit: 10 })
.observe({
// no changed or removed hooks, as the app only logs
added(newDoc) {
if (!initializing) {
console.log('newDoc', newDoc);
const { topic } = newDoc;
if (topics.indexOf(topic) === -1) { // if new topic
self.added('LatestMessages', topic, newDoc); // overwrite the existing doc
topics.push(topic);
} else self.changed('LatestMessages', topic, newDoc);
}
},
});
console.log('collectionHandle', collectionHandle);
initializing = false;
self.onStop(() => {
collectionHandle.stop();
});
} else self.ready();
});