Async Python
Asynchronous programming with AsyncIO
Use cases for Async
- Good for IO-intensive tasks
- Downloading web pages
- Accessing APIs
- Accessing databases (especially on other servers)
- Reading files
Event loop
- Single thread
- Single process
- ... so it uses a single core
- Cooperative multitasking
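A quick way to check the "single thread" claim is to record which thread each coroutine runs on. This is a minimal sketch using threading.get_ident(); the worker function is hypothetical and only exists for illustration.

```python
import asyncio
import threading

async def worker(thread_ids):
    # Record the OS thread running this coroutine, before and after yielding
    thread_ids.append(threading.get_ident())
    await asyncio.sleep(0)  # hand control back to the event loop
    thread_ids.append(threading.get_ident())

async def main():
    thread_ids = []
    await asyncio.gather(worker(thread_ids), worker(thread_ids), worker(thread_ids))
    return thread_ids

ids = asyncio.run(main())
print(len(ids), len(set(ids)))          # → 6 1
print(ids[0] == threading.get_ident())  # → True
```

Six recordings, one distinct thread id, and it is the same thread that called asyncio.run: the event loop switches between coroutines only at await points, it never spreads them across threads.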
Print sync
- First, let's see a few small and useless examples.
- In this example we use sleep to imitate some external task we need to wait for, and then we print some text.
- We do it sequentially. No async here.
The code
import time

def say(text, sec):
    time.sleep(sec)
    print(text)

def main():
    start = time.monotonic()
    say("First", 2)
    say("Second", 1)
    end = time.monotonic()
    print(f"Elapsed: {end-start}")

main()
Output
First
Second
Elapsed: 3.0015416080132127
So it takes slightly more than 3 seconds to wait first for 2 and then for 1 second. No big surprise there.
Print async
- This is almost the same as the previous, synchronous example, but we wait asynchronously.
- The order of the output is now different.
- It also finishes 1 second faster: it finishes when the longest wait ends.
What did we do?
- We added async in front of the function definitions to make them coroutines.
- We replaced time.sleep with asyncio.sleep, which can sleep asynchronously.
- We called this new sleep function with the await keyword. That tells the event loop that other tasks can run until the thing we are awaiting finishes.
- We called the say function inside an await-ed call to asyncio.gather.
- We started the event loop with asyncio.run.
Code
import time
import asyncio

async def say(text, sec):
    await asyncio.sleep(sec)
    print(text)

async def main():
    print('start main')
    start = time.monotonic()
    await asyncio.gather(
        say("First", 2),
        say("Second", 1),
    )
    end = time.monotonic()
    print(f"Elapsed: {end-start}")

main_co = main()
print(main_co)
asyncio.run(main_co)
Output
<coroutine object main at 0x78265ad9a4d0>
start main
Second
First
Elapsed: 2.0022734529920854
The first print shows that calling the main function returns an object of type coroutine. The body of main does not run yet ("start main" is printed only after asyncio.run starts it).
The "Second" print appears before the "First", because the former only had to wait 1 second while the latter waited 2 seconds.
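To make the "calling an async function does not run it" point concrete, here is a small sketch (the say function and the calls list are hypothetical) that checks the body has not executed until the event loop runs the coroutine:

```python
import asyncio

calls = []

async def say(text):
    calls.append(text)  # only runs when the coroutine is actually executed
    return text

co = say("hi")                  # creates a coroutine object; the body has not run yet
print(asyncio.iscoroutine(co))  # → True
print(calls)                    # → []
result = asyncio.run(co)        # the event loop runs the coroutine to completion
print(result, calls)            # → hi ['hi']
```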
- asyncio
- async
- await
- gather
Sync sleep
- Let's see the same two examples, but now we also print progress in the say function.
- First, let's see the sequential version.
- We can see that the first task finishes before the second task starts.
import time

def say(wid, sec):
    start = time.monotonic()
    print(f"Starting {wid} that will take {sec}s")
    time.sleep(sec)
    end = time.monotonic()
    print(f"Finishing {wid} in {end-start}s")

def main():
    start = time.monotonic()
    say("First", 2)
    say("Second", 1)
    end = time.monotonic()
    print(f"Elapsed: {end-start}")

main()
Starting First that will take 2s
Finishing First in 2.002233584993519s
Starting Second that will take 1s
Finishing Second in 1.0011189789511263s
Elapsed: 3.003522119950503
Async sleep
- In the asynchronous version we can see both tasks start, and then each one finishes after waiting the appropriate time.
- The first task took longer and thus finished after the second task.
- One drawback of using gather this way is that we need to know the exact calls up front.
import time
import asyncio

async def say(wid, sec):
    start = time.monotonic()
    print(f"Starting {wid} that will take {sec}s")
    await asyncio.sleep(sec)
    end = time.monotonic()
    print(f"Finishing {wid} in {end-start}s")

async def main():
    start = time.monotonic()
    await asyncio.gather(
        say("First", 2),
        say("Second", 1)
    )
    end = time.monotonic()
    print(f"Elapsed: {end-start}")

asyncio.run(main())
Starting First that will take 2s
Starting Second that will take 1s
Finishing Second in 1.0006165504455566s
Finishing First in 2.002072811126709s
Elapsed: 2.002306482056156
Sync tasks in loop
In this example too we'll use sleep to pretend we are waiting for some external task to finish, but this time we'll start a number of jobs based on what the user supplies.
We can't know up-front how many tasks we'll have to call.
Code
import time
import sys

def do_task(task_id: int, sec: int):
    print(f"Start task {task_id}")
    time.sleep(sec)
    print(f"End task {task_id}")

def main():
    if len(sys.argv) != 2:
        sys.exit(f"Usage {sys.argv[0]} NUMBER")
    for i in range(int(sys.argv[1])):
        do_task(i, 1)

start = time.monotonic()
main()
end = time.monotonic()
print(f"Elapsed time: {end-start}")
Output
Start task 0
End task 0
Start task 1
End task 1
Start task 2
End task 2
Start task 3
End task 3
Elapsed time: 4.000992348068394
As expected, the total run time is the sum of the durations of all the tasks, because they run sequentially.
Async sleep in loop
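- create_task
- We might expect 4 sleeps running in parallel to be much faster than the sequential version.
- This time we created the coroutines ahead of time and then awaited them one by one. As the output shows, this is still sequential (about 4 seconds): a coroutine object does not start running until it is awaited. Wrapping each call in asyncio.create_task would schedule them on the event loop right away, so they could sleep in parallel.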
import time
import sys
import asyncio

async def do_task(task_id: int, sec: int):
    print(f"Start {task_id}")
    await asyncio.sleep(sec)
    print(f"End {task_id}")

async def main():
    print("Main started")
    if len(sys.argv) != 2:
        sys.exit(f"Usage {sys.argv[0]} NUMBER")
    co_routines = []
    for i in range(int(sys.argv[1])):
        co_routines.append(do_task(i, 1))
    print("Tasks created")
    # Each coroutine only starts running when it is awaited, so this loop is sequential
    for t in co_routines:
        await t
    print("Main ended")

start = time.monotonic()
asyncio.run(main())
end = time.monotonic()
print(f"Elapsed {end-start}")
Main started
Tasks created
Start 0
End 0
Start 1
End 1
Start 2
End 2
Start 3
End 3
Main ended
Elapsed 4.004389520036057
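A minimal sketch of the concurrent version: the only real change is wrapping each call in asyncio.create_task. To keep it self-contained it takes a fixed count instead of reading sys.argv, and it records events in a list instead of printing them; both are assumptions for illustration.

```python
import asyncio
import time

async def do_task(task_id: int, sec: float, log: list):
    log.append(f"Start {task_id}")
    await asyncio.sleep(sec)
    log.append(f"End {task_id}")

async def main(n: int) -> list:
    log = []
    # create_task schedules each coroutine on the event loop immediately
    tasks = [asyncio.create_task(do_task(i, 1, log)) for i in range(n)]
    # While we await one task, the other tasks keep running
    for t in tasks:
        await t
    return log

start = time.monotonic()
log = asyncio.run(main(4))
elapsed = time.monotonic() - start
print(log)
print(f"Elapsed {elapsed:.1f}")
```

All four "Start" entries come before any "End", and the whole run takes about 1 second instead of 4.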
Async sleep in loop with gather
- gather
import time
import asyncio

async def sleep(cnt, sec):
    print(f"Start {cnt}")
    await asyncio.sleep(sec)
    print(f"End {cnt}")

async def main():
    co_routines = []
    for i in range(4):
        co_routines.append(sleep(i, 1))
    await asyncio.gather(*co_routines)

start = time.monotonic()
asyncio.run(main())
end = time.monotonic()
print(f"Elapsed {end-start}")
Start 0
Start 1
Start 2
Start 3
End 0
End 1
End 2
End 3
Elapsed 1.0018720626831055
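gather also collects the return values of the coroutines. A small sketch (the square helper is hypothetical) showing that the results come back in the order the awaitables were passed, not in the order in which they finished:

```python
import asyncio

async def square(x, delay):
    await asyncio.sleep(delay)
    return x * x

async def main():
    # square(2, ...) finishes first, but gather keeps the argument order
    return await asyncio.gather(
        square(1, 0.03),
        square(2, 0.01),
        square(3, 0.02),
    )

print(asyncio.run(main()))  # → [1, 4, 9]
```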
coroutines
import asyncio

async def answer():
    print("start to answer")
    return 42

async def main():
    a_coroutine = answer()
    print(a_coroutine)
    await asyncio.sleep(0)
    print('before await for coroutine')
    result = await a_coroutine
    print(f"result is {result} after await")

asyncio.run(main())
<coroutine object answer at 0x7f8d06e6d240>
before await for coroutine
start to answer
result is 42 after await
Async Tasks
import asyncio

async def answer():
    print("start to answer")
    return 42

async def main():
    a_task = asyncio.create_task(answer())
    print(a_task)
    await asyncio.sleep(0)
    print('before await for task')
    result = await a_task
    print(f"result is {result} after await")

asyncio.run(main())
<Task pending name='Task-2' coro=<answer() running at async_task.py:3>>
start to answer
before await for task
result is 42 after await
Count Async
import time
import asyncio

async def count(name, end, sec):
    for i in range(end):
        print(f"{name} {i}")
        await asyncio.sleep(sec)

async def main():
    await asyncio.gather(
        count('apple', 10, 0.1),
        count('peach', 10, 0.2),
    )

start = time.time()
asyncio.run(main())
end = time.time()
print(f"Elapsed {end-start}")
apple 0
peach 0
apple 1
peach 1
apple 2
apple 3
peach 2
apple 4
apple 5
peach 3
apple 6
apple 7
peach 4
apple 8
apple 9
peach 5
peach 6
peach 7
peach 8
peach 9
Elapsed 2.010511636734009
Passing the baton while sleeping 0 sec
import asyncio

async def count(name):
    print(f"start {name}")
    for cnt in range(10):
        print(f"{name} {cnt}")
        await asyncio.sleep(0)

async def main():
    a_task = asyncio.create_task(count("A"))
    b_task = asyncio.create_task(count("B"))
    print("Before")
    #await asyncio.sleep(1)
    print("After")
    await asyncio.gather(
        a_task,
        b_task
    )

asyncio.run(main())
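To see the baton passing without reading the console, this sketch records each step in a list instead of printing it (a change made only for illustration) and uses 3 steps instead of 10. Because asyncio.sleep(0) yields to the event loop on every iteration, the two counters strictly alternate:

```python
import asyncio

async def count(name, log):
    for cnt in range(3):
        log.append(f"{name} {cnt}")
        await asyncio.sleep(0)  # yield to the event loop without actually waiting

async def main():
    log = []
    await asyncio.gather(count("A", log), count("B", log))
    return log

print(asyncio.run(main()))  # → ['A 0', 'B 0', 'A 1', 'B 1', 'A 2', 'B 2']
```

Without the sleep(0) line, each counter would run to completion before the other starts: A 0, A 1, A 2, B 0, B 1, B 2.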
Async sleep in a queue
import time
import asyncio
import sys

#queue = [4, 3, 1, 2, 1, 7, 3, 4, 5, 1, 2, 1, 2, 3, 4]
queue = [1] * 10
print(f"Total: {sum(queue)}")

parallel = 1
if len(sys.argv) == 2:
    parallel = int(sys.argv[1])

async def worker(wid):
    while queue:
        job = queue.pop(0)
        ts = time.monotonic()
        print(f"Worker {wid} starting job {job} {ts}")
        await asyncio.sleep(job)
        print(f"Worker {wid} finished job {job}")

async def main():
    tasks = []
    for wid in range(parallel):
        tasks.append(asyncio.create_task(worker(wid)))
    await asyncio.gather(*tasks)

start = time.monotonic()
print(f"Start {start}")
asyncio.run(main())
end = time.monotonic()
print(f"Elapsed: {end-start}")
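Instead of popping from a shared list, the same worker pool can be built on asyncio.Queue. A minimal sketch, assuming short 0.1 s jobs and 5 workers so it runs quickly: queue.join() waits until every job has been marked done with task_done(), after which the idle workers, which loop forever, are cancelled.

```python
import asyncio
import time

async def worker(wid, queue, done):
    while True:
        job = await queue.get()       # wait until a job is available
        try:
            await asyncio.sleep(job)  # pretend to do the IO-bound job
            done.append((wid, job))
        finally:
            queue.task_done()         # mark this job as finished

async def main(jobs, parallel):
    queue = asyncio.Queue()
    for job in jobs:
        queue.put_nowait(job)
    done = []
    workers = [asyncio.create_task(worker(wid, queue, done)) for wid in range(parallel)]
    await queue.join()                # wait until every job was marked done
    for w in workers:
        w.cancel()                    # the workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return done

start = time.monotonic()
done = asyncio.run(main([0.1] * 10, 5))
elapsed = time.monotonic() - start
print(f"{len(done)} jobs in {elapsed:.2f}s")
```

With 10 jobs of 0.1 s and 5 workers this should take roughly 0.2 s.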
Async http
import aiohttp
import asyncio

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, 'http://python.org')
        print(html)
        print("OK")

asyncio.run(main())
Sync http requests
import requests
import sys
from bs4 import BeautifulSoup

def fetch(url):
    response = requests.get(url)
    content = response.text
    return content

def main():
    if len(sys.argv) < 2:
        sys.exit(f"Usage: {sys.argv[0]} FILENAME") # examples/parallel/url.txt
    filename = sys.argv[1]
    with open(filename) as fh:
        urls = [line.rstrip('\n') for line in fh]
    # Download the pages one after the other
    for url in urls:
        content = fetch(url)
        print(url)
        soup = BeautifulSoup(content, 'html.parser')
        print('Missing' if soup.title is None else soup.title.string)

main()
Async http requests
import aiohttp
import asyncio
import sys
from bs4 import BeautifulSoup

async def fetch(session, url):
    async with session.get(url) as response:
        content = await response.text()
        return [url, content]

async def main():
    if len(sys.argv) < 2:
        sys.exit(f"Usage: {sys.argv[0]} FILENAME") # examples/parallel/url.txt
    filename = sys.argv[1]
    with open(filename) as fh:
        urls = [line.rstrip('\n') for line in fh]
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(asyncio.create_task(fetch(session, url)))
        # All the downloads run concurrently; gather waits for every one of them
        await asyncio.gather(*tasks)
    for task in tasks:
        url, content = task.result()
        print(url)
        soup = BeautifulSoup(content, 'html.parser')
        print('Missing' if soup.title is None else soup.title.string)

asyncio.run(main())
Async http requests with queue
import aiohttp
import asyncio
import sys
from bs4 import BeautifulSoup

async def fetch(session, urls, results):
    # Each worker keeps taking URLs from the shared list until it is empty
    while urls:
        url = urls.pop(0)
        async with session.get(url) as response:
            content = await response.text()
            soup = BeautifulSoup(content, 'html.parser')
            results[url] = None if soup.title is None else soup.title.string

async def main(parallel, urls):
    results = {}
    async with aiohttp.ClientSession() as session:
        tasks = []
        for _ in range(parallel):
            tasks.append(asyncio.create_task(fetch(session, urls, results)))
        await asyncio.gather(*tasks)
    return results

def setup():
    if len(sys.argv) < 4:
        sys.exit(f"Usage: {sys.argv[0]} LIMIT PARALLEL FILENAME") # examples/parallel/url.txt or wikipedia.txt
    limit = int(sys.argv[1])
    parallel = int(sys.argv[2])
    filename = sys.argv[3]
    with open(filename) as fh:
        urls = [line.rstrip('\n') for line in fh]
    if len(urls) > limit:
        urls = urls[0:limit]
    # use asyncio.Queue instead
    results = asyncio.run(main(parallel, urls))
    for url, title in results.items():
        print(f"{url} - {title}")

setup()
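A common alternative to a hand-rolled pool of workers is to start one task per URL and cap how many run at once with asyncio.Semaphore. This is a sketch that swaps in that technique: fake_fetch is a hypothetical stand-in for the aiohttp request (it just sleeps), so the example runs offline, and it tracks how many requests are in flight at the same time.

```python
import asyncio

async def fake_fetch(url):
    # Hypothetical stand-in for an HTTP request such as session.get(url)
    await asyncio.sleep(0.1)
    return f"<title>{url}</title>"

async def fetch_limited(sem, url, stats):
    async with sem:  # at most `parallel` requests are in flight at once
        stats["running"] += 1
        stats["peak"] = max(stats["peak"], stats["running"])
        try:
            return url, await fake_fetch(url)
        finally:
            stats["running"] -= 1

async def main(urls, parallel):
    sem = asyncio.Semaphore(parallel)
    stats = {"running": 0, "peak": 0}
    pairs = await asyncio.gather(*(fetch_limited(sem, u, stats) for u in urls))
    return dict(pairs), stats["peak"]

urls = [f"https://example.com/{i}" for i in range(10)]
results, peak = asyncio.run(main(urls, 3))
print(len(results), peak)  # → 10 3
```

The design trade-off: the semaphore version creates all tasks up front (fine for thousands of URLs), while the worker-pool version creates only `parallel` tasks.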
Sync chores
We have a number of household chores to do. Each one takes a couple of seconds for a machine to do, while we have time to do something else. We also have one task, cleaning potatoes, that requires our full attention: it is a CPU-intensive process.
We also have two processes that depend on each other: we can turn on the dryer only after the washing machine has finished.
import time

def boil_water(sec):
    print(f"Start boiling water for {sec} seconds")
    time.sleep(sec)
    print(f"End boiling water for {sec} seconds")

def washing_machine(sec):
    print("Start washing machine")
    time.sleep(sec)
    print("End washing machine")

def dryer(sec):
    print("Start dryer")
    time.sleep(sec)
    print("End dryer")

def dishwasher(sec):
    print("Start dishwasher")
    time.sleep(sec)
    print("End dishwasher")

def clean_potatoes(pieces):
    print("Start cleaning potatoes")
    for ix in range(pieces):
        print(f"Cleaning potato {ix}")
        time.sleep(0.5)
    print("End cleaning potatoes")

def main():
    dishwasher(3)
    washing_machine(3)
    dryer(3)
    boil_water(4)
    clean_potatoes(14)

start = time.time()
main()
end = time.time()
print(f"Elapsed {end-start}")
Start dishwasher
End dishwasher
Start washing machine
End washing machine
Start dryer
End dryer
Start boiling water for 4 seconds
End boiling water for 4 seconds
Start cleaning potatoes
Cleaning potato 0
Cleaning potato 1
Cleaning potato 2
Cleaning potato 3
Cleaning potato 4
Cleaning potato 5
Cleaning potato 6
Cleaning potato 7
Cleaning potato 8
Cleaning potato 9
Cleaning potato 10
Cleaning potato 11
Cleaning potato 12
Cleaning potato 13
End cleaning potatoes
Elapsed 20.017353534698486
Async chores
- async
- await
import time
import asyncio

async def boil_water(sec):
    print(f"Start boiling water for {sec} seconds")
    await asyncio.sleep(sec)
    print(f"End boiling water for {sec} seconds")

async def washing_machine(sec):
    print(f"Start washing machine for {sec} seconds")
    await asyncio.sleep(sec)
    print(f"End washing machine for {sec} seconds")
    await dryer(3)

async def dryer(sec):
    print(f"Start dryer for {sec} seconds")
    await asyncio.sleep(sec)
    print(f"End dryer for {sec} seconds")

async def dishwasher(sec):
    print(f"Start dishwasher for {sec} seconds")
    await asyncio.sleep(sec)
    print(f"End dishwasher for {sec} seconds")

async def clean_potatoes(pieces):
    print(f"Start cleaning potatoes for {pieces} pieces")
    for ix in range(pieces):
        print(f"Cleaning potato {ix}")
        time.sleep(0.5)  # blocking: simulates work that needs our full attention
        #await asyncio.sleep(0.0001)  # uncomment to let the event loop run between potatoes
    print(f"End cleaning potatoes for {pieces} pieces")

async def main():
    await asyncio.gather(
        dishwasher(3),
        washing_machine(3),
        boil_water(4),
        clean_potatoes(14))

start = time.time()
asyncio.run(main())
end = time.time()
print(f"Elapsed {end-start}")
From the output you can see that we notice the washing machine has finished only after we have finished all the potatoes. That's because cleaning potatoes is a long-running, CPU-intensive task (simulated here with the blocking time.sleep) that never gives control back to the event loop. This means the dryer only starts working after the potatoes are clean.
Start dishwasher for 3 seconds
Start washing machine for 3 seconds
Start boiling water for 4 seconds
Start cleaning potatoes for 14 pieces
Cleaning potato 0
Cleaning potato 1
Cleaning potato 2
Cleaning potato 3
Cleaning potato 4
Cleaning potato 5
Cleaning potato 6
Cleaning potato 7
Cleaning potato 8
Cleaning potato 9
Cleaning potato 10
Cleaning potato 11
Cleaning potato 12
Cleaning potato 13
End cleaning potatoes for 14 pieces
End dishwasher for 3 seconds
End washing machine for 3 seconds
Start dryer for 3 seconds
End boiling water for 4 seconds
End dryer for 3 seconds
Elapsed 10.01340126991272
If after cleaning each potato we look up for a fraction of a second, that is, we let the event loop run by uncommenting the await asyncio.sleep(0.0001) line, then we notice that the washing machine has ended and can turn on the dryer before continuing with the next potato. This allows the dryer to work while we are still cleaning the potatoes, and the whole thing finishes in about 7 seconds instead of 10.
Start dishwasher for 3 seconds
Start washing machine for 3 seconds
Start boiling water for 4 seconds
Start cleaning potatoes for 14 pieces
Cleaning potato 0
Cleaning potato 1
Cleaning potato 2
Cleaning potato 3
Cleaning potato 4
Cleaning potato 5
End dishwasher for 3 seconds
End washing machine for 3 seconds
Start dryer for 3 seconds
Cleaning potato 6
Cleaning potato 7
End boiling water for 4 seconds
Cleaning potato 8
Cleaning potato 9
Cleaning potato 10
Cleaning potato 11
End dryer for 3 seconds
Cleaning potato 12
Cleaning potato 13
End cleaning potatoes for 14 pieces
Elapsed 7.02296781539917
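Another way to keep the event loop responsive is to move the blocking work off the loop entirely. This sketch swaps in asyncio.to_thread (available since Python 3.9), which runs a regular blocking function in a worker thread; the durations are shortened to keep it fast. Caveat: for genuinely CPU-bound pure-Python code the GIL means a thread adds no extra CPU parallelism, so a process pool is the usual choice there, but the event loop still gets to resume other coroutines.

```python
import asyncio
import time

def clean_potatoes(pieces):
    # Blocking work; time.sleep stands in for the task, as in the chores example
    for _ in range(pieces):
        time.sleep(0.05)

async def washing_machine(start, events):
    await asyncio.sleep(0.1)
    events["washing machine"] = time.monotonic() - start

async def main():
    events = {}
    start = time.monotonic()
    await asyncio.gather(
        # asyncio.to_thread runs the blocking function in a worker thread,
        # so the event loop can resume washing_machine on time
        asyncio.to_thread(clean_potatoes, 10),
        washing_machine(start, events),
    )
    events["potatoes"] = time.monotonic() - start
    return events

events = asyncio.run(main())
print(events)
```

The washing machine is noticed after about 0.1 s, long before the roughly 0.5 s of potato cleaning ends.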
Async more
Explanation
- The feeling of parallelism
- Coroutines
- async/await
- Asynchronous (non-blocking) vs. synchronous (blocking, aka "normal")
Coroutines
- Functions that can be suspended midway and allow other functions to run (similar to a generator)
- async def defines a native coroutine or an asynchronous generator
- async with
- async for
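A short sketch of async for and async with together: ticker is an asynchronous generator (async def plus yield), and opened is a hypothetical resource built with contextlib.asynccontextmanager, whose setup and teardown could await real IO.

```python
import asyncio
from contextlib import asynccontextmanager

async def ticker(n):
    # async def + yield = asynchronous generator, consumed with async for
    for i in range(n):
        await asyncio.sleep(0)
        yield i

@asynccontextmanager
async def opened(log):
    # Hypothetical resource: setup and teardown could await real IO here
    log.append("open")
    try:
        yield log
    finally:
        log.append("close")

async def main():
    log = []
    async with opened(log):
        async for i in ticker(3):
            log.append(i)
    return log

print(asyncio.run(main()))  # → ['open', 0, 1, 2, 'close']
```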
More about asyncio
- AsyncIO in Real Python
- asyncio
- aiohttp
- aiodns
- cchardet
- uvloop drop-in replacement for the standard Python event-loop
Other libraries:
- aiofiles https://pypi.org/project/aiofiles/
- mongodb: umongo called micromongo
- postgresql: asyncpg
- mysql:
- redis: asyncio-redis
- sqlite:
unsync
- unsync
- Can seamlessly run tasks as async, in a thread, or in a forked process, so each task can get the appropriate solution (e.g. CPU-bound processes can use fork).
Async files
import tempfile
import os

def save_file(filename, size):
    with open(filename, 'w') as f:
        for _ in range(size):
            data = ''
            for _ in range(1000):
                data += 'xxxxxxxxxxx'
            f.write(data)

def main():
    tempdir = tempfile.mkdtemp()
    for idx in range(300):
        filename = os.path.join(tempdir, f'{idx}.txt')
        save_file(filename, 100)

main()

import asyncio
import aiofiles
import tempfile
import os

async def save_file(filename, size):
    async with aiofiles.open(filename, 'w') as f:
        for _ in range(size):
            #data = 'x' * 1000000
            data = ''
            for _ in range(1000):
                data += 'xxxxxxxxxxx'
            await f.write(data)

async def main():
    tempdir = tempfile.mkdtemp()
    tasks = []
    for idx in range(300):
        filename = os.path.join(tempdir, f'{idx}.txt')
        tasks.append(asyncio.create_task(save_file(filename, 10)))
    await asyncio.gather(*tasks)
    #print(os.listdir(tempdir))

asyncio.run(main())
Async example
import asyncio
import time

async def say_after(delay, what):
    print(f"start {what}")
    await asyncio.sleep(delay)
    print(what)

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))
    task2 = asyncio.create_task(
        say_after(2, 'world'))
    await asyncio.sleep(0)
    print(f"started at {time.strftime('%X')}")
    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
Asynchronous programming with Twisted
About Twisted
Echo
from twisted.internet import protocol, reactor

port = 8000

class Echo(protocol.Protocol):
    def dataReceived(self, data):
        text = data.decode('utf8')
        print(f"Received: {text}")
        self.transport.write("You said: {}".format(text).encode('utf8'))

class EchoFactory(protocol.Factory):
    def buildProtocol(self, addr):
        return Echo()

print(f"Listening on port {port}")
reactor.listenTCP(port, EchoFactory())
reactor.run()

from twisted.internet import reactor, protocol
import sys

if len(sys.argv) < 2:
    sys.exit(f"Usage: {sys.argv[0]} TEXT")
message = sys.argv[1]
port = 8000

class EchoClient(protocol.Protocol):
    def connectionMade(self):
        self.transport.write(message.encode('utf8'))

    def dataReceived(self, data):
        print(f"Server said: {data.decode('utf8')}")
        self.transport.loseConnection()

class EchoFactory(protocol.ClientFactory):
    def buildProtocol(self, addr):
        return EchoClient()

    def clientConnectionFailed(self, connector, reason):
        print("connection failed")
        reactor.stop()

    def clientConnectionLost(self, connector, reason):
        print("connection lost")
        reactor.stop()

reactor.connectTCP("localhost", port, EchoFactory())
reactor.run()
Echo with log
from twisted.internet import protocol, reactor

port = 8000

class Echo(protocol.Protocol):
    def dataReceived(self, data):
        print("Received: {}".format(data))
        self.transport.write(data)

class EchoFactory(protocol.Factory):
    def buildProtocol(self, addr):
        print(f"Connection established with {addr}")
        return Echo()

print(f"Started to listen on port {port}")
reactor.listenTCP(port, EchoFactory())
reactor.run()
Simple web client
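The getPage function used in this example has been deprecated in Twisted, so the code needs to be updated (for example to use twisted.web.client.Agent or the treq library).
- getPage() returns a "deferred"
- addCallbacks(on_success, on_failure)
- addBoth(on_both) adds a callback to both the success and the failure callback chains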
from twisted.internet import reactor
from twisted.web.client import getPage
import sys

def printPage(result):
    print("Page")
    print('Size of the returned page is {}'.format(len(result)))

def printError(error):
    print("Error")
    print(f"Error: {error}")
    #sys.stderr.write(error)

def stop(result):
    print('stop')
    reactor.stop()

if len(sys.argv) != 2:
    sys.stderr.write("Usage: python " + sys.argv[0] + " <URL>\n")
    sys.exit(1)

d = getPage(sys.argv[1])
d.addCallbacks(printPage, printError)
d.addBoth(stop)
reactor.run()

# getPage(sys.argv[1], method='POST', postdata="My test data").
Web client
from twisted.internet import reactor
from twisted.web.client import getPage
import sys
import re
import time

queue = [
    'http://docs.python.org/3/',
    'http://docs.python.org/3/whatsnew/3.3.html',
    'http://docs.python.org/3/tutorial/index.html',
    'http://docs.python.org/3/library/index.html',
    'http://docs.python.org/3/reference/index.html',
    'http://docs.python.org/3/howto/index.html',
    'http://docs.python.org/3/howto/pyporting.html',
    'http://docs.python.org/3/howto/cporting.html',
    'http://docs.python.org/3/howto/curses.html',
    'http://docs.python.org/3/howto/descriptor.html',
    'http://docs.python.org/3/howto/functional.html',
    'http://docs.python.org/3/howto/logging.html',
    'http://docs.python.org/3/howto/logging-cookbook.html',
    'http://docs.python.org/3/howto/regex.html',
    'http://docs.python.org/3/howto/sockets.html',
    'http://docs.python.org/3/howto/sorting.html',
    'http://docs.python.org/3/howto/unicode.html',
    'http://docs.python.org/3/howto/urllib2.html',
    'http://docs.python.org/3/howto/webservers.html',
    'http://docs.python.org/3/howto/argparse.html',
    'http://docs.python.org/3/howto/ipaddress.html',
]

max_parallel = 3
current_parallel = 0
if len(sys.argv) == 2:
    max_parallel = int(sys.argv[1])

def printPage(result):
    global current_parallel
    print("page size: ", len(result))
    current_parallel -= 1
    print("current_parallel: ", current_parallel)
    #urls = re.findall(r'href="([^"]+)"', result)
    #for u in urls:
    #    queue.append(u)
    #queue.extend(urls)
    process_queue()

def printError(error):
    global current_parallel
    print("Error: ", error)
    current_parallel -= 1
    process_queue()

def stop(result):
    reactor.stop()

def process_queue():
    global current_parallel, max_parallel, queue
    print("process_queue cs: {} max: {}".format(current_parallel, max_parallel))
    while True:
        if current_parallel >= max_parallel:
            print("No empty slot")
            return
        if len(queue) == 0:
            print("queue is empty")
            # Stop only when nothing is queued and nothing is in flight
            if current_parallel == 0:
                reactor.stop()
            return
        url = queue[0] + '?' + str(time.time())
        queue[0:1] = []
        current_parallel += 1
        d = getPage(url)
        d.addCallbacks(printPage, printError)

process_queue()
reactor.run()
print("----done ---")