多进程中,每个进程都是独立的,各自持有一份数据,无法共享。本篇文章介绍三种用于进程数据共享的方法

  • queues
  • Array
  • Manager.dict
  • pipe

Queue

1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing import queues
import multiprocessing

def func(i, q):
q.put(i)
print("--->", i, q.qsize())


q = queues.Queue(9, ctx=multiprocessing)
for i in range(5):
p = multiprocessing.Process(target=func, args=(i, q,))
p.start()
p.join()

Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常

get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常

1
2
3
4
5
6
7
8
9
10
11
12
import multiprocessing

def func(i, q):
q.put(i)
print("--->", i, q.qsize())


q = multiprocessing.Queue()
for i in range(5):
p = multiprocessing.Process(target=func, args=(i, q,))
p.start()
p.join()

Array

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process
from multiprocessing import Array

def func(i, ar):
ar[i] = i
for item in ar:
print(item)
print("------")

ar = Array('i', 5)
for i in range(5):
p = Process(target=func, args=(i, ar,))
p.start()
p.join()

Array的局限性在于受制于数组的特性,即需要指定数据类型且长度固定

1
2
3
4
5
6
7
# 数据类型对照表
'c': ctypes.c_char, 'u': ctypes.c_wchar,
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int, 'I': ctypes.c_uint,
'l': ctypes.c_long, 'L': ctypes.c_ulong,
'f': ctypes.c_float, 'd': ctypes.c_double

Manager.dict

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from multiprocessing import Process, Manager

# 每个子进程执行的函数
# 参数中,传递了一个用于多进程之间数据共享的特殊字典
def func(i, d):
d[i] = i + 100
print(d.values())

# 在主进程中创建特殊字典
m = Manager()
d = m.dict()

for i in range(5):
# 让子进程去修改主进程的特殊字典
p = Process(target=func, args=(i, d))
p.start()
p.join()

------------
[100]
[100, 101]
[100, 101, 102, 103]
[100, 101, 102, 103]
[100, 101, 102, 103, 104]

Manager.dict是多进程数据共享中比较常用的做法

pipe

Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息

send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import multiprocessing

def func1(arg, pp):
pp.send(arg)

def func2(pp):
recv = pp.recv()
print(recv)

pp = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=func1, args=("PolarSnow", pp[0],))
p2 = multiprocessing.Process(target=func2, args=(pp[1],))
p1.start()
p2.start()
p1.join()
p2.join()

------------
PolarSnow

Python原生没有数组的概念,这点不同于Java之类的面向对象语言。Python中原生的列表使用起来很像数组,但是两者有本质的区别

Python是一门动态语言,在Python列表的创建中,不需要指定数据类型,而且不需要指定列表的长度—因为Python中的列表是通过链表的形式实现的;相对应的,数组的特点是必须在创建的时候声明它的数据类型,而且必须指定数组的长度

#列表与数组的区别

内存地址

  • 列表中的所有元素的内存地址可以不是连续的,它是通过每个元素记录它的上一个元素的内存地址和记录它下一个元素的内存地址来排列的
  • 数组是一个连续的内存空间,每一个元素都按照先后顺序排列在内存当中

数据类型

  • Python中的列表可以放任意类型的对象,这个特性是Python赋予的,不是链表本身的特性
  • 数组一旦声明数据类型,之后放入每个的元素必须都是这个数据类型的对象,如果不是,就会抛出异常

Python中使用到数组的场景

  • from multiprocessing import Array

在多线程的介绍中,介绍了可以自定义多线程类,并且重写多线程类的run方法,实现了第二种使用多线程的方式

在多进程中,也可以自定义多进程类,是通过重写多进程类的run方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import multiprocessing

class MyProcess(multiprocessing.Process):
def __init__(self, func, args):
# 将函数名和参数封装到本类中
self.func = func
self.args = args
# 为保证继承的类正常运行,这里需要执行父类的构造方法
# 但是没有给父类的构造方法传递任何参数
# 而对象寻找参数时,也是最先从本类中寻找
super(MyProcess, self).__init__()

def run(self):
# 执行参数中的函数
self.func(*self.args)

# 自定义一个需要子进程运行的函数
def f(*args):
print(args)

# 使用自定义的进程对象执行
p = MyProcess(func=f, args=(59,15))
p.start()

进程p调用start()时,自动调用run()

Python中的多进程可以使用到多颗CPU带来的好处,在前面多线程中已经总结出,对于IO密集型的操作,由于IO不占用CPU,所以使用轻量级的多线程即可解决问题;而对于计算密集型操作,由于多线程不支持使用多颗CPU,所以使用多进程是最合适的。本篇文件就介绍Python多进程的使用

创建一个子进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import multiprocessing
import time


def worker(arg):
time.sleep(2)
print(arg)

if __name__ == '__main__':
p = multiprocessing.Process(target=worker, args=("Hello PolarSnow",))
p.start()
print("p.pid:", p.pid)
print("p.name", p.name)
print("p.is_alive:", p.is_alive())

------------
p.pid: 17731
p.name Process-1
p.is_alive: True
Hello PolarSnow

如果你看过前面多线程的文章,可以看出,多进程的基本用法和多线程很相似

多个进程处理同一个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import multiprocessing
import time


def worker(arg):
time.sleep(2)
print(arg)

if __name__ == '__main__':
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
p.start()

print("CPU数量:", str(multiprocessing.cpu_count()))

for p in multiprocessing.active_children():
print("子进程名称:", p.name, "子进程id:", p.pid)

------------
CPU数量: 4
子进程名称: Process-5 子进程id: 18705
子进程名称: Process-3 子进程id: 18703
子进程名称: Process-2 子进程id: 18702
子进程名称: Process-1 子进程id: 18701
子进程名称: Process-4 子进程id: 18704
0
1
2
3
4

多个进程处理多个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import multiprocessing
import time


def worker1(arg):
time.sleep(2)
print("worker1", arg)


def worker2(arg):
time.sleep(2)
print("worker2", arg)


def worker3(arg):
time.sleep(2)
print("worker3", arg)

if __name__ == '__main__':

p1 = multiprocessing.Process(target=worker1, args=("PS1",))
p2 = multiprocessing.Process(target=worker1, args=("PS2",))
p3 = multiprocessing.Process(target=worker1, args=("PS3",))
p1.start()
p2.start()
p3.start()

print("CPU数量:", str(multiprocessing.cpu_count()))

# 可以查看当前正在运行的子进程的信息
for p in multiprocessing.active_children():
print("子进程名称:", p.name, "子进程id:", p.pid)

daemon属性

不加daemon属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import multiprocessing
import time


def worker(arg):
time.sleep(2)
print(arg)

if __name__ == '__main__':
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
# p.daemon = True
p.start()

print("CPU数量:", str(multiprocessing.cpu_count()))

for p in multiprocessing.active_children():
print("子进程名称:", p.name, "子进程id:", p.pid)

print("End")

------------
CPU数量: 4
子进程名称: Process-2 子进程id: 21859
子进程名称: Process-3 子进程id: 21860
子进程名称: Process-1 子进程id: 21858
子进程名称: Process-4 子进程id: 21861
子进程名称: Process-5 子进程id: 21862
End
0
1
2
3
4

加上daemon属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import multiprocessing
import time


def worker(arg):
time.sleep(2)
print(arg)

if __name__ == '__main__':
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
p.daemon = True
p.start()

print("CPU数量:", str(multiprocessing.cpu_count()))

for p in multiprocessing.active_children():
print("子进程名称:", p.name, "子进程id:", p.pid)

print("End")

------------
CPU数量: 4
子进程名称: Process-2 子进程id: 21913
子进程名称: Process-1 子进程id: 21912
子进程名称: Process-4 子进程id: 21915
子进程名称: Process-5 子进程id: 21916
子进程名称: Process-3 子进程id: 21914
End

p.daemon的默认值为False,意为主进程等待子进程执行完毕后再退出程序

p.daemon的值为True时,意为当主进程执行完毕时,立即退出程序

join方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import multiprocessing
import time


def worker(arg):
time.sleep(2)
print(arg)

if __name__ == '__main__':
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
p.daemon = True
p.start()

print("CPU数量:", str(multiprocessing.cpu_count()))

for p in multiprocessing.active_children():
print("子进程名称:", p.name, "子进程id:", p.pid)
p.join()
print("End")

------------
CPU数量: 4
子进程名称: Process-2 子进程id: 22196
子进程名称: Process-1 子进程id: 22195
子进程名称: Process-3 子进程id: 22197
子进程名称: Process-5 子进程id: 22199
子进程名称: Process-4 子进程id: 22198
0
1
2
3
End

注意:

  • join方法的作用是阻塞住主进程,让CPU无法执行join后面的代码,专注执行多线程
  • 上面演示的是多个进程对应一个join的情况,意为等待所有的子进程执行完毕后,再向下执行代码;如果join写在for循环里面,多个线程对应多个join时,意为前一个子进程执行完毕后,才能执行下一次循环,执行下一个任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import multiprocessing
import time


def worker(arg):
time.sleep(2)
print(arg)

if __name__ == '__main__':
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
p.daemon = True
p.start()
p.join()

print("CPU数量:", str(multiprocessing.cpu_count()))

for p in multiprocessing.active_children():
print("子进程名称:", p.name, "子进程id:", p.pid)
# p.join()
print("End")

相当于用子进程串行执行任务

  • join是可以添加参数的,在没有参数的时候,会一直阻塞到函数执行完毕为止;设置参数后(int 时间 单位是秒)意为join最长阻塞的时间,超过这个时间后,不管子进程有没有执行完毕都会继续向下执行
1
p.join(1)

最多等待1秒(最多阻塞1秒)

上一篇文章简单的介绍了一个线程池的基本实现思路。在上一篇的代码实现中,其实还有很多问题。第一,线程没有被重用,我们只是模拟了一个任务拿走一个线程,执行完毕后再“还”回去的过程,其实是在线程池中新创建了一个线程类,被使用过的线程将等待被Python的垃圾回收机制回收;第二,如果执行的任务的数量小于线程池的容量,那我们在线程池的构造方法中,预先填满的线程类就显得非常浪费了

改进思路:

  • 之前是将线程放入到队列中,启动线程池后,整个队列就会充满待使用的线程
    • 改进:将任务放入到队列中,然任务排队进入队列。线程的数量通过列表的长度控制,每创建一个线程,就在列表中append一下
  • 之前队列中的线程,每一个任务取走之后,执行完任务都会创建一个新的线程放入到队列中
    • 改进:每个线程执行完任务后,不再释放线程资源,循环去任务队列中获取任务去执行,直到判断拿到的是空值而不是一个任务时,退出自己的线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import queue
import threading
import contextlib
import time

# 队列中的空值
StopEvent = object()


class ThreadPool(object):

def __init__(self, max_num, max_task_num = None):
"""
构造方法
:param max_num: 最多有多少个线程
:param max_task_num: 接收任务队列的长度
"""
if max_task_num:
self.q = queue.Queue(max_task_num)
else:
self.q = queue.Queue()
self.max_num = max_num
self.cancel = False
self.terminal = False
self.generate_list = [] # 当前已经创建的线程
self.free_list = [] # 当前还空闲多少线程

def run(self, func, args, callback=None):
"""
线程池执行一个任务
:param func: 任务函数
:param args: 任务函数所需参数
:param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
:return: 如果线程池已经终止,则返回True否则None
"""

# self.cancel是在构造方法和self.close方法中设置的值
# 在close方法中将该值设置为False,意为当前任务队列中的所有任务全部已经执行完毕
if self.cancel:
return
if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
# 如果空闲的线程为0 且 当前已经创建的线程数量小于设置的最大线程数量
# 调用本类的其他方法创建一个新的线程
self.generate_thread()
# 将run方法收到的三个参数组成一个数据
w = (func, args, callback,)
# 放入到任务队列当中
self.q.put(w)

def generate_thread(self):
"""
创建一个线程
"""
# 每一个线程被创建出来后都去执行了本类中的call方法
t = threading.Thread(target=self.call)
# 启动线程
t.start()

def call(self):
"""
循环去获取任务函数并执行任务函数
"""

# 获取当前线程
current_thread = threading.currentThread
# 将当前线程加入到"当前已创建线程"列表中
self.generate_list.append(current_thread)

# 在队列中获取一个任务
event = self.q.get()

# while循环,当event不为空对象的时候
while event != StopEvent:
# 队列中的每个元素由一个元祖构成(func, args, callback,)
func, arguments, callback = event
try:
# 去执行函数
# 这里func对应的是action函数
# arguments参数对应了i
# result接收这个函数的返回值
result = func(*arguments)
success = True
except Exception as ex:
success = False
result = None

# 判断callback函数是否为空
if callback is not None:
try:
# 执行这个回调函数,将执行的状态和返回的结果传递给回调函数
callback(success, result)
except Exception as ex:
pass

# 以上action函数执行完后,该线程变为空闲状态
# 以下代码将该线程加入到"当前空闲线程"列表中,等待线程复用
with self.worker_state(self.free_list, current_thread):
# 默认的self.terminal为False,会一直去队列中取任务执行
# 在terminate方法中,设置了self.terminal的值为True,也就是说,一旦调用terminate方法,这个线程将被回收,不再执行新的任务
if self.terminal:
event = StopEvent
else:
# 复用当前线程,再去队列中获取一个任务
event = self.q.get()
else:
# event为空对象,说明当前已经没有需要执行的任务了
# 每个线程把自己的线程对象从"当前已创建线程"列表中移除(不再复用)
self.generate_list.remove(current_thread)

def close(self):
"""
执行完所有的任务后,所有线程停止
"""
self.cancel = True
# 获取当前线程列表的长度(有多少个复用的线程)
full_size = len(self.generate_list)
# 因为任务已经执行完毕,任务队列为空,所有的线程都在等待获取新的任务
# 向任务队列中,插入StopEvent空值,让每个线程拿到这个任务的时候,退出掉自己的线程
while full_size:
# 当前可以复用的线程列表中有多少个线程对象就插入多少个空值
self.q.put(StopEvent)
full_size -= 1

def terminate(self):
"""
无论是否还有任务,终止线程
"""
self.terminal = True

# 把队列中的所有任务全部设置为空对象(结束线程的标识)
# 每个拿到这个空对象任务的线程都会结束自己的线程
while self.generate_list:
self.q.put(StopEvent)
# 所有的线程都被结束后,清空队列中的所有内容
self.q.empty()

# 上下文管理的装饰器,以下函数可以被with调用执行
@contextlib.contextmanager
def worker_state(self, state_list, worker_thread):
"""
用于记录线程中正在等待的线程数
"""
state_list.append(worker_thread)
try:
yield
finally:
state_list.remove(worker_thread)


if __name__ == '__main__':
# 创建一个线程池
# 最多同时创建3个线程在处理任务(消费者从队列中消费数据)
# 任务队列中同一时间最多存放5个任务(生产者向队列中存放数据)
pool = ThreadPool(3, max_task_num=5)

