Skip to content

Commit 9f7c3ae

Browse files
committed
I/O: Adapter for Apache Iceberg
1 parent 991c1ca commit 9f7c3ae

File tree

6 files changed

+225
-4
lines changed

6 files changed

+225
-4
lines changed

cratedb_toolkit/cli.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
from .cmd.tail.cli import cli as tail_cli
1010
from .docs.cli import cli as docs_cli
1111
from .info.cli import cli as info_cli
12-
from .io.cli import cli as io_cli
12+
from .io.cli import cli_load as io_cli_load
13+
from .io.cli import cli_save as io_cli_save
1314
from .query.cli import cli as query_cli
1415
from .settings.cli import cli as settings_cli
1516
from .shell.cli import cli as shell_cli
@@ -30,7 +31,8 @@ def cli(ctx: click.Context, verbose: bool, debug: bool):
3031
cli.add_command(cfr_cli, name="cfr")
3132
cli.add_command(cloud_cli, name="cluster")
3233
cli.add_command(docs_cli, name="docs")
33-
cli.add_command(io_cli, name="load")
34+
cli.add_command(io_cli_load, name="load")
35+
cli.add_command(io_cli_save, name="save")
3436
cli.add_command(query_cli, name="query")
3537
cli.add_command(rockset_cli, name="rockset")
3638
cli.add_command(shell_cli, name="shell")

