除了顺序执行和并行执行的模型之外,还有第三种模型,叫做异步模型,这是事件驱动模型的基础。

异步活动的执行模型可以只有一个单一的主控制流,能在单核心系统和多核心系统中运行。

在并发执行的异步模型中,许多任务被穿插在同一时间线上,所有的任务都由一个控制流执行(单一线程)。任务的执行可能被暂停或恢复,中间的这段时间线程将会去执行其他任务。如图:

如图所示,任务(不同的颜色表示不同的任务)可能被其他任务插入,但是都处在同一个线程下。这表明,当某一个任务执行的时候,其他的任务都暂停了。

与多线程编程模型很大的一点不同是,多线程由操作系统决定在时间线上什么时候挂起某个活动或恢复某个活动,而在异步并发模型中,必须假设线程可能在任何时间被挂起和替换。

可以将任务编写成许多可以间隔执行的小步骤, 这样的话如果一个任务需要另一个任务的输出,那么被依赖的任务必须接收它的输入

1. 使用Python的 concurrent.futures 模块

python-parallel-programming-cookbook-cb 使用Python的 concurrent.futures 模块

concurrent.futures 模块具有线程池和进程池、管理并行编程任务、处理非确定性的执行流程、进程/线程同步等功能。

concurrent.futures 模块由以下部分组成:

  • concurrent.futures.Executor: 这是一个虚拟基类,提供了异步执行的方法。
  • submit(function, argument): 调度函数(可调用的对象)的执行,将 argument 作为参数传入。
  • map(function, argument): 将 argument 作为参数执行函数,以 异步 的方式。
  • shutdown(Wait=True): 发出让执行者释放所有资源的信号。
  • concurrent.futures.Future: 其中包括函数的异步执行。Future对象是submit任务(即带有参数的functions)到executor的实例。

Executor是抽象类,可以通过子类访问,即线程或进程的 ExecutorPools 。因为,线程或进程的实例是依赖于资源的任务,所以最好以“池”的形式将他们组织在一起,作为可以重用的launcher或executor。

2.1. 使用线程池和进程池

线程池或进程池是用于在程序中优化和简化线程/进程的使用。

通过池,可以提交任务给executor。池由两部分组成,一部分是内部的队列,存放着待执行的任务;另一部分是一系列的进程或线程,用于执行这些任务。

池的概念主要目的是为了重用:让线程或进程在生命周期内可以多次使用。它减少了创建创建线程和进程的开销,提高了程序性能。重用不是必须的规则,但它是程序员在应用中使用池的主要原因。

current.Futures 模块提供了两种 Executor 的子类,各自独立操作一个线程池和一个进程池。这两个子类分别是:

  • concurrent.futures.ThreadPoolExecutor(max_workers)
  • concurrent.futures.ProcessPoolExecutor(max_workers)

max_workers 参数表示最多有多少个worker并行执行任务。

2.2. 线程池和进程池的示例

假设给定任务:给一个list number_list ,包含1到10。对list中的每一个数字,乘以1+2+3…+10000000的和.

下面的代码分别测试了:

  • 顺序执行
  • 通过有5个worker的线程池执行
  • 通过有5个worker的进程池执行
#!/usr/bin/python3
#!--*-- coding:utf-8 --*--
import concurrent.futures
import time

number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

def evaluate_item(x):
        # 计算总和
        result_item = count(x)
        # 打印输入和输出结果
        return result_item


def  count(number) :
        for i in range(0, 10000000):
                i=i+1
        return i * number


if __name__ == "__main__":
        #顺序执行
        start_time = time.time()
        for item in number_list:
            print(evaluate_item(item))
        print("Sequential execution in " + str(time.time() - start_time), "seconds")
        
        #线程池执行
        start_time_1 = time.time()
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
                futures = [executor.submit(evaluate_item, item) for item in number_list]
                for future in concurrent.futures.as_completed(futures):
                        print(future.result())
        print("Thread pool execution in " + str(time.time() - start_time_1), "seconds")
        
        #进程池执行
        start_time_2 = time.time()
        with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
                futures = [executor.submit(evaluate_item, item) for item in number_list]
                for future in concurrent.futures.as_completed(futures):
                        print(future.result())
        print("Process pool execution in " + str(time.time() - start_time_2), "seconds")

输出如:

10000000
20000000
30000000
40000000
50000000
60000000
70000000
80000000
90000000
100000000
Sequential execution in 3.668065309524536 seconds
20000000
10000000
40000000
60000000
50000000
30000000
70000000
80000000
90000000
100000000
Thread pool execution in 3.499577283859253 seconds
40000000
50000000
20000000
10000000
30000000
70000000
60000000
80000000
90000000
100000000
Process pool execution in 1.1742675304412842 seconds
Last modification:June 10th, 2022 at 07:25 pm