Wednesday, December 28, 2011

Tasksets in Gearman

Here's a little tidbit about using the Gearman Perl Client:

Gearman is a powerful yet simple job queuing and distribution framework, written by the Danga folks (who also wrote memcached), with clients in a variety of languages.

For a recent project, I needed to be able to queue up a potentially large number of jobs during one phase of execution, and begin processing them while we went out to find more work to queue. At some point, we would know that we had no more jobs to add to the queue, and would be ready to move on to the next stage when all the jobs had completed. Gearman::Client provides a synchronous "do_task()" method that waits for a task to return its result, and a "dispatch_background()" method that runs jobs in the background. The limitation of background jobs is that they do not return results: you can only monitor the status of a job to see when it completes. Task sets to the rescue!
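
To make the difference between the two calls concrete, here is a minimal sketch, assuming a job server is reachable and a worker has registered an "echo" function. The helper names describe_result and describe_status and the run_demo wrapper are hypothetical, chosen only for illustration; Gearman::Client is loaded inside run_demo so the helpers can be read and exercised on their own.

```perl
#!/usr/bin/env perl
use strict;
use warnings;

# do_task() returns a reference to the result, or undef on failure.
# (describe_result is a hypothetical helper, not part of Gearman.)
sub describe_result {
    my ($result_ref) = @_;
    return defined $result_ref ? "result: $$result_ref" : "job failed";
}

# Summarize a background job's status. $status is expected to offer
# known() and running(), as a Gearman::JobStatus object does.
# (describe_status is a hypothetical helper, not part of Gearman.)
sub describe_status {
    my ($status) = @_;
    return "not known to the server" unless $status && $status->known;
    return $status->running ? "running" : "queued";
}

sub run_demo {
    my (@servers) = @_;
    require Gearman::Client;
    my $client = Gearman::Client->new();
    $client->job_servers(@servers);

    # Synchronous: blocks until the worker has returned a result.
    my $result_ref = $client->do_task( echo => "hello" );
    print describe_result($result_ref), "\n";

    # Background: returns a handle right away, but never the result itself.
    my $handle = $client->dispatch_background( echo => "hello again" );
    print "background job is ",
        describe_status( $client->get_status($handle) ), "\n";
}

# Only talk to a server when an address is given on the command line.
run_demo(@ARGV) if @ARGV;
```

Run it with the job server address as the argument (for example localhost:4730). The background half can only report a status, which is the gap that task sets fill.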

Task sets allow you to group a number of jobs together, begin executing them immediately in the background, but with full access to the return values, errors and retries via event handlers. Furthermore, task sets provide a wait() method that will wait until all jobs have finished (either succeeded or failed), which was exactly what we needed.
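
A minimal sketch of how those event handlers might be wired up to collect results rather than just log them, again assuming a reachable job server and a worker that registers "echo". The helper make_task_options, the %results and %failures hashes, and the retry count of 2 are assumptions made for illustration; on_complete, on_fail, on_retry and retry_count are task options documented for Gearman::Task.

```perl
#!/usr/bin/env perl
use strict;
use warnings;

# Build the per-task options for add_task(): callbacks that record the
# outcome of job $id in the caller's hashes.
# (make_task_options is a hypothetical helper, not part of Gearman.)
sub make_task_options {
    my ( $id, $results, $failures ) = @_;
    return {
        retry_count => 2,    # arbitrary value, for illustration only
        # on_complete receives a reference to the worker's return value.
        on_complete => sub { my $result_ref = shift; $results->{$id} = $$result_ref; },
        # on_retry receives the number of the retry attempt.
        on_retry    => sub { my $attempt = shift; warn "job $id: retry $attempt\n"; },
        # on_fail runs once the task has failed for good.
        on_fail     => sub { $failures->{$id} = 1; },
    };
}

sub run_demo {
    my (@servers) = @_;
    require Gearman::Client;
    my $client = Gearman::Client->new();
    $client->job_servers(@servers);

    my ( %results, %failures );
    my $tasks = $client->new_task_set();
    for my $id ( 1 .. 10 ) {
        $tasks->add_task( echo => $id,
            make_task_options( $id, \%results, \%failures ) );
    }

    # Returns once every task has either completed or failed.
    $tasks->wait;

    print "job $_ returned $results{$_}\n" for sort { $a <=> $b } keys %results;
    print "job $_ failed\n"                for sort { $a <=> $b } keys %failures;
}

# Only talk to a server when an address is given on the command line.
run_demo(@ARGV) if @ARGV;
```

Keeping the outcomes in hashes keyed by job id means the next stage can inspect every result after wait() returns, instead of relying on whatever the handlers printed along the way.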

Here's some sample client code that I used to verify this functionality:

1:  #!/usr/bin/env perl  
2:  use strict;  
3:  use warnings;  
4:  use Gearman::Client;  
5:  my $client = Gearman::Client->new();  
6:  $client->job_servers("localhost:4730");  
7:  my $tasks = $client->new_task_set();  
8:  for my $count ( 1 .. 10 ) {  
9:    $tasks->add_task(  
10:      echo => $count,  
11:      {  
12:        on_complete =>  
13:         sub { my $result = shift; warn "from job $count got " . $$result; },  
14:        on_fail => sub { warn "ERROR from job $count"; }  
15:      }  
16:    );  
17:  }  
18:  warn "done adding tasks";  
19:  sleep 5;  
20:  warn "waiting";  
21:  $tasks->wait;  
22:  warn "bye";  

The critical part of this prototype was to verify that jobs began to be processed by workers when the job was added to the taskset on line 9, and not only at the call to wait() on line 21. 

And here is the trivial echo worker that returns whatever you send to it:

    #!/usr/bin/env perl
    use strict;
    use warnings;
    use Gearman::Worker;
    sub echo_worker {
      my $job = shift;
      my $arg = $job->arg;
      warn "ARG: $arg";
      return $arg;
    }
    my $w = Gearman::Worker->new();
    $w->job_servers( "localhost:4730" );
    $w->register_function( echo => \&echo_worker );
    $w->work while 1;

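Without tasksets, it would have been substantially trickier to run the jobs in the background while still collecting their results and waiting for all of them to finish.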
