跳至主要內容

SpringBoot集成Redis

sixkey大约 9 分钟后端SpringBootRedis实战

SpringBoot集成Redis

整合主要讲解redis缓存更新策略、缓存三大问题解决方案以及使用Lua脚本解决幂等问题

整合

导入依赖

<!--redis-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

    <!--线程池pool-->
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-pool2</artifactId>
    </dependency>

yml配置文件

spring:
  redis:
    host: localhost
    port: 6379
    database: 0
#连接超时时间
    connect-timeout: 1800000
    lettuce:
      pool:
#连接池最大连接数(使用负值表示没有限制)
        max-active: 20
#最大阻塞等待时间(负数表示没限制)
        max-wait: -1
#连接池中的最大空闲连接
        max-idle: 5
#连接池中的最小空闲连接
        min-idle: 0

Key Value 序列化

package redis.config;

import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * redis序列化
 */
@EnableCaching
@Configuration
public class RedisConfig extends CachingConfigurerSupport {

    @Bean
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();

        //默认的Key序列化器为:JdkSerializationRedisSerializer
        redisTemplate.setKeySerializer(new StringRedisSerializer()); // key序列化
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer()); // value序列化

        redisTemplate.setConnectionFactory(connectionFactory);
        return redisTemplate;
    }
}

缓存

缓存更新策略

内存淘汰超时剔除主动更新
说明不用自己维护,利用Redis的内存淘汰机制,当内存不足时自动淘汰部分数据。下次查询时更新缓存。给缓存数据添加TTL时间,到期后自动删除缓存。下次查询时更新缓存。编写业务逻辑,在修改数据库的同时,更新缓存。
一致性一般
维护成本

业务场景

  • 低一致性需求:使用内存淘汰机制。
  • 高一致性需求:主动更新,并以超时剔除作为兜底方案。

三个问题需要考虑:

(1)删除缓存还是更新缓存?

  • 更新缓存:每次更新数据库都更新缓存,无效写操作较多
  • 删除缓存:更新数据库时让缓存失效,查询时再更新缓存(建议)

(2)如何保证缓存与数据库的操作的同时成功或失败?

  • 单体系统:将缓存与数据库操作放在一个事务
  • 分布式系统:利用TCC等分布式事务方案

(3)先操作缓存还是先操作数据库?

  • 先删除缓存,再操作数据库
  • 先操作数据库,再删除缓存(建议)

缓存三大问题解决方案

(1)、缓存穿透

(2)、缓存击穿

(3)、缓存雪崩

缓存穿透

简单理解:缓存穿透是指客户端请求的数据在缓存中和数据库都不存在,这样缓存永远不会生效,这些请求都会打到数据库。

8f30241f4a924a734157ba22a51bc72f.png
8f30241f4a924a734157ba22a51bc72f.png

解决方案

(1)缓存空对象

  • 优点:实现简单,维护方便
  • 缺点:
    • 额外的内存消耗
    • 可能造成短期的数据不一致
2ed2ad626e4cf23bb5de7a205d98b513.png
2ed2ad626e4cf23bb5de7a205d98b513.png

(2)布隆过滤器

  • 优点:内存占用较少,没有多余key
  • 缺点:
    • 实现复杂
    • 存在误判可能
3765c356f0617e06ec5e94b65e44dca0.png
3765c356f0617e06ec5e94b65e44dca0.png

缓存空对象代码片段

//如果数据中不存在则缓存空对象返回,两分钟后过期
redisTemplate.opsForValue().set(GoodsConstant.GOODS + userId,"",RedisConstant.CACHE_NULL_EXPIRE, TimeUnit.MINUTES);

其他解决方案:

  • 增强id的复杂度,避免被猜测id规律
  • 做好数据的基础格式校验
  • 加强用户权限校验
  • 做好热点参数的限流

缓存雪崩

简单理解:缓存雪崩是指同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。

840e066d60bc1a94ddadea5a3cc1a817.png

解决方案

  • 给不同的key的TTL添加随机值
  • 利用Redis集群提高服务的可用性
  • 给缓存业务添加降级限流策略
  • 给业务添加多级缓存

缓存击穿

简单理解:缓存击穿问题 也叫热点key问题,即使也给被高并发访问并且缓存重建页较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。

31e844250335c6063d24e88449660556.png
31e844250335c6063d24e88449660556.png

解决方案

  • 互斥锁
  • 逻辑过期
解决方案优点缺点
互斥锁(1)没有额外的内存消耗。(2)保证一致性。(3)实现简单(1)线程需要等待,性能受影响。(2)可能有死锁风险
逻辑过期线程不需要等待,性能较好。(1)有额外的内存消耗。(2)不保证一致性。(3)实现复杂

互斥锁

61b82942c04d801283e35bb2dee7a490.png

一下是一个简单的使用互斥锁解决的流程

397072438d11e859412b812ed4856a3a.png

获取互斥锁方法

/**
     * 获取锁方法
     * @param key
     */
    private boolean tryLock(String key,Integer userId){
        //获取锁,要对锁设置一个过期时间,防止某些异常导致死锁
        Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, userId, RedisConstant.CACHE_LOCK_EXPIRE, TimeUnit.SECONDS);
        //如果为true则说明获取到了锁,若返回false则说明前面已经有线程获取到了锁
        return BooleanUtil.isTrue(flag);
    }

释放互斥锁方法

/**
     * 释放锁
     * @param key
     */
    private void unLock(String key){
        //直接删除锁的键即可
        redisTemplate.delete(key);
    }

核心代码

@Override
    public List<GoodsDto> getList(Integer userId) {
        //先去redis中查询是否有缓存的数据
        List<GoodsDto> GoodsDtoList = (List<GoodsDto>) redisTemplate.opsForValue().get(GoodsConstant.GOODS + userId);
        if(!CollectionUtils.isEmpty(GoodsDtoList)){
            log.info("命中了redis中的商品缓存数据,成功进入缓存---------------");
            return GoodsDtoList;
        }else{
            //未命中redis中的缓存数据
            String lockKey = RedisConstant.GOODS_LOCK + userId;
            //尝试获取锁
            try {
                boolean isLock = this.tryLock(lockKey, userId);
                if(!isLock){
                    //未获取到锁,休眠一段时间等待前一个线程重建缓存
                    Thread.sleep(RedisConstant.THREAD_SLEEP);
                    //重新获取,递归
                    return getList(userId);
                }
                //获取到了锁
                //再去redis中查询是否有缓存的数据
                List<GoodsDto> GoodsList = (List<GoodsDto>) redisTemplate.opsForValue().get(GoodsConstant.GOODS + userId);
                if(!CollectionUtils.isEmpty(GoodsList)){
                    log.info("命中了双重检测redis中的商品缓存数据,成功进入缓存---------------");
                    return GoodsList;
                }
                //根据商户id获取商户所属的店铺
                QueryWrapper<Store> wrapper = new QueryWrapper<>();
                wrapper.eq("user_id",userId);
                List<Store> stores = storeMapper.selectList(wrapper);
                if(CollectionUtils.isEmpty(stores)){
                    //如果数据中不存在则缓存空对象返回来解决redis缓存穿透,两分钟后过期
                    List<UserDto> dtoList = new ArrayList<>();
                    dtoList.add(new UserDto());
                    redisTemplate.opsForValue().set(GoodsConstant.GOODS + userId,dtoList,RedisConstant.CACHE_NULL_EXPIRE, TimeUnit.MINUTES);
                }else{
                    //进行缓存重建
                    List<GoodsDto> goodsDtoList = this.getGoodsDtoList(stores,userId);
                    return goodsDtoList;
                }
            }catch (Exception e){
                throw new ServiceException(ResponseEnum.ERROR);
            }finally {
                //释放锁
                this.unLock(lockKey);
            }
        }
        return null;
    }

逻辑过期

4a8016d4ea953c618d68b72cb2a3fa99.png

防重复提交(解决幂等)

使用redis + lua脚本保证原子性操作

e4f67e118c2bc9782088eba09296a61c.png
e4f67e118c2bc9782088eba09296a61c.png

lua脚本

if(redis.call('get', KEYS[1]) == ARGV[1]) then
return redis.call('del', KEYS[1])
else return 0 end

核心代码

private static final DefaultRedisScript<Boolean> REDIS_SCRIPT;

    /**
     * 静态代码块中初始化lua脚本
     */
    static {
        REDIS_SCRIPT = new DefaultRedisScript<Boolean>();
        //读取项目中的lua文件
        REDIS_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
        REDIS_SCRIPT.setResultType(Boolean.class);
    }

    @Override
    public void saveOrderInfo(OrderDto orderDto,String orderId) {
        //TODO: 需要做防止重复提交和锁定库存操作
        if(StringUtils.isEmpty(orderId)){
            throw new ServiceException(ResponseEnum.ORDER_ID_EXCEPTION);
        }
        //lua脚本保证原子性操作
        //拿着orderId到redis中查询
        //如果redis中有相同orderId,则说明是正常提交订单,把redis中的orderId删除,
        // 这个过程要保证原子性操作,由lua脚本保证。
        Boolean flag = (Boolean) redisTemplate.execute(REDIS_SCRIPT, Arrays.asList(RedisConstant.ORDER_ID + orderId), orderId);
        //如果redis中没有相同orderId,则说明是重复提交订单,不再往下进行
        if(!flag){
            //返回false则说明是重复提交
            throw new ServiceException(ResponseEnum.DO_NOT_REPLACE_SUBMIT);
        } 
    }
}

redisson实现分布式锁

分布式锁实现库存锁定,解决商品超卖问题

maven依赖

<dependency>
      <groupId>org.redisson</groupId>
      <artifactId>redisson</artifactId>
      <version>3.11.2</version>
    </dependency>

redisson配置类

@Data
@Configuration
@ConfigurationProperties("spring.redis")
public class RedissonConfig {

    private String host;

    private String addresses;

    private String password;

    private String port;

    private int timeout = 3000;
    private int connectionPoolSize = 64;
    private int connectionMinimumIdleSize=10;
    private int pingConnectionInterval = 60000;
    private static String ADDRESS_PREFIX = "redis://";

    /**
     * 自动装配
     *
     */
    @Bean
    RedissonClient redissonSingle() {
        Config config = new Config();
        //  判断redis 的host是否为空
        if(StringUtils.isEmpty(host)){
            throw new RuntimeException("host is  empty");
        }
        //  配置host,port等参数
        SingleServerConfig serverConfig = config.useSingleServer()
                //redis://127.0.0.1:7181
                .setAddress(ADDRESS_PREFIX + this.host + ":" + port)
                .setTimeout(this.timeout)
                .setPingConnectionInterval(pingConnectionInterval)
                .setConnectionPoolSize(this.connectionPoolSize)
                .setConnectionMinimumIdleSize(this.connectionMinimumIdleSize);
        //  判断进入redis 是否密码
        if(!StringUtils.isEmpty(this.password)) {
            serverConfig.setPassword(this.password);
        }
        return Redisson.create(config);
    }
}

实战

接口逻辑:前端跳转到支付页面时获取订单id,在获取订单id的同时锁定商品库存。若订单中的商品全部锁定成功则返回订单id;若有一个商品锁定失败则将锁定的商品库存全部解锁,并返回商品库存锁定失败提示。

核心代码如下

/**
 * 商品锁定实体
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class GoodsStockLockVo implements Serializable {
    /**
     * 商品id
     */
    private Integer goodsId;
    /**
     * 锁定库存量
     */
    private Integer numbers;
    /**
     * 是否成功锁定库存标志
     */
    private boolean isLock;
}

getOrderId()获取订单号方法

@Override
    @Transactional(rollbackFor = Exception.class)
    public String getOrderId(List<Integer> cartIds,Integer userId) {
        //生成一个唯一的订单id
        String orderId = IdUtil.getSnowflakeNextIdStr();
        redisTemplate.opsForValue().set(RedisConstant.ORDER_ID + orderId,orderId,RedisConstant.ORDER_CACHE_EXPIRE,TimeUnit.MINUTES);
        //获取订单中的所有商品数据
        //将所有商品数据转换为商品锁定实体
        if(!CollectionUtils.isEmpty(cartIds)){
            List<GoodsStockLockVo> stockLockVos = new ArrayList<GoodsStockLockVo>(cartIds.size());
            cartIds.stream().forEach(id -> {
                GoodsStockLockVo goodsStockLockVo = new GoodsStockLockVo();
                Cart cart = baseMapper.selectById(id);
                Goods goods = goodsMapper.selectById(cart.getGoodsId());
                goodsStockLockVo.setGoodsId(goods.getId());
                goodsStockLockVo.setNumbers(cart.getNumber());
                stockLockVos.add(goodsStockLockVo);
            });
            //验证和锁定库存
            boolean isLockSuccess = this.checkAndLock(stockLockVos, orderId);
            if(!isLockSuccess){
                //库存锁定失败
                throw new ServiceException(ResponseEnum.GOODS_STOCK_FAIL);
            }
            //将支付表的状态设置为待付款
            asyncService.savePayInfo(orderId, userId);
            //保存订单和购物车记录
            asyncService.saveOrderCart(orderId, userId,cartIds);
            //库存锁定成功返回订单id
            return orderId;
        }
        return null;
    }

checkAndLock()商品库存是否锁定成功方法

/**
     * 验证和锁定库存
     * @param StockLockList
     * @param orderId
     * @return
     */
    private boolean checkAndLock(List<GoodsStockLockVo> StockLockList,String orderId) {
        //判空
        if(CollectionUtils.isEmpty(StockLockList)){
            throw new ServiceException(ResponseEnum.GOODS_STOCK_FAIL);
        }
        //遍历每一个商品,验证并锁定库存,具备原子性
        StockLockList.stream().forEach(item -> {
            this.checkLock(item);
        });

        //只要有一个商品库存锁定失败,所有锁定成功的商品都解锁
        boolean flag = StockLockList.stream().anyMatch(item -> !item.isLock());
        if(flag){
            //所有锁定成功的商品都解锁
            StockLockList.stream().filter(GoodsStockLockVo::isLock)
                    .forEach(stockLockVo -> {
                        //解锁库存
                        goodsMapper.unLockStock(stockLockVo.getGoodsId(),stockLockVo.getNumbers());
                    });
            //返回失败状态
            return false;
        }
        //如果所有商品都锁定成功,则缓存锁定库存数据,方便后续解锁和减库存
        //TODO: 缓存设置时间为30
        redisTemplate.opsForValue().set(RedisConstant.GOODS_STOCK_LOCK + orderId,StockLockList);
        return true;
    }

checkLock()锁定库存方法

/**
     * 锁定库存逻辑方法
     * @param stockLockVo
     * @return
     */
    private void checkLock(GoodsStockLockVo stockLockVo) {
        //获取锁
        //公平锁
        RLock rLock = this.redissonClient.getFairLock(RedisConstant.STOCK_LOCK + stockLockVo.getGoodsId());
        //上锁
        rLock.lock();
        try {
            //验证库存
            Goods goods = goodsMapper.checkStock(stockLockVo.getGoodsId(),stockLockVo.getNumbers());
            if(null == goods){
                //说明库存不足,锁定失败
                stockLockVo.setLock(false);
                return;
            }
            //锁定库存
            //就是一个更新操作
            Integer rows = goodsMapper.lockStock(stockLockVo.getGoodsId(),stockLockVo.getNumbers());
            if(rows >= 1){
                //成功锁定
                stockLockVo.setLock(true);
            }
        }finally {
            //解锁
            rLock.unlock();
        }
    }

dao操作

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.qs.mapper.GoodsMapper">

    <resultMap id="goodsInfoMap" type="com.qs.pojo.entity.Goods" autoMapping="true" />

    <!--验证库存-->
    <select id="checkStock" resultMap="goodsInfoMap">
        select * from qs_goods
        where id = #{goodsId} and number - lock_stock > #{numbers} for update
    </select>

    <!--锁定库存-->
    <update id="lockStock">
        update qs_goods
        set lock_stock = lock_stock + #{numbers}
        where id = #{goodsId}
    </update>

    <!--解锁库存-->
    <update id="unLockStock">
        update qs_goods
        set lock_stock = lock_stock - #{numbers}
        where id = #{goodsId}
    </update>
</mapper>