Handling external API events and keeping DB in sync

I’m integrating with an API that sends server-side events. Some of these events happen in rapid succession, i.e. within milliseconds. I want to capture them and update some data in Mongo. For some reason, I’ve seen updates based on these events being written out of order. I’m assuming it’s due to the timing of the events, the need to check for potential duplicate events on my end, and async writes. I’ve tried awaiting Collection.findOne so that I can check the most recent state before calling Collection.update but it looks like the events come in too quickly for the previous Collection.update to complete. Here’s some example code to help illustrate what I have currently:

// handle server-side events from external API
const getStatus = async() => {
  const url = `${Meteor.settings.url}`
  const es = new EventSource(url, {
    headers: {
      Authorization: authorization
    }
  });

  es.onmessage = async(e) => {
    try {
      const data = JSON.parse(e.data);
      // the API I'm integrating with has been sending duplicate status events so I'm using this to ensure uniqueness
      const evtId = Events.insert(data); 

      if (evtId) {
        await updateCollection(data);
      }
    } catch (error) {
        console.error(error);
      }
    }
  };
}

export const updateCollection = async({_id, status, lastUpdated}) => {
    try {
      const { status: currentStatus } = await Collection.findOne({_id: _id}) || {}; 
     // I was hoping to use something like this to catch writes that are happening out of order but my guess is the events are happening too quickly for the previous status event to finish writing to the db
      if (currentStatus === 'finished') {
        console.log('finished');
        return;
      }

      Collection.update({_id: _id}, {$set: {
        status: status,
        lastUpdated: new Date(lastUpdated),
      }});

      if (status !== 'finished') {
        return;
      }
      
     // do more actions now that we know the status is 'finished'
   
      return;
    } catch(error) {
      console.error(error)
    }
  }

Curious to hear how others have handled this situation. Any tips would be appreciated. :slight_smile:

Two options that you can research:

  1. Transactions
  2. Job queue processing
1 Like