# 回调函数,每一个子线程处理完一个任务之后,都会调用这个方法
# 用来处理子线程执行任务后的执行状态和返回结果
def callback(status, result):
# status, execute action status
# result, execute action return value
pass

# 真正实现多线程的函数
def action(i):
print(i)

# 创建30个任务
for i in range(30):
# 每一次迭代就执行一次run方法
ret = pool.run(action, (i,), callback)

time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))

# 关闭线程池
pool.close()

# 终止线程池并清空任务队列
# pool.terminate()

在使用多线程的过程中,一旦任务数量过多,如果不对派生出来的子线程做出限制的话,多线程极有可能会拉低程序的性能,这时,我们需要维护一个固定数量的线程,也就是线程池的概念

实现线程池的思路:

  • 设置线程池的容量
  • 每取走一个线程,在线程池中就减少一个线程
  • 每个子线程的任务执行完毕,把自己的线程配额放回线程池中
  • 当线程池中的线程全部分配完毕,新的任务需要等待获取线程池中的线程

基于以上的需求,我们可以通过队列来实现线程池的功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import queue
import threading
import time

class ThreadPool:
# 接收的实例化参数为线程池的最大容量
def __init__(self, maxsize=5):
self.maxsize = maxsize

# 设置队列的最大容量
self.q = queue.Queue(maxsize)

# 用线程类填满队列
for i in range(maxsize):
self.q.put(threading.Thread)

def get_thread(self):
# 从队列中取出一个线程类返回
return self.q.get()

def add_thread(self):
# 先线程池添加一个线程类
# 用来被执行完任务的函数调用,实现回收线程配额的功能
self.q.put(threading.Thread)

def task(arg, pool):
"""接收了一个线程池对象"""
print(arg)
time.sleep(1)

# 执行完毕函数的主体后,将拿到的线程配额再放回到线程池中
pool.add_thread()

if __name__ == '__main__':
# 创建一个容量为5的线程池
pool = ThreadPool(5)

for i in range(59):

# 从线程池中拿到一个线程对象
# 相当于 t = threading.Thread
t = pool.get_thread()

# 创建子线程对象
# 把线程池对象传递到执行函数中
o = t(target=task, args=(i, pool,))

o.start()

在多线程的使用过程中,需要考虑的一个问题就是线程安全。如果使用多线程对多个对象进行操作,也许不会有异常的情况的产生,因为每个对象是独立的,每个对象的改变对其他对象没有影响。但是如果使用多线程操作同一个对象,极有可能产生脏数据;但是同时访问同一个对象不会产生脏数据

多线程修改同一对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import threading
import time

NUM = 5

def func():
global NUM
NUM -= 1
time.sleep(1)
print(NUM)

for i in range(5):
t = threading.Thread(target=func)
t.start()

------------
00
0
0
0

上面的小程序中,定义了一个全局变量NUM,在func方法中对全局变量NUM做自减一的操作,我们理想中的结果是

1
2
3
4
5
4
3
2
1
0

但实际结果却全都是0

这是因为每个线程运行到sleep(1)的时候都在这里卡住,最后,所有的线程都执行完了NUM -= 1,这个时候全局变量NUM已经=0,而第一个线程还没有执行完sleep(1),所以最后所有的线程打印出来的都是0

Lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import threading
import time

NUM = 5

# 函数里接收一个锁对象
def func(lock):
global NUM

# 获取锁
lock.acquire()
NUM -= 1
time.sleep(1)
print(NUM)

# 释放锁
lock.release()

# 创建一个互斥锁
lock = threading.Lock()

for i in range(5):
# 把创建的锁对象当做参数传递给函数
t = threading.Thread(target=func, args=(lock,))
t.start()

------------
4
3
2
1
0

RLock

RLock和Lock的使用方法相同,但却是Lock的”升级版”,RLock支持在函数内的多层嵌套锁,例如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import threading
import time

NUM = 5

# 函数里接收一个锁对象
def func(lock):
global NUM

# 获取锁
lock.acquire()
NUM -= 1

# 嵌套锁
lock.acquire()
time.sleep(1)
lock.release()

print(NUM)

# 释放锁
lock.release()

# 创建一个互斥锁
lock = threading.RLock()

for i in range(5):
# 把创建的锁对象当做参数传递给函数
t = threading.Thread(target=func, args=(lock,))
t.start()

Semaphore

信号量,可以放出一批锁,类似于批量“授权”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import threading
import time

NUM = 5

# 函数里接收一个锁对象
def func(i, lock):
global NUM

# 获取锁,这里最多可以发放3把锁
lock.acquire()
NUM -= 1
time.sleep(1)
print(i, NUM)

# 释放锁
lock.release()

# 创建一个锁,锁的数量有3个
lock = threading.BoundedSemaphore(3)

for i in range(5):
# 把创建的锁对象当做参数传递给函数
t = threading.Thread(target=func, args=(i, lock,))
t.start()

------------
0 2
1 2
2 2
4 0
3 0

从结果中可以看出,总共有3把锁被放出,所以前三个线程打印值是相同的,三个线程依次修改NUM的值,然后依次进入time.sleep(1)等待(此时NUM已经被自减了3次,值=2),最后依次打印出NUM的值

event

事件, 同样是“批量授权”,但是和信号量指定创建锁的数量不同的是,事件要么锁住所有的线程,要么释放所有的线程,不能自定义锁的数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import threading

# 接收一个event对象
def func(i, event):
print(i)

# 这里的wait()方法会阻塞所有的线程,但是是否阻塞取决于event维护的全局Flag
event.wait() # wait会去检测"Flag"的值的真假
print(i + 100)

# 创建一个event对象
event = threading.Event()

# 设置event对象中的"Flag"为False
# 默认情况下就是False
event.clear()

for i in range(5):
t = threading.Thread(target=func, args=(i, event))
t.start()

inp = input("> ")
if inp == "go":
# 设置event对象中的"Flag"为True
# 释放了所有的锁
event.set()

------------
0
1
2
3
4
> go
100
103
101
104
102
  • wait:检测“Flag”的值
  • clear:将“Flag”设置为False
  • set:将“Flag”设置为True

Condition

条件,上面说了事件的特性,就是要阻塞就阻塞住所有的子线程,要释放就释放掉所有的锁,不支持信号量中的自定义释放锁的数量

条件锁,既支持事件阻塞所有子线程的特性,又具有信号量指定释放锁数量的特性

condition的第一种用法

指定释放锁的数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import threading

# 接收一个condition对象
def func(i, con):
print(i)

# 给线程上锁
con.acquire()
# wait会根据notify指定的数量去释放锁,默认锁住所有线程
con.wait()
print(i + 100)
con.release()


condition = threading.Condition()


for i in range(5):
t = threading.Thread(target=func, args=(i, condition))
t.start()

while True:
inp = input("> ")
if inp == "q":
break

# 指定释放锁数量的三剑客
# 语法中规定他们必须按照这个顺序写
condition.acquire()
condition.notify(int(inp)) # 这里的int类型的inp变量,就是指定释放锁的数量
condition.release()

------------
0
1
2
3
4
> 1
> 100
2
> 102
101
3
> 104
103

最后只打印两个数字,是因为总共有5个测试数据,前面已经累计释放了三个子线程处理的测试数据,所以第三次,我即使指定了释放3个锁(当时程序中只剩两个子线程被锁住),也只会出现2个值

condition的第二种用法

如果条件为真,释放一个锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import threading

def select():
ret = False
inp = input("> ")
if inp == "true":
ret = True
else:
ret = False
return ret

# 接收一个condition对象
def func(i, con):
print(i)

# 给线程上锁
con.acquire()
# wait_for会根据参数的值的真假来选择是否释放一个锁
# 这里传入的是一个函数,也就是说如果函数的返回值为真,就释放一个锁
con.wait_for(select)
print(i + 100)
con.release()

# 创建一把条件锁
condition = threading.Condition()

for i in range(5):
t = threading.Thread(target=func, args=(i, condition))
t.start()

------------
0
> 1
2
3
4
true
100
> true
101
>

con.wait_for(select)中,我们传入了一个函数名,当第一个子线程执行到这一行的时候,会自动去执行这个函数,从而打印出了第一个>符号,之后就停在了等待用户输入的那一行input()

其他所有的子线程都停在了con.acquare()这一行,等待前面的子线程释放锁

这时,我输入一个trueselect函数返回了一个True, wait_for()接收到这个True之后,就继续向下执行,最后释放掉自己的锁

接着下一个被调度到的子线程执行到wait_for()又去执行了里面传入的函数……以此类推

Timer

Timer不是锁,是定时器,用来指定x秒后执行某些操作

1
2
3
4
5
6
7
8
9
10
11
12
from threading import Timer


def hello():
print("hello, world")


t = Timer(1, hello)
t.start()

------------
hello, world

结果没有立即输入,而是等了一秒钟之后输出了hello, world

VMware vSphere Client上添加存储时显示:在 ESXi“192.168.59.59”上调用对象 “datastoresystem”的“HostDatastoreSystem.QueryVmfsDatastoreCreateOptions” 失败

  • 在该主机的配置—>安全配置文件中,开启SSHESXi Shell服务
  • ssh root@192.168.59.59
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
> ls /vmfs/devices/disks
naa.6782bcb0635695001f1f261018908d00
naa.6782bcb0635695001f1f261018908d00:1
naa.6782bcb0635695001f1f261018908d00:2
naa.6782bcb0635695001f1f261018908d00:3
naa.6782bcb0635695001f1f261018908d00:5
naa.6782bcb0635695001f1f261018908d00:6
naa.6782bcb0635695001f1f261018908d00:7
naa.6782bcb0635695001f1f261018908d00:8
naa.6782bcb0635695001f1f261018908d00:9
naa.6782bcb0635695001f21a2451bd15e08
naa.6782bcb0635695001f21a2451bd15e08:1
naa.6782bcb0635695001f21a2451bd15e08:2
naa.6782bcb0635695001f21a25f1d533c53
naa.6782bcb0635695001f21a25f1d533c53:1
vml.02000000006782bcb0635695001f1f261018908d00504552432036
vml.02000000006782bcb0635695001f1f261018908d00504552432036:1
vml.02000000006782bcb0635695001f1f261018908d00504552432036:2
vml.02000000006782bcb0635695001f1f261018908d00504552432036:3
vml.02000000006782bcb0635695001f1f261018908d00504552432036:5
vml.02000000006782bcb0635695001f1f261018908d00504552432036:6
vml.02000000006782bcb0635695001f1f261018908d00504552432036:7
vml.02000000006782bcb0635695001f1f261018908d00504552432036:8
vml.02000000006782bcb0635695001f1f261018908d00504552432036:9
vml.02000000006782bcb0635695001f21a2451bd15e08504552432036
vml.02000000006782bcb0635695001f21a2451bd15e08504552432036:1
vml.02000000006782bcb0635695001f21a2451bd15e08504552432036:2
vml.02000000006782bcb0635695001f21a25f1d533c53504552432036
vml.02000000006782bcb0635695001f21a25f1d533c53504552432036:1
  • vSphere Client中找到报错的那块磁盘名字,我这里是naa.6782bcb0635695001f21a2451bd15e08
  • 将该块磁盘重新分区
1
> partedUtil mklabel /dev/disks/naa.6782bcb0635695001f21a2451bd15e08 gpt

我这里的磁盘容量为2T,这里使用gpt分区表,如果容量不是很大,可以写msdos 使用MBR分区表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
> ls /vmfs/devices/disks
naa.6782bcb0635695001f1f261018908d00
naa.6782bcb0635695001f1f261018908d00:1
naa.6782bcb0635695001f1f261018908d00:2
naa.6782bcb0635695001f1f261018908d00:3
naa.6782bcb0635695001f1f261018908d00:5
naa.6782bcb0635695001f1f261018908d00:6
naa.6782bcb0635695001f1f261018908d00:7
naa.6782bcb0635695001f1f261018908d00:8
naa.6782bcb0635695001f1f261018908d00:9
naa.6782bcb0635695001f21a2451bd15e08
naa.6782bcb0635695001f21a25f1d533c53
naa.6782bcb0635695001f21a25f1d533c53:1
vml.02000000006782bcb0635695001f1f261018908d00504552432036
vml.02000000006782bcb0635695001f1f261018908d00504552432036:1
vml.02000000006782bcb0635695001f1f261018908d00504552432036:2
vml.02000000006782bcb0635695001f1f261018908d00504552432036:3
vml.02000000006782bcb0635695001f1f261018908d00504552432036:5
vml.02000000006782bcb0635695001f1f261018908d00504552432036:6
vml.02000000006782bcb0635695001f1f261018908d00504552432036:7
vml.02000000006782bcb0635695001f1f261018908d00504552432036:8
vml.02000000006782bcb0635695001f1f261018908d00504552432036:9
vml.02000000006782bcb0635695001f21a2451bd15e08504552432036
vml.02000000006782bcb0635695001f21a25f1d533c53504552432036
vml.02000000006782bcb0635695001f21a25f1d533c53504552432036:1

再次添加磁盘,成功!


参考文档: https://kb.vmware.com/selfservice/microsites/search.do?cmd=displayKC&externalId=2076191

生活中非常常见的一种场景就是排队,早期的鸟儿🐦有虫🐛吃,越早排队就越早能办理业务。本篇文章介绍Python中的“排队系统”,先进先出队列的基本使用

put数据

消息队列的长度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 先进先出队列
import queue

# 最多接收10个数据
q = queue.Queue(10)

# put 向队列中添加数据
q.put(15)
q.put(59)

# 获取当前队列长度
print(q.qsize())

# 取出最前面的一个数据
print(q.get())

超时时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 先进先出队列
import queue

# 最多接收10个数据
q = queue.Queue(2)

# put 向队列中添加数据
q.put(15)
q.put(59)
print(q.qsize())

# 超时时间为2秒
q.put('PolarSnow', timeout=2)

# 获取当前队列长度
print(q.qsize())

# 取出最前面的一个数据
print(q.get())

------------
2
Traceback (most recent call last):
File "/Users/lvrui/PycharmProjects/untitled/10/temp.py", line 157, in <module>
q.put('PolarSnow', timeout=2)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/queue.py", line 141, in put
raise Full
queue.Full

现在队列的最大长度设置为2,当第三个数据向里面插入时,最多等待两秒,两秒后还没有进入到队列中就报错

设置队列不阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 先进先出队列
import queue

# 最多接收10个数据
q = queue.Queue(2)

# put 向队列中添加数据
q.put(15)
q.put(59)

# 设置队列不阻塞(当队列满的时候再插入数据,直接报错)
q.put('PolarSnow', block=False)

# 获取当前队列长度
print(q.qsize())

# 取出最前面的一个数据
print(q.get())

默认程序会阻塞,等待新的值插入到队列当中,使用了block=False参数后,强制设置为不阻塞,一旦超出队列长度,立即抛出异常

get数据

设置超时时间

1
2
3
4
5
6
7
8
9
10
import queue

q = queue.Queue(2)
q.put(15)
q.put(59)

print(q.qsize())
print(q.get())
print(q.get())
print(q.get(timeout=2))

当取值的次数大于队列的长度的时候就会产生阻塞,设置超时时间意为最多等待x秒,队列中再没有数据,就抛出异常

设置不阻塞

1
2
3
4
5
6
7
8
9
10
import queue

q = queue.Queue(2)
q.put(15)
q.put(59)

print(q.qsize())
print(q.get())
print(q.get())
print(q.get(block=False))

获取队列的次数大于队列长度时,默认会阻塞,通过设置block=False来实现非阻塞,立即抛出异常

Queue类

__init__

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def __init__(self, maxsize=0):
self.maxsize = maxsize
self._init(maxsize)

# mutex must be held whenever the queue is mutating. All methods
# that acquire mutex must release it before returning. mutex
# is shared between the three conditions, so acquiring and
# releasing the conditions also acquires and releases mutex.
self.mutex = threading.Lock()

# Notify not_empty whenever an item is added to the queue; a
# thread waiting to get is notified then.
self.not_empty = threading.Condition(self.mutex)

# Notify not_full whenever an item is removed from the queue;
# a thread waiting to put is notified then.
self.not_full = threading.Condition(self.mutex)

# Notify all_tasks_done whenever the number of unfinished tasks
# drops to zero; thread waiting to join() is notified to resume
self.all_tasks_done = threading.Condition(self.mutex)
self.unfinished_tasks = 0

如果不给这个构造方法传参数,队列的长度为无限大

get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def get(self, block=True, timeout=None):
'''Remove and return an item from the queue.

If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until an item is available. If 'timeout' is
a non-negative number, it blocks at most 'timeout' seconds and raises
the Empty exception if no item was available within that time.
Otherwise ('block' is false), return an item if one is immediately
available, else raise the Empty exception ('timeout' is ignored
in that case).
'''
with self.not_empty:
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = time() + timeout
while not self._qsize():
remaining = endtime - time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
item = self._get()
self.not_full.notify()
return item

参数为是否阻塞和超时时间

get_nowait

1
2
3
4
5
6
7
def get_nowait(self):
'''Remove and return an item from the queue without blocking.

Only get an item if one is immediately available. Otherwise
raise the Empty exception.
'''
return self.get(block=False)

非阻塞获取队列中的值

put && put_nowait

get & get_nowait同理

empty

1
2
3
4
5
6
7
8
9
10
11
12
13
def empty(self):
'''Return True if the queue is empty, False otherwise (not reliable!).

This method is likely to be removed at some point. Use qsize() == 0
as a direct substitute, but be aware that either approach risks a race
condition where a queue can grow before the result of empty() or
qsize() can be used.

To create code that needs to wait for all queued tasks to be
completed, the preferred technique is to use the join() method.
'''
with self.mutex:
return not self._qsize()

检查队列是否为空,为空返回True,不为空返回False

full

1
2
3
4
5
6
7
8
9
10
def full(self):
'''Return True if the queue is full, False otherwise (not reliable!).

This method is likely to be removed at some point. Use qsize() >= n
as a direct substitute, but be aware that either approach risks a race
condition where a queue can shrink before the result of full() or
qsize() can be used.
'''
with self.mutex:
return 0 < self.maxsize <= self._qsize()

判断队列是否已经满了

qsize

1
2
3
4
def qsize(self):
'''Return the approximate size of the queue (not reliable!).'''
with self.mutex:
return self._qsize()

返回队列中元素的个数(真实个数)

构造方法中封装了maxsize字段,可以使用对象.maxsize来获取当前队列的最大值

join & task_done

1
2
3
4
5
import queue

q = queue.Queue(2)
q.put(15)
q.put(59)

程序执行完这5行代码后会退出,退出前队列中还有值,退出后被清空

1
2
3
4
5
6
7
import queue

q = queue.Queue(2)
q.put(15)
q.put(59)

q.join()

程序会一直卡在第7行,只要队列中还有值,程序就不会退出

1
2
3
4
5
6
7
8
9
10
import queue

q = queue.Queue(2)
q.put(15)
q.put(59)

q.get()
q.get()

q.join()

队列中插入两个元素,后面取出了两个元素,执行后你会发现,程序还是卡在第10行的那个join代码

1
2
3
4
5
6
7
8
9
10
11
12
import queue

q = queue.Queue(2)
q.put(15)
q.put(59)

q.get()
q.task_done() # get取完队列中的一个值后,使用task_done方法告诉队列,我已经取出了一个值并处理完毕
q.get()
q.task_done()

q.join()

在每次get取值之后,还需要在跟队列声明一下,我已经取出了数据并处理完毕,这样执行到join代码的时候才不会被卡住

Python提供的所有队列类型

  • 先进先出队列 queue.Queue
  • 后进先出队列 queue.LifoQueue (Queue的基础上进行的封装)
  • 优先级队列 queue.PriorityQueue (Queue的基础上进行的封装)
  • 双向队列 queue.deque

后进先出队列

1
2
3
4
5
6
7
8
9
10
import queue

q = queue.LifoQueue()
q.put('Polar')
q.put('Snow')

print(q.get())

------------
Snow

优先级队列

1
2
3
4
5
6
7
8
9
10
import queue

q = queue.PriorityQueue()
q.put((1, 'Snow'))
q.put((0, 'Polar'))

print(q.get())

------------
Polar

数字越小优先级越高,数字相同,先进先出

双向队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import queue

q = queue.deque()

q.append('A') # 在右侧追加 A
q.append('X') # 在右侧追加 A X
q.appendleft('Z') # 在左侧追加 Z A X
q.appendleft('V') # 在左侧追加 V Z A X

print(q.pop()) # 从右侧取 --> X
print(q.popleft()) # 从左侧取 --> V

------------
X
V

Python中的多线程通过threading模块来实现。实现Python中的多线程有两种方式,本篇文章介绍多线程中,run方法的使用

上一篇文章中介绍了多线程的基本使用方法,在执行Thread对象的start方法之后,声明指定的target函数已经就绪,准备被CPU调用执行。当CPU的时间片分到这个线程的时候,会去执行Thread对象的run方法

这里的start和run方法一定要区分开

  • start方法是声明分到一个子线程的函数已经就绪,等待被CPU执行
  • run方法是执行到这个子线程时,自动调用的方法

拿上一篇多线程基本使用文章中的例子🌰为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import threading
import time

def func(arg):
print('func start')
time.sleep(2)
print(arg)
print('func end')

# 创建一个线程
# target 指定让线程执行的函数
t = threading.Thread(target=func, args=('PolarSnow',))
t.setDaemon(True) # 默认为False
t.start()
t.join(1) # 主线程停在这里
print('main end')

------------
func start
main end

t = threading.Thread(target=func, args=('PolarSnow',))实例化了一个Thread对象,执行了Thread类的构造方法,构造方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, *, daemon=None):
"""This constructor should always be called with keyword arguments. Arguments are:

*group* should be None; reserved for future extension when a ThreadGroup
class is implemented.

*target* is the callable object to be invoked by the run()
method. Defaults to None, meaning nothing is called.

*name* is the thread name. By default, a unique name is constructed of
the form "Thread-N" where N is a small decimal number.

*args* is the argument tuple for the target invocation. Defaults to ().

*kwargs* is a dictionary of keyword arguments for the target
invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke
the base class constructor (Thread.__init__()) before doing anything
else to the thread.

"""
assert group is None, "group argument must be None for now"
if kwargs is None:
kwargs = {}
self._target = target
self._name = str(name or _newname())
self._args = args
self._kwargs = kwargs
if daemon is not None:
self._daemonic = daemon
else:
self._daemonic = current_thread().daemon
self._ident = None
self._tstate_lock = None
self._started = Event()
self._is_stopped = False
self._initialized = True
# sys.stderr is not stored in the class like
# sys.exc_info since it can be changed between instances
self._stderr = _sys.stderr
# For debugging and _after_fork()
_dangling.add(self)

在Thread的构造方法中,封装了我们传进去的两个参数

而子线程被CPU调度执行的时候,自动执行了Thread对象中的run方法,run方法源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def run(self):
"""Method representing the thread's activity.

You may override this method in a subclass. The standard run() method
invokes the callable object passed to the object's constructor as the
target argument, if any, with sequential and keyword arguments taken
from the args and kwargs arguments, respectively.

"""
try:
if self._target:
self._target(*self._args, **self._kwargs)
finally:
# Avoid a refcycle if the thread is running a function with
# an argument that has a member that points to the thread.
del self._target, self._args, self._kwargs

可以清楚的看到,这里self._target(*self._args, **self._kwargs)去执行了我们传进去的函数名所指向的函数体,简单的说,就是把target参数的那个函数执行了,并且参数就是我们传递进去的那些参数

自定义run方法

1
2
3
4
5
6
7
8
9
10
11
12
13
import threading

# 自定义一个类,继承threading.Thread
class MyThread(threading.Thread):

# 因为CPU执行子线程的时候,会自动执行Thread对象的run方法
# 所以这里我们重写父类的run方法
# 还记得Python的继承关系吗?self.run还是从原点开始找
def run(self):
pass

t = MyThread() # 创建一个线程对象
t.start() # 等待被CPU运行

模拟Thread类的功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import threading


class MyThread(threading.Thread):

# 重写父类的构造方法
def __init__(self, func, args):
# 将函数名和参数封装到本类中
self.func = func
self.args = args
# 为保证继承的类正常运行,这里需要执行父类的构造方法
# 但是没有给父类的构造方法传递任何参数
# 而对象寻找参数时,也是最先从本类中寻找
super(MyThread, self).__init__()

def run(self):
# 执行参数中的函数
self.func(self.args)

# 自定义一个需要子线程运行的函数
def f(arg):
print(arg)

# 使用自定义的线程对象执行
t = MyThread(func=f, args=59)
t.start()

实际使用中还是推荐使用threading原生提供的方式,这里的run方法仅供理解threading的内部执行过程