Skip to content

Commit

Permalink
Initial Implementation of dask-ee. (#6)
Browse files Browse the repository at this point in the history
Fixes #1.
  • Loading branch information
alxmrs authored Jun 23, 2024
1 parent 497ede9 commit c859cf0
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 15 deletions.
2 changes: 1 addition & 1 deletion dask_ee/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
# TODO(alxmrs): https://docs.google.com/document/d/1Ltl6XrZ_uGD2J7OW1roUsxhpFxx5QvNeInjuONIkmL8/edit#heading=h.uj7plawe7x7e
from .read import read_ee
14 changes: 0 additions & 14 deletions dask_ee/default_test.py

This file was deleted.

70 changes: 70 additions & 0 deletions dask_ee/read.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Special thanks to @aazuspan for help with the implementation
import typing as t

import dask.dataframe as dd
import ee
import numpy as np
import pandas as pd

# Order is in appearance of types in the EE documentation. This looks alphabetical.
_BUILTIN_DTYPES = {
'Byte': np.uint8,
'Double': np.float64,
'Float': np.float32,
'Int': np.int32,
'Int16': np.int16,
'Int32': np.int32,
'Int64': np.int64,
'Int8': np.int8,
'Json': np.object_, # added to handle GeoJSON columns.
'Long': np.int64,
'Short': np.int16,
'Uint16': np.uint16,
'Uint32': np.uint32,
'Uint8': np.uint8,
'String': np.str_,
}


# TODO(#4): Support 'auto' chunks, where we calculate the maximum allowed page size given the number of
# bytes in each row.
def read_ee(
fc: ee.FeatureCollection, io_chunks: t.Union[int, t.Literal['auto']] = 5_000
) -> dd.DataFrame:

if io_chunks == 'auto':
raise NotImplementedError('Auto `io_chunks` are not implemented yet!')

# Make all the getInfo() calls at once, up front.
fc_size, all_info = ee.List([fc.size(), fc.limit(0)]).getInfo()

columns = {'geo': 'Json'}
columns.update(all_info['columns'])
del columns['system:index']

divisions = tuple(range(0, fc_size, io_chunks))

# TODO(#5): Compare `toList()` to other range operations, like getting all index IDs via `getInfo()`.
pages = [ee.FeatureCollection(fc.toList(io_chunks, i)) for i in divisions]
# Get the remainder, if it exists. `io_chunks` are not likely to evenly partition the data.
d, r = divmod(fc_size, io_chunks)
if r != 0:
pages.append(ee.FeatureCollection(fc.toList(r, d)))
divisions += (fc_size,)

def to_df(page: ee.FeatureCollection) -> pd.DataFrame:
return ee.data.computeFeatures(
{
'expression': page,
'fileFormat': 'PANDAS_DATAFRAME',
}
)

meta = {k: _BUILTIN_DTYPES[v] for k, v in columns.items()}

return dd.from_map(
to_df,
pages,
meta=meta,
divisions=divisions,
)
50 changes: 50 additions & 0 deletions dask_ee/read_integrationtest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Integration tests with Google Earth Engine.
Before running, please authenticate:
```
earthengine authenticate
```
"""

import cProfile
import pstats
import unittest

import dask.dataframe as dd
import ee

import dask_ee


class ReadIntegrationTests(unittest.TestCase):

@classmethod
def setUpClass(cls):
ee.Initialize()

def test_reads_dask_dataframe(self):
fc = ee.FeatureCollection("WRI/GPPD/power_plants")
ddf = dask_ee.read_ee(fc)

head = ddf.head()
columns = ddf.columns

self.assertIsNotNone(ddf)
self.assertIsNotNone(head)
self.assertIsInstance(ddf, dd.DataFrame)
self.assertEqual(ddf.compute().shape, (28_664, 23))

print(columns)
print(head)

def test_prof__read_ee(self):
fc = ee.FeatureCollection("WRI/GPPD/power_plants")
with cProfile.Profile() as pr:
_ = dask_ee.read_ee(fc)

# Modified version of `pr.print_stats()`.
pstats.Stats(pr).sort_stats("cumtime").print_stats()


if __name__ == "__main__":
unittest.main()
14 changes: 14 additions & 0 deletions dask_ee/read_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import unittest


class ReadFeatureCollections(unittest.TestCase):

def test_can_import_read_op(self):
try:
from dask_ee import read_ee
except ModuleNotFoundError:
self.fail('Cannot import `read_ee` function.')


if __name__ == '__main__':
unittest.main()
5 changes: 5 additions & 0 deletions dask_ee/write.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# TODO(alxmrs): This is open for design and implementation.


def to_ee(ddf, *args, **kwargs):
raise NotImplementedError('This has not yet been designed.')
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ classifiers = [
]
dependencies = [
"earthengine-api>=0.1.374",
"pandas",
"dask",
]

Expand Down

0 comments on commit c859cf0

Please sign in to comment.