如何使用Redis解决高并发

NoSQL

Not Only SQL的简称。NoSQL是解决传统的RDBMS在应对某些问题时比较乏力而提出的。

即非关系型数据库,它们不保证关系数据的ACID特性,数据之间一般没有关联,在扩展上就非常容易实现,并且拥有较高的性能。

Redis

redis是nosql的典型代表,也是目前互联网公司的必用技术。

Redis主要采用哈希表来实现键值对存储。大多数时候是直接以缓存的形式被使用,使得请求不直接访问到磁盘,所以效率方面是很不错的,完全能满足中小型企业的使用需求。

常用数据类型

  • 字符串string

  • 散列hash

  • 列表list

  • 集合sets

  • 有序集合sort set

使用频率上string和hash会高一些,各个类型有各自的操作命令,无非增删改查,具体的命令后面我会整理一份。

痛点

web应用在众多请求同时发生时,可能会导致数据读取、存储上出现错误,即发生脏读、脏数据生成。

在分布式项目下,会出现更多的问题。

思路

并发时,本质其实就是多个请求同时进来了,没办法正确的去进行处理。

可以将所有的请求放在 一个队列,让请求们按照一个顺序,挨个进来执行业务逻辑。使用消息队列是目前可行的解决方案,我将在下次整理一篇关于如何处理高并发的消息队列文章

还有一个方法是直接将并行转为串行,Java提供了synchronized,即同步,不过这个在效率要求比较苛刻的地方 或者 分布式项目下还是不太合适的方案,这里就引出了使用redis来实现分布式锁,从而解决并发问题。

分布式锁

在分布式项目中,使用一个唯一、通用、效率高的标识,来表示上锁和解锁。

redis实现起来很简单,即对一个key是否存在来表示是否上锁、是否解锁。

以string类型举例:

Integer stock = goodsMapper.getStock();
if (stock > 0) {
    stock =- 1;
    goodsMapper.updateStock(stock);
}

以上是最简单的秒杀伪代码,我们尝试用redis实现分布式锁。

// 这里是错误代码,只是一个思考过程,请耐心看完哦
String key = "REDIS_DISTRIBUTION_LOCKER"; // 分布式锁名称
String value = jedisUtils.get(key);
if (value != null) { // 未上锁
    // wingzingliu
    jedisUtils.set(key, 1); // 上锁
    Integer stock = goodsMapper.getStock();
    if (stock > 0) {
        stock =- 1;
        goodsMapper.updateStock(stock);
        jedisUtils.del(key); // 释放锁
    }
}

以上代码可能会出现一个问题,就是当同时多个请求进来,某次多个请求都拿到value为空,线程A进入if 走到// wingzingliu这里的时候,还未上锁,其他请求也进来了,这样就会出现脏数据了。

这里的代码问题就是出在没有考虑原子性问题。

所以我们要使用到redis的一个setNx命令,本质也是设置值,但是这是一个原子操作,执行之后会返回是否设置成功。

redis> SETNX job "programmer"    # job 设置成功
(integer) 1
 
redis> SETNX job "code-farmer"   # 尝试覆盖 job ,失败
(integer) 0
 
redis> GET job                   # 没有被覆盖
"programmer"

重点关注 当有值时,会失败,返回0。所以我们的代码会改造成以下这个样子。

// 这里是错误代码,只是一个思考过程,请耐心看完哦
String key = "REDIS_DISTRIBUTION_LOCKER"; // 分布式锁名称
Long result = jedisUtils.setNx(key, 1);
if (result > 0) { // 上锁成功,进入逻辑
    // wingzingliu1
    Integer stock = goodsMapper.getStock();
    if (stock > 0) {
        stock =- 1;
        goodsMapper.updateStock(stock);
 
        System.out.println("购买成功!");
    } else {
        System.out.println("没有库存了!");
    }
    // wingzingliu2
    jedisUtils.del(key); // 释放锁
}

以上我们就可以保证原子性,能正确的按照顺序去处理。

可是还有一个隐藏的问题,就是当某个线程执行上锁成功后,在wingzingliu1到wingzingliu2之间时,程序抛异常了,那么程序终止了,就无法释放锁,其他线程也都进不来了。

解决方案是加上try catch finally块,在finally里面去释放锁。

可是那如果是宕机呢?上锁之后宕机了,finally里面的依然不会执行,锁没有得到释放,不手动处理的情况下,以后所有线程也无法进入。

所以引入了redis的过期时间,到了某个时间自动解锁。

// 这里是不够完善的代码,请耐心看完哦
try {
    String key = "REDIS_DISTRIBUTION_LOCKER"; // 分布式锁名称
    Long result = jedisUtils.setNx(key, 1, 30); // 假设处理逻辑需要20s左右,设置了30秒自动过期
    if (result > 0) { // 上锁成功,进入逻辑
        Integer stock = goodsMapper.getStock();
        if (stock > 0) {
            stock =- 1;
            goodsMapper.updateStock(stock);
 
            System.out.println("购买成功!");
        } else {
            System.out.println("没有库存了!");
        }
    }
} catch (Exception e) {
    
} finally {
    jedisUtils.del(key); // 释放锁
}

以上是比较完善的分布式锁了,但是还有一个小瑕疵,就是假设某一次请求A处理的很慢,预计20s但是跑了35s,到了30s的时候锁过期了,其他请求就自然进来了。

这不仅会导致一次并发执行,而且在请求A处理完后,仍会继续执行释放锁操作,从而实际上将锁交给了下一个线程。以此类推,整个并发控制就乱了。

理论上可以设置一个更大的key过期时间,但是并不是最好的解决方案。这里就引出一个概念:锁续命。

锁续命

如其名,给锁续命。实现就是 当锁快过期的时候,去延长锁的时间。假设使用一个30秒的锁,每10秒进行一次检测以确认锁是否仍然存在。如果锁依然存在,则将锁继续保持在30秒。这样就避免掉了上面的这个可能出现的问题。

这里使用一个定时任务,周期性的调用即可。

扩展

刚刚对key设置的value是1,其实能使用请求ID来进行保存,这样就能知道锁是由哪个请求上的,在解锁的时候 也可以避免解锁了其他线程上的锁。具体由前端传递,或者由服务端以某种规则生成都可以。

以上就是如何使用Redis解决高并发的详细内容,更多请关注www.sxiaw.com其它相关文章!