2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Add namespace support to `DataCube.process`, `PGNode`, `ProcessGraphVisitor` and related (minor API breaking change). This allows building process graphs with processes from namespaces other than the default "backend" namespace ([#182](https://github.com/Open-EO/openeo-python-client/issues/182))

### Changed

### Removed
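As an illustration of the changelog entry above (editorial note, not part of the diff): a minimal sketch of building a namespaced process call through the public API, assuming `https://openeo.example` is a reachable openEO backend that offers a process `foo` in a namespace `bar`.

```python
import openeo

con = openeo.connect("https://openeo.example")  # hypothetical backend URL
cube = con.load_collection("S2")
# namespace="bar" makes the backend resolve "foo" from the "bar" namespace
# instead of its default (predefined) process namespace.
cube = cube.process(process_id="foo", arguments={"data": cube, "bar": 123}, namespace="bar")
print(cube.graph)  # the "foo" node now carries a "namespace": "bar" field
```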
2 changes: 1 addition & 1 deletion openeo/_version.py
@@ -1 +1 @@
__version__ = '0.4.11a1'
__version__ = '0.5.0a1'
26 changes: 16 additions & 10 deletions openeo/internal/graph_building.py
@@ -8,7 +8,7 @@

from openeo.api.process import Parameter
from openeo.internal.process_graph_visitor import ProcessGraphVisitor
from openeo.util import legacy_alias
from openeo.util import legacy_alias, dict_no_none


class PGNode:
@@ -24,7 +24,7 @@ class PGNode:

"""

def __init__(self, process_id: str, arguments: dict = None, **kwargs):
def __init__(self, process_id: str, arguments: dict = None, namespace: Union[str, None] = None, **kwargs):
self._process_id = process_id
# Merge arguments dict and kwargs
arguments = dict(**(arguments or {}), **kwargs)
@@ -34,6 +34,7 @@ def __init__(self, process_id: str, arguments: dict = None, **kwargs):
arguments[arg] = {"from_node": value}
# TODO: use a frozendict of some sort to ensure immutability?
self._arguments = arguments
self._namespace = namespace

def __repr__(self):
return "<{c} {p!r} at 0x{m:x}>".format(c=self.__class__.__name__, p=self.process_id, m=id(self))
@@ -46,6 +47,10 @@ def process_id(self) -> str:
def arguments(self) -> dict:
return self._arguments

@property
def namespace(self) -> Union[str, None]:
return self._namespace

def to_dict(self) -> dict:
"""
Convert process graph to a nested dictionary structure.
@@ -55,7 +60,7 @@ def to_dict(self) -> dict:
def _deep_copy(x):
"""PGNode aware deep copy helper"""
if isinstance(x, PGNode):
return {"process_id": x.process_id, "arguments": _deep_copy(x.arguments)}
return dict_no_none(process_id=x.process_id, arguments=_deep_copy(x.arguments), namespace=x.namespace)
if isinstance(x, Parameter):
return {"from_parameter": x.name}
elif isinstance(x, dict):
@@ -201,20 +206,21 @@ def accept_node(self, node: PGNode):
# Process reused nodes only first time and remember node id.
node_id = id(node)
if node_id not in self._node_cache:
super()._accept_process(process_id=node.process_id, arguments=node.arguments)
super()._accept_process(process_id=node.process_id, arguments=node.arguments, namespace=node.namespace)
self._node_cache[node_id] = self._last_node_id
else:
self._last_node_id = self._node_cache[node_id]

def enterProcess(self, process_id: str, arguments: dict):
def enterProcess(self, process_id: str, arguments: dict, namespace: Union[str, None]):
self._argument_stack.append({})

def leaveProcess(self, process_id: str, arguments: dict):
def leaveProcess(self, process_id: str, arguments: dict, namespace: Union[str, None]):
node_id = self._node_id_generator.generate(process_id)
self._flattened[node_id] = {
"process_id": process_id,
"arguments": self._argument_stack.pop()
}
self._flattened[node_id] = dict_no_none(
process_id=process_id,
arguments=self._argument_stack.pop(),
namespace=namespace,
)
self._last_node_id = node_id

def _store_argument(self, argument_id: str, value):
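A small sketch (editorial, not part of the change set) of why `dict_no_none` keeps the serialized nodes backward compatible: when `namespace` is left at its default `None`, the key is simply dropped, so existing graphs are unchanged.

```python
from openeo.util import dict_no_none
from openeo.internal.graph_building import PGNode

# None values are dropped, so the default namespace never shows up in the graph.
assert dict_no_none(process_id="foo", arguments={}, namespace=None) == {
    "process_id": "foo",
    "arguments": {},
}

# A namespaced node keeps the field when serialized.
assert PGNode("foo", namespace="bar").to_dict() == {
    "process_id": "foo",
    "arguments": {},
    "namespace": "bar",
}
```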
14 changes: 8 additions & 6 deletions openeo/internal/process_graph_visitor.py
@@ -1,4 +1,5 @@
from abc import ABC
from typing import Union

from deprecated import deprecated

@@ -75,11 +76,12 @@ def accept(self, node: dict):
def accept_node(self, node: dict):
pid = node['process_id']
arguments = node.get('arguments', {})
self._accept_process(process_id=pid, arguments=arguments)
namespace = node.get("namespace", None)
self._accept_process(process_id=pid, arguments=arguments, namespace=namespace)

def _accept_process(self, process_id: str, arguments: dict):
def _accept_process(self, process_id: str, arguments: dict, namespace: Union[str, None]):
self.process_stack.append(process_id)
self.enterProcess(process_id=process_id, arguments=arguments)
self.enterProcess(process_id=process_id, arguments=arguments, namespace=namespace)
for arg_id, value in sorted(arguments.items()):
if isinstance(value, list):
self.enterArray(argument_id=arg_id)
@@ -91,7 +93,7 @@ def _accept_process(self, process_id: str, arguments: dict):
self.leaveArgument(argument_id=arg_id, value=value)
else:
self.constantArgument(argument_id=arg_id, value=value)
self.leaveProcess(process_id=process_id, arguments=arguments)
self.leaveProcess(process_id=process_id, arguments=arguments, namespace=namespace)
assert self.process_stack.pop() == process_id

def _accept_argument_list(self, elements: list):
@@ -121,10 +123,10 @@ def _accept_dict(self, value: dict):
def from_parameter(self,parameter_id:str):
pass

def enterProcess(self, process_id: str, arguments: dict):
def enterProcess(self, process_id: str, arguments: dict, namespace: Union[str, None]):
pass

def leaveProcess(self, process_id: str, arguments: dict):
def leaveProcess(self, process_id: str, arguments: dict, namespace: Union[str, None]):
pass

def enterArgument(self, argument_id: str, value):
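Since `enterProcess` and `leaveProcess` now receive a `namespace` argument, custom `ProcessGraphVisitor` subclasses have to be updated, which is the "minor API breaking change" from the changelog. A toy sketch of an updated subclass (the `NamespaceCollector` name is made up for illustration):

```python
from typing import Union

from openeo.internal.process_graph_visitor import ProcessGraphVisitor


class NamespaceCollector(ProcessGraphVisitor):
    """Toy visitor that records which (process_id, namespace) pairs a flat graph uses."""

    def __init__(self):
        super().__init__()
        self.processes = []

    def leaveProcess(self, process_id: str, arguments: dict, namespace: Union[str, None]):
        self.processes.append((process_id, namespace))


flat_graph = {
    "foo1": {"process_id": "foo", "namespace": "bar", "arguments": {}, "result": True},
}
visitor = NamespaceCollector()
visitor.accept_process_graph(flat_graph)
assert visitor.processes == [("foo", "bar")]
```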
16 changes: 13 additions & 3 deletions openeo/rest/datacube.py
@@ -93,12 +93,21 @@ def _api_version(self):
def connection(self) -> 'openeo.Connection':
return self._connection

def process(self, process_id: str, arguments: dict = None, metadata: CollectionMetadata = None, **kwargs) -> 'DataCube':
def process(
self,
process_id: str,
arguments: dict = None,
metadata: Optional[CollectionMetadata] = None,
namespace: Optional[str] = None,
**kwargs
) -> 'DataCube':
"""
Generic helper to create a new DataCube by applying a process.

:param process_id: process id of the process.
:param arguments: argument dictionary for the process.
:param metadata: optional: metadata to override original cube metadata (e.g. when reducing dimensions)
:param namespace: optional: process namespace
:return: new DataCube instance
"""
arguments = {**(arguments or {}), **kwargs}
@@ -110,16 +119,17 @@ def process(self, process_id: str, arguments: dict = None, metadata: CollectionM
return self.process_with_node(PGNode(
process_id=process_id,
arguments=arguments,
namespace=namespace,
), metadata=metadata)

graph_add_node = legacy_alias(process, "graph_add_node")

def process_with_node(self, pg: PGNode, metadata: CollectionMetadata = None) -> 'DataCube':
def process_with_node(self, pg: PGNode, metadata: Optional[CollectionMetadata] = None) -> 'DataCube':
"""
Generic helper to create a new DataCube by applying a process (given as process graph node)

:param pg: process graph node (containing process id and arguments)
:param metadata: (optional) metadata to override original cube metadata (e.g. when reducing dimensions)
:param metadata: optional: metadata to override original cube metadata (e.g. when reducing dimensions)
:return: new DataCube instance
"""
# TODO: deep copy `self.metadata` instead of using same instance?
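For completeness, a hedged sketch of the two equivalent call styles that `DataCube.process` supports, assuming `con100` is an `openeo.Connection` against a 1.0-style backend as in the test fixtures below; with the default `namespace=None` the generated graph is unchanged.

```python
cube = con100.load_collection("S2")  # assumes `con100` is an openeo.Connection, as in the tests
a = cube.process("foo", arguments={"data": cube, "bar": 123}, namespace="bar")
b = cube.process("foo", data=cube, bar=123, namespace="bar")
# Both styles produce the same namespaced node; with namespace=None (the default)
# the "namespace" field is omitted and behaviour stays as before.
assert a.graph == b.graph
```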
21 changes: 21 additions & 0 deletions tests/data/1.0.0/process_foo_namespaced.json
@@ -0,0 +1,21 @@
{
"loadcollection1": {
"process_id": "load_collection",
"arguments": {
"id": "S2",
"spatial_extent": null,
"temporal_extent": null
}
},
"foo1": {
"process_id": "foo",
"namespace": "bar",
"arguments": {
"data": {
"from_node": "loadcollection1"
},
"bar": 123
},
"result": true
}
}
19 changes: 19 additions & 0 deletions tests/internal/test_graphbuilding.py
@@ -19,6 +19,11 @@ def test_pgnode_arguments():
PGNode("foo", arguments={"bar": 123}, bar=456)


def test_pgnode_namespace():
assert PGNode("foo").namespace is None
assert PGNode("foo", namespace="bar").namespace == "bar"


def test_pgnode_to_dict():
pg = PGNode(process_id="load_collection", arguments={"collection_id": "S2"})
assert pg.to_dict() == {
@@ -27,6 +32,15 @@ def test_pgnode_to_dict():
}


def test_pgnode_to_dict_namespace():
pg = PGNode(process_id="load_collection", arguments={"collection_id": "S2"}, namespace="bar")
assert pg.to_dict() == {
"process_id": "load_collection",
"namespace": "bar",
"arguments": {"collection_id": "S2"}
}


def test_pgnode_to_dict_nested():
pg = PGNode(
process_id="filter_bands",
@@ -88,6 +102,11 @@ def test_build_and_flatten_argument_dict():
assert node.flat_graph() == {"foo1": {"process_id": "foo", "arguments": {"bar": "red", "x": 3}, "result": True}}


def test_build_and_flatten_namespace():
node = PGNode("foo", namespace="bar")
assert node.flat_graph() == {"foo1": {"process_id": "foo", "namespace": "bar", "arguments": {}, "result": True}}


def test_pgnode_to_dict_subprocess_graphs():
load_collection = PGNode("load_collection", collection_id="S2")
band2 = PGNode("array_element", data={"from_argument": "data"}, index=2)
34 changes: 26 additions & 8 deletions tests/internal/test_process_graph_visitor.py
@@ -1,4 +1,3 @@
from unittest import TestCase
from unittest.mock import MagicMock, call, ANY

import pytest
@@ -16,7 +15,26 @@ def test_visit_node():
visitor.enterArgument = MagicMock()
visitor.accept_node(node)

assert visitor.enterProcess.call_args_list == [call(process_id="cos", arguments={"x": {"from_argument": "data"}})]
assert visitor.enterProcess.call_args_list == [
call(process_id="cos", arguments={"x": {"from_argument": "data"}}, namespace=None)
]
assert visitor.enterArgument.call_args_list == [call(argument_id="x", value={"from_argument": "data"})]


def test_visit_node_namespaced():
node = {
"process_id": "cos",
"namespace": "math",
"arguments": {"x": {"from_argument": "data"}}
}
visitor = ProcessGraphVisitor()
visitor.enterProcess = MagicMock()
visitor.enterArgument = MagicMock()
visitor.accept_node(node)

assert visitor.enterProcess.call_args_list == [
call(process_id="cos", arguments={"x": {"from_argument": "data"}}, namespace="math")
]
assert visitor.enterArgument.call_args_list == [call(argument_id="x", value={"from_argument": "data"})]


@@ -51,8 +69,8 @@ def test_visit_nodes():
visitor.accept_process_graph(graph)

assert visitor.leaveProcess.call_args_list == [
call(process_id="abs", arguments=ANY),
call(process_id="cos", arguments=ANY),
call(process_id="abs", arguments=ANY, namespace=None),
call(process_id="cos", arguments=ANY, namespace=None),
]
assert visitor.enterArgument.call_args_list == [
call(argument_id="data", value=ANY),
@@ -93,8 +111,8 @@ def test_visit_nodes_array():

visitor.accept_process_graph(graph)
assert visitor.leaveProcess.call_args_list == [
call(process_id='abs', arguments=ANY),
call(process_id='cos', arguments=ANY)
call(process_id='abs', arguments=ANY, namespace=None),
call(process_id='cos', arguments=ANY, namespace=None)
]
assert visitor.enterArgument.call_args_list == [
call(argument_id="data", value=ANY)
@@ -130,8 +148,8 @@ def test_visit_array_with_dereferenced_nodes():

visitor.accept_node(dereferenced)
assert visitor.leaveProcess.call_args_list == [
call(process_id='array_element', arguments=ANY),
call(process_id='product', arguments=ANY)
call(process_id='array_element', arguments=ANY, namespace=None),
call(process_id='product', arguments=ANY, namespace=None)
]
assert visitor.enterArgument.call_args_list == [
call(argument_id="data", value={'from_argument': 'data'})
16 changes: 14 additions & 2 deletions tests/rest/datacube/test_datacube100.py
@@ -539,12 +539,18 @@ def test_custom_process_kwargs_datacube_pg(con100: Connection):
assert res.graph == expected


def test_custom_process_kwargs_datacube_chained(con100: Connection):
def test_custom_process_kwargs_this(con100: Connection):
res = con100.load_collection("S2").process(process_id="foo", data=THIS, bar=123)
expected = load_json_resource('data/1.0.0/process_foo.json')
assert res.graph == expected


def test_custom_process_kwargs_namespaced(con100: Connection):
res = con100.load_collection("S2").process(process_id="foo", data=THIS, bar=123, namespace="bar")
expected = load_json_resource('data/1.0.0/process_foo_namespaced.json')
assert res.graph == expected


def test_custom_process_arguments_datacube(con100: Connection):
img = con100.load_collection("S2")
res = img.process(process_id="foo", arguments={"data": img, "bar": 123})
@@ -559,12 +565,18 @@ def test_custom_process_arguments_datacube_pg(con100: Connection):
assert res.graph == expected


def test_custom_process_arguments_datacube_chained(con100: Connection):
def test_custom_process_arguments_this(con100: Connection):
res = con100.load_collection("S2").process(process_id="foo", arguments={"data": THIS, "bar": 123})
expected = load_json_resource('data/1.0.0/process_foo.json')
assert res.graph == expected


def test_custom_process_arguments_namespaced(con100: Connection):
res = con100.load_collection("S2").process(process_id="foo", arguments={"data": THIS, "bar": 123}, namespace="bar")
expected = load_json_resource('data/1.0.0/process_foo_namespaced.json')
assert res.graph == expected


def test_save_user_defined_process(con100, requests_mock):
requests_mock.get(API_URL + "/processes", json={"processes": [{"id": "add"}]})
