HELP: Email sends emails twice!

I have a collection of emails that must be sent every 10 minutes.
I have a cron job to do this.
First I fetch the whole collection, then iterate over it, sending each email and removing the sent document.
The problem is that I receive two emails instead of one. Why are they duplicated?

 new CronJob('*/10 * * * *', async function() {

    const outgoingEmails = OutgoingEmails.find({}).fetch();

    outgoingEmails.forEach(oe => {
        Email.send({ to: oe.to, from: oe.from, subject: oe.subject, html: oe.body });
        OutgoingEmails.remove({ _id: oe._id });
    });

}, null, true);

I use SMTP from Mailgun, in case it matters.

Are they duplicated inside the collection?

No, just one copy. No duplicates. On localhost it works ok, but not in production.

Do you have just one server running or multiple? I don't know which cron package you are using, but it probably runs once on each server if you have more than one.

I use Amazon AWS Elastic Beanstalk with load balancing across 2 instances. Could that be the reason? If so, how can I avoid the duplication?

I use this cron: https://www.npmjs.com/package/cron

It's probable. While you are processing an email, do you mark it as locked? You need to flag it so that the second server won't rerun it.

You might need a findAndUpdate operation for it to work correctly, as both servers might attempt to run it at almost the same time.

What do you mean by “lock”? I just remove the document once it is sent. It looks like both emails are sent simultaneously.
What is a findAndUpdate operation? Can you explain it? :slight_smile:

The issue is that the email send operation can take a few milliseconds to finish, and only then do you remove the document from the db. In that window the second server fetches the same document and sends the email again.
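
The timing problem described above can be reproduced without Mongo or SMTP. What follows is a minimal in-memory sketch, not the original poster's code: `queue`, `sent`, `fetchAll` and `sendAndRemove` are hypothetical stand-ins for the collection, the mail server, and the two halves of the cron callback. The two servers are interleaved by hand so that both read before either one removes.

```javascript
// In-memory stand-ins (assumptions for illustration only).
const queue = new Map([['a1', { _id: 'a1', to: 'user@example.com' }]]);
const sent = []; // records every "email" that goes out

// Step 1 of the cron callback: read every pending document.
function fetchAll() {
  return Array.from(queue.values());
}

// Step 2 of the cron callback: send each document, then delete it.
function sendAndRemove(server, docs) {
  docs.forEach(doc => {
    sent.push({ server, id: doc._id });
    queue.delete(doc._id); // deleting an already-deleted id is a silent no-op
  });
}

// Both servers fire in the same minute and read before either one removes.
const seenByA = fetchAll();
const seenByB = fetchAll();
sendAndRemove('A', seenByA);
sendAndRemove('B', seenByB);

console.log(sent.length); // 2: one queued email, sent once by each server
```

Nothing fails in this run, which would explain why the duplication is invisible on a single localhost process and only shows up once a second instance runs the same schedule.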

You will need to do something like this

new CronJob('*/10 * * * *', async function() {
    const outgoingEmails = OutgoingEmails.rawCollection().findAndModify({ isLocked: { $exists: false } }, { $set: { isLocked: true } }).fetch();

    outgoingEmails.forEach(oe => {
        Email.send({ to: oe.to, from: oe.from, subject: oe.subject, html: oe.body });

        OutgoingEmails.remove({ _id: oe._id });
    });

}, null, true);

I am trying to implement it, but I get an error: .fetch is not a function
I had to rewrite it a little bit:

    const outgoingEmails = OutgoingEmails.rawCollection().findAndModify(
        { isLocked: { $exists: false } }, 
        [], 
        { $set: { isLocked: true } }
    ); 

But outgoingEmails is a promise. How do I fetch the data? I have never used findAndModify. I need assistance, please.

I'm not at my laptop right now, but it should look like this, I think.

const outgoingEmails = await OutgoingEmails.rawCollection().findAndModify(
        { isLocked: { $exists: false } }, 
        [], 
        { $set: { isLocked: true } }
    ).toArray(); 

Thank you for trying to help me, but I get
UnhandledPromiseRejectionWarning: TypeError: OutgoingEmails.rawCollection(…).findAndModify(…).toArray is not a function
:-/

The documents were updated with “isLocked”. I just need to fetch them…

You are right, my bad. It won't return an array; it finds and updates just one document (I'm too tired right now).
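
Because an atomic find-and-modify touches one document per call, one way to use it for a whole batch is to call it in a loop until nothing unlocked is left. The sketch below assumes the Node driver's `findOneAndUpdate` on the handle returned by `rawCollection()`. Depending on the driver version, its promise resolves either to the document itself or to a wrapper with a `value` field, so the hypothetical `unwrap` helper accepts both. `claimAll` and `fakeRaw` are illustrative names; `fakeRaw` is only an in-memory stand-in so the loop can be tried without a database.

```javascript
// Accept both result shapes: a bare document, or a { value, ok, ... } wrapper.
// Assumes queued documents do not themselves have both `ok` and `value` fields.
function unwrap(result) {
  if (result && typeof result === 'object' && 'ok' in result && 'value' in result) {
    return result.value;
  }
  return result || null;
}

// Claim unlocked documents one at a time; each call is atomic on the server.
async function claimAll(raw, opId) {
  const claimed = [];
  for (;;) {
    const doc = unwrap(await raw.findOneAndUpdate(
      { isLocked: { $exists: false } },
      { $set: { isLocked: opId } }
    ));
    if (!doc) break; // nothing left to claim
    claimed.push(doc);
  }
  return claimed;
}

// In-memory stand-in for rawCollection(), only for trying the loop locally.
// It ignores the filter argument and always looks for documents without isLocked.
function fakeRaw(docs) {
  return {
    async findOneAndUpdate(filter, update) {
      const doc = docs.find(d => !('isLocked' in d));
      if (!doc) return { value: null, ok: 1 };
      const before = { ...doc }; // the pre-update document is returned
      Object.assign(doc, update.$set);
      return { value: before, ok: 1 };
    },
  };
}
```

Inside the cron callback this might be called as `const emails = await claimAll(OutgoingEmails.rawCollection(), Random.id());`. Two servers running the loop at once would each end up with a disjoint subset of the queue, at the cost of one round trip per email.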

There are three options: set only one of the servers to run the cron, use transactions, or do it in 2 steps (I think this is the easiest).

  const opId = Random.id();
  OutgoingEmails.update({ isLocked: { $exists: false } }, { $set: { isLocked: opId } }, { multi: true });

  const emails = OutgoingEmails.find({ isLocked: opId }).fetch();

  // TODO: Send Email

  // TODO: Remove sent emails

I think that might work.

The MongoDB manual says: “Modifies and returns a single document.”
{ multi: true } does not help; only one document is returned. :-/

I ended up with this solution, which works:

    const opId = Math.random();

    OutgoingEmails.update({ isLocked: { $exists: false } }, { $set: { isLocked: opId } }, { multi: true });

    const emails = OutgoingEmails.find({ isLocked: opId }).fetch();

    emails.forEach(oe => {
        Email.send({ to: oe.to, from: oe.from, subject: oe.subject, html: oe.body });
        OutgoingEmails.remove({ _id: oe._id });
    });

Thank you for your advice, @pmogollon :wink:

I don’t think this is gonna work. You still can have a race condition, like so:
server 1: lock the collection
server 2: lock the collection (a no-op)
server 1: fetch the locked docs
server 2: fetch the locked docs
Result: duplication

If you have more than one process doing more than one step, you’ll never escape the race conditions. (Unless it’s a multi-threaded environment with kernel-based lock management, or a database with real, bona-fide transactions.)

This stuff is always rather tricky. It’s more reliable to ensure that exactly one server is handling things. Max Savin wrote a tool for this, I think, which is probably excellent, judging by his other work.
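
As a rough illustration of the "exactly one server" idea, the job can be gated on a flag that only one instance carries. This is a sketch under assumptions: `RUN_CRON` is a hypothetical variable name, not a Meteor or AWS setting, and how it ends up set on only one instance depends on the deployment and is not covered here.

```javascript
// Decide whether this process should start the cron job.
// RUN_CRON is a hypothetical variable name chosen for this example.
function shouldRunCron(env) {
  return env.RUN_CRON === 'true';
}

// Start the job only on the designated instance.
function startCronIfDesignated(env, startJob) {
  if (!shouldRunCron(env)) return false;
  startJob();
  return true;
}

// Example wiring: the callback stands in for `new CronJob(...)`.
let started = 0;
startCronIfDesignated({ RUN_CRON: 'true' }, () => { started += 1; });
startCronIfDesignated({}, () => { started += 1; });
console.log(started); // 1
```

In a real process the first argument would be `process.env`. The trade-off is that this has no failover: if the designated instance is down, nothing is sent until it comes back.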

Good point @alan99, IIRC this line won’t do anything if it’s already locked:

    OutgoingEmails.update({ isLocked: { $exists: false } }, { $set: { isLocked: opId } }, { multi: true });

So you’ll need to look at the result to see if any document was updated before proceeding
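
One way to act on that advice: on the server, Meteor's `update` returns the number of documents it affected, so the job can stop early when it claimed nothing. The sketch below wraps the two steps in a hypothetical `claimBatch` helper and, so that it runs outside Meteor, exercises it against `FakeCollection`, an in-memory stand-in that mimics only the two calls used here. It assumes the synchronous server-side collection API used elsewhere in this thread.

```javascript
// Lock every unlocked document with this run's opId, then read back only
// the documents that carry that opId. Returns [] when nothing was claimed.
function claimBatch(collection, opId) {
  const claimedCount = collection.update(
    { isLocked: { $exists: false } },
    { $set: { isLocked: opId } },
    { multi: true }
  );
  if (claimedCount === 0) return []; // queue empty, or another server claimed it all
  return collection.find({ isLocked: opId }).fetch();
}

// Minimal in-memory stand-in (assumption): supports only the selector
// shapes used above, not general Mongo queries.
class FakeCollection {
  constructor(docs) { this.docs = docs; }
  update(selector, modifier) {
    const targets = this.docs.filter(d => !('isLocked' in d));
    targets.forEach(d => Object.assign(d, modifier.$set));
    return targets.length; // like Meteor on the server: number of affected documents
  }
  find(selector) {
    const hits = this.docs.filter(d => d.isLocked === selector.isLocked);
    return { fetch: () => hits.map(d => ({ ...d })) };
  }
}

const emails = new FakeCollection([{ _id: 'e1' }, { _id: 'e2' }]);
const firstServer = claimBatch(emails, 'op-A');  // claims both documents
const secondServer = claimBatch(emails, 'op-B'); // finds nothing unlocked
console.log(firstServer.length, secondServer.length); // 2 0
```

The early return only saves a query; what keeps the two servers apart is that each one reads back documents tagged with its own `opId`.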

Another solution is to use a job scheduler which works in a multi-server environment such as msavin:sjobs, or my own (more CPU efficient) fork, wildhart:jobs.

Both these packages use a collection to manage the list of pending jobs, but only one server at a time can be in control of the job queue (controlled by another collection). If one server goes down the other server automatically takes over the job queue.

Then you can either set a job which repeats every 10 minutes to query the emails collection and send the emails, or create a job which sends a single email after 10 minutes, then you don’t need the emails collection anymore.

Can I ask why you’re only sending the emails every 10 minutes?

Also, since you use mailgun (as do I), the mailgun-js API is a bit quicker than using Email.send() (which uses SMTP)

Ah, that’s much simpler. Also I missed the OP is setting a server-unique opId, which serves the same purpose. Apologies for my hasty post!

I’m looking at a similar situation now. In a generalized form, I’d love to read the Amazon Instance ID or, preferably, the number of the “machine” in the balancing batch (e.g. 0 or 2 or 4 etc). I’d run some server-side processes if I’m on the first machine (scheduled Push) and some other automated processes on some other machine. Perhaps the best option would be to look at the CPU or memory usage and run those processes on a particular Amazon Instance ID, making the entire process dynamic and ensuring it only runs on one machine at a time. I’ve got to dig some more, but what I know for certain is that I’d use Amazon to select the machine rather than add complexity on the Meteor side of things.