Skip to content

Commit 536a7d0

Browse files
fix(glue): Support create_table for S3 Tables federated databases (#3058)
1 parent 67cb0a9 commit 536a7d0

File tree

2 files changed

+275
-7
lines changed

2 files changed

+275
-7
lines changed

pyiceberg/catalog/glue.py

Lines changed: 148 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
# under the License.
1717

1818

19+
import logging
1920
from typing import (
2021
TYPE_CHECKING,
2122
Any,
@@ -48,10 +49,10 @@
4849
NoSuchTableError,
4950
TableAlreadyExistsError,
5051
)
51-
from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_PROFILE_NAME, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN
52+
from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_PROFILE_NAME, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, FileIO
5253
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
5354
from pyiceberg.schema import Schema, SchemaVisitor, visit
54-
from pyiceberg.serializers import FromInputFile
55+
from pyiceberg.serializers import FromInputFile, ToOutputFile
5556
from pyiceberg.table import (
5657
CommitTableResponse,
5758
Table,
@@ -122,13 +123,16 @@
122123
ICEBERG_FIELD_OPTIONAL = "iceberg.field.optional"
123124
ICEBERG_FIELD_CURRENT = "iceberg.field.current"
124125

126+
logger = logging.getLogger(__name__)
127+
125128
GLUE_PROFILE_NAME = "glue.profile-name"
126129
GLUE_REGION = "glue.region"
127130
GLUE_ACCESS_KEY_ID = "glue.access-key-id"
128131
GLUE_SECRET_ACCESS_KEY = "glue.secret-access-key"
129132
GLUE_SESSION_TOKEN = "glue.session-token"
130133
GLUE_MAX_RETRIES = "glue.max-retries"
131134
GLUE_RETRY_MODE = "glue.retry-mode"
135+
GLUE_CONNECTION_S3_TABLES = "aws:s3tables"
132136

133137
MAX_RETRIES = 10
134138
STANDARD_RETRY_MODE = "standard"
@@ -419,6 +423,121 @@ def _get_glue_table(self, database_name: str, table_name: str) -> "TableTypeDef"
419423
except self.glue.exceptions.EntityNotFoundException as e:
420424
raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e
421425

426+
def _is_s3tables_database(self, database_name: str) -> bool:
427+
"""Check if a Glue database is federated with S3 Tables.
428+
429+
S3 Tables databases have a FederatedDatabase property with
430+
ConnectionType set to aws:s3tables.
431+
432+
Args:
433+
database_name: The name of the Glue database.
434+
435+
Returns:
436+
True if the database is an S3 Tables federated database.
437+
"""
438+
try:
439+
database_response = self.glue.get_database(Name=database_name)
440+
except self.glue.exceptions.EntityNotFoundException:
441+
return False
442+
database = database_response["Database"]
443+
federated = database.get("FederatedDatabase", {})
444+
return federated.get("ConnectionType", "") == GLUE_CONNECTION_S3_TABLES
445+
446+
@staticmethod
447+
def _write_metadata_no_exist_check(metadata: TableMetadata, io: FileIO, metadata_path: str) -> None:
448+
ToOutputFile.table_metadata(metadata, io.new_output(metadata_path), overwrite=True)
449+
450+
def _create_table_s3tables(
451+
self,
452+
identifier: str | Identifier,
453+
schema: Union[Schema, "pa.Schema"],
454+
location: str | None,
455+
partition_spec: PartitionSpec,
456+
sort_order: SortOrder,
457+
properties: Properties,
458+
) -> Table:
459+
"""Create an Iceberg table in an S3 Tables federated database.
460+
461+
S3 Tables manages storage internally, so the table location is not known until the
462+
table is created in the service. This method:
463+
1. Creates a minimal table entry in Glue (format=ICEBERG), which causes S3 Tables
464+
to allocate storage.
465+
2. Retrieves the managed storage location via GetTable.
466+
3. Writes Iceberg metadata to that location.
467+
4. Updates the Glue table entry with the metadata pointer.
468+
469+
On failure, the table created in step 1 is deleted.
470+
"""
471+
database_name, table_name = self.identifier_to_database_and_table(identifier)
472+
473+
if location is not None:
474+
raise ValueError(
475+
f"Cannot specify a location for S3 Tables table {database_name}.{table_name}. "
476+
"S3 Tables manages the storage location automatically."
477+
)
478+
479+
# Create a minimal table in Glue so S3 Tables allocates storage.
480+
self._create_glue_table(
481+
database_name=database_name,
482+
table_name=table_name,
483+
table_input={
484+
"Name": table_name,
485+
"Parameters": {"format": "ICEBERG"},
486+
},
487+
)
488+
489+
try:
490+
# Retrieve the managed storage location.
491+
glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
492+
storage_descriptor = glue_table.get("StorageDescriptor", {})
493+
managed_location = storage_descriptor.get("Location")
494+
if not managed_location:
495+
raise ValueError(f"S3 Tables did not assign a storage location for {database_name}.{table_name}")
496+
497+
# Build the Iceberg metadata targeting the managed location.
498+
staged_table = self._create_staged_table(
499+
identifier=identifier,
500+
schema=schema,
501+
location=managed_location,
502+
partition_spec=partition_spec,
503+
sort_order=sort_order,
504+
properties=properties,
505+
)
506+
507+
# Write metadata and update the Glue table with the metadata pointer.
508+
# Skip the exist check before writing; S3 Tables doesn't support ListObjectsV2.
509+
self._write_metadata_no_exist_check(staged_table.metadata, staged_table.io, staged_table.metadata_location)
510+
table_input = _construct_table_input(table_name, staged_table.metadata_location, properties, staged_table.metadata)
511+
version_id = glue_table.get("VersionId")
512+
if not version_id:
513+
raise CommitFailedException(
514+
f"Cannot commit {database_name}.{table_name} because Glue table version id is missing"
515+
)
516+
self._update_glue_table(
517+
database_name=database_name,
518+
table_name=table_name,
519+
table_input=table_input,
520+
version_id=version_id,
521+
)
522+
except Exception:
523+
# Clean up the table created in step 1.
524+
try:
525+
self.glue.delete_table(DatabaseName=database_name, Name=table_name)
526+
except Exception:
527+
logger.warning(
528+
f"Failed to clean up S3 Tables table {database_name}.{table_name}",
529+
exc_info=logger.isEnabledFor(logging.DEBUG),
530+
)
531+
raise
532+
533+
return Table(
534+
identifier=self.identifier_to_tuple(identifier),
535+
metadata=staged_table.metadata,
536+
metadata_location=staged_table.metadata_location,
537+
io=self._load_file_io(staged_table.metadata.properties, staged_table.metadata_location),
538+
catalog=self,
539+
)
540+
422541
def create_table(
423542
self,
424543
identifier: str | Identifier,
@@ -435,6 +554,7 @@ def create_table(
435554
identifier: Table identifier.
436555
schema: Table's schema.
437556
location: Location for the table. Optional Argument.
557+
Must not be set for S3 Tables, which manage their own storage.
438558
partition_spec: PartitionSpec for the table.
439559
sort_order: SortOrder for the table.
440560
properties: Table properties that can be a string based dictionary.
@@ -444,9 +564,22 @@ def create_table(
444564
445565
Raises:
446566
AlreadyExistsError: If a table with the name already exists.
447-
ValueError: If the identifier is invalid, or no path is given to store metadata.
567+
ValueError: If the identifier is invalid, no path is given to store metadata,
568+
or a location is specified for an S3 Tables table.
448569
449570
"""
571+
database_name, table_name = self.identifier_to_database_and_table(identifier)
572+
573+
if self._is_s3tables_database(database_name):
574+
return self._create_table_s3tables(
575+
identifier=identifier,
576+
schema=schema,
577+
location=location,
578+
partition_spec=partition_spec,
579+
sort_order=sort_order,
580+
properties=properties,
581+
)
582+
450583
staged_table = self._create_staged_table(
451584
identifier=identifier,
452585
schema=schema,
@@ -455,13 +588,18 @@ def create_table(
455588
sort_order=sort_order,
456589
properties=properties,
457590
)
458-
database_name, table_name = self.identifier_to_database_and_table(identifier)
459591

460592
self._write_metadata(staged_table.metadata, staged_table.io, staged_table.metadata_location)
461593
table_input = _construct_table_input(table_name, staged_table.metadata_location, properties, staged_table.metadata)
462594
self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input)
463595

464-
return self.load_table(identifier=identifier)
596+
return Table(
597+
identifier=self.identifier_to_tuple(identifier),
598+
metadata=staged_table.metadata,
599+
metadata_location=staged_table.metadata_location,
600+
io=self._load_file_io(staged_table.metadata.properties, staged_table.metadata_location),
601+
catalog=self,
602+
)
465603

466604
def register_table(self, identifier: str | Identifier, metadata_location: str) -> Table:
467605
"""Register a new table using existing metadata.
@@ -521,7 +659,11 @@ def commit_table(
521659
if current_table and updated_staged_table.metadata == current_table.metadata:
522660
# no changes, do nothing
523661
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
524-
self._write_metadata(
662+
# S3 Tables managed storage doesn't support ListObjectsV2, so skip the exist check.
663+
write_metadata = (
664+
self._write_metadata_no_exist_check if self._is_s3tables_database(database_name) else self._write_metadata
665+
)
666+
write_metadata(
525667
metadata=updated_staged_table.metadata,
526668
io=updated_staged_table.io,
527669
metadata_path=updated_staged_table.metadata_location,

tests/catalog/test_glue.py

Lines changed: 127 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import pytest
2222
from moto import mock_aws
2323

24-
from pyiceberg.catalog.glue import GlueCatalog
24+
from pyiceberg.catalog.glue import GLUE_CONNECTION_S3_TABLES, GlueCatalog
2525
from pyiceberg.exceptions import (
2626
NamespaceAlreadyExistsError,
2727
NamespaceNotEmptyError,
@@ -43,6 +43,57 @@
4343
UNIFIED_AWS_SESSION_PROPERTIES,
4444
)
4545

46+
S3TABLES_WAREHOUSE_LOCATION = "s3tables-warehouse-location"
47+
48+
49+
def _patch_moto_for_s3tables(monkeypatch: pytest.MonkeyPatch) -> None:
50+
"""Patch moto to simulate S3 Tables federated databases.
51+
52+
Moto does not support FederatedDatabase on GetDatabase responses or
53+
auto-populating StorageDescriptor.Location for S3 Tables. These patches
54+
simulate the S3 Tables service behavior so that the GlueCatalog S3 Tables
55+
code path can be tested end-to-end with moto.
56+
"""
57+
from moto.glue.models import FakeDatabase, FakeTable
58+
59+
# Patch 1: Make GetDatabase return FederatedDatabase from the stored input.
60+
_original_db_as_dict = FakeDatabase.as_dict
61+
62+
def _db_as_dict_with_federated(self): # type: ignore
63+
result = _original_db_as_dict(self)
64+
if federated := self.input.get("FederatedDatabase"):
65+
result["FederatedDatabase"] = federated
66+
return result
67+
68+
monkeypatch.setattr(FakeDatabase, "as_dict", _db_as_dict_with_federated)
69+
70+
# Patch 2: When a table is created with format=ICEBERG (the S3 Tables convention),
71+
# inject a StorageDescriptor.Location to simulate S3 Tables vending a table
72+
# warehouse location.
73+
_original_table_init = FakeTable.__init__
74+
75+
def _table_init_with_location(self, database_name, table_name, table_input, catalog_id): # type: ignore
76+
if table_input.get("Parameters", {}).get("format") == "ICEBERG" and "StorageDescriptor" not in table_input:
77+
table_input = {
78+
**table_input,
79+
"StorageDescriptor": {
80+
"Columns": [],
81+
"Location": f"s3://{S3TABLES_WAREHOUSE_LOCATION}/{database_name}/{table_name}/",
82+
"InputFormat": "",
83+
"OutputFormat": "",
84+
"SerdeInfo": {},
85+
},
86+
}
87+
_original_table_init(self, database_name, table_name, table_input, catalog_id)
88+
89+
monkeypatch.setattr(FakeTable, "__init__", _table_init_with_location)
90+
91+
# Create a bucket backing the simulated table warehouse location. S3 Tables manages
92+
# this storage internally, but in tests moto needs a real bucket for metadata file
93+
# writes to succeed.
94+
s3 = boto3.client("s3", region_name="us-east-1")
95+
s3.create_bucket(Bucket=S3TABLES_WAREHOUSE_LOCATION)
96+
4697

4798
@mock_aws
4899
def test_create_table_with_database_location(
@@ -953,3 +1004,78 @@ def test_glue_client_override() -> None:
9531004
test_client = boto3.client("glue", region_name="us-west-2")
9541005
test_catalog = GlueCatalog(catalog_name, test_client)
9551006
assert test_catalog.glue is test_client
1007+
1008+
1009+
def _create_s3tables_database(catalog: GlueCatalog, database_name: str) -> None:
1010+
"""Create a Glue database with S3 Tables federation metadata."""
1011+
catalog.glue.create_database(
1012+
DatabaseInput={
1013+
"Name": database_name,
1014+
"FederatedDatabase": {
1015+
"Identifier": "arn:aws:s3tables:us-east-1:123456789012:bucket/my-bucket",
1016+
"ConnectionType": GLUE_CONNECTION_S3_TABLES,
1017+
},
1018+
}
1019+
)
1020+
1021+
1022+
@mock_aws
1023+
def test_create_table_s3tables(
1024+
monkeypatch: pytest.MonkeyPatch,
1025+
_bucket_initialize: None,
1026+
moto_endpoint_url: str,
1027+
table_schema_nested: Schema,
1028+
database_name: str,
1029+
table_name: str,
1030+
) -> None:
1031+
_patch_moto_for_s3tables(monkeypatch)
1032+
1033+
identifier = (database_name, table_name)
1034+
test_catalog = GlueCatalog("s3tables", **{"s3.endpoint": moto_endpoint_url})
1035+
_create_s3tables_database(test_catalog, database_name)
1036+
1037+
table = test_catalog.create_table(identifier, table_schema_nested)
1038+
assert table.name() == identifier
1039+
assert table.location() == f"s3://{S3TABLES_WAREHOUSE_LOCATION}/{database_name}/{table_name}"
1040+
assert table.metadata_location.startswith(f"s3://{S3TABLES_WAREHOUSE_LOCATION}/{database_name}/{table_name}/metadata/00000-")
1041+
assert table.metadata_location.endswith(".metadata.json")
1042+
assert test_catalog._parse_metadata_version(table.metadata_location) == 0
1043+
1044+
1045+
@mock_aws
1046+
def test_create_table_s3tables_rejects_location(
1047+
monkeypatch: pytest.MonkeyPatch,
1048+
_bucket_initialize: None,
1049+
moto_endpoint_url: str,
1050+
table_schema_nested: Schema,
1051+
database_name: str,
1052+
table_name: str,
1053+
) -> None:
1054+
_patch_moto_for_s3tables(monkeypatch)
1055+
1056+
identifier = (database_name, table_name)
1057+
test_catalog = GlueCatalog("s3tables", **{"s3.endpoint": moto_endpoint_url})
1058+
_create_s3tables_database(test_catalog, database_name)
1059+
1060+
with pytest.raises(ValueError, match="Cannot specify a location for S3 Tables table"):
1061+
test_catalog.create_table(identifier, table_schema_nested, location="s3://some-bucket/some-path")
1062+
1063+
1064+
@mock_aws
1065+
def test_create_table_s3tables_duplicate(
1066+
monkeypatch: pytest.MonkeyPatch,
1067+
_bucket_initialize: None,
1068+
moto_endpoint_url: str,
1069+
table_schema_nested: Schema,
1070+
database_name: str,
1071+
table_name: str,
1072+
) -> None:
1073+
_patch_moto_for_s3tables(monkeypatch)
1074+
1075+
identifier = (database_name, table_name)
1076+
test_catalog = GlueCatalog("s3tables", **{"s3.endpoint": moto_endpoint_url})
1077+
_create_s3tables_database(test_catalog, database_name)
1078+
1079+
test_catalog.create_table(identifier, table_schema_nested)
1080+
with pytest.raises(TableAlreadyExistsError):
1081+
test_catalog.create_table(identifier, table_schema_nested)

0 commit comments

Comments
 (0)