Skip to content

Commit 526ae12

Browse files
authored
Merge pull request #36 from AbsaOSS/Release/2.0.8
Release/2.0.8
2 parents 5de48fa + 6af92bd commit 526ae12

11 files changed

Lines changed: 59 additions & 37 deletions

File tree

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
# Change Log
22
All notable changes to this project will be documented in this file.
33

4+
## 2.0.8 - 2025-05-xx
5+
### Runner
6+
- expanded config overrides logging
7+
- added config validation
8+
- added owner attribute to GroupMetadata class
9+
410
## 2.0.7 - 2025-04-15
511
### Runner
612
- email reporting is now optional

README.md

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from pydantic import BaseModelfrom rialto.runner.config_loader import PipelineConfigfrom rialto.jobs import config
21

32
# Rialto
43

@@ -106,7 +105,6 @@ pipelines: # a list of pipelines to run
106105
metadata_manager: # optional
107106
metadata_schema: catalog.metadata # schema where metadata is stored
108107
feature_loader: # optional
109-
config_path: model_features_config.yaml # path to the feature loader configuration file
110108
feature_schema: catalog.feature_tables # schema where feature tables are stored
111109
metadata_schema: catalog.metadata # schema where metadata is stored
112110
extras: #optional arguments processed as dictionary
@@ -171,23 +169,21 @@ overrides={"pipelines[name=SimpleGroup].target.target_schema": "new_schema"},
171169
#### Injecting/Replacing whole sections
172170
You can directly replace a bigger section of the configuration by providing a dictionary
173171
When the whole section doesn't exist, it will be added to the configuration, however it needs to be added as a whole.
174-
i.e. if the yaml file doesn't specify feature_loader, you can't just add a feature_loader.config_path, you need to add the whole section.
172+
i.e. if the yaml file doesn't specify feature_loader, you can't just add a feature_loader.feature_schema, you need to add the whole section.
175173
```python
176174
overrides={"pipelines[name=SimpleGroup].feature_loader":
177-
{"config_path": "features_cfg.yaml",
178-
"feature_schema": "catalog.features",
175+
{"feature_schema": "catalog.features",
179176
"metadata_schema": "catalog.metadata"}}
180177
```
181178

182179
#### Multiple overrides
183180
You can provide multiple overrides at once, the order of execution is not guaranteed
184181
```python
185-
overrides={"runner.watch_period_value": 4,
186-
"runner.watch_period_units": "weeks",
182+
overrides={"runner.watched_period_value": 4,
183+
"runner.watched_period_units": "weeks",
187184
"pipelines[name=SimpleGroup].target.target_schema": "new_schema",
188185
"pipelines[name=SimpleGroup].feature_loader":
189-
{"config_path": "features_cfg.yaml",
190-
"feature_schema": "catalog.features",
186+
{"feature_schema": "catalog.features",
191187
"metadata_schema": "catalog.metadata"}
192188
}
193189
```
@@ -630,6 +626,7 @@ GroupMetadata
630626
frequency: Schedule # generation frequency
631627
description: str # group description
632628
key: List[str] # group primary keys
629+
owner: str # owner of the group data (table)
633630
fs_name: str = None # actual table name of this feature group in DataBricks
634631
features: List[str] = None # A list of feature names belonging to this group
635632
```

docs/source/conf.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
project = "rialto"
2929
copyright = "2022, Marek Dobransky"
3030
author = "Marek Dobransky"
31-
release = "2.0.7"
31+
release = "2.0.8"
3232

3333
# -- General configuration ---------------------------------------------------
3434
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[tool.poetry]
22
name = "rialto"
33

4-
version = "2.0.7"
4+
version = "2.0.8"
55

66
packages = [
77
{ include = "rialto" },
@@ -54,7 +54,7 @@ build-backend = "poetry.core.masonry.api"
5454

5555
[tool.black]
5656
line-length = 120
57-
target-version = ["py36"]
57+
target-version = ["py310"]
5858

5959
[tool.isort]
6060
profile = "black"

rialto/metadata/data_classes/group_metadata.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ class GroupMetadata:
3232
frequency: Schedule
3333
description: str
3434
key: List[str]
35+
owner: str
3536
fs_name: str = None
3637
features: List[str] = None
3738

@@ -42,7 +43,7 @@ def __repr__(self) -> str:
4243
f"name={self.name!r}, frequency={self.frequency!r}, "
4344
f"feature store name={self.fs_name!r},"
4445
f"description={self.description!r}, key={self.key!r}, "
45-
f"features={self.features!r}"
46+
f"features={self.features!r}, owner={self.owner!r}"
4647
")"
4748
)
4849

@@ -65,7 +66,7 @@ def to_tuple(self) -> Tuple:
6566
"""
6667
if not self.fs_name:
6768
self.fs_name = class_to_catalog_name(self.name)
68-
return (self.name, self.frequency.value, self.description, self.key, self.fs_name)
69+
return (self.name, self.frequency.value, self.description, self.key, self.fs_name, self.owner)
6970

7071
@classmethod
7172
def from_spark(cls, schema: Row) -> Self:
@@ -81,4 +82,5 @@ def from_spark(cls, schema: Row) -> Self:
8182
frequency=Schedule[schema.group_frequency],
8283
description=schema.group_description,
8384
key=schema.group_key,
85+
owner=schema.group_owner
8486
)

rialto/runner/config_loader.py

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,65 +18,69 @@
1818

1919
from typing import Dict, List, Optional
2020

21-
from pydantic import BaseModel
21+
from pydantic import BaseModel, ConfigDict
2222

2323
from rialto.common.utils import load_yaml
2424
from rialto.runner.config_overrides import override_config
2525

2626

27-
class IntervalConfig(BaseModel):
27+
class BaseConfig(BaseModel):
28+
model_config = ConfigDict(extra="forbid")
29+
30+
31+
class IntervalConfig(BaseConfig):
2832
units: str
2933
value: int
3034

3135

32-
class ScheduleConfig(BaseModel):
36+
class ScheduleConfig(BaseConfig):
3337
frequency: str
3438
day: Optional[int] = 0
3539
info_date_shift: Optional[List[IntervalConfig]] = IntervalConfig(units="days", value=0)
3640

3741

38-
class DependencyConfig(BaseModel):
42+
class DependencyConfig(BaseConfig):
3943
table: str
4044
name: Optional[str] = None
4145
date_col: str
4246
interval: IntervalConfig
4347

4448

45-
class ModuleConfig(BaseModel):
49+
class ModuleConfig(BaseConfig):
4650
python_module: str
4751
python_class: str
4852

4953

50-
class MailConfig(BaseModel):
54+
class MailConfig(BaseConfig):
5155
sender: str
5256
to: List[str]
5357
smtp: str
5458
subject: str
5559
sent_empty: Optional[bool] = False
5660

5761

58-
class RunnerConfig(BaseModel):
62+
class RunnerConfig(BaseConfig):
5963
watched_period_units: str
6064
watched_period_value: int
6165
mail: Optional[MailConfig] = None
6266
bookkeeping: Optional[str] = None
6367

6468

65-
class TargetConfig(BaseModel):
69+
class TargetConfig(BaseConfig):
6670
target_schema: str
6771
target_partition_column: str
6872

6973

70-
class MetadataManagerConfig(BaseModel):
74+
class MetadataManagerConfig(BaseConfig):
7175
metadata_schema: str
7276

7377

74-
class FeatureLoaderConfig(BaseModel):
78+
class FeatureLoaderConfig(BaseConfig):
7579
feature_schema: str
7680
metadata_schema: str
7781

7882

79-
class PipelineConfig(BaseModel):
83+
class PipelineConfig(BaseConfig):
8084
name: str
8185
module: ModuleConfig
8286
schedule: ScheduleConfig
@@ -87,7 +91,7 @@ class PipelineConfig(BaseModel):
8791
extras: Optional[Dict] = {}
8892

8993

90-
class PipelinesConfig(BaseModel):
94+
class PipelinesConfig(BaseConfig):
9195
runner: RunnerConfig
9296
pipelines: list[PipelineConfig]
9397

rialto/runner/config_overrides.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def _override(config, path, value) -> Dict:
3535
if "[" in key:
3636
name, index = _split_index_key(key)
3737
if name not in config:
38-
raise ValueError(f"Invalid key {name}")
38+
raise ValueError(f"Invalid key: {name}")
3939
if "=" in index:
4040
index = _find_first_match(config[name], index)
4141
else:
@@ -54,10 +54,12 @@ def _override(config, path, value) -> Dict:
5454
raise IndexError(f"Index {index} out of bounds for key {key}")
5555
else:
5656
if len(path) == 1:
57+
if key not in config:
58+
logger.warning(f"Adding new key: {key} with value {value}")
5759
config[key] = value
5860
else:
5961
if key not in config:
60-
raise ValueError(f"Invalid key {key}")
62+
raise ValueError(f"Invalid key: {key}")
6163
config[key] = _override(config[key], path[1:], value)
6264
return config
6365

@@ -70,7 +72,7 @@ def override_config(config: Dict, overrides: Dict) -> Dict:
7072
:return: Overridden config
7173
"""
7274
for path, value in overrides.items():
73-
logger.info("Applying override: ", path, value)
75+
logger.info(f"Applying override:\npath: {path}\nvalue: {value}")
7476
config = _override(config, path.split("."), value)
7577

7678
return config

rialto/runner/runner.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ def _run_pipeline(self, pipeline: PipelineConfig):
258258
)
259259
except Exception as error:
260260
logger.error(f"An exception occurred in pipeline {pipeline.name}")
261-
logger.error(error)
261+
logger.exception(error)
262262
self.tracker.add(
263263
Record(
264264
job=pipeline.name,

tests/metadata/resources.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
StructField("group_description", StringType(), False),
2323
StructField("group_key", ArrayType(StringType(), True), False),
2424
StructField("group_fs_name", StringType(), False),
25+
StructField("group_owner", StringType(), False),
2526
]
2627
)
2728

@@ -35,8 +36,8 @@
3536
)
3637

3738
group_base = [
38-
("Group1", "weekly", "group1", ["key1"], "group_1"),
39-
("Group2", "monthly", "group2", ["key2", "key3"], "group_2"),
39+
("Group1", "weekly", "group1", ["key1"], "group_1", "owner_1"),
40+
("Group2", "monthly", "group2", ["key2", "key3"], "group_2", "owner_2"),
4041
]
4142

4243
feature_base = [
@@ -50,6 +51,7 @@
5051
frequency=Schedule.weekly,
5152
description="group1",
5253
key=["key1"],
54+
owner="owner_1",
5355
)
5456

5557
group_md2 = GroupMetadata(
@@ -58,6 +60,7 @@
5860
frequency=Schedule.monthly,
5961
description="group2",
6062
key=["key2", "key3"],
63+
owner="owner_2",
6164
features=["Feature1", "Feature2"],
6265
)
6366

tests/runner/overrider.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ pipelines:
5050
target_schema: catalog.schema
5151
target_partition_column: "INFORMATION_DATE"
5252
feature_loader:
53-
config_path: path/to/config.yaml
5453
feature_schema: catalog.feature_tables
5554
metadata_schema: catalog.metadata
5655
metadata_manager:

0 commit comments

Comments
 (0)