Async Python

Asynchronous programming with AsyncIO

Use cases for Async

  • Good for IO intensive tasks

  • Downloading web pages

  • Accessing APIs

  • Accessing databases (especially on other servers)

  • Reading files

Event loop

  • Single thread

  • Single process

  • ... so it uses a single core

  • Cooperative Multitasking
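A minimal sketch to illustrate the single-thread point: every coroutine records the identifier of the thread it runs in. The names worker and seen are made up for this illustration.

```python
import asyncio
import threading

async def worker(name, seen):
    # Give control back to the event loop for a moment, then record our thread.
    await asyncio.sleep(0.01)
    seen[name] = threading.get_ident()

async def main():
    seen = {"main": threading.get_ident()}
    await asyncio.gather(worker("a", seen), worker("b", seen))
    return seen

seen = asyncio.run(main())
# All three entries hold the same thread id: the event loop never left its thread.
print(len(set(seen.values())))
```

The coroutines take turns inside one thread; nothing runs at the same instant as anything else.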

Print sync

  • First let's see a few small and useless examples.
  • In this example we use sleep to imitate some external task we need to wait for and then we print out some text.
  • We do it sequentially. No async here.

The code

import time

def say(text, sec):
    time.sleep(sec)
    print(text)

def main():
    start = time.monotonic()
    say("First", 2)
    say("Second", 1)
    end = time.monotonic()
    print(f"Elapsed: {end-start}")

main()

Output

First
Second
Elapsed: 3.0015416080132127

So it takes slightly more than 3 seconds to wait first for 2 and then for 1 second. No big surprise there.

Print async

  • This is almost the same example as the previous one using sync, but we wait asynchronously.
  • The order of the output is now different.
  • It also finishes 1 sec faster. It finishes when the longest wait ends.

What did we do?

  • We added async in front of the function definitions to make them coroutines.
  • We replaced time.sleep with asyncio.sleep, which sleeps without blocking the event loop.
  • We called this new sleep function with the await keyword. That tells the event loop that other tasks can run until the thing we are awaiting finishes.
  • We called the say function inside an awaited call to asyncio.gather.
  • We started the event loop with asyncio.run.

Code

import time
import asyncio

async def say(text, sec):
    await asyncio.sleep(sec)
    print(text)

async def main():
    print('start main')
    start = time.monotonic()
    await asyncio.gather(
        say("First", 2),
        say("Second", 1),
    )
    end = time.monotonic()
    print(f"Elapsed: {end-start}")

main_co = main()
print(main_co)
asyncio.run(main_co)

Output

<coroutine object main at 0x78265ad9a4d0>
start main
Second
First
Elapsed: 2.0022734529920854

The first print shows that what the main function returns is an object of type coroutine.

The "Second" print appears before the "First", because the former only had to wait 1 second while the latter waited 2 seconds.
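The prints appear in completion order, but the list returned by asyncio.gather follows the order of its arguments. A small sketch, assuming a variant of say that returns its text instead of printing it (the shorter delays are only there to keep it quick):

```python
import asyncio

async def say(text, sec):
    await asyncio.sleep(sec)
    return text

async def main():
    # "Second" finishes first, yet the results keep the argument order.
    return await asyncio.gather(
        say("First", 0.2),
        say("Second", 0.1),
    )

results = asyncio.run(main())
print(results)
```

This is handy when each result has to be matched back to the call that produced it.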


  • asyncio
  • async
  • await
  • gather

Sync sleep

  • Let's see the same two examples, but now we also print progress in the say function.

  • First let's see the sequential version.

  • We can see that the first task finishes before the second task starts.

import time


def say(wid, sec):
    start = time.monotonic()
    print(f"Starting {wid} that will take {sec}s")
    time.sleep(sec)
    end = time.monotonic()
    print(f"Finishing {wid} in {end-start}s")

def main():
    start = time.monotonic()
    say("First", 2)
    say("Second", 1)
    end = time.monotonic()
    print(f"Elapsed: {end-start}")


main()

Starting First that will take 2s
Finishing First in 2.002233584993519s
Starting Second that will take 1s
Finishing Second in 1.0011189789511263s
Elapsed: 3.003522119950503

Async sleep

  • In the asynchronous version we can see both tasks start, and then each one finishes after waiting the appropriate time.

  • The first task took longer and thus ended after the second task.

  • One drawback of this mode of operation using gather is that we need to know the exact calls up-front.

import time
import asyncio

async def say(wid, sec):
    start = time.monotonic()
    print(f"Starting {wid} that will take {sec}s")
    await asyncio.sleep(sec)
    end = time.monotonic()
    print(f"Finishing {wid} in {end-start}s")

async def main():
    start = time.monotonic()
    await asyncio.gather(
        say("First", 2),
        say("Second", 1)
    )
    end = time.monotonic()
    print(f"Elapsed: {end-start}")


asyncio.run(main())

Starting First that will take 2s
Starting Second that will take 1s
Finishing Second in 1.0006165504455566s
Finishing First in 2.002072811126709s
Elapsed: 2.002306482056156

Sync tasks in loop

In this example too we'll use sleep to pretend we are waiting for some external task to finish, but this time we'll start a number of jobs based on what the user supplies. We can't know up-front how many tasks we'll have to call.

Code

import time
import sys

def do_task(task_id: int, sec: int):
    print(f"Start task {task_id}")
    time.sleep(sec)
    print(f"End task {task_id}")

def main():
    if len(sys.argv) != 2:
        exit(f"Usage {sys.argv[0]} NUMBER")

    for i in range(int(sys.argv[1])):
        do_task(i, 1)


start = time.monotonic()
main()
end = time.monotonic()
print(f"Elapsed time: {end-start}")

Output

Start task 0
End task 0
Start task 1
End task 1
Start task 2
End task 2
Start task 3
End task 3
Elapsed time: 4.000992348068394

As expected, the total running time is the sum of the durations of all the tasks, because they run sequentially.

Async sleep in loop

  • create_task

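  • Four sleeps running concurrently would be much faster, but that is not what happens here.

  • This code only creates the coroutine objects ahead of time and then awaits them one after the other, so they still run sequentially and the whole run takes 4 seconds. To run them concurrently, each coroutine has to be wrapped with asyncio.create_task (or passed to asyncio.gather) before it is awaited.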

import time
import sys
import asyncio

async def do_task(task_id: int, sec: int):
    print(f"Start {task_id}")
    await asyncio.sleep(sec)
    print(f"End {task_id}")

async def main():
    print("Main started")
    if len(sys.argv) != 2:
        exit(f"Usage {sys.argv[0]} NUMBER")

    co_routines = []
    for i in range(int(sys.argv[1])):
        co_routines.append(do_task(i, 1))

    print("Tasks created")

    for t in co_routines:
        await t

    print("Main ended")

start = time.monotonic()
asyncio.run(main())
end = time.monotonic()
print(f"Elapsed {end-start}")

Main started
Tasks created
Start 0
End 0
Start 1
End 1
Start 2
End 2
Start 3
End 3
Main ended
Elapsed 4.004389520036057
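The output above takes 4 seconds because plain coroutine objects are awaited one at a time. A minimal sketch of the create_task variant follows. The log list and the shortened 0.1 second sleep are assumptions made to keep the example quick and checkable.

```python
import asyncio
import time

async def do_task(task_id, sec, log):
    log.append(f"Start {task_id}")
    await asyncio.sleep(sec)
    log.append(f"End {task_id}")

async def main(count, sec):
    log = []
    # create_task schedules each coroutine on the event loop;
    # they start running as soon as main gives up control at the first await.
    tasks = [asyncio.create_task(do_task(i, sec, log)) for i in range(count)]
    for task in tasks:
        await task
    return log

start = time.monotonic()
log = asyncio.run(main(4, 0.1))
elapsed = time.monotonic() - start
print(log)
print(f"Elapsed {elapsed:.2f}")
```

All four tasks start before any of them ends, and the elapsed time is close to one sleep rather than the sum of four.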

Async sleep in loop with gather

  • gather

import time
import asyncio

async def sleep(cnt, sec):
    print(f"Start {cnt}")
    await asyncio.sleep(sec)
    print(f"End {cnt}")

async def main():
    co_routines = []
    for i in range(4):
        co_routines.append(sleep(i, 1))

    await asyncio.gather(*co_routines)

start = time.monotonic()
asyncio.run(main())
end = time.monotonic()
print(f"Elapsed {end-start}")

Start 0
Start 1
Start 2
Start 3
End 0
End 1
End 2
End 3
Elapsed 1.0018720626831055

coroutines

import asyncio

async def answer():
    print("start to answer")
    return 42

async def main():
    a_coroutine = answer()
    print(a_coroutine)

    await asyncio.sleep(0)
    print('before await for coroutine')

    result = await a_coroutine
    print(f"result is {result} after await")
 
asyncio.run(main())

<coroutine object answer at 0x7f8d06e6d240>
before await for coroutine
start to answer
result is 42 after await

Async Tasks

import asyncio

async def answer():
    print("start to answer")
    return 42

async def main():
    a_task = asyncio.create_task(answer())
    print(a_task)

    await asyncio.sleep(0)
    print('before await for task')

    result = await a_task
    print(f"result is {result} after await")

asyncio.run(main())

<Task pending name='Task-2' coro=<answer() running at async_task.py:3>>
start to answer
before await for task
result is 42 after await

Count Async

import time
import asyncio

async def count(name, end, sec):
    for i in range(end):
        print(f"{name} {i}")
        await asyncio.sleep(sec)

async def main():
    await asyncio.gather(
        count('apple', 10, 0.1),
        count('peach', 10, 0.2),
        )

start = time.time()
asyncio.run(main())
end = time.time()
print(f"Elapsed {end-start}")

apple 0
peach 0
apple 1
peach 1
apple 2
apple 3
peach 2
apple 4
apple 5
peach 3
apple 6
apple 7
peach 4
apple 8
apple 9
peach 5
peach 6
peach 7
peach 8
peach 9
Elapsed 2.010511636734009

Passing the baton while sleeping 0 sec

import asyncio

async def count(name):
    print(f"start {name}")
    for cnt in range(10):
        print(f"{name} {cnt}")
        await asyncio.sleep(0)

async def main():
    a_task = asyncio.create_task(count("A"))
    b_task = asyncio.create_task(count("B"))

    print("Before")
    #await asyncio.sleep(1)
    print("After")

    await asyncio.gather(
        a_task,
        b_task
    )

asyncio.run(main())
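To make the baton passing visible without reading through printed output, here is a reduced sketch that records the order of the steps in a list. The order list and the steps parameter are additions for illustration. The alternating result is what CPython's default event loop produces, since each await asyncio.sleep(0) puts the task at the end of the ready queue.

```python
import asyncio

async def count(name, steps, order):
    for cnt in range(steps):
        order.append(f"{name} {cnt}")
        await asyncio.sleep(0)  # hand control back to the event loop

async def main():
    order = []
    a_task = asyncio.create_task(count("A", 3, order))
    b_task = asyncio.create_task(count("B", 3, order))
    await asyncio.gather(a_task, b_task)
    return order

order = asyncio.run(main())
print(order)
```

Each task runs one step and then yields, so the two tasks take turns.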

Async sleep in a queue

import time
import asyncio
import sys

#queue = [4, 3, 1, 2, 1, 7, 3, 4, 5, 1, 2, 1, 2, 3, 4]
queue = [1] * 10
print(f"Total: {sum(queue)}")

parallel = 1
if len(sys.argv) == 2:
    parallel = int(sys.argv[1])

async def worker(wid):
    while queue:
        job = queue.pop(0)

        ts = time.monotonic()
        print(f"Worker {wid} starting job {job} {ts}")
        await asyncio.sleep(job)
        print(f"Worker {wid} finished job {job}")

async def main():
    tasks = []
    for wid in range(parallel):
        tasks.append( asyncio.create_task(worker(wid)) )
    await asyncio.gather(*tasks) 


start = time.monotonic()
print(f"Start {start}")
asyncio.run(main())
end = time.monotonic()
print(f"Elapsed: {end-start}")

Async http

import aiohttp
import asyncio

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, 'http://python.org')
        print(html)
        print("OK")

asyncio.run(main())

Sync http requests

import requests
import sys
from bs4 import BeautifulSoup

def fetch(url):
    response = requests.get(url)
    content = response.text
    return content

def main():
    if len(sys.argv) < 2:
        exit(f"Usage: {sys.argv[0]} FILENAME") # examples/parallel/url.txt
    filename = sys.argv[1]
    with open(filename) as fh:
        urls = list(map(lambda url: url.rstrip('\n'), fh.readlines()))
    #urls = urls[0:10]
    #urls = ['https://httpbin.org/get']
    #print(urls)

    tasks = []
    for url in urls:
#        tasks.append([url, fetch(url)])
#    for task in tasks:
#        url, content = task
        content = fetch(url)
        print(url)
        #print(content)
        soup = BeautifulSoup(content, 'html.parser')
        print('Missing' if soup.title is None else soup.title.string)

        #print(task)


main()

Async http requests

import aiohttp
import asyncio
import sys
from bs4 import BeautifulSoup

async def fetch(session, url):
    async with session.get(url) as response:
        content = await response.text()
        return [url, content]

async def main():
    if len(sys.argv) < 2:
        exit(f"Usage: {sys.argv[0]} FILENAME") # examples/parallel/url.txt
    filename = sys.argv[1]
    with open(filename) as fh:
        urls = list(map(lambda url: url.rstrip('\n'), fh.readlines()))
    #urls = urls[0:6]
    #urls = ['https://httpbin.org/get']
    #print(urls)

    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(asyncio.create_task(fetch(session, url)))
        await asyncio.gather(*tasks)
        for task in tasks:
            url, content = task.result()
            print(url)
            #print(content)
            soup = BeautifulSoup(content, 'html.parser')
            print('Missing' if soup.title is None else soup.title.string)

            #print(task)


asyncio.run(main())

Async http requests with queue

import aiohttp
import asyncio
import sys
from bs4 import BeautifulSoup

async def fetch(session, urls, results):
    while urls:
        url = urls.pop(0)
        async with session.get(url) as response:
            content = await response.text()
            soup = BeautifulSoup(content, 'html.parser')
            results[url] = None if soup.title is None else soup.title.string


async def main(parallel, urls):
    results = {}
    async with aiohttp.ClientSession() as session:
        tasks = []
        for _ in range(parallel):
            tasks.append(asyncio.create_task(fetch(session, urls, results)))
        await asyncio.gather(*tasks)
    return results

def setup():
    if len(sys.argv) < 4:
        exit(f"Usage: {sys.argv[0]} LIMIT PARALLEL FILENAME") # examples/parallel/url.txt or wikipedia.txt
    limit = int(sys.argv[1])
    parallel = int(sys.argv[2])
    filename = sys.argv[3]

    with open(filename) as fh:
        urls = list(map(lambda url: url.rstrip('\n'), fh.readlines()))
    if len(urls) > limit:
        urls = urls[0:limit]
    # use asyncio.Queue instead

    results = asyncio.run(main(parallel, urls))

    for url, title in results.items():
        print(f"{url} - {title}")

setup()
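The comment in the code above suggests using asyncio.Queue instead of a plain list. A minimal sketch of that worker pattern follows, with asyncio.sleep standing in for the HTTP request so the example stays self-contained. The worker and main signatures here are assumptions, not the original program.

```python
import asyncio

async def worker(wid, queue, results):
    while True:
        job = await queue.get()      # waits until a job is available
        try:
            await asyncio.sleep(job)  # stand-in for the real work
            results.append((wid, job))
        finally:
            queue.task_done()        # lets queue.join() keep count

async def main(jobs, parallel):
    queue = asyncio.Queue()
    for job in jobs:
        queue.put_nowait(job)

    results = []
    workers = [
        asyncio.create_task(worker(wid, queue, results))
        for wid in range(parallel)
    ]
    await queue.join()               # returns once every job was marked done
    for task in workers:
        task.cancel()                # the workers loop forever, so stop them
    await asyncio.gather(*workers, return_exceptions=True)
    return results

results = asyncio.run(main([0.01] * 10, 3))
print(len(results))
```

Compared with popping from a shared list, the queue lets workers wait for jobs that are added later, and queue.join() gives a clear point at which all the work is finished.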

Sync chores

We have a number of household chores to do. Each takes a couple of seconds for a machine to do while we have time to do something else. We also have one task, cleaning potatoes, that requires our full attention. It is a CPU-intensive process.

We also have two processes that depend on each other. We can turn on the dryer only after the washing machine has finished.

import time

def boil_water(sec):
    print(f"Start boiling water for {sec} seconds")
    time.sleep(sec)
    print(f"End boiling water for {sec} seconds")

def washing_machine(sec):
    print("Start washing machine")
    time.sleep(sec)
    print("End washing machine")

def dryer(sec):
    print("Start dryer")
    time.sleep(sec)
    print("End dryer")

def dishwasher(sec):
    print("Start dishwasher")
    time.sleep(sec)
    print("End dishwasher")

def clean_potatoes(pieces):
    print("Start cleaning potatoes")
    for ix in range(pieces):
        print(f"Cleaning potato {ix}")
        time.sleep(0.5)
    print("End cleaning potatoes")

def main():
    dishwasher(3)
    washing_machine(3)
    dryer(3)
    boil_water(4)
    clean_potatoes(14)

start = time.time()
main()
end = time.time()
print(f"Elapsed {end-start}")

Start dishwasher
End dishwasher
Start washing machine
End washing machine
Start dryer
End dryer
Start boiling water for 4 seconds
End boiling water for 4 seconds
Start cleaning potatoes
Cleaning potato 0
Cleaning potato 1
Cleaning potato 2
Cleaning potato 3
Cleaning potato 4
Cleaning potato 5
Cleaning potato 6
Cleaning potato 7
Cleaning potato 8
Cleaning potato 9
Cleaning potato 10
Cleaning potato 11
Cleaning potato 12
Cleaning potato 13
End cleaning potatoes
Elapsed 20.017353534698486

Async chores

  • async
  • await

import time
import asyncio

async def boil_water(sec):
    print(f"Start boiling water for {sec} seconds")
    await asyncio.sleep(sec)
    print(f"End boiling water for {sec} seconds")

async def washing_machine(sec):
    print(f"Start washing machine for {sec} seconds")
    await asyncio.sleep(sec)
    print(f"End washing machine for {sec} seconds")
    await dryer(3)

async def dryer(sec):
    print(f"Start dryer for {sec} seconds")
    await asyncio.sleep(sec)
    print(f"End dryer for {sec} seconds")

async def dishwasher(sec):
    print(f"Start dishwasher for {sec} seconds")
    await asyncio.sleep(sec)
    print(f"End dishwasher for {sec} seconds")

async def clean_potatoes(pieces):
    print(f"Start cleaning potatoes for {pieces} pieces")
    for ix in range(pieces):
        print(f"Cleaning potato {ix}")
        time.sleep(0.5)
        #await asyncio.sleep(0.0001)
    print(f"End cleaning potatoes for {pieces} pieces")

async def main():
    await asyncio.gather(
        dishwasher(3),
        washing_machine(3),
        boil_water(4),
        clean_potatoes(14))

start = time.time()
asyncio.run(main())
end = time.time()
print(f"Elapsed {end-start}")

The output shows that we noticed the washing machine had finished only after we had cleaned all the potatoes. That's because cleaning the potatoes is a long-running, CPU-intensive job that never hands control back to the event loop (it uses the blocking time.sleep). As a result, the dryer starts only after the potatoes are clean.

Start dishwasher for 3 seconds
Start washing machine for 3 seconds
Start boiling water for 4 seconds
Start cleaning potatoes for 14 pieces
Cleaning potato 0
Cleaning potato 1
Cleaning potato 2
Cleaning potato 3
Cleaning potato 4
Cleaning potato 5
Cleaning potato 6
Cleaning potato 7
Cleaning potato 8
Cleaning potato 9
Cleaning potato 10
Cleaning potato 11
Cleaning potato 12
Cleaning potato 13
End cleaning potatoes for 14 pieces
End dishwasher for 3 seconds
End washing machine for 3 seconds
Start dryer for 3 seconds
End boiling water for 4 seconds
End dryer for 3 seconds
Elapsed 10.01340126991272

If, after cleaning each potato, we look up for a fraction of a second and let the event loop run (by enabling the commented-out await asyncio.sleep(0.0001) line), we notice that the washing machine has finished and can turn on the dryer before continuing with the next potato. This lets the dryer work while we are still cleaning the potatoes.

Start dishwasher for 3 seconds
Start washing machine for 3 seconds
Start boiling water for 4 seconds
Start cleaning potatoes for 14 pieces
Cleaning potato 0
Cleaning potato 1
Cleaning potato 2
Cleaning potato 3
Cleaning potato 4
Cleaning potato 5
End dishwasher for 3 seconds
End washing machine for 3 seconds
Start dryer for 3 seconds
Cleaning potato 6
Cleaning potato 7
End boiling water for 4 seconds
Cleaning potato 8
Cleaning potato 9
Cleaning potato 10
Cleaning potato 11
End dryer for 3 seconds
Cleaning potato 12
Cleaning potato 13
End cleaning potatoes for 14 pieces
Elapsed 7.02296781539917
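The same effect can be reproduced at a smaller scale. In this sketch the names machine, potatoes, and the cooperative flag are made up for illustration. The blocking time.sleep imitates CPU-bound work, and the optional await asyncio.sleep(0) is the moment we "look up".

```python
import asyncio
import time

async def machine(log, sec):
    await asyncio.sleep(sec)
    log.append("machine done")

async def potatoes(log, pieces, cooperative):
    for ix in range(pieces):
        time.sleep(0.05)            # blocking work: the event loop is stuck here
        log.append(f"potato {ix}")
        if cooperative:
            await asyncio.sleep(0)  # let the event loop serve other tasks

async def run(cooperative):
    log = []
    await asyncio.gather(machine(log, 0.1), potatoes(log, 6, cooperative))
    return log

blocking_log = asyncio.run(run(cooperative=False))
cooperative_log = asyncio.run(run(cooperative=True))
print(blocking_log)
print(cooperative_log)
```

Without the await, "machine done" can only be recorded after the last potato. With it, the machine is noticed while potatoes are still being cleaned.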

Async more

Explanation

  • The feeling of parallelism

  • Coroutines

  • async/await

  • Asynchronous

  • Non-blocking (asynchronous) vs. blocking (synchronous, aka "normal")

Coroutines

  • Functions that can be suspended mid-way and allow other functions to run (like a generator)

  • async def defines a native coroutine or, if its body contains yield, an asynchronous generator

  • async with

  • async for
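A short sketch showing the three constructs from the list together. The resource and ticker helpers are hypothetical names; asynccontextmanager comes from the standard contextlib module.

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def resource(name, log):
    # Asynchronous context manager: used with "async with".
    log.append(f"open {name}")
    try:
        yield name
    finally:
        log.append(f"close {name}")

async def ticker(n):
    # async def with a yield in its body: an asynchronous generator.
    for i in range(n):
        await asyncio.sleep(0)
        yield i

async def main():
    log = []
    async with resource("demo", log) as res:
        async for i in ticker(3):
            log.append(f"{res} {i}")
    return log

log = asyncio.run(main())
print(log)
```

Both async with and async for may suspend the coroutine at their boundaries, which is why they are only allowed inside an async def.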

More about asyncio

Other libraries:

aiofiles https://pypi.org/project/aiofiles/

unsync

  • unsync
  • Can seamlessly run tasks in async, thread, or fork so different tasks can have the appropriate solution (e.g. CPU-bound processes can use fork).

Async files

import tempfile
import os


def save_file(filename, size):
    with open(filename, 'w') as f:
        for _ in range(size):
            data = ''
            for _ in range(1000):
                data += 'xxxxxxxxxxx'

            f.write(data)

def main():
    tempdir = tempfile.mkdtemp()
    for idx in range(300):
        filename = os.path.join(tempdir, f'{idx}.txt')
        save_file(filename, 100)

main()

import asyncio
import aiofiles
import tempfile
import os


async def save_file(filename, size):
    async with aiofiles.open(filename, 'w') as f:
        for _ in range(size):
            #data = 'x' * 1000000
            data = ''
            for _ in range(1000):
                data += 'xxxxxxxxxxx'

            await f.write(data)

async def main():
    tempdir = tempfile.mkdtemp()

    tasks = []
    for idx in range(300):
        filename = os.path.join(tempdir, f'{idx}.txt')
        tasks.append(asyncio.create_task(save_file(filename, 10)))
    await asyncio.gather(*tasks)
    #print(os.listdir(tempdir))

asyncio.run(main())

Async example

import asyncio
import time

async def say_after(delay, what):
    print(f"start {what}")
    await asyncio.sleep(delay)
    print(what)

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))
    
    await asyncio.sleep(0)
    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

Asynchronous programming with Twisted

About Twisted

Echo

from twisted.internet import protocol,reactor

port = 8000

class Echo(protocol.Protocol):
    def dataReceived(self, data):
        text = data.decode('utf8')
        print(f"Received: {text}")
        self.transport.write("You said: {}".format(text).encode('utf8'))

class EchoFactory(protocol.Factory):
    def buildProtocol(self, addr):
        return Echo()

print(f"Listening on port {port}")
reactor.listenTCP(port, EchoFactory())
reactor.run()

from twisted.internet import reactor,protocol
import sys

if len(sys.argv) < 2:
    exit(f"Usage: {sys.argv[0]} TEXT")

message = sys.argv[1]
port = 8000

class EchoClient(protocol.Protocol):
    def connectionMade(self):
        self.transport.write(message.encode('utf8'))

    def dataReceived(self, data):
        print(f"Server said: {data}")
        self.transport.loseConnection()

class EchoFactory(protocol.ClientFactory):
    def buildProtocol(self, addr):
        return EchoClient()

    def clientConnectionFailed(self, connector, reason):
        print("connection failed")
        reactor.stop()

    def clientConnectionLost(self, connector, reason):
        print("connection lost")
        reactor.stop()

reactor.connectTCP("localhost", port, EchoFactory())
reactor.run()

Echo with log

from twisted.internet import protocol,reactor

port = 8000

class Echo(protocol.Protocol):
    def dataReceived(self, data):
        print("Received: {}".format(data))
        self.transport.write(data)

class EchoFactory(protocol.Factory):
    def buildProtocol(self, addr):
        print(f"Connection established with {addr}")
        return Echo()

print(f"Started to listen on port {port}")
reactor.listenTCP(port, EchoFactory())
reactor.run()

Simple web client

The getPage function used in this example has been deprecated in Twisted, so the code needs to be updated.

  • getPage() returns a "deferred"
  • addCallbacks(on_success, on_failure)
  • addBoth(on_both) adds a callback to both the success and the failure callback chains

from twisted.internet import reactor
from twisted.web.client import getPage
import sys

def printPage(result):
    print("Page")
    print('Size of the returned page is {}'.format(len(result)))

def printError(error):
    print("Error")
    print(f"Error: {error}")
    #sys.stderr.write(error)

def stop(result):
    print('stop')
    reactor.stop()

if (len(sys.argv) != 2):
    sys.stderr.write("Usage: python " + sys.argv[0] + " <URL>\n")
    exit(1)

d = getPage(sys.argv[1])
d.addCallbacks(printPage, printError)
d.addBoth(stop)

reactor.run()

# getPage(sys.argv[1], method='POST', postdata="My test data").

Web client

from twisted.internet import reactor
from twisted.web.client import getPage
import sys
import re
import time

queue = [
  'http://docs.python.org/3/',
  'http://docs.python.org/3/whatsnew/3.3.html',
  'http://docs.python.org/3/tutorial/index.html',
  'http://docs.python.org/3/library/index.html',
  'http://docs.python.org/3/reference/index.html',
  'http://docs.python.org/3/howto/index.html',
  'http://docs.python.org/3/howto/pyporting.html',
  'http://docs.python.org/3/howto/cporting.html',
  'http://docs.python.org/3/howto/curses.html',
  'http://docs.python.org/3/howto/descriptor.html',
  'http://docs.python.org/3/howto/functional.html',
  'http://docs.python.org/3/howto/logging.html',
  'http://docs.python.org/3/howto/logging-cookbook.html',
  'http://docs.python.org/3/howto/regex.html',
  'http://docs.python.org/3/howto/sockets.html',
  'http://docs.python.org/3/howto/sorting.html',
  'http://docs.python.org/3/howto/unicode.html',
  'http://docs.python.org/3/howto/urllib2.html',
  'http://docs.python.org/3/howto/webservers.html',
  'http://docs.python.org/3/howto/argparse.html',
  'http://docs.python.org/3/howto/ipaddress.html',
]

max_parallel = 3
current_parallel = 0
if len(sys.argv) == 2:
  max_parallel = int(sys.argv[1])

def printPage(result):
  print("page size: ", len(result))
  global current_parallel
  current_parallel -= 1
  print("current_parallel: ", current_parallel)
  #urls = re.findall(r'href="([^"]+)"', result)
  #for u in urls:
  #  queue.append(u)
  #queue.extend(urls)
  process_queue()

def printError(error):
  print("Error: ", error)
  global current_parallel
  current_parallel -= 1
  process_queue()


def stop(result):
  reactor.stop()

def process_queue():
  global current_parallel, max_parallel,queue
  print("process_queue cs: {} max: {}".format(current_parallel, max_parallel))
  while True:
    if current_parallel >= max_parallel:
      print("No empty slot")
      return
    if len(queue) == 0:
      print("queue is empty")
      if current_parallel == 0:
        reactor.stop()
      return
    url = queue[0] + '?' + str(time.time())
    queue[0:1] = []
    current_parallel += 1
    d = getPage(url)
    d.addCallbacks(printPage, printError)

process_queue()
reactor.run()
print("----done ---")