目录

Redis - 事务与锁

笔记

优秀的 SQL 数据库都拥有事务,而 Redis 作为 NoSQL 数据库的佼佼者,也有「事务」。

2021-12-26 @Young Kbt

# 事务

# 介绍

Redis 事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

Redis 事务的本质是一组命令的集合。事务支持一次执行多个命令,一个事务中所有命令都会被序列化。在事务执行过程,会按照顺序串行化执行队列中的命令,其他客户端提交的命令请求不会插入到事务执行命令序列中。

Redis 事务的主要作用就是 串联多个命令 防止别的命令插队。

  • Redis 事务没有隔离级别的概念
  • Redis 不保证原子性
  • Redis 事务的三个阶段
    • 开始事务
    • 命令入队
    • 执行事务

# 三大特性

  • 单独的隔离操作

    事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断

  • 没有隔离级别的概念

    队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行

  • 不保证原子性

    执行的过程中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚

# 三大指令

开启事务指令

multi
1

执行事务指定

exec
1

在执行事务前(exec),结束事务指令(理解为手动回滚)

discard
1
  • 从输入 Multi 命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入完成
  • Exec 后,Redis 会将之前的命令队列中的命令依次执行。
  • 组队的过程中可以通过 discard 来放弃组队。

image-20211226170941580

# 案例代码

multi

set k1 v1
set k2 v2
get k1
get k2

exec

# 输出 v1、v2
1
2
3
4
5
6
7
8
9
10

在没有 exec 之前,set 和 get 并没有立即执行,它们仅仅进入了命令队列,等待 exec 命令后再全部执行。

set k1 v1
multi

set k1 v2

discard

get k1
# 输出 v1
1
2
3
4
5
6
7
8
9

使用了 discard,代表取消事务,则事务里的 set k1 v2 没有被执行,所以 k1 值依然是 1。

# 错误处理

  • 组队中某个命令出现了报告错误(multi 中),执行时整个的所有队列都会被取消

    image-20211226171839361

  • 如果执行阶段(exec)某个命令报出了错误,则只有报错的命令不会被执行,而其他的命令都会执行,不会回滚

    image-20211226171906051

# 案例图

image-20210408101936847

放弃事务

image-20210408101955501

若在事务队列中存在命令性错误(类似于 Java 编译性错误),则执行 EXEC 命令时,所有命令都不会执行

image-20210408102023204

若在事务队列中存在语法性错误(类似于 Java 的 1/0 的运行时异常),则执行 EXEC 命令时,其他正确命令会被执行,错误命令抛出异常。

image-20210408102051072

# 事务冲突的问题

想想一个场景:有很多人有你的账户,同时去参加双十一抢购

  • 一个请求想给金额减 8000
  • 一个请求想给金额减 5000
  • 一个请求想给金额减 1000

结果如图:

image-20211226172110847

那么如何解决呢?我们需要利用 Redis 的锁机制。

#

# 悲观锁

悲观锁(Pessimistic Lock),顾名思义,就是很悲观,认为这个世界是黑暗的,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会 block 直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。

image-20211226172952960

# 乐观锁

乐观锁(Optimistic Lock),顾名思义,就是很乐观,认为这个世界是光明的,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量。Redis 就是利用这种 check-and-set 机制实现事务的。

Redis 使用的是乐观锁。

  • 通过指令(可指定多个),开启乐观锁

    watch key [key] ...
    
    1

    一旦 watch 某个 key,则会一直监视这个 key,如果 key 发生了变化,就返回提示。

    作用:在执行 multi 之前,先执行 watch key1 [key2],可以监视一个(或多个) key ,如果在事务 exec 执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。

    使用场景:很多人同时对一个值进行操作,一旦这个值被修改,且被其他人监听,则其他人无法修改这个值

  • 取消 WATCH 命令对所有 key 的监视

    unwatch key [key] ...
    
    1

    缺点:如果单纯使用 watch,可能导致 key 的值无法完全被修改。

    场景:假设库存有 500 个商品,2000 个人进行秒杀购买(2000 个程序监听商品的 key),假设 1999 人同时购买,其内部程序监听的商品数量为 500,最后一个人却已经购买成功,商品数量变为 499,则前面的事务被打断(监听的 500 数量),导致 1999 人会购买失败,库存还有 499 个商品。

  • 测试

    初始化信用卡可用余额和欠额

    127.0.0.1:6379> set balance 100
    OK
    127.0.0.1:6379> set debt 0
    OK
    
    1
    2
    3
    4

    使用 watch 检测 balance,事务期间 balance 数据未变动,事务执行成功

    127.0.0.1:6379> watch balance
    OK
    127.0.0.1:6379> multi	# 开启事务
    OK
    127.0.0.1:6379> decrby balance 20	# 可用余额 -20
    QUEUED
    127.0.0.1:6379> incrby debt 20	# 欠款 +20
    QUEUED
    127.0.0.1:6379> exec	# 执行事务
    1) (integer) 80
    2) (integer) 20
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11

    使用 watch 检测 balance,若事务期间 balance 数据变动,则事务执行失败

    窗口一

    # 窗口一
    127.0.0.1:6379> watch balance	# 监视 balance
    OK
    127.0.0.1:6379> MULTI # 执行完毕后,执行窗口二代码测试
    OK
    127.0.0.1:6379> decrby balance 20
    QUEUED
    127.0.0.1:6379> incrby debt 20
    QUEUED
    127.0.0.1:6379> exec # 修改失败!因为被监视的 balance 值改变
    (nil)
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11

    窗口二

    # 窗口二
    127.0.0.1:6379> get balance
    "80"
    127.0.0.1:6379> set balance 200
    OK
    
    1
    2
    3
    4
    5

    窗口一:出现问题后放弃监视,然后重来

    127.0.0.1:6379> UNWATCH # 放弃监视,这是取消所有的监视
    OK
    127.0.0.1:6379> watch balance	# 监视
    OK
    127.0.0.1:6379> MULTI	# 事务
    OK
    127.0.0.1:6379> decrby balance 20
    QUEUED
    127.0.0.1:6379> incrby debt 20
    QUEUED
    127.0.0.1:6379> exec # 成功
    1) (integer) 180
    2) (integer) 40
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13

    说明:

    一但执行 exec 指令或 descard 指令,无论事务是否执行成功,watch 指令对变量的监控都将被取消。

    故当事务执行失败后,需重新执行 watch 命令对变量进行监控,并开启新的事务进行操作。

# 指令总结

Redis 事务相关指令:

序号 命令及描述 描述
1 DISCARD 取消事务,放弃执行事务块内的所有命令
2 EXEC 执行所有事务块内的命令
3 MULTI 标记一个事务块的开始
4 UNWATCH 取消 WATCH 命令对所有 key 的监视
5 WATCH key [key ...] 监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。类似乐观锁

# 秒杀案例

# 环境准备

首先创建一个 Spring Boot 项目,然后添加依赖:

<dependencies>
    <!-- redis普通依赖包 -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>3.2.0</version>
    </dependency>

    <!-- spring boot + redis 整合   -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
    <!-- redis   -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
</dependencies>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# 配置类

配置类我在 Redis - Java 整合 也提供了,如下:

@EnableCaching
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
   
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new
                Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        template.setConnectionFactory(factory);
        //key 序列化方式
        template.setKeySerializer(redisSerializer);
        //value 序列化
        template.setValueSerializer(jackson2JsonRedisSerializer);
        //value hashmap 序列化
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        return template;
    }

    @Bean
    public CacheManager cacheManager(RedisConnectionFactory factory) {
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        //解决查询缓存转换异常的问题
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        // 配置序列化(解决乱码的问题),过期时间 600 秒
        RedisCacheConfiguration config =
                RedisCacheConfiguration.defaultCacheConfig()
                        .entryTtl(Duration.ofSeconds(600))
                        .serializeKeysWith(RedisSerializationContext.SerializationPair.
                                fromSerializer(redisSerializer))
                        .serializeValuesWith(RedisSerializationContext.SerializationPair
                                .fromSerializer(jackson2JsonRedisSerializer))
                        .disableCachingNullValues();
        RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
                .cacheDefaults(config)
                .build();
        return cacheManager;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

为什么要写配置类呢,因为自带的兼容性不好,我使用自带了报错,所以自己重写配置,覆盖官方自带的。

# 连接池

如果还是用 Jedis 自带的连接方式,那么容易出现超时问题,自带的连接方式:Jedis jedis = new Jedis("192.168.199.27",6379);

我们需要使用连接池来连接 Redis,防止出现超时问题

public class JedisPoolUtils {
    
    private static volatile JedisPool jedisPool = null;

    private JedisPoolUtils() {
    }
    
    public static JedisPool getJedisPoolInstance(){
        if(null == jedisPool){
            synchronized (JedisPoolUtils.class){
                if(null == jedisPool){
                    JedisPoolConfig poolConfig = new JedisPoolConfig();
                    // 一个 pool 可分配多少个 jedis 实例
                    poolConfig.setMaxTotal(200);
                    // 一个 pool 最多有多少个状态为 idle(空闲)的 jedis 实例
                    poolConfig.setMaxIdle(32);
                    // 表示当 borrow 一个 jedis 实例时,最大的等待毫秒数
                    poolConfig.setMaxWaitMillis(100 * 1000);
                    poolConfig.setBlockWhenExhausted(true);
                    poolConfig.setTestOnBorrow(true);
                    
                    jedisPool = new JedisPool(poolConfig, "192.168.199.27", 6379, 60000);
                }
            }
        }
        return jedisPool;
    }
    
    public static void release(JedisPool jedisPool, Jedis jedis){
        if(null != jedis){
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

连接池参数:

  • MaxTotal:控制一个 pool 可分配多少个 jedis 实例,通过 pool.getResource() 来 获取;如果赋值为 -1,则表示不限制;如果 pool 已经分配了 MaxTotal 个 jedis 实例,则此时 pool 的状态为 exhausted

  • maxIdle:控制一个 pool 最多有多少个状态为 idle(空闲)的 jedis 实例

  • MaxWaitMillis:表示当 borrow 一个 jedis 实例时,最大的等待毫秒数,如果超过等待时间,则直接抛 JedisConnectionException

# 秒杀版本一

这只是一个简单的 demo。重点:

  • 31 行的 watch,进行监听

  • 54 - 32 行的 multi 和 exce 的事务操作

@RestController
public class SecKill {

    @RequestMapping("/secKill")
    public boolean secKill(){
        Random random = new Random();
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 6; i++) {
            int code = random.nextInt(10);
            sb.append(code);
        }

        return doSecKill(sb.toString(), "123");
    }
    
    public boolean doSecKill(String uid,String productId){
        
        if(uid == null || productId == null){
            return false;
        }
        
        // Jedis jedis = new Jedis("192.168.199.27",6379);
        
        // 通过连接池解决连接超时问题
        Jedis jedis = JedisPoolUtils.getJedisPoolInstance().getResource();
        // 秒杀的商品 Key
        String kcKey = "sk:" + productId + ":qt";
        // 秒杀的用户 Key
        String userKey = "sk:" + productId + ":user";
        
        jedis.watch(kcKey);
        
        String kc = jedis.get(kcKey);
        // 库存为 null
        if(kc == null){
            System.out.println("秒杀没有开始");
            jedis.close();
            return false;
        }
        
        // 库存卖完
        if(Integer.parseInt(kc) <= 0){
            System.out.println("秒杀已经结束");
            jedis.close();
            return false;
        }
        
        // 秒杀过程(出现库存为负情况)
        // jedis.decr(kcKey);
        // 添加秒杀的用户
        // jedis.sadd(userKey, uid);
        
        // 秒杀过程(通过乐观锁解决库存可能为负情况),但出现库存遗留问题
        Transaction multi = jedis.multi();
        multi.decr(kcKey);
        multi.sadd(userKey, uid);
        List<Object> exec = multi.exec();
        if(exec == null || exec.size() == 0){
            System.out.println("秒杀失败");
            jedis.close();
            return false;
        }

        System.out.println("秒杀成功");
        jedis.close();
        return true;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68

这个版本有很大的问题,上面的程序无法解决库存遗留问题,使用 LUA 脚本可解决(具体还有其他)。

原因:31 行代码开启了 watch。假设库存有 500 个商品,2000 个人进行秒杀购买(2000 个程序监听商品的 key),假设 1999 人同时购买,其内部程序监听的商品数量为 500,最后一个人却抢先购买成功,商品数量变为 499,则前面的事务被打断(监听的 500 数量),即 1999 人会购买失败,导致库存还有 499 个商品。

# 秒杀版本二

该版本利用 lua 语言,解决了库存遗留问题。

**什么是库存遗留问题?**即系统告诉用户已经秒光,可是还有库存。原因:就是乐观锁导致很多请求都失败,先点的没秒到,后点的可能秒到了。

通过 lua 脚本解决争抢问题,实际上是 Redis 利用其单线程的特性,用任务队列的方式解决多任务并发问题。

@RestController
public class SecKillByScript {
    
    static String secKillScript = "local userid=KEYS[1]; \n" +
            "local prodid=KEYS[2];\n" +
            "local qtkey=\"sk:\"..prodid..\":qt\";\n" +
            "local usersKey=\"sk:\"..prodid..\":user\";\n" +
            "local userExists=redis.call(\"sismember\",usersKey,userid);\n" +
            "if tonumber(userExists)==1 then \n" +
            " return 2;\n" +
            "end\n" +
            "local num= redis.call(\"get\" ,qtkey);\n" +
            "if tonumber(num)<=0 then \n" +
            " return 0; \n" +
            "else \n" +
            " redis.call(\"decr\",qtkey);\n" +
            "redis.call(\"sadd\",usersKey,userid);\n" +
            "end\n" +
            "return 1";

    @RequestMapping("/secKillByScript")
    public boolean secKill(){
        Random random = new Random();
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 6; i++) {
            int code = random.nextInt(10);
            sb.append(code);
        }

        return doSecKill(sb.toString(), "123");
    }

    @RequestMapping("/doSecKillByScript")
    public boolean doSecKill(String userid,String productId){
        Jedis jedis = JedisPoolUtils.getJedisPoolInstance().getResource();
        
        String shal = jedis.scriptLoad(secKillScript);
        Object evalsha = jedis.evalsha(shal, 2, userid, productId);

        String value = String.valueOf(evalsha);
        
        if("0".equals(value)){
            System.out.println("已抢空");
        }else if ("1".equals(value)){
            System.out.println("抢购成功");
        }else if("2".equals(value)){
            System.out.println("该用户已经抢过了");
        }else {
            System.out.println("抢购异常");
        }
        
        jedis.close();
        return true;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

# 测试

利用能并发的工具访问 /secKillByScript 即可。

这里使用 ab 工具,首先安装它

yum install -y httpd-tools
1

安装完后,在某个目录创建一个文件,模拟表单提交参数

vim postfile
1

添加内容:(以 & 符号结尾)

prodid=0101&
1

启动测试:

ab -n 2000 -c 200 -k -p ~/postfile -T application/x-www-form-urlencoded http://192.168.199.27:8080/secKillByScript
1

192.168.199.27 是你本地的 IP 地址,因为是从 Linux 系统访问本机的项目。

更新时间: 2024/01/17, 05:48:13
最近更新
01
JVM调优
12-10
02
jenkins
12-10
03
Arthas
12-10
更多文章>