Thursday, November 19, 2015

A simple introduction to Asyncio

This is a simple explanation of the asyncio module and the new supporting language features in Python 3.5. Even though async and await are new language constructs, they are mostly* useless without an event loop, and that is supplied in the standard library as asyncio. You also need awaitable functions, which are supplied by asyncio (or by the growing set of async libraries, like asyncssh, quamash, etc.).

Note: My previous post uses these without the asyncio event loop. But you should not normally do that.

In [2]:
import asyncio
In [3]:
async def awaitable(x):
    print('Awaiting for {} seconds...'.format(x), flush=True)
    await asyncio.sleep(x)
    print('Done awaiting for {} seconds.'.format(x), flush=True)
    return 'Returned from {}'.format(x)

This was our first async function. Unlike a normal generator function, you can't just include an await expression to turn a normal function into an async function; you must also use async def instead of def. This was done partly because await is (still) a valid function name, and there is a lot more code out there now than there was when yield was turned into a keyword.

Note: this is using the new syntax. Exactly the same behavior can be achieved in Python 3.4, or even in 3.5, using the @asyncio.coroutine decorator in place of async def and yield from in place of await. async for and async with are new constructs, though.
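For comparison, here is the same coroutine written in the older, pre-3.5 style (the name awaitable_34 is just an illustrative choice):

@asyncio.coroutine
def awaitable_34(x):
    print('Awaiting for {} seconds...'.format(x), flush=True)
    yield from asyncio.sleep(x)   # yield from instead of await
    print('Done awaiting for {} seconds.'.format(x), flush=True)
    return 'Returned from {}'.format(x)   # returning from a generator needs Python 3.3+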

Under the covers, this is still just a special kind of generator that behaves in a particular way when iterated over. Let's see it first in its natural setting, an event loop:

In [4]:
loop = asyncio.get_event_loop()
In [5]:
tasks = asyncio.gather(*(awaitable(i) for i in range(3)))
In [6]:
%%time
loop.run_until_complete(tasks)
Awaiting for 0 seconds...
Awaiting for 1 seconds...
Awaiting for 2 seconds...
Done awaiting for 0 seconds.
Done awaiting for 1 seconds.
Done awaiting for 2 seconds.
CPU times: user 12.9 ms, sys: 2.79 ms, total: 15.6 ms
Wall time: 2.01 s
Out[6]:
['Returned from 0', 'Returned from 1', 'Returned from 2']

Closing the loop closes it forever; you cannot restart a closed loop. (See later for a way to get the global loop working again if you need it.)

In [7]:
loop.close()
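For instance, trying to run anything on the closed loop now raises a RuntimeError (the exact message is an implementation detail), as in this small sketch:

try:
    loop.run_until_complete(asyncio.sleep(0))
except RuntimeError as exc:
    print(exc)   # something like "Event loop is closed"
                 # (the un-run sleep coroutine may also trigger a "never awaited" warning)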

Even though there are no new threads and the global interpreter lock is still in place, we see that these complete concurrently! This is the core of async programming: the code is made of resumable functions (coroutines) that keep yielding a "not ready" object until they are complete.
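One way to see that an async function only builds a coroutine object, without running any of its body yet, is to call it and inspect the result (the exact repr may vary):

coro = awaitable(1)
print(type(coro))                  # <class 'coroutine'> -- nothing has printed from the body
print(asyncio.iscoroutine(coro))   # True
coro.close()                       # discard it without running it (avoids a warning)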

Let's look at making our own loop without touching the main loop. We will have to use the new loop with any awaiting functions, too:

In [8]:
async def awaitable(x, *, loop=None):
    print('Awaiting for {} seconds...'.format(x), flush=True)
    await asyncio.sleep(x, loop=loop)
    print('Done awaiting for {} seconds.'.format(x), flush=True)
In [9]:
newloop = asyncio.new_event_loop()
In [10]:
tasks = asyncio.gather(*(awaitable(i, loop=newloop) for i in range(3)),
                       loop=newloop)
In [11]:
%%time
newloop.run_until_complete(tasks)
Awaiting for 1 seconds...
Awaiting for 2 seconds...
Awaiting for 0 seconds...
Done awaiting for 0 seconds.
Done awaiting for 1 seconds.
Done awaiting for 2 seconds.
CPU times: user 10.6 ms, sys: 2.37 ms, total: 13 ms
Wall time: 2.01 s
Out[11]:
[None, None, None]

Notice how we had to use the new loop everywhere? It had to be specified in the sleep call and in the gather call. Those default to the global loop unless another is given (by keyword only). The await uses the loop to produce the delays, while gather actually registers the tasks on the loop. Maybe this would be more obvious if we used run_forever instead:

In [12]:
async def stopit(x, loop):
    print('Stopping in {} seconds...'.format(x), flush=True)
    await asyncio.sleep(x, loop=loop)
    print('Stopping loop.', flush=True)
    loop.stop()
In [13]:
asyncio.gather(awaitable(2, loop=newloop), stopit(3, newloop), loop=newloop);
In [14]:
newloop.run_forever()
Awaiting for 2 seconds...
Stopping in 3 seconds...
Done awaiting for 2 seconds.
Stopping loop.

We can see that gather did, in fact, add the tasks to the event loop (other methods can do this too, such as asyncio.ensure_future or loop.create_task). The loop will run forever until stop is called, and stop obviously has to be called from something running inside the loop, since the run_forever call itself blocks.
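As a minimal sketch of those alternatives (demoloop is just an illustrative name, and awaitable is the loop-aware version defined above), loop.create_task schedules a coroutine directly, and loop.call_later can arrange the stop from inside the loop:

demoloop = asyncio.new_event_loop()
task = demoloop.create_task(awaitable(1, loop=demoloop))   # schedule the coroutine
demoloop.call_later(2, demoloop.stop)                      # stop the loop a little later
demoloop.run_forever()
print(task.done())                                         # True -- the task finished first
demoloop.close()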

Since all these calls requiring the loop are a little ugly, you can also use asyncio.set_event_loop to set the global event loop. You can use other event loops that follow the same design principles; the Qt event loop has been adapted to work with asyncio by the quamash library, for example.

To get code that relies on the global loop working again, we can combine the previous loop creation step with setting the new loop as the global event loop:

In [15]:
asyncio.set_event_loop(asyncio.new_event_loop())

We can also restart the global event loop simply by resetting the global event loop policy:

In [16]:
asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
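A quick check (just a sketch) that this worked: asking for the global loop again should now hand back a fresh, open loop.

loop = asyncio.get_event_loop()
print(loop.is_closed())   # False -- a brand new, usable loop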

Here's a quick example, modified from the asyncssh README, showing SSH with the async syntax:

In [17]:
import asyncio, asyncssh, sys

async def run_client():
    with (await asyncssh.connect('localhost')) as conn:
        stdin, stdout, stderr = await conn.open_session('echo "Hello!"')
        output = await stdout.read()
        print(output, end='')

        status = stdout.channel.get_exit_status()
        if status:
            print('Program exited with status {}'.format(status), file=sys.stderr)
        else:
            print('Program exited successfully')

asyncio.get_event_loop().run_until_complete(run_client())
Hello!
Program exited successfully

Note that the latest version of AsyncSSH requires Cryptography 1.1, which is not yet available from Conda, and since Conda ships its own OpenSSL it is hard to install a newer cryptography with pip. I would use pip install asyncssh==1.3.0 until Conda ships a newer cryptography package. Both libraries are under very active development.

Bonus

As you just saw, to quickly run an async function as if it were a synchronous function, just do

asyncio.get_event_loop().run_until_complete(function())
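If you do this a lot, a tiny helper can wrap any coroutine function into a blocking one. This is not part of asyncio; make_sync is just a name I made up for the sketch:

import functools

def make_sync(coro_func):
    @functools.wraps(coro_func)
    def wrapper(*args, **kwargs):
        # run the coroutine to completion on the global event loop
        return asyncio.get_event_loop().run_until_complete(coro_func(*args, **kwargs))
    return wrapper

sync_sleep = make_sync(asyncio.sleep)
sync_sleep(1)   # blocks for one second, much like time.sleep(1)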

If you really do not want anything already scheduled on the global loop to run, you can always use a new loop (but then make sure the async functions use it), or you can temporarily swap out the global event loop. For example, first using the global event loop, then using a new temporary global loop:

In [18]:
%%time
asyncio.get_event_loop().run_until_complete(asyncio.sleep(3))
CPU times: user 2.65 ms, sys: 1.16 ms, total: 3.81 ms
Wall time: 3.01 s
In [19]:
from contextlib import contextmanager
@contextmanager
def temp_eventloop():
    oldeventloop = asyncio.get_event_loop()
    newloop = asyncio.new_event_loop()
    asyncio.set_event_loop(newloop)
    try:
        yield newloop
    finally:
        newloop.close()
        asyncio.set_event_loop(oldeventloop)
In [20]:
main_loop = asyncio.get_event_loop()
In [21]:
%%time
with temp_eventloop() as loop:
    asyncio.get_event_loop().run_until_complete(asyncio.sleep(3))
CPU times: user 3.55 ms, sys: 1.43 ms, total: 4.98 ms
Wall time: 3.01 s
In [22]:
main_loop is asyncio.get_event_loop()
Out[22]:
True
