Mycat预处理功能分析

前言

Mycat从1.6版本开始支持预处理。目前1.6分支还是开发测试阶段。Mycat发展自Cobar,在Cobar源码里面可以看到预处理功能的影子(未完全实现,当然你如果往Cobar上面调用预处理命令,那么Cobar会返回结果,告诉你,我是不支持预处理sql的!)。我在以前的公司接到需求,需要在Mycat中实现预处理,所以花了时间研究了这部分代码并以自己的思路去实现。后面我将代码贡献给社区,在社区的帮助下,修复了一些bug,最终整合到1.6分支上。

Mycat预处理的使用场景不多,针对java开发来说(大量持久层框架),jdbc一般不会开启服务端预处理(后面章节会讲到mysql jdbc客户端预处理和服务端预处理)。因此,如果应用以java语言连接Mycat,很少能够使用到Mycat的预处理功能。但是如果是C、C++、或者PHP,那么使用到mysql预处理命令来进行操作的可能性就会大很多,如果应用是以上述语言实现,那么很可能会用到Mycat的预处理功能。

1. mysql预处理

在介绍Mycat预处理的实现机制之前,需要先解释mysql的预处理功能:

1.1 预处理说明

mysql从5.1版本开始支持sql预处理功能。sql预处理首先要求客户端提交需要执行的sql,这时提交的sql不传递真实的参数值,参数以问号的形式传递过去
eg:

1
2
insert into user(id, name) values(?, ?);
select * from user where id = ?;

在mysql服务端完成预编译解析,这里的预编译解析包括解析参数的个数、类型等,然后响应给客户端。接下来客户端第二次交互只需要传递参数过去就可以完成一个完整sql的执行。相比传统的sql执行,预处理需要两次交互,才能够完成一次sql执行,那么预处理sql的优势在哪里呢?

(1) 预处理sql能防止一定程度的sql注入

(2) sql预编译效率更高

多次执行相同的sql语句(参数不变或者有变化),预处理sql的执行效率优于普通的sql语句。这是因为多次执行只需要传递一次sql到服务端完成预编译和解析,而后只需要多次传递参数执行即可。

(3) 二进制包协议让sql预处理更加高效

mysql预处理命令参数的封装以及结果集的返回,均采用二进制格式封装数据,体积更小,面向底层,能直接被mysql服务端利用。相比普通sql文本协议传输的数据,二进制协议传输数据更加高效。

1.2 预处理命令说明

1.2.1 COM_STMT_PREPARE

COM_STMT_PREPARE命令用于客户端往服务端提交一个预处理的sql,如上面提到的:

1
insert into user(id, name) values(?,?);

该命令的协议格式参考COM_STMT_PREPARE协议格式

1.2.2 COM_STMT_EXECUTE

COM_STMT_EXECUTE用于执行预处理sql。正如我们前面说到的,如果预处理sql需要传递参数,这个命令会发送预处理语句所需要的参数到服务端。如上面的例子,我们需要传递两个参数iduser的具体值到服务端。

该命令的协议格式参考COM_STMT_EXECUTE协议格式

1.2.3 COM_STMT_CLOSE

COM_STMT_CLOSE用于关闭服务端预处理sql。每一个预处理的sql提交后都保存在mysql服务端的内存当中,每个预处理sql都有一个唯一的id标识,这个命令将发送需要关闭的sql的id,通知服务端可以将所有该预处理sql的资源释放掉(过多的预处理sql保留在mysql服务端会占用较多的内存, 因此有必要执行该命令清理无用的预处理sql)。

该命令的协议格式参考COM_STMT_CLOSE协议格式

1.2.4 COM_STMT_RESET

COM_STMT_RESET命令用于重置COM_STMT_SEND_LONG_DATA命令发送的blob数据。

该命令的协议格式参考COM_STMT_SEND_LONG_DATA协议格式

1.2.5 COM_STMT_SEND_LONG_DATA

COM_STMT_SEND_LONG_DATA用于往服务端发送字节流数据,通常来说只有在发送blob字段数据才需要用到该命令。可以多次调用该命令续传同一个字段的字节数据。注意这个命令必须COM_STMT_EXECUTE命令发送之前执行。

该命令的协议格式参考COM_STMT_SEND_LONG_DATA

1.3 预处理协议结果包说明

mysql预处理结果集采用了二进制协议包进行封装,与普通的查询结果集格式不同(普通的结果集包采用文本协议包进行封装)。下面我们分别对普通查询结果集协议包预处理结果集协议包进行介绍:

1.3.1 普通查询结果集协议包

