实例1 start

创建子线程并执行线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import threading
import time

# 创建一个函数
def func(arg):
print('func start')
time.sleep(2)
print(arg)
print('func end')

# 创建一个线程
# target 指定让线程执行的函数
t = threading.Thread(target=func, args=('PolarSnow',))
t.start()

print('main end')

------------
func start
main end
PolarSnow
func end

上面的代码在主线程中创建了一个子线程去执行一个函数

执行这个函数最少需要2秒钟,而主线程直接去执行了print('end'),说明在默认情况下,主线程不会等待子线程直接结束再向下执行,而是主线程先向下执行,即使主线程所有代码已经执行完毕,也不会退出程序,而是等待所有子线程执行结束后再退出程序

实例2 setDaemon

设置主线程不等待子线程执行完毕

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import threading
import time

def func(arg):
print('func start')
time.sleep(2)
print(arg)
print('func end')

# 创建一个线程
# target 指定让线程执行的函数
t = threading.Thread(target=func, args=('PolarSnow',))
t.setDaemon(True) # 默认为False
t.start()

print('main end')

------------
func start
main end

在启动线程之前设置t.setDaemon(True)为True,意思是主线程不会等待所有的子线程执行完毕再退出程序,而是当主线程执行完毕后,强制退出程序,不管子线程有没有执行完毕

实例3 join

主线程等待子线程执行完毕后,再向下执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import threading
import time

def func(arg):
print('func start')
time.sleep(2)
print(arg)
print('func end')

# 创建一个线程
# target 指定让线程执行的函数
t = threading.Thread(target=func, args=('PolarSnow',))
t.setDaemon(True) # 默认为False
t.start()
t.join() # 主线程停在这里
print('main end')

------------
func start
PolarSnow
func end
main end

使用了join方法之后,主线程会一直停在join方法那一行,直到所有的子线程都执行完毕后,再继续向下执行

实例4 join(1)

设置主线程再次等待子线程的最大时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import threading
import time

def func(arg):
print('func start')
time.sleep(2)
print(arg)
print('func end')

# 创建一个线程
# target 指定让线程执行的函数
t = threading.Thread(target=func, args=('PolarSnow',))
t.setDaemon(True) # 默认为False
t.start()
t.join(1) # 主线程停在这里
print('main end')

------------
func start
main end

设置为主线程在t.join这一行,最多等待1秒钟的时间,1秒钟之后,不管子线程有没有执行完毕都会继续向下执行

在Python中规定,每一个进程中只可以有一个线程在同一时刻被调用执行,GIL全局解释器锁,就是用来保障这一规则顺利被执行的组件

Python中可以利用多核CPU的是多进程,多个CPU可以同时处理多个进程的任务,但是,每个进程中同一时间只能处理一个线程

IO操作不占用CPU

  • 多线程的应用场景:IO操作
  • 多进程的应用场景:计算性操作

本篇socketserver源码阅读主要目的是熟悉Python中的继承模式

示例代码

1
2
3
4
5
6
7
8
9
10
import socketserver


class MyClass(socketserver.BaseRequestHandler):

def handle(self):
pass

obj = socketserver.ThreadingTCPServer(('127.0.0.1', 1559), MyClass)
obj.serve_forever()

阅读入口

1
obj = socketserver.ThreadingTCPServer(('127.0.0.1', 1559), MyClass)

从以上这一行代码中可以看出,执行了ThreadingTCPServer类中的__init__方法(实例化了这个类的对象,就从这里为入口点来阅读)

ThreadingTCPServer类图

step by step

step 1

首先,从实例化ThreadingTCPServer这个类的对象可以得知,是执行了这个对象的__init__构造方法,但是我们查看这个类的源码发现,只有一个pass

1
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass

但是这个类继承了两个父类,那么必然是执行了父类的__init__方法。在查找父类__init__方法的时候,首先从左边的父类开始查找

step 2

ThreadingTCPServer继承的第一个父类中ThreadingMixIn,没有找到__init__构造方法,接着,Python会去ThreadingTCPServer继承的第二个父类总去查找

step 3

ThreadingTCPServer继承的第二个父类TCPServer找到了__init__构造方法并执行

1
2
3
4
5
6
7
8
9
10
11
12
def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
"""Constructor. May be extended, do not override."""
BaseServer.__init__(self, server_address, RequestHandlerClass)
self.socket = socket.socket(self.address_family,
self.socket_type)
if bind_and_activate:
try:
self.server_bind()
self.server_activate()
except:
self.server_close()
raise

在这段构造方法中,前三个参数对应了示例代码中,创建ThreadingTCPServer对象时的三个参数

1
obj = socketserver.ThreadingTCPServer(('127.0.0.1', 1559), MyClass)
1
def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
  • self —> obj
  • server_address —> (‘127.0.0.1’, 1559)
  • RequestHandlerClass —> MyClass

TCPServer的构造方法拿到这几个参数之后,紧接着又调用了父类的__init__构造方法

step 3.1

TCPServer的父类BaseServer中,__init__构造方法执行了以下内容

1
2
3
4
5
6
def __init__(self, server_address, RequestHandlerClass):
"""Constructor. May be extended, do not override."""
self.server_address = server_address
self.RequestHandlerClass = RequestHandlerClass
self.__is_shut_down = threading.Event()
self.__shutdown_request = False

创建了两个公有变量和两个私有变量,并且

  • RequestHandlerClass —> MyClass

step 4

TCPServer在执行完父类的构造方法封装了四个字段后,继续向下执行

1
2
self.socket = socket.socket(self.address_family,
self.socket_type)

创建了一个socket对象

1
2
3
4
5
6
7
if bind_and_activate:
try:
self.server_bind()
self.server_activate()
except:
self.server_close()
raise

step 4.1

1
2
3
4
5
6
7
8
9
10
def server_bind(self):
"""Called by constructor to bind the socket.

May be overridden.

"""
if self.allow_reuse_address:
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(self.server_address)
self.server_address = self.socket.getsockname()

绑定了IP和端口号,self.server_address的值是在执行父类BaseServer的构造方法时创建的

step 4.2

1
2
3
4
5
6
7
def server_activate(self):
"""Called by constructor to activate the server.

May be overridden.

"""
self.socket.listen(self.request_queue_size)

监听socket

至此,TCPServer中的__init__构造方法执行完毕,构造方法执行完毕,就意味着示例代码中的

1
obj = socketserver.ThreadingTCPServer(('127.0.0.1', 1559), MyClass)

这一句就执行完毕了

step 5

接下来测试代码走到第10行

1
obj.serve_forever()

ThreadingTCPServer首先在自己的代码块中查找,没有

根据继承的规则,再去左边的父类ThreadingMixIn中查找,没有

之后再去右边的父类TCPServer中查找,没有

最后在TCPServer的父类BaseServer中找到了serve_forever方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def serve_forever(self, poll_interval=0.5):
"""Handle one request at a time until shutdown.

Polls for shutdown every poll_interval seconds. Ignores
self.timeout. If you need to do periodic tasks, do them in
another thread.
"""
self.__is_shut_down.clear()
try:
# XXX: Consider using another file descriptor or connecting to the
# socket to wake this up instead of polling. Polling reduces our
# responsiveness to a shutdown request and wastes cpu at all other
# times.
with _ServerSelector() as selector:
selector.register(self, selectors.EVENT_READ)

while not self.__shutdown_request:
ready = selector.select(poll_interval) # IO多路复用
if ready: # 如果有新的客户端连接,ready就会有值
self._handle_request_noblock()

self.service_actions()
finally:
self.__shutdown_request = False
self.__is_shut_down.set()

step 5.1

1
ready = selector.select(poll_interval)

IO多路复用,这里没有使用self来调,而是使用selector对象来调select方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def select(self, timeout=None):
timeout = None if timeout is None else max(timeout, 0)
ready = []
try:
r, w, _ = self._select(self._readers, self._writers, [], timeout)
except InterruptedError:
return ready
r = set(r)
w = set(w)
for fd in r | w:
events = 0
if fd in r:
events |= EVENT_READ
if fd in w:
events |= EVENT_WRITE

key = self._key_from_fd(fd)
if key:
ready.append((key, events & key.events))
return ready

step 5.2

在有客户端连接进来的时候,ready就有有值,为真,会执行self._handle_request_noblock方法

因为是self.来调用的,根据继承规则,就要回到最开始的类中,再逐级向上查找,所以回到ThreadingTCPServer中查找,没有

根据继承的规则,再去左边的父类ThreadingMixIn中查找,没有

之后再去右边的父类TCPServer中查找,没有

最后在TCPServer的父类BaseServer中找到了_handle_request_noblock方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def _handle_request_noblock(self):
"""Handle one request, without blocking.

I assume that selector.select() has returned that the socket is
readable before this function was called, so there should be no risk of
blocking in get_request().
"""
try:
request, client_address = self.get_request() # 获取到客户端连接对象socket,和客户端地址
except OSError:
return
if self.verify_request(request, client_address):
try:
self.process_request(request, client_address)
except:
self.handle_error(request, client_address)
self.shutdown_request(request)

step 5.3

1
request, client_address = self.get_request()

根据类图查找,在TCPServer类中

1
2
3
4
5
6
7
def get_request(self):
"""Get the request and client address from the socket.

May be overridden.

"""
return self.socket.accept()

接受客户端连接请求

step 6

_handle_request_noblock方法中又执行了self.process_request方法

回到ThreadingTCPServer中查找,没有

根据继承的规则,在左边的父类ThreadingMixIn中找到该方法

1
2
3
4
5
6
def process_request(self, request, client_address):
"""Start a new thread to process the request."""
t = threading.Thread(target = self.process_request_thread,
args = (request, client_address))
t.daemon = self.daemon_threads
t.start()

在这个方法中,使用了多线程去执行self.process_request_thread方法

1
2
t = threading.Thread(target = self.process_request_thread,
args = (request, client_address))

使用了self调用,还是要从头查找

step 7

ThreadingMixIn类中找到

1
2
3
4
5
6
7
8
9
10
11
12
def process_request_thread(self, request, client_address):
"""Same as in BaseServer but as a thread.

In addition, exception handling is done here.

"""
try:
self.finish_request(request, client_address)
self.shutdown_request(request)
except:
self.handle_error(request, client_address)
self.shutdown_request(request)

这个方法应该是用来接收客户端发来的请求以及给客户端发送消息

step 8

1
self.finish_request(request, client_address)

根据类图,在BaseServer中找到

1
2
3
def finish_request(self, request, client_address):
"""Finish one request by instantiating RequestHandlerClass."""
self.RequestHandlerClass(request, client_address, self)

step3中已经封装了RequestHandlerClass = MyClass

相当于执行了MyClass的构造方法

1
MyClass(request, client_address, self)

step 9

MyClass中没有定义构造方法,在MyClass继承的父类socketserver.BaseRequestHandler中查找,找到了构造方法

1
2
3
4
5
6
7
8
9
def __init__(self, request, client_address, server):
self.request = request
self.client_address = client_address
self.server = server
self.setup()
try:
self.handle()
finally:
self.finish()

执行了self.handle()

step 10

执行self.handle()方法时,就会执行MyClass中定义的handle方法

在示例代码中,我们已经自定义了一个方法去实现与客户端的交互,

下载软件

http://oss.20150509.cn/exfat-utils-1.0.1-2.el6.src.rpm

http://oss.20150509.cn/fuse-exfat-1.0.1-2.el6.src.rpm

编译安装

1
2
3
4
5
6
7
8
9
10
> yum install scons gcc  # 编译安装需要系统有这两个软件的支持
> yum install fuse-devel # fuse模块,编译需要fuse-devel包支持
> rpm -ivh fuse-exfat-1.0.1-2.el6.src.rpm exfat-utils-1.0.1-2.el6.src.rpm>
>
> cd ~/rpmbuild/SPECS
> rpmbuild -ba fuse-exfat.spec
> rpmbuild -ba exfat-utils.spec
>
> cd ~/rpmbuild/RPMS/x86_64
> rpm -ivh fuse-exfat-1.0.1-2.el6.x86_64.rpm exfat-utils-1.0.1-2.el6.x86_64.rpm

挂载

1
2
3
> mount.exfat /dev/sdx /mnt
> # or
> mount -t exfat /dev/sdx /mnt

如果指定-t找不到此类型的话执行:ln -s /usr/sbin/mount.exfat /sbin/mount.exfat

IO操作是不占用CPU的,IO多路复用,是要管理起所有的IO操作。IO多路复用的典型场景是监听socket对象内部是否发生了变化

socket内部什么时候会有变化:

  • 建立连接
  • 发送消息

socket实例

这里使用socket实现一个简单的Echo Server的功能

  • server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import socket

# 创建socket对象,绑定IP端口,监听
sk = socket.socket()
sk.bind(('127.0.0.1', 1559))
sk.listen(5)

# 循环接受每一个连接池中的连接
while True:
# 接受客户端连接
conn, address = sk.accept()
# 向客户端发送欢迎消息
conn.sendall(bytes('hello', encoding='utf8'))

# 进入到收发消息的循环中
while True:

# Windows客户端在异常断开后抛出异常,这里是处理Windows的断开情况
try:
recv = conn.recv(1024)

# Linux客户端断开recv会是空值,这里处理Linux的断开情况
if not recv:
break

# 这里处理客户端主动发出断开请求的情况
if str(recv, encoding='utf-8') == 'exit':
break
except Exception as ex:
break

# 向客户端发送数据
conn.sendall(recv)
  • client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import socket

sk = socket.socket()
sk.connect(('127.0.0.1', 1559))
# 接收欢迎消息
data = sk.recv(1024)
print(data)

while True:
i = input("> ")
# 向服务端发送消息
sk.sendall(bytes(i, encoding='utf8'))

# 接收服务端发来的消息
msg = sk.recv(1024)
print(str(msg, encoding='utf-8'))

sk.close()

以上的socket代码同一时间仅能处理一个客户端的请求,之后连接上来的客户端在第一个客户端还没有断开的时候,会一直等待,直到上一个客户端的请求断开

select.selext()中的第一个参数

建立连接

上面说到,一种socket会变的情况是建立连接

上面的代码中,涉及到建立连接的socket是sk对象(变量)sk对象在执行到sk.accept()时,接受了一个新客户端的连接请求时候,socket内部就发生了变化,我们就需要监听这种变化,用以分辨出新的客户端的连接

创建socket,绑定并监听之后socket一般不会发生变化,只有当有新的客户端连接进来的时候,socket才会发生变化,我们需要监听的也是这个阶段的变化

得出结论:当socket被创建、绑定并监听之后发生变化,就是有新的客户端进行连接

  • server.py
1
2
3
4
5
6
7
8
9
10
11
import socket
import select

# 创建socket对象,绑定IP端口,监听
sk = socket.socket()
sk.bind(('127.0.0.1', 1559))
sk.listen(5)

while True:
rList, w, e = select.select([sk,], [], [], 1)
print(rList)

