[SOLVED] Transactions with MongoDB + Meteor Methods

Hi everybody.

I’m here to share my solution for using MongoDB transactions with Meteor 1.8.
Just as important - how to use them in Meteor Methods (e.g. returning a value from a transaction to the client). I’ve learned it’s not as simple as it may sound, so I wanted to share the solution. I hope it’ll be useful for others.

Side Note:
I use the term my solution not out of pride of ownership but to make clear that it’s not a well-tested solution. It’s my personal way of solving this riddle with Meteor. Hopefully with time we will have a cleaner, well-tested, standard approach to transactions in Meteor. That’s why I preferred not to wrap this into a small library, and rather to make sure people are aware of what’s happening under the hood, so that when the technology matures in this area they can easily adapt their code.

Intro: What’s the big deal?
Two months ago I started developing a Meteor project. Pretty quickly I realized that I had to use transactions in my app, and I felt lucky when I found out that Meteor’s latest version (1.8 at the time) supports Mongo 4 and that Mongo 4 supports transactions :hear_no_evil:! woohoo :beers: ! Right? Well, kind of… yes. But there are a few obstacles.

The Obstacles
To begin with, Meteor collection objects (Mongo.Collection) don’t support the transactions API, which means we have to access the raw collection underneath. Working directly with raw collections introduces a few obstacles:

1. Missing Documentation - the first practical problem is that documentation on the subject is missing, and it’s challenging (at the time of writing) to find a complete, working example of how to correctly execute a transaction when working with the raw collection in Meteor.

2. Different ID Generation Mechanisms - Meteor uses a different ID generation (string) than Mongo’s default (ObjectID), so using Meteor collections as well as working with its raw collections directly may lead to inconsistency in the database (objects will have different _id values if inserted both via collections and raw collections).

3. Attached SimpleSchema Doesn’t Run - this one is more specific to meteor-collection2 users. If you attach a SimpleSchema to your Meteor collection, you will not be pleased to know that it’s not attached to the raw collection. It basically means that when you’re working with raw collections you get no object validation against the schema, no object cleaning, etc. (all of which you do get when you go through the Meteor collection).

4. Transactions in Methods Don’t Return a Value/Error To The Client - Mongo transactions are async, and a plain (non-async) Method returns before your async call completes, so the transaction’s value or error never reaches the client.

Sorry to be the party pooper but I suggest you put the beers back in the fridge :see_no_evil:
Am I the only one who faced all of these problems? There was not much information online at the time of writing.
So without further ado…

The Solutions

1. Basic Transaction Example in Meteor

import { MongoInternals } from 'meteor/mongo';

...

// finding item in collection1, inserting result to collection2
// both collection1 and collection2 are Meteor Collection objects
const transactionExample = async (collection1, selector, collection2) => {
    const { client } = MongoInternals.defaultRemoteCollectionDriver().mongo;
    const session = await client.startSession();
    await session.startTransaction();
    try {
        const item = await collection1.rawCollection().findOne(selector, { session: session });
        // here you may want to check item value before proceeding
        const response = await collection2.rawCollection().insertOne(item, { session: session });
        // here you may want to check insertion response
        await session.commitTransaction();
    } catch (err) {
        await session.abortTransaction();
        // possibly add: console.error(err.message);
    } finally {
        session.endSession();
    }
}

2. Achieving Document ID Consistency
To ensure you have a consistent _id for all documents you have two options:

  • Option A: Mongo-like ID consistency - set Meteor to generate Mongo Object ID as _id for all collections, in case you’d want to use transactions and for a general consistency in your database (look for idGeneration in the docs).

  • Option B: Meteor-like ID consistency - pass a string value as the _id property in all raw collection insertions.

Having started with option A, I advise you to go with option B :clown_face: .
The problem with option A is that when Meteor is set to Mongo-style ID generation, the ObjectID it generates is not identical to the one Mongo (i.e. the raw collection) generates, and I found out that a Meteor-generated ObjectID can’t be used as a selector when working with raw collections.

As long as a document was created with Meteor collection (i.e. Meteor generated its ID), calling:
await collection.rawCollection().findOne(item._id, { session: session });
will return nothing, no matter how you construct the selector ( { _id: item._id }, etc. ).

In fact, passing an item whose _id is a Meteor-generated Mongo.ObjectID as a selector to any raw collection query filters the results out. This could probably be reported as a bug in the Meteor repo.

A cleaner solution (Option B) then would be to add a string id before every raw insertion:

const itemWithID = { ...item, _id: new Mongo.ObjectID()._str };
await collection.rawCollection().insertOne(itemWithID , { session: session });
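One way to make Option B hard to forget is to route every raw insert through a small helper. The sketch below is only an illustration under assumptions: withStringId and randomHexId are hypothetical names that do not exist in Meteor, and randomHexId merely stands in for new Mongo.ObjectID()._str (a 24-character hex string) so the snippet runs outside Meteor.

```javascript
const crypto = require('crypto');

// Hypothetical stand-in for `new Mongo.ObjectID()._str`: 24 hex characters.
const randomHexId = () => crypto.randomBytes(12).toString('hex');

// Returns a copy of `doc` with a string _id.
// An existing non-empty string _id is kept; anything else is replaced.
const withStringId = (doc, generateId = randomHexId) => {
    if (typeof doc._id === 'string' && doc._id.length > 0) {
        return { ...doc };
    }
    return { ...doc, _id: generateId() };
};
```

Inside a transaction this would be used as insertOne(withStringId(item), { session }), passing () => new Mongo.ObjectID()._str as the generator if you prefer Meteor's own implementation.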

3. Utilizing SimpleSchema - you need to clean and/or validate your objects manually before raw collection insertions:

import SimpleSchema from 'simpl-schema';

...

// optionally define your SimpleSchema with cleaning options
const cleanOptions = { filter: true, autoConvert: true, removeEmptyStrings: true, trimStrings: true };
const schema = new SimpleSchema( { /* your schema defs */ } , { clean: cleanOptions });

// attach Simple Schema to a Meteor collection
collection.attachSchema(schema);

// calling collection.insert(..) will now automatically clean and validate
// when working with the raw collection, we need to clean and validate manually

... 

// within a transaction, before every insertion:

// 1. clean the item (if desired)
let cleanOptions = { filter: true, autoConvert: true, removeEmptyStrings: true, trimStrings: true };
let cleanItem = await collection.simpleSchema().clean(item, { ...cleanOptions, modifier: false });

// 2. validate the clean item against the schema (fail insertion on validation failure)
await collection.simpleSchema().validate(cleanItem, { modifier: false });

// 3. adding string ID to avoid the Mongo's ObjectID generation
cleanItem = { ...cleanItem, _id: new Mongo.ObjectID()._str };

// at last, insert the clean, validated item
await collection.rawCollection().insertOne(cleanItem, { session: session });

...

4. Methods :male_detective:
By now we have the transaction code sample, we have a consistent _id generation across raw collections and we can even utilize SimpleSchema.
The last missing piece in the puzzle is to be able to utilize all of this in a Method in a synchronized manner, i.e. to return a transaction value (or a transaction failure error) to the client.

Option A: Async Method + Async/Await

UPDATE: thanks to @robfallows’s input I added this option, which is simpler than Option B. Unfortunately it’s not officially documented at the time of writing, and I didn’t find many good examples of it except for his great article. I have used both option A and option B in production for several months now and they both work flawlessly.

As @robfallows describes, you can declare an async method and execute your transaction as you’d expect with async/await:

Meteor.methods({
    async doSomething() {
        // just put your transaction here
    }
});

To keep our methods clean we can define a utility function that will run our mongo operations within a transaction context:

import { MongoInternals } from 'meteor/mongo';

...

// utility async function to wrap async raw mongo operations with a transaction
const runTransactionAsync = async asyncRawMongoOperations => {

    // setup a transaction
    const { client } = MongoInternals.defaultRemoteCollectionDriver().mongo;
    const session = await client.startSession();
    await session.startTransaction();
    try {
        // running the async operations
        let result = await asyncRawMongoOperations(session);
        await session.commitTransaction();
        // transaction committed - return value to the client
        return result;
    } catch (err) {
        await session.abortTransaction();
        console.error(err.message);
        // transaction aborted - report error to the client
        throw new Meteor.Error('Database Transaction Failed', err.message);
    } finally {
        session.endSession();
    }
};

And then use it as follows from our async method:

import { runTransactionAsync } from '/imports/utils'; // or where you defined it

Meteor.methods({
    async doSomething(arg) {
        // remember to check method input first

        // define the operations we want to run in transaction
        const asyncRawMongoOperations = async session => {

            // it's critical to receive the session parameter here
            // and pass it to every raw operation as shown below
            const item = await collection1.rawCollection().findOne(arg, { session: session });
            const response = await collection2.rawCollection().insertOne(item, { session: session });

            // if Mongo or you throw an error here runTransactionAsync(..) will catch it
            // and wrap it with a Meteor.Error(..) so it will arrive to the client safely

            return 'whatever you want'; // will be the result in the client
        };

        let result = await runTransactionAsync(asyncRawMongoOperations);
        return result;
    }
});
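To check the commit/abort flow of a helper like runTransactionAsync without a database, the same structure can be exercised against a stub. In this sketch runWithSession mirrors the utility above but takes the client as a parameter and rethrows a plain Error instead of a Meteor.Error, and makeStubClient is a hypothetical fake that only records which session methods were called. Neither name exists in Meteor or in the Mongo driver.

```javascript
// Same shape as runTransactionAsync, with the client injected so it can be faked.
const runWithSession = async (client, asyncRawMongoOperations) => {
    const session = await client.startSession();
    session.startTransaction();
    try {
        const result = await asyncRawMongoOperations(session);
        await session.commitTransaction();
        return result;
    } catch (err) {
        await session.abortTransaction();
        throw new Error(`Database Transaction Failed: ${err.message}`);
    } finally {
        await session.endSession();
    }
};

// Hypothetical stub: records the order of session calls, talks to no database.
const makeStubClient = () => {
    const calls = [];
    const session = {
        startTransaction: () => { calls.push('start'); },
        commitTransaction: async () => { calls.push('commit'); },
        abortTransaction: async () => { calls.push('abort'); },
        endSession: async () => { calls.push('end'); },
    };
    return { calls, startSession: async () => session };
};
```

With the stub, operations that resolve record start, commit, end, while operations that throw record start, abort, end and reject with the wrapped message.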

Option B: Meteor.wrapAsync to the Rescue!

We are going to use Meteor.wrapAsync(func, [context]) to wrap our async transaction execution when it is called from within a Meteor method. Here too I feel the documentation lacks examples, but this thread is already long enough, so I will not explain the not-very-intuitive syntax.

See below how I use a utility function called runTransaction to execute my raw collection operations. This function receives an async function with the desired operations to run. It first wraps it with another async function that sets up a transaction and runs the input operations from within that transaction. Then, using wrapAsync, it executes this in a synchronized manner. Yes, I know it’s confusing.

import { MongoInternals } from 'meteor/mongo';

...

// utility function to make an async transaction synchronized
const runTransaction = asyncRawMongoOperations => {

    // getting Mongo's client to setup a transaction
    const { client } = MongoInternals.defaultRemoteCollectionDriver().mongo;

    // define an async function named 'asyncExecution' that:
    // 1. sets up a transaction
    // 2. runs the asyncRawMongoOperations function parameter within the transaction
    // 3. returns the asyncRawMongoOperations result or reports a transaction error
    let asyncExecution = async function(client, callback) {
        let error = null, result = null;
        const session = await client.startSession();
        await session.startTransaction();
        try {
            // running the async operations
            result = await asyncRawMongoOperations(session);
            await session.commitTransaction();
        } catch (err) {
            // abort transaction and set a Meteor.Error, this will be returned to the client
            await session.abortTransaction();
            console.error(err.message);
            error = new Meteor.Error('Database Transaction Failed', err.message);
        } finally {
            session.endSession();
        }
        callback(error, result); // return values to the client, see Meteor.wrapAsync docs
    };

    // make asyncExecution synchronized
    let syncExecution = Meteor.wrapAsync(asyncExecution);

    // run syncExecution and return the result to the caller
    return syncExecution(client);
};

Then we can use this utility function to define and execute our raw mongo operations from within a Method (usage very similar to Option A but in a method which is not declared async):

import { runTransaction } from '/imports/utils'; // or where you defined it

Meteor.methods({
    doSomething(arg) {
        // remember to check method input first

        // define our transaction operations
        const asyncRawMongoOperations = async session => {

            // it's critical to receive the session parameter here
            // and pass it to every raw operation as shown below
            const item = await collection1.rawCollection().findOne(arg, { session: session });
            const response = await collection2.rawCollection().insertOne(item, { session: session });

            // if Mongo or you throw an error here runTransaction(..) will catch it
            // and wrap it with a Meteor.Error(..) so it will arrive to the client safely

            return 'whatever you want'; // will be the result in the client
        };

        // where the async magic happens, synchronously ;) 
        return runTransaction(asyncRawMongoOperations);
    }
});

Throwing Custom Error
If you use the utility functions I posted (runTransactionAsync / runTransaction), note that they catch every error thrown within the transaction but expect an object similar to a Mongo Error. So if you want to throw a custom error message from within your transaction code, make sure you adopt the Mongo Error structure (e.g. { message: 'your error message' }) or modify the catch clause to fit your needs. Keep in mind you probably do want to catch Mongo Errors there in case anything fails in Mongo.
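As one possible way to loosen that requirement, the catch clause could normalize whatever was thrown before building the Meteor.Error. This is a minimal sketch, not part of the original utilities: toErrorMessage is a hypothetical helper name, and it assumes that a Meteor.Error-like object carries its text in reason while Error and Mongo Error objects carry it in message.

```javascript
// Turns whatever was thrown into a message string.
// Hypothetical helper; intended for the `reason` argument of Meteor.Error.
const toErrorMessage = thrown => {
    // A bare string: throw 'something went wrong'
    if (typeof thrown === 'string') return thrown;
    // Meteor.Error-like objects keep the human-readable text in `reason`
    if (thrown && typeof thrown.reason === 'string') return thrown.reason;
    // Error, Mongo Error, or a plain { message: '...' } object
    if (thrown && typeof thrown.message === 'string') return thrown.message;
    // Anything else (null, numbers, objects without a message)
    return 'Unknown error';
};
```

In the catch clause this would replace err.message, e.g. throw new Meteor.Error('Database Transaction Failed', toErrorMessage(err)).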

Conclusion
Now you can go get the beers… go ahead and grab one for me too if you don’t mind :crazy_face:.

If someone falls down the same rabbit hole as I did, I can share the transaction API (utility functions) that I use in order to automatically:

  1. clean and validate selectors and modifiers (SimpleSchema) on any insert/update.
  2. check Mongo’s responses after every raw operation, and throw error if needed.
  3. assign string _id on every insert.

So that every time I need to use a transaction I focus only on the actual operations I need to do, avoiding all the pitfalls on the way.

P.S.

  1. I didn’t run the exact code that’s pasted here because I use it from within utility functions and I preferred to share a more clear and simple version of it. If something doesn’t work feel free to let me know.

  2. I will be happy if other people can help simplify or improve these solutions, or have any comments whatsoever.

Cheers.

I think that’s a great summary of the issues with using rawCollection. However, there is an opportunity for simplification related to your point 4 (returning the result of an async operation from a Method).

You definitely can do this. If you declare an async method:

Meteor.methods({
  async myMethod() {
    // ...
  }
});

Meteor will use a Fiber to wait for the result of a Promise (or await) before returning the result to the client.

Check this article - you may want to skip down to the section on async/await.

Thanks @robfallows!
I had no idea it was possible. Added as Option A for running in Method! thanks.

Not surprised to see that googling ‘async method meteor’ doesn’t provide any links to the official docs. An accepted answer in stackoverflow also hints to go down the ‘wrapAsync’ road.
Do you mind sharing how you came to find out about this elegant approach?

It’s something that should have worked from 1.3-ish (support for async/await) - see here for the GitHub issue.

Great write-up. I wish there was a simpler way :slight_smile:

When using new Mongo.ObjectID()._str, is Mongo checking the database to avoid duplicates? I recognize that there is probably a better chance of winning the lottery every day from now until the day I die than of accidentally creating a duplicate. If there is no guarantee, is there a good reason not to use the same method Meteor uses internally, which I believe is Random.insecure.id(), which uses all letters, upper and lower, and numbers?

Mongo will fail attempts of duplicate insertions, in that sense it does guarantee that there are no duplicate values for _id field per collection, see here.

As for switching to Random.insecure.id() - I think it won’t make much of a difference because Mongo.ObjectID()._str will generate a random string - perhaps similar to using Random.insecure.id().

It’s worth mentioning that Meteor’s implementation of ObjectID generation differs from Mongo’s original implementation, which is probably better than pure randomness.
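On the duplicate question above: since Mongo rejects an insert whose _id already exists, a collision surfaces as an error rather than as silent data loss. MongoDB reports such unique-index violations with error code 11000, so they can be told apart from other failures. The helper name below is hypothetical and the snippet is only a sketch of the check.

```javascript
// MongoDB reports a unique-index violation (including a duplicate _id)
// with error code 11000.
const DUPLICATE_KEY_CODE = 11000;

// Hypothetical helper: true when `err` looks like a duplicate key error.
const isDuplicateKeyError = err =>
    Boolean(err) && err.code === DUPLICATE_KEY_CODE;
```

Inside the catch clause of a transaction helper this could be used to report a clearer message to the client than the generic failure.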

We had all the problems with transactions described above, but as we already had a very large codebase we just couldn’t go with the rawCollection solution.

Basically, the only thing that would really work for us is session handling behind the scenes.

I’ve come up with a potential solution using Meteor.EnvironmentVariable and patching MongoInternals.NpmModule.Collection, which is the mongodb driver’s collection class.

Since this is a low level solution, there are no problems with id generation and simple-schema validation works, too.

You can check the code and tests on github.

I think that since EnvironmentVariable is also used in DDP (_CurrentMethodInvocation), there should be no race conditions here. By race conditions I mean cases where session ids could get mixed up between concurrent calls.
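To illustrate the idea of an ambient session (not the package's actual code, which is on GitHub), here is a sketch that uses Node's AsyncLocalStorage as a stand-in for Meteor.EnvironmentVariable. The runInTransaction and insert functions below are hypothetical simplifications: the session is a plain object passed in by the caller, and insert only reports which session it saw instead of writing to a database.

```javascript
const { AsyncLocalStorage } = require('async_hooks');

// Stand-in for Meteor.EnvironmentVariable: holds the session of the
// current call chain, including across awaits.
const currentSession = new AsyncLocalStorage();

// Hypothetical wrapper: everything called inside `fn` sees `session`.
const runInTransaction = (session, fn) => currentSession.run(session, fn);

// Hypothetical "patched" collection method: picks up the ambient session,
// if any, instead of receiving it as an argument.
const insert = async doc => {
    // simulate I/O so that concurrent calls interleave
    await new Promise(resolve => setImmediate(resolve));
    const session = currentSession.getStore();
    return { doc, sessionId: session ? session.id : null };
};
```

Two transactions started concurrently each see their own session inside insert, and a call made outside any transaction sees none, which is the property the race-condition concern is about.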

I’ve added some tests, and if you have any suggestions on what else to test, that would be very helpful.

Client test is implemented to simulate multiple users. Each user has its own DDP connection.
With a single connection, methods invoked via Meteor.call() were executed sequentially (I assume this is the way DDP works), so I had to go with multiple connections.
That being said, the client test sometimes fails because of the timeout, and the probability of failure increases with larger USER_COUNT numbers. It seems that there is some kind of a “hiccup” with DDP, but I’m really not sure.

I did some tests on our app locally and this appears to be working as expected. We are yet to test this on our staging environment.

An example of usage:

const Invoice = new Mongo.Collection('invoice');
const LineItems = new Mongo.Collection('lineItems');
const Payments = new Mongo.Collection('payments');

function insertInvoice(invoice, lineItems) {
    runInTransaction(() => {
        Invoice.insert(invoice);
        LineItems.insert(lineItems);

        const payment = createPaymentDoc(); 
        Payments.insert(payment);
    });
}

@bhunjadi I was also experimenting with Meteor.EnvironmentVariable a few days ago but I forked mongo package and patched mongo_driver.js instead. There may be a problem with your solution: the callbacks. It seems reasonable that inside a transaction all callbacks should be called with either result argument (if the transaction was successful) or error argument. So they should be deferred until the transaction is ended. Now it’s easy to get “false positive” calls before a failed operation:

runInTransaction(() => {
  const id = Invoice.insert(invoice, (err, res) => {
    // This is called with `res` argument but it shouldn't
    console.log({ err, res })
  })
  Invoice.update(id, { $unset: { _id: 1 } }, (err, res) => {
    // This is properly called with `err` argument
    console.log({ err, res })
  })
})

I guess the desired callback behaviour depends on the use case. I’m not really sure if I would generally expect that the first callback returns an error, though. It would feel kind of weird to me that res does not match the returned id there.

What would these callback functions do in practice that they would expect error there?

How did you solve this in mongo_driver? Did you allow options to be passed in insert, update, etc?

I somehow ignored the fact that this is just how the Mongo Driver works: the callbacks are fired immediately regardless of the transaction result. So we probably shouldn’t worry about it at all, as the behavior must be consistent with the Driver’s.

“It would feel kind of weird to me that res does not match the returned id there.”

It will but a bit later when the transaction is ended successfully. Imagine, just for example, that inside a callback somebody sends an email on a successful insert. With the current implementation, the email is always sent, even though the next operation can fail and the transaction can be aborted with no actual insert.
But like I said it’s a Mongo Driver “issue” (maybe not at all) so my initial comment was out of scope.

“How did you solve this in mongo_driver? Did you allow options to be passed in insert, update, etc?”

I tried to do this on two levels:

  1. Expose session (or _session) option for collection methods for the developers who want to implement transactions on their own. It required minimal work but I haven’t finished it because of that “issue” with callbacks, as ClientSession object itself provides no promises, callbacks, or events that can be used to fire callbacks at the right time (after the transaction).
  2. Use Meteor.EnvironmentVariable to write a wrapper on top of item 1, just like you did. Here I did the following:
// inside my version of `runInTransaction`
create a promise
attach it to the session object
if (transaction is successful) resolve
else reject

// inside Mongo.Collection methods
if (session && callback)
  if (session.promise resolved) fire the callback with the command result
  else fire the callback with the error that stopped the transaction
else fire the callback immediately with the command result
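The pseudocode above can be approximated by a small queue that holds callbacks until the transaction outcome is known. This is a minimal sketch of the idea only, not the code from the forked mongo package: createDeferredCallbacks, defer and settle are hypothetical names.

```javascript
// Hypothetical helper: collects callbacks during a transaction and fires
// them only once the outcome (commit or abort) is known.
const createDeferredCallbacks = () => {
    const pending = [];
    return {
        // Called where a collection method would normally invoke its callback.
        defer(callback, result) {
            pending.push({ callback, result });
        },
        // Called once: with no argument after a commit,
        // or with the error that stopped the transaction after an abort.
        settle(error) {
            for (const { callback, result } of pending.splice(0)) {
                if (error) callback(error);
                else callback(null, result);
            }
        },
    };
};
```

With this shape, the email-on-insert example would only send after settle() is called without an error, and every deferred callback receives the error if the transaction is aborted.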

I’m wondering whether Meteor plans to add an API that supports MongoDB transactions so that they can be used a bit more easily. Anyone?

@bhunjadi Do you use your package in production? Is it stable enough?

Yes, we have been running this in production for almost 4 months now with no issues.

However, when writing the package I found some cases where transactions are not working as expected. You can check these tests and see for yourself if this could affect you.
If you are using matb33:collection-hooks or some other package that uses this package to write to the DB, there might be some issues.

Since we are not using MongoDB callbacks to write to database in parts of the application where we need transactions, we are not affected by this issue.

Thanks!

Did you try these packages? Do they work properly with your package?

mikowals:batch-insert
aldeed:collection2
reywood:publish-composite

aldeed:collection2 - yes, it seems to be working, but this is only used for validation on top of Meteor’s mongo package
reywood:publish-composite - not using it directly, but it is a dependency of cultofcoders:grapher which we are using
mikowals:batch-insert - did not try

I looked at the batch-insert package source and I think it could work, but the best way to check is to test it yourself.

My package patches mongo driver collection (MongoInternals.NpmModule.Collection) and a lot of use cases should be covered that way. From what I can see, batch-insert uses rawCollection() here which should be patched.

@asafratzon Thank you for the guide! This is super helpful given the sparse documentation.

I’m having some trouble importing MongoInternals:

import { MongoInternals } from 'meteor/mongo';

is showing “Cannot resolve symbol ‘MongoInternals’”. I might be getting something simple wrong. I’m on Meteor v1.12 and mongodb v3.6.3, which according to the driver repo is the latest version.

I’ve looked around and can’t seem to find others having a similar issue. Any help is appreciated!

Hi @rogue_ai_0 .
This has to be related to your environment setup.
Try testing with a clean meteor installation and it should work out of the box.
You don’t need to install mongo npm driver on your own…
Hope it helps.

@asafratzon I’m seeing the same error message with a clean install, but it actually runs fine. I think it may be a quirk with the Intellij IDE. Thanks!

Hello, I’m trying to use transactions but I got this error:

W20210510-12:11:29.071(-5)? (STDERR) MongoError: Transaction numbers are only allowed on a replica set member or mongos
W20210510-12:11:29.072(-5)? (STDERR)     at MessageStream.messageHandler (/Users/diavrank/.meteor/packages/npm-mongo/.3.9.0.1d2m6we.wlzw++os+web.browser+web.browser.legacy+web.cordova/npm/node_modules/mongodb/lib/cmap/connection.js:268:20)
W20210510-12:11:29.072(-5)? (STDERR)     at MessageStream.emit (events.js:314:20)
W20210510-12:11:29.072(-5)? (STDERR)     at MessageStream.EventEmitter.emit (domain.js:483:12)
W20210510-12:11:29.072(-5)? (STDERR)     at processIncomingData (/Users/diavrank/.meteor/packages/npm-mongo/.3.9.0.1d2m6we.wlzw++os+web.browser+web.browser.legacy+web.cordova/npm/node_modules/mongodb/lib/cmap/message_stream.js:144:12)
W20210510-12:11:29.072(-5)? (STDERR)     at MessageStream._write (/Users/diavrank/.meteor/packages/npm-mongo/.3.9.0.1d2m6we.wlzw++os+web.browser+web.browser.legacy+web.cordova/npm/node_modules/mongodb/lib/cmap/message_stream.js:42:5)
W20210510-12:11:29.072(-5)? (STDERR)     at doWrite (_stream_writable.js:403:12)
W20210510-12:11:29.073(-5)? (STDERR)     at writeOrBuffer (_stream_writable.js:387:5)
W20210510-12:11:29.073(-5)? (STDERR)     at MessageStream.Writable.write (_stream_writable.js:318:11)
W20210510-12:11:29.073(-5)? (STDERR)     at Socket.ondata (_stream_readable.js:718:22)
W20210510-12:11:29.073(-5)? (STDERR)     at Socket.emit (events.js:314:20)
W20210510-12:11:29.073(-5)? (STDERR)     at Socket.EventEmitter.emit (domain.js:483:12)
W20210510-12:11:29.073(-5)? (STDERR)     at addChunk (_stream_readable.js:297:12)
W20210510-12:11:29.073(-5)? (STDERR)     at readableAddChunk (_stream_readable.js:272:9)
W20210510-12:11:29.073(-5)? (STDERR)     at Socket.Readable.push (_stream_readable.js:213:10)
W20210510-12:11:29.073(-5)? (STDERR)     at TCP.onStreamRead (internal/stream_base_commons.js:188:23)
W20210510-12:11:29.073(-5)? (STDERR)  => awaited here:
W20210510-12:11:29.073(-5)? (STDERR)     at Function.Promise.await (/Users/diavrank/.meteor/packages/promise/.0.11.2.8skplc.uqog2++os+web.browser+web.browser.legacy+web.cordova/npm/node_modules/meteor-promise/promise_server.js:56:12)
W20210510-12:11:29.073(-5)? (STDERR)     at imports/api/MatchPlayers/MatchPlayersServ.ts:111:21
W20210510-12:11:29.074(-5)? (STDERR)     at /Users/diavrank/.meteor/packages/promise/.0.11.2.8skplc.uqog2++os+web.browser+web.browser.legacy+web.cordova/npm/node_modules/meteor-promise/fiber_pool.js:43:40
W20210510-12:11:29.074(-5)? (STDERR)  => awaited here:
W20210510-12:11:29.074(-5)? (STDERR)     at Function.Promise.await (/Users/diavrank/.meteor/packages/promise/.0.11.2.8skplc.uqog2++os+web.browser+web.browser.legacy+web.cordova/npm/node_modules/meteor-promise/promise_server.js:56:12)
W20210510-12:11:29.074(-5)? (STDERR)     at imports/startup/server/utilities/Transaction.ts:14:13
W20210510-12:11:29.074(-5)? (STDERR)     at /Users/diavrank/.meteor/packages/promise/.0.11.2.8skplc.uqog2++os+web.browser+web.browser.legacy+web.cordova/npm/node_modules/meteor-promise/fiber_pool.js:43:40 {
W20210510-12:11:29.074(-5)? (STDERR)   ok: 0,
W20210510-12:11:29.074(-5)? (STDERR)   code: 20,
W20210510-12:11:29.074(-5)? (STDERR)   codeName: 'IllegalOperation'
W20210510-12:11:29.074(-5)? (STDERR) }

I am using TypeScript, so I changed the code a bit:

import { Meteor } from 'meteor/meteor';
import { MongoInternals } from 'meteor/mongo';
import { ClientSession } from 'mongodb';

// utility async function to wrap async raw mongo operations with a transaction
const runAsync = async(asyncRawMongoOperations: (arg0: ClientSession) => any) => {

	// setup a transaction
	const { client } = MongoInternals.defaultRemoteCollectionDriver().mongo;
	const session = await client.startSession();
	await session.startTransaction();
	try {
		// running the async operations
		let result = await asyncRawMongoOperations(session);
		await session.commitTransaction();
		// transaction committed - return value to the client
		return result;
	} catch (err) {
		await session.abortTransaction();
		console.error(err);
		// transaction aborted - report error to the client
		throw new Meteor.Error('Database Transaction Failed', err.message);
	} finally {
		session.endSession();
	}
};

export default { runAsync };

Any suggestion to solve this error?

Thanks in advance.

Notes:

  • Environment: Localhost
  • Meteor 2.2
  • Mongo 4.4.0
  • Node 12.18
  • O.S. : Mac