
PowerJob Upgrade: Retry Interval Strategy & Workflow where Jumps & Polling Execution for Workflows over the Instance Limit

zcarry
2022-08-23

PowerJob

Open-source project: http://www.powerjob.tech/
A very powerful open-source scheduling project, but it has not been updated in a long time. Below are a few modifications based on real-world usage.

Upgrades

1. Retry interval strategy

TaskTracker.java [modification point]

   public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) {

        if (finished.get()) {
            return;
        }
        TaskStatus nTaskStatus = TaskStatus.of(newStatus);

        int lockId = taskId.hashCode();
        try {

            // acquire the segment lock (blocking, interruptible)
            segmentLock.lockInterruptible(lockId);

            Long lastReportTime = taskId2LastReportTime.getIfPresent(taskId);

            // not in the cache, query the database
            if (lastReportTime == null) {
                Optional<TaskDO> taskOpt = taskPersistenceService.getTask(instanceId, taskId);
                if (taskOpt.isPresent()) {
                    lastReportTime = taskOpt.get().getLastReportTime();
                } else {
                    // in theory this can't happen, unless the database is misbehaving
                    log.error("[TaskTracker-{}-{}] can't find task by taskId={}.", instanceId, subInstanceId, taskId);
                }

                if (lastReportTime == null) {
                    lastReportTime = -1L;
                }
            }

            // drop expired reports (a latent cluster clock-consistency issue: when a retry crosses workers, clock skew may cause problems)
            if (lastReportTime > reportTime) {
                log.warn("[TaskTracker-{}-{}] receive expired(last {} > current {}) task status report(taskId={},newStatus={}), TaskTracker will drop this report.",
                        instanceId, subInstanceId, lastReportTime, reportTime, taskId, newStatus);
                return;
            }

            // the report is valid at this point, record the latest report time first
            taskId2LastReportTime.put(taskId, reportTime);

            // handle the failure case
            int configTaskRetryNum = instanceInfo.getTaskRetryNum();
            if (nTaskStatus == TaskStatus.WORKER_PROCESS_FAILED && configTaskRetryNum >= 1) {

                // failure is not the common case, so one extra DB query is acceptable (and the cache above absorbs most lookups anyway)
                Optional<TaskDO> taskOpt = taskPersistenceService.getTask(instanceId, taskId);
                // if the DB lookup fails as well, just give up on retrying...
                if (taskOpt.isPresent()) {
                    int failedCnt = taskOpt.get().getFailedCnt();
                    if (failedCnt < configTaskRetryNum) {

                        TaskDO updateEntity = new TaskDO();
                        updateEntity.setFailedCnt(failedCnt + 1);

                        /*
                        Address rules:
                        1. The stored address is the dispatch target, i.e. the ProcessorTracker address.
                        2. The root task and the final task must run on the TaskTracker machine, so their address must not be changed.
                        3. Broadcast tasks run on every machine, so they must not be re-dispatched to a different worker either.
                         */
                        String taskName = taskOpt.get().getTaskName();
                        ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
                        if (!taskName.equals(TaskConstant.ROOT_TASK_NAME) && !taskName.equals(TaskConstant.LAST_TASK_NAME) && executeType != ExecuteType.BROADCAST) {
                            updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS);
                        }

                        updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
                        updateEntity.setLastReportTime(reportTime);
                        // [Modification point] compute the retry delay; fixed at failedCnt * 5 seconds for now,
                        // this could later be made configurable on the task and fetched dynamically
                        long nextTime = updateEntity.getFailedCnt() * 5L;
                        // NOTE: RETRY_SCHEDULER should be a shared class-level field, e.g.
                        // private static final ScheduledExecutorService RETRY_SCHEDULER = new ScheduledThreadPoolExecutor(10);
                        // creating a new pool on every failure would leak threads
                        RETRY_SCHEDULER.schedule(() -> {
                            boolean retryTask = taskPersistenceService.updateTask(instanceId, taskId, updateEntity);
                            if (retryTask) {
                                log.info("[TaskTracker-{}-{}] task(taskId={}) process failed, TaskTracker will have a retry.", instanceId, subInstanceId, taskId);
                            }
                        }, nextTime, TimeUnit.SECONDS);
                        return;
                    }
                }
            }

            // update the status (if the retry write to the DB failed, there is simply no retry... tough luck...)
            result = result == null ? "" : result;
            boolean updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, newStatus, reportTime, result);

            if (!updateResult) {
                log.warn("[TaskTracker-{}-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, subInstanceId, taskId);
            }

        } catch (InterruptedException ignore) {
            // ignore
        } catch (Exception e) {
            log.warn("[TaskTracker-{}-{}] update task status failed.", instanceId, subInstanceId, e);
        } finally {
            segmentLock.unlock(lockId);
        }
    }
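
The hard-coded failedCnt * 5 delay works, but, as the comment above suggests, the interval could be made configurable per task. A minimal sketch of one way to do that; RetryIntervalStrategy and both constants are hypothetical names, not existing PowerJob types:

    // Hypothetical strategy abstraction for the retry delay (not part of PowerJob).
    public interface RetryIntervalStrategy {
        long nextDelaySeconds(int failedCnt);
    }

    public enum BuiltInRetryIntervalStrategy implements RetryIntervalStrategy {
        // current behavior: failedCnt * 5 seconds
        FIXED {
            @Override
            public long nextDelaySeconds(int failedCnt) {
                return failedCnt * 5L;
            }
        },
        // exponential backoff: 10s, 20s, 40s, ... capped at 10 minutes
        EXPONENTIAL {
            @Override
            public long nextDelaySeconds(int failedCnt) {
                long delay = 5L * (1L << Math.min(failedCnt, 7));
                return Math.min(600L, delay);
            }
        }
    }

updateTaskStatus would then call the configured strategy instead of the fixed multiplication, e.g. long nextTime = strategy.nextDelaySeconds(updateEntity.getFailedCnt());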

2. Add where-condition jumps to workflows

Implements skipping the downstream node of an edge based on the previous node's return value.
Example: {"from":202,"to":206,"where":"${value!='skipTask'}"}. Here value is a keyword bound to the previous node's result, and the ${...} wrapper is what the TemplateParserContext in expressionParsing below expects; fill in the operator and the comparison value yourself.
PEWorkflowDAG.java [modification point]

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Edge implements Serializable {
        private Long from;
        private Long to;
        // [modification point] condition attached to the edge
        private String where;
    }
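
With the new field, an edge in the persisted DAG JSON carries its condition directly, for example:

    {"from": 202, "to": 206, "where": "${value!='skipTask'}"}

Edges without a where value keep the old behavior, and DAGs persisted before this change deserialize with where == null, which isNeedSkipNode below treats as "no condition".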

WorkflowDAGUtils.java [modification point]

public static List<PEWorkflowDAG.Node> listReadyNodes(PEWorkflowDAG dag) {
        // map nodeId -> Node
        Map<Long, PEWorkflowDAG.Node> nodeId2Node = Maps.newHashMap();

        List<PEWorkflowDAG.Node> dagNodes = dag.getNodes();
        for (PEWorkflowDAG.Node node : dagNodes) {
            nodeId2Node.put(node.getNodeId(), node);
        }
        // build the dependency map (which upstream tasks must finish before a downstream task can run)
        Multimap<Long, Long> relyMap = LinkedListMultimap.create();
        Multimap<Long, PEWorkflowDAG.Edge> relyWhereMap = LinkedListMultimap.create();
        // successor map (nodeId -> downstream nodeIds)
        Multimap<Long, Long> successorMap = LinkedListMultimap.create();
        dag.getEdges().forEach(edge -> {
            relyMap.put(edge.getTo(), edge.getFrom());
            relyWhereMap.put(edge.getTo(), edge);
            successorMap.put(edge.getFrom(), edge.getTo());

        });
        List<PEWorkflowDAG.Node> readyNodes = Lists.newArrayList();
        List<PEWorkflowDAG.Node> skipNodes = Lists.newArrayList();

        for (PEWorkflowDAG.Node currentNode : dagNodes) {
            if (!isReadyNode(currentNode.getNodeId(), nodeId2Node, relyMap)) {
                continue;
            }
            // disabled nodes are skipped outright
            if (currentNode.getEnable() != null && !currentNode.getEnable()) {
                skipNodes.add(currentNode);
            } else if (isNeedSkipNode(currentNode.getNodeId(), nodeId2Node, relyWhereMap, successorMap)) {
                // [modification point] new isNeedSkipNode check; skipped nodes go through the existing skipNodes flow
                skipNodes.add(currentNode);
            } else {
                readyNodes.add(currentNode);
            }
        }
        // if there are skipped nodes, move past them and collect their ready successors
        if (!skipNodes.isEmpty()) {
            for (PEWorkflowDAG.Node skipNode : skipNodes) {
                // move
                readyNodes.addAll(moveAndObtainReadySuccessor(skipNode, nodeId2Node, relyMap, successorMap, relyWhereMap));
            }
        }
        return readyNodes;
    }
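
Note that moveAndObtainReadySuccessor is now called with relyWhereMap as an extra argument, so the signature of that helper (not shown here) has to be extended accordingly so that where conditions keep being applied when skips cascade to successor nodes.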
    
    /**
     * Whether the node should be skipped, based on the where conditions of its incoming edges.
     *
     * @param nodeId       the node to check
     * @param nodeId2Node  nodeId -> Node mapping
     * @param relyWhereMap nodeId -> incoming edges (carrying their where conditions)
     * @param successorMap nodeId -> successor nodeIds
     * @return true if the node should be skipped
     * @author zhanghan 2022/7/12
     **/
    private static boolean isNeedSkipNode(long nodeId, Map<Long, PEWorkflowDAG.Node> nodeId2Node, Multimap<Long, PEWorkflowDAG.Edge> relyWhereMap, Multimap<Long, Long> successorMap) {
        Collection<PEWorkflowDAG.Edge> relyNodeEdges = relyWhereMap.get(nodeId);
        int fromDisabled = 0;
        for (PEWorkflowDAG.Edge relyNodeEdge : relyNodeEdges) {
            PEWorkflowDAG.Node relyNode = nodeId2Node.get(relyNodeEdge.getFrom());
            int relyNodeStatus = relyNode.getStatus() == null ? InstanceStatus.WAITING_DISPATCH.getV() : relyNode.getStatus();
            // the upstream node must have succeeded and produced a result
            if (InstanceStatus.SUCCEED.getV() == relyNodeStatus && StringUtils.isNotEmpty(relyNode.getResult())) {
                // if the parent was disabled, count it
                if (relyNode.getResult().equals(SystemInstanceResult.DISABLE_NODE)) {
                    fromDisabled++;
                }
                // evaluate the where condition
                if (StringUtils.isNotEmpty(relyNodeEdge.getWhere())) {
                    String skipExpress = relyNodeEdge.getWhere();
                    Map<String, Object> map = new HashMap<>();
                    map.put("value", relyNode.getResult());
                    if (expressionParsing(skipExpress, map)) {
                        fromDisabled++;
                    }
                }
            }
        }
        // if every incoming edge came from a disabled node or matched its where condition, skip this node as well
        return fromDisabled > 0 && fromDisabled == relyNodeEdges.size();
    }
    
    public static Boolean expressionParsing(String skipExpress, Map<String, Object> map) {
        // nothing to evaluate
        if (StringUtils.isBlank(skipExpress) || map.isEmpty()) {
            return false;
        }
        ExpressionParser parser = new SpelExpressionParser();
        StandardEvaluationContext context = new StandardEvaluationContext();

        // expressions are written as templates, e.g. "${value!='skipTask'}";
        // the cast below assumes the template is a single ${...} block
        TemplateParserContext templateParserContext = new TemplateParserContext("${", "}");
        // MapAccessor lets "value" resolve as a key of the root map
        MapAccessor propertyAccessor = new MapAccessor();
        context.setVariables(map);
        context.setPropertyAccessors(Arrays.asList(propertyAccessor));

        SpelExpression expression = (SpelExpression) parser.parseExpression(skipExpress, templateParserContext);
        expression.setEvaluationContext(context);
        // the map is passed as the root object, so the where expression is evaluated against it
        return expression.getValue(map, boolean.class);
    }
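
For reference, a minimal usage sketch of the evaluator, using the same expression shape as the edge example at the top of this section:

    Map<String, Object> vars = new HashMap<>();
    vars.put("value", "skipTask");
    // false: the previous node returned 'skipTask', the condition does not match, the node stays ready
    boolean skip = WorkflowDAGUtils.expressionParsing("${value!='skipTask'}", vars);

    vars.put("value", "somethingElse");
    // true: the condition matches, so the downstream node will be skipped
    skip = WorkflowDAGUtils.expressionParsing("${value!='skipTask'}", vars);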
    

3. Queue workflows over the instance limit and run them by polling

Currently, when the number of workflow instances exceeds the configured maximum, the start simply fails with an error. This is changed so that instances over the limit get a new status, and a polling task later picks them up and runs them.
WorkflowInstanceStatus.java [modification point]

@Getter
@AllArgsConstructor
public enum WorkflowInstanceStatus {
    /**
     * Initial status: waiting to be scheduled
     */
    WAITING(1, "等待调度"),
    RUNNING(2, "运行中"),
    FAILED(3, "失败"),
    SUCCEED(4, "成功"),
    WAITING_INSTANCE(5, "等待实例"), // [modification point] new status
    STOPPED(10, "手动停止");

    /**
     * Generalized running statuses
     */
    public static final List<Integer> GENERALIZED_RUNNING_STATUS = Lists.newArrayList(WAITING.v, RUNNING.v);
    /**
     * Terminal statuses
     */
    public static final List<Integer> FINISHED_STATUS = Lists.newArrayList(FAILED.v, SUCCEED.v, STOPPED.v);

    private final int v;

    private final String des;

    public static WorkflowInstanceStatus of(int v) {
        for (WorkflowInstanceStatus is : values()) {
            if (v == is.v) {
                return is;
            }
        }
        throw new IllegalArgumentException("WorkflowInstanceStatus has no item for value " + v);
    }
}
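
Note that the new WAITING_INSTANCE status is deliberately left out of both lists: it must not count as a running status, otherwise parked instances would count toward the concurrency check in start(), and it must not count as a terminal status either, otherwise the polling task below could never revive them.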

WorkflowInstanceManager.java [modification point]

 @UseSegmentLock(type = "startWfInstance", key = "#wfInfo.getId().intValue()", concurrencyLevel = 1024)
    public void start(WorkflowInfoDO wfInfo, Long wfInstanceId) {

        Optional<WorkflowInstanceInfoDO> wfInstanceInfoOpt = workflowInstanceInfoRepository.findByWfInstanceId(wfInstanceId);
        if (!wfInstanceInfoOpt.isPresent()) {
            log.error("[WorkflowInstanceManager] can't find metadata by workflowInstanceId({}).", wfInstanceId);
            return;
        }
        WorkflowInstanceInfoDO wfInstanceInfo = wfInstanceInfoOpt.get();

        // no longer WAITING, stop here (an earlier step may have already failed)
        if (wfInstanceInfo.getStatus() != WorkflowInstanceStatus.WAITING.getV()) {
            log.info("[Workflow-{}|{}] workflowInstance({}) doesn't need to run any more.", wfInfo.getId(), wfInstanceId, wfInstanceInfo);
            return;
        }
        // maxWfInstanceNum <= 0 means no limit
        if (wfInfo.getMaxWfInstanceNum() > 0) {
            // concurrency control
            int instanceConcurrency = workflowInstanceInfoRepository.countByWorkflowIdAndStatusIn(wfInfo.getId(), WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS);
            if (instanceConcurrency > wfInfo.getMaxWfInstanceNum()) {
                // [modification point] over the instance limit: park the instance in the waiting status
                onWorkflowInstanceWaiting(wfInstanceInfo);
                return;
            }
        }

        try {
            // read the workflow DAG from the instance
            PEWorkflowDAG dag = JSON.parseObject(wfInstanceInfo.getDag(), PEWorkflowDAG.class);
            // root nodes may be disabled
            List<PEWorkflowDAG.Node> readyNodes = WorkflowDAGUtils.listReadyNodes(dag);
            // create all root tasks
            readyNodes.forEach(readyNode -> {
                // note: all task instances must be created successfully here; if creation partially fails,
                // the DAG info is never updated and the already-created instance nodes cannot be shown in the workflow log
                // instanceParam carries the workflow instance's wfContext
                Long instanceId = instanceService.create(readyNode.getJobId(), readyNode.getJobCode(),  wfInfo.getAppId(), readyNode.getNodeParams(), wfInstanceInfo.getWfContext(), wfInstanceId, System.currentTimeMillis());
                readyNode.setInstanceId(instanceId);
                readyNode.setStatus(InstanceStatus.RUNNING.getV());

                log.info("[Workflow-{}|{}] create readyNode instance(nodeId={},jobId={},instanceId={}) successfully~", wfInfo.getId(), wfInstanceId, readyNode.getNodeId(), readyNode.getJobId(), instanceId);
            });

            // persist
            wfInstanceInfo.setStatus(WorkflowInstanceStatus.RUNNING.getV());
            wfInstanceInfo.setDag(JSON.toJSONString(dag));
            if (readyNodes.isEmpty()) {
                // no ready nodes (all nodes are disabled)
                wfInstanceInfo.setStatus(WorkflowInstanceStatus.SUCCEED.getV());
                wfInstanceInfo.setResult(SystemInstanceResult.NO_ENABLED_NODES);
                wfInstanceInfo.setFinishedTime(System.currentTimeMillis());
                log.warn("[Workflow-{}|{}] workflowInstance({}) doesn't need to run.", wfInfo.getId(), wfInstanceId, wfInstanceInfo);
                workflowInstanceInfoRepository.saveAndFlush(wfInstanceInfo);
                return;
            }
            workflowInstanceInfoRepository.saveAndFlush(wfInstanceInfo);
            log.info("[Workflow-{}|{}] start workflow successfully", wfInfo.getId(), wfInstanceId);

            // actually start the root tasks
            readyNodes.forEach(this::runInstance);
        } catch (Exception e) {
            log.error("[Workflow-{}|{}] submit workflow: {} failed.", wfInfo.getId(), wfInstanceId, wfInfo, e);
            onWorkflowInstanceFailed(e.getMessage(), wfInstanceInfo);
        }
    }
    
    private void onWorkflowInstanceWaiting(WorkflowInstanceInfoDO wfInstance) {
        wfInstance.setStatus(WorkflowInstanceStatus.WAITING_INSTANCE.getV());
        wfInstance.setGmtModified(new Date());
        workflowInstanceInfoRepository.saveAndFlush(wfInstance);
    }

PowerScheduleService.java [modification point]

 @Async("omsTimingPool")
    @Scheduled(fixedDelay = SCHEDULE_RATE)
    public void timingSchedule() {

        long start = System.currentTimeMillis();
        Stopwatch stopwatch = Stopwatch.createStarted();

        // query the DB first for the apps this server is responsible for
        List<AppInfoDO> allAppInfos = appInfoRepository.findAllByCurrentServer(AkkaStarter.getActorSystemAddress());
        if (CollectionUtils.isEmpty(allAppInfos)) {
            log.info("[JobScheduleService] current server has no app's job to schedule.");
            return;
        }
        List<Long> allAppIds = allAppInfos.stream().map(AppInfoDO::getId).collect(Collectors.toList());
        // clean up data that no longer needs to be maintained
        WorkerClusterManagerService.clean(allAppIds);

        // schedule CRON-expression jobs
        try {
            scheduleCronJob(allAppIds);
        } catch (Exception e) {
            log.error("[CronScheduler] schedule cron job failed.", e);
        }
        String cronTime = stopwatch.toString();
        stopwatch.reset().start();

        // schedule workflow jobs
        try {
            scheduleWorkflow(allAppIds);
        } catch (Exception e) {
            log.error("[WorkflowScheduler] schedule workflow job failed.", e);
        }
        String wfTime = stopwatch.toString();
        stopwatch.reset().start();

        // [modification point] schedule workflow instances waiting because of the instance limit

        try {
            scheduleWorkflowInstance(allAppIds);
        } catch (Exception e) {
            log.error("[WorkflowInstanceScheduler] schedule workflow instance failed.", e);
        }

        // schedule frequent (second-level) jobs
        try {
            scheduleFrequentJob(allAppIds);
        } catch (Exception e) {
            log.error("[FrequentScheduler] schedule frequent job failed.", e);
        }

        log.info("[JobScheduleService] cron schedule: {}, workflow schedule: {}, frequent schedule: {}.", cronTime, wfTime, stopwatch.stop());

        long cost = System.currentTimeMillis() - start;
        if (cost > SCHEDULE_RATE) {
            log.warn("[JobScheduleService] The database query is using too much time({}ms), please check if the database load is too high!", cost);
        }
    }

    /**
     * Schedule workflow instances that were parked because the instance limit was exceeded.
     *
     * @param appIds app IDs handled by this server
     */
    private void scheduleWorkflowInstance(List<Long> appIds) {
        Lists.partition(appIds, MAX_APP_NUM).forEach(partAppIds -> {
            // query the workflow instances that need to be rescheduled
            List<WorkflowInstanceInfoDO> workflowInstanceInfoList = wfInstanceInfoRepository.findByAppIdInAndStatus(partAppIds, WorkflowInstanceStatus.WAITING_INSTANCE.getV());
            if (CollectionUtils.isEmpty(workflowInstanceInfoList)) {
                return;
            }
            // group by workflowId to avoid repeated queries in the loop below
            Map<Long, List<WorkflowInstanceInfoDO>> workflowInstanceInfoMap = workflowInstanceInfoList.stream().collect(Collectors.groupingBy(WorkflowInstanceInfoDO::getWorkflowId));
            // collect the workflow IDs and look up each workflow's instance limit
            Set<Long> workflowIds = workflowInstanceInfoList.stream().map(WorkflowInstanceInfoDO::getWorkflowId).collect(Collectors.toSet());
            List<WorkflowInfoDO> wfInfos = workflowInfoRepository.findByIdInAndStatus(workflowIds, SwitchableStatus.ENABLE.getV());
            if (CollectionUtils.isEmpty(wfInfos)) {
                // in theory this is never empty
                return;
            }
            // map workflowId -> maxWfInstanceNum for quick lookup
            Map<Long, Integer> wfInfoMap = wfInfos.stream().collect(Collectors.toMap(WorkflowInfoDO::getId, WorkflowInfoDO::getMaxWfInstanceNum));
            workflowIds.forEach(workflowId -> {
                // current instance count
                int currentInstances = wfInstanceInfoRepository.countByWorkflowIdAndStatusIn(workflowId, WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS);
                // maximum instance count (null if the workflow was disabled in the meantime)
                Integer maxInstances = wfInfoMap.get(workflowId);
                if (maxInstances == null) {
                    return;
                }
                // number of instances that may be started now
                int needToStartNum = maxInstances - currentInstances;
                if (needToStartNum > 0) {
                    List<WorkflowInstanceInfoDO> needToStartList = workflowInstanceInfoMap.get(workflowId);
                    if (needToStartNum < needToStartList.size()) {
                        needToStartList = needToStartList.subList(0, needToStartNum);
                    }
                    if (CollectionUtils.isEmpty(needToStartList)) {
                        return;
                    }
                    needToStartList.forEach(wfInstance ->
                            workflowInstanceService.retryWorkflowInstance(wfInstance.getWfInstanceId(), wfInstance.getAppId()));
                }
            });
        });
    }
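
The polling code above relies on a derived query that the stock WorkflowInstanceInfoRepository does not declare, so it needs to be added. A minimal sketch, assuming the Spring Data JPA style the server already uses:

    // hypothetical addition to WorkflowInstanceInfoRepository
    List<WorkflowInstanceInfoDO> findByAppIdInAndStatus(List<Long> appIdList, int status);

retryWorkflowInstance then resets a parked instance back to WAITING and re-enters start(); if its status guard only accepts failed instances, it has to be relaxed to also accept WAITING_INSTANCE.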
