23 执行引擎:如何把握 ShardingSphere 中的 Executor 执行模型?(下)

在上一课时,我们已经对 ShardingSphere 执行引擎中关于底层的 SQLExecuteTemplate,以及上层的 StatementExecutor 和 PreparedStatementExecutor 对象进行了全面介绍。

今天,我们在此基础上更上一层,重点关注 ShardingStatement 和 ShardingPreparedStatement 对象,这两个对象分别是 StatementExecutor 和 PreparedStatementExecutor 的使用者。

ShardingStatement

我们先来看 ShardingStatement 类,该类中的变量在前面的内容中都已经有过介绍:

1
2
3
4
5
6
7
8
9
private final ShardingConnection connection;

private final StatementExecutor statementExecutor;

private boolean returnGeneratedKeys;

private SQLRouteResult sqlRouteResult;

private ResultSet currentResultSet;

ShardingStatement 类的构造函数同样不是很复杂,我们发现 StatementExecutor 就是在这个构造函数中完成了其创建过程:

1
2
3
4
5
6
7
8
9
10
11
public ShardingStatement(final ShardingConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {

super(Statement.class);

this.connection = connection;

//创建 StatementExecutor

statementExecutor = new StatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, connection);

}

在继续介绍 ShardingStatement 之前,我们先梳理一下与它相关的类层结构。我们在 “06 | 规范兼容:JDBC 规范与 ShardingSphere 是什么关系?” 中的 ShardingConnection 提到,ShardingSphere 通过适配器模式包装了自己的实现类,除了已经介绍的 ShardingConnection 类之外,还包含今天要介绍的 ShardingStatement 和 ShardingPreparedStament。

根据这一点,我们可以想象 ShardingStatement 应该具备与 ShardingConnection 类似的类层结构:

Drawing 0.png

然后我们来到上图中 AbstractStatementAdapter 类,这里的很多方法的风格都与 ShardingConnection 的父类 AbstractConnectionAdapter 一致,例如如下所示的 setPoolable 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public final void setPoolable(final boolean poolable) throws SQLException {

this.poolable = poolable;

recordMethodInvocation(targetClass, "setPoolable", new Class[] {boolean.class}, new Object[] {poolable});

forceExecuteTemplate.execute((Collection) getRoutedStatements(), new ForceExecuteCallback<Statement>() {



@Override

public void execute(final Statement statement) throws SQLException {

statement.setPoolable(poolable);

}

});

这里涉及的 recordMethodInvocation 方法、ForceExecuteTemplate,以及 ForceExecuteCallback 我们都已经在“03 | 规范兼容:JDBC 规范与 ShardingSphere 是什么关系?”中进行了介绍,这里不再展开。

同样,AbstractStatementAdapter 的父类 AbstractUnsupportedOperationStatement 的作用也与 AbstractUnsupportedOperationConnection 的作用完全一致。

了解了 ShardingStatement 的类层结构之后,我们来看它的核心方法,首当其冲的还是它的 executeQuery 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Override

public ResultSet executeQuery(final String sql) throws SQLException {

if (Strings.isNullOrEmpty(sql)) {

throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);

}

ResultSet result;

try {

//清除 StatementExecutor 中的相关变量

clearPrevious();

//执行路由引擎,获取路由结果

shard(sql);

//初始化 StatementExecutor

initStatementExecutor();

//调用归并引擎

MergeEngine mergeEngine = MergeEngineFactory.newInstance(connection.getRuntimeContext().getDatabaseType(), connection.getRuntimeContext().getRule(), sqlRouteResult, connection.getRuntimeContext().getMetaData().getRelationMetas(), statementExecutor.executeQuery());

//获取归并结果

result = getResultSet(mergeEngine);

} finally {

currentResultSet = null;

}

currentResultSet = result;

return result;

}

