Advanced Publications / Reactive Joins In Meteor

First of all, I found this post which has been very helpful. However, it was posted nearly three years ago and I was wondering if best practices have been established or changed since.

The problem I’m facing is pretty straight forward: For each post, I have a comment section. I need to only publish users that have commented on that particular post (publishing all users to the client won’t scale).

Is the best way to approach this still in the above post?

The most common way is to use this package https://atmospherejs.com/reywood/publish-composite

2 Likes

Do something like this:

server side:

Meteor.publish("reports-progress-count", function (id, selector) {
  var self = this;
  var count = 0;
  var initializing = true;

  // observeChanges only returns after the initial `added` callbacks
  // have run. Until then, we don't want to send a lot of
  // `self.changed()` messages - hence tracking the
  // `initializing` state.
  var handle = accounts.find(selector).observeChanges({
    added: function (id) {
      count++;
      if (!initializing)
        self.changed("progressCounts", id, {count: count});
    },
    removed: function (id) {
      count--;
      self.changed("progressCounts", id, {count: count});
    }
    // don't care about changed
  });

  // Instead, we'll send one `self.added()` message right after
  // observeChanges has returned, and mark the subscription as
  // ready.
  initializing = false;
  self.added("progressCounts", id, {count: count});
  self.ready();

  // Stop observing the cursor when client unsubs.
  // Stopping a subscription automatically takes
  // care of sending the client any removed messages.
  self.onStop(function () {
    handle.stop();
  });
});

on the client:


Template.reportsProgress.onCreated(function () {
   this.autorun(() => {
        const id = 'Any_ID';
	this.subscribe('reports-progress-count', id, selector);
   });
});

...
  const id = 'Any_ID';
  let progress = ProgressCounts.findOne({_id: id});
1 Like

I have been trying to use reactive joins for a while now, especially using cultofcoders:grapher, which in turn uses reywood:publish-composite when dealing with reactive data. However, having a subscription spawn multiple mini-subscriptions seems to be making the data fetching really slow when I get a lot of data to display.

I just tried another approach by digging deeper in advanced publications and I’d like to share the idea to get some feedback. I hope it might help others too. The idea is simply to embrace the fact that the lower level publication system allows returning anything as the result set.

In my example, Shifts collection contains documents relative to a worker’s shift times, including a startAt date. A shift can contain a serie of activities stored in the Activities collection that are not fixed in time; the only way to know the date at which they’ve taken place is by looking at their parent shift’s dates. This is by design!

However, sometimes, I only need activities data, without shifts, but for a specific date range, which activities know nothing about. So, I observe changes to the shifts, but return activities data instead.

On the server

import { Meteor } from "meteor/meteor";
import { check } from "meteor/check";
import { Activities, Shifts } from "../../collections";
import { startBetweenDates } from "../../collections/Shifts/selectors";

const ACT_FIELDS = {
  shiftId: 1,
  hoursDuration: 1,
  createdAt: 1,
};

function activitiesInDates({ minDate, maxDate }, collName = "Activities") {
  check(minDate, Date);
  check(maxDate, Date);
  check(collName, String);

  /* Set a flag to bypass initialization step for each shift added initially */
  let initialized = false;

  /* To keep track of the ids of all shifts in the date range. */
  let shiftIds = [];

  const shiftsObserver = Shifts.find(
    { $and: startBetweenDates(minDate, maxDate) },
    { fields: { _id: 1, startAt: 1 } }
  ).observeChanges({
    added: (shiftId) => {
      /* Update list of shift ids */
      shiftIds.push(shiftId);

      /* If a shift is added in the date range after initialization,
       * immediately find its activities and add them individually
       * to the published set. Initial shifts will be treated later. */
      if (initialized) {
        Activities.find(
          { shiftId },
          { fields: ACT_FIELDS, sort: { createdAt: -1 } }
        ).forEach((act) => this.added(collName, act._id, act));
      }
    },

    removed: (shiftId) => {
      /* Update list of shift ids */
      shiftIds = shiftIds.filter((itm) => itm !== shiftId);

      /* Find all activities related to the removed shift
       * and individually remove them from the published set. */
      Activities.find({ shiftId }, { fields: { _id: 1 } }).forEach((act) =>
        this.removed(collName, act._id)
      );
    },
  });

  /* All initial shifts have been stored in the `shiftIds` list.
   * For each of them, find their associated activities and
   * add them individually to the published set */
  if (!initialized) {
    Activities.find(
      { shiftId: { $in: shiftIds } },
      { fields: ACT_FIELDS, sort: { createdAt: -1 } }
    ).forEach((act) => this.added(collName, act._id, act));

    /* Change the flag to modify `added` callback behavior */
    initialized = true;
  }

  /* Track changes to the activities included in the tracked shifts. */
  const activitiesObserver = Activities.find(
    {}, // Don't specify { shiftId: { $in: shiftIds } } here since it wouldn't be reactive
    { fields: ACT_FIELDS }
  ).observe({
    /* Need to `observe` entire document to validate `shiftId` */
    added: (act) => {
      if (initialized && shiftIds.includes(act.shiftId)) {
        this.added(collName, act._id, act);
      }
    },

    changed: (act) => {
      if (initialized && shiftIds.includes(act.shiftId)) {
        this.changed(collName, act._id, act);
      }
    },

    removed: (act) => {
      this.removed(collName, act._id);
    },
  });

  this.ready();

  /* Stop the observers if the subscription is stopped */
  this.onStop(() => {
    shiftsObserver.stop();
    activitiesObserver.stop();
  });
}

Meteor.publish("activitiesInDates", activitiesInDates);

On the client

import React from 'react';
import { withTracker } from 'meteor/react-meteor-data';
import { Meteor } from 'meteor/meteor';
import { Mongo } from 'meteor/mongo';

/* Collection name to which to publish documents is passed
 * as an argument to the publication to ensure consistency. */
const COLL_NAME = 'weekActivities';

/* Local only collection that is used only in this context */
const WeekActivities = new Mongo.Collection(COLL_NAME);

const wrapper = withTracker(({ minDate, maxDate }) => {
  const loading = !Meteor.subscribe(
    'activitiesInDates',
    { minDate, maxDate },
    COLL_NAME
  ).ready();

  const activities = loading ? [] : WeekActivities.find().fetch();

  return { loading, activities };
});

const SchedulePlannerW = ({ loading, activities }) => {
  console.log({ loading, activities });
  return <div>Test de publication avancée en cours...</div>;
};

export default wrapper(SchedulePlannerW);

It seems to be working really well so far, but I’d be glad to get some feecback on this kind of publication, since I’ve not seen it so often in the forums or the documentation.

1 Like