Powerjob
开源项目地址 http://www.powerjob.tech/
很强大的一个开源调度项目,但很久没有升级了。这里根据实际使用情况进行了几处改造
升级
1. 重试间隔策略
TaskTracker.java【改造点】
/**
 * Handles a task status report from a worker (ProcessorTracker).
 * Filters expired reports by last-report time, schedules a delayed retry for failed
 * tasks (【改造点】retry backoff = failedCnt * 5s), and persists the new status.
 *
 * @param subInstanceId sub-instance the task belongs to
 * @param taskId        id of the reported task
 * @param newStatus     raw value of {@link TaskStatus}
 * @param reportTime    report timestamp from the worker (cross-worker clock skew matters)
 * @param result        optional processing result, may be null
 */
public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) {
    if (finished.get()) {
        return;
    }
    TaskStatus nTaskStatus = TaskStatus.of(newStatus);
    int lockId = taskId.hashCode();
    try {
        // block until the per-task segment lock is acquired
        segmentLock.lockInterruptible(lockId);
        Long lastReportTime = taskId2LastReportTime.getIfPresent(taskId);
        // cache miss -> fall back to the database
        if (lastReportTime == null) {
            Optional<TaskDO> taskOpt = taskPersistenceService.getTask(instanceId, taskId);
            if (taskOpt.isPresent()) {
                lastReportTime = taskOpt.get().getLastReportTime();
            } else {
                // should never happen unless the database is broken
                log.error("[TaskTracker-{}-{}] can't find task by taskId={}.", instanceId, subInstanceId, taskId);
            }
            if (lastReportTime == null) {
                lastReportTime = -1L;
            }
        }
        // drop expired reports (when a retry moves to another worker, clock skew across
        // the cluster may make an older report arrive with a newer wall-clock time)
        if (lastReportTime > reportTime) {
            log.warn("[TaskTracker-{}-{}] receive expired(last {} > current {}) task status report(taskId={},newStatus={}), TaskTracker will drop this report.",
                    instanceId, subInstanceId, lastReportTime, reportTime, taskId, newStatus);
            return;
        }
        // the report is valid from here on; record the newest report time first
        taskId2LastReportTime.put(taskId, reportTime);
        // failure handling (with retry)
        int configTaskRetryNum = instanceInfo.getTaskRetryNum();
        if (nTaskStatus == TaskStatus.WORKER_PROCESS_FAILED && configTaskRetryNum >= 1) {
            // failure is the uncommon path, one extra DB query is acceptable
            // (and the cache above avoids most DB reads anyway)
            Optional<TaskDO> taskOpt = taskPersistenceService.getTask(instanceId, taskId);
            // if even the DB query fails, give up retrying...
            if (taskOpt.isPresent()) {
                int failedCnt = taskOpt.get().getFailedCnt();
                if (failedCnt < configTaskRetryNum) {
                    TaskDO updateEntity = new TaskDO();
                    updateEntity.setFailedCnt(failedCnt + 1);
                    /*
                     * Address rules:
                     * 1. the stored address is the dispatch target (ProcessorTracker address)
                     * 2. root/last tasks must run on the TaskTracker machine (do not change their address)
                     * 3. broadcast tasks run on every machine, so no re-assignment of the worker
                     */
                    String taskName = taskOpt.get().getTaskName();
                    ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
                    if (!taskName.equals(TaskConstant.ROOT_TASK_NAME) && !taskName.equals(TaskConstant.LAST_TASK_NAME) && executeType != ExecuteType.BROADCAST) {
                        updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS);
                    }
                    updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
                    updateEntity.setLastReportTime(reportTime);
                    // 【改造点】retry backoff: failedCnt * 5 seconds; TODO: make it configurable per task
                    long nextTime = updateEntity.getFailedCnt() * 5L;
                    // FIX: the previous version allocated a 10-thread pool per failed report and
                    // never shut it down (thread leak). Use a single-thread scheduler and shut it
                    // down right after scheduling: by default a ScheduledThreadPoolExecutor still
                    // executes already-queued delayed tasks after shutdown(), then terminates.
                    ScheduledExecutorService retryScheduler = new ScheduledThreadPoolExecutor(1);
                    retryScheduler.schedule(() -> {
                        boolean retryTask = taskPersistenceService.updateTask(instanceId, taskId, updateEntity);
                        if (retryTask) {
                            log.info("[TaskTracker-{}-{}] task(taskId={}) process failed, TaskTracker will have a retry.", instanceId, subInstanceId, taskId);
                        }
                    }, nextTime, TimeUnit.SECONDS);
                    retryScheduler.shutdown();
                    return;
                }
            }
        }
        // update the status (if writing the retry back to DB failed, no more retries... bad luck)
        result = result == null ? "" : result;
        boolean updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, newStatus, reportTime, result);
        if (!updateResult) {
            log.warn("[TaskTracker-{}-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, subInstanceId, taskId);
        }
    } catch (InterruptedException ignore) {
        // FIX: restore the interrupt flag instead of swallowing it
        // NOTE(review): if the interrupt happens before the lock is acquired, the unlock in
        // finally runs without holding the lock — confirm SegmentLock tolerates that (pre-existing behavior)
        Thread.currentThread().interrupt();
    } catch (Exception e) {
        log.warn("[TaskTracker-{}-{}] update task status failed.", instanceId, subInstanceId, e);
    } finally {
        segmentLock.unlock(lockId);
    }
}
2. 工作流增加where跳转
实现根据上个节点的返回值来决定连线的下个节点是否需要跳过
示例 {"from":202,"to":206,"where":"{value!='skipTask'}"}
{value!='上个节点的值'},value为关键字 条件和值自己填充
PEWorkflowDAG.java 【改造点】
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Edge implements Serializable {
private Long from;
private Long to;
private String where;【改造点 线上增加条件】
}
WorkflowDAGUtils.java 【改造点】
/**
 * Collects all nodes of the DAG that are ready to run.
 * Nodes that are disabled, or whose inbound "where" conditions say they should be
 * skipped (【改造点】), are moved past and their ready successors returned instead.
 *
 * @param dag the point-edge workflow DAG
 * @return nodes that can be dispatched right now
 */
public static List<PEWorkflowDAG.Node> listReadyNodes(PEWorkflowDAG dag) {
    List<PEWorkflowDAG.Node> allNodes = dag.getNodes();
    // nodeId -> Node index
    Map<Long, PEWorkflowDAG.Node> nodeIndex = Maps.newHashMap();
    allNodes.forEach(n -> nodeIndex.put(n.getNodeId(), n));
    // dependency view (which upstream nodes must finish before a node may run),
    // inbound-edge view (for the where-condition check) and successor view
    Multimap<Long, Long> dependencies = LinkedListMultimap.create();
    Multimap<Long, PEWorkflowDAG.Edge> inboundEdges = LinkedListMultimap.create();
    Multimap<Long, Long> successors = LinkedListMultimap.create();
    for (PEWorkflowDAG.Edge edge : dag.getEdges()) {
        dependencies.put(edge.getTo(), edge.getFrom());
        inboundEdges.put(edge.getTo(), edge);
        successors.put(edge.getFrom(), edge.getTo());
    }
    List<PEWorkflowDAG.Node> ready = Lists.newArrayList();
    List<PEWorkflowDAG.Node> skipped = Lists.newArrayList();
    for (PEWorkflowDAG.Node node : allNodes) {
        if (!isReadyNode(node.getNodeId(), nodeIndex, dependencies)) {
            continue;
        }
        boolean disabled = node.getEnable() != null && !node.getEnable();
        // 【改造点】isNeedSkipNode added; skipped nodes reuse the existing skip handling
        if (disabled || isNeedSkipNode(node.getNodeId(), nodeIndex, inboundEdges, successors)) {
            skipped.add(node);
        } else {
            ready.add(node);
        }
    }
    // move past every skipped node and pick up its now-ready successors
    for (PEWorkflowDAG.Node skippedNode : skipped) {
        ready.addAll(moveAndObtainReadySuccessor(skippedNode, nodeIndex, dependencies, successors, inboundEdges));
    }
    return ready;
}
/**
 * Decides whether a node must be skipped based on its inbound edges.
 * An edge "votes skip" when its upstream node finished successfully with a result and
 * either the upstream node was disabled ({@code DISABLE_NODE} result) or the edge's
 * {@code where} condition evaluates to {@code true}. The node is skipped only when
 * every inbound edge votes skip (root nodes, with no inbound edges, are never skipped).
 *
 * @param nodeId       id of the node under test
 * @param nodeId2Node  nodeId -> Node index
 * @param relyWhereMap nodeId -> inbound edges
 * @param successorMap nodeId -> successor node ids (currently unused here, kept for signature compatibility)
 * @return true when the node should be skipped
 * @Author zhanghan
 * @Date 2022/7/12 14:18
 */
