"Transactional publishing" so client always sees a consistent state of subscribed data

In my Meteor app (a research prototype of an enhanced spreadsheet tool), changes to data occur in transactions, and I’ve implemented a technique I call “transactional publishing” that ensures that client code only ever sees consistent states of its subscribed data (i.e., states between transactions). I’m writing about the technique here in case anyone else wants a similar thing. For now, you can copy and paste code from my app (mainly this commit), but if there’s enough interest, I may look into making an Atmosphere package.

The details:

In the past, I had lots of problems with the client code crashing when it operated on inconsistent data, primarily when one document referred to another that was inserted in the same transaction but wasn’t yet visible on the client. I had added artificial code to recover gracefully in these cases, but it was ugly, it masked real bugs in the server that left inconsistent data, and there was no way to be sure I had covered all the inconsistencies that might affect the client. A few months ago, after adding undo support to the app using the babrahams:transactions package and learning a lot more about how Meteor collections work in the process, I was inspired to look for a real solution to the inconsistent data. The (currently) mild performance requirements of the app made the task easier: we only ever run a single replica of the server, transactions on each spreadsheet hosted on the server are linearized using an in-memory lock, the data is small enough that the server can keep a full copy in memory, and we don’t use client-side stub method implementations.

In a normal Meteor app, the server sends a stream of individual document updates to the client over the DDP connection. To implement transactional publishing on top of the existing design of DDP, I had the client subscribe to a special document that indicates whether the server is in the process of committing a transaction, and I had it buffer all updates from a transaction until the transaction is finished. (Since I was using babrahams:transactions anyway, I just had the client look for a transaction marked as pending, but I could easily have implemented my own signaling mechanism if I needed to.) The client application logic uses a set of “mirror” minimongo collections, which are updated from the real client collections at the end of each transaction in a single operation with no yields and with observers on the mirror collections paused, so the update appears atomic to the application logic.
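
To make that concrete, here is a minimal sketch of the client side. The names (Cells, Transactions, MirrorCells, the pending flag) are hypothetical stand-ins for my app’s actual collections, and pauseObservers/resumeObservers are internal minimongo APIs, so treat this as the general shape rather than my exact code:

```js
import { Mongo } from 'meteor/mongo';
import { Tracker } from 'meteor/tracker';

// Real subscribed collections (hypothetical names).
const Cells = new Mongo.Collection('cells');
const Transactions = new Mongo.Collection('transactions');

// Local "mirror" collection that the application logic actually reads.
const MirrorCells = new Mongo.Collection(null);

// Copy the subscribed data into the mirror with observers paused, so the
// application logic sees the whole transaction as one atomic change.
// _collection is the underlying LocalCollection; pauseObservers and
// resumeObservers are internal minimongo APIs.
function refreshMirror() {
  MirrorCells._collection.pauseObservers();
  MirrorCells.remove({});
  Cells.find().forEach((doc) => MirrorCells.insert(doc));
  MirrorCells._collection.resumeObservers();
}

// Re-run whenever the subscribed data or the transaction signal changes,
// but only refresh the mirror when no transaction is marked as pending.
Tracker.autorun(() => {
  const pending = Transactions.find({ pending: true }).count() > 0;
  if (!pending) {
    refreshMirror();
  }
});
```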

When the server commits a transaction, it marks the transaction as pending, performs the data updates, and then marks the transaction as done. For the approach to work, the client needs to see the data updates as coming between the updates to the transaction document that indicate its start and end. But in Meteor, there are two ways that updates can get out of order that I had to prevent:

  • When a write is made to a Mongo collection on the server, observers (including those used internally by Meteor for cursors returned by publish functions) are notified asynchronously, when the write makes it to the oplog tailer. So instead of publishing directly from the real Mongo collections, I used a set of wrapper minimongo collections, since the data is small enough that we can keep a copy in memory (see the sketch after this list). Minimongo collections notify their observers synchronously, so the updates are enqueued on the DDP connection in the order they occur, modulo a bug in minimongo that is monkeypatched in my app.
  • By default, the DDP client batches incoming updates that arrive close in time and applies them to the client collections with observers paused. (This is the same pattern I implemented for the mirror collections, but based on nondeterministic timing rather than transactions, and motivated by performance rather than data consistency.) When the observers are resumed, they may be notified of updates in a different order than the updates arrived. If I had a clean way to identify the end of the observer notifications for a single batch, I could defer processing until that point and have a reliable indication of whether a transaction is still pending before updating the mirror collections. But that approach wouldn’t let me detect whether the data updates in a batch came outside of a transaction (which would indicate a bug in the transactional publishing mechanism and should be logged), and it leaves a theoretical possibility of indefinite postponement if a steady stream of transactions occurs on the server and all of the batch boundaries fall in the middle of transactions. So instead, I just disabled batching by setting Meteor.connection._bufferedWritesInterval = 0.
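
Here is the server-side sketch referred to above. Again the names are hypothetical and this is simplified relative to what’s in the commit linked at the top; in particular, the babrahams:transactions integration is replaced by a bare pending/done marker:

```js
import { Meteor } from 'meteor/meteor';
import { Mongo } from 'meteor/mongo';

// Real, Mongo-backed collection (durable storage) and in-memory copies used
// only for publishing.  All names here are hypothetical.
const Cells = new Mongo.Collection('cells');
const ServerCells = new Mongo.Collection(null);  // in-memory wrapper copy
const ServerTx = new Mongo.Collection(null);     // in-memory transaction signal

// Publish an in-memory collection under a given client-side collection name
// using the low-level publish API.  Minimongo notifies the observer
// synchronously, so DDP messages are enqueued in the order the writes happen.
function publishLocal(name, localCollection) {
  Meteor.publish(name, function () {
    const handle = localCollection.find().observeChanges({
      added: (id, fields) => this.added(name, id, fields),
      changed: (id, fields) => this.changed(name, id, fields),
      removed: (id) => this.removed(name, id),
    });
    this.onStop(() => handle.stop());
    this.ready();
  });
}
publishLocal('cells', ServerCells);
publishLocal('transactions', ServerTx);

// Commit path (transactions are serialized by an in-memory lock elsewhere in
// the app): the pending marker, the data writes, and the done marker all go
// through the synchronous in-memory collections, so the client receives the
// data updates strictly between the two marker updates.
function commitTransaction(txId, writes) {
  ServerTx.insert({ _id: txId, pending: true });
  writes.forEach(({ _id, fields }) => {
    Cells.upsert(_id, { $set: fields });        // durable copy
    ServerCells.upsert(_id, { $set: fields });  // published copy
  });
  ServerTx.update(txId, { $set: { pending: false } });
}
```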

There was one final tricky issue: the DDP client tries to make the process of reconnecting to the server and reestablishing subscriptions transparent, but I needed to detect the beginning and end of that process in order to defer updates to the mirror collections. So I wrote an onReconnect hook that cancels and reestablishes all subscriptions, which gives me a new set of ready callbacks.
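
A minimal sketch of that reconnect handling, assuming a hypothetical flag that gates updates to the mirror collections (the subscription names and the defer/resume helpers are made up for illustration; Meteor.connection.onReconnect is the hook assignment I used):

```js
import { Meteor } from 'meteor/meteor';

// Hypothetical gate around mirror updates: while mirrorUpdatesDeferred is
// true, the code that refreshes the mirror collections does nothing.
let mirrorUpdatesDeferred = false;
function deferMirrorUpdates() { mirrorUpdatesDeferred = true; }
function resumeMirrorUpdates() {
  mirrorUpdatesDeferred = false;
  // ...refresh the mirror collections here...
}

let subscriptionHandles = [];

function subscribeAll() {
  subscriptionHandles = [
    Meteor.subscribe('cells', {
      onReady: () => resumeMirrorUpdates(),  // initial data is complete
    }),
    Meteor.subscribe('transactions'),
    // ...other subscriptions...
  ];
}

// The DDP client normally reestablishes subscriptions transparently on
// reconnect; cancel and resubscribe explicitly so we get a fresh set of
// ready callbacks and can hold off mirror updates until the data is back.
Meteor.connection.onReconnect = () => {
  deferMirrorUpdates();
  subscriptionHandles.forEach((handle) => handle.stop());
  subscribeAll();
};

subscribeAll();
```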

The above was enough to get transactional publishing working for our app with mild performance requirements. I’ve been able to remove the code that recovers from inconsistencies in the subscribed data, but we still have some similarly ugly problems where data that is currently showing on the client gets deleted and the template tries to re-render and hits an exception before another autorun has a chance to stop it; I haven’t yet investigated if there is something we should be doing differently to prevent that.

I imagine that transactional publishing can be applied in the general case with more work:

  • Instead of keeping a minimongo copy of all data on the server for synchronous publishing, use write fences to ensure that the transaction start, data updates, and transaction end are published in the correct order.
  • If the server allows multiple transactions affecting the data view of a single client to commit concurrently, it just has to enforce quiescent states often enough to provide reasonable liveness of data or otherwise implement a custom mechanism to publish consistent states to the client.
  • If the server has multiple replicas running on the same MongoDB, one needs to ensure that one replica maintains order when publishing the start, content, and end of a transaction committed by another replica. While the oplog tailer is asynchronous, it looks like the implementation may preserve the order of updates for queries that are simple enough that they can be updated from the information in the oplog without re-executing the query; I haven’t confirmed this. So as long as all publications are of such “simple” queries, this setup may work. But if you’re trying to achieve transaction isolation on a multi-replica server, you probably need additional mechanisms that may also help you in some way with transactional publishing. :slight_smile:
  • For client-side stub method implementations, a simple design would be to layer the temporary writes from the stubs on top of the transactional data view. My quick assessment is that the only practical way to implement this would be to move the transactional update batching into the DDP client itself, whether in the form of a replacement package or a large set of monkeypatches. Of course, allowing dumb automatic merging of temporary stub writes with external data updates will probably defeat the end goal of always seeing a consistent state of the data. If it’s possible to spec out stronger semantics, it may be possible to implement them. :slight_smile:

This is a pain, and I’d rather be using a web application framework that has all the strengths of Meteor plus transaction isolation out of the box, although whether I’d migrate the existing app at this point depends on how much work the migration is. Let me know if such a framework exists!

I’ll be happy to accept feedback and answer questions here.

Interesting.

I had a similar problem. To solve it I made use of Mongo Bulk Operations:

  1. Using bulk operations, copy the operational collections to a private mirror / scratch collection that is not published.
  2. Perform all the intermediate calculations in the private mirror / scratch collection.
  3. When the intermediate calculations are complete, use Mongo bulk operations to move the data back to the published collections.

From Meteor’s perspective, no data was published until the bulk operations were complete, so all my data was consistent on the client.
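
Roughly, the shape of it is something like this (collection names are made up, and this is a simplified sketch using rawCollection().bulkWrite rather than whichever exact bulk API you prefer):

```js
import { Mongo } from 'meteor/mongo';

// Hypothetical collections: Scratch is private and never published; Results
// is the collection clients subscribe to.
const Scratch = new Mongo.Collection('scratch');
const Results = new Mongo.Collection('results');

async function recalculate(sourceDocs) {
  // 1. Load the raw data into the scratch collection and do all of the
  //    intermediate processing there; none of this is visible to clients.
  await Scratch.rawCollection().deleteMany({});
  await Scratch.rawCollection().insertMany(sourceDocs);
  // ...intermediate updates to Scratch...

  // 2. Move the finished documents into the published collection with a
  //    single bulk operation (the rest of the thread discusses why keeping
  //    this under 1000 operations matters).
  const finished = await Scratch.rawCollection().find({}).toArray();
  if (finished.length === 0) return;
  await Results.rawCollection().bulkWrite(
    finished.map((doc) => ({ insertOne: { document: doc } }))
  );
}
```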

I am not sure what the perf issues may be, but this allowed me to complete intermediate operations without worrying about publishing the intermediate results (which was causing my client to do a few extra repaints).

I would also be interested in any feedback.

Anything that shortens the time interval of the writes to the published collections will help increase the likelihood that the DDP client groups the updates into a single batch. But do Mongo bulk operations actually give you any additional guarantees compared to ordinary collection writes in terms of the way they are processed by the oplog tailer and ultimately delivered to clients? Your technique may reduce occurrences of inconsistent data or completely eliminate them in practice depending on the timing parameters of your system, but I doubt it guarantees semantically that clients won’t see inconsistent data.

Thanks for the feedback @mattmccutchen. I hadn’t really examined my solution deeply; I just noticed that it worked great when I did this. So I am using this thread to research my solution…

Good point. I did not know how Meteor processes the oplog, and that could be a problem. So I took a look.

Oplog handling

It seems that Meteor will process the entire oplog unless the number of oplog operations exceeds a max threshold. That max threshold is settable via an environment variable: METEOR_OPLOG_TOO_FAR_BEHIND. The default is 2000 operations, which is 1000 more operations than the max of a Bulk operation (see below).

I also found this issue #2668, which looks like it was the motivation for the creation of the METEOR_OPLOG_TOO_FAR_BEHIND feature. It outlines the pitfalls of using oplog tailing for very large numbers of document operations.

Bulk Operations

I found this article and this Mongo documentation on how bulk operations work. The summary:

  • Bulk operation commands max out at 1000 operations for a single DB operation.
  • Bulk operations cannot mix operation classes and still execute to completion as one operation on the Mongo server side.
  • If your scenario involves fewer than 1000 operations, then they will happen as one operation from the server’s perspective.

In my case, for publishable collections I only use the bulk insert class of operations. Typically, this creates hundreds of document updates, not thousands. (I read in data from an external source to a scratch collection, update the documents in the scratch collection with some extra processing, then bulk insert the documents into a Meteor published collection.)

Implications?

I think what this implies is:

If you have fewer than 1000 operations in your bulk operation, then it can be viewed as a single operation from MongoDB to server to client in Meteor. (Whether the server can process the oplog fast enough is another issue.)

My Case

For my case, I instrumented the DDP updates on my client. Without bulk operations I would get intermediate results on my subscriptions. With bulk operations, ~100 operations would appear on my client as a single subscription update.
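
In case it’s useful, the instrumentation was basically just logging the raw incoming DDP messages. Something along these lines works (Meteor.connection._stream is an internal API, so no stability guarantees):

```js
import { Meteor } from 'meteor/meteor';

// Log every incoming DDP data message with a timestamp so you can see how
// the server's writes get grouped into batches on the wire.
Meteor.connection._stream.on('message', (raw) => {
  const msg = JSON.parse(raw);
  if (['added', 'changed', 'removed'].includes(msg.msg)) {
    console.log(Date.now(), msg.msg, msg.collection, msg.id);
  }
});
```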

For my scenario this appears to work. And now that I’ve researched it a bit, I am more confident about why it works :slight_smile:.

But it certainly would not handle large modifications of more than 1000 documents. And even hundreds of updates could cause the server to choke processing the oplog, depending on how a scenario plays out.

Interested in any feedback…
Thanks!

To be clear, this may work every time in your setup, which may be all you need, but it’s not a guarantee like my (much more invasive) technique provides. I don’t know exactly what the oplog tailer does for Mongo bulk operations, but at some point a separate DDP message has to be sent for each updated document because DDP doesn’t have any concept of a bulk operation. By default, the DDP client will buffer incoming updates into a single batch for up to 500 ms as long as there is no 5 ms gap in the incoming stream (parameter definitions). If there’s a network blip, for example, that could cause a gap in the stream that would cause the client to process the updates until that point as a batch and see intermediate data. (Unless the entire set of DDP messages corresponding to the bulk operation is somehow indivisible at the network level, which I highly doubt.)

Thanks @mattmccutchen.

Gotcha. I was thinking about my response, and I realized I did not really understand how DDP decides how much to send to the client. Your explanation of how it works points to problems with my solution under quite a few scenarios. All I would need to do is blow out the 500 ms DDP update window.

I also realized that my usage scenario is not as sensitive to incomplete collection data as it is to intermediate collection data (which I handle by putting the intermediate data into the scratch collection).

Thanks again for your feedback.