How to use futures and fibers for external API calls?


#1

So I have the following scenario:

I have a set of Mongo records (10,000+) returned by a Meteor cursor. I need to iterate through each of the records, pull a bit of data, send that bit over to an external API, then update the record based upon the response. Each API call takes about 2 seconds on average.

I have my program working for individual records, but the cursor’s forEach loop fires off all 10,000 requests at once. Of course, I don’t have enough memory or bandwidth for 10,000 simultaneous API requests, so I run out of resources. This is not what I want to do anyway.
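A minimal plain Node.js sketch of why this happens (no Meteor involved; `fakeApiCall`, `fireAll` and the timings are hypothetical stand-ins for the real API). `forEach` only *starts* each callback-style call and immediately moves on, so every request is open before the first response comes back.

```javascript
// Hypothetical stand-in for the external API: answers after `ms` milliseconds
// and tracks how many calls are open at the same time.
let inFlight = 0;
let peakInFlight = 0;

function fakeApiCall(data, ms, callback) {
  inFlight += 1;
  peakInFlight = Math.max(peakInFlight, inFlight);
  setTimeout(function () {
    inFlight -= 1;
    callback(null, 'response for ' + data);
  }, ms);
}

// forEach does not wait for the callback: each iteration only starts a call,
// so every call is open before the first response arrives.
function fireAll(docs, ms, onAllDone) {
  let remaining = docs.length;
  docs.forEach(function (doc) {
    fakeApiCall(doc.data, ms, function (err, response) {
      // the real program would update the record with `response` here
      remaining -= 1;
      if (remaining === 0) onAllDone(peakInFlight);
    });
  });
}

const docs = Array.from({ length: 50 }, (_, i) => ({ _id: i, data: 'doc' + i }));
fireAll(docs, 10, function (peak) {
  console.log('peak concurrent requests:', peak); // 50: one per document
});
```

With 10,000 documents the same pattern opens 10,000 requests at once.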

Time is not really a concern, as long as the program keeps running; this is not a user-facing process.

What I am trying to achieve is one of the following:
a) Set up the system so that it only calls the API once the previous request has returned or timed out. There will never be more than one request open at once.
b) Set up the system so that I can have n threads working on the list (using the last digit of a document number to decide which thread handles a document).
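A minimal sketch of option (a), using plain Node.js Promises and async/await instead of fibers/futures (a swapped-in technique, not Meteor's API). `callApi`, `withTimeout` and `processSequentially` are hypothetical names, and the 5 ms delay stands in for the ~2 s API call. One caveat: a timed-out request is only abandoned, not cancelled, so the underlying connection may stay open for a while after the loop moves on.

```javascript
// Hypothetical stand-in for the external API; it also counts open requests.
let openRequests = 0;
let peakOpenRequests = 0;

function callApi(data) {
  openRequests += 1;
  peakOpenRequests = Math.max(peakOpenRequests, openRequests);
  return new Promise(function (resolve) {
    setTimeout(function () {
      openRequests -= 1;
      resolve('response for ' + data);
    }, 5);
  });
}

// Reject if `promise` has not settled within `ms` milliseconds.
function withTimeout(promise, ms) {
  let timer;
  const timeout = new Promise(function (_, reject) {
    timer = setTimeout(function () {
      reject(new Error('timed out after ' + ms + ' ms'));
    }, ms);
  });
  return Promise.race([promise, timeout]).finally(function () {
    clearTimeout(timer);
  });
}

// Option (a): the loop waits for each response (or timeout) before the next call.
async function processSequentially(docs, timeoutMs) {
  const results = [];
  for (const doc of docs) {
    try {
      const response = await withTimeout(callApi(doc.data), timeoutMs);
      // the real program would update the record with `response` here
      results.push({ _id: doc._id, ok: true, response: response });
    } catch (err) {
      // a failed or timed-out call is recorded and the loop moves on
      results.push({ _id: doc._id, ok: false, error: err.message });
    }
  }
  return results;
}
```

Usage would be something like `processSequentially(docs, 5000).then(...)`, with the timeout chosen comfortably above the ~2 s average response time.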

So I have been looking at fibers and futures and trying to wrap my head around them. Any help or suggestions would be appreciated!
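A sketch of option (b) from the list above, again with plain Promises rather than fibers; `docNumber`, `callApi`, `laneFor` and the lane count are hypothetical. Documents are split into lanes by the last digit of their document number, each lane works through its own documents one at a time, and all lanes run in parallel, so at most `laneCount` requests are open at once. Since only the last digit is used, the lane count should be between 1 and 10.

```javascript
// Which lane handles a document: its last digit, folded into laneCount lanes.
function laneFor(docNumber, laneCount) {
  const lastDigit = Number(String(docNumber).slice(-1)); // 0..9
  return lastDigit % laneCount;
}

// Hypothetical stand-in for the external API.
function callApi(data) {
  return new Promise(function (resolve) {
    setTimeout(function () {
      resolve('response for ' + data);
    }, 5);
  });
}

// One lane: handle its documents strictly one after another.
async function runLane(docs) {
  const done = [];
  for (const doc of docs) {
    const response = await callApi(doc.data);
    // the real program would update the record with `response` here
    done.push({ docNumber: doc.docNumber, response: response });
  }
  return done;
}

// Split by last digit, then run all lanes in parallel (at most laneCount open requests).
async function processInLanes(docs, laneCount) {
  const lanes = Array.from({ length: laneCount }, function () { return []; });
  for (const doc of docs) {
    lanes[laneFor(doc.docNumber, laneCount)].push(doc);
  }
  return Promise.all(lanes.map(runLane));
}
```

One drawback of splitting by digit: if the digits are unevenly distributed, some lanes finish early while others still have a long queue.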


#2

If you are running the code in a method, you could look at calling this.unblock(), which lets the next method call from the same client start running in a new fiber instead of waiting for this one to finish.


#3

Instead of the cursor’s forEach, you can use underscore’s _.each together with Meteor.wrapAsync. Something along the lines of:

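// apiCallFunction must follow the Node callback convention: callback(error, result)
var apiCallFunction = function(dataFromCursor, callback) {
  /* call the external API, then: callback(error, someData); */
};

// wrapAsync returns a new function (it does not call anything itself);
// calling that function blocks the current fiber until the callback fires
var apiCallSync = Meteor.wrapAsync(apiCallFunction);

_.each(myCollection.find().fetch(), function(doc) {
  var someData = apiCallSync(doc.data); // waits for the response before moving on
  myCollection.update({_id: doc._id}, {$set: {otherData: someData}});
});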

This is semi-pseudo code, so you may need to tweak here and there, but this is the gist of it.


#4

Thanks for the responses, but my program still attempts to open 10,000 connections at once, even with the underscore method.

What I’m trying to do is either make sure apiCallFunction never runs in parallel, or be able to limit how many copies of apiCallFunction run at once to a number my system can handle.
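If the goal is just to cap how many copies of `apiCallFunction` run at once, a small worker pool is one way to do it. This is a sketch with plain Promises, not fibers; `apiCall` and `mapWithLimit` are hypothetical names. Each of `limit` workers takes the next unprocessed item from a shared index, so a slow request only holds up its own worker.

```javascript
// Hypothetical stand-in for apiCallFunction; counts open requests.
let open = 0;
let peakOpen = 0;

function apiCall(data) {
  open += 1;
  peakOpen = Math.max(peakOpen, open);
  return new Promise(function (resolve) {
    setTimeout(function () {
      open -= 1;
      resolve('response for ' + data);
    }, 5);
  });
}

// Run fn over items with at most `limit` calls in progress at any moment.
async function mapWithLimit(items, limit, fn) {
  const results = new Array(items.length);
  let next = 0; // shared index; safe because only one callback runs at a time
  async function worker() {
    while (next < items.length) {
      const i = next;
      next += 1;
      results[i] = await fn(items[i]); // this worker waits; the others keep going
    }
  }
  const workerCount = Math.max(1, Math.min(limit, items.length));
  await Promise.all(Array.from({ length: workerCount }, worker));
  return results;
}
```

In this scenario it would be called like `mapWithLimit(docs, 4, doc => apiCall(doc.data))`. Setting `limit` to 1 gives strictly sequential processing. Unlike splitting by document number, no worker sits idle while others still have work queued.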


#5

What about something like this:

// get a nonreactive array from the collection
var arrayFromCursor = myCollection.find().fetch();

// your api call function; it must take a Node-style callback(error, result)
var apiCallFunction = function(dataFromArray, callback) {
    /* call the API, then: callback(error, someData); */
};

// api call function wrapped to make it run synchronously, shorthand for fiber/future
var apiCallSync = Meteor.wrapAsync(apiCallFunction);

// create a blocking array iterator
var processArray = function() {
    var index = 0;
    return {
        next: function() {
            if (!this.hasNext()) {
                return null;
            }
            var doc = arrayFromCursor[index];
            // blocks here until the API responds, so only one request is open at a time
            var someData = apiCallSync(doc.data);
            var result = myCollection.update({_id: doc._id}, {$set: {otherData: someData}});
            if (result > 0) {
                console.log(doc._id + ' processed.');
            } else {
                // if you want the full error instead of the blocking result, move this whole logic into a callback passed to update.
                console.log(doc._id + ' could not be processed.');
            }
            index = index + 1;
        },
        hasNext: function() {
            return index < arrayFromCursor.length;
        }
    };
};

// create the iterator once, then run it to the end
var iterator = processArray();
while (iterator.hasNext()) {
    iterator.next();
}

#6

OK, one thing I found out about wrapAsync: the documentation says that when you call the wrapped function synchronously, you don’t pass a callback. However, the function you wrap still needs to take a callback as its last argument and call it, or your program will hang!

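e.g.:
function myMeteorFunction() {
  // wrap the callback-style function once; calling the wrapped version blocks the fiber
  var mySync = Meteor.wrapAsync(Meteor.myNameSpace.myFunc);
  myCursor.forEach(function(doc) {
    mySync(doc); // no callback passed here
  });
}

// the function being wrapped still takes the callback as its last argument
Meteor.myNameSpace.myFunc = function(doc, cb) {
  // do some work with doc
  // call an API
  // ride a bicycle
  cb(); // must be called (as cb(error, result)), or the wrapped call never returns and your Meteor client hangs until you refresh the screen
};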
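A plain-JS analogy for this pitfall, using Promises instead of fibers (it is not Meteor's implementation of wrapAsync; `wrapCallback`, `goodFunc`, `forgetfulFunc` and `resultOrPending` are hypothetical names). The wrapper can only finish when the wrapped function calls its callback, so a function that forgets `cb()` leaves the caller waiting forever. In Meteor the thing left waiting is the fiber running your method, which is why the client appears to hang.

```javascript
// Turn a callback-last function into one that returns a Promise
// (similar in spirit to wrapAsync, but NOT Meteor's implementation).
function wrapCallback(fn) {
  return function (...args) {
    return new Promise(function (resolve, reject) {
      fn(...args, function (err, result) {
        if (err) reject(err);
        else resolve(result);
      });
    });
  };
}

// Calls cb when done, so the wrapped call settles.
function goodFunc(doc, cb) {
  setTimeout(function () {
    cb(null, 'processed ' + doc.id);
  }, 5);
}

// Does its work but never calls cb, so the wrapped call stays pending forever.
function forgetfulFunc(doc, cb) {
  // do some work with doc ... and forget cb()
}

// Demo helper: resolve with the call's result, or 'still pending' after `ms`.
function resultOrPending(promise, ms) {
  let timer;
  const pending = new Promise(function (resolve) {
    timer = setTimeout(function () {
      resolve('still pending');
    }, ms);
  });
  return Promise.race([promise, pending]).finally(function () {
    clearTimeout(timer);
  });
}
```

Wrapping `goodFunc` yields `'processed 1'` for `{ id: 1 }`, while wrapping `forgetfulFunc` never settles and only the demo timer ends the wait.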


#7

I eventually went with a forEach loop after getting wrapAsync working.

Now my problem is that the cursor seems to close after some time, maybe 10 minutes?

However, I am watching the application do the work (go to the API, update the document, move on to the next one), so it is not as if the cursor is idle. (Or is it?)

I saw an open issue about cursors that silently close. Could this be what I am experiencing? If so, is there any way to work around it?
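One plausible explanation (an assumption, not confirmed in this thread): by default the MongoDB server closes cursors that have been inactive for about 10 minutes, and "inactive" means no new batch has been requested. Documents arrive in batches, so while your code spends 2 s per document working through one large batch, the server sees no activity on the cursor even though the app is busy. Loading everything up front with `fetch()` sidesteps this, and another option is keyset paging: run a fresh, short query per page, starting after the last `_id` processed. The sketch below uses a hypothetical in-memory `fakeCollection` and `fetchPage` in place of the real collection; the Meteor query in the comment is untested.

```javascript
// Hypothetical in-memory stand-in for the collection (string _ids, like Meteor's).
const fakeCollection = Array.from({ length: 25 }, function (_, i) {
  return { _id: 'id' + String(i).padStart(3, '0'), data: i };
});

// One short-lived query per page: documents with _id > afterId, sorted by _id.
// In Meteor this might look like (untested):
//   myCollection.find(afterId ? {_id: {$gt: afterId}} : {},
//                     {sort: {_id: 1}, limit: pageSize}).fetch()
function fetchPage(afterId, pageSize) {
  return fakeCollection
    .filter(function (doc) { return afterId === null || doc._id > afterId; })
    .sort(function (a, b) { return a._id < b._id ? -1 : a._id > b._id ? 1 : 0; })
    .slice(0, pageSize);
}

// Process every document, page by page, remembering only the last _id seen.
async function processAll(pageSize, processDoc) {
  let lastId = null;
  let count = 0;
  for (;;) {
    const page = fetchPage(lastId, pageSize);
    if (page.length === 0) break;
    for (const doc of page) {
      await processDoc(doc); // e.g. the ~2 s API call plus the update
      count += 1;
    }
    lastId = page[page.length - 1]._id;
  }
  return count;
}
```

Because the page query filters and sorts only on `_id`, updating other fields while you go does not make documents skip or repeat.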