xxl-job实践

xxl-job实践

官方文档XXL-JOB官方文档

1. 前言

XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展

  • 依赖mysql,不依赖第三方调度中心【elastic-job依赖zookeeper】
  • 支持任务的动态修改
  • 支持可视化修改(WEB页面直接进行CRUD操作)
  • 支持多种任务模式:Bean模式(类级别)、Bean模式(方法级别)、Glue模式

2. xxl-job核心概念

2.1 组成模块&架构

设计思想:利用"执行器"将"调度"与"任务"解耦;调度中心负责发起调度请求,任务被抽象为分散的JobHandler,执行器负责接收调度中心的请求,并执行对应的任务"

xxl-job整体架构分为三个部分:

  • 调度中心需要单独部署一个工程,负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码
    1. 调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块
    2. 支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover
  • 执行器可以改造现有业务工程,负责接收调度请求并执行任务逻辑【执行器本质上是一个内嵌server,默认端口为9999

    1. 任务模块专注于任务的执行等操作,开发和维护更加简单和高效
    2. 接收“调度中心”的执行请求、终止请求和日志请求
  • 任务:即具体的定时任务业务逻辑,主要分为以下几种运行模式

    1. BEAN模式:任务以JobHandler方式维护在执行器端;需要结合 “JobHandler” 属性匹配执行器中任务
    2. GLUE模式(Java):任务以源码方式维护在调度中心;该模式的任务实际上是一段继承自IJobHandler的Java类代码并 “groovy” 源码方式维护,它在执行器项目中运行,可使用@Resource/@Autowire注入执行器里中的其他服务

2.2 数据表结构

  • xxl_job_lock:任务调度锁表;
  • xxl_job_group:执行器信息表,维护任务执行器信息;
  • xxl_job_info:调度扩展信息表: 用于保存XXL-JOB调度任务的扩展信息,如任务分组、任务名、机器地址、执行器、执行入参和报警邮件等等;
  • xxl_job_log:调度日志表: 用于保存XXL-JOB任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等;
  • xxl_job_log_report:调度日志报表:用户存储XXL-JOB任务调度日志的报表,调度中心报表功能页面会用到;
  • xxl_job_logglue:任务GLUE日志:用于保存GLUE更新历史,用于支持GLUE的版本回溯功能;
  • xxl_job_registry:执行器注册表,维护在线的执行器和调度中心机器地址信息;
  • xxl_job_user:系统用户表;

2.3 任务的运行模式

2.3.1 Bean模式任务

  • 每个任务对应一个Sping的bean实例
  • 任务上声明@JobHandler(value=”名称”)注解,用于被执行器识别
  • 任务集成统一的IJobHandler接口,覆写execute方法,执行器接收到调度中心的请求时,会自动调用该方法
  • execute方法中提供具体的业务逻辑
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    /**
    *
    * @Title: NewMediaCollectHandle
    * @Package com.dayang.dycmedit.handler
    * @Description: 新媒体稿件采集定时器(APP、微信、微博、网页)
    * @author: ysy
    * @date: 2019年12月16日 下午5:51:24
    * @version V1.0
    */
    @JobHandler(value="newMediaManuscriptCollectHandle")
    @Service
    public class NewMediaManuscriptCollectHandle extends IJobHandler{
    @Autowired
    private AssessmentTaskService assessmentTaskService;

    @Override
    public ReturnT<String> execute(String param) throws Exception {
    // 分片参数
    ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
    int shardingItem = shardingVO.getIndex();
    int shardingTotalCount = shardingVO.getTotal();
    XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingItem, shardingTotalCount);
    XxlJobLogger.log("接收参数:{}", param);
    XxlJobParam xxlJobParam = new XxlJobParam();
    if(DyStringUtils.isNotEmpty(param)){
    xxlJobParam = new GsonBuilder().setDateFormat("yyyy-MM-dd").create().fromJson(param, XxlJobParam.class);
    }
    try{
    assessmentTaskService.firstGetAllManuscript("1", 1, xxlJobParam.getCollectRange(), xxlJobParam.getChannelId(), shardingTotalCount, shardingItem);
    }catch (Exception e) {
    XxlJobLogger.log("定时任务失败:",e);
    }
    // 业务逻辑
    return SUCCESS;
    }
    }

    2.3.2 Glue模式任务

  • 任务代码托管在调度中心,在执行器中执行
  • 任务的代码,实际上是一个继承自IJobHandler实现类的类代码
  • 执行器接收调度中心请求时,会通过Groovy类加载器加载此代码,实例化成Java对象,同时注入此代码中声明的Spring服务,并调用execute方法
    【须确保Glue代码中的服务和类引用在“执行器”项目中存在】
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @JobHandler(value="demoJobHandler")
    @Component
    public class DemoJobHandler extends IJobHandler {
    @Override
    public ReturnT<String> execute(String param) throws Exception {
    XxlJobLogger.log("XXL-JOB, Hello World.");

    for (int i = 0; i < 5; i++) {
    XxlJobLogger.log("beat at:" + i);
    TimeUnit.SECONDS.sleep(2);
    }
    return SUCCESS;
    }
    }