普通sql查询(相比预处理sql查询)返回的结果集包用文本协议(官方称为Text Protocol)封装。文本协议的结果集包格式根据官网的一个图来说明:

mysql_text_protocol_resulset_format

一个结果集包主要包括以下部分(顺序传输):

1.3.2 预处理结果集协议包

预处理结果集包的组成和普通协议包类似,区别只在于row packet(数据以二进制协议格式存放)。

【说明】
Binary Row Packet的第一个字节恒为0,表示packet header,接下来,由NULL-Bitmap标示那些值为NULL的列,NULL-Bitmap的长度计算方式为(column-count + 7 + 2) / 8,其中column-count表示列数,而非空的列值以二进制协议格式(协议格式参考Binary Protocol Value)顺序存储在NULL-Bitmap的后面。

1
2
3
4
5
Binary Row Packet payload :
1 packet header [00]
string[$len] NULL-bitmap, length: (column-count + 7 + 2) / 8
string[$len] values

【温馨提示】

返回相同结果行,预处理包协议包所占字节会比普通协议包小,在列数越多,列越长的情况下,相差的大小越明显。

2. mysql jdbc预处理

我们知道,使用java.sql.PrepareStatement 可以执行预处理sql。mysql jdbc实现了该接口,并且将预处理分为客户端预处理服务端预处理

2.1 jdbc客户端预处理

mysql jdbc默认情况下采用的就是客户端预处理。客户端预处理的意思是,所有预处理参数都将被缓存在mysql jdbc层,而不是缓存在mysql server。在PrepareStatement执行的时候,在jdbc端完成sql语句的拼接(主要是使用缓存的参数对sql中问号?进行替换,最终发送到mysql的就是完整的sql语句)。客户端预处理走的是普通的查询协议,而不是真正的mysql预处理协议。

2.2 jdbc服务端预处理

在jdbcUrl中,如果显式地指定连接参数useServerPrepStmts=true,表示开启服务端预处理,eg:

1
jdbc:mysql://localhost:3306:test?useServerPrepStmts=true

这种情况下jdbc会使用真正的mysql预处理协议与server进行通讯,在PrepareStatement生成的时候,就会把预处理的sql语句(eg: insert into user(id, name) values(?, ?);)发送到mysql server端,而后使用PrepareStatement设置的查询参数,将会在PrepareStatement执行的时候发送到mysql服务端。

jdbc API 与 MySQL Prepare Command的关系为

1
2
3
4
5
6
7
8
9
Connection prepareStatement -> COM_STMT_PREPARE
PrepareStatement setXxx(colIndex, value)
...
PrepareStatement executeQuery(execute or executeUpdate) -> COM_STMT_EXECUTE
PrepareStatement close -> COM_STMT_CLOSE

【温馨提示】

mysql jdbc的com.mysql.jdbc.ServerPreparedStatement就是用来实现服务端预处理。有兴趣的同学可以搜一下这个类的源码,可以在里面搜索到上面说到的预处理命令的发送。

很少人在用到连接池框架(针对java)的时候,会在jdbcUrl中指定useServerPrepStmts=true,所以几乎也没人会用到真正的服务端预处理。

3. Mycat预处理实现机制

Mycat中也实现了mysql的预处理协议,可以接收预处理命令的处理。当使用预处理查询,也可以返回正确的二进制结果集包。Mycat预处理的实现是一种取巧的设计,查询走到后端mysql实际上不是发送了预处理命令,而是普通的COM_QUERY命令,后端mysql返回给Mycat的结果集包也是文本协议包,只是在Mycat将结果集包发送往客户端(前端)的中间过程,将普通的文本协议结果集包包装成为二进制协议结果集包,然后再返回给客户端。

3.1 预处理流程图

以一个流程图来说明Mycat预处理的处理流程:

mycat-prepare-unstandingmycat-prepare-workflow

如上图所示,Mycat接收到客户端发送的COM_STMT_PREPARE命令后,解析协议包的内容得到预处理sql语句,eg: insert into user(id, name) values(?, ?),将这些预处理语句缓存在Mycat里面,当Mycat再次接收到客户端发送的COM_STMT_EXECUTE命令,就把对应sql的问号替换为实际传递过来的参数值,这时候已经得到了完整的sql语句。接下来,直接把这个语句丢给Mycat sql查询处理器去执行,中间会经过sql解析模块,路由解析模块以及最后的执行。最后,当收到后端mysql传递给Mycat的数据准备发往客户端的时候,做一个协议包转换,将普通文本结果集协议包转换成二进制结果集协议包并发往客户端。就是这样,在Mycat里面处理预处理命令的接收与处理,并将结果集封装好发送给客户端,让客户端看起来Mycat是真正地实现了预处理sql

