- A+
一、springboot整合redisson环境
请参考我的上一篇博客:springboot整合redisson(一)搭建Redisson环境。
二、什么是锁?
我们讲的锁一般指的是同步锁,同步锁是为了保证多线程的操作都能符合预期结果,不会因为cpu缓存等问题导致发生数据错乱问题,举一个现实中的例子,你可能就好理解了,在古代,由于没有计算机,所以钱庄都是使用账本记录每个客户的账户余额信息,张三再钱庄一共存了 10 两银子,突然有一天,李四飞鸽传书张三(顺便丢给了张三一个卡号),以嫖娼被抓为由借 3 两银子应急,作为铁哥们的张三,自然不会看着李四在衙门里受苦,所以就去钱庄给李四汇钱,不过这个时候张三的妈妈担心张三一个人再他乡受苦,她准备到附近的钱庄给张三打 50 两银子,正好张三给李四打钱和张三妈妈给张三打钱都在同一时间,钱庄店员 A 去翻阅账本查看张三的余额,库房总管告诉店员A张三账户余额:10 元,店员B也去翻阅账本查看张三的余额,库房总管告诉店员 B 张三账户余额10 元,店员 A 先将李四的账户增加了 3 两,告诉库房总管,张三的余额需要修改为 7 两,然后库房总管将账本上张三二点余额修改为 7 两。由于店员 B 看到张三的余额也是 10 两,减去张三妈妈账户 50 两,然后往张三的账户增加 10 两,店员 B 告诉库房总管,张三的余额需要变更为:60 两,所以这个时候张三的账户就变成了 60 两。 这个时候问题就出现了,张三的最后余额应该是 57 两,而不是 60 两。
久而久之,钱庄发现流水一直对不上,终于发现了问题所在,所以钱庄老板做出了改进方案:只要有店员来库房查询客户余额之后,库房总管就记录下是谁查询的,这个时候锁定库房,其他店员将无法查询库房中的账本信息,直到查询账本的店员修改完客户的余额为止。这样就解决了上面的问题,因为下一个看到的客户余额数据总是最新的,这里就是我们程序中所讲到的:锁。
虽然锁可以保证每个客户的余额不会出现差错,但是你们发现了没有,效率变差了很多,店员 A 需要对张三的账户余额做变更,店员 B 需要对李四余额做变更,但是因为店员 A 先去库房查询了张三的余额,导致库房总管将账本全部锁定,店员 B 无法查询,只能等店员 A 回来修改完张三的余额信息(这里可以理解为数据库的表锁)。
过了几个月,虽然流水没有出过问题,但是效率太差了,于是老板想到了一个新套路,一个店员去库房查询客户余额信息的时候,让库房总管标记一下哪个店员,查询了哪个客户,其他店员只要不是操作已经记录的客户,就能成功的获取到客户的余额信息,这样就大大的提高了办公的效率(参考数据库中的行级锁)。
上面的例子只是纯属个人瞎想,钱庄肯定不是这么干的,他们都有着一套完善的执行流程,比我这个强得多。
锁:就是开辟一块临界空间,只有拿到钥匙的人才能进入临界区,进行相关的操作,没有拿到钥匙的人只能在门口等着。
三、什么是分布式锁
分布式锁,是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。
四、rediison分布式锁
1、可重入锁(Reentrant Lock),不可中断
redisson实现了 java.util.concurrent.locks.Lock 接口,同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
package com.nlx.redisson.core; import lombok.extern.slf4j.Slf4j; import org.redisson.api.*; import org.redisson.client.codec.Codec; import org.redisson.codec.MarshallingCodec; import org.springframework.context.annotation.Configuration; import org.springframework.util.StringUtils; import java.util.concurrent.TimeUnit; /** * @ClassName RedissonTemplate * redisson封装操作类 * @author nlx */ @Configuration @Slf4j public class RedissonTemplate { private final RedissonClient redissonClient; /** * 锁前缀 */ private final String DEFAULT_LOCK_NAME = "nlx-instance"; public RedissonTemplate(RedissonClient redissonClient) { this.redissonClient = redissonClient; } /** * 加锁(可重入),会一直等待获取锁,不会中断 * @param lockName waitTimeout timeout * @return boolean * @author ymy * @date 2021/5/13 17:53 */ public boolean lock(String lockName, long timeout) { checkRedissonClient(); RLock lock = getLock(lockName); try { if(timeout != -1){ // timeout:超时时间 TimeUnit.SECONDS:单位 lock.lock(timeout, TimeUnit.SECONDS); }else{ lock.lock(); } log.debug(" get lock success ,lockKey:{}", lockName); return true; } catch (Exception e) { log.error(" get lock fail,lockKey:{}, cause:{} ", lockName, e.getMessage()); return false; } } /** * 解锁 * @param lockName */ public void unlock(String lockName){ checkRedissonClient(); try { RLock lock = getLock(lockName); if(lock.isLocked() && lock.isHeldByCurrentThread()){ lock.unlock(); log.debug("key:{},unlock success",lockName); }else{ log.debug("key:{},没有加锁或者不是当前线程加的锁 ",lockName); } }catch (Exception e){ log.error("key:{},unlock error,reason:{}",lockName,e.getMessage()); } } private RLock getLock(String lockName) { String key = DEFAULT_LOCK_NAME + lockName; return redissonClient.getLock(key); } private void checkRedissonClient() { if (null == redissonClient) { log.error(" redissonClient is null ,please check redis instance ! "); throw new RuntimeException("redissonClient is null ,please check redis instance !"); } if (redissonClient.isShutdown()) { log.error(" Redisson instance has been shut down !!!"); throw new RuntimeException("Redisson instance has been shut down !!!"); } } } package com.nlx.redisson; import com.nlx.redisson.core.RedissonTemplate; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @SpringBootTest @Slf4j class SpringbootRedissonApplicationTests { @Autowired private RedissonTemplate redissonTemplate; private CountDownLatch count = new CountDownLatch(2); @Test void contextLoads() { String lockName = "hello-test"; new Thread(() ->{ String threadName = Thread.currentThread().getName(); log.info("线程:{} 正在尝试获取锁。。。",threadName); boolean lock = redissonTemplate.lock(lockName, 60L); doSomthing(lock,lockName,threadName); }).start(); new Thread(() ->{ String threadName = Thread.currentThread().getName(); log.info("线程:{} 正在尝试获取锁。。。",threadName); boolean lock = redissonTemplate.lock(lockName, 60L); doSomthing(lock,lockName,threadName); }).start(); try { count.await(); } catch (InterruptedException e) { e.printStackTrace(); } log.info("子线程都已执行完毕,main函数可以结束了!"); } private void doSomthing(boolean lock,String lockName,String threadName) { if(lock){ log.info("线程:{},获取到了锁",threadName); try{ try { TimeUnit.SECONDS.sleep(5L); count.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }finally { redissonTemplate.unlock(lockName); log.info("线程:{},释放了锁",threadName); } } } }
在单元测试中,使用了两个线程同时去获取锁:hello-test,获取到锁的线程休眠5秒,然后释放锁资源,下面我来解释一下比较重要的几行代码。
CountDownLatch:线程同步,为了能在main函数执行结束之前看到连个子线程的执行结果。
RedissonTemplate:我自己封装的redisson工具类。
RedissonClient:redisson官方封装的工具类。
redissonClient.getLock(key):获取锁对象。
lock.lock(timeout, TimeUnit.SECONDS):锁定,timeout:超时时间 ,TimeUnit.SECONDS:单位。
lock.lock(); 锁定,没有超时间时间,如果当前线程一直不释放锁资源,其他线程将会一直处于阻塞状态。
lock.unlock();解锁。
我们直接来看单元测试的执行结果:
我们可以发现,Thread-3 在15:04:18.947 获取到了锁,而这个时候 Thread-4 还处于阻塞状态,直到5秒之后 15:04:23.977 Thread-3释放了锁,Thread-4 在 15:04:23.991 获取到了锁,Thread-4 大概阻塞了5秒钟,可以理解为 Thread-4 一直在等待锁资源的释放,如果只有锁的线程一直不释放锁,那么 Thread-4 将会一直处于等待状态,那你可能就会有疑问了,Thread-4 会等他多久呢?不要怀疑它的专一,它会等到天荒地老,海枯石烂,宇宙毁灭。说直白一点就是,除非有其他线程执行 Thread-4 线程的 interrupted()方法,否则 它的等待将用于休止。
我们将这种无休止的等待称为:不可中断,我们使用 RLock 中的lock() 方法的特性就是不可中断,这种锁存在比较大二点安全隐患,稍不注意,就能让你程序万劫不复。这点可以参考java并发编程的明星锁:synchronized,它也是一种不可中断类型的锁。
我们的例子中是设置的锁的过期时间,他还支持不设置过期时间,这种情况下,只要程序不解锁,那么其他线程都将一直处于阻塞状态,这样就会引发一个很严重的问题,那就是在线程获取到了锁之后,程序或者服务器突然宕机,等重启完成之后,其他线程也会一直处于阻塞状态,因为宕机前获取的锁还没有被释放。
redisson也为我们考虑到了这个问题,所以它设置一个看门狗。它的作用是在Redisson实例被关闭前,不断地延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
说直白一点,如果你加的锁没有指定过期时间,那么redisson会默认将这个锁的过期时间设置为 30 秒,快到 30 的程序去自动续期,直到程序把锁释放,如果这个时候服务器宕机了,那么程序的续期功能自然也就不存在了,锁最多还能再存活 30 秒,这个大家可以自己去测试一下,我这里就不做测试了,很简单,不带超时间锁定之后,去redis中查看当前锁的有效期是不是你 Config.lockWatchdogTimeout 参数指定的时间,然后过了这个时间,有效期是否自动刷新。
2.可重入锁(Reentrant Lock),可中断
我们先来看看RLock 给我提供的可中断锁的方法有哪些
boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException; RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit); RFuture<Boolean> tryLockAsync(); RFuture<Boolean> tryLockAsync(long threadId); RFuture<Boolean> tryLockAsync(long waitTime, TimeUnit unit); RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId);
tryLock():很好理解,尝试着加锁,这里面有几个参数讲解一下:
-
time:等待锁的最长时间。
-
unit:时间单位。
-
waitTime:与time一致,等待锁的最长时间。
-
leaseTime:锁的过期时间。
-
threadId:线程id。
大致意思说的就是一个线程带等待 time/waitTime时长后如果还没有获取到锁,那么当前线程将会放弃获取锁资源的机会,去干其他事情。Async结尾的几个方法主要就是异步加锁的意思。
我们一起来写一个单元测试:
RedissonTemplate.java中添加如下方法:
/** * 可中断锁 * @param lockName 锁名称 * @param waitTimeout 等待时长 * @param unit 时间单位 * @return */ public boolean tryLock(String lockName, long waitTimeout, TimeUnit unit) { checkRedissonClient(); RLock lock = getLock(lockName); try { boolean res = lock.tryLock(waitTimeout,unit); if (!res) { log.debug(" get lock fail ,lockKey:{}", lockName); return false; } log.debug(" get lock success ,lockKey:{}", lockName); return true; } catch (Exception e) { log.error(" get lock fail,lockKey:{}, cause:{} ", lockName, e.getMessage()); return false; } }
单元测试:
package com.nlx.redisson; import com.nlx.redisson.core.RedissonTemplate; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @SpringBootTest @Slf4j class SpringbootRedissonApplicationTests { @Autowired private RedissonTemplate redissonTemplate; private CountDownLatch count = new CountDownLatch(2); @Test void contextLoads() { String lockName = "hello-test"; new Thread(() ->{ String threadName = Thread.currentThread().getName(); log.info("线程:{} 正在尝试获取锁。。。",threadName); boolean lock = redissonTemplate.tryLock(lockName, 2L,TimeUnit.SECONDS); doSomthing(lock,lockName,threadName); }).start(); new Thread(() ->{ String threadName = Thread.currentThread().getName(); log.info("线程:{} 正在尝试获取锁。。。",threadName); boolean lock = redissonTemplate.tryLock(lockName, 2L,TimeUnit.SECONDS); doSomthing(lock,lockName,threadName); }).start(); try { count.await(); } catch (InterruptedException e) { e.printStackTrace(); } log.info("子线程都已执行完毕,main函数可以结束了!"); } private void doSomthing(boolean lock,String lockName,String threadName) { if(lock){ log.info("线程:{},获取到了锁",threadName); try{ try { TimeUnit.SECONDS.sleep(5L); } catch (InterruptedException e) { e.printStackTrace(); } }finally { redissonTemplate.unlock(lockName); log.info("线程:{},释放了锁",threadName); } }else{ log.info("线程:{},没有获取到锁,过了等待时长,结束等待",threadName); } count.countDown(); } }
输出结果:
Thread-3 在 20:22:52.494 的时候尝试获取锁,20:22:52.527 的时候获取到了锁,并且进入到了休眠状态,Thread-4 在 20:22:52.494 的时候尝试获取锁,直到 20:22:54.513 也没有获取到,然后 Thread-4就放弃了等待,直接结束了线程,期间花费了两秒钟的时间,而我们设置的等待时间刚好就是两秒,所以单元测试通过。
3.公平锁(Fair Lock)
基于 Redis 的 Redisson 分布式可重入公平锁也是实现了 java.util.concurrent.locks.Lock 接口的一种 RLock 对象。同时还提供了异步(Async)、反射式(Reactive)和 RxJava2 标准的接口。它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson 会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。
何为公平?就是所谓的先来后到,先获取锁的线程先拿到锁,后面的线程都在后面排着,这里你可以理解为你去做核算检测,工作人员刚把棚子搭好的时候,你就去了,这个时候没有人,你一去就直接做,第二次核算检测的时候,你正好在上班,下班回来之后发现做核算的队伍排得老长老长,这个时候你就不得不排在那些人的后面,等待前面的人核算都做完了,才会轮到你,这就是程序里面的公平锁。前两的那两种都不是公平锁,什么意思呢?非公平锁可以把他想象成小车过十字路,在没有红绿灯以及交警指挥的时候,每辆车都想自己最先通过十字路口,然后疯狂地向前开,然后就导致了后面的堵车,映射程序中利用大量cas去获取锁,非常消耗cpu,这也是为什么十字路口需要红路灯和交警的原因,但是有些十字路口也不需要红绿灯,因为这个十字路口几乎没有什么车,不会造成拥堵,程序也是这样,没有大量的线程竞争的时候,就没有必要设置成公平锁,毕竟红绿灯和公平锁也是需要成本的。
我们一起来看看公平锁的实现方式
/** * 公平锁 * @param lockName * @param waitTimeout * @param timeout * @param unit * @return */ public boolean getFairLock(String lockName, long waitTimeout,long timeout, TimeUnit unit){ checkRedissonClient(); RLock lock = redissonClient.getFairLock(DEFAULT_LOCK_NAME + lockName); try { boolean res = lock.tryLock(waitTimeout,timeout,unit); if (!res) { log.debug(" get lock fail ,lockKey:{}", lockName); return false; } log.debug(" get lock success ,lockKey:{}", lockName); return true; } catch (Exception e) { log.error(" get lock fail,lockKey:{}, cause:{} ", lockName, e.getMessage()); return false; } } /** * 公平锁 */ @Test public void testFairLock() throws InterruptedException { CountDownLatch countDown = new CountDownLatch(3); String lockName = "hello-test"; new Thread(() -> { log.info("进入thread1 ======"); log.info("thread1 正在尝试获取锁。。。"); boolean lock = redissonTemplate.getFairLock(lockName, 20L, 7L,TimeUnit.SECONDS); doSomthing(lock, lockName, "thread1"); }).start(); new Thread(() -> { log.info("进入thread2 ======"); try { TimeUnit.SECONDS.sleep(2L); } catch (InterruptedException e) { e.printStackTrace(); } log.info("thread2 休眠结束 正在尝试获取锁。。。"); boolean lock = redissonTemplate.getFairLock(lockName, 20L,7L, TimeUnit.SECONDS); doSomthing(lock, lockName, "thread2"); }).start(); new Thread(() -> { log.info("进入thread3 ======"); try { TimeUnit.SECONDS.sleep(3L); } catch (InterruptedException e) { e.printStackTrace(); } log.info("thread3 休眠结束 正在尝试获取锁。。。"); boolean lock = redissonTemplate.getFairLock(lockName, 20L,7L, TimeUnit.SECONDS); doSomthing(lock, lockName, "thread3"); }).start(); countDown.await(); } 1234567891011121314151617181920212223242526272829303132333435363738394041 2021-06-27 11:22:13.753 INFO 1128 --- [ Thread-3] c.n.r.SpringbootRedissonApplicationTests : 进入thread1 ====== 2021-06-27 11:22:13.754 INFO 1128 --- [ Thread-4] c.n.r.SpringbootRedissonApplicationTests : 进入thread2 ====== 2021-06-27 11:22:13.754 INFO 1128 --- [ Thread-3] c.n.r.SpringbootRedissonApplicationTests : thread1 正在尝试获取锁。。。 2021-06-27 11:22:13.754 INFO 1128 --- [ Thread-5] c.n.r.SpringbootRedissonApplicationTests : 进入thread3 ====== 2021-06-27 11:22:13.796 INFO 1128 --- [ Thread-3] c.n.r.SpringbootRedissonApplicationTests : 线程:thread1,获取到了锁 2021-06-27 11:22:15.759 INFO 1128 --- [ Thread-4] c.n.r.SpringbootRedissonApplicationTests : thread2 休眠结束 正在尝试获取锁。。。 2021-06-27 11:22:16.767 INFO 1128 --- [ Thread-5] c.n.r.SpringbootRedissonApplicationTests : thread3 休眠结束 正在尝试获取锁。。。 2021-06-27 11:22:18.810 INFO 1128 --- [ Thread-3] c.n.r.SpringbootRedissonApplicationTests : 线程:thread1 正在释放了锁 2021-06-27 11:22:18.867 INFO 1128 --- [ Thread-4] c.n.r.SpringbootRedissonApplicationTests : 线程:thread2,获取到了锁 2021-06-27 11:22:23.869 INFO 1128 --- [ Thread-4] c.n.r.SpringbootRedissonApplicationTests : 线程:thread2 正在释放了锁 2021-06-27 11:22:23.912 INFO 1128 --- [ Thread-5] c.n.r.SpringbootRedissonApplicationTests : 线程:thread3,获取到了锁 2021-06-27 11:22:28.914 INFO 1128 --- [ Thread-5] c.n.r.SpringbootRedissonApplicationTests : 线程:thread3 正在释放了锁
4.联锁(MultiLock)
基于Redis的Redisson分布式联锁RedissonMultiLock对象可以将多个RLock对象关联为一个联锁,每个RLock对象实例可以来自于不同的Redisson实例。
联锁指的是:同时对多个资源进行加锁操作,只有所有资源都加锁成功的时候,联锁才会成功。
@Test public void testMultiLock(){ RLock lock1 = redissonTemplate.getLock("lock1" ); RLock lock2 = redissonTemplate.getLock("lock2"); RLock lock3 = redissonTemplate.getLock("lock3"); RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3); boolean flag = lock.tryLock(); if(flag){ try { log.info("联锁加索成功"); }finally { //一定要释放锁 lock.unlock(); } } }
5.红锁(RedLock)
基于Redis的Redisson红锁RedissonRedLock对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自于不同的Redisson实例
与联锁比较相似,都是对多个资源进行加锁,但是红锁与连锁不同的是,红锁只需要在大部分资源加锁成功即可,
/** * 红锁 */ @Test public void testRedLock(){ RLock lock1 = redissonTemplate.getLock("lock1" ); RLock lock2 = redissonTemplate.getLock("lock2"); RLock lock3 = redissonTemplate.getLock("lock3"); RedissonRedLock lock = new RedissonRedLock (lock1, lock2, lock3); boolean flag = lock.tryLock(); if(flag){ try { log.info("红锁加索成功"); }finally { //一定要释放锁 lock.unlock(); } } }
6.读写锁(ReadWriteLock)
基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。这点相当于java并发sdk并发包中的 StampedLock 。
如果大家对读写锁还不太熟悉的话,可以参考我的另外两篇文章:
【并发编程】java并发编程之ReentrantReadWriteLock读写锁
【并发编程】面试官:有没有比读写锁更快的锁?
/** * 读写锁 */ @Test public void testReadWriteLock(){ RReadWriteLock rwlock = redissonTemplate.getReadWriteLock("testRWLock"); rwlock.readLock().lock(); rwlock.writeLock().lock(); } 12345678 /** * 获取读写锁 * @param lockName * @return */ public RReadWriteLock getReadWriteLock(String lockName) { return redissonClient.getReadWriteLock(lockName); }
7.信号量(Semaphore)
基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
/** * 信号量 * @param semaphoreName * @return */ public RSemaphore getSemaphore(String semaphoreName) { return redissonClient.getSemaphore(semaphoreName); } /** * 信号量 */ @Test public void testSemaphore() throws InterruptedException { RSemaphore semaphore = redissonTemplate.getSemaphore("testSemaphore"); //设置许可个数 semaphore.trySetPermits(10); // //设置许可个数 异步 // semaphore.acquireAsync(); // //获取5个许可 // semaphore.acquire(5); // //尝试获取一个许可 // semaphore.tryAcquire(); // //尝试获取一个许可 异步 // semaphore.tryAcquireAsync(); // //尝试获取一个许可 等待5秒如果未获取到,则返回false // semaphore.tryAcquire(5, TimeUnit.SECONDS); // //尝试获取一个许可 等待5秒如果未获取到,则返回false 异步 // semaphore.tryAcquireAsync(5, TimeUnit.SECONDS); // //释放一个许可,将其返回给信号量 // semaphore.release(); // //释放 6 个许可 ,将其返回给信号量 // semaphore.release(6); // //释放一个许可,将其返回给信号量 异步 // semaphore.releaseAsync(); CountDownLatch count = new CountDownLatch(10); for (int i= 0;i< 15 ;++i){ new Thread(() -> { try { String threadName = Thread.currentThread().getName(); log.info("线程:{} 尝试获取许可。。。。。。。。。。。。。",threadName); //默认获取一个许可,如果没有获取到,则阻塞线程 semaphore.acquire(); log.info("线程:{}获取许可成功。。。。。。。", threadName); count.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } count.await(); }
在实现信号量的时候一定要注意许可数量,如果被使用完,而你用完之后并没有将许可归还给信号量,那么有可能在许可用完之后,之后的线程一直处于阻塞阶段。
关于信号量还有一个:可过期性信号量(PermitExpirableSemaphore),获取到的许可有效期只有你设置的时长,
/** * 可过期性信号量 * @param permitExpirableSemaphoreName * @return */ public RPermitExpirableSemaphore getPermitExpirableSemaphore(String permitExpirableSemaphoreName) { return redissonClient.getPermitExpirableSemaphore(permitExpirableSemaphoreName); } /** * 信号量 */ @Test public void testPermitExpirableSemaphore() throws InterruptedException { RPermitExpirableSemaphore semaphore = redissonTemplate.getPermitExpirableSemaphore("testPermitExpirableSemaphore"); //设置许可个数 semaphore.trySetPermits(10); // 获取一个信号,有效期只有2秒钟。 String permitId = semaphore.acquire(1, TimeUnit.SECONDS); log.info("许可:{}",permitId); semaphore.release(permitId); }
8.闭锁(CountDownLatch)
基于Redisson的Redisson分布式闭锁(CountDownLatch)Java对象RCountDownLatch采用了与java.util.concurrent.CountDownLatch相似的接口和用法。
我在例子中也是用到了java sdk并发包中的 CountDownLatch ,主要是线程同步的作用,redisson同样也实现了这样的功能,我们一起来看一下redisson的代码实现
@Test public void testCountDownLatch() throws InterruptedException { RCountDownLatch latch = redissonTemplate.getCountDownLatch("testCountDownLatch"); latch.trySetCount(2); new Thread(() ->{ log.info("这是一个服务的线程"); try { TimeUnit.SECONDS.sleep(3); log.info("线程:{},休眠结束",Thread.currentThread().getName()); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() ->{ log.info("这是另外一个服务的线程"); try { TimeUnit.SECONDS.sleep(3); log.info("线程:{},休眠结束",Thread.currentThread().getName()); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); latch.await(); log.info("子线程执行结束。。。。。。"); } /** * 闭锁 * @param countDownLatchName * @return */ public RCountDownLatch getCountDownLatch(String countDownLatchName) { return redissonClient.getCountDownLatch(countDownLatchName); }
springboot整合redisson实现强大的分布式锁到这里就讲的差不多了,最后在贴一份单元测试和 RedissonTemplate 的完整代码吧。
package com.nlx.redisson.core; import lombok.extern.slf4j.Slf4j; import org.redisson.api.*; import org.redisson.client.codec.Codec; import org.redisson.codec.MarshallingCodec; import org.springframework.context.annotation.Configuration; import org.springframework.util.StringUtils; import java.util.concurrent.TimeUnit; /** * @ClassName RedissonTemplate * redisson封装操作类 * @author nlx */ @Configuration @Slf4j public class RedissonTemplate { private final RedissonClient redissonClient; /** * 锁前缀 */ private final String DEFAULT_LOCK_NAME = "nlx-instance"; public RedissonTemplate(RedissonClient redissonClient) { this.redissonClient = redissonClient; } /** * 加锁(可重入),会一直等待获取锁,不会中断 * @param lockName waitTimeout timeout * @return boolean * @author ymy * @date 2021/5/13 17:53 */ public boolean lock(String lockName, long timeout) { checkRedissonClient(); RLock lock = getLock(lockName); try { if(timeout != -1){ // timeout:超时时间 TimeUnit.SECONDS:单位 lock.lock(timeout, TimeUnit.SECONDS); }else{ lock.lock(); } log.debug(" get lock success ,lockKey:{}", lockName); return true; } catch (Exception e) { log.error(" get lock fail,lockKey:{}, cause:{} ", lockName, e.getMessage()); return false; } } /** * 可中断锁 * @param lockName 锁名称 * @param waitTimeout 等待时长 * @param unit 时间单位 * @return */ public boolean tryLock(String lockName, long waitTimeout, TimeUnit unit) { checkRedissonClient(); RLock lock = getLock(lockName); try { boolean res = lock.tryLock(waitTimeout,unit); if (!res) { log.debug(" get lock fail ,lockKey:{}", lockName); return false; } log.debug(" get lock success ,lockKey:{}", lockName); return true; } catch (Exception e) { log.error(" get lock fail,lockKey:{}, cause:{} ", lockName, e.getMessage()); return false; } } /** * 公平锁 * @param lockName * @param waitTimeout * @param timeout * @param unit * @return */ public boolean getFairLock(String lockName, long waitTimeout,long timeout, TimeUnit unit){ checkRedissonClient(); RLock lock = redissonClient.getFairLock(DEFAULT_LOCK_NAME + lockName); try { boolean res = lock.tryLock(waitTimeout,timeout,unit); if (!res) { log.debug(" get lock fail ,lockKey:{}", lockName); return false; } log.debug(" get lock success ,lockKey:{}", lockName); return true; } catch (Exception e) { log.error(" get lock fail,lockKey:{}, cause:{} ", lockName, e.getMessage()); return false; } } /** * 解锁 * @param lockName */ public void unlock(String lockName){ checkRedissonClient(); try { RLock lock = redissonClient.getFairLock(DEFAULT_LOCK_NAME + lockName); if(lock.isLocked() && lock.isHeldByCurrentThread()){ lock.unlock(); log.debug("key:{},unlock success",lockName); }else{ log.debug("key:{},没有加锁或者不是当前线程加的锁 ",lockName); } }catch (Exception e){ log.error("key:{},unlock error,reason:{}",lockName,e.getMessage()); } } public RLock getLock(String lockName) { String key = DEFAULT_LOCK_NAME + lockName; return redissonClient.getLock(key); } private void checkRedissonClient() { if (null == redissonClient) { log.error(" redissonClient is null ,please check redis instance ! "); throw new RuntimeException("redissonClient is null ,please check redis instance !"); } if (redissonClient.isShutdown()) { log.error(" Redisson instance has been shut down !!!"); throw new RuntimeException("Redisson instance has been shut down !!!"); } } /** * 获取读写锁 * @param lockName * @return */ public RReadWriteLock getReadWriteLock(String lockName) { return redissonClient.getReadWriteLock(lockName); } /** * 信号量 * @param semaphoreName * @return */ public RSemaphore getSemaphore(String semaphoreName) { return redissonClient.getSemaphore(semaphoreName); } /** * 可过期性信号量 * @param permitExpirableSemaphoreName * @return */ public RPermitExpirableSemaphore getPermitExpirableSemaphore(String permitExpirableSemaphoreName) { return redissonClient.getPermitExpirableSemaphore(permitExpirableSemaphoreName); } /** * 闭锁 * @param countDownLatchName * @return */ public RCountDownLatch getCountDownLatch(String countDownLatchName) { return redissonClient.getCountDownLatch(countDownLatchName); } } package com.nlx.redisson; import com.nlx.redisson.core.RedissonTemplate; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.redisson.RedissonMultiLock; import org.redisson.RedissonRedLock; import org.redisson.api.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @SpringBootTest @Slf4j class SpringbootRedissonApplicationTests { @Autowired private RedissonTemplate redissonTemplate; private CountDownLatch count = new CountDownLatch(2); @Test void contextLoads() { String lockName = "hello-test"; new Thread(() ->{ String threadName = Thread.currentThread().getName(); log.info("线程:{} 正在尝试获取锁。。。",threadName); boolean lock = redissonTemplate.tryLock(lockName, 2L,TimeUnit.SECONDS); doSomthing(lock,lockName,threadName); }).start(); new Thread(() ->{ String threadName = Thread.currentThread().getName(); log.info("线程:{} 正在尝试获取锁。。。",threadName); boolean lock = redissonTemplate.tryLock(lockName, 2L,TimeUnit.SECONDS); doSomthing(lock,lockName,threadName); }).start(); try { count.await(); } catch (InterruptedException e) { e.printStackTrace(); } log.info("子线程都已执行完毕,main函数可以结束了!"); } private void doSomthing(boolean lock,String lockName,String threadName) { if(lock){ log.info("线程:{},获取到了锁",threadName); try{ try { TimeUnit.SECONDS.sleep(5L); } catch (InterruptedException e) { e.printStackTrace(); } }finally { log.info("线程:{} 正在释放了锁",threadName); redissonTemplate.unlock(lockName); } }else{ log.info("线程:{},没有获取到锁,过了等待时长,结束等待",threadName); } count.countDown(); } /** * 公平锁 */ @Test public void testFairLock() throws InterruptedException { CountDownLatch countDown = new CountDownLatch(3); String lockName = "hello-test"; new Thread(() -> { log.info("进入thread1 ======"); log.info("thread1 正在尝试获取锁。。。"); boolean lock = redissonTemplate.getFairLock(lockName, 20L, 7L,TimeUnit.SECONDS); doSomthing(lock, lockName, "thread1"); }).start(); new Thread(() -> { log.info("进入thread2 ======"); try { TimeUnit.SECONDS.sleep(2L); } catch (InterruptedException e) { e.printStackTrace(); } log.info("thread2 休眠结束 正在尝试获取锁。。。"); boolean lock = redissonTemplate.getFairLock(lockName, 20L,7L, TimeUnit.SECONDS); doSomthing(lock, lockName, "thread2"); }).start(); new Thread(() -> { log.info("进入thread3 ======"); try { TimeUnit.SECONDS.sleep(3L); } catch (InterruptedException e) { e.printStackTrace(); } log.info("thread3 休眠结束 正在尝试获取锁。。。"); boolean lock = redissonTemplate.getFairLock(lockName, 20L,7L, TimeUnit.SECONDS); doSomthing(lock, lockName, "thread3"); }).start(); countDown.await(); } /** * 联锁 */ @Test public void testMultiLock(){ RLock lock1 = redissonTemplate.getLock("lock1" ); RLock lock2 = redissonTemplate.getLock("lock2"); RLock lock3 = redissonTemplate.getLock("lock3"); RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3); boolean flag = lock.tryLock(); if(flag){ try { log.info("联锁加索成功"); }finally { //一定要释放锁 lock.unlock(); } } } /** * 红锁 */ @Test public void testRedLock(){ RLock lock1 = redissonTemplate.getLock("lock1" ); RLock lock2 = redissonTemplate.getLock("lock2"); RLock lock3 = redissonTemplate.getLock("lock3"); RedissonRedLock lock = new RedissonRedLock (lock1, lock2, lock3); boolean flag = lock.tryLock(); if(flag){ try { log.info("红锁加索成功"); }finally { //一定要释放锁 lock.unlock(); } } } /** * 读写锁 */ @Test public void testReadWriteLock(){ RReadWriteLock rwlock = redissonTemplate.getReadWriteLock("testRWLock"); rwlock.readLock().lock(); rwlock.writeLock().lock(); } /** * 信号量 */ @Test public void testSemaphore() throws InterruptedException { RSemaphore semaphore = redissonTemplate.getSemaphore("testSemaphore"); //设置许可个数 semaphore.trySetPermits(10); // //设置许可个数 异步 // semaphore.acquireAsync(); // //获取5个许可 // semaphore.acquire(5); // //尝试获取一个许可 // semaphore.tryAcquire(); // //尝试获取一个许可 异步 // semaphore.tryAcquireAsync(); // //尝试获取一个许可 等待5秒如果未获取到,则返回false // semaphore.tryAcquire(5, TimeUnit.SECONDS); // //尝试获取一个许可 等待5秒如果未获取到,则返回false 异步 // semaphore.tryAcquireAsync(5, TimeUnit.SECONDS); // //释放一个许可,将其返回给信号量 // semaphore.release(); // //释放 6 个许可 ,将其返回给信号量 // semaphore.release(6); // //释放一个许可,将其返回给信号量 异步 // semaphore.releaseAsync(); CountDownLatch count = new CountDownLatch(10); for (int i= 0;i< 15 ;++i){ new Thread(() -> { try { String threadName = Thread.currentThread().getName(); log.info("线程:{} 尝试获取许可。。。。。。。。。。。。。",threadName); //默认获取一个许可,如果没有获取到,则阻塞线程 semaphore.acquire(); log.info("线程:{}获取许可成功。。。。。。。", threadName); count.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } count.await(); } /** * 信号量 */ @Test public void testPermitExpirableSemaphore() throws InterruptedException { RPermitExpirableSemaphore semaphore = redissonTemplate.getPermitExpirableSemaphore("testPermitExpirableSemaphore"); //设置许可个数 semaphore.trySetPermits(10); // 获取一个信号,有效期只有2秒钟。 String permitId = semaphore.acquire(1, TimeUnit.SECONDS); log.info("许可:{}",permitId); semaphore.release(permitId); } @Test public void testCountDownLatch() throws InterruptedException { RCountDownLatch latch = redissonTemplate.getCountDownLatch("testCountDownLatch"); latch.trySetCount(2); new Thread(() ->{ log.info("这是一个服务的线程"); try { TimeUnit.SECONDS.sleep(3); log.info("线程:{},休眠结束",Thread.currentThread().getName()); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() ->{ log.info("这是另外一个服务的线程"); try { TimeUnit.SECONDS.sleep(3); log.info("线程:{},休眠结束",Thread.currentThread().getName()); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); latch.await(); log.info("子线程执行结束。。。。。。"); } }