Problem with Meteor Mongo Collections find().forEach() cursor iteration and saving to Elasticsearch

I have a Meteor app connected to MongoDB.
In Mongo I have a collection with ~700k records.
Each week a cron job reads all the records from the collection (using a Mongo cursor) and inserts them into Elasticsearch in batches of 10k so they get indexed.

let articles = []
Collections.Articles.find({}).forEach(function (doc) {
    // each document contributes two bulk lines: the action header and the source
    articles.push(
        { index: { _index: 'main', _type: 'article', _id: doc.id } },
        doc
    )
    // flush every 10000 bulk lines (5000 documents)
    if (articles.length % 10000 === 0) {
        client.bulk({ maxRetries: 5, index: 'main', type: 'article', body: articles })
        articles = []
    }
})

Since forEach is synchronous, it goes over each record without waiting, while client.bulk is async; the bulk requests pile up, which overloads the Elasticsearch server until it crashes with an out-of-memory exception.
Is there a way to pause the forEach while an insert is in progress? I tried async/await, but that does not seem to work either:

let articles = []
Collections.Articles.find({}).forEach(async function (doc) {
    articles.push(
        { index: { _index: 'main', _type: 'article', _id: doc.id } },
        doc
    )
    if (articles.length % 10000 === 0) {
        // forEach does not wait for this promise, so batches still pile up
        await client.bulk({ maxRetries: 5, index: 'main', type: 'article', body: articles })
        articles = []
    }
})
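If I understand it right, forEach simply discards the promise that an async callback returns, so the await above never pauses the loop. A tiny standalone demo of that behaviour:

[1, 2, 3].forEach(async function (n) {
    await new Promise((resolve) => setTimeout(resolve, 1000))
    console.log('done', n)   // all three fire together after ~1s
})
console.log('forEach already returned')   // prints first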

Is there any way to achieve this?

EDIT: I am trying to achieve something like this, if I use promises:

let articles = []
Collections.Articles.find({}).forEach(function (doc) {
    articles.push(
        { index: { _index: 'main', _type: 'article', _id: doc.id } },
        doc
    )
    if (articles.length % 10000 === 0) {
        // PAUSE fetching rows with forEach here
        client.bulk({ maxRetries: 5, index: 'main', type: 'article', body: articles }).then(() => {
            console.log('inserted')
            // RESUME fetching rows with forEach here
            console.log('RESUME READING')
        })
        articles = []
    }
})

You’d be better off mapping your documents into bulk-sized chunks and then looping over them with for ... of; see https://stackoverflow.com/questions/37576685/using-async-await-with-a-foreach-loop.
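Something like this (just a sketch, reusing the client and collection from your post; run it inside an async function so the awaits are valid, and be aware that fetch() loads all the documents into memory at once):

const docs = Collections.Articles.find({}).fetch()

// split into chunks of 5000 documents = 10000 bulk lines each
const chunks = []
for (let i = 0; i < docs.length; i += 5000) {
    chunks.push(docs.slice(i, i + 5000))
}

// for...of honours await, so only one bulk request is in flight at a time
for (const chunk of chunks) {
    const body = []
    for (const doc of chunk) {
        body.push({ index: { _index: 'main', _type: 'article', _id: doc.id } }, doc)
    }
    await client.bulk({ maxRetries: 5, index: 'main', type: 'article', body })
}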


This could be a better solution for your data sync: https://github.com/yougov/mongo-connector. It tails the MongoDB oplog and keeps Elasticsearch continuously in sync, so you would not need a weekly re-index job at all.

@softwarerero Async iteration did the trick! Thanks a lot!
Here is the working code:

let articles = []
let cursor = Collections.Articles.find({})

for await (const doc of cursor) {
    articles.push(
        { index: { _index: 'main', _type: 'article', _id: doc.id } },
        doc
    )
    // 10000 bulk lines = 5000 documents per request
    if (articles.length === 10000) {
        await client.bulk({ maxRetries: 5, index: 'main', type: 'article', body: articles })
        articles = []
    }
}
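Two caveats: for await is only valid inside an async function, and when the total number of bulk lines is not an exact multiple of 10000 the last partial batch still has to be flushed after the loop. A sketch of the same loop with both handled (reindexArticles is just a placeholder name):

async function reindexArticles() {
    let articles = []
    const cursor = Collections.Articles.find({})

    for await (const doc of cursor) {
        articles.push(
            { index: { _index: 'main', _type: 'article', _id: doc.id } },
            doc
        )
        if (articles.length === 10000) {
            await client.bulk({ maxRetries: 5, index: 'main', type: 'article', body: articles })
            articles = []
        }
    }

    // send whatever is left from the final partial batch
    if (articles.length > 0) {
        await client.bulk({ maxRetries: 5, index: 'main', type: 'article', body: articles })
    }
}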