Reactive aggregation

Is reactive aggregation possible on meteor? Or will it be?

1 Like

I’ve been asking the same question. You can look at this package which creates reactive triggers.

And this one which wraps mongo’s aggregate pipeline.

It can be partially created by polling the aggregate query every T seconds and publishing the result using observe handles, notifying the client of the changes.

Right now there are no packages that give you reactive queries. At best they give you a way to aggregate the data, but they’re not reactive. To get a reactive aggregate you have to roll up your sleeves. See after 35m35s of:

1 Like

this link maybe can halp

Read the code! meteor-aggregate does not provide reactive aggregations, it just exposes a simple wrapper for the raw aggregate method.

But if you look at the code for this, it does provide a way for you to reactively aggregate by specifying a Mongo find query to observe:

Hi all,

I highly recommend the peerlibrary:reactive-publish package.
Simple and sweet.

I’m sure it’s nice but this is about publishing aggregations, not queries or reactive joins.

I solved this in a project by simply aggregating the data into another collection. This is super simple to use and has some benefits performance-wise.

You can use cursor.obseve to trigger the aggregation on document insert, update or remove

If the aggregation is expensive, this is also a good solution. If it is really expensive, you can also wrap the callbacks in a .throttle, so that multiple changes within a short interval only trigger one aggregation (.debounce also works, but may delay the aggregation if many frequent changes ocure over a long period.

3 Likes

Here is my thinking with peerlibrary:reactive-publish:

  1. The package makes all collections reactive. Why is this significant? On the server, only published cursors are reactive.
  2. The package gives you an autorun that automatically cleans stuff up. this.autorun
  3. If all collections are reactive then you could easily do what @macrozone describes without resorting to cursor.observe.

For example:

Meteor.publish('aggregates', function () {
  this.autorun(function () {
    let data1 = Data1.find(/* use your aggregation functions*/).fetch();
    let data2 = Data2.find(/* use your aggregation functions*/).fetch();
    // create joined aggregated data.... from 2 sources.
    // Maybe you need to do more aggregate calculations...
    ...
    Agg.update(/* update your aggregated data in a collection*/);
    return Agg.find(/* whatever */);
  });
});

In this example Data1, Data2 and Agg would all be reactive and would trigger the autorun call. I think you could wrap the code within the autorun in .throttle. An alternative to using a separate aggregate collection is to publish the data yourself.

TheMeteorChef has a good description of how to use the mongo aggregate calls here. Using peerlibrary:reactive-publish makes the queries reactive for publishing.

I got a request to share an example of how to do an aggregation with cursor.observe

@brucejo showed an interesting example using reactive-publish, but be aware, that this runs the aggregation for every user that subscribes this publication and on every change.

if the result is the same for every user, you aggregate too much. But it’s an interesting way, if you aggregate something per user.

So the other way with cursor.observe would be something like this:


const doTheAggregation () => {
  // here you do the actual aggregation. In this example its one job
  const data1 = Data1.find(...).fetch();
  const data2 = Data2.find(...).fetch();
  // ...
  AggregatedCollection.update(...);
}

// this function will update the AggregatedCollection whenever it is run. 
// But as it might be expensive, we want to have control when it should run
// so our approach is to run it whenever one of the source data changes:

Data1.find({}).observe({
   added: doTheAggregation,
   changed: doTheAggregation,
   removed: doTheAggregation
});

Data2.find({}).observe({
   added: doTheAggregation,
   changed: doTheAggregation,
   removed: doTheAggregation
});

// be aware that added is also called for every document whenever the server starts
// you could do something like this to prevent this:

let startup = true;
Data1.find({}).observe({
   added: () => {
     if(!startup) {
       doTheAggregation();
     } 
  },
   changed: doTheAggregation,
   removed: doTheAggregation
});
startup = false;


// also you might want to throttle or debounce doTheAggregation with _.debounce or _.throttle so that it does not run when multiple documents are changed in a short time period.

1 Like

Your solution seems very interesting, but how are you doing an aggregation function in .find()? I would have to implement your solution via the aggregation package (collection.aggregate(pipeline)) but that’s not reactive even if it’s in the this.autorun. So, how did you manage to implement?

Mea culpa!

As I have been doing this longer I am uncovering more of my ignorance.

I was not really doing aggregations in my case. :flushed:

@macrozone’s observations are exactly right and not suitable for all aggregations.

I have been using this pattern to make observations on multiple collections that affect the selector for another collection. And in my case the data is per user.

Slightly better informed suggestion

I do not think I could ever claim I am “ignorance free”. With that caveat here is a suggestion to try.

For true aggregations, you might want to check the sibling server-autorun and reactive-mongo packages. Then you could set up an autorun that simply listens to the sources for your aggregations, detecting a change you can calculate the aggregations and dump them into another collection that could be reactive for all subscribers.

For calculating the aggregations you could just query the Collections yourself and do the calculations.

Accessing Mongo’s Collection.aggregate method within Meteor

You may be able to use the MongoDB aggregation pipeline by wrapping the function for Meteor. I forgot where I found this technique, but here is an example that I used for bulkWrite:

Mongo.Collection.prototype.bulkWrite = function bulkWrite(array) {
  let coll;
  if(this.rawCollection) {
    // >= Meteor 1.0.4
    coll = this.rawCollection();
  } else {
  // < Meteor 1.0.4
    coll = this._getCollection();
  }
  return wrapAsync(coll.bulkWrite.bind(coll))(array);
};

So to call the aggregate function, you may be able to add this and you will have access within Meteor’s fiber system. I do not believe the Collection.aggregate call would become reactive because I am imagining that the reactive packages intercept the selector and observe that. You may need to monitor one of the sources for the aggregation directly so the autorun would fire:

Mongo.Collection.prototype.aggregate = function aggregate(array) {
  let coll;
  if(this.rawCollection) {
    // >= Meteor 1.0.4
    coll = this.rawCollection();
  } else {
  // < Meteor 1.0.4
    coll = this._getCollection();
  }
  return wrapAsync(coll.aggregate.bind(coll))(array);
};

Finally, the end

Maybe there is some help in this long winded response :slight_smile:.

Let me know if you make some progress.

1 Like

Here’s what I ended up doing. I had a unique case where I have a collection of TicketGroups that each have an eventId. For my pagination, I want a limit of 50, but not 50 TicketGroups; I need 50 unique Events (i.e. group by eventId, which may yield many many more than 50 TicketGroups), but I have no Events collection, just TicketGroup collection. So, I couldn’t just watch the first 50 TicketGroups via a straightforward find().fetch(), I needed to make the aggregation call on the TicketGroup collection grouped by eventId. Then, I would need to watch for any changes on any of the TicketGroups that have an eventId within the first 50 (or whatever limit is up to) unique eventIds returned.

I am partially using your idea of a separate aggregate collection, but the observation is tricky. I generate TicketGroups via a non-reactive aggregation call to the TicketGroup collection and group by eventId so I have 50 unique eventIds. I map those to an array and then do a normal Collection.find().fetch() with the operator eventId: {$in: eventIds} - so now the autorun has a reactive query and it’s only watching TicketGroups that have those aggregated eventIds, so the autorun will only be triggered watching those specific TicketGroups I’d want to return to the client. This way, it doesn’t overrun, and it doesn’t have to watch all TicketGroups in the collection for that user (which might be tens of thousands…). Now my agg collection has a document not only with just the aggregated data, but also with the exact amount based on what the limit is up to. It also only returns fields aggregation function uses in {fields: {}} options param in reactive call so it’s only watching for changes on those fields. Super optimized.

On a deployed application (Digital Ocean - 2GB RAM, mLab mongo 2GB DB), the TicketGroups collection has 1,100,000 documents and is about 1GB. The user I am testing on has 55,199 TicketGroups (documents) and I am loading 50 events worth of Aggregated TicketGroups to the client. It’s aggregating about 1227 documents (~24.54 per event) and rendering the aggregation as a list of 50 events.

Speeds:

  • Publication on server completes and returns data in ~650ms
  • Publication on server updates with increased limit (+50 events) in ~920ms, 990ms on next +50 events
  • Client subscribes, receives, and renders first 50 events in ~700ms
  • Client subscribes, receives, and renders next 50 events in ~1600ms

I tested this against a raw Method call to perform the same aggregation and it’s nearly equal which is a great sign. Can’t get much faster than how a straightforward aggregation call would perform.

Here’s my code.

Meteor.publish('inventoryReactiveAgg', function(limit, sort, search) {
  if (!this.userId) return;

  search = search ? generateSearch(search) : null;

  const
    userBid   = Meteor.users.findOne({ _id: this.userId }).profile.bid,
    pipeline  = [
      { $match : {
          bid              : userBid,
          'event.dateUnix' : { $gte : momentTz().tz("America/New_York").startOf('hour').subtract(24, 'hours').unix()},
          ...search
        }
      },
      { $group  : {
          _id        : '$eventId',
          mine       : { $sum      : '$ticket.quantity' },
          name       : { $first    : '$event.name' },
          dateUnix   : { $first    : '$event.dateUnix' },
          date       : { $first    : '$event.date' },
          venue      : { $first    : '$event.venue' },
        }
      },
      { $limit : limit || Meteor.settings.public.limit },
      { $sort  : sort  || { 'dateUnix': 1 }}
    ];

  this.autorun(function () {
    let
      tgs_agg      = TicketGroups.aggregate(pipeline),
      // Get eventIds within what's returned from agg within limit
      reac_find     = Object.assign(pipeline[0]['$match'], {'eventId': {$in: tgs_agg.map(a => a._id)}}),
      // Only reactively watch for changes in collection on tg's to those eventIds and the fields in agg function
      reac_fields  = {fields: {'eventId': 1, 'ticket.quantity': 1, 'event': 1}},
      // map to null so doesn't take up room on server, will still work
      tgs_reac    = TicketGroups.find(reac_find, reac_fields).map(a => null),
      // Define doc in agg col to update
      agg_find  = {userId: this.userId};

    TicketGroups_agg.upsert(agg_find, {
      $set: {
        'data': tgs_agg
      }
    });

    // anything we can null so doesnt take up room on server
    // imagine if many users are connected, this could unecessarily overload memory
    tgs_reac  = null;
    tgs_agg   = null;
    reac_find = null;

    return TicketGroups_agg.find(agg_find);
  });
});

It works very well.

2 Likes

Very cool. I see in your example that your aggregations are per user, so it is appropriate to perform the autorun in the publish code.

And thanks for sharing!

If you already know exactly what fields are being mutated and how they effect other data, you may consider just doing the aggregate yourself in application code (i.e. inside a method call).

This is basically what I’m getting at.

Note, since this appears to be a bidding application, the end user may dislike your approach’s eventual consistency. Some changes, like a change to a user’s bid price followed by a purchase, may occur before other users receive any updates to inventory. In other words, some users will be able to change their bids, see new inventory and buy the inventory all before other users receive the pre-change inventory—and these “late” users will be seeing stale/nonexistent inventory.

This is due to the non-transactional architecture of what you’re doing and the specific kind of concurrency in Meteor (all user calls are ordered, but between users calls may be interleaved).

You almost certainly want to use a single document to represent a group of inventory all users may bid on, because then per-document locking in Mongo essentially enforce a first-in, first-out order book.

@doctorpangloss with my solution, the return from publication was nearly same speed as straightforward method call. Also, bid is an identification number (Broker ID) :slight_smile:. So it’s not a bidding app and the eventual consistency is OK in this case.

It was really the match step that was killing me in the aggregation. I further reduced the query time in two ways:

  • I added a configurable $lte to the date $match so it wouldn’t look at ‘all future events’ - just what was in the range of my $gte and $lte (huge speedup here)
  • I added a new field to the data (dateDayUnix) and am filtering and indexing by it instead of by dateUnix. This new field is the Unix ms time for the end of day of the events date. This helped because while each of the events have dates, dateUnix was the exact time in the day for the event, whereas dateDayUnix is just the general day the event is, and this significantly improved speed since index table is now smaller (many events share same day, but not nearly as many share exact same time).

Great, it sounds like it’s under control and it’s a great example

Yup, thank you. Would like to think about how to make such a solution available as a package since the current reactiveAggregation packages fail miserably on large collections. I was getting +10s waits with those packages.