@Scheduled + Redis 分布式锁—解决集群环境下多次定时任务执行

  • A+
所属分类:Java springboot

一.任务需求

***需求:***根据固定时间,主动获取本地数据库未推送的数据,然后将这些数据通过远程向某供应商推送。


二.定时任务代码

    @Resource
    private RedisLock redisLock;

    @Resource
    private RemoteInitiativeAeService initiativeAeService;


    /**
     * 主动拉取未读数据 如果数据不为空 主动推送
     * 拉取/推送时间 每天凌晨1点执行一次
     */
    @Scheduled(cron = "0 0 1 * * ?") //每天凌晨1点执行一次
    public void getBatchVaristor() {

        //获取当前类名 + 当前方法名
        String timerName = this.getClass().getName()
                + Thread.currentThread().getStackTrace()[1].getMethodName();
        //如果当前已经持有锁 结束 否则 设置过期时间 为 23小时59分钟
        if (redisLock.requireLock(timerName, 86340)) {
            return;
        }
        //查询 未推送过的原材料检验-压敏电阻
        List<AeRaiVaristorVO> raiVaristorVo = aeRaiVaristorMapper.selectNoReadVaristor();
        //如果存在未读数据 调用远程接口 主动推送
        if (StringUtils.isNotEmpty(raiVaristorVo)) {
            if (initiativeAeService.addAeVaristor(raiVaristorVo).checkAndGet() == 0) {
                throw new CustomException("推送错误! 数据0条!");
            }
        }
        //使用stream流 过滤出刚刚推送的Id
        List<Long> varistorIds = raiVaristorVo.stream().
                map(AeRaiVaristorVO::getId).collect(Collectors.toList());
        //根据Id 将这些数据修改为 已推送 避免下次重复推送
        aeRaiVaristorMapper.updateVaristorIsRead(varistorIds);
    }

代码分析:

1.获取当前类名 + 当前方法名 作为key

2.判断当前是否已经持有锁 说白了 就是是否存在重复执行 避免集群环境出现多次执行

3.查询 未推送过的原材料检验-压敏电阻

4.判断如果存在 数据 调用远程接口 主动推送

5.使用stream流 过滤出刚刚推送成功的Id

6.根据Id 将这些数据修改为 已推送 避免下次重复推送


三.Redis 代码逻辑

/**
     * redis 任务锁key的前缀
     */
    public static final String REDIS_LOCK_KEY = "com:guodian:job";


    @Resource
    private RedisTemplate<String, Object> redisTemplate;


    /**
     * 判断是否有锁。有 返回true 无 设置一定有效期锁 并 返回 false
     * @param lockName key
     * @param timeout 锁的有效期时间
     * @return 是否持有锁
     */
    public  boolean requireLock(String lockName, Long timeout) {
        String key = REDIS_LOCK_KEY + lockName;
        if (StringUtils.isNull(redisTemplate.getExpire(key))){
            redisTemplate.expire(key,timeout, TimeUnit.SECONDS); //时间单位为秒 可以设置小时和天
            return false;
        }
        return true;
    }



    /**
     * 判断是否有锁。有 返回true 无 设置有效时间截止点 并 返回 false
     * @param lockName key
     * @param date 锁的有效时间截止点
     * @return 是否持有锁
     */
    public boolean requireLock(String lockName, Date date) {
        String key = REDIS_LOCK_KEY + lockName;
        if (StringUtils.isNull(redisTemplate.getExpire(key))){
            redisTemplate.expireAt(key,date);
            return false;
        }
        return true;
    }

代码分析:

1.这里有两个方法 第一个方法:判断是否有锁。有 返回true 无 设置一定有效期锁 并 返回 false

第二个方法 :判断是否有锁。有 返回true 无 设置有效时间截止点 并 返回 false


三.远程调用

/**
 * @Author wufushan
 * @Date 2022/4/20 15:14
 * @Version 1.0
 */@FeignClient(name = "remoteInitiativeAeService", url = "http://localhost:8080",fallbackFactory = RemoteGetUserFactory.class )public interface RemoteInitiativeAeService {


    /**
     * 主动推送压敏电阻信息
     * @param raiVaristorVo 压敏电阻参数
     * @return 推送数量
     */
    @PostMapping("/initiative/aeVaristor")
    R<Integer> addAeVaristor(@Param("raiVaristorVo") List<AeRaiVaristorVO> raiVaristorVo);}

四.熔断代码-hystrix

/**
 * @Author wfs
 * @Date 2022/4/20 15:15
 * @Version 1.0
 */public class RemoteInitiativeAeFactory implements FallbackFactory<RemoteInitiativeAeService> {

    private static final Logger log = LoggerFactory.getLogger(RemoteInitiativeAeService.class);


    @Override
    public RemoteInitiativeAeService create(Throwable throwable) {
        log.info("远程WebSocket服务调用失败!{}", throwable.getMessage());

        return new RemoteInitiativeAeService() {

            @Override
            public R<Integer> addAeVaristor(List<AeRaiVaristorVO> raiVaristorVo) {
                return R.fail("主动推送压敏电阻数据失败!!");
            }
        };
    }}

五.远程调用及跨模块调用 想看更详细的 请点击!!

远程调用跨服务+跨模块超详细

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: