Why Task Systems?

I have been thinking a lot about task-queue systems lately. A queue system is integral to any modern application, as it can perform jobs in the background like sending email, processing bulk updates, etc. We all know that Cron is bad and has no place in this modern world of ours, and we need a distributed way of handling these actions :D

At work we recently undertook the (successful!) migration from DelayedJob to Resque and investigated many other options. We also have a custom add-on to Resque (to be released soon) which transforms Resque into a multi-app pub/sub message bus. I also recently had the opportunity to re-write ActionHero’s task system to include support for more modes. I have been thinking about task systems a lot.

Tasks vs Messages

I want to make the distinction up front about what I consider a Task System vs. a Message System. To me, the main distinction is the notion of delay and deliverability. With a task system, you assume that there is a queue of actions to be eventually performed which is likely, at times, to fill faster than it can be worked. With a task system you also make the assumption that each message/action/event/task needs to be inspected, and must be delivered to a broker/worker (with the alternative being that the message is broadcast to whomever is available to hear it, including a collection of 0 listeners).

tldr: A message system delivers the message to anyone who is listening during the broadcast, while a task system enqueues the message until the recipient can read it.

Queues

The most important thing a task system needs is a persistent, atomic list to store tasks. This can be a database table or an in-memory array within some application (a list or set in Redis). Usually, you want to back this up to disk. Next, the list needs the property that items can be added and removed uniquely. That is to say that if a task is added, one-and-only-one copy of that task will ever be present, and that it is possible to remove one-and-only-one item from that list.

For Redis-based task systems (like Resque and Sidekiq), Redis is a single-threaded app which allows a worker to pop an element off a list. Being single-threaded means that 2 clients who request a pop at the exact same time will be ordered and handled in turn. If there is only one item to pop, only one client will get the data. Tasks can be added by a similar push method.
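Here’s roughly what that push/pop pair looks like in node (a minimal sketch using the callback-style redis npm client, not Resque’s actual implementation):

var redis = require("redis");
var client = redis.createClient();

// enqueue: push a serialized task onto the end of the list
var enqueue = function (queue, task, callback) {
  client.rpush(queue, JSON.stringify(task), callback);
};

// dequeue: pop one task; Redis serializes concurrent LPOPs,
// so each task is handed to exactly one worker
var dequeue = function (queue, callback) {
  client.lpop(queue, function (err, payload) {
    if (err || payload == null) { return callback(err, null); }
    callback(null, JSON.parse(payload));
  });
};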

For database-backed queues, it’s still possible to ensure an "atomic" pop, but it’s a little trickier. Let’s assume we have a table with the following structure:

id | created_at | data | run_at | locked_by | locked_at
--------------------------------------------------------

This structure is very similar to that created by DelayedJob. Adding a task to a relational database is as simple as a normal insert statement. Most relational databases have the property that if you do not supply a primary key, one will be uniquely assigned for you, and this ID can be thought of as the place within the task list. Working the queue is a different matter. A worker cannot simply select id from tasks where run_at <= NOW() AND locked_by IS NULL; because this can easily result in a race condition: the acts of select and update (where a worker would claim a task) are not atomic. Putting them within a transaction is also no good, as you can’t read and make decisions on the result (is the result of the select null?).

To solve this, there is actually a multi-step process. First the worker needs to select an eligible task per the sql statement above. DelayedJob actually selects 5 tasks at once in case one of the jobs is claimed by another worker during the next step. The worker then writes a locked_by but not a locked_at. This indicates that the worker intends to work this job, but hasn’t started yet. The worker then reads the job again to ensure that its lock is still there (it’s possible for 2 workers to be on this step for the same job). If the lock is still there, then the worker can set a locked_at and start working the job. Here’s how DJ does it for ActiveRecord (mysql, postgres, etc). This process is slower than a Redis pop, but generally effective.
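A sketch of that claim process in node, assuming a node-postgres style client and the table above (claimTask and workerName are illustrative names of mine, not DelayedJob’s code):

var claimTask = function (db, workerName, callback) {
  // 1. find a candidate task that is ready and unclaimed
  db.query(
    "SELECT id FROM tasks WHERE run_at <= NOW() AND locked_by IS NULL LIMIT 1",
    function (err, result) {
      if (err || result.rows.length === 0) { return callback(err, null); }
      var id = result.rows[0].id;
      // 2. declare intent: write locked_by, but only if still unclaimed
      db.query(
        "UPDATE tasks SET locked_by = $1 WHERE id = $2 AND locked_by IS NULL",
        [workerName, id],
        function (err) {
          if (err) { return callback(err, null); }
          // 3. read the row back to be sure our lock survived the race
          db.query("SELECT locked_by FROM tasks WHERE id = $1", [id], function (err, result) {
            if (err || result.rows.length === 0 || result.rows[0].locked_by !== workerName) {
              return callback(err, null); // another worker won; try again
            }
            // 4. the lock held: set locked_at and start working
            db.query("UPDATE tasks SET locked_at = NOW() WHERE id = $1", [id], function (err) {
              callback(err, id);
            });
          });
        }
      );
    }
  );
};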

The interesting thing about DelayedJob’s approach (when compared to the Redis version above) is that the task never ‘leaves’ the queue, even while it’s being worked on. That means that if a worker crashes, the data is not lost, and can be examined. It is certainly possible for a Redis worker to write its task’s information back, but there will be a short while between the pop and a subsequent write. There are backends for DelayedJob which use NoSQL databases (Mongo), and they work almost exactly the same way, since these databases don’t have a unique "pop" function.

Priority and Multiple Queues

A pattern Resque made famous was the notion of queues and priority. By default, Resque creates "high", "medium", "default", and "low" queues. This allows you to add normal tasks to the "default" queue, but if you have something important you want worked ASAP, you can add it to the "high" queue and it will be picked up by the next available worker. When you launch a worker, you can tell it which queues to work, in order. You can also say "*" and work all queues in alphabetical order.
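Conceptually, a worker just tries each of its queues in order and works the first task it finds. A sketch, reusing the dequeue helper from the Redis example above (Resque itself is Ruby):

// try each queue in priority order; work the first task found
var workNextTask = function (queues, callback) {
  var tryQueue = function (i) {
    if (i >= queues.length) { return callback(null, null); } // nothing to do anywhere
    dequeue(queues[i], function (err, task) {
      if (err) { return callback(err, null); }
      if (task != null) { return callback(null, task); }
      tryQueue(i + 1); // this queue was empty; fall through to the next
    });
  };
  tryQueue(0);
};

// a worker told to drain "high" before "default" before "low":
workNextTask(["high", "default", "low"], function (err, task) {
  // work the task ...
});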

Delayed Tasks

Sometimes tasks need to be delayed. If you want to "check a service in 10 minutes" or "send a welcome email 15 minutes after a user signs up", you need not only the above notion of priority, but also of chronology.

Database-backed task systems have it easy. Add a column with a timestamp (run_at above), and just check it against the current time (.. AND "run_at" <= NOW() ..) in the pop’s where clause. Easy!

Redis-based systems have it harder. You don’t want to be popping each and every task, de-serializing its data, checking the timestamp, and then (most of the time) returning it to the queue. This is both dangerous and time-consuming. resque-scheduler solves this problem by keeping n timestamped queues for tasks that haven’t run yet (and a list of those queues in a hash). This allows the scheduler to move a task over to the "working" queue whenever its time is ready.
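The idea, sketched with the same Redis client (the key names here are illustrative, not resque-scheduler’s actual keys, and I’m using a sorted set as the index of delayed queues):

// enqueue a delayed task into a queue named for its timestamp, and
// remember that timestamp in a sorted set so we can find it later
var enqueueAt = function (timestamp, task, callback) {
  var queue = "delayed:" + timestamp;
  client.rpush(queue, JSON.stringify(task), function (err) {
    if (err) { return callback(err); }
    client.zadd("delayed:timestamps", timestamp, String(timestamp), callback);
  });
};

// the scheduler only inspects queue *names*: any timestamp <= now is ready,
// and its tasks can be moved wholesale to the working queue
var findReadyQueues = function (now, callback) {
  client.zrangebyscore("delayed:timestamps", 0, now, callback);
};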

Recurrent Tasks

There is a certain subset of tasks which repeat. A task like "make a database backup" or "email the marketing team stats" repeats with a fixed frequency. Neither DelayedJob nor Resque supports this type of task natively, but conceptually, if you have delayed tasks, you can always re-enqueue yourself after you finish. Resque Scheduler added support for a "schedule" in later releases that lets you define a yml list of tasks to run, and when to run them.
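Re-enqueueing yourself can be as simple as the task’s last step. A sketch, reusing the hypothetical enqueueAt helper above (performBackup is a stand-in for the real work):

// a self-re-enqueueing task: the last thing it does is schedule
// its next run, one day from now
var databaseBackupTask = function (callback) {
  performBackup(function (err) {
    var nextRun = Date.now() + 24 * 60 * 60 * 1000;
    enqueueAt(nextRun, { name: "databaseBackup" }, function () {
      callback(err);
    });
  });
};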

Duplicatable Tasks

This last type of task takes us back to cron. This is the type of task I really do want to run "every day at midnight" on all my servers. This would be a task like "delete the /tmp directory’s contents" or "restart apache". While seemingly simple, this is the hardest type of task to implement in a task system, as it tends to go against the general point of an "atomic, first-come, first-served" queue. As far as I know, there isn’t a mechanism for this in either DJ or Resque, but I did recently add one to ActionHero.

My ideal task system (AKA How ActionHero does it)

When building ActionHero’s task system, I took a look at all the types of tasks above, and tried to sort out how to build a system that could accommodate all of these options. Luckily, I had some great existing systems I could steal concepts from.

Anti-patterns I wanted to correct

  • The need to run workers separately from web servers
  • In an event-based world (node, twisted-python, event-machine), the same argument for handling many event-driven clients per process can be made about task workers. "You spend most of your time waiting for IO, so why not do something else while you wait?" This is the main selling point of Sidekiq when compared to Resque. Taking the metaphor further, why not have the same process able to handle both web requests and tasks? ActionHero does this.
  • It is likely that tasks will be more CPU intensive than web requests, so ActionHero limits how many ‘worker timers’ can be running at once, rather than ‘as many as I can handle’, which is the case for web requests. You can also run the server with its web server disabled, to use it as just a worker.
  • This also makes deployments simple since running the server and the worker is the same thing!

The need to run any process "uniquely"

  • resque-scheduler is a great addition to the Resque ecosystem, but you must only run 1-and-only-1 instance of it at a time, which is odd and introduces another single point of failure.
  • The notion that only one worker can move a task from "delayed" to "ready" seems silly to me if the tasks are still kept in a pop-able list while they are delayed.

Task data should not be stored in the queue itself, but somewhere more permanent.

  • References to a task need to be kept uniquely in a queue, but the task data itself doesn’t have to be. Why not keep it in a hash with a ‘state’ field (sketched after this list)? Doing this allows for a few benefits:
  • if a task is lost or crashed, there will still be some record of what it was
  • you can follow the status of a task as it evolves
  • less information to be constantly popping and moving around
  • separation of ‘metadata’ from ‘action’
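A sketch of that shape in Redis (the key and field names here are hypothetical, not ActionHero’s actual schema):

// the queue holds only the task's id; the payload lives in a hash
// entry whose 'state' field evolves as the task moves along
var saveTask = function (taskId, task, callback) {
  task.state = task.state || "pending"; // pending -> queued -> working -> complete
  client.hset("tasks:data", taskId, JSON.stringify(task), callback);
};

var setTaskState = function (taskId, state, callback) {
  client.hget("tasks:data", taskId, function (err, payload) {
    if (err || payload == null) { return callback(err); }
    var task = JSON.parse(payload);
    task.state = state;
    client.hset("tasks:data", taskId, JSON.stringify(task), callback);
  });
};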

ActionHero’s data model

ActionHero defines tasks in /tasks:

exports.sayHello = {
  name: "sayHello",
  description: "I say hello",
  scope: "any",
  frequency: null,
  run: function (api, params, next) {
    api.log("Hello, " + params.name);
    next(null, true);
  },
};

and you can invoke the task in your code like so:

var task = new api.task({
  name: "sayHello",
  runAt: new Date().getTime() + 30000, // run 30 seconds from now
  params: { name: "Evan" }, // any optional params to pass to the task
}); // and then you can either task.enqueue() or task.run()

First, note the metadata. scope is the toggle for defining whether the task is duplicatable or not. "any" means that any worker can work this task atomically. "all" means that the task will be worked by all servers currently running, and duplicated accordingly (I’ll explain how later).

Next, you can choose to either run or enqueue the task. The difference between run and enqueue+now is that run() invokes the task in-process rather than passing it to the first available worker. This is useful if you really need to invoke a task now for a request.

runAt is an optional parameter to define when the task should be run (it defaults to now). This isn’t a guarantee that the task will be run exactly at that time (as all the workers might be busy), but rather that the task is "ready" to be run at that time. This solves the "delayed tasks" problem.

Finally, the frequency option. This is used to denote whether a task is recurrent, meaning the task should re-enqueue itself when it completes.

ActionHero uses Redis to store its task data. To enable "all" task scopes, every server (not worker) needs a ‘local’ queue of what to work on in addition to the global list. To enable the runAt parameter, there needs to be a ‘holding pen’ for tasks which aren’t ready to run yet. This leads us to:

api.tasks.queues = {
  globalQueue: "actionHero:tasks:global",
  delayedQueuePrefix: "actionHero:tasks:delayed",
  localQueue: "actionHero:tasks:" + api.id.replace(/:/g, "-"),
  data: "actionHero:tasks:data", // actually a hash
  workerStatus: "actionHero:tasks:workerStatus", // actually a hash
  enqueuedPeriodicTasks: "actionHero:tasks:enqueuedPeriodicTasks",
};

and the workers run the following pattern:

api.taskProcessor.prototype.process = function (callback) {
  var self = this;
  clearTimeout(self.timer);
  self.processLocalQueue(function () {
    self.processGlobalQueue(function () {
      self.processDelayedQueue(function () {
        self.setWorkerStatus("idle", function () {
          if (self.running == true) {
            self.timer = setTimeout(function () {
              self.process();
            }, self.cycleTimeMS);
          }
          if (typeof callback == "function") {
            callback();
          }
        });
      });
    });
  });
};

Interestingly, the only step that actually "performs" tasks is self.processLocalQueue. The rest of the methods move each task’s pointer from one queue to another. Remember that the actual job’s data is stored in the data hash. This allows us to do things like check enqueuedPeriodicTasks at boot and ensure that all the periodic tasks I know about (where frequency > 0) are in the system somewhere by inspecting the data hash. Even currently-processing jobs will be present.
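That boot-time check might look something like this (a sketch, not ActionHero’s literal code; api.tasks.tasks and knownCopyMissing are stand-ins for the loaded task definitions and the hash scan):

// at boot: every periodic task definition (frequency > 0) should have
// an entry somewhere in the data hash; if not, enqueue it fresh
client.hgetall(api.tasks.queues.data, function (err, allTaskData) {
  if (err) { return; }
  Object.keys(api.tasks.tasks).forEach(function (name) {
    var definition = api.tasks.tasks[name];
    // knownCopyMissing is a hypothetical helper that scans the hash's
    // serialized entries for a task with this name
    if (definition.frequency > 0 && knownCopyMissing(name, allTaskData)) {
      new api.task({ name: name }).enqueue();
    }
  });
});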

To run a duplicatable task, the worker which moves the job from the globalQueue to its own localQueue will also place a duplicate copy in every other server’s localQueue.
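Sketched out (not ActionHero’s literal code; listServerQueues is a hypothetical helper that returns every live server’s localQueue name):

// working an "all"-scoped task: the claiming worker copies the task's
// pointer into every server's localQueue, including its own
var claimAllScopedTask = function (callback) {
  client.lpop(api.tasks.queues.globalQueue, function (err, taskId) {
    if (err || taskId == null) { return callback(err, null); }
    listServerQueues(function (err, queues) {
      if (err || queues.length === 0) { return callback(err, taskId); }
      var remaining = queues.length;
      queues.forEach(function (queue) {
        client.rpush(queue, taskId, function () {
          remaining--;
          if (remaining === 0) { callback(null, taskId); }
        });
      });
    });
  });
};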

For delayed tasks, the delayedQueuePrefix is used to create a number of queues of the form delayedQueuePrefix + timestamp. A queue may hold a number of jobs which can be run at that time. Rather than checking if every individual task is ready to be run, we can simply use the names of the delayed queues and compare them to the current time. As the delayed queues are still atomic lists, it is safe for many workers to pop from them at once, and re-insert each job’s pointer into the globalQueue to be worked like normal.
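A sketch of that promotion step (KEYS is fine for illustration, though a real system would keep an index of the delayed queues rather than scanning):

var prefix = api.tasks.queues.delayedQueuePrefix;
// find delayed queues whose name ends in a timestamp that has passed,
// and move their task pointers back onto the globalQueue
client.keys(prefix + "*", function (err, queueNames) {
  if (err) { return; }
  queueNames.forEach(function (queueName) {
    var timestamp = parseInt(queueName.split(":").pop(), 10); // assumes prefix:timestamp naming
    if (timestamp <= new Date().getTime()) {
      // the pop is atomic, so many workers can do this at once safely
      client.lpop(queueName, function (err, taskId) {
        if (taskId != null) {
          client.rpush(api.tasks.queues.globalQueue, taskId);
        }
      });
    }
  });
});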

TODO

  • Right now, there is no concept of ‘priority’ ("high", "medium", "low") in ActionHero’s task system. That’s next on the list!

Fin.

I did not have a specific goal in sharing my thoughts on task systems with you, but hopefully this comparison of how various systems work will be helpful as you choose what to use for your next project.

Originally published 24 Mar 2013
