After taking a look at this post, Publishing aggregations is REAL!, and participating in the subsequent discussion, I decided to take some of the custom code I’d been running to deal with publishing aggregations and convert it into a package. I based the package on the standard publication multiplexers/observe handles; however, it’s still a work in progress (I’m not familiar enough with the core multiplexer to understand why/when to use the noYieldsAllowed function).
That being said, it is functional and (relatively) performant:
https://bitbucket.org/znewsham/meteor-publish-aggregate/src/master/
Usage is trivial:
Meteor.publish("testAggregation", () => {
  const pipeline = [
    { $match: ... },
    { $group: { _id: "$someField", count: { $sum: 1 } } }
  ];
  //defaults shown
  const options = {
    name: `${MyCollection.name}_aggregated`,
    throttle: 10000,
    alwaysRerun: false
  };
  return MyCollection.aggregationCursor(
    pipeline,
    options
  );
});
It would be great if someone with knowledge of the core live data package could chime in on possible problems with this approach.
            
I understand the attraction of change streams, but compared with the code in jcbernack:reactive-aggregate or tunguska:reactive-aggregate, the code in your package seems a lot more complex.
Do you have any metrics comparing this approach with the basic Meteor pub/sub API approach used in those packages, or even with cultofcoders:grapher, which takes a completely different approach to multi-collection queries and aggregations?
            
It certainly ended up being a lot more complicated than I expected! My initial implementation for my specific use case was much simpler; handling general usage is a lot more complicated and, as I mentioned in my initial post, I haven’t even handled edge cases such as change events firing while the observers are being updated.
I actually wasn’t aware of those packages’ existence (which I guess highlights my lack of ability with Googling!); however, their approaches wouldn’t work for me. I’m aggregating over collections stored in a secondary database (in a secondary replica set) where I specifically do not want to rely on the oplog: changes to the database as a whole are frequent, but changes to any single queried set of data are typically infrequent. As such, change streams are perfect for me (and in fact the only way to go).
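To make the change stream idea concrete, here is a minimal sketch, not the package's actual code. It assumes a MongoDB Node driver collection (in Meteor, what `rawCollection()` returns) and uses the driver's `watch()` with a `$match` so only relevant change events arrive. The names `watchAndRerun`, `changeFilter` and `rerun` are hypothetical, and the throttling shown (at most one rerun per window) is only a guess at what a `throttle` option might mean.

```javascript
// Sketch only: rerun an aggregation when a filtered change stream fires.
// `rawCollection` is assumed to be a MongoDB Node driver Collection.
function watchAndRerun(rawCollection, changeFilter, rerun, throttleMs = 10000) {
  // $match here filters change events, so fields of the changed document
  // are addressed as "fullDocument.<field>". Delete events carry no
  // fullDocument and would need to be matched on documentKey instead.
  const stream = rawCollection.watch(
    [{ $match: changeFilter }],
    { fullDocument: "updateLookup" } // include the full document on updates
  );

  let timer = null;
  stream.on("change", () => {
    if (timer) return; // a rerun is already scheduled; coalesce this event
    timer = setTimeout(() => {
      timer = null;
      rerun();
    }, throttleMs);
  });

  return {
    // Cancel any pending rerun and close the change stream.
    stop() {
      if (timer) clearTimeout(timer);
      timer = null;
      return stream.close();
    }
  };
}
```

Because the filter runs inside MongoDB, a busy database with few relevant changes produces few events on the server, which is the situation described above.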
Regarding performance, I’ll have a crack at implementing performance metrics shortly. One thing I note about the reactive-aggregate packages (both are essentially the same) is that if multiple clients subscribe to the same aggregation, it appears that each subscription re-runs the aggregation on change. In my implementation I have ensured that each distinct aggregation is run only once (similar to the way the regular observe handler works).
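The "run each distinct aggregation only once" idea can be sketched roughly as follows. This is a simplified illustration, not the package's code or Meteor's multiplexer: `SharedAggregations` and all of its methods are hypothetical names, and keying on `JSON.stringify` is a shortcut that would not distinguish pipelines containing regexes or differently ordered keys.

```javascript
// Sketch only: one shared runner per distinct (collection, pipeline) pair.
class SharedAggregations {
  constructor() {
    this.entries = new Map(); // key -> { listeners: Set, run: Function }
  }

  // Identical collection name + pipeline produce the same key.
  static keyFor(collectionName, pipeline) {
    return JSON.stringify([collectionName, pipeline]);
  }

  // Register a listener; subscribers with the same key share one entry.
  subscribe(collectionName, pipeline, runAggregation, listener) {
    const key = SharedAggregations.keyFor(collectionName, pipeline);
    let entry = this.entries.get(key);
    if (!entry) {
      entry = { listeners: new Set(), run: () => runAggregation(pipeline) };
      this.entries.set(key, entry);
    }
    entry.listeners.add(listener);
    return {
      stop: () => {
        entry.listeners.delete(listener);
        // Drop the entry once its last listener is gone.
        if (entry.listeners.size === 0 && this.entries.get(key) === entry) {
          this.entries.delete(key);
        }
      }
    };
  }

  // On a data change: run the aggregation once, then fan out the results.
  async rerun(collectionName, pipeline) {
    const key = SharedAggregations.keyFor(collectionName, pipeline);
    const entry = this.entries.get(key);
    if (!entry) return;
    const results = await entry.run();
    for (const listener of entry.listeners) listener(results);
  }
}
```

With two subscribers to the same pipeline, a single `rerun` call executes the aggregation once and delivers the same result set to both listeners.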
Regarding functionality: as my implementation extends the cursor with _publishCursor and observeChanges functions, you can treat aggregation cursors almost identically to any other cursor, e.g. return an array of cursors from a publication, or observe changes to an aggregation on the server.
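As a rough illustration of those two uses, the sketch below takes the collection as a parameter. `Orders`, its fields, the function names and the `orderTotals` name are all hypothetical, and it assumes the aggregation cursor accepts the same `added`/`changed`/`removed` callbacks as a regular Meteor cursor's `observeChanges`, which the package may or may not match exactly.

```javascript
// Sketch only: `Orders` is a hypothetical collection that the package has
// extended with aggregationCursor().
const totalsPipeline = [
  { $group: { _id: "$customerId", total: { $sum: "$amount" } } }
];

// Return a normal cursor and an aggregation cursor from one publication.
function publishOrdersWithTotals(Orders) {
  return [
    Orders.find({ status: "open" }),
    Orders.aggregationCursor(totalsPipeline, { name: "orderTotals" })
  ];
}

// Observe an aggregation on the server, assuming the standard Meteor
// observeChanges callback signatures.
function logTotals(Orders, log = console.log) {
  const cursor = Orders.aggregationCursor(totalsPipeline, { name: "orderTotals" });
  return cursor.observeChanges({
    added: (id, fields) => log("added", id, fields),
    changed: (id, fields) => log("changed", id, fields),
    removed: (id) => log("removed", id)
  });
}
```

Inside a Meteor app, the first function would be called from a `Meteor.publish` handler, and the handle returned by the second would be stopped with `stop()` when no longer needed.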