Source: Parallelism, Concurrency, and AsyncIO in Python - by example - 2022.07.05

Overview:

Speeding up CPU-bound and IO-bound tasks with multiprocessing, threading, and asyncio.

For the differences between threads, multiprocessing, and asyncio, see:

Speeding Up Python with Concurrency, Parallelism, and asyncio

Concurrency vs Parallelism

Concurrency and parallelism are similar, but not quite the same.

Concurrency is the ability to run multiple tasks on the CPU at the same time; tasks can start, run, and complete in overlapping time periods. On a single-core CPU, multiple tasks run via context switching: the state of each task is saved so that it can be restored and resumed later.

Parallelism is the ability to run multiple tasks simultaneously across multiple CPU cores.

Although both can speed up an application, concurrency and parallelism are not interchangeable: which one applies depends on whether the task is CPU-bound or IO-bound.

Numerical computation, for example, is CPU-bound, since its throughput grows with the number of processor cores.

Parallelism is suited to CPU-bound tasks. In theory, if a task is divided into n subtasks that all run in parallel, the total runtime drops to 1/n of the original; for example, a 60-second job split across 4 cores would ideally finish in about 15 seconds, ignoring overhead.

Concurrency is better suited to IO-bound tasks, since other work can be done while waiting on IO.

The best example of a CPU-bound workload is data science. Data scientists deal with huge volumes of data, and splitting that data into batches and processing the batches in parallel effectively reduces the total processing time. Increasing the number of CPU cores speeds things up further.

Web scraping is IO-bound: the CPU has little impact, since most of the time is spent reading from and writing to the network. Database calls and disk reads/writes are IO-bound as well, and web applications built with frameworks such as Django and Flask are IO-bound applications too.

Python Examples

Python libraries:

Library              Class/Method          Processing Type
-------------------  --------------------  ---------------------------
threading            Thread                concurrent
concurrent.futures   ThreadPoolExecutor    concurrent
asyncio              gather                concurrent (via coroutines)
multiprocessing      Pool                  parallel
concurrent.futures   ProcessPoolExecutor   parallel
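
Note that concurrent.futures exposes the same Executor interface for both threads and processes, so switching between concurrent and parallel execution can be a one-line change. A minimal sketch of this, using a hypothetical work function that is not part of the example repo:

# executor_swap.py (hypothetical)
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def work(n):
    # placeholder task; imagine an IO-bound or CPU-bound function here
    return n * n


if __name__ == "__main__":
    # IO-bound workloads: threads
    with ThreadPoolExecutor() as executor:
        print(list(executor.map(work, range(5))))

    # CPU-bound workloads: swap in processes; the rest is unchanged
    with ProcessPoolExecutor() as executor:
        print(list(executor.map(work, range(5))))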

GitHub - parallel-concurrent-examples-python

# tasks.py
import os
from multiprocessing import current_process
from threading import current_thread

import requests


def make_request(num):
    # io-bound
    pid = os.getpid()
    thread_name = current_thread().name
    process_name = current_process().name
    print(f"{pid} - {process_name} - {thread_name}")

    requests.get("https://httpbin.org/ip")


async def make_request_async(num, client):
    # io-bound
    pid = os.getpid()
    thread_name = current_thread().name
    process_name = current_process().name
    print(f"{pid} - {process_name} - {thread_name}")

    await client.get("https://httpbin.org/ip")


def get_prime_numbers(num):
    # cpu-bound: sieve of Eratosthenes, collecting the primes up to num
    pid = os.getpid()
    thread_name = current_thread().name
    process_name = current_process().name
    print(f"{pid} - {process_name} - {thread_name}")

    numbers = []
    prime = [True for i in range(num + 1)]
    p = 2

    while p * p <= num:
        if prime[p]:
            for i in range(p * 2, num + 1, p):
                prime[i] = False
        p += 1

    prime[0] = False
    prime[1] = False

    for p in range(num + 1):
        if prime[p]:
            numbers.append(p)

    return numbers
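
With the return statement in place, the sieve can be sanity-checked in a Python shell (the pid/process/thread line is printed as a side effect):

>>> from tasks import get_prime_numbers
>>> get_prime_numbers(20)
[2, 3, 5, 7, 11, 13, 17, 19]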

IO-bound Operations

An IO-bound task spends more time on IO than on the CPU.

For example, web scraping is IO-bound and can be sped up with threads, since fetching the HTML (IO) is slower than parsing it (CPU).

Python implementations:

[1] - Benchmark implementation

# io-bound_sync.py
import time

from tasks import make_request


def main():
    for num in range(1, 101):
        make_request(num)


if __name__ == "__main__":
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

[2] - Threading implementation

# io-bound_concurrent_1.py
import threading
import time

from tasks import make_request


def main():
    tasks = []

    for num in range(1, 101):
        tasks.append(threading.Thread(target=make_request, args=(num,)))
        tasks[-1].start()

    for task in tasks:
        task.join()


if __name__ == "__main__":
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

[3] - concurrent.futures implementation

# io-bound_concurrent_2.py
import time
from concurrent.futures import ThreadPoolExecutor, wait

from tasks import make_request


def main():
    futures = []

    with ThreadPoolExecutor() as executor:
        for num in range(1, 101):
            futures.append(executor.submit(make_request, num))
    # exiting the with-block above already waits for every submitted
    # future; the explicit wait() is harmless but redundant
    wait(futures)


if __name__ == "__main__":
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

[4] - AsyncIO implementation

# io-bound_concurrent_3.py
import asyncio
import time

import httpx

from tasks import make_request_async


async def main():
    async with httpx.AsyncClient() as client:
        return await asyncio.gather(
            *[make_request_async(num, client) for num in range(1, 101)]
        )


if __name__ == "__main__":
    start_time = time.perf_counter()

    asyncio.run(main())

    end_time = time.perf_counter()
    elapsed_time = end_time - start_time
    print(f"Elapsed run time: {elapsed_time} seconds")

CPU-bound Operations

Python implementations:

[1] - Benchmark implementation

# cpu-bound_sync.py
import time
from tasks import get_prime_numbers


def main():
    for num in range(1000, 16000):
        get_prime_numbers(num)


if __name__ == "__main__":
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

[2] - Multiprocessing implementation

# cpu-bound_parallel_1.py
import time
from multiprocessing import Pool, cpu_count

from tasks import get_prime_numbers


def main():
    # leave one core free for the rest of the system
    with Pool(cpu_count() - 1) as p:
        # map blocks until every task has finished, so the explicit
        # close()/join() calls are not needed inside the with-block
        p.map(get_prime_numbers, range(1000, 16000))


if __name__ == "__main__":
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

[3] - concurrent.futures implementation

# cpu-bound_parallel_2.py
import time
from concurrent.futures import ProcessPoolExecutor, wait
from multiprocessing import cpu_count

from tasks import get_prime_numbers


def main():
    futures = []
    with ProcessPoolExecutor(cpu_count() - 1) as executor:
        for num in range(1000, 16000):
            futures.append(executor.submit(get_prime_numbers, num))

    wait(futures)


if __name__ == "__main__":
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")