Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CNDE 2087: Preprocessing service treatment #195

Merged
merged 13 commits into from
Mar 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 45 additions & 49 deletions db/upgrade/odse/routines/021-sp_treatment_event.sql
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ BEGIN
par1.subject_entity_uid AS provider_uid,
viewPatientKeys.treatment_uid AS patient_treatment_uid,
act2.target_act_uid AS morbidity_uid,
rx1.LOCAL_ID,
rx1.ADD_TIME,
rx1.ADD_USER_ID,
rx1.LAST_CHG_TIME,
rx1.LAST_CHG_USER_ID,
rx1.VERSION_CTRL_NBR
rx1.local_id,
rx1.add_time,
rx1.add_user_id,
rx1.last_chg_time,
rx1.last_chg_user_id,
rx1.version_ctrl_nbr
INTO #TREATMENT_UIDS
FROM NBS_ODSE.dbo.treatment AS rx1 WITH (NOLOCK)
INNER JOIN NBS_ODSE.dbo.Treatment_administered AS rx2 WITH (NOLOCK)
Expand Down Expand Up @@ -117,31 +117,27 @@ BEGIN
t.provider_uid,
t.patient_treatment_uid,
t.morbidity_uid,
rx1.cd_desc_txt AS Treatment_nm,
rx1.program_jurisdiction_oid AS Treatment_oid,
REPLACE(REPLACE(rx1.txt, CHAR(13) + CHAR(10), ' '), CHAR(10), ' ') AS Treatment_comments,
rx1.shared_ind AS Treatment_shared_ind,
rx1.cd_desc_txt AS treatment_name,
rx1.program_jurisdiction_oid AS treatment_oid,
REPLACE(REPLACE(rx1.txt, CHAR(13) + CHAR(10), ' '), CHAR(10), ' ') AS treatment_comments,
rx1.shared_ind AS treatment_shared_ind,
rx1.cd,
rx2.effective_from_time AS Treatment_dt,
rx2.cd AS Treatment_drug,
rx2.cd_desc_txt AS Treatment_drug_nm,
rx2.dose_qty AS Treatment_dosage_strength,
rx2.dose_qty_unit_cd AS Treatment_dosage_strength_unit,
rx2.interval_cd AS Treatment_frequency,
rx2.effective_duration_amt AS Treatment_duration,
rx2.effective_duration_unit_cd AS Treatment_duration_unit,
rx2.route_cd AS Treatment_route,
t.LOCAL_ID,
CASE
WHEN rx1.record_status_cd = '' THEN 'ACTIVE'
WHEN rx1.record_status_cd = 'LOG_DEL' THEN 'INACTIVE'
ELSE rx1.record_status_cd
END as record_status_cd,
t.ADD_TIME,
t.ADD_USER_ID,
t.LAST_CHG_TIME,
t.LAST_CHG_USER_ID,
t.VERSION_CTRL_NBR
rx2.effective_from_time AS treatment_date,
rx2.cd AS treatment_drug,
rx2.cd_desc_txt AS treatment_drug_name,
rx2.dose_qty AS treatment_dosage_strength,
rx2.dose_qty_unit_cd AS treatment_dosage_strength_unit,
rx2.interval_cd AS treatment_frequency,
rx2.effective_duration_amt AS treatment_duration,
rx2.effective_duration_unit_cd AS treatment_duration_unit,
rx2.route_cd AS treatment_route,
t.local_id,
dbo.fn_get_record_status(rx1.record_status_cd) as record_status_cd,
t.add_time,
t.add_user_id,
t.last_chg_time,
t.last_chg_user_id,
t.version_ctrl_nbr
INTO #TREATMENT_DETAILS
FROM #TREATMENT_UIDS t
INNER JOIN NBS_ODSE.dbo.treatment rx1 WITH (NOLOCK)
Expand Down Expand Up @@ -176,27 +172,27 @@ BEGIN
t.provider_uid,
t.morbidity_uid,
t.patient_treatment_uid,
t.Treatment_nm,
t.Treatment_oid,
t.Treatment_comments,
t.Treatment_shared_ind,
t.treatment_name,
t.treatment_oid,
t.treatment_comments,
t.treatment_shared_ind,
t.cd,
t.Treatment_dt,
t.Treatment_drug,
t.Treatment_drug_nm,
t.Treatment_dosage_strength,
t.Treatment_dosage_strength_unit,
t.Treatment_frequency,
t.Treatment_duration,
t.Treatment_duration_unit,
t.Treatment_route,
t.LOCAL_ID,
t.treatment_date,
t.treatment_drug,
t.treatment_drug_name,
t.treatment_dosage_strength,
t.treatment_dosage_strength_unit,
t.treatment_frequency,
t.treatment_duration,
t.treatment_duration_unit,
t.treatment_route,
t.local_id,
t.record_status_cd,
t.ADD_TIME,
t.ADD_USER_ID,
t.LAST_CHG_TIME,
t.LAST_CHG_USER_ID,
t.VERSION_CTRL_NBR
t.add_time,
t.add_user_id,
t.last_chg_time,
t.last_chg_user_id,
t.version_ctrl_nbr
FROM #TREATMENT_DETAILS t;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public class InvestigationController {
@Value("${spring.kafka.input.topic-name-vac}")
private String vaccinationTopic;

@Value("${spring.kafka.input.topic-name-tmt}")
private String treatmentTopic;


@GetMapping("/reporting/investigation-svc/status")
public ResponseEntity<String> getDataPipelineStatusHealth() {
Expand Down Expand Up @@ -60,4 +63,9 @@ public void postContact(@RequestBody String jsonData) {
public void postVaccination(@RequestBody String jsonData) {
producerService.sendMessage(vaccinationTopic, jsonData);
}

@PostMapping("/reporting/investigation-svc/treatment")
public void postTreatment(@RequestBody String jsonData)
{producerService.sendMessage(treatmentTopic, jsonData);}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package gov.cdc.etldatapipeline.investigation.repository;

import gov.cdc.etldatapipeline.investigation.repository.model.dto.Treatment;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;

import java.util.Optional;

public interface TreatmentRepository extends JpaRepository<Treatment, String> {

@Query(nativeQuery = true, value = "exec sp_treatment_event :treatment_uid")
Optional<Treatment> computeTreatment(@Param("treatment_uid") String treatmentUid);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package gov.cdc.etldatapipeline.investigation.repository.model.dto;

import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import lombok.Data;

@Entity
@Data
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public class Treatment {
@Id
@Column(name = "treatment_uid")
private String treatmentUid;

@Column(name = "public_health_case_uid")
private String publicHealthCaseUid;

@Column(name = "organization_uid")
private String organizationUid;

@Column(name = "provider_uid")
private String providerUid;

@Column(name = "patient_treatment_uid")
private String patientTreatmentUid;

@Column(name = "treatment_name")
private String treatmentName;

@Column(name = "treatment_oid")
private String treatmentOid;

@Column(name = "treatment_comments")
private String treatmentComments;

@Column(name = "treatment_shared_ind")
private String treatmentSharedInd;

@Column(name = "cd")
private String cd;

@Column(name = "treatment_date")
private String treatmentDate;

@Column(name = "treatment_drug")
private String treatmentDrug;

@Column(name = "treatment_drug_name")
private String treatmentDrugName;

@Column(name = "treatment_dosage_strength")
private String treatmentDosageStrength;

@Column(name = "treatment_dosage_strength_unit")
private String treatmentDosageStrengthUnit;

@Column(name = "treatment_frequency")
private String treatmentFrequency;

@Column(name = "treatment_duration")
private String treatmentDuration;

@Column(name = "treatment_duration_unit")
private String treatmentDurationUnit;

@Column(name = "treatment_route")
private String treatmentRoute;

@Column(name = "local_id")
private String localId;

@Column(name = "record_status_cd")
private String recordStatusCd;

@Column(name = "add_time")
private String addTime;

@Column(name = "add_user_id")
private String addUserId;

@Column(name = "last_chg_time")
private String lastChangeTime;

@Column(name = "last_chg_user_id")
private String lastChangeUserId;

@Column(name = "version_ctrl_nbr")
private String versionControlNumber;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package gov.cdc.etldatapipeline.investigation.repository.model.reporting;

import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.AllArgsConstructor;
import lombok.NonNull;

@Data
@NoArgsConstructor
@AllArgsConstructor
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public class TreatmentReportingKey {
@NonNull
private String treatmentUid;
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import gov.cdc.etldatapipeline.investigation.repository.model.dto.*;
import gov.cdc.etldatapipeline.investigation.repository.model.reporting.InvestigationKey;
import gov.cdc.etldatapipeline.investigation.repository.model.reporting.InvestigationReporting;
import gov.cdc.etldatapipeline.investigation.repository.model.reporting.TreatmentReportingKey;
import gov.cdc.etldatapipeline.investigation.util.ProcessInvestigationDataUtil;
import jakarta.persistence.EntityNotFoundException;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -60,6 +61,12 @@ public class InvestigationService {
@Value("${spring.kafka.input.topic-name-vac}")
private String vaccinationTopic;

@Value("${spring.kafka.input.topic-name-tmt}")
private String treatmentTopic;

@Value("${spring.kafka.output.topic-name-treatment}")
private String treatmentOutputTopicName;

@Value("${spring.kafka.output.topic-name-reporting}")
private String investigationTopicReporting;

Expand All @@ -77,6 +84,7 @@ public class InvestigationService {
private final InterviewRepository interviewRepository;
private final ContactRepository contactRepository;
private final VaccinationRepository vaccinationRepository;
private final TreatmentRepository treatmentRepository;

private final KafkaTemplate<String, String> kafkaTemplate;
private final ProcessInvestigationDataUtil processDataUtil;
Expand Down Expand Up @@ -132,6 +140,8 @@ public void processMessage(ConsumerRecord<String, String> rec,
processContact(message);
} else if (topic.equals(vaccinationTopic)) {
processVaccination(message);
} else if (topic.equals(treatmentTopic)) {
processTreatment(message);
}
consumer.commitSync();
}
Expand Down Expand Up @@ -260,6 +270,36 @@ private void processVaccination(String value) {
}
}

private void processTreatment(String value) {
String treatmentUid = "";
try {
treatmentUid = extractUid(value, "treatment_uid");

logger.info(topicDebugLog, "Treatment", treatmentUid, treatmentTopic);
Optional<Treatment> treatmentData = treatmentRepository.computeTreatment(treatmentUid);
if(treatmentData.isPresent()) {
Treatment treatment = treatmentData.get();

// Using Treatment directly as the reporting object
TreatmentReportingKey treatmentReportingKey = new TreatmentReportingKey(treatment.getTreatmentUid());

String jsonKey = jsonGenerator.generateStringJson(treatmentReportingKey);
String jsonValue = jsonGenerator.generateStringJson(treatment);

kafkaTemplate.send(treatmentOutputTopicName, jsonKey, jsonValue)
.whenComplete((res, e) -> logger.info("Treatment data (uid={}) sent to {}",
treatment.getTreatmentUid(), treatmentOutputTopicName));

} else {
throw new EntityNotFoundException("Unable to find treatment with id: " + treatmentUid);
}
} catch (EntityNotFoundException ex) {
throw new NoDataException(ex.getMessage(), ex);
} catch (Exception e) {
throw new RuntimeException(errorMessage("Treatment", treatmentUid, e), e);
}
}

// This same method can be used for elastic search as well and that is why the generic model is present
private CompletableFuture<SendResult<String, String>> pushKeyValuePairToKafka(InvestigationKey investigationKey, Object model, String topicName) {
String jsonKey = jsonGenerator.generateStringJson(investigationKey);
Expand Down
2 changes: 2 additions & 0 deletions investigation-service/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ spring:
topic-name-int: nbs_Interview
topic-name-ctr: nbs_CT_contact
topic-name-vac: nbs_Intervention
topic-name-tmt: nbs_Treatment
output:
topic-name-reporting: nrt_investigation
topic-name-confirmation: nrt_investigation_confirmation
Expand All @@ -21,6 +22,7 @@ spring:
topic-name-contact-answer: nrt_contact_answer
topic-name-vaccination: nrt_vaccination
topic-name-vaccination-answer: nrt_vaccination_answer
topic-name-treatment: nrt_treatment
dlq:
retry-suffix: _retry
dlq-suffix: _dlt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ class InvestigationDataProcessingTests {

ProcessInvestigationDataUtil transformer;



@BeforeEach
void setUp() {
closeable = MockitoAnnotations.openMocks(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public void tearDown() throws Exception {
"/reporting/investigation-svc/notification",
"/reporting/investigation-svc/interview",
"/reporting/investigation-svc/contact",
"/reporting/investigation-svc/vaccination"
"/reporting/investigation-svc/vaccination",
"/reporting/investigation-svc/treatment"
})
void testControllerMethods(String endpoint) throws Exception {
String jsonData = "{\"key\":\"value\"}";
Expand Down
Loading