2020/12/12

2020 Perl Advent Calendar - Day 12


Yesterday we saw Future::Mutex for constraining the concurrency of certain parts of an asynchronous program. It keeps a queue of pending calls to be invoked once it is free. Today we will look at another future-related helper object which maintains a queue, named (appropriately enough) Future::Queue. It works best in a pipeline-like structure with a producer and a consumer. Each side could be represented by a different asynchronous function, with the queue helping to move data between them.

A queue instance acts similarly to a regular Perl array, storing a list of values which may be of any Perl type. Values can be added at one end with the push method, and consumed from the other using the shift method. What makes it different from a regular array is that the shift method returns a future that will eventually yield the next item in the queue. If an item is available it will be taken immediately; if not then the future will remain pending until the next time one is added using the push method.
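The two cases described above can be seen in a minimal sketch, which uses only the new, push and shift methods together with is_ready and get from Future itself. The variable names are just for illustration.

```perl
use strict;
use warnings;
use Future::Queue;

my $queue = Future::Queue->new;

# The queue is empty, so shift hands back a future that is still pending.
my $f1 = $queue->shift;
my $pending_before_push = !$f1->is_ready;

# Pushing an item completes the waiting future with that item.
$queue->push("first");
my $ready_after_push = $f1->is_ready;

# With an item already queued, shift returns a future that is ready at once.
$queue->push("second");
my $f2 = $queue->shift;

printf "before push: %s\n", $pending_before_push ? "pending" : "ready";
printf "after push:  %s\n", $ready_after_push    ? "ready"   : "pending";
printf "items: %s, %s\n", scalar $f1->get, scalar $f2->get;
```

Nothing here needs an event loop, because every future is completed synchronously by the call to push.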

A typical use of this object is to store a queue of work items to be performed. Each item could be a hash reference, containing keys that describe the work to be performed and how to return the results. Items would arrive from some other part of the program, such as a server socket of some kind, in which case the hash might include the socket on which to send the response back.

use feature 'signatures';
use Future::Queue;

my $queue = Future::Queue->new;

sub on_received_request($request, $client)
{
    $queue->push({
        request => $request,
        client  => $client,
    });
}

# configure a server socket somehow, to invoke
# on_received_request() at the appropriate time.

An asynchronous function can then be constructed to wait for items from this queue by waiting on the asynchronous shift method to provide another item. Whatever work is required is then performed, and a result sent back to the client that requested it.

use feature 'signatures';
use Future::AsyncAwait;

async sub perform_work($request) { ... }

async sub queue_worker($queue)
{
    while(1) {
        my $work = await $queue->shift;
        
        my $response = await perform_work($work->{request});
        
        await $work->{client}->send_response($response);
    }
}

Because queue_worker is an asynchronous function, we can now invoke it in a toplevel await expression to start the while loop, which will continue to process requests until the program is terminated. However, since it is an asynchronous function, and it calls other asynchronous functions to perform the actual work items and send their responses back to the clients, we can start several copies of it to run concurrently. For example, we could decide to run four copies of it, so we can make use of some concurrency in processing these requests.

...

await Future->needs_all(
    map { queue_worker($queue) } 1 .. 4
);
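To see how items get shared between several workers, here is a small self-contained sketch that can be run without a server socket or an event loop. It is not the real server code: the extra $id parameter, the @handled array and the "job N" strings are made up for the demonstration, and the worker just records each item instead of awaiting perform_work and send_response.

```perl
use strict;
use warnings;
use feature 'signatures';
no warnings 'experimental::signatures';

use Future;
use Future::AsyncAwait;
use Future::Queue;

my $queue = Future::Queue->new;

# Hypothetical log of which worker handled which item
my @handled;

async sub queue_worker($id, $queue)
{
    while(1) {
        # Suspends here whenever the queue is empty
        my $work = await $queue->shift;

        push @handled, "worker $id handled $work->{request}";
    }
}

# Start four workers; each one suspends straight away on an empty queue
my $workers = Future->needs_all(
    map { queue_worker($_, $queue) } 1 .. 4
);

# Each push resumes one waiting worker, which handles the item and
# then goes back to waiting on shift
$queue->push({ request => "job $_" }) for 1 .. 6;

print "$_\n" for @handled;

# The workers loop forever, so cancel them before the program exits
$workers->cancel;
```

In a real program the top-level await would run under an event loop, and the work done for each item would itself be asynchronous, which is where running several workers starts to pay off.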

Using a queue in this manner can be seen as a sort of pattern adapter, converting a push-based supplier of items (which invokes some code when a new one is available) into a pull-based consumer (which actively asks to be given the next one). Different parts of a large program could be structured in these two different ways, and a Future::Queue makes a convenient conversion point between the two.