上面代码引入了IO多路复用中的select模型,使用select.select()方法,会返回一个有三个元素的元祖

在select.select()方法中的第一个参数,暂时只添加了一个服务端的socket对象,只要服务端socket对象sk有变化(新的客户端连接)就立即把变化的socket对象加入到rList列表中

监听的socket列表中,sk对象有变化 —> rList = [sk]

监听的socket列表中,没有socket发生变化 —> rList = []

以上代码跑起来的效果:

1
2
3
4
5
6
7
[]
[]
[]
...
# 每秒打印一个[]
# select.sekect()中的第四个参数起了作用
# 超时时间,监听的对象没有发生变化的时候,多少秒循环一次

说明没有新的连接请求,下面将试验有客户端连接产生的情况

  • client.py
1
2
3
4
5
import socket

sk = socket.socket()
sk.connect(('127.0.0.1', 1559))
sk.close()

客户端一旦连接到服务端,服务端的回显如下

1
2
3
4
5
[<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1559)>]
[<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1559)>]
[<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1559)>]
...
# 疯狂的快速打印...

从上面可以看出,有一个客户端连接进来了。服务端的sk对象内部发生了变化,当select监听到sk对象发生变化后,立即将发生变化的对象赋值给了rList列表(append到列表)从打印出来的内容中可以看出,列表中的元素是发生变化的socket对象

处理rList(服务端socket)

rList中保存了所有发生变化的socket对象,以上代码中只监听了服务端socket对象,这里暂时只讨论服务端socket变化的情况

  • server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import socket
import select

# 创建socket对象,绑定IP端口,监听
sk = socket.socket()
sk.bind(('127.0.0.1', 1559))
sk.listen(5)

while True:
# 监听服务端socket对象sk
rList, w, e = select.select([sk,], [], [], 1)
print(rList)

# 遍历rList中的每一个socket对象
# 目前rList中只会出现服务端的socket对象
for s in rList:
conn, address = s.accept()
conn.sendall(bytes('hello', encoding='utf8'))
  • client.py
1
2
3
4
5
6
7
8
9
10
11
import socket

sk = socket.socket()
sk.connect(('127.0.0.1', 1559))
data = sk.recv(1024)
print(data)

while True:
input("> ")

sk.close()

连续运行三个客户端连接时,服务端的回显:

1
2
3
4
5
6
7
8
9
[]
[]
[<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1559)>]
[]
[<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1559)>]
[]
[<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1559)>]
[]
[]

每个客户端的回显:

1
2
b'hello'
>

以上通过对rList中服务端socket对象执行accept()方法,来实现了一个类似并发连接的效果,每一个连接进来的客户端都会被服务端接受请求,“同时”提供服务

接收客户端消息

上面提到,不仅创建连接会触发socket的变化,与客户端连接建立后,客户端发来消息,也会引发客户端socket连接的内部变化

  • server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import socket
import select

# 创建socket对象,绑定IP端口,监听
sk = socket.socket()
sk.bind(('127.0.0.1', 1559))
sk.listen(5)

inputs = [sk]
while True:
rList, w, e = select.select(inputs, [], [], 1)
print(rList)

for s in rList:
conn, address = s.accept()
# conn也是一个socket对象
# 当服务端socket接收到客户的请求后,会分配一个新的socket对象专门用来和这个客户端进行连接通信

# 当服务端分配新的socket对象给新连接进来的客户端的时候
# 我们也需要监听这个客户端的socket对象是否会发生变化
# 一旦发生变化,意味着客户端向服务器端发来了消息
inputs.append(conn)
conn.sendall(bytes('hello', encoding='utf8'))

在上面的代码中,我把服务端socket为新客户端创建的socket也加入到了监听列表中,那么如果有客户端发来消息,select监听到客户端socket(conn)发生变化并加入到rList列表中后,在for循环处理中,客户端的socket并没有accept()方法,而且也不要这个方法,这就需要在for循环中对两类socket区分对待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import socket
import select

# 创建socket对象,绑定IP端口,监听
sk = socket.socket()
sk.bind(('127.0.0.1', 1559))
sk.listen(5)

inputs = [sk]
while True:
rList, w, e = select.select(inputs, [], [], 1)
print("select当前监听socket对象的数量>", len(inputs), " | 发生变化的socket数量>", len(rList))

for s in rList:
# 判断socket对象如果是服务端的socket对象的话
if s == sk:
conn, address = s.accept()
# conn也是一个socket对象
# 当服务端socket接收到客户的请求后,会分配一个新的socket对象专门用来和这个客户端进行连接通信

# 当服务端分配新的socket对象给新连接进来的客户端的时候
# 我们也需要监听这个客户端的socket对象是否会发生变化
# 一旦发生变化,意味着客户端向服务器端发来了消息
inputs.append(conn)
conn.sendall(bytes('hello', encoding='utf8'))
# 其他的就都是客户端的socket对象了
else:
# 意味着客户端给服务端发送消息了
msg = s.recv(1024)
print(msg)
  • client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import socket

sk = socket.socket()
sk.connect(('127.0.0.1', 1559))
# 接收欢迎消息
data = sk.recv(1024)
print(data)

while True:
i = input("> ")
# 向服务端发送消息
sk.sendall(bytes(i, encoding='utf8'))

sk.close()

当我运行一个客户端,并Ctrl+C退出时,服务端回显界面在疯狂的打印消息。问题出在了服务端监听的客户端socket连接,当客户端与服务端断开连接时,应在服务端select监听socket对象列表中将该客户端socket对象移除

  • server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import socket
import select

# 创建socket对象,绑定IP端口,监听
sk = socket.socket()
sk.bind(('127.0.0.1', 1559))
sk.listen(5)

inputs = [sk]
while True:
rList, w, e = select.select(inputs, [], [], 1)
print("select当前监听socket对象的数量>", len(inputs), " | 发生变化的socket数量>", len(rList))

for s in rList:
# 判断socket对象如果是服务端的socket对象的话
if s == sk:
conn, address = s.accept()
# conn也是一个socket对象
# 当服务端socket接收到客户的请求后,会分配一个新的socket对象专门用来和这个客户端进行连接通信

# 当服务端分配新的socket对象给新连接进来的客户端的时候
# 我们也需要监听这个客户端的socket对象是否会发生变化
# 一旦发生变化,意味着客户端向服务器端发来了消息
inputs.append(conn)
conn.sendall(bytes('hello', encoding='utf8'))
# 其他的就都是客户端的socket对象了
else:
try:
# 意味着客户端给服务端发送消息了
msg = s.recv(1024)

# Linux平台下的处理
if not msg:
raise Exception('客户端已断开连接')
print(msg)
except Exception as ex:
# Windows平台下的处理
inputs.remove(s)

