实战:如何设计一个定时任务

前言

最近有个需求,是做一个定时任务服务,忽然想到以前面试时被问过,所以今天打算从实战角度去写一个

注意:目录里开始实现是简单代码,如果对quartz很熟练可以跳到丰富细节那里

需求

设计一个定时任务管理器,页面上支持增删改查,而且可以知道下次执行时间是啥时候,对已经在运行的任务可以操作,最好使用cron去执行,保证存量定时任务切割简单

语言使用java

开始实现

定时任务可以用quartz框架,不但简单,而且工具超级多,内部的Scheduler可对接口进行增删改查,很舒服

执行任务时需要有四个值:唯一标识、cron时间、执行的类、上下文数据,很符合需求

于是开始撸代码

1、任务去create时要求的太多了,如不用一个JobBean包装一下,简化操作

可能有的小伙伴阅读源码能力,这里看注释就好了

@Data
@AllArgsConstructor
@NoArgsConstructor
public class JobBean {
    /**
     * 任务名称
     */
    @NonNull
    private String jobName;
    /**
     * 任务分组
     */
    private String jobGroup;
    /**
     * 任务执行的类
     */
    @NonNull
    private Class jobClass;
    /**
     * 任务周期
     */
    private String cron = "0/3 * * * * ? ";
    /**
     * 备注
     * 不知道有啥用,但加上总没错
     */
    private String description;
    /**
     * job执行时所需要的数据
     */
    private Map data;
​
    public String getJobGroup() {
        if (StringUtils.isEmpty(this.jobGroup)) {
            return this.jobName + "_group";
        } else {
            return this.jobGroup;
        }
    }
}

2、用原生的Scheduler去操作任务实在是太复杂了,处理一下!

@Service
public class JobUtils {
​
    @Resource
    private Scheduler scheduler;
​
    /**
     * 生成任务
     *
     * @param jobBean 任务bean
     */
    public Date createJob(JobBean jobBean) throws SchedulerException {
        JobBuilder jobBuilder = JobBuilder
                .newJob(jobBean.getJobClass())
                .storeDurably() // 开启持久化
                .withIdentity(jobBean.getJobName(), jobBean.getJobGroup()) // 唯一标识
                .withDescription(jobBean.getDescription());// 备注
        jobBean.getData().forEach(jobBuilder::usingJobData);
        JobDetail jobDetail = jobBuilder.build();
​
        Trigger trigger = TriggerBuilder
                .newTrigger()
                .forJob(jobDetail) // 进行关联
                .withIdentity(jobBean.getJobName() + "_trigger", jobBean.getJobGroup() + "_trigger") // 唯一标识
                .withSchedule(CronScheduleBuilder.cronSchedule(jobBean.getCron())) // 时间规则
                .build();
        return scheduler.scheduleJob(jobDetail, trigger);
    }
​
    /**
     * 暂停任务
     *
     * @param jobBean 任务名
     */
    public void pauseJob(JobBean jobBean) throws SchedulerException {
        JobKey jobKey = JobKey.jobKey(jobBean.getJobName(), jobBean.getJobGroup());
        scheduler.pauseJob(jobKey);
    }
​
    /**
     * 恢复任务
     *
     * @param jobBean 任务名
     */
    public void resumeJob(JobBean jobBean) throws SchedulerException {
        JobKey jobKey = JobKey.jobKey(jobBean.getJobName(), jobBean.getJobGroup());
        scheduler.resumeJob(jobKey);
    }
​
    /**
     * 删除任务
     *
     * @param jobBean 任务名
     */
    public boolean deleteJob(JobBean jobBean) throws SchedulerException {
        JobKey jobKey = JobKey.jobKey(jobBean.getJobName(), jobBean.getJobGroup());
        return scheduler.deleteJob(jobKey);
    }
​
    /**
     * 修改任务
     *
     * @param jobBean 任务bean
     */
    public Date modifyJon(JobBean jobBean) throws SchedulerException {
        // 获取任务唯一标识
        TriggerKey triggerKey = TriggerKey.triggerKey(jobBean.getJobName() + "_trigger");
        // 通过唯一标识获取旧的触发器对象
        CronTrigger oldTrigger = (CronTrigger) scheduler.getTrigger(triggerKey);
        // 实用新的cron表达式构建新的触发器
        String newCron = jobBean.getCron();
        CronTrigger newTrigger = oldTrigger.getTriggerBuilder()
                .withSchedule(CronScheduleBuilder
                        .cronSchedule(newCron)
                        .withMisfireHandlingInstructionDoNothing()
                )
                .build();
        // 调度器更新任务的触发器
        return scheduler.rescheduleJob(triggerKey, newTrigger);
    }
​
    /**
     * 查询所有
     *
     * @return List
     * @throws SchedulerException 远程计划有关的
     */
    public List queryAll() throws SchedulerException {
        List response = new ArrayList<>();
        Set triggerKeys = scheduler.getTriggerKeys(GroupMatcher.anyTriggerGroup());
        for (TriggerKey triggerKey : triggerKeys) {
            Trigger trigger = scheduler.getTrigger(triggerKey);
            JobKey jobKey = trigger.getJobKey();
            JobDetail jobDetail = scheduler.getJobDetail(jobKey);
            Trigger.TriggerState triggerState = scheduler.getTriggerState(triggerKey);
            String status = null;
            switch (triggerState) {
                case NONE:
                    /*
                    这个状态可能表示Trigger或Job尚未被初始化或没有设置状态。在实际应用中,您可能不太会看到这个状态,因为一旦Trigger被创建并加入到Scheduler中,它通常会被赋予一个明确的状态。
                     */
                    System.out.println("NONE");
                    status = "执行中";
                    break;
                case NORMAL:
                    /*
                    对于Trigger,NORMAL状态通常表示它正在等待下一次触发时间到来。一旦到达预定的触发时间,Trigger将尝试执行其关联的Job。
                     */
                    System.out.println("等待触发");
                    status = "等待触发";
                    break;
                case PAUSED:
                    /*
                    当Trigger或Scheduler被暂停时,它们会进入PAUSED状态。在PAUSED状态下,Trigger不会触发任何Job,即使它们的预定触发时间已经到达。要恢复执行,您需要取消暂停状态。
                     */
                    System.out.println("暂停");
                    status = "暂停";
                    break;
                case COMPLETE:
                    /*
                    对于Trigger,COMPLETE状态表示它已成功触发其关联的Job,并且没有更多的触发计划。这通常发生在Trigger的所有触发次数都已经用完,或者Job执行完成并且没有更多的触发条件满足时。
                     */
                    System.out.println("COMPLETE");
                    status = "无关联";
                    break;
                case ERROR:
                    /*
                    ERROR状态表示在Trigger或Job的执行过程中发生了错误。这可能是由于多种原因造成的,例如Job实现中的异常、Trigger配置错误或Scheduler的内部错误。当遇到ERROR状态时,您应该检查相关的日志和异常信息以确定问题的原因。
                     */
                    System.out.println("ERROR");
                    status = "报错";
                    break;
                case BLOCKED:
                    /*
                    BLOCKED状态通常与并发执行有关。当Job被配置为不允许并发执行,并且前一个Job实例仍在运行时,新的触发请求会导致Trigger进入BLOCKED状态。这意味着尽管Trigger的触发时间已经到达,但由于Job的并发限制,它不能立即执行。
                     */
                    System.out.println("BLOCKED");
                    status = "并发限制";
                    break;
                default:
                    System.out.println("未知");
            }
​
            JobInfo jobInfo = new JobInfo();
            jobInfo.setJobName(jobDetail.getKey().getName());
            jobInfo.setJobGroup(jobDetail.getKey().getGroup());
            jobInfo.setStatus(status);
            jobInfo.setNetTime(trigger.getNextFireTime());
            if (trigger instanceof CronTrigger) {
                CronTrigger cronTrigger = (CronTrigger) trigger;
                String cron = cronTrigger.getCronExpression();
                jobInfo.setCron(cron);
            }
            response.add(jobInfo);
        }
        return response;
    }
}

