xxl-job总结 一(调度器任务调用原理)

it2025-12-14  9

xxl-job总结 一(任务调用原理)

1、通过XxlJobAdminConfig加载配置,属性配置文件配置完成后执行afterPropertiesSet(),执行XxlJobScheduler.init()方法启动相关配置

@Component public class XxlJobAdminConfig implements InitializingBean, DisposableBean { private static XxlJobAdminConfig adminConfig = null; public static XxlJobAdminConfig getAdminConfig() { return adminConfig; } // ---------------------- XxlJobScheduler ---------------------- private XxlJobScheduler xxlJobScheduler; /** * 调度器启动后,属性配置文件配置完成后执行 * @throws Exception */ @Override public void afterPropertiesSet() throws Exception { adminConfig = this; //初始化任务执行 xxlJobScheduler = new XxlJobScheduler(); xxlJobScheduler.init(); }

2、JobScheduleHelper.getInstance().start(); 启动调度器

public class XxlJobScheduler { private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class); public void init() throws Exception { // init i18n initI18n(); // admin registry monitor run JobRegistryMonitorHelper.getInstance().start(); // admin fail-monitor run JobFailMonitorHelper.getInstance().start(); // admin lose-monitor run JobLosedMonitorHelper.getInstance().start(); // admin trigger pool start JobTriggerPoolHelper.toStart(); // admin log report start JobLogReportHelper.getInstance().start(); // start-schedule 启动调度器 JobScheduleHelper.getInstance().start(); logger.info(">>>>>>>>> init xxl-job admin success."); }

3、启动调度线程、时间轮环形线程;过期5s内,立即触发一次执行JobTriggerPoolHelper.trigger(),当前时间开始计算下次触发时间;

/** * 调度器 * 死循环,在xxl_job_info表里取将要执行的任务,更新下次执行时间的,调用JobTriggerPoolHelper类,来给执行器发送调度任务的 * @author xuxueli 2019-05-21 */ public class JobScheduleHelper { private static Logger logger = LoggerFactory.getLogger(JobScheduleHelper.class); private static JobScheduleHelper instance = new JobScheduleHelper(); public static JobScheduleHelper getInstance(){ return instance; } public static final long PRE_READ_MS = 5000; // pre read /** * 调度线程 */ private Thread scheduleThread; /** * 时间轮环形线程 */ private Thread ringThread; private volatile boolean scheduleThreadToStop = false; private volatile boolean ringThreadToStop = false; /** * ringData是以0到59的整数为key,以jobId集合为value的Map集合 */ private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>(); public void start(){ // schedule thread scheduleThread = new Thread(new Runnable() { @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 ); } catch (InterruptedException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } logger.info(">>>>>>>>> init xxl-job admin scheduler success."); // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20) int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20; while (!scheduleThreadToStop) { // Scan Job long start = System.currentTimeMillis(); Connection conn = null; Boolean connAutoCommit = null; PreparedStatement preparedStatement = null; boolean preReadSuc = true; try { conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection(); connAutoCommit = conn.getAutoCommit(); conn.setAutoCommit(false); //利用for update语句进行获取任务的资格锁定,再去获取未来5秒内即将要执行的任务 preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" ); preparedStatement.execute(); // tx start // 1、pre read ->预读5s内调度任务 long nowTime = System.currentTimeMillis(); List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount); if (scheduleList!=null && scheduleList.size()>0) { // 2、push time-ring ->推送时间轮 for (XxlJobInfo jobInfo: scheduleList) { // time-ring jump // 时间轮刻度计算->当前时间大于(下一次调度时间+5s) if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) { // 过期超5s:本地忽略,当前时间开始计算下次触发时间 // 2.1、trigger-expire > 5s:pass && make next-trigger-time logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId()); // fresh next->根据Cron表达式,获得下一次调度时间 refreshNextValidTime(jobInfo, new Date()); } else if (nowTime > jobInfo.getTriggerNextTime()) { // 过期5s内 :立即触发一次,当前时间开始计算下次触发时间; // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time // 1、trigger -> 立即执行 JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null); logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ); // 2、fresh next ->刷新下一次调度时间 refreshNextValidTime(jobInfo, new Date()); // next-trigger-time in 5s, pre-read again //下次5s内:预读一次; if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) { // 1、make ring second->计算下一个调度所在的环形空间下标 int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60); // 2、push time ring->任务缓存到环形时间轴上 pushTimeRing(ringSecond, jobInfo.getId()); // 3、fresh next->刷新下一次调度时间 refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime())); } } else { // 未过期:正常触发,递增计算下次触发时间 // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time // 1、make ring second int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60); // 2、push time ring pushTimeRing(ringSecond, jobInfo.getId()); // 3、fresh next refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime())); } } // 3、更新trigger信息 // 3、update trigger info for (XxlJobInfo jobInfo: scheduleList) { XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo); } } else { preReadSuc = false; } // tx stop } catch (Exception e) { if (!scheduleThreadToStop) { logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e); } } finally { // commit if (conn != null) { try { conn.commit(); } catch (SQLException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } try { conn.setAutoCommit(connAutoCommit); } catch (SQLException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } try { conn.close(); } catch (SQLException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } } // close PreparedStatement if (null != preparedStatement) { try { preparedStatement.close(); } catch (SQLException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } } } long cost = System.currentTimeMillis()-start; // Wait seconds, align second if (cost < 1000) { // scan-overtime, not wait try { // pre-read period: success > scan each second; fail > skip this period; TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000); } catch (InterruptedException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } } } logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop"); } }); scheduleThread.setDaemon(true); scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread"); scheduleThread.start(); // ring thread ringThread = new Thread(new Runnable() { @Override public void run() { // align second try { TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 ); } catch (InterruptedException e) { if (!ringThreadToStop) { logger.error(e.getMessage(), e); } } while (!ringThreadToStop) { try { // second data List<Integer> ringItemData = new ArrayList<>(); int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度; for (int i = 0; i < 2; i++) { List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 ); if (tmpData != null) { ringItemData.addAll(tmpData); } } // ring trigger logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) ); if (ringItemData.size() > 0) { // do trigger for (int jobId: ringItemData) { // do trigger JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null); } // clear ringItemData.clear(); } } catch (Exception e) { if (!ringThreadToStop) { logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e); } } // next second, align second try { TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000); } catch (InterruptedException e) { if (!ringThreadToStop) { logger.error(e.getMessage(), e); } } } logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop"); } }); ringThread.setDaemon(true); ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread"); ringThread.start(); }

4、将任务交给一个线程池,并选择线程池,1分钟窗口期内任务耗时达500ms超过10次,该窗口期内判定为慢任务,慢任务自动降级进入"Slow"线程池,避免耗尽调度线程,提高系统稳定性。 触发执行任务XxlJobTrigger.trigger();

/** * 将任务提交给一个线程池,在线程池中调用XxlJobTrigger.trigger执行任务 * add trigger */ public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam, final String addressList) { // choose thread pool //1分钟窗口期内任务耗时达500ms超过10次,该窗口期内判定为慢任务,慢任务自动降级进入"Slow"线程池,避免耗尽调度线程,提高系统稳定性 //快线程池默认8个核心线程,最大线程200,任务队列1000 //慢线程池默认0个核心线程,最大线程100,任务队列2000 //快速线程池 ThreadPoolExecutor triggerPool_ = fastTriggerPool; AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId); if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min triggerPool_ = slowTriggerPool; } // trigger triggerPool_.execute(new Runnable() { @Override public void run() { long start = System.currentTimeMillis(); try { // do trigger -> 触发执行任务 XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList); } catch (Exception e) { logger.error(e.getMessage(), e); } finally { // check timeout-count-map long minTim_now = System.currentTimeMillis()/60000;//一分钟 if (minTim != minTim_now) {//判断是否为一分钟内 minTim = minTim_now; jobTimeoutCountMap.clear();//清理一分钟内的所有任务计数 } // incr timeout-count-map long cost = System.currentTimeMillis()-start; if (cost > 500) { // ob-timeout threshold 500ms ->耗时超过500ms就计算为慢请求,加入慢线程池 AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1)); if (timeoutCount != null) { timeoutCount.incrementAndGet(); } } } } }); }

5、设置任务参数,然后执行任务处理,XxlJobTrigger.processTrigger();

/** * trigger job * 触发任务 * @param jobId * @param triggerType * @param failRetryCount 失败重试次数 * >=0: use this param * <0: use param from job info config * @param executorShardingParam 分片参数 * @param executorParam 执行参数 * null: use job param * not null: cover job param */ public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) { // load data->查询任务数据信息 XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId); if (jobInfo == null) { logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId); return; } //添加执行参数 if (executorParam != null) { jobInfo.setExecutorParam(executorParam); } //设置失败重试次数 int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount(); //获取该类型的执行器信息 XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup()); // cover addressList 覆盖地址 if (addressList!=null && addressList.trim().length()>0) { group.setAddressType(1); group.setAddressList(addressList.trim()); } // sharding param->设置分片参数 int[] shardingParam = null; if (executorShardingParam!=null){ String[] shardingArr = executorShardingParam.split("/"); if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) { shardingParam = new int[2]; shardingParam[0] = Integer.valueOf(shardingArr[0]); shardingParam[1] = Integer.valueOf(shardingArr[1]); } } //执行器路由策略->如果是分片广播 if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null) && group.getRegistryList()!=null && !group.getRegistryList().isEmpty() && shardingParam==null) { //广播模式,循环执行器配置的服务地址列表 for (int i = 0; i < group.getRegistryList().size(); i++) { processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size()); } } else { //如果不是分片广播 if (shardingParam == null) { shardingParam = new int[]{0, 1}; } processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]); } }

6、处理触发->组装任务参数,选择路由和阻塞策略,执行runExecutor()

/** * 处理触发->组装任务参数,选择路由和阻塞策略 * * 问题: * * 由于某些场景,某个任务必须保证只能执行一次,或者宁愿不执行也不允许重复执行,比方说发放优惠券,可以少发或者不发,但是不能多发,这种情况下该选择什么策略? * @param group job group, registry list may be empty * @param jobInfo * @param finalFailRetryCount * @param triggerType * @param index sharding index * @param total sharding index */ private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){ // param //匹配运行模式,如果匹配不到则默认为阻塞策略 ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy //匹配路由策略 ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy //设置分片路由策略参数 String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null; // 1、save log-id->存储log日志信息 XxlJobLog jobLog = new XxlJobLog(); jobLog.setJobGroup(jobInfo.getJobGroup()); jobLog.setJobId(jobInfo.getId()); jobLog.setTriggerTime(new Date()); XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog); logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId()); // 2、init trigger-param->初始化 trigger参数: TriggerParam triggerParam = new TriggerParam(); triggerParam.setJobId(jobInfo.getId()); triggerParam.setExecutorHandler(jobInfo.getExecutorHandler()); triggerParam.setExecutorParams(jobInfo.getExecutorParam()); triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy()); triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout()); triggerParam.setLogId(jobLog.getId()); triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime()); triggerParam.setGlueType(jobInfo.getGlueType()); triggerParam.setGlueSource(jobInfo.getGlueSource()); triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime()); triggerParam.setBroadcastIndex(index); triggerParam.setBroadcastTotal(total); // 3、init address->选择执行器服务地址 String address = null; ReturnT<String> routeAddressResult = null; if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) { //广播模式 if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) { if (index < group.getRegistryList().size()) { address = group.getRegistryList().get(index); } else { address = group.getRegistryList().get(0);//基本不可能执行 } } else { //非广播模式->根据路由策略,选择合适的执行器服务地址来执行任务 routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList()); if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) { //获取路由地址 address = routeAddressResult.getContent(); } } } else { routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty")); } // 4、trigger remote executor->触发执行 ReturnT<String> triggerResult = null; if (address != null) { triggerResult = runExecutor(triggerParam, address); } else { triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null); } // 5、collection trigger info->收集执行结果信息 StringBuffer triggerMsgSb = new StringBuffer(); triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle()); triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp()); triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":") .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") ); triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList()); triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle()); if (shardingParam != null) { triggerMsgSb.append("("+shardingParam+")"); } triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle()); triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout()); triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount); triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>") .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():""); // 6、save log trigger-info 更新执行日志 jobLog.setExecutorAddress(address); jobLog.setExecutorHandler(jobInfo.getExecutorHandler()); jobLog.setExecutorParam(jobInfo.getExecutorParam()); jobLog.setExecutorShardingParam(shardingParam); jobLog.setExecutorFailRetryCount(finalFailRetryCount); //jobLog.setTriggerTime(); jobLog.setTriggerCode(triggerResult.getCode()); jobLog.setTriggerMsg(triggerMsgSb.toString()); XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog); logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId()); }

7、根据参数以及 机器地址,向执行器发送执行信息,获取执行器动态代理类,进入动态代理,进行rpc请求,调用执行器的ExecutorBizImpl#run方法

/** * run executor->根据参数以及 机器地址,向执行器发送执行信息 * @param triggerParam * @param address * @return */ public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){ ReturnT<String> runResult = null; try { //获取执行器动态代理类 ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address); //进入动态代理,进行rpc请求,调用执行器的ExecutorBizImpl#run方法 runResult = executorBiz.run(triggerParam); } catch (Exception e) { logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e); runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e)); } StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":"); runResultSB.append("<br>address:").append(address); runResultSB.append("<br>code:").append(runResult.getCode()); runResultSB.append("<br>msg:").append(runResult.getMsg()); runResult.setMsg(runResultSB.toString()); return runResult; }
最新回复(0)