README.md (4 additions & 1 deletion)
@@ -1,7 +1,10 @@
# Apache Paimon Trino Connector

This repository is the Trino Connector for the [Apache Paimon](https://paimon.apache.org/) project.

Note: this branch implements writing data into Paimon tables from a newer Trino release
(the Paimon-Trino integration built against Paimon 1.0.0 and Trino 427). It addresses
issue #84 and is based on the source code of commit 2a8aab975be1265583897ac5e0e28d02c8683142.
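
For reference, a minimal catalog file might look like the sketch below. `connector.name=paimon` and the `hive.config.resources` key come from this connector's code, while the `warehouse` key and all file paths are placeholders assumed from Paimon's usual catalog options rather than taken from this change.

```properties
# etc/catalog/paimon.properties (illustrative sketch, not a verified template)
connector.name=paimon
# Assumed Paimon catalog option; point it at your warehouse location.
warehouse=hdfs:///path/to/warehouse
# Optional: comma-separated Hadoop configuration files to load into the connector.
hive.config.resources=/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
```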
## About

Apache Paimon is an open source project of [The Apache Software Foundation](https://apache.org/) (ASF).
TrinoConnector.java
@@ -18,8 +18,11 @@

package org.apache.paimon.trino;

import org.apache.paimon.trino.catalog.TrinoCatalog;

import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorPageSinkProvider;
import io.trino.spi.connector.ConnectorPageSourceProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitManager;
@@ -40,19 +43,22 @@ public class TrinoConnector implements Connector {
private final ConnectorPageSourceProvider trinoPageSourceProvider;
private final List<PropertyMetadata<?>> tableProperties;
private final List<PropertyMetadata<?>> sessionProperties;
private final ConnectorPageSinkProvider pageSinkProvider;

public TrinoConnector(
ConnectorMetadata trinoMetadata,
ConnectorSplitManager trinoSplitManager,
ConnectorPageSourceProvider trinoPageSourceProvider,
TrinoTableOptions trinoTableOptions,
TrinoSessionProperties trinoSessionProperties) {
TrinoSessionProperties trinoSessionProperties,
TrinoCatalog catalog) {
this.trinoMetadata = requireNonNull(trinoMetadata, "trinoMetadata is null");
this.trinoSplitManager = requireNonNull(trinoSplitManager, "trinoSplitManager is null");
this.trinoPageSourceProvider =
requireNonNull(trinoPageSourceProvider, "trinoPageSourceProvider is null");
this.tableProperties = trinoTableOptions.getTableProperties();
this.sessionProperties = trinoSessionProperties.getSessionProperties();
this.pageSinkProvider = new TrinoPageSinkProvider(catalog);
}

@Override
@@ -87,4 +93,9 @@ public List<PropertyMetadata<?>> getSessionProperties() {
public List<PropertyMetadata<?>> getTableProperties() {
return tableProperties;
}

@Override
public ConnectorPageSinkProvider getPageSinkProvider() {
return pageSinkProvider;
}
}
TrinoConnectorFactory.java
@@ -18,6 +18,9 @@

package org.apache.paimon.trino;

import org.apache.paimon.trino.catalog.TrinoCatalog;
import org.apache.paimon.utils.StringUtils;

import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
@@ -38,14 +41,29 @@
import io.trino.spi.connector.ConnectorContext;
import io.trino.spi.connector.ConnectorFactory;
import io.trino.spi.type.TypeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

import javax.xml.parsers.DocumentBuilderFactory;

import java.io.File;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;

/** Trino {@link ConnectorFactory}. */
public class TrinoConnectorFactory implements ConnectorFactory {

private static final Logger LOG = LoggerFactory.getLogger(TrinoConnectorFactory.class);
// see https://trino.io/docs/current/connector/hive.html#hive-general-configuration-properties
private static final String HADOOP_CONF_FILES_KEY = "hive.config.resources";
// see org.apache.paimon.utils.HadoopUtils
private static final String HADOOP_CONF_PREFIX = "hadoop.";
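// Illustrative behavior note (paths below are placeholders): given, for example,
//   hive.config.resources=/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
// each <property> with a non-blank <value> in those files is copied into the
// connector config as hadoop.<name>=<value>, without overwriting keys that were
// set explicitly (see readHadoopXml below).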

@Override
public String getName() {
return "paimon";
@@ -62,6 +80,19 @@ public Connector create(
Map<String, String> config,
ConnectorContext context,
Module module) {
config = new HashMap<>(config);
if (config.containsKey(HADOOP_CONF_FILES_KEY)) {
for (String hadoopXml : config.get(HADOOP_CONF_FILES_KEY).split(",")) {
try {
readHadoopXml(hadoopXml, config);
} catch (Exception e) {
LOG.warn(
"Failed to read hadoop xml file " + hadoopXml + ", skipping this file.",
e);
}
}
}

ClassLoader classLoader = TrinoConnectorFactory.class.getClassLoader();
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
Bootstrap app =
@@ -95,6 +126,7 @@ public Connector create(
.initialize();

TrinoMetadata trinoMetadata = injector.getInstance(TrinoMetadataFactory.class).create();
TrinoCatalog catalog = trinoMetadata.catalog();
TrinoSplitManager trinoSplitManager = injector.getInstance(TrinoSplitManager.class);
TrinoPageSourceProvider trinoPageSourceProvider =
injector.getInstance(TrinoPageSourceProvider.class);
@@ -108,7 +140,34 @@ new ClassLoaderSafeConnectorPageSourceProvider(
new ClassLoaderSafeConnectorPageSourceProvider(
trinoPageSourceProvider, classLoader),
trinoTableOptions,
trinoSessionProperties);
trinoSessionProperties,
catalog);
}
}

private static void readHadoopXml(String path, Map<String, String> config) throws Exception {
path = path.trim();
if (path.isEmpty()) {
return;
}

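// Standard Hadoop configuration file shape (values here are illustrative):
//   <configuration>
//     <property>
//       <name>fs.defaultFS</name>
//       <value>hdfs://namenode:8020</value>
//     </property>
//   </configuration>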
File xmlFile = new File(path);
NodeList propertyNodes =
DocumentBuilderFactory.newInstance()
.newDocumentBuilder()
.parse(xmlFile)
.getElementsByTagName("property");
for (int i = 0; i < propertyNodes.getLength(); i++) {
Node propertyNode = propertyNodes.item(i);
if (propertyNode.getNodeType() == Node.ELEMENT_NODE) {
Element propertyElement = (Element) propertyNode;
String key = propertyElement.getElementsByTagName("name").item(0).getTextContent();
String value =
propertyElement.getElementsByTagName("value").item(0).getTextContent();
if (!StringUtils.isNullOrWhitespaceOnly(value)) {
config.putIfAbsent(HADOOP_CONF_PREFIX + key, value);
}
}
}
}

TrinoInsertTableHandle.java (new file)
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.trino;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.type.Type;

import java.util.List;
import java.util.Objects;

/** Trino {@link ConnectorInsertTableHandle} and {@link ConnectorOutputTableHandle} for Paimon writes. */
public class TrinoInsertTableHandle
implements ConnectorInsertTableHandle, ConnectorOutputTableHandle {
private final String schemaName;
private final String tableName;
private final List<String> columnNames;
private final List<Type> columnTypes;

@JsonCreator
public TrinoInsertTableHandle(
@JsonProperty("schemaName") String schemaName,
@JsonProperty("tableName") String tableName,
@JsonProperty("columnNames") List<String> columnNames,
@JsonProperty("columnTypes") List<Type> columnTypes) {
this.schemaName = schemaName;
this.tableName = tableName;
this.columnNames = ImmutableList.copyOf(columnNames);
this.columnTypes = ImmutableList.copyOf(columnTypes);
}

@JsonProperty
public String getSchemaName() {
return schemaName;
}

@JsonProperty
public String getTableName() {
return tableName;
}

@JsonProperty
public List<String> getColumnNames() {
return columnNames;
}

@JsonProperty
public List<Type> getColumnTypes() {
return columnTypes;
}

@Override
public String toString() {
return "paimon:" + schemaName + "." + tableName;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TrinoInsertTableHandle that = (TrinoInsertTableHandle) o;
return Objects.equals(schemaName, that.schemaName)
&& Objects.equals(tableName, that.tableName)
&& Objects.equals(columnNames, that.columnNames)
&& Objects.equals(columnTypes, that.columnTypes);
}

@Override
public int hashCode() {
return Objects.hash(schemaName, tableName, columnNames, columnTypes);
}
}
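
For orientation, a minimal sketch of constructing this handle. The schema, table, and column names are hypothetical; only the class above and the standard Trino SPI type constants are assumed.

```java
// Illustrative only; not part of this change.
package org.apache.paimon.trino;

import io.trino.spi.type.BigintType;
import io.trino.spi.type.VarcharType;

import java.util.List;

class TrinoInsertTableHandleExample {
    public static void main(String[] args) {
        // Hypothetical schema/table/columns, purely for illustration.
        TrinoInsertTableHandle handle =
                new TrinoInsertTableHandle(
                        "default",
                        "orders",
                        List.of("id", "name"),
                        List.of(BigintType.BIGINT, VarcharType.VARCHAR));
        System.out.println(handle); // prints "paimon:default.orders"
    }
}
```

Trino serializes handles like this with Jackson when shipping them from the coordinator to workers, which is why every field is exposed through the @JsonCreator constructor and @JsonProperty accessors.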