Reactive Aggregation and high sustained data traffic between data server and meteor server

Hello everyone, we have a curious situation with sustained queries running between our server (on Digital Ocean) and our database (on Mongo Atlas) that seems to only occur when we run aggregate queries through tunguska-reactive-aggregate.

(Sorry this is a long description. I wanted to put the facts out as best as I could to help give context)

We have an admin portion of the app that queries for the first 10 records of a collection and waits for the user to select a record, ask for the next 10, sort or filter. Typical paginated usage. On the server, we assemble an aggregate query (using tunguska-reactive-aggregate) to assemble things from other collections that will help a user find things in the current collection. Things like the user’s email that is not available in the current collection.

What happens is that while the user is looking at these first 10 records, Atlas is showing considerable, sustained activity (“internet”) and Digital Ocean is likewise showing this activity coming in, but no activity going out. On the client side, minimongo shows the 10 records and the total bytes transferred is correct for these 10 records

If the user leaves this page up, no further minimongo activity is recorded (as expected) but Digital Ocean continues to be receiving a sustained traffic of data from Atlas. If the user leaves the window open for very long they are getting billed the whole time. So far that has reached $40 but that could get well over $300 at the rate Atlas charges. So far we/they have been fortunate.

Once the user selects a record or leaves the page, the activity between Atlas and Digital Ocean ceases, so it seems that this is a publication issue and it gets extinguished when the subscription ends. Should a second admin user open the same page and leave it on their browser, the traffic does NOT increase at all. If that admin users goes to the listing page of a different collection, then they DO add to the traffic. The amount of added usage increases with the total # of records in the collection. These are hopefully useful pieces of the puzzle.

So my question is this: do reactive aggregate queries behave this way (is this expected behaviour) or have we created a poor aggregate query?

The query is assembled incrementally, but here is an example of the first one sent when the subscription starts. I’m hoping someone has seen this issue before or perhaps sees what blunder I made in the query itself?

Here’s the query:

[
  {
    "$match": {}
  },
  {
    "$unwind": {
      "path": "$products",
      "preserveNullAndEmptyArrays": true
    }
  },
  {
    "$sort": {
      "products.paid": -1,
      "products.publishDate": -1
    }
  },
  {
    "$addFields": {
      "ttaCount": {
        "$size": {
          "$ifNull": [
            "$products.children",
            []
          ]
        }
      }
    }
  },
  {
    "$group": {
      "_id": "$_id",
      "_version": {
        "$first": "$_version"
      },
      "createdAt": {
        "$first": "$createdAt"
      },
      "updatedAt": {
        "$first": "$updatedAt"
      },
      "category": {
        "$first": "$category"
      },
      "duration": {
        "$first": "$duration"
      },
      "logoImageId": {
        "$first": "$logoImageId"
      },
      "institution": {
        "$first": "$institution"
      },
      "country": {
        "$first": "$country"
      },
      "region": {
        "$first": "$region"
      },
      "city": {
        "$first": "$city"
      },
      "title": {
        "$first": "$title"
      },
      "applicationUrl": {
        "$first": "$applicationUrl"
      },
      "jobBody": {
        "$first": "$jobBody"
      },
      "userId": {
        "$first": "$userId"
      },
      "isPublished": {
        "$first": "$isPublished"
      },
      "preventPublishOverride": {
        "$first": "$preventPublishOverride"
      },
      "bannerImageId": {
        "$first": "$bannerImageId"
      },
      "slug": {
        "$first": "$slug"
      },
      "currentListing": {
        "$first": "$products"
      },
      "ttaCount": {
        "$sum": "$ttaCount"
      }
    }
  },
  {
    "$lookup": {
      "from": "userProfiles",
      "localField": "userId",
      "foreignField": "_id",
      "as": "user"
    }
  },
  {
    "$unwind": {
      "path": "$user",
      "preserveNullAndEmptyArrays": true
    }
  },
  {
    "$addFields": {
      "live": {
        "$and": [
          {
            "$eq": [
              "$isPublished",
              true
            ]
          },
          {
            "$eq": [
              "$preventPublishOverride",
              false
            ]
          }
        ]
      }
    }
  },
  {
    "$project": {
      "_id": 1,
      "_version": 1,
      "createdAt": 1,
      "updatedAt": 1,
      "category": 1,
      "duration": 1,
      "logoImageId": 1,
      "institution": 1,
      "country": 1,
      "region": 1,
      "city": 1,
      "title": 1,
      "applicationUrl": 1,
      "products": 1,
      "jobBody": 1,
      "userId": 1,
      "isPublished": 1,
      "preventPublishOverride": 1,
      "bannerImageId": 1,
      "slug": 1,
      "live": 1,
      "ttaCount": 1,
      "user.email": 1,
      "currentListing": 1
    }
  },
  {
    "$match": {}
  },
  {
    "$sort": {
      "updatedAt": -1
    }
  },
  {
    "$skip": 0
  },
  {
    "$limit": 10
  }
]```

There are topics here about questionable bandwidth with Atlas. You might want to search for it.

We moved our publications requiring aggregations to methods to improve our app performance but we have not encountered this issue with Atlas. Our boxes are on the same region as Atlas (both AWS)

1 Like

Thanks. @rjdavid! I have been looking at various Atlas posts and the ones that also have DO are a bit troubling. I have zero experience with AWS for hosting meteor apps, but there might be a need to learn it, if I can’t get to the bottom of why this strange behaviour is happening. It just feels like there is some tuning I need to do on the server or the aggregating query.

Also one of the goals was to have a reactive admin, and we achieved that. We can move to methods rather than publications but I would rather not. I wonder if this is a publication issue rather than the query that is inside the publication. Some kind of retrying that is going on?

My suspicion is that the observers being used to ensure reactive-aggregate is reactive are too coarse. Consider a collection of customer orders. There may be thousands of customers, but your user is only interested in the current total value of the orders from customer “Bob”. The aggregation pipeline has a $match on customer “Bob”, together with the stages needed to compute and reshape the output document(s) for the user to consume.

The simplest way to write the publication will be something like:

Meteor.publish("orderValue", function () {
  ReactiveAggregate(this, Orders, pipeline);
});

or

Meteor.publish("orderValue", function () {
  ReactiveAggregate(this, Orders, pipeline, {
    observers: [ Orders.find() ]
  });
});

The problem with this is that any changes to Orders (like other customers’ orders changing) will cause the aggregation pipeline to re-run. If the Orders collection is “busy”, this will generate a constant churn of aggregation re-runs.

What you really want to do to mitigate this is to only re-run when Bob’s orders change:

Meteor.publish("orderValue", function () {
  ReactiveAggregate(this, Orders, pipeline, {
    observers: [ Orders.find({ customer: "Bob" }) ]
  });
});

Is that a possible scenario?

3 Likes

Hi @robfallows. Thanks for the feedback.

We haven’t specified any Observers. This was because the search is for the entire collection (when it first runs) but it only shows the first 10 records according to skip and limit. What would be ideal would be for the Observer to watch just the records currently sent to the client but not sure this is possible.

Do you think that such an aggregate query is a poor candidate for reactivity? The use case seems very admin-like, but maybe we have to forego reactivity and use a method for the query. I hope not.

No - you’re correct. The cursor would have to be on the results of the pipeline, which isn’t available until the pipeline completes.

Perhaps, or it could be that the data architecture is not structured appropriately for a collection with busy data and reactivity - only you can answer that.

However, tunguska-reactive-aggregate gives you some control over the reactivity, which may help you in this situation. Check the README for more information.

  1. Use the debounceCount and/or debounceDelay options to control how frequently the aggregation re-runs. You may use them independently or in combination. Using these is probably the simplest way to change your code, reduce the number of re-runs and still have reactivity:

    • debounceCount: An integer representing the number of observer changes across all observers before the aggregation will be re-run. Defaults to 0 (do not count) for backwards compatibility with the original API. Can be used in conjunction with debounceDelay to fine-tune reactivity. The first of the two debounce options to be reached will re-run the aggregation.
    • debounceDelay: An integer representing the maximum number of milli-seconds to wait for observer changes before the aggregation is re-run. Defaults to 0 (do not wait) for backwards compatibility with the original API. Can be used in conjunction with debounceCount to fine-tune reactivity. The first of the two debounce options to be reached will re-run the aggregation. Used on its own, this will re-run the aggregation every debounceDelay ms. (I have just realised I can add another optimisation here, but that will need another release, which won’t be today!)
  2. Non-reactive Aggregations. Probably not what you want, but may be useful if you want the behaviour of a Meteor Method without lots of refactoring.

  3. On-demand Aggregations. By observing an independent collection, you can trigger a re-run by updating that instead of observing the collection(s) you are aggregating.

2 Likes

Thanks, @robfallows for this detailed and comprehensive way to handle the situation. Very helpful indeed! I will be trying all of them. #3 is intriguing and I will have think about that kind of external control.

2 Likes