使用最新的server.py与client.py进行测试时,依次运行多个客户端,再依次关闭多个客户端,服务端的回显如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
select当前监听socket对象的数量> 1  | 发生变化的socket数量> 0
select当前监听socket对象的数量> 1 | 发生变化的socket数量> 0
select当前监听socket对象的数量> 1 | 发生变化的socket数量> 0
select当前监听socket对象的数量> 1 | 发生变化的socket数量> 0
select当前监听socket对象的数量> 1 | 发生变化的socket数量> 1
select当前监听socket对象的数量> 2 | 发生变化的socket数量> 0
select当前监听socket对象的数量> 2 | 发生变化的socket数量> 0
select当前监听socket对象的数量> 2 | 发生变化的socket数量> 1
select当前监听socket对象的数量> 3 | 发生变化的socket数量> 0
select当前监听socket对象的数量> 3 | 发生变化的socket数量> 0
select当前监听socket对象的数量> 3 | 发生变化的socket数量> 1
select当前监听socket对象的数量> 4 | 发生变化的socket数量> 0
select当前监听socket对象的数量> 4 | 发生变化的socket数量> 0
select当前监听socket对象的数量> 4 | 发生变化的socket数量> 0
select当前监听socket对象的数量> 4 | 发生变化的socket数量> 0
select当前监听socket对象的数量> 4 | 发生变化的socket数量> 1
b''
select当前监听socket对象的数量> 3 | 发生变化的socket数量> 0
select当前监听socket对象的数量> 3 | 发生变化的socket数量> 0
select当前监听socket对象的数量> 3 | 发生变化的socket数量> 1
b''
select当前监听socket对象的数量> 2 | 发生变化的socket数量> 0
select当前监听socket对象的数量> 2 | 发生变化的socket数量> 1
b''
select当前监听socket对象的数量> 1 | 发生变化的socket数量> 0
select当前监听socket对象的数量> 1 | 发生变化的socket数量> 0
select当前监听socket对象的数量> 1 | 发生变化的socket数量> 0
select当前监听socket对象的数量> 1 | 发生变化的socket数量> 0

特别注意:这里的服务端定义,当我收到客户端发来的空值的时候,我就默认认为客户端主动需要断开与服务端的连接。由于服务端的这个默认规则,在写客户端的时候,一定要注意处理客户端输入的值为空的情况

给客户端回复消息

  • server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import socket
import select

# 创建socket对象,绑定IP端口,监听
sk = socket.socket()
sk.bind(('127.0.0.1', 1559))
sk.listen(5)

inputs = [sk]
while True:
rList, w, e = select.select(inputs, [], [], 1)
print("select当前监听socket对象的数量>", len(inputs), " | 发生变化的socket数量>", len(rList))

for s in rList:
# 判断socket对象如果是服务端的socket对象的话
if s == sk:
conn, address = s.accept()
# conn也是一个socket对象
# 当服务端socket接收到客户的请求后,会分配一个新的socket对象专门用来和这个客户端进行连接通信

# 当服务端分配新的socket对象给新连接进来的客户端的时候
# 我们也需要监听这个客户端的socket对象是否会发生变化
# 一旦发生变化,意味着客户端向服务器端发来了消息
inputs.append(conn)
conn.sendall(bytes('hello', encoding='utf8'))
# 其他的就都是客户端的socket对象了
else:
try:
# 意味着客户端给服务端发送消息了
msg = s.recv(1024)

# Linux平台下的处理
if not msg:
raise Exception('客户端已断开连接')
print(msg)

# 向客户端回复消息
# 这种写法是完全可以的,但是缺点是读写都混在了一起
s.sendall(msg)
except Exception as ex:
# Windows平台下的处理
inputs.remove(s)

但是一般情况下,会做读写分离,可以通过select,实现读写分离(收发分离)

select.selext()中的第二个参数

1
rList, wList, e = select.select([], [], [], 1)

select.select()的第二个参数有什么值,wList中就会有什么值

利用select的这个特性,可以把需要回复消息的客户端socket对象赋值给select的第二个参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import socket
import select

# 创建socket对象,绑定IP端口,监听
sk = socket.socket()
sk.bind(('127.0.0.1', 1559))
sk.listen(5)

inputs = [sk]
outputs = []
while True:
rList, wList, e = select.select(inputs, outputs, [], 1)
print("---" * 20)
print("select当前监听inputs对象的数量>", len(inputs), " | 发生变化的socket数量>", len(rList))
print("select当前监听outputs对象的数量>", len(outputs), " | 需要回复客户端消息的数量>", len(wList))

# 遍历rList(建立连接和接收数据)
for s in rList:
# 判断socket对象如果是服务端的socket对象的话
if s == sk:
conn, address = s.accept()
# conn也是一个socket对象
# 当服务端socket接收到客户的请求后,会分配一个新的socket对象专门用来和这个客户端进行连接通信

# 当服务端分配新的socket对象给新连接进来的客户端的时候
# 我们也需要监听这个客户端的socket对象是否会发生变化
# 一旦发生变化,意味着客户端向服务器端发来了消息
inputs.append(conn)
conn.sendall(bytes('hello', encoding='utf8'))
# 其他的就都是客户端的socket对象了
else:
try:
# 意味着客户端给服务端发送消息了
msg = s.recv(1024)

# Linux平台下的处理
if not msg:
raise Exception('客户端已断开连接')
else:
outputs.append(s)
print(msg)

# 向客户端回复消息
# 这种写法是完全可以的,但是缺点是读写都混在了一起
# s.sendall(msg)
except Exception as ex:
# Windows平台下的处理
inputs.remove(s)

# 遍历wList(遍历给服务端发送过消息的客户端)
for s in wList:

# 给所有的客户端统一回复内容
s.sendall(bytes('server response', encoding='utf8'))

# 回复完成后,一定要将outputs中该socket对象移除
outputs.remove(s)

wList = 所有给服务端发送消息的客户端,也是需要回复消息客户端列表

  • client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import socket

sk = socket.socket()
sk.connect(('127.0.0.1', 1559))
# 接收欢迎消息
data = sk.recv(1024)
print(data)

while True:
i = input("> ")
# 向服务端发送消息
sk.sendall(bytes(i, encoding='utf8'))

# 接收服务端发来的消息
msg = sk.recv(1024)
print(str(msg, encoding='utf-8'))

sk.close()

执行过程:

  • 依次连接三个客户端
  • 第一个客户端依次向服务发送了两次消息

服务端的回显:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
------------------------------------------------------------
select当前监听inputs对象的数量> 1 | 发生变化的socket数量> 0
select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0
------------------------------------------------------------
select当前监听inputs对象的数量> 1 | 发生变化的socket数量> 0
select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0
------------------------------------------------------------
select当前监听inputs对象的数量> 2 | 发生变化的socket数量> 0
select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0
------------------------------------------------------------
select当前监听inputs对象的数量> 2 | 发生变化的socket数量> 1
select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0
------------------------------------------------------------
select当前监听inputs对象的数量> 3 | 发生变化的socket数量> 0
select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0
------------------------------------------------------------
select当前监听inputs对象的数量> 3 | 发生变化的socket数量> 1
select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0
------------------------------------------------------------
select当前监听inputs对象的数量> 4 | 发生变化的socket数量> 0
select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0
------------------------------------------------------------
select当前监听inputs对象的数量> 4 | 发生变化的socket数量> 1
select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0
b'ps'
------------------------------------------------------------
select当前监听inputs对象的数量> 4 | 发生变化的socket数量> 0
select当前监听outputs对象的数量> 1 | 需要回复客户端消息的数量> 1
------------------------------------------------------------
select当前监听inputs对象的数量> 4 | 发生变化的socket数量> 0
select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0
------------------------------------------------------------
select当前监听inputs对象的数量> 4 | 发生变化的socket数量> 1
select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0
b'ps'
------------------------------------------------------------
select当前监听inputs对象的数量> 4 | 发生变化的socket数量> 0
select当前监听outputs对象的数量> 1 | 需要回复客户端消息的数量> 1
------------------------------------------------------------
select当前监听inputs对象的数量> 4 | 发生变化的socket数量> 0
select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0
------------------------------------------------------------
select当前监听inputs对象的数量> 3 | 发生变化的socket数量> 0
select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0
------------------------------------------------------------
select当前监听inputs对象的数量> 3 | 发生变化的socket数量> 0
select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0
------------------------------------------------------------
select当前监听inputs对象的数量> 2 | 发生变化的socket数量> 0
select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0
------------------------------------------------------------
select当前监听inputs对象的数量> 2 | 发生变化的socket数量> 1
select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0
------------------------------------------------------------
select当前监听inputs对象的数量> 1 | 发生变化的socket数量> 0
select当前监听outputs对象的数量> 0 | 需要回复客户端消息的数量> 0

