2020/12/11

2020 Perl Advent Calendar - Day 11

In the past few days we've seen various structures and techniques for achieving concurrency in asynchronous programs. The needs_all and wait_any constructors make it easy to split the flow of control into multiple concurrent pieces and reconverge afterwards. Sometimes, however, this concurrency gets in the way, because there are situations in which we must do only one thing at a time. Often this comes up when we need to communicate with some external service or device that would be confused by having two actions in flight at once.

Now, while we could just choose not to use any asynchronous programming techniques in such a program, that is something of an over-reaction. Perhaps the service or device is just a small part of a larger program and the rest can still operate concurrently, or perhaps we have some sort of user interface to drive as well, and it would be nice to keep that working even while communicating. What we need is some way to limit the concurrency around a small part of the program, while allowing the rest of the logic to run unimpeded. The simplest way of doing this is to use a mutex, provided by Future::Mutex.

This is an object that provides a single asynchronous method, enter, which takes an asynchronous function and runs it at some point in time. If the mutex is free the function will be run immediately; if it is busy it will be queued to run later, when the previous call has finished. When the passed function eventually provides a result, that will become the eventual result of calling the method - i.e. it transparently wraps an asynchronous function.

use feature 'say';
use Future::AsyncAwait;
use Future::Mutex;

my $mutex = Future::Mutex->new;

my $result = await $mutex->enter(async sub {
    return "the result";
});

say "Returned: $result";
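
To see the queueing behaviour rather than just the return value, here is a small sketch. The job function and the %gate futures are invented for this illustration; each gate is a plain Future that we complete by hand, standing in for some slow operation finishing.

```perl
use strict;
use warnings;
use feature 'say';

use Future;
use Future::AsyncAwait;
use Future::Mutex;

my $mutex = Future::Mutex->new;
my @log;

# Hypothetical stand-ins for slow operations: each job waits on one of
# these futures, which we complete manually further down.
my %gate = ( A => Future->new, B => Future->new );

# Hypothetical helper: run one named job inside the mutex.
async sub job
{
    my ($name) = @_;

    return await $mutex->enter(async sub {
        push @log, "$name start";
        await $gate{$name};
        push @log, "$name end";
        return $name;
    });
}

my $fa = job("A");    # mutex is free, so A starts straight away
my $fb = job("B");    # mutex is busy, so B is queued

$gate{B}->done;       # B's operation is "ready", but B has not started yet
$gate{A}->done;       # A finishes, the mutex is released, and B runs

say for @log;
say "Results: ", $fa->get, " ", $fb->get;
```

Given the behaviour described above, the log should read "A start", "A end", "B start", "B end", even though B's gate was opened first. Without the mutex both jobs would have started before either one ended.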

For example, this (somewhat simplified) code example from a hardware device communication module provides an interface onto certain kinds of IO operation. Internally, each operation is performed as a sequence of steps (a "start", some writing and reading, and finally a "stop") which must proceed in that order, uninterrupted by any others. That inner sequence is surrounded by a mutex lock.

use experimental 'signatures';
use Future::AsyncAwait;
use Future::Mutex;

async sub _i2c_txn($self, $code)
{
    my $mutex = $self->{mutex} //= Future::Mutex->new;
    
    return await $mutex->enter(async sub {
        await $self->start_bit;
        my $ret = await $self->$code;
        await $self->stop_bit;
        return $ret;
    });
}

async sub recv($self, $address, $length)
{
    return await $self->_i2c_txn(async sub {
        await $self->write(chr($address << 1 | 1));
        return await $self->read($length);
    });
}

The _i2c_txn helper method is transparent in terms of return value, because of the way it just returns the result of the mutex enter method. Whatever the inner block of code eventually returns becomes its own eventual return value. This is used by the recv method to return the result of the read call. This is another common style with wrappers around asynchronous code - because a future represents both control flow ("I have finished") and data flow ("The answer is ..."), code can be neatly structured by using them. An operation that completes will necessarily yield its answer at the same time.
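
To try this pattern without any hardware, the following sketch wraps the same two methods around a fake device. The FakeI2C class, its log, and the finish_read method are assumptions made up for this example and are not part of the real module; the bus primitives here just record their names, and read returns a pending future that we complete by hand.

```perl
package FakeI2C;    # hypothetical stand-in for the real device module

use strict;
use warnings;

use Future;
use Future::AsyncAwait;
use Future::Mutex;

sub new { my ($class) = @_; return bless { log => [], reads => [] }, $class }

# Fake bus primitives: each one only records that it was called.
async sub start_bit { my ($self) = @_; push @{ $self->{log} }, "start"; return }
async sub stop_bit  { my ($self) = @_; push @{ $self->{log} }, "stop";  return }
async sub write     { my ($self) = @_; push @{ $self->{log} }, "write"; return }

# A read stays pending until finish_read supplies its data.
sub read
{
    my ($self, $length) = @_;
    push @{ $self->{log} }, "read";
    my $f = Future->new;
    push @{ $self->{reads} }, $f;
    return $f;
}

# Test-only helper: complete the oldest pending read.
sub finish_read
{
    my ($self, $data) = @_;
    my $f = shift @{ $self->{reads} };
    $f->done($data);
}

# The same shape as the article's methods, minus signatures.
async sub _i2c_txn
{
    my ($self, $code) = @_;
    my $mutex = $self->{mutex} //= Future::Mutex->new;

    return await $mutex->enter(async sub {
        await $self->start_bit;
        my $ret = await $self->$code;
        await $self->stop_bit;
        return $ret;
    });
}

async sub recv
{
    my ($self, $address, $length) = @_;

    return await $self->_i2c_txn(async sub {
        await $self->write(chr($address << 1 | 1));
        return await $self->read($length);
    });
}

package main;

my $dev = FakeI2C->new;

my $f1 = $dev->recv(0x20, 2);    # starts immediately
my $f2 = $dev->recv(0x21, 2);    # queued behind the first transaction

$dev->finish_read("AB");         # first read completes, second txn begins
$dev->finish_read("CD");

print join(" ", @{ $dev->{log} }), "\n";
print "Got: ", $f1->get, " and ", $f2->get, "\n";
```

The log should show two complete start/write/read/stop sequences one after the other, and each recv future should carry the data from its own read, passed up through both wrappers untouched.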

You may be looking at this wondering "where is the corresponding leave or similar, to go with the enter?". Well, this is one of the great strengths of representing asynchronous operations by having functions that return futures. Calling the function starts an operation, and the future that it returns will complete when the operation finishes. There is no need for the code to explicitly point out that it has now finished because we can observe the returned future and be informed when it does. This avoids an entire class of bug caused by forgetting to release a mutex at the end of an operation.
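
One place a forgotten leave typically bites is the error path. As a rough check of how this plays out here, the sketch below enters the mutex with a function that fails, and then enters it again. The error message is invented for the example, and the assumption being tested is that a failed future counts as "finished" for the purposes of releasing the mutex, just as a successful one does.

```perl
use strict;
use warnings;
use feature 'say';

use Future::AsyncAwait;
use Future::Mutex;

my $mutex = Future::Mutex->new;

# First call fails straight away; nothing in our code releases the mutex.
my $f1 = $mutex->enter(async sub { die "device not responding\n" });

# available is true when the mutex is currently unlocked.
my $free = $mutex->available;

# Second call should therefore be able to run immediately.
my $f2 = $mutex->enter(async sub { return "second call ran" });

my $err = $f1->failure;
chomp $err;

say "First call failed: $err";
say "Mutex free afterwards: ", ( $free ? "yes" : "no" );
say "Second call: ", $f2->get;
```

There is no cleanup code anywhere in this example; the release is driven entirely by the first future completing.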
