<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>Redis</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<slf4j.version>1.7.36</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<jedis.version>4.3.1</jedis.version>
<maven.compiler.plugin.version>3.6.1</maven.compiler.plugin.version>
</properties>
<dependencies>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven.compiler.plugin.version}</version>
</plugin>
</plugins>
</build>
</project>
package com.chenwc.jedis.distributed.locks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisPooled;
import redis.clients.jedis.params.SetParams;
import java.net.SocketTimeoutException;
import java.util.concurrent.TimeUnit;
/**
* Redis 分布式锁
* @author chenwc
* @date 2023/5/21 17:01
*/
public class JedisLock {
private final static Logger log = LoggerFactory.getLogger(JedisLock.class);
private JedisPooled client;
private LockState lockState;
private KeepAliveTask keepAliveTask;
//抢锁的时间间隔
private int sleepMillisecond;
//抢锁成功返回状态
private final static String RESULT_OK = "OK";
//解锁成功返回状态
private static final Long UNLOCK_SUCCESS = 1L;
/**
* 锁的封装
*/
class LockState {
//锁的键
private String lockKey;
//锁的值
private String lockValue;
//错误信息
private String errorMsg;
//锁的过期时间
private int leaseTTL;
private long leaseId;
//是否加锁成功
private boolean lockSuccess;
public LockState(String lockKey, int leaseTTL) {
this.lockKey = lockKey;
this.leaseTTL = leaseTTL;
}
public LockState(String lockKey, String value, int leaseTTL) {
this.lockKey = lockKey;
this.lockValue = value;
this.leaseTTL = leaseTTL;
}
public String getLockKey() {
return lockKey;
}
public void setLockKey(String lockKey) {
this.lockKey = lockKey;
}
public String getLockValue() {
return lockValue;
}
public void setLockValue(String lockValue) {
this.lockValue = lockValue;
}
public String getErrorMsg() {
return errorMsg;
}
public void setErrorMsg(String errorMsg) {
this.errorMsg = errorMsg;
}
public long getLeaseId() {
return leaseId;
}
public void setLeaseId(long leaseId) {
this.leaseId = leaseId;
}
public boolean isLockSuccess() {
return lockSuccess;
}
public void setLockSuccess(boolean lockSuccess) {
this.lockSuccess = lockSuccess;
}
public int getLeaseTTL() {
return leaseTTL;
}
public void setLeaseTTL(int leaseTTL) {
this.leaseTTL = leaseTTL;
}
}
/**
* 构造函数
* @param client JedisPooled客户端
* @param key 键
* @param value 值
* @param ttlSeconds 锁的过期时间
*/
public JedisLock(JedisPooled client, String key, String value, int ttlSeconds) {
//1.准备客户端
this.client = client;
this.lockState = new LockState(key, value, ttlSeconds);
//抢锁的重试间隔
this.sleepMillisecond = (ttlSeconds * 1000) / 3;
}
/**
* 加锁
* @param waitTime 等待时间
* @param waitUnit 时间单位
* @return 加锁是否成功,成功返回true,失败返回false
* @throws DtLockException 锁异常
*/
public boolean tryLock(long waitTime, TimeUnit waitUnit) throws DtLockException {
long totalMillisSeconds = waitUnit.toMillis(waitTime);
long start = System.currentTimeMillis();
//重试,直到成功或超过指定时间
while (true) {
// 抢锁
try {
//使用 SET 的扩展指令加锁(SET key value [EX seconds][px milliseconds] [NX|XX])
SetParams params = SetParams.setParams().nx().ex(lockState.getLeaseTTL());
String result = client.set(lockState.getLockKey(), lockState.getLockValue(), params);
if (RESULT_OK.equals(result)) {
manualKeepAlive();
log.info("[jedis-lock] lock success 线程:{} 加锁成功,key:{} , value:{}", Thread.currentThread().getName(), lockState.getLockKey(), lockState.getLockValue());
//抢锁成功
lockState.setLockSuccess(true);
return true;
} else {
//抢锁时间大于等待时间,返回失败
if (System.currentTimeMillis() - start >= totalMillisSeconds) {
log.info("[jedis-lock] lock fail 线程:{} 等待加锁超时,key:{} , value:{}", Thread.currentThread().getName(), lockState.getLockKey(), lockState.getLockValue());
return false;
}
//抢锁的重试时间间隔
Thread.sleep(sleepMillisecond);
}
} catch (Exception e) {
Throwable cause = e.getCause();
if (cause instanceof SocketTimeoutException) {//忽略网络抖动等异常
}
log.error("[jedis-lock] lock failed:" + e);
throw new DtLockException("[jedis-lock] lock failed:" + e.getMessage(), e);
}
}
}
/**
* 此实现中忽略,网络通信异常部分的处理,可参考tryLock
* @throws DtLockException 锁异常
*/
public void unlock() throws DtLockException {
try {
// 首先停止续约
if (keepAliveTask != null) {
keepAliveTask.close();
}
//使用 lua 解锁
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = client.eval(script, 1, lockState.getLockKey(), lockState.getLockValue());
//解锁成功
if (UNLOCK_SUCCESS.equals(result)) {
log.info("[jedis-lock] unlock success 线程 : {} 解锁成功,锁key : {} ,路径:{}", Thread.currentThread().getName(), lockState.getLockKey(), lockState.getLockValue());
} else {
log.info("[jedis-lock] unlock del key failed ,线程 : {} 解锁成功,锁key : {} ,路径:{}", Thread.currentThread().getName(), lockState.getLockKey(), lockState.getLockValue());
}
} catch (Exception e) {
log.error("[jedis-lock] unlock failed:" + e.getMessage(), e);
throw new DtLockException("[jedis-lock] unlock failed:" + e.getMessage(), e);
}
}
/**
* 定时将 Key 的过期时间推迟
*/
private void manualKeepAlive() {
log.debug("开启定时将 Key 的过期时间推迟任务");
final String t_key = lockState.getLockKey();
final int t_ttl = lockState.getLeaseTTL();
keepAliveTask = new KeepAliveTask(() -> {
// 刷新值
try {
log.debug("刷新 Key 的过期时间");
client.expire(t_key, t_ttl);
} catch (Exception e) {
e.printStackTrace();
}
}, t_ttl);
keepAliveTask.start();
}
}
package com.chenwc.jedis.distributed.locks;
/**
* @author chenwc
* @date 2023/5/21 17:03
*/
public class DtLockException extends RuntimeException{
public DtLockException(String message) {
super(message);
}
public DtLockException(String message, Throwable cause) {
super(message, cause);
}
public static DtLockException clientException(){
return new DtLockException("client is empty");
}
}
package com.chenwc.jedis.distributed.locks;
/**
* @author chenwc
* @date 2023/5/21 17:04
*/
public interface KeepAliveAction {
void run() throws DtLockException;
}
package com.chenwc.jedis.distributed.locks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
/**
* @author chenwc
* @date 2023/5/21 17:04
*/
public class KeepAliveTask extends Thread {
private static final Logger log = LoggerFactory.getLogger(KeepAliveTask.class);
//是否在运行
public volatile boolean isRunning = true;
//过期时间,单位s
private long ttlSeconds;
private KeepAliveAction action;
/**
* 保持锁不过期的任务
* @param action KeepAliveAction
* @param ttlSeconds 过期时间,单位s
*/
public KeepAliveTask(KeepAliveAction action, long ttlSeconds) {
this.ttlSeconds = ttlSeconds;
this.action = action;
this.setDaemon(true);
}
@Override
public void run() {
// 每隔三分之一过期时间,续租一次
final long sleep = this.ttlSeconds * 1000 / 3;
log.debug("运行刷新 Key 的过期时间任务");
while (isRunning) {
try {
// 1、续租,刷新值
action.run();
log.debug("续租成功!");
TimeUnit.MILLISECONDS.sleep(sleep);
} catch (InterruptedException | DtLockException e) {
close();
}
}
}
public void close() {
log.debug("停止刷新 Key 的过期时间任务");
isRunning = false;
this.interrupt();
}
}
import com.chenwc.jedis.distributed.locks.JedisLock;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Connection;
import redis.clients.jedis.JedisPooled;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* @author chenwc
* @date 2023/5/21 16:45
*/
public class Main {
private final static Logger log = LoggerFactory.getLogger(Main.class);
public static void main(String[] args) throws InterruptedException {
GenericObjectPoolConfig<Connection> poolConfig = new GenericObjectPoolConfig<>();
JedisPooled jedis = new JedisPooled(poolConfig, "42.193.141.68", 16379, 3000,"CWCcwy12");
JedisLock demoEtcdLock1 = new JedisLock(jedis, "rock", UUID.randomUUID().toString(), 10);
JedisLock demoEtcdLock2 = new JedisLock(jedis, "rock", UUID.randomUUID().toString(), 10);
boolean lock1 = demoEtcdLock1.tryLock(20, TimeUnit.SECONDS);
if (lock1) {
try {
log.info("do something");
for (int i = 0; i < 30; i++) {
log.info(String.valueOf(i));
Thread.sleep(1000);
}
} finally {
demoEtcdLock1.unlock();
}
}
demoEtcdLock1.tryLock(20, TimeUnit.SECONDS);
demoEtcdLock2.tryLock(5, TimeUnit.SECONDS);//等待锁,超时后放弃
}
}
log4j.rootLogger=info,stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %-5p %-5L --- [%-5t] %-10c : %m %n
通常分布式锁服务会和业务逻辑使用同一个Redis 集群,自然也使用同一个 Jedis 客户端; 当业务逻辑侧对 Redis 的读写并发提高时,会给 Redis 集群和 Jedis 客户度带来压力; 为应对一些异常情况,我们除了解功能层面的 API,还需要了解一下客户端的一些配置调优,主要是池化管理和网络通信两个方面。
在使用 Jedis 时可以配置 JedisPool 连接池,池化处理有许多好处, 如:提高响应的速度、降低资源的消耗、方便管理和维护;JedisPool 配置参数大部分是由 JedisPoolConfig 的对应项来赋值的, 在生产中我们需要关注它的配置并合理的赋值,如此能够提升 Redis 的服务性能,降低资源开销。 下边是对一些重要参数的说明、默认及设置建议:
参数 | 说明 | 默认值 | 建议 |
maxTotal | 资源池中的最大连接数 | 8 | |
maxIdle | 资源池允许的最大空闲连接数 | 8 | |
minIdle | 资源池确保的最少空闲连接数 | 0 | |
blockWhenExhausted | 当资源池用尽后,调用者是否要等待。只有当值为 true 时,下面的maxWaitMillis才会生效。 | TRUE | 建议使用默认值。 |
maxWaitMillis | 当资源池连接用尽后,调用者的最大等待时间(单位为毫秒)。 | -1(表示永不超时) | 不建议使用默认值。 |
testOnBorrow | 向资源池借用连接时是否做连接有效性检测(ping)。检测到的无效连接将会被移除。 | FALSE | 业务量很大时候建议设置为 false,减少一次 ping 的开销。 |
testOnReturn | 向资源池归还连接时是否做连接有效性检测(ping)。检测到无效连接将会被移除。 | FALSE | 业务量很大时候建议设置为 false,减少一次 ping 的开销。 |
jmxEnabled | 是否开启 JMX 监控 | TRUE | 建议开启,请注意应用本身也需要开启。 |
空闲 Jedis 对象的回收检测由以下四个参数组合完成,testWhileIdle是该功能的开关。
名称 | 说明 | 默认值 | 建议 |
testWhileIdle | 是否开启空闲资源检测。 | FALSE | TRUE |
timeBetweenEvictionRunsMillis | 空闲资源的检测周期(单位为毫秒) | -1(不检测) | 建议设置,周期自行选择,也可以默认也可以使用下方JedisPoolConfig 中的配置。 |
minEvictableIdleTimeMillis | 资源池中资源的最小空闲时间(单位为毫秒),达到此值后空闲资源将被移除。 | 180000(即 30 分钟) | 可根据自身业务决定,一般默认值即可,也可以考虑使用下方JeidsPoolConfig中的配置。 |
numTestsPerEvictionRun | 做空闲资源检测时,每次检测资源的个数。 | 3 | 可根据自身应用连接数进行微调,如果设置为 -1,就是对所有连接做空闲监测。 |
通过源码可以发现这些配置是 GenericObjectPoolConfig 对象的属性,这个类实际上是 rg.apache.commons.pool2.impl apache 提供的, 也就是说 jedis 的连接池是依托于 apache 提供的对象池来,这个对象池的声明周期如下图,感兴趣的可以看下:

max-redirects:这个是集群模式下,重定向的最大数量;举例说明,比如第一台挂了,连第二台,第二台挂了连第三台,重新连接的次数不能超过这个值。
timeout:客户端超时时间,单位是毫秒。
Rsdis 节点故障或者网络抖动时,这两个值如果不合理可能会导致很严重的问题,比如 timeout 设置为 1000,maxRedirect 为 2,一旦出现 redis 连接问题,将会导致请求阻塞 3s 左右。而这个 3 秒的阻塞在可能导致常规业务流量下的线程池耗尽,需根据业务场景调整。
本篇介绍了如何基于 Redis 的特性来实现一个分布式锁,并基于 Jedis 库提供了一个分布式锁的示例,呈现了其关键 API 的用法; 此示例尚未达到生产级可用,如异常、可重入、可重试、超时控制等功能都未补全。