Original article: Parallelism, Concurrency, and AsyncIO in Python - by example - 2022.07.05
Summary:
How to use multiprocessing, threading, and asyncio to speed up CPU-bound and IO-bound tasks.
Differences between threads, multiprocessing, and asyncio:
Speeding Up Python with Concurrency, Parallelism, and asyncio
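Concurrency vs Parallelism
Concurrency and parallelism are similar, but they are not the same thing.
Concurrency is the ability to make progress on multiple tasks at once: tasks can start, run, and complete in overlapping time periods. On a single CPU, the tasks do not literally execute simultaneously. Instead, the CPU switches between them through context switching, saving the state of each process so that it can be resumed and executed later.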
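Context switching can be illustrated with a toy, single-threaded scheduler (an analogy only: the operating system switches between processes and threads preemptively, not like this). Each task below is a Python generator, which saves its local state at every `yield`; the hypothetical `run_round_robin` function resumes the tasks one step at a time, so they start, run, and finish in overlapping time periods without ever running simultaneously.

```python
from collections import deque
from typing import Generator, List


def task(name: str, steps: int) -> Generator[str, None, None]:
    """A toy task: each yield hands control back to the scheduler.

    The generator object keeps its own local state (name, step counter),
    so it can be resumed later exactly where it left off.
    """
    for step in range(1, steps + 1):
        yield f"{name}:{step}"


def run_round_robin(tasks: List[Generator[str, None, None]]) -> List[str]:
    """Run tasks on one thread, switching to the next task after every step."""
    queue = deque(tasks)
    trace = []
    while queue:
        current = queue.popleft()
        try:
            trace.append(next(current))  # resume the task for one step
        except StopIteration:
            continue  # task finished; drop it from the queue
        queue.append(current)  # park it and switch to the next task
    return trace


if __name__ == "__main__":
    print(run_round_robin([task("A", 3), task("B", 2)]))
    # → ['A:1', 'B:1', 'A:2', 'B:2', 'A:3']
```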
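Parallelism is running multiple tasks simultaneously across multiple CPU cores.
Although both can make an application faster, concurrency and parallelism are not interchangeable. Which one fits depends on whether the task is CPU-bound or IO-bound.
For example, numerical computation is CPU-bound: its speed is limited by the processor, so adding more cores adds more computing power.
Parallelism suits CPU-bound tasks. In theory, if a task is split into n subtasks that can all run in parallel, the total run time drops to roughly 1/n of the original.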
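The ideal case above as a formula, where $T_1$ is the serial run time and $T_n$ the run time on $n$ cores. It ignores the overhead of starting workers and moving data between them, so real speedups stay below $n$:

$$
T_n \approx \frac{T_1}{n}, \qquad S_n = \frac{T_1}{T_n} \approx n
$$

With hypothetical numbers: a 60 s job split evenly across 4 cores would ideally take about 60 / 4 = 15 s.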
Concurrency is better suited to IO-bound tasks, because other work can be done while waiting for IO resources to be fetched.
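A minimal sketch of that overlap, assuming `time.sleep` is a fair stand-in for waiting on a network or disk call (the helpers `fake_io`, `run_sequential`, and `run_threaded` are hypothetical). While one thread waits, the others can start their own waits, so five 0.2 s waits finish in roughly 0.2 s instead of about 1 s.

```python
import threading
import time


def fake_io(delay: float) -> None:
    """Hypothetical stand-in for an IO call; sleeping leaves the CPU free."""
    time.sleep(delay)


def run_sequential(n: int, delay: float) -> float:
    """Do n waits one after another and return the elapsed seconds."""
    start = time.perf_counter()
    for _ in range(n):
        fake_io(delay)
    return time.perf_counter() - start


def run_threaded(n: int, delay: float) -> float:
    """Do n waits in n threads so they overlap; return the elapsed seconds."""
    start = time.perf_counter()
    threads = [threading.Thread(target=fake_io, args=(delay,)) for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"sequential: {run_sequential(5, 0.2):.2f} s")  # roughly 5 * 0.2 s
    print(f"threaded:   {run_threaded(5, 0.2):.2f} s")    # roughly 0.2 s
```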
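Data science is a classic example of CPU-bound work. Data scientists deal with large amounts of data; for data preprocessing, they can split the data into batches and process the batches in parallel, which reduces the total processing time. Adding more CPU cores speeds processing up further.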
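A sketch of the batching idea using `concurrent.futures.ProcessPoolExecutor`. The sum of squares in `process_batch` is an assumed placeholder for real preprocessing work, and `split_into_batches`, `process_batch`, and `process_in_parallel` are hypothetical names. Each batch is sent to a worker process, and the partial results are combined at the end.

```python
from concurrent.futures import ProcessPoolExecutor
from typing import List, Sequence


def split_into_batches(data: Sequence[int], batch_size: int) -> List[List[int]]:
    """Cut the data into consecutive batches of at most batch_size items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return [list(data[i:i + batch_size]) for i in range(0, len(data), batch_size)]


def process_batch(batch: List[int]) -> int:
    """Placeholder CPU-bound work on one batch (assumed): sum of squares."""
    return sum(x * x for x in batch)


def process_in_parallel(data: Sequence[int], batch_size: int, workers: int) -> int:
    """Process each batch in a worker process and combine the partial results."""
    batches = split_into_batches(data, batch_size)
    with ProcessPoolExecutor(max_workers=workers) as executor:
        return sum(executor.map(process_batch, batches))


if __name__ == "__main__":
    data = list(range(1_000_000))
    print(process_in_parallel(data, batch_size=100_000, workers=4))
```

The `if __name__ == "__main__":` guard matters: when worker processes are started with the spawn method (the default on Windows and macOS), each worker re-imports the main module, and without the guard it would try to start the pool again, which fails.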
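Web scraping is IO-bound. The CPU has little influence on it; most of the time is spent reading from and writing to the network. Database calls and reading or writing files on disk are IO-bound as well, and so are web applications built with frameworks such as Django and Flask.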
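One rough way to tell the two kinds of task apart is to compare CPU time with wall-clock time: `time.process_time()` counts only the CPU time used by the current process, while `time.perf_counter()` measures elapsed time, including time spent waiting on the network or disk. The `measure` and `looks_io_bound` helpers below are a hypothetical heuristic, and the 0.5 threshold is an arbitrary assumption.

```python
import time
from typing import Callable, Tuple


def measure(func: Callable[[], object]) -> Tuple[float, float]:
    """Return (wall_seconds, cpu_seconds) for one call of func."""
    wall_start = time.perf_counter()
    cpu_start = time.process_time()  # CPU time of this process only
    func()
    cpu = time.process_time() - cpu_start
    wall = time.perf_counter() - wall_start
    return wall, cpu


def looks_io_bound(func: Callable[[], object], threshold: float = 0.5) -> bool:
    """Heuristic (assumed threshold): if less than `threshold` of the wall time
    was spent on the CPU, the call mostly waited, so it looks IO-bound."""
    wall, cpu = measure(func)
    return cpu < threshold * wall


if __name__ == "__main__":
    print(looks_io_bound(lambda: time.sleep(0.2)))                   # mostly waiting
    print(looks_io_bound(lambda: sum(i * i for i in range(10**6))))  # mostly computing
```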
Python Examples
Python libraries:
| Library | Class/Method | Processing Type |
|---|---|---|
| threading | Thread | concurrent |
| concurrent.futures | ThreadPoolExecutor | concurrent |
| asyncio | gather | concurrent (via coroutines) |
| multiprocessing | Pool | parallel |
| concurrent.futures | ProcessPoolExecutor | parallel |
```python
# tasks.py
import os
from multiprocessing import current_process
from threading import current_thread

import requests


def make_request(num):
    # io-bound: report which process/thread runs this call, then do one HTTP GET
    # (num only identifies the call)
    pid = os.getpid()
    thread_name = current_thread().name
    process_name = current_process().name
    print(f"{pid} - {process_name} - {thread_name}")

    requests.get("https://httpbin.org/ip")


async def make_request_async(num, client):
    # io-bound: same as above, but awaits an async HTTP client (httpx.AsyncClient)
    pid = os.getpid()
    thread_name = current_thread().name
    process_name = current_process().name
    print(f"{pid} - {process_name} - {thread_name}")

    await client.get("https://httpbin.org/ip")


def get_prime_numbers(num):
    # cpu-bound: sieve of Eratosthenes, returns all primes <= num
    pid = os.getpid()
    thread_name = current_thread().name
    process_name = current_process().name
    print(f"{pid} - {process_name} - {thread_name}")

    numbers = []

    prime = [True for i in range(num + 1)]
    p = 2

    while p * p <= num:
        if prime[p]:
            # cross out every multiple of p
            for i in range(p * 2, num + 1, p):
                prime[i] = False
        p += 1

    prime[0] = False
    prime[1] = False

    for p in range(num + 1):
        if prime[p]:
            numbers.append(p)

    return numbers
```
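IO-bound Operations
In an IO-bound task, more time is spent on IO than on the CPU.
For example, web scraping is IO-bound: threads can speed it up, because fetching the HTML (IO) is slower than parsing it (CPU).
Python implementations: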
[1] - Benchmark (sequential) implementation
```python
# io-bound_sync.py
import time

from tasks import make_request


def main():
    for num in range(1, 101):
        make_request(num)


if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")
```
[2] - Threading implementation
```python
# io-bound_concurrent_1.py
import threading
import time

from tasks import make_request


def main():
    tasks = []

    for num in range(1, 101):
        # start one thread per request
        tasks.append(threading.Thread(target=make_request, args=(num,)))
        tasks[-1].start()

    # wait for every thread to finish
    for task in tasks:
        task.join()


if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")
```
[3] - concurrent.futures implementation
```python
# io-bound_concurrent_2.py
import time
from concurrent.futures import ThreadPoolExecutor, wait

from tasks import make_request


def main():
    futures = []

    with ThreadPoolExecutor() as executor:
        for num in range(1, 101):
            futures.append(executor.submit(make_request, num))

        # block until all submitted requests are done
        # (leaving the with block would also wait for them)
        wait(futures)


if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")
```
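The scripts above discard whatever `make_request` returns. If the results are needed, `ThreadPoolExecutor.map` returns them in input order. A network-free sketch, with a hypothetical `fake_fetch` that sleeps instead of calling httpbin.org and a hypothetical `fetch_all` wrapper:

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List


def fake_fetch(num: int) -> str:
    """Hypothetical stand-in for make_request: sleep as if waiting on the network."""
    time.sleep(0.1)
    return f"response {num}"


def fetch_all(nums: List[int], max_workers: int = 10) -> List[str]:
    """Run fake_fetch concurrently; map() yields results in the order of nums."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(fake_fetch, nums))


if __name__ == "__main__":
    start = time.perf_counter()
    results = fetch_all(list(range(1, 21)))
    print(results[:3])  # → ['response 1', 'response 2', 'response 3']
    print(f"{time.perf_counter() - start:.2f} s")  # about 2 rounds of 0.1 s, not 20
```

Passing `max_workers` explicitly makes the concurrency level visible; when it is omitted, Python 3.8+ defaults to `min(32, os.cpu_count() + 4)` threads.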
[4] - AsyncIO implementation
```python
# io-bound_concurrent_3.py
import asyncio
import time

import httpx

from tasks import make_request_async


async def main():
    # share one AsyncClient across all 100 requests and run them concurrently
    async with httpx.AsyncClient() as client:
        return await asyncio.gather(
            *[make_request_async(num, client) for num in range(1, 101)]
        )


if __name__ == "__main__":
    start_time = time.perf_counter()

    # asyncio.run() creates, runs, and closes the event loop; relying on
    # asyncio.get_event_loop() to create one is deprecated in recent Python versions
    asyncio.run(main())

    end_time = time.perf_counter()
    elapsed_time = end_time - start_time
    print(f"Elapsed run time: {elapsed_time} seconds")
```
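Because the httpx version needs network access, here is a network-free sketch of the same `asyncio.gather` pattern; `fake_request` is a hypothetical coroutine in which `asyncio.sleep` stands in for the HTTP call. All 100 coroutines wait at the same time on a single thread, so the total is close to one 0.1 s wait rather than 100 of them.

```python
import asyncio
import time
from typing import List


async def fake_request(num: int) -> str:
    """Hypothetical stand-in for make_request_async: awaiting sleep hands
    control back to the event loop, like awaiting a network response."""
    await asyncio.sleep(0.1)
    return f"response {num}"


async def main(n: int) -> List[str]:
    # gather runs all coroutines concurrently on one thread and
    # returns their results in the order they were passed in
    return await asyncio.gather(*[fake_request(num) for num in range(1, n + 1)])


if __name__ == "__main__":
    start = time.perf_counter()
    results = asyncio.run(main(100))
    print(len(results), results[0])  # → 100 response 1
    print(f"{time.perf_counter() - start:.2f} s")  # close to 0.1 s, not 10 s
```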
CPU-bound Tasks
Python implementations:
[1] - Benchmark (sequential) implementation
```python
# cpu-bound_sync.py
import time

from tasks import get_prime_numbers


def main():
    for num in range(1000, 16000):
        get_prime_numbers(num)


if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")
```
[2] - Multiprocessing implementation
```python
# cpu-bound_parallel_1.py
import time
from multiprocessing import Pool, cpu_count

from tasks import get_prime_numbers


def main():
    # leave one core free, but always use at least one worker process
    with Pool(max(1, cpu_count() - 1)) as p:
        # map() blocks until every result is ready
        p.map(get_prime_numbers, range(1000, 16000))


if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")
```
[3] - concurrent.futures implementation
```python
# cpu-bound_parallel_2.py
import time
from concurrent.futures import ProcessPoolExecutor, wait
from multiprocessing import cpu_count

from tasks import get_prime_numbers


def main():
    futures = []

    # leave one core free, but always use at least one worker process
    with ProcessPoolExecutor(max(1, cpu_count() - 1)) as executor:
        for num in range(1000, 16000):
            futures.append(executor.submit(get_prime_numbers, num))

        # block until every submitted task is done
        wait(futures)


if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")
```
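Submitting 15,000 separate futures means 15,000 round trips between the parent process and the workers. One commonly used alternative (a sketch, not the article's method) is `ProcessPoolExecutor.map` with a `chunksize`, which ships the inputs to the workers in batches. The `count_primes` and `count_primes_many` helpers are hypothetical: `count_primes` is a quieter variant of `get_prime_numbers` that returns a count instead of printing, and `chunksize=500` is an untuned assumption.

```python
import os
from concurrent.futures import ProcessPoolExecutor
from typing import Iterable, List


def count_primes(limit: int) -> int:
    """Sieve of Eratosthenes: count the primes <= limit."""
    if limit < 2:
        return 0
    is_prime = [True] * (limit + 1)
    is_prime[0] = is_prime[1] = False
    p = 2
    while p * p <= limit:
        if is_prime[p]:
            # multiples below p * p were already crossed out by smaller primes
            for multiple in range(p * p, limit + 1, p):
                is_prime[multiple] = False
        p += 1
    return sum(is_prime)  # True counts as 1


def count_primes_many(limits: Iterable[int], chunksize: int = 500) -> List[int]:
    """Spread the limits over worker processes, chunksize inputs per message."""
    workers = max(1, (os.cpu_count() or 1) - 1)
    with ProcessPoolExecutor(max_workers=workers) as executor:
        return list(executor.map(count_primes, limits, chunksize=chunksize))


if __name__ == "__main__":
    results = count_primes_many(range(1000, 16000))
    print(len(results), results[0])  # → 15000 168
```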