From eb01aa6f7c24b4509c29eb3c6e94eb530b47c2ce Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sun, 23 Jun 2024 14:42:43 +0200 Subject: [PATCH 1/5] Initial Implementation of dask-ee. Fixes #1. --- dask_ee/__init__.py | 2 +- dask_ee/default_test.py | 14 --------- dask_ee/read.py | 66 +++++++++++++++++++++++++++++++++++++++++ dask_ee/read_test.py | 14 +++++++++ dask_ee/write.py | 5 ++++ pyproject.toml | 1 + 6 files changed, 87 insertions(+), 15 deletions(-) delete mode 100644 dask_ee/default_test.py create mode 100644 dask_ee/read.py create mode 100644 dask_ee/read_test.py create mode 100644 dask_ee/write.py diff --git a/dask_ee/__init__.py b/dask_ee/__init__.py index f2166d7..677febd 100644 --- a/dask_ee/__init__.py +++ b/dask_ee/__init__.py @@ -1 +1 @@ -# TODO(alxmrs): https://docs.google.com/document/d/1Ltl6XrZ_uGD2J7OW1roUsxhpFxx5QvNeInjuONIkmL8/edit#heading=h.uj7plawe7x7e +from .read import read_ee diff --git a/dask_ee/default_test.py b/dask_ee/default_test.py deleted file mode 100644 index 722e4f9..0000000 --- a/dask_ee/default_test.py +++ /dev/null @@ -1,14 +0,0 @@ -import unittest - - -class ImportTest(unittest.TestCase): - - def test_can_import_module(self): - try: - import dask_ee - except ModuleNotFoundError: - self.fail('Cannot import `dask_ee`.') - - -if __name__ == '__main__': - unittest.main() diff --git a/dask_ee/read.py b/dask_ee/read.py new file mode 100644 index 0000000..dbc8fa3 --- /dev/null +++ b/dask_ee/read.py @@ -0,0 +1,66 @@ +# Implementation heavily inspired by @aazuspan via +# https://medium.com/@aa.zuspan/parallelizing-earth-engine-feature-collections-with-dask-bc6cdf9e2f48 +# Taken with permission. +# Independent equivalent design was drawn up here: +# https://docs.google.com/document/d/1Ltl6XrZ_uGD2J7OW1roUsxhpFxx5QvNeInjuONIkmL8/edit +import typing as t + +import dask.dataframe as dd +import ee +import numpy as np +import pandas as pd + +# Order is in appearance of types in the EE documentation. This looks alphabetical. +_BUILTIN_DTYPES = { + 'byte': np.uint8, + 'double': np.float64, + 'float': np.float32, + 'int': np.int32, + 'int16': np.int16, + 'int32': np.int32, + 'int64': np.int64, + 'int8': np.int8, + 'long': np.int64, + 'short': np.int16, + 'uint16': np.uint16, + 'uint32': np.uint32, + 'uint8': np.uint8, + 'string': np.str_, +} + + +# TODO(#4): Support 'auto' chunks, where we calculate the maximum allowed page size given the number of +# bytes in each row. +def read_ee( + fc: ee.FeatureCollection, io_chunks: t.Union[int, t.Literal['auto']] = 5_000 +) -> dd.DataFrame: + + if io_chunks == 'auto': + raise NotImplementedError('Auto io_chunks are not implemented yet!') + + fc_size, all_info = ee.List([fc.size(), fc]).getInfo() + columns = all_info['columns'] + + # TODO(#5): Compare `toList()` to other range operations, like getting all index IDs via `getInfo()`. + pages = [ + ee.FeatureCollection(fc.toList(io_chunks, i)) + for i in range(0, fc_size, io_chunks) + ] + + def to_df(page: ee.FeatureCollection) -> pd.DataFrame: + return ee.data.computeFeatures( + { + 'expression': page, + 'fileFormat': 'PANDAS_DATAFRAME', + } + ) + + meta = {k: _BUILTIN_DTYPES[v.lower()] for k, v in columns.items()} + divisions = tuple(range(0, fc_size, io_chunks)) + + return dd.from_map( + to_df, + pages, + meta=meta, + divisions=divisions, + ) diff --git a/dask_ee/read_test.py b/dask_ee/read_test.py new file mode 100644 index 0000000..1e4b17f --- /dev/null +++ b/dask_ee/read_test.py @@ -0,0 +1,14 @@ +import unittest + + +class ReadFeatureCollections(unittest.TestCase): + + def test_can_import_read_op(self): + try: + from dask_ee import read_ee + except ModuleNotFoundError: + self.fail('Cannot import `read_ee` function.') + + +if __name__ == '__main__': + unittest.main() diff --git a/dask_ee/write.py b/dask_ee/write.py new file mode 100644 index 0000000..afd98ca --- /dev/null +++ b/dask_ee/write.py @@ -0,0 +1,5 @@ +# TODO(alxmrs): This is open for design and implementation. + + +def to_ee(ddf, *args, **kwargs): + raise NotImplementedError('This has not yet been designed.') diff --git a/pyproject.toml b/pyproject.toml index 7cf5ce7..118fa77 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ classifiers = [ ] dependencies = [ "earthengine-api>=0.1.374", + "pandas", "dask", ] From ad2b2bd48fdc352d9e90704ee324c41724ed6f8b Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sun, 23 Jun 2024 14:59:43 +0200 Subject: [PATCH 2/5] Basic reads work without `meta`. --- dask_ee/read.py | 8 ++++--- dask_ee/read_integrationtest.py | 39 +++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 3 deletions(-) create mode 100644 dask_ee/read_integrationtest.py diff --git a/dask_ee/read.py b/dask_ee/read.py index dbc8fa3..d0ff809 100644 --- a/dask_ee/read.py +++ b/dask_ee/read.py @@ -20,6 +20,7 @@ 'int32': np.int32, 'int64': np.int64, 'int8': np.int8, + 'json': dict, 'long': np.int64, 'short': np.int16, 'uint16': np.uint16, @@ -39,7 +40,8 @@ def read_ee( raise NotImplementedError('Auto io_chunks are not implemented yet!') fc_size, all_info = ee.List([fc.size(), fc]).getInfo() - columns = all_info['columns'] + columns = {'geo': 'json'} + columns.update(all_info['columns']) # TODO(#5): Compare `toList()` to other range operations, like getting all index IDs via `getInfo()`. pages = [ @@ -55,12 +57,12 @@ def to_df(page: ee.FeatureCollection) -> pd.DataFrame: } ) - meta = {k: _BUILTIN_DTYPES[v.lower()] for k, v in columns.items()} + # TODO(alxmrs): Support dask dataframe `meta` via columns. + # meta = {k: _BUILTIN_DTYPES[v.lower()] for k, v in columns.items()} divisions = tuple(range(0, fc_size, io_chunks)) return dd.from_map( to_df, pages, - meta=meta, divisions=divisions, ) diff --git a/dask_ee/read_integrationtest.py b/dask_ee/read_integrationtest.py new file mode 100644 index 0000000..d2c87cb --- /dev/null +++ b/dask_ee/read_integrationtest.py @@ -0,0 +1,39 @@ +"""Integration tests with Google Earth Engine. + +Before running, please authenticate: +``` +earthengine authenticate +``` + +""" +import unittest + +import dask.dataframe as dd +import ee + +import dask_ee + + +class ReadIntegrationTests(unittest.TestCase): + + @classmethod + def setUpClass(cls): + ee.Initialize() + + def test_reads_dask_dataframe(self): + fc = ee.FeatureCollection("WRI/GPPD/power_plants") + ddf = dask_ee.read_ee(fc) + head = ddf.head() + columns = ddf.columns + + print(columns) + + self.assertIsNotNone(ddf) + self.assertIsNotNone(head) + self.assertIsInstance(ddf, dd.DataFrame) + + print(head) + + +if __name__ == '__main__': + unittest.main() From bb801a8aa839387ff1a1b40fc85f78eb172f639b Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sun, 23 Jun 2024 16:06:34 +0200 Subject: [PATCH 3/5] Meta is correct. Further, now getting all the rows. --- dask_ee/read.py | 60 +++++++++++++++++---------------- dask_ee/read_integrationtest.py | 8 +++-- 2 files changed, 36 insertions(+), 32 deletions(-) diff --git a/dask_ee/read.py b/dask_ee/read.py index d0ff809..6e111e6 100644 --- a/dask_ee/read.py +++ b/dask_ee/read.py @@ -1,8 +1,4 @@ -# Implementation heavily inspired by @aazuspan via -# https://medium.com/@aa.zuspan/parallelizing-earth-engine-feature-collections-with-dask-bc6cdf9e2f48 -# Taken with permission. -# Independent equivalent design was drawn up here: -# https://docs.google.com/document/d/1Ltl6XrZ_uGD2J7OW1roUsxhpFxx5QvNeInjuONIkmL8/edit +# Special thanks to @aazuspan for help with the implementation import typing as t import dask.dataframe as dd @@ -12,21 +8,21 @@ # Order is in appearance of types in the EE documentation. This looks alphabetical. _BUILTIN_DTYPES = { - 'byte': np.uint8, - 'double': np.float64, - 'float': np.float32, - 'int': np.int32, - 'int16': np.int16, - 'int32': np.int32, - 'int64': np.int64, - 'int8': np.int8, - 'json': dict, - 'long': np.int64, - 'short': np.int16, - 'uint16': np.uint16, - 'uint32': np.uint32, - 'uint8': np.uint8, - 'string': np.str_, + 'Byte': np.uint8, + 'Double': np.float64, + 'Float': np.float32, + 'Int': np.int32, + 'Int16': np.int16, + 'Int32': np.int32, + 'Int64': np.int64, + 'Int8': np.int8, + 'Json': np.object_, # added to handle GeoJSON columns. + 'Long': np.int64, + 'Short': np.int16, + 'Uint16': np.uint16, + 'Uint32': np.uint32, + 'Uint8': np.uint8, + 'String': np.str_, } @@ -37,17 +33,24 @@ def read_ee( ) -> dd.DataFrame: if io_chunks == 'auto': - raise NotImplementedError('Auto io_chunks are not implemented yet!') + raise NotImplementedError('Auto `io_chunks` are not implemented yet!') + # Make all the getInfo() calls at once, up front. fc_size, all_info = ee.List([fc.size(), fc]).getInfo() - columns = {'geo': 'json'} + + columns = {'geo': 'Json'} columns.update(all_info['columns']) + del columns['system:index'] + + divisions = tuple(range(0, fc_size, io_chunks)) # TODO(#5): Compare `toList()` to other range operations, like getting all index IDs via `getInfo()`. - pages = [ - ee.FeatureCollection(fc.toList(io_chunks, i)) - for i in range(0, fc_size, io_chunks) - ] + pages = [ee.FeatureCollection(fc.toList(io_chunks, i)) for i in divisions] + # Get the remainder, if it exists. `io_chunks` are not likely to evenly partition the data. + d, r = divmod(fc_size, io_chunks) + if r != 0: + pages.append(ee.FeatureCollection(fc.toList(r, d))) + divisions += (fc_size,) def to_df(page: ee.FeatureCollection) -> pd.DataFrame: return ee.data.computeFeatures( @@ -57,12 +60,11 @@ def to_df(page: ee.FeatureCollection) -> pd.DataFrame: } ) - # TODO(alxmrs): Support dask dataframe `meta` via columns. - # meta = {k: _BUILTIN_DTYPES[v.lower()] for k, v in columns.items()} - divisions = tuple(range(0, fc_size, io_chunks)) + meta = {k: _BUILTIN_DTYPES[v] for k, v in columns.items()} return dd.from_map( to_df, pages, + meta=meta, divisions=divisions, ) diff --git a/dask_ee/read_integrationtest.py b/dask_ee/read_integrationtest.py index d2c87cb..7d34adb 100644 --- a/dask_ee/read_integrationtest.py +++ b/dask_ee/read_integrationtest.py @@ -6,6 +6,7 @@ ``` """ + import unittest import dask.dataframe as dd @@ -23,17 +24,18 @@ def setUpClass(cls): def test_reads_dask_dataframe(self): fc = ee.FeatureCollection("WRI/GPPD/power_plants") ddf = dask_ee.read_ee(fc) + head = ddf.head() columns = ddf.columns - print(columns) - self.assertIsNotNone(ddf) self.assertIsNotNone(head) self.assertIsInstance(ddf, dd.DataFrame) + self.assertEqual(ddf.compute().shape, (28_664, 23)) + print(columns) print(head) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() From cece66f063452f2539ded789568ce1b610b19a5e Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sun, 23 Jun 2024 16:13:48 +0200 Subject: [PATCH 4/5] Get column metadata a bit faster. --- dask_ee/read.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_ee/read.py b/dask_ee/read.py index 6e111e6..fd5c21f 100644 --- a/dask_ee/read.py +++ b/dask_ee/read.py @@ -36,7 +36,7 @@ def read_ee( raise NotImplementedError('Auto `io_chunks` are not implemented yet!') # Make all the getInfo() calls at once, up front. - fc_size, all_info = ee.List([fc.size(), fc]).getInfo() + fc_size, all_info = ee.List([fc.size(), fc.limit(0)]).getInfo() columns = {'geo': 'Json'} columns.update(all_info['columns']) From 694e3fbb8373143d8d7476e71b40e83770a12534 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sun, 23 Jun 2024 16:32:08 +0200 Subject: [PATCH 5/5] Added basic profiling. --- dask_ee/read_integrationtest.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/dask_ee/read_integrationtest.py b/dask_ee/read_integrationtest.py index 7d34adb..ca794f6 100644 --- a/dask_ee/read_integrationtest.py +++ b/dask_ee/read_integrationtest.py @@ -4,9 +4,10 @@ ``` earthengine authenticate ``` - """ +import cProfile +import pstats import unittest import dask.dataframe as dd @@ -36,6 +37,14 @@ def test_reads_dask_dataframe(self): print(columns) print(head) + def test_prof__read_ee(self): + fc = ee.FeatureCollection("WRI/GPPD/power_plants") + with cProfile.Profile() as pr: + _ = dask_ee.read_ee(fc) + + # Modified version of `pr.print_stats()`. + pstats.Stats(pr).sort_stats("cumtime").print_stats() + if __name__ == "__main__": unittest.main()