这个方法中有几个子方法值得具体展开一下,首先是 shard 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void shard(final String sql) {

//从 Connection 中获取 ShardingRuntimeContext 上下文

ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();

//创建 SimpleQueryShardingEngine

SimpleQueryShardingEngine shardingEngine = new SimpleQueryShardingEngine(runtimeContext.getRule(), runtimeContext.getProps(), runtimeContext.getMetaData(), runtimeContext.getParseEngine());

//执行分片路由并获取路由结果

sqlRouteResult = shardingEngine.shard(sql, Collections.emptyList());

}

这段代码就是路由引擎的入口,我们创建了 SimpleQueryShardingEngine,并调用它的 shard 方法获取路由结果对象 SQLRouteResult。

然后我们来看 initStatementExecutor 方法,如下所示:

1
2
3
4
5
6
7
private void initStatementExecutor() throws SQLException {

statementExecutor.init(sqlRouteResult);

replayMethodForStatements();

}

这里通过路由结果对象 SQLRouteResult 对 statementExecutor 进行了初始化,然后执行了一个 replayMethodForStatements 方法:

1
2
3
4
5
6
7
8
9
private void replayMethodForStatements() {

for (Statement each : statementExecutor.getStatements()) {

replayMethodsInvocation(each);

}

}

该方法实际上就是调用了基于反射的 replayMethodsInvocation 方法,然后这个replayMethodsInvocation 方法会针对 statementExecutor 中所有 Statement的 SQL 操作执行目标方法。

最后,我们通过执行 statementExecutor.executeQuery() 方法获取 SQL 执行的结果,并用这个结果来创建归并引擎 MergeEngine,并通过归并引擎 MergeEngine 获取最终的执行结果。

归并引擎是 ShardingSphere 中与 SQL 解析引擎、路由引擎以及执行引擎并列的一个引擎,我们在下一课时中就会开始介绍这块内容,这里先不做具体展开。

以 ShardingStatement 中的其中一个 executeUpdate 方法为例,可以看到它的执行流程也与前面的 executeQuery 方法非常类似:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override

public int executeUpdate(final String sql) throws SQLException {

try {

//清除 StatementExecutor 中的相关变量

clearPrevious();

//执行路由引擎,获取路由结果

shard(sql);

//初始化 StatementExecutor

initStatementExecutor();

return statementExecutor.executeUpdate();

} finally {

currentResultSet = null;

}

}

当然,对于 Update 操作而言,不需要通过归并引擎做结果的归并。

ShardingPreparedStatement

我们接着来看 ShardingPreparedStatement 类,这个类的变量也基本都是前面介绍过的对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
private final ShardingConnection connection;

private final String sql;

private final PreparedQueryShardingEngine shardingEngine;

private final PreparedStatementExecutor preparedStatementExecutor;

private final BatchPreparedStatementExecutor batchPreparedStatementExecutor;

private SQLRouteResult sqlRouteResult;

private ResultSet currentResultSet;

这里的 ShardingEngine、PreparedStatementExecutor 和 BatchPreparedStatementExecutor 对象的创建过程都发生在 ShardingPreparedStatement 的构造函数中。

然后我们来看它的代表性方法 ExecuteQuery,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override

public ResultSet executeQuery() throws SQLException {

ResultSet result;

try {

clearPrevious();

shard();

initPreparedStatementExecutor();

MergeEngine mergeEngine = MergeEngineFactory.newInstance(connection.getRuntimeContext().getDatabaseType(), connection.getRuntimeContext().getRule(), sqlRouteResult, connection.getRuntimeContext().getMetaData().getRelationMetas(), preparedStatementExecutor.executeQuery());

result = getResultSet(mergeEngine);

} finally {

clearBatch();

}

currentResultSet = result;

return result;

}

这里我们没加注释,但也应该理解这一方法的执行流程,因为该方法的风格与 ShardingStatement 中的同名方法非常一致。

关于 ShardingPreparedStatement 就没有太多可以介绍的内容了,我们接着来看它的父类AbstractShardingPreparedStatementAdapter 类,看到该类持有一个 SetParameterMethodInvocation 的列表,以及一个参数列表:

1
2
3
private final List<SetParameterMethodInvocation> setParameterMethodInvocations = new LinkedList<>();

