Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

循环节点 #14

Open
cookie2013 opened this issue May 21, 2024 · 6 comments
Open

循环节点 #14

cookie2013 opened this issue May 21, 2024 · 6 comments

Comments

@cookie2013
Copy link

循环节点应该如何编排?

@ytyht226
Copy link
Owner

循环节点应该如何编排?

是说某个节点需要执行多次?目前引擎在编排层面不支持这种方式,不过你可以在op的具体实现里面控制循环的次数。另外,循环节点的使用场景可以举例说明一下么,我看看后续版本是否需要支持?

@cookie2013
Copy link
Author

循环节点应该如何编排?

是说某个节点需要执行多次?目前引擎在编排层面不支持这种方式,不过你可以在op的具体实现里面控制循环的次数。另外,循环节点的使用场景可以举例说明一下么,我看看后续版本是否需要支持?

是的。比如不断判断某个节点的返回数据是否有值,如果还有值,则参数+1后继续执行该节点,直到取出所有数据。比如某个http调用是需要分页,不断查询的

@ytyht226
Copy link
Owner

循环节点应该如何编排?

是说某个节点需要执行多次?目前引擎在编排层面不支持这种方式,不过你可以在op的具体实现里面控制循环的次数。另外,循环节点的使用场景可以举例说明一下么,我看看后续版本是否需要支持?

是的。比如不断判断某个节点的返回数据是否有值,如果还有值,则参数+1后继续执行该节点,直到取出所有数据。比如某个http调用是需要分页,不断查询的

除了上面说的可以直接在op逻辑内部实现循环控制次数,还可以结合op的回调接口实现,下面是一个例子,可以参考:
CycleTest代码实现:
image
image
Operator1代码实现:
image
Operator2代码实现:
image

@cookie2013
Copy link
Author

next的下个节点,加了judge是false的也会执行,是否有问题

@cookie2013
Copy link
Author

cookie2013 commented May 22, 2024

next的下个节点,加了judge是false的也会执行,是否有问题

@Test
    public void test2() {
        DagEngine engine = new DagEngine(executor);
        int param = 1;
        OperatorWrapper<Integer, Integer> wrapper1 = new OperatorWrapper<Integer, Integer>()
                .id("1")
                .engine(engine)
                .operator(operator1)
                .context(param)
                .next("2")
                ;
        OperatorWrapper<Integer, Integer> wrapper2 = new OperatorWrapper<Integer, Integer>()
                .id("2")
                .engine(engine)
                .operator(operator2)
                .addParamFromWrapperId("1")
                .condition(new Wrapper2Condition())
                ;
        engine.runAndWait(900_000);
    }

private static class Wrapper2Condition implements ICondition {

        @Override
        public boolean judge(OperatorWrapper wrapper) {
            OperatorResult<Integer> wrapper1Result = DagContextHolder.getOperatorResult("1");
            int result = 0;
            if (wrapper1Result != null && wrapper1Result.getResultState() == ResultState.SUCCESS) {
                result = wrapper1Result.getResult();
            }
            System.out.println("Wrapper2Condition result: " + result);
            return result == 0;
        }
    }

@ytyht226
Copy link
Owner

next的下个节点,加了judge是false的也会执行,是否有问题

@Test
    public void test2() {
        DagEngine engine = new DagEngine(executor);
        int param = 1;
        OperatorWrapper<Integer, Integer> wrapper1 = new OperatorWrapper<Integer, Integer>()
                .id("1")
                .engine(engine)
                .operator(operator1)
                .context(param)
                .next("2")
                ;
        OperatorWrapper<Integer, Integer> wrapper2 = new OperatorWrapper<Integer, Integer>()
                .id("2")
                .engine(engine)
                .operator(operator2)
                .addParamFromWrapperId("1")
                .condition(new Wrapper2Condition())
                ;
        engine.runAndWait(900_000);
    }

private static class Wrapper2Condition implements ICondition {

        @Override
        public boolean judge(OperatorWrapper wrapper) {
            OperatorResult<Integer> wrapper1Result = DagContextHolder.getOperatorResult("1");
            int result = 0;
            if (wrapper1Result != null && wrapper1Result.getResultState() == ResultState.SUCCESS) {
                result = wrapper1Result.getResult();
            }
            System.out.println("Wrapper2Condition result: " + result);
            return result == 0;
        }
    }

目前DAG图中的节点引擎只会执行一次,这个例子中节点2只有一个前继节点1,这种情况加在节点2上的condition其实是不起作用的,不管judge返回true、false当前节点都会执行;你可以试想一下,如果judge返回false,此时节点1已经执行过(state != DagState.RUNNING)不会再次执行了,那么引擎就没有机会再判断加在节点2上的condition,节点2就不可能执行了。代码中如果judge返回false时,其实有一个逻辑是看还有没有执行中的前继节点,如果没有是会执行当前节点的:
image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants