SpringBoot集成Redis
大约 9 分钟
SpringBoot集成Redis
整合主要讲解redis缓存更新策略、缓存三大问题解决方案以及使用Lua脚本解决幂等问题
整合
导入依赖
<!--redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--线程池pool-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
yml配置文件
spring:
redis:
host: localhost
port: 6379
database: 0
#连接超时时间
connect-timeout: 1800000
lettuce:
pool:
#连接池最大连接数(使用负值表示没有限制)
max-active: 20
#最大阻塞等待时间(负数表示没限制)
max-wait: -1
#连接池中的最大空闲连接
max-idle: 5
#连接池中的最小空闲连接
min-idle: 0
Key Value 序列化
package redis.config;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* redis序列化
*/
@EnableCaching
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
//默认的Key序列化器为:JdkSerializationRedisSerializer
redisTemplate.setKeySerializer(new StringRedisSerializer()); // key序列化
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer()); // value序列化
redisTemplate.setConnectionFactory(connectionFactory);
return redisTemplate;
}
}
缓存
缓存更新策略
内存淘汰 | 超时剔除 | 主动更新 | |
---|---|---|---|
说明 | 不用自己维护,利用Redis的内存淘汰机制,当内存不足时自动淘汰部分数据。下次查询时更新缓存。 | 给缓存数据添加TTL时间,到期后自动删除缓存。下次查询时更新缓存。 | 编写业务逻辑,在修改数据库的同时,更新缓存。 |
一致性 | 差 | 一般 | 好 |
维护成本 | 无 | 低 | 高 |
业务场景
- 低一致性需求:使用内存淘汰机制。
- 高一致性需求:主动更新,并以超时剔除作为兜底方案。
三个问题需要考虑:
(1)删除缓存还是更新缓存?
- 更新缓存:每次更新数据库都更新缓存,无效写操作较多
- 删除缓存:更新数据库时让缓存失效,查询时再更新缓存(建议)
(2)如何保证缓存与数据库的操作的同时成功或失败?
- 单体系统:将缓存与数据库操作放在一个事务
- 分布式系统:利用TCC等分布式事务方案
(3)先操作缓存还是先操作数据库?
- 先删除缓存,再操作数据库
- 先操作数据库,再删除缓存(建议)
缓存三大问题解决方案
(1)、缓存穿透
(2)、缓存击穿
(3)、缓存雪崩
缓存穿透
简单理解:缓存穿透是指客户端请求的数据在缓存中和数据库都不存在,这样缓存永远不会生效,这些请求都会打到数据库。

解决方案
(1)缓存空对象
- 优点:实现简单,维护方便
- 缺点:
- 额外的内存消耗
- 可能造成短期的数据不一致

(2)布隆过滤器
- 优点:内存占用较少,没有多余key
- 缺点:
- 实现复杂
- 存在误判可能

缓存空对象代码片段
//如果数据中不存在则缓存空对象返回,两分钟后过期
redisTemplate.opsForValue().set(GoodsConstant.GOODS + userId,"",RedisConstant.CACHE_NULL_EXPIRE, TimeUnit.MINUTES);
其他解决方案:
- 增强id的复杂度,避免被猜测id规律
- 做好数据的基础格式校验
- 加强用户权限校验
- 做好热点参数的限流
缓存雪崩
简单理解:缓存雪崩是指同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。

解决方案
- 给不同的key的TTL添加随机值
- 利用Redis集群提高服务的可用性
- 给缓存业务添加降级限流策略
- 给业务添加多级缓存
缓存击穿
简单理解:缓存击穿问题 也叫热点key问题,即使也给被高并发访问并且缓存重建页较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。

解决方案
- 互斥锁
- 逻辑过期
解决方案 | 优点 | 缺点 |
---|---|---|
互斥锁 | (1)没有额外的内存消耗。(2)保证一致性。(3)实现简单 | (1)线程需要等待,性能受影响。(2)可能有死锁风险 |
逻辑过期 | 线程不需要等待,性能较好。 | (1)有额外的内存消耗。(2)不保证一致性。(3)实现复杂 |
互斥锁

一下是一个简单的使用互斥锁解决的流程

获取互斥锁方法
/**
* 获取锁方法
* @param key
*/
private boolean tryLock(String key,Integer userId){
//获取锁,要对锁设置一个过期时间,防止某些异常导致死锁
Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, userId, RedisConstant.CACHE_LOCK_EXPIRE, TimeUnit.SECONDS);
//如果为true则说明获取到了锁,若返回false则说明前面已经有线程获取到了锁
return BooleanUtil.isTrue(flag);
}
释放互斥锁方法
/**
* 释放锁
* @param key
*/
private void unLock(String key){
//直接删除锁的键即可
redisTemplate.delete(key);
}
核心代码
@Override
public List<GoodsDto> getList(Integer userId) {
//先去redis中查询是否有缓存的数据
List<GoodsDto> GoodsDtoList = (List<GoodsDto>) redisTemplate.opsForValue().get(GoodsConstant.GOODS + userId);
if(!CollectionUtils.isEmpty(GoodsDtoList)){
log.info("命中了redis中的商品缓存数据,成功进入缓存---------------");
return GoodsDtoList;
}else{
//未命中redis中的缓存数据
String lockKey = RedisConstant.GOODS_LOCK + userId;
//尝试获取锁
try {
boolean isLock = this.tryLock(lockKey, userId);
if(!isLock){
//未获取到锁,休眠一段时间等待前一个线程重建缓存
Thread.sleep(RedisConstant.THREAD_SLEEP);
//重新获取,递归
return getList(userId);
}
//获取到了锁
//再去redis中查询是否有缓存的数据
List<GoodsDto> GoodsList = (List<GoodsDto>) redisTemplate.opsForValue().get(GoodsConstant.GOODS + userId);
if(!CollectionUtils.isEmpty(GoodsList)){
log.info("命中了双重检测redis中的商品缓存数据,成功进入缓存---------------");
return GoodsList;
}
//根据商户id获取商户所属的店铺
QueryWrapper<Store> wrapper = new QueryWrapper<>();
wrapper.eq("user_id",userId);
List<Store> stores = storeMapper.selectList(wrapper);
if(CollectionUtils.isEmpty(stores)){
//如果数据中不存在则缓存空对象返回来解决redis缓存穿透,两分钟后过期
List<UserDto> dtoList = new ArrayList<>();
dtoList.add(new UserDto());
redisTemplate.opsForValue().set(GoodsConstant.GOODS + userId,dtoList,RedisConstant.CACHE_NULL_EXPIRE, TimeUnit.MINUTES);
}else{
//进行缓存重建
List<GoodsDto> goodsDtoList = this.getGoodsDtoList(stores,userId);
return goodsDtoList;
}
}catch (Exception e){
throw new ServiceException(ResponseEnum.ERROR);
}finally {
//释放锁
this.unLock(lockKey);
}
}
return null;
}
逻辑过期

防重复提交(解决幂等)
使用redis + lua脚本保证原子性操作

lua脚本
if(redis.call('get', KEYS[1]) == ARGV[1]) then
return redis.call('del', KEYS[1])
else return 0 end
核心代码
private static final DefaultRedisScript<Boolean> REDIS_SCRIPT;
/**
* 静态代码块中初始化lua脚本
*/
static {
REDIS_SCRIPT = new DefaultRedisScript<Boolean>();
//读取项目中的lua文件
REDIS_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
REDIS_SCRIPT.setResultType(Boolean.class);
}
@Override
public void saveOrderInfo(OrderDto orderDto,String orderId) {
//TODO: 需要做防止重复提交和锁定库存操作
if(StringUtils.isEmpty(orderId)){
throw new ServiceException(ResponseEnum.ORDER_ID_EXCEPTION);
}
//lua脚本保证原子性操作
//拿着orderId到redis中查询
//如果redis中有相同orderId,则说明是正常提交订单,把redis中的orderId删除,
// 这个过程要保证原子性操作,由lua脚本保证。
Boolean flag = (Boolean) redisTemplate.execute(REDIS_SCRIPT, Arrays.asList(RedisConstant.ORDER_ID + orderId), orderId);
//如果redis中没有相同orderId,则说明是重复提交订单,不再往下进行
if(!flag){
//返回false则说明是重复提交
throw new ServiceException(ResponseEnum.DO_NOT_REPLACE_SUBMIT);
}
}
}
redisson实现分布式锁
分布式锁实现库存锁定,解决商品超卖问题
maven依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.11.2</version>
</dependency>
redisson配置类
@Data
@Configuration
@ConfigurationProperties("spring.redis")
public class RedissonConfig {
private String host;
private String addresses;
private String password;
private String port;
private int timeout = 3000;
private int connectionPoolSize = 64;
private int connectionMinimumIdleSize=10;
private int pingConnectionInterval = 60000;
private static String ADDRESS_PREFIX = "redis://";
/**
* 自动装配
*
*/
@Bean
RedissonClient redissonSingle() {
Config config = new Config();
// 判断redis 的host是否为空
if(StringUtils.isEmpty(host)){
throw new RuntimeException("host is empty");
}
// 配置host,port等参数
SingleServerConfig serverConfig = config.useSingleServer()
//redis://127.0.0.1:7181
.setAddress(ADDRESS_PREFIX + this.host + ":" + port)
.setTimeout(this.timeout)
.setPingConnectionInterval(pingConnectionInterval)
.setConnectionPoolSize(this.connectionPoolSize)
.setConnectionMinimumIdleSize(this.connectionMinimumIdleSize);
// 判断进入redis 是否密码
if(!StringUtils.isEmpty(this.password)) {
serverConfig.setPassword(this.password);
}
return Redisson.create(config);
}
}
实战
接口逻辑:前端跳转到支付页面时获取订单id,在获取订单id的同时锁定商品库存。若订单中的商品全部锁定成功则返回订单id;若有一个商品锁定失败则将锁定的商品库存全部解锁,并返回商品库存锁定失败提示。
核心代码如下
/**
* 商品锁定实体
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class GoodsStockLockVo implements Serializable {
/**
* 商品id
*/
private Integer goodsId;
/**
* 锁定库存量
*/
private Integer numbers;
/**
* 是否成功锁定库存标志
*/
private boolean isLock;
}
getOrderId()获取订单号方法
@Override
@Transactional(rollbackFor = Exception.class)
public String getOrderId(List<Integer> cartIds,Integer userId) {
//生成一个唯一的订单id
String orderId = IdUtil.getSnowflakeNextIdStr();
redisTemplate.opsForValue().set(RedisConstant.ORDER_ID + orderId,orderId,RedisConstant.ORDER_CACHE_EXPIRE,TimeUnit.MINUTES);
//获取订单中的所有商品数据
//将所有商品数据转换为商品锁定实体
if(!CollectionUtils.isEmpty(cartIds)){
List<GoodsStockLockVo> stockLockVos = new ArrayList<GoodsStockLockVo>(cartIds.size());
cartIds.stream().forEach(id -> {
GoodsStockLockVo goodsStockLockVo = new GoodsStockLockVo();
Cart cart = baseMapper.selectById(id);
Goods goods = goodsMapper.selectById(cart.getGoodsId());
goodsStockLockVo.setGoodsId(goods.getId());
goodsStockLockVo.setNumbers(cart.getNumber());
stockLockVos.add(goodsStockLockVo);
});
//验证和锁定库存
boolean isLockSuccess = this.checkAndLock(stockLockVos, orderId);
if(!isLockSuccess){
//库存锁定失败
throw new ServiceException(ResponseEnum.GOODS_STOCK_FAIL);
}
//将支付表的状态设置为待付款
asyncService.savePayInfo(orderId, userId);
//保存订单和购物车记录
asyncService.saveOrderCart(orderId, userId,cartIds);
//库存锁定成功返回订单id
return orderId;
}
return null;
}
checkAndLock()商品库存是否锁定成功方法
/**
* 验证和锁定库存
* @param StockLockList
* @param orderId
* @return
*/
private boolean checkAndLock(List<GoodsStockLockVo> StockLockList,String orderId) {
//判空
if(CollectionUtils.isEmpty(StockLockList)){
throw new ServiceException(ResponseEnum.GOODS_STOCK_FAIL);
}
//遍历每一个商品,验证并锁定库存,具备原子性
StockLockList.stream().forEach(item -> {
this.checkLock(item);
});
//只要有一个商品库存锁定失败,所有锁定成功的商品都解锁
boolean flag = StockLockList.stream().anyMatch(item -> !item.isLock());
if(flag){
//所有锁定成功的商品都解锁
StockLockList.stream().filter(GoodsStockLockVo::isLock)
.forEach(stockLockVo -> {
//解锁库存
goodsMapper.unLockStock(stockLockVo.getGoodsId(),stockLockVo.getNumbers());
});
//返回失败状态
return false;
}
//如果所有商品都锁定成功,则缓存锁定库存数据,方便后续解锁和减库存
//TODO: 缓存设置时间为30
redisTemplate.opsForValue().set(RedisConstant.GOODS_STOCK_LOCK + orderId,StockLockList);
return true;
}
checkLock()锁定库存方法
/**
* 锁定库存逻辑方法
* @param stockLockVo
* @return
*/
private void checkLock(GoodsStockLockVo stockLockVo) {
//获取锁
//公平锁
RLock rLock = this.redissonClient.getFairLock(RedisConstant.STOCK_LOCK + stockLockVo.getGoodsId());
//上锁
rLock.lock();
try {
//验证库存
Goods goods = goodsMapper.checkStock(stockLockVo.getGoodsId(),stockLockVo.getNumbers());
if(null == goods){
//说明库存不足,锁定失败
stockLockVo.setLock(false);
return;
}
//锁定库存
//就是一个更新操作
Integer rows = goodsMapper.lockStock(stockLockVo.getGoodsId(),stockLockVo.getNumbers());
if(rows >= 1){
//成功锁定
stockLockVo.setLock(true);
}
}finally {
//解锁
rLock.unlock();
}
}
dao操作
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.qs.mapper.GoodsMapper">
<resultMap id="goodsInfoMap" type="com.qs.pojo.entity.Goods" autoMapping="true" />
<!--验证库存-->
<select id="checkStock" resultMap="goodsInfoMap">
select * from qs_goods
where id = #{goodsId} and number - lock_stock > #{numbers} for update
</select>
<!--锁定库存-->
<update id="lockStock">
update qs_goods
set lock_stock = lock_stock + #{numbers}
where id = #{goodsId}
</update>
<!--解锁库存-->
<update id="unLockStock">
update qs_goods
set lock_stock = lock_stock - #{numbers}
where id = #{goodsId}
</update>
</mapper>