[实践]如何在分布式微服务中使用Redis实现分布式锁-1

背景

在谈到分布式锁之前,为什么要用锁?好端端代码或者数据为啥要加锁呢?
我的理解是,因为值的读取和写入是两个步骤,如果在读取和写入之间有其他线程修改了数据,就会导致数据不一致的问题。锁的作用就是在读取和写入操作之间提供一个互斥的环境,保证在同一时间中只有一个线程可以进行操作。
这就像你买股票,买入和卖出是两个步骤,如果在你买入股票的过程中,其他人也在买入同一只股票,就会导致价格变动,最终影响你的交易价格。
但是如果这个股票交易所是你家开的,你可以在交易过程中先锁定价格,其他人不能交易,这样就能保证你的买这支股票就是就是你买之前看到的价格。

分布式锁

在分布式系统中,数据可能分布在多个节点上,如何确保在多个节点之间的操作是互斥的?这就需要锁,在分布式系统中,变了一个名字,叫分布式锁,不再是单体的锁了。
在我们的微服务架构中,服务之间肯定是会有并发操作的,分布式锁可以确保在同一时间只有一个服务可以操作某个资源,从而避免数据不一致的问题。
最常见的场景就是库存扣减,以及我们的余额扣减上。
假设有两个服务(也可能是同一个服务的多个实例)同时扣减库存,如果没有分布式锁,保证串行,就可能出现超卖的情况。想象一下,两个人同时下单购买同一件商品,并发线程去查询下,都查出来有一件商品,然后都扣减库存,最终导致库存为负数,这就会超卖。
我们实际遇到的情况是余额扣减。一个用户通过页面批量下单,而下单的过程中会使用异步消息扣减余额,当消费消息是并行处理的时候,扣减的余额的逻辑是先查询余额,再扣减余额。两个线程分别查了余额发现余额100块,然后都扣减了余额100块,更新成余额为0,最终导致实际的余额是0,而不是两次扣款后的余额-100。
为了解决这个问题我们引入了分布式锁。

Redis分布式锁

Redis是一个非常常用的缓存系统,在分布式锁中可以大有用途。
下面我讲下,Redis分布式锁的基本实现原理。那就是基于Redis的SETNX命令(Set if Not eXists)。当一个线程或进程需要获取锁时,它会尝试使用SETNX命令设置一个唯一的锁标识符。如果设置成功,表示获取锁成功;如果设置失败,表示锁已被其他线程或进程持有,那就要等待,等待一段时间后再尝试去设置值,然后一直循环尝试直到达到最大等待次数。
如果获取锁成功,线程或进程可以进行操作,操作完成后需要释放锁。释放锁的操作是通过删除锁标识符来实现的, 而删除是发送一个LUA脚本命令的方式实现。
如果在操作过程中线程或进程意外崩溃,锁可能会被遗留在Redis中,这就需要设置一个锁的过期时间,以防止死锁的情况发生。

在实现分布式锁时,我提个问题,为什么要用Redis实现分布式锁,而不是用数据库或者本地内存?
我的理解是,我们要制作分布式锁之前,首先要考虑到性能和可用性。Redis是一个内存数据库,读写速度非常快,适合高并发场景。而数据库通常是磁盘存储,读写速度相对较慢,可能会成为性能瓶颈。
另外,Redis提供了原子操作和过期时间的支持,可以有效避免死锁和资源泄漏的问题。而数据库的锁机制通常比较复杂,可能会导致性能下降。
而本地内存锁只能在单机环境下使用,无法满足分布式系统的需求。
所以,Redis天然有着实现分布式锁的优势。

实现分布式锁

使用切面编程实现拦截方法过滤出要加锁的方法

public interface LockConstant {

    /**
     * 无限重试
     */
    int RETRY_NO_END = -1;

    /**
     * 无限重试
     */
    int TIME_OUT_NO_END = -1;

    /**
     * 默认重试次数
     */
    int RETRY_TIMES_DEFAULT = 50;

    /**
     * 尝试获取锁 总超时时间,超过这个时间则不会在等待 默认 毫秒 值:10秒
     */
    long TIME_OUT_MILLS_DEFAULT = 10 * 1000L;

    /**
     * 每次等待时间 默认 200毫秒
     */
    long SLEEP_TIME_MILLS_PER_STEP_DEFAULT = 200L;

    /**
     * 默认锁住时间 毫秒 值:1分钟
     */
    long EXPIRE_TIME_MILLS_DEFAULT = 60 * 1000L;

    /**
     * value默认值
     */
    String VALUE_EMPTY_DEFAULT = "";

    /**
     * 释放锁脚本
     * COMPARE_AND_DELETE
     */
    String UNLOCK_REDIS_LUA_SCRIPT =
            "if redis.call('get',KEYS[1]) == ARGV[1]\n" +
                    "then\n" +
                    "    return redis.call('del',KEYS[1])\n" +
                    "else\n" +
                    "    return 0\n" +
                    "end";
    /**
     * 多次尝试获取锁失败提示信息测试
     */
    int TRY_LOCK_TO_LOG_WARN_COUNT_MAX_TIMES = 30;

    int TRY_LOCK_TO_LOG_WARN_COUNT_PER_TIMES = 10;
}

/**
 * 锁 切面处理
 * @author wanxp
 */
@Aspect
@Component
@Slf4j
@Order(1)
public class LockAspect implements EmbeddedValueResolverAware {

    private LockService lockService;

    private StringValueResolver resolver;

    private SpelExpressionParser spelParser = new SpelExpressionParser();


    @Autowired
    public void setLockService(LockService lockService) {
        this.lockService = lockService;
    }

    @Pointcut("@annotation(com.zlsong.shared.synchronize.DistributedLock)")
    public void pointcut() {

    }

    @Around("pointcut() && @annotation(distributedLock)")
    public Object lockHandle(ProceedingJoinPoint joinPoint, DistributedLock distributedLock) throws Throwable{
        if (log.isTraceEnabled()) {
            log.trace("distributedLock handle start:{}", joinPoint.getSignature().getName());
        }
        String key = getKey(joinPoint, distributedLock);
        String finalKey = (Detect.notEmpty(distributedLock.keyPrefix()) ? distributedLock.keyPrefix() + ":" : "") + key;
        String finalValue  = getValueByValueGenerator(distributedLock.valueGenerator());
        try(LockObject lock = lockService.lock(finalKey, finalValue, distributedLock.expire(), distributedLock.retries(),
                distributedLock.sleepTimePerStep(), distributedLock.totalWaitTime())) {
            if (lock == null) {
                throw new RuntimeException("获取分布式锁失败;key:" + finalKey + ", value:" + finalValue + ", method:" +
                        joinPoint.getSignature().getName());
            }
            Object result = joinPoint.proceed();
            return result;
        }finally {
            if (log.isTraceEnabled()) {
                log.trace("distributedLock handle complete:{}", finalKey);
            }
        }
    }

    /**
     * 获取随机的value
     * @param valueGenerator 随机方式
     * @return value
     */
    private String getValueByValueGenerator(DistributedLock.ValueGenerator valueGenerator) {
        //这是一个随机值生成器
        value = IdGenerator.getInstance().generate().toString();
        return value;
    }

    @Override
    public void setEmbeddedValueResolver(StringValueResolver resolver) {
        this.resolver = resolver;
    }

    /**
     * 获取spel值
     * 通过SpelUtil解析注解的key值
     * @param proceedingJoinPoint 切点
     * @param distributedLock 注解
     * @return key的spel值
     */
    private String getKey(ProceedingJoinPoint proceedingJoinPoint, DistributedLock distributedLock) {
        Signature signature = proceedingJoinPoint.getSignature();
        MethodSignature methodSignature = (MethodSignature) signature;
        Method targetMethod = methodSignature.getMethod();
        Object target = proceedingJoinPoint.getTarget();
        Object[] arguments = proceedingJoinPoint.getArgs();
        return SpelUtil.parse(target, distributedLock.key(), targetMethod, arguments);
    }
}

真正的加锁逻辑

以下逻辑是分布式锁实现的核心。
首先构建一个锁的对象RedisDistributedLock,为了能自动解锁,RedisDistributedLock还使用了AutoCloseable接口,这样在使用完锁后可以自动释放锁。

/**
 * 分布式锁 可自动释放锁
 * @author wanxp
 */
@Slf4j
abstract public class LockAutoCloseable implements AutoCloseable{
    /**
     * release lock
     * @author piaoruiqing
     */
    abstract public void unlock();

    @Override
    public void close() throws Exception {
        log.debug("distributed lock unlock, {},", this.toString());
        this.unlock();
    }
}

/**
 * 锁,支持自动释放,使用方式:
 * <code>
 * try(LockObject lock = lockService.lock()) {
 *     //do something;
 * }
 * </code>
 *
 *
 *
 * @author wanxp
 */
public abstract class LockObject extends LockAutoCloseable{
}

/**
 * 分布式锁 redis的实现
 * @author wanxp
 */
@Slf4j
@Getter
public class RedisDistributedLock extends LockObject {
    private RedisOperations<String, String> operations;
    private String key;
    private String value;

    /**
     * 分布式锁 构造器
     * @param operations Redis操作
     * @param key key
     * @param value value
     */
    public RedisDistributedLock(RedisOperations<String, String> operations, String key, String value) {
        Assert.invalidIfNull(operations, SystemResultCode.PARAMETERS_ERROR);
        Assert.invalidIfNull(key, SystemResultCode.PARAMETERS_ERROR);
        Assert.invalidIfNull(value, SystemResultCode.PARAMETERS_ERROR);
        this.operations = operations;
        this.key = key;
        this.value = value;
    }

    @Override
    public void unlock() {
        List<String> keys = Collections.singletonList(key);

        DefaultRedisScript redisScript = new DefaultRedisScript();
        redisScript.setScriptText(LockConstant.UNLOCK_REDIS_LUA_SCRIPT);
        redisScript.setResultType(Boolean.class);
        operations.execute(redisScript, keys,value);
    }

    @Override
    public String toString() {
        return "RedisDistributedLock [key=" + key + ", value=" + value + "]";
    }

    @Override
    public boolean equals(Object obj) {
        return obj != null &&
                obj instanceof RedisDistributedLock &&
                this.key.equals(((RedisDistributedLock)obj).key) &&
                this.value.equals(((RedisDistributedLock) obj).value);
    }
}

注解编写

然后编写一个注解,用于标记需要加锁的方法。

/**
 * 分布式锁 注解
 * @author wanxp
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface DistributedLock {
    @AliasFor("key")
    String value() default "''";

    /**
     * 锁 key,支持 spel
     *
     * @return
     */
    @AliasFor("value")
    String key() default "''";

    /**
     * 锁 key前缀,
     * 锁的固定前缀 和 key组成锁的具体内容
     * @return
     */
    String keyPrefix() default "";

    /**
     * 锁住最长多久自动释放 毫秒
     * @return
     */
    long expire() default LockConstant.EXPIRE_TIME_MILLS_DEFAULT;

    /**
     * 尝试获取锁总超时时间 毫秒
     * 如果锁一直获取不到,则累计等待totalWaitTime 时间后就会直接返回失败
     * @return
     */
    long totalWaitTime() default LockConstant.TIME_OUT_MILLS_DEFAULT;

    /**
     * 获取锁单步等待时间 毫秒
     * 如果锁被占用,则每次等待sleepTimePerStep再去尝试获取锁
     * @return
     */
    long sleepTimePerStep() default LockConstant.SLEEP_TIME_MILLS_PER_STEP_DEFAULT;

    /**
     * 重试次数,最大重试次数,当重试 retries 后还未获取锁则 直接返回失败
     * @return
     */
    int retries() default LockConstant.RETRY_TIMES_DEFAULT;
}

接下来是大名鼎鼎的真正锁逻辑来了

/**
 * 分布式、Redis 实现
 * @author wanxp
 */
@Slf4j
@Component
public class RedisDistributedLockServiceImpl implements LockService {
    private RedisOperations redisOperations;

    public RedisDistributedLockServiceImpl(@Qualifier("stringRedisTemplate") RedisOperations redisOperations) {
        this.redisOperations = redisOperations;
    }

    @Override
    public LockObject lock(String key, String value, Long expire) {
        return lock(key, value, expire,  0, LockConstant.SLEEP_TIME_MILLS_PER_STEP_DEFAULT, LockConstant.TIME_OUT_MILLS_DEFAULT);
    }

    @Override
    public LockObject tryLock(String key, String value) {
        return lock(key, value, LockConstant.EXPIRE_TIME_MILLS_DEFAULT);
    }

    /**
     * 通过key和value获取锁
     * retries/timeOutMills 只要有一个不满足要求即退出重试
     * @param key 锁的key
     * @param value 锁的value 防止同样的key的误删
     * @param expireMillis 锁的最长时间,锁住的时间,锁住之后超过这个时间若不主动释放则会自动释放锁
     * @param retries 最大的重试次数:0-不重试;-1-无重试次数;其他正整数:具体重试次数
     * @param sleepTimeMillisPerStep 每次没有获取到锁时重试等待时间、单步时间
     * @param totalWaitTimeMills 总的最长的等待时间:-1 无限等待;其他正整数:具体等待的时间
     * @return 是否已经获取锁
     */
    @Override
    public LockObject lock(String key, String value, long expireMillis, int retries, long sleepTimeMillisPerStep, long totalWaitTimeMills) {
        if (log.isTraceEnabled()) {
            log.trace("lock handle start: key:{}, value:{}", key, value);
        }
        int count = 0;
        long endTime = System.currentTimeMillis() + totalWaitTimeMills;
        do {
            if (count > LockConstant.TRY_LOCK_TO_LOG_WARN_COUNT_MAX_TIMES && count % LockConstant.TRY_LOCK_TO_LOG_WARN_COUNT_PER_TIMES == 0) {
                log.warn("lock warning, lock retry {} times; key:{},value:{},expire:{},retries:{},waitingTime:{}, " +
                                "timeOutMills:{}", count, key, value, expireMillis, retries, sleepTimeMillisPerStep,
                        totalWaitTimeMills);
            }
            Boolean result = redisOperations
                    .opsForValue()
                    .setIfAbsent(key, value, expireMillis, TimeUnit.MILLISECONDS);
            if (result) {
                if (log.isTraceEnabled()) {
                    log.trace("lock handle success: key:{}, value:{}", key, value);
                }
                return new RedisDistributedLock(redisOperations, key, value);
            }else if (retries == 0) {
                log.warn("lock failed, retries = 0,key:{}, value:{}", key, value);
                return null;
            }
            if (count < retries || LockConstant.RETRY_NO_END == retries) {
                try {
                    TimeUnit.MILLISECONDS.sleep(sleepTimeMillisPerStep);
                }catch (InterruptedException e) {
                    log.error("lock error, interruptedException;lock retry " + count + " times, " +
                                    "key:" + key + ",value:" + value + ",expireMillis:" + expireMillis +
                                    ",retries:" + retries + ",waitingTimeMillis:" + sleepTimeMillisPerStep +
                            ", timeOutMills:"+ totalWaitTimeMills);
                }
            }
            if (Thread.currentThread().isInterrupted() ||
                    (LockConstant.TIME_OUT_NO_END != totalWaitTimeMills && System.currentTimeMillis() >= endTime)) {
                break;
            }
        } while (count++ < retries || LockConstant.RETRY_NO_END == retries);
        log.warn("lock failed, return lock:null, key:{}, value:{}", key, value);
        return null;
    }
}

使用分布式锁

@Service
public class OrderService {

    @DistributedLock(key = "#orderId", expire = 10000, retries = 3, sleepTimePerStep = 200, totalWaitTime = 5000)
    public void processOrder(String orderId) {
        // 处理订单逻辑
        // 这里会自动加锁,确保同一时间只有一个线程可以处理这个订单
        System.out.println("Processing order: " + orderId);
    }
}

最后

分布式锁的实现是一个复杂的过程,需要考虑到性能、可用性和一致性等多个方面。Redis提供了一个简单而强大的分布式锁机制,可以有效地解决分布式系统中的数据一致性问题。
但是还有优化的空间,比如说可重入锁、读写锁等,这些都可以在实际应用中根据需求进行扩展和优化。