2.3.3 任务注册,任务自动发现

基于DB进行实现,模块间通讯使用http

  • AppName: 每个执行器机器集群的唯一标示, 任务注册以 “执行器” 为最小粒度进行注册; 每个任务通过其绑定的执行器可感知对应的执行器机器列表;
  • 注册表: 见”xxl_job_registry”表, “执行器” 在进行任务注册时将会周期性维护一条注册记录,即机器地址和AppName的绑定关系; “调度中心” 从而可以动态感知每个AppName在线的机器列表;
  • 执行器注册: 任务注册Beat周期默认30s; 执行器以一倍Beat进行执行器注册, 调度中心以一倍Beat进行动态任务发现; 注册信息的失效时间为三倍Beat;
  • 执行器注册摘除:执行器销毁时,将会主动上报调度中心并摘除对应的执行器机器信息,提高心跳注册的实时性;

2.3.4 分片广播

执行器集群部署时,路由策略选择"分片广播",任务调度将会广播给所有执行器,并传递分片参数

1
2
// 获取分片参数(index:当前分片序号,0开始;total:总分片数)
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();

2.3.5 路由策略

执行器集群部署时,调度中心的路由规则

  1. FIRST(第一个):固定选择第一个机器;
  2. LAST(最后一个):固定选择最后一个机器;
  3. ROUND(轮询):;
  4. RANDOM(随机):随机选择在线的机器;
  5. CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
  6. LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
  7. LEAST_RECENTLY_USED(最近最久未使用):最久为使用的机器优先被选举;
  8. FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
  9. BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
  10. SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;

2.3.6 官方API

  • 调度中心API服务
业务相关
` API服务位置:com.xxl.job.admin.controller.JobInfoController.java`
> 1. 任务列表查询;2、任务新增;3、任务更新;4、任务删除;5、任务启动;6、任务停止;7、任务触发;

执行器相关
` API服务位置:com.xxl.job.admin.controller.JobInfoController.java`
` API服务请求参考代码:com.xxl.job.adminbiz.AdminBizTest.java`
> - 任务结果回调服务;
> - 执行器注册服务;
> - 执行器注册摘除服务;
> - 触发任务单次执行服务,支持任务根据业务事件触发;
  • 执行器API服务
    API服务位置:com.xxl.job.core.biz.ExecutorBiz
    API服务请求参考代码:com.xxl.job.executor.ExecutorBizTest

    1、心跳检测:调度中心使用
    2、忙碌检测:调度中心使用
    3、触发任务执行:调度中心使用;本地进行任务开发时,可使用该API服务模拟触发任务;
    4、获取Rolling Log:调度中心使用
    5、终止任务:调度中心使用

3. 分布式任务调度实战

3.1 构建调度中心

直接下载官方xxl-job-admin项目,构建数据库,修改配置,部署项目即可

3.2 构建执行器&分片任务