cratedb_toolkit/cluster/core.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
DatabaseAddressMissingError,
2121
OperationFailed,
2222
)
23+
from cratedb_toolkit.io.iceberg import from_iceberg, to_iceberg
2324
from cratedb_toolkit.model import ClusterAddressOptions, DatabaseAddress, InputOutputResource, TableAddress
2425
from cratedb_toolkit.util.client import jwt_token_patch
2526
from cratedb_toolkit.util.data import asbool
@@ -569,6 +570,9 @@ def load_table(
569570
else:
570571
raise NotImplementedError("Loading full data via Kinesis not implemented yet")
571572

573+
elif source_url_obj.scheme.startswith("iceberg") or source_url_obj.scheme.endswith("iceberg"):
574+
return from_iceberg(str(source_url_obj), target_url)
575+
572576
elif source_url_obj.scheme in ["file+bson", "http+bson", "https+bson", "mongodb", "mongodb+srv"]:
573577
if "+cdc" in source_url_obj.scheme:
574578
source_url_obj.scheme = source_url_obj.scheme.replace("+cdc", "")
@@ -599,6 +603,30 @@ def load_table(
599603

600604
return self
601605

606+
def save_table(
    self, source: TableAddress, target: InputOutputResource, transformation: t.Union[Path, None] = None
) -> "StandaloneCluster":
    """
    Export data from a database table on a standalone CrateDB Server.

    :param source: Source table address.
        NOTE(review): Currently unused — the source address is derived from
        ``self.address``; confirm whether schema/table selection should be honored.
    :param target: Target resource (URL, format, compression) to export into.
    :param transformation: Optional path to a Zyp transformation file.
        NOTE(review): Not applied by any export adapter yet.
    :raises NotImplementedError: When no adapter exists for the target URL scheme.

    Synopsis
    --------
    export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo

    ctk save table "iceberg://./var/lib/iceberg/?catalog=default&table=taxi_dataset"
    """
    source_url = self.address.dburi
    target_url_obj = URL(target.url)

    if target_url_obj.scheme.startswith("iceberg") or target_url_obj.scheme.endswith("iceberg"):
        to_iceberg(source_url, target.url)
    else:
        raise NotImplementedError(f"Exporting resource not implemented yet: {target_url_obj}")

    # Return `self` for fluent chaining, matching the `-> "StandaloneCluster"`
    # annotation and the behavior of `load_table`.
    return self
629+
602630

603631
class DatabaseCluster:
604632
"""

cratedb_toolkit/io/cli.py

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@
1818
@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level")
1919
@click.version_option()
2020
@click.pass_context
21-
def cli(ctx: click.Context, verbose: bool, debug: bool):
21+
def cli_load(ctx: click.Context, verbose: bool, debug: bool):
2222
"""
2323
Load data into CrateDB.
2424
"""
2525
return boot_click(ctx, verbose, debug)
2626

2727

28-
@make_command(cli, name="table")
28+
@make_command(cli_load, name="table")
2929
@click.argument("url")
3030
@option_cluster_id
3131
@option_cluster_name
@@ -67,3 +67,60 @@ def load_table(
6767
cluster_url=cluster_url,
6868
)
6969
cluster.load_table(source=source, target=target, transformation=transformation)
70+
71+
72+
@click.group(cls=ClickAliasedGroup)  # type: ignore[arg-type]
@click.option("--verbose", is_flag=True, required=False, help="Turn on logging")
@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level")
@click.version_option()
@click.pass_context
def cli_save(ctx: click.Context, verbose: bool, debug: bool):
    """
    Export data from CrateDB.
    """
    # Configure logging/verbosity once for the whole `ctk save` command group;
    # subcommands (e.g. `save table`) are attached via `make_command(cli_save, ...)`.
    return boot_click(ctx, verbose, debug)
82+
83+
84+
@make_command(cli_save, name="table")
@click.argument("url")
@option_cluster_id
@option_cluster_name
@option_cluster_url
@click.option("--schema", envvar="CRATEDB_SCHEMA", type=str, required=False, help="Schema where to export the data from")
@click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table where to export the data from")
@click.option("--format", "format_", type=str, required=False, help="File format of the export resource")
@click.option("--compression", type=str, required=False, help="Compression format of the export resource")
@click.option("--transformation", type=Path, required=False, help="Path to Zyp transformation file")
@click.pass_context
def save_table(
    ctx: click.Context,
    url: str,
    cluster_id: str,
    cluster_name: str,
    cluster_url: str,
    schema: str,
    table: str,
    format_: str,
    compression: str,
    transformation: t.Union[Path, None],
):
    """
    Export data from CrateDB and CrateDB Cloud clusters.
    """

    # When `--transformation` is given, but empty, fix it.
    if transformation is not None and transformation.name == "":
        transformation = None

    # Encapsulate source and target parameters.
    source = TableAddress(schema=schema, table=table)
    target = InputOutputResource(url=url, format=format_, compression=compression)

    # Dispatch "save table" operation.
    cluster = DatabaseCluster.create(
        cluster_id=cluster_id,
        cluster_name=cluster_name,
        cluster_url=cluster_url,
    )
    cluster.save_table(source=source, target=target, transformation=transformation)

cratedb_toolkit/io/iceberg.py

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
import dataclasses
2+
import logging
3+
import sqlalchemy as sa
4+
import polars as pl
5+
import pyarrow.parquet as pq
6+
from boltons.urlutils import URL
7+
from pyiceberg.catalog import load_catalog, Catalog
8+
from sqlalchemy_cratedb import insert_bulk
9+
10+
from cratedb_toolkit.model import DatabaseAddress
11+
12+
logger = logging.getLogger(__name__)
13+
14+
15+
CHUNK_SIZE = 75_000
16+
17+
18+
@dataclasses.dataclass
class IcebergAddress:
    """
    Decoded form of an Iceberg resource URL.

    Two addressing flavors are supported (see `from_url`):
    - Metadata-file flavor: `path` points at an Iceberg `*.metadata.json`
      file; `catalog` and `table` remain None.
    - Catalog flavor: `iceberg://<host>/<path>?catalog=<name>&table=<name>`,
      where `path` is the warehouse directory.
    """

    # Filesystem path: warehouse directory (catalog flavor)
    # or metadata file (metadata-file flavor).
    path: str
    # Catalog name from the `catalog` query parameter; may be None.
    catalog: str
    # Table name from the `table` query parameter; may be None.
    table: str

    @classmethod
    def from_url(cls, url: str):
        """
        Parse an Iceberg URL into an `IcebergAddress`.

        A host of "." (e.g. `iceberg://./var/lib/iceberg/`) denotes a
        relative path, so the leading slash is stripped from the path.
        """
        iceberg_url = URL(url)
        if iceberg_url.host == ".":
            iceberg_url.path = iceberg_url.path.lstrip("/")
        return cls(path=iceberg_url.path, catalog=iceberg_url.query_params.get("catalog"), table=iceberg_url.query_params.get("table"))

    def load_catalog(self) -> Catalog:
        """
        Load the pyiceberg catalog backing this address.

        Uses a SQLite-backed SQL catalog whose database file
        (`pyiceberg_catalog.db`) lives inside the warehouse directory,
        with `path` serving as the warehouse location.
        """
        return load_catalog(
            self.catalog,
            **{
                'type': 'sql',
                "uri": f"sqlite:///{self.path}/pyiceberg_catalog.db",
                "warehouse": f"file://{self.path}",
            },
        )

    @property
    def identifier(self):
        # NOTE(review): pyiceberg identifiers are conventionally
        # (namespace, table); here the catalog name doubles as the
        # namespace — confirm this is intended.
        return (self.catalog, self.table)

    def load_table(self) -> pl.LazyFrame:
        """
        Load the addressed Iceberg table as a Polars LazyFrame.

        With a catalog configured, resolve the table through pyiceberg;
        otherwise treat `path` as a metadata file and scan it directly.
        """
        if self.catalog is not None:
            catalog = self.load_catalog()
            return catalog.load_table(self.identifier).to_polars()
        else:
            return pl.scan_iceberg(self.path)
51+
52+
53+
def from_iceberg(source_url, cratedb_url, progress: bool = False):
    """
    Scan an Iceberg table from local filesystem or object store, and load into CrateDB.
    https://docs.pola.rs/api/python/stable/reference/api/polars.scan_iceberg.html

    :param source_url: Iceberg resource URL (metadata-file or catalog flavor).
    :param cratedb_url: CrateDB database URL; must address a table.
    :param progress: Reserved for progress reporting; currently unused.
    :raises ValueError: If the CrateDB URL does not include a table name.

    Synopsis
    --------
    export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo
    ctk load table "file+iceberg:var/lib/iceberg/default.db/taxi_dataset/metadata/00001-dc8e5ed2-dc29-4e39-b2e4-019e466af4c3.metadata.json"
    ctk load table "iceberg://./var/lib/iceberg/?catalog=default&table=taxi_dataset"
    """

    iceberg_address = IcebergAddress.from_url(source_url)

    # Parse parameters.
    logger.info(f"Iceberg address: Path: {iceberg_address.path}, catalog: {iceberg_address.catalog}, table: {iceberg_address.table}")

    cratedb_address = DatabaseAddress.from_string(cratedb_url)
    cratedb_url, cratedb_table = cratedb_address.decode()
    if cratedb_table.table is None:
        raise ValueError("Table name is missing. Please adjust CrateDB database URL.")
    logger.info(f"Target address: {cratedb_address}")

    # Invoke copy operation.
    logger.info("Running Iceberg copy")
    engine = sa.create_engine(str(cratedb_url))

    try:
        # Bound the streaming chunk size to keep peak memory in check.
        pl.Config.set_streaming_chunk_size(CHUNK_SIZE)
        table = iceberg_address.load_table()

        # This conversion to pandas is zero-copy,
        # so we can utilize their SQL utils for free.
        # https://github.com/pola-rs/polars/issues/7852
        # Note: This code also uses the most efficient `insert_bulk` method with CrateDB.
        # https://cratedb.com/docs/sqlalchemy-cratedb/dataframe.html#efficient-insert-operations-with-pandas
        table.collect(streaming=True).to_pandas().to_sql(
            name=cratedb_table.table, schema=cratedb_table.schema, con=engine, if_exists="replace", index=False, chunksize=CHUNK_SIZE, method=insert_bulk,
        )
    finally:
        # Release pooled connections deterministically.
        engine.dispose()
94+
95+
96+
def to_iceberg(source_url, target_url, progress: bool = False):
    """
    Export a CrateDB table into an Iceberg table.
    https://py.iceberg.apache.org/#write-a-pyarrow-dataframe

    :param source_url: CrateDB database URL; must address a table.
    :param target_url: Iceberg resource URL, catalog flavor
        (e.g. ``iceberg://./var/lib/iceberg/?catalog=default&table=taxi_dataset``).
    :param progress: Reserved for progress reporting; currently unused.
    :raises ValueError: If the CrateDB URL does not include a table name.

    Synopsis
    --------
    export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo
    ctk save table "iceberg://./var/lib/iceberg/?catalog=default&table=taxi_dataset"
    """

    # Decode target (Iceberg) and source (CrateDB) addresses.
    iceberg_address = IcebergAddress.from_url(target_url)
    logger.info(f"Iceberg address: Path: {iceberg_address.path}, catalog: {iceberg_address.catalog}, table: {iceberg_address.table}")

    cratedb_address = DatabaseAddress.from_string(source_url)
    cratedb_url, cratedb_table = cratedb_address.decode()
    if cratedb_table.table is None:
        raise ValueError("Table name is missing. Please adjust CrateDB database URL.")
    logger.info(f"Source address: {cratedb_address}")

    # Read the source table into a pyarrow Table.
    # NOTE(review): This materializes the whole table in memory; consider
    # chunked reads for very large tables.
    engine = sa.create_engine(str(cratedb_url))
    try:
        if cratedb_table.schema:
            query = f'SELECT * FROM "{cratedb_table.schema}"."{cratedb_table.table}"'
        else:
            query = f'SELECT * FROM "{cratedb_table.table}"'
        df = pl.read_database(query=query, connection=engine).to_arrow()
    finally:
        engine.dispose()

    # Create the Iceberg namespace and table if needed, then append the data.
    # NOTE(review): `identifier` uses the catalog name as the namespace —
    # see `IcebergAddress.identifier`; confirm intended.
    catalog = iceberg_address.load_catalog()
    catalog.create_namespace_if_not_exists(iceberg_address.catalog)
    table = catalog.create_table_if_not_exists(
        iceberg_address.identifier,
        schema=df.schema,
    )
    table.append(df)
    logger.info(f"Appended {len(df)} records to Iceberg table: {iceberg_address.identifier}")

doc/io/iceberg/index.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
(iceberg)=
2+
# Apache Iceberg I/O
3+
4+
## About
5+
Import and export data into/from Iceberg tables, for humans and machines.
6+
7+
8+
```{toctree}
9+
:maxdepth: 1
10+
11+
loader
12+
```

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ optional-dependencies.io = [
173173
"fsspec[s3,http]",
174174
"pandas>=1,<2.3",
175175
"polars<1.30",
176+
"pyiceberg[pyarrow,sql-postgres]<0.10",
176177
"sqlalchemy>=2",
177178
"universal-pathlib<0.3",
178179
]

0 commit comments

Comments
 (0)