- A+
Redisson的Github地址:https://github.com/redisson/redisson/wiki/Table-of-Content
1、添加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>2.4.1</version> <exclusions> <exclusion> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.3.0</version> </dependency> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.13.6</version> </dependency>
2、新建配置类
import org.redisson.Redisson;import org.redisson.api.RedissonClient;import org.redisson.config.Config;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.io.IOException;/** * @author ZhaoBW * @version 1.0 * @date 2021/1/25 14:24 */@Configurationpublic class MyRedissonConfig { @Bean(destroyMethod="shutdown") RedissonClient redisson() throws IOException { //1、创建配置 Config config = new Config(); config.useSingleServer() .setAddress("192.168.43.129:6379"); return Redisson.create(config); } }
3、分布式锁
3.1、可重入锁
基于Redis的Redisson分布式可重入锁RLock对象实现了java.util.concurrent.locks.Lock接口。
@RequestMapping("/redisson") public String testRedisson(){ //获取分布式锁,只要锁的名字一样,就是同一把锁 RLock lock = redissonClient.getLock("lock"); //加锁(阻塞等待),默认过期时间是30秒 lock.lock(); try{ //如果业务执行过长,Redisson会自动给锁续期 Thread.sleep(1000); System.out.println("加锁成功,执行业务逻辑"); } catch (InterruptedException e) { e.printStackTrace(); } finally { //解锁,如果业务执行完成,就不会继续续期,即使没有手动释放锁,在30秒过后,也会释放锁 lock.unlock(); } return "Hello Redisson!"; }
大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
在RedissonLock类的renewExpiration()方法中,会启动一个定时任务每隔30/3=10秒给锁续期。如果业务执行期间,应用挂了,那么不会自动续期,到过期时间之后,锁会自动释放。
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getName() + " expiration", e); return; } if (res) { // reschedule itself renewExpiration(); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); }
另外Redisson还提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。
// 加锁以后10秒钟自动解锁// 无需调用unlock方法手动解锁lock.lock(10, TimeUnit.SECONDS);// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
如果指定了锁的超时时间,底层直接调用lua脚本,进行占锁。如果超过leaseTime,业务逻辑还没有执行完成,则直接释放锁,所以在指定leaseTime时,要让leaseTime大于业务执行时间。RedissonLock类的tryLockInnerAsync()方法
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
3.2、读写锁
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。在读写锁中,读读共享、读写互斥、写写互斥。
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");// 最常见的使用方法rwlock.readLock().lock();// 或rwlock.writeLock().lock();
读写锁测试类,当访问write接口时,read接口会被阻塞住。
import org.redisson.api.RCountDownLatch;import org.redisson.api.RLock;import org.redisson.api.RReadWriteLock;import org.redisson.api.RedissonClient;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.StringRedisTemplate;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.util.UUID;/** * @author ZhaoBW * @version 1.0 * @date 2021/1/23 20:12 */@RestControllerpublic class TestController { @Autowired RedissonClient redissonClient; @Autowired StringRedisTemplate redisTemplate; @RequestMapping("/write") public String write(){ RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("wr-lock"); RLock writeLock = readWriteLock.writeLock(); String s = UUID.randomUUID().toString(); writeLock.lock(); try { redisTemplate.opsForValue().set("wr-lock-key", s); Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); }finally { writeLock.unlock(); } return s; } @RequestMapping("/read") public String read(){ RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("wr-lock"); RLock readLock = readWriteLock.readLock(); String s = ""; readLock.lock(); try { s = redisTemplate.opsForValue().get("wr-lock-key"); } finally { readLock.unlock(); } return s; }}
3.3、信号量(Semaphore)
Redisson的分布式信号量与的用法与java.util.concurrent.Semaphore相似
RSemaphore semaphore = redisson.getSemaphore("semaphore");semaphore.acquire();//或semaphore.acquireAsync();semaphore.acquire(23);semaphore.tryAcquire();//或semaphore.tryAcquireAsync();semaphore.tryAcquire(23, TimeUnit.SECONDS);//或semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);semaphore.release(10);semaphore.release();//或semaphore.releaseAsync();
现在redis中保存semaphore的值为3
Redisson1.jpg
然后在TestController中添加测试方法:
@RequestMapping("/releaseSemaphore") public String releaseSemaphore(){ RSemaphore semaphore = redissonClient.getSemaphore("semaphore"); semaphore.release(); return "release success"; } @RequestMapping("/acquireSemaphore") public String acquireSemaphore() throws InterruptedException { RSemaphore semaphore = redissonClient.getSemaphore("semaphore"); semaphore.acquire(); return "acquire success"; }
当访问acquireSemaphore接口时,redis中的semaphore会减1;访问releaseSemaphore接口时,redis中的semaphore会加1。当redis中的semaphore为0时,继续访问acquireSemaphore接口,会被阻塞,直到访问releaseSemaphore接口,使得semaphore>0,acquireSemaphore才会继续执行。
3.4、闭锁(CountDownLatch)
CountDownLatch作用:某一线程,等待其他线程执行完毕之后,自己再继续执行。
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");latch.trySetCount(1);latch.await();// 在其他线程或其他JVM里RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");latch.countDown();
在TestController中添加测试方法,访问close接口时,调用await()方法进入阻塞状态,直到有三次访问release接口时,close接口才会返回。
@RequestMapping("/close") public String close() throws InterruptedException { RCountDownLatch close = redissonClient.getCountDownLatch("close"); close.trySetCount(3); close.await(); return "close"; } @RequestMapping("/release") public String release(){ RCountDownLatch close = redissonClient.getCountDownLatch("close"); close.countDown(); return "release"; }