Why is mongo aggregation not reactive in meteor?

Meteor newb here. I’m perplexed by Meteor’s reactivity. I can update a collection from a Mongo console and the UI updates instantly. But not with Mongo’s aggregate()?

I’m using meteorhacks:aggregate to get Mongo’s aggregate() in Meteor.

The aggregation works great: I can see the data update instantly in a Mongo console. However, if I expose it to the UI, there is no update, even after a client refresh.

db.collection:

{a:1,b:2}
{a:1,b:2}

The code:
Collection = new Meteor.Collection('collection');
AggCollection = new Meteor.Collection('aggcollection');

Meteor.methods({
  pleaseAggregate: function() {
    Collection.aggregate([
      {
        $group: {
          _id: "$a",
          count: { $sum: 1 } // should return 2 with the sample data above
        }
      },
      { $out: "aggcollection" }
    ]);
  }
});

HTML

<p>Aggregates: {{agg.count}}</p>

Client.js

Template.debug.helpers({
  agg: function() {
    return AggCollection.find().fetch()[0];
  }
});

BTW it is being published, I have ‘insecure’ installed.

I guess I’m missing something that’s obvious to Meteor users. What is it?

*Also posted here: http://stackoverflow.com/questions/31620268/why-is-mongo-aggregation-not-reactive-in-meteor


I guess it’s not an obvious problem.
Is it related to the new collection ‘aggcollection’ not having an _id field generated by Meteor?

@arunoda Any thoughts on this? Why would collections created by aggregate() not be reactive?

insecure just means there are no security checks on client-side writes to collections; it has nothing to do with publishing. Make sure you are publishing/subscribing, or have the autopublish package installed. That package publishes your whole Mongo database to Minimongo (Meteor’s client-side database).

I think he meant autopublish. I tried it out too. For whatever reason, when the aggregation puts documents into the collection, Meteor doesn’t know about it, so it doesn’t update reactively.

If you are open to trying a different approach, I have found that using a publication works pretty well for making aggregations reactive. I have also experienced issues when trying to detect changes on a collection using $out.

This is an example from one of my projects, but you can tweak it for your needs.

  • /server/publish.js
// Only publish data for the matches we care about. Be careful not to over-publish
Meteor.publish('MatchPointMetrics', function(leagueMatchId) {
  var sub = this;
  var initializing = true;
  // Define our aggregation pipeline
  var pipeline = [
    {$match : {leagueMatchId: leagueMatchId}}, // Only aggregate the data we need
    {$unwind : '$teams'},
    {
      $group: {
        _id: '$teams.teamId',
        total: {
          $sum: '$teams.totalMatchPoints'
        },
        avg: {
          $avg: '$teams.totalMatchPoints'
        },
        count: {
          $sum: 1
        }
      }
    }
  ];
  // Track any changes on the collection we are going to use for aggregation
  var query = Matches.find({leagueMatchId: leagueMatchId});
  var handle = query.observeChanges({
    added: function (id) {
      // observeChanges only returns after the initial `added` callbacks
      // have run. Until then, we don't want to send a lot of
      // `sub.changed()` messages, hence tracking the
      // `initializing` state.
      if (!initializing) {
        runAggregation('changed');
      }
    },
    removed: function (id) {
      runAggregation('changed');
    },
    changed: function (id) {
      runAggregation('changed');
    },
    error: function(err){
      throw new Meteor.Error('Uh oh! something went wrong!', err.message);
    }
  });
  // Instead, send the initial `sub.added()` messages right after
  // observeChanges has returned; runAggregation then marks the
  // subscription as ready.
  initializing = false;
  // Run the aggregation initially to add some data to our aggregation collection
  runAggregation('added');
  // Wrap the aggregation call inside of a function
  // since it will be called more than once
  function runAggregation(action){
    Matches.aggregate(pipeline).forEach(function(e) {
      if(action === 'changed'){
        // Aggregate and update our collection with the new data changes
        sub.changed('MatchPointMetrics', e._id, {
          _id: e._id,
          total: e.total,
          avg: e.avg,
          count: e.count
        });
      }
      else {
        // Aggregate and then add a new record to our collection
        sub.added('MatchPointMetrics', e._id, {
          _id: e._id,
          total: e.total,
          avg: e.avg,
          count: e.count
        });
      }
    });
    // Mark the subscription ready once, after every row has been sent.
    // (Inside the loop it would never run for an empty result.)
    sub.ready();
  }
  // Stop observing the cursor when client unsubs.
  // Stopping a subscription automatically takes
  // care of sending the client any removed messages.
  sub.onStop(function () {
    handle.stop();
  });
});
  • /lib/collections/matchPointMetrics.js
MatchPointMetrics = new Mongo.Collection('MatchPointMetrics');
  • /client/views/aggregate/aggregate.js
Template.aggregate.onRendered(function(){
  // This ID is used by the aggregation to limit the records being processed
  var leagueMatchId = this.data._id;
  // These subscriptions don't need to be wrapped in an autorun in this example
  this.subscribe('leagueNightMatches', leagueMatchId);
  this.subscribe('MatchPointMetrics', leagueMatchId);
});
Template.aggregate.helpers({
  homeTeamTotalPoints: function() {
    return MatchPointMetrics.find({_id: this.homeTeamId});
  },
  visitingTeamTotalPoints: function() {
    return MatchPointMetrics.find({_id: this.visitingTeamId});
  }
});
  • /client/views/aggregate/aggregate.html
<template name="aggregate">
  <h4>Total Match Points By Team</h4>
  {{#each homeTeamTotalPoints}}
    <ul>
      <li>Match ID: {{_id}}</li>
      <li>Home team total match points: {{total}}</li>
      <li>Matches played: {{count}}</li>
    </ul>
  {{/each}}
  {{#each visitingTeamTotalPoints}}
    <ul>
      <li>Match ID: {{_id}}</li>
      <li>Visiting team total match points: {{total}}</li>
      <li>Matches played: {{count}}</li>
    </ul>
  {{/each}}
</template>

The example above uses meteorhacks:aggregate, but it is also possible to do it without the package by calling the MongoDB driver directly:

  • /server/publish.js
var db = MongoInternals.defaultRemoteCollectionDriver().mongo.db;
// Only publish data for the matches we care about. Be careful not to over-publish
Meteor.publish('MatchPointMetrics', function(leagueMatchId) {
  var sub = this;
  var initializing = true;
  // Define our aggregation pipeline
  var pipeline = [
    {$match : {leagueMatchId: leagueMatchId}}, // Only aggregate the data we need
    {$unwind : '$teams'},
    {
      $group: {
        _id: '$teams.teamId',
        total: {
          $sum: '$teams.totalMatchPoints'
        },
        avg: {
          $avg: '$teams.totalMatchPoints'
        },
        count: {
          $sum: 1
        }
      }
    }
  ];
  // Track any changes on the collection we are going to use for aggregation
  var query = Matches.find({leagueMatchId: leagueMatchId});
  var handle = query.observeChanges({
    added: function (id) {
      // observeChanges only returns after the initial `added` callbacks
      // have run. Until then, we don't want to send a lot of
      // `sub.changed()` messages, hence tracking the
      // `initializing` state.
      if (!initializing) {
        runAggregation('changed');
      }
    },
    removed: function (id) {
      runAggregation('changed');
    },
    changed: function (id) {
      runAggregation('changed');
    },
    error: function(err){
      throw new Meteor.Error('Uh oh! something went wrong!', err.message);
    }
  });
  // Instead, send the initial `sub.added()` messages right after
  // observeChanges has returned; runAggregation then marks the
  // subscription as ready.
  initializing = false;
  // Run the aggregation initially to add some data to our aggregation collection
  runAggregation('added');
  // Wrap the aggregation call inside of a function
  // since it will be called more than once
  function runAggregation(action){
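    // Wrap the callback to the current Meteor environment running in a fiber
    db.collection('matches').aggregate(pipeline, Meteor.bindEnvironment(function(err, result) {
      if (err) {
        // Report the failure to the client instead of iterating an undefined result
        sub.error(err);
        return;
      }
      _.each(result, function(e) {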
        if(action === 'changed'){
          // Aggregate and update our collection with the new data changes
          sub.changed('MatchPointMetrics', e._id, {
            _id: e._id,
            total: e.total,
            avg: e.avg,
            count: e.count
          });
        }
        else {
          // Aggregate and then add a new record to our collection
          sub.added('MatchPointMetrics', e._id, {
            _id: e._id,
            total: e.total,
            avg: e.avg,
            count: e.count
          });
        }
      });
      sub.ready();
    }, function(error) {
      // handle any errors with the aggregation
      Meteor._debug('Error performing aggregation: ' + error);
    }));
  }
  // Stop observing the cursor when client unsubs.
  // Stopping a subscription automatically takes
  // care of sending the client any removed messages.
  sub.onStop(function () {
    handle.stop();
  });
});
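One gap in both versions: every re-run calls `sub.changed()`, which assumes the set of groups never changes. A group that first appears later is sent as `changed` for an id the client has never seen, and a group that disappears is never removed. Below is a minimal sketch of a diff step that could replace the `action` branching. It assumes scalar `_id` values (converted with `String()`, since publication ids are strings); `applyAggregationDiff` and `known` are hypothetical names, not part of Meteor or meteorhacks:aggregate. Only the `added`/`changed`/`removed` calls mirror the publication API used above.

```javascript
// Sketch: diff a fresh aggregation result against what this publication has
// already sent, then emit the matching added/changed/removed calls.
// `sub` is anything with added/changed/removed(collection, id, fields),
// e.g. the `this` of a Meteor.publish function.
// `known` is a Map of id -> last fields sent (hypothetical bookkeeping).
// Assumes scalar _id values; compound group keys need a stable string first.
function applyAggregationDiff(sub, collectionName, known, rows) {
  const seen = new Set();
  for (const row of rows) {
    const id = String(row._id);
    // Copy every field except _id; the id travels separately.
    const fields = {};
    for (const key of Object.keys(row)) {
      if (key !== '_id') fields[key] = row[key];
    }
    seen.add(id);
    if (!known.has(id)) {
      sub.added(collectionName, id, fields);
    } else if (JSON.stringify(known.get(id)) !== JSON.stringify(fields)) {
      // Cheap equality check; assumes fields come back in the same key order.
      sub.changed(collectionName, id, fields);
    }
    known.set(id, fields);
  }
  // Any id sent before but missing from this result has disappeared.
  for (const id of Array.from(known.keys())) {
    if (!seen.has(id)) {
      sub.removed(collectionName, id);
      known.delete(id);
    }
  }
}
```

Inside the first publication, you could declare `const known = new Map()` once per subscription. `runAggregation` would then reduce to `applyAggregationDiff(sub, 'MatchPointMetrics', known, Matches.aggregate(pipeline))`, followed by `sub.ready()` after the first run.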

If you don’t need the aggregations to be reactive on every change, a Meteor method call might be a better approach, so you only aggregate every once in a while. This post by @riaan53 sheds light on various use cases for reactive aggregation, along with some things to be aware of.
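A possible middle ground between re-aggregating on every change and a one-off method is to throttle the re-run from inside the observer callbacks. This technique is swapped in and not something the posts above use. The sketch assumes an injected clock and scheduler so it can be tested without real timers; `makeThrottled` is a hypothetical helper. It keeps a trailing run so the last change is not lost.

```javascript
// Sketch: collapse bursts of change notifications into at most one
// aggregation run per `intervalMs`, plus a trailing run so the last change
// is never dropped. `now` and `schedule` are injected (hypothetical design);
// on a Meteor server you might pass Date.now and Meteor.setTimeout.
function makeThrottled(run, intervalMs, now, schedule) {
  let lastRun = -Infinity;
  let pending = false;

  function fire() {
    pending = false;
    lastRun = now();
    run();
  }

  return function trigger() {
    if (pending) return; // a trailing run is already queued
    const wait = lastRun + intervalMs - now();
    if (wait <= 0) {
      fire();
    } else {
      pending = true;
      schedule(fire, wait);
    }
  };
}
```

In a publication, the observer callbacks would call something like `rerun()`, where `rerun = makeThrottled(runAggregation, 1000, Date.now, Meteor.setTimeout)`. You may also want to cancel a pending run in `onStop`.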

I hope this helps!


@keithnicholas Yes, exactly, I meant autopublish.

@levid You can reproduce the error when you use aggregate’s $out to update a collection: the reactivity fails on the server side. If you instead pass the output to collection.insert() calls, it works fine. This leads me to think there is a bug somewhere in Meteor’s server side. @debergalis Is there a similar issue on GitHub I can link to, or is this a new issue?

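results = Collection.aggregate([{
  $group: {
    _id: "$a",
    count: { $sum: 1 } // should return 2 with the sample data above
  }
}]);

// aggregate() returns an array, but insert() takes a single document,
// so insert the rows one at a time. Meteor also rejects non-string _ids,
// so the group key is moved into its own field.
results.forEach(function (row) {
  OtherCollection.insert({ a: row._id, count: row.count });
});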

Here is a repo to reproduce the issue:

Unless I’m making a mistake in the code, this should be ticketed as a bug in Meteor.


Thanks for sharing. I followed almost all of this, but there’s a problem. In your case, the $group key is a plain value (the team ID). What happens if you are grouping by, say, month?

When you track changes, you can’t update the document, because you don’t know what the record’s id is for the aggregation.

E.g.: after aggregating, you have:
{ _id: { month: "January" }, totalProd: 25 }

Then you add it to the aggregated collection. If a new record arrives in January, how can the subscription keep track of it, considering this.changed() takes an id?
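One way to handle a compound key is to derive a deterministic string id from it and keep the raw key fields alongside the totals. A later post does something similar with `btoa(JSON.stringify(row._id))`. The sketch below also sorts keys so that field order cannot change the id. `groupKeyToId` and `canonicalize` are hypothetical helpers, and the sketch assumes the key contains only JSON-friendly values (strings, numbers, booleans, nested objects or arrays). Dates or ObjectIDs would need extra handling.

```javascript
// Sketch: turn a $group key such as { month: "January" } into a string id.
// Keys are sorted first, so { a: 1, b: 2 } and { b: 2, a: 1 } map to the same id.
// Assumes JSON-friendly values only; Dates/ObjectIDs are not handled.
function canonicalize(value) {
  if (Array.isArray(value)) return value.map(canonicalize);
  if (value !== null && typeof value === 'object') {
    const out = {};
    for (const k of Object.keys(value).sort()) out[k] = canonicalize(value[k]);
    return out;
  }
  return value;
}

function groupKeyToId(key) {
  return JSON.stringify(canonicalize(key));
}

console.log(groupKeyToId({ month: 'January' })); // {"month":"January"}
```

In the publication, this would look like `sub.added('MonthlyTotals', groupKeyToId(row._id), { month: row._id.month, totalProd: row.totalProd })`, where `MonthlyTotals` is a hypothetical collection name. Later runs call `changed` with the same derived id.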

See @glasser’s response: https://github.com/meteor/meteor/issues/4947
The issue is Mongo’s internal use of a temp collection: $out writes the results to a temporary collection and then renames it over the target collection.

FYI, Mongo’s $group stage requires the grouping key to be stored in `_id`.
The _id doesn’t affect reactivity. You can test this in a Mongo console without using a pipeline.
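To make the `_id` requirement concrete: `$group` emits one document per distinct key, with the key itself stored in `_id` and each accumulator as a sibling field. Here is a plain-JavaScript imitation of `{ $group: { _id: "$a", count: { $sum: 1 } } }` over the question’s sample data. It illustrates the output shape only and is not how MongoDB executes the stage. `groupCount` is a hypothetical name.

```javascript
// Plain-JS imitation of { $group: { _id: "$a", count: { $sum: 1 } } }.
// Only handles a single top-level field as the key. Documents missing the
// field land under `undefined` here, whereas MongoDB would group them as null.
function groupCount(docs, field) {
  const counts = new Map();
  for (const doc of docs) {
    const key = doc[field];
    counts.set(key, (counts.get(key) || 0) + 1);
  }
  // One output document per distinct key; the key lives in _id.
  return Array.from(counts, ([key, count]) => ({ _id: key, count }));
}

const sample = [{ a: 1, b: 2 }, { a: 1, b: 2 }];
console.log(groupCount(sample, 'a')); // [ { _id: 1, count: 2 } ]
```

The count of 2 matches the expectation in the question’s pipeline comment.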

Hi Levid!

I like your solution (without including the aggregation plugin). Thanks for posting it!

I noticed in your server code, the

Meteor.publish( .... )

function doesn’t have a return statement. I tried returning ‘query’ but that doesn’t include the aggregation. What do you return from that function?

Hi @thunderrabbit,

My example doesn’t return a cursor or an array of cursors. Instead, it directly controls the published record set ‘MatchPointMetrics’ using the added/changed/removed interface. You can find a full explanation in the ‘counts-by-room’ example in the Meteor docs:

If a publish function does not return a cursor or array of cursors, it is assumed to be using the low-level added/changed/removed interface, and it must also call ready once the initial record set is complete.

If you do want to return a cursor, you can add this code to the bottom of the publication, but it’s essentially already doing this using added/changed/removed.

return MatchPointMetrics.find({_id: leagueMatchId});
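Because a publication that returns nothing talks only to `added`/`changed`/`removed`/`ready`, its logic can be exercised outside Meteor with a stand-in object. Below is a minimal sketch of such a recording stub and a small publication body that calls `ready()` once, after the loop. Both `makeRecordingSub` and `publishRows` are hypothetical, and the stub does not reproduce any of Meteor’s own checks.

```javascript
// Hypothetical stand-in for a publication's `this`, recording every call
// so publication logic can be checked without a Meteor server.
function makeRecordingSub() {
  const log = [];
  return {
    log,
    added(collection, id, fields) { log.push(['added', collection, id, fields]); },
    changed(collection, id, fields) { log.push(['changed', collection, id, fields]); },
    removed(collection, id) { log.push(['removed', collection, id]); },
    ready() { log.push(['ready']); },
  };
}

// Example low-level publication body: one added() per row, then a single
// ready() after the loop, so an empty result still marks the sub ready.
function publishRows(sub, collectionName, rows) {
  for (const row of rows) {
    sub.added(collectionName, String(row._id), { total: row.total, count: row.count });
  }
  sub.ready();
}
```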

Hi levid!

Thanks for that. You’re right of course; no return statement needed! In my case I needed to change

sub.changed('MatchPointMetrics', e._id, { ... })

to

sub.changed('match_point_metrics', e._id, { ... })

Where match_point_metrics is the name of my actual collection in Mongo:

MatchPointMetrics = new Mongo.Collection("match_point_metrics");

I don’t yet know why this is the case, but it works, so I’m happy.

Thank you for helping me aggregate a collection!

Just wanted to share a working solution for reactive aggregations. Can anyone see any issues with it when scaling up? It worked really well in my testing, though I was unable to send removed events to the client (I don’t know why), so I’m clearing the fields in those docs instead.

const btoa = Meteor.npmRequire('btoa');

Meteor.publish('voting-results', function() {
  const sub = this;
  let initializing = true;
  const pipeline = [
    // {
    //   $match: {
    //   },
    // },
    {
      $group: {
        _id: {
          name: "$choice",
        },
        count: {
          $sum: 1
        },
        isFinal: {
          $sum: "$isFinal"
        }
      }
    }
  ];

  var query = Vote.find({});
  var handle = query.observeChanges({
    added: function (id) {
      // observeChanges only returns after the initial `added` callbacks
      // have run. Until then, we don't want to send a lot of
      // `sub.changed()` messages, hence tracking the
      // `initializing` state.
      if (!initializing) {
        runAggregation('changed');
      }
    },
    removed: function (id) {
      runAggregation('changed');
    },
    changed: function (id) {
      runAggregation('changed');
    },
    error: function(err){
      throw new Meteor.Error('Uh oh! something went wrong!', err.message);
    }
  });

  // Wrap the aggregation call inside of a function
  // since it will be called more than once
  function runAggregation(action) {
    sub._ids = sub._ids || {};
    sub._iteration = sub._iteration || 0;

    // Wrap the callback to the current Meteor environment running in a fiber
    Vote.aggregate(pipeline, Meteor.bindEnvironment(function(err, result) {
      _.each(result, function(row) {
        let _id = btoa(JSON.stringify(row._id)).replace(/=+$/, '');
        if(typeof sub._ids[_id] !== 'undefined'){
          // Aggregate and update our collection with the new data changes
          sub.changed('voting-options', _id, _.extend(row._id, {_id: _id, count: row.count, isFinal: row.isFinal}));
        }
        else {
          // Aggregate and then add a new record to our collection
          sub.added('voting-options', _id, _.extend(row._id, {_id: _id, count: row.count, isFinal: row.isFinal}));
        }
        sub._ids[_id] = sub._iteration;
      });
      // Clear options that no longer appear in the aggregation result.
      for (let _id in sub._ids) {
        if (sub._ids[_id] != sub._iteration) {
          delete sub._ids[_id];
          sub.changed('voting-options', _id, {_id: _id, name: undefined, count: undefined, isFinal: undefined});
        }
      }

      sub._iteration++;
      sub.ready();
    }, function(error) {
      // handle any errors with the aggregation
      Meteor._debug('Error performing aggregation: ' + error);
    }));
  }

  // Instead, send the initial `sub.added()` messages right after
  // observeChanges has returned; runAggregation then marks the
  // subscription as ready.
  initializing = false;
  // Run the aggregation initially to add some data to our aggregation collection
  runAggregation('added');

  // Stop observing the cursor when client unsubs.
  // Stopping a subscription automatically takes
  // care of sending the client any removed messages.
  sub.onStop(function () {
    handle.stop();
  });
});

Thanks a lot @timbrandin for this! :smile:


Thanks @levid and @timbrandin, very useful stuff!

@timbrandin, is there any reason to use
sub.changed('voting-options', _id, {_id: _id, name: undefined, count: undefined, isFinal: undefined});
instead of
sub.removed('voting-options', _id);

I couldn’t get removed to work; it gave me other errors, and I’m not sure why. But yes, that would be the correct solution!


Alright, it is working for me.
Thanks for the quick response!


Maybe you could build on this basic reactive aggregation example: http://stackoverflow.com/questions/34393216/meteor-aggregated-result-not-showing-in-template/34401468#34401468

I turned my helper for reactively publishing aggregations into a package (source on GitHub).
