Skip to content

Commit 40b83ee

Browse files
committed
I/O: Adapter for Apache Iceberg
1 parent 991c1ca commit 40b83ee

File tree

5 files changed

+104
-4
lines changed

5 files changed

+104
-4
lines changed

cratedb_toolkit/cli.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
from .cmd.tail.cli import cli as tail_cli
1010
from .docs.cli import cli as docs_cli
1111
from .info.cli import cli as info_cli
12-
from .io.cli import cli as io_cli
12+
from .io.cli import cli_load as io_cli_load
13+
from .io.cli import cli_save as io_cli_save
1314
from .query.cli import cli as query_cli
1415
from .settings.cli import cli as settings_cli
1516
from .shell.cli import cli as shell_cli
@@ -30,7 +31,8 @@ def cli(ctx: click.Context, verbose: bool, debug: bool):
3031
cli.add_command(cfr_cli, name="cfr")
3132
cli.add_command(cloud_cli, name="cluster")
3233
cli.add_command(docs_cli, name="docs")
33-
cli.add_command(io_cli, name="load")
34+
cli.add_command(io_cli_load, name="load")
35+
cli.add_command(io_cli_save, name="save")
3436
cli.add_command(query_cli, name="query")
3537
cli.add_command(rockset_cli, name="rockset")
3638
cli.add_command(shell_cli, name="shell")

cratedb_toolkit/cluster/core.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
DatabaseAddressMissingError,
2121
OperationFailed,
2222
)
23+
from cratedb_toolkit.io.iceberg import from_iceberg, to_iceberg
2324
from cratedb_toolkit.model import ClusterAddressOptions, DatabaseAddress, InputOutputResource, TableAddress
2425
from cratedb_toolkit.util.client import jwt_token_patch
2526
from cratedb_toolkit.util.data import asbool
@@ -569,6 +570,9 @@ def load_table(
569570
else:
570571
raise NotImplementedError("Loading full data via Kinesis not implemented yet")
571572

573+
elif source_url_obj.scheme.startswith("iceberg") or source_url_obj.scheme.endswith("iceberg"):
574+
return from_iceberg(str(source_url_obj), target_url)
575+
572576
elif source_url_obj.scheme in ["file+bson", "http+bson", "https+bson", "mongodb", "mongodb+srv"]:
573577
if "+cdc" in source_url_obj.scheme:
574578
source_url_obj.scheme = source_url_obj.scheme.replace("+cdc", "")
@@ -599,6 +603,30 @@ def load_table(
599603

600604
return self
601605

606+
def save_table(
607+
self, source: TableAddress, target: InputOutputResource, transformation: t.Union[Path, None] = None
608+
) -> "StandaloneCluster":
609+
"""
610+
Export data from a database table on a standalone CrateDB Server.
611+
612+
Synopsis
613+
--------
614+
export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo
615+
616+
ctk load table influxdb2://example:token@localhost:8086/testdrive/demo
617+
ctk load table mongodb://localhost:27017/testdrive/demo
618+
"""
619+
source_url = self.address.dburi
620+
target_url_obj = URL(target.url)
621+
#source_url = source.url
622+
623+
if target_url_obj.scheme.startswith("iceberg") or target_url_obj.scheme.endswith("iceberg"):
624+
return to_iceberg(source_url, target.url)
625+
else:
626+
raise NotImplementedError(f"Exporting resource not implemented yet: {target_url_obj}")
627+
628+
return self
629+
602630

603631
class DatabaseCluster:
604632
"""

cratedb_toolkit/io/cli.py

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@
1818
@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level")
1919
@click.version_option()
2020
@click.pass_context
21-
def cli(ctx: click.Context, verbose: bool, debug: bool):
21+
def cli_load(ctx: click.Context, verbose: bool, debug: bool):
2222
"""
2323
Load data into CrateDB.
2424
"""
2525
return boot_click(ctx, verbose, debug)
2626

2727

28-
@make_command(cli, name="table")
28+
@make_command(cli_load, name="table")
2929
@click.argument("url")
3030
@option_cluster_id
3131
@option_cluster_name
@@ -67,3 +67,60 @@ def load_table(
6767
cluster_url=cluster_url,
6868
)
6969
cluster.load_table(source=source, target=target, transformation=transformation)
70+
71+
72+
@click.group(cls=ClickAliasedGroup) # type: ignore[arg-type]
73+
@click.option("--verbose", is_flag=True, required=False, help="Turn on logging")
74+
@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level")
75+
@click.version_option()
76+
@click.pass_context
77+
def cli_save(ctx: click.Context, verbose: bool, debug: bool):
78+
"""
79+
Export data from CrateDB.
80+
"""
81+
return boot_click(ctx, verbose, debug)
82+
83+
84+
@make_command(cli_save, name="table")
85+
@click.argument("url")
86+
@option_cluster_id
87+
@option_cluster_name
88+
@option_cluster_url
89+
@click.option("--schema", envvar="CRATEDB_SCHEMA", type=str, required=False, help="Schema where to import the data")
90+
@click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table where to import the data")
91+
@click.option("--format", "format_", type=str, required=False, help="File format of the import resource")
92+
@click.option("--compression", type=str, required=False, help="Compression format of the import resource")
93+
@click.option("--transformation", type=Path, required=False, help="Path to Zyp transformation file")
94+
@click.pass_context
95+
def save_table(
96+
ctx: click.Context,
97+
url: str,
98+
cluster_id: str,
99+
cluster_name: str,
100+
cluster_url: str,
101+
schema: str,
102+
table: str,
103+
format_: str,
104+
compression: str,
105+
transformation: t.Union[Path, None],
106+
):
107+
"""
108+
Export data from CrateDB and CrateDB Cloud clusters.
109+
"""
110+
111+
# When `--transformation` is given, but empty, fix it.
112+
if transformation is not None and transformation.name == "":
113+
transformation = None
114+
115+
# Encapsulate source and target parameters.
116+
source = TableAddress(schema=schema, table=table)
117+
target = InputOutputResource(url=url, format=format_, compression=compression)
118+
print("target:", target)
119+
120+
# Dispatch "load table" operation.
121+
cluster = DatabaseCluster.create(
122+
cluster_id=cluster_id,
123+
cluster_name=cluster_name,
124+
cluster_url=cluster_url,
125+
)
126+
cluster.save_table(source=source, target=target, transformation=transformation)

doc/io/iceberg/index.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
(iceberg)=
2+
# Apache Iceberg I/O
3+
4+
## About
5+
Import and export data into/from Iceberg tables, for humans and machines.
6+
7+
8+
```{toctree}
9+
:maxdepth: 1
10+
11+
loader
12+
```

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ optional-dependencies.io = [
173173
"fsspec[s3,http]",
174174
"pandas>=1,<2.3",
175175
"polars<1.30",
176+
"pyiceberg[pyarrow,sql-postgres]<0.10",
176177
"sqlalchemy>=2",
177178
"universal-pathlib<0.3",
178179
]

0 commit comments

Comments
 (0)