Skip to content

Commit 1fc9588

Browse files
authored
Fix iso_date_regex_pattern config in file_batcher module and allow override (nv-morpheus#1580)
- Update `file_batcher` module to use documented module config key `iso_date_regex_pattern` instead of `batch_iso_date_regex_pattern`. - Allow override of `iso_date_regex_pattern` with control message. - Add tests for `file_batcher` module Closes nv-morpheus#1576 Closes nv-morpheus#1577 ## By Submitting this PR I confirm: - I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md). - When the PR is ready for review, new or existing tests cover these changes. - When the PR is ready for review, the documentation is up to date with these changes. Authors: - Eli Fajardo (https://github.com/efajardo-nv) Approvers: - Michael Demoret (https://github.com/mdemoret-nv) URL: nv-morpheus#1580
1 parent 2fb97ae commit 1fc9588

File tree

2 files changed

+319
-3
lines changed

2 files changed

+319
-3
lines changed

morpheus/modules/file_batcher.py

+19-3
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ def file_batcher(builder: mrc.Builder):
8080
sampling = config.get("sampling", None)
8181
sampling_rate_s = config.get("sampling_rate_s", None)
8282

83-
iso_date_regex_pattern = config.get("batch_iso_date_regex_pattern", DEFAULT_ISO_DATE_REGEX_PATTERN)
83+
iso_date_regex_pattern = config.get("iso_date_regex_pattern", DEFAULT_ISO_DATE_REGEX_PATTERN)
8484
iso_date_regex = re.compile(iso_date_regex_pattern)
8585

8686
if (sampling_rate_s is not None and sampling_rate_s > 0):
@@ -99,6 +99,7 @@ def file_batcher(builder: mrc.Builder):
9999
"sampling": sampling,
100100
"start_time": config.get("start_time"),
101101
"end_time": config.get("end_time"),
102+
"iso_date_regex_pattern": iso_date_regex_pattern
102103
}
103104

104105
default_file_to_df_opts = {
@@ -123,11 +124,19 @@ def build_period_batches(files: typing.List[str],
123124
params: typing.Dict[any, any]) -> typing.List[typing.Tuple[typing.List[str], int]]:
124125
file_objects: fsspec.core.OpenFiles = fsspec.open_files(files)
125126

127+
nonlocal iso_date_regex_pattern
128+
nonlocal iso_date_regex
129+
130+
if params["iso_date_regex_pattern"] != iso_date_regex_pattern:
131+
iso_date_regex_pattern = params["iso_date_regex_pattern"]
132+
iso_date_regex = re.compile(iso_date_regex_pattern)
133+
126134
try:
127135
start_time = params["start_time"]
128136
end_time = params["end_time"]
129137
period = params["period"]
130138
sampling_rate_s = params["sampling_rate_s"]
139+
sampling = params["sampling"]
131140

132141
if not isinstance(start_time, (str, type(None))) or (start_time is not None
133142
and not re.match(r"\d{4}-\d{2}-\d{2}", start_time)):
@@ -137,8 +146,15 @@ def build_period_batches(files: typing.List[str],
137146
and not re.match(r"\d{4}-\d{2}-\d{2}", end_time)):
138147
raise ValueError(f"Invalid 'end_time' value: {end_time}")
139148

140-
if not isinstance(sampling_rate_s, int) or sampling_rate_s < 0:
141-
raise ValueError(f"Invalid 'sampling_rate_s' value: {sampling_rate_s}")
149+
if (sampling_rate_s is not None and sampling_rate_s > 0):
150+
assert sampling is None, "Cannot set both sampling and sampling_rate_s at the same time"
151+
152+
# Show the deprecation message
153+
warnings.warn(("The `sampling_rate_s` argument has been deprecated. "
154+
"Please use `sampling={sampling_rate_s}S` instead"),
155+
DeprecationWarning)
156+
157+
sampling = f"{sampling_rate_s}S"
142158

143159
if (start_time is not None):
144160
start_time = datetime.datetime.strptime(start_time, '%Y-%m-%d').replace(tzinfo=datetime.timezone.utc)

tests/modules/test_file_batcher.py

+300
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,300 @@
1+
#!/usr/bin/env python
2+
# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3+
# SPDX-License-Identifier: Apache-2.0
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
import pytest
18+
19+
import cudf
20+
21+
import morpheus.modules # noqa: F401 # pylint: disable=unused-import
22+
from morpheus.config import Config
23+
from morpheus.messages import ControlMessage
24+
from morpheus.messages.message_meta import MessageMeta
25+
from morpheus.pipeline import LinearPipeline
26+
from morpheus.pipeline.stage_decorator import source
27+
from morpheus.stages.general.linear_modules_stage import LinearModulesStage
28+
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
29+
from morpheus.utils.module_ids import FILE_BATCHER
30+
from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE
31+
32+
# pylint: disable=redundant-keyword-arg
33+
34+
35+
@source
def source_test_stage(filenames: list[str], cm_batching_options: dict) -> ControlMessage:
    """Emit one ControlMessage whose payload lists *filenames* and whose
    "batching_options" metadata carries *cm_batching_options*."""
    msg = ControlMessage()
    msg.set_metadata("batching_options", cm_batching_options)
    msg.set_metadata("data_type", "payload")
    msg.payload(MessageMeta(df=cudf.DataFrame(filenames, columns=['files'])))
    yield msg
48+
49+
50+
@pytest.fixture(name="default_module_config")
def default_module_config_fixture():
    """Baseline file_batcher module configuration shared by all tests in this file."""
    yield {
        "module_id": FILE_BATCHER,
        "module_name": "file_batcher",
        "namespace": MORPHEUS_MODULE_NAMESPACE,
        "sampling_rate_s": 0,
        "start_time": "2022-08-01",
        "end_time": "2022-08-31",
        "parser_kwargs": None,
        "schema": {
            "schema_str": None, "encoding": None
        }
    }
64+
65+
66+
@pytest.fixture(name="default_file_list")
def default_file_list_fixture():
    """Six DUO log filenames, three per day, with ISO-like timestamps in the names."""
    yield [
        "DUO_2022-08-01T00_05_06.806Z.json",
        "DUO_2022-08-01T03_02_04.418Z.json",
        "DUO_2022-08-01T06_05_05.064Z.json",
        "DUO_2022-08-02T00_05_06.806Z.json",
        "DUO_2022-08-02T03_02_04.418Z.json",
        "DUO_2022-08-02T06_05_05.064Z.json"
    ]
76+
77+
78+
def test_no_overrides(config: Config, default_module_config, default_file_list):
    """Control-message batching options match the module defaults: the six files
    are split into two per-day batches of three files each."""
    batching_opts = {
        "sampling_rate_s": 0,
        "start_time": "2022-08-01",
        "end_time": "2022-08-31",
        "parser_kwargs": None,
        "schema": {
            "schema_str": None, "encoding": None
        }
    }

    pipe = LinearPipeline(config)
    pipe.set_source(source_test_stage(config, filenames=default_file_list, cm_batching_options=batching_opts))
    pipe.add_stage(
        LinearModulesStage(config, default_module_config, input_port_name="input", output_port_name="output"))
    sink = pipe.add_stage(InMemorySinkStage(config))

    pipe.run()

    messages = sink.get_messages()
    assert len(messages) == 2
    for message in messages:
        load_task = message.get_tasks()["load"][0]
        assert len(load_task["files"]) == 3
        assert load_task["n_groups"] == 2
106+
107+
108+
def test_no_date_matches(config: Config, default_module_config, default_file_list):
    """A start/end window covering September excludes every August file, so the
    batcher emits nothing downstream."""
    batching_opts = {
        "sampling_rate_s": 0,
        "start_time": "2022-09-01",
        "end_time": "2022-09-30",
        "parser_kwargs": None,
        "schema": {
            "schema_str": None, "encoding": None
        }
    }

    pipe = LinearPipeline(config)
    pipe.set_source(source_test_stage(config, filenames=default_file_list, cm_batching_options=batching_opts))
    pipe.add_stage(
        LinearModulesStage(config, default_module_config, input_port_name="input", output_port_name="output"))
    sink = pipe.add_stage(InMemorySinkStage(config))

    pipe.run()

    assert len(sink.get_messages()) == 0
132+
133+
134+
def test_partial_date_matches(config: Config, default_module_config, default_file_list):
    """Only the 2022-08-01 files fall inside the window, so a single batch of
    three files is produced (end_time appears to be exclusive — the 08-02 files
    are dropped; confirm against the module's period logic)."""
    pipeline = LinearPipeline(config)

    cm_batching_opts = {
        "sampling_rate_s": 0,
        "start_time": "2022-07-30",
        "end_time": "2022-08-02",
        "parser_kwargs": None,
        "schema": {
            "schema_str": None, "encoding": None
        }
    }

    pipeline.set_source(source_test_stage(config, filenames=default_file_list, cm_batching_options=cm_batching_opts))

    pipeline.add_stage(
        LinearModulesStage(config, default_module_config, input_port_name="input", output_port_name="output"))

    sink_stage = pipeline.add_stage(InMemorySinkStage(config))

    pipeline.run()

    # Fetch the sink output once (the original accidentally called
    # get_messages() twice in a row; the duplicate call was redundant).
    sink_messages = sink_stage.get_messages()
    assert len(sink_messages) == 1
    assert len(sink_messages[0].get_tasks()["load"][0]["files"]) == 3
    assert sink_messages[0].get_tasks()["load"][0]["n_groups"] == 1
161+
162+
163+
def test_override_date_regex(config: Config, default_module_config):
    """The control message supplies a custom `iso_date_regex_pattern` matching
    underscore-separated timestamps; batching then works exactly as with the
    default pattern (two daily batches of three files)."""
    underscore_filenames = [
        "DUO_2022-08-01_00_05_06.806Z.json",
        "DUO_2022-08-01_03_02_04.418Z.json",
        "DUO_2022-08-01_06_05_05.064Z.json",
        "DUO_2022-08-02_00_05_06.806Z.json",
        "DUO_2022-08-02_03_02_04.418Z.json",
        "DUO_2022-08-02_06_05_05.064Z.json"
    ]

    # Date/time separated by "_" instead of the default "T".
    cm_date_regex_pattern = (
        r"(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})"
        r"_(?P<hour>\d{1,2})(:|_)(?P<minute>\d{1,2})(:|_)(?P<second>\d{1,2})(?P<microsecond>\.\d{1,6})?Z")

    batching_opts = {
        "sampling_rate_s": 0,
        "start_time": "2022-08-01",
        "end_time": "2022-08-31",
        "iso_date_regex_pattern": cm_date_regex_pattern,
        "parser_kwargs": None,
        "schema": {
            "schema_str": None, "encoding": None
        }
    }

    pipe = LinearPipeline(config)
    pipe.set_source(source_test_stage(config, filenames=underscore_filenames, cm_batching_options=batching_opts))
    pipe.add_stage(
        LinearModulesStage(config, default_module_config, input_port_name="input", output_port_name="output"))
    sink = pipe.add_stage(InMemorySinkStage(config))

    pipe.run()

    messages = sink.get_messages()
    assert len(messages) == 2
    for message in messages:
        load_task = message.get_tasks()["load"][0]
        assert len(load_task["files"]) == 3
        assert load_task["n_groups"] == 2
205+
206+
207+
def test_sampling_freq(config: Config, default_module_config):
    """With sampling="30S" each day's three closely-timed files collapse to one
    file per message (presumably one sample per 30-second window — see the
    file_batcher module docs)."""
    clustered_filenames = [
        "DUO_2022-08-01T00_05_06.806Z.json",
        "DUO_2022-08-01T00_05_08.418Z.json",
        "DUO_2022-08-01T00_05_12.064Z.json",
        "DUO_2022-08-02T03_02_06.806Z.json",
        "DUO_2022-08-02T03_02_14.418Z.json",
        "DUO_2022-08-02T03_02_17.064Z.json"
    ]

    batching_opts = {
        "sampling_rate_s": None,
        "sampling": "30S",
        "start_time": "2022-08-01",
        "end_time": "2022-08-31",
        "parser_kwargs": None,
        "schema": {
            "schema_str": None, "encoding": None
        }
    }

    pipe = LinearPipeline(config)
    pipe.set_source(source_test_stage(config, filenames=clustered_filenames, cm_batching_options=batching_opts))
    pipe.add_stage(
        LinearModulesStage(config, default_module_config, input_port_name="input", output_port_name="output"))
    sink = pipe.add_stage(InMemorySinkStage(config))

    pipe.run()

    messages = sink.get_messages()
    assert len(messages) == 2
    for message in messages:
        load_task = message.get_tasks()["load"][0]
        assert len(load_task["files"]) == 1
        assert load_task["n_groups"] == 2
245+
246+
247+
def test_sampling_pct(config: Config, default_module_config, default_file_list):
    """A fractional sampling value (0.5) keeps half of the six input files in
    total; which specific files survive is not asserted, only the count."""
    batching_opts = {
        "sampling_rate_s": None,
        "sampling": 0.5,
        "start_time": "2022-08-01",
        "end_time": "2022-08-31",
        "parser_kwargs": None,
        "schema": {
            "schema_str": None, "encoding": None
        }
    }

    pipe = LinearPipeline(config)
    pipe.set_source(source_test_stage(config, filenames=default_file_list, cm_batching_options=batching_opts))
    pipe.add_stage(
        LinearModulesStage(config, default_module_config, input_port_name="input", output_port_name="output"))
    sink = pipe.add_stage(InMemorySinkStage(config))

    pipe.run()

    total_files = sum(len(m.get_tasks()["load"][0]["files"]) for m in sink.get_messages())
    assert total_files == 3
273+
274+
275+
def test_sampling_fixed(config: Config, default_module_config, default_file_list):
    """An integer sampling value (5) keeps exactly five of the six input files
    in total; which specific files survive is not asserted, only the count."""
    batching_opts = {
        "sampling_rate_s": None,
        "sampling": 5,
        "start_time": "2022-08-01",
        "end_time": "2022-08-31",
        "parser_kwargs": None,
        "schema": {
            "schema_str": None, "encoding": None
        }
    }

    pipe = LinearPipeline(config)
    pipe.set_source(source_test_stage(config, filenames=default_file_list, cm_batching_options=batching_opts))
    pipe.add_stage(
        LinearModulesStage(config, default_module_config, input_port_name="input", output_port_name="output"))
    sink = pipe.add_stage(InMemorySinkStage(config))

    pipe.run()

    total_files = sum(len(m.get_tasks()["load"][0]["files"]) for m in sink.get_messages())
    assert total_files == 5

0 commit comments

Comments
 (0)