模拟任务阻塞处理策略
官方描述
阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
- 单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
- 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
- 覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
为了验证并且更加熟悉XXL-JOB这一块的功能,遂做了如下的实验:
实验
编写3个任务执行方法
模拟【阻塞处理策略】为
单机串行
1
2
3
4
5
6
7
public ReturnT<String> serialExecutionJobHandler(String param) throws Exception {
XxlJobLogger.log("XXL-JOB, serialExecutionJobHandler.");
// 休眠1分钟,模拟执行时间特别长
TimeUnit.SECONDS.sleep(60);
return ReturnT.SUCCESS;
}模拟【阻塞处理策略】为
丢弃后续调度
1
2
3
4
5
6
7
public ReturnT<String> discardLaterJobHandler(String param) throws Exception {
XxlJobLogger.log("XXL-JOB, discardLaterJobHandler.");
// 休眠1分钟,模拟执行时间特别长
TimeUnit.SECONDS.sleep(60);
return ReturnT.SUCCESS;
}模拟【阻塞处理策略】为
覆盖之前调度
1
2
3
4
5
6
7
public ReturnT<String> coverEarlyJobHandler(String param) throws Exception {
XxlJobLogger.log("XXL-JOB, coverEarlyJobHandler.");
// 休眠1分钟,模拟执行时间特别长
TimeUnit.SECONDS.sleep(60);
return ReturnT.SUCCESS;
}
在调度中心页面创建3个任务,分别对应3种阻塞处理策略
任务配置
对每个任务,使用调度中心页面上的
执行一次
按钮 触发 (第一次触发是为了第二次任务进行时存在阻塞的任务)观察每个任务的日志:
单机串行【任务id为3】
可见两次调度都是成功的,但是第二个任务一直在等待第一个任务执行完成,才会被执行。丢弃后续调度任务【任务id为4】
可见因为这个任务有一次调度正在被执行中,并且阻塞处理策略为【丢弃后续调度任务】,所以第二次调度结果直接为失败!覆盖之前调度任务【任务id为5】
可见该任务第二次被调度时,发现已经有调度正在被执行(第一次调度结果为成功),由于阻塞策略设置为【覆盖之前调度任务】,所以执行器会将第一次调度kill掉,然后开始执行第二次调度。
分析
- 执行阻塞策略是在执行器中判断、执行的
- 调度中心在调度任务时,会将任务配置的阻塞策略通过RPC传给执行器
- 执行器在执行时,调用的是
com.xxl.job.core.biz.impl.ExecutorBizImpl#run
方法 - 该方法最后一段就是在根据阻塞策略进行一些判断,如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27// 阻塞策略判断
if (jobThread != null) {
// 进入这个判断,则表示当前执行任务的线程中有正在运行的
// 所以需要根据阻塞策略决定是否执行本次调度
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
// 因为上一次的调度正在线程池中跑,需要再次判断
if (jobThread.isRunningOrHasQueue()) {
// 丢弃后续调度
return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
}
} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
// kill running jobThread
// 覆盖之前调度
if (jobThread.isRunningOrHasQueue()) {
removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
jobThread = null;
}
} else {
// just queue trigger
}
}
// 其实这里才是 覆盖之前调度 正在在执行本次调度
// replace thread (new or exists invalid)
if (jobThread == null) {
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
}