2020/12/14

2020 Perl Advent Calendar - Day 14


So far in this series we've seen how to build asynchronous functions by building up smaller components using familiar Perl syntax combined with the async and await keywords, and how to use await at the toplevel of the program in order to cause the whole thing to run. We haven't yet seen the bottom-most layer: how to actually create the base-level components that create pending futures. Let's take a look at some examples of that today.

We'll start with a deeper look at Future::IO, which we have mentioned briefly in some previous posts. We saw how it provides methods named like core IO functions, such as sleep and sysread, which work asynchronously by returning futures. In day 10 we saw some ways to use it, so now we'll take a peek at how it is implemented.

These examples won't be a complete in-depth dive covering all the details; for that you can inspect the code yourself on CPAN, or read various other bits of documentation. This is just a brief trip into a few sections, to get an overview of the general ideas and concepts. It also isn't necessary to really follow or understand these details in order to simply use futures and async/await syntax - these are more details of interest when implementing a base-level system. Don't worry if you don't follow this one, or want to skip over it.

The basic principle is that at the lowest level the event system will provide a subclass of the Future class, which provides some helper methods that the async/await syntax will use to interact with its instances. The full interface that is required is described by Future::AsyncAwait::Awaitable. Instances of this subclass are then constructed by the basic future-returning functions provided by the event system, and the async/await syntax can then interact with them in the appropriate way.

The job of Future::IO is two-fold. It acts as a common interface for asynchronous code which wants to perform basic IO operations (usually sleeps, or reads and writes on filehandles) asynchronously, and it acts as a central place in which some particular event system can provide an actual implementation of these operations, in terms of its own event loop. This is achieved by the package variable $Future::IO::IMPL which stores the name of a class in which it should find the actual operation methods. Most of the methods in the Future::IO class itself just redirect to wherever that is pointing.

use feature 'signatures';
package Future::IO;

our $IMPL;

sub sleep($class, $secs)
{
    return ($IMPL //= "Future::IO::_DefaultImpl")->sleep($secs);
}

Now if we call Future::IO->sleep we'll get a future created by whichever event system has set the $IMPL variable or, if nothing has set it yet, by a small default implementation that the Future::IO module itself provides.
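The redirection needs nothing beyond ordinary Perl method dispatch on a class name held in a string. As a self-contained sketch of the idea, the following uses three invented packages (My::IO, My::DefaultImpl and My::LoopImpl are made up for illustration and are not part of Future::IO) that return plain strings rather than futures:

```perl
use strict;
use warnings;

package My::DefaultImpl {
    # Hypothetical fallback implementation
    sub sleep { my ($class, $secs) = @_; return "default sleep($secs)" }
}

package My::LoopImpl {
    # Hypothetical implementation supplied by some event system
    sub sleep { my ($class, $secs) = @_; return "loop sleep($secs)" }
}

package My::IO {
    our $IMPL;

    sub sleep {
        my ($class, $secs) = @_;
        # Fall back to the default only if nothing has set $IMPL yet
        return ($IMPL //= "My::DefaultImpl")->sleep($secs);
    }
}

package main;

my $before = My::IO->sleep(1);     # nothing has set $IMPL, so the default is used
$My::IO::IMPL = "My::LoopImpl";    # an event system installs itself
my $after  = My::IO->sleep(1);     # the same call is now handled elsewhere
```

Callers only ever name My::IO, so swapping the implementation requires no change on their side.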

The default implementation is provided by an internal package that stores its state in a few lexical variables by using some Struct::Dumb structures. For example, every sleep future is backed by an entry in a list of alarms, where it stores the epoch timestamp for the time it should expire. This package is also the subclass of Future, instances of which are returned as the actual implementation futures.

use feature 'signatures';
package Future::IO::_DefaultImpl;
use base 'Future';

use Struct::Dumb;
struct Alarm => [qw( time f )];

my @alarms;

sub sleep($class, $secs)
{
    my $time = time() + $secs;

    my $f = $class->new;
    
    my $idx;
    # ... code here to find the right index to insert so
    # that @alarms remains sorted in time order
    
    splice @alarms, $idx, 0, Alarm($time, $f);
    
    return $f;
}
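The step that finds the insertion index is left out above. One way it could be done is a binary search over the sorted list; the sketch below is an assumption about a reasonable approach, not a copy of what the module does. The helper name find_insert_index is hypothetical, and plain array pairs stand in for the Alarm structures so that the example runs on its own.

```perl
use strict;
use warnings;

# Hypothetical helper: return the index at which $time should be inserted
# into @$alarms, an array of [ $time, $payload ] pairs sorted by time.
# Existing entries with an equal time stay ahead of the new one.
sub find_insert_index {
    my ($alarms, $time) = @_;

    my ($lo, $hi) = (0, scalar @$alarms);
    while ($lo < $hi) {
        my $mid = int(($lo + $hi) / 2);
        if ($alarms->[$mid][0] <= $time) {
            $lo = $mid + 1;    # insertion point is to the right of $mid
        }
        else {
            $hi = $mid;        # insertion point is at or left of $mid
        }
    }
    return $lo;
}

my @alarms;
for my $entry ([30, "c"], [10, "a"], [20, "b"], [10, "a2"]) {
    my $idx = find_insert_index(\@alarms, $entry->[0]);
    splice @alarms, $idx, 0, $entry;
}

my @order = map { $_->[1] } @alarms;    # payloads in expiry order
```

Keeping the list sorted on insert means the event loop only ever has to look at the first element to know how long it may wait.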

There are similar structures and methods defined for sysread and syswrite, though they are somewhat more complex so we won't go into the details in this brief overview.

In order for this future class to work properly, it has to provide a method called await, which is used by the async/await syntax to implement a toplevel await expression on such an instance. (Do not be confused by the identical name. While the method is involved in the process, there is more work involved in the await expression than simply invoking the method directly.) As is common with most future class implementations, this particular await method repeatedly invokes ticks of a toplevel event system implemented by an internal function called _await_once until the future instance is ready. Thus most of the real work happens in this event system. This is the mechanism by which we can asynchronously perform other work while waiting for this particular instance to have its result.

The full body of the _await_once function is around 80 lines long, but for the case of sleep futures the relevant code looks somewhat like the following. Various other details about filehandle reads and writes have mostly been elided for clarity.

sub _await_once
{
    my $rvec = ... # code to set up filehandle read vectors
    my $wvec = ... # similar for write vectors
    
    my $maxwait;
    $maxwait = $alarms[0]->time - time() if @alarms;
    
    select($rvec, $wvec, undef, $maxwait);
    
    ... # code to perform any read or write 
        # operations that are now possible
    
    my $now = time();
    while(@alarms and $alarms[0]->time <= $now) {
        (shift @alarms)->f->done;
    }
}

sub await($self)
{
    _await_once until $self->is_ready;
    return $self;
}

Here we see that on each tick of the event system we set up some variables relating to filehandle input or output that is also going on, and also inspect the @alarms array to check the next time we need to complete one of these sleep futures. We then call select to wait for some filehandle IO, waiting for a time no longer than when the next alarm will expire. Once the select call returns we check what the current time is, and complete any of the sleep futures as required. This sequence will continue, performing any IO operations that other concurrent futures have requested, until the particular instance we were waiting for has finished. In the meantime, any other futures that complete might go on to cause more to be added to the toplevel arrays, and so next time we invoke _await_once those too will be taken into account.
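To watch the alarm-handling part of this in isolation, here is a cut-down sketch that runs with core Perl only. Toy::Future is an invented stand-in that merely tracks readiness; it is not the real Future class, it handles no filehandles, and it re-sorts the alarm list on every insert purely for brevity.

```perl
use strict;
use warnings;
use Time::HiRes ();

# Toy stand-in for a future class: it only tracks whether it is done.
package Toy::Future {
    my @alarms;    # [ $expiry_time, $future ] pairs, sorted by expiry time

    sub new      { my ($class) = @_; return bless { ready => 0 }, $class }
    sub done     { my ($self) = @_; $self->{ready} = 1; return $self }
    sub is_ready { my ($self) = @_; return $self->{ready} }

    sub sleep {
        my ($class, $secs) = @_;
        my $f = $class->new;
        # Re-sorting is simple rather than efficient; fine for a sketch
        @alarms = sort { $a->[0] <=> $b->[0] }
            @alarms, [ Time::HiRes::time() + $secs, $f ];
        return $f;
    }

    # One tick: wait until the next alarm is due, then complete expired ones
    sub _await_once {
        my $maxwait;
        $maxwait = $alarms[0][0] - Time::HiRes::time() if @alarms;
        $maxwait = 0 if defined $maxwait and $maxwait < 0;

        # With no filehandle vectors, four-argument select is a timed wait
        select(undef, undef, undef, $maxwait);

        my $now = Time::HiRes::time();
        while (@alarms and $alarms[0][0] <= $now) {
            (shift @alarms)->[1]->done;
        }
    }

    sub await {
        my ($self) = @_;
        _await_once() until $self->is_ready;
        return $self;
    }
}

package main;

my $short = Toy::Future->sleep(0.05);
my $long  = Toy::Future->sleep(0.15);

$long->await;    # ticks the loop until $long is done
my $short_done_too = $short->is_ready;
```

Waiting on $long also completes $short along the way, because every tick services whichever alarms have expired, not just the one belonging to the instance being awaited.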

This default implementation isn't very good, and can't support many situations (especially on MSWin32 where the select system call is nowhere near as useful as on other OSes), but is sufficient for small use-cases and examples. Any larger script or program would be advised to pick a better event system and set that as the implementation instead.

For example, if the program decides to use IO::Async as its main event system, it can load the module Future::IO::Impl::IOAsync, which will set $Future::IO::IMPL and provide a better implementation package.

use Future::IO;
use Future::IO::Impl::IOAsync;

# now we can use Future::IO->sleep, Future::IO->sysread,
# etc... mixed with other IO::Async operations.

The actual implementation methods mostly just defer to the future support already built into IO::Async. For example, the sleep method can simply call the delay_future method on the toplevel event loop, because that already returns a suitable future instance. As this is in the IO::Async::Future class, it already provides its own await method, and so no further work is necessary here.

my $loop;

sub sleep($class, $secs)
{
    $loop //= IO::Async::Loop->new;
    
    return $loop->delay_future(after => $secs);
}

By using Future::IO a module gets simple access to future-returning IO operations that operate concurrently with each other and with other parts of the program. A toplevel program or script can pick which actual event system it wants to use in order to implement these futures, and any module using them will then operate correctly against the other activity going on in the same program.
