Skip to content

Commit fe9d713

Browse files
CNDE 2087: Preprocessing service treatment (#195)
1 parent ac929cd commit fe9d713

File tree

12 files changed

+362
-100
lines changed

12 files changed

+362
-100
lines changed

db/upgrade/odse/routines/021-sp_treatment_event.sql

+45-49
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,12 @@ BEGIN
4747
par1.subject_entity_uid AS provider_uid,
4848
viewPatientKeys.treatment_uid AS patient_treatment_uid,
4949
act2.target_act_uid AS morbidity_uid,
50-
rx1.LOCAL_ID,
51-
rx1.ADD_TIME,
52-
rx1.ADD_USER_ID,
53-
rx1.LAST_CHG_TIME,
54-
rx1.LAST_CHG_USER_ID,
55-
rx1.VERSION_CTRL_NBR
50+
rx1.local_id,
51+
rx1.add_time,
52+
rx1.add_user_id,
53+
rx1.last_chg_time,
54+
rx1.last_chg_user_id,
55+
rx1.version_ctrl_nbr
5656
INTO #TREATMENT_UIDS
5757
FROM NBS_ODSE.dbo.treatment AS rx1 WITH (NOLOCK)
5858
INNER JOIN NBS_ODSE.dbo.Treatment_administered AS rx2 WITH (NOLOCK)
@@ -117,31 +117,27 @@ BEGIN
117117
t.provider_uid,
118118
t.patient_treatment_uid,
119119
t.morbidity_uid,
120-
rx1.cd_desc_txt AS Treatment_nm,
121-
rx1.program_jurisdiction_oid AS Treatment_oid,
122-
REPLACE(REPLACE(rx1.txt, CHAR(13) + CHAR(10), ' '), CHAR(10), ' ') AS Treatment_comments,
123-
rx1.shared_ind AS Treatment_shared_ind,
120+
rx1.cd_desc_txt AS treatment_name,
121+
rx1.program_jurisdiction_oid AS treatment_oid,
122+
REPLACE(REPLACE(rx1.txt, CHAR(13) + CHAR(10), ' '), CHAR(10), ' ') AS treatment_comments,
123+
rx1.shared_ind AS treatment_shared_ind,
124124
rx1.cd,
125-
rx2.effective_from_time AS Treatment_dt,
126-
rx2.cd AS Treatment_drug,
127-
rx2.cd_desc_txt AS Treatment_drug_nm,
128-
rx2.dose_qty AS Treatment_dosage_strength,
129-
rx2.dose_qty_unit_cd AS Treatment_dosage_strength_unit,
130-
rx2.interval_cd AS Treatment_frequency,
131-
rx2.effective_duration_amt AS Treatment_duration,
132-
rx2.effective_duration_unit_cd AS Treatment_duration_unit,
133-
rx2.route_cd AS Treatment_route,
134-
t.LOCAL_ID,
135-
CASE
136-
WHEN rx1.record_status_cd = '' THEN 'ACTIVE'
137-
WHEN rx1.record_status_cd = 'LOG_DEL' THEN 'INACTIVE'
138-
ELSE rx1.record_status_cd
139-
END as record_status_cd,
140-
t.ADD_TIME,
141-
t.ADD_USER_ID,
142-
t.LAST_CHG_TIME,
143-
t.LAST_CHG_USER_ID,
144-
t.VERSION_CTRL_NBR
125+
rx2.effective_from_time AS treatment_date,
126+
rx2.cd AS treatment_drug,
127+
rx2.cd_desc_txt AS treatment_drug_name,
128+
rx2.dose_qty AS treatment_dosage_strength,
129+
rx2.dose_qty_unit_cd AS treatment_dosage_strength_unit,
130+
rx2.interval_cd AS treatment_frequency,
131+
rx2.effective_duration_amt AS treatment_duration,
132+
rx2.effective_duration_unit_cd AS treatment_duration_unit,
133+
rx2.route_cd AS treatment_route,
134+
t.local_id,
135+
dbo.fn_get_record_status(rx1.record_status_cd) as record_status_cd,
136+
t.add_time,
137+
t.add_user_id,
138+
t.last_chg_time,
139+
t.last_chg_user_id,
140+
t.version_ctrl_nbr
145141
INTO #TREATMENT_DETAILS
146142
FROM #TREATMENT_UIDS t
147143
INNER JOIN NBS_ODSE.dbo.treatment rx1 WITH (NOLOCK)
@@ -176,27 +172,27 @@ BEGIN
176172
t.provider_uid,
177173
t.morbidity_uid,
178174
t.patient_treatment_uid,
179-
t.Treatment_nm,
180-
t.Treatment_oid,
181-
t.Treatment_comments,
182-
t.Treatment_shared_ind,
175+
t.treatment_name,
176+
t.treatment_oid,
177+
t.treatment_comments,
178+
t.treatment_shared_ind,
183179
t.cd,
184-
t.Treatment_dt,
185-
t.Treatment_drug,
186-
t.Treatment_drug_nm,
187-
t.Treatment_dosage_strength,
188-
t.Treatment_dosage_strength_unit,
189-
t.Treatment_frequency,
190-
t.Treatment_duration,
191-
t.Treatment_duration_unit,
192-
t.Treatment_route,
193-
t.LOCAL_ID,
180+
t.treatment_date,
181+
t.treatment_drug,
182+
t.treatment_drug_name,
183+
t.treatment_dosage_strength,
184+
t.treatment_dosage_strength_unit,
185+
t.treatment_frequency,
186+
t.treatment_duration,
187+
t.treatment_duration_unit,
188+
t.treatment_route,
189+
t.local_id,
194190
t.record_status_cd,
195-
t.ADD_TIME,
196-
t.ADD_USER_ID,
197-
t.LAST_CHG_TIME,
198-
t.LAST_CHG_USER_ID,
199-
t.VERSION_CTRL_NBR
191+
t.add_time,
192+
t.add_user_id,
193+
t.last_chg_time,
194+
t.last_chg_user_id,
195+
t.version_ctrl_nbr
200196
FROM #TREATMENT_DETAILS t;
201197

202198

investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/controller/InvestigationController.java

+8
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ public class InvestigationController {
2929
@Value("${spring.kafka.input.topic-name-vac}")
3030
private String vaccinationTopic;
3131

32+
@Value("${spring.kafka.input.topic-name-tmt}")
33+
private String treatmentTopic;
34+
3235

3336
@GetMapping("/reporting/investigation-svc/status")
3437
public ResponseEntity<String> getDataPipelineStatusHealth() {
@@ -60,4 +63,9 @@ public void postContact(@RequestBody String jsonData) {
6063
public void postVaccination(@RequestBody String jsonData) {
6164
producerService.sendMessage(vaccinationTopic, jsonData);
6265
}
66+
67+
@PostMapping("/reporting/investigation-svc/treatment")
68+
public void postTreatment(@RequestBody String jsonData)
69+
{producerService.sendMessage(treatmentTopic, jsonData);}
70+
6371
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package gov.cdc.etldatapipeline.investigation.repository;
2+
3+
import gov.cdc.etldatapipeline.investigation.repository.model.dto.Treatment;
4+
import org.springframework.data.jpa.repository.JpaRepository;
5+
import org.springframework.data.jpa.repository.Query;
6+
import org.springframework.data.repository.query.Param;
7+
8+
import java.util.Optional;
9+
10+
public interface TreatmentRepository extends JpaRepository<Treatment, String> {
11+
12+
@Query(nativeQuery = true, value = "exec sp_treatment_event :treatment_uid")
13+
Optional<Treatment> computeTreatment(@Param("treatment_uid") String treatmentUid);
14+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package gov.cdc.etldatapipeline.investigation.repository.model.dto;
2+
3+
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
4+
import com.fasterxml.jackson.databind.annotation.JsonNaming;
5+
import jakarta.persistence.Column;
6+
import jakarta.persistence.Entity;
7+
import jakarta.persistence.Id;
8+
import lombok.Data;
9+
10+
@Entity
11+
@Data
12+
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
13+
public class Treatment {
14+
@Id
15+
@Column(name = "treatment_uid")
16+
private String treatmentUid;
17+
18+
@Column(name = "public_health_case_uid")
19+
private String publicHealthCaseUid;
20+
21+
@Column(name = "organization_uid")
22+
private String organizationUid;
23+
24+
@Column(name = "provider_uid")
25+
private String providerUid;
26+
27+
@Column(name = "patient_treatment_uid")
28+
private String patientTreatmentUid;
29+
30+
@Column(name = "treatment_name")
31+
private String treatmentName;
32+
33+
@Column(name = "treatment_oid")
34+
private String treatmentOid;
35+
36+
@Column(name = "treatment_comments")
37+
private String treatmentComments;
38+
39+
@Column(name = "treatment_shared_ind")
40+
private String treatmentSharedInd;
41+
42+
@Column(name = "cd")
43+
private String cd;
44+
45+
@Column(name = "treatment_date")
46+
private String treatmentDate;
47+
48+
@Column(name = "treatment_drug")
49+
private String treatmentDrug;
50+
51+
@Column(name = "treatment_drug_name")
52+
private String treatmentDrugName;
53+
54+
@Column(name = "treatment_dosage_strength")
55+
private String treatmentDosageStrength;
56+
57+
@Column(name = "treatment_dosage_strength_unit")
58+
private String treatmentDosageStrengthUnit;
59+
60+
@Column(name = "treatment_frequency")
61+
private String treatmentFrequency;
62+
63+
@Column(name = "treatment_duration")
64+
private String treatmentDuration;
65+
66+
@Column(name = "treatment_duration_unit")
67+
private String treatmentDurationUnit;
68+
69+
@Column(name = "treatment_route")
70+
private String treatmentRoute;
71+
72+
@Column(name = "local_id")
73+
private String localId;
74+
75+
@Column(name = "record_status_cd")
76+
private String recordStatusCd;
77+
78+
@Column(name = "add_time")
79+
private String addTime;
80+
81+
@Column(name = "add_user_id")
82+
private String addUserId;
83+
84+
@Column(name = "last_chg_time")
85+
private String lastChangeTime;
86+
87+
@Column(name = "last_chg_user_id")
88+
private String lastChangeUserId;
89+
90+
@Column(name = "version_ctrl_nbr")
91+
private String versionControlNumber;
92+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package gov.cdc.etldatapipeline.investigation.repository.model.reporting;
2+
3+
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
4+
import com.fasterxml.jackson.databind.annotation.JsonNaming;
5+
import lombok.Data;
6+
import lombok.NoArgsConstructor;
7+
import lombok.AllArgsConstructor;
8+
import lombok.NonNull;
9+
10+
@Data
11+
@NoArgsConstructor
12+
@AllArgsConstructor
13+
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
14+
public class TreatmentReportingKey {
15+
@NonNull
16+
private String treatmentUid;
17+
}

investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java

+40
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import gov.cdc.etldatapipeline.investigation.repository.model.dto.*;
77
import gov.cdc.etldatapipeline.investigation.repository.model.reporting.InvestigationKey;
88
import gov.cdc.etldatapipeline.investigation.repository.model.reporting.InvestigationReporting;
9+
import gov.cdc.etldatapipeline.investigation.repository.model.reporting.TreatmentReportingKey;
910
import gov.cdc.etldatapipeline.investigation.util.ProcessInvestigationDataUtil;
1011
import jakarta.persistence.EntityNotFoundException;
1112
import lombok.RequiredArgsConstructor;
@@ -60,6 +61,12 @@ public class InvestigationService {
6061
@Value("${spring.kafka.input.topic-name-vac}")
6162
private String vaccinationTopic;
6263

64+
@Value("${spring.kafka.input.topic-name-tmt}")
65+
private String treatmentTopic;
66+
67+
@Value("${spring.kafka.output.topic-name-treatment}")
68+
private String treatmentOutputTopicName;
69+
6370
@Value("${spring.kafka.output.topic-name-reporting}")
6471
private String investigationTopicReporting;
6572

@@ -77,6 +84,7 @@ public class InvestigationService {
7784
private final InterviewRepository interviewRepository;
7885
private final ContactRepository contactRepository;
7986
private final VaccinationRepository vaccinationRepository;
87+
private final TreatmentRepository treatmentRepository;
8088

8189
private final KafkaTemplate<String, String> kafkaTemplate;
8290
private final ProcessInvestigationDataUtil processDataUtil;
@@ -132,6 +140,8 @@ public void processMessage(ConsumerRecord<String, String> rec,
132140
processContact(message);
133141
} else if (topic.equals(vaccinationTopic)) {
134142
processVaccination(message);
143+
} else if (topic.equals(treatmentTopic)) {
144+
processTreatment(message);
135145
}
136146
consumer.commitSync();
137147
}
@@ -260,6 +270,36 @@ private void processVaccination(String value) {
260270
}
261271
}
262272

273+
private void processTreatment(String value) {
274+
String treatmentUid = "";
275+
try {
276+
treatmentUid = extractUid(value, "treatment_uid");
277+
278+
logger.info(topicDebugLog, "Treatment", treatmentUid, treatmentTopic);
279+
Optional<Treatment> treatmentData = treatmentRepository.computeTreatment(treatmentUid);
280+
if(treatmentData.isPresent()) {
281+
Treatment treatment = treatmentData.get();
282+
283+
// Using Treatment directly as the reporting object
284+
TreatmentReportingKey treatmentReportingKey = new TreatmentReportingKey(treatment.getTreatmentUid());
285+
286+
String jsonKey = jsonGenerator.generateStringJson(treatmentReportingKey);
287+
String jsonValue = jsonGenerator.generateStringJson(treatment);
288+
289+
kafkaTemplate.send(treatmentOutputTopicName, jsonKey, jsonValue)
290+
.whenComplete((res, e) -> logger.info("Treatment data (uid={}) sent to {}",
291+
treatment.getTreatmentUid(), treatmentOutputTopicName));
292+
293+
} else {
294+
throw new EntityNotFoundException("Unable to find treatment with id: " + treatmentUid);
295+
}
296+
} catch (EntityNotFoundException ex) {
297+
throw new NoDataException(ex.getMessage(), ex);
298+
} catch (Exception e) {
299+
throw new RuntimeException(errorMessage("Treatment", treatmentUid, e), e);
300+
}
301+
}
302+
263303
// This same method can be used for elastic search as well and that is why the generic model is present
264304
private CompletableFuture<SendResult<String, String>> pushKeyValuePairToKafka(InvestigationKey investigationKey, Object model, String topicName) {
265305
String jsonKey = jsonGenerator.generateStringJson(investigationKey);

investigation-service/src/main/resources/application.yaml

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ spring:
66
topic-name-int: nbs_Interview
77
topic-name-ctr: nbs_CT_contact
88
topic-name-vac: nbs_Intervention
9+
topic-name-tmt: nbs_Treatment
910
output:
1011
topic-name-reporting: nrt_investigation
1112
topic-name-confirmation: nrt_investigation_confirmation
@@ -21,6 +22,7 @@ spring:
2122
topic-name-contact-answer: nrt_contact_answer
2223
topic-name-vaccination: nrt_vaccination
2324
topic-name-vaccination-answer: nrt_vaccination_answer
25+
topic-name-treatment: nrt_treatment
2426
dlq:
2527
retry-suffix: _retry
2628
dlq-suffix: _dlt

investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java

+2
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ class InvestigationDataProcessingTests {
7777

7878
ProcessInvestigationDataUtil transformer;
7979

80+
81+
8082
@BeforeEach
8183
void setUp() {
8284
closeable = MockitoAnnotations.openMocks(this);

investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/controller/InvestigationControllerTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ public void tearDown() throws Exception {
5050
"/reporting/investigation-svc/notification",
5151
"/reporting/investigation-svc/interview",
5252
"/reporting/investigation-svc/contact",
53-
"/reporting/investigation-svc/vaccination"
53+
"/reporting/investigation-svc/vaccination",
54+
"/reporting/investigation-svc/treatment"
5455
})
5556
void testControllerMethods(String endpoint) throws Exception {
5657
String jsonData = "{\"key\":\"value\"}";

0 commit comments

Comments
 (0)