Labels: IcebergIO, IcebergIO: can only be used through ManagedIO, P2, awaiting triage, bug, flink, io, java
Description
Writing Iceberg files to S3 with the AWS Glue Data Catalog fails with an `IllegalStateException: Connection pool shut down` error starting in Beam v2.63. The same pipeline works correctly in v2.62.
Environment
- Beam Version: 2.63+
- Runner: Direct Runner or Flink Runner
- Storage: S3
- Catalog: AWS Glue Data Catalog
- File Format: Parquet
Steps to Reproduce
- Configure IcebergIO to write to S3 with AWS Glue Data Catalog
- Use Beam 2.63 or later
- Execute a pipeline that writes data using IcebergIO (a minimal reproduction sketch follows below)
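A minimal reproduction sketch, assuming Beam 2.63+ with the `beam-sdks-java-managed` and Iceberg AWS dependencies on the classpath; the table identifier, catalog name, and warehouse bucket below are placeholders:

```java
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;

public class IcebergGlueRepro {
  public static void main(String[] args) {
    Schema schema = Schema.builder().addStringField("name").build();
    Row row = Row.withSchema(schema).addValue("test").build();

    // Iceberg catalog backed by AWS Glue, writing Parquet files to S3.
    Map<String, Object> config =
        Map.of(
            "table", "db.events", // placeholder table identifier
            "catalog_name", "glue_catalog",
            "catalog_properties",
                Map.of(
                    "catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog",
                    "io-impl", "org.apache.iceberg.aws.s3.S3FileIO",
                    "warehouse", "s3://my-bucket/warehouse")); // placeholder bucket

    Pipeline pipeline = Pipeline.create();
    pipeline
        .apply(Create.of(row))
        .setRowSchema(schema)
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));

    // On 2.62 this succeeds; on 2.63+ it fails at writer close time with
    // "Connection pool shut down".
    pipeline.run().waitUntilFinish();
  }
}
```

Running on the Direct Runner is enough to reproduce; the Flink Runner fails the same way.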
Stack Trace
```
java.lang.IllegalStateException: Connection pool shut down
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(...)
at software.amazon.awssdk.http.apache.ApacheHttpClient.execute(...)
at software.amazon.awssdk.services.s3.DefaultS3Client.putObject(...)
at org.apache.iceberg.aws.s3.S3OutputStream.completeUploads(S3OutputStream.java:444)
at org.apache.iceberg.aws.s3.S3OutputStream.close(S3OutputStream.java:270)
at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:1204)
at org.apache.iceberg.parquet.ParquetWriter.close(ParquetWriter.java:257)
at org.apache.iceberg.io.DataWriter.close(DataWriter.java:82)
at org.apache.beam.sdk.io.iceberg.RecordWriter.close(RecordWriter.java:116)
at org.apache.beam.sdk.io.iceberg.RecordWriterManager$DestinationState.lambda$new$0(RecordWriterManager.java:131)
at com.google.common.cache.LocalCache.processPendingNotifications(LocalCache.java:1850)
Caused by: java.io.IOException: Failed to close PARQUET writer for table
at org.apache.beam.sdk.io.iceberg.RecordWriter.close(RecordWriter.java:132)
... 10 more
Caused by: java.lang.IllegalArgumentException: Expected all data writers to be closed, but found 1 data writer(s) still open
at org.apache.beam.sdk.io.iceberg.RecordWriterManager.close(RecordWriterManager.java:363)
at org.apache.beam.sdk.io.iceberg.WriteGroupedRowsToFiles$WriteGroupedRowsToFilesDoFn.processElement(WriteGroupedRowsToFiles.java:109)
```
Root Cause
I tried to identify the root cause, and I think the issue is related to the way the `FileIO` is closed in sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java: it is opened in a try-with-resources block, so it is closed as soon as the `OutputFile` has been created, even though the writer still needs it:
```java
try (FileIO io = table.io()) {
  OutputFile tmpFile = io.newOutputFile(absoluteFilename);
  EncryptedOutputFile encryptedOutputFile = table.encryption().encrypt(tmpFile);
  outputFile = encryptedOutputFile.encryptingOutputFile();
  keyMetadata = encryptedOutputFile.keyMetadata();
} // io.close() runs here, apparently shutting down the S3 connection pool
  // that `outputFile` still needs for the upload performed later in close()
```
Then the `close()` method at RecordWriter.java:130 fails, because by that point the `FileIO` behind the output file (and the S3 connection pool it owns) has already been shut down.
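If that is the cause, the fix would be to keep the `FileIO` alive for the lifetime of the writer and release it only after the data writer has been closed. A rough sketch of that ordering (hypothetical; `this.io` and `dataWriter` are placeholder names, and the actual Beam fix may look different):

```java
// Hypothetical sketch: do not close the FileIO eagerly.
this.io = table.io(); // no try-with-resources; the OutputFile still needs it
OutputFile tmpFile = io.newOutputFile(absoluteFilename);
EncryptedOutputFile encryptedOutputFile = table.encryption().encrypt(tmpFile);
outputFile = encryptedOutputFile.encryptingOutputFile();
keyMetadata = encryptedOutputFile.keyMetadata();

// ... records are written ...

// Later, in close(), release resources in dependency order:
dataWriter.close(); // flushes and uploads the Parquet file to S3 first
io.close();         // only now is it safe to shut down the S3 connection pool
```

It may also be that `table.io()` returns a `FileIO` shared with the catalog, in which case the writer arguably should not close it at all.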
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- [ ] Component: Python SDK
- [x] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [x] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [x] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner