Skip to content

Commit 7c6aa80

Browse files
committed
Support workflow serial strategy
1 parent eea596f commit 7c6aa80

File tree

41 files changed

+1246
-75
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1246
-75
lines changed

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowCreateRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public class WorkflowCreateRequest {
5656
private int timeout;
5757

5858
@Schema(allowableValues = "PARALLEL / SERIAL_WAIT / SERIAL_DISCARD / SERIAL_PRIORITY", example = "PARALLEL", description = "default PARALLEL if not provide.")
59-
private String executionType;
59+
private String executionType = WorkflowExecutionTypeEnum.PARALLEL.name();
6060

6161
public WorkflowDefinition convert2WorkflowDefinition() {
6262
WorkflowDefinition workflowDefinition = new WorkflowDefinition();

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/PauseWorkflowInstanceExecutorDelegate.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
2222
import org.apache.dolphinscheduler.dao.entity.User;
2323
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
24+
import org.apache.dolphinscheduler.dao.repository.SerialCommandDao;
2425
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
2526
import org.apache.dolphinscheduler.extract.base.client.Clients;
2627
import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
@@ -31,6 +32,7 @@
3132

3233
import org.springframework.beans.factory.annotation.Autowired;
3334
import org.springframework.stereotype.Component;
35+
import org.springframework.transaction.support.TransactionTemplate;
3436

3537
@Slf4j
3638
@Component
@@ -41,6 +43,12 @@ public class PauseWorkflowInstanceExecutorDelegate
4143
@Autowired
4244
private WorkflowInstanceDao workflowInstanceDao;
4345

46+
@Autowired
47+
private TransactionTemplate transactionTemplate;
48+
49+
@Autowired
50+
private SerialCommandDao serialCommandDao;
51+
4452
@Override
4553
public Void execute(PauseWorkflowInstanceOperation workflowInstanceControlRequest) {
4654
final WorkflowInstance workflowInstance = workflowInstanceControlRequest.workflowInstance;
@@ -64,10 +72,15 @@ private void exceptionIfWorkflowInstanceCannotPause(WorkflowInstance workflowIns
6472
}
6573

6674
private void directPauseInDB(WorkflowInstance workflowInstance) {
67-
workflowInstanceDao.updateWorkflowInstanceState(
68-
workflowInstance.getId(),
69-
workflowInstance.getState(),
70-
WorkflowExecutionStatus.PAUSE);
75+
// todo: move the pause logic to master
76+
transactionTemplate.execute(status -> {
77+
workflowInstanceDao.updateWorkflowInstanceState(
78+
workflowInstance.getId(),
79+
workflowInstance.getState(),
80+
WorkflowExecutionStatus.PAUSE);
81+
serialCommandDao.deleteByWorkflowInstanceId(workflowInstance.getId());
82+
return null;
83+
});
7184
log.info("Update workflow instance {} state from: {} to {} success",
7285
workflowInstance.getName(),
7386
workflowInstance.getState().name(),

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/StopWorkflowInstanceExecutorDelegate.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
2222
import org.apache.dolphinscheduler.dao.entity.User;
2323
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
24+
import org.apache.dolphinscheduler.dao.repository.SerialCommandDao;
2425
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
2526
import org.apache.dolphinscheduler.extract.base.client.Clients;
2627
import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
@@ -31,6 +32,7 @@
3132

3233
import org.springframework.beans.factory.annotation.Autowired;
3334
import org.springframework.stereotype.Component;
35+
import org.springframework.transaction.support.TransactionTemplate;
3436

3537
@Slf4j
3638
@Component
@@ -41,6 +43,12 @@ public class StopWorkflowInstanceExecutorDelegate
4143
@Autowired
4244
private WorkflowInstanceDao workflowInstanceDao;
4345

46+
@Autowired
47+
private TransactionTemplate transactionTemplate;
48+
49+
@Autowired
50+
private SerialCommandDao serialCommandDao;
51+
4452
@Override
4553
public Void execute(StopWorkflowInstanceOperation workflowInstanceControlRequest) {
4654
final WorkflowInstance workflowInstance = workflowInstanceControlRequest.workflowInstance;
@@ -65,10 +73,15 @@ void exceptionIfWorkflowInstanceCannotStop(WorkflowInstance workflowInstance) {
6573
}
6674

6775
void directStopInDB(WorkflowInstance workflowInstance) {
68-
workflowInstanceDao.updateWorkflowInstanceState(
69-
workflowInstance.getId(),
70-
workflowInstance.getState(),
71-
WorkflowExecutionStatus.STOP);
76+
// todo: move the stop logic to master
77+
transactionTemplate.execute(status -> {
78+
workflowInstanceDao.updateWorkflowInstanceState(
79+
workflowInstance.getId(),
80+
workflowInstance.getState(),
81+
WorkflowExecutionStatus.STOP);
82+
serialCommandDao.deleteByWorkflowInstanceId(workflowInstance.getId());
83+
return null;
84+
});
7285
log.info("Update workflow instance {} state from: {} to {} success",
7386
workflowInstance.getName(),
7487
workflowInstance.getState().name(),

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ public enum CommandType {
8989
STOP(9, "stop a workflow"),
9090
/**
9191
* Recover from the serial-wait state.
92-
* todo: We may need to remove these command, and use the workflow instance origin command type when notify from serial wait.
9392
*/
9493
RECOVER_SERIAL_WAIT(11, "recover serial wait"),
9594
/**

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionTypeEnum.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,8 @@ public static WorkflowExecutionTypeEnum of(int executionType) {
5656
throw new IllegalArgumentException("invalid status : " + executionType);
5757
}
5858

59+
public boolean isSerial() {
60+
return this != PARALLEL;
61+
}
62+
5963
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.dao.entity;
19+
20+
import java.util.Date;
21+
22+
import lombok.AllArgsConstructor;
23+
import lombok.Builder;
24+
import lombok.Data;
25+
import lombok.NoArgsConstructor;
26+
27+
import com.baomidou.mybatisplus.annotation.IdType;
28+
import com.baomidou.mybatisplus.annotation.TableId;
29+
import com.baomidou.mybatisplus.annotation.TableName;
30+
31+
@Data
32+
@Builder
33+
@NoArgsConstructor
34+
@AllArgsConstructor
35+
@TableName("t_ds_serial_command")
36+
public class SerialCommand {
37+
38+
@TableId(value = "id", type = IdType.AUTO)
39+
private Integer id;
40+
41+
private Integer workflowInstanceId;
42+
43+
private Long workflowDefinitionCode;
44+
45+
private Integer workflowDefinitionVersion;
46+
47+
private String command;
48+
49+
private int state;
50+
51+
private Date createTime;
52+
53+
private Date updateTime;
54+
55+
}

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkflowInstance.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -210,11 +210,6 @@ public CommandType getCmdTypeIfComplement() {
210210
return commandType;
211211
}
212212

213-
/**
214-
* set state with desc
215-
* @param state
216-
* @param stateDesc
217-
*/
218213
public void setStateWithDesc(WorkflowExecutionStatus state, String stateDesc) {
219214
this.setState(state);
220215
if (StringUtils.isEmpty(this.getStateHistory())) {
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.dao.entity;
19+
20+
import java.util.Date;
21+
22+
import lombok.AllArgsConstructor;
23+
import lombok.Builder;
24+
import lombok.Data;
25+
import lombok.NoArgsConstructor;
26+
27+
import com.baomidou.mybatisplus.annotation.IdType;
28+
import com.baomidou.mybatisplus.annotation.TableId;
29+
import com.baomidou.mybatisplus.annotation.TableName;
30+
31+
@Data
32+
@Builder
33+
@NoArgsConstructor
34+
@AllArgsConstructor
35+
@TableName("t_ds_workflow_task_lineage")
36+
public class WorkflowSerialQueue {
37+
38+
@TableId(value = "id", type = IdType.AUTO)
39+
private Integer id;
40+
41+
private long workflowDefinitionCode;
42+
43+
private int workflowInstanceId;
44+
45+
private int state;
46+
47+
private Date createTime;
48+
49+
private Date updateTime;
50+
51+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.dao.mapper;
19+
20+
import org.apache.dolphinscheduler.dao.entity.SerialCommand;
21+
22+
import org.apache.ibatis.annotations.Param;
23+
24+
import java.util.List;
25+
26+
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
27+
28+
public interface SerialCommandMapper extends BaseMapper<SerialCommand> {
29+
30+
List<SerialCommand> fetchSerialCommands(@Param("fetchSize") int fetchSize);
31+
32+
void deleteByWorkflowInstanceId(@Param("workflowInstanceId") int workflowInstanceId);
33+
34+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.dao.mapper;
19+
20+
import org.apache.dolphinscheduler.dao.entity.WorkflowSerialQueue;
21+
22+
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
23+
24+
public interface WorkflowSerialQueueMapper extends BaseMapper<WorkflowSerialQueue> {
25+
}

0 commit comments

Comments
 (0)