private static boolean isNeedSkipNode(long nodeId, Map<Long, PEWorkflowDAG.Node> nodeId2Node, Multimap<Long, PEWorkflowDAG.Edge> relyWhereMap, Multimap<Long, Long> successorMap) {
    Collection<PEWorkflowDAG.Edge> relyNodeEdges = relyWhereMap.get(nodeId);
    int skippedEdgeCnt = 0;
    for (PEWorkflowDAG.Edge relyNodeEdge : relyNodeEdges) {
        PEWorkflowDAG.Node relyNode = nodeId2Node.get(relyNodeEdge.getFrom());
        int relyNodeStatus = relyNode.getStatus() == null ? InstanceStatus.WAITING_DISPATCH.getV() : relyNode.getStatus();
        // the upstream node must have succeeded and produced a result
        if (InstanceStatus.SUCCEED.getV() != relyNodeStatus || StringUtils.isEmpty(relyNode.getResult())) {
            continue;
        }
        // FIX: count each edge at most once. Previously an edge whose upstream was disabled
        // AND whose where-condition matched incremented the counter twice, so the
        // "counter == edge count" test below could pass with unqualified edges remaining.
        boolean edgeSkipped = relyNode.getResult().equals(SystemInstanceResult.DISABLE_NODE);
        if (!edgeSkipped && StringUtils.isNotEmpty(relyNodeEdge.getWhere())) {
            Map<String, Object> vars = new HashMap<>();
            vars.put("value", relyNode.getResult());
            edgeSkipped = Boolean.TRUE.equals(expressionParsing(relyNodeEdge.getWhere(), vars));
        }
        if (edgeSkipped) {
            skippedEdgeCnt++;
        }
    }
    // skip the current node only when ALL inbound edges voted skip
    return skippedEdgeCnt > 0 && skippedEdgeCnt == relyNodeEdges.size();
}
/**
 * Evaluates a SpEL "where" expression against the supplied variables.
 * The variable map is used as the root object with a {@code MapAccessor}, so map keys
 * (e.g. {@code value}) can be referenced directly inside the expression.
 *
 * @param skipExpress the templated SpEL expression; blank means "no condition"
 * @param map         variables available to the expression (key -> value)
 * @return the boolean result of the expression, never null; false when there is nothing to evaluate
 */
public static Boolean expressionParsing(String skipExpress, Map<String, Object> map) {
    // FIX: the guard used '&&', so a blank expression with a non-empty map fell through
    // to parseExpression and threw; a blank expression simply means nothing to check.
    if (StringUtils.isBlank(skipExpress) || map == null) {
        return false;
    }
    ExpressionParser parser = new SpelExpressionParser();
    StandardEvaluationContext context = new StandardEvaluationContext();
    // NOTE(review): template delimiters are "${...}", but the documented example is
    // "{value!='skipTask'}" without '$' — confirm the expected expression format.
    TemplateParserContext templateParserContext = new TemplateParserContext("${", "}");
    MapAccessor propertyAccessor = new MapAccessor();
    context.setVariables(map);
    context.setPropertyAccessors(Arrays.asList(propertyAccessor));
    SpelExpression expression = (SpelExpression) parser.parseExpression(skipExpress, templateParserContext);
    expression.setEvaluationContext(context);
    // FIX: null-safe — unboxing a null Boolean into a primitive would NPE
    Boolean value = expression.getValue(map, Boolean.class);
    return value != null && value;
}
3. 工作流超过实例数进行排队,轮询执行
目前当工作流实例数超过设定的值 直接就报错,改造为 超过实例数给个新的状态,然后轮询去处理这些数据
WorkflowInstanceStatus.java【改造点】
/**
 * Lifecycle statuses of a workflow instance.
 * 【改造点】adds WAITING_INSTANCE for instances parked because the workflow already
 * runs its maximum number of concurrent instances.
 */
@Getter
@AllArgsConstructor
public enum WorkflowInstanceStatus {
    /**
     * Initial state: waiting to be scheduled.
     */
    WAITING(1, "等待调度"),
    RUNNING(2, "运行中"),
    FAILED(3, "失败"),
    SUCCEED(4, "成功"),
    // 【改造点】parked until a concurrency slot frees up; picked up again by the scheduler loop
    WAITING_INSTANCE(5, "等待实例"),
    STOPPED(10, "手动停止");
    /**
     * Statuses regarded as "running" in the broad sense.
     */
    public static final List<Integer> GENERALIZED_RUNNING_STATUS = Lists.newArrayList(WAITING.v, RUNNING.v);
    /**
     * Terminal statuses.
     */
    public static final List<Integer> FINISHED_STATUS = Lists.newArrayList(FAILED.v, SUCCEED.v, STOPPED.v);
    private final int v;
    private final String des;

    /**
     * Resolves the enum item for a raw status value.
     *
     * @param v raw status value
     * @return the matching status
     * @throws IllegalArgumentException when no item matches
     */
    public static WorkflowInstanceStatus of(int v) {
        for (WorkflowInstanceStatus status : values()) {
            if (status.v == v) {
                return status;
            }
        }
        throw new IllegalArgumentException("WorkflowInstanceStatus has no item for value " + v);
    }
}
WorkflowInstanceManager.java【改造点】
@UseSegmentLock(type = "startWfInstance", key = "#wfInfo.getId().intValue()", concurrencyLevel = 1024)
public void start(WorkflowInfoDO wfInfo, Long wfInstanceId) {
// Starts a WAITING workflow instance: checks the concurrency limit, creates instances
// for the ready root nodes, persists the updated DAG and then dispatches the nodes.
Optional<WorkflowInstanceInfoDO> wfInstanceInfoOpt = workflowInstanceInfoRepository.findByWfInstanceId(wfInstanceId);
if (!wfInstanceInfoOpt.isPresent()) {
log.error("[WorkflowInstanceManager] can't find metadata by workflowInstanceId({}).", wfInstanceId);
return;
}
WorkflowInstanceInfoDO wfInstanceInfo = wfInstanceInfoOpt.get();
// not WAITING any more -> stop here (a previous step may already have failed it)
if (wfInstanceInfo.getStatus() != WorkflowInstanceStatus.WAITING.getV()) {
log.info("[Workflow-{}|{}] workflowInstance({}) needn't running any more.", wfInfo.getId(), wfInstanceId, wfInstanceInfo);
return;
}
// maxWfInstanceNum <= 0 means unlimited
if (wfInfo.getMaxWfInstanceNum() > 0) {
// concurrency control; note: the count includes this instance itself (it is WAITING)
int instanceConcurrency = workflowInstanceInfoRepository.countByWorkflowIdAndStatusIn(wfInfo.getId(), WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS);
if (instanceConcurrency > wfInfo.getMaxWfInstanceNum()) {
// 【改造点】over the instance limit -> park the instance (WAITING_INSTANCE) instead of failing it
onWorkflowInstanceWaiting(wfInstanceInfo);
return;
}
}
try {
// read the workflow DAG from the instance snapshot
PEWorkflowDAG dag = JSON.parseObject(wfInstanceInfo.getDag(), PEWorkflowDAG.class);
// root nodes may be disabled, listReadyNodes handles that
List<PEWorkflowDAG.Node> readyNodes = WorkflowDAGUtils.listReadyNodes(dag);
// create all root task instances
readyNodes.forEach(readyNode -> {
// NOTE: all instances must be created successfully here; a partial failure would leave
// the DAG un-updated and the already-created instance nodes invisible in the workflow log
// instanceParam carries the workflow instance's wfContext
Long instanceId = instanceService.create(readyNode.getJobId(), readyNode.getJobCode(), wfInfo.getAppId(), readyNode.getNodeParams(), wfInstanceInfo.getWfContext(), wfInstanceId, System.currentTimeMillis());
readyNode.setInstanceId(instanceId);
readyNode.setStatus(InstanceStatus.RUNNING.getV());
log.info("[Workflow-{}|{}] create readyNode instance(nodeId={},jobId={},instanceId={}) successfully~", wfInfo.getId(), wfInstanceId, readyNode.getNodeId(), readyNode.getJobId(), instanceId);
});
// persist: mark RUNNING and store the DAG enriched with instance ids
wfInstanceInfo.setStatus(WorkflowInstanceStatus.RUNNING.getV());
wfInstanceInfo.setDag(JSON.toJSONString(dag));
if (readyNodes.isEmpty()) {
// no ready node at all (every node disabled) -> finish the instance immediately
wfInstanceInfo.setStatus(WorkflowInstanceStatus.SUCCEED.getV());
wfInstanceInfo.setResult(SystemInstanceResult.NO_ENABLED_NODES);
wfInstanceInfo.setFinishedTime(System.currentTimeMillis());
log.warn("[Workflow-{}|{}] workflowInstance({}) needn't running ", wfInfo.getId(), wfInstanceId, wfInstanceInfo);
workflowInstanceInfoRepository.saveAndFlush(wfInstanceInfo);
return;
}
workflowInstanceInfoRepository.saveAndFlush(wfInstanceInfo);
log.info("[Workflow-{}|{}] start workflow successfully", wfInfo.getId(), wfInstanceId);
// actually dispatch the root tasks (only after the DAG has been persisted)
readyNodes.forEach(this::runInstance);
} catch (Exception e) {
log.error("[Workflow-{}|{}] submit workflow: {} failed.", wfInfo.getId(), wfInstanceId, wfInfo, e);
onWorkflowInstanceFailed(e.getMessage(), wfInstanceInfo);
}
}
/**
 * 【改造点】Parks a workflow instance that exceeded the max-concurrent-instance limit:
 * flips its status to WAITING_INSTANCE so the scheduler loop can pick it up later.
 *
 * @param instance the workflow instance to park
 */
