diff --git a/doc/source/getting_started/install.rst b/doc/source/getting_started/install.rst
index 93663c1cced7e..1589fea5f8953 100644
--- a/doc/source/getting_started/install.rst
+++ b/doc/source/getting_started/install.rst
@@ -308,7 +308,7 @@ Dependency Minimum Version pip ex
 `zlib `__ hdf5 Compression for HDF5
 `fastparquet `__ 2024.2.0 - Parquet reading / writing (pyarrow is default)
 `pyarrow `__ 10.0.1 parquet, feather Parquet, ORC, and feather reading / writing
-`PyIceberg `__ 0.7.1 iceberg Apache Iceberg reading
+`PyIceberg `__ 0.7.1 iceberg Apache Iceberg reading / writing
 `pyreadstat `__ 1.2.6 spss SPSS files (.sav) reading
 `odfpy `__ 1.4.1 excel Open document format (.odf, .ods, .odt) reading / writing
 ====================================================== ================== ================ ==========================================================
diff --git a/doc/source/reference/io.rst b/doc/source/reference/io.rst
index 6e5992916f800..37d9e7f6b7dbd 100644
--- a/doc/source/reference/io.rst
+++ b/doc/source/reference/io.rst
@@ -162,6 +162,7 @@ Iceberg
    :toctree: api/

    read_iceberg
+   DataFrame.to_iceberg

 .. warning:: ``read_iceberg`` is experimental and may change without warning.
diff --git a/doc/source/user_guide/io.rst b/doc/source/user_guide/io.rst
index e5d43259033b3..25f1e11e6b603 100644
--- a/doc/source/user_guide/io.rst
+++ b/doc/source/user_guide/io.rst
@@ -29,7 +29,7 @@ The pandas I/O API is a set of top level ``reader`` functions accessed like
     binary,`HDF5 Format `__, :ref:`read_hdf`, :ref:`to_hdf`
     binary,`Feather Format `__, :ref:`read_feather`, :ref:`to_feather`
     binary,`Parquet Format `__, :ref:`read_parquet`, :ref:`to_parquet`
-    binary,`Apache Iceberg `__, :ref:`read_iceberg` , NA
+    binary,`Apache Iceberg `__, :ref:`read_iceberg`, :ref:`to_iceberg`
     binary,`ORC Format `__, :ref:`read_orc`, :ref:`to_orc`
     binary,`Stata `__, :ref:`read_stata`, :ref:`to_stata`
     binary,`SAS `__, :ref:`read_sas` , NA
@@ -5417,7 +5417,7 @@ engines to safely work with the same tables at the same time.

 Iceberg supports predicate pushdown and column pruning, which are available to pandas
 users via the ``row_filter`` and ``selected_fields`` parameters of the :func:`~pandas.read_iceberg`
-function. This is convenient to extract from large tables a subset that fits in memory asa
+function. This is convenient for extracting from large tables a subset that fits in memory as a
 pandas ``DataFrame``. Internally, pandas uses PyIceberg_ to query Iceberg.
@@ -5497,6 +5497,29 @@ parameter:

 Reading a particular snapshot is also possible by providing the snapshot ID as an argument
 to ``snapshot_id``.

+To save a ``DataFrame`` to an Iceberg table, use the :meth:`DataFrame.to_iceberg`
+method:
+
+.. code-block:: python
+
+    df.to_iceberg("my_table", catalog_name="my_catalog")
+
+The catalog is specified in the same way as for :func:`read_iceberg`, via the
+``catalog_name`` and ``catalog_properties`` parameters.
+
+The location of the table can be specified with the ``location`` parameter:
+
+.. code-block:: python
+
+    df.to_iceberg(
+        "my_table",
+        catalog_name="my_catalog",
+        location="s3://my-data-lake/my-iceberg-tables",
+    )
+
+It is possible to add properties to the table snapshot by passing a dictionary to the
+``snapshot_properties`` parameter.
+
 More information about the Iceberg format can be found in the `Apache Iceberg official
 page `__.
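For reference, ``snapshot_properties`` accepts string key-value pairs that PyIceberg stores in
the summary of the snapshot created by the write. A minimal sketch in the same style as the
examples above, where the table name, catalog name, and property value are all hypothetical:

.. code-block:: python

    df.to_iceberg(
        "my_table",
        catalog_name="my_catalog",
        snapshot_properties={"written-by": "nightly-etl-job"},
    )

The custom entries then appear alongside PyIceberg's own summary fields when inspecting the
table's snapshot history.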
diff --git a/doc/source/whatsnew/v3.0.0.rst b/doc/source/whatsnew/v3.0.0.rst
index 9100ce0cdc990..575ed3829fe1c 100644
--- a/doc/source/whatsnew/v3.0.0.rst
+++ b/doc/source/whatsnew/v3.0.0.rst
@@ -79,7 +79,7 @@ Other enhancements
 - :py:class:`frozenset` elements in pandas objects are now natively printed (:issue:`60690`)
 - Add ``"delete_rows"`` option to ``if_exists`` argument in :meth:`DataFrame.to_sql` deleting all records of the table before inserting data (:issue:`37210`).
 - Added half-year offset classes :class:`HalfYearBegin`, :class:`HalfYearEnd`, :class:`BHalfYearBegin` and :class:`BHalfYearEnd` (:issue:`60928`)
-- Added support to read from Apache Iceberg tables with the new :func:`read_iceberg` function (:issue:`61383`)
+- Added support for reading from and writing to Apache Iceberg tables with the new :func:`read_iceberg` function and :meth:`DataFrame.to_iceberg` method (:issue:`61383`)
 - Errors occurring during SQL I/O will now throw a generic :class:`.DatabaseError` instead of the raw Exception type from the underlying driver manager library (:issue:`60748`)
 - Implemented :meth:`Series.str.isascii` and :meth:`Series.str.isascii` (:issue:`59091`)
 - Improved deprecation message for offset aliases (:issue:`60820`)
diff --git a/pandas/core/frame.py b/pandas/core/frame.py
index bf919c6fe8a42..8053c17437c5e 100644
--- a/pandas/core/frame.py
+++ b/pandas/core/frame.py
@@ -3547,6 +3547,62 @@ def to_xml(

         return xml_formatter.write_output()

+    def to_iceberg(
+        self,
+        table_identifier: str,
+        catalog_name: str | None = None,
+        *,
+        catalog_properties: dict[str, Any] | None = None,
+        location: str | None = None,
+        append: bool = False,
+        snapshot_properties: dict[str, str] | None = None,
+    ) -> None:
+        """
+        Write a DataFrame to an Apache Iceberg table.
+
+        .. versionadded:: 3.0.0
+
+        .. warning::
+
+           to_iceberg is experimental and may change without warning.
+
+        Parameters
+        ----------
+        table_identifier : str
+            Table identifier.
+        catalog_name : str, optional
+            The name of the catalog.
+        catalog_properties : dict of {str: str}, optional
+            Additional properties passed to the catalog configuration.
+        location : str, optional
+            Location for the table.
+        append : bool, default False
+            If ``True``, append data to the table instead of replacing its contents.
+        snapshot_properties : dict of {str: str}, optional
+            Custom properties to be added to the snapshot summary.
+
+        See Also
+        --------
+        read_iceberg : Read an Apache Iceberg table.
+        DataFrame.to_parquet : Write a DataFrame in Parquet format.
+
+        Examples
+        --------
+        >>> df = pd.DataFrame(data={"col1": [1, 2], "col2": [4, 3]})
+        >>> df.to_iceberg("my_table", catalog_name="my_catalog")  # doctest: +SKIP
+        """
+        from pandas.io.iceberg import to_iceberg
+
+        to_iceberg(
+            self,
+            table_identifier,
+            catalog_name,
+            catalog_properties=catalog_properties,
+            location=location,
+            append=append,
+            snapshot_properties=snapshot_properties,
+        )
+
     # ----------------------------------------------------------------------
     @doc(INFO_DOCSTRING, **frame_sub_kwargs)
     def info(
diff --git a/pandas/io/iceberg.py b/pandas/io/iceberg.py
index 8a3e8f5da49b3..dcb675271031e 100644
--- a/pandas/io/iceberg.py
+++ b/pandas/io/iceberg.py
@@ -10,6 +10,7 @@
 def read_iceberg(
     table_identifier: str,
     catalog_name: str | None = None,
+    *,
     catalog_properties: dict[str, Any] | None = None,
     row_filter: str | None = None,
     selected_fields: tuple[str] | None = None,
@@ -21,6 +22,8 @@
     """
     Read an Apache Iceberg table into a pandas DataFrame.

+    .. versionadded:: 3.0.0
+
     .. warning::

        read_iceberg is experimental and may change without warning.
@@ -71,7 +74,6 @@
     """
     pyiceberg_catalog = import_optional_dependency("pyiceberg.catalog")
     pyiceberg_expressions = import_optional_dependency("pyiceberg.expressions")
-
     if catalog_properties is None:
         catalog_properties = {}
     catalog = pyiceberg_catalog.load_catalog(catalog_name, **catalog_properties)
@@ -91,3 +93,61 @@
         limit=limit,
     )
     return result.to_pandas()
+
+
+def to_iceberg(
+    df: DataFrame,
+    table_identifier: str,
+    catalog_name: str | None = None,
+    *,
+    catalog_properties: dict[str, Any] | None = None,
+    location: str | None = None,
+    append: bool = False,
+    snapshot_properties: dict[str, str] | None = None,
+) -> None:
+    """
+    Write a DataFrame to an Apache Iceberg table.
+
+    .. versionadded:: 3.0.0
+
+    Parameters
+    ----------
+    df : DataFrame
+        The DataFrame to write to the Iceberg table.
+    table_identifier : str
+        Table identifier.
+    catalog_name : str, optional
+        The name of the catalog.
+    catalog_properties : dict of {str: str}, optional
+        Additional properties passed to the catalog configuration.
+    location : str, optional
+        Location for the table.
+    append : bool, default False
+        If ``True``, append data to the table instead of replacing its contents.
+    snapshot_properties : dict of {str: str}, optional
+        Custom properties to be added to the snapshot summary.
+
+    See Also
+    --------
+    read_iceberg : Read an Apache Iceberg table.
+    DataFrame.to_parquet : Write a DataFrame in Parquet format.
+    """
+    pa = import_optional_dependency("pyarrow")
+    pyiceberg_catalog = import_optional_dependency("pyiceberg.catalog")
+    if catalog_properties is None:
+        catalog_properties = {}
+    catalog = pyiceberg_catalog.load_catalog(catalog_name, **catalog_properties)
+    arrow_table = pa.Table.from_pandas(df)
+    table = catalog.create_table_if_not_exists(
+        identifier=table_identifier,
+        schema=arrow_table.schema,
+        location=location,
+        # we could add `partition_spec`, `sort_order` and `properties` in the
+        # future, but it may not be trivial without exposing PyIceberg objects
+    )
+    if snapshot_properties is None:
+        snapshot_properties = {}
+    if append:
+        table.append(arrow_table, snapshot_properties=snapshot_properties)
+    else:
+        table.overwrite(arrow_table, snapshot_properties=snapshot_properties)
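As the implementation shows, ``append=True`` maps to PyIceberg's ``Table.append`` and the
default maps to ``Table.overwrite``, with the table created on first use via
``create_table_if_not_exists``. A rough usage sketch, assuming a reachable catalog in which
the ``ns`` namespace already exists; the catalog URI and warehouse path below are
hypothetical:

.. code-block:: python

    import pandas as pd

    catalog_properties = {"uri": "sqlite:////tmp/catalog.db"}  # hypothetical catalog
    df = pd.DataFrame({"a": [1, 2]})

    # First write: creates ns.events if needed and replaces any existing content,
    # since append defaults to False (Table.overwrite).
    df.to_iceberg(
        "ns.events",
        catalog_properties=catalog_properties,
        location="file:///tmp/warehouse",  # hypothetical warehouse path
    )

    # Second write: adds a new snapshot on top (Table.append) and tags it.
    df.to_iceberg(
        "ns.events",
        catalog_properties=catalog_properties,
        append=True,
        snapshot_properties={"source": "daily-job"},
    )

    # Both batches are visible when reading back: four rows in total.
    result = pd.read_iceberg("ns.events", catalog_properties=catalog_properties)

One design consequence worth noting: because the default is overwrite, calling
``to_iceberg`` twice without ``append=True`` leaves only the last batch in the table.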
+    @pytest.mark.parametrize("catalog", ["default", "pandas_tests"], indirect=True)
+    def test_write_by_catalog_name(self, catalog):
+        df = pd.DataFrame(
+            {
+                "A": [1, 2, 3],
+                "B": ["foo", "foo", "foo"],
+            }
+        )
+        df.to_iceberg(
+            "ns.new_table",
+            catalog_name=catalog.name,
+        )
+        result = read_iceberg(
+            "ns.new_table",
+            catalog_name=catalog.name,
+        )
+        tm.assert_frame_equal(result, df)
+
+    def test_write_existing_table_with_append_true(self, catalog):
+        original = read_iceberg(
+            "ns.my_table",
+            catalog_properties={"uri": catalog.uri},
+        )
+        new = pd.DataFrame(
+            {
+                "A": [1, 2, 3],
+                "B": ["foo", "foo", "foo"],
+            }
+        )
+        expected = pd.concat([original, new], ignore_index=True)
+        new.to_iceberg(
+            "ns.my_table",
+            catalog_properties={"uri": catalog.uri},
+            location=catalog.warehouse,
+            append=True,
+        )
+        result = read_iceberg(
+            "ns.my_table",
+            catalog_properties={"uri": catalog.uri},
+        )
+        tm.assert_frame_equal(result, expected)
+
+    def test_write_existing_table_with_append_false(self, catalog):
+        df = pd.DataFrame(
+            {
+                "A": [1, 2, 3],
+                "B": ["foo", "foo", "foo"],
+            }
+        )
+        df.to_iceberg(
+            "ns.my_table",
+            catalog_properties={"uri": catalog.uri},
+            location=catalog.warehouse,
+            append=False,
+        )
+        result = read_iceberg(
+            "ns.my_table",
+            catalog_properties={"uri": catalog.uri},
+        )
+        tm.assert_frame_equal(result, df)
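The tests above depend on a fixture-provided catalog. A comparable standalone round-trip can
be sketched with PyIceberg's sqlite-backed SQL catalog; every URI, path, and name below is an
assumption, including ``create_namespace_if_not_exists``, since ``to_iceberg`` creates the
table but not the namespace it lives in:

.. code-block:: python

    import pandas as pd
    from pyiceberg.catalog import load_catalog

    # Hypothetical local catalog: a sqlite URI makes PyIceberg select its SQL catalog.
    catalog_properties = {
        "uri": "sqlite:////tmp/pyiceberg_catalog.db",
        "warehouse": "file:///tmp/pyiceberg_warehouse",
    }

    # The namespace must exist before to_iceberg can create the table in it.
    catalog = load_catalog("default", **catalog_properties)
    catalog.create_namespace_if_not_exists("ns")

    df = pd.DataFrame({"A": [1, 2, 3], "B": ["foo", "foo", "foo"]})
    df.to_iceberg("ns.new_table", catalog_properties=catalog_properties)

    result = pd.read_iceberg("ns.new_table", catalog_properties=catalog_properties)
    assert result.equals(df)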