第一个客户端的回显:

1
2
3
4
5
6
b'hello'
> ps
server response
> ps
server response
>

以上代码实现了简单的收发消息的分离,现在又多了一点需求,目前所有给服务端发送消息的客户端,服务端都统一回复了相同的内容,现在要服务端实现Echo Server的功能,即客户端发送什么消息,服务端就给客户端回复什么消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import socket
import select

# 创建socket对象,绑定IP端口,监听
sk = socket.socket()
sk.bind(('127.0.0.1', 1559))
sk.listen(5)

inputs = [sk]
outputs = []
messages = {}

"""
messages = {
socket_obj1: [msg]
socket_obj2: [msg]
}
"""

while True:
rList, wList, e = select.select(inputs, outputs, [], 1)
print("---" * 20)
print("select当前监听inputs对象的数量>", len(inputs), " | 发生变化的socket数量>", len(rList))
print("select当前监听outputs对象的数量>", len(outputs), " | 需要回复客户端消息的数量>", len(wList))

# 遍历rList(建立连接和接收数据)
for s in rList:
# 判断socket对象如果是服务端的socket对象的话
if s == sk:
conn, address = s.accept()
# conn也是一个socket对象
# 当服务端socket接收到客户的请求后,会分配一个新的socket对象专门用来和这个客户端进行连接通信

# 当服务端分配新的socket对象给新连接进来的客户端的时候
# 我们也需要监听这个客户端的socket对象是否会发生变化
# 一旦发生变化,意味着客户端向服务器端发来了消息
inputs.append(conn)

# 在messages中为该对象创建key
messages[conn] = []

conn.sendall(bytes('hello', encoding='utf8'))
# 其他的就都是客户端的socket对象了
else:
try:
# 意味着客户端给服务端发送消息了
msg = s.recv(1024)

# Linux平台下的处理
if not msg:
raise Exception('客户端已断开连接')
else:
outputs.append(s)
messages[s].append(msg)

# 向客户端回复消息
# 这种写法是完全可以的,但是缺点是读写都混在了一起
# s.sendall(msg)
except Exception as ex:
# Windows平台下的处理
inputs.remove(s)

# 在客户端断开连接后,相对应的该客户端的messages中的信息也需要删除
del messages[s]

# 遍历wList(遍历给服务端发送过消息的客户端)
for s in wList:

# 在该客户端连接对象的messages信息中取出一个进行回复
msg = messages[s].pop()

# 根据客户端发来的消息进行回复
s.sendall(msg)

# 回复完成后,一定要将outputs中该socket对象移除
outputs.remove(s)

服务端做出以上修改,客户端不需要改变

  • 11行 为了让一个客户端socket对象收消息和发消息产生关联,引入了一个新的全局变量messages
  • 40行 在新客户端连接进来的时候,就预先为该socket对象在messages中创建对应的key
  • 54行 在该对象中添加消息
  • 64行 在客户端关闭连接收,清理该对象的消息列表
  • 70行 将该对象在第40行插入的消息中取出来

总结

使用IO多路复用,实际上实现了类似并发效果的伪并发。内部实际使用了循环来高效的处理阻塞请求

Python中有一个select模块,其中提供了:select、poll、epoll三个方法,分别调用系统的 select,poll,epoll 从而实现IO多路复用。

  • Windows Python:提供: select
  • Mac Python:提供: select
  • Linux Python:提供: select、poll、epoll

注意:网络操作、文件操作、终端操作等均属于IO操作,对于windows只支持Socket操作,其他系统支持所有IO操作,但是无法检测普通文件操作自动上次读取是否已经变化

对于select方法:

1
2
3
4
5
6
7
8
9
10
11
12
句柄列表11, 句柄列表22, 句柄列表33 = select.select(句柄序列1, 句柄序列2, 句柄序列3, 超时时间)


参数: 可接受四个参数(前三个必须)
返回值:三个列表

select方法用来监视文件句柄,如果句柄发生变化,则获取该句柄。
1、当 参数1 序列中的句柄发生可读时(accetp和read),则获取发生变化的句柄并添加到 返回值1 序列中
2、当 参数2 序列中含有句柄时,则将该序列中所有的句柄添加到 返回值2 序列中
3、当 参数3 序列中的句柄发生错误时,则将该发生错误的句柄添加到 返回值3 序列中
4、当 超时时间 未设置,则select会一直阻塞,直到监听的句柄发生变化
当 超时时间 = 1时,那么如果监听的句柄均无任何变化,则select会阻塞 1 秒,之后返回三个空列表,如果监听的句柄有变化,则直接执行。

附加:select poll epoll的区别

IO多路复用是系统内核实现的,系统内部维护了一个for循环,一个一个地去检测对象是否有变化

首先需要明确一点的是,for循环的效率是不高的

IO多路复用种类 实现原理 监听对象个数
select 系统内部维护了一个for循环 1024
poll 系统内部维护了一个for循环 没有限制
epoll 句柄序列发生变化时自动通知epoll 没有限制

Python中2.7与3.5的继承略有区别, 在Python2.7中的类可以分为经典类(默认就是经典类)和新式类(继承了object类的就是新式类),而在Python3.5中,所有的类默认继承object类

  • Python2.7的类如果继承了object类,那么多继承向上寻找的路径和Python3.5是一样的,图中橙色的寻找路径

  • 但是如果Python2.7没有继承object类,那么就是图中蓝色的寻找路径

更多关于继承的内容可以参考以下文章:

http://docs.20150509.cn/2016/06/28/Python面向对象基础篇之继承/

Python的变量作用域中分为四个级别,简称为:BGEL,作用域的级别依次升高,级别最高的是Local,如果该变量在Local中已经声明并赋值,将优先使用Local中的变量对应的值

Python变量作用域的四种情况:

  • B:build-in 系统固定模块里面的变量,也叫系统变量,比如int
  • G:global 全局变量,也就是模块级别定义的变量
  • E:enclosing 嵌套的父级函数的局部作用域,就是包含此函数的上层函数的局部作用域
  • L:local 局部作用域,即为函数中定义的变量

E和L是相对的,E中的变量相对上层来说也是L

Python的变量作用域可以用下图表示:

一个变量使用哪个域中定义的值,取决于它的相对位置

从上图中看,如果从Local的位置向上看,最先看到的是Local中的变量,其次是Enclosing中的变量,再次是Global中的变量,最后才是Build-in变量

变量的取值取决于你在哪个位置,比如你在E和L中间,那同名的这个变量肯定是会向前看,取E中的值

Python作用域的产生

在Python中,没有块级作用域。只有模块(module),类(class)以及函数(def,lambda)才会引入新的作用域

1
2
3
4
if True:
name = "PolarSnow"

print(name)

以上代码是可以正常运行的,但是在Java(有块级作用域)中运行就会报,变量未定义的错误

1
2
3
4
5
def func():
name = "Polarsnow"

func()
print(name)

以上代码执行就会报错,name变量未定义

Python变量作用域

L(Local)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int = 7

def func2():
int = 6
def func():
int = 5
print(int)
return func

f = func2()
f()

------------
5

LEG都对int变量自定义了值,但是最后取得是L中的值

E(Enclosing)

1
2
3
4
5
6
7
8
9
10
11
12
13
int = 7

def func2():
int = 6
def func():
print(int)
return func

f = func2()
f()

------------
6

在local中取值,但是local中没有,就会去E里面找

G(Global)

1
2
3
4
5
6
7
8
9
10
11
12
int = 7

def func2():
def func():
print(int)
return func

f = func2()
f()

------------
7

L和E中都没有找到int的值,继续向上找,在G中找到了int的值

B(Build-in)

1
2
3
4
5
6
7
8
9
10
def func2():
def func():
print(int)
return func

f = func2()
f()

------------
<class 'int'>

int变量的值在LEG中都没有找到,但是int是内建变量,在系统中已经对其有定义,最后在B中,找到了int的值

如果一个变量在local中查找,查找到B中还没有找到,就会报变量未定义的错误

global & nonlocal 关键字

global

函数内部可以访问全局变量中的值,但是不能修改全局变量的值。如果需要修改,要加上global关键字

1
2
3
4
5
6
7
8
9
name = "ps"

def readonly():
print("inner --->", name)

readonly()

------------
ps

函数内部是可以访问全局变量的值的,原则 –> GBEL

1
2
3
4
5
6
7
8
9
10
11
name = "ps"

def readonly():
global name
name = "PolarSnow"

readonly()
print(name)

------------
PolarSnow

加上了global关键字,就可以修改全局变量的值

nonlocal

global关键字声明的变量必须在全局作用域上,不能嵌套作用域上,当要修改嵌套作用域(enclosing作用域,外层非全局作用域)中的变量就需要nonlocal关键字了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def outer():
count = 10
def inner():
nonlocal count
count = 20
print("inner ---> ", count)
inner()
print("outer --->", count)

outer()

------------
20
20

变量作用域中的疑难杂症

1
2
3
4
5
6
7
8
9
10
name = "123"

def func1():
print(name)

def func2():
name = "456"
func1()

func2()

最后会输出什么结果?

结果是: 123

1
2
3
4
5
6
7
8
9
10
11
name = "123"

def func1():
print(name)

def func2():
name = "456"
return func1

ret = func2()
ret()

结果是: 123

上面两段代码的结果都是全局变量中的值123,这里需要特别注意的是,变量作用域在函数执行之前就已经确定了!

Python作为解释型的语言,代码从上至下执行,遇到def时,就将其函数体保存在内存的堆区,把对应的函数名保存在栈区并指向堆区的函数体,在函数执行之前,这个函数中的变量作用域就已经被记录到内存中了,不会因为后期的调用或嵌套而更改


1
2
3
4
5
6
7
8
9
10
11
l = [x + 1 for x in range(10) if x > 5]
print("display list --->", l)
print("first element --->", l[0])

print("---" * 5)

ll = [lambda :x for x in range(10) if x > 5]
print("display list --->", ll)

ret = ll[0]()
print("first element --->", ret)

结果是:

1
2
3
4
5
display list ---> [7, 8, 9, 10]
first element ---> 7
---------------
display list ---> [<function <listcomp>.<lambda> at 0x101b7b730>, <function <listcomp>.<lambda> at 0x101b7b7b8>, <function <listcomp>.<lambda> at 0x101b7b840>, <function <listcomp>.<lambda> at 0x101b7b8c8>]
first element ---> 9

问题来了,问什么使用了lambda之后,第一个元素变成9了呢?

我们把这个问题拆解一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
lll = []
for x in range(10):
if x > 5:
lll.append(x + 1)
print(lll)
print(lll[0])

llll = []
for x in range(10):
if x > 5:
def func():
return x
llll.append(func)
print(llll)
print(llll[0]())

------------
[7, 8, 9, 10]
7
[<function func at 0x10137b620>, <function func at 0x10137b6a8>, <function func at 0x10137b730>, <function func at 0x10137b7b8>]
9

为什么 x+1 时第一个元素 = 7, 而 lambda :x 时第一个元素 = 9了呢

原因就是Python程序在解释道lambda的时候,并没有执行里面的代码,而是直接放到的了内存中,随着外侧循环的结束,x的值已经变成了9,此时再把内存里保存的lambda函数拿出来执行,x变量获取到的就是当前已经变成9的这个值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
lllll = []
for x in range(10):
if x > 5:
def func(i = x):
return i
lllll.append(func)
print(lllll)
print(lllll[0]())
print(lllll[1]())
print(lllll[2]())
print(lllll[3]())

------------
[<function func at 0x101b7b620>, <function func at 0x101b7b6a8>, <function func at 0x101b7b730>, <function func at 0x101b7b7b8>]
6
7
8
9

上面的代码仅仅小修改了一下,在func中执行一条赋值语句,结果就立刻不一样了,这次列表的第一个值变成了6

之前不是说代码运行到def就直接保存到内存,不执行了吗?没错,函数体是没有被执行,但是函数的参数是个赋值表达式,被Python解释器执行了,获取到了每个循环的x的值,并赋值给函数自己内部的变量(local)

当程序执行完毕,运行列表中第一个函数取值的时候,该函数取到的是函数内部的变量i,所以最后的结果和上面是截然不同的

在Python2.x中需要import SocketServer,在Python3.x中,都变成了小写,需要import sockerserver

Python Version: 3.5+

  • server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import socketserver

# 自定义socketserver需要继承socketserver.BaseRequestHandler
class MyServer(socketserver.BaseRequestHandler):

# 每一个对象实例化的时候,都会执行handle里面的代码
# 自定义的handle方法会重写父类的handle方法,主要用来与客户端进行交互
def handle(self):
conn = self.request
# 这条欢迎消息会在每个客户端连上之后发送
conn.sendall(bytes("Welcome to docs.20150509.cn", encoding='utf8'))
while True:
recv_data = conn.recv(1024)
if len(recv_data) == 0:break
conn.sendall(recv_data.upper())

if __name__ == "__main__":
server = socketserver.ThreadingTCPServer(('127.0.0.1', 5959), MyServer)
# 每多一个客户端的连接,就睡多起一个线程去处理
server.serve_forever()
  • client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import socket

ip_port = ('127.0.0.1', 5959)
s = socket.socket()
s.connect(ip_port)

# 首先接收服务端发来的欢迎消息
welcome_msg = s.recv(1024)
print(str(welcome_msg, encoding='utf-8'))

while True:
# 输入消息
sent_data = input("> ").strip()
if len(sent_data) == 0: continue
# 发送消息
s.send(bytes(sent_data, encoding='utf8'))

# 接收消息
recv_data = s.recv(1024)
print(str(recv_data, encoding='utf-8'))

s.close()
  • 执行
1
2
3
4
Welcome to docs.20150509.cn
> polarsnow
POLARSNOW
>

多个客户端可以同时连接使用,每个客户端连接时都会创建一个对象,在服务端都会新起一个进来来负责这个线程的处理

浅谈源码

我们就来看看继承的socketserver.BaseRequestHandler的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class BaseRequestHandler:

"""Base class for request handler classes.

This class is instantiated for each request to be handled. The
constructor sets the instance variables request, client_address
and server, and then calls the handle() method. To implement a
specific service, all you need to do is to derive a class which
defines a handle() method.

The handle() method can find the request as self.request, the
client address as self.client_address, and the server (in case it
needs access to per-server information) as self.server. Since a
separate instance is created for each request, the handle() method
can define arbitrary other instance variariables.

"""

def __init__(self, request, client_address, server):
self.request = request
self.client_address = client_address
self.server = server
self.setup()
try:
self.handle()
finally:
self.finish()

