By Loïc d'Anterroches, xhtml.net, 20th of February, 2011

Background Tasks with Photon

Background tasks allow you to perform operations in the background to offload processing-intensive or specialized work. A good example is using a task to manage a chat server.

A Task Runs Something in the Background

Some examples of tasks, to give you an idea:

  • return the time to synchronize different systems;
  • receive logs and write them to disk in batches every 60 seconds to make large writes;
  • keep track of all the connected users in a chat and send them the current messages;
  • send notification emails in the background so that the web page refreshes fast;
  • provide a small memory cache available to all the Photon processes;
  • index documents after an insert or update.

Good candidates for conversion into tasks:

  • you do not need the answer to the operation for the client right now;
  • the operation takes more than 50 ms on average to complete;
  • the operation serves as reference data for others;
  • the operation connects to remote systems with high latency.

Tasks can be asynchronous: they get a job to be done and do not return the answer directly. Or they can be synchronous: they answer directly.

Asynchronous Mail Sender Task

Suppose we have a message to send to 25 people during the processing of a request. For example, you have updated a bug with some comments and 25 people want to be notified of its evolution. Sending the emails directly would cost a lot of time, so we are going to send them through an asynchronous task.

The task will take:

  • the list of email addresses;
  • the message subject;
  • the message body.

You will see that it is extremely simple to create such a task: you just need to define the name of the task and the method doing the work, simply named work.

// We always put the code in a namespace. Here you will put this code in
// the yourproject/apps/mywebapp/task.php file.
namespace mywebapp\task;
// We need the project configuration to have the sender email.
use photon\config\Container as Conf;

class MailSender extends \photon\task\AsyncTask
{
    /**
     * The name of the task. It must be unique.
     */
    public $name = 'mailsender';

    /**
     * We receive the work and send the emails.
     */
    public function work($socket)
    {
        // The task gets in the message its name, the id of the client
        // requesting the work (good for logging) and the payload. Most of
        // the time the payload will be JSON encoded.
        list($taskname, $client, $payload) = explode(' ', $socket->recv(), 3);
        // Decode as an associative array to access the keys with [].
        $payload = json_decode($payload, true);
        $emails = $payload['emails'];
        $subject = $payload['subject'];
        $body = $payload['body'];
        $headers = 'From: ' . Conf::f('from_email') . "\r\n" .
                   'X-Mailer: Photon/' . \photon\VERSION;
        foreach ($emails as $to) {
            // You just send your email as usual, maybe using some of the
            // nice PEAR libs to do so.
            mail($to, $subject, $body, $headers);
        }
    }
}
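The message format used in work() can be tried outside of Photon. The following is only a minimal sketch: parse_task_message() is a hypothetical helper, not a Photon function, and the client id and email addresses are made-up values. It shows why explode() is called with a limit of 3 (the JSON payload may itself contain spaces) and why json_decode() needs true as second argument to get arrays.

```php
<?php
// Hypothetical helper, not part of Photon: split a raw task message
// into its three parts, following the format used in work() above.
function parse_task_message($raw)
{
    // The limit of 3 keeps the spaces inside the JSON payload intact.
    $parts = explode(' ', $raw, 3);
    if (count($parts) !== 3) {
        throw new InvalidArgumentException('Malformed task message.');
    }
    list($taskname, $client, $json) = $parts;
    // true => associative arrays, so $payload['emails'] works.
    $payload = json_decode($json, true);
    if (!is_array($payload)) {
        throw new InvalidArgumentException('Payload is not a JSON object.');
    }
    return array($taskname, $client, $payload);
}

// Made-up message, built the way a client is assumed to build it.
$raw = 'mailsender client-42 ' . json_encode(array(
    'emails' => array('a@example.com', 'b@example.com'),
    'subject' => 'Ticket 123 has been updated.',
    'body' => 'Hello, Photon is great and you too.',
));
list($taskname, $client, $payload) = parse_task_message($raw);
// Prints: mailsender from client-42: 2 recipients
printf("%s from %s: %d recipients\n",
       $taskname, $client, count($payload['emails']));
```

Rejecting malformed messages early is a design choice of this sketch; the task above simply trusts its input.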

Now that you have created your task, the last step is to install it. To do so, declare in your configuration file that the mailsender task is available as the \mywebapp\task\MailSender class.

return array( 
    ...
    'installed_tasks' => array('mailsender' => '\mywebapp\task\MailSender'),
);

Now stop and start your project, then list the available processes:

$ hnu server stop && hnu server start
$ hnu server list
Photon id                                     Uptime        Served  Mem. (kB)  Peak mem. (kB)
-----------------------------------------------------------------------------------------------
loa-desktop-32559-1298195713                  0d00:00:54    0       1557       1567
loa-desktop-32560-1298195713                  0d00:00:54    0       1557       1567
loa-desktop-task-mailsender-32558-1298195713  0d00:00:54    0       1525       1540
loa-desktop-32561-1298195713                  0d00:00:54    0       1557       1567
-----------------------------------------------------------------------------------------------

You can see the line loa-desktop-task-mailsender-32558-1298195713: your task is available and ready to process work for you. Yeah! The next step is to send work to your task.

So, in your code, we are going to send work to the mailsender task:

class MyViews
{
    public function updateTicket($request, $match)
    {
        // Do a lot of work to update the ticket
        $payload = array(
                 'emails' => array('',
                                   '',
                                   '',
                                   ''),
                 'subject' => 'Ticket 123 has been updated.',
                 'body' => 'Hello, Photon is great and you too.');
        // The runner will take care of the communication with the task,
        // even if the task is on a remote system far away.
        $runner = new \photon\task\Runner();
        // The run call will return immediately!
        $runner->run('mailsender', $payload);
        return new \photon\http\Response('Ticket updated!', 'text/plain');
    }
}

Could it be simpler? You start a task runner \photon\task\Runner and send a payload to the mailsender task. Nothing more, nothing less.

With a traditional framework, you need to manually set up a queue system, define a protocol, get workers to connect to the queue and supervise everything. Most of the time people end up installing Gearman or rolling their own system. If this is just to send emails, this is really a lot of work for such a small job!

Synchronous Memory Cache

Now, even if we have a very fast framework, if you want to display a feed on your user page, it is better to retrieve the feed from a cache instead of getting it each time. So, we are going to create a memory cache task which will be available for all the Photon processes. We could cache the feed in the memory of each Photon process, but if you have 20 of them and have a refresh every 20 minutes, it means that you will have 20 fetches of the feed every 20 minutes. Not good.

For a memory cache, one wants to get the result of the request directly, the request being either to store or to get a value. We are going to add the task in the same namespace as the previous task.

// We always put the code in a namespace. Here you will put this code in
// the yourproject/apps/mywebapp/task.php file.
namespace mywebapp\task;

// Here the MailSender task

// Notice that we extend the "SyncTask"
class MemoryCache extends \photon\task\SyncTask
{
    /**
     * The name of the task. It must be unique.
     *
     * By convention, prefix the name with the name of your 
     * application, this will save your day when you will need 
     * to integrate another "mailer" task in your system.
     */
    public $name = 'mywebapp_memorycache';

    /**
     * This is the memory storage. 
     *
     * For each key we store: array($value, $expiration_time, $stats)
     */
    public static $storage = array();
    public static $counter = 0;

    /**
     * We receive the work and answer directly.
     */
    public function work($socket)
    {
        list($taskname, $client, $payload) = explode(' ', $socket->recv(), 3);
        // Decode as an associative array to access the keys with [].
        $payload = json_decode($payload, true);
        $ans = sprintf('%s %s %%s', $client, $taskname);
        switch ($payload['action']) {
        case 'get':
            if (isset(self::$storage[$payload['key']]) 
                && self::$storage[$payload['key']][1] > time()) {
                $ans = sprintf($ans, 
                               json_encode(self::$storage[$payload['key']][0]));
                self::$storage[$payload['key']][2]++;
            } else {
                $ans = sprintf($ans, json_encode(null));
                unset(self::$storage[$payload['key']]);
            } 
            break;
        case 'put':
            self::$storage[$payload['key']] = array($payload['val'], 
                                              $payload['timeout'] + time(), 
                                              0);
            $ans = sprintf($ans, json_encode('OK'));
            break;
        default:
            $ans = sprintf($ans, json_encode('KO'));
        }
        $socket->send($ans);
    }

    /**
     * the loop() method is called after each call to work or at least 
     * every 200 ms. We are going to use it to clean the old keys.
     *
     * This is not strictly needed, but may prevent accumulation of dead
     *  keys in the memory.
     */
    public function loop()
    {
        // checking every 200ms is not good, so we are going to check
        // every few minutes
        self::$counter++;
        if (0 !== (self::$counter % 2000)) { 
            return;
        }
        self::$counter = 0; // We reset the counter 
        $time = time();
        foreach (self::$storage as $key => $val) {
            if ($val[1] < $time) {
                unset(self::$storage[$key]);
            }
        }
    }
}

As you can see, it is a bit more complex, but in less than 40 lines of code you have a functional, albeit limited, memory cache task. The really important point to notice is that you must both receive the work with $socket->recv() and send the answer with $socket->send() within the work() method. This is a synchronous task, that is, your client is waiting for your answer right now!
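The get/put and expiration logic can be checked without a running Photon server. Here is a minimal sketch under stated assumptions: TinyCache is a hypothetical stand-alone class, not a Photon class, and the current time is passed as an argument instead of calling time() so that expiry can be exercised deterministically.

```php
<?php
// Hypothetical stand-alone version of the get/put logic of the task,
// with the current time passed in so that expiry can be tested.
class TinyCache
{
    // For each key we store: array($value, $expiration_time, $hits)
    private $storage = array();

    public function put($key, $val, $timeout, $now)
    {
        $this->storage[$key] = array($val, $now + $timeout, 0);
        return 'OK';
    }

    public function get($key, $now)
    {
        if (isset($this->storage[$key])
            && $this->storage[$key][1] > $now) {
            $this->storage[$key][2]++;
            return $this->storage[$key][0];
        }
        // Drop the expired entry, if any, and report a miss.
        unset($this->storage[$key]);
        return null;
    }

    public function hits($key)
    {
        return isset($this->storage[$key]) ? $this->storage[$key][2] : 0;
    }
}

$cache = new TinyCache();
$cache->put('my_feed', '<feed/>', 3600, 1000); // expires at 4600
$fresh = $cache->get('my_feed', 1500);         // still valid: a hit
$hits = $cache->hits('my_feed');               // one hit so far
$stale = $cache->get('my_feed', 4600);         // expired: a miss
```

As in the task, an entry whose expiration time has been reached is treated as missing and removed on the next get.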

Now, we have two tasks we can enable:

return array( 
    ...
    'installed_tasks' => array(
                      'mailsender' => '\mywebapp\task\MailSender',
                      'mywebapp_memorycache' => '\mywebapp\task\MemoryCache',

                     ),
    );

By default, a task listens on the tcp://127.0.0.1:5997 socket. As we do not want both tasks to listen on the same socket, we are going to tell the memory cache to listen on another one. This is done in the configuration file:

return array( 
    ...
    'installed_tasks' => array(
                      'mailsender' => '\mywebapp\task\MailSender',
                      'mywebapp_memorycache' => '\mywebapp\task\MemoryCache',

                     ),
    'photon_task_mywebapp_memorycache' => 
               array('sub_addr' => 'tcp://127.0.0.1:5998'),
    );
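With more tasks, it can be handy to verify that no two of them share a socket. The sketch below is only an illustration: find_address_conflicts() is a hypothetical helper, not a Photon function. It assumes, as described above, that a task without a photon_task_<name> entry listens on tcp://127.0.0.1:5997 and that the address is set with the sub_addr key.

```php
<?php
// Default address, as stated in the text above.
const DEFAULT_SUB_ADDR = 'tcp://127.0.0.1:5997';

// Hypothetical helper: return the addresses claimed by more than one
// task, as array($address => array($taskname, ...)).
function find_address_conflicts(array $conf)
{
    $byAddr = array();
    foreach (array_keys($conf['installed_tasks']) as $name) {
        $key = 'photon_task_' . $name;
        $addr = isset($conf[$key]['sub_addr'])
            ? $conf[$key]['sub_addr']
            : DEFAULT_SUB_ADDR;
        $byAddr[$addr][] = $name;
    }
    return array_filter($byAddr, function ($names) {
        return count($names) > 1;
    });
}

$conf = array(
    'installed_tasks' => array(
        'mailsender' => '\mywebapp\task\MailSender',
        'mywebapp_memorycache' => '\mywebapp\task\MemoryCache',
    ),
);
// Both tasks use the default address: one conflict is reported.
$before = find_address_conflicts($conf);

$conf['photon_task_mywebapp_memorycache'] =
    array('sub_addr' => 'tcp://127.0.0.1:5998');
// The memory cache now has its own address: no conflict left.
$after = find_address_conflicts($conf);
```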

We just say: instead of listening on the standard socket, use another one. Now, stop and start Photon, then check which tasks are running:

$ hnu server list
Waiting for the answers...
Photon id                                              Uptime        Served  Mem. (kB)  Peak mem. (kB)
--------------------------------------------------------------------------------------------------------
loa-desktop-3201-1298206641                            0d00:00:02    0       1558       1568
loa-desktop-task-mailsender-3198-1298206641            0d00:00:02    0       1536       1551
loa-desktop-3200-1298206641                            0d00:00:02    0       1558       1568
loa-desktop-task-mywebapp_memorycache-3197-1298206641  0d00:00:01    0       1536       1551
loa-desktop-3199-1298206641                            0d00:00:02    0       1558       1568
--------------------------------------------------------------------------------------------------------
5 Photon servers running. Memory usage: 7749kB.

Perfect, you have the default three handlers receiving the requests from Mongrel2 and the two background tasks. Let's have fun and use this new task.

class MyViews
{
    public function displayFeed($request, $match)
    {
        $payload = array('action' => 'get',
                         'key' => 'my_feed');
        $runner = new \photon\task\Runner();
        // The run call will wait for the task answer.
        $ans = $runner->run('mywebapp_memorycache', $payload);
        if ($ans === null) { 
            $ans = get_the_feed(); 
            $payload = array('action' => 'put',
                             'key' => 'my_feed',
                             'val' => $ans, 
                             'timeout' => 3600); // 1 hour
            $runner->run('mywebapp_memorycache', $payload);
        }
        return new \photon\http\Response($ans);
    }
}

Simple... you just created a memory cache daemon which is available to the 3 Photon processes running at the moment.

But you can see that, in your request, the get_the_feed() call is going to take time. A better way would be to create an asynchronous task to retrieve the feed, store the result in memory for one year, and store the age of the feed in another key. If the feed is older than 1 hour, trigger the asynchronous task to refresh the feed and push it into the memory cache. You just need to ensure that if 30 clients trigger the retrieval of the feed, only one retrieval is performed per hour. This is easy, as you would have only one task running to refresh the feed.
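The "only one retrieval per hour" rule can be sketched as a small decision function. This is a hypothetical illustration, not Photon code: should_trigger_refresh() and its parameters are made-up names, and the check is assumed to run inside the single refresh task, so that requests are handled one after the other.

```php
<?php
// Hypothetical helper: decide whether a request should trigger the
// refresh. $fetchedAt is the timestamp stored in the "age" key (null
// if the feed was never fetched), $refreshStartedAt the time the last
// refresh was triggered (null if none).
function should_trigger_refresh($fetchedAt, $refreshStartedAt, $now,
                                $maxAge = 3600)
{
    $isStale = ($fetchedAt === null) || ($now - $fetchedAt > $maxAge);
    if (!$isStale) {
        return false;
    }
    // A refresh triggered less than $maxAge seconds ago still counts,
    // so many simultaneous clients lead to a single retrieval.
    $alreadyTriggered = ($refreshStartedAt !== null)
        && ($now - $refreshStartedAt <= $maxAge);
    return !$alreadyTriggered;
}

// 30 clients ask for a feed fetched 4000 seconds ago.
$fetchedAt = 0;
$refreshStartedAt = null;
$now = 4000;
$triggered = 0;
for ($i = 0; $i < 30; $i++) {
    if (should_trigger_refresh($fetchedAt, $refreshStartedAt, $now)) {
        $refreshStartedAt = $now; // remember that a refresh is running
        $triggered++;
    }
}
// Only the first client triggers the refresh.
printf("%d refresh triggered\n", $triggered);
```

While the refresh runs, the clients keep getting the old feed from the memory cache, which is why the text suggests storing it with a very long timeout.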

Now, I hope you understand how easy it is to write tasks and use them. You can also start to design your application in a way that provides maximal speed to your end users.