3.2 预处理命令接收处理

Mycat对前端发来的命令处理体现在FrontendCommandHandler的handle方法上,同样对于预处理命令的接收判断,是在这个方法上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Override
public void handle(byte[] data)
{
if(source.getLoadDataInfileHandler()!=null&&source.getLoadDataInfileHandler().isStartLoadData())
{
MySQLMessage mm = new MySQLMessage(data);
int packetLength = mm.readUB3();
if(packetLength+4==data.length)
{
source.loadDataInfileData(data);
}
return;
}
switch (data[4])
{
...
case MySQLPacket.COM_STMT_PREPARE:
commands.doStmtPrepare();
source.stmtPrepare(data);
break;
case MySQLPacket.COM_STMT_SEND_LONG_DATA:
commands.doStmtSendLongData();
source.stmtSendLongData(data);
break;
case MySQLPacket.COM_STMT_RESET:
commands.doStmtReset();
source.stmtReset(data);
break;
case MySQLPacket.COM_STMT_EXECUTE:
commands.doStmtExecute();
source.stmtExecute(data);
break;
case MySQLPacket.COM_STMT_CLOSE:
commands.doStmtClose();
source.stmtClose(data);
break;
...
default:
commands.doOther();
source.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR,
"Unknown command");
}
}

最终对预处理命令的处理交给ServerPrepareHandler类去处理,ServerPrepareHandler实现了FrontendPrepareHandler接口,在这个接口里面定义了预处理命令处理的相关接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package io.mycat.net.handler;
/**
* SQL预处理处理器
*
* @author mycat, CrazyPig
*/
public interface FrontendPrepareHandler {
void prepare(String sql); // COM_STMT_PREPARE的处理
void sendLongData(byte[] data); // COM_STMT_SEND_LONG_DATA的处理
void reset(byte[] data); // COM_STMT_RESET的处理
void execute(byte[] data); // COM_STMT_EXECUTE的处理
void close(byte[] data); // COM_STMT_CLOSE的处理
void clear(); // 在前端连接关闭时清理资源
}

【温馨提示】

clear()的调用会清理该连接所缓存的所有预处理sql数据。

下面重点介绍三个主要方法的逻辑代码:

3.2.1 ServerPrepareHandler.prepare方法

在这个方法里面,解析sql里面的查询列和对应的查询参数,对客户端进行应答。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public void prepare(String sql) {
LOGGER.debug("use server prepare, sql: " + sql);
PreparedStatement pstmt = null;
if ((pstmt = pstmtForSql.get(sql)) == null) {
// 解析获取字段个数和参数个数
int columnCount = getColumnCount(sql);
int paramCount = getParamCount(sql);
pstmt = new PreparedStatement(++pstmtId, sql, columnCount, paramCount);
pstmtForSql.put(pstmt.getStatement(), pstmt);
pstmtForId.put(pstmt.getId(), pstmt);
}
PreparedStmtResponse.response(pstmt, source); // 发送响应包给客户端(响应包的封装 -> `PreparedOkPacket.java`)
}

3.2.1 ServerPrepareHandler.execute方法

入参是COM_STMT_EXECUTE整个报文的字节,这个方法首先需要解析出这个包所带的信息,如要执行的预处理sql的id标识以及对应的执行参数(如果有的话), 然后完成预处理sql的拼接,最后交给Mycat SQL查询模块去执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
public void execute(byte[] data) {
long pstmtId = ByteUtil.readUB4(data, 5);
PreparedStatement pstmt = null;
if ((pstmt = pstmtForId.get(pstmtId)) == null) {
source.writeErrMessage(ErrorCode.ER_ERROR_WHEN_EXECUTING_COMMAND, "Unknown pstmtId when executing.");
} else {
ExecutePacket packet = new ExecutePacket(pstmt); // 解析COM_STMT_EXECUTE报文携带的信息
try {
packet.read(data, source.getCharset());
} catch (UnsupportedEncodingException e) {
source.writeErrMessage(ErrorCode.ER_ERROR_WHEN_EXECUTING_COMMAND, e.getMessage());
return;
}
BindValue[] bindValues = packet.values;
// 还原sql中的动态参数为实际参数值
String sql = prepareStmtBindValue(pstmt, bindValues);
// 执行sql
source.getSession2().setPrepared(true);
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("execute prepare sql: " + sql);
}
source.query( sql ); // 交给普通的查询模块进行处理
}
}

3.2.3 ServerPrepareHandler.close方法

close方法入参是COM_STMT_CLOSE命令的报文内容, 这个方法逻辑比较简单,就是解析出报文,需要关闭哪个预处理语句,对于Mycat来说,需要做的动作就是将缓存的预处理sql相关内容从Mycat内存中移除即可,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
@Override
public void close(byte[] data) {
long pstmtId = ByteUtil.readUB4(data, 5); // 获取prepare stmt id
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("close prepare stmt, stmtId = " + pstmtId);
}
PreparedStatement pstmt = pstmtForId.remove(pstmtId);
if(pstmt != null) {
pstmtForSql.remove(pstmt.getStatement());
}
}

3.5 后端返回数据的处理

以查询来说,如果是路由到单节点,那么Mycat是使用SingleNodeHandler来处理结果集的返回,对于row packet的处理,体现在该类的rowResponse方法里面,预处理最后一步处理结果集包转换正是在这个方法里面完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* select
*
* 行数据返回时触发,将行数据写入缓冲区中
*/
@Override
public void rowResponse(byte[] row, BackendConnection conn) {
this.netOutBytes += row.length;
this.selectRows++;
if (isDefaultNodeShowTable || isDefaultNodeShowFullTable) {
RowDataPacket rowDataPacket = new RowDataPacket(1);
rowDataPacket.read(row);
String table = StringUtil.decode(rowDataPacket.fieldValues.get(0), conn.getCharset());
if (shardingTablesSet.contains(table.toUpperCase())) {
return;
}
}
row[3] = ++packetId;
if ( prepared ) {
RowDataPacket rowDataPk = new RowDataPacket(fieldCount);
rowDataPk.read(row);
BinaryRowDataPacket binRowDataPk = new BinaryRowDataPacket();
binRowDataPk.read(fieldPackets, rowDataPk);
binRowDataPk.packetId = rowDataPk.packetId;
// binRowDataPk.write(session.getSource());
/*
* [fix bug] : 这里不能直接将包写到前端连接,
* 因为在fieldEofResponse()方法结束后buffer还没写出,
* 所以这里应该将包数据顺序写入buffer(如果buffer满了就写出),然后再将buffer写出
*/
buffer = binRowDataPk.write(buffer, session.getSource(), true);
} else {
buffer = session.getSource().writeToBuffer(row, allocBuffer());
//session.getSource().write(row);
}
}

由变量prepared来指示该查询是否为预处理查询,如果是,就进入协议包转换流程。

同样的,如果查询是路由到多节点,那么Mycat是使用MultiNodeQueryHandler来处理结果集的返回。同样我们在rowResponse方法里面可以看到预处理的相应判断和处理。

3.4 预处理结果集包封装

预处理结果集包封装到BinaryRowDataPacket类里面,重要的字段:

1
2
3
4
5
6
public int fieldCount; // 列数量定义
public List<byte[]> fieldValues; // 列值
public byte packetHeader = (byte) 0; // 协议包第一个字节,恒为0
public byte[] nullBitMap; // NULL-Bitmap
public List<FieldPacket> fieldPackets; // 存放普通协议包的列值

3.5 预处理结果集包转换为普通结果集协议包

逻辑代码同样在BinaryRowDataPacket类里面,体现在:

1
2
public void read(List<FieldPacket> fieldPackets, UnsafeRow unsafeRow);
public void read(List<FieldPacket> fieldPackets, RowDataPacket rowDataPk);
  • 当server.xml开启isOffHeapuseOffHeapForMerge参数时,会使用UnsafeRow封装数据,因此需要从这个对象里面将数据封装成BinaryRowDataPacket,这个时候Mycat内部会调用第一个read方法进行包转换。

  • 相反,没有开启isOffHeapuseOffHeapForMerge的情况下,会使用RowDataPacket来表示数据,那么这个时候Mycat会调用第二个read方法,从RowDataPacket转换成BinaryRowDataPacket

【温馨提示】

  • BinaryRowDataPacket代码就不贴出来了,有兴趣的同学请自己搜一下代码去看。
  • 最后写完前端的代码封装在write方法里面。

4. 写在最后

看到这里,你会发现,其实Mycat的预处理也不是真正的预处理,最后发往后端的时候,还是利用了普通的协议,而不是真正走预处理协议。如果后端要走真正的预处理协议,需要花更大的功夫才行。目前这个实现性能上可能比不上普通的sql查询,但是基本上能满足应用调用预处理命令,以及结果的正确返回。

如果您在测试中或者使用中发现这部分功能存在什么疑问、缺陷,或者有更好的改进思路,可以与我联系。

坚持原创技术分享,您的支持将鼓励我继续创作!