def setup(self):
pass

def handle(self):
pass

def finish(self):
pass

我没有省略,源码中只有这几行

可以看出,创建对象执行构造方法的时候,会自动去执行三个方法

  • self.setup() 负责客户端连接时的处理
  • self.handle() 负责客户端连接后的交互
  • self.finish() 负责客户端断开前的操作

在我们自己的类中,可以通过重写父类的这方法来实现对客户端请求的处理

网络编程中,每次客户端与服务端约定俗成发送包的大小为1024字节,最大为8192字节。但是即使你设置了8192,硬件网卡每次也只能收到1500字节,这个是由硬件网卡的MTU值(最大传输单元,单位字节)决定的。一般千兆网卡的MTU值是1500字节。当一端给另一端发包超过1024字节,另一端没有循环接收消息,就会产生粘包的问题

粘包的问题看上图

客户端给服务端发送了一个命令请求,服务端的返回体大小为11024字节,但是客户端每次只收1024字节,所以客户端的第一次请求之后,服务端还有10000字节的内容还没给客户端发

当客户端发给第二个请求时,第二个请求的返回体大小为100字节,但是队列中还有10000字节的内容没有发送给客户端,所以此时服务端会将上次剩余的10000字节中再拿出1024字节来发送给客户端

解决粘包问题

解决粘包问题的关键点在于我们能否知道每次需要发送的内容总共有多大,如果我们能知道总共有多大,相应的对端就可以算出我需要循环几次可以吧内容全部读完

先来看看原始会发生粘包情况的代码:

  • server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import socket
import subprocess

ip_port = ('127.0.0.1', 5555)

# step 1: 创建socket套接字, 里面封装了通信协议
s = socket.socket()

# step 2: 绑定IP及端口号
s.bind(ip_port)

# step 3: 监听绑定的端口
s.listen(5)

# 把接收客户端连接请求的操作循环起来就可以接受多个用户的请求啦
# 注意:同一时间只能处理一个客户端的请求,其他连接上的用户会排队等待
# 最后支持多少个用户排队等待,是由listen中的参数决定的(连接池)
while True:

# 等待客户端的连接(阻塞函数)
# step 4: 接受客户端的连接
conn, addr = s.accept()
# conn对象里封装了连接过来的这个客户端的通信线路信息
# 后期跟这个客户端的通信与交互都需要在conn这条通信线路上进行

while True:

# step 5: 接收消息(在conn通道没有被关闭的情况下是阻塞的函数,一旦conn被客户端关闭,该函数将不会阻塞)
recv_data = conn.recv(1024)

# 如果conn通道被客户端主动关闭,recv函数将不再阻塞,recv_data将接收到空字符串
# 通过判断recv_data为空字符串来退出服务端的连接
if len(recv_data) == 0: break

# step 6:处理消息
cmd = str(recv_data, encoding='utf-8')
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
res = p.stdout.read()
send_data = str(res, encoding='utf-8')

# step 7: 发送消息
conn.send(bytes(send_data, encoding='utf-8'))

# step 8: 断开连接
conn.close()
  • client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import socket

ip_port = ('127.0.0.1', 5555)

# step 1: 创建socket套接字, 里面封装了通信协议
s = socket.socket()

# step 2: 连接服务端
s.connect(ip_port)

while True:

# step 3: 发送消息
send_data = input("> ").strip()

# 如果客户端输入了exit,退出循环,主动close掉与服务端的连接
if send_data == "exit": break

# 如果什么也没有输入,重新循环接收输入
if len(send_data) == 0: continue

# 这里注意,和服务端不同的是,服务端找到对端是通过conn对象,而客户端是s对象
# 在Python3.x中,socket对象发送对象必须是字节类型(2.7中可以是字符串)
s.send(bytes(send_data, encoding='utf-8'))

# step 4: 收消息
recv_data = s.recv(1024)
print(str(recv_data, encoding='utf-8'))

# step 5: 断开连接
s.close()

要解决上面粘包的问题,首先用白话来描述一下解决的过程

  • 客户端不知道服务端会返回多少内容,所以客户端接收信息的代码需要循环起来
  • 由于客户端接收循环不知道何时停止,前提需要先知道服务端总共会有多少字节的内容发过来
  • 服务端在发送实际数据之前,先计算要发送的数据有多大,把这个信息提前告知客户端,让客户端计算出需要几次循环才能把我服务端本次产生的数据接收完毕
  • 客户端在接收到服务端发来的总长度的信息时,计算出循环的次数,此时应告知服务器端,可以发送数据了

下面我们就按照上面的思路来实现这段代码

  • server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import socket
import subprocess

ip_port = ('127.0.0.1', 5556)

# step 1: 创建socket套接字, 里面封装了通信协议
s = socket.socket()

# step 2: 绑定IP及端口号
s.bind(ip_port)

# step 3: 监听绑定的端口
s.listen(5)

# 把接收客户端连接请求的操作循环起来就可以接受多个用户的请求啦
# 注意:同一时间只能处理一个客户端的请求,其他连接上的用户会排队等待
# 最后支持多少个用户排队等待,是由listen中的参数决定的(连接池)
while True:

# 等待客户端的连接(阻塞函数)
# step 4: 接受客户端的连接
conn, addr = s.accept()
# conn对象里封装了连接过来的这个客户端的通信线路信息
# 后期跟这个客户端的通信与交互都需要在conn这条通信线路上进行

while True:

# step 5: 接收消息(在conn通道没有被关闭的情况下是阻塞的函数,一旦conn被客户端关闭,该函数将不会阻塞)
recv_data = conn.recv(1024)

# 如果conn通道被客户端主动关闭,recv函数将不再阻塞,recv_data将接收到空字符串
# 通过判断recv_data为空字符串来退出服务端的连接
if len(recv_data) == 0: break

# step 6:处理消息
cmd = str(recv_data, encoding='utf-8')
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
res = p.stdout.read()
send_data = str(res, encoding='utf-8') # utf-8 ---> str ---> utf-8

# ------------与客户端沟通数据大小并发送数据的流程------------
# I. Server ---> Client
# 由于发送数据时必须是字节类型,所以计算长度时,应该先将字符串转为字节类型再计算长度
send_data = bytes(send_data, encoding='utf-8')
# 计算字节长度,因为后面发送信息的时候需要做字符串拼接,所以这里先转成字符类型
length_data = str(len(send_data))
# 通知客户端要要发送数据的总长度
conn.send(bytes('length_data:'+length_data, encoding='utf-8'))

# III. Server <--- Client
# 接收客户端发送的确认开始传输的命令
feedback = str(conn.recv(1024), encoding='utf-8')
if feedback == "confirm":
# IV. Server ---> Client
# 开始向客户端传输数据
conn.send(send_data)

# step 7: 发送消息
conn.send(send_data)

# step 8: 断开连接
conn.close()
  • client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import socket

ip_port = ('127.0.0.1', 5556)

# step 1: 创建socket套接字, 里面封装了通信协议
s = socket.socket()

# step 2: 连接服务端
s.connect(ip_port)

while True:

# step 3: 发送消息
send_data = input("> ").strip()

# 如果客户端输入了exit,退出循环,主动close掉与服务端的连接
if send_data == "exit": break

# 如果什么也没有输入,重新循环接收输入
if len(send_data) == 0: continue

# 这里注意,和服务端不同的是,服务端找到对端是通过conn对象,而客户端是s对象
# 在Python3.x中,socket对象发送对象必须是字节类型(2.7中可以是字符串)
s.send(bytes(send_data, encoding='utf-8'))

# step 4: 收消息
# ------------与服务端沟通数据大小并发送数据的流程------------
# 服务端首先发来的信息是即将发送数据的总长度(字节长度)
length_msg = str(s.recv(1024), encoding='utf-8') # 接收的格式 length_data:11024
if length_msg.startswith("length_data"):
# 取得服务端统计的数据总长度
length_data = int(length_msg.split(":")[1])

# II. Server <--- Client
# 客户端已经接收到数据的总长度,回复服务端可以发送数据
s.send(bytes("confirm", encoding='utf8'))

# V. 接收服务端发送过来的数据
# 累计下载字节统计
recv_total_size = 0
# 累计下载数据
recv_total_data = b''
# 进入循环下载
while recv_total_size < length_data:
recv_data = s.recv(1024)
recv_total_data += recv_data
recv_total_size += len(recv_data)
print("传输总大小为: %s ; 当前已传输大小为: %s" % (length_data, recv_total_size))
# 打印完整的数据
print(str(recv_total_data, encoding='utf-8'))

# step 5: 断开连接
s.close()
```

- <font color="orange">执行结果</font>

```bash
> ifconfig
传输总大小为: 1857 ; 当前已传输大小为: 1024
传输总大小为: 1857 ; 当前已传输大小为: 2048
lo0: flags=8049<UP,LOOPBACK,RUNNING,MULTICAST> mtu 16384
options=3<RXCSUM,TXCSUM>
inet6 ::1 prefixlen 128
inet 127.0.0.1 netmask 0xff000000
inet6 fe80::1%lo0 prefixlen 64 scopeid 0x1
nd6 options=1<PERFORMNUD>
gif0: flags=8010<POINTOPOINT,MULTICAST> mtu 1280
stf0: flags=0<> mtu 1280
en0: flags=8863<UP,BROADCAST,SMART,RUNNING,SIMPLEX,MULTICAST> mtu 1500
ether 60:03:08:a5:3b:9e
inet6 fe80::6203:8ff:fea5:3b9e%en0 prefixlen 64 scopeid 0x4
inet 192.168.1.103 netmask 0xffffff00 broadcast 192.168.1.255
nd6 options=1<PERFORMNUD>
media: autoselect
status: active
en1: flags=963<UP,BROADCAST,SMART,RUNNING,PROMISC,SIMPLEX> mtu 1500
options=60<TSO4,TSO6>
ether 72:00:01:f7:dd:70
media: autoselect <full-duplex>
status: inactive
en2: flags=963<UP,BROADCAST,SMART,RUNNING,PROMISC,SIMPLEX> mtu 1500
options=60<TSO4,TSO6>
ether 72:00:01:f7:dd:71
media: autoselect <full-duplex>
status: inactive
p2p0: flags=8843<UP,BROADCAST,RUNNING,SIMPLEX,MULTICAST> mtu 2304
ether 02:03:08:a5:3b:9e
media: autoselect
status: inactive
awdl0: flags=8943<UP,BROADCAST,RUNNING,PROMISC,SIMPLEX,MULTICAST> mtu 1484
ether 0e:80:6c:2f:ac:25
inet6 fe80::c80:6cff:fe2f:ac25%awdl0 prefixlen 64 scopeid 0x8
nd6 options=1<PERFORMNUD>
media: autoselect
status: active
bridge0: flags=8863<UP,BROADCAST,SMART,RUNNING,SIMPLEX,MULTICAST> mtu 1500
options=63<RXCSUM,TXCSUM,TSO4,TSO6>
ether 62:03:08:5a:2d:00
Configuration:
id 0:0:0:0:0:0 priority 0 hellotime 0 fwddelay 0
maxage 0 holdcnt 0 proto stp maxaddr 100 timeout 1200
root id 0:0:0:0:0:0 priority 0 ifcost 0 port 0
ipfilter disabled flags 0x2
member: en1 flags=3<LEARNING,DISCOVER>
ifmaxaddr 0 port 5 priority 0 path cost 0
member: en2 flags=3<LEARNING,DISCOVER>
ifmaxaddr 0 port 6 priority 0 path cost 0
nd6 options=1<PERFORMNUD>
media: <unknown type>
status: inactive
lo0: flags=8049<UP,LOOPBACK,RUNNING,MULTICAST> mtu 16384
options=3<RXCSUM,TXCSUM>
inet6 ::1 prefixlen 128
inet 127.0.0.1 netmask 0xff000000
inet6 fe80::1%lo0 prefixlen 64 scopeid 0x1

>

解决粘包问题实例:

  • server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import socket
import subprocess

ip_port = ('127.0.0.1', 5555)

# step 1: 创建socket套接字, 里面封装了通信协议
s = socket.socket()

# step 2: 绑定IP及端口号
s.bind(ip_port)

# step 3: 监听绑定的端口
s.listen(5)

# 把接收客户端连接请求的操作循环起来就可以接受多个用户的请求啦
# 注意:同一时间只能处理一个客户端的请求,其他连接上的用户会排队等待
# 最后支持多少个用户排队等待,是由listen中的参数决定的(连接池)
while True:

# 等待客户端的连接(阻塞函数)
# step 4: 接受客户端的连接
conn, addr = s.accept()
# conn对象里封装了连接过来的这个客户端的通信线路信息
# 后期跟这个客户端的通信与交互都需要在conn这条通信线路上进行

while True:

# step 5: 接收消息(在conn通道没有被关闭的情况下是阻塞的函数,一旦conn被客户端关闭,该函数将不会阻塞)
recv_data = conn.recv(1024)

# 如果conn通道被客户端主动关闭,recv函数将不再阻塞,recv_data将接收到空字符串
# 通过判断recv_data为空字符串来退出服务端的连接
if len(recv_data) == 0: break

# step 6:处理消息
cmd = str(recv_data, encoding='utf-8')
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
res = p.stdout.read()
send_data = str(res, encoding='utf-8')

# step 7: 发送消息
conn.send(bytes(send_data, encoding='utf-8'))

# step 8: 断开连接
conn.close()

服务端在接收消息之后,拿到系统中去执行,取回标准输出,这里需要特别注意一下字符编码的问题。

在不同的系统中,字符编码可能不一样,如果是Windows平台,字符编码可能是gbk,在linux系统中,字符编码可能是GB18030``zh_CN.UTF-8等等,在这些系统中执行完命令后,回显的字符编码与系统是一致的,我们需要把回显的字节按照系统的编码类型进行编码并转化成Python的string字符类型,由Python的字符串作为中间人去帮我们做转换

类似于这样的过程 gbk ---> str ---> utf-8

  • client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import socket

ip_port = ('127.0.0.1', 5555)

# step 1: 创建socket套接字, 里面封装了通信协议
s = socket.socket()

# step 2: 连接服务端
s.connect(ip_port)

while True:

# step 3: 发送消息
send_data = input("> ").strip()

# 如果客户端输入了exit,退出循环,主动close掉与服务端的连接
if send_data == "exit": break

# 如果什么也没有输入,重新循环接收输入
if len(send_data) == 0: continue

# 这里注意,和服务端不同的是,服务端找到对端是通过conn对象,而客户端是s对象
# 在Python3.x中,socket对象发送对象必须是字节类型(2.7中可以是字符串)
s.send(bytes(send_data, encoding='utf-8'))

# step 4: 收消息
recv_data = s.recv(1024)
print(str(recv_data, encoding='utf-8'))

# step 5: 断开连接
s.close()

提示:上面只对可以正确执行的系统命令做了处理,如果输入了一个错误的命令,那么stdout标准输出根本不会捕捉到,所以会出现问题