2020/12/13

2020 Perl Advent Calendar - Day 13


Yesterday we saw Future::Queue, and how it can help adapt data transfer between push- and pull-based code structures. Another kind of helper object that forms a queue is Future::Buffer. This acts somewhat like a UNIX FIFO or named pipe, supporting a write operation to append more data into one end of it, and various read-like operations that consume data from the other end. As with Future::Queue, the methods to consume data return futures, which either complete immediately if the data is already there, or will remain pending until sufficient data has been provided.

Where Future::Queue operates on distinct values, a Future::Buffer maintains a continuous string of data. Write operations concatenate more data onto the end of the buffer, and read operations consume substrings from the beginning of it. Thus, while there is a one-to-one correspondence between push and shift operations on a queue, one write into a buffer may satisfy multiple reads, or it may take multiple writes to provide enough data for the next read.
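
To make that concrete, here is a minimal sketch using the read_exactly method described below, which consumes a fixed number of bytes; the literal strings are invented purely for illustration. One write can satisfy two pending reads, and a single read can wait across two writes:

use Future::Buffer;

my $buffer = Future::Buffer->new;

# Two pending reads, both satisfied by a single write
my $first  = $buffer->read_exactly(3);
my $second = $buffer->read_exactly(3);
$buffer->write("abcdef");
# $first now yields "abc" and $second yields "def"

# One pending read, needing two writes before it has enough data
my $third = $buffer->read_exactly(4);
$buffer->write("gh");   # $third is still pending
$buffer->write("ij");   # now it completes, yielding "ghij"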

There are in fact three different read-like methods, differing in how they specify the amount of data they'll consume. read_atmost is most similar to sysread, in that it will complete with any amount of data up to the size specified. read_exactly demands an exact amount of data, and will remain pending until that is available. read_until takes a regexp pattern or string (for example, a linefeed), and will consume data up to the next match. This makes it simple to create line-based readers that consume asynchronously arriving bytes of input, for example.

use feature 'say';
use Future::AsyncAwait;
use Future::Buffer;

my $buffer = Future::Buffer->new;
...

async sub respond_to_lines
{
    while(1) {
        # Wait for the next complete line to arrive in the buffer
        my $line = await $buffer->read_until("\n");
        say "The line was: $line";
    }
}
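
The same shape works with the other read methods on the same $buffer. For example, read_exactly suits length-prefixed framing; here is a hedged sketch assuming a hypothetical protocol in which each message is preceded by a 32-bit big-endian length:

use feature 'say';
use Future::AsyncAwait;

async sub respond_to_messages
{
    while(1) {
        # A 4-byte big-endian length header, followed by that many bytes of payload
        my $header  = await $buffer->read_exactly(4);
        my $len     = unpack "N", $header;
        my $payload = await $buffer->read_exactly($len);
        say "Received a message of $len bytes";
    }
}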

To keep the buffer full of data to work on, we can, as before, use another while loop, running concurrently with the reader, to obtain bytes of data and write them into the buffer. In this example we use Future::IO to asynchronously read bytes from the STDIN filehandle.

use Future::AsyncAwait;
use Future::IO;

async sub keep_buffer_filled
{
    while(1) {
        # Read whatever bytes are next available from STDIN (up to 4 KiB)
        my $bytes = await Future::IO->sysread(\*STDIN, 4096);
        await $buffer->write($bytes);
    }
}

As with yesterday's Future::Queue example, we can see that a Future::Buffer can act as a transfer mechanism between a push-based provider of data and a pull-based consumer.

This mode of operation is not without its downsides, though. If data arrives faster than it can be consumed, the buffer will grow and consume ever more memory. We can instead opt to run the buffer with a pull-based provider as well, having it request more data from upstream when it needs it. In the example above, we see what turns out to be a common pattern: a while loop which simply obtains more data from some source and then calls write on the buffer. Rather than working in this manner, we can instead give the buffer object itself an asynchronous callback function for it to call when it needs more data, via the fill parameter.

use Future::AsyncAwait;
use Future::Buffer;
use Future::IO;

my $buffer = Future::Buffer->new(
    fill => async sub {
        # Invoked by the buffer whenever a pending read lacks enough data
        await Future::IO->sysread(\*STDIN, 4096);
    }
);

async sub respond_to_lines
{
    # same as before
}

Here, we have given an asynchronous callback function that lets the buffer request more data from the standard input stream directly. It will invoke this whenever necessary; i.e. when it has at least one read future outstanding for which it doesn't yet have enough data to complete. This allows it to fetch more data to satisfy readers, without always just pushing more data into the buffer as fast as it arrives. It provides a way to give backpressure further up the data flow, into the standard input stream, and beyond into wherever that data was coming from.
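
To see the on-demand behaviour in isolation, here is a minimal sketch with a purely in-memory upstream; the @chunks array and its contents are invented for illustration, and it assumes (as in the line-based example above) that read_until yields the data up to and including the matched delimiter. Because every future here completes immediately, we can call get on the results directly:

use feature 'say';
use Future::AsyncAwait;
use Future::Buffer;

# A pretend upstream source, so we can watch when fill gets invoked
my @chunks = ( "alpha\nbra", "vo\n", "charlie\n" );
my $fills  = 0;

my $buffer = Future::Buffer->new(
    fill => async sub {
        $fills++;
        return shift @chunks;
    },
);

my $line1 = $buffer->read_until("\n")->get;  # "alpha\n", after one fill
my $line2 = $buffer->read_until("\n")->get;  # "bravo\n", after a second fill
say "fill was invoked $fills times";         # 2; the third chunk was never fetched

In a real program the fill callback would of course perform an actual asynchronous read, as in the STDIN example above.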

