入门
随着互联化的蔓延,各种项目都逐渐向分布式服务做转换。如今微服务已经普遍存在,本地事务已经无法满足分布式的要求,由此分布式事务问题诞生。 分布式事务被称为世界性的难题,目前分布式事务存在两大理论依据:CAP定律 BASE理论。
CAP定律
这个定理的内容是指的是在一个分布式系统中、Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可得兼。
一致性(C)
在分布式系统中的所有数据备份,在同一时刻是否同样的值。(等同于所有节点访问同一份最新的数据副本)
可用性(A)
在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。(对数据更新具备高可用性)
分区容错性(P)
以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择。
BASE理论
BASE是Basically Available(基本可用)、Soft state(软状态)和 Eventually consistent(最终一致性)三个短语的缩写。BASE理论是对CAP中一致性和可用性权衡的结果,其来源于对大规模互联网系统分布式实践的总结, 是基于CAP定理逐步演化而来的。BASE理论的核心思想是:即使无法做到强一致性,但每个应用都可以根据自身业务特点,采用适当的方式来使系统达到最终一致性。
基本可用
基本可用是指分布式系统在出现不可预知故障的时候,允许损失部分可用性----注意,这绝不等价于系统不可用。比如:
(1)响应时间上的损失。正常情况下,一个在线搜索引擎需要在0.5秒之内返回给用户相应的查询结果,但由于出现故障,查询结果的响应时间增加了1~2秒
(2)系统功能上的损失:正常情况下,在一个电子商务网站上进行购物的时候,消费者几乎能够顺利完成每一笔订单,但是在一些节日大促购物高峰的时候,由于消费者的购物行为激增,为了保护购物系统的稳定性,部分消费者可能会被引导到一个降级页面
软状态
软状态指允许系统中的数据存在中间状态,并认为该中间状态的存在不会影响系统的整体可用性,即允许系统在不同节点的数据副本之间进行数据同步的过程存在延时
最终一致性
最终一致性强调的是所有的数据副本,在经过一段时间的同步之后,最终都能够达到一个一致的状态。因此,最终一致性的本质是需要系统保证最终数据能够达到一致,而不需要实时保证系统数据的强一致性。
框架定位
LCN并不生产事务,LCN只是本地事务的协调工
TX-LCN定位于一款事务协调性框架,框架其本身并不操作事务,而是基于对事务的协调从而达到事务一致性的效果。
解决方案
在一个分布式系统下存在多个模块协调来完成一次业务。那么就存在一次业务事务下可能横跨多种数据源节点的可能。TX-LCN将可以解决这样的问题。
例如存在服务模块A 、B、 C。A模块是mysql作为数据源的服务,B模块是基于redis作为数据源的服务,C模块是基于mongo作为数据源的服务。若需要解决他们的事务一致性就需要针对不同的节点采用不同的方案,并且统一协调完成分布式事务的处理。
方案:
若采用TX-LCN分布式事务框架,则可以将A模块采用LCN模式、B/C采用TCC模式就能完美解决。
快速开始
说明
TX-LCN 主要有两个模块,Tx-Client(TC) Tx-Manager(TM). TC作为微服务下的依赖,TM是独立的服务。
本教程带领大家了解框架的基本步骤,详细配置可参考 dubbo springcloud
一、TM配置与启动
TM的准备环境
- 安装TM需要依赖的中间件: JRE1.8+, Mysql5.6+, Redis3.2+
如果需要手动编译源码, 还需要Git, Maven, JDK1.8+
- 创建MySQL数据库, 名称为: tx-manager
- 创建数据表
CREATE TABLE `t_tx_exception` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`group_id` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
`unit_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
`mod_id` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
`transaction_state` tinyint(4) NULL DEFAULT NULL,
`registrar` tinyint(4) NULL DEFAULT NULL,
`remark` varchar(4096) NULL DEFAULT NULL,
`ex_state` tinyint(4) NULL DEFAULT NULL COMMENT '0 未解决 1已解决',
`create_time` datetime(0) NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
TM下载与配置
- 从历史版本TM下载找到5.0.2.RELEASE的TM, 下载.
- 修改配置信息
spring.application.name=tx-manager
server.port=7970
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/tx-manager?characterEncoding=UTF-8
spring.datasource.username=root
spring.datasource.password=root
mybatis.configuration.map-underscore-to-camel-case=true
mybatis.configuration.use-generated-keys=true
#tx-lcn.logger.enabled=true
# TxManager Host Ip
#tx-lcn.manager.host=127.0.0.1
# TxClient连接请求端口
#tx-lcn.manager.port=8070
# 心跳检测时间(ms)
#tx-lcn.manager.heart-time=15000
# 分布式事务执行总时间
#tx-lcn.manager.dtx-time=30000
#参数延迟删除时间单位ms
#tx-lcn.message.netty.attr-delay-time=10000
#tx-lcn.manager.concurrent-level=128
# 开启日志
#tx-lcn.logger.enabled=true
#logging.level.com.codingapi=debug
#redis 主机
#spring.redis.host=127.0.0.1
#redis 端口
#spring.redis.port=6379
#redis 密码
#spring.redis.password=
- # 给出信息都是默认值
- 关于详细配置说明见TM配置
- application.properties 加载顺序如下:
- 0、命令行启动参数指定
- 1、file:./config/(当前jar目录下的config目录)
- 2、file:./(当前jar目录)
- 3、classpath:/config/(classpath下的config目录)
- 4、classpath:/(classpath根目录)
- 发布的二进制可执行Jar包含一个默认配置文件(也就是4),可按需要覆盖默认配置
- 手动编译TM,简单指引
# git clone https://github.com/codingapi/tx-lcn.git & cd txlcn-tm
# mvn clean package '-Dmaven.test.skip=true'
target文件夹下,即为TM executable jar.
二、TC微服务模块
微服务示例架构
- 服务A作为DTX发起方,远程调用服务B
TC引入pom依赖
com.codingapi.txlcn
txlcn-tc5.0.2.RELEASE com.codingapi.txlcn
txlcn-txmsg-netty5.0.2.RELEASE
TC开启分布式事务注解
在主类上使用@EnableDistributedTransaction
@SpringBootApplication
@EnableDistributedTransaction
public class DemoAApplication {
public static void main(String[] args) {
SpringApplication.run(DemoDubboClientApplication.class, args);
}
}
TC微服务A业务方法配置
@Service
public class ServiceA {
@Autowired
private ValueDao valueDao; //本地db操作
@Autowired
private ServiceB serviceB;//远程B模块业务
@LcnTransaction //分布式事务注解
@Transactional //本地事务注解
public String execute(String value) throws BusinessException {
// step1. call remote service B
String result = serviceB.rpc(value); // (1)
// step2. local store operate. DTX commit if save success, rollback if not.
valueDao.save(value); // (2)
valueDao.saveBackup(value); // (3)
return result + " > " + "ok-A";
}
}
TC微服务B业务方法配置
@Service
public class ServiceB {
@Autowired
private ValueDao valueDao; //本地db操作
@LcnTransaction //分布式事务注解
@Transactional //本地事务注解
public String rpc(String value) throws BusinessException {
valueDao.save(value); // (4)
valueDao.saveBackup(value); // (5)
return "ok-B";
}
}
TC配置信息说明
# 默认之配置为TM的本机默认端口
tx-lcn.client.manager-address=127.0.0.1:8070
尝试下简单的分布式事务
步骤引导
- 阅读快速开始
- 准备开发环境 JDK1.8+, Mysql5.6+, Redis3.2+, Consul(SpringCloud), ZooKeeper(Dubbo), Git, Maven
- 初始化数据
- 启动TxManager(TM)
- 配置微服务模块
- 启动模块与测试
3. 初始化数据
为了演示方便,我们接下来3个微服务用一个数据库,一张数据表。
- 创建MySQL数据库
create database if not exists `txlcn-demo` default charset utf8 collate utf8_general_ci;
- 创建数据表
create table `t_demo` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`kid` varchar(45) DEFAULT NULL,
`group_id` varchar(64) DEFAULT NULL,
`demo_field` varchar(255) DEFAULT NULL,
`app_name` varchar(128) DEFAULT NULL,
`create_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
4. 启动TxManager(TM)
TM启动详情,见快速启动
这里介绍如何在开发环境友好启动TM.
- 新建SpringBoot模块
- 新增依赖
com.codingapi.txlcn
txlcn-tm5.0.2.RELEASE
- 在主类上标注 @EnableTransactionManagerServer
@SpringBootApplication
@EnableTransactionManagerServer
public class TransactionManagerApplication {
public static void main(String[] args) {
SpringApplication.run(TransactionManagerApplication.class, args);
}
}
- 运行DemoTransactionManager
5. 配置微服务模块
Dubbo Demo见Dubbo-Demo
SpringCloud Demo见SpringCloud-Demo
6. 启动模块与测试
- 正常提交事务
- 访问 发起方提供的Rest接口 /txlcn?value=the-value。发现事务全部提交
- 回滚事务
- 访问 发起方提供的Rest接口 /txlcn?value=the-value&ex=throw。发现发起方由本地事务回滚,而参与方ServiceB、ServiceC,由于TX-LCN的协调,数据也回滚了。
Dubbo示例
Dubbo 示例说明
共三个模块如下:
DubboServiceA (发起方 | LCN模式)
DubboServiceB (参与方 | TXC模式)
DubboServiceC (参与方 | TCC模式)
代码地址:https://github.com/codingapi/txlcn-demo
一、调用关系说明:
- DubboServiceA -> DemoConsumerController的txlcn的Mapping是调用发起方法,代码如下。
@RestController
public class DemoConsumerController {
@Autowired
private DemoApiService demoApiService;
@RequestMapping("/txlcn")
public String sayHello(@RequestParam("value") String value,
@RequestParam(value = "ex", required = false) String exFlag) {
return demoApiService.execute(value, exFlag);
}
}
- DemoApiService.execute(value, exFlag)方法代码:
@Service
public class DemoApiServiceImpl implements DemoApiService {
@Reference(version = "${demo.service.version}",
application = "${dubbo.application.b}",
registry = "${dubbo.registry.address}",
retries = -1,
check = false,
loadbalance = "txlcn_random")
private DemoServiceB demoServiceB;
@Reference(version = "${demo.service.version}",
application = "${dubbo.application.c}",
retries = -1,
check = false,
registry = "${dubbo.registry.address}",
loadbalance = "txlcn_random")
private DemoServiceC demoServiceC;
@Autowired
private DemoMapper demoMapper;
@Override
@LcnTransaction
public String execute(String name, String exFlag) {
String bResp = demoServiceB.rpc(name);
String cResp = demoServiceC.rpc(name);
Demo demo = new Demo();
demo.setGroupId(TracingContext.tracing().groupId());
demo.setDemoField(name);
demo.setAppName(Transactions.getApplicationId());
demo.setCreateTime(new Date());
demoMapper.save(demo);
if (Objects.nonNull(exFlag)) {
throw new IllegalStateException("by exFlag");
}
return bResp + " > " + cResp + " > " + "ok-service-a";
}
}
- 参与方DemoServiceB.rpc(name)的代码
@Service(
version = "${demo.service.version}",
application = "${dubbo.application.id}",
protocol = "${dubbo.protocol.id}",
registry = "${dubbo.registry.id}"
)
@Slf4j
public class DefaultDemoService implements DDemoService {
@Autowired
private DemoMapper demoMapper;
@Override
@TxTransaction(type = "txc")
public String rpc(String name) {
Demo demo = new Demo();
demo.setDemoField(name);
demo.setGroupId(TracingContext.tracing().groupId());
demo.setCreateTime(new Date());
demo.setAppName(Transactions.getApplicationId());
demoMapper.save(demo);
return "ok-service-d";
}
}
- 参与方DemoServiceC.rpc(name)的代码
@Service(
version = "${demo.service.version}",
application = "${dubbo.application.id}",
protocol = "${dubbo.protocol.id}",
registry = "${dubbo.registry.id}"
)
@Slf4j
public class DefaultDemoService implements EDemoService {
@Autowired
private DemoMapper demoMapper;
private ConcurrentHashMap
@Override
@TccTransaction(confirmMethod = "cm", cancelMethod = "cl", executeClass = DefaultDemoService.class)
public String rpc(String name) {
Demo demo = new Demo();
demo.setDemoField(name);
demo.setAppName(Transactions.getApplicationId());
demo.setCreateTime(new Date());
demo.setGroupId(TracingContext.tracing().groupId());
demoMapper.save(demo);
ids.put(TracingContext.tracing().groupId(), demo.getId());
return "ok-service-c";
}
public void cm(String name) {
log.info("tcc-confirm-" + TracingContext.tracing().groupId());
ids.remove(TracingContext.tracing().groupId());
}
public void cl(String name) {
log.info("tcc-cancel-" + TracingContext.tracing().groupId());
demoMapper.deleteByKId(ids.get(TracingContext.tracing().groupId()));
}
}
二、工程代码概览
- 事务发起方,txlcn-demo-dubbo-service-a
- 工程截图
- 项目配置文件 application.properties
##################
# 这个是启动本服务的配置文件,其它的application-xxx.properties 是开发者的个性化配置,不用关心。
# 你可以在 https://txlcn.org/zh-cn/docs/setting/client.html 看到所有的个性化配置
#################
# Spring boot application
spring.application.name=DubboServiceAApplication
server.port=12004
management.port=12007
# Service Version
demo.service.version=1.0.0
# Dubbo Config properties
## ApplicationConfig Bean
dubbo.application.id=DubboServiceAApplication
dubbo.application.name=DubboServiceAApplication
dubbo.application.b=DubboServiceBApplication
dubbo.application.c=DubboServiceCApplication
## ProtocolConfig Bean
dubbo.protocol.id=dubbo
dubbo.protocol.name=dubbo
dubbo.protocol.port=12345
dubbo.registry.protocol=zookeeper
dubbo.registry.address=127.0.0.1:2181
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/txlcn-demo?\
characterEncoding=UTF-8&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.hikari.maximum-pool-size=20
mybatis.configuration.map-underscore-to-camel-case=true
mybatis.configuration.use-generated-keys=true
logging.level.com.codingapi.txlcn=DEBUG
- 启动类
@SpringBootApplication
@EnableDistributedTransaction
public class DubboServiceAApplication {
public static void main(String[] args) {
SpringApplication.run(DubboServiceAApplication.class, args);
}
}
- 事务参与方,txlcn-demo-dubbo-service-b
- 工程截图
- 项目配置文件 application.properties
##################
# 这个是启动本服务的配置文件,其它的application-xxx.properties 是开发者的个性化配置,不用关心。
# 你可以在 https://txlcn.org/zh-cn/docs/setting/client.html 看到所有的个性化配置
#################
# Spring boot application
spring.application.name=DubboServiceBApplication
server.port=12005
management.port=12008
# Service version
demo.service.version=1.0.0
# Base packages to scan Dubbo Components (e.g @Service , @Reference)
dubbo.scan.basePackages=com.example
# Dubbo Config properties
## ApplicationConfig Bean
dubbo.application.id=DubboServiceBApplication
dubbo.application.name=DubboServiceBApplication
## ProtocolConfig Bean
dubbo.protocol.id=dubbo
dubbo.protocol.name=dubbo
dubbo.protocol.port=12345
## RegistryConfig Bean
dubbo.registry.id=my-registry
dubbo.registry.address=127.0.0.1:2181
dubbo.registry.protocol=zookeeper
dubbo.application.qos.enable=false
## DB
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/txlcn-demo?\
characterEncoding=UTF-8&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.hikari.maximum-pool-size=20
mybatis.configuration.map-underscore-to-camel-case=true
mybatis.configuration.use-generated-keys=true
logging.level.com.codingapi.txlcn=DEBUG
- 启动类
@SpringBootApplication
@EnableDistributedTransaction
public class DubboServiceBApplication {
public static void main(String[] args) {
SpringApplication.run(DubboServiceBApplication.class, args);
}
}
- 事务参与方,txlcn-demo-spring-service-c
- 工程截图
- 项目配置文件 application.properties
##################
# 这个是启动本服务的配置文件,其它的application-xxx.properties 是开发者的个性化配置,不用关心。
# 你可以在 https://txlcn.org/zh-cn/docs/setting/client.html 看到所有的个性化配置
#################
# Spring boot application
spring.application.name=DubboServiceCApplication
server.port=12006
management.port=12009
# Service version
demo.service.version=1.0.0
# Base packages to scan Dubbo Components (e.g @Service , @Reference)
dubbo.scan.basePackages=org.txlcn.demo.dubbo
# Dubbo Config properties
## ApplicationConfig Bean
dubbo.application.id=DubboServiceCApplication
dubbo.application.name=DubboServiceCApplication
## ProtocolConfig Bean
dubbo.protocol.id=dubbo
dubbo.protocol.name=dubbo
dubbo.protocol.port=12346
## RegistryConfig Bean
dubbo.registry.id=my-registry
dubbo.registry.address=127.0.0.1:2181
dubbo.registry.protocol=zookeeper
dubbo.application.qos.enable=false
#db
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/txlcn-demo\
?characterEncoding=UTF-8&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.hikari.maximum-pool-size=20
mybatis.configuration.map-underscore-to-camel-case=true
mybatis.configuration.use-generated-keys=true
logging.level.com.codingapi.txlcn=DEBUG
- 启动类
@SpringBootApplication
@EnableDistributedTransaction
public class DubboServiceCApplication {
public static void main(String[] args) {
SpringApplication.run(DubboServiceCApplication.class, args);
}
}
三、启动SpringCloud微服务
事务参与方 ServiceB
事务参与方 ServiceC
事务发起方 ServiceA
事务控制原理
TX-LCN由两大模块组成, TxClient、TxManager,TxClient作为模块的依赖框架,提供TX-LCN的标准支持,TxManager作为分布式事务的控制放。事务发起方或者参与反都由TxClient端来控制。
原理图:
核心步骤
- 创建事务组
- 是指在事务发起方开始执行业务代码之前先调用TxManager创建事务组对象,然后拿到事务标示GroupId的过程。
- 加入事务组
- 添加事务组是指参与方在执行完业务方法以后,将该模块的事务信息通知给TxManager的操作。
- 通知事务组
- 是指在发起方执行完业务代码以后,将发起方执行结果状态通知给TxManager,TxManager将根据事务最终状态和事务组的信息来通知相应的参与模块提交或回滚事务,并返回结果给事务发起方。
LCN事务模式
一、原理介绍:
LCN模式是通过代理Connection的方式实现对本地事务的操作,然后在由TxManager统一协调控制事务。当本地事务提交回滚或者关闭连接时将会执行假操作,该代理的连接将由LCN连接池管理。
二、模式特点:
- 该模式对代码的嵌入性为低。
- 该模式仅限于本地存在连接对象且可通过连接对象控制事务的模块。
- 该模式下的事务提交与回滚是由本地事务方控制,对于数据一致性上有较高的保障。
- 该模式缺陷在于代理的连接需要随事务发起方一共释放连接,增加了连接占用的时间。
TCC事务模式
一、原理介绍:
TCC事务机制相对于传统事务机制(X/Open XA Two-Phase-Commit),其特征在于它不依赖资源管理器(RM)对XA的支持,而是通过对(由业务系统提供的)业务逻辑的调度来实现分布式事务。主要由三步操作,Try: 尝试执行业务、 Confirm:确认执行业务、 Cancel: 取消执行业务。
二、模式特点:
- 该模式对代码的嵌入性高,要求每个业务需要写三种步骤的操作。
- 该模式对有无本地事务控制都可以支持使用面广。
- 数据一致性控制几乎完全由开发者控制,对业务开发难度要求高。
TXC事务模式
一、原理介绍:
TXC模式命名来源于淘宝,实现原理是在执行SQL之前,先查询SQL的影响数据,然后保存执行的SQL快走信息和创建锁。当需要回滚的时候就采用这些记录数据回滚数据库,目前锁实现依赖redis分布式锁控制。
二、模式特点:
- 该模式同样对代码的嵌入性低。
- 该模式仅限于对支持SQL方式的模块支持。
- 该模式由于每次执行SQL之前需要先查询影响数据,因此相比LCN模式消耗资源与时间要多。
- 该模式不会占用数据库的连接资源。
TC配置说明
一、application.properties
# 是否启动LCN负载均衡策略(优化选项,开启与否,功能不受影响)
tx-lcn.ribbon.loadbalancer.dtx.enabled=true
# tx-manager 的配置地址,可以指定TM集群中的任何一个或多个地址
# tx-manager 下集群策略,每个TC都会从始至终与TM集群保持集群大小个连接。
# TM方,每有TM进入集群,会找到所有TC并通知其与新TM建立连接。
# TC方,启动时按配置与集群建立连接,成功后,会再与集群协商,查询集群大小并保持与所有TM的连接
tx-lcn.client.manager-address=127.0.0.1:8070
# 该参数是分布式事务框架存储的业务切面信息。采用的是h2数据库。绝对路径。该参数默认的值为{user.dir}/.txlcn/{application.name}-{application.port}
tx-lcn.aspect.log.file-path=logs/.txlcn/demo-8080
# 调用链长度等级,默认值为3(优化选项。系统中每个请求大致调用链平均长度,估算值。)
tx-lcn.client.chain-level=3
# 该参数为tc与tm通讯时的最大超时时间,单位ms。该参数不需要配置会在连接初始化时由tm返回。
tx-lcn.client.tm-rpc-timeout=2000
# 该参数为分布式事务的最大时间,单位ms。该参数不允许TC方配置,会在连接初始化时由tm返回。
tx-lcn.client.dtx-time=8000
# 该参数为雪花算法的机器编号,所有TC不能相同。该参数不允许配置,会在连接初始化时由tm返回。
tx-lcn.client.machine-id=1
# 该参数为事务方法注解切面的orderNumber,默认值为0.
tx-lcn.client.dtx-aspect-order=0
# 该参数为事务连接资源方法切面的orderNumber,默认值为0.
tx-lcn.client.resource-order=0
# 是否开启日志记录。当开启以后需要配置对应logger的数据库连接配置信息。
tx-lcn.logger.enabled=false
tx-lcn.logger.driver-class-name=${spring.datasource.driver-class-name}
tx-lcn.logger.jdbc-url=${spring.datasource.url}
tx-lcn.logger.username=${spring.datasource.username}
tx-lcn.logger.password=${spring.datasource.password}
二、特别配置
1、微服务集群且用到 LCN事务模式时,为保证性能请开启TX-LCN重写的负载策略。
- Dubbo 开启
@Reference(version = "${demo.service.version}",
application = "${dubbo.application.e}",
retries = -1,
registry = "${dubbo.registry.address}",
loadbalance = "txlcn_random") // here
private EDemoService eDemoService;
- SpringCloud 开启 (application.properties)
tx-lcn.springcloud.loadbalance.enabled=true
配置详情参见
2、关闭业务RPC重试
- Dubbo 开启
@Reference(version = "${demo.service.version}",
application = "${dubbo.application.e}",
retries = -1,
registry = "${dubbo.registry.address}",
loadbalance = "txlcn_random") // here
private EDemoService eDemoService;
- SpringCloud 开启 (application.properties)
# 关闭Ribbon的重试机制
ribbon.MaxAutoRetriesNextServer=0
NOTE
1、TxClient所有配置均有默认配置,请按需覆盖默认配置。
2、为什么要关闭服务调用的重试。远程业务调用失败有两种可能: (1),远程业务执行失败 (2)、远程业务执行成功,网络失败。对于第2种,事务场景下重试会发生,某个业务执行两次的问题。 如果业务上控制某个事务接口的幂等,则不用关闭重试。
3、通过AOP配置本地事务与分布式事务
@Configuration
@EnableTransactionManagement
public class TransactionConfiguration {
/**
* 本地事务配置
* @param transactionManager
* @return
*/
@Bean
@ConditionalOnMissingBean
public TransactionInterceptor transactionInterceptor(PlatformTransactionManager transactionManager) {
Properties properties = new Properties();
properties.setProperty("*", "PROPAGATION_REQUIRED,-Throwable");
TransactionInterceptor transactionInterceptor = new TransactionInterceptor();
transactionInterceptor.setTransactionManager(transactionManager);
transactionInterceptor.setTransactionAttributes(properties);
return transactionInterceptor;
}
/**
* 分布式事务配置 设置为LCN模式
* @param dtxLogicWeaver
* @return
*/
@ConditionalOnBean(DTXLogicWeaver.class)
@Bean
public TxLcnInterceptor txLcnInterceptor(DTXLogicWeaver dtxLogicWeaver) {
TxLcnInterceptor txLcnInterceptor = new TxLcnInterceptor(dtxLogicWeaver);
Properties properties = new Properties();
properties.setProperty(Transactions.DTX_TYPE,Transactions.LCN);
properties.setProperty(Transactions.DTX_PROPAGATION, "REQUIRED");
txLcnInterceptor.setTransactionAttributes(properties);
return txLcnInterceptor;
}
@Bean
public BeanNameAutoProxyCreator beanNameAutoProxyCreator() {
BeanNameAutoProxyCreator beanNameAutoProxyCreator = new BeanNameAutoProxyCreator();
//需要调整优先级,分布式事务在前,本地事务在后。
beanNameAutoProxyCreator.setInterceptorNames("txLcnInterceptor","transactionInterceptor");
beanNameAutoProxyCreator.setBeanNames("*Impl");
return beanNameAutoProxyCreator;
}
}
4、TXC模式定义表的实际主键
TXC 是基于逆向sql的方式实现对业务的回滚控制,在逆向sql操作数据是会检索对应记录的主键作为条件处理回滚业务。但是在有些情况下可能表中并没有主键字段(primary key),仅存在业务上的名义主键,此时可通过重写PrimaryKeysProvider方式定义表对应的主键关系。
如下所示:
@Component
public class MysqlPrimaryKeysProvider implements PrimaryKeysProvider {
@Override
public Map> provide() {
//t_demo 表的回滚主键为 kid字段
return Maps.newHashMap("t_demo", Collections.singletonList("kid"));
}
}
5、TC模块标识策略
TC模块在负载时,TM为了区分具体模块,会要求TC注册时提供唯一标识。默认策略是,应用名称加端口方式标识。也可以自定义,自定义需要保证各个模块标识不能重复。
@Component
public class MyModIdProvider implements ModIdProvider {
@Override
public String modId() {
return ip + port;
}
}
TM配置说明
application.properties
spring.application.name=TransactionManager
server.port=7970
# JDBC 数据库配置
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/tx-manager?characterEncoding=UTF-8
spring.datasource.username=root
spring.datasource.password=123456
# 数据库方言
spring.jpa.database-platform=org.hibernate.dialect.MySQL5InnoDBDialect
# 第一次运行可以设置为: create, 为TM创建持久化数据库表
spring.jpa.hibernate.ddl-auto=validate
# TM监听IP. 默认为 127.0.0.1
tx-lcn.manager.host=127.0.0.1
# TM监听Socket端口. 默认为 ${server.port} - 100
tx-lcn.manager.port=8070
# 心跳检测时间(ms). 默认为 300000
tx-lcn.manager.heart-time=300000
# 分布式事务执行总时间(ms). 默认为36000
tx-lcn.manager.dtx-time=8000
# 参数延迟删除时间单位ms 默认为dtx-time值
tx-lcn.message.netty.attr-delay-time=${tx-lcn.manager.dtx-time}
# 事务处理并发等级. 默认为机器逻辑核心数5倍
tx-lcn.manager.concurrent-level=160
# TM后台登陆密码,默认值为codingapi
tx-lcn.manager.admin-key=codingapi
# 分布式事务锁超时时间 默认为-1,当-1时会用tx-lcn.manager.dtx-time的时间
tx-lcn.manager.dtx-lock-time=${tx-lcn.manager.dtx-time}
# 雪花算法的sequence位长度,默认为12位.
tx-lcn.manager.seq-len=12
# 异常回调开关。开启时请制定ex-url
tx-lcn.manager.ex-url-enabled=false
# 事务异常通知(任何http协议地址。未指定协议时,为TM提供内置功能接口)。默认是邮件通知
tx-lcn.manager.ex-url=/provider/email-to/***@**.com
# 开启日志,默认为false
tx-lcn.logger.enabled=true
tx-lcn.logger.enabled=false
tx-lcn.logger.driver-class-name=${spring.datasource.driver-class-name}
tx-lcn.logger.jdbc-url=${spring.datasource.url}
tx-lcn.logger.username=${spring.datasource.username}
tx-lcn.logger.password=${spring.datasource.password}
# redis 的设置信息. 线上请用Redis Cluster
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.password=
注意(NOTE)
(1) TxManager所有配置均有默认配置,请按需覆盖默认配置。
(2) 特别注意 TxManager进程会监听两个端口号,一个为TxManager端口,另一个是事务消息端口。TxClient默认连接事务消息端口是8070, 所以,为保证TX-LCN基于默认配置运行良好,请设置TxManager端口号为8069 或者指定事务消息端口为8070
(3) 分布式事务执行总时间 a 与 TxClient通讯最大等待时间 b、TxManager通讯最大等待时间 c、微服务间通讯时间 d、微服务调用链长度 e 几个时间存在着依赖关系。 a >= 2c + (b + c + d) * (e - 1), 特别地,b、c、d 一致时,a >= (3e-1)b。你也可以在此理论上适当在减小a的值,发生异常时能更快得到自动补偿,即 a >= (3e-1)b - Δ(原因)。 最后,调用链小于等于3时,将基于默认配置运行良好
(4) 若用tx-lcn.manager.ex-url=/provider/email-to/xxx@xx.xxx 这个配置,配置管理员邮箱信息(如QQ邮箱):
spring.mail.host=smtp.qq.com
spring.mail.port=587
spring.mail.username=xxxxx@**.com
spring.mail.password=*********
负载与集群配置
负载集群分为业务模块与TxManager
- 业务模块负载集群说明
模块的集群集群基于springcloud或dubbo机制,集群的方式都是围绕服务发现来完成的,关于模块的负载集群配置这里将不阐述,可参考dubbo与springcloud资料。
- TxManager集群说明
TxManager集群比较简单,只需要控制TxManager下的db资源相同(mysql 、redis)部署多份即可,注意TxManager负载均衡5.0版本与之前版本机制不同。
TX-LCN 负载均衡介绍
使用步骤:
- 首选需要启动多个TxManager服务。
- 在客户端配置TxManager服务地址。
tx-lcn.client.manager-address=127.0.0.1:8070,127.0.0.1:8072
原理介绍:
当有事务请求客户端时事务发起端会随机选择一个可用TxManager作为事务控制方,然后告知其参与模块都与该模块通讯。
目前TX-LCN的负载机制仅提供了随机机制。
- 关于tx-lcn.client.manager-address的注意事项:
- 客户端在配置上tx-lcn.client.manager-address地址后,启动时必须要全部可访问客户端才能正常启动。
- 当tx-lcn.client.manager-address中的服务存在不可用时,客户端会重试链接8次,超过次数以后将不在重试,重试链接的间隔时间为6秒,当所有的TxManager都不可访问则会导致所有的分布式事务请求都失败回滚。
- 当增加一个新的TxManager的集群模块时不需要添加到tx-lcn.client.manager-address下,TxManager也会广播到所有的TxManager端再通知所有链接中的TxClient端新的TxManager加入。TC配置集群时,不用制定集群里的所有地址。
模块端负载集群注意事项
目前TX-LCN支持的事务种类有三种,其中LCN模式是会占用资源,详情见LCN模式原理。
若存在这样的请求链,A模块先调用了B模块的one方法,然后在调用了two方法,如下所示:
A ->B.one();
A ->B.two();
假如one与two方法的业务都是在修改同一条数据,假如两个方法的id相同,伪代码如下:
void one(id){
execute => update demo set state = 1 where id = {id} ;
}
void two(id){
execute => update demo set state = 2 where id = {id} ;
}
若B模块做了集群存在B1、B2两个模块。那么就可能出现A分别调用了B1 B2模块,如下:
A ->B1.one();
A ->B2.two();
在这样的情况下业务方将在LCN下会因为资源占用而导致执行失败而回滚事务。为了支持这样的场景,框架提供了重写了rpc的负载模式。
控制在同一次事务下同一个被负载的模块被重复调用时将只会请求到第一次被选中的模块。
针对dubbo需要指定loadbalance为txlcn的负载方式,框架重写了dubbo的负载方式提供了对应dubbo的四种负载方式 :
txlcn_random=com.codingapi.txlcn.tracing.dubbo.TxlcnRandomLoadBalance
txlcn_roundrobin=com.codingapi.txlcn.tracing.dubbo.TxlcnRoundRobinLoadBalance
txlcn_leastactive=com.codingapi.txlcn.tracing.dubbo.TxlcnLeastActiveLoadBalance
txlcn_consistenthash=com.codingapi.txlcn.tracing.dubbo.TxlcnConsistentHashLoadBalance
使用如下:
@Reference(version = "${demo.service.version}",
application = "${dubbo.application.e}",
retries = -1,
registry = "${dubbo.registry.address}",
loadbalance = "txlcn_random")
private EDemoService eDemoService;
springcloud下需要在application的配置文件下增加:
tx-lcn.springcloud.loadbalance.enabled=true
事务模式扩展
TX-LCN不仅仅支持LCN TXC TCC模式,也可以由开发者自定义符合TX-LCN控制原理的请求事务模型。
事务模式的接口定义
- 增加一种新的事务模式名称,不能与已有的模式重名,例如test模式。
在使用新的模式时,只需要在业务上标准类型即可。如下:
@TxTransaction(type = "test")
@Transactional
public void test(){
}
- 实现TransactionResourceExecutor接口,处理db资源。
public interface TransactionResourceProxy {
/**
* 获取资源连接
*
* @param connectionCallback Connection提供者
* @return Connection Connection
* @throws Throwable Throwable
*/
Connection proxyConnection(ConnectionCallback connectionCallback) throws Throwable;
}
- 实现不同状态下的事务控制 实现DTXLocalControl 接口处理业务。
public interface DTXLocalControl {
/**
* 业务代码执行前
*
* @param info info
* @throws TransactionException TransactionException
*/
default void preBusinessCode(TxTransactionInfo info) throws TransactionException {
}
/**
* 执行业务代码
*
* @param info info
* @return Object Object
* @throws Throwable Throwable
*/
default Object doBusinessCode(TxTransactionInfo info) throws Throwable {
return info.getBusinessCallback().call();
}
/**
* 业务代码执行失败
*
* @param info info
* @param throwable throwable
*/
default void onBusinessCodeError(TxTransactionInfo info, Throwable throwable) throws TransactionException {
}
/**
* 业务代码执行成功
*
* @param info info
* @param result result
* @throws TransactionException TransactionException
*/
default void onBusinessCodeSuccess(TxTransactionInfo info, Object result) throws TransactionException {
}
/**
* 清场
*
* @param info info
*/
default void postBusinessCode(TxTransactionInfo info) {
}
}
例如 LCN starting状态下的处理实现,bean name control_lcn_starting是标准规范,control_+模式名称+状态名称:
@Service(value = "control_lcn_starting")
@Slf4j
public class LcnStartingTransaction implements DTXLocalControl {
private final TransactionControlTemplate transactionControlTemplate;
@Autowired
public LcnStartingTransaction(TransactionControlTemplate transactionControlTemplate) {
this.transactionControlTemplate = transactionControlTemplate;
}
@Override
public void preBusinessCode(TxTransactionInfo info) throws TransactionException {
// create DTX group
transactionControlTemplate.createGroup(
info.getGroupId(), info.getUnitId(), info.getTransactionInfo(), info.getTransactionType());
// lcn type need connection proxy
DTXLocalContext.makeProxy();
}
@Override
public void onBusinessCodeError(TxTransactionInfo info, Throwable throwable) {
DTXLocalContext.cur().setSysTransactionState(0);
}
@Override
public void onBusinessCodeSuccess(TxTransactionInfo info, Object result) {
DTXLocalContext.cur().setSysTransactionState(1);
}
@Override
public void postBusinessCode(TxTransactionInfo info) {
// RPC close DTX group
transactionControlTemplate.notifyGroup(
info.getGroupId(), info.getUnitId(), info.getTransactionType(), DTXLocalContext.transactionState());
}
}
说明:
若增加的新的模式最好创建一个新的模块,然后调整pom增加该模块的支持即可。
通讯协议扩展
通讯协议扩展是指txclient与txmanager通讯的协议扩展。
目前TX-LCN默认采用了netty方式通讯。关于拓展也以netty方式来说明如何拓展。
拓展txlcn-txmsg
主要实现6个接口,其中下面4个是由txlcn-txmsg的实现方提供:
- 发起请求调用客户端 RpcClient
public abstract class RpcClient {
@Autowired
private RpcLoadBalance rpcLoadBalance;
/**
* 发送指令不需要返回数据,需要知道返回的状态
*
* @param rpcCmd 指令内容
* @return 指令状态
* @throws RpcException 远程调用请求异常
*/
public abstract RpcResponseState send(RpcCmd rpcCmd) throws RpcException;
/**
* 发送指令不需要返回数据,需要知道返回的状态
*
* @param remoteKey 远程标识关键字
* @param msg 指令内容
* @return 指令状态
* @throws RpcException 远程调用请求异常
*/
public abstract RpcResponseState send(String remoteKey, MessageDto msg) throws RpcException;
/**
* 发送请求并获取响应
*
* @param rpcCmd 指令内容
* @return 响应指令数据
* @throws RpcException 远程调用请求异常
*/
public abstract MessageDto request(RpcCmd rpcCmd) throws RpcException;
/**
* 发送请求并响应
*
* @param remoteKey 远程标识关键字
* @param msg 请求内容
* @return 相应指令数据
* @throws RpcException 远程调用请求异常
*/
public abstract MessageDto request(String remoteKey, MessageDto msg) throws RpcException;
/**
* 发送请求并获取响应
*
* @param remoteKey 远程标识关键字
* @param msg 请求内容
* @param timeout 超时时间
* @return 响应消息
* @throws RpcException 远程调用请求异常
*/
public abstract MessageDto request(String remoteKey, MessageDto msg, long timeout) throws RpcException;
/**
* 获取一个远程标识关键字
*
* @return 远程标识关键字
* @throws RpcException 远程调用请求异常
*/
public String loadRemoteKey() throws RpcException {
return rpcLoadBalance.getRemoteKey();
}
/**
* 获取所有的远程连接对象
*
* @return 远程连接对象数组.
*/
public abstract ListloadAllRemoteKey();
/**
* 获取模块远程标识
*
* @param moduleName 模块名称
* @return 远程标识
*/
public abstract ListremoteKeys(String moduleName);
/**
* 绑定模块名称
*
* @param remoteKey 远程标识
* @param appName 应用名称
*/
public abstract void bindAppName(String remoteKey, String appName);
/**
* 获取模块名称
*
* @param remoteKey 远程标识
* @return 应用名称
*/
public abstract String getAppName(String remoteKey);
/**
* 获取所有的模块信息
*
* @return 应用名称
*/
public abstract List apps();
}
- 发起请求调用客户端初始化接口 RpcClientInitializer
public interface RpcClientInitializer {
/**
* message client init
* @param hosts
*/
void init(List
/**
* 建立连接
* @param socketAddress
*/
void connect(SocketAddress socketAddress);
}
- TxManager message初始化接口 RpcServerInitializer
public interface RpcServerInitializer {
/**
* support server init
*
* @param managerProperties 配置信息
*/
void init(ManagerProperties managerProperties);
}
- 客户端请求TxManager的负载策略 RpcLoadBalance
public interface RpcLoadBalance {
/**
* 获取一个远程标识关键字
* @return
* @throws RpcException
*/
String getRemoteKey()throws RpcException;
}
下面两个用于Tx-Manager与Tx-Client的回调业务
RpcAnswer接口 Tx-Manager与Tx-Client都会实现用于接受响应数据。
public interface RpcAnswer {
/**
* 业务处理
* @param rpcCmd message 曾业务回调函数
*
*/
void callback(RpcCmd rpcCmd);
}
ClientInitCallBack 接口,用于Tx-Manager下需要处理客户端与TxManager建立连接的初始化回调业务。
public interface ClientInitCallBack {
/**
* 初始化连接成功回调
* @param remoteKey 远程调用唯一key
*/
void connected(String remoteKey);
}
实现细节可借鉴 txlcn-txmsg-netty 模块源码
RPC框架扩展
RPC扩展主要是指在分布式事务框架下对传递控制参数的支持、与负载均衡的扩展控制。
下面以dubbo框架为例讲解扩展的过程。
- 传递控制参数的支持
dubbo参数传递可以通过隐形传参的方式来完成。参数传递分为传出与接受两块。下面分别展示代码说明。
dubbo传出参数的filter:
@Activate(group = Constants.CONSUMER)
public class DubboRequestInterceptor implements Filter {
@Override
public Result invoke(Invoker> invoker, Invocation invocation) throws RpcException {
//判断是否存在事务
if (TracingContext.tracing().hasGroup()) {
//设置传递的参数信息
RpcContext.getContext().setAttachment(TracingConstants.HEADER_KEY_GROUP_ID, TracingContext.tracing().groupId());
RpcContext.getContext().setAttachment(TracingConstants.HEADER_KEY_APP_MAP, TracingContext.tracing().appMapBase64String());
}
return invoker.invoke(invocation);
}
}
dubbo传入参数的filter:
@Activate(group = {Constants.PROVIDER})
public class TracingHandlerInterceptor implements Filter {
@Override
public Result invoke(Invoker> invoker, Invocation invocation) throws RpcException {
//接受参数
String groupId = invocation.getAttachment(TracingConstants.HEADER_KEY_GROUP_ID, "");
String appList = invocation.getAttachment(TracingConstants.HEADER_KEY_APP_MAP, "");
//设置参数
TracingContext.tracing().init(Maps.newHashMap(TracingConstants.GROUP_ID, groupId, TracingConstants.APP_MAP, appList));
return invoker.invoke(invocation);
}
}
- 负载均衡的扩展控制(仅限于LCN模式下)
控制的效果:负载均衡扩展主要为了做到在同一次分布式事务中相同的模块重复调用在同一个模块下。
为什么仅限于LCN模式?
当存在这样的请求链,A模块先调用了B模块的one方法,然后在调用了two方法,如下所示:
A ->B.one(); A ->B.two(); 假如one与two方法的业务都是在修改同一条数据,假如两个方法的id相同,伪代码如下:
void one(id){
execute => update demo set state = 1 where id = {id} ;
}
void two(id){
execute => update demo set state = 2 where id = {id} ;
}
若B模块做了集群存在B1、B2两个模块。那么就可能出现A分别调用了B1 B2模块,如下:
A ->B1.one(); A ->B2.two(); 在这样的情况下业务方将在LCN下会因为资源占用而导致执行失败而回滚事务。为了支持这样的场景,框架提供了重写了rpc的负载模式。
控制在同一次事务下同一个被负载的模块被重复调用时将只会请求到第一次被选中的模块。在采用这样的方案的时候也会提高Connection的连接使用率,会提高在负载情况下的性能。
dubbo框架默认提供了四种负载策略,这里仅仅展示random的实现。
public class TxlcnRandomLoadBalance extends RandomLoadBalance {
@Override
publicInvoker select(List > invokers, URL url, Invocation invocation) {
return DubboTxlcnLoadBalance.chooseInvoker(invokers, url, invocation, super::select);
}
}
@Slf4j
class DubboTxlcnLoadBalance {
private static final String empty = "";
staticInvoker chooseInvoker(List > invokers, URL url, Invocation invocation, TxLcnLoadBalance loadBalance) {
//非分布式事务直接执行默认业务.
if(!TracingContext.tracing().hasGroup()){
return loadBalance.select(invokers, url, invocation);
}
TracingContext.tracing()
.addApp(RpcContext.getContext().getLocalAddressString(), empty);
assert invokers.size() > 0;
JSONObject appMap = TracingContext.tracing().appMap();
log.debug("invokers: {}", invokers);
InvokerchooseInvoker = null;
outline:
for (InvokertInvoker : invokers) {
for (String address : appMap.keySet()) {
if (address.equals(tInvoker.getUrl().getAddress())) {
chooseInvoker = tInvoker;
log.debug("txlcn choosed server [{}] in txGroup: {}", tInvoker, TracingContext.tracing().groupId());
break outline;
}
}
}
if (chooseInvoker == null) {
Invokerinvoker = loadBalance.select(invokers, url, invocation);
TracingContext.tracing().addApp(invoker.getUrl().getAddress(), empty);
return invoker;
}
return chooseInvoker;
}
@FunctionalInterface
public interface TxLcnLoadBalance {Invoker select(List > invokers, URL url, Invocation invocation);
}
}
私信回复"tx-lcn"获取链接地址,喜欢的点个关注,一起学习探讨新技术。
本文来自投稿,不代表本人立场,如若转载,请注明出处:http://www.sosokankan.com/article/692707.html