diff --git a/README.md b/README.md index 8be9580..30651ed 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,10 @@ # Apache Paimon Trino Connector This repository is Trino Connector for the [Apache Paimon](https://paimon.apache.org/) project. - +Note: +This fork adds support for writing data into Paimon tables from a newer Trino release, +targeting Paimon 1.0.0 together with Trino 427 via the Paimon-Trino integration. +It addresses issue #84 and is based on the source code of commit 2a8aab975be1265583897ac5e0e28d02c8683142. ## About Apache Paimon is an open source project of [The Apache Software Foundation](https://apache.org/) (ASF). diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnector.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnector.java index 9097247..bc15d6b 100644 --- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnector.java +++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnector.java @@ -18,8 +18,11 @@ package org.apache.paimon.trino; +import org.apache.paimon.trino.catalog.TrinoCatalog; + import io.trino.spi.connector.Connector; import io.trino.spi.connector.ConnectorMetadata; +import io.trino.spi.connector.ConnectorPageSinkProvider; import io.trino.spi.connector.ConnectorPageSourceProvider; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplitManager; @@ -40,19 +43,22 @@ public class TrinoConnector implements Connector { private final ConnectorPageSourceProvider trinoPageSourceProvider; private final List> tableProperties; private final List> sessionProperties; + private final ConnectorPageSinkProvider pageSinkProvider; public TrinoConnector( ConnectorMetadata trinoMetadata, ConnectorSplitManager trinoSplitManager, ConnectorPageSourceProvider trinoPageSourceProvider, TrinoTableOptions trinoTableOptions, - TrinoSessionProperties trinoSessionProperties) { + TrinoSessionProperties trinoSessionProperties, + TrinoCatalog catalog) { this.trinoMetadata = requireNonNull(trinoMetadata, "trinoMetadata is null"); this.trinoSplitManager = requireNonNull(trinoSplitManager, "trinoSplitManager is null"); this.trinoPageSourceProvider = requireNonNull(trinoPageSourceProvider, "trinoRecordSetProvider is null"); this.tableProperties = trinoTableOptions.getTableProperties(); this.sessionProperties = trinoSessionProperties.getSessionProperties(); + this.pageSinkProvider = new TrinoPageSinkProvider(catalog); } @Override @@ -87,4 +93,9 @@ public List> getSessionProperties() { public List> getTableProperties() { return tableProperties; } + + @Override + public ConnectorPageSinkProvider getPageSinkProvider() { + return pageSinkProvider; + } } diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnectorFactory.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnectorFactory.java index f6ad6a8..07ebba8 100644 --- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnectorFactory.java +++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoConnectorFactory.java @@ -18,6 +18,9 @@ package org.apache.paimon.trino; +import org.apache.paimon.trino.catalog.TrinoCatalog; +import org.apache.paimon.utils.StringUtils; + import com.google.inject.Binder; import com.google.inject.Injector; import com.google.inject.Module; @@ -38,14 +41,29 @@ import io.trino.spi.connector.ConnectorContext; import io.trino.spi.connector.ConnectorFactory; import io.trino.spi.type.TypeManager; +import org.slf4j.Logger;
+import org.slf4j.LoggerFactory; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; + +import javax.xml.parsers.DocumentBuilderFactory; +import java.io.File; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.util.HashMap; import java.util.Map; /** Trino {@link ConnectorFactory}. */ public class TrinoConnectorFactory implements ConnectorFactory { + private static final Logger LOG = LoggerFactory.getLogger(TrinoConnectorFactory.class); + // see https://trino.io/docs/current/connector/hive.html#hive-general-configuration-properties + private static final String HADOOP_CONF_FILES_KEY = "hive.config.resources"; + // see org.apache.paimon.utils.HadoopUtils + private static final String HADOOP_CONF_PREFIX = "hadoop."; + @Override public String getName() { return "paimon"; @@ -62,6 +80,19 @@ public Connector create( Map config, ConnectorContext context, Module module) { + config = new HashMap<>(config); + if (config.containsKey(HADOOP_CONF_FILES_KEY)) { + for (String hadoopXml : config.get(HADOOP_CONF_FILES_KEY).split(",")) { + try { + readHadoopXml(hadoopXml, config); + } catch (Exception e) { + LOG.warn( + "Failed to read hadoop xml file " + hadoopXml + ", skipping this file.", + e); + } + } + } + ClassLoader classLoader = TrinoConnectorFactory.class.getClassLoader(); try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { Bootstrap app = @@ -95,6 +126,7 @@ public Connector create( .initialize(); TrinoMetadata trinoMetadata = injector.getInstance(TrinoMetadataFactory.class).create(); + TrinoCatalog catalog = trinoMetadata.catalog(); TrinoSplitManager trinoSplitManager = injector.getInstance(TrinoSplitManager.class); TrinoPageSourceProvider trinoPageSourceProvider = injector.getInstance(TrinoPageSourceProvider.class); @@ -108,7 +140,34 @@ public Connector create( new ClassLoaderSafeConnectorPageSourceProvider( trinoPageSourceProvider, classLoader), trinoTableOptions, - trinoSessionProperties); + trinoSessionProperties, + catalog); + } + } + + private static void readHadoopXml(String path, Map config) throws Exception { + path = path.trim(); + if (path.isEmpty()) { + return; + } + + File xmlFile = new File(path); + NodeList propertyNodes = + DocumentBuilderFactory.newInstance() + .newDocumentBuilder() + .parse(xmlFile) + .getElementsByTagName("property"); + for (int i = 0; i < propertyNodes.getLength(); i++) { + Node propertyNode = propertyNodes.item(i); + if (propertyNode.getNodeType() == 1) { + Element propertyElement = (Element) propertyNode; + String key = propertyElement.getElementsByTagName("name").item(0).getTextContent(); + String value = + propertyElement.getElementsByTagName("value").item(0).getTextContent(); + if (!StringUtils.isNullOrWhitespaceOnly(value)) { + config.putIfAbsent(HADOOP_CONF_PREFIX + key, value); + } + } } } diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoInsertTableHandle.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoInsertTableHandle.java new file mode 100644 index 0000000..5b8958d --- /dev/null +++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoInsertTableHandle.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.trino; + +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.trino.spi.connector.ConnectorInsertTableHandle; +import io.trino.spi.connector.ConnectorOutputTableHandle; +import io.trino.spi.type.Type; + +import java.util.List; +import java.util.Objects; + +/** Trino {@link TrinoInsertTableHandle}. */ +public class TrinoInsertTableHandle + implements ConnectorInsertTableHandle, ConnectorOutputTableHandle { + private final String schemaName; + private final String tableName; + private final List columnNames; + private final List columnTypes; + + @JsonCreator + public TrinoInsertTableHandle( + @JsonProperty("schemaName") String schemaName, + @JsonProperty("tableName") String tableName, + @JsonProperty("columnNames") List columnNames, + @JsonProperty("columnTypes") List columnTypes) { + this.schemaName = schemaName; + this.tableName = tableName; + this.columnNames = ImmutableList.copyOf(columnNames); + this.columnTypes = ImmutableList.copyOf(columnTypes); + } + + @JsonProperty + public String getSchemaName() { + return schemaName; + } + + @JsonProperty + public String getTableName() { + return tableName; + } + + @JsonProperty + public List getColumnNames() { + return columnNames; + } + + @JsonProperty + public List getColumnTypes() { + return columnTypes; + } + + @Override + public String toString() { + return "paimon:" + schemaName + "." 
+ tableName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TrinoInsertTableHandle that = (TrinoInsertTableHandle) o; + return Objects.equals(schemaName, that.schemaName) + && Objects.equals(tableName, that.tableName) + && Objects.equals(columnNames, that.columnNames) + && Objects.equals(columnTypes, that.columnTypes); + } + + @Override + public int hashCode() { + return Objects.hash(schemaName, tableName, columnNames, columnTypes); + } +} diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMetadata.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMetadata.java index 926ca64..6b9d3ae 100644 --- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMetadata.java +++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMetadata.java @@ -21,17 +21,22 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.fs.Path; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.table.Table; import org.apache.paimon.trino.catalog.TrinoCatalog; import org.apache.paimon.utils.StringUtils; +import io.airlift.slice.Slice; import io.trino.spi.TrinoException; import io.trino.spi.connector.Assignment; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorInsertTableHandle; import io.trino.spi.connector.ConnectorMetadata; +import io.trino.spi.connector.ConnectorOutputMetadata; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.ConnectorTableMetadata; @@ -41,16 +46,21 @@ import io.trino.spi.connector.ConstraintApplicationResult; import io.trino.spi.connector.LimitApplicationResult; import io.trino.spi.connector.ProjectionApplicationResult; +import io.trino.spi.connector.RetryMode; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SchemaTablePrefix; import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.predicate.Domain; import io.trino.spi.security.TrinoPrincipal; +import io.trino.spi.statistics.ComputedStatistics; import io.trino.spi.type.LongTimestampWithTimeZone; import io.trino.spi.type.TimestampWithTimeZoneType; import io.trino.spi.type.Type; +import io.trino.spi.type.VarcharType; +import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -73,6 +83,7 @@ /** Trino {@link ConnectorMetadata}. 
*/ public class TrinoMetadata implements ConnectorMetadata { + private static final String TAG_PREFIX = "tag-"; protected final TrinoCatalog catalog; @@ -87,7 +98,12 @@ public TrinoCatalog catalog() { @Override public boolean schemaExists(ConnectorSession session, String schemaName) { catalog.initSession(session); - return catalog.databaseExists(schemaName); + try { + catalog.getDatabase(schemaName); + return true; + } catch (Catalog.DatabaseNotExistException e) { + return false; + } } @Override @@ -167,9 +183,48 @@ public ConnectorTableHandle getTableHandle( } case TARGET_ID: { - dynamicOptions.put( - CoreOptions.SCAN_SNAPSHOT_ID.key(), - version.getVersion().toString()); + String tagOrVersion; + if (versionType instanceof VarcharType) { + tagOrVersion = + BinaryString.fromBytes( + ((Slice) version.getVersion()).getBytes()) + .toString(); + } else { + tagOrVersion = version.getVersion().toString(); + } + + // if value is not number, set tag option + boolean isNumber = StringUtils.isNumeric(tagOrVersion); + if (!isNumber) { + dynamicOptions.put(CoreOptions.SCAN_TAG_NAME.key(), tagOrVersion); + } else { + try { + catalog.initSession(session); + String path = + catalog.getTable( + new Identifier( + tableName.getSchemaName(), + tableName.getTableName())) + .options() + .get("path"); + + if (catalog.fileIO() + .exists( + new Path( + path + + "/tag/" + + TAG_PREFIX + + tagOrVersion))) { + dynamicOptions.put( + CoreOptions.SCAN_TAG_NAME.key(), tagOrVersion); + } else { + dynamicOptions.put( + CoreOptions.SCAN_SNAPSHOT_ID.key(), tagOrVersion); + } + } catch (IOException | Catalog.TableNotExistException e) { + throw new RuntimeException(e); + } + } break; } } @@ -193,11 +248,14 @@ public TrinoTableHandle getTableHandle( SchemaTableName tableName, Map dynamicOptions) { catalog.initSession(session); - return catalog.tableExists( - Identifier.create(tableName.getSchemaName(), tableName.getTableName())) - ? 
new TrinoTableHandle( - tableName.getSchemaName(), tableName.getTableName(), dynamicOptions) - : null; + try { + catalog.getTable( + Identifier.create(tableName.getSchemaName(), tableName.getTableName())); + return new TrinoTableHandle( + tableName.getSchemaName(), tableName.getTableName(), dynamicOptions); + } catch (Catalog.TableNotExistException e) { + return null; + } } @Override @@ -501,4 +559,44 @@ public Optional> applyLimit( return Optional.of(new LimitApplicationResult<>(table, false, false)); } + + @Override + public ConnectorInsertTableHandle beginInsert( + ConnectorSession session, + ConnectorTableHandle tableHandle, + List columns, + RetryMode retryMode) { + TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle; + String schema = trinoTableHandle.getSchemaName(); + String tableName = trinoTableHandle.getTableName(); + Identifier tablePath = new Identifier(schema, tableName); + Table table = null; + try { + catalog.initSession(session); + table = catalog.getTable(tablePath); + } catch (Catalog.TableNotExistException e) { + return null; + } + List columnNames = + columns.stream() + .map(TrinoColumnHandle.class::cast) + .map(TrinoColumnHandle::getColumnName) + .collect(Collectors.toList()); + List columnTypes = + columns.stream() + .map(TrinoColumnHandle.class::cast) + .map(TrinoColumnHandle::getTrinoType) + .collect(Collectors.toList()); + + return new TrinoInsertTableHandle(schema, tableName, columnNames, columnTypes); + } + + @Override + public Optional finishInsert( + ConnectorSession session, + ConnectorInsertTableHandle insertHandle, + Collection fragments, + Collection computedStatistics) { + return Optional.empty(); + } } diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSink.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSink.java new file mode 100644 index 0000000..33f4359 --- /dev/null +++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSink.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.trino; + +import org.apache.paimon.trino.catalog.TrinoCatalog; + +import io.trino.spi.type.Type; + +import java.util.List; + +/** Trino {@link TrinoPageSinkBase}. 
*/ +public class TrinoPageSink extends TrinoPageSinkBase { + + public TrinoPageSink( + TrinoCatalog catalog, List columnTypes, String schemaName, String tableName) { + super(catalog, columnTypes, schemaName, tableName); + } +} diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSinkBase.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSinkBase.java new file mode 100644 index 0000000..43277d7 --- /dev/null +++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSinkBase.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.trino; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; +import org.apache.paimon.shade.guava30.com.google.common.primitives.Shorts; +import org.apache.paimon.shade.guava30.com.google.common.primitives.SignedBytes; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.trino.catalog.TrinoCatalog; + +import io.airlift.slice.Slice; +import io.trino.spi.Page; +import io.trino.spi.block.Block; +import io.trino.spi.connector.ConnectorPageSink; +import io.trino.spi.type.BigintType; +import io.trino.spi.type.BooleanType; +import io.trino.spi.type.CharType; +import io.trino.spi.type.DateType; +import io.trino.spi.type.DecimalType; +import io.trino.spi.type.DoubleType; +import io.trino.spi.type.IntegerType; +import io.trino.spi.type.RealType; +import io.trino.spi.type.SmallintType; +import io.trino.spi.type.TinyintType; +import io.trino.spi.type.Type; +import io.trino.spi.type.VarbinaryType; +import io.trino.spi.type.VarcharType; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc; +import static io.trino.spi.type.Decimals.readBigDecimal; +import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS; +import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS; +import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND; +import static java.lang.Float.intBitsToFloat; +import static java.lang.Math.toIntExact; +import static java.util.Objects.requireNonNull; +import static 
java.util.concurrent.CompletableFuture.completedFuture; + +/** Trino {@link ConnectorPageSink}. */ +public class TrinoPageSinkBase implements ConnectorPageSink { + private final List columnTypes; + private final Table table; + private final BatchWriteBuilder writeBuilder; + private final BatchTableWrite write; + private List messageList = new ArrayList<>(); + + public TrinoPageSinkBase( + TrinoCatalog catalog, List columnTypes, String schemaName, String tableName) { + this.columnTypes = ImmutableList.copyOf(requireNonNull(columnTypes, "columnTypes is null")); + try { + Identifier identifier = Identifier.create(schemaName, tableName); + table = catalog.getTable(identifier); + writeBuilder = table.newBatchWriteBuilder(); + write = writeBuilder.newWrite(); + } catch (Catalog.TableNotExistException e) { + throw new RuntimeException(e); + } + } + + @Override + public CompletableFuture appendPage(Page page) { + writePage(page, write); + try { + messageList.addAll(write.prepareCommit()); + } catch (Exception e) { + throw new RuntimeException(e); + } + return NOT_BLOCKED; + } + + private void writePage(Page chunk, BatchTableWrite write) { + for (int position = 0; position < chunk.getPositionCount(); position++) { + GenericRow record = new GenericRow(chunk.getChannelCount()); + for (int channel = 0; channel < chunk.getChannelCount(); channel++) { + Block block = chunk.getBlock(channel); + Type type = columnTypes.get(channel); + Object value = formatValue(block, type, position); + record.setField(channel, value); + } + try { + write.write(record); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + @Override + public CompletableFuture> finish() { + try { + if (!messageList.isEmpty()) { + BatchTableCommit commit = writeBuilder.newCommit(); + commit.commit(messageList); + } else { + write.close(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return completedFuture(ImmutableList.of()); + } + + @Override + public void abort() { + try { + write.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private Object formatValue(Block block, Type type, int position) { + if (block.isNull(position)) { + return null; + } + + if (type instanceof BooleanType) { + return type.getBoolean(block, position); + } + + if (type instanceof TinyintType) { + return SignedBytes.checkedCast(type.getLong(block, position)); + } + + if (type instanceof SmallintType) { + return Shorts.checkedCast(type.getLong(block, position)); + } + + if (type instanceof IntegerType) { + return toIntExact(type.getLong(block, position)); + } + + if (type instanceof BigintType) { + return type.getLong(block, position); + } + + if (type instanceof RealType) { + return intBitsToFloat(toIntExact(type.getLong(block, position))); + } + + if (type instanceof DoubleType) { + return type.getDouble(block, position); + } + + if (type instanceof DateType) { + return toIntExact(type.getLong(block, position)); + } + + if (type.equals(TIMESTAMP_MILLIS)) { + return Timestamp.fromEpochMillis( + type.getLong(block, position) / MICROSECONDS_PER_MILLISECOND); + } + + if (type.equals(TIMESTAMP_TZ_MILLIS)) { + long millisUtc = unpackMillisUtc(type.getLong(block, position)); + return Timestamp.fromEpochMillis(millisUtc); + } + + if (type instanceof CharType) { + return BinaryString.fromBytes(type.getSlice(block, position).getBytes()); + } + + if (type instanceof VarcharType) { + return BinaryString.fromBytes(type.getSlice(block, position).getBytes()); + } + + if (type instanceof VarbinaryType) { + 
return type.getSlice(block, position).getBytes(); + } + + if (type instanceof DecimalType) { + DecimalType decimalType = (DecimalType) type; + BigDecimal value = readBigDecimal(decimalType, block, position); + return Decimal.fromBigDecimal( + value, decimalType.getPrecision(), decimalType.getScale()); + } + + throw new UnsupportedOperationException("Unsupported type: " + type); + } +} diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSinkProvider.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSinkProvider.java new file mode 100644 index 0000000..b0f2e82 --- /dev/null +++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSinkProvider.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.trino; + +import org.apache.paimon.trino.catalog.TrinoCatalog; + +import io.trino.spi.connector.ConnectorInsertTableHandle; +import io.trino.spi.connector.ConnectorOutputTableHandle; +import io.trino.spi.connector.ConnectorPageSink; +import io.trino.spi.connector.ConnectorPageSinkId; +import io.trino.spi.connector.ConnectorPageSinkProvider; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTransactionHandle; + +import static java.util.Objects.requireNonNull; + +/** Trino {@link ConnectorPageSinkProvider}. 
*/ +public class TrinoPageSinkProvider implements ConnectorPageSinkProvider { + private final TrinoCatalog catalog; + + public TrinoPageSinkProvider(TrinoCatalog catalog) { + this.catalog = requireNonNull(catalog, "catalog is null"); + } + + @Override + public ConnectorPageSink createPageSink( + ConnectorTransactionHandle transactionHandle, + ConnectorSession session, + ConnectorOutputTableHandle outputTableHandle, + ConnectorPageSinkId pageSinkId) { + requireNonNull(outputTableHandle, "outputTableHandle is null"); + TrinoInsertTableHandle handle = (TrinoInsertTableHandle) outputTableHandle; + return new TrinoPageSink( + catalog, handle.getColumnTypes(), handle.getSchemaName(), handle.getTableName()); + } + + @Override + public ConnectorPageSink createPageSink( + ConnectorTransactionHandle transactionHandle, + ConnectorSession session, + ConnectorInsertTableHandle insertTableHandle, + ConnectorPageSinkId pageSinkId) { + requireNonNull(insertTableHandle, "insertTableHandle is null"); + TrinoInsertTableHandle handle = (TrinoInsertTableHandle) insertTableHandle; + catalog.initSession(session); + return new TrinoPageSink( + catalog, handle.getColumnTypes(), handle.getSchemaName(), handle.getTableName()); + } +} diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java index a380e42..ca9f053 100644 --- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java +++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java @@ -82,16 +82,14 @@ public class TrinoPageSourceProvider implements ConnectorPageSourceProvider { private final TrinoFileSystemFactory fileSystemFactory; - private final TrinoCatalog trinoCatalog; + private final TrinoMetadataFactory trinoMetadataFactory; @Inject public TrinoPageSourceProvider( TrinoFileSystemFactory fileSystemFactory, TrinoMetadataFactory trinoMetadataFactory) { this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); - this.trinoCatalog = - requireNonNull(trinoMetadataFactory, "trinoMetadataFactory is null") - .create() - .catalog(); + this.trinoMetadataFactory = + requireNonNull(trinoMetadataFactory, "trinoMetadataFactory is null"); } @Override @@ -102,9 +100,11 @@ public ConnectorPageSource createPageSource( ConnectorTableHandle tableHandle, List columns, DynamicFilter dynamicFilter) { - trinoCatalog.initSession(session); + TrinoMetadata metadata = trinoMetadataFactory.create(); + TrinoCatalog catalog = metadata.catalog(); + catalog.initSession(session); TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle; - Table table = trinoTableHandle.tableWithDynamicOptions(trinoCatalog, session); + Table table = trinoTableHandle.tableWithDynamicOptions(catalog, session); return runWithContextClassLoader( () -> createPageSource( @@ -166,7 +166,7 @@ private ConnectorPageSource createPageSource( new Path(indexFile.path()), ((FileStoreTable) table).fileIO(), rowType)) { - if (!fileIndexPredicate.testPredicate(paimonFilter.get())) { + if (!fileIndexPredicate.evaluate(paimonFilter.get()).remain()) { continue; } } diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java index c0b9344..bcde932 100644 --- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java +++ 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java @@ -108,8 +108,6 @@ private static boolean isEnum(String className) { private static Class buildClass(String className) { switch (className) { - case "FileFormatType": - return CoreOptions.FileFormatType.class; case "MergeEngine": return CoreOptions.MergeEngine.class; case "ChangelogProducer": diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/catalog/TrinoCatalog.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/catalog/TrinoCatalog.java index 2caab17..97bf037 100644 --- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/catalog/TrinoCatalog.java +++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/catalog/TrinoCatalog.java @@ -23,12 +23,12 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; -import org.apache.paimon.catalog.CatalogLockContext; -import org.apache.paimon.catalog.CatalogLockFactory; +import org.apache.paimon.catalog.Database; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.catalog.PropertyChange; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.options.Options; +import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.security.SecurityContext; @@ -43,7 +43,6 @@ import java.util.List; import java.util.Map; -import java.util.Optional; /** Trino catalog, use it after set session. */ public class TrinoCatalog implements Catalog { @@ -120,32 +119,38 @@ public FileIO fileIO() { return current.fileIO(); } - @Override - public Optional lockFactory() { - return current.lockFactory(); - } + // @Override + // public Optional lockFactory() { + // return current.lockFactory(); + // } @Override public List listDatabases() { return current.listDatabases(); } - @Override - public boolean databaseExists(String s) { - return current.databaseExists(s); - } + // @Override + // public boolean databaseExists(String s) { + // return current.databaseExists(s); + // } @Override public void createDatabase(String s, boolean b, Map map) - throws DatabaseAlreadyExistException { + throws Catalog.DatabaseAlreadyExistException { current.createDatabase(s, b, map); } @Override - public Map loadDatabaseProperties(String s) throws DatabaseNotExistException { - return current.loadDatabaseProperties(s); + public Database getDatabase(String s) throws DatabaseNotExistException { + return current.getDatabase(s); } + // @Override + // public Map loadDatabaseProperties(String s) throws + // DatabaseNotExistException { + // return current.loadDatabaseProperties(s); + // } + @Override public void dropDatabase(String s, boolean b, boolean b1) throws DatabaseNotExistException, DatabaseNotEmptyException { @@ -153,7 +158,13 @@ public void dropDatabase(String s, boolean b, boolean b1) } @Override - public Table getTable(Identifier identifier) throws TableNotExistException { + public void alterDatabase(String s, List list, boolean b) + throws DatabaseNotExistException { + current.alterDatabase(s, list, b); + } + + @Override + public Table getTable(Identifier identifier) throws Catalog.TableNotExistException { return current.getTable(identifier); } @@ -185,12 +196,23 @@ public void alterTable(Identifier identifier, List list, boolean i current.alterTable(identifier, list, ignoreIfExists); } + @Override + public 
void createPartition(Identifier identifier, Map map) + throws TableNotExistException { + current.createPartition(identifier, map); + } + @Override public void dropPartition(Identifier identifier, Map partitions) throws TableNotExistException, PartitionNotExistException { current.dropPartition(identifier, partitions); } + @Override + public List listPartitions(Identifier identifier) throws TableNotExistException { + return current.listPartitions(identifier); + } + @Override public void close() throws Exception { if (current != null) { @@ -198,15 +220,15 @@ public void close() throws Exception { } } - @Override - public Optional lockContext() { - return current.lockContext(); - } - - @Override - public Optional metastoreClientFactory(Identifier identifier) { - return current.metastoreClientFactory(identifier); - } + // @Override + // public Optional lockContext() { + // return current.lockContext(); + // } + // + // @Override + // public Optional metastoreClientFactory(Identifier identifier) { + // return current.metastoreClientFactory(identifier); + // } @Override public void createDatabase(String name, boolean ignoreIfExists) @@ -214,10 +236,10 @@ public void createDatabase(String name, boolean ignoreIfExists) current.createDatabase(name, ignoreIfExists); } - @Override - public boolean tableExists(Identifier identifier) { - return current.tableExists(identifier); - } + // @Override + // public boolean tableExists(Identifier identifier) { + // return current.tableExists(identifier); + // } @Override public void alterTable(Identifier identifier, SchemaChange change, boolean ignoreIfNotExists) diff --git a/pom.xml b/pom.xml index ca8202d..1a5c92d 100644 --- a/pom.xml +++ b/pom.xml @@ -59,7 +59,7 @@ under the License. - 0.8.0 + 1.0.0 11 5.8.1 2.0.7
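
Note on the write path (not part of the patch): the new TrinoPageSinkBase converts each Trino Page into Paimon GenericRow values and pushes them through Paimon's batch write API. Below is a minimal standalone sketch of that flow, using only the Paimon APIs that appear in the diff (newBatchWriteBuilder, newWrite, prepareCommit, newCommit, commit); the table identifier "default.orders" and the sample rows are placeholders, not part of the patch.

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;

import java.util.List;

/** Illustrative sketch of the batch write flow that TrinoPageSinkBase follows. */
public class PaimonBatchWriteSketch {

    public static void writeRows(Catalog catalog) throws Exception {
        // Resolve the target table through the catalog, as the page sink does in its constructor.
        Table table = catalog.getTable(Identifier.create("default", "orders"));

        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
        try (BatchTableWrite write = writeBuilder.newWrite()) {
            // appendPage() turns each Trino position into a GenericRow and calls write().
            write.write(GenericRow.of(1, BinaryString.fromString("item-1")));
            write.write(GenericRow.of(2, BinaryString.fromString("item-2")));

            // prepareCommit() yields commit messages; finish() publishes them in one commit.
            List<CommitMessage> messages = write.prepareCommit();
            try (BatchTableCommit commit = writeBuilder.newCommit()) {
                commit.commit(messages);
            }
        }
    }
}

In the patch itself the commit is deferred to finish(), and abort() only closes the writer, so a failed INSERT never publishes a snapshot.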