3.2.1 改造当前工程为执行器

  • 引入maven依赖
    1
    2
    3
    4
    5
    6
    <!-- xxl-job相关依赖 -->
    <dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.1.0</version>
    </dependency>
  • 新增application相关配置
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    ### 调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
    xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
    ### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
    xxl.job.executor.appname=xxl-job-executor-sample
    ### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
    xxl.job.executor.ip=
    ### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
    xxl.job.executor.port=9999
    ### 执行器通讯TOKEN [选填]:非空时启用;
    xxl.job.accessToken=
    ### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
    xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
    ### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
    xxl.job.executor.logretentiondays=30

    3.2.2 定义bean模式任务

  • 显示地声明SpringBean注解、@JobHandler注解
  • 继承IJobHandler,重写execute方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    /**
    *
    * @Title: ZhixinManuscriptCollectHandle
    * @Package com.dayang.dycmedit.handler
    * @Description: 台内稿件采集任务(网页、微信、微博、电视、串联单、报选题)
    * @author: ysy
    * @date: 2019年12月16日 下午6:52:42
    * @version V1.0
    */
    @JobHandler(value="zhixinManuscriptCollectHandle")
    @Service
    public class ZhixinManuscriptCollectHandle extends IJobHandler{
    private static Logger logger = LoggerFactory.getLogger(ZhixinManuscriptCollectHandle.class);
    @Autowired
    private DataStatisticService dataStatisticService;
    @Autowired
    private EditProperties editProperties;

    @Override
    public ReturnT<String> execute(String param) throws Exception {
    // 分片参数
    ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
    int shardingItem = shardingVO.getIndex();
    int shardingTotalCount = shardingVO.getTotal();
    XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingItem, shardingTotalCount);
    XxlJobLogger.log("接收参数:{}", param);
    XxlJobParam xxlJobParam = new XxlJobParam();
    if(DyStringUtils.isNotEmpty(param)){
    xxlJobParam = new GsonBuilder().setDateFormat("yyyy-MM-dd").create().fromJson(param, XxlJobParam.class);
    }
    try{
    StasticCollectRequest request = new StasticCollectRequest();
    request.setMainStatus(ChannelTypeEnum.ALL.getChannelType());
    request.setCurrentPage(1);
    request.setPageSize(1000);
    request.setTenantId(editProperties.getTenantId());
    Calendar calendar = Calendar.getInstance();
    calendar.add(Calendar.DATE, -1);
    request.setStartDate(DateUtils.toString(calendar.getTime()));
    request.setEndDate(DateUtils.toString(new Date()));
    UserInfo user = new UserInfo();
    user.setTenantId(editProperties.getTenantId());
    if(ChannelTypeEnum.ZHIXIN_CMS.getChannelId().equalsIgnoreCase(xxlJobParam.getChannelId())){
    dataStatisticService.collectCmsFromZhixin(request, user);
    }
    if(ChannelTypeEnum.ZHIXIN_MICROBLOG.getChannelId().equalsIgnoreCase(xxlJobParam.getChannelId())){
    dataStatisticService.collectMicroblogFromZhixin(request, user);
    }
    if(ChannelTypeEnum.ZHIXIN_WECHAT.getChannelId().equalsIgnoreCase(xxlJobParam.getChannelId())){
    dataStatisticService.collectWechatFromZhixin(request, user);
    }
    /*if(ChannelTypeEnum.TOPIC.getChannelId().equalsIgnoreCase(xxlJobParam.getChannelId())){
    dataStatisticService.collectWechatFromZhixin(request, user);
    }*/
    /*if(ChannelTypeEnum.BILL.getChannelId().equalsIgnoreCase(xxlJobParam.getChannelId())){
    dataStatisticService.collectWechatFromZhixin(request, user);
    }
    if(ChannelTypeEnum.TV.getChannelId().equalsIgnoreCase(xxlJobParam.getChannelId())){
    dataStatisticService.collectWechatFromZhixin(request, user);
    }*/
    }catch (Exception e) {
    XxlJobLogger.log("定时任务失败:",e);
    }
    // 业务逻辑
    return SUCCESS;
    }
    }

3. 配置执行器&任务

  • 登录:10.10.0.120:8880/xxl-job-admin/toLogin
  • 进入执行器管理,新增执行器
  • 进入任务管理页面,新增任务
坚持原创技术分享,您的支持将鼓励我继续创作!