Redis 分布式锁示例

Java常用方法   2025-01-09 00:48   154   0  

一、pom 依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>Redis</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <slf4j.version>1.7.36</slf4j.version>
        <log4j.version>1.2.17</log4j.version>
        <jedis.version>4.3.1</jedis.version>
        <maven.compiler.plugin.version>3.6.1</maven.compiler.plugin.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${jedis.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-reload4j</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven.compiler.plugin.version}</version>
            </plugin>
        </plugins>
    </build>
</project>

二、示例代码

1、锁的封装

package com.chenwc.jedis.distributed.locks;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisPooled;
import redis.clients.jedis.params.SetParams;

import java.net.SocketTimeoutException;
import java.util.concurrent.TimeUnit;

/**
 * Redis 分布式锁
 * @author chenwc
 * @date 2023/5/21 17:01
 */
public class JedisLock {

    private final static Logger log = LoggerFactory.getLogger(JedisLock.class);
    private JedisPooled client;

    private LockState lockState;
    private KeepAliveTask keepAliveTask;

    //抢锁的时间间隔
    private int sleepMillisecond;
    //抢锁成功返回状态
    private final static String RESULT_OK = "OK";
    //解锁成功返回状态
    private static final Long UNLOCK_SUCCESS = 1L;

    /**
     * 锁的封装
     */
    class LockState {
        //锁的键
        private String lockKey;
        //锁的值
        private String lockValue;
        //错误信息
        private String errorMsg;
        //锁的过期时间
        private int leaseTTL;
        private long leaseId;
        //是否加锁成功
        private boolean lockSuccess;

        public LockState(String lockKey, int leaseTTL) {
            this.lockKey = lockKey;
            this.leaseTTL = leaseTTL;
        }

        public LockState(String lockKey, String value, int leaseTTL) {
            this.lockKey = lockKey;
            this.lockValue = value;
            this.leaseTTL = leaseTTL;
        }

        public String getLockKey() {
            return lockKey;
        }

        public void setLockKey(String lockKey) {
            this.lockKey = lockKey;
        }

        public String getLockValue() {
            return lockValue;
        }

        public void setLockValue(String lockValue) {
            this.lockValue = lockValue;
        }

        public String getErrorMsg() {
            return errorMsg;
        }

        public void setErrorMsg(String errorMsg) {
            this.errorMsg = errorMsg;
        }

        public long getLeaseId() {
            return leaseId;
        }

        public void setLeaseId(long leaseId) {
            this.leaseId = leaseId;
        }

        public boolean isLockSuccess() {
            return lockSuccess;
        }

        public void setLockSuccess(boolean lockSuccess) {
            this.lockSuccess = lockSuccess;
        }

        public int getLeaseTTL() {
            return leaseTTL;
        }

        public void setLeaseTTL(int leaseTTL) {
            this.leaseTTL = leaseTTL;
        }
    }

    /**
     * 构造函数
     * @param client JedisPooled客户端
     * @param key 键
     * @param value 值
     * @param ttlSeconds 锁的过期时间
     */
    public JedisLock(JedisPooled client, String key, String value, int ttlSeconds) {
        //1.准备客户端
        this.client = client;
        this.lockState = new LockState(key, value, ttlSeconds);
        //抢锁的重试间隔
        this.sleepMillisecond = (ttlSeconds * 1000) / 3;
    }

    /**
     * 加锁
     * @param waitTime 等待时间
     * @param waitUnit 时间单位
     * @return 加锁是否成功,成功返回true,失败返回false
     * @throws DtLockException 锁异常
     */
    public boolean tryLock(long waitTime, TimeUnit waitUnit) throws DtLockException {
        long totalMillisSeconds = waitUnit.toMillis(waitTime);
        long start = System.currentTimeMillis();
        //重试,直到成功或超过指定时间
        while (true) {
            // 抢锁
            try {
                //使用 SET 的扩展指令加锁(SET key value [EX seconds][px milliseconds] [NX|XX])
                SetParams params = SetParams.setParams().nx().ex(lockState.getLeaseTTL());
                String result = client.set(lockState.getLockKey(), lockState.getLockValue(), params);
                if (RESULT_OK.equals(result)) {
                    manualKeepAlive();
                    log.info("[jedis-lock] lock success 线程:{} 加锁成功,key:{} , value:{}", Thread.currentThread().getName(), lockState.getLockKey(), lockState.getLockValue());
                    //抢锁成功
                    lockState.setLockSuccess(true);
                    return true;
                } else {
                    //抢锁时间大于等待时间,返回失败
                    if (System.currentTimeMillis() - start >= totalMillisSeconds) {
                        log.info("[jedis-lock] lock fail 线程:{} 等待加锁超时,key:{} , value:{}", Thread.currentThread().getName(), lockState.getLockKey(), lockState.getLockValue());
                        return false;
                    }
                    //抢锁的重试时间间隔
                    Thread.sleep(sleepMillisecond);
                }
            } catch (Exception e) {
                Throwable cause = e.getCause();
                if (cause instanceof SocketTimeoutException) {//忽略网络抖动等异常
                }
                log.error("[jedis-lock] lock failed:" + e);
                throw new DtLockException("[jedis-lock] lock failed:" + e.getMessage(), e);
            }

        }
    }

    /**
     * 此实现中忽略,网络通信异常部分的处理,可参考tryLock
     * @throws DtLockException 锁异常
     */
    public void unlock() throws DtLockException {
        try {
            // 首先停止续约
            if (keepAliveTask != null) {
                keepAliveTask.close();
            }
            //使用 lua 解锁
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            Object result = client.eval(script, 1, lockState.getLockKey(), lockState.getLockValue());
            //解锁成功
            if (UNLOCK_SUCCESS.equals(result)) {
                log.info("[jedis-lock] unlock success 线程 : {} 解锁成功,锁key : {} ,路径:{}", Thread.currentThread().getName(), lockState.getLockKey(), lockState.getLockValue());
            } else {
                log.info("[jedis-lock] unlock del key failed ,线程 : {} 解锁成功,锁key : {} ,路径:{}", Thread.currentThread().getName(), lockState.getLockKey(), lockState.getLockValue());
            }
        } catch (Exception e) {
            log.error("[jedis-lock] unlock failed:" + e.getMessage(), e);
            throw new DtLockException("[jedis-lock] unlock failed:" + e.getMessage(), e);
        }
    }

    /**
     * 定时将 Key 的过期时间推迟
     */
    private void manualKeepAlive() {

        log.debug("开启定时将 Key 的过期时间推迟任务");

        final String t_key = lockState.getLockKey();
        final int t_ttl = lockState.getLeaseTTL();

        keepAliveTask = new KeepAliveTask(() -> {
            // 刷新值
            try {
                log.debug("刷新 Key 的过期时间");
                client.expire(t_key, t_ttl);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, t_ttl);
        keepAliveTask.start();
    }
}

2、异常类的简单实现

package com.chenwc.jedis.distributed.locks;

/**
 * @author chenwc
 * @date 2023/5/21 17:03
 */
public class DtLockException extends RuntimeException{
    public DtLockException(String message) {
        super(message);
    }

    public DtLockException(String message, Throwable cause) {
        super(message, cause);
    }

    public static DtLockException clientException(){
        return new DtLockException("client is empty");
    }
}

3、watchDog 的任务抽象

package com.chenwc.jedis.distributed.locks;

/**
 * @author chenwc
 * @date 2023/5/21 17:04
 */
public interface KeepAliveAction {
    void run() throws DtLockException;
}

4、watchDog 的简单实现

package com.chenwc.jedis.distributed.locks;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/**
 * @author chenwc
 * @date 2023/5/21 17:04
 */
public class KeepAliveTask extends Thread {

    private static final Logger log = LoggerFactory.getLogger(KeepAliveTask.class);
    //是否在运行
    public volatile boolean isRunning = true;
    //过期时间,单位s
    private long ttlSeconds;
    private KeepAliveAction action;

    /**
     * 保持锁不过期的任务
     * @param action KeepAliveAction
     * @param ttlSeconds 过期时间,单位s
     */
    public KeepAliveTask(KeepAliveAction action, long ttlSeconds) {
        this.ttlSeconds = ttlSeconds;
        this.action = action;
        this.setDaemon(true);
    }

    @Override
    public void run() {
        // 每隔三分之一过期时间,续租一次
        final long sleep = this.ttlSeconds * 1000 / 3;
        log.debug("运行刷新 Key 的过期时间任务");
        while (isRunning) {
            try {
                // 1、续租,刷新值
                action.run();
                log.debug("续租成功!");
                TimeUnit.MILLISECONDS.sleep(sleep);
            } catch (InterruptedException | DtLockException e) {
                close();
            }
        }
    }

    public void close() {
        log.debug("停止刷新 Key 的过期时间任务");
        isRunning = false;
        this.interrupt();
    }
}

5、测试锁

import com.chenwc.jedis.distributed.locks.JedisLock;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Connection;
import redis.clients.jedis.JedisPooled;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @author chenwc
 * @date 2023/5/21 16:45
 */
public class Main {

    private final static Logger log = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) throws InterruptedException {

        GenericObjectPoolConfig<Connection> poolConfig = new GenericObjectPoolConfig<>();
        JedisPooled jedis = new JedisPooled(poolConfig, "42.193.141.68", 16379, 3000,"CWCcwy12");


        JedisLock demoEtcdLock1 = new JedisLock(jedis, "rock", UUID.randomUUID().toString(), 10);
        JedisLock demoEtcdLock2 = new JedisLock(jedis, "rock", UUID.randomUUID().toString(), 10);

        boolean lock1 = demoEtcdLock1.tryLock(20, TimeUnit.SECONDS);
        if (lock1) {
            try {
                log.info("do something");
                for (int i = 0; i < 30; i++) {
                    log.info(String.valueOf(i));
                    Thread.sleep(1000);
                }
            } finally {
                demoEtcdLock1.unlock();
            }
        }
        demoEtcdLock1.tryLock(20, TimeUnit.SECONDS);
        demoEtcdLock2.tryLock(5, TimeUnit.SECONDS);//等待锁,超时后放弃
    }
}

6、log4j.properties

log4j.rootLogger=info,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %-5p %-5L --- [%-5t] %-10c : %m %n

三、使用 Jedis 的一些注意事项

通常分布式锁服务会和业务逻辑使用同一个Redis 集群,自然也使用同一个 Jedis 客户端;
当业务逻辑侧对 Redis 的读写并发提高时,会给 Redis 集群和 Jedis 客户度带来压力;
为应对一些异常情况,我们除了解功能层面的 API,还需要了解一下客户端的一些配置调优,主要是池化管理和网络通信两个方面。

1、池化管理

在使用 Jedis 时可以配置 JedisPool 连接池,池化处理有许多好处,
如:提高响应的速度、降低资源的消耗、方便管理和维护;JedisPool 配置参数大部分是由 JedisPoolConfig 的对应项来赋值的,
在生产中我们需要关注它的配置并合理的赋值,如此能够提升 Redis 的服务性能,降低资源开销。
下边是对一些重要参数的说明、默认及设置建议:

参数

说明

默认值

建议

maxTotal

资源池中的最大连接数

8


maxIdle

资源池允许的最大空闲连接数

8


minIdle

资源池确保的最少空闲连接数

0


blockWhenExhausted

当资源池用尽后,调用者是否要等待。只有当值为 true 时,下面的maxWaitMillis才会生效。

TRUE

建议使用默认值。

maxWaitMillis

当资源池连接用尽后,调用者的最大等待时间(单位为毫秒)。

-1(表示永不超时)

不建议使用默认值。

testOnBorrow

向资源池借用连接时是否做连接有效性检测(ping)。检测到的无效连接将会被移除。

FALSE

业务量很大时候建议设置为 false,减少一次 ping 的开销。

testOnReturn

向资源池归还连接时是否做连接有效性检测(ping)。检测到无效连接将会被移除。

FALSE

业务量很大时候建议设置为 false,减少一次 ping 的开销。

jmxEnabled

是否开启 JMX 监控

TRUE

建议开启,请注意应用本身也需要开启。

空闲 Jedis 对象的回收检测由以下四个参数组合完成,testWhileIdle是该功能的开关。

名称

说明

默认值

建议

testWhileIdle

是否开启空闲资源检测。

FALSE

TRUE

timeBetweenEvictionRunsMillis

空闲资源的检测周期(单位为毫秒)

-1(不检测)

建议设置,周期自行选择,也可以默认也可以使用下方JedisPoolConfig 中的配置。

minEvictableIdleTimeMillis

资源池中资源的最小空闲时间(单位为毫秒),达到此值后空闲资源将被移除。

180000(即 30 分钟)

可根据自身业务决定,一般默认值即可,也可以考虑使用下方JeidsPoolConfig中的配置。

numTestsPerEvictionRun

做空闲资源检测时,每次检测资源的个数。

3

可根据自身应用连接数进行微调,如果设置为 -1,就是对所有连接做空闲监测。


通过源码可以发现这些配置是 GenericObjectPoolConfig 对象的属性,这个类实际上是 rg.apache.commons.pool2.impl apache 提供的,
也就是说 jedis 的连接池是依托于 apache 提供的对象池来,这个对象池的声明周期如下图,感兴趣的可以看下:

9eb02707861440c6858cacf02487e36d.png

2、网络调优

  • max-redirects:这个是集群模式下,重定向的最大数量;举例说明,比如第一台挂了,连第二台,第二台挂了连第三台,重新连接的次数不能超过这个值。

  • timeout:客户端超时时间,单位是毫秒。

Rsdis 节点故障或者网络抖动时,这两个值如果不合理可能会导致很严重的问题,比如 timeout 设置为 1000,maxRedirect 为 2,一旦出现 redis 连接问题,将会导致请求阻塞 3s 左右。而这个 3 秒的阻塞在可能导致常规业务流量下的线程池耗尽,需根据业务场景调整。

四、总结

本篇介绍了如何基于 Redis 的特性来实现一个分布式锁,并基于 Jedis 库提供了一个分布式锁的示例,呈现了其关键 API 的用法;
此示例尚未达到生产级可用,如异常、可重入、可重试、超时控制等功能都未补全。


博客评论
还没有人评论,赶紧抢个沙发~
发表评论
说明:请文明发言,共建和谐网络,您的个人信息不会被公开显示。