Python进程池:multiprocessing.pool

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。

Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

对于子进程的print结果,spyder无法打印出来。请用其他方式运行程序。

例1:使用进程池

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#coding: utf-8
import multiprocessing
import time

def func(msg):
print("msg:", msg)
time.sleep(3)
print("end")

if __name__ == "__main__":
pool = multiprocessing.Pool(processes=3)
last_time = time.time()
for i in range(4):
msg = "hello %d" %(i)
pool.apply_async(func, (msg, )) #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

print("run Main function")
pool.close()
pool.join() #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
print("Sub-process(es) done.")
print("Cost time:", time.time() - last_time)

运行结果为:

1
2
3
4
5
6
7
8
9
10
11
run Main function
msg: hello 0
msg: hello 1
msg: hello 2
end
end
msg: hello 3
end
end
Sub-process(es) done.
Cost time: 6.180286645889282

函数解释

  • apply_async(func[, args[, kwds[, callback]]]):它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解区别,看例1例2结果区别)
  • close():关闭pool,使其不在接受新的任务。
  • terminate():结束工作进程,不在处理未完成的任务。
  • join():主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用

执行说明:创建一个进程池pool,并设定进程的数量为3,range(4)会相继产生四个对象[0, 1, 2, 4],四个对象被提交到pool中,因pool指定进程数为3,所以0、1、2会直接送到进程中执行,当其中一个执行完事后才空出一个进程处理对象3,所以会出现输出msg: hello 3出现在end后。因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行,所以运行完for循环后直接输出run Main function,主程序在pool.join()处等待各个进程的结束。

例2:使用进程池(阻塞)

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#coding: utf-8
import multiprocessing
import time

def func(msg):
print("msg:", msg)
time.sleep(3)
print("end")

if __name__ == "__main__":
pool = multiprocessing.Pool(processes=3)
last_time = time.time()
for i in range(4):
msg = "hello %d" %(i)
pool.apply(func, (msg, )) #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

print("run Main function")
pool.close()
pool.join() #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
print("Sub-process(es) done.")
print("Cost time:", time.time() - last_time)

执行结果:

1
2
3
4
5
6
7
8
9
10
11
msg: hello 0
end
msg: hello 1
end
msg: hello 2
end
msg: hello 3
end
run Main function
Sub-process(es) done.
Cost time: 12.180671453475952

可以看到,当使用apply代替apply_async时,进程会堵塞在那,直到执行完毕。主进程也就在所有进行完成后运行。

例3:使用进程池,并关注结果

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import multiprocessing
import time

def func(msg):
print("msg:", msg)
time.sleep(3)
print("end")
return("done" + msg)

if __name__ == "__main__":
pool = multiprocessing.Pool(processes=3)
result = []
last_time = time.time()
for i in range(4):
msg = "hello %d" %(i)
result.append(pool.apply_async(func, (msg, )))
print("run Main function")
pool.close()
pool.join()

for res in result:
print(":::", res.get())
print("Sub-process(es) done.")
print("Cost time:", time.time() - last_time)

运行结果为:

1
2
3
4
5
6
7
run Main function
::: donehello 0
::: donehello 1
::: donehello 2
::: donehello 3
Sub-process(es) done.
Cost time: 6.130031585693359

当更改上面的multiprocessing.Pool(processes=3)multiprocessing.Pool(processes=4)时,运行结果如下:

1
2
3
4
5
6
7
run Main function
::: donehello 0
::: donehello 1
::: donehello 2
::: donehello 3
Sub-process(es) done.
Cost time: 3.2096006870269775

对比第一个结果和第二个结果,在第一个结果中,因为只开辟了三个进程池,所以第四个请求会等待前面的请求运行完了,释放了一个进程时,才会运行,所以程序总的耗时为6s。

例4:使用多个进程池另外,观察第一个结果和第二个结果,主函数中的print("run Main function")会立即执行,并没有等待子进程运行完,也就是说因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行。

例4:进程池中使用不同子函数

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#coding: utf-8
import multiprocessing
import os, time, random

def Lee():
print("\nRun task Lee-%s" %(os.getpid())) #os.getpid()获取当前的进程的ID
start = time.time()
time.sleep(random.random() * 10) #random.random()随机生成0-1之间的小数
end = time.time()
print('Task Lee, runs %0.2f seconds.' %(end - start))

def Marlon():
print("\nRun task Marlon-%s" %(os.getpid()))
start = time.time()
time.sleep(random.random() * 40)
end=time.time()
print('Task Marlon runs %0.2f seconds.' %(end - start))

def Allen():
print("\nRun task Allen-%s" %(os.getpid()))
start = time.time()
time.sleep(random.random() * 30)
end = time.time()
print('Task Allen runs %0.2f seconds.' %(end - start))

def Frank():
print("\nRun task Frank-%s" %(os.getpid()))
start = time.time()
time.sleep(random.random() * 20)
end = time.time()
print('Task Frank runs %0.2f seconds.' %(end - start))

