Mongo change stream for pub/sub

I’ve got a working Apollo subscription using Mongo’s change stream, which I put into an async iterator. The strange thing is that my async iterator needs to nest its result under the subscription field name, so instead of returning

{ 
  value: { 
    _id, 
    someField
  },
  done
}

I needed to do this:

{
  value: {
    someSubscription: {
      _id,
      someField
    }
  },
  done
}

I’d like this nesting to be handled by my GraphQL server, so that I can use the same async iterator for different subscriptions regardless of the subscription field name. Perhaps I can wrap the iterator in another function that transforms the result (basically enhancing the schema stitching). I tried doing the transform with a resolve function on the subscription field, which should work, but I was still getting null results in my GraphQL client.
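A minimal sketch of the wrapper idea, assuming the underlying iterator yields bare documents. The names nestUnder and fakeDocuments are hypothetical; fakeDocuments only stands in for the change stream so the example runs without Mongo.

```javascript
// Hypothetical helper: wraps any async iterable so that every value it
// produces is nested under the given subscription field name.
async function* nestUnder (fieldName, source) {
  for await (const value of source) {
    yield { [fieldName]: value }
  }
}

// Stand-in source for demonstration only (not a real change stream).
async function* fakeDocuments () {
  yield { _id: 'a1', someField: 'x' }
  yield { _id: 'a2', someField: 'y' }
}

// Consume the wrapped iterable the way a subscription server would.
async function demo () {
  for await (const payload of nestUnder('someSubscription', fakeDocuments())) {
    console.log(JSON.stringify(payload))
  }
}

demo()
```

With something like this, the same source iterator could be reused for several subscription fields, each resolver passing its own field name.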

I searched this forum and haven’t found any other use of Mongo as a data source for GraphQL subscriptions; mostly I see references to Apollo’s PubSub for implementing an async iterator.

Here’s my async iterator:

import { MongoClient } from 'mongodb'

function getMyMongoDataIterable () {
  // change streams require Mongo 3.6+ with the WiredTiger storage engine and a replica set;
  // replicaSet is a client option rather than an argument to connect()
  const client = new MongoClient(process.env.MONGO_URL, { useNewUrlParser: true, replicaSet: 'rs0' })
  let collection, stream

  return {
    next: async () => {
      // connect and open the change stream on first use
      if(!client.isConnected()) {
        await client.connect()
        collection = client.db('meteor').collection('my-collection')
        // watch(pipeline, options): empty pipeline, fullDocument included for updates
        stream = collection.watch([], { fullDocument: 'updateLookup' })
      }
      const result = await stream.next()
      // actually I'm not sure in what cases this would be 'done'
      if(result === null) {
        await client.close()
        // return early so we never read fullDocument from null
        return { value: undefined, done: true }
      }
      return {
        value: {
          // I shouldn't need to do this. 
          // (Why would this function need to know name of subscription field?)
          someSubscription: {
            ...result.fullDocument
          }
          // This would make more sense to me
          // ...result.fullDocument
        },
        done: false
      }
    },
    // called when the consumer stops iterating early
    return: async () => {
      await client.close()
      return { value: undefined, done: true }
    },
    throw: async (error) => {
      await client.close()
      console.log('iterable error')
      throw error
    },
    // Requires Node 10+
    [Symbol.asyncIterator]() {
      return this
    }
  }
}

And in my resolvers:

{
  Subscription: {
    someSubscription:  {
      subscribe: () => getMyMongoDataIterable()
    }
  }
}
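On the resolve approach mentioned above: as far as I understand graphql-js, each value from the iterator becomes the root value for that event, and the default resolver reads the property named after the field, which is why the nesting is needed. A resolve function that returns the payload itself is one way around that. This is only a sketch; bareDocuments is a hypothetical stand-in for an iterator that yields un-nested documents.

```javascript
// Hypothetical stand-in for the change-stream iterator: yields bare documents.
async function* bareDocuments () {
  yield { _id: 'a1', someField: 'hello' }
}

const resolvers = {
  Subscription: {
    someSubscription: {
      subscribe: () => bareDocuments(),
      // The event payload arrives as the first argument (the root value),
      // so returning it directly avoids nesting under the field name.
      resolve: payload => payload
    }
  }
}
```

Why this still returned null in my setup is unclear to me; it may depend on how the schema is built, so treat it as something to verify rather than a confirmed fix.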

If you see any issues with this I’d appreciate the feedback.

This is a bit simpler with an async generator, which takes care of turning it into an iterator (i.e., with next() that returns { value, done }).

async function* getMyMongoDataIterable () {
  // [edit: in reality you would want Mongo client declared outside of the generator]
  // replicaSet is a client option rather than an argument to connect()
  const client = new MongoClient(process.env.MONGO_URL, { useNewUrlParser: true, replicaSet: 'rs0' })
  await client.connect()
  const collection = client.db('meteor').collection('my-collection')
  // watch(pipeline, options): empty pipeline, fullDocument included for updates
  const stream = collection.watch([], { fullDocument: 'updateLookup' })

  // loop so the generator yields every change, not just the first one
  while (true) {
    const result = await stream.next()
    yield ({
      someSubscription: {
        ...result.fullDocument
      }
    })
  }
  // [edit: where/when to client.close()?]
}

const resolvers = {
  Subscription: {
    someSubscription:  {
      subscribe: getMyMongoDataIterable
    }
  }
}

[Edit: Whoops, I think this creates a new Mongo client every time. It also doesn’t close the connection. The approach could be used however with Meteor’s Mongo client, which manages the connection, presuming rawCollection() allows you to use change streams.]
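On the question of where to close things: one option with a generator is a try/finally block, since the finally part runs when the consumer calls return() on the generator after it has started. Whether a given subscription server calls return() on unsubscribe is an assumption worth checking. The sketch below uses a hypothetical openFakeStream in place of a real change stream so it runs without Mongo.

```javascript
// Hypothetical stand-in for a change stream: next() resolves with a change
// event, close() records that cleanup happened.
function openFakeStream () {
  let n = 0
  return {
    closed: false,
    next: async () => ({ fullDocument: { _id: ++n } }),
    close () { this.closed = true }
  }
}

async function* documents (stream) {
  try {
    while (true) {
      const change = await stream.next()
      yield change.fullDocument
    }
  } finally {
    // Runs when the consumer calls return() or throw() on a started generator.
    stream.close()
  }
}

async function demo () {
  const stream = openFakeStream()
  const iterator = documents(stream)
  const first = await iterator.next() // { value: { _id: 1 }, done: false }
  await iterator.return()             // triggers the finally block
  console.log(first.value._id, stream.closed)
}

demo()
```

The same shape should apply to a real stream and client, with the close calls placed in the finally block.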