If you are open to trying a different approach, I have found that using a publication works pretty well for making aggregations reactive. I have also experienced issues when trying to detect changes on a collection using $out.
This is an example from one of my projects, but you can tweak it for your needs.
// Only publish data for the matches we care about. Be careful not to over-publish
Meteor.publish('MatchPointMetrics', function(leagueMatchId) {
var sub = this;
var initializing = true;
// Define our aggregation pipeline
var pipeline = [
{$match : {leagueMatchId: leagueMatchId}}, // Only aggregate the data we need
{$unwind : '$teams'},
{
$group: {
_id: '$teams.teamId',
total: {
$sum: '$teams.totalMatchPoints'
},
avg: {
$avg: '$teams.totalMatchPoints'
},
count: {
$sum: 1
}
}
}
];
// Track any changes on the collection we are going to use for aggregation
var query = Matches.find({leagueMatchId: leagueMatchId});
var handle = query.observeChanges({
added: function (id) {
// observeChanges only returns after the initial `added` callbacks
// have run. Until then, we don't want to send a lot of
// `self.changed()` messages - hence tracking the
// `initializing` state.
if (!initializing) {
runAggregation('changed');
}
},
removed: function (id) {
runAggregation('changed');
},
changed: function (id) {
runAggregation('changed');
},
error: function(err){
throw new Meteor.Error('Uh oh! something went wrong!', err.message);
}
});
// Instead, we'll send one `self.added()` message right after
// observeChanges has returned, and mark the subscription as
// ready.
initializing = false;
// Run the aggregation initially to add some data to our aggregation collection
runAggregation('added');
// Wrap the aggregation call inside of a function
// since it will be called more than once
function runAggregation(action){
Matches.aggregate(pipeline).forEach(function(e) {
if(action === 'changed'){
// Aggregate and update our collection with the new data changes
sub.changed('MatchPointMetrics', e._id, {
_id: e._id,
total: e.total,
avg: e.avg,
count: e.count
});
}
else {
// Aggregate and then add a new record to our collection
sub.added('MatchPointMetrics', e._id, {
_id: e._id,
total: e.total,
avg: e.avg,
count: e.count
});
}
// Mark the subscription ready
sub.ready();
});
}
// Stop observing the cursor when client unsubs.
// Stopping a subscription automatically takes
// care of sending the client any removed messages.
sub.onStop(function () {
handle.stop();
});
});
- /lib/collections/matchPointMetrics.js
MatchPointMetrics = new Mongo.Collection('MatchPointMetrics');
- /client/views/aggregate/aggregate.js
Template.aggregate.onRendered(function(){
// This ID is used the by aggregation to limit the records being processed
var leagueMatchId = this.data._id;
// These subscriptions don't need to be wrapped in an autorun in this example
this.subscribe('leagueNightMatches', leagueMatchId);
this.subscribe('MatchPointMetrics', leagueMatchId);
});
Template.aggregate.helpers({
homeTeamTotalPoints: function() {
return MatchPointMetrics.find({_id: this.homeTeamId});
},
visitingTeamTotalPoints: function() {
return MatchPointMetrics.find({_id: this.visitingTeamId});
}
});
- /client/views/aggregate/aggregate.html
<template name="aggregate">
<h4>Total Match Points By Team</h4>
{{#each homeTeamTotalPoints}}
<ul>
<li>Match ID: {{_id}}</li>
<li>Home team total match points: {{total}}</li>
<li>Matches played: {{count}}</li>
</ul>
{{/each}}
{{#each visitingTeamTotalPoints}}
<ul>
<li>Match ID: {{_id}}</li>
<li>Visiting team total match points: {{total}}</li>
<li>Matches played: {{count}}</li>
</ul>
{{/each}}
</template>
The example above is using meteorhacks:aggregate, but it is also possible to do it this way:
var db = MongoInternals.defaultRemoteCollectionDriver().mongo.db;
// Only publish data for the matches we care about. Be careful not to over-publish
Meteor.publish('MatchPointMetrics', function(leagueMatchId) {
var sub = this;
var initializing = true;
// Define our aggregation pipeline
var pipeline = [
{$match : {leagueMatchId: leagueMatchId}}, // Only aggregate the data we need
{$unwind : '$teams'},
{
$group: {
_id: '$teams.teamId',
total: {
$sum: '$teams.totalMatchPoints'
},
avg: {
$avg: '$teams.totalMatchPoints'
},
count: {
$sum: 1
}
}
}
];
// Track any changes on the collection we are going to use for aggregation
var query = Matches.find({leagueMatchId: leagueMatchId});
var handle = query.observeChanges({
added: function (id) {
// observeChanges only returns after the initial `added` callbacks
// have run. Until then, we don't want to send a lot of
// `self.changed()` messages - hence tracking the
// `initializing` state.
if (!initializing) {
runAggregation('changed');
}
},
removed: function (id) {
runAggregation('changed');
},
changed: function (id) {
runAggregation('changed');
},
error: function(err){
throw new Meteor.Error('Uh oh! something went wrong!', err.message);
}
});
// Instead, we'll send one `self.added()` message right after
// observeChanges has returned, and mark the subscription as
// ready.
initializing = false;
// Run the aggregation initially to add some data to our aggregation collection
runAggregation('added');
// Wrap the aggregation call inside of a function
// since it will be called more than once
function runAggregation(action){
// Wrap the callback to the current Meteor environment running in a fiber
db.collection('matches').aggregate(pipeline, Meteor.bindEnvironment(function(err, result) {
_.each(result, function(e) {
if(action === 'changed'){
// Aggregate and update our collection with the new data changes
sub.changed('MatchPointMetrics', e._id, {
_id: e._id,
total: e.total,
avg: e.avg,
count: e.count
});
}
else {
// Aggregate and then add a new record to our collection
sub.added('MatchPointMetrics', e._id, {
_id: e._id,
total: e.total,
avg: e.avg,
count: e.count
});
}
});
sub.ready();
}, function(error) {
// handle any errors with the aggregation
Meteor._debug('Error performing aggregation: ' + error);
}));
}
// Stop observing the cursor when client unsubs.
// Stopping a subscription automatically takes
// care of sending the client any removed messages.
sub.onStop(function () {
handle.stop();
});
});
If you don’t need the aggregations to be reactive on every change, a meteor method call might be a better approach to only aggregate every once in a while. This post by @riaan53 really sheds some light on various use cases when doing reactive aggregation and also things to be aware of.
I hope this helps !