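multiprocessing uses child processes instead of threads, which sidesteps the limitations of the GIL (Global Interpreter Lock).
This lets the multiprocessing module take full advantage of the multiple cores on a machine.
The multiprocessing.pool.Pool object in the multiprocessing library offers a way to parallelize the execution of a function across multiple input values by distributing the input data across processes (data parallelism).
multiprocessing.pool.Pool provides the following interfaces: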
[1] - apply(func[, args[, kwds]]), equivalent to apply_async( ... ).get()
[2] - apply_async(func[, args[, kwds[, callback[, error_callback]]]])
[3] - map(func, iterable[, chunksize]), equivalent to map_async( ... ).get()
[4] - map_async(func, iterable[, chunksize[, callback[, error_callback]]])
[5] - imap(func, iterable[, chunksize])
[6] - imap_unordered(func, iterable[, chunksize])
[7] - starmap(func, iterable[, chunksize]), equivalent to starmap_async( ... ).get()
[8] - starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])
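The equivalences in the list above can be checked with a small sketch. The helper add and the function compare_variants are hypothetical names introduced only for this illustration; the Pool methods are the ones listed above.

```python
from multiprocessing import Pool


def add(x, y=0):
    """Hypothetical helper used only for this illustration."""
    return x + y


def compare_variants():
    """Run each blocking call next to its asynchronous counterpart."""
    with Pool(processes=2) as pool:
        return {
            "apply": pool.apply(add, (1, 2)),
            "apply_async": pool.apply_async(add, (1, 2)).get(),
            "map": pool.map(add, [1, 2, 3]),
            "map_async": pool.map_async(add, [1, 2, 3]).get(),
            "starmap": pool.starmap(add, [(1, 2), (3, 4)]),
            "starmap_async": pool.starmap_async(add, [(1, 2), (3, 4)]).get(),
            # imap and imap_unordered return lazy iterators instead of lists.
            "imap": list(pool.imap(add, [1, 2, 3])),
            "imap_unordered": sorted(pool.imap_unordered(add, [1, 2, 3])),
        }


if __name__ == "__main__":
    for name, value in compare_variants().items():
        print(name, value)
```

Each blocking call returns the same value as its asynchronous counterpart followed by .get(); the two imap variants are wrapped in list() and sorted() only so they can be compared here.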
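1. apply and apply_async
1.1. apply
apply(func[,args[,kwds]])
apply is blocking: each call waits for its task to finish before returning, so tasks submitted in a loop run one after another and there is no speedup.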
Example 1:
from multiprocessing import Pool
import time

def square(x):
    time.sleep(2)
    print('[INFO]...processing: ', x)
    return x ** 2

if __name__ == '__main__':
    p = Pool(2)
    xs = [1, 2, 3, 4]

    start = time.time()
    for x in xs:
        ret = p.apply(square, (x,))  # blocks until this task is done
        print(ret)
    print("[INFO]timecost: ", time.time() - start)
    p.close()
    p.join()
Example 2:
# Reuses square() and the imports from Example 1.
if __name__ == '__main__':
    xs = [1, 2, 3, 4]
    start = time.time()
    with Pool(processes=2) as p:
        ret = list(p.apply(square, args=(x,)) for x in xs)
    print("[INFO]timecost: ", time.time() - start)
Example 3:
# With tqdm
# Reuses square() and the imports from Example 1.
from tqdm import tqdm

if __name__ == '__main__':
    xs = [1, 2, 3, 4]
    start = time.time()
    with Pool(processes=2) as p:
        ret = list(tqdm((p.apply(square, args=(x,)) for x in xs), total=len(xs)))
    print("[INFO]timecost: ", time.time() - start)
1.2. apply_async
apply_async(func[,args[,kwds[,callback[,error_callback]]]])
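apply_async submits a single task per call but runs it asynchronously: it returns immediately without waiting for the task to finish, so the next task can be submitted right away.
Unlike apply, apply_async returns an asynchronous result object. Call its .get() method to wait for the result, or skip the call if the result is not needed. Because tasks run concurrently, this does give a speedup.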
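When the results are not fetched with .get(), the pool still has to be given time to finish. The sketch below assumes a hypothetical task work and helper fire_and_forget. It uses close() followed by join() rather than a with block, because leaving a with Pool(...) block calls terminate(), which can stop tasks that have not finished yet.

```python
from multiprocessing import Pool


def work(x):
    """Hypothetical task whose return value the caller does not need."""
    return x * 10


def fire_and_forget(xs):
    finished = []
    pool = Pool(processes=2)
    for x in xs:
        # No .get() here: the callback only records that the task completed.
        pool.apply_async(work, (x,), callback=finished.append)
    pool.close()  # no more tasks will be submitted
    pool.join()   # wait for the workers to drain the queue and exit
    return sorted(finished)


if __name__ == "__main__":
    print(fire_and_forget([1, 2, 3, 4]))
```

The callback is used here only as a way to observe that every task ran; a real fire-and-forget task would more likely write to a file or database.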
Example 1:
from multiprocessing import Pool
import time

def square(x):
    time.sleep(2)
    print('[INFO]...processing: ', x)
    return x ** 2

if __name__ == '__main__':
    p = Pool(2)
    xs = [1, 2, 3, 4]

    start = time.time()
    rets = []
    for x in xs:
        ret = p.apply_async(square, (x,))  # returns immediately
        rets.append(ret)

    for ret in rets:
        print(ret.get())  # get() blocks
    print("[INFO]timecost: ", time.time() - start)
    p.close()
    p.join()
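Since get() blocks, it can help to bound the wait. The following is a minimal sketch, assuming a hypothetical task slow_square and helper get_with_timeout; get(timeout=...) raises multiprocessing.TimeoutError when the result does not arrive in time.

```python
import multiprocessing
import time
from multiprocessing import Pool


def slow_square(x):
    """Hypothetical task that takes about one second."""
    time.sleep(1.0)
    return x ** 2


def get_with_timeout(x, timeout):
    """Return the result, or None if it is not ready within `timeout` seconds."""
    with Pool(processes=1) as pool:
        handle = pool.apply_async(slow_square, (x,))
        try:
            return handle.get(timeout=timeout)
        except multiprocessing.TimeoutError:
            return None


if __name__ == "__main__":
    print(get_with_timeout(3, timeout=0.05))  # too short for a 1 s task
    print(get_with_timeout(3, timeout=10))    # long enough
```

Returning None on timeout is a choice made for this sketch; real code might retry, log, or re-raise instead.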
Example 2:
# Reuses square() and the imports from Example 1.
if __name__ == '__main__':
    xs = [1, 2, 3, 4]
    start = time.time()
    with Pool(processes=2) as p:
        rets = list(p.apply_async(square, args=(x,)) for x in xs)
        rets = [r.get() for r in rets]
    print("[INFO]timecost: ", time.time() - start)
Example 3:
# With tqdm
# Reuses square() and the imports from Example 1.
from tqdm import tqdm

if __name__ == '__main__':
    xs = [1, 2, 3, 4]
    start = time.time()
    with Pool(processes=2) as p:
        rets = list(p.apply_async(square, args=(x,)) for x in xs)
        rets = [r.get() for r in tqdm(rets)]
    print("[INFO]timecost: ", time.time() - start)
Example 4:
# Callback approach, with tqdm
# Reuses square() and the imports from Example 1.
from tqdm import tqdm

if __name__ == '__main__':
    xs = [1, 2, 3, 4]
    start = time.time()
    with tqdm(total=len(xs)) as pbar:
        with Pool(processes=2) as p:
            def callback(*args):
                # Runs in the parent process each time a task finishes.
                pbar.update()

            results = [
                p.apply_async(
                    square,
                    args=(x, ),
                    callback=callback) for x in xs]
            results = [r.get() for r in results]
    print("[INFO]timecost: ", time.time() - start)
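The apply_async signature also accepts kwds and error_callback, which the examples above do not exercise. The sketch below is one possible illustration; divide and run_with_callbacks are hypothetical names. Successful results go to callback, and the exception raised by a failing task goes to error_callback.

```python
from multiprocessing import Pool


def divide(x, y=1):
    """Hypothetical task that raises ZeroDivisionError when y == 0."""
    return x / y


def run_with_callbacks():
    successes, failures = [], []
    with Pool(processes=2) as pool:
        handles = [
            pool.apply_async(
                divide,
                args=(10,),
                kwds={"y": y},                   # keyword arguments for divide
                callback=successes.append,       # called with the return value
                error_callback=failures.append,  # called with the exception
            )
            for y in (2, 0, 5)
        ]
        for handle in handles:
            handle.wait()  # wait without re-raising the task's exception
    return sorted(successes), failures


if __name__ == "__main__":
    ok, errors = run_with_callbacks()
    print("results:", ok)
    print("errors: ", [type(e).__name__ for e in errors])
```

wait() is used instead of get() so that the failing task does not raise in the parent; calling get() on that handle would re-raise the ZeroDivisionError.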
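2. map and map_async
Note: map and map_async pass only a single argument to func. When func takes several arguments, starmap is usually the better choice.
2.1. map
map(func,iterable[,chunksize])
map blocks until every task in the list has completed, and only then does execution continue.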
Example 1:
from multiprocessing import Pool
import time

def square(x):  # map: func receives a single argument
    time.sleep(2)
    print('[INFO]...processing: ', x)
    return x ** 2

if __name__ == '__main__':
    p = Pool(2)
    xs = [1, 2, 3, 4]

    start = time.time()
    ret = p.map(square, xs)  # blocks until all tasks are done
    print(ret)
    print("[INFO]timecost: ", time.time() - start)
    p.close()
    p.join()
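Example 2:
# With tqdm
# Reuses square() and the imports from Example 1.
from tqdm import tqdm

if __name__ == '__main__':
    xs = [1, 2, 3, 4]
    start = time.time()
    with Pool(processes=2) as p:
        # p.map returns only after every task is done, so the bar fills all at once.
        rets = list(tqdm(p.map(square, xs, chunksize=len(xs)//2)))
    print(rets)
    print("[INFO]timecost: ", time.time() - start)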
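The chunksize argument used above groups the inputs into batches, and each batch is handed to a worker as one task. A minimal sketch of that behaviour, assuming the hypothetical helpers tag_with_pid and pids_by_item, records which process handled each item:

```python
import os
from multiprocessing import Pool


def tag_with_pid(x):
    """Return the input together with the ID of the worker process."""
    return x, os.getpid()


def pids_by_item(xs, chunksize):
    """Map each input value to the PID of the worker that processed it."""
    with Pool(processes=2) as pool:
        return dict(pool.map(tag_with_pid, xs, chunksize=chunksize))


if __name__ == "__main__":
    pids = pids_by_item([1, 2, 3, 4], chunksize=2)
    # Items 1 and 2 form one chunk, items 3 and 4 the other.
    print(pids)
```

With chunksize=2, items of the same chunk always share a PID. Whether the two chunks land on different workers depends on scheduling, so the sketch makes no claim about that.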
2.2. map_async
map_async(func,iterable[,chunksize[,callback[,error_callback]]])
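map_async is the non-blocking counterpart of map: it returns a result object immediately. When dispatching tasks to the worker processes, it handles the input as a list.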
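Example:
# Reuses square() and the imports from Example 1 of section 2.1.
if __name__ == '__main__':
    xs = [1, 2, 3, 4]
    start = time.time()
    with Pool(processes=2) as p:
        rets = p.map_async(square, xs)  # returns immediately
        print(rets.get())  # get() blocks
    print("[INFO]timecost: ", time.time() - start)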
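Because map_async returns at once, the parent process can check on progress or do other work before collecting the results. The sketch below assumes hypothetical names slow_double and poll_map_async; ready(), wait() and get() are methods of the returned result object, and the callback receives the complete result list once.

```python
import time
from multiprocessing import Pool


def slow_double(x):
    """Hypothetical task that takes a noticeable amount of time."""
    time.sleep(0.2)
    return 2 * x


def poll_map_async(xs):
    collected = []
    with Pool(processes=2) as pool:
        result = pool.map_async(slow_double, xs, callback=collected.append)
        ready_at_once = result.ready()  # usually False: the workers are still busy
        result.wait()                   # block until every task has finished
        return ready_at_once, result.ready(), result.get(), collected


if __name__ == "__main__":
    before, after, values, collected = poll_map_async([1, 2, 3])
    print("ready right after submit:", before)
    print("ready after wait():      ", after)
    print("values:", values)
    print("callback received:", collected)
```

Note that the callback is invoked a single time with the whole list, unlike apply_async, where it fires once per task.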
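3. imap and imap_unordered
imap and imap_unordered are asynchronous like map_async, with these differences:
[1] - map_async handles the input as a list when dispatching tasks, whereas imap and imap_unordered consume it as an iterable. map_async is slightly more efficient, while imap and imap_unordered use significantly less memory.
[2] - For results, imap and imap_unordered return an iterable that yields results as soon as they are available, whereas map_async waits until all tasks have finished and returns a list.
The difference between imap and imap_unordered:
Like map_async, imap delivers results in input order; imap_unordered does not.
The iterable returned by imap_unordered yields the results of whichever tasks finish first.
The main reasons to use imap/imap_unordered instead of map_async are:
[1] - The iterable is large enough that converting it to a list would use too much memory or exhaust it.
[2] - You want to start processing results before all of them are complete.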
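Both reasons can be illustrated together in a short sketch. The names numbers and running_total are hypothetical: the input is a generator rather than a list, and each result is folded into a total as soon as imap yields it, so no full result list is built either.

```python
from multiprocessing import Pool


def square(x):
    return x * x


def numbers(n):
    """Generator input: values are produced one at a time."""
    for i in range(n):
        yield i


def running_total(n):
    """Sum the squares of 0..n-1, consuming results as they arrive."""
    total = 0
    with Pool(processes=2) as pool:
        for value in pool.imap(square, numbers(n)):
            total += value  # processed immediately, not stored
    return total


if __name__ == "__main__":
    print(running_total(5))  # 0 + 1 + 4 + 9 + 16
```

This is a sketch rather than a memory benchmark; how much memory is actually saved depends on how fast the input is produced relative to how fast the workers consume it.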
3.1. imap
imap(func,iterable[,chunksize])
Example 1:
from multiprocessing import Pool
import time

def square(x):  # imap: func receives a single argument
    time.sleep(2)
    print('[INFO]...processing: ', x)
    return x ** 2

if __name__ == '__main__':
    p = Pool(2)
    xs = [1, 2, 3, 4]

    start = time.time()
    rets = p.imap(square, xs)  # does not block
    for ret in rets:  # iterating blocks until the next result is ready
        print(ret)
    print("[INFO]timecost: ", time.time() - start)
    p.close()
    p.join()
Example 2:
# Reuses square() and the imports from Example 1.
if __name__ == '__main__':
    xs = [1, 2, 3, 4]
    start = time.time()
    with Pool(processes=2) as p:
        results = list(p.imap(square, xs, chunksize=len(xs) // 2))
    print(results)
    print("[INFO]timecost: ", time.time() - start)
Example 3:
# With tqdm
# Reuses square() and the imports from Example 1.
from tqdm import tqdm

if __name__ == '__main__':
    xs = [1, 2, 3, 4]
    start = time.time()
    with Pool(processes=2) as p:
        results = list(tqdm(p.imap(square, xs, chunksize=len(xs) // 2), total=len(xs)))
    print(results)
    print("[INFO]timecost: ", time.time() - start)
3.2. imap_unordered
imap_unordered(func,iterable[,chunksize])
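Compared with imap, the results of imap_unordered are unordered: whichever task finishes first has its result returned first, whereas imap returns results in input order.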
Example 1:
from multiprocessing import Pool
import time

def square(x):  # imap_unordered: func receives a single argument
    time.sleep(2)
    print('[INFO]...processing: ', x)
    return x ** 2

if __name__ == '__main__':
    p = Pool(2)
    xs = [1, 2, 3, 4]

    start = time.time()
    rets = p.imap_unordered(square, xs)  # does not block
    for ret in rets:  # iterating blocks until the next result is ready
        print(ret)
    print("[INFO]timecost: ", time.time() - start)
    p.close()
    p.join()
Example 2:
# With tqdm
# Reuses square() and the imports from Example 1.
from tqdm import tqdm

if __name__ == '__main__':
    xs = [1, 2, 3, 4]
    start = time.time()
    with Pool(processes=2) as p:
        results = list(tqdm(p.imap_unordered(square, xs, chunksize=len(xs) // 2), total=len(xs)))
    print(results)
    print("[INFO]timecost: ", time.time() - start)
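To make the ordering difference between imap and imap_unordered visible, tasks need different durations. The following sketch assumes hypothetical names sleep_then_return and compare_order, and gives the first input the longest delay.

```python
import time
from multiprocessing import Pool


def sleep_then_return(delay):
    """Hypothetical task: sleep for `delay` seconds, then return it."""
    time.sleep(delay)
    return delay


def compare_order(delays):
    # One worker per task, so all tasks can run at the same time.
    with Pool(processes=len(delays)) as pool:
        ordered = list(pool.imap(sleep_then_return, delays))
        unordered = list(pool.imap_unordered(sleep_then_return, delays))
    return ordered, unordered


if __name__ == "__main__":
    ordered, unordered = compare_order([0.6, 0.3, 0.1])
    print("imap:          ", ordered)    # always in input order
    print("imap_unordered:", unordered)  # typically shortest delay first
```

The imap output always matches the input order. The imap_unordered output depends on completion order, so it is usually, but not guaranteed to be, sorted by delay.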
4. starmap and starmap_async
starmap and starmap_async differ from map and map_async in that they can pass multiple arguments to func: each element of the iterable is unpacked as the arguments of one call.
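The difference can be sketched by calling the same two-argument function both ways. power, power_packed and compare are hypothetical names for this illustration: map needs a wrapper that unpacks a tuple, while starmap unpacks each tuple itself.

```python
from multiprocessing import Pool


def power(base, exponent):
    return base ** exponent


def power_packed(pair):
    """Wrapper needed by map: unpack a single (base, exponent) tuple."""
    return power(*pair)


def compare(pairs):
    with Pool(processes=2) as pool:
        via_map = pool.map(power_packed, pairs)   # one tuple per call
        via_starmap = pool.starmap(power, pairs)  # tuple unpacked into arguments
    return via_map, via_starmap


if __name__ == "__main__":
    print(compare([(2, 3), (3, 2), (4, 0)]))
```

Both calls produce the same list; starmap just removes the need for the wrapper function.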
4.1. starmap
Example:
from multiprocessing import Pool
import time

def power(x, y):
    time.sleep(2)
    print('[INFO]...processing: ', x)
    return x ** y

if __name__ == '__main__':
    xs = [1, 2, 3, 4]

    start = time.time()
    with Pool(processes=2) as p:
        # Each (x, y) tuple from zip is unpacked into power(x, y).
        rets = p.starmap(power, zip(xs, xs), chunksize=len(xs)//2)
    print(rets)
    print("[INFO]timecost: ", time.time() - start)
4.2. starmap_async
Example:
from multiprocessing import Pool
import time

def power(x, y):
    time.sleep(2)
    print('[INFO]...processing: ', x)
    return x ** y

if __name__ == '__main__':
    xs = [1, 2, 3, 4]

    start = time.time()
    with Pool(processes=2) as p:
        # starmap_async returns immediately; get() blocks until all results are ready.
        rets = p.starmap_async(power, zip(xs, xs), chunksize=len(xs)//2).get()
    print(rets)
    print("[INFO]timecost: ", time.time() - start)