private final List<Object> parameters = new ArrayList<>();

这里的 SetParameterMethodInvocation 类直接集成了介绍 ShardingConnection 时提到的 JdbcMethodInvocation 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public final class SetParameterMethodInvocation extends JdbcMethodInvocation {



@Getter

private final int index;



@Getter

private final Object value;



public SetParameterMethodInvocation(final Method method, final Object[] arguments, final Object value) {

super(method, arguments);

this.index = (int) arguments[0];

this.value = value;

}



public void changeValueArgument(final Object value) {

getArguments()[1] = value;

}

}

对于 ShardingPreparedStatement 而言,这个类的作用是在 JdbcMethodInvocation 中所保存的方法和参数的基础上,添加了 SQL 执行过程中所需要的参数信息。

所以它的 replaySetParameter 方法就变成了如下的风格:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected final void replaySetParameter(final PreparedStatement preparedStatement, final List<Object> parameters) {

setParameterMethodInvocations.clear();

//添加参数信息

addParameters(parameters);

for (SetParameterMethodInvocation each : setParameterMethodInvocations) {

each.invoke(preparedStatement);

}

}

关于 AbstractShardingPreparedStatementAdapter 还需要注意的是它的类层结构,如下图所示,可以看到 AbstractShardingPreparedStatementAdapter 继承了 AbstractUnsupportedOperationPreparedStatement 类;而 AbstractUnsupportedOperationPreparedStatement 却又继承了 AbstractStatementAdapter 类并实现了 PreparedStatement:

Drawing 2.png

形成这种类层结构的原因在于,PreparedStatement 本来就是在 Statement 的基础上添加了各种参数设置功能,换句话说,Statement 的功能 PreparedStatement 都应该有。

所以一方面 AbstractStatementAdapter 提供了所有 Statement 的功能;另一方面,AbstractShardingPreparedStatementAdapter 首先把 AbstractStatementAdapter 所有的功能继承过来,但它自身可能有一些无法实现的关于 PreparedStatement 的功能,所以同样提供了 AbstractUnsupportedOperationPreparedStatement 类,并被最终的 AbstractShardingPreparedStatementAdapter 适配器类所继承。

这样就形成了如上图所示的复杂类层结构。

ShardingConnection

介绍完 ShardingStatement 和 ShardingPreparedStatement 之后,我们来关注使用它们的具体应用场景,这也是 ShardingSphere 执行引擎的最后一部分内容。

通过查看调用关系,我们发现创建这两个类的入口都在 ShardingConnection 类中,该类包含了用于创建 ShardingStatement 的 createStatement 方法和用于创建 ShardingPreparedStatement 的 prepareStatement 方法,以及它们的各种重载方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override

public Statement createStatement(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {

return new ShardingStatement(this, resultSetType, resultSetConcurrency, resultSetHoldability);

}



@Override

public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {

return new ShardingPreparedStatement(this, sql, resultSetType, resultSetConcurrency, resultSetHoldability);

}

同时,ShardingConnection 中包含了用于管理分布式事务的 ShardingTransactionManager。关于分布式事务的讨论不是今天的重点,我们后面会有专题来做详细展开。

但我们可以先看一下 commit 和 rollback 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Override

public void commit() throws SQLException {

if (TransactionType.LOCAL == transactionType) {

super.commit();

} else {

shardingTransactionManager.commit();

}

}



@Override

public void rollback() throws SQLException {

if (TransactionType.LOCAL == transactionType) {

super.rollback();

} else {

shardingTransactionManager.rollback();

}

}

可以看到这两个方法的逻辑还是比较清晰的,即当事务类型为本地事务时直接调用 ShardingConnection 父类 AbstractConnectionAdapter 中的 commit 和 rollback 方法,这两个方法会调用真正的 connection 的相关方法。

以 commit 方法为例,我们可以看到 AbstractConnectionAdapter 中基于这一设计思想的实现过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override

public void commit() throws SQLException {

forceExecuteTemplate.execute(cachedConnections.values(), new ForceExecuteCallback<Connection>() {



@Override

public void execute(final Connection connection) throws SQLException {

connection.commit();

}

});

}

ShardingDataSource

我们知道在 JDBC 规范中,可以通过 DataSource 获取 Connection 对象。ShardingSphere 完全兼容 JDBC 规范,所以 ShardingConnection 的创建过程应该也是在对应的 DataSource 中,这个 DataSource 就是ShardingDataSource

ShardingDataSource 类比较简单,其构造函数如下所示:

1
2
3
4
5
6
7
8
9
public ShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final Properties props) throws SQLException {

super(dataSourceMap);

checkDataSourceType(dataSourceMap);

runtimeContext = new ShardingRuntimeContext(dataSourceMap, shardingRule, props, getDatabaseType());

}

可以看到,ShardingRuntimeContext 这个上下文对象是在 ShardingDataSource 的构造函数中被创建的,而创建 ShardingConnection 的过程也很直接:

1
2
3
4
5
6
7
@Override

public final ShardingConnection getConnection() {

return new ShardingConnection(getDataSourceMap(), runtimeContext, TransactionTypeHolder.get());

}

在 ShardingDataSource 的实现上,也同样采用的是装饰器模式,所以它的类层结构也与 ShardingConnection 的类似。在 ShardingDataSource 的父类 AbstractDataSourceAdapter 中,主要的工作是完成 DatabaseType 的创建,核心方法 createDatabaseType 如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private DatabaseType createDatabaseType(final DataSource dataSource) throws SQLException {

if (dataSource instanceof AbstractDataSourceAdapter) {

return ((AbstractDataSourceAdapter) dataSource).databaseType;

}

try (Connection connection = dataSource.getConnection()) {

return DatabaseTypes.getDatabaseTypeByURL(connection.getMetaData().getURL());

}

}

可以看到这里使用到了 DatabaseTypes 类,该类负责 DatabaseType 实例的动态管理。而在 ShardingSphere 中,DatabaseType 接口代表数据库类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface DatabaseType {

//获取数据库名称

String getName();

//获取 JDBC URL 的前缀

Collection<String> getJdbcUrlPrefixAlias();

//获取数据源元数据

DataSourceMetaData getDataSourceMetaData(String url, String username);

}

可以想象 ShardingSphere 中针对各种数据库提供了 DatabaseType 接口的实现类,其中以 MySQLDatabaseType 为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public final class MySQLDatabaseType implements DatabaseType {



@Override

public String getName() {

return "MySQL";

}



@Override

public Collection<String> getJdbcUrlPrefixAlias() {

return Collections.singletonList("jdbc:mysqlx:");

}



@Override

public MySQLDataSourceMetaData getDataSourceMetaData(final String url, final String username) {

return new MySQLDataSourceMetaData(url);

}

}

上述代码中的 MySQLDataSourceMetaData 实现了 DataSourceMetaData 接口,并提供如下所示的对输入 url 的解析过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public MySQLDataSourceMetaData(final String url) {

Matcher matcher = pattern.matcher(url);

if (!matcher.find()) {

throw new UnrecognizedDatabaseURLException(url, pattern.pattern());

}

hostName = matcher.group(4);

port = Strings.isNullOrEmpty(matcher.group(5)) ? DEFAULT_PORT : Integer.valueOf(matcher.group(5));

catalog = matcher.group(6);

schema = null;

}

显然,DatabaseType 用于保存与特定数据库元数据相关的信息,ShardingSphere 还基于 SPI 机制实现对各种 DatabaseType 实例的动态管理。

最后,我们来到 ShardingDataSourceFactory 工厂类,该类负责 ShardingDataSource 的创建:

1
2
3
4
5
6
7
8
9
10
11
12
13
public final class ShardingDataSourceFactory {



public static DataSource createDataSource(

final Map<String, DataSource> dataSourceMap, final ShardingRuleConfiguration shardingRuleConfig, final Properties props) throws SQLException {

return new ShardingDataSource(dataSourceMap, new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), props);

}

}

我们在这里创建了 ShardingDataSource,同时发现 ShardingRule 的创建过程实际上也是在这里,通过传入的 ShardingRuleConfiguration 来构建一个新的 ShardingRule 对象。

一旦创建了 DataSource,我们就可以使用与 JDBC 规范完全兼容的 API,通过该 DataSource 完成各种 SQL 的执行。我们可以回顾 ShardingDataSourceFactory 的使用过程来加深对他的理解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public DataSource dataSource() throws SQLException {

//创建分片规则配置类

ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();



//创建分表规则配置类

TableRuleConfiguration tableRuleConfig = new TableRuleConfiguration("user", "ds${0..1}.user${0..1}");



//创建分布式主键生成配置类

Properties properties = new Properties();

result.setProperty("worker.id", "33");

KeyGeneratorConfiguration keyGeneratorConfig = new KeyGeneratorConfiguration("SNOWFLAKE", "id", properties);

result.setKeyGeneratorConfig(keyGeneratorConfig);

shardingRuleConfig.getTableRuleConfigs().add(tableRuleConfig);



//根据年龄分库,一共分为 2 个库

shardingRuleConfig.setDefaultDatabaseShardingStrategyConfig(new InlineShardingStrategyConfiguration("sex", "ds${sex % 2}"));



//根据用户 id 分表,一共分为 2 张表

shardingRuleConfig.setDefaultTableShardingStrategyConfig(new StandardShardingStrategyConfiguration("id", "user${id % 2}"));



//通过工厂类创建具体的 DataSource

return ShardingDataSourceFactory.createDataSource(createDataSourceMap(), shardingRuleConfig, new Properties());

}

一旦获取了目标 DataSource 之后,我们就可以使用 JDBC 中的核心接口来执行传入的 SQL 语句:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
List<User> getUsers(final String sql) throws SQLException {

List<User> result = new LinkedList<>();

try (Connection connection = dataSource.getConnection();

PreparedStatement preparedStatement = connection.prepareStatement(sql);

ResultSet resultSet = preparedStatement.executeQuery()) {

while (resultSet.next()) {

User user= new User();

//省略设置 User 对象的赋值语句

result.add(user);

}

}

return result;

}

ShardingSphere 通过在准备阶段获取的连接模式,在执行阶段生成内存归并结果集流式归并结果集,并将其传递至结果归并引擎,以进行下一步工作。

从源码解析到日常开发

基于适配器模式完成对 JDBC 规范的重写,是我们学习 ShardingSphere 框架非常重要的一个切入点,同样也是我们将这种模式应用到日常开发工作中的一个切入点。

适配器模式是作为两个不兼容的接口之间的桥梁。在业务系统中,我们经常会碰到需要与外部系统进行对接和集成的场景,这个时候为了保证内部系统的功能演进,能够独立于外部系统进行发展,一般都需要采用适配器模式完成两者之间的隔离。

当我们设计这种系统时,可以参考 JDBC 规范中的接口定义方式,以及 ShardingSphere 中基于这种接口定义方式,而完成适配的具体做法。

小结与预告

这是 ShardingSphere 执行引擎的最后一个课时,我们围绕执行引擎的上层组件,给出了以“ Sharding”作为前缀的各种 JDBC 规范中的核心接口实现类。

其中 ShardingStatement 和 ShardingPreparedStatement 直接依赖于上一课时介绍的 StatementExecutor 和 PreparedStatementExecutor;而 ShardingConnection 和 ShardingDataSource 则为我们使用执行引擎提供了入口。

这里给你留一道思考题:ShardingSphere 中,AbstractShardingPreparedStatementAdapter 的类层结构为什么会比 AbstractStatementAdapter 复杂很多?欢迎你在留言区与大家讨论,我将逐一点评解答。

现在,我们已经通过执行引擎获取了来自不同数据源的结果数据,对于查询语句而言,我们通常都需要对这些结果数据进行归并才能返回给客户端。在接下来的内容中,就让我们来分析一下 ShardingSphere 的归并引擎。