private void onWorkflowInstanceWaiting(WorkflowInstanceInfoDO instance) {
    instance.setGmtModified(new Date());
    instance.setStatus(WorkflowInstanceStatus.WAITING_INSTANCE.getV());
    workflowInstanceInfoRepository.saveAndFlush(instance);
}
PowerScheduleService.java 【改造点】
/**
 * Periodic scheduling entry point: runs cron jobs, workflows, parked workflow
 * instances (【改造点】) and frequent jobs for the apps owned by this server.
 * Each phase is timed separately and guarded so one failure does not block the rest.
 */
@Async("omsTimingPool")
@Scheduled(fixedDelay = SCHEDULE_RATE)
public void timingSchedule() {
    long start = System.currentTimeMillis();
    Stopwatch stopwatch = Stopwatch.createStarted();
    // query the DB for the apps this server is responsible for
    List<AppInfoDO> allAppInfos = appInfoRepository.findAllByCurrentServer(AkkaStarter.getActorSystemAddress());
    if (CollectionUtils.isEmpty(allAppInfos)) {
        log.info("[JobScheduleService] current server has no app's job to schedule.");
        return;
    }
    List<Long> allAppIds = allAppInfos.stream().map(AppInfoDO::getId).collect(Collectors.toList());
    // drop cluster data that no longer needs maintaining
    WorkerClusterManagerService.clean(allAppIds);
    // schedule CRON-expression jobs
    try {
        scheduleCronJob(allAppIds);
    } catch (Exception e) {
        log.error("[CronScheduler] schedule cron job failed.", e);
    }
    String cronTime = stopwatch.toString();
    stopwatch.reset().start();
    // schedule workflow jobs
    try {
        scheduleWorkflow(allAppIds);
    } catch (Exception e) {
        log.error("[WorkflowScheduler] schedule workflow job failed.", e);
    }
    String wfTime = stopwatch.toString();
    stopwatch.reset().start();
    //【改造点】schedule workflow instances parked for exceeding the max instance limit
    try {
        scheduleWorkflowInstance(allAppIds);
    } catch (Exception e) {
        log.error("[WorkflowInstanceScheduler] schedule workflow instance failed.", e);
    }
    // FIX: time this phase on its own — previously its cost was silently folded into
    // the "frequent schedule" figure because the stopwatch was not restarted here
    String wfInstanceTime = stopwatch.toString();
    stopwatch.reset().start();
    // schedule second-level (frequent) jobs
    try {
        scheduleFrequentJob(allAppIds);
    } catch (Exception e) {
        log.error("[FrequentScheduler] schedule frequent job failed.", e);
    }
    log.info("[JobScheduleService] cron schedule: {}, workflow schedule: {}, workflow instance schedule: {}, frequent schedule: {}.", cronTime, wfTime, wfInstanceTime, stopwatch.stop());
    long cost = System.currentTimeMillis() - start;
    if (cost > SCHEDULE_RATE) {
        log.warn("[JobScheduleService] The database query is using too much time({}ms), please check if the database load is too high!", cost);
    }
}
/**
 * 【改造点】Re-schedules workflow instances parked in WAITING_INSTANCE because their
 * workflow had reached its max concurrent instance count. For each workflow, computes
 * the currently free slots and retries that many parked instances.
 *
 * @param appIds ids of the apps owned by the current server
 */
private void scheduleWorkflowInstance(List<Long> appIds) {
    Lists.partition(appIds, MAX_APP_NUM).forEach(partAppIds -> {
        // load the parked instances that are waiting for a free slot
        List<WorkflowInstanceInfoDO> waitingInstances = wfInstanceInfoRepository.findByAppIdInAndStatus(partAppIds, WorkflowInstanceStatus.WAITING_INSTANCE.getV());
        if (CollectionUtils.isEmpty(waitingInstances)) {
            return;
        }
        // group by workflow to avoid per-instance queries
        Map<Long, List<WorkflowInstanceInfoDO>> workflowId2Instances = waitingInstances.stream().collect(Collectors.groupingBy(WorkflowInstanceInfoDO::getWorkflowId));
        Set<Long> workflowIds = workflowId2Instances.keySet();
        // fetch the instance limit of each (still enabled) workflow
        List<WorkflowInfoDO> wfInfos = workflowInfoRepository.findByIdInAndStatus(workflowIds, SwitchableStatus.ENABLE.getV());
        if (CollectionUtils.isEmpty(wfInfos)) {
            // every workflow was disabled/deleted in the meantime — nothing to start
            return;
        }
        Map<Long, Integer> workflowId2MaxNum = wfInfos.stream().collect(Collectors.toMap(WorkflowInfoDO::getId, WorkflowInfoDO::getMaxWfInstanceNum));
        workflowIds.forEach(workflowId -> {
            Integer maxInstanceNum = workflowId2MaxNum.get(workflowId);
            // FIX: the query above filters by ENABLE, so a disabled/deleted workflow has no
            // entry here; previously this unboxed null and threw NPE. Leave its instances parked.
            if (maxInstanceNum == null) {
                return;
            }
            List<WorkflowInstanceInfoDO> candidates = workflowId2Instances.get(workflowId);
            int freeSlots;
            if (maxInstanceNum <= 0) {
                // FIX: <= 0 means "unlimited" (consistent with start()) — retry everything
                freeSlots = candidates.size();
            } else {
                // current broad-sense running count vs. the configured limit
                int running = wfInstanceInfoRepository.countByWorkflowIdAndStatusIn(workflowId, WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS);
                freeSlots = maxInstanceNum - running;
            }
            if (freeSlots <= 0) {
                return;
            }
            if (freeSlots < candidates.size()) {
                candidates = candidates.subList(0, freeSlots);
            }
            candidates.forEach(instance -> workflowInstanceService.retryWorkflowInstance(instance.getWfInstanceId(), instance.getAppId()));
        });
    });
}
评论区