-
Notifications
You must be signed in to change notification settings - Fork 114
Open
Description
我想创建一个可以处理流式数据的工作流,例如第一个Operator从某个UDP端口上持续读取视频流,将视频流体传给第二个Operator,在第二个Operator中做目标识别,将识别结果传递给第三个Operator,第三个Operator将结果保存,这三个Operator建立DagEngine后,如果在while中调用runAndWait,工作流会每次都从第一个Operator执行到最后一个Operator吗,类似如下代码,如果在for循环中执行 engine.runAndWait(300_000);,我构建的1-2,3->4 的工作流每次都会执行一次吗
public void test() {
//请求上下文
OpConfig opConfig = OpConfigEntity.getOpConfig();
opConfig.getExtMap().put("tStart", -3.14);
opConfig.getExtMap().put("tEnd", 3.14);
opConfig.getExtMap().put("f", 10.0);
opConfig.getExtMap().put("t", 0);
System.out.println(GsonUtil.prettyPrint(opConfig));
System.out.println("==========================");
DagEngine engine = new DagEngine(opConfig, executor);
OperatorWrapper wrapper1 = new OperatorWrapper<OpConfig, OpConfig>()
.id("1")
.engine(engine)
.operator(time)
.next("2", "3");
OperatorWrapper wrapper2 = new OperatorWrapper<OpConfig, Double>()
.id("2")
.engine(engine)
.operator(sin)
.next("4")
.addParamFromWrapperId("1");
OperatorWrapper wrapper3 = new OperatorWrapper<OpConfig, Double>()
.id("3")
.engine(engine)
.operator(cos)
.next("4")
.addParamFromWrapperId("1");;
OperatorWrapper wrapper4 = new OperatorWrapper<List<Double>, Double>()
.id("4")
.engine(engine)
.operator(multiply).addParamFromWrapperId("2").addParamFromWrapperId("3");
for(int i = 0; i < 10; i++) {
engine.runAndWait(300_000);
System.out.println(GsonUtil.prettyPrint(wrapper4.getOperatorResult().getResult()));
if (engine.getEx() != null) {
engine.getEx().printStackTrace();
}
}
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels