Skip to content

Commit 763725b

Browse files
committed
[feature-15837] [api] Auto create workflow while import sql script
1 parent faaef5f commit 763725b

File tree

24 files changed

+833
-98
lines changed

24 files changed

+833
-98
lines changed

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -711,12 +711,18 @@ public Result queryAllProcessDefinitionByProjectCode(@Parameter(hidden = true) @
711711
public Result importProcessDefinition(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
712712
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
713713
@RequestParam("file") MultipartFile file) {
714-
Map<String, Object> result;
715-
if ("application/zip".equals(file.getContentType())) {
716-
result = processDefinitionService.importSqlProcessDefinition(loginUser, projectCode, file);
717-
} else {
718-
result = processDefinitionService.importProcessDefinition(loginUser, projectCode, file);
719-
}
714+
Map<String, Object> result = processDefinitionService.importProcessDefinition(loginUser, projectCode, file);
715+
return returnDataList(result);
716+
}
717+
718+
@Operation(summary = "importSqlProcessDefinition", description = "IMPORT_SQL_PROCESS_DEFINITION_NOTES")
719+
@Parameter(name = "file", description = "RESOURCE_FILE", required = true, schema = @Schema(implementation = MultipartFile.class))
720+
@PostMapping(value = "/importSql")
721+
@ApiException(IMPORT_PROCESS_DEFINE_ERROR)
722+
public Result importSqlProcessDefinition(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
723+
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
724+
@RequestParam("file") MultipartFile file) {
725+
Map<String, Object> result = processDefinitionService.importSqlProcessDefinition(loginUser, projectCode, file);
720726
return returnDataList(result);
721727
}
722728

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

Lines changed: 140 additions & 91 deletions
Large diffs are not rendered by default.
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.api.task;
19+
20+
import org.apache.dolphinscheduler.dao.entity.DataSource;
21+
22+
import java.util.Map;
23+
import java.util.Objects;
24+
25+
import lombok.Data;
26+
27+
@Data
28+
public class SqlTaskParseContext {
29+
30+
private Map<String, Object> processProperties;
31+
32+
private Map<String, Object> hints;
33+
34+
private String sql;
35+
36+
private String taskName;
37+
38+
private DataSource dataSource;
39+
40+
public <T> T hint(String key) {
41+
Object o = hints.get(key);
42+
if (Objects.nonNull(o)) {
43+
return (T) o;
44+
}
45+
o = processProperties.get(key);
46+
if (Objects.nonNull(o)) {
47+
return (T) o;
48+
}
49+
return null;
50+
}
51+
52+
public <T> T hintOrDefault(String key, T defaultValue) {
53+
T hint = hint(key);
54+
if (Objects.nonNull(hint)) {
55+
return hint;
56+
} else {
57+
return defaultValue;
58+
}
59+
}
60+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.api.task;
19+
20+
public interface SqlTaskParsePlugin {
21+
22+
String name();
23+
24+
SqlTaskParseResult parse(SqlTaskParseContext context);
25+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.api.task;
19+
20+
public interface SqlTaskParsePluginFactory {
21+
22+
SqlTaskParsePlugin build();
23+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.api.task;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import java.util.Map;
23+
import java.util.ServiceLoader;
24+
import java.util.stream.Collectors;
25+
26+
import com.google.common.collect.ImmutableMap;
27+
28+
public class SqlTaskParsePluginLoader {
29+
30+
private SqlTaskParsePluginLoader() {
31+
}
32+
33+
private static final Map<String, SqlTaskParsePlugin> PLUGIN_MAP;
34+
35+
static {
36+
List<SqlTaskParsePlugin> plugins = load();
37+
PLUGIN_MAP = ImmutableMap
38+
.copyOf(plugins.stream().collect(Collectors.toMap(SqlTaskParsePlugin::name, plugin -> plugin)));
39+
}
40+
41+
private static List<SqlTaskParsePlugin> load() {
42+
List<SqlTaskParsePlugin> plugins = new ArrayList<>();
43+
for (SqlTaskParsePluginFactory sqlTaskParsePluginFactory : ServiceLoader
44+
.load(SqlTaskParsePluginFactory.class)) {
45+
SqlTaskParsePlugin plugin = sqlTaskParsePluginFactory.build();
46+
plugins.add(plugin);
47+
}
48+
return plugins;
49+
}
50+
51+
public static List<SqlTaskParsePlugin> allSqlTaskParsePlugins() {
52+
return new ArrayList<>(PLUGIN_MAP.values());
53+
}
54+
55+
public static SqlTaskParsePlugin getSqlTaskParsePlugin(String pluginName) {
56+
return PLUGIN_MAP.get(pluginName);
57+
}
58+
59+
public static SqlTaskParsePlugin defaultSqlTaskParsePlugin() {
60+
return PLUGIN_MAP.get("hint");
61+
}
62+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.api.task;
19+
20+
import java.util.Set;
21+
22+
import lombok.Data;
23+
24+
@Data
25+
public class SqlTaskParseResult {
26+
27+
private Set<String> upstreamSet;
28+
29+
private Set<String> downstreamSet;
30+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.api.task.druid;
19+
20+
import org.apache.dolphinscheduler.api.task.SqlTaskParseContext;
21+
import org.apache.dolphinscheduler.api.task.SqlTaskParsePlugin;
22+
import org.apache.dolphinscheduler.api.task.SqlTaskParseResult;
23+
import org.apache.dolphinscheduler.dao.entity.DataSource;
24+
25+
import java.util.HashSet;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.Set;
29+
30+
import com.alibaba.druid.DbType;
31+
import com.alibaba.druid.sql.SQLUtils;
32+
import com.alibaba.druid.sql.ast.SQLStatement;
33+
import com.alibaba.druid.sql.visitor.SchemaStatVisitor;
34+
import com.alibaba.druid.stat.TableStat;
35+
36+
public class DruidSqlTaskParsePlugin implements SqlTaskParsePlugin {
37+
38+
@Override
39+
public String name() {
40+
return "druid parser";
41+
}
42+
43+
@Override
44+
public SqlTaskParseResult parse(SqlTaskParseContext context) {
45+
DbType dbType = getDbType(context.getDataSource());
46+
List<SQLStatement> sqlStatements = SQLUtils.parseStatements(context.getSql(), dbType);
47+
Set<String> upstreamList = new HashSet<>();
48+
Set<String> downstreamList = new HashSet<>();
49+
for (SQLStatement sqlStatement : sqlStatements) {
50+
SchemaStatVisitor schemaStatVisitor = SQLUtils.createSchemaStatVisitor(dbType);
51+
sqlStatement.accept(schemaStatVisitor);
52+
53+
Map<TableStat.Name, TableStat> tables = schemaStatVisitor.getTables();
54+
for (Map.Entry<TableStat.Name, TableStat> table : tables.entrySet()) {
55+
if (table.getValue().getSelectCount() > 0) {
56+
upstreamList.add(table.getKey().getName());
57+
} else {
58+
downstreamList.add(table.getKey().getName());
59+
}
60+
}
61+
}
62+
63+
SqlTaskParseResult result = new SqlTaskParseResult();
64+
result.setUpstreamSet(upstreamList);
65+
result.setDownstreamSet(downstreamList);
66+
return result;
67+
}
68+
69+
private DbType getDbType(DataSource dataSource) {
70+
switch (dataSource.getType()) {
71+
case MYSQL:
72+
return DbType.mysql;
73+
case ORACLE:
74+
return DbType.oracle;
75+
case SQLSERVER:
76+
return DbType.sqlserver;
77+
case POSTGRESQL:
78+
return DbType.postgresql;
79+
case PRESTO:
80+
return DbType.presto;
81+
case CLICKHOUSE:
82+
return DbType.clickhouse;
83+
case TRINO:
84+
return DbType.trino;
85+
case DB2:
86+
return DbType.db2;
87+
case OCEANBASE:
88+
return DbType.oceanbase;
89+
case STARROCKS:
90+
return DbType.starrocks;
91+
case H2:
92+
return DbType.h2;
93+
case HIVE:
94+
case SPARK:
95+
case KYUUBI:
96+
return DbType.hive;
97+
default:
98+
return DbType.other;
99+
}
100+
}
101+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.api.task.druid;
19+
20+
import org.apache.dolphinscheduler.api.task.SqlTaskParsePlugin;
21+
import org.apache.dolphinscheduler.api.task.SqlTaskParsePluginFactory;
22+
23+
import com.google.auto.service.AutoService;
24+
25+
@AutoService(SqlTaskParsePluginFactory.class)
26+
public class DruidSqlTaskParsePluginFactory implements SqlTaskParsePluginFactory {
27+
28+
@Override
29+
public SqlTaskParsePlugin build() {
30+
return new DruidSqlTaskParsePlugin();
31+
}
32+
}

0 commit comments

Comments
 (0)