IO操作是不占用CPU的,IO多路复用,是要管理起所有的IO操作。IO多路复用的典型场景是监听socket对象内部是否发生了变化
socket内部什么时候会有变化:
socket实例 这里使用socket实现一个简单的Echo Server的功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import socketsk = socket.socket() sk.bind(('127.0.0.1' , 1559 )) sk.listen(5 ) while True : conn, address = sk.accept() conn.sendall(bytes('hello' , encoding='utf8' )) while True : try : recv = conn.recv(1024 ) if not recv: break if str(recv, encoding='utf-8' ) == 'exit' : break except Exception as ex: break conn.sendall(recv)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import socketsk = socket.socket() sk.connect(('127.0.0.1' , 1559 )) data = sk.recv(1024 ) print(data) while True : i = input("> " ) sk.sendall(bytes(i, encoding='utf8' )) msg = sk.recv(1024 ) print(str(msg, encoding='utf-8' )) sk.close()
以上的socket代码同一时间仅能处理一个客户端的请求,之后连接上来的客户端在第一个客户端还没有断开的时候,会一直等待,直到上一个客户端的请求断开
select.selext()中的第一个参数 建立连接 上面说到,一种socket会变的情况是建立连接
上面的代码中,涉及到建立连接的socket是sk对象(变量)sk对象在执行到sk.accept()
时,接受了一个新客户端的连接请求时候,socket内部就发生了变化,我们就需要监听这种变化,用以分辨出新的客户端的连接
创建socket,绑定并监听之后socket一般不会发生变化,只有当有新的客户端连接进来的时候,socket才会发生变化,我们需要监听的也是这个阶段的变化
得出结论:当socket被创建、绑定并监听之后发生变化,就是有新的客户端进行连接
1 2 3 4 5 6 7 8 9 10 11 import socketimport selectsk = socket.socket() sk.bind(('127.0.0.1' , 1559 )) sk.listen(5 ) while True : rList, w, e = select.select([sk,], [], [], 1 ) print(rList)
上面代码引入了IO多路复用中的select模型,使用select.select()方法,会返回一个有三个元素的元祖
在select.select()方法中的第一个参数,暂时只添加了一个服务端的socket对象,只要服务端socket对象sk有变化(新的客户端连接)就立即把变化的socket对象加入到rList列表中
监听的socket列表中,sk对象有变化 —> rList = [sk]
监听的socket列表中,没有socket发生变化 —> rList = []
以上代码跑起来的效果:
1 2 3 4 5 6 7 [] [] [] ... # 每秒打印一个[] # select.sekect()中的第四个参数起了作用 # 超时时间,监听的对象没有发生变化的时候,多少秒循环一次
说明没有新的连接请求,下面将试验有客户端连接产生的情况
1 2 3 4 5 import socketsk = socket.socket() sk.connect(('127.0.0.1' , 1559 )) sk.close()
客户端一旦连接到服务端,服务端的回显如下
1 2 3 4 5 [<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1559)>] [<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1559)>] [<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1559)>] ... # 疯狂的快速打印...
从上面可以看出,有一个客户端连接进来了。服务端的sk对象内部发生了变化,当select监听到sk对象发生变化后,立即将发生变化的对象赋值给了rList列表(append到列表)从打印出来的内容中可以看出,列表中的元素是发生变化的socket对象
处理rList(服务端socket) rList中保存了所有发生变化的socket对象,以上代码中只监听了服务端socket对象,这里暂时只讨论服务端socket变化的情况
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import socketimport selectsk = socket.socket() sk.bind(('127.0.0.1' , 1559 )) sk.listen(5 ) while True : rList, w, e = select.select([sk,], [], [], 1 ) print(rList) for s in rList: conn, address = s.accept() conn.sendall(bytes('hello' , encoding='utf8' ))
1 2 3 4 5 6 7 8 9 10 11 import socketsk = socket.socket() sk.connect(('127.0.0.1' , 1559 )) data = sk.recv(1024 ) print(data) while True : input("> " ) sk.close()
连续运行三个客户端连接时,服务端的回显:
1 2 3 4 5 6 7 8 9 [] [] [<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1559)>] [] [<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1559)>] [] [<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1559)>] [] []
每个 客户端的回显:
以上通过对rList中服务端socket对象执行accept()方法,来实现了一个类似并发连接的效果,每一个连接进来的客户端都会被服务端接受请求,“同时”提供服务
接收客户端消息 上面提到,不仅创建连接会触发socket的变化,与客户端连接建立后,客户端发来消息,也会引发客户端socket连接的内部变化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import socketimport selectsk = socket.socket() sk.bind(('127.0.0.1' , 1559 )) sk.listen(5 ) inputs = [sk] while True : rList, w, e = select.select(inputs, [], [], 1 ) print(rList) for s in rList: conn, address = s.accept() inputs.append(conn) conn.sendall(bytes('hello' , encoding='utf8' ))
在上面的代码中,我把服务端socket为新客户端创建的socket也加入到了监听列表中,那么如果有客户端发来消息,select监听到客户端socket(conn)发生变化并加入到rList列表中后,在for循环处理中,客户端的socket并没有accept()方法,而且也不要这个方法,这就需要在for循环中对两类socket区分对待
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import socketimport selectsk = socket.socket() sk.bind(('127.0.0.1' , 1559 )) sk.listen(5 ) inputs = [sk] while True : rList, w, e = select.select(inputs, [], [], 1 ) print("select当前监听socket对象的数量>" , len(inputs), " | 发生变化的socket数量>" , len(rList)) for s in rList: if s == sk: conn, address = s.accept() inputs.append(conn) conn.sendall(bytes('hello' , encoding='utf8' )) else : msg = s.recv(1024 ) print(msg)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import socketsk = socket.socket() sk.connect(('127.0.0.1' , 1559 )) data = sk.recv(1024 ) print(data) while True : i = input("> " ) sk.sendall(bytes(i, encoding='utf8' )) sk.close()
当我运行一个客户端,并Ctrl+C退出时,服务端回显界面在疯狂的打印消息。问题出在了服务端监听的客户端socket连接,当客户端与服务端断开连接时,应在服务端select监听socket对象列表中将该客户端socket对象移除
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 import socketimport selectsk = socket.socket() sk.bind(('127.0.0.1' , 1559 )) sk.listen(5 ) inputs = [sk] while True : rList, w, e = select.select(inputs, [], [], 1 ) print("select当前监听socket对象的数量>" , len(inputs), " | 发生变化的socket数量>" , len(rList)) for s in rList: if s == sk: conn, address = s.accept() inputs.append(conn) conn.sendall(bytes('hello' , encoding='utf8' )) else : try : msg = s.recv(1024 ) if not msg: raise Exception('客户端已断开连接' ) print(msg) except Exception as ex: inputs.remove(s)
使用最新的server.py与client.py进行测试时,依次运行多个客户端,再依次关闭多个客户端,服务端的回显如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 select当前监听socket对象的数量> 1 | 发生变化的socket数量> 0 select当前监听socket对象的数量> 1 | 发生变化的socket数量> 0 select当前监听socket对象的数量> 1 | 发生变化的socket数量> 0 select当前监听socket对象的数量> 1 | 发生变化的socket数量> 0 select当前监听socket对象的数量> 1 | 发生变化的socket数量> 1 select当前监听socket对象的数量> 2 | 发生变化的socket数量> 0 select当前监听socket对象的数量> 2 | 发生变化的socket数量> 0 select当前监听socket对象的数量> 2 | 发生变化的socket数量> 1 select当前监听socket对象的数量> 3 | 发生变化的socket数量> 0 select当前监听socket对象的数量> 3 | 发生变化的socket数量> 0 select当前监听socket对象的数量> 3 | 发生变化的socket数量> 1 select当前监听socket对象的数量> 4 | 发生变化的socket数量> 0 select当前监听socket对象的数量> 4 | 发生变化的socket数量> 0 select当前监听socket对象的数量> 4 | 发生变化的socket数量> 0 select当前监听socket对象的数量> 4 | 发生变化的socket数量> 0 select当前监听socket对象的数量> 4 | 发生变化的socket数量> 1 b'' select当前监听socket对象的数量> 3 | 发生变化的socket数量> 0 select当前监听socket对象的数量> 3 | 发生变化的socket数量> 0 select当前监听socket对象的数量> 3 | 发生变化的socket数量> 1 b'' select当前监听socket对象的数量> 2 | 发生变化的socket数量> 0 select当前监听socket对象的数量> 2 | 发生变化的socket数量> 1 b'' select当前监听socket对象的数量> 1 | 发生变化的socket数量> 0 select当前监听socket对象的数量> 1 | 发生变化的socket数量> 0 select当前监听socket对象的数量> 1 | 发生变化的socket数量> 0 select当前监听socket对象的数量> 1 | 发生变化的socket数量> 0
特别注意: 这里的服务端定义,当我收到客户端发来的空值的时候,我就默认认为客户端主动需要断开与服务端的连接。由于服务端的这个默认规则,在写客户端的时候,一定要注意处理客户端输入的值为空的情况
给客户端回复消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 import socketimport selectsk = socket.socket() sk.bind(('127.0.0.1' , 1559 )) sk.listen(5 ) inputs = [sk] while True : rList, w, e = select.select(inputs, [], [], 1 ) print("select当前监听socket对象的数量>" , len(inputs), " | 发生变化的socket数量>" , len(rList)) for s in rList: if s == sk: conn, address = s.accept() inputs.append(conn) conn.sendall(bytes('hello' , encoding='utf8' )) else : try : msg = s.recv(1024 ) if not msg: raise Exception('客户端已断开连接' ) print(msg) s.sendall(msg) except Exception as ex: inputs.remove(s)
但是一般情况下,会做读写分离,可以通过select,实现读写分离(收发分离)
select.selext()中的第二个参数 1 rList, wList, e = select.select([], [], [], 1 )
select.select()的第二个参数有什么值,wList中就会有什么值
利用select的这个特性,可以把需要回复消息的客户端socket对象赋值给select的第二个参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 import socketimport selectsk = socket.socket() sk.bind(('127.0.0.1' , 1559 )) sk.listen(5 ) inputs = [sk] outputs = [] while True : rList, wList, e = select.select(inputs, outputs, [], 1 ) print("---" * 20 ) print("select当前监听inputs对象的数量>" , len(inputs), " | 发生变化的socket数量>" , len(rList)) print("select当前监听outputs对象的数量>" , len(outputs), " | 需要回复客户端消息的数量>" , len(wList)) for s in rList: if s == sk: conn, address = s.accept() inputs.append(conn) conn.sendall(bytes('hello' , encoding='utf8' )) else : try : msg = s.recv(1024 ) if not msg: raise Exception('客户端已断开连接' ) else : outputs.append(s) print(msg) except Exception as ex: inputs.remove(s) for s in wList: s.sendall(bytes('server response' , encoding='utf8' )) outputs.remove(s)
wList = 所有给服务端发送消息的客户端,也是需要回复消息客户端列表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import socketsk = socket.socket() sk.connect(('127.0.0.1' , 1559 )) data = sk.recv(1024 ) print(data) while True : i = input("> " ) sk.sendall(bytes(i, encoding='utf8' )) msg = sk.recv(1024 ) print(str(msg, encoding='utf-8' )) sk.close()
执行过程:
依次连接三个客户端
第一个客户端依次向服务发送了两次消息
服务端的回显:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 ------------------------------------------------------------ select当前监听inputs对象的数量> 1 | 发生变化的socket数量> 0 select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0 ------------------------------------------------------------ select当前监听inputs对象的数量> 1 | 发生变化的socket数量> 0 select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0 ------------------------------------------------------------ select当前监听inputs对象的数量> 2 | 发生变化的socket数量> 0 select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0 ------------------------------------------------------------ select当前监听inputs对象的数量> 2 | 发生变化的socket数量> 1 select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0 ------------------------------------------------------------ select当前监听inputs对象的数量> 3 | 发生变化的socket数量> 0 select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0 ------------------------------------------------------------ select当前监听inputs对象的数量> 3 | 发生变化的socket数量> 1 select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0 ------------------------------------------------------------ select当前监听inputs对象的数量> 4 | 发生变化的socket数量> 0 select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0 ------------------------------------------------------------ select当前监听inputs对象的数量> 4 | 发生变化的socket数量> 1 select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0 b'ps' ------------------------------------------------------------ select当前监听inputs对象的数量> 4 | 发生变化的socket数量> 0 select当前监听outputs对象的数量> 1 | 需要回复客户端消息的数量> 1 ------------------------------------------------------------ select当前监听inputs对象的数量> 4 | 发生变化的socket数量> 0 select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0 ------------------------------------------------------------ select当前监听inputs对象的数量> 4 | 发生变化的socket数量> 1 select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0 b'ps' ------------------------------------------------------------ select当前监听inputs对象的数量> 4 | 发生变化的socket数量> 0 select当前监听outputs对象的数量> 1 | 需要回复客户端消息的数量> 1 ------------------------------------------------------------ select当前监听inputs对象的数量> 4 | 发生变化的socket数量> 0 select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0 ------------------------------------------------------------ select当前监听inputs对象的数量> 3 | 发生变化的socket数量> 0 select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0 ------------------------------------------------------------ select当前监听inputs对象的数量> 3 | 发生变化的socket数量> 0 select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0 ------------------------------------------------------------ select当前监听inputs对象的数量> 2 | 发生变化的socket数量> 0 select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0 ------------------------------------------------------------ select当前监听inputs对象的数量> 2 | 发生变化的socket数量> 1 select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0 ------------------------------------------------------------ select当前监听inputs对象的数量> 1 | 发生变化的socket数量> 0 select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0
第一个客户端的回显:
1 2 3 4 5 6 b'hello' > ps server response > ps server response >
以上代码实现了简单的收发消息的分离,现在又多了一点需求,目前所有给服务端发送消息的客户端,服务端都统一回复了相同的内容,现在要服务端实现Echo Server的功能,即客户端发送什么消息,服务端就给客户端回复什么消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 import socketimport selectsk = socket.socket() sk.bind(('127.0.0.1' , 1559 )) sk.listen(5 ) inputs = [sk] outputs = [] messages = {} """ messages = { socket_obj1: [msg] socket_obj2: [msg] } """ while True : rList, wList, e = select.select(inputs, outputs, [], 1 ) print("---" * 20 ) print("select当前监听inputs对象的数量>" , len(inputs), " | 发生变化的socket数量>" , len(rList)) print("select当前监听outputs对象的数量>" , len(outputs), " | 需要回复客户端消息的数量>" , len(wList)) for s in rList: if s == sk: conn, address = s.accept() inputs.append(conn) messages[conn] = [] conn.sendall(bytes('hello' , encoding='utf8' )) else : try : msg = s.recv(1024 ) if not msg: raise Exception('客户端已断开连接' ) else : outputs.append(s) messages[s].append(msg) except Exception as ex: inputs.remove(s) del messages[s] for s in wList: msg = messages[s].pop() s.sendall(msg) outputs.remove(s)
服务端做出以上修改,客户端不需要改变
11行 为了让一个客户端socket对象收消息和发消息产生关联,引入了一个新的全局变量messages
40行 在新客户端连接进来的时候,就预先为该socket对象在messages中创建对应的key
54行 在该对象中添加消息
64行 在客户端关闭连接收,清理该对象的消息列表
70行 将该对象在第40行插入的消息中取出来
总结 使用IO多路复用,实际上实现了类似并发效果的伪并发。内部实际使用了循环来高效的处理阻塞请求
Python中有一个select模块,其中提供了:select、poll、epoll三个方法,分别调用系统的 select,poll,epoll 从而实现IO多路复用。
Windows Python:提供: select
Mac Python:提供: select
Linux Python:提供: select、poll、epoll
注意:网络操作、文件操作、终端操作等均属于IO操作,对于windows只支持Socket操作,其他系统支持所有IO操作,但是无法检测普通文件操作自动上次读取是否已经变化
对于select方法:
1 2 3 4 5 6 7 8 9 10 11 12 句柄列表11, 句柄列表22, 句柄列表33 = select.select(句柄序列1, 句柄序列2, 句柄序列3, 超时时间) 参数: 可接受四个参数(前三个必须) 返回值:三个列表 select方法用来监视文件句柄,如果句柄发生变化,则获取该句柄。 1、当 参数1 序列中的句柄发生可读时(accetp和read),则获取发生变化的句柄并添加到 返回值1 序列中 2、当 参数2 序列中含有句柄时,则将该序列中所有的句柄添加到 返回值2 序列中 3、当 参数3 序列中的句柄发生错误时,则将该发生错误的句柄添加到 返回值3 序列中 4、当 超时时间 未设置,则select会一直阻塞,直到监听的句柄发生变化 当 超时时间 = 1时,那么如果监听的句柄均无任何变化,则select会阻塞 1 秒,之后返回三个空列表,如果监听的句柄有变化,则直接执行。
附加:select poll epoll的区别 IO多路复用是系统内核实现的,系统内部维护了一个for循环,一个一个地去检测对象是否有变化
首先需要明确一点的是,for循环的效率是不高的
IO多路复用种类
实现原理
监听对象个数
select
系统内部维护了一个for循环
1024
poll
系统内部维护了一个for循环
没有限制
epoll
句柄序列发生变化时自动通知epoll
没有限制