How can I manage a job queue across servers/cores with an exact delay?

UPDATE:

What I need:
-Job queue to run on all servers/cores
-Job queue to persist on restart
-Only one server or core to run each job
-Jobs so run async
-Be able to set the EXACT delay on a job (and it to persist on restart)

I have meteor running on multiple cores using the cluster package!

I have a dedicated custom queue handler built for my app but It was only setup for a single server/core.
Whats an efficient way to set it up for multiple?

Basically I wan’t ALL cores/servers checking for jobs but only one to actually run a job at a time.
Do I flag jobs as being ran and then not find them in mongo? Will that even work if I’m looking for jobs in under a second?

Here’s my queue for reference:

Jobs = new Mongo.Collection('jobs')

var throttle = [];
var running = false;

Meteor.setInterval(function(){
if (!(running)) {
	running = true;
	var now = new Date();
	var time = now.getTime();
	var unfinishedJobs = Jobs.find({_id: { $nin: throttle }, date: {$lt: time} });
	unfinishedJobs.map(function (job) {
		throttle.push(job._id);
		if (job.name === 'setTime') {
			setTime(job.campaign);
		}else if (job.name === 'matchLimit') {
			matchLimitReached(job.match);
		}

		/* kill throttle */
		var index = throttle.indexOf(job._id);
		if (index > -1) {
		    throttle.splice(index, 1);
		}
		Jobs.remove({_id: job._id});
	});
	running = false;
}
}, 500);

ps NO I cannot use: synced-cron, job-collection, or differential:workers… This queue absolutely has to support delayed jobs that run EXACTLY at their specified times. (those packages either don’t support that or have delays that are off by seconds)

1 Like

Why every half second?

Why not use a pub/sub design pattern? As soon as one job is finished, another job is pulled to the queue and assigned to the free worked.

Something on the line of:

If you are not handling millions of user, I suggest a mongodb pub/sub approach.

1 Like

Honestly I’d run it every 1ms if I thought it were possible!

I like the pub/sub idea but I’m not sure how to implement that with a delay? The package you sent doesn’t have support for delays.

What I need:
-Job queue to run on all servers/cores
-Job queue to persist on restart
-Only one server or core to run each job
-jobs so run async

The problem is that I need to be able to set the exact second in the future these jobs will run! (I run an online tournament website)

Is it possible for me to have some persistent mongodb script that just looks for jobs that are past todays date and send them to me or something?

simply add a time check to the job. If currentTime < scheduledTime then put back in queue and go to next task.

Makes sense but if the job is scheduled one week from now that just seems really inefficient…

Basically it will just keep checking that job or those 100 jobs over and over making mongo calls.

I don’t know if thats particularly better than what I’m already doing?

For anyone interested here’s my v1 solution:
(will run on multiple cores)
At two cores I can run 10k jobs in ~30seconds with NO duplicates.
I’m sure this could be refactored, but at least it’s working.
(would need another collection for totalWorkers if you are going to use across multiple servers each with multiple cores)

Jobs = new Mongo.Collection('jobs');
Jobs._ensureIndex({'date': 1,});
Throttle = new Mongo.Collection('throttle');
Jobs._ensureIndex({'job': 1});
Processing = new Mongo.Collection('processing');
Processing._ensureIndex({'core': 1, 'running': 1});

var core = parseInt(process.env.CLUSTER_WORKER_ID);
Processing.update({core: core}, { $set: {running: false} });
var totalCores = parseInt(process.env.CLUSTER_WORKERS_COUNT);

var checkForJobs = function (){
	var running = Processing.findOne({core: core}).running;
	if (!(running)) {
		var now = new Date();
		var time = now.getTime();
		var throttle = [];
		Throttle.find({}).map(function (job) {
			throttle.push(job.job)
		});
		var total = Jobs.find({ _id: {$nin: throttle}, date: {$lt: time} }).count();
		var totalCoresAvailable = (Processing.find({running: false}).count())
		var skip = Math.round(total/totalCoresAvailable) * (core -1);
		var limit = Math.floor(total/totalCores);
		if (limit === 0) { limit = 1; }
		Processing.update({core: core}, { $set: {running: true} });


		var areJobs = Jobs.find({ _id: {$nin: throttle}, date: {$lt: time} }, { sort: {date: -1}, limit: limit, skip: skip }).fetch();
		areJobs.map(function (job, index) {
			Throttle.insert({job: job._id});
			if (index === (areJobs.length - 1)) {
				Processing.update({ core: core, running: true }, { $set: {running: false} });
				Meteor.setTimeout(checkForJobs, 500);
			}
		});
		areJobs.map(function (job, index) {
			var cb = function(){
				Jobs.remove({_id: job._id});
				Throttle.remove({job: job._id});
			};

			/* set custom jobs here --fixthis */
			if (job.name === 'consoleLog') {
				Meteor.setTimeout(function() {
					console.log(job.message + core);
					cb();
				}, 1000);
			}

		});
		if (areJobs.length === 0) {
			Processing.update({ core: core, running: true }, { $set: {running: false} });
			Meteor.setTimeout(checkForJobs, 500);
		}

	}
};
if (core) {
	Meteor.setTimeout(checkForJobs, 500);
}
1 Like

remember you can directly schedule the job in the future with settimeout.

Definitely! The timeouts are only to check if any job needs to be ran.

I query every half second for jobs that are ready to be ran
(if I haven’t scheduled them to run yet)

Added one more line to clear the throttle on startup just in case the server crashed unfinished jobs would still be processed on restart.

query every half second = polling pattern = considered bad. You can google why.

Reactive + setTimeout to schedule jobs rather than polling in the future is considered better.

Anyhow you might find no difference between the 2 approaches for your specific scenario

1 Like

Ahh okay!

@muaddib How would I go about converting this to reactive instead of polling?

I understand pub/sub and listen for changes to the collection but I don’t have new jobs coming in all the time?

How do I set it up to be reactive based on times?

(Any code examples would be extremely helpful!)