Skip to content

Commit 9d51d61

Browse files
fixup! chore(extraction): Extract impact data for gdacs.
1 parent 0c9cbfa commit 9d51d61

File tree

6 files changed

+48
-26
lines changed

6 files changed

+48
-26
lines changed

apps/etl/etl_tasks/gdacs.py

Lines changed: 10 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -18,12 +18,12 @@
1818

1919
HAZARDS = [
2020
(HazardType.EARTHQUAKE, 64),
21-
# (HazardType.CYCLONE, 64),
22-
# (HazardType.FLOOD, 64),
23-
# (HazardType.DROUGHT, 64),
24-
# (HazardType.WILDFIRE, 64),
25-
# (HazardType.VOLCANO, 64),
26-
# (HazardType.TSUNAMI, 64),
21+
(HazardType.CYCLONE, 64),
22+
(HazardType.FLOOD, 64),
23+
(HazardType.DROUGHT, 64),
24+
(HazardType.WILDFIRE, 64),
25+
(HazardType.VOLCANO, 64),
26+
(HazardType.TSUNAMI, 64),
2727
]
2828

2929
URL = f"{etl_config.GDACS_URL}/gdacsapi/api/events/geteventlist/SEARCH"
@@ -42,12 +42,10 @@ def ext_and_transform_gdacs_latest_data():
4242
.order_by("-created_at")
4343
.first()
4444
)
45-
# if ext_object:
46-
# start_date = ext_object.created_at.date()
47-
# else:
48-
# start_date = etl_config.GDACS_START_DATE
49-
50-
start_date = etl_config.GDACS_START_DATE
45+
if ext_object:
46+
start_date = ext_object.created_at.date()
47+
else:
48+
start_date = etl_config.GDACS_START_DATE
5149

5250
end_date = dt.today().date()
5351
for hazard, size in HAZARDS:

apps/etl/extraction/sources/base/handler.py

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -367,7 +367,6 @@ def init_extraction(
367367
add_to_queue: bool = True,
368368
queue_name: str | None = None,
369369
) -> ExtractionData:
370-
print("INIT Extraction ", metadata.model_dump())
371370
extraction_obj = ExtractionData.objects.create(
372371
source=cls.source_enum,
373372
url=metadata.url, # type: ignore[reportAttributeAccessIssue] TODO
@@ -379,7 +378,6 @@ def init_extraction(
379378
attempt_no=0,
380379
resp_code=0,
381380
)
382-
print("Ext obj---------:", extraction_obj.id)
383381

384382
if add_to_queue:
385383
cls.task.apply_async([extraction_obj.pk], queue=queue_name or cls.DEFAULT_CELERY_QUEUE)

apps/etl/extraction/sources/gdacs/extract.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -206,6 +206,7 @@ def handle_type_episode(self, retrigger: bool):
206206
parent_extraction=self.extraction_object,
207207
add_to_queue=False,
208208
)
209+
# Extract Impact data
209210
for obj in impact_list:
210211
# source = obj["source"]
211212
impact_url = obj["resource"]["impact"]
@@ -219,6 +220,8 @@ def handle_type_episode(self, retrigger: bool):
219220
parent_extraction=self.extraction_object,
220221
add_to_queue=False,
221222
)
223+
GdacsExtraction.task(impact_obj.id)
224+
222225
else:
223226
geo_obj = ExtractionData.objects.filter(
224227
url=geometry_episode_url,

apps/etl/transform/sources/gdacs.py

Lines changed: 20 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,8 @@ class GDACSTransformHandler(BaseTransformerHandler[GDACSTransformer, GDACSDataSo
2222

2323
@classmethod
2424
def get_schema_data(cls, extraction_object):
25+
from apps.etl.extraction.sources.gdacs.extract import GdacsExtractionMetadataType
26+
2527
with extraction_object.resp_data.open("rb") as f:
2628
file_content = f.read()
2729
data_file = write_into_temp_file(file_content)
@@ -39,7 +41,9 @@ def get_schema_data(cls, extraction_object):
3941
source_url=episode_obj.url, input_data=File(path=episode_data_temp_file.name, data_type=DataType.FILE)
4042
),
4143
)
42-
geometry_object = episode_obj.child_extractions.all().first()
44+
geometry_object = (
45+
episode_obj.child_extractions.all().filter(metadata__type=GdacsExtractionMetadataType.GEOMETRY).first()
46+
)
4347

4448
with geometry_object.resp_data.open("rb") as f:
4549
file_content = f.read()
@@ -52,7 +56,21 @@ def get_schema_data(cls, extraction_object):
5256
),
5357
)
5458

55-
episode_data_tuple = (event_episode_data, geometry_episode_data)
59+
impact_object = (
60+
episode_obj.child_extractions.all().filter(metadata__type=GdacsExtractionMetadataType.IMPACT).first()
61+
)
62+
63+
with impact_object.resp_data.open("rb") as f:
64+
file_content = f.read()
65+
impact_detail_temp_file = write_into_temp_file(file_content)
66+
impact_episode_data = GdacsEpisodes(
67+
type=GDACSDataSourceType.IMPACT,
68+
data=GenericDataSource(
69+
source_url=impact_object.url,
70+
input_data=File(path=impact_detail_temp_file.name, data_type=DataType.FILE),
71+
),
72+
)
73+
episode_data_tuple = (event_episode_data, geometry_episode_data, impact_episode_data)
5674
episodes.append(episode_data_tuple)
5775

5876
result = cls.transformer_schema(

schema.graphql

Lines changed: 14 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -217,8 +217,8 @@ scalar JSON @specifiedBy(url: "https://ecma-international.org/wp-content/uploads
217217
type Mutation {
218218
login(username: String!, password: String!): UserMeType!
219219
logout: Boolean!
220-
retriggerPipeline(data: PipelineRetriggerInput!): StrMutationResponseType!
221-
retriggerTransform(data: TransformRetriggerInput!): StrMutationResponseType!
220+
retriggerPipeline(data: PipelineRetriggerInput!): PipelineRetriggerTypeMutationResponseType!
221+
retriggerTransform(data: TransformRetriggerInput!): PipelineRetriggerTypeMutationResponseType!
222222
}
223223

224224
type OffsetPaginationInfo {
@@ -244,6 +244,17 @@ input PipelineRetriggerInput {
244244
traceIds: [ID!]!
245245
}
246246

247+
type PipelineRetriggerType {
248+
taskId: ID!
249+
status: String!
250+
}
251+
252+
type PipelineRetriggerTypeMutationResponseType {
253+
ok: Boolean!
254+
errors: CustomErrorType
255+
result: PipelineRetriggerType
256+
}
257+
247258
enum PyStacLoadDataItemTypeEnum {
248259
EVENT
249260
HAZARD
@@ -482,12 +493,6 @@ input StrFilterLookup {
482493
iRegex: String
483494
}
484495

485-
type StrMutationResponseType {
486-
ok: Boolean!
487-
errors: CustomErrorType
488-
result: String
489-
}
490-
491496
input TransformDataFilter {
492497
createdAt: DatetimeDatetimeFilterLookup
493498
status: DataStatusTypeEnum
@@ -564,4 +569,4 @@ type ValStatusSourceCount {
564569
noDataCount: Int!
565570
noChangeCount: Int!
566571
noValidationCount: Int!
567-
}
572+
}

0 commit comments

Comments
 (0)