MySQL's default character set is usually set to utf8, but with the explosion of emoji in recent years, databases have started hitting unexpected errors: emoji fall outside the range that the utf8 encoding can store 😄

The maddening character encoding problem

Character encoding gives a lot of people a headache. This post won't dig into the details and theory of every encoding; it only covers the relationship between Unicode and UTF-8.

Unicode is a coded character set, while UTF-8 is a character encoding, i.e. one way of serializing the Unicode repertoire.
As the Internet grew, the need for one universal character repertoire became pressing, and the Unicode standard appeared naturally.
It covers virtually every symbol and script any language might need and assigns each one a code point.
See: Unicode on Wikipedia.
Unicode code points run from 0000 to 10FFFF and are divided into 17 planes of 65,536 code points each.
MySQL's utf8 encoding (utf8mb3), however, only covers the first plane (the Basic Multilingual Plane), using at most 3 bytes per character.
So although it is widely deployed, it does not cover the full Unicode repertoire, which is why it has trouble with certain special characters.

Put simply: in memory a computer works with Unicode code points, and when text has to be saved to disk or transmitted it is converted to UTF-8.

When you edit a file in a text editor, the UTF-8 bytes read from the file are decoded into Unicode characters in memory; when you save, the Unicode characters are encoded back to UTF-8 and written to the file.

Emoji are Unicode characters that live outside the BMP, so a MySQL column using the utf8 encoding cannot store or display them correctly. To solve this, MySQL 5.5.3 introduced a new character set, utf8mb4. This post walks through switching an existing utf8 database to utf8mb4.

What is utf8mb4

The most visible benefit of utf8mb4 is that it fills the hole Apple helped dig by popularizing emoji: utf8mb4 lets MySQL store emoji.

utf8mb4 is a superset of utf8, so in theory upgrading from utf8 to utf8mb4 introduces no compatibility problems.

Upgrading from utf8 to utf8mb4

1. Back up

Safety first: back up every database whose character set you are going to change.

  • Dump the databases with mysqldump (see the command sketched just below)
  • If the server is a virtual machine, take a snapshot of the whole host
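For the dump, something along these lines should do; the flags are standard mysqldump options, and polarsnow is simply the sample database used later in this post:

> mysqldump -u root -p --single-transaction --routines --triggers polarsnow > polarsnow_backup.sql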

2. Upgrade

utf8mb4 is supported from MySQL 5.5.3 onwards, so the prerequisite for using it is a MySQL version >= 5.5.3.
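A quick way to confirm which version you are running before you start (SELECT VERSION() is a built-in MySQL function); any result of 5.5.3 or later is fine:

mysql> SELECT VERSION();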

3. Modify

In MySQL a character set can be set for a database, for a table, and even for an individual column.

  • Check the server's current default character set settings
mysql> SHOW VARIABLES WHERE Variable_name LIKE 'character\_set\_%' OR Variable_name LIKE 'collation%';
+--------------------------+-----------------+
| Variable_name | Value |
+--------------------------+-----------------+
| character_set_client | utf8 |
| character_set_connection | utf8 |
| character_set_database | utf8 |
| character_set_filesystem | binary |
| character_set_results | utf8 |
| character_set_server | utf8 |
| character_set_system | utf8 |
| collation_connection | utf8_general_ci |
| collation_database | utf8_general_ci |
| collation_server | utf8_general_ci |
+--------------------------+-----------------+
10 rows in set (0.01 sec)

mysql>
  • Check a database's character set
mysql> show create database polarsnow;
+-----------+--------------------------------------------------------------------+
| Database | Create Database |
+-----------+--------------------------------------------------------------------+
| polarsnow | CREATE DATABASE `polarsnow` /*!40100 DEFAULT CHARACTER SET utf8 */ |
+-----------+--------------------------------------------------------------------+
1 row in set (0.00 sec)

mysql>
  • Check a table's character set
mysql> show create table ps;
+-------+---------------------------------------------------------------------------------------------+
| Table | Create Table |
+-------+---------------------------------------------------------------------------------------------+
| ps | CREATE TABLE `ps` (
`name` varchar(100) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 |
+-------+---------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
  • Check a column's character set
mysql> show full columns from ps;
+-------+--------------+-----------------+------+-----+---------+-------+---------------------------------+---------+
| Field | Type | Collation | Null | Key | Default | Extra | Privileges | Comment |
+-------+--------------+-----------------+------+-----+---------+-------+---------------------------------+---------+
| name | varchar(100) | utf8_general_ci | YES | | NULL | | select,insert,update,references | |
+-------+--------------+-----------------+------+-----+---------+-------+---------------------------------+---------+
1 row in set (0.00 sec)

Change the database's default character set

ALTER DATABASE database_name CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci;

mysql> ALTER DATABASE polarsnow CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci;
Query OK, 1 row affected (0.03 sec)

mysql> show create database polarsnow;
+-----------+--------------------------------------------------------------------------------------------------+
| Database | Create Database |
+-----------+--------------------------------------------------------------------------------------------------+
| polarsnow | CREATE DATABASE `polarsnow` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci */ |
+-----------+--------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)

mysql> show tables;
+---------------------+
| Tables_in_polarsnow |
+---------------------+
| ps |
+---------------------+
1 row in set (0.00 sec)

mysql> show create table ps;
+-------+---------------------------------------------------------------------------------------------+
| Table | Create Table |
+-------+---------------------------------------------------------------------------------------------+
| ps | CREATE TABLE `ps` (
`name` varchar(100) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 |
+-------+---------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)

mysql> show full columns from ps;
+-------+--------------+-----------------+------+-----+---------+-------+---------------------------------+---------+
| Field | Type | Collation | Null | Key | Default | Extra | Privileges | Comment |
+-------+--------------+-----------------+------+-----+---------+-------+---------------------------------+---------+
| name | varchar(100) | utf8_general_ci | YES | | NULL | | select,insert,update,references | |
+-------+--------------+-----------------+------+-----+---------+-------+---------------------------------+---------+
1 row in set (0.00 sec)

mysql> create table test_tb2 (tb2 varchar(100) );
Query OK, 0 rows affected (0.21 sec)

mysql> show tables;
+---------------------+
| Tables_in_polarsnow |
+---------------------+
| ps |
| test_tb2 |
+---------------------+
2 rows in set (0.00 sec)

mysql> show create table test_tb2;
+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table |
+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
| test_tb2 | CREATE TABLE `test_tb2` (
`tb2` varchar(100) COLLATE utf8mb4_unicode_ci DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci |
+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)

mysql>

As you can see, although the database's character set was changed to utf8mb4, this only affects tables created afterwards (test_tb2 above picked up utf8mb4 as its default); tables that already existed keep their old character set, so every existing table has to be converted by hand.

Change the table's character set

  • Change only the table's default character set: ALTER TABLE table_name DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
  • Change the table's default character set and convert all character columns: ALTER TABLE table_name CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
mysql> show create table ps;
+-------+---------------------------------------------------------------------------------------------+
| Table | Create Table |
+-------+---------------------------------------------------------------------------------------------+
| ps | CREATE TABLE `ps` (
`name` varchar(100) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 |
+-------+---------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)

mysql> show full columns from ps;
+-------+--------------+-----------------+------+-----+---------+-------+---------------------------------+---------+
| Field | Type | Collation | Null | Key | Default | Extra | Privileges | Comment |
+-------+--------------+-----------------+------+-----+---------+-------+---------------------------------+---------+
| name | varchar(100) | utf8_general_ci | YES | | NULL | | select,insert,update,references | |
+-------+--------------+-----------------+------+-----+---------+-------+---------------------------------+---------+
1 row in set (0.00 sec)

mysql> ALTER TABLE ps CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
Query OK, 0 rows affected (0.38 sec)
Records: 0 Duplicates: 0 Warnings: 0

mysql> show create table ps;
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------+
| ps | CREATE TABLE `ps` (
`name` varchar(100) COLLATE utf8mb4_unicode_ci DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)

mysql> show full columns from ps;
+-------+--------------+--------------------+------+-----+---------+-------+---------------------------------+---------+
| Field | Type | Collation | Null | Key | Default | Extra | Privileges | Comment |
+-------+--------------+--------------------+------+-----+---------+-------+---------------------------------+---------+
| name | varchar(100) | utf8mb4_unicode_ci | YES | | NULL | | select,insert,update,references | |
+-------+--------------+--------------------+------+-----+---------+-------+---------------------------------+---------+
1 row in set (0.00 sec)

mysql>

Change a column's character set

ALTER TABLE table_name CHANGE column_name column_name VARCHAR(191) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

Note: replace VARCHAR(191) with the column's actual type and length.
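Applied to the sample ps table shown earlier, where name is VARCHAR(100), this becomes:

ALTER TABLE ps CHANGE name name VARCHAR(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;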

4. Check maximum column lengths and indexed columns

  • Column length

Upgrading from utf8 to utf8mb4 means a character can now take 4 bytes instead of 3, while the column types and maximum lengths chosen when the tables were created stay the same. For example, a TINYTEXT column holds at most 255 bytes: that is 85 characters at 3 bytes per character, but only 63 characters at 4 bytes per character. If any existing value in such a column is longer than 63 characters, converting to utf8mb4 will fail; you first have to change TINYTEXT to TEXT or another larger type before converting the character set.
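One rough way to find columns worth checking is to query information_schema (standard in MySQL). This only lists candidate columns; it does not check the actual data lengths:

mysql> SELECT TABLE_NAME, COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH
    ->   FROM information_schema.COLUMNS
    ->  WHERE TABLE_SCHEMA = 'polarsnow'
    ->    AND DATA_TYPE IN ('tinytext', 'varchar');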

  • Indexes

In InnoDB the maximum index key length is 767 bytes: at 3 bytes per character an indexed column can hold up to 255 characters, but at 4 bytes per character the limit drops to 191 characters. If an existing table has an indexed VARCHAR(255) column, converting to utf8mb4 will likewise fail; you need to change it to VARCHAR(191) first.
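For a hypothetical indexed VARCHAR(255) column named email, the shrink would look like this (check first that no existing value is longer than 191 characters):

ALTER TABLE table_name MODIFY email VARCHAR(191) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;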

5. Update the configuration file

In application code, SET NAMES utf8 COLLATE utf8_unicode_ci becomes SET NAMES utf8mb4 COLLATE utf8mb4_unicode_ci.

> vim /etc/my.cnf
# settings read by every MySQL client program (mysql, mysqldump, ...)
[client]
default-character-set = utf8mb4

# settings read only by the mysql command-line client
[mysql]
default-character-set = utf8mb4

# settings for the MySQL server
[mysqld]
character-set-client-handshake = FALSE
character-set-server = utf8mb4
collation-server = utf8mb4_unicode_ci
> service mysqld restart

Verify the changes

mysql> SHOW VARIABLES WHERE Variable_name LIKE 'character\_set\_%' OR Variable_name LIKE 'collation%';
+--------------------------+--------------------+
| Variable_name | Value |
+--------------------------+--------------------+
| character_set_client | utf8mb4 |
| character_set_connection | utf8mb4 |
| character_set_database | utf8mb4 |
| character_set_filesystem | binary |
| character_set_results | utf8mb4 |
| character_set_server | utf8mb4 |
| character_set_system | utf8 |
| collation_connection | utf8mb4_unicode_ci |
| collation_database | utf8mb4_unicode_ci |
| collation_server | utf8mb4_unicode_ci |
+--------------------------+--------------------+
10 rows in set (0.00 sec)

Note: character_set_system is always utf8 and cannot be changed.

6. Repair & optimize all tables

> mysqlcheck -u root -p --auto-repair --optimize --all-databases

Summary

Don't use the utf8 character set in MySQL; use utf8mb4 instead. As for why, a quote from abroad says it best:

Never use utf8 in MySQL — always use utf8mb4 instead. Updating your databases and code might take some time, but it’s definitely worth the effort. Why would you arbitrarily limit the set of symbols that can be used in your database? Why would you lose data every time a user enters an astral symbol as part of a comment or message or whatever it is you store in your database? There’s no reason not to strive for full Unicode support everywhere. Do the right thing, and use utf8mb4. 🍻


Starting with MySQL 5.6, replication can be done transactionally, which goes a long way toward keeping master and slave consistent. The key to this feature is the GTID (Global Transaction Identifier), a global transaction ID that strengthens master/slave consistency, failure recovery and fault tolerance.

MySQL 5.7 can switch the replication mode online, while MySQL 5.6 can only do it offline. This article covers the offline way of changing the replication mode; a later article will show how to switch online.

What is a GTID

From the official documentation:

A GTID is represented as a pair of coordinates, separated by a colon character (:), as shown here:

GTID = source_id:transaction_id

Each GTID identifies one database transaction. In the definition above, source_id is the UUID (server_uuid) of the server that originated the transaction, and transaction_id is a counter starting at 1 that marks the n-th transaction committed on that server. MySQL guarantees a 1:1 mapping between transactions and GTIDs.
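So a concrete GTID looks like this (the UUID shown is just an example of the format):

3E11FA47-71CA-11E1-9E33-C80AA9429562:23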

Servers

  • master: 192.168.10.153
  • slave: 192.168.10.157

Install MySQL 5.7

You can follow the previous article to install MySQL 5.7 (an appendix with the yum install steps is also included at the end of this post).

Master configuration

> vim /etc/my.cnf
# the [client] settings are read by MySQL client programs
# only the client programs shipped with MySQL are guaranteed to read this section
[client]
port = 3306
socket = /tmp/mysql.sock
# in production the recommended character set is utf8mb4
# utf8 is used here; character sets are covered in a separate article
default-character-set = utf8

# settings read by the mysql command-line client
[mysql]
no-auto-rehash
# in production the recommended character set is utf8mb4
# utf8 is used here; character sets are covered in a separate article
default-character-set = utf8

# settings read by the MySQL server
[mysqld]
server-id = 10153 # server-id must be unique; the last two octets of the IP are used here
port = 3306 # port the MySQL server listens on
user = mysql # run the MySQL server process as the mysql user
basedir = /usr/local/mysql # MySQL base directory (set for source builds; comment out for yum installs)
datadir = /data/mysqldata # data directory
socket = /tmp/mysql.sock # location of the socket file
default-storage-engine = INNODB # default storage engine
# in production the recommended character set is utf8mb4
# utf8 is used here; character sets are covered in a separate article
character-set-server = utf8
connect_timeout = 60 # connection timeout
interactive_timeout = 28800 # seconds MySQL waits before closing an interactive connection (e.g. one opened from a MySQL GUI tool)
wait_timeout = 28800 # seconds MySQL waits before closing a non-interactive connection
back_log = 500 # number of connections the OS can hold in the listen queue
event_scheduler = ON # enable the event scheduler
skip_name_resolve = ON # skip reverse DNS resolution of client IPs

###########binlog##########
log-bin = /data/mysqlLog/logs/mysql-bin # enable the binary log
# with the READ-COMMITTED isolation level the binlog format must be ROW
# MySQL upstream no longer considers STATEMENT suitable,
# and MIXED can lead to master/slave inconsistencies under the default isolation level
binlog_format = row # row-based replication (see the replication basics article on this site)
max_binlog_size = 128M # maximum size of each binary log file
binlog_cache_size = 2M # binary log cache size
expire-logs-days = 5 # how long to keep binary logs (the last 5 days)
# write the updates the slave receives from its master into the slave's own binary log
# this must be enabled for cascading replication (A => B => C)
log-slave-updates=true

# the following three parameters enable all replication-related checksums
binlog_checksum = CRC32 # use the CRC-32 algorithm from zlib
# not only does the dump thread verify events; they are also verified
# when SHOW BINLOG EVENTS is run on the master
# set to 1 to ensure events are written to the master's binlog intact
master_verify_checksum = 1
slave_sql_verify_checksum = 1 # with 1, checksums are verified when the slave IO thread writes the relay log and when the SQL thread reads it

binlog_rows_query_log_events = 1 # also log the original statement for row events, which makes troubleshooting easier

###### GTID-based replication ######
gtid-mode=on # enable global transaction IDs
enforce-gtid-consistency=true # enforce GTID consistency (required for GTID and its dependencies)

# enabling these two keeps the binary log and the slave safe across crashes
master-info-repository=TABLE
relay-log-info-repository=TABLE


sync-master-info=1 # make sure no information is lost
slave-parallel-workers=4 # number of SQL (applier) threads on the slave; 0 disables multi-threaded replication
# rpl_semi_sync_master_enabled = 1 # semi-synchronous replication

# slow query log settings
slow_query_log = 1
slow_query_log_file = /data/mysqlLog/logs/mysql.slow
long_query_time = 1

log_error = /data/mysqlLog/logs/error.log # error log location
max_connections = 3000 # maximum number of connections
max_connect_errors = 32767 # after 32767 failed connection attempts (e.g. wrong password) from one client, MySQL blocks that client unconditionally
log_bin_trust_function_creators = 1 # allow user-defined (stored) functions to be created
transaction_isolation = READ-COMMITTED # transaction isolation level

Slave configuration

> vim /etc/my.cnf
[client]
port = 3306
socket = /tmp/mysql.sock
default-character-set = utf8

[mysql]
no-auto-rehash
default-character-set = utf8

[mysqld]
server-id = 10157
port = 3306
user = mysql
basedir = /usr/local/mysql
datadir = /data/mysqldata
socket = /tmp/mysql.sock
default-storage-engine = INNODB
character-set-server = utf8
connect_timeout = 60
wait_timeout = 18000
back_log = 500
event_scheduler = ON

###########binlog##########
log-bin = /data/mysqlLog/logs/mysql-bin
binlog_format = row
max_binlog_size = 128M
binlog_cache_size = 2M
expire-logs-days = 5
log-slave-updates=true
gtid-mode=on
enforce-gtid-consistency=true
master-info-repository=TABLE
relay-log-info-repository=TABLE
sync-master-info=1
slave-parallel-workers=4
# rpl_semi_sync_slave_enabled = 1
# skip-slave-start # do not start the replication threads automatically when mysqld starts

slow_query_log = 1
slow_query_log_file = /data/mysqlLog/logs/mysql.slow
long_query_time = 2

log-error = /data/mysqlLog/logs/error.log
max_connections = 3000
max_connect_errors = 10000
log_bin_trust_function_creators = 1
transaction_isolation = READ-COMMITTED

Start the master and the slave

> systemctl restart mysqld

Create the replication user on the master

mysql> SET GLOBAL validate_password_policy = LOW;
Query OK, 0 rows affected (0.00 sec)

mysql> create user 'repl' identified by '12345678';
Query OK, 0 rows affected (0.05 sec)

mysql> grant replication slave on *.* to repl@'%';
Query OK, 0 rows affected (0.02 sec)

Check the GTID status on the master and the slave

mysql> show variables like 'gtid_mode';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| gtid_mode | ON |
+---------------+-------+
1 row in set (0.01 sec)

Start the replication threads on the slave

mysql> change master to
-> master_host='192.168.10.153',
-> master_port=3306,
-> master_user='repl',
-> master_password='12345678',
-> master_auto_position=1 for channel "db153";
Query OK, 0 rows affected, 2 warnings (0.27 sec)

mysql> start slave for channel "db153";
Query OK, 0 rows affected (0.09 sec)

mysql> show slave status for channel "db153" \G
*************************** 1. row ***************************
Slave_IO_State: Waiting for master to send event
Master_Host: 192.168.10.153
Master_User: repl
Master_Port: 3306
Connect_Retry: 60
Master_Log_File: mysql-bin.000001
Read_Master_Log_Pos: 595
Relay_Log_File: localhost-relay-bin-db153.000002
Relay_Log_Pos: 808
Relay_Master_Log_File: mysql-bin.000001
Slave_IO_Running: Yes
Slave_SQL_Running: Yes
Replicate_Do_DB:
Replicate_Ignore_DB:
Replicate_Do_Table:
Replicate_Ignore_Table:
Replicate_Wild_Do_Table:
Replicate_Wild_Ignore_Table:
Last_Errno: 0
Last_Error:
Skip_Counter: 0
Exec_Master_Log_Pos: 595
Relay_Log_Space: 1025
Until_Condition: None
Until_Log_File:
Until_Log_Pos: 0
Master_SSL_Allowed: No
Master_SSL_CA_File:
Master_SSL_CA_Path:
Master_SSL_Cert:
Master_SSL_Cipher:
Master_SSL_Key:
Seconds_Behind_Master: 0
Master_SSL_Verify_Server_Cert: No
Last_IO_Errno: 0
Last_IO_Error:
Last_SQL_Errno: 0
Last_SQL_Error:
Replicate_Ignore_Server_Ids:
Master_Server_Id: 10153
Master_UUID: 1349d343-6611-11e6-b341-005056ad5f2f
Master_Info_File: mysql.slave_master_info
SQL_Delay: 0
SQL_Remaining_Delay: NULL
Slave_SQL_Running_State: Slave has read all relay log; waiting for more updates
Master_Retry_Count: 86400
Master_Bind:
Last_IO_Error_Timestamp:
Last_SQL_Error_Timestamp:
Master_SSL_Crl:
Master_SSL_Crlpath:
Retrieved_Gtid_Set: 1349d343-6611-11e6-b341-005056ad5f2f:1-2
Executed_Gtid_Set: 1349d343-6611-11e6-b341-005056ad5f2f:1-2
Auto_Position: 1
Replicate_Rewrite_DB:
Channel_Name: db153
Master_TLS_Version:
1 row in set (0.00 sec)

mysql>

Verify replication

master

mysql> show master status;
+------------------+----------+--------------+------------------+------------------------------------------+
| File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+------------------------------------------+
| mysql-bin.000001 | 595 | | | 1349d343-6611-11e6-b341-005056ad5f2f:1-2 |
+------------------+----------+--------------+------------------+------------------------------------------+
1 row in set (0.00 sec)

mysql> create database polarsnow;
Query OK, 1 row affected (0.03 sec)

mysql> show master status;
+------------------+----------+--------------+------------------+------------------------------------------+
| File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+------------------------------------------+
| mysql-bin.000001 | 769 | | | 1349d343-6611-11e6-b341-005056ad5f2f:1-3 |
+------------------+----------+--------------+------------------+------------------------------------------+
1 row in set (0.00 sec)

mysql>

slave

mysql> show slave status for channel "db153"\G
*************************** 1. row ***************************
Slave_IO_State: Waiting for master to send event
Master_Host: 192.168.10.153
Master_User: repl
Master_Port: 3306
Connect_Retry: 60
Master_Log_File: mysql-bin.000001
Read_Master_Log_Pos: 769
Relay_Log_File: localhost-relay-bin-db153.000002
Relay_Log_Pos: 982
Relay_Master_Log_File: mysql-bin.000001
Slave_IO_Running: Yes
Slave_SQL_Running: Yes
Replicate_Do_DB:
Replicate_Ignore_DB:
Replicate_Do_Table:
Replicate_Ignore_Table:
Replicate_Wild_Do_Table:
Replicate_Wild_Ignore_Table:
Last_Errno: 0
Last_Error:
Skip_Counter: 0
Exec_Master_Log_Pos: 769
Relay_Log_Space: 1199
Until_Condition: None
Until_Log_File:
Until_Log_Pos: 0
Master_SSL_Allowed: No
Master_SSL_CA_File:
Master_SSL_CA_Path:
Master_SSL_Cert:
Master_SSL_Cipher:
Master_SSL_Key:
Seconds_Behind_Master: 0
Master_SSL_Verify_Server_Cert: No
Last_IO_Errno: 0
Last_IO_Error:
Last_SQL_Errno: 0
Last_SQL_Error:
Replicate_Ignore_Server_Ids:
Master_Server_Id: 10153
Master_UUID: 1349d343-6611-11e6-b341-005056ad5f2f
Master_Info_File: mysql.slave_master_info
SQL_Delay: 0
SQL_Remaining_Delay: NULL
Slave_SQL_Running_State: Slave has read all relay log; waiting for more updates
Master_Retry_Count: 86400
Master_Bind:
Last_IO_Error_Timestamp:
Last_SQL_Error_Timestamp:
Master_SSL_Crl:
Master_SSL_Crlpath:
Retrieved_Gtid_Set: 1349d343-6611-11e6-b341-005056ad5f2f:1-3
Executed_Gtid_Set: 1349d343-6611-11e6-b341-005056ad5f2f:1-3
Auto_Position: 1
Replicate_Rewrite_DB:
Channel_Name: db153
Master_TLS_Version:
1 row in set (0.00 sec)

mysql> show databases;
+--------------------+
| Database |
+--------------------+
| information_schema |
| mysql |
| performance_schema |
| polarsnow |
| sys |
+--------------------+
5 rows in set (0.00 sec)

mysql>

At this point, GTID-based (transactional) replication is up and running!

With the older binlog-position-based master/slave setup I ran into all kinds of inconsistencies between master and slave; whether the transactional replication introduced in MySQL 5.6 really solves the consistency pain points of position-based binlog replication remains to be seen and tested...

Appendix

MySQL 5.7 is installed via yum here purely for testing; for production, building MySQL from source with cmake is still recommended.

Install MySQL

> yum remove -y mysql
> cd /usr/local/src/
> wget http://repo.mysql.com/mysql57-community-release-el7-8.noarch.rpm
> rpm -ivh mysql57-community-release-el7-8.noarch.rpm
警告:mysql57-community-release-el7-8.noarch.rpm: 头V3 DSA/SHA1 Signature, 密钥 ID 5072e1f5: NOKEY
准备中... ################################# [100%]
正在升级/安装...
1:mysql57-community-release-el7-8 ################################# [100%]
> yum install -y mysql-server
> systemctl start mysqld
> systemctl status mysqld
● mysqld.service - MySQL Server
Loaded: loaded (/usr/lib/systemd/system/mysqld.service; enabled; vendor preset: disabled)
Active: active (running) since 五 2016-08-19 09:30:24 EDT; 29s ago
Process: 105136 ExecStart=/usr/sbin/mysqld --daemonize --pid-file=/var/run/mysqld/mysqld.pid $MYSQLD_OPTS (code=exited, status=0/SUCCESS)
Process: 105049 ExecStartPre=/usr/bin/mysqld_pre_systemd (code=exited, status=0/SUCCESS)
Main PID: 105138 (mysqld)
CGroup: /system.slice/mysqld.service
└─105138 /usr/sbin/mysqld --daemonize --pid-file=/var/run/mysqld/mysqld.pid

8月 19 09:30:16 localhost.localdomain systemd[1]: Starting MySQL Server...
8月 19 09:30:24 localhost.localdomain systemd[1]: Started MySQL Server.
> cat /var/log/mysqld.log| grep "temporary password"
2016-08-19T13:30:20.547835Z 1 [Note] A temporary password is generated for root@localhost: TVu7ouUi>55l
> mysql -uroot -p
Enter password:TVu7ouUi>55l
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 2
Server version: 5.7.14

Copyright (c) 2000, 2016, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql>

Set the root password

MySQL 5.7's default password complexity requires upper and lower case letters, digits and special characters.

mysql> alter user user() identified by 'w{YQW6L;Dsf6vw';
Query OK, 0 rows affected (0.00 sec)

mysql>
mysql> SHOW VARIABLES LIKE 'validate_password%';
+--------------------------------------+--------+
| Variable_name | Value |
+--------------------------------------+--------+
| validate_password_dictionary_file | |
| validate_password_length | 8 |
| validate_password_mixed_case_count | 1 |
| validate_password_number_count | 1 |
| validate_password_policy | MEDIUM |
| validate_password_special_char_count | 1 |
+--------------------------------------+--------+
6 rows in set (0.01 sec)

MySQL has three password validation policies; the default validate_password_policy is MEDIUM:

  • LOW policy tests password length only. Passwords must be at least 8 characters long.
  • MEDIUM policy adds the conditions that passwords must contain at least 1 numeric character, 1 lowercase and uppercase character, and 1 special (nonalphanumeric) character.
  • STRONG policy adds the condition that password substrings of length 4 or longer must not match words in the dictionary file, if one has been specified.

Under the default MEDIUM policy, changing the password to 12345678 fails:

mysql> alter user user() identified by '12345678';
ERROR 1819 (HY000): Your password does not satisfy the current policy requirements

Change the MySQL password validation policy

mysql> SET GLOBAL validate_password_policy = LOW;
Query OK, 0 rows affected (0.00 sec)

mysql> alter user user() identified by '12345678';
Query OK, 0 rows affected (0.00 sec)

Integer types

Type        Bytes   Signed min             Signed max             Unsigned min   Unsigned max
TINYINT     1       -128                   127                    0              255
SMALLINT    2       -32768                 32767                  0              65535
MEDIUMINT   3       -8388608               8388607                0              16777215
INT         4       -2147483648            2147483647             0              4294967295
BIGINT      8       -9223372036854775808   9223372036854775807    0              18446744073709551615

The age-old question

  • What is the difference between int(11) and int(21)?

Is it the storage size, or the range of values?

The answer: neither. The only difference is in how the value is displayed, and only in specific circumstances (with ZEROFILL).

# pad unused display width with zeros (ZEROFILL)
mysql> create table testint (a int(11) zerofill, b int(21) zerofill);
Query OK, 0 rows affected (0.03 sec)

mysql> insert into testint values(1,1);
Query OK, 1 row affected (0.01 sec)

mysql> select * from testint;
+-------------+-----------------------+
| a | b |
+-------------+-----------------------+
| 00000000001 | 000000000000000000001 |
+-------------+-----------------------+
1 row in set (0.00 sec)
# by default, numbers are not zero-padded
mysql> create table testint (a int(11), b int(21));
Query OK, 0 rows affected (0.03 sec)

mysql> insert into testint values(1,1);
Query OK, 1 row affected (0.01 sec)

mysql> select * from testint;
+------+------+
| a | b |
+------+------+
| 1 | 1 |
+------+------+
1 row in set (0.00 sec)

Floating-point types

Type     Bytes   Precision          Exactness
FLOAT    4       single precision   not exact
DOUBLE   8       double precision   more precise than FLOAT, still not exact

  • FLOAT(9,5)
  • DOUBLE(9,5)

Here 9 is the maximum total number of digits and 5 is the number of digits shown after the decimal point.

FLOAT and DOUBLE are both inexact types, and the problem with inexact types is loss of precision:

mysql> create table `t` (
    ->   `a` int(11) default null,
    ->   `b` float(7,4) default null
    -> ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Query OK, 0 rows affected (0.02 sec)

mysql> insert into t values (1, 123.12345);
Query OK, 1 row affected (0.00 sec)

mysql> select * from t;
+------+----------+
| a | b |
+------+----------+
| 1 | 123.1235 |
+------+----------+
1 row in set (0.00 sec)

Exact numeric types

DECIMAL(9,5): a fixed-point, exact numeric type

  • A high-precision type, commonly used for money and other transaction data (see the sketch after this list)
  • DECIMAL(M,N): M is the total number of digits and N the number of digits to the right of the decimal point (in current MySQL versions M can be at most 65 and N at most 30)
  • Unlike FLOAT and DOUBLE, DECIMAL values use a variable amount of storage
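A minimal sketch mirroring the FLOAT example above (the table name td is made up for illustration): the value that was rounded in the float column comes back unchanged from a DECIMAL(9,5) column.

mysql> create table td (a int, b decimal(9,5));
mysql> insert into td values (1, 123.12345);
mysql> select * from td;   -- b is returned as 123.12345, with no rounding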

Rules of thumb

  • For categorical data such as gender, province or type, use TINYINT or ENUM
  • BIGINT needs more storage; when in doubt between INT and BIGINT, BIGINT is usually the safer choice
  • For high-precision data such as money and transactions, use DECIMAL

When an SSH client connects to a remote host for the first time, it prompts you to accept and store the host's fingerprint; if the remote host's key later changes, the connection is refused with a REMOTE HOST IDENTIFICATION HAS CHANGED! error, and you have to remove that host's entry from ~/.ssh/known_hosts by hand before you can connect again.

Here are two ways to make ssh store every host fingerprint automatically and simply replace the corresponding entry in ~/.ssh/known_hosts when a remote host's key changes.
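As a side note, if you only need to clear one stale entry rather than disabling the check, OpenSSH's ssh-keygen -R removes a single host from known_hosts (the IP here is just the slave host used earlier as an example):

> ssh-keygen -R 192.168.10.157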

Global configuration

> vim /etc/ssh/ssh_config
# StrictHostKeyChecking ask
StrictHostKeyChecking no

Per-user configuration

> vim ~/.ssh/config
StrictHostKeyChecking no
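The same behaviour can also be requested for a single connection without touching any config file, using the standard -o option (the user name and host here are placeholders):

> ssh -o StrictHostKeyChecking=no user@192.168.10.157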

When a producer sends a message to the RabbitMQ server, the message is not inserted directly into a queue. In front of the queues sits an exchange: the producer sends the message to an exchange, and the exchange stores it into the appropriate queue(s).

Publish/subscribe - fanout

exchange type = fanout

# Consumer
import pika

# create the connection object
connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57.233.243'))
# create the channel object
channel = connection.channel()

# declare the exchange and its type
channel.exchange_declare(exchange='logs', type='fanout')

# create a queue with a random name
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# bind that queue to the exchange declared above
# once bound, any message the producer sends to exchange='logs' is also delivered to this queue
channel.queue_bind(exchange='logs', queue=queue_name)


print(' [*] Waiting for logs. To exit press CTRL+C')

# callback invoked for each received message
def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()
# Producer
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='123.57.233.243'))
channel = connection.channel()

# declare the exchange named 'logs'
channel.exchange_declare(exchange='logs', type='fanout')

message = "PolarSnow"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()

If you start several consumers, each one creates its own queue (every consumer uses a random queue name), and all of those queues are bound to the exchange named logs. So when a producer sends a message to exchange='logs', every consumer's queue receives a copy.

Routing-key matching - direct

exchange type = direct

Beyond naming an exchange, messages can also be routed by matching a routing key.

A queue can be bound with several routing keys, and the same routing key can be bound by more than one queue.

The code below implements such a setup.

# Consumer 1: [info, error, warning]
import pika

# create the connection object
connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57.233.243'))
# create the channel object
channel = connection.channel()

# declare the exchange name and type
# type direct: routing is done by exact routing-key match
channel.exchange_declare(exchange='direct_logs', type='direct')

# create a queue with a random name
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# this queue should be bound with several routing keys; list them here
severities = ['info', 'error', 'warning']

# bind every routing key in the list (routing_key=severity)
for severity in severities:
    channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()
# Consumer 2: [error]
import pika

# create the connection object
connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57.233.243'))
# create the channel object
channel = connection.channel()

# declare the exchange name and type
# type direct: routing is done by exact routing-key match
channel.exchange_declare(exchange='direct_logs', type='direct')

# create a queue with a random name
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# this queue only binds one routing key
severities = ['error',]

# bind every routing key in the list (routing_key=severity)
for severity in severities:
    channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()
# Producer
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='123.57.233.243'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', type='direct')

# with routing key 'info', only consumer 1's queue (bound to info/error/warning) receives the message
severity = 'info'
# with routing key 'error', both queues bound to exchange='direct_logs' receive it
# severity = 'error'

message = 'PolarSnow'
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

Pattern matching - topic

exchange type = topic

With the topic type, a queue can be bound with wildcard patterns. The producer sends a message to the exchange together with a routing key; the exchange matches the routing key against the bound patterns and, on a match, delivers the message to the corresponding queue.

  • # matches zero or more words
  • * matches exactly one word
routing key sent      binding pattern
docs.20150509.cn      docs.*   -- no match
docs.20150509.cn      docs.#   -- match
# Consumer 1: ['*.info.*', '#.error', 'warning.#']
import pika

# create the connection object
connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57.233.243'))
# create the channel object
channel = connection.channel()

# declare the exchange name and type
# type topic: routing keys are matched against wildcard patterns
channel.exchange_declare(exchange='topic_logs', type='topic')

# create a queue with a random name
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# unlike the direct example, the bindings here are wildcard patterns
# ('*' must stand for a whole word, so '#.error' is used for keys ending in 'error')
severities = ['*.info.*', '#.error', 'warning.#']

# bind every pattern in the list (routing_key=severity)
for severity in severities:
    channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()
# Consumer 2: ['#.error']
import pika

# create the connection object
connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57.233.243'))
# create the channel object
channel = connection.channel()

# declare the exchange name and type
# type topic: routing keys are matched against wildcard patterns
channel.exchange_declare(exchange='topic_logs', type='topic')

# create a queue with a random name
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# this queue only binds one pattern: any key that ends in 'error'
severities = ['#.error',]

# bind every pattern in the list (routing_key=severity)
for severity in severities:
    channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()
# Producer
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='123.57.233.243'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', type='topic')

# the topic exchange matches the routing key against the bound patterns
# 'a.info.b' matches only consumer 1's '*.info.*' binding, so only that queue receives it
severity = 'a.info.b'
# a key ending in 'error' matches '#.error', so both queues bound to exchange='topic_logs' receive it
# severity = 'a.b.c.error'

message = 'PolarSnow'
channel.basic_publish(exchange='topic_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

A host management system built on the message queue

  • When the CLI sends a command to the server side, it specifies a list of target hosts and a random md5 value
  • The server finds the queue that corresponds to each target host and puts the command to be executed into it
  • When the consumer (agent) on a host sees the new message, it runs the command locally and sends the result back to the server side; on the server side a temporary queue named after the md5 value is created and the result is stored there
  • Every consumer involved in the task receives the same md5 value, so after running the command each of them puts its result into the temporary queue named after that md5
  • Once all consumers are done, the temporary queue (TempQ(md5)) holds this task's results from every consumer
  • Finally the results are read out of the temporary queue one by one and shown to the CLI, and the temporary queue is deleted (a rough sketch of the server side follows this list)
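The actual implementation is not shown in the post; what follows is only a rough sketch of the server side under the assumptions above (one queue per host, a temporary result queue named after the task id, and the same old-style pika API used elsewhere in this post; all names are illustrative):

# Sketch only: publish a command to each host's queue and collect results
# from a temporary queue named after the task id (stand-in for the md5 value)
import json
import uuid

import pika

task_id = uuid.uuid4().hex              # plays the role of the random md5 value
hosts = ['host1', 'host2']              # illustrative host list
command = 'uptime'

connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57.233.243'))
channel = connection.channel()

# temporary result queue named after the task id; agents publish their output here
channel.queue_declare(queue=task_id)

# one queue per host; the agent on each host consumes its own queue
for host in hosts:
    channel.queue_declare(queue=host)
    channel.basic_publish(exchange='',
                          routing_key=host,
                          body=json.dumps({'task_id': task_id, 'cmd': command}))

# read back one result per host, then drop the temporary queue
received = 0

def on_result(ch, method, properties, body):
    global received
    print(body)
    received += 1
    if received == len(hosts):
        ch.stop_consuming()

channel.basic_consume(on_result, queue=task_id, no_ack=True)
channel.start_consuming()
channel.queue_delete(queue=task_id)
connection.close()

The agent side would do the mirror image: consume its own host queue, run the command locally, and publish the output to the queue named after task_id.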

Not losing messages

If a consumer takes a message off the queue and the machine dies before the message has been fully processed, the work on that message is lost. The sections below show how to avoid that.

no_ack

With no_ack = False, if the consumer runs into a problem and never finishes processing a message, RabbitMQ puts that message back onto the queue.

import pika
import time

# create the connection object
connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57.233.243'))

# create the channel
channel = connection.channel()

# switch to the given queue, creating it if it does not exist
channel.queue_declare(queue='hello')

# callback executed once a message has been received
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(10)
    print('ok')
    # tell the RabbitMQ server the message has been fully processed and its resources can be released
    ch.basic_ack(delivery_tag=method.delivery_tag)

# no_ack=False means the consumer must explicitly acknowledge each message once it is done;
# otherwise RabbitMQ treats the message as unprocessed and puts it back on the queue
channel.basic_consume(callback, queue='hello', no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

durable

The above deals with a crashing consumer; what if the RabbitMQ server itself goes down? RabbitMQ provides persistence on the server side, and combining server-side persistence with the consumer-side no_ack behaviour gives the data a stronger safety guarantee.

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57.233.243'))
channel = connection.channel()

# declare a durable queue so the queue itself survives a broker restart
channel.queue_declare(queue='hello', durable=True)

# delivery_mode=2 marks the message itself as persistent
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!', properties=pika.BasicProperties(delivery_mode=2,))
print(" [x] Sent 'Hello World!'")
connection.close()
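For the durable queue to help, the consumer has to declare the queue with the same durable=True flag (RabbitMQ rejects redeclarations with different parameters). A minimal counterpart to the consumer shown earlier, under that assumption:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57.233.243'))
channel = connection.channel()

# must match the producer's declaration: the queue is durable
channel.queue_declare(queue='hello', durable=True)

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback, queue='hello', no_ack=False)
channel.start_consuming()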

The order in which messages are fetched

By default RabbitMQ hands messages to consumers in round-robin order, by message index.

Suppose four consumers are consuming the same queue; then:

  • consumer 1 gets: 0 4 8 12 16 20 ...
  • consumer 2 gets: 1 5 9 13 17 21 ...
  • consumer 3 gets: 2 6 10 14 18 22 ...
  • consumer 4 gets: 3 7 11 15 19 23 ...

If the first consumer is very slow, you can end up with something like this:

consumer 1: 0 4
consumer 2: 1 5 9 13 17 21
consumer 3: 2 6 10 14 18 22
consumer 4: 3 7 11 15 19 23

That is, the other consumers have already worked through many messages while the first one is still stuck on message 4.

The code below changes this so that a consumer is only handed a new message once it has finished the previous one:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57.233.243'))
channel = connection.channel()

# switch to the 'hello' queue, creating it if it does not exist
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print('ok')
    ch.basic_ack(delivery_tag=method.delivery_tag)

# with prefetch_count=1 a consumer is not given a new message
# until it has acknowledged the previous one
channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback, queue='hello', no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

RabbitMQ is a complete, reusable enterprise messaging system built on AMQP, released under the Mozilla Public License.

MQ stands for Message Queue. A message queue is a method of application-to-application communication: applications communicate by writing messages to and reading messages from queues, without needing a dedicated connection between them. Messaging means programs talk to each other by sending data in messages rather than by calling each other directly, the way technologies such as remote procedure calls do. Queuing means the applications communicate through queues, which removes the requirement that the sending and receiving applications run at the same time.

Install the RabbitMQ client module

pip install pika

Demo

Producer

vim insert2rbt.py

import pika

# create the connection object
connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57.233.243'))

# create the channel object
channel = connection.channel()

# switch to the given queue, creating it if it does not exist
channel.queue_declare(queue='hello')

# publish a message to the given queue
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')

print(" [x] Sent 'Hello World!'")
# close the connection to the RabbitMQ server
connection.close()

Consumer

vim read4rbt.py

import pika

# create the connection object
connection = pika.BlockingConnection(pika.ConnectionParameters(host='123.57.233.243'))

# create the channel object
channel = connection.channel()

# switch to the given queue, creating it if it does not exist
channel.queue_declare(queue='hello')

# callback executed when a message is received
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

# consume messages from the given queue
channel.basic_consume(callback, queue='hello', no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')

# start consuming; this call blocks and keeps receiving messages in a loop
channel.start_consuming()

Run it

It doesn't matter which side you start first; here we run the producer first to put a message on the queue:

python insert2rbt.py

If the RabbitMQ management plugin is installed, you can watch the state of the queue in the web UI.

Then run the consumer to fetch the message from the queue:

python read4rbt.py

In the management UI you can see that one message has been taken off the hello queue.

Redis connector

vim redisPubSub.py

This module connects to the Redis server and wraps the publish and subscribe operations.

import redis

class RedisPubSubHelper():

    def __init__(self):
        # redis connection object
        self.__conn = redis.Redis(host='123.57.233.243')

    def publish(self, message, channel):
        # publish: push a message onto the given channel
        self.__conn.publish(channel, message)
        return True

    def subscribe(self, channel):
        # get a pub/sub object
        pub = self.__conn.pubsub()
        # subscribe to the given channel
        pub.subscribe(channel)
        # consume the subscription confirmation so the next response is a real message
        pub.parse_response()
        return pub

listen 2 redis server

vim listen2Redis.py

import redisPubSub

# create the Redis helper object (publish/subscribe mode)
r = redisPubSub.RedisPubSubHelper()
# subscribe to a channel
data = r.subscribe('fm155.9')
# receive from the channel; this call blocks until a message arrives
print(data.parse_response())

insert data 2 redis server

vim insert2Redis.py

import redisPubSub

# create the Redis helper object (publish/subscribe mode)
r = redisPubSub.RedisPubSubHelper()
# publish a message to the given channel
r.publish('PolarSnow', 'fm155.9')

demo

Start them in this order:

  1. python listen2Redis.py
  2. python insert2Redis.py

Output of listen2Redis.py:

[b'message', b'fm155.9', b'PolarSnow']

Python provides the contextlib module to help with context-management tasks. With contextlib's contextmanager decorator, an ordinary function can be turned into something that can be driven by a with statement.

import contextlib


@contextlib.contextmanager
def worker_state(state_list, worker_thread):
    print("------>", state_list, worker_thread)
    try:
        yield
    finally:
        print("------> finally")

free_list = ['P', 'S']
w_thread = "test"

with worker_state(free_list, w_thread):
    print("######>", free_list)
    print("######>", w_thread)


------------
------> ['P', 'S'] test
######> ['P', 'S']
######> test
------> finally

上述代码执行的顺序:

  • 代码走到第15行时,就去找被@contextlib.contextmanager装饰的worker_state函数
  • 执行第5行这个函数的函数体
  • 执行第6行得到结果------> ['P', 'S'] test
  • 执行到第8行时,暂停执行这个函数,回到with调用的代码块中继续执行
  • 执行第16行得到结果######> ['P', 'S']
  • 执行第17行得到结果######> test
  • with函数体执行完毕后,跳到被装饰的worker_state函数中,在yield行之后继续执行
  • 执行第10行得到结果------> finally

Summary

When a function decorated with @contextlib.contextmanager is driven by a with statement, its body runs first; when it hits yield, execution jumps out to the with block; and once the with block has finished, execution jumps back into the function and continues with the code after the yield.