
Commit e946382

[OSCP][Code Season][dataproxy] Add Hive support to data sources (#32)

* Implement DataProxy support for reading Hive and Oracle via JDBC
* Update readme.md
* [lint] Add a trailing newline to the readme
* Normalize the code
* Normalize the code
* Hive tests pass
* Hive tests...
* Hive and Dameng tests pass; Oracle fails with "maximum open cursors per session exceeded"
* Hive, Dameng, and Oracle all pass
* Hive, Dameng, and Oracle all pass
* delete
* Revise
* Revise
* Adjust the code style: to add a new IO source, you now only need to add a matching util file and a matching producer file; the core functionality lives in the util, and the producer file is registered in the resources directory (see the sketch after the commit header below)
* Add unit tests for Hive
* When writing data to the database, try to drop the table first; if permissions are insufficient, delete all data instead
* Complete the Hive IO integration
* Complete the Hive IO integration
* Add license header
* Package Hive as a standalone module; the remaining issue is the conversion between JDBC types and Arrow types
* Finish unit tests
* Finish unit tests; revise Hive's checkTableExists function
* Finish unit tests; revise Hive's checkTableExists function
* add license header
* Revise
* Hive batch insert / insert support
* Hive batch insert / insert support
* Hive type support
* Hive type support
* Hive type support -- complete; add HiveUtilTest
1 parent 4c58e95 commit e946382
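The code-style bullet above describes the intended plugin pattern: the IO logic for a source lives in a util class, a thin producer class wraps it, and the producer is registered under the module's resources so the framework can discover it. A purely illustrative sketch of that shape follows; every class name here is hypothetical and this is not the repo's actual SPI.

// Hypothetical names throughout; only the shape matches the commit message's description.
final class NewSourceUtil {
    // the heavy lifting (connections, reads, writes) for the new IO source lives here
    static java.sql.Connection open(String jdbcUrl, String user, String password) throws java.sql.SQLException {
        return java.sql.DriverManager.getConnection(jdbcUrl, user, password);
    }
}

final class NewSourceProducer {
    // thin adapter that delegates to NewSourceUtil; its fully qualified name is then
    // listed in a file under src/main/resources (e.g. a service-loader style entry)
    // so the framework can pick it up without code changes elsewhere
}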

File tree: 35 files changed, +2755 -46 lines


README.md
Lines changed: 2 additions & 2 deletions

@@ -8,7 +8,7 @@ DataProxy is a data service framework based on [Arrow Flight](https://arrow.apac
  accesses rich data sources and provides unified, easy-to-use, efficient, and robust data reading and writing services.
  With DataProxy:
 
- * You can access various types of data sources, including MySQL, S3, Aliyun OSS, local disk, etc.
+ * You can access various types of data sources, including MySQL, Hive, S3, Aliyun OSS, local disk, etc.
  * You can use a consistent read/write interface to realize read/write operations on different data sources.
 
  ## Documentation
@@ -22,4 +22,4 @@ Currently, we only provide detailed documentations in Chinese.
  Non-release version of DataProxy is only for demonstration and should not be used in production environments.
  Although this version of DataProxy covers the basic abilities, there may be some security issues and functional defects
  due to insufficient functionality and unfinished items in the project.
- We welcome your active suggestions and look forward to the official release.
+ We welcome your active suggestions and look forward to the official release.

(The final -/+ pair differs only by the trailing newline added at end of file.)

dataproxy-common/src/main/java/org/secretflow/dataproxy/common/exceptions/DataproxyErrorCode.java
Lines changed: 5 additions & 1 deletion

@@ -95,7 +95,11 @@ public enum DataproxyErrorCode {
     ODPS_TASK_NOT_READY(ErrorLevels.ERROR, ErrorTypes.BIZ, "609", "odps task not ready"),
     ODPS_TASK_NOT_RUN(ErrorLevels.ERROR, ErrorTypes.BIZ, "610", "odps task not run"),
 
-
+    // database errors
+    DATABASE_CREATE_TABLE_FAILED(ErrorLevels.ERROR, ErrorTypes.BIZ, "700", "Create database table failed"),
+    DATABASE_TABLE_NOT_EXISTS(ErrorLevels.ERROR, ErrorTypes.BIZ, "701", "database table not exist"),
+    DATABASE_TABLE_ALREADY_EXISTS(ErrorLevels.ERROR, ErrorTypes.BIZ, "702", "database table already exists"),
+    DATABASE_ERROR(ErrorLevels.ERROR, ErrorTypes.BIZ, "703", "database execute sql error"),
     //============================= Third-party errors [900-999] ==================================
 
     ;
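For context, a hedged sketch of how a database plugin might surface the new 700-series codes when a JDBC call fails. The exception wrapper below is illustrative only; it is not the helper actually used in dataproxy-common.

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

import org.secretflow.dataproxy.common.exceptions.DataproxyErrorCode;

final class DatabaseErrorCodeSketch {

    static void createTable(Connection connection, String ddl) {
        try (Statement statement = connection.createStatement()) {
            statement.execute(ddl);
        } catch (SQLException e) {
            // map the JDBC failure onto the new code; RuntimeException is a stand-in
            // for whatever exception type the project actually throws
            throw new RuntimeException(
                    DataproxyErrorCode.DATABASE_CREATE_TABLE_FAILED + ": " + e.getMessage(), e);
        }
    }
}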

dataproxy-integration-tests/src/test/java/org/secretflow/dataproxy/integration/tests/BaseArrowFlightServerTest.java
Lines changed: 3 additions & 39 deletions

@@ -38,49 +38,13 @@
  **/
  public class BaseArrowFlightServerTest {
 
-    private static DataProxyFlightServer dataProxyFlightServer;
+    protected static DataProxyFlightServer dataProxyFlightServer;
     protected FlightClient client;
     protected BufferAllocator allocator;
-    private static Thread serverThread;
-    private static final CountDownLatch SERVER_START_LATCH = new CountDownLatch(1);
+    protected static Thread serverThread;
+    protected static final CountDownLatch SERVER_START_LATCH = new CountDownLatch(1);
 
-    @BeforeAll
-    public static void startServer() {
-
-        assertNotEquals("", OdpsTestUtil.getOdpsProject(), "odps project is empty");
-        assertNotEquals("", OdpsTestUtil.getOdpsEndpoint(), "odps endpoint is empty");
-        assertNotEquals("", OdpsTestUtil.getAccessKeyId(), "odps access key id is empty");
-        assertNotEquals("", OdpsTestUtil.getAccessKeySecret(), "odps access key secret is empty");
-
-        dataProxyFlightServer = new DataProxyFlightServer(FlightServerContext.getInstance().getFlightServerConfig());
-
-        assertDoesNotThrow(() -> {
-            serverThread = new Thread(() -> {
-                try {
-                    dataProxyFlightServer.start();
-                    SERVER_START_LATCH.countDown();
-                    dataProxyFlightServer.awaitTermination();
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                } catch (Exception e) {
-                    fail("Exception was thrown: " + e.getMessage());
-                }
-            });
-        });
-
-        assertDoesNotThrow(() -> {
-            serverThread.start();
-            SERVER_START_LATCH.await();
-        });
-    }
-
-    @AfterAll
-    static void stopServer() {
-        assertDoesNotThrow(() -> {
-            if (dataProxyFlightServer != null) dataProxyFlightServer.close();
-            serverThread.interrupt();
-        });
-    }
 
     @BeforeEach
     public void setUp() {
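Since startServer()/stopServer() no longer live in the base class and its fields are now protected, integration tests that need a running Flight server are expected to manage the lifecycle themselves. A minimal sketch of what such a subclass could look like, reusing the startup logic deleted above; the class name and the omission of the ODPS precondition checks are assumptions for illustration, and imports follow the existing test sources.

// Hypothetical subclass; mirrors the startup/shutdown logic removed from the base class.
public class HiveIntegrationTest extends BaseArrowFlightServerTest {

    @BeforeAll
    public static void startServer() throws Exception {
        // build the server from the shared flight config, as the removed base-class code did
        dataProxyFlightServer = new DataProxyFlightServer(
                FlightServerContext.getInstance().getFlightServerConfig());
        serverThread = new Thread(() -> {
            try {
                dataProxyFlightServer.start();
                SERVER_START_LATCH.countDown();      // signal that the server is accepting connections
                dataProxyFlightServer.awaitTermination();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        serverThread.start();
        SERVER_START_LATCH.await();                  // block until startup has completed
    }

    @AfterAll
    static void stopServer() throws Exception {
        if (dataProxyFlightServer != null) dataProxyFlightServer.close();
        serverThread.interrupt();
    }
}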

dataproxy-integration-tests/src/test/java/org/secretflow/dataproxy/integration/tests/OdpsIntegrationTest.java
Lines changed: 1 addition & 1 deletion

@@ -466,4 +466,4 @@ private static String bytesToHex(byte[] bytes) {
         }
         return hexString.toString();
     }
-}
+}

(The -/+ pair differs only by the trailing newline added at end of file.)
pom.xml for the new dataproxy-plugin-database module (new file)
Lines changed: 52 additions & 0 deletions

@@ -0,0 +1,52 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.secretflow</groupId>
+        <artifactId>dataproxy-plugins</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <artifactId>dataproxy-plugin-database</artifactId>
+    <packaging>jar</packaging>
+
+    <name>dataproxy-plugin-database</name>
+    <url>https://maven.apache.org</url>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+
+        <!-- JUnit 5 -->
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-inline</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Mockito for JUnit 5 -->
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-memory-core</artifactId>
+        </dependency>
+
+    </dependencies>
+</project>
DatabaseCommandConfig.java (new file)

@@ -0,0 +1,39 @@
+/*
+ * Copyright 2025 Ant Group Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.secretflow.dataproxy.plugin.database.config;
+
+import lombok.Getter;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.secretflow.dataproxy.plugin.database.constant.DatabaseTypeEnum;
+
+
+@Getter
+public abstract class DatabaseCommandConfig<T> {
+    protected final DatabaseConnectConfig dbConnectConfig;
+    protected final DatabaseTypeEnum dbTypeEnum;
+    protected final T commandConfig;
+
+    public DatabaseCommandConfig(DatabaseConnectConfig dbConnectConfig, DatabaseTypeEnum dbTypeEnum, T commandConfig) {
+        this.dbConnectConfig = dbConnectConfig;
+        this.dbTypeEnum = dbTypeEnum;
+        this.commandConfig = commandConfig;
+    }
+
+    public abstract String taskRunSQL();
+
+    public abstract Schema getResultSchema();
+}
DatabaseConnectConfig.java (new file)

@@ -0,0 +1,32 @@
+/*
+ * Copyright 2025 Ant Group Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.secretflow.dataproxy.plugin.database.config;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.secretflow.dataproxy.common.serializer.SensitiveDataSerializer;
+
+/**
+ *
+ * @param username database username
+ * @param password database password
+ * @param endpoint database endpoint
+ * @param database database name
+ */
+public record DatabaseConnectConfig(@JsonSerialize(using = SensitiveDataSerializer.class) String username,
+                                    @JsonSerialize(using = SensitiveDataSerializer.class) String password,
+                                    String endpoint, String database) {
+}
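A brief usage sketch for the record above: the username and password components go through SensitiveDataSerializer, so they are expected to come out redacted when the config is serialized with Jackson. The endpoint format shown is an assumption.

import com.fasterxml.jackson.databind.ObjectMapper;

import org.secretflow.dataproxy.plugin.database.config.DatabaseConnectConfig;

final class ConnectConfigSketch {
    public static void main(String[] args) throws Exception {
        DatabaseConnectConfig config =
                new DatabaseConnectConfig("hive_user", "hive_password", "127.0.0.1:10000", "default");
        // username/password should appear masked in the output because of SensitiveDataSerializer
        String json = new ObjectMapper().writeValueAsString(config);
        System.out.println(json);
    }
}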
DatabaseTableConfig.java (new file)

@@ -0,0 +1,25 @@
+/*
+ * Copyright 2025 Ant Group Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.secretflow.dataproxy.plugin.database.config;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.secretflow.v1alpha1.common.Common;
+
+import java.util.List;
+
+public record DatabaseTableConfig(String tableName, String partition, @JsonIgnore List<Common.DataColumn> columns) {
+}
DatabaseTableQueryConfig.java (new file)

@@ -0,0 +1,43 @@
+/*
+ * Copyright 2025 Ant Group Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.secretflow.dataproxy.plugin.database.config;
+
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.secretflow.dataproxy.plugin.database.constant.DatabaseTypeEnum;
+import org.secretflow.dataproxy.common.utils.ArrowUtil;
+
+import java.util.stream.Collectors;
+
+public class DatabaseTableQueryConfig extends DatabaseCommandConfig<org.secretflow.dataproxy.plugin.database.config.DatabaseTableConfig> {
+    public DatabaseTableQueryConfig(DatabaseConnectConfig dbConnectConfig, DatabaseTableConfig readConfig) {
+        super(dbConnectConfig, DatabaseTypeEnum.TABLE, readConfig);
+    }
+
+    @Override
+    public String taskRunSQL() {
+        return "";
+    }
+
+    @Override
+    public Schema getResultSchema() {
+        return new Schema(commandConfig.columns().stream()
+                .map(column ->
+                        Field.nullable(column.getName(), ArrowUtil.parseKusciaColumnType(column.getType())))
+                .collect(Collectors.toList()));
+    }
+}
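A hedged sketch of how the query config above turns Kuscia column descriptors into an Arrow schema. The column type strings are assumptions; the accepted names are whatever ArrowUtil.parseKusciaColumnType supports.

import java.util.List;

import org.apache.arrow.vector.types.pojo.Schema;
import org.secretflow.v1alpha1.common.Common;
import org.secretflow.dataproxy.plugin.database.config.DatabaseConnectConfig;
import org.secretflow.dataproxy.plugin.database.config.DatabaseTableConfig;
import org.secretflow.dataproxy.plugin.database.config.DatabaseTableQueryConfig;

final class QueryConfigSketch {
    public static void main(String[] args) {
        DatabaseConnectConfig connect =
                new DatabaseConnectConfig("hive_user", "hive_password", "127.0.0.1:10000", "default");
        DatabaseTableConfig table = new DatabaseTableConfig(
                "person_info",
                null,   // no partition
                List.of(
                        Common.DataColumn.newBuilder().setName("id").setType("int64").build(),
                        Common.DataColumn.newBuilder().setName("name").setType("str").build()));
        // the schema is built by mapping each Kuscia column type through ArrowUtil.parseKusciaColumnType
        Schema schema = new DatabaseTableQueryConfig(connect, table).getResultSchema();
        System.out.println(schema);
    }
}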
DatabaseWriteConfig.java (new file)

@@ -0,0 +1,43 @@
+/*
+ * Copyright 2025 Ant Group Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.secretflow.dataproxy.plugin.database.config;
+
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.secretflow.dataproxy.plugin.database.constant.DatabaseTypeEnum;
+import org.secretflow.dataproxy.common.utils.ArrowUtil;
+
+import java.util.stream.Collectors;
+
+public class DatabaseWriteConfig extends DatabaseCommandConfig<DatabaseTableConfig> {
+    public DatabaseWriteConfig(DatabaseConnectConfig dbConnectConfig, DatabaseTableConfig readConfig) {
+        super(dbConnectConfig, DatabaseTypeEnum.TABLE, readConfig);
+    }
+
+    @Override
+    public String taskRunSQL() {
+        return "";
+    }
+
+    @Override
+    public Schema getResultSchema() {
+        return new Schema(commandConfig.columns().stream()
+                .map(column ->
+                        Field.nullable(column.getName(), ArrowUtil.parseKusciaColumnType(column.getType())))
+                .collect(Collectors.toList()));
+    }
+}
