Python in Parallel
Parallel
Types of Problems
- CPU-intensive applications - use more of the cores to reduce the wall-clock time.
- IO-intensive applications - don't waste CPU and wall-clock time while waiting for the IO to finish.
- Interactive applications - make sure they remain responsive during long operations.
Types of solutions
- Multiple processes (forking on Unix, or spawning) - called multiprocessing in Python
- Multiple threads (single-threaded vs multi-threaded)
- Asynchronous (non-blocking) vs synchronous (blocking, aka "normal") code - cooperative multitasking
Time slicing
- While one program waits for IO, another program can run. This is called time-slicing.
- No multitasking (e.g. MS DOS).
- Cooperative multitasking: a program gives up the CPU by going into a wait-state (MS Windows 3.1); if it does not, the whole system freezes.
- Pre-emptive multitasking: the OS is responsible for switching between running processes (Unix, Linux, Windows 95/NT and later).
Concurrencies
- Trivial case: when there is no shared data.
- Shared data: syncing processes, memory allocation, scheduling.
- Deadlocks: when two processes are each waiting for the other (see the sketch below).
- Resource starvation: running out of memory, disk space, or process count.
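The classic way to run into a deadlock is to have two locks that two threads acquire in opposite order. This is a minimal sketch (the names are made up for illustration); if you run it, it hangs forever, which is exactly the point:
import threading
import time

lock_a = threading.Lock()
lock_b = threading.Lock()

def worker_one():
    with lock_a:
        time.sleep(0.1)    # give the other thread time to grab lock_b
        with lock_b:       # blocks forever: worker_two is holding lock_b
            print("worker_one finished")

def worker_two():
    with lock_b:
        time.sleep(0.1)    # give the other thread time to grab lock_a
        with lock_a:       # blocks forever: worker_one is holding lock_a
            print("worker_two finished")

threads = [threading.Thread(target=worker_one), threading.Thread(target=worker_two)]
for t in threads:
    t.start()
for t in threads:
    t.join()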
How many parallels to use?
- First of all, I call them "parallels" as this applies to forks, threads, spawns, and even to async code.
- Overhead of creating a new parallel.
- Overhead of communication (sending job input to the parallel, receiving results).
- Total number of items to process.
- Time it takes to process an item.
- Distribution of processing times (e.g. one long and many short jobs).
- Number of cores (CPUs).
Dividing jobs
- N items to process.
- K parallels.
- Divide the items into K groups of size int(N/K) and int(N/K)+1 (see the batching sketch after this list).
- Create K parallels with one item each. When one is done, give it another item.
- Create K parallels with one item each. When one is done, let it stop and create a new parallel.
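A minimal sketch of the first approach - splitting N items into K batches whose sizes differ by at most one. This is the same batching idea used later in the fork skeleton and in the threaded URL solution:
def make_batches(items, k):
    # the first N % K batches get int(N/K)+1 items, the rest get int(N/K)
    n = len(items)
    size = n // k
    leftover = n % k
    batches = []
    end = 0
    for ix in range(k):
        start = end
        end = start + size + (1 if ix < leftover else 0)
        batches.append(items[start:end])
    return batches

print(make_batches(list(range(10)), 3))   # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]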
Performance Monitoring
- Linux, OSX: htop
- Windows: Performance Monitor
Threads
Python Threading docs
Threaded counters
- threading
- Thread
- run
import threading
import sys
class ThreadedCount(threading.Thread):
    def run(self):
        for cnt in range(6):
            print(f"{cnt} {threading.current_thread().name}")
        return
a = ThreadedCount()
b = ThreadedCount()
c = ThreadedCount()
a.start()
b.start()
c.start()
print('main - Running {} threads'.format(threading.active_count()))
a.join()
b.join()
c.join()
print("main - thread is done")
0 Thread-1
1 Thread-1
0 Thread-2
2 Thread-1
1 Thread-2
0 Thread-3
3 Thread-1
2 Thread-2
main - Running 4 threads
3 Thread-2
1 Thread-3
4 Thread-2
2 Thread-3
5 Thread-2
3 Thread-3
4 Thread-1
4 Thread-3
5 Thread-1
5 Thread-3
main - thread is done
Simple threaded counters
- threading
- Thread
- run
import threading
import sys
class ThreadedCount(threading.Thread):
    def run(self):
        thread = threading.current_thread()
        print('{} - start'.format(thread.name))
        for c in range(10):
            print('{} - count {}'.format(thread.name, c))
        print('{} - end'.format(thread.name))
        return
a = ThreadedCount()
b = ThreadedCount()
c = ThreadedCount()
a.start()
b.start()
c.start()
print('main - running {} threads'.format(threading.active_count()))
a.join()
b.join()
c.join()
print("main - thread is done")
Thread-1 - start
Thread-1 - count 0
Thread-1 - count 1
Thread-2 - start
Thread-1 - count 2
Thread-2 - count 0
Thread-1 - count 3
Thread-3 - start
main - running 4 threads
Thread-2 - count 1
Thread-1 - count 4
Thread-2 - count 2
Thread-1 - count 5
Thread-2 - count 3
Thread-1 - count 6
Thread-2 - count 4
Thread-1 - count 7
Thread-2 - count 5
Thread-1 - count 8
Thread-2 - count 6
Thread-1 - count 9
Thread-2 - count 7
Thread-1 - end
Thread-2 - count 8
Thread-2 - count 9
Thread-2 - end
Thread-3 - count 0
Thread-3 - count 1
Thread-3 - count 2
Thread-3 - count 3
Thread-3 - count 4
Thread-3 - count 5
Thread-3 - count 6
Thread-3 - count 7
Thread-3 - count 8
Thread-3 - count 9
Thread-3 - end
main - thread is done
Simple threaded counters (parameterized)
The same as the previous one, but with parameters controlling the number of threads and the range of the counter.
import threading
import sys
num_threads, count_till = 3, 5
class ThreadedCount(threading.Thread):
    def run(self):
        thread = threading.current_thread()
        print(f'{thread.name} - start')
        for cnt in range(count_till):
            print(f'{thread.name} - count {cnt}')
        print(f'{thread.name} - end')
        return

threads = []
for ix in range(num_threads):
    threads.append(ThreadedCount())
for th in threads:
    th.start()
print('main - running {} threads'.format(threading.active_count()))
for th in threads:
    th.join()
print("main - thread is done")
Thread-1 - start
Thread-1 - count 0
Thread-1 - count 1
Thread-1 - count 2
Thread-1 - count 3
Thread-1 - count 4
Thread-1 - end
Thread-2 - start
Thread-2 - count 0
Thread-2 - count 1
Thread-2 - count 2
Thread-2 - count 3
Thread-2 - count 4
Thread-2 - end
Thread-3 - start
Thread-3 - count 0
Thread-3 - count 1
Thread-3 - count 2
Thread-3 - count 3
Thread-3 - count 4
Thread-3 - end
main - running 1 threads
main - thread is done
Pass parameters to threads - Counter with attributes
- threading
- Thread
- init
import threading
import sys
class ThreadedCount(threading.Thread):
    def __init__(self, name, start, stop):
        super().__init__()
        self.name = name
        self.counter = start
        self.limit = stop
        print('__init__ of {} in {}'.format(self.name, threading.current_thread()))

    def run(self):
        print('start run of {} in {}'.format(self.name, threading.current_thread()))
        while self.counter < self.limit:
            print('count {} of {}'.format(self.name, self.counter))
            self.counter += 1
        print('end run of {} in {}'.format(self.name, threading.current_thread()))
        return
foo = ThreadedCount("Foo", 1, 11)
bar = ThreadedCount("Bar", 1, 11)
foo.start()
bar.start()
print('main - running {} threads'.format(threading.active_count()))
foo.join()
bar.join()
print("main - thread is done")
__init__ of Foo in <_MainThread(MainThread, started 139645405484864)>
__init__ of Bar in <_MainThread(MainThread, started 139645405484864)>
start run of Foo in <ThreadedCount(Foo, started 139645391374080)>
count Foo of 1
count Foo of 2
start run of Bar in <ThreadedCount(Bar, started 139645382981376)>
count Bar of 1
main - running 3 threads
count Foo of 3
count Bar of 2
count Foo of 4
count Bar of 3
count Foo of 5
count Bar of 4
count Foo of 6
count Bar of 5
count Foo of 7
count Bar of 6
count Foo of 8
count Bar of 7
count Foo of 9
count Bar of 8
count Foo of 10
count Bar of 9
end run of Foo in <ThreadedCount(Foo, started 139645391374080)>
count Bar of 10
end run of Bar in <ThreadedCount(Bar, started 139645382981376)>
main - thread is done
Create a central counter
import threading
import sys
import time
cnt = 0
num = 30
limit = 100000
class ThreadedCount(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.counter = 0

    def run(self):
        global cnt
        while self.counter < limit:
            self.counter += 1
            cnt += 1
        return
start = time.time()
threads = [ ThreadedCount() for n in range(num) ]
[ t.start() for t in threads ]
[ t.join() for t in threads ]
end = time.time()
print("Expected: {}".format(num * limit))
print("Received: {}".format(cnt))
print("Elapsed: {}".format(end-start))
# Expected: 3000000
# Received: 2659032
# Elapsed: 0.437514066696167
Lock - acquire - release
- Lock
- acquire
- release
import threading
import sys
import time
cnt = 0
num = 30
limit = 100000
locker = threading.Lock()
class ThreadedCount(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.counter = 0

    def run(self):
        global cnt
        while self.counter < limit:
            self.counter += 1
            locker.acquire()
            cnt += 1
            locker.release()
        return
start = time.time()
threads = [ ThreadedCount() for n in range(num) ]
[ t.start() for t in threads ]
[ t.join() for t in threads ]
end = time.time()
print("Expected: {}".format(num * limit))
print("Received: {}".format(cnt))
print("Elapsed: {}".format(end-start))
# Expected: 3000000
# Received: 3000000
# Elapsed: 12.333643198013306
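As a side note, threading.Lock also works as a context manager, so the acquire/release pair can be written as a with-statement that releases the lock even if an exception is raised. A self-contained sketch:
import threading

cnt = 0
locker = threading.Lock()

def increment():
    global cnt
    with locker:    # acquire() on entry, release() on exit, even on exceptions
        cnt += 1

threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(cnt)   # 5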
Counter - plain
import sys
import time
cnt = 0
num = 30
limit = 100000
class Count():
def __init__(self):
self.counter = 0
def run(self):
global cnt
while self.counter < limit:
self.counter += 1
cnt += 1
return
start = time.time()
for _ in range(num):
c = Count()
c.run()
end = time.time()
print("Expected: {}".format(num * limit))
print("Received: {}".format(cnt))
print("Elapsed: {}".format(end-start))
# Expected: 3000000
# Received: 3000000
# Elapsed: 0.4130408763885498
GIL - Global Interpreter Lock
- Solves the problem introduced by CPython's reference counting.
- Not going away any time soon.
- CPython and PyPy have it.
- Jython and IronPython don't have it.
- See PEP 554 - Multiple Interpreters in the Stdlib (previously available only to C extensions).
Thread load
import threading
import sys
import time
import random
results = []
locker = threading.Lock()
class ThreadedCount(threading.Thread):
    def __init__(self, n):
        threading.Thread.__init__(self)
        self.n = n

    def run(self):
        count = 0
        total = 0
        while count < 40000000 / self.n:
            rnd = random.random()
            total += rnd
            count += 1
        locker.acquire()
        results.append({'count': count, 'total': total})
        locker.release()
        return

def main():
    if len(sys.argv) != 2:
        exit(f"Usage: {sys.argv[0]} POOL_SIZE")
    size = int(sys.argv[1])
    start = time.time()
    threads = [ ThreadedCount(n=size) for i in range(size) ]
    [ t.start() for t in threads ]
    [ t.join() for t in threads ]
    print("Results: {}".format(results))
    totals = map(lambda r: r['total'], results)
    print("Total: {}".format(sum(totals)))
    end = time.time()
    print(end - start)

if __name__ == '__main__':
    main()
$ time python thread_load.py 1
Results: [{'count': 40000000, 'total': 19996878.531261113}]
Total: 19996878.531261113
6.478948354721069
real 0m6.539s
user 0m6.491s
sys 0m0.012s
$ time python thread_load.py 4
Results: [{'count': 10000000, 'total': 5000680.7382364655}, {'count': 10000000, 'total': 5000496.15077697}, {'count': 10000000, 'total': 5000225.747780174}, {'count': 10000000, 'total': 4999503.803068357}]
Total: 20000906.43986197
6.180345296859741
real 0m6.241s
user 0m6.283s
sys 0m0.029s
Exercise: thread files
- Get a list of files (from the current directory, or all the files in the "slides" repository).
- Process each file:
  - get the size of the file
  - count how many times each character appears in the file.
- The script should accept the number of threads to use.
Exercise: thread URL requests.
In the following script we fetch the URLs listed in a file:
{% embed include file="src/examples/parallel/urls.txt" %}
It takes about 1.5-2 sec / URL from home. (It depends on a lot of factors including your network connection.)
import time
import requests
import sys
from bs4 import BeautifulSoup
def get_urls(limit):
with open('urls.txt') as fh:
urls = list(map(lambda line: line.rstrip("\n"), fh))
if len(urls) > limit:
urls = urls[:limit]
return urls
def get_title(url):
try:
resp = requests.get(url)
if resp.status_code != 200:
return None, f"Incorrect status_code {resp.status_code} for {url}"
except Exception as err:
return None, f"Error: {err} for {url}"
soup = BeautifulSoup(resp.content, 'html.parser')
return soup.title.string, None
def main():
if len(sys.argv) < 2:
exit(f"Usage: {sys.argv[0]} LIMIT")
limit = int(sys.argv[1])
urls = get_urls(limit)
print(urls)
start = time.time()
titles = []
for url in urls:
#print(f"Processing {url}")
title, err = get_title(url)
if err:
print(err)
else:
print(title)
titles.append({
"url": url,
"title": title,
"err": err,
})
end = time.time()
print("Elapsed time: {} for {} pages.".format(end-start, len(urls)))
print(titles)
if __name__ == '__main__':
main()
Create a version of the above script that can use K threads.
Exercise: thread queue
Write an application that handles a queue of jobs in N=5 threads. Each job contains a number between 0-5. Each thread takes the next element from the queue and sleeps for the given amount of second (as an imitation of actual work it should be doing). When finished it checks for another job. If there are no more jobs in the queue, the thread can close itself.
import threading
import random
import sys
thread_count = 5
counter = 0
queue = list(map(lambda x: ('main', random.randrange(5)), range(20)))
print(queue)
If that's done, change the code so that each thread will generate a random number between 0-5 (for sleep-time) and in 33% of the cases it will add it to the central queue as a new job.
Another extension to this exercise is to change the code to limit the number of jobs each thread can execute in its lifetime. When the thread has finished that many jobs it will quit and the main thread will create a new worker thread.
Solution: thread queue
import threading
import random
import sys
import time
thread_count = 5
counter = 0
queue = list(map(lambda x: ('main', random.randrange(5)), range(20)))
#print(queue)
locker = threading.Lock()
class ThreadedCount(threading.Thread):
def run(self):
global counter
my_counter = 0
thread = threading.current_thread()
print('{} - start thread'.format(thread.name))
while (True):
locker.acquire()
job = None
if len(queue) > 0:
counter += 1
my_counter += 1
job = queue[0]
queue[0:1] = []
locker.release()
if job == None:
print('{} - no more jobs'.format(thread.name))
break
print('{} - working on job {} ({}) from {} sleep for {}'
.format(thread.name, counter, my_counter, job[0], job[1]))
time.sleep(job[1])
return
threads = []
for i in range(thread_count):
threads.append(ThreadedCount())
for t in threads:
t.start()
for t in threads:
t.join()
Solution: thread URL requests.
import time
import threading
import requests
import sys
from bs4 import BeautifulSoup
from fetch_urls import get_urls, get_title
titles = []
locker = threading.Lock()
class GetURLs(threading.Thread):
def __init__(self, urls):
threading.Thread.__init__(self)
self.urls = urls
def run(self):
my_titles = []
for url in self.urls:
title, err = get_title(url)
my_titles.append({
'url': url,
'title': title,
'err': err,
})
locker.acquire()
titles.extend(my_titles)
locker.release()
return
def main():
if len(sys.argv) < 3:
exit(f"Usage: {sys.argv[0]} LIMIT THREADS")
limit = int(sys.argv[1])
threads_count = int(sys.argv[2])
urls = get_urls(limit)
print(urls)
start_time = time.time()
batch_size = int(limit/threads_count)
left_over = limit % threads_count
batches = []
end = 0
for ix in range(threads_count):
start = end
end = start + batch_size
if ix < left_over:
end += 1
batches.append(urls[start:end])
threads = [ GetURLs(batches[ix]) for ix in range(threads_count) ]
[ t.start() for t in threads ]
[ t.join() for t in threads ]
end_time = time.time()
print("Elapsed time: {} for {} pages.".format(end_time-start_time, len(urls)))
print(titles)
if __name__ == '__main__':
main()
Concurrency
import sys
import concurrent.futures
import requests
import time
def download_all(urls):
for url in urls:
res = requests.get(url)
print(f"{url} - {len(res.text)}")
return
if __name__ == '__main__':
if len(sys.argv) != 2:
exit(f"Usage: {sys.argv[0]} FILENAME (examples/parallel/urls.txt)")
filename = sys.argv[1]
with open(filename) as fh:
urls = list(map(lambda s: s.rstrip("\n"), fh.readlines()))
start = time.time()
download_all(urls)
end = time.time()
elapsed = end-start
print(f"Elapsed time : {elapsed:2f}")
import concurrent.futures
import requests
import threading
import time
import sys
from bs4 import BeautifulSoup
thread_local = threading.local()
def get_title(url):
res = requests.get(url)
if res.status_code:
soup = BeautifulSoup(res.text, 'html.parser')
return None if soup.title is None else soup.title.string
else:
return None
def main():
if len(sys.argv) < 4:
exit(f"{sys.argv[0]} LIMIT PARALLEL FILENAME")
limit = int(sys.argv[1])
parallel = int(sys.argv[2])
filename = sys.argv[3]
with open(filename) as fh:
urls = list(map(lambda line: line.rstrip("\n"), fh))
if len(urls) > limit:
urls = urls[0:limit]
#print(urls)
start = time.monotonic()
#titles = list(map(get_title, urls))
with concurrent.futures.ThreadPoolExecutor(max_workers=parallel) as exe:
titles = list(exe.map(get_title, urls))
for ix in range(len(urls)):
print(f"{urls[ix]} {titles[ix]}")
end = time.monotonic()
print(f"Elapsed time: {end-start}")
main()
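Besides exe.map, the executor also has a submit method that returns Future objects; combined with as_completed you can handle each result as soon as it is ready instead of in input order. A minimal sketch, reusing the get_title function from the example above:
import concurrent.futures

def titles_as_completed(urls, parallel):
    with concurrent.futures.ThreadPoolExecutor(max_workers=parallel) as exe:
        future_to_url = {exe.submit(get_title, url): url for url in urls}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            print(f"{url} {future.result()}")   # results arrive in completion order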
Create a counter queue
- Queue
import threading
from queue import Queue

class ThreadedCount(threading.Thread):
    def __init__(self, name, start, stop):
        threading.Thread.__init__(self)
        self.name = name
        self.counter = start
        self.limit = stop

    def run(self):
        while self.counter < self.limit:
            self.counter += 1
            print(self.name, self.counter)
        print(self.name , "finished")
        return

queue = Queue()
foo = ThreadedCount("Foo", 1, 10)
bar = ThreadedCount("Bar", 1, 10)
foo.start()
bar.start()
print("main - running")
foo.join()
bar.join()
print("main - thread is done")
A Queue of tasks
from queue import Queue
from threading import Thread
def source():
    """Returning the list of tasks"""
    return range(1, 10)

def do_work(item):
    print("Working on item " + str(item) + "\n", end="")
    # print("Working on item ", str(item))
    # would show the output intermingled as the separate items of the print statement
    # (even the trailing newline) might be printed only after a context switch

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

def main():
    for i in range(num_worker_threads):
        t = Thread(target=worker)
        t.daemon = True
        t.start()
    for item in source():
        q.put(item)
    q.join()    # block until all tasks are done

num_worker_threads = 3
q = Queue()
main()
Forking
Fork
- fork
- getpid
- getppid
- wait
import os
import time
print('{} - start running'.format(os.getpid()))
pid = os.fork()
if not pid:
    print('{} - in child. Parent is {}'.format(os.getpid(), os.getppid()))
    time.sleep(1)
    exit(3)
print('{} - in parent (child pid is {})'.format(os.getpid(), pid))
child_pid, exit_code = os.wait()
print('{} - Child with pid {} exited. Exit code {}'.format(os.getpid(), child_pid, exit_code))
print('Real exit code {}'.format(int(exit_code/256))) # The upper byte
print('Also known as {}'.format(exit_code >> 8)) # Right shift 8 bits
10278 - start running
10279 - in child. Parent is 10278
10278 - start running
10278 - in parent (child pid is 10279)
10278 - Child with pid 10279 exited. Exit code 768
Real exit code 3
Also known as 3
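The standard library also has helpers that extract the exit code from the status returned by os.wait, so you do not have to do the bit-shifting yourself: os.WEXITSTATUS (Unix only) and, since Python 3.9, os.waitstatus_to_exitcode. A minimal sketch:
import os

pid = os.fork()
if not pid:
    exit(3)                                   # child exits with code 3

child_pid, status = os.wait()
print(os.WEXITSTATUS(status))                 # 3 (Unix only)
print(os.waitstatus_to_exitcode(status))      # 3 (Python 3.9+)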
Forking
- fork
- wait
import os
import time
name = "common"
def child():
time.sleep(1)
print(f"In Child ({name}).")
print(f"In Child PID: {os.getpid()} PPID: {os.getppid()}")
time.sleep(5)
exit(3)
def parent(child_pid):
time.sleep(1)
print(f"In Parent ({name}) The child is: {child_pid}")
print(f"In Parent PID: {os.getpid()} PPID: {os.getppid()}")
r = os.wait()
print(r)
print(f'{os.getpid()} - start running')
pid = os.fork()
print(f'my pid: {os.getpid()} pid received from fork: {pid}')
if pid == 0:
child()
else:
parent(pid)
0
In Child of common
In Child PID: 11212 PPID: 11211
11212
In Parent (common) The child is: 11212
In Parent PID: 11211 PPID: 4195
(11212, 768)
Fork skeleton
import os
import glob
files = glob.glob("*.py")
# print(files)
count = len(files)
print(f"Number of items to process: {count}")
parallel = 4 # How many in parallel
batch = int(count/parallel)
leftover = count % parallel
print(f"batch size: {batch} leftover: {leftover}")
def parent(pid):
print(f"parent {pid}")
def child(files):
print(f"{os.getpid()} {files}")
exit()
end = 0
for ix in range(parallel):
start = end
end = start + batch
if ix < leftover:
end += 1
print(f"start={start} end={end}")
pid = os.fork()
if pid:
parent(pid)
else:
child(files[start:end])
print(f"In parent {os.getpid()}")
for ix in range(parallel):
r = os.wait()
print(r)
Fork with load
import os
import random
import sys
if len(sys.argv) != 2:
exit("Usage: {} N".format(sys.argv[0]))
n = int(sys.argv[1])
for p in range(0, n):
pid = os.fork()
if not pid:
print('In Child')
i = 0
while i < 40000000/n:
x = random.random()
y = random.random()
z = x+y
i += 1
exit(3)
print('In Parent of', pid)
for p in range(0, n):
r = os.wait()
print(r)
Fork load results
- time
$ time python fork_load.py 1
In Parent of 96355
In Child
(96355, 768)
real 0m26.391s
user 0m25.893s
sys 0m0.190s
$ time python fork_load.py 8
In Parent of 96372
In Parent of 96373
In Parent of 96374
In Child
In Child
In Parent of 96375
In Child
In Child
In Parent of 96376
In Child
In Parent of 96377
In Child
In Child
In Parent of 96378
In Parent of 96379
In Child
(96374, 768)
(96372, 768)
(96375, 768)
(96373, 768)
(96376, 768)
(96377, 768)
(96378, 768)
(96379, 768)
real 0m12.754s
user 0m45.196s
sys 0m0.164s
Marshalling / Serialization
Marshalling (or serialization) is the operation of converting an arbitrary data structure into a string (or byte string) in a way that allows us to convert that string back into the same data structure.
Marshalling can be used to persist data between executions of the same script, to transfer data between processes, or even between machines. In some cases it can also be used to communicate between two processes written in different programming languages.
The marshal module provides such features, but it is not recommended as it was built for Python's internal object serialization.
The pickle module was designed for this task.
The json module can be used too.
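A minimal sketch of round-tripping a data structure with pickle; json.dumps/json.loads behave the same way for JSON-compatible data, producing a human-readable string instead of bytes:
import pickle

data = {"name": "foo", "values": [1, 2, 3]}

serialized = pickle.dumps(data)       # bytes that can be written to a file or sent to another process
restored = pickle.loads(serialized)   # back to an equal data structure
assert restored == data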
Fork with random
When the random module is loaded it automatically calls random.seed() to initialize the random number generator. When we fork, this is not called again, so all the processes will return the same random numbers. We can fix this by calling random.seed() manually in the child process.
import os, time, random
print('{} - start running'.format(os.getpid()))
pid = os.fork()
if not pid:
    #random.seed()
    print('{} - in child'.format(os.getpid()))
    print(random.random())
    time.sleep(1)
    exit(3)
print('{} - in parent (child pid is {})'.format(os.getpid(), pid))
print(random.random())
done = os.wait()
print('{} - Child exited {}'.format(os.getpid(), done))
Exercise: fork return data
Create a script that will go over a list of numbers and do some computation on each number.
import sys
import time
from mymodule import calc
def main(n):
results = {}
print(f"do 1-{n}")
for ix in range(1, n):
results[ix] = calc(ix)
return results
if __name__ == '__main__':
if len(sys.argv) < 2:
exit(f"Usage: {sys.argv[0]} NUMBER")
start = time.time()
results = main(1+int(sys.argv[1]))
end = time.time()
total = sum(results.values())
print(f"Total: {total}")
print("Elapsed time: {}".format(end-start))
Allow the child process to return data to the parent process. Before exiting from the child process, serialize the data structure you want to send back and save it in a file whose name corresponds to the parent and child processes (e.g. built from the PID of the parent process and the PID of the child process). In the parent process, when one of the children exits, check if there is a file corresponding to that child process, read the file, and de-serialize it.
Solution: fork return data
import sys
import os
import json
import time
from mymodule import calc
def child(start, end):
results = {}
for ix in range(start, end):
results[ix] = calc(ix)
filename = str(os.getpid()) + '.json'
with open(filename, 'w') as fh:
json.dump(results, fh)
exit()
def main(total_number, parallels):
results = {}
processes = []
a_range = int(total_number / parallels)
for cnt in range(parallels):
start = 1 + cnt * a_range
end = start + a_range
if cnt == parallels - 1:
end = total_number + 1
print(f"do: {start}-{end}")
pid = os.fork()
if pid:
processes.append(pid) # parent
else:
child(start, end)
for _ in range(len(processes)):
pid, exit_code = os.wait()
#print(pid, exit_code)
filename = str(pid) + '.json'
with open(filename) as fh:
res = json.load(fh)
print(f"{pid}: {res}")
results.update(res)
os.unlink(filename)
return results
if __name__ == '__main__':
if len(sys.argv) < 3:
exit(f"Usage: {sys.argv[0]} NUMBER PARALLEL")
start = time.time()
results = main(int(sys.argv[1]), int(sys.argv[2]))
print(f"results: {results}")
end = time.time()
total = sum(results.values())
print(f"Total: {total}")
print("Elapsed time: {}".format(end-start))
Asynchronous programming with AsyncIO
Use cases for Async
- Good for IO-intensive tasks:
- Downloading web pages
- Accessing APIs
- Accessing databases (especially on other servers)
- Reading files
Event loop
- Single thread
- Single process
- ... so it uses a single core
- Cooperative multitasking
Print sync
- First let's see a few small and useless examples.
- In this example we wait and print sequentially. No async here.
import time
def say(text, sec):
time.sleep(sec)
print(text)
def main():
start = time.monotonic()
say("First", 2)
say("Second", 1)
end = time.monotonic()
print(f"Elapsed: {end-start}")
main()
First
Second
Elapsed: 3.0015416080132127
Print async
- asyncio
- async
- await
- gather
- This is almost the same example, but we wait in parallel.
- The order of the output is now different.
- It also finishes 1 sec faster: it finishes when the longest wait ends.
import time
import asyncio
async def say(text, sec):
await asyncio.sleep(sec)
print(text)
async def main():
print('start main')
start = time.monotonic()
await asyncio.gather(
say("First", 2),
say("Second", 1),
)
end = time.monotonic()
print(f"Elapsed: {end-start}")
main_co = main()
print(main_co)
asyncio.run(main_co)
Second
First
Elapsed: 2.002833483973518
Sync sleep
- The same, but now we also print progress in the say function. Still sequential.
import time
def say(wid, sec):
start = time.monotonic()
print(f"Starting {wid} that will take {sec}s")
time.sleep(sec)
end = time.monotonic()
print(f"Finishing {wid} in {end-start}s")
def main():
start = time.monotonic()
say("First", 2),
say("Second", 1)
end = time.monotonic()
print(f"Elapsed: {end-start}")
main()
Starting First that will take 2s
Finishing First in 2.002233584993519s
Starting Second that will take 1s
Finishing Second in 1.0011189789511263s
Elapsed: 3.003522119950503
Async sleep
- In Async we can see that they start in the right order, but then get to the finish line after waiting in parallel.
import time
import asyncio
async def say(wid, sec):
start = time.monotonic()
print(f"Starting {wid} that will take {sec}s")
await asyncio.sleep(sec)
end = time.monotonic()
print(f"Finishing {wid} in {end-start}s")
async def main():
start = time.monotonic()
await asyncio.gather(
say("First", 2),
say("Second", 1)
)
end = time.monotonic()
print(f"Elapsed: {end-start}")
asyncio.run(main())
Starting First that will take 2s
Starting Second that will take 1s
Finishing Second in 1.0006165504455566s
Finishing First in 2.002072811126709s
Elapsed: 2.002306482056156
Sync sleep in loop
import time
def sleep(cnt, sec):
print(f"Start {cnt}")
time.sleep(sec)
print(f"End {cnt}")
def main():
for i in range(4):
sleep(i, 1)
start = time.monotonic()
main()
end = time.monotonic()
print(f"Elapsed {end-start}")
Start 0
End 0
Start 1
End 1
Start 2
End 2
Start 3
End 3
Elapsed 4.004362344741821
Async sleep in loop
- create_task
- 4 sleeps running in parallel will be much faster.
- This time we used create_task to set up the tasks ahead of time and then we awaited all of them.
import time
import asyncio
async def sleep(cnt, sec):
print(f"Start {cnt}")
await asyncio.sleep(sec)
print(f"End {cnt}")
async def main():
    tasks = []
    for i in range(4):
        tasks.append(asyncio.create_task(sleep(i, 1)))   # schedule the coroutines as tasks
    for t in tasks:
        await t
start = time.monotonic()
asyncio.run(main())
end = time.monotonic()
print(f"Elapsed {end-start}")
Start 0
Start 1
Start 2
Start 3
End 0
End 1
End 2
End 3
Elapsed 1.0030033588409424
Async sleep in loop with gather
- gather
import time
import asyncio
async def sleep(cnt, sec):
print(f"Start {cnt}")
await asyncio.sleep(sec)
print(f"End {cnt}")
async def main():
co_routines = []
for i in range(4):
co_routines.append(sleep(i, 1))
await asyncio.gather(*co_routines)
start = time.monotonic()
asyncio.run(main())
end = time.monotonic()
print(f"Elapsed {end-start}")
Start 0
Start 1
Start 2
Start 3
End 0
End 1
End 2
End 3
Elapsed 1.0018720626831055
coroutines
import asyncio
async def answer():
print("start to answer")
return 42
async def main():
a_coroutine = answer()
print(a_coroutine)
await asyncio.sleep(0)
print('before await for coroutine')
result = await a_coroutine
print(f"result is {result} after await")
asyncio.run(main())
<coroutine object answer at 0x7f8d06e6d240>
before await for coroutine
start to answer
result is 42 after await
Async Tasks
import asyncio
async def answer():
print("start to answer")
return 42
async def main():
a_task = asyncio.create_task(answer())
print(a_task)
await asyncio.sleep(0)
print('before await for task')
result = await a_task
print(f"result is {result} after await")
asyncio.run(main())
<Task pending name='Task-2' coro=<answer() running at async_task.py:3>>
start to answer
before await for task
result is 42 after await
Count Async
import time
import asyncio
async def count(name, end, sec):
for i in range(end):
print(f"{name} {i}")
await asyncio.sleep(sec)
async def main():
await asyncio.gather(
count('apple', 10, 0.1),
count('peach', 10, 0.2),
)
start = time.time()
asyncio.run(main())
end = time.time()
print(f"Elapsed {end-start}")
apple 0
peach 0
apple 1
peach 1
apple 2
apple 3
peach 2
apple 4
apple 5
peach 3
apple 6
apple 7
peach 4
apple 8
apple 9
peach 5
peach 6
peach 7
peach 8
peach 9
Elapsed 2.010511636734009
Passing the baton while sleeping 0 sec
import asyncio
async def count(name):
print(f"start {name}")
for cnt in range(10):
print(f"{name} {cnt}")
await asyncio.sleep(0)
async def main():
a_task = asyncio.create_task(count("A"))
b_task = asyncio.create_task(count("B"))
print("Before")
#await asyncio.sleep(1)
print("After")
await asyncio.gather(
a_task,
b_task
)
asyncio.run(main())
Async sleep in a queue
import time
import asyncio
import sys
#queue = [4, 3, 1, 2, 1, 7, 3, 4, 5, 1, 2, 1, 2, 3, 4]
queue = [1] * 10
print(f"Total: {sum(queue)}")
parallel = 1
if len(sys.argv) == 2:
parallel = int(sys.argv[1])
async def worker(wid):
while queue:
job = queue.pop(0)
ts = time.monotonic()
print(f"Worker {wid} starting job {job} {ts}")
await asyncio.sleep(job)
print(f"Worker {wid} finished job {job}")
async def main():
tasks = []
for wid in range(parallel):
tasks.append( asyncio.create_task(worker(wid)) )
await asyncio.gather(*tasks)
start = time.monotonic()
print(f"Start {start}")
asyncio.run(main())
end = time.monotonic()
print(f"Elapsed: {end-start}")
Async http
import aiohttp
import asyncio
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
html = await fetch(session, 'http://python.org')
print(html)
print("OK")
asyncio.run(main())
Sync http requests
import requests
import sys
from bs4 import BeautifulSoup
def fetch(url):
response = requests.get(url)
content = response.text
return content
def main():
if len(sys.argv) < 2:
exit(f"Usage: {sys.argv[0]} FILENAME") # examples/parallel/url.txt
filename = sys.argv[1]
with open(filename) as fh:
urls = list(map(lambda url: url.rstrip('\n'), fh.readlines()))
#urls = urls[0:10]
#urls = ['https://httpbin.org/get']
#print(urls)
tasks = []
for url in urls:
# tasks.append([url, fetch(url)])
# for task in tasks:
# url, content = task
content = fetch(url)
print(url)
#print(content)
soup = BeautifulSoup(content, 'html.parser')
print('Missing' if soup.title is None else soup.title.string)
#print(task)
main()
Async http requests
import aiohttp
import asyncio
import sys
from bs4 import BeautifulSoup
async def fetch(session, url):
async with session.get(url) as response:
content = await response.text()
return [url, content]
async def main():
if len(sys.argv) < 2:
exit(f"Usage: {sys.argv[0]} FILENAME") # examples/parallel/url.txt
filename = sys.argv[1]
with open(filename) as fh:
urls = list(map(lambda url: url.rstrip('\n'), fh.readlines()))
#urls = urls[0:6]
#urls = ['https://httpbin.org/get']
#print(urls)
async with aiohttp.ClientSession() as session:
tasks = []
for url in urls:
tasks.append(asyncio.create_task(fetch(session, url)))
await asyncio.gather(*tasks)
for task in tasks:
url, content = task.result()
print(url)
#print(content)
soup = BeautifulSoup(content, 'html.parser')
print('Missing' if soup.title is None else soup.title.string)
#print(task)
asyncio.run(main())
Async http requests with queue
import aiohttp
import asyncio
import sys
from bs4 import BeautifulSoup
async def fetch(session, urls, results):
while urls:
url = urls.pop(0)
async with session.get(url) as response:
content = await response.text()
soup = BeautifulSoup(content, 'html.parser')
results[url] = None if soup.title is None else soup.title.string
async def main(parallel, urls):
results = {}
async with aiohttp.ClientSession() as session:
tasks = []
for _ in range(parallel):
tasks.append(asyncio.create_task(fetch(session, urls, results)))
await asyncio.gather(*tasks)
return results
def setup():
if len(sys.argv) < 4:
exit(f"Usage: {sys.argv[0]} LIMIT PARALLEL FILENAME") # examples/parallel/url.txt or wikipedia.txt
limit = int(sys.argv[1])
parallel = int(sys.argv[2])
filename = sys.argv[3]
with open(filename) as fh:
urls = list(map(lambda url: url.rstrip('\n'), fh.readlines()))
if len(urls) > limit:
urls = urls[0:limit]
# use asyncio.Queue instead
results = asyncio.run(main(parallel, urls))
for url, title in results.items():
print(f"{url} - {title}")
setup()
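The comment in setup suggests using asyncio.Queue instead of a plain list. A minimal sketch of that idea (the fetching is replaced by a placeholder so the example stays self-contained):
import asyncio

async def worker(wid, queue, results):
    # each worker keeps pulling items from the shared queue until it is empty
    while not queue.empty():
        url = queue.get_nowait()
        await asyncio.sleep(0.1)                      # placeholder for the real fetch
        results[url] = f"fetched by worker {wid}"
        queue.task_done()

async def main(parallel, urls):
    queue = asyncio.Queue()
    for url in urls:
        queue.put_nowait(url)
    results = {}
    tasks = [asyncio.create_task(worker(wid, queue, results)) for wid in range(parallel)]
    await asyncio.gather(*tasks)
    return results

print(asyncio.run(main(2, ["a", "b", "c", "d"])))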
Sync chores
We have a number of household chores to do. Each takes a couple of seconds for a machine to do while we have time to do something else. We also have one task, cleaning potatoes, that requires our full attention. It is a CPU-intensive process.
We also have two processes depending each other. We can turn on the dryer only after the washing machine has finished.
import time
def boil_water(sec):
print(f"Start boiling water for {sec} seconds")
time.sleep(sec)
print(f"End boiling water for {sec} seconds")
def washing_machine(sec):
print("Start washing machine")
time.sleep(sec)
print("End washing machine")
def dryer(sec):
print("Start dryer")
time.sleep(sec)
print("End dryer")
def dishwasher(sec):
print("Start dishwasher")
time.sleep(sec)
print("End dishwasher")
def clean_potatoes(pieces):
print("Start cleaning potatoes")
for ix in range(pieces):
print(f"Cleaning potato {ix}")
time.sleep(0.5)
print("End cleaning potatoes")
def main():
dishwasher(3)
washing_machine(3)
dryer(3)
boil_water(4)
clean_potatoes(14)
start = time.time()
main()
end = time.time()
print(f"Elapsed {end-start}")
Start dishwasher
End dishwasher
Start washing machine
End washing machine
Start dryer
End dryer
Start boiling water for 4 seconds
End boiling water for 4 seconds
Start cleaning potatoes
Cleaning potato 0
Cleaning potato 1
Cleaning potato 2
Cleaning potato 3
Cleaning potato 4
Cleaning potato 5
Cleaning potato 6
Cleaning potato 7
Cleaning potato 8
Cleaning potato 9
Cleaning potato 10
Cleaning potato 11
Cleaning potato 12
Cleaning potato 13
End cleaning potatoes
Elapsed 20.017353534698486
Async chores
- async
- await
import time
import asyncio
async def boil_water(sec):
print(f"Start boiling water for {sec} seconds")
await asyncio.sleep(sec)
print(f"End boiling water for {sec} seconds")
async def washing_machine(sec):
print(f"Start washing machine for {sec} seconds")
await asyncio.sleep(sec)
print(f"End washing machine for {sec} seconds")
await dryer(3)
async def dryer(sec):
print(f"Start dryer for {sec} seconds")
await asyncio.sleep(sec)
print(f"End dryer for {sec} seconds")
async def dishwasher(sec):
print(f"Start dishwasher for {sec} seconds")
await asyncio.sleep(sec)
print(f"End dishwasher for {sec} seconds")
async def clean_potatoes(pieces):
print(f"Start cleaning potatoes for {pieces} pieces")
for ix in range(pieces):
print(f"Cleaning potato {ix}")
time.sleep(0.5)
#await asyncio.sleep(0.0001)
print(f"End cleaning potatoes for {pieces} pieces")
async def main():
await asyncio.gather(
dishwasher(3),
washing_machine(3),
boil_water(4),
clean_potatoes(14))
start = time.time()
asyncio.run(main())
end = time.time()
print(f"Elapsed {end-start}")
From the output you can see that we only notice that the washing machine has finished after we have finished cleaning all the potatoes. That's because our potato cleaning is a long-running, CPU-intensive process. This means the dryer only starts working after the potatoes are clean.
Start dishwasher for 3 seconds
Start washing machine for 3 seconds
Start boiling water for 4 seconds
Start cleaning potatoes for 14 pieces
Cleaning potato 0
Cleaning potato 1
Cleaning potato 2
Cleaning potato 3
Cleaning potato 4
Cleaning potato 5
Cleaning potato 6
Cleaning potato 7
Cleaning potato 8
Cleaning potato 9
Cleaning potato 10
Cleaning potato 11
Cleaning potato 12
Cleaning potato 13
End cleaning potatoes for 14 pieces
End dishwasher for 3 seconds
End washing machine for 3 seconds
Start dryer for 3 seconds
End boiling water for 4 seconds
End dryer for 3 seconds
Elapsed 10.01340126991272
If after cleaning each potato we look up for a fraction of a second - that is, if we let the event loop run, as in the modified clean_potatoes below - then we can notice that the washing machine has finished and turn on the dryer before continuing with the next potato. This allows the dryer to work while we are still cleaning the potatoes.
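This is the clean_potatoes coroutine with the commented-out await enabled, which yields control back to the event loop after each potato:
async def clean_potatoes(pieces):
    print(f"Start cleaning potatoes for {pieces} pieces")
    for ix in range(pieces):
        print(f"Cleaning potato {ix}")
        time.sleep(0.5)
        await asyncio.sleep(0.0001)   # let the event loop run the already finished chores
    print(f"End cleaning potatoes for {pieces} pieces")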
Start dishwasher for 3 seconds
Start washing machine for 3 seconds
Start boiling water for 4 seconds
Start cleaning potatoes for 14 pieces
Cleaning potato 0
Cleaning potato 1
Cleaning potato 2
Cleaning potato 3
Cleaning potato 4
Cleaning potato 5
End dishwasher for 3 seconds
End washing machine for 3 seconds
Start dryer for 3 seconds
Cleaning potato 6
Cleaning potato 7
End boiling water for 4 seconds
Cleaning potato 8
Cleaning potato 9
Cleaning potato 10
Cleaning potato 11
End dryer for 3 seconds
Cleaning potato 12
Cleaning potato 13
End cleaning potatoes for 14 pieces
Elapsed 7.02296781539917
Async more
Explanation
- The feeling of parallelism
- Coroutines
- async/await
- Asynchronous (non-blocking) vs synchronous (blocking, aka "normal")
Coroutines
- Functions that can be suspended mid-way, allowing other functions to run (similar to a generator)
- async def creates a native coroutine or asynchronous generator
- async with
- async for
More about asyncio
- AsyncIO in Real Python
- asyncio
- aiohttp
- aiodns
- cchardet
- uvloop drop-in replacement for the standard Python event-loop
Other libraries:
- aiofiles: https://pypi.org/project/aiofiles/
- mongodb: umongo (called micromongo)
- postgresql: asyncpg
- mysql:
- redis: asyncio-redis
- sqlite:
unsync
- unsync
- Can seamlessly run tasks in async, thread, or fork mode, so different tasks can get the appropriate solution (e.g. CPU-bound tasks can use fork); see the sketch below.
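A minimal sketch based on the unsync documentation (not verified here): the @unsync decorator returns an Unfuture whose result() method blocks until the task is done, and cpu_bound=True is how the library is documented to push work into a separate process:
from unsync import unsync
import time

@unsync
def io_bound(sec):
    time.sleep(sec)                            # plain def: runs in a thread
    return f"io done after {sec}s"

@unsync(cpu_bound=True)
def cpu_bound(n):
    return sum(i * i for i in range(n))        # runs in a separate process

tasks = [io_bound(1), io_bound(1), cpu_bound(1_000_000)]
for task in tasks:
    print(task.result())                       # Unfuture.result() blocks until done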
Async files
import tempfile
import os
def save_file(filename, size):
with open(filename, 'w') as f:
for _ in range(size):
data = ''
for _ in range(1000):
data += 'xxxxxxxxxxx'
f.write(data)
def main():
tempdir = tempfile.mkdtemp()
for idx in range(300):
filename = os.path.join(tempdir, f'{idx}.txt')
save_file(filename, 100)
main()
import asyncio
import aiofiles
import tempfile
import os
async def save_file(filename, size):
async with aiofiles.open(filename, 'w') as f:
for _ in range(size):
#data = 'x' * 1000000
data = ''
for _ in range(1000):
data += 'xxxxxxxxxxx'
await f.write(data)
async def main():
tempdir = tempfile.mkdtemp()
tasks = []
for idx in range(300):
filename = os.path.join(tempdir, f'{idx}.txt')
tasks.append(asyncio.create_task(save_file(filename, 10)))
await asyncio.gather(*tasks)
#print(os.listdir(tempdir))
asyncio.run(main())
Async example
import asyncio
import time
async def say_after(delay, what):
print(f"start {what}")
await asyncio.sleep(delay)
print(what)
async def main():
task1 = asyncio.create_task(
say_after(1, 'hello'))
task2 = asyncio.create_task(
say_after(2, 'world'))
await asyncio.sleep(0)
print(f"started at {time.strftime('%X')}")
# Wait until both tasks are completed (should take
# around 2 seconds.)
await task1
await task2
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
Asynchronous programming with Twisted
About Twisted
Echo
from twisted.internet import protocol,reactor
port = 8000
class Echo(protocol.Protocol):
def dataReceived(self, data):
text = data.decode('utf8')
print(f"Received: {text}")
self.transport.write("You said: {}".format(text).encode('utf8'))
class EchoFactory(protocol.Factory):
def buildProtocol(self, addr):
return Echo()
print(f"Listening on port {port}")
reactor.listenTCP(port, EchoFactory())
reactor.run()
from twisted.internet import reactor,protocol
import sys
if len(sys.argv) < 2:
exit("Usage: {sys.argv[0]} TEXT")
message = sys.argv[1]
port = 8000
class EchoClient(protocol.Protocol):
def connectionMade(self):
self.transport.write(message.encode('utf8'))
def dataReceived(self, data):
print(f"Server said: {data}")
self.transport.loseConnection()
class EchoFactory(protocol.ClientFactory):
def buildProtocol(self, addr):
return EchoClient()
def clientConnectionFailed(self, connector, reason):
print("connection failed")
reactor.stop()
def clientConnectionLost(self, connector, reason):
print("connection lost")
reactor.stop()
reactor.connectTCP("localhost", port, EchoFactory())
reactor.run()
Echo with log
from twisted.internet import protocol,reactor
port = 8000
class Echo(protocol.Protocol):
def dataReceived(self, data):
print("Received: {}".format(data))
self.transport.write(data)
class EchoFactory(protocol.Factory):
def buildProtocol(self, addr):
print(f"Contection established with {addr}")
return Echo()
print(f"Started to listen on port {port}")
reactor.listenTCP(port, EchoFactory())
reactor.run()
Simple web client
The code behind this example uses the deprecated getPage call and needs to be fixed (a sketch using the newer Agent API follows the example).
- getPage() returns a "deferred"
- addCallbacks(on_success, on_failure)
- addBoth(on_both) adds a callback to both the success and the failure callback chains
from twisted.internet import reactor
from twisted.web.client import getPage
import sys
def printPage(result):
print("Page")
print('Size of the returned page is {}'.format(len(result)))
def printError(error):
print("Error")
print(f"Error: {error}")
#sys.stderr.write(error)
def stop(result):
print('stop')
reactor.stop()
if (len(sys.argv) != 2):
sys.stderr.write("Usage: python " + sys.argv[0] + " <URL>\n")
exit(1)
d = getPage(sys.argv[1])
d.addCallbacks(printPage, printError)
d.addBoth(stop)
reactor.run()
# getPage(sys.argv[1], method='POST', postdata="My test data").
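A hedged sketch of the same flow using twisted.web.client.Agent and readBody instead of the deprecated getPage (written from the documented Agent API, not tested here):
from twisted.internet import reactor
from twisted.web.client import Agent, readBody
import sys

def print_page(body):
    print('Size of the returned page is {}'.format(len(body)))

def print_error(error):
    print(f"Error: {error}")

def stop(result):
    print('stop')
    reactor.stop()

if len(sys.argv) != 2:
    sys.stderr.write("Usage: python " + sys.argv[0] + " <URL>\n")
    exit(1)

agent = Agent(reactor)
d = agent.request(b'GET', sys.argv[1].encode('utf-8'))
d.addCallback(readBody)                  # the Deferred now fires with the body bytes
d.addCallbacks(print_page, print_error)
d.addBoth(stop)
reactor.run()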
Web client
from twisted.internet import reactor
from twisted.web.client import getPage
import sys
import re
import time
queue = [
'http://docs.python.org/3/',
'http://docs.python.org/3/whatsnew/3.3.html',
'http://docs.python.org/3/tutorial/index.html',
'http://docs.python.org/3/library/index.html',
'http://docs.python.org/3/reference/index.html',
'http://docs.python.org/3/howto/index.html',
'http://docs.python.org/3/howto/pyporting.html',
'http://docs.python.org/3/howto/cporting.html',
'http://docs.python.org/3/howto/curses.html',
'http://docs.python.org/3/howto/descriptor.html',
'http://docs.python.org/3/howto/functional.html',
'http://docs.python.org/3/howto/logging.html',
'http://docs.python.org/3/howto/logging-cookbook.html',
'http://docs.python.org/3/howto/regex.html',
'http://docs.python.org/3/howto/sockets.html',
'http://docs.python.org/3/howto/sorting.html',
'http://docs.python.org/3/howto/unicode.html',
'http://docs.python.org/3/howto/urllib2.html',
'http://docs.python.org/3/howto/webservers.html',
'http://docs.python.org/3/howto/argparse.html',
'http://docs.python.org/3/howto/ipaddress.html',
]
max_parallel = 3
current_parallel = 0
if len(sys.argv) == 2:
max_parallel = int(sys.argv[1])
def printPage(result):
print("page size: ", len(result))
global current_parallel
current_parallel -= 1
print("current_parallel: ", current_parallel)
#urls = re.findall(r'href="([^"]+)"', result)
#for u in urls:
# queue.append(u)
#queue.extend(urls)
process_queue()
def printError(error):
print("Error: ", error)
global current_parallel
current_parallel -= 1
process_queue()
def stop(result):
reactor.stop()
def process_queue():
global current_parallel, max_parallel,queue
print("process_queue cs: {} max: {}".format(current_parallel, max_parallel))
while True:
if current_parallel >= max_parallel:
print("No empty slot")
return
if len(queue) == 0:
print("queue is empty")
if current_parallel == 0:
reactor.stop()
return
url = queue[0] + '?' + str(time.time())
queue[0:1] = []
current_parallel += 1
d = getPage(url)
d.addCallbacks(printPage, printError)
process_queue()
reactor.run()
print("----done ---")
Multiprocess
Multiprocess CPU count
- cpu_count
The multiprocessing package makes it easy to run a function many times in parallel.
Running processes in parallel can reduce the overall runtime. One might expect that the more we run in parallel the faster the whole process finishes, but creating the parallel processes has some overhead, and the number of CPUs in the computer also limits how much parallelism is worthwhile.
import multiprocessing as mp
print(mp.cpu_count())
Multiprocess N files: Pool
- multiprocess
- Pool
- map
In this example we "analyze" files by counting how many characters they have, how many digits, and how many spaces.
Analyze N files in parallel.
import multiprocessing as mp
import os
import sys
def analyze(filename):
print("Process {:>5} analyzing {}".format(os.getpid(), filename))
digits = 0
letters = 0
spaces = 0
other = 0
total = 0
with open(filename) as fh:
for line in fh:
for char in line:
total += 1
if char.isdigit():
digits += 1
break
if char.isalnum():
letters += 1
break
if char == ' ':
spaces += 1
break
other += 1
return {
'filename': filename,
'total': total,
'digits': digits,
'spaces': spaces,
'letters': letters,
'other': other,
}
def main():
if len(sys.argv) < 3:
exit(f"Usage: {sys.argv[0]} POOL_SIZE FILEs")
size = int(sys.argv[1])
files = sys.argv[2:]
with mp.Pool(size) as pool:
results = pool.map(analyze, files)
for res in results:
print(res)
if __name__ == '__main__':
main()
$ python multiprocess_files.py 3 multiprocess_*.py
Process 12093 analyzing multiprocess_files.py
Process 12093 analyzing multiprocess_pool_async.py
Process 12095 analyzing multiprocess_load.py
Process 12094 analyzing multiprocessing_and_logging.py
Process 12094 analyzing multiprocess_pool.py
{'filename': 'multiprocess_files.py', 'total': 47, 'digits': 0, 'spaces': 37, 'letters': 6, 'other': 4}
{'filename': 'multiprocessing_and_logging.py', 'total': 45, 'digits': 0, 'spaces': 27, 'letters': 11, 'other': 7}
{'filename': 'multiprocess_load.py', 'total': 32, 'digits': 0, 'spaces': 20, 'letters': 7, 'other': 5}
{'filename': 'multiprocess_pool_async.py', 'total': 30, 'digits': 0, 'spaces': 16, 'letters': 6, 'other': 8}
{'filename': 'multiprocess_pool.py', 'total': 21, 'digits': 0, 'spaces': 11, 'letters': 6, 'other': 4}
We asked it to use 3 processes, so looking at the process IDs you can see that some of them worked on more than one file. The returned results can be any Python data structure. A dictionary is usually a good idea.
Multiprocess load
- multiprocess
- Pool
- map
import random
import multiprocessing
import time
import sys
# Works only in Python 3
def calc(n):
count = 0
total = 0
while count < 80_000_000 / n:
rnd = random.random()
total += rnd
count += 1
return {'count': count, 'total': total}
def main():
if len(sys.argv) != 2:
exit(f"Usage: {sys.argv[0]} POOL_SIZE")
start = time.time()
size = int(sys.argv[1])
with multiprocessing.Pool(size) as pool:
results = pool.map(calc, [size] * size)
print("Results: {}".format(results))
totals = map(lambda r: r['total'], results)
print("Total: {}".format(sum(totals)))
end = time.time()
print(end - start)
if __name__ == '__main__':
main()
Multiprocess: Pool
- multiprocess
- Pool
- map
Pool(3) creates 3 child processes and lets them compute the values. map returns the results in the same order as the input came in.
import multiprocessing
import os
import sys
def f(x):
print(f"Input {x} in process {os.getpid()}")
return x*x
def main():
if len(sys.argv) != 3:
exit(f"Usage: {sys.argv[0]} NUMBERS POOL_SIZE")
numbers = int(sys.argv[1])
size = int(sys.argv[2])
with multiprocessing.Pool(size) as p:
results = p.map(f, range(numbers))
print(results)
if __name__ == '__main__':
main()
python multiprocess_pool.py 11 3
python multiprocess_pool.py 100 5
Multiprocess load async
- imap
- map_async
import multiprocessing
import os
def f(x):
print(f"Input {x} in process {os.getpid()}")
return x*x
def prt(z):
print(z)
def main():
with multiprocessing.Pool(5) as p:
results = p.imap(f, range(11)) # <multiprocessing.pool.IMapIterator object
print(results)
print('--')
for r in results:
print(r)
#results = p.map_async(f, range(11)) # <multiprocessing.pool.MapResult object>, not iterable
#results = []
#p.map_async(f, range(11)) # <multiprocessing.pool.MapResult object>, not iterable
#print(results)
#for r in results:
# print(r)
if __name__ == '__main__':
main()
Multiprocess and logging
- multiprocess
- logging
Tested on Windows
from multiprocessing import Pool
import os
import logging
import logging.handlers
count = 0
def f(x):
global count
count += 1
#print("Input {} in process {}".format(x, os.getpid()))
logger = logging.getLogger("app")
logger.info("f({}) count {} in PID {}".format(x, count, os.getpid()))
return x*x
def prt(z):
print(z)
def setup_logger():
level = logging.DEBUG
logger = logging.getLogger("app")
logger.setLevel(level)
log_file = 'try.log'
formatter = logging.Formatter('%(asctime)s - %(levelname)-8s - %(filename)-20s:%(lineno)-5d - %(funcName)-22s - %(message)s')
ch = logging.FileHandler(log_file)
#ch = logging.handlers.TimedRotatingFileHandler(log_file, when='D', backupCount=2)
ch.setLevel(level)
ch.setFormatter(formatter)
logger.addHandler(ch)
logger.info("Setup logger in PID {}".format(os.getpid()))
def main():
logger = logging.getLogger('app')
logger.info("main")
with Pool(5) as p:
results = p.imap(f, range(110)) # <multiprocessing.pool.IMapIterator object
print(results)
print('--')
for r in results:
print(r)
setup_logger()
if __name__ == '__main__':
main()
Exercise: Process N files in parallel
Create a script that given two numbers N and X will create N files (1.txt - N.txt). In each file put X rows of random ASCII characters (digits, lower- and upper-case letters, spaces; see the string module). Each row should be 0-80 characters long (random length for each row). Using this script create a bunch of files.
Write a script that given a list of files will read all the files. For each file count how many times each digit(!) appears, and provide a combined report. First write the script in a single-process (linear) way. Then convert it to work with multiprocessing. This version should also accept a number that indicates the size of the pool. Ideally you would only need to write a few lines of code for this, and you would be able to use the code from the previous (linear) solution as a module.
Submit the 3 scripts.
The report could look like this:
0 1 2 3 4 5 6 7 8 9
1.txt 3 1 3 2 8 3 2 3 2 6
2.txt 6 5 3 1 6 7 4 4 4 4
3.txt 6 3 4 7 2 5 5 1 7 6
TOTAL 15 9 10 10 16 15 11 8 13 16
Create 100 files with 10000 rows in each one and measure how long the linear process takes vs the parallel process with various numbers.
Exercise: Process N Excel files in parallel
- Create N Excel files with 10 random numbers in the first row of each file. This should be create_random_excel.py.
- Write a process that reads the N Excel files and sums up the numbers in each one of them and then sums up the numbers of all the files. This should be add_excel.py that gets the list of Excel files on the command line.
Exercise: Fetch URLs in parallel
- top-websites
- Given a file with a list of URLs, collect the title of each site.
{% embed include file="src/examples/parallel/urls.txt" %}
import time
import requests
import sys
from bs4 import BeautifulSoup
def get_urls(limit):
with open('urls.txt') as fh:
urls = list(map(lambda line: line.rstrip("\n"), fh))
if len(urls) > limit:
urls = urls[:limit]
return urls
def get_title(url):
try:
resp = requests.get(url)
if resp.status_code != 200:
return None, f"Incorrect status_code {resp.status_code} for {url}"
except Exception as err:
return None, f"Error: {err} for {url}"
soup = BeautifulSoup(resp.content, 'html.parser')
return soup.title.string, None
def main():
if len(sys.argv) < 2:
exit(f"Usage: {sys.argv[0]} LIMIT")
limit = int(sys.argv[1])
urls = get_urls(limit)
print(urls)
start = time.time()
titles = []
for url in urls:
#print(f"Processing {url}")
title, err = get_title(url)
if err:
print(err)
else:
print(title)
titles.append({
"url": url,
"title": title,
"err": err,
})
end = time.time()
print("Elapsed time: {} for {} pages.".format(end-start, len(urls)))
print(titles)
if __name__ == '__main__':
main()
Exercise: Fetch URLs from one site.
Download a sitemap file (e.g. https://code-maven.com/slides/sitemap.xml) and fetch the first N URLs from there, collecting the titles.
import time
import requests
import xml.etree.ElementTree as ET
from bs4 import BeautifulSoup
def get_urls(content):
urls = []
root = ET.fromstring(content)
for child in root:
for ch in child:
if ch.tag.endswith('loc'):
urls.append(ch.text)
#print(len(urls)) # 2653
MAX = 20
if len(urls) > MAX:
urls = urls[:MAX]
return urls
def main():
start = time.time()
url = 'https://code-maven.com/slides/sitemap.xml'
resp = requests.get(url)
if resp.status_code != 200:
exit(f"Incorrect status_code {resp.status_code}")
urls = get_urls(resp.content)
titles = []
for url in urls:
resp = requests.get(url)
if resp.status_code != 200:
print(f"Incorrect status_code {resp.status_code} for {url}")
continue
soup = BeautifulSoup(resp.content, 'html.parser')
print(soup.title.string)
titles.append(soup.title.string)
end = time.time()
print("Elapsed time: {} for {} pages.".format(end-start, len(urls)))
print(titles)
if __name__ == '__main__':
main()
Solution: Process N files in parallel
import sys
import string
import random
def main():
if len(sys.argv) != 3:
exit(f"Usage: {sys.argv[0]} NUMBER_OF_FILES NUMBER_OF_ROWS")
number_of_files = int(sys.argv[1])
number_of_rows = int(sys.argv[2])
characters = string.ascii_letters + ' ' + string.digits
# print(number_of_rows)
for file_id in range(1, number_of_files + 1):
filename = f"{file_id}.txt"
# print(filename)
with open(filename, "w") as fh:
for _ in range(number_of_rows):
length = random.randrange(0, 81)
# print(length)
row = ''.join(random.choices(characters, k=length))
fh.write(row + "\n")
if __name__ == "__main__":
main()
import sys
import string
def count_digits(filename):
count = {}
for cr in string.digits:
count[cr] = 0
with open(filename) as fh:
for row in fh:
for cr in row:
if cr in string.digits:
count[cr] += 1
return {
"filename": filename,
"count": count,
}
def print_table(results):
dw = 6
width = 0
for res in results:
width = max(width, len(res["filename"]))
print(" " * (width + 1), end="")
for n in range(10):
print(f"{n:{dw}}", end="")
print("")
for res in results:
print(f'{res["filename"]:{width}} ', end="")
for digit in string.digits:
print(f"{res['count'][digit]:{dw}}", end="")
print("")
total = {}
for digit in string.digits:
total[digit] = 0
for res in results:
for digit in string.digits:
total[digit] += res["count"][digit]
name = "TOTAL"
print(f'{name:{width}} ', end="")
for digit in string.digits:
print(f"{total[digit]:{dw}}", end="")
print("")
def main():
if len(sys.argv) < 2:
exit(f"Usage: {sys.argv[0]} FILENAMEs")
files = sys.argv[1:]
results = []
for filename in files:
result = count_digits(filename)
results.append(result)
print_table(results)
if __name__ == "__main__":
main()
import sys
import count_digits
def main():
if len(sys.argv) < 2:
exit(f"Usage: {sys.argv[0]} FILENAMEs")
files = sys.argv[1:]
results = map(count_digits.count_digits, files)
count_digits.print_table(list(results))
if __name__ == "__main__":
main()
import sys
import count_digits
import multiprocessing as mp
def main():
if len(sys.argv) < 3:
exit(f"Usage: {sys.argv[0]} POOL FILENAMEs")
size = int(sys.argv[1])
files = sys.argv[2:]
with mp.Pool(size) as pool:
results = pool.map(count_digits.count_digits, files)
count_digits.print_table(list(results))
if __name__ == "__main__":
main()
Solution: Fetch URLs in parallel
- First create a function and use the regular map.
- Deal with encoding.
- Replace continue by return, and include None in the results.
- It has some 2 sec of overhead, but then 20 items went down from 18 sec to 7 sec using a pool of 5.
import time
import requests
import xml.etree.ElementTree as ET
from bs4 import BeautifulSoup
from multiprocessing import Pool
import os
def get_urls(content):
urls = []
root = ET.fromstring(content)
for child in root:
for ch in child:
if ch.tag.endswith('loc'):
urls.append(ch.text)
#print(len(urls)) # 2653
MAX = 20
if len(urls) > MAX:
urls = urls[:MAX]
return urls
def get_title(url):
resp = requests.get(url)
if resp.status_code != 200:
print(f"Incorrect status_code {resp.status_code} for {url}")
return
soup = BeautifulSoup(resp.content, 'html.parser')
print(soup.title.string)
return soup.title.string.encode('utf-8')
def main():
start = time.time()
url = 'https://code-maven.com/slides/sitemap.xml'
resp = requests.get(url)
if resp.status_code != 200:
exit(f"Incorrect status_code {resp.status_code}")
urls = get_urls(resp.content)
titles = []
# for url in urls:
# titles.append(get_title(url))
# titles = list(map(get_title, urls))
with Pool(5) as pool:
results = pool.map(get_title, urls)
for r in results:
titles.append(r)
end = time.time()
print("Elapsed time: {} for {} pages.".format(end-start, len(urls)))
print(list(titles))
print("DONE")
if __name__ == '__main__':
main()
Multitasking
What is Multitasking?
- Multitasking
- A wrapper around threading and os.fork by Ran Aroussi
pip install multitasking
Multitasking example
- multitasking
- task
- set_max_threads
import multitasking
import time
import random
multitasking.set_max_threads(2)
@multitasking.task
def work(ix, sec):
print(f"Start {ix} sleeping for {sec}s")
time.sleep(sec)
print(f"Finish {ix}")
if __name__ == "__main__":
tasks = (6, 0.7, 0.8, 0.3, 0.4, 3, 0.1)
for ix, sec in enumerate(tasks):
work(ix+1, sec)
print("do some work after all the jobs are done")
Start 1 sleeping for 6s
Start 2 sleeping for 0.7s
do some work after all the jobs are done
Finish 2
Start 3 sleeping for 0.8s
Finish 3
Start 4 sleeping for 0.3s
Finish 4
Start 5 sleeping for 0.4s
Finish 5
Start 6 sleeping for 3s
Finish 6
Start 7 sleeping for 0.1s
Finish 7
Finish 1
Multitasking example with wait
- wait_for_tasks
import multitasking
import time
import random
multitasking.set_max_threads(2)
@multitasking.task
def work(ix, sec):
print(f"Start {ix} sleeping for {sec}s")
time.sleep(sec)
print(f"Finish {ix}")
if __name__ == "__main__":
tasks = (6, 0.7, 0.8, 0.3, 0.4, 3, 0.1)
for ix, sec in enumerate(tasks):
work(ix+1, sec)
multitasking.wait_for_tasks()
print("do some work after all the jobs are done")
Start 1 sleeping for 6s
Start 2 sleeping for 0.7s
Finish 2
Start 3 sleeping for 0.8s
Finish 3
Start 4 sleeping for 0.3s
Finish 4
Start 5 sleeping for 0.4s
Finish 5
Start 6 sleeping for 3s
Finish 6
Start 7 sleeping for 0.1s
Finish 7
Finish 1
do some work after all the jobs are done
Multitasking - second loop waits for the first one
import multitasking
import time
import random
@multitasking.task
def first(count):
sleep = random.randint(1,10)/2
if count == 10:
sleep = 10
print("Start First {} (sleeping for {}s)".format(count, sleep))
time.sleep(sleep)
print("finish First {} (after for {}s)".format(count, sleep))
@multitasking.task
def second(count):
sleep = random.randint(1,10)/2
print("Start Second {} (sleeping for {}s)".format(count, sleep))
time.sleep(sleep)
print("finish Second {} (after for {}s)".format(count, sleep))
if __name__ == "__main__":
for i in range(0, 10):
first(i+1)
multitasking.wait_for_tasks()
print('first done')
for i in range(0, 10):
second(i+1)
multitasking.wait_for_tasks()
print('second done')
Multitasking counter
import multitasking
import time
multitasking.set_max_threads(10)
counter = 0
@multitasking.task
def count(n):
global counter
for _ in range(n):
counter += 1
if __name__ == "__main__":
start = time.time()
k = 10
n = 1000000
for _ in range(k):
count(n)
multitasking.wait_for_tasks()
end = time.time()
expected = k * n
print(f'done actual: {counter} expected: {expected}. Missing: {expected-counter}')
print(f'Elapsed time {end-start}')
done actual: 3198547 expected: 10000000. Missing: 6801453
Elapsed time 0.5210244655609131
Multitasking counter with thread locking
- threading
- Lock
- acquire
- release
import multitasking
import time
import threading
multitasking.set_max_threads(10)
counter = 0
locker = threading.Lock()
@multitasking.task
def count(n):
global counter
for _ in range(n):
locker.acquire()
counter += 1
locker.release()
if __name__ == "__main__":
start = time.time()
k = 10
n = 1000000
for _ in range(k):
count(n)
multitasking.wait_for_tasks()
end = time.time()
expected = k * n
print(f'done actual: {counter} expected: {expected}. Missing: {expected-counter}')
print(f'Elapsed time {end-start}')
done actual: 10000000 expected: 10000000. Missing: 0
Elapsed time 37.231414556503296