Python中多进程之间的数据共享

多进程中,每个进程都是独立的,各自持有一份数据,无法共享。本篇文章介绍三种用于进程数据共享的方法

  • queues
  • Array
  • Manager.dict
  • pipe

Queue

1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing import queues
import multiprocessing

def func(i, q):
q.put(i)
print("--->", i, q.qsize())


q = queues.Queue(9, ctx=multiprocessing)
for i in range(5):
p = multiprocessing.Process(target=func, args=(i, q,))
p.start()
p.join()

Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常

get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常

1
2
3
4
5
6
7
8
9
10
11
12
import multiprocessing

def func(i, q):
q.put(i)
print("--->", i, q.qsize())


q = multiprocessing.Queue()
for i in range(5):
p = multiprocessing.Process(target=func, args=(i, q,))
p.start()
p.join()

Array

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process
from multiprocessing import Array

def func(i, ar):
ar[i] = i
for item in ar:
print(item)
print("------")

ar = Array('i', 5)
for i in range(5):
p = Process(target=func, args=(i, ar,))
p.start()
p.join()

Array的局限性在于受制于数组的特性,即需要指定数据类型且长度固定

1
2
3
4
5
6
7
# 数据类型对照表
'c': ctypes.c_char, 'u': ctypes.c_wchar,
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int, 'I': ctypes.c_uint,
'l': ctypes.c_long, 'L': ctypes.c_ulong,
'f': ctypes.c_float, 'd': ctypes.c_double

Manager.dict

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from multiprocessing import Process, Manager

# 每个子进程执行的函数
# 参数中,传递了一个用于多进程之间数据共享的特殊字典
def func(i, d):
d[i] = i + 100
print(d.values())

# 在主进程中创建特殊字典
m = Manager()
d = m.dict()

for i in range(5):
# 让子进程去修改主进程的特殊字典
p = Process(target=func, args=(i, d))
p.start()
p.join()

------------
[100]
[100, 101]
[100, 101, 102, 103]
[100, 101, 102, 103]
[100, 101, 102, 103, 104]

Manager.dict是多进程数据共享中比较常用的做法

pipe

Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息

send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import multiprocessing

def func1(arg, pp):
pp.send(arg)

def func2(pp):
recv = pp.recv()
print(recv)

pp = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=func1, args=("PolarSnow", pp[0],))
p2 = multiprocessing.Process(target=func2, args=(pp[1],))
p1.start()
p2.start()
p1.join()
p2.join()

------------
PolarSnow