if __name__=='__main__':
function_list= [Lee, Marlon, Allen, Frank]
print("parent process %s" %(os.getpid()))

pool=multiprocessing.Pool(4)
for func in function_list:
pool.apply_async(func) #Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中

print('Waiting for all subprocesses done...')
pool.close()
pool.join() #调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
print('All subprocesses done.')

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
parent process 24276
Waiting for all subprocesses done...

Run task Lee-16688
Run task Marlon-27824


Run task Allen-2188

Run task Frank-27380
Task Marlon runs 2.47 seconds.
Task Frank runs 5.59 seconds.
Task Lee, runs 6.12 seconds.
Task Allen runs 19.02 seconds.
All subprocesses done.

map用法

代码:

1
2
3
4
5
6
7
8
9
10
#coding: utf-8
import multiprocessing

def m1(x):
print(x * x)

if __name__ == '__main__':
pool = multiprocessing.Pool(multiprocessing.cpu_count())
i_list = range(8)
pool.map(m1, i_list)

运行结果:

1
2
3
4
5
6
7
8
0
1
4
25
36
9
16
49

但是代码若改成这样的形式,得到的值就是有顺序的:

1
2
3
4
5
6
7
8
9
10
11
#coding: utf-8
import multiprocessing

def m1(x):
return x * x

if __name__ == '__main__':
pool = multiprocessing.Pool(multiprocessing.cpu_count())
i_list = range(8)
res = pool.map(m1, i_list)
print(res)

运行结果:

1
[0, 1, 4, 9, 16, 25, 36, 49]

若要使用apply_async完成该功能

代码:

1
2
3
4
5
6
7
8
9
10
#coding: utf-8
import multiprocessing

def m1(x):
print(x * x)

if __name__ == '__main__':
pool = multiprocessing.Pool(multiprocessing.cpu_count())
multi_res = [pool.apply_async(m1, (i,)) for i in range(8)]
print([res.get() for res in multi_res])

运行结果:

1
2
3
4
5
6
7
8
0
4
1
9
36
25
16
49

同样代码若改成这样的形式,得到的值就是有顺序的:

1
2
3
4
5
6
7
8
9
10
#coding: utf-8
import multiprocessing

def m1(x):
return x * x

if __name__ == '__main__':
pool = multiprocessing.Pool(multiprocessing.cpu_count())
multi_res = [pool.apply_async(m1, (i,)) for i in range(8)]
print([res.get() for res in multi_res])

运行结果:

1
[0, 1, 4, 9, 16, 25, 36, 49]

ThreadPool VS Pool

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from multiprocessing import Pool
import os, time

print("hi outside of main()")

def hello(x):
print("inside hello()")
print("Proccess id: ", os.getpid())
time.sleep(3)
return x*x

if __name__ == "__main__":
p = Pool(5)
pool_output = p.map(hello, range(3))

print(pool_output)

运行结果为:

1
2
3
4
5
6
7
8
9
10
11
12
13
hi outside of main()
hi outside of main()
hi outside of main()
inside hello()
Proccess id: 31816
hi outside of main()
inside hello()
Proccess id: 32888
inside hello()
Proccess id: 34140
hi outside of main()
hi outside of main()
[0, 1, 4]

当使用ThreadPool,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from multiprocessing.pool import ThreadPool
import os, time

print("hi outside of main()")

def hello(x):
print("inside hello()")
print("Proccess id: ", os.getpid())
time.sleep(3)
return x*x

if __name__ == "__main__":
p = ThreadPool(5)
pool_output = p.map(hello, range(3))

print(pool_output)

运行结果为:

1
2
3
4
5
6
7
8
hi outside of main()
inside hello()
inside hello()
Proccess id: 34396
Proccess id: 34396
inside hello()
Proccess id: 34396
[0, 1, 4]

从这两个运行结果中可以观察到:在Pool中每次运行都会打印outside __main__(),而后者不会。

解释:multiprocessing.pool.ThreadPoolmultiprocessing.Pool行为一样,区别就在于Pool采用的是进程,而ThreadPool采用的是线程。

因此,在Pool中,会开启5个独立的进程,每个进程都有自己的Python解释器,都会重新运行最上面的print函数,所以hi outside of main()打印了多次。注意,只有在windows下的Poolspawn新的进程,在linux下使用fork则不会,只会看到上面的消息打印了一次。

multiprocessing.pool.ThreadPool没有官方文档,是因为它没有完成,缺少测试和文档。所以可以从source code中看他的实现。

这两者使用的规则为:

  • IO bound jobs -> multiprocessing.pool.ThreadPool
  • CPU bound jobs -> multiprocessing.Pool
  • Hybrid jobs -> depends on the workload, I usually prefer the multiprocessing.Pool due to the advantage process isolation brings

对于Python3,可以再concurrent.future.Executor 看看pool的实现。

参考

python进程池:multiprocessing.pool
进程池 Pool
Whats the difference between ThreadPool and Pool in multiprocessing module?

------ 本文结束------
坚持原创技术分享,您的支持将鼓励我继续创作!

欢迎关注我的其它发布渠道