Skip to content

Commit 2f3aed4

Browse files
committed
Issue #182 add namespace support to DataCube.process and related
bumps version to 0.5.0a1 because the `ProcessGraphVisitor` changed a bit
1 parent 05ab328 commit 2f3aed4

File tree

9 files changed

+120
-30
lines changed

9 files changed

+120
-30
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
88

99
### Added
1010

11+
- Add namespace support to `DataCube.process`, `PGNode`, `ProcessGraphVisitor` (minor API breaking change) and related. Allows building process graphs with processes from non-"backend" namespaces ([#182](https://github.com/Open-EO/openeo-python-client/issues/182))
12+
1113
### Changed
1214

1315
### Removed

openeo/_version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '0.4.11a1'
1+
__version__ = '0.5.0a1'

openeo/internal/graph_building.py

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
from openeo.api.process import Parameter
1010
from openeo.internal.process_graph_visitor import ProcessGraphVisitor
11-
from openeo.util import legacy_alias
11+
from openeo.util import legacy_alias, dict_no_none
1212

1313

1414
class PGNode:
@@ -24,7 +24,7 @@ class PGNode:
2424
2525
"""
2626

27-
def __init__(self, process_id: str, arguments: dict = None, **kwargs):
27+
def __init__(self, process_id: str, arguments: dict = None, namespace: Union[str, None] = None, **kwargs):
2828
self._process_id = process_id
2929
# Merge arguments dict and kwargs
3030
arguments = dict(**(arguments or {}), **kwargs)
@@ -34,6 +34,7 @@ def __init__(self, process_id: str, arguments: dict = None, **kwargs):
3434
arguments[arg] = {"from_node": value}
3535
# TODO: use a frozendict of some sort to ensure immutability?
3636
self._arguments = arguments
37+
self._namespace = namespace
3738

3839
def __repr__(self):
3940
return "<{c} {p!r} at 0x{m:x}>".format(c=self.__class__.__name__, p=self.process_id, m=id(self))
@@ -46,6 +47,10 @@ def process_id(self) -> str:
4647
def arguments(self) -> dict:
4748
return self._arguments
4849

50+
@property
51+
def namespace(self) -> Union[str, None]:
52+
return self._namespace
53+
4954
def to_dict(self) -> dict:
5055
"""
5156
Convert process graph to a nested dictionary structure.
@@ -55,7 +60,7 @@ def to_dict(self) -> dict:
5560
def _deep_copy(x):
5661
"""PGNode aware deep copy helper"""
5762
if isinstance(x, PGNode):
58-
return {"process_id": x.process_id, "arguments": _deep_copy(x.arguments)}
63+
return dict_no_none(process_id=x.process_id, arguments=_deep_copy(x.arguments), namespace=x.namespace)
5964
if isinstance(x, Parameter):
6065
return {"from_parameter": x.name}
6166
elif isinstance(x, dict):
@@ -201,20 +206,21 @@ def accept_node(self, node: PGNode):
201206
# Process reused nodes only first time and remember node id.
202207
node_id = id(node)
203208
if node_id not in self._node_cache:
204-
super()._accept_process(process_id=node.process_id, arguments=node.arguments)
209+
super()._accept_process(process_id=node.process_id, arguments=node.arguments, namespace=node.namespace)
205210
self._node_cache[node_id] = self._last_node_id
206211
else:
207212
self._last_node_id = self._node_cache[node_id]
208213

209-
def enterProcess(self, process_id: str, arguments: dict):
214+
def enterProcess(self, process_id: str, arguments: dict, namespace: Union[str, None]):
210215
self._argument_stack.append({})
211216

212-
def leaveProcess(self, process_id: str, arguments: dict):
217+
def leaveProcess(self, process_id: str, arguments: dict, namespace: Union[str, None]):
213218
node_id = self._node_id_generator.generate(process_id)
214-
self._flattened[node_id] = {
215-
"process_id": process_id,
216-
"arguments": self._argument_stack.pop()
217-
}
219+
self._flattened[node_id] = dict_no_none(
220+
process_id=process_id,
221+
arguments=self._argument_stack.pop(),
222+
namespace=namespace,
223+
)
218224
self._last_node_id = node_id
219225

220226
def _store_argument(self, argument_id: str, value):

openeo/internal/process_graph_visitor.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from abc import ABC
2+
from typing import Union
23

34
from deprecated import deprecated
45

@@ -75,11 +76,12 @@ def accept(self, node: dict):
7576
def accept_node(self, node: dict):
7677
pid = node['process_id']
7778
arguments = node.get('arguments', {})
78-
self._accept_process(process_id=pid, arguments=arguments)
79+
namespace = node.get("namespace", None)
80+
self._accept_process(process_id=pid, arguments=arguments, namespace=namespace)
7981

80-
def _accept_process(self, process_id: str, arguments: dict):
82+
def _accept_process(self, process_id: str, arguments: dict, namespace: Union[str, None]):
8183
self.process_stack.append(process_id)
82-
self.enterProcess(process_id=process_id, arguments=arguments)
84+
self.enterProcess(process_id=process_id, arguments=arguments, namespace=namespace)
8385
for arg_id, value in sorted(arguments.items()):
8486
if isinstance(value, list):
8587
self.enterArray(argument_id=arg_id)
@@ -91,7 +93,7 @@ def _accept_process(self, process_id: str, arguments: dict):
9193
self.leaveArgument(argument_id=arg_id, value=value)
9294
else:
9395
self.constantArgument(argument_id=arg_id, value=value)
94-
self.leaveProcess(process_id=process_id, arguments=arguments)
96+
self.leaveProcess(process_id=process_id, arguments=arguments, namespace=namespace)
9597
assert self.process_stack.pop() == process_id
9698

9799
def _accept_argument_list(self, elements: list):
@@ -121,10 +123,10 @@ def _accept_dict(self, value: dict):
121123
def from_parameter(self,parameter_id:str):
122124
pass
123125

124-
def enterProcess(self, process_id: str, arguments: dict):
126+
def enterProcess(self, process_id: str, arguments: dict, namespace: Union[str, None]):
125127
pass
126128

127-
def leaveProcess(self, process_id: str, arguments: dict):
129+
def leaveProcess(self, process_id: str, arguments: dict, namespace: Union[str, None]):
128130
pass
129131

130132
def enterArgument(self, argument_id: str, value):

openeo/rest/datacube.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,21 @@ def _api_version(self):
9393
def connection(self) -> 'openeo.Connection':
9494
return self._connection
9595

96-
def process(self, process_id: str, arguments: dict = None, metadata: CollectionMetadata = None, **kwargs) -> 'DataCube':
96+
def process(
97+
self,
98+
process_id: str,
99+
arguments: dict = None,
100+
metadata: Optional[CollectionMetadata] = None,
101+
namespace: Optional[str] = None,
102+
**kwargs
103+
) -> 'DataCube':
97104
"""
98105
Generic helper to create a new DataCube by applying a process.
99106
100107
:param process_id: process id of the process.
101108
:param arguments: argument dictionary for the process.
109+
:param metadata: optional: metadata to override original cube metadata (e.g. when reducing dimensions)
110+
:param namespace: optional: process namespace
102111
:return: new DataCube instance
103112
"""
104113
arguments = {**(arguments or {}), **kwargs}
@@ -110,16 +119,17 @@ def process(self, process_id: str, arguments: dict = None, metadata: CollectionM
110119
return self.process_with_node(PGNode(
111120
process_id=process_id,
112121
arguments=arguments,
122+
namespace=namespace,
113123
), metadata=metadata)
114124

115125
graph_add_node = legacy_alias(process, "graph_add_node")
116126

117-
def process_with_node(self, pg: PGNode, metadata: CollectionMetadata = None) -> 'DataCube':
127+
def process_with_node(self, pg: PGNode, metadata: Optional[CollectionMetadata] = None) -> 'DataCube':
118128
"""
119129
Generic helper to create a new DataCube by applying a process (given as process graph node)
120130
121131
:param pg: process graph node (containing process id and arguments)
122-
:param metadata: (optional) metadata to override original cube metadata (e.g. when reducing dimensions)
132+
:param metadata: optional: metadata to override original cube metadata (e.g. when reducing dimensions)
123133
:return: new DataCube instance
124134
"""
125135
# TODO: deep copy `self.metadata` instead of using same instance?
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
{
2+
"loadcollection1": {
3+
"process_id": "load_collection",
4+
"arguments": {
5+
"id": "S2",
6+
"spatial_extent": null,
7+
"temporal_extent": null
8+
}
9+
},
10+
"foo1": {
11+
"process_id": "foo",
12+
"namespace": "bar",
13+
"arguments": {
14+
"data": {
15+
"from_node": "loadcollection1"
16+
},
17+
"bar": 123
18+
},
19+
"result": true
20+
}
21+
}

tests/internal/test_graphbuilding.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ def test_pgnode_arguments():
1919
PGNode("foo", arguments={"bar": 123}, bar=456)
2020

2121

22+
def test_pgnode_namespace():
23+
assert PGNode("foo").namespace is None
24+
assert PGNode("foo", namespace="bar").namespace == "bar"
25+
26+
2227
def test_pgnode_to_dict():
2328
pg = PGNode(process_id="load_collection", arguments={"collection_id": "S2"})
2429
assert pg.to_dict() == {
@@ -27,6 +32,15 @@ def test_pgnode_to_dict():
2732
}
2833

2934

35+
def test_pgnode_to_dict_namespace():
36+
pg = PGNode(process_id="load_collection", arguments={"collection_id": "S2"}, namespace="bar")
37+
assert pg.to_dict() == {
38+
"process_id": "load_collection",
39+
"namespace": "bar",
40+
"arguments": {"collection_id": "S2"}
41+
}
42+
43+
3044
def test_pgnode_to_dict_nested():
3145
pg = PGNode(
3246
process_id="filter_bands",
@@ -88,6 +102,11 @@ def test_build_and_flatten_argument_dict():
88102
assert node.flat_graph() == {"foo1": {"process_id": "foo", "arguments": {"bar": "red", "x": 3}, "result": True}}
89103

90104

105+
def test_build_and_flatten_namespace():
106+
node = PGNode("foo", namespace="bar")
107+
assert node.flat_graph() == {"foo1": {"process_id": "foo", "namespace": "bar", "arguments": {}, "result": True}}
108+
109+
91110
def test_pgnode_to_dict_subprocess_graphs():
92111
load_collection = PGNode("load_collection", collection_id="S2")
93112
band2 = PGNode("array_element", data={"from_argument": "data"}, index=2)

tests/internal/test_process_graph_visitor.py

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from unittest import TestCase
21
from unittest.mock import MagicMock, call, ANY
32

43
import pytest
@@ -16,7 +15,26 @@ def test_visit_node():
1615
visitor.enterArgument = MagicMock()
1716
visitor.accept_node(node)
1817

19-
assert visitor.enterProcess.call_args_list == [call(process_id="cos", arguments={"x": {"from_argument": "data"}})]
18+
assert visitor.enterProcess.call_args_list == [
19+
call(process_id="cos", arguments={"x": {"from_argument": "data"}}, namespace=None)
20+
]
21+
assert visitor.enterArgument.call_args_list == [call(argument_id="x", value={"from_argument": "data"})]
22+
23+
24+
def test_visit_node_namespaced():
25+
node = {
26+
"process_id": "cos",
27+
"namespace": "math",
28+
"arguments": {"x": {"from_argument": "data"}}
29+
}
30+
visitor = ProcessGraphVisitor()
31+
visitor.enterProcess = MagicMock()
32+
visitor.enterArgument = MagicMock()
33+
visitor.accept_node(node)
34+
35+
assert visitor.enterProcess.call_args_list == [
36+
call(process_id="cos", arguments={"x": {"from_argument": "data"}}, namespace="math")
37+
]
2038
assert visitor.enterArgument.call_args_list == [call(argument_id="x", value={"from_argument": "data"})]
2139

2240

@@ -51,8 +69,8 @@ def test_visit_nodes():
5169
visitor.accept_process_graph(graph)
5270

5371
assert visitor.leaveProcess.call_args_list == [
54-
call(process_id="abs", arguments=ANY),
55-
call(process_id="cos", arguments=ANY),
72+
call(process_id="abs", arguments=ANY, namespace=None),
73+
call(process_id="cos", arguments=ANY, namespace=None),
5674
]
5775
assert visitor.enterArgument.call_args_list == [
5876
call(argument_id="data", value=ANY),
@@ -93,8 +111,8 @@ def test_visit_nodes_array():
93111

94112
visitor.accept_process_graph(graph)
95113
assert visitor.leaveProcess.call_args_list == [
96-
call(process_id='abs', arguments=ANY),
97-
call(process_id='cos', arguments=ANY)
114+
call(process_id='abs', arguments=ANY, namespace=None),
115+
call(process_id='cos', arguments=ANY, namespace=None)
98116
]
99117
assert visitor.enterArgument.call_args_list == [
100118
call(argument_id="data", value=ANY)
@@ -130,8 +148,8 @@ def test_visit_array_with_dereferenced_nodes():
130148

131149
visitor.accept_node(dereferenced)
132150
assert visitor.leaveProcess.call_args_list == [
133-
call(process_id='array_element', arguments=ANY),
134-
call(process_id='product', arguments=ANY)
151+
call(process_id='array_element', arguments=ANY, namespace=None),
152+
call(process_id='product', arguments=ANY, namespace=None)
135153
]
136154
assert visitor.enterArgument.call_args_list == [
137155
call(argument_id="data", value={'from_argument': 'data'})

tests/rest/datacube/test_datacube100.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -539,12 +539,18 @@ def test_custom_process_kwargs_datacube_pg(con100: Connection):
539539
assert res.graph == expected
540540

541541

542-
def test_custom_process_kwargs_datacube_chained(con100: Connection):
542+
def test_custom_process_kwargs_this(con100: Connection):
543543
res = con100.load_collection("S2").process(process_id="foo", data=THIS, bar=123)
544544
expected = load_json_resource('data/1.0.0/process_foo.json')
545545
assert res.graph == expected
546546

547547

548+
def test_custom_process_kwargs_namespaced(con100: Connection):
549+
res = con100.load_collection("S2").process(process_id="foo", data=THIS, bar=123, namespace="bar")
550+
expected = load_json_resource('data/1.0.0/process_foo_namespaced.json')
551+
assert res.graph == expected
552+
553+
548554
def test_custom_process_arguments_datacube(con100: Connection):
549555
img = con100.load_collection("S2")
550556
res = img.process(process_id="foo", arguments={"data": img, "bar": 123})
@@ -559,12 +565,18 @@ def test_custom_process_arguments_datacube_pg(con100: Connection):
559565
assert res.graph == expected
560566

561567

562-
def test_custom_process_arguments_datacube_chained(con100: Connection):
568+
def test_custom_process_arguments_this(con100: Connection):
563569
res = con100.load_collection("S2").process(process_id="foo", arguments={"data": THIS, "bar": 123})
564570
expected = load_json_resource('data/1.0.0/process_foo.json')
565571
assert res.graph == expected
566572

567573

574+
def test_custom_process_arguments_namespacd(con100: Connection):
575+
res = con100.load_collection("S2").process(process_id="foo", arguments={"data": THIS, "bar": 123}, namespace="bar")
576+
expected = load_json_resource('data/1.0.0/process_foo_namespaced.json')
577+
assert res.graph == expected
578+
579+
568580
def test_save_user_defined_process(con100, requests_mock):
569581
requests_mock.get(API_URL + "/processes", json={"processes": [{"id": "add"}]})
570582

0 commit comments

Comments
 (0)