3、大功告成,可以用了!!!

1、先创建任务要执行的具体类,很简单,实现一个Job接口就好了

这里细节有点多哈,我就简单写了,具体的去看一下quartz框架

@PersistJobDataAfterExecution // 共享数据
@DisallowConcurrentExecution // 并发问题
@Service
public class MyJob implements Job {
    @Override
    public void execute(JobExecutionContext context) {
        // context表示定时任务执行的环境
        JobDetail jobDetail = context.getJobDetail();
        JobDataMap jobDataMap = jobDetail.getJobDataMap(); // 共享数据
        String countStr = jobDataMap.get("count").toString();
        Integer count = Integer.parseInt(countStr);
        // 任务详细信息
        System.out.println("***************************************");
        System.out.println("第\t" + count + "\t次执行");
        System.out.println("任务名字:" + jobDetail.getKey().getName());
        System.out.println("任务分组名字:" + jobDetail.getKey().getGroup());
        System.out.println("任务类名字:" + jobDetail.getJobClass().getName());
        System.out.println("本次执行时间:" + context.getFireTime());
        System.out.println("下次执行时间:" + context.getNextFireTime());
​
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println("执行完成");
        count += 1;
        jobDataMap.put("count", count);
    }
}

2、写个controller触发一下

我这里只写一个demo,删、改、查都一样的

@Resource
private JobUtils jobUtils;
@PostMapping("/add")
public void add(@RequestParam Integer input) throws SchedulerException {
    String jobName = "job:\t" + input;
    String groupName = "group:\t" + input;
    JobBean jobBean = new JobBean();
    jobBean.setCron("0/1 * * * * ? ");
    jobBean.setJobGroup(groupName);
    jobBean.setJobName(jobName);
    jobBean.setJobClass(MyJob.class);
    Map jobData = new HashMap<>();
    jobData.put("count", "1");
    jobBean.setData(jobData);
    System.out.println(jobUtils.createJob(jobBean));
}

丰富细节

如果你拿着以上的代码区面试、或者真的去写代码,会有一大堆问题

以下问题,从面试角度触发,实战的话可能会考虑更多

1、你这个东西,只能单机运行啊,那我配置在库里写配置,本地启动10个,岂不是同一个任务会执行10遍啊

那么解决方法是,必须环境变量标识为服务器时,才会执行,除非手动控制,否者本地不会执行

然后,任务发布服务器只有一个,这样就不会出现重复执行的问题了

2、就一个服务器?你疯了?

把整套系统拆分开,分为任务发布服务和任务执行服务,中间用mq去连接,任务发布服务只有一个,但任务执行服务可以有多个。比较消耗性能的时执行,而不是发布

任务发布服务器只有一个,到达执行时间后,把任务放进MQ里,执行器去订阅

以下问题纯属于面试官找茬,怎么回答看你们自己,以下为怼的方向

1、你这用mq了,无法保证时效性啊

那你用dubbo,springcloud那种rpc去实现啊,那还需要解决容器内无法拿到宿主机ip、较高耦合、任务堆积、削峰、非异步导致OOM问题,你去啊

2、如果任务执行失败了怎么办

哪里失败了,任务发布器报错了查日志

mq失败了是mq的问题

执行器失败了那是监控系统的事,那是另外的价钱

3、我现在有800W个任务,你就一台任务发布器,肯定不行

任务发布器可以再负载均衡啊,dubbo强连呗

什么?不想花钱加服务器?你都这么多订单了,还差这点钱?

4、你这两台服务器了吧,不划算啊

谁说的非要两台服务器,数据量小的话,任务发布服务和任务执行服务放在同一个进程里不就好了

什么,你有很多任务要执行?哦,我看懂了,任务多但不赚钱,那你在干嘛呢;任务多但不花钱,头给你打歪


这是一个从 https://juejin.cn/post/7368484446142824485 下的原始话题分离的讨论话题