19 数据分库后如何确保数据完整性——分布式事务
如果你已经在学习本课程的过程中,将前面所有业务代码填充完整后,会发现某些涉及多服务调用过程中多个数据库写入操作,是存在漏洞的。
通过 @Transactional 注解进行事务控制,服务内尚未保证数据的完整性,跨服务后数据的完整性无法得到保护。这里就涉及到分布式事务的问题,本篇我们一起使用 Seata 组件来进行来确保跨服务场景下的数据完整性问题。
问题场景
先拿一个关键场景来铺垫下主题。车辆交费离场后,主要业务逻辑如下:
- 计费服务自向,写入离场信息
- 调用财务服务,写入收费信息
- 调用消息服务,写入消息记录
涉及到三个服务间协作,数据分别写入三个存储库,是一个典型的分布式事务数据一致性问题。来看下正常场景的代码逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
| @Service
@Slf4j
public class ExistsServiceImpl implements ExistsService {
@Autowired
ExistsMapper ExistsMapper;
@Autowired
EntranceMapper entranceMapper;
@Autowired
RedisService redisService;
@Autowired
BillFeignClient billFeignClient;
@Autowired
MessageClient messageClient;
@Autowired
Source source;
@Override
@Transactional(rollbackFor = Exception.class)
public int createExsits(String json) throws BusinessException {
log.info("Exists data = " + json);
Exists exists = JSONObject.parseObject(json, Exists.class);
int rtn = ExistsMapper.insertSelective(exists);
log.info("insert into park_charge.Exists data suc !");
EntranceExample entranceExample = new EntranceExample();
entranceExample.setOrderByClause("create_date desc limit 0,1");
entranceExample.createCriteria().andPlateNoEqualTo(exists.getPlateNo());
List<Entrance> entrances = entranceMapper.selectByExample(entranceExample);
Entrance lastEntrance = null;
if (CollectionUtils.isNotEmpty(entrances)) {
lastEntrance = entrances.get(0);
}
if (null == lastEntrance) {
throw new BusinessException("异常车辆,未找到入场数据!");
}
Instant entryTime = lastEntrance.getCreateDate().toInstant();
Duration duration = Duration.between(LocalDateTime.ofInstant(entryTime, ZoneId.systemDefault()),
LocalDateTime.now());
long mintues = duration.toMinutes();
float fee = caluateFee(mintues);
log.info("calu parking fee = " + fee);
Billing billing = new Billing();
billing.setFee(fee);
billing.setDuration(Float.valueOf(mintues));
billing.setPlateNo(exists.getPlateNo());
CommonResult<Integer> createRtn = billFeignClient.create(JSONObject.toJSONString(billing));
if (createRtn.getRespCode() > 0) {
log.info("insert into billing suc!");
}else {
throw new BusinessException("invoke finance service fallback...");
}
redisService.increase(ParkingConstant.cache.currentAviableStallAmt);
log.info("update parkingLot aviable stall amt = " +redisService.getkey(ParkingConstant.cache.currentAviableStallAmt));
Message message = new Message();
message.setMcontent("this is simple pay message.");
message.setMtype("pay");
source.output().send(MessageBuilder.withPayload(JSONObject.toJSONString(message)).build());
log.info("produce msg to apache rocketmq , parking-messge to consume the msg as a consumer...");
CommonResult<Integer> msgRtn = messageClient.sendNotice(JSONObject.toJSONString(message));
if (msgRtn.getRespCode() > 0) {
log.info("insert into park_message.message data suc!");
}else {
throw new BusinessException("invoke message service fallback ...");
}
return rtn;
}
private float caluateFee(long stayMintues) {
String ruleStr = (String) redisService.getkey(ParkingConstant.cache.chargingRule);
JSONArray array = JSONObject.parseArray(ruleStr);
List<ChargingRule> rules = JSONObject.parseArray(array.toJSONString(), ChargingRule.class);
float fee = 0;
for (ChargingRule chargingRule : rules) {
if (chargingRule.getStart() <= stayMintues && chargingRule.getEnd() > stayMintues) {
fee = chargingRule.getFee();
break;
}
}
return fee;
}
}
|
正常情况下,不会出现问题,一旦子服务出现写入异常逻辑,就会出现数据不一致的情况。比如车辆离场记录写入成功,但支付记录写入失败的情况,代码不能及时回滚错误数据,造成业务数据的不完整,事后追溯困难。
分布式事务问题
什么是事务,事务是由一组操作构成的可靠的独立的工作单位,要么全部成功,要么全部失败,不能出现部分成功部分失败的情况。在单体架构下,更多的是本地事务,比如采用 Spring 框架的话,基本上是由 Spring 来管理着事务,保证事务的正常逻辑。但本地事务仅限于当前应用,其它应用的事务就鞭长莫及了。
什么是分布式事务,一次大的业务操作中涉及众多小操作,各个小操作分散在不同的应用中,要保证业务数据的完整可靠。同样也是要么全成功,要么全失败。事务管理的范围由单一应用演变成分布式系统的范围。
网络中针对分布式事务的讨论很多,成熟方案也存在,这里不引入过多讨论,由兴趣的小伙伴可以先补充下这块的知识,再来回看本篇内容。
在数据强一致要求不高的情况下,业界普遍主张采用最终一致性,来保证分布式事务涉及到的数据完整性。本文即将重点介绍的 Seata 方案属于此类。
Seata 是什么
Seata 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。支持 Dubbo、Spring Cloud、grpc 等 RPC 框架,本次引入也正是与 Spring Cloud 体系融合比较好的原因。更多详细内容可参照其官网:
https://seata.io/zh-cn/
Seata 中有三个重要概念:
- TC——事务协调者:维护全局和分支事务的状态,驱动全局事务提交或回滚,独立于各应用之外。
- TM——事务管理器:定义全局事务的范围:开始全局事务、提交或回滚全局事务,也就是事务的发起方。
- RM——资源管理器:管理分支事务处理的资源,与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚,RM 自当是维护在各个微服务中。
(图片来源于 https://github.com/seata/seata)
Seata Server 安装
本案例基于 AT 模块展开,需要结合 MySQL、Nacos 共同完成。
下载完成后,进入 seata 目录:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| drwxr-xr-x 3 apple staff 96 10 16 15:38 META-INF/
-rw-r--r-- 1 apple staff 1439 10 16 15:38 db_store.sql
[email protected] 1 apple staff 829 12 18 11:40 db_undo_log.sql
-rw-r--r-- 1 apple staff 3484 12 19 09:41 file.conf
-rw-r--r-- 1 apple staff 2144 10 16 15:38 logback.xml
-rw-r--r-- 1 apple staff 892 10 16 15:38 nacos-config.py
-rw-r--r-- 1 apple staff 678 10 16 15:38 nacos-config.sh
-rw-r--r-- 1 apple staff 2275 10 16 15:38 nacos-config.txt
-rw-r--r-- 1 apple staff 1359 12 19 09:41 registry.conf
|
事务注册支持 file、nacos、eureka、redis、zk、consul、etcd3、sofa 多种模式,配置也支持 file、nacos、apollo、zk、consul、etcd3 等多种模式,本次使用 nacos 模式,修改 registry.conf 后如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| appledeMacBook-Air:conf apple$ cat registry.conf
registry {
# file、nacos、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
nacos {
serverAddr = "localhost:8848"
namespace = ""
cluster = "default"
}
}
config {
# file、nacos、apollo、zk、consul、etcd3
type = "nacos"
nacos {
serverAddr = "localhost"
namespace = ""
cluster = "default"
}
}
|
事务注册选用 nacos 后,就需要用到 nacos-config.txt 配置文件,打开文件修改关键配置项——事务组及存储配置项:
1 2 3
| service.vgroup_mapping.${your-service-gruop}=default
|
中间的 ${your-service-gruop}
为自己定义的服务组名称,服务中的 application.properties 文件里配置服务组名称。有多少个子服务中涉及全局事务控制,就要配置多少个。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| service.vgroup_mapping.message-service-group=default
service.vgroup_mapping.finance-service-group=default
service.vgroup_mapping.charging-service-group=default
...
store.mode=db
store.db.url=jdbc:mysql://127.0.0.1:3306/seata-server?useUnicode=true
store.db.user=root
store.db.password=root
|
初始化 seata-server 数据库,涉及三张表:branch_table、global_table 和 lock_table,用于存储全局事务、分支事务及锁定表相关数据,脚本位于 conf 目录下 db_store.sql 文件中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
| SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
DROP TABLE IF EXISTS `branch_table`;
CREATE TABLE `branch_table` (
`branch_id` bigint(20) NOT NULL,
`xid` varchar(128) NOT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`resource_group_id` varchar(32) DEFAULT NULL,
`resource_id` varchar(256) DEFAULT NULL,
`lock_key` varchar(128) DEFAULT NULL,
`branch_type` varchar(8) DEFAULT NULL,
`status` tinyint(4) DEFAULT NULL,
`client_id` varchar(64) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS `global_table`;
CREATE TABLE `global_table` (
`xid` varchar(128) NOT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`status` tinyint(4) NOT NULL,
`application_id` varchar(32) DEFAULT NULL,
`transaction_service_group` varchar(32) DEFAULT NULL,
`transaction_name` varchar(128) DEFAULT NULL,
`timeout` int(11) DEFAULT NULL,
`begin_time` bigint(20) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS `lock_table`;
CREATE TABLE `lock_table` (
`row_key` varchar(128) NOT NULL,
`xid` varchar(96) DEFAULT NULL,
`transaction_id` mediumtext,
`branch_id` mediumtext,
`resource_id` varchar(256) DEFAULT NULL,
`table_name` varchar(32) DEFAULT NULL,
`pk` varchar(36) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`row_key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
SET FOREIGN_KEY_CHECKS = 1;
|
表结构初始化完成后,就可以启动 seata-server:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| #192.168.31.101 为本机局域网 ip
# 初始化 seata 的 nacos 配置
cd seata/conf
sh nacos-config.sh 192.168.31.101
# 启动 seata-server,为必须端口冲突,此处调整为 8091
cd seata/bin
nohup sh seata-server.sh -h 192.168.31.101 -p 8091 -m db &
|
服务中 Seata 配置
每个独立的业务库,都需要 undo_log 数据表的支持,以便发生异常时回滚。分别在会员库,财务库和消息库三个库中分别执行如下脚本,写入 un_log 表。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| CREATE TABLE `undo_log`
(
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
`branch_id` BIGINT(20) NOT NULL,
`xid` VARCHAR(100) NOT NULL,
`context` VARCHAR(128) NOT NULL,
`rollback_info` LONGBLOB NOT NULL,
`log_status` INT(11) NOT NULL,
`log_created` DATETIME NOT NULL,
`log_modified` DATETIME NOT NULL,
`ext` VARCHAR(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8;
|
在各模块服务的 pom.xml 文件中增加 seata 相关 jar 支持:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</dependency>
|
jar 包引入后,对应模块服务的 application.properties 中增加 seata 相关的配置项:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
spring.cloud.alibaba.seata.tx-service-group=message-service-group
logging.level.io.seata = debug
hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=20000
feign.client.config.defalut.connectTimeout=5000
ribbon.ConnectTimeout=6000
|
application.properties 同级目录下,依据 Spring Boot 约定优于配置的原则,增加 registry.conf 文件,应用启动时会默认加载此文件,代码中已写有默认文件名,如下图:
数据源代理配置
若要全局事务生效,针对每个微服务对应的存储库,必须由 Seata 进行数据源代理,以便统一管理,配置代码如下,将下列代码文件写入所有相关的微服务模块中,服务启动时自动配置。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| @Configuration
public class DataSourceProxyConfig {
@Value("${mybatis.mapper-locations}")
private String mapperLocations;
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource hikariDataSource(){
return new HikariDataSource();
}
@Bean
public DataSourceProxy dataSourceProxy(DataSource dataSource) {
return new DataSourceProxy(dataSource);
}
@Bean
public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSourceProxy);
sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver()
.getResources(mapperLocations));
sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
return sqlSessionFactoryBean.getObject();
}
}
|
还记得本章节文首的主要业务逻辑代码吗?方法除局部事务注解 @Transactional 外,还需要增加 Seata 全局事务配置注解:
1 2 3 4 5 6 7
| @Transactional(rollbackFor = Exception.class)
@GlobalTransactional
public int createExsits(String json) throws BusinessException { }
|
至此,Seata Server、全局事务配置、事务回滚配置、数据源代理、代码支持等均已完成,下面我们来启动应用,看看有什么不同:
1 2 3 4 5 6 7 8 9 10 11
| 2020-01-09 09:22:19.179 INFO 16457 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2020-01-09 09:22:19.970 INFO 16457 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2020-01-09 09:22:20.053 INFO 16457 --- [ main] io.seata.core.rpc.netty.RmRpcClient : register to RM resourceId:jdbc:mysql:
2020-01-09 09:22:20.053 INFO 16457 --- [ main] io.seata.core.rpc.netty.RmRpcClient : register resource, resourceId:jdbc:mysql:
2020-01-09 09:22:20.060 DEBUG 16457 --- [lector_RMROLE_1] i.s.core.rpc.netty.AbstractRpcRemoting : [email protected] msgId:2, future :[email protected], body:version=0.9.0,extraData=null,identified=true,resultCode=null,msg=null
|
从第 3 行日志开始,可以看到相应的应用已将自己交由全局事务管控。
哪到底这个分布式事务到底有没有真正有用呢?下面我们一起做个测试,看数据的完整性能否得到保证。
分布式事务测试
异常情况测试
只启动 parking-charge 计费服务,其它两个服务(财务子服务和消息子服务)不启动,当调用 finance-service 时,服务不可用,hystrix 会直接快速失败,抛出异常,此时全局事务失败,刚才成功写入的出场记录被回滚清除,看以下关键日志输出的截图:
正常情况测试
将三个微服务实例全部启动,可以在 nacos 控制台看到三个正常的服务实例,通过 swagger-ui 或 PostMan 发起请求调用,特别关注下三个子服务的控制台输出情况:
parkging-charging 计费服务实例,作为业务发起方,开启全局事务,此时显示全局事务编号是 192.168.31.101:8091:2032205087,被调用方事务编号应该当是一样的。
parking-finance 财务子服务控制台日志输出,可以看到服务正常执行,全局事务编号为 192.168.31.101:8091:2032205087。
parking-message 消息子服务的控制台日志输出情况,全局事务编号与上两个服务保持一致。
经过上面两个一正一反的测试,可以看到分布式事务配置已然正常运行。
细心的朋友发现了,seata 的支持表中都没有数据存在,这是怎么回事呢?什么时候会有数据呢,大家思考一下,算是给大家留的下一个思考题目。