多进程中,每个进程都是独立的,各自持有一份数据,无法共享。本篇文章介绍三种用于进程数据共享的方法
- queues
- Array
- Manager.dict
- pipe
Queue
1 2 3 4 5 6 7 8 9 10 11 12 13
| from multiprocessing import queues import multiprocessing
def func(i, q): q.put(i) print("--->", i, q.qsize())
q = queues.Queue(9, ctx=multiprocessing) for i in range(5): p = multiprocessing.Process(target=func, args=(i, q,)) p.start() p.join()
|
Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常
get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常
1 2 3 4 5 6 7 8 9 10 11 12
| import multiprocessing
def func(i, q): q.put(i) print("--->", i, q.qsize())
q = multiprocessing.Queue() for i in range(5): p = multiprocessing.Process(target=func, args=(i, q,)) p.start() p.join()
|
Array
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| from multiprocessing import Process from multiprocessing import Array
def func(i, ar): ar[i] = i for item in ar: print(item) print("------")
ar = Array('i', 5) for i in range(5): p = Process(target=func, args=(i, ar,)) p.start() p.join()
|
Array的局限性在于受制于数组的特性,即需要指定数据类型且长度固定
1 2 3 4 5 6 7
| 'c': ctypes.c_char, 'u': ctypes.c_wchar, 'b': ctypes.c_byte, 'B': ctypes.c_ubyte, 'h': ctypes.c_short, 'H': ctypes.c_ushort, 'i': ctypes.c_int, 'I': ctypes.c_uint, 'l': ctypes.c_long, 'L': ctypes.c_ulong, 'f': ctypes.c_float, 'd': ctypes.c_double
|
Manager.dict
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| from multiprocessing import Process, Manager
def func(i, d): d[i] = i + 100 print(d.values())
m = Manager() d = m.dict()
for i in range(5): p = Process(target=func, args=(i, d)) p.start() p.join()
------------ [100] [100, 101] [100, 101, 102, 103] [100, 101, 102, 103] [100, 101, 102, 103, 104]
|
Manager.dict是多进程数据共享中比较常用的做法
pipe
Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息
send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| import multiprocessing
def func1(arg, pp): pp.send(arg)
def func2(pp): recv = pp.recv() print(recv)
pp = multiprocessing.Pipe() p1 = multiprocessing.Process(target=func1, args=("PolarSnow", pp[0],)) p2 = multiprocessing.Process(target=func2, args=(pp[1],)) p1.start() p2.start() p1.join() p2.join()
------------ PolarSnow
|