一.什么是分布式事务

针对于单数据库的事务我们叫着本地事务/传统事务,在分布式环境中一个请求可能涉及到多个数据库的写操作(多数据源),要保证多数据源的一致性必须用到分布式事务。

二.为什么需要分布式事务

系统微服务化后,一个看似简单的功能,内部可能需要调用多个服务并操作多个数据库实现,服务调用的分布式事务问题变的非常突出。

image-20221023200145848

一个下单请求同时设计到订单库,优惠券库,库存库的写操作,需要保证三个库写操作的一致性,就要用到分布式事务 即:分布式事务就是要解决一个请求同时对多个数据库写操作的一致性

注意:微服务拆分原则,尽量让大部分操作都不要跨微服务操作,也就是跨库。 分布式事务比本地事务耗费的资源更多。

三.分布式事务解决方案

  1. 2PC方案

    2PC即两阶段提交协议,是将整个事务流程分为两个阶段,准备阶段( Prepare phase).提交阶段( pphase ) , 2是指两个阶段, P是指准备阶段, C是指提交阶段。

    在第一阶段(准备阶段),事务管理器先事务参与者(资源)们发送准备请求,大家都返回OK状态,那么就进入第二阶段,提交事务,如果在第一阶段有任何一个参与者没有OK,那么事务协调器通知其他所有事务参与者(资源)回滚事务。2PC常见的标准是XA, JTA,Seata等。

  2. 基于Seata的2pc

    Seata是由阿里中间件团队发起的开源项目Fescar ,后更名为Seata ,它是一个是开源的分布式事务框架。传统2PC的问题在Seata中得到了解决,它通过对本地关系数据库的分支事务的协调来驱动完成全局事务,是工作在应用层的中间件。主要优点是性能较好,且不长时间占用连接资源,它以高效并且对业务0侵入的方式解决微服务场景下面临的分布式事务问题,它目前提供AT模式(即2PC)及TCC模式的分布式事务解决方案。

    • Transaction Coordinator(TC):事务协调器,它是独立的中间件,需要独立部署运行,它维护全局事务的运行状态,接收TM指令发起全局事务的提交与回滚,负责与RM通信协调各各分支事务的提交或回滚。 相当于是一个软件需要单独部署
    • Transaction Manager (TM):事务管理器, TM需要嵌入应用程序中工作,它负责开启一个全局事务,并最终 向TC发起全局提交或全局回滚的指令。
    • Resource Manager (RM):资源管理器控制分支事务, 负责分支注册、状态汇报,并接收事务协调器TC的指令, 驱动 分支(本地)事务的提交和回滚。
  3. 事务流程如下

    image-20221023214811148

    具体的执行流程如下:

  1. 用户服务的TM向TC申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的XID。
  2. 用户服务的RM向TC注册分支事务,该分支事务在用户服务执行新增用户逻辑,并将其纳入XID对应全局事务的管辖。
  3. 用户服务执行分支事务,向用户表插入一条记录。
  4. 逻辑执行到远程调用积分服务时(XID在微服务调用链路的,上下文中传播)。积分服务的RM向TC注册分支事务,该分支事务执行增加积分的逻辑,并将其纳入XID对应全局事务的管辖。
  5. 积分服务执行分支事务,向积分记录表插入一条记录,执行完毕后,返回用户服务。

Seata 分布式事务:https://blog.csdn.net/u014494148/article/details/105781920

四.注册集成Seata

1.下载

  1. 导入依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    <dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    </dependency>
    <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.9</version>
    </dependency>
  2. yml配置

    1
    2
    3
    4
    5
    6
    7
    seata:
    enableAutoDataSourceProxy: false #关闭DataSource代理的自动配置,我们要手动配置
    spring:
    cloud:
    alibaba:
    seata:
    tx-service-group: fsp_tx_group #这里和file.conf中事务组名一样
  3. 拷贝配置

    1.resources/file.conf
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    transport {
    # tcp udt unix-domain-socket
    type = "TCP"
    #NIO NATIVE
    server = "NIO"
    #enable heartbeat
    heartbeat = true
    # the client batch send request enable
    enableClientBatchSendRequest = true
    #thread factory for netty
    threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThread-prefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
    }
    shutdown {
    # when destroy server, wait seconds
    wait = 3
    }
    serialization = "seata"
    compressor = "none"
    }
    service {
    #transaction service group mapping
    vgroupMapping.fsp_tx_group = "default"
    #only support when registry.type=file, please don't set multiple addresses
    default.grouplist = "127.0.0.1:8091"
    #degrade, current not support
    enableDegrade = false
    #disable seata
    disableGlobalTransaction = false
    }

    client {
    rm {
    asyncCommitBufferLimit = 10000
    lock {
    retryInterval = 10
    retryTimes = 30
    retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    reportSuccessEnable = false
    }
    tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
    }
    undo {
    dataValidation = true
    logSerialization = "jackson"
    logTable = "undo_log"
    }
    log {
    exceptionRate = 100
    }
    }
    2.resources/registry.conf
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    registry {
    # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
    type = "file"

    nacos {
    serverAddr = "localhost"
    namespace = ""
    cluster = "default"
    }
    eureka {
    serviceUrl = "http://localhost:8761/eureka"
    application = "default"
    weight = "1"
    }
    redis {
    serverAddr = "localhost:6379"
    db = "0"
    password = ""
    cluster = "default"
    timeout = "0"
    }
    zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
    username = ""
    password = ""
    }
    consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
    }
    etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
    }
    sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
    }
    file {
    name = "file.conf"
    }
    }

    config {
    # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
    type = "file"

    nacos {
    serverAddr = "localhost"
    namespace = ""
    group = "SEATA_GROUP"
    }
    consul {
    serverAddr = "127.0.0.1:8500"
    }
    apollo {
    app.id = "seata-server"
    apollo.meta = "http://192.168.1.204:8801"
    namespace = "application"
    }
    zk {
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
    username = ""
    password = ""
    }
    etcd3 {
    serverAddr = "http://localhost:2379"
    }
    file {
    name = "file.conf"
    }
    }

  4. 排除DataSource自动配置

    @SpringBootApplication(exclude = { DataSourceAutoConfiguration.class})

  5. .MybatisPlus版本

    把DataSource交给Seata代理。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    package io.coderyeah.ymcc.config;

    import com.alibaba.druid.pool.DruidDataSource;
    import com.baomidou.mybatisplus.spring.MybatisSqlSessionFactoryBean;
    import io.seata.rm.datasource.DataSourceProxy;
    import org.mybatis.spring.transaction.SpringManagedTransactionFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;
    import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

    import javax.sql.DataSource;

    /**
    * 数据源代理
    */
    @Configuration
    public class DataSourceConfiguration {

    //mapper.xml路径
    @Value("${mybatis-plus.mapper-locations}")
    private String mapperLocations;

    //手动配置bean
    @Bean
    @ConfigurationProperties("spring.datasource")
    public DataSource druidDataSource(){
    return new DruidDataSource();
    }

    @Bean
    public MybatisSqlSessionFactoryBean sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
    //处理MybatisPlus
    MybatisSqlSessionFactoryBean factory = new MybatisSqlSessionFactoryBean();
    factory.setDataSource(dataSourceProxy);
    factory.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
    //事务管理工厂
    factory.setTransactionFactory(new SpringManagedTransactionFactory());
    return factory;
    }

    @Primary
    @Bean("dataSource")
    public DataSourceProxy dataSourceProxy(DataSource druidDataSource) {
    return new DataSourceProxy(druidDataSource);
    }

    }

    Mybatis版本

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    import com.alibaba.druid.pool.DruidDataSource;
    import io.seata.rm.datasource.DataSourceProxy;
    import org.apache.ibatis.session.SqlSessionFactory;
    import org.mybatis.spring.SqlSessionFactoryBean;
    import org.mybatis.spring.transaction.SpringManagedTransactionFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

    import javax.sql.DataSource;

    //使用seata对DataSource进行代理
    @Configuration
    public class DataSourceProxyConfig {

    //mapper.xml路径
    @Value("${mybatis.mapper-locations}")
    private String mapperLocations;

    //手动配置bean
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource(){
    return new DruidDataSource();
    }

    @Bean
    public SqlSessionFactory sessionFactory(DataSourceProxy dataSourceProxy) throws Exception {
    SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean();
    sessionFactoryBean.setDataSource(dataSourceProxy);
    sessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
    //事务管理工厂
    sessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
    return sessionFactoryBean.getObject();
    }

    @Bean
    public DataSourceProxy dataSource() {
    return new DataSourceProxy(druidDataSource());
    }
    }
  6. 业务方法

    方法上贴 : @GlobalTransactional(rollbackFor = Exception.class) 开启Seata全局事务

    2.6.注释事务开启注解

    注意:不能加@EnableTransactionManagement 注解了 , 也不需要加@Transactional

    2.7.undolog表

    数据库中创建表,涉及到事务的表都需要添加undolog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

五.redis分布式缓存

  1. 导入依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
       
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.10.0</version>
    </dependency>
  2. yml配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    spring:
    application:
    name: service-user #服务名
    redis:
    host: 43.136.61.70
    port: 6379
    password: 123456
    database: 0
    lettuce:
    pool:
    max-active: 8
    max-idle: 8
    max-wait: 2000ms
  3. Redis序列化配置

    我们通常以JSON格式将数据存储到Redis中,这种格式是所有编程语言通用的,所以我们可以把Redis的序列化方式配置为JSON ,这样的话我们就可以不用自己去转JSON了.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    //缓存的配置
    @Configuration
    public class RedisConfig {

    @Resource
    private RedisConnectionFactory factory;


    //使用JSON进行序列化
    @Bean
    public RedisTemplate<Object, Object> redisTemplate() {
    RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
    redisTemplate.setConnectionFactory(factory);
    //JSON格式序列化
    GenericJackson2JsonRedisSerializer genericJackson2JsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
    //key的序列化
    redisTemplate.setKeySerializer(genericJackson2JsonRedisSerializer);
    //value的序列化
    redisTemplate.setValueSerializer(genericJackson2JsonRedisSerializer);
    //hash结构key的虚拟化
    redisTemplate.setHashKeySerializer(new StringRedisSerializer());
    //hash结构value的虚拟化
    redisTemplate.setHashValueSerializer(genericJackson2JsonRedisSerializer);
    return redisTemplate;
    }
    }

    举例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    List<CourseType> list = null;
    final Object o = redisTemplate.opsForValue().get(YmccConstants.CACHE_COURSE_TYPE);
    if (null != o) {
    // 从redis中读取返回数据
    list = (List<CourseType>) o;
    System.out.println("-------redis");
    } else {
    list = getCourseTypes();
    // 存入redis
    redisTemplate.opsForValue().set(YmccConstants.CACHE_COURSE_TYPE, list);
    System.out.println("-------mysql");
    }

六.SpringCache缓存

  1. SpringCahce对缓存流程进行了简化封装,提供了一些注解,我们通过简单的打注解就能实现缓存的添加,修改,删除等,注解如下:

    • @Cacheable:触发缓存写入。

    • @CacheEvict:触发缓存清除。

    • @CachePut:更新缓存(不会影响到方法的运行)。

    • @Caching:重新组合要应用于方法的多个缓存操作。

    • @CacheConfig:设置类级别上共享的一些常见缓存设置。

  2. 配置SpringCache

    继承 CachingConfigurerSupport 对SpringCache进行配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    package io.coderyeah.ymcc.config;

    import org.springframework.cache.CacheManager;
    import org.springframework.cache.annotation.CachingConfigurerSupport;
    import org.springframework.cache.interceptor.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.cache.RedisCacheConfiguration;
    import org.springframework.data.redis.cache.RedisCacheManager;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
    import org.springframework.data.redis.serializer.RedisSerializationContext;

    import javax.annotation.Resource;

    @Configuration
    public class CacheConfig extends CachingConfigurerSupport {

    @Resource
    private RedisConnectionFactory factory;

    /*
    * 自定义生成redis-key , 类名.方法名
    */
    @Override
    @Bean
    public KeyGenerator keyGenerator() {
    return (o, method, objects) -> {
    StringBuilder sb = new StringBuilder();
    sb.append(o.getClass().getName()).append(".");
    sb.append(method.getName()).append(".");
    for (Object obj : objects) {
    sb.append(obj.toString());
    }
    System.out.println("keyGenerator=" + sb.toString());
    return sb.toString();
    };
    }

    @Bean
    @Override
    public CacheResolver cacheResolver() {
    return new SimpleCacheResolver(cacheManager());
    }

    @Bean
    @Override
    public CacheErrorHandler errorHandler() {
    // 用于捕获从Cache中进行CRUD时的异常的回调处理器。
    return new SimpleCacheErrorHandler();
    }
    //缓存管理器
    @Bean
    @Override
    public CacheManager cacheManager() {
    RedisCacheConfiguration cacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
    .disableCachingNullValues() //不允许空值
    .serializeValuesWith(RedisSerializationContext.SerializationPair
    //值使用JSON序列化
    .fromSerializer(new GenericJackson2JsonRedisSerializer()));

    return RedisCacheManager.builder(factory).cacheDefaults(cacheConfiguration).build();
    }
    }
  3. 开启SpringCache

    在启动类注解:@EnableCaching

  4. 特别注意

    缓存注解不能加在内部方法上,比如:方法A调用方法B,给方法B加上缓存注解会失效,因为内部方法调用代理会失效。在A方法上打注解即可。

  5. 添加缓存

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    @Cacheable(cacheNames = YmccConstants.CACHE_COURSE_TYPE, key = "'all'")
    @Override
    public List<CourseType> treeData() {
    /* List<CourseType> list = null;
    final Object o = redisTemplate.opsForValue().get(YmccConstants.CACHE_COURSE_TYPE);
    if (null != o) {
    // 从redis中读取返回数据
    list = (List<CourseType>) o;
    System.out.println("-------redis");
    } else {
    list = getCourseTypes();
    // 存入redis
    redisTemplate.opsForValue().set(YmccConstants.CACHE_COURSE_TYPE, list);
    System.out.println("-------mysql");
    }*/
    log.debug("=============查询了数据库============");
    return getCourseTypes();
    }

    // 从数据库中查询
    private List<CourseType> getCourseTypes() {
    // 查询所有分类
    List<CourseType> courseTypes = selectList(null);
    // 将集合转换为map
    Map<Long, CourseType> map = courseTypes.stream().collect(Collectors.toMap(CourseType::getId, courseType -> courseType));
    // 返回给前端的集合
    List<CourseType> list = new ArrayList<>();
    // 遍历
    courseTypes.forEach(courseType -> {
    if (courseType.getPid() == null || courseType.getPid() == 0) {
    // 顶级
    list.add(courseType);
    } else {
    // 找到父级
    CourseType type = map.get(courseType.getPid());
    if (type != null) {
    type.getChildren().add(courseType);
    }
    }
    });
    return list;
    }
  6. 剔除缓存

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    // 剔除缓存
    @CacheEvict(cacheNames = YmccConstants.CACHE_COURSE_TYPE, key = "'all'")
    @Override
    public boolean insert(CourseType entity) {
    return super.insert(entity);
    }

    @CacheEvict(cacheNames = YmccConstants.CACHE_COURSE_TYPE, key = "'all'")
    @Override
    public boolean deleteById(Serializable id) {
    return super.deleteById(id);
    }

    @CacheEvict(cacheNames = YmccConstants.CACHE_COURSE_TYPE, key = "'all'")
    @Override
    public boolean updateById(CourseType entity) {
    return super.updateById(entity);
    }