xxl-job实践
官方文档:XXL-JOB官方文档
1. 前言
XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展
- 依赖mysql,不依赖第三方调度中心【elastic-job依赖zookeeper】
- 支持任务的动态修改
- 支持可视化修改(WEB页面直接进行CRUD操作)
- 支持多种任务模式:Bean模式(类级别)、Bean模式(方法级别)、Glue模式
2. xxl-job核心概念
2.1 组成模块&架构
设计思想:利用"执行器"将"调度"与"任务"解耦;调度中心负责发起调度请求,任务被抽象为分散的JobHandler,执行器负责接收调度中心的请求,并执行对应的任务"
xxl-job整体架构分为三个部分:
- 调度中心:
需要单独部署一个工程
,负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码- 调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块
- 支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover
执行器:
可以改造现有业务工程
,负责接收调度请求并执行任务逻辑【执行器本质上是一个内嵌server,默认端口为9999】- 任务模块专注于任务的执行等操作,开发和维护更加简单和高效
- 接收“调度中心”的执行请求、终止请求和日志请求
任务:即具体的定时任务业务逻辑,主要分为以下几种运行模式
- BEAN模式:任务以JobHandler方式维护在执行器端;需要结合 “JobHandler” 属性匹配执行器中任务
- GLUE模式(Java):任务以源码方式维护在调度中心;该模式的任务实际上是一段继承自IJobHandler的Java类代码并 “groovy” 源码方式维护,它在执行器项目中运行,可使用@Resource/@Autowire注入执行器里中的其他服务
2.2 数据表结构
- xxl_job_lock:任务调度锁表;
- xxl_job_group:执行器信息表,维护任务执行器信息;
- xxl_job_info:调度扩展信息表: 用于保存XXL-JOB调度任务的扩展信息,如任务分组、任务名、机器地址、执行器、执行入参和报警邮件等等;
- xxl_job_log:调度日志表: 用于保存XXL-JOB任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等;
- xxl_job_log_report:调度日志报表:用户存储XXL-JOB任务调度日志的报表,调度中心报表功能页面会用到;
- xxl_job_logglue:任务GLUE日志:用于保存GLUE更新历史,用于支持GLUE的版本回溯功能;
- xxl_job_registry:执行器注册表,维护在线的执行器和调度中心机器地址信息;
- xxl_job_user:系统用户表;
2.3 任务的运行模式
2.3.1 Bean模式任务
- 每个任务对应一个Sping的bean实例
- 任务上声明
@JobHandler(value=”名称”)
注解,用于被执行器识别 - 任务集成统一的
IJobHandler
接口,覆写execute方法,执行器接收到调度中心的请求时,会自动调用该方法 - execute方法中提供具体的业务逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36/**
*
* @Title: NewMediaCollectHandle
* @Package com.dayang.dycmedit.handler
* @Description: 新媒体稿件采集定时器(APP、微信、微博、网页)
* @author: ysy
* @date: 2019年12月16日 下午5:51:24
* @version V1.0
*/
"newMediaManuscriptCollectHandle") (value=
public class NewMediaManuscriptCollectHandle extends IJobHandler{
private AssessmentTaskService assessmentTaskService;
public ReturnT<String> execute(String param) throws Exception {
// 分片参数
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
int shardingItem = shardingVO.getIndex();
int shardingTotalCount = shardingVO.getTotal();
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingItem, shardingTotalCount);
XxlJobLogger.log("接收参数:{}", param);
XxlJobParam xxlJobParam = new XxlJobParam();
if(DyStringUtils.isNotEmpty(param)){
xxlJobParam = new GsonBuilder().setDateFormat("yyyy-MM-dd").create().fromJson(param, XxlJobParam.class);
}
try{
assessmentTaskService.firstGetAllManuscript("1", 1, xxlJobParam.getCollectRange(), xxlJobParam.getChannelId(), shardingTotalCount, shardingItem);
}catch (Exception e) {
XxlJobLogger.log("定时任务失败:",e);
}
// 业务逻辑
return SUCCESS;
}
}2.3.2 Glue模式任务
- 任务代码托管在调度中心,在执行器中执行
- 任务的代码,实际上是一个继承自
IJobHandler
实现类的类代码 - 执行器接收调度中心请求时,会通过Groovy类加载器加载此代码,实例化成Java对象,同时注入此代码中声明的Spring服务,并调用execute方法
【须确保Glue代码中的服务和类引用在“执行器”项目中存在】1
2
3
4
5
6
7
8
9
10
11
12
13
14"demoJobHandler") (value=
public class DemoJobHandler extends IJobHandler {
public ReturnT<String> execute(String param) throws Exception {
XxlJobLogger.log("XXL-JOB, Hello World.");
for (int i = 0; i < 5; i++) {
XxlJobLogger.log("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
}
return SUCCESS;
}
}
2.3.3 任务注册,任务自动发现
基于DB进行实现,模块间通讯使用http
- AppName: 每个执行器机器集群的唯一标示, 任务注册以 “执行器” 为最小粒度进行注册; 每个任务通过其绑定的执行器可感知对应的执行器机器列表;
- 注册表: 见”xxl_job_registry”表, “执行器” 在进行任务注册时将会周期性维护一条注册记录,即机器地址和AppName的绑定关系; “调度中心” 从而可以动态感知每个AppName在线的机器列表;
- 执行器注册: 任务注册Beat周期默认30s; 执行器以一倍Beat进行执行器注册, 调度中心以一倍Beat进行动态任务发现; 注册信息的失效时间为三倍Beat;
- 执行器注册摘除:执行器销毁时,将会主动上报调度中心并摘除对应的执行器机器信息,提高心跳注册的实时性;
2.3.4 分片广播
执行器集群部署时,路由策略选择"分片广播",任务调度将会广播给所有执行器,并传递分片参数
1 | // 获取分片参数(index:当前分片序号,0开始;total:总分片数) |
2.3.5 路由策略
执行器集群部署时,调度中心的路由规则
- FIRST(第一个):固定选择第一个机器;
- LAST(最后一个):固定选择最后一个机器;
- ROUND(轮询):;
- RANDOM(随机):随机选择在线的机器;
- CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
- LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
- LEAST_RECENTLY_USED(最近最久未使用):最久为使用的机器优先被选举;
- FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
- BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
- SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;
2.3.6 官方API
- 调度中心API服务
业务相关
` API服务位置:com.xxl.job.admin.controller.JobInfoController.java`
> 1. 任务列表查询;2、任务新增;3、任务更新;4、任务删除;5、任务启动;6、任务停止;7、任务触发;
执行器相关
` API服务位置:com.xxl.job.admin.controller.JobInfoController.java`
` API服务请求参考代码:com.xxl.job.adminbiz.AdminBizTest.java`
> - 任务结果回调服务;
> - 执行器注册服务;
> - 执行器注册摘除服务;
> - 触发任务单次执行服务,支持任务根据业务事件触发;
执行器API服务
API服务位置:com.xxl.job.core.biz.ExecutorBiz
API服务请求参考代码:com.xxl.job.executor.ExecutorBizTest
1、心跳检测:调度中心使用
2、忙碌检测:调度中心使用
3、触发任务执行:调度中心使用;本地进行任务开发时,可使用该API服务模拟触发任务;
4、获取Rolling Log:调度中心使用
5、终止任务:调度中心使用
3. 分布式任务调度实战
3.1 构建调度中心
直接下载官方xxl-job-admin项目,构建数据库,修改配置,部署项目即可
3.2 构建执行器&分片任务
3.2.1 改造当前工程为执行器
- 引入maven依赖
1
2
3
4
5
6<!-- xxl-job相关依赖 -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.1.0</version>
</dependency> - 新增application相关配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14### 调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
xxl.job.executor.appname=xxl-job-executor-sample
### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
xxl.job.executor.ip=
### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
xxl.job.executor.port=9999
### 执行器通讯TOKEN [选填]:非空时启用;
xxl.job.accessToken=
### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
xxl.job.executor.logretentiondays=303.2.2 定义bean模式任务
- 显示地声明SpringBean注解、@JobHandler注解
- 继承IJobHandler,重写execute方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67/**
*
* @Title: ZhixinManuscriptCollectHandle
* @Package com.dayang.dycmedit.handler
* @Description: 台内稿件采集任务(网页、微信、微博、电视、串联单、报选题)
* @author: ysy
* @date: 2019年12月16日 下午6:52:42
* @version V1.0
*/
"zhixinManuscriptCollectHandle") (value=
public class ZhixinManuscriptCollectHandle extends IJobHandler{
private static Logger logger = LoggerFactory.getLogger(ZhixinManuscriptCollectHandle.class);
private DataStatisticService dataStatisticService;
private EditProperties editProperties;
public ReturnT<String> execute(String param) throws Exception {
// 分片参数
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
int shardingItem = shardingVO.getIndex();
int shardingTotalCount = shardingVO.getTotal();
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingItem, shardingTotalCount);
XxlJobLogger.log("接收参数:{}", param);
XxlJobParam xxlJobParam = new XxlJobParam();
if(DyStringUtils.isNotEmpty(param)){
xxlJobParam = new GsonBuilder().setDateFormat("yyyy-MM-dd").create().fromJson(param, XxlJobParam.class);
}
try{
StasticCollectRequest request = new StasticCollectRequest();
request.setMainStatus(ChannelTypeEnum.ALL.getChannelType());
request.setCurrentPage(1);
request.setPageSize(1000);
request.setTenantId(editProperties.getTenantId());
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.DATE, -1);
request.setStartDate(DateUtils.toString(calendar.getTime()));
request.setEndDate(DateUtils.toString(new Date()));
UserInfo user = new UserInfo();
user.setTenantId(editProperties.getTenantId());
if(ChannelTypeEnum.ZHIXIN_CMS.getChannelId().equalsIgnoreCase(xxlJobParam.getChannelId())){
dataStatisticService.collectCmsFromZhixin(request, user);
}
if(ChannelTypeEnum.ZHIXIN_MICROBLOG.getChannelId().equalsIgnoreCase(xxlJobParam.getChannelId())){
dataStatisticService.collectMicroblogFromZhixin(request, user);
}
if(ChannelTypeEnum.ZHIXIN_WECHAT.getChannelId().equalsIgnoreCase(xxlJobParam.getChannelId())){
dataStatisticService.collectWechatFromZhixin(request, user);
}
/*if(ChannelTypeEnum.TOPIC.getChannelId().equalsIgnoreCase(xxlJobParam.getChannelId())){
dataStatisticService.collectWechatFromZhixin(request, user);
}*/
/*if(ChannelTypeEnum.BILL.getChannelId().equalsIgnoreCase(xxlJobParam.getChannelId())){
dataStatisticService.collectWechatFromZhixin(request, user);
}
if(ChannelTypeEnum.TV.getChannelId().equalsIgnoreCase(xxlJobParam.getChannelId())){
dataStatisticService.collectWechatFromZhixin(request, user);
}*/
}catch (Exception e) {
XxlJobLogger.log("定时任务失败:",e);
}
// 业务逻辑
return SUCCESS;
}
}
3. 配置执行器&任务
- 登录:10.10.0.120:8880/xxl-job-admin/toLogin
- 进入执行器管理,新增执行器
- 进入任务管理页面,新增任务