[实践]如何在分布式微服务中使用Redis实现分布式锁-1
背景
锁
在谈到分布式锁之前,为什么要用锁?好端端代码或者数据为啥要加锁呢?
我的理解是,因为值的读取和写入是两个步骤,如果在读取和写入之间有其他线程修改了数据,就会导致数据不一致的问题。锁的作用就是在读取和写入操作之间提供一个互斥的环境,保证在同一时间中只有一个线程可以进行操作。
这就像你买股票,买入和卖出是两个步骤,如果在你买入股票的过程中,其他人也在买入同一只股票,就会导致价格变动,最终影响你的交易价格。
但是如果这个股票交易所是你家开的,你可以在交易过程中先锁定价格,其他人不能交易,这样就能保证你的买这支股票就是就是你买之前看到的价格。
分布式锁
在分布式系统中,数据可能分布在多个节点上,如何确保在多个节点之间的操作是互斥的?这就需要锁,在分布式系统中,变了一个名字,叫分布式锁,不再是单体的锁了。
在我们的微服务架构中,服务之间肯定是会有并发操作的,分布式锁可以确保在同一时间只有一个服务可以操作某个资源,从而避免数据不一致的问题。
最常见的场景就是库存扣减,以及我们的余额扣减上。
假设有两个服务(也可能是同一个服务的多个实例)同时扣减库存,如果没有分布式锁,保证串行,就可能出现超卖的情况。想象一下,两个人同时下单购买同一件商品,并发线程去查询下,都查出来有一件商品,然后都扣减库存,最终导致库存为负数,这就会超卖。
我们实际遇到的情况是余额扣减。一个用户通过页面批量下单,而下单的过程中会使用异步消息扣减余额,当消费消息是并行处理的时候,扣减的余额的逻辑是先查询余额,再扣减余额。两个线程分别查了余额发现余额100块,然后都扣减了余额100块,更新成余额为0,最终导致实际的余额是0,而不是两次扣款后的余额-100。
为了解决这个问题我们引入了分布式锁。
Redis分布式锁
Redis是一个非常常用的缓存系统,在分布式锁中可以大有用途。
下面我讲下,Redis分布式锁的基本实现原理。那就是基于Redis的SETNX命令(Set if Not eXists)。当一个线程或进程需要获取锁时,它会尝试使用SETNX命令设置一个唯一的锁标识符。如果设置成功,表示获取锁成功;如果设置失败,表示锁已被其他线程或进程持有,那就要等待,等待一段时间后再尝试去设置值,然后一直循环尝试直到达到最大等待次数。
如果获取锁成功,线程或进程可以进行操作,操作完成后需要释放锁。释放锁的操作是通过删除锁标识符来实现的, 而删除是发送一个LUA脚本命令的方式实现。
如果在操作过程中线程或进程意外崩溃,锁可能会被遗留在Redis中,这就需要设置一个锁的过期时间,以防止死锁的情况发生。
在实现分布式锁时,我提个问题,为什么要用Redis实现分布式锁,而不是用数据库或者本地内存?
我的理解是,我们要制作分布式锁之前,首先要考虑到性能和可用性。Redis是一个内存数据库,读写速度非常快,适合高并发场景。而数据库通常是磁盘存储,读写速度相对较慢,可能会成为性能瓶颈。
另外,Redis提供了原子操作和过期时间的支持,可以有效避免死锁和资源泄漏的问题。而数据库的锁机制通常比较复杂,可能会导致性能下降。
而本地内存锁只能在单机环境下使用,无法满足分布式系统的需求。
所以,Redis天然有着实现分布式锁的优势。
实现分布式锁
使用切面编程实现拦截方法过滤出要加锁的方法
public interface LockConstant {
/**
* 无限重试
*/
int RETRY_NO_END = -1;
/**
* 无限重试
*/
int TIME_OUT_NO_END = -1;
/**
* 默认重试次数
*/
int RETRY_TIMES_DEFAULT = 50;
/**
* 尝试获取锁 总超时时间,超过这个时间则不会在等待 默认 毫秒 值:10秒
*/
long TIME_OUT_MILLS_DEFAULT = 10 * 1000L;
/**
* 每次等待时间 默认 200毫秒
*/
long SLEEP_TIME_MILLS_PER_STEP_DEFAULT = 200L;
/**
* 默认锁住时间 毫秒 值:1分钟
*/
long EXPIRE_TIME_MILLS_DEFAULT = 60 * 1000L;
/**
* value默认值
*/
String VALUE_EMPTY_DEFAULT = "";
/**
* 释放锁脚本
* COMPARE_AND_DELETE
*/
String UNLOCK_REDIS_LUA_SCRIPT =
"if redis.call('get',KEYS[1]) == ARGV[1]\n" +
"then\n" +
" return redis.call('del',KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
/**
* 多次尝试获取锁失败提示信息测试
*/
int TRY_LOCK_TO_LOG_WARN_COUNT_MAX_TIMES = 30;
int TRY_LOCK_TO_LOG_WARN_COUNT_PER_TIMES = 10;
}
/**
* 锁 切面处理
* @author wanxp
*/
@Aspect
@Component
@Slf4j
@Order(1)
public class LockAspect implements EmbeddedValueResolverAware {
private LockService lockService;
private StringValueResolver resolver;
private SpelExpressionParser spelParser = new SpelExpressionParser();
@Autowired
public void setLockService(LockService lockService) {
this.lockService = lockService;
}
@Pointcut("@annotation(com.zlsong.shared.synchronize.DistributedLock)")
public void pointcut() {
}
@Around("pointcut() && @annotation(distributedLock)")
public Object lockHandle(ProceedingJoinPoint joinPoint, DistributedLock distributedLock) throws Throwable{
if (log.isTraceEnabled()) {
log.trace("distributedLock handle start:{}", joinPoint.getSignature().getName());
}
String key = getKey(joinPoint, distributedLock);
String finalKey = (Detect.notEmpty(distributedLock.keyPrefix()) ? distributedLock.keyPrefix() + ":" : "") + key;
String finalValue = getValueByValueGenerator(distributedLock.valueGenerator());
try(LockObject lock = lockService.lock(finalKey, finalValue, distributedLock.expire(), distributedLock.retries(),
distributedLock.sleepTimePerStep(), distributedLock.totalWaitTime())) {
if (lock == null) {
throw new RuntimeException("获取分布式锁失败;key:" + finalKey + ", value:" + finalValue + ", method:" +
joinPoint.getSignature().getName());
}
Object result = joinPoint.proceed();
return result;
}finally {
if (log.isTraceEnabled()) {
log.trace("distributedLock handle complete:{}", finalKey);
}
}
}
/**
* 获取随机的value
* @param valueGenerator 随机方式
* @return value
*/
private String getValueByValueGenerator(DistributedLock.ValueGenerator valueGenerator) {
//这是一个随机值生成器
value = IdGenerator.getInstance().generate().toString();
return value;
}
@Override
public void setEmbeddedValueResolver(StringValueResolver resolver) {
this.resolver = resolver;
}
/**
* 获取spel值
* 通过SpelUtil解析注解的key值
* @param proceedingJoinPoint 切点
* @param distributedLock 注解
* @return key的spel值
*/
private String getKey(ProceedingJoinPoint proceedingJoinPoint, DistributedLock distributedLock) {
Signature signature = proceedingJoinPoint.getSignature();
MethodSignature methodSignature = (MethodSignature) signature;
Method targetMethod = methodSignature.getMethod();
Object target = proceedingJoinPoint.getTarget();
Object[] arguments = proceedingJoinPoint.getArgs();
return SpelUtil.parse(target, distributedLock.key(), targetMethod, arguments);
}
}
真正的加锁逻辑
以下逻辑是分布式锁实现的核心。
首先构建一个锁的对象RedisDistributedLock
,为了能自动解锁,RedisDistributedLock
还使用了AutoCloseable
接口,这样在使用完锁后可以自动释放锁。
/**
* 分布式锁 可自动释放锁
* @author wanxp
*/
@Slf4j
abstract public class LockAutoCloseable implements AutoCloseable{
/**
* release lock
* @author piaoruiqing
*/
abstract public void unlock();
@Override
public void close() throws Exception {
log.debug("distributed lock unlock, {},", this.toString());
this.unlock();
}
}
/**
* 锁,支持自动释放,使用方式:
* <code>
* try(LockObject lock = lockService.lock()) {
* //do something;
* }
* </code>
*
*
*
* @author wanxp
*/
public abstract class LockObject extends LockAutoCloseable{
}
/**
* 分布式锁 redis的实现
* @author wanxp
*/
@Slf4j
@Getter
public class RedisDistributedLock extends LockObject {
private RedisOperations<String, String> operations;
private String key;
private String value;
/**
* 分布式锁 构造器
* @param operations Redis操作
* @param key key
* @param value value
*/
public RedisDistributedLock(RedisOperations<String, String> operations, String key, String value) {
Assert.invalidIfNull(operations, SystemResultCode.PARAMETERS_ERROR);
Assert.invalidIfNull(key, SystemResultCode.PARAMETERS_ERROR);
Assert.invalidIfNull(value, SystemResultCode.PARAMETERS_ERROR);
this.operations = operations;
this.key = key;
this.value = value;
}
@Override
public void unlock() {
List<String> keys = Collections.singletonList(key);
DefaultRedisScript redisScript = new DefaultRedisScript();
redisScript.setScriptText(LockConstant.UNLOCK_REDIS_LUA_SCRIPT);
redisScript.setResultType(Boolean.class);
operations.execute(redisScript, keys,value);
}
@Override
public String toString() {
return "RedisDistributedLock [key=" + key + ", value=" + value + "]";
}
@Override
public boolean equals(Object obj) {
return obj != null &&
obj instanceof RedisDistributedLock &&
this.key.equals(((RedisDistributedLock)obj).key) &&
this.value.equals(((RedisDistributedLock) obj).value);
}
}
注解编写
然后编写一个注解,用于标记需要加锁的方法。
/**
* 分布式锁 注解
* @author wanxp
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface DistributedLock {
@AliasFor("key")
String value() default "''";
/**
* 锁 key,支持 spel
*
* @return
*/
@AliasFor("value")
String key() default "''";
/**
* 锁 key前缀,
* 锁的固定前缀 和 key组成锁的具体内容
* @return
*/
String keyPrefix() default "";
/**
* 锁住最长多久自动释放 毫秒
* @return
*/
long expire() default LockConstant.EXPIRE_TIME_MILLS_DEFAULT;
/**
* 尝试获取锁总超时时间 毫秒
* 如果锁一直获取不到,则累计等待totalWaitTime 时间后就会直接返回失败
* @return
*/
long totalWaitTime() default LockConstant.TIME_OUT_MILLS_DEFAULT;
/**
* 获取锁单步等待时间 毫秒
* 如果锁被占用,则每次等待sleepTimePerStep再去尝试获取锁
* @return
*/
long sleepTimePerStep() default LockConstant.SLEEP_TIME_MILLS_PER_STEP_DEFAULT;
/**
* 重试次数,最大重试次数,当重试 retries 后还未获取锁则 直接返回失败
* @return
*/
int retries() default LockConstant.RETRY_TIMES_DEFAULT;
}
接下来是大名鼎鼎的真正锁逻辑来了
/**
* 分布式、Redis 实现
* @author wanxp
*/
@Slf4j
@Component
public class RedisDistributedLockServiceImpl implements LockService {
private RedisOperations redisOperations;
public RedisDistributedLockServiceImpl(@Qualifier("stringRedisTemplate") RedisOperations redisOperations) {
this.redisOperations = redisOperations;
}
@Override
public LockObject lock(String key, String value, Long expire) {
return lock(key, value, expire, 0, LockConstant.SLEEP_TIME_MILLS_PER_STEP_DEFAULT, LockConstant.TIME_OUT_MILLS_DEFAULT);
}
@Override
public LockObject tryLock(String key, String value) {
return lock(key, value, LockConstant.EXPIRE_TIME_MILLS_DEFAULT);
}
/**
* 通过key和value获取锁
* retries/timeOutMills 只要有一个不满足要求即退出重试
* @param key 锁的key
* @param value 锁的value 防止同样的key的误删
* @param expireMillis 锁的最长时间,锁住的时间,锁住之后超过这个时间若不主动释放则会自动释放锁
* @param retries 最大的重试次数:0-不重试;-1-无重试次数;其他正整数:具体重试次数
* @param sleepTimeMillisPerStep 每次没有获取到锁时重试等待时间、单步时间
* @param totalWaitTimeMills 总的最长的等待时间:-1 无限等待;其他正整数:具体等待的时间
* @return 是否已经获取锁
*/
@Override
public LockObject lock(String key, String value, long expireMillis, int retries, long sleepTimeMillisPerStep, long totalWaitTimeMills) {
if (log.isTraceEnabled()) {
log.trace("lock handle start: key:{}, value:{}", key, value);
}
int count = 0;
long endTime = System.currentTimeMillis() + totalWaitTimeMills;
do {
if (count > LockConstant.TRY_LOCK_TO_LOG_WARN_COUNT_MAX_TIMES && count % LockConstant.TRY_LOCK_TO_LOG_WARN_COUNT_PER_TIMES == 0) {
log.warn("lock warning, lock retry {} times; key:{},value:{},expire:{},retries:{},waitingTime:{}, " +
"timeOutMills:{}", count, key, value, expireMillis, retries, sleepTimeMillisPerStep,
totalWaitTimeMills);
}
Boolean result = redisOperations
.opsForValue()
.setIfAbsent(key, value, expireMillis, TimeUnit.MILLISECONDS);
if (result) {
if (log.isTraceEnabled()) {
log.trace("lock handle success: key:{}, value:{}", key, value);
}
return new RedisDistributedLock(redisOperations, key, value);
}else if (retries == 0) {
log.warn("lock failed, retries = 0,key:{}, value:{}", key, value);
return null;
}
if (count < retries || LockConstant.RETRY_NO_END == retries) {
try {
TimeUnit.MILLISECONDS.sleep(sleepTimeMillisPerStep);
}catch (InterruptedException e) {
log.error("lock error, interruptedException;lock retry " + count + " times, " +
"key:" + key + ",value:" + value + ",expireMillis:" + expireMillis +
",retries:" + retries + ",waitingTimeMillis:" + sleepTimeMillisPerStep +
", timeOutMills:"+ totalWaitTimeMills);
}
}
if (Thread.currentThread().isInterrupted() ||
(LockConstant.TIME_OUT_NO_END != totalWaitTimeMills && System.currentTimeMillis() >= endTime)) {
break;
}
} while (count++ < retries || LockConstant.RETRY_NO_END == retries);
log.warn("lock failed, return lock:null, key:{}, value:{}", key, value);
return null;
}
}
使用分布式锁
@Service
public class OrderService {
@DistributedLock(key = "#orderId", expire = 10000, retries = 3, sleepTimePerStep = 200, totalWaitTime = 5000)
public void processOrder(String orderId) {
// 处理订单逻辑
// 这里会自动加锁,确保同一时间只有一个线程可以处理这个订单
System.out.println("Processing order: " + orderId);
}
}
最后
分布式锁的实现是一个复杂的过程,需要考虑到性能、可用性和一致性等多个方面。Redis提供了一个简单而强大的分布式锁机制,可以有效地解决分布式系统中的数据一致性问题。
但是还有优化的空间,比如说可重入锁、读写锁等,这些都可以在实际应用中根据需求进行扩展和优化。