From d6b810bab50e8f10f76f8e17edc3740f90955baa Mon Sep 17 00:00:00 2001 From: Jayasudha Sundaram Date: Mon, 24 Feb 2025 18:47:39 -0500 Subject: [PATCH 1/9] CNDE-2087 pre processing service for treatment --- .../controller/InvestigationController.java | 8 + .../repository/TreatmentRepository.java | 14 ++ .../repository/model/dto/Treatment.java | 95 ++++++++ .../model/reporting/TreatmentReporting.java | 39 ++++ .../reporting/TreatmentReportingKey.java | 18 ++ .../service/InvestigationService.java | 37 +++- .../util/ProcessInvestigationDataUtil.java | 76 +++++++ .../src/main/resources/application.yaml | 3 + .../InvestigationDataProcessingTests.java | 184 ++++++++++++++++ .../InvestigationControllerTest.java | 3 +- .../service/InvestigationServiceTest.java | 204 +++++++++++++++++- 11 files changed, 666 insertions(+), 15 deletions(-) create mode 100644 investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/TreatmentRepository.java create mode 100644 investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/dto/Treatment.java create mode 100644 investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/reporting/TreatmentReporting.java create mode 100644 investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/reporting/TreatmentReportingKey.java diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/controller/InvestigationController.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/controller/InvestigationController.java index d303d37b..09c012dc 100644 --- a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/controller/InvestigationController.java +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/controller/InvestigationController.java @@ -26,6 +26,9 @@ public class InvestigationController { @Value("${spring.kafka.input.topic-name-ctr}") private String contactTopic; + @Value("${spring.kafka.input.topic-name-tmt}") + private String treatmentTopic; + @GetMapping("/reporting/investigation-svc/status") public ResponseEntity getDataPipelineStatusHealth() { @@ -52,4 +55,9 @@ public void postInterview(@RequestBody String jsonData) { public void postContact(@RequestBody String jsonData) { producerService.sendMessage(contactTopic, jsonData); } + + @PostMapping("/reporting/investigation-svc/treatment") + public void postTreatment(@RequestBody String jsonData) {producerService.sendMessage(treatmentTopic, jsonData);} + + } diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/TreatmentRepository.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/TreatmentRepository.java new file mode 100644 index 00000000..5274e9ac --- /dev/null +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/TreatmentRepository.java @@ -0,0 +1,14 @@ +package gov.cdc.etldatapipeline.investigation.repository; + +import gov.cdc.etldatapipeline.investigation.repository.model.dto.Treatment; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; + +import java.util.Optional; + +public interface TreatmentRepository extends JpaRepository { + + @Query(nativeQuery = true, value = "exec sp_treatment_event :treatment_uid") + Optional computeTreatment(@Param("treatment_uid") String treatmentUid); +} diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/dto/Treatment.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/dto/Treatment.java new file mode 100644 index 00000000..cd752522 --- /dev/null +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/dto/Treatment.java @@ -0,0 +1,95 @@ +package gov.cdc.etldatapipeline.investigation.repository.model.dto; + +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.Id; +import lombok.Data; + +@Entity +@Data +public class Treatment { + @Id + @Column(name = "treatment_uid") + private String treatmentUid; + + @Column(name = "public_health_case_uid") + private String publicHealthCaseUid; + + @Column(name = "organization_uid") + private String organizationUid; + + @Column(name = "provider_uid") + private String providerUid; + + @Column(name = "patient_treatment_uid") + private String patientTreatmentUid; + + @Column(name = "Treatment_nm") + private String treatmentName; + + @Column(name = "Treatment_oid") + private String treatmentOid; + + @Column(name = "Treatment_comments") + private String treatmentComments; + + @Column(name = "Treatment_shared_ind") + private String treatmentSharedInd; + + @Column(name = "cd") + private String cd; + + @Column(name = "Treatment_dt") + private String treatmentDate; + + @Column(name = "Treatment_drug") + private String treatmentDrug; + + @Column(name = "Treatment_drug_nm") + private String treatmentDrugName; + + @Column(name = "Treatment_dosage_strength") + private String treatmentDosageStrength; + + @Column(name = "Treatment_dosage_strength_unit") + private String treatmentDosageStrengthUnit; + + @Column(name = "Treatment_frequency") + private String treatmentFrequency; + + @Column(name = "Treatment_duration") + private String treatmentDuration; + + @Column(name = "Treatment_duration_unit") + private String treatmentDurationUnit; + + @Column(name = "Treatment_route") + private String treatmentRoute; + + @Column(name = "LOCAL_ID") + private String localId; + + @Column(name = "record_status_cd") + private String recordStatusCd; + + @Column(name = "ADD_TIME") + private String addTime; + + @Column(name = "ADD_USER_ID") + private String addUserId; + + @Column(name = "LAST_CHG_TIME") + private String lastChangeTime; + + @Column(name = "LAST_CHG_USER_ID") + private String lastChangeUserId; + + @Column(name = "VERSION_CTRL_NBR") + private String versionControlNumber; + + @Column(name = "refresh_datetime") + private String refreshDatetime; + + @Column(name = "max_datetime") + private String maxDatetime; +} diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/reporting/TreatmentReporting.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/reporting/TreatmentReporting.java new file mode 100644 index 00000000..f21feb96 --- /dev/null +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/reporting/TreatmentReporting.java @@ -0,0 +1,39 @@ +package gov.cdc.etldatapipeline.investigation.repository.model.reporting; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.annotation.JsonNaming; +import lombok.Data; +import java.time.LocalDateTime; + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) +public class TreatmentReporting { + private String treatmentUid; + private String publicHealthCaseUid; + private String organizationUid; + private String providerUid; + private String patientTreatmentUid; + private String treatmentName; + private String treatmentOid; + private String treatmentComments; + private String treatmentSharedInd; + private String cd; + private String treatmentDate; + private String treatmentDrug; + private String treatmentDrugName; + private String treatmentDosageStrength; + private String treatmentDosageStrengthUnit; + private String treatmentFrequency; + private String treatmentDuration; + private String treatmentDurationUnit; + private String treatmentRoute; + private String localId; + private String recordStatusCd; + private String addTime; + private String addUserId; + private String lastChangeTime; + private String lastChangeUserId; + private String versionControlNumber; +} \ No newline at end of file diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/reporting/TreatmentReportingKey.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/reporting/TreatmentReportingKey.java new file mode 100644 index 00000000..a786e370 --- /dev/null +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/reporting/TreatmentReportingKey.java @@ -0,0 +1,18 @@ +package gov.cdc.etldatapipeline.investigation.repository.model.reporting; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.AllArgsConstructor; +import lombok.NonNull; + +import java.time.LocalDateTime; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class TreatmentReportingKey { + @NonNull + @JsonProperty("treatment_uid") + private String treatmentUid; +} diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java index 866dae68..e676ef28 100644 --- a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java @@ -2,13 +2,10 @@ import gov.cdc.etldatapipeline.commonutil.NoDataException; import gov.cdc.etldatapipeline.commonutil.json.CustomJsonGeneratorImpl; -import gov.cdc.etldatapipeline.investigation.repository.ContactRepository; -import gov.cdc.etldatapipeline.investigation.repository.InterviewRepository; +import gov.cdc.etldatapipeline.investigation.repository.*; import gov.cdc.etldatapipeline.investigation.repository.model.dto.*; -import gov.cdc.etldatapipeline.investigation.repository.InvestigationRepository; import gov.cdc.etldatapipeline.investigation.repository.model.reporting.InvestigationKey; import gov.cdc.etldatapipeline.investigation.repository.model.reporting.InvestigationReporting; -import gov.cdc.etldatapipeline.investigation.repository.NotificationRepository; import gov.cdc.etldatapipeline.investigation.util.ProcessInvestigationDataUtil; import jakarta.persistence.EntityNotFoundException; import lombok.RequiredArgsConstructor; @@ -63,6 +60,9 @@ public class InvestigationService { @Value("${spring.kafka.output.topic-name-reporting}") private String investigationTopicReporting; + @Value("${spring.kafka.output.topic-name-tmt}") + private String treatmentTopic; + @Value("${featureFlag.phc-datamart-enable}") private boolean phcDatamartEnable; @@ -72,10 +72,14 @@ public class InvestigationService { @Value("${featureFlag.contact-record-enable}") public boolean contactRecordEnable; + @Value("${featureFlag.treatment-enable}") + public boolean treatmentEnable; + private final InvestigationRepository investigationRepository; private final NotificationRepository notificationRepository; private final InterviewRepository interviewRepository; private final ContactRepository contactRepository; + private final TreatmentRepository treatmentRepository; private final KafkaTemplate kafkaTemplate; private final ProcessInvestigationDataUtil processDataUtil; @@ -107,7 +111,8 @@ public class InvestigationService { "${spring.kafka.input.topic-name-phc}", "${spring.kafka.input.topic-name-ntf}", "${spring.kafka.input.topic-name-int}", - "${spring.kafka.input.topic-name-ctr}" + "${spring.kafka.input.topic-name-ctr}", + "${spring.kafka.input.topic-name-tmt}" } ) public void processMessage(String message, @@ -122,6 +127,8 @@ public void processMessage(String message, processInterview(message); } else if (topic.equals(contactTopic) && contactRecordEnable) { processContact(message); + } else if (topic.equals(treatmentTopic) && treatmentEnable) { + processTreatment(message); } consumer.commitSync(); } @@ -229,6 +236,26 @@ private void processContact(String value) { } } + private void processTreatment(String value) { + String treatmentUid = ""; + try { + treatmentUid = extractUid(value, "treatment_uid"); + + logger.info(topicDebugLog, "Treatment", treatmentUid, treatmentTopic); + Optional treatmentData = treatmentRepository.computeTreatment(treatmentUid); + if(treatmentData.isPresent()) { + Treatment treatment = treatmentData.get(); + processDataUtil.processTreatment(treatment); + } else { + throw new EntityNotFoundException("Unable to find treatment with id: " + treatmentUid); + } + } catch (EntityNotFoundException ex) { + throw new NoDataException(ex.getMessage(), ex); + } catch (Exception e) { + throw new RuntimeException(errorMessage("Treatment", treatmentUid, e), e); + } + } + // This same method can be used for elastic search as well and that is why the generic model is present private CompletableFuture> pushKeyValuePairToKafka(InvestigationKey investigationKey, Object model, String topicName) { String jsonKey = jsonGenerator.generateStringJson(investigationKey); diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/util/ProcessInvestigationDataUtil.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/util/ProcessInvestigationDataUtil.java index 64f96b25..85f1b5cd 100644 --- a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/util/ProcessInvestigationDataUtil.java +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/util/ProcessInvestigationDataUtil.java @@ -62,6 +62,9 @@ public class ProcessInvestigationDataUtil { @Value("${spring.kafka.output.topic-name-interview-note}") private String interviewNoteOutputTopicName; + @Value("${spring.kafka.output.topic-name-treatment}") + private String treatmentOutputTopicName; + @Value("${spring.kafka.output.topic-name-rdb-metadata-columns}") private String rdbMetadataColumnsOutputTopicName; @@ -658,6 +661,79 @@ private ContactReporting transformContact(Contact contact) { return contactReporting; } + /** + * Process treatment data and send to Kafka topic + * @param treatment Entity bean returned from stored procedures + */ + public void processTreatment(Treatment treatment) { + try { + // Tombstone message to delete all treatment records for specified treatment uid + TreatmentReportingKey treatmentReportingKey = new TreatmentReportingKey(treatment.getTreatmentUid()); + String jsonKeyDel = jsonGenerator.generateStringJson(treatmentReportingKey); + logger.info(TOMBSTONE_MSG_SENT, "treatment", "treatment", treatment.getTreatmentUid()); + + kafkaTemplate.send(treatmentOutputTopicName, jsonKeyDel, null) + .whenComplete((res, e) -> logger.info(TOMBSTONE_MSG_ACCEPTED, "treatment")) + .thenRunAsync(() -> { + try { + // Transform and send the treatment data + TreatmentReporting treatmentReporting = transformTreatment(treatment); + + String jsonKey = jsonGenerator.generateStringJson(treatmentReportingKey); + String jsonValue = jsonGenerator.generateStringJson(treatmentReporting); + + kafkaTemplate.send(treatmentOutputTopicName, jsonKey, jsonValue) + .whenComplete((res, e) -> logger.info("Treatment data (uid={}) sent to {}", + treatment.getTreatmentUid(), treatmentOutputTopicName)); + + } catch (Exception e) { + logger.error("Error processing Treatment data: {}", e.getMessage()); + throw new RuntimeException(e); + } + }); + + } catch (IllegalArgumentException ex) { + logger.info(ex.getMessage(), "Treatment"); + } catch (Exception e) { + logger.error("Error processing Treatment data: {}", e.getMessage()); + throw e; + } + } + + private TreatmentReporting transformTreatment(Treatment treatment) { + TreatmentReporting treatmentReporting = new TreatmentReporting(); + + treatmentReporting.setTreatmentUid(treatment.getTreatmentUid()); + treatmentReporting.setPublicHealthCaseUid(treatment.getPublicHealthCaseUid()); + treatmentReporting.setOrganizationUid(treatment.getOrganizationUid()); + treatmentReporting.setProviderUid(treatment.getProviderUid()); + treatmentReporting.setPatientTreatmentUid(treatment.getPatientTreatmentUid()); + treatmentReporting.setTreatmentName(treatment.getTreatmentName()); + treatmentReporting.setTreatmentOid(treatment.getTreatmentOid()); + treatmentReporting.setTreatmentComments(treatment.getTreatmentComments()); + treatmentReporting.setTreatmentSharedInd(treatment.getTreatmentSharedInd()); + treatmentReporting.setCd(treatment.getCd()); + treatmentReporting.setTreatmentDate(treatment.getTreatmentDate()); + treatmentReporting.setTreatmentDrug(treatment.getTreatmentDrug()); + treatmentReporting.setTreatmentDrugName(treatment.getTreatmentDrugName()); + treatmentReporting.setTreatmentDosageStrength(treatment.getTreatmentDosageStrength()); + treatmentReporting.setTreatmentDosageStrengthUnit(treatment.getTreatmentDosageStrengthUnit()); + treatmentReporting.setTreatmentFrequency(treatment.getTreatmentFrequency()); + treatmentReporting.setTreatmentDuration(treatment.getTreatmentDuration()); + treatmentReporting.setTreatmentDurationUnit(treatment.getTreatmentDurationUnit()); + treatmentReporting.setTreatmentRoute(treatment.getTreatmentRoute()); + treatmentReporting.setLocalId(treatment.getLocalId()); + treatmentReporting.setRecordStatusCd(treatment.getRecordStatusCd()); + treatmentReporting.setAddTime(treatment.getAddTime()); + treatmentReporting.setAddUserId(treatment.getAddUserId()); + treatmentReporting.setLastChangeTime(treatment.getLastChangeTime()); + treatmentReporting.setLastChangeUserId(treatment.getLastChangeUserId()); + treatmentReporting.setVersionControlNumber(treatment.getVersionControlNumber()); + + return treatmentReporting; + } + + /** * Parse and send RDB metadata column information sourced from the odse nbs_rdb_metadata * To a generic kafka topic to handle all types of rdb column metadata diff --git a/investigation-service/src/main/resources/application.yaml b/investigation-service/src/main/resources/application.yaml index 962845a0..75dd2798 100644 --- a/investigation-service/src/main/resources/application.yaml +++ b/investigation-service/src/main/resources/application.yaml @@ -5,6 +5,7 @@ spring: topic-name-ntf: nbs_Notification topic-name-int: nbs_Interview topic-name-ctr: nbs_CT_contact + topic-name-tmt: nbs_Treatment output: topic-name-reporting: nrt_investigation topic-name-confirmation: nrt_investigation_confirmation @@ -18,6 +19,7 @@ spring: topic-name-rdb-metadata-columns: nrt_metadata_columns topic-name-contact: nrt_contact topic-name-contact-answer: nrt_contact_answer + topic-name-treatment: nrt_treatment dlq: retry-suffix: _retry dlq-suffix: _dlt @@ -47,5 +49,6 @@ featureFlag: phc-datamart-enable: ${FF_PHC_DM_ENABLE:false} bmird-case-enable: ${FF_BMIRD_CASE_ENABLE:false} contact-record-enable: ${FF_CONTACT_RECORD_ENABLE:false} + treatment-enable: ${FF_TREATMENT_ENABLE:false} server: port: '8093' \ No newline at end of file diff --git a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java index 649dfb35..2b2431e2 100644 --- a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java +++ b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java @@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.awaitility.Awaitility; +import org.springframework.kafka.support.SendResult; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -63,12 +64,16 @@ class InvestigationDataProcessingTests { private static final String RDB_METADATA_COLS_TOPIC = "rdbMetadataColsTopic"; private static final String CONTACT_TOPIC = "contactTopic"; private static final String CONTACT_ANSWERS_TOPIC = "contactAnswersTopic"; + private static final String TREATMENT_TOPIC = "treatmentTopic"; private static final Long INVESTIGATION_UID = 234567890L; private static final Long INTERVIEW_UID = 234567890L; private static final Long CONTACT_UID = 12345678L; + private static final Long TREATMENT_UID = 34567890L; private static final String INVALID_JSON = "invalidJSON"; ProcessInvestigationDataUtil transformer; + + @BeforeEach void setUp() { closeable = MockitoAnnotations.openMocks(this); @@ -886,4 +891,183 @@ private ContactAnswer constructContactAnswers() { contactAnswer.setRdbColumnNm("CTT_EXPOSURE_TYPE"); return contactAnswer; } + + @Test + void testProcessTreatment() throws JsonProcessingException { + // Set up topic name + transformer.setTreatmentOutputTopicName(TREATMENT_TOPIC); + + // Create valid treatment object + Treatment treatment = constructTreatment(); + + // Set up the expected key and value + final TreatmentReportingKey treatmentReportingKey = new TreatmentReportingKey(); + treatmentReportingKey.setTreatmentUid(treatment.getTreatmentUid()); + final TreatmentReporting treatmentReportingValue = constructTreatmentReporting(); + + // Create a CompletableFuture that we can manually complete + CompletableFuture> future = new CompletableFuture<>(); + + // Mock the Kafka template send methods - both with value and with null (tombstone) + when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(future); + when(kafkaTemplate.send(anyString(), anyString(), isNull())).thenReturn(future); + + // Process the treatment + transformer.processTreatment(treatment); + + // Complete the future to trigger the thenRunAsync callbacks + future.complete(null); + + // Use Awaitility to wait for async operations to complete + Awaitility.await() + .atMost(1, TimeUnit.SECONDS) + .untilAsserted(() -> + verify(kafkaTemplate, times(2)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()) + ); + + // Verify both calls were to the correct topic + assertEquals(TREATMENT_TOPIC, topicCaptor.getAllValues().get(0)); + assertEquals(TREATMENT_TOPIC, topicCaptor.getAllValues().get(1)); + + // Verify first call was tombstone message (null value) + assertNull(messageCaptor.getAllValues().get(0)); + + // Verify the keys + String firstKey = keyCaptor.getAllValues().get(0); + String secondKey = keyCaptor.getAllValues().get(1); + + var firstKeyObj = objectMapper.readValue( + objectMapper.readTree(firstKey).path("payload").toString(), + TreatmentReportingKey.class); + var secondKeyObj = objectMapper.readValue( + objectMapper.readTree(secondKey).path("payload").toString(), + TreatmentReportingKey.class); + + assertEquals(treatmentReportingKey, firstKeyObj); + assertEquals(treatmentReportingKey, secondKeyObj); + + // Verify the treatment data in second call + String treatmentJson = messageCaptor.getAllValues().get(1); + var actualTreatmentValue = objectMapper.readValue( + objectMapper.readTree(treatmentJson).path("payload").toString(), + TreatmentReporting.class); + + assertEquals(treatmentReportingValue, actualTreatmentValue); + } + + /* @Test + void testProcessTreatmentError() { + Treatment treatment = new Treatment(); + treatment.setTreatmentUid(INVALID_JSON); + + + transformer.setTreatmentOutputTopicName(TREATMENT_TOPIC); + + when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(CompletableFuture.completedFuture(null)); + transformer.processTreatment(treatment); + + verify(kafkaTemplate, times(1)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); + + ILoggingEvent log = listAppender.list.getLast(); + assertTrue(log.getFormattedMessage().contains(INVALID_JSON)); + }*/ + + @Test + void testProcessTreatmentError() { + // Set up topic name + transformer.setTreatmentOutputTopicName(TREATMENT_TOPIC); + + // Create a Treatment with a valid UID but with invalid fields + Treatment treatment = new Treatment(); + treatment.setTreatmentUid(TREATMENT_UID.toString()); + // Intentionally don't set other fields to simulate error scenario + + // Create a CompletableFuture that we can manually complete + CompletableFuture> future = new CompletableFuture<>(); + + // Mock the Kafka template send methods + when(kafkaTemplate.send(anyString(), anyString(), isNull())).thenReturn(future); + when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(future); + + // Process treatment - should handle the error gracefully + transformer.processTreatment(treatment); + + // Complete the future to trigger async operations + future.complete(null); + + // Verify only tombstone message was sent (only 1 Kafka call) + verify(kafkaTemplate, times(1)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); + + // Verify it was the tombstone message + assertEquals(TREATMENT_TOPIC, topicCaptor.getValue()); + assertNull(messageCaptor.getValue()); + + // Verify the key contains the treatment UID + String capturedKey = keyCaptor.getValue(); + assertTrue(capturedKey.contains(TREATMENT_UID.toString()), + "Key should contain treatment UID: " + TREATMENT_UID); + } + + + private Treatment constructTreatment() { + Treatment treatment = new Treatment(); + treatment.setTreatmentUid(TREATMENT_UID.toString()); + treatment.setPublicHealthCaseUid("12345"); + treatment.setOrganizationUid("67890"); + treatment.setProviderUid("11111"); + treatment.setPatientTreatmentUid("22222"); + treatment.setTreatmentName("Test Treatment"); + treatment.setTreatmentOid("33333"); + treatment.setTreatmentComments("Test Comments"); + treatment.setTreatmentSharedInd("Y"); + treatment.setCd("TEST_CD"); + treatment.setTreatmentDate("2024-01-01T10:00:00"); + treatment.setTreatmentDrug("Drug123"); + treatment.setTreatmentDrugName("Test Drug"); + treatment.setTreatmentDosageStrength("100"); + treatment.setTreatmentDosageStrengthUnit("mg"); + treatment.setTreatmentFrequency("Daily"); + treatment.setTreatmentDuration("7"); + treatment.setTreatmentDurationUnit("days"); + treatment.setTreatmentRoute("Oral"); + treatment.setLocalId("LOC123"); + treatment.setRecordStatusCd("Active"); + treatment.setAddTime("2024-01-01T10:00:00"); + treatment.setAddUserId("44444"); + treatment.setLastChangeTime("2024-01-01T10:00:00"); + treatment.setLastChangeUserId("55555"); + treatment.setVersionControlNumber("1"); + return treatment; + } + + private TreatmentReporting constructTreatmentReporting() { + TreatmentReporting treatmentReporting = new TreatmentReporting(); + treatmentReporting.setTreatmentUid(TREATMENT_UID.toString()); + treatmentReporting.setPublicHealthCaseUid("12345"); + treatmentReporting.setOrganizationUid("67890"); + treatmentReporting.setProviderUid("11111"); + treatmentReporting.setPatientTreatmentUid("22222"); + treatmentReporting.setTreatmentName("Test Treatment"); + treatmentReporting.setTreatmentOid("33333"); + treatmentReporting.setTreatmentComments("Test Comments"); + treatmentReporting.setTreatmentSharedInd("Y"); + treatmentReporting.setCd("TEST_CD"); + treatmentReporting.setTreatmentDate("2024-01-01T10:00:00"); + treatmentReporting.setTreatmentDrug("Drug123"); + treatmentReporting.setTreatmentDrugName("Test Drug"); + treatmentReporting.setTreatmentDosageStrength("100"); + treatmentReporting.setTreatmentDosageStrengthUnit("mg"); + treatmentReporting.setTreatmentFrequency("Daily"); + treatmentReporting.setTreatmentDuration("7"); + treatmentReporting.setTreatmentDurationUnit("days"); + treatmentReporting.setTreatmentRoute("Oral"); + treatmentReporting.setLocalId("LOC123"); + treatmentReporting.setRecordStatusCd("Active"); + treatmentReporting.setAddTime("2024-01-01T10:00:00"); + treatmentReporting.setAddUserId("44444"); + treatmentReporting.setLastChangeTime("2024-01-01T10:00:00"); + treatmentReporting.setLastChangeUserId("55555"); + treatmentReporting.setVersionControlNumber("1"); + return treatmentReporting; + } } diff --git a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/controller/InvestigationControllerTest.java b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/controller/InvestigationControllerTest.java index 4e0e309b..a3e0b33d 100644 --- a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/controller/InvestigationControllerTest.java +++ b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/controller/InvestigationControllerTest.java @@ -49,7 +49,8 @@ public void tearDown() throws Exception { "/reporting/investigation-svc/investigation", "/reporting/investigation-svc/notification", "/reporting/investigation-svc/interview", - "/reporting/investigation-svc/contact" + "/reporting/investigation-svc/contact", + "/reporting/investigation-svc/treatment" }) void testControllerMethods(String endpoint) throws Exception { String jsonData = "{\"key\":\"value\"}"; diff --git a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java index 404a4688..5f2c5d7d 100644 --- a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java +++ b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java @@ -3,15 +3,9 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import gov.cdc.etldatapipeline.commonutil.NoDataException; -import gov.cdc.etldatapipeline.investigation.repository.ContactRepository; -import gov.cdc.etldatapipeline.investigation.repository.InterviewRepository; -import gov.cdc.etldatapipeline.investigation.repository.model.dto.Contact; -import gov.cdc.etldatapipeline.investigation.repository.model.dto.Interview; -import gov.cdc.etldatapipeline.investigation.repository.model.dto.NotificationUpdate; -import gov.cdc.etldatapipeline.investigation.repository.InvestigationRepository; -import gov.cdc.etldatapipeline.investigation.repository.model.dto.Investigation; +import gov.cdc.etldatapipeline.investigation.repository.*; +import gov.cdc.etldatapipeline.investigation.repository.model.dto.*; import gov.cdc.etldatapipeline.investigation.repository.model.reporting.*; -import gov.cdc.etldatapipeline.investigation.repository.NotificationRepository; import gov.cdc.etldatapipeline.investigation.util.ProcessInvestigationDataUtil; import org.apache.kafka.clients.consumer.MockConsumer; import org.awaitility.Awaitility; @@ -20,6 +14,7 @@ import org.junit.jupiter.api.Test; import org.mockito.*; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; import java.util.List; import java.util.NoSuchElementException; @@ -48,6 +43,9 @@ class InvestigationServiceTest { @Mock private ContactRepository contactRepository; + @Mock + private TreatmentRepository treatmentRepository; + @Mock KafkaTemplate kafkaTemplate; @@ -73,11 +71,13 @@ class InvestigationServiceTest { private final String notificationTopic = "Notification"; private final String interviewTopic = "Interview"; private final String contactTopic = "Contact"; + private final String treatmentTopic = "Treatment"; //output topics private final String investigationTopicOutput = "InvestigationOutput"; private final String notificationTopicOutput = "investigationNotification"; private final String interviewTopicOutput = "InterviewOutput"; private final String contactTopicOutput = "ContactOutput"; + private final String treatmentTopicOutput = "TreatmentOutput"; @BeforeEach @@ -85,7 +85,7 @@ void setUp() { closeable = MockitoAnnotations.openMocks(this); ProcessInvestigationDataUtil transformer = new ProcessInvestigationDataUtil(kafkaTemplate, investigationRepository); - investigationService = new InvestigationService(investigationRepository, notificationRepository, interviewRepository, contactRepository, kafkaTemplate, transformer); + investigationService = new InvestigationService(investigationRepository, notificationRepository, interviewRepository, contactRepository,treatmentRepository, kafkaTemplate, transformer); investigationService.setPhcDatamartEnable(true); investigationService.setBmirdCaseEnable(true); @@ -95,6 +95,7 @@ void setUp() { investigationService.setInvestigationTopicReporting(investigationTopicOutput); investigationService.setInterviewTopic(interviewTopic); investigationService.setContactTopic(contactTopic); + investigationService.setTreatmentTopic(treatmentTopic); transformer.setInvestigationConfirmationOutputTopicName("investigationConfirmation"); transformer.setInvestigationObservationOutputTopicName("investigationObservation"); @@ -106,6 +107,8 @@ void setUp() { transformer.setInterviewAnswerOutputTopicName("interviewAnswer"); transformer.setInterviewNoteOutputTopicName("interviewNote"); transformer.setRdbMetadataColumnsOutputTopicName("metadataColumns"); + transformer.setTreatmentOutputTopicName(treatmentTopicOutput); + investigationService.setTreatmentEnable(true); } @AfterEach @@ -574,4 +577,187 @@ private ContactReporting constructContactReporting(Long contactUid) { return contactReporting; } + /* @Test + void testProcessTreatmentMessage() throws JsonProcessingException { + Long treatmentUid = 234567890L; + String payload = "{\"payload\": {\"after\": {\"treatment_uid\": \"" + treatmentUid + "\"}}}"; + + final Treatment treatment = constructTreatment(treatmentUid); + when(treatmentRepository.computeTreatment(String.valueOf(treatmentUid))).thenReturn(Optional.of(treatment)); + when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(CompletableFuture.completedFuture(null)); + + investigationService.processMessage(payload, treatmentTopic, consumer); + + final TreatmentReportingKey treatmentReportingKey = new TreatmentReportingKey(); + treatmentReportingKey.setTreatmentUid(String.valueOf(treatmentUid)); + + final TreatmentReporting treatmentReportingValue = constructTreatmentReporting(treatmentUid); + + Awaitility.await() + .atMost(1, TimeUnit.SECONDS) + .untilAsserted(() -> + verify(kafkaTemplate, times(1)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()) + ); + + String actualTopic = topicCaptor.getAllValues().getFirst(); + String actualKey = keyCaptor.getAllValues().getFirst(); + String actualValue = messageCaptor.getAllValues().getFirst(); + + var actualTreatmentKey = objectMapper.readValue( + objectMapper.readTree(actualKey).path("payload").toString(), TreatmentReportingKey.class); + var actualTreatmentValue = objectMapper.readValue( + objectMapper.readTree(actualValue).path("payload").toString(), TreatmentReporting.class); + + assertEquals(treatmentTopicOutput, actualTopic); + assertEquals(treatmentReportingKey, actualTreatmentKey); + assertEquals(treatmentReportingValue, actualTreatmentValue); + + verify(treatmentRepository).computeTreatment(String.valueOf(treatmentUid)); + + + } */ + + @Test + void testProcessTreatmentMessage() throws JsonProcessingException { + Long treatmentUid = 234567890L; + String payload = "{\"payload\": {\"after\": {\"treatment_uid\": \"" + treatmentUid + "\"}}}"; + + // Create valid treatment + final Treatment treatment = constructTreatment(treatmentUid); + + // Set up mock repository + when(treatmentRepository.computeTreatment(String.valueOf(treatmentUid))).thenReturn(Optional.of(treatment)); + + // Create a CompletableFuture that we can manually complete + CompletableFuture> future = new CompletableFuture<>(); + + // Mock Kafka operations + when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(future); + when(kafkaTemplate.send(anyString(), anyString(), isNull())).thenReturn(future); + + // Process the message + investigationService.processMessage(payload, treatmentTopic, consumer); + + // Complete the future to trigger async operations + future.complete(null); + + // Use Awaitility to wait for async operations + Awaitility.await() + .atMost(1, TimeUnit.SECONDS) + .untilAsserted(() -> + verify(kafkaTemplate, times(2)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()) + ); + + // Verify repository was called with correct ID + verify(treatmentRepository).computeTreatment(String.valueOf(treatmentUid)); + + // Verify tombstone message (first message) + assertEquals(treatmentTopicOutput, topicCaptor.getAllValues().get(0)); + assertNull(messageCaptor.getAllValues().get(0)); + + // Verify treatment data message (second message) + assertEquals(treatmentTopicOutput, topicCaptor.getAllValues().get(1)); + + // Extract and verify the message content + String treatmentJson = messageCaptor.getAllValues().get(1); + TreatmentReporting actualTreatment = objectMapper.readValue( + objectMapper.readTree(treatmentJson).path("payload").toString(), + TreatmentReporting.class); + + // Create expected treatment reporting object + TreatmentReporting expectedTreatment = constructTreatmentReporting(treatmentUid); + + // Compare actual to expected + assertEquals(expectedTreatment, actualTreatment); + } + + @Test + void testProcessTreatmentMessageWhenFeatureDisabled() { + Long treatmentUid = 234567890L; + String payload = "{\"payload\": {\"after\": {\"treatment_uid\": \"" + treatmentUid + "\"}}}"; + + final Treatment treatment = constructTreatment(treatmentUid); + when(treatmentRepository.computeTreatment(String.valueOf(treatmentUid))).thenReturn(Optional.of(treatment)); + + investigationService.setTreatmentEnable(false); + investigationService.processMessage(payload, treatmentTopic, consumer); + verify(kafkaTemplate, never()).send(anyString(), anyString(), anyString()); + } + + @Test + void testProcessTreatmentException() { + String invalidPayload = "{\"payload\": {\"after\": {}}}"; + RuntimeException ex = assertThrows(RuntimeException.class, + () -> investigationService.processMessage(invalidPayload, treatmentTopic, consumer)); + assertEquals(NoSuchElementException.class, ex.getCause().getClass()); + } + + @Test + void testProcessTreatmentNoDataException() { + String payload = "{\"payload\": {\"after\": {\"treatment_uid\": \"\"}}}"; + assertThrows(NoDataException.class, () -> investigationService.processMessage(payload, treatmentTopic, consumer)); + } + + private Treatment constructTreatment(Long treatmentUid) { + Treatment treatment = new Treatment(); + treatment.setTreatmentUid(String.valueOf(treatmentUid)); + treatment.setPublicHealthCaseUid("12345"); + treatment.setOrganizationUid("67890"); + treatment.setProviderUid("11111"); + treatment.setPatientTreatmentUid("22222"); + treatment.setTreatmentName("Test Treatment"); + treatment.setTreatmentOid("33333"); + treatment.setTreatmentComments("Test Comments"); + treatment.setTreatmentSharedInd("Y"); + treatment.setCd("TEST_CD"); + treatment.setTreatmentDate("2024-01-01T10:00:00"); + treatment.setTreatmentDrug("Drug123"); + treatment.setTreatmentDrugName("Test Drug"); + treatment.setTreatmentDosageStrength("100"); + treatment.setTreatmentDosageStrengthUnit("mg"); + treatment.setTreatmentFrequency("Daily"); + treatment.setTreatmentDuration("7"); + treatment.setTreatmentDurationUnit("days"); + treatment.setTreatmentRoute("Oral"); + treatment.setLocalId("LOC123"); + treatment.setRecordStatusCd("Active"); + treatment.setAddTime("2024-01-01T10:00:00"); + treatment.setAddUserId("44444"); + treatment.setLastChangeTime("2024-01-01T10:00:00"); + treatment.setLastChangeUserId("55555"); + treatment.setVersionControlNumber("1"); + return treatment; + } + + private TreatmentReporting constructTreatmentReporting(Long treatmentUid) { + TreatmentReporting treatmentReporting = new TreatmentReporting(); + treatmentReporting.setTreatmentUid(treatmentUid.toString()); + treatmentReporting.setPublicHealthCaseUid("12345"); + treatmentReporting.setOrganizationUid("67890"); + treatmentReporting.setProviderUid("11111"); + treatmentReporting.setPatientTreatmentUid("22222"); + treatmentReporting.setTreatmentName("Test Treatment"); + treatmentReporting.setTreatmentOid("33333"); + treatmentReporting.setTreatmentComments("Test Comments"); + treatmentReporting.setTreatmentSharedInd("Y"); + treatmentReporting.setCd("TEST_CD"); + treatmentReporting.setTreatmentDate("2024-01-01T10:00:00"); + treatmentReporting.setTreatmentDrug("Drug123"); + treatmentReporting.setTreatmentDrugName("Test Drug"); + treatmentReporting.setTreatmentDosageStrength("100"); + treatmentReporting.setTreatmentDosageStrengthUnit("mg"); + treatmentReporting.setTreatmentFrequency("Daily"); + treatmentReporting.setTreatmentDuration("7"); + treatmentReporting.setTreatmentDurationUnit("days"); + treatmentReporting.setTreatmentRoute("Oral"); + treatmentReporting.setLocalId("LOC123"); + treatmentReporting.setRecordStatusCd("Active"); + treatmentReporting.setAddTime("2024-01-01T10:00:00"); + treatmentReporting.setAddUserId("44444"); + treatmentReporting.setLastChangeTime("2024-01-01T10:00:00"); + treatmentReporting.setLastChangeUserId("55555"); + treatmentReporting.setVersionControlNumber("1"); + return treatmentReporting; + } + } \ No newline at end of file From f35179f5612456d24ec6ee31475ef9f2cb0af7d3 Mon Sep 17 00:00:00 2001 From: Jayasudha Sundaram Date: Mon, 24 Feb 2025 18:53:24 -0500 Subject: [PATCH 2/9] removed commented test --- .../InvestigationDataProcessingTests.java | 40 -------------- .../service/InvestigationServiceTest.java | 54 ------------------- 2 files changed, 94 deletions(-) diff --git a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java index 2b2431e2..9db98053 100644 --- a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java +++ b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java @@ -894,45 +894,34 @@ private ContactAnswer constructContactAnswers() { @Test void testProcessTreatment() throws JsonProcessingException { - // Set up topic name transformer.setTreatmentOutputTopicName(TREATMENT_TOPIC); - // Create valid treatment object Treatment treatment = constructTreatment(); - // Set up the expected key and value final TreatmentReportingKey treatmentReportingKey = new TreatmentReportingKey(); treatmentReportingKey.setTreatmentUid(treatment.getTreatmentUid()); final TreatmentReporting treatmentReportingValue = constructTreatmentReporting(); - // Create a CompletableFuture that we can manually complete CompletableFuture> future = new CompletableFuture<>(); - // Mock the Kafka template send methods - both with value and with null (tombstone) when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(future); when(kafkaTemplate.send(anyString(), anyString(), isNull())).thenReturn(future); - // Process the treatment transformer.processTreatment(treatment); - // Complete the future to trigger the thenRunAsync callbacks future.complete(null); - // Use Awaitility to wait for async operations to complete Awaitility.await() .atMost(1, TimeUnit.SECONDS) .untilAsserted(() -> verify(kafkaTemplate, times(2)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()) ); - // Verify both calls were to the correct topic assertEquals(TREATMENT_TOPIC, topicCaptor.getAllValues().get(0)); assertEquals(TREATMENT_TOPIC, topicCaptor.getAllValues().get(1)); - // Verify first call was tombstone message (null value) assertNull(messageCaptor.getAllValues().get(0)); - // Verify the keys String firstKey = keyCaptor.getAllValues().get(0); String secondKey = keyCaptor.getAllValues().get(1); @@ -946,7 +935,6 @@ void testProcessTreatment() throws JsonProcessingException { assertEquals(treatmentReportingKey, firstKeyObj); assertEquals(treatmentReportingKey, secondKeyObj); - // Verify the treatment data in second call String treatmentJson = messageCaptor.getAllValues().get(1); var actualTreatmentValue = objectMapper.readValue( objectMapper.readTree(treatmentJson).path("payload").toString(), @@ -954,55 +942,27 @@ void testProcessTreatment() throws JsonProcessingException { assertEquals(treatmentReportingValue, actualTreatmentValue); } - - /* @Test - void testProcessTreatmentError() { - Treatment treatment = new Treatment(); - treatment.setTreatmentUid(INVALID_JSON); - - - transformer.setTreatmentOutputTopicName(TREATMENT_TOPIC); - - when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(CompletableFuture.completedFuture(null)); - transformer.processTreatment(treatment); - - verify(kafkaTemplate, times(1)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); - - ILoggingEvent log = listAppender.list.getLast(); - assertTrue(log.getFormattedMessage().contains(INVALID_JSON)); - }*/ - @Test void testProcessTreatmentError() { - // Set up topic name transformer.setTreatmentOutputTopicName(TREATMENT_TOPIC); - // Create a Treatment with a valid UID but with invalid fields Treatment treatment = new Treatment(); treatment.setTreatmentUid(TREATMENT_UID.toString()); - // Intentionally don't set other fields to simulate error scenario - // Create a CompletableFuture that we can manually complete CompletableFuture> future = new CompletableFuture<>(); - // Mock the Kafka template send methods when(kafkaTemplate.send(anyString(), anyString(), isNull())).thenReturn(future); when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(future); - // Process treatment - should handle the error gracefully transformer.processTreatment(treatment); - // Complete the future to trigger async operations future.complete(null); - // Verify only tombstone message was sent (only 1 Kafka call) verify(kafkaTemplate, times(1)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); - // Verify it was the tombstone message assertEquals(TREATMENT_TOPIC, topicCaptor.getValue()); assertNull(messageCaptor.getValue()); - // Verify the key contains the treatment UID String capturedKey = keyCaptor.getValue(); assertTrue(capturedKey.contains(TREATMENT_UID.toString()), "Key should contain treatment UID: " + TREATMENT_UID); diff --git a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java index 5f2c5d7d..03f20b6f 100644 --- a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java +++ b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java @@ -576,98 +576,44 @@ private ContactReporting constructContactReporting(Long contactUid) { contactReporting.setDispositionedByUid(123L); return contactReporting; } - - /* @Test - void testProcessTreatmentMessage() throws JsonProcessingException { - Long treatmentUid = 234567890L; - String payload = "{\"payload\": {\"after\": {\"treatment_uid\": \"" + treatmentUid + "\"}}}"; - - final Treatment treatment = constructTreatment(treatmentUid); - when(treatmentRepository.computeTreatment(String.valueOf(treatmentUid))).thenReturn(Optional.of(treatment)); - when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(CompletableFuture.completedFuture(null)); - - investigationService.processMessage(payload, treatmentTopic, consumer); - - final TreatmentReportingKey treatmentReportingKey = new TreatmentReportingKey(); - treatmentReportingKey.setTreatmentUid(String.valueOf(treatmentUid)); - - final TreatmentReporting treatmentReportingValue = constructTreatmentReporting(treatmentUid); - - Awaitility.await() - .atMost(1, TimeUnit.SECONDS) - .untilAsserted(() -> - verify(kafkaTemplate, times(1)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()) - ); - - String actualTopic = topicCaptor.getAllValues().getFirst(); - String actualKey = keyCaptor.getAllValues().getFirst(); - String actualValue = messageCaptor.getAllValues().getFirst(); - - var actualTreatmentKey = objectMapper.readValue( - objectMapper.readTree(actualKey).path("payload").toString(), TreatmentReportingKey.class); - var actualTreatmentValue = objectMapper.readValue( - objectMapper.readTree(actualValue).path("payload").toString(), TreatmentReporting.class); - - assertEquals(treatmentTopicOutput, actualTopic); - assertEquals(treatmentReportingKey, actualTreatmentKey); - assertEquals(treatmentReportingValue, actualTreatmentValue); - - verify(treatmentRepository).computeTreatment(String.valueOf(treatmentUid)); - - - } */ - @Test void testProcessTreatmentMessage() throws JsonProcessingException { Long treatmentUid = 234567890L; String payload = "{\"payload\": {\"after\": {\"treatment_uid\": \"" + treatmentUid + "\"}}}"; - // Create valid treatment final Treatment treatment = constructTreatment(treatmentUid); - // Set up mock repository when(treatmentRepository.computeTreatment(String.valueOf(treatmentUid))).thenReturn(Optional.of(treatment)); - // Create a CompletableFuture that we can manually complete CompletableFuture> future = new CompletableFuture<>(); - // Mock Kafka operations when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(future); when(kafkaTemplate.send(anyString(), anyString(), isNull())).thenReturn(future); - // Process the message investigationService.processMessage(payload, treatmentTopic, consumer); - // Complete the future to trigger async operations future.complete(null); - // Use Awaitility to wait for async operations Awaitility.await() .atMost(1, TimeUnit.SECONDS) .untilAsserted(() -> verify(kafkaTemplate, times(2)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()) ); - // Verify repository was called with correct ID verify(treatmentRepository).computeTreatment(String.valueOf(treatmentUid)); - // Verify tombstone message (first message) assertEquals(treatmentTopicOutput, topicCaptor.getAllValues().get(0)); assertNull(messageCaptor.getAllValues().get(0)); - // Verify treatment data message (second message) assertEquals(treatmentTopicOutput, topicCaptor.getAllValues().get(1)); - // Extract and verify the message content String treatmentJson = messageCaptor.getAllValues().get(1); TreatmentReporting actualTreatment = objectMapper.readValue( objectMapper.readTree(treatmentJson).path("payload").toString(), TreatmentReporting.class); - // Create expected treatment reporting object TreatmentReporting expectedTreatment = constructTreatmentReporting(treatmentUid); - // Compare actual to expected assertEquals(expectedTreatment, actualTreatment); } From fb0a197591f40c2041b8069296460206e37866eb Mon Sep 17 00:00:00 2001 From: Jayasudha Sundaram Date: Tue, 25 Feb 2025 12:48:45 -0500 Subject: [PATCH 3/9] updated the junit for error scenario --- .../InvestigationDataProcessingTests.java | 32 ++++++++----------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java index 9db98053..6f824e0e 100644 --- a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java +++ b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java @@ -942,30 +942,24 @@ void testProcessTreatment() throws JsonProcessingException { assertEquals(treatmentReportingValue, actualTreatmentValue); } + @Test void testProcessTreatmentError() { - transformer.setTreatmentOutputTopicName(TREATMENT_TOPIC); - Treatment treatment = new Treatment(); - treatment.setTreatmentUid(TREATMENT_UID.toString()); - - CompletableFuture> future = new CompletableFuture<>(); - - when(kafkaTemplate.send(anyString(), anyString(), isNull())).thenReturn(future); - when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(future); - - transformer.processTreatment(treatment); - - future.complete(null); - - verify(kafkaTemplate, times(1)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); + // Setup to throw an exception during processing,no uid set + transformer.setTreatmentOutputTopicName(TREATMENT_TOPIC); + when(kafkaTemplate.send(anyString(), anyString(), isNull())).thenReturn(CompletableFuture.completedFuture(null)); + try { + transformer.processTreatment(treatment); + } catch (Exception e) { + } + List logs = listAppender.list; + boolean foundErrorLog = logs.stream() + .map(ILoggingEvent::getFormattedMessage) + .anyMatch(m -> m.contains("Error processing Treatment data")); - assertEquals(TREATMENT_TOPIC, topicCaptor.getValue()); - assertNull(messageCaptor.getValue()); + assertTrue(foundErrorLog, "Error processing Treatment data"); - String capturedKey = keyCaptor.getValue(); - assertTrue(capturedKey.contains(TREATMENT_UID.toString()), - "Key should contain treatment UID: " + TREATMENT_UID); } From 47aaec062d99e7c536906ff72b55880055464645 Mon Sep 17 00:00:00 2001 From: Jayasudha Sundaram Date: Fri, 28 Feb 2025 14:32:35 -0500 Subject: [PATCH 4/9] Addressed the review comments --- .../odse/routines/021-sp_treatment_event.sql | 90 ++++++------ .../tables/042-create_nrt_treatment.sql | 40 +++--- .../repository/model/dto/Treatment.java | 47 +++---- .../model/reporting/TreatmentReporting.java | 39 ----- .../reporting/TreatmentReportingKey.java | 6 +- .../service/InvestigationService.java | 16 ++- .../util/ProcessInvestigationDataUtil.java | 77 ---------- .../InvestigationDataProcessingTests.java | 133 ------------------ .../service/InvestigationServiceTest.java | 65 ++------- .../routines/026-sp_treatment_event-001.sql | 90 ++++++------ .../tables/042-create_nrt_treatment-001.sql | 40 +++--- 11 files changed, 184 insertions(+), 459 deletions(-) delete mode 100644 investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/reporting/TreatmentReporting.java diff --git a/db/upgrade/odse/routines/021-sp_treatment_event.sql b/db/upgrade/odse/routines/021-sp_treatment_event.sql index 169d6dfa..8493c08a 100644 --- a/db/upgrade/odse/routines/021-sp_treatment_event.sql +++ b/db/upgrade/odse/routines/021-sp_treatment_event.sql @@ -46,12 +46,12 @@ BEGIN par.subject_entity_uid AS organization_uid, par1.subject_entity_uid AS provider_uid, viewPatientKeys.treatment_uid AS patient_treatment_uid, - rx1.LOCAL_ID, - rx1.ADD_TIME, - rx1.ADD_USER_ID, - rx1.LAST_CHG_TIME, - rx1.LAST_CHG_USER_ID, - rx1.VERSION_CTRL_NBR + rx1.local_id, + rx1.add_time, + rx1.add_user_id, + rx1.last_chg_time, + rx1.last_chg_user_id, + rx1.version_ctrl_nbr INTO #TREATMENT_UIDS FROM NBS_ODSE.dbo.treatment AS rx1 WITH (NOLOCK) INNER JOIN NBS_ODSE.dbo.Treatment_administered AS rx2 WITH (NOLOCK) @@ -110,31 +110,31 @@ BEGIN t.organization_uid, t.provider_uid, t.patient_treatment_uid, - rx1.cd_desc_txt AS Treatment_nm, - rx1.program_jurisdiction_oid AS Treatment_oid, - REPLACE(REPLACE(rx1.txt, CHAR(13) + CHAR(10), ' '), CHAR(10), ' ') AS Treatment_comments, - rx1.shared_ind AS Treatment_shared_ind, + rx1.cd_desc_txt AS treatment_name, + rx1.program_jurisdiction_oid AS treatment_oid, + REPLACE(REPLACE(rx1.txt, CHAR(13) + CHAR(10), ' '), CHAR(10), ' ') AS treatment_comments, + rx1.shared_ind AS treatment_shared_ind, rx1.cd, - rx2.effective_from_time AS Treatment_dt, - rx2.cd AS Treatment_drug, - rx2.cd_desc_txt AS Treatment_drug_nm, - rx2.dose_qty AS Treatment_dosage_strength, - rx2.dose_qty_unit_cd AS Treatment_dosage_strength_unit, - rx2.interval_cd AS Treatment_frequency, - rx2.effective_duration_amt AS Treatment_duration, - rx2.effective_duration_unit_cd AS Treatment_duration_unit, - rx2.route_cd AS Treatment_route, - t.LOCAL_ID, + rx2.effective_from_time AS treatment_date, + rx2.cd AS treatment_drug, + rx2.cd_desc_txt AS treatment_drug_name, + rx2.dose_qty AS treatment_dosage_strength, + rx2.dose_qty_unit_cd AS treatment_dosage_strength_unit, + rx2.interval_cd AS treatment_frequency, + rx2.effective_duration_amt AS treatment_duration, + rx2.effective_duration_unit_cd AS treatment_duration_unit, + rx2.route_cd AS treatment_route, + t.local_id, CASE WHEN rx1.record_status_cd = '' THEN 'ACTIVE' WHEN rx1.record_status_cd = 'LOG_DEL' THEN 'INACTIVE' ELSE rx1.record_status_cd END as record_status_cd, - t.ADD_TIME, - t.ADD_USER_ID, - t.LAST_CHG_TIME, - t.LAST_CHG_USER_ID, - t.VERSION_CTRL_NBR + t.add_time, + t.add_user_id, + t.last_chg_time, + t.last_chg_user_id, + t.version_ctrl_nbr INTO #TREATMENT_DETAILS FROM #TREATMENT_UIDS t INNER JOIN NBS_ODSE.dbo.treatment rx1 WITH (NOLOCK) @@ -168,27 +168,27 @@ BEGIN t.organization_uid, t.provider_uid, t.patient_treatment_uid, - t.Treatment_nm, - t.Treatment_oid, - t.Treatment_comments, - t.Treatment_shared_ind, + t.treatment_name, + t.treatment_oid, + t.treatment_comments, + t.treatment_shared_ind, t.cd, - t.Treatment_dt, - t.Treatment_drug, - t.Treatment_drug_nm, - t.Treatment_dosage_strength, - t.Treatment_dosage_strength_unit, - t.Treatment_frequency, - t.Treatment_duration, - t.Treatment_duration_unit, - t.Treatment_route, - t.LOCAL_ID, + t.treatment_date, + t.treatment_drug, + t.treatment_drug_name, + t.treatment_dosage_strength, + t.treatment_dosage_strength_unit, + t.treatment_frequency, + t.treatment_duration, + t.treatment_duration_unit, + t.treatment_route, + t.local_id, t.record_status_cd, - t.ADD_TIME, - t.ADD_USER_ID, - t.LAST_CHG_TIME, - t.LAST_CHG_USER_ID, - t.VERSION_CTRL_NBR + t.add_time, + t.add_user_id, + t.last_chg_time, + t.last_chg_user_id, + t.version_ctrl_nbr FROM #TREATMENT_DETAILS t; @@ -251,4 +251,4 @@ BEGIN return @ErrorMessage; END CATCH -END; +END; \ No newline at end of file diff --git a/db/upgrade/rdb_modern/tables/042-create_nrt_treatment.sql b/db/upgrade/rdb_modern/tables/042-create_nrt_treatment.sql index afaa9a77..b629cafd 100644 --- a/db/upgrade/rdb_modern/tables/042-create_nrt_treatment.sql +++ b/db/upgrade/rdb_modern/tables/042-create_nrt_treatment.sql @@ -9,28 +9,28 @@ CREATE TABLE dbo.nrt_treatment organization_uid varchar(100) NULL, provider_uid varchar(100) NULL, patient_treatment_uid varchar(100) NULL, - Treatment_nm varchar(500) NULL, - Treatment_oid varchar(100) NULL, - Treatment_comments varchar(500) NULL, - Treatment_shared_ind varchar(100) NULL, + treatment_name varchar(500) NULL, + treatment_oid varchar(100) NULL, + treatment_comments varchar(500) NULL, + treatment_shared_ind varchar(100) NULL, cd varchar(100) NULL, - Treatment_dt datetime NULL, - Treatment_drug varchar(100) NULL, - Treatment_drug_nm varchar(500) NULL, - Treatment_dosage_strength varchar(100) NULL, - Treatment_dosage_strength_unit varchar(100) NULL, - Treatment_frequency varchar(100) NULL, - Treatment_duration varchar(100) NULL, - Treatment_duration_unit varchar(100) NULL, - Treatment_route varchar(100) NULL, - LOCAL_ID varchar(100) NULL, + treatment_date datetime NULL, + treatment_drug varchar(100) NULL, + treatment_drug_name varchar(500) NULL, + treatment_dosage_strength varchar(100) NULL, + treatment_dosage_strength_unit varchar(100) NULL, + treatment_frequency varchar(100) NULL, + treatment_duration varchar(100) NULL, + treatment_duration_unit varchar(100) NULL, + treatment_route varchar(100) NULL, + local_id varchar(100) NULL, record_status_cd varchar(100) NULL, - ADD_TIME datetime NULL, - ADD_USER_ID varchar(100) NULL, - LAST_CHG_TIME datetime NULL, - LAST_CHG_USER_ID varchar(100) NULL, - VERSION_CTRL_NBR varchar(100) NULL, + add_time datetime NULL, + add_user_id varchar(100) NULL, + last_chg_time datetime NULL, + last_chg_user_id varchar(100) NULL, + version_ctrl_nbr varchar(100) NULL, refresh_datetime datetime2(7) GENERATED ALWAYS AS ROW START NOT NULL, max_datetime datetime2(7) GENERATED ALWAYS AS ROW END HIDDEN NOT NULL, PERIOD FOR SYSTEM_TIME (refresh_datetime, max_datetime) -) +) \ No newline at end of file diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/dto/Treatment.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/dto/Treatment.java index cd752522..56f24d57 100644 --- a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/dto/Treatment.java +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/dto/Treatment.java @@ -1,5 +1,7 @@ package gov.cdc.etldatapipeline.investigation.repository.model.dto; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.annotation.JsonNaming; import jakarta.persistence.Column; import jakarta.persistence.Entity; import jakarta.persistence.Id; @@ -7,6 +9,7 @@ @Entity @Data +@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) public class Treatment { @Id @Column(name = "treatment_uid") @@ -24,72 +27,66 @@ public class Treatment { @Column(name = "patient_treatment_uid") private String patientTreatmentUid; - @Column(name = "Treatment_nm") + @Column(name = "treatment_name") private String treatmentName; - @Column(name = "Treatment_oid") + @Column(name = "treatment_oid") private String treatmentOid; - @Column(name = "Treatment_comments") + @Column(name = "treatment_comments") private String treatmentComments; - @Column(name = "Treatment_shared_ind") + @Column(name = "treatment_shared_ind") private String treatmentSharedInd; @Column(name = "cd") private String cd; - @Column(name = "Treatment_dt") + @Column(name = "treatment_date") private String treatmentDate; - @Column(name = "Treatment_drug") + @Column(name = "treatment_drug") private String treatmentDrug; - @Column(name = "Treatment_drug_nm") + @Column(name = "treatment_drug_name") private String treatmentDrugName; - @Column(name = "Treatment_dosage_strength") + @Column(name = "treatment_dosage_strength") private String treatmentDosageStrength; - @Column(name = "Treatment_dosage_strength_unit") + @Column(name = "treatment_dosage_strength_unit") private String treatmentDosageStrengthUnit; - @Column(name = "Treatment_frequency") + @Column(name = "treatment_frequency") private String treatmentFrequency; - @Column(name = "Treatment_duration") + @Column(name = "treatment_duration") private String treatmentDuration; - @Column(name = "Treatment_duration_unit") + @Column(name = "treatment_duration_unit") private String treatmentDurationUnit; - @Column(name = "Treatment_route") + @Column(name = "treatment_route") private String treatmentRoute; - @Column(name = "LOCAL_ID") + @Column(name = "local_id") private String localId; @Column(name = "record_status_cd") private String recordStatusCd; - @Column(name = "ADD_TIME") + @Column(name = "add_time") private String addTime; - @Column(name = "ADD_USER_ID") + @Column(name = "add_user_id") private String addUserId; - @Column(name = "LAST_CHG_TIME") + @Column(name = "last_chg_time") private String lastChangeTime; - @Column(name = "LAST_CHG_USER_ID") + @Column(name = "last_chg_user_id") private String lastChangeUserId; - @Column(name = "VERSION_CTRL_NBR") + @Column(name = "version_ctrl_nbr") private String versionControlNumber; - - @Column(name = "refresh_datetime") - private String refreshDatetime; - - @Column(name = "max_datetime") - private String maxDatetime; } diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/reporting/TreatmentReporting.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/reporting/TreatmentReporting.java deleted file mode 100644 index f21feb96..00000000 --- a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/reporting/TreatmentReporting.java +++ /dev/null @@ -1,39 +0,0 @@ -package gov.cdc.etldatapipeline.investigation.repository.model.reporting; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.databind.PropertyNamingStrategies; -import com.fasterxml.jackson.databind.annotation.JsonNaming; -import lombok.Data; -import java.time.LocalDateTime; - -@Data -@JsonIgnoreProperties(ignoreUnknown = true) -@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) -public class TreatmentReporting { - private String treatmentUid; - private String publicHealthCaseUid; - private String organizationUid; - private String providerUid; - private String patientTreatmentUid; - private String treatmentName; - private String treatmentOid; - private String treatmentComments; - private String treatmentSharedInd; - private String cd; - private String treatmentDate; - private String treatmentDrug; - private String treatmentDrugName; - private String treatmentDosageStrength; - private String treatmentDosageStrengthUnit; - private String treatmentFrequency; - private String treatmentDuration; - private String treatmentDurationUnit; - private String treatmentRoute; - private String localId; - private String recordStatusCd; - private String addTime; - private String addUserId; - private String lastChangeTime; - private String lastChangeUserId; - private String versionControlNumber; -} \ No newline at end of file diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/reporting/TreatmentReportingKey.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/reporting/TreatmentReportingKey.java index a786e370..3b043d54 100644 --- a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/reporting/TreatmentReportingKey.java +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/reporting/TreatmentReportingKey.java @@ -1,18 +1,18 @@ package gov.cdc.etldatapipeline.investigation.repository.model.reporting; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.annotation.JsonNaming; import lombok.Data; import lombok.NoArgsConstructor; import lombok.AllArgsConstructor; import lombok.NonNull; -import java.time.LocalDateTime; - @Data @NoArgsConstructor @AllArgsConstructor +@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) public class TreatmentReportingKey { @NonNull - @JsonProperty("treatment_uid") private String treatmentUid; } diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java index e676ef28..ab2c739e 100644 --- a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java @@ -6,6 +6,7 @@ import gov.cdc.etldatapipeline.investigation.repository.model.dto.*; import gov.cdc.etldatapipeline.investigation.repository.model.reporting.InvestigationKey; import gov.cdc.etldatapipeline.investigation.repository.model.reporting.InvestigationReporting; +import gov.cdc.etldatapipeline.investigation.repository.model.reporting.TreatmentReportingKey; import gov.cdc.etldatapipeline.investigation.util.ProcessInvestigationDataUtil; import jakarta.persistence.EntityNotFoundException; import lombok.RequiredArgsConstructor; @@ -63,6 +64,9 @@ public class InvestigationService { @Value("${spring.kafka.output.topic-name-tmt}") private String treatmentTopic; + @Value("${spring.kafka.output.topic-name-treatment}") + private String treatmentOutputTopicName; + @Value("${featureFlag.phc-datamart-enable}") private boolean phcDatamartEnable; @@ -245,7 +249,17 @@ private void processTreatment(String value) { Optional treatmentData = treatmentRepository.computeTreatment(treatmentUid); if(treatmentData.isPresent()) { Treatment treatment = treatmentData.get(); - processDataUtil.processTreatment(treatment); + + // Using Treatment directly as the reporting object + TreatmentReportingKey treatmentReportingKey = new TreatmentReportingKey(treatment.getTreatmentUid()); + + String jsonKey = jsonGenerator.generateStringJson(treatmentReportingKey); + String jsonValue = jsonGenerator.generateStringJson(treatment); + + kafkaTemplate.send(treatmentOutputTopicName, jsonKey, jsonValue) + .whenComplete((res, e) -> logger.info("Treatment data (uid={}) sent to {}", + treatment.getTreatmentUid(), treatmentOutputTopicName)); + } else { throw new EntityNotFoundException("Unable to find treatment with id: " + treatmentUid); } diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/util/ProcessInvestigationDataUtil.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/util/ProcessInvestigationDataUtil.java index 85f1b5cd..319da9fd 100644 --- a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/util/ProcessInvestigationDataUtil.java +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/util/ProcessInvestigationDataUtil.java @@ -62,9 +62,6 @@ public class ProcessInvestigationDataUtil { @Value("${spring.kafka.output.topic-name-interview-note}") private String interviewNoteOutputTopicName; - @Value("${spring.kafka.output.topic-name-treatment}") - private String treatmentOutputTopicName; - @Value("${spring.kafka.output.topic-name-rdb-metadata-columns}") private String rdbMetadataColumnsOutputTopicName; @@ -660,80 +657,6 @@ private ContactReporting transformContact(Contact contact) { contactReporting.setDispositionedByUid(contact.getDispositionedByUid()); return contactReporting; } - - /** - * Process treatment data and send to Kafka topic - * @param treatment Entity bean returned from stored procedures - */ - public void processTreatment(Treatment treatment) { - try { - // Tombstone message to delete all treatment records for specified treatment uid - TreatmentReportingKey treatmentReportingKey = new TreatmentReportingKey(treatment.getTreatmentUid()); - String jsonKeyDel = jsonGenerator.generateStringJson(treatmentReportingKey); - logger.info(TOMBSTONE_MSG_SENT, "treatment", "treatment", treatment.getTreatmentUid()); - - kafkaTemplate.send(treatmentOutputTopicName, jsonKeyDel, null) - .whenComplete((res, e) -> logger.info(TOMBSTONE_MSG_ACCEPTED, "treatment")) - .thenRunAsync(() -> { - try { - // Transform and send the treatment data - TreatmentReporting treatmentReporting = transformTreatment(treatment); - - String jsonKey = jsonGenerator.generateStringJson(treatmentReportingKey); - String jsonValue = jsonGenerator.generateStringJson(treatmentReporting); - - kafkaTemplate.send(treatmentOutputTopicName, jsonKey, jsonValue) - .whenComplete((res, e) -> logger.info("Treatment data (uid={}) sent to {}", - treatment.getTreatmentUid(), treatmentOutputTopicName)); - - } catch (Exception e) { - logger.error("Error processing Treatment data: {}", e.getMessage()); - throw new RuntimeException(e); - } - }); - - } catch (IllegalArgumentException ex) { - logger.info(ex.getMessage(), "Treatment"); - } catch (Exception e) { - logger.error("Error processing Treatment data: {}", e.getMessage()); - throw e; - } - } - - private TreatmentReporting transformTreatment(Treatment treatment) { - TreatmentReporting treatmentReporting = new TreatmentReporting(); - - treatmentReporting.setTreatmentUid(treatment.getTreatmentUid()); - treatmentReporting.setPublicHealthCaseUid(treatment.getPublicHealthCaseUid()); - treatmentReporting.setOrganizationUid(treatment.getOrganizationUid()); - treatmentReporting.setProviderUid(treatment.getProviderUid()); - treatmentReporting.setPatientTreatmentUid(treatment.getPatientTreatmentUid()); - treatmentReporting.setTreatmentName(treatment.getTreatmentName()); - treatmentReporting.setTreatmentOid(treatment.getTreatmentOid()); - treatmentReporting.setTreatmentComments(treatment.getTreatmentComments()); - treatmentReporting.setTreatmentSharedInd(treatment.getTreatmentSharedInd()); - treatmentReporting.setCd(treatment.getCd()); - treatmentReporting.setTreatmentDate(treatment.getTreatmentDate()); - treatmentReporting.setTreatmentDrug(treatment.getTreatmentDrug()); - treatmentReporting.setTreatmentDrugName(treatment.getTreatmentDrugName()); - treatmentReporting.setTreatmentDosageStrength(treatment.getTreatmentDosageStrength()); - treatmentReporting.setTreatmentDosageStrengthUnit(treatment.getTreatmentDosageStrengthUnit()); - treatmentReporting.setTreatmentFrequency(treatment.getTreatmentFrequency()); - treatmentReporting.setTreatmentDuration(treatment.getTreatmentDuration()); - treatmentReporting.setTreatmentDurationUnit(treatment.getTreatmentDurationUnit()); - treatmentReporting.setTreatmentRoute(treatment.getTreatmentRoute()); - treatmentReporting.setLocalId(treatment.getLocalId()); - treatmentReporting.setRecordStatusCd(treatment.getRecordStatusCd()); - treatmentReporting.setAddTime(treatment.getAddTime()); - treatmentReporting.setAddUserId(treatment.getAddUserId()); - treatmentReporting.setLastChangeTime(treatment.getLastChangeTime()); - treatmentReporting.setLastChangeUserId(treatment.getLastChangeUserId()); - treatmentReporting.setVersionControlNumber(treatment.getVersionControlNumber()); - - return treatmentReporting; - } - - /** * Parse and send RDB metadata column information sourced from the odse nbs_rdb_metadata * To a generic kafka topic to handle all types of rdb column metadata diff --git a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java index 6f824e0e..e782bc65 100644 --- a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java +++ b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java @@ -891,137 +891,4 @@ private ContactAnswer constructContactAnswers() { contactAnswer.setRdbColumnNm("CTT_EXPOSURE_TYPE"); return contactAnswer; } - - @Test - void testProcessTreatment() throws JsonProcessingException { - transformer.setTreatmentOutputTopicName(TREATMENT_TOPIC); - - Treatment treatment = constructTreatment(); - - final TreatmentReportingKey treatmentReportingKey = new TreatmentReportingKey(); - treatmentReportingKey.setTreatmentUid(treatment.getTreatmentUid()); - final TreatmentReporting treatmentReportingValue = constructTreatmentReporting(); - - CompletableFuture> future = new CompletableFuture<>(); - - when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(future); - when(kafkaTemplate.send(anyString(), anyString(), isNull())).thenReturn(future); - - transformer.processTreatment(treatment); - - future.complete(null); - - Awaitility.await() - .atMost(1, TimeUnit.SECONDS) - .untilAsserted(() -> - verify(kafkaTemplate, times(2)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()) - ); - - assertEquals(TREATMENT_TOPIC, topicCaptor.getAllValues().get(0)); - assertEquals(TREATMENT_TOPIC, topicCaptor.getAllValues().get(1)); - - assertNull(messageCaptor.getAllValues().get(0)); - - String firstKey = keyCaptor.getAllValues().get(0); - String secondKey = keyCaptor.getAllValues().get(1); - - var firstKeyObj = objectMapper.readValue( - objectMapper.readTree(firstKey).path("payload").toString(), - TreatmentReportingKey.class); - var secondKeyObj = objectMapper.readValue( - objectMapper.readTree(secondKey).path("payload").toString(), - TreatmentReportingKey.class); - - assertEquals(treatmentReportingKey, firstKeyObj); - assertEquals(treatmentReportingKey, secondKeyObj); - - String treatmentJson = messageCaptor.getAllValues().get(1); - var actualTreatmentValue = objectMapper.readValue( - objectMapper.readTree(treatmentJson).path("payload").toString(), - TreatmentReporting.class); - - assertEquals(treatmentReportingValue, actualTreatmentValue); - } - - @Test - void testProcessTreatmentError() { - Treatment treatment = new Treatment(); - // Setup to throw an exception during processing,no uid set - transformer.setTreatmentOutputTopicName(TREATMENT_TOPIC); - when(kafkaTemplate.send(anyString(), anyString(), isNull())).thenReturn(CompletableFuture.completedFuture(null)); - try { - transformer.processTreatment(treatment); - } catch (Exception e) { - } - List logs = listAppender.list; - boolean foundErrorLog = logs.stream() - .map(ILoggingEvent::getFormattedMessage) - .anyMatch(m -> m.contains("Error processing Treatment data")); - - assertTrue(foundErrorLog, "Error processing Treatment data"); - - } - - - private Treatment constructTreatment() { - Treatment treatment = new Treatment(); - treatment.setTreatmentUid(TREATMENT_UID.toString()); - treatment.setPublicHealthCaseUid("12345"); - treatment.setOrganizationUid("67890"); - treatment.setProviderUid("11111"); - treatment.setPatientTreatmentUid("22222"); - treatment.setTreatmentName("Test Treatment"); - treatment.setTreatmentOid("33333"); - treatment.setTreatmentComments("Test Comments"); - treatment.setTreatmentSharedInd("Y"); - treatment.setCd("TEST_CD"); - treatment.setTreatmentDate("2024-01-01T10:00:00"); - treatment.setTreatmentDrug("Drug123"); - treatment.setTreatmentDrugName("Test Drug"); - treatment.setTreatmentDosageStrength("100"); - treatment.setTreatmentDosageStrengthUnit("mg"); - treatment.setTreatmentFrequency("Daily"); - treatment.setTreatmentDuration("7"); - treatment.setTreatmentDurationUnit("days"); - treatment.setTreatmentRoute("Oral"); - treatment.setLocalId("LOC123"); - treatment.setRecordStatusCd("Active"); - treatment.setAddTime("2024-01-01T10:00:00"); - treatment.setAddUserId("44444"); - treatment.setLastChangeTime("2024-01-01T10:00:00"); - treatment.setLastChangeUserId("55555"); - treatment.setVersionControlNumber("1"); - return treatment; - } - - private TreatmentReporting constructTreatmentReporting() { - TreatmentReporting treatmentReporting = new TreatmentReporting(); - treatmentReporting.setTreatmentUid(TREATMENT_UID.toString()); - treatmentReporting.setPublicHealthCaseUid("12345"); - treatmentReporting.setOrganizationUid("67890"); - treatmentReporting.setProviderUid("11111"); - treatmentReporting.setPatientTreatmentUid("22222"); - treatmentReporting.setTreatmentName("Test Treatment"); - treatmentReporting.setTreatmentOid("33333"); - treatmentReporting.setTreatmentComments("Test Comments"); - treatmentReporting.setTreatmentSharedInd("Y"); - treatmentReporting.setCd("TEST_CD"); - treatmentReporting.setTreatmentDate("2024-01-01T10:00:00"); - treatmentReporting.setTreatmentDrug("Drug123"); - treatmentReporting.setTreatmentDrugName("Test Drug"); - treatmentReporting.setTreatmentDosageStrength("100"); - treatmentReporting.setTreatmentDosageStrengthUnit("mg"); - treatmentReporting.setTreatmentFrequency("Daily"); - treatmentReporting.setTreatmentDuration("7"); - treatmentReporting.setTreatmentDurationUnit("days"); - treatmentReporting.setTreatmentRoute("Oral"); - treatmentReporting.setLocalId("LOC123"); - treatmentReporting.setRecordStatusCd("Active"); - treatmentReporting.setAddTime("2024-01-01T10:00:00"); - treatmentReporting.setAddUserId("44444"); - treatmentReporting.setLastChangeTime("2024-01-01T10:00:00"); - treatmentReporting.setLastChangeUserId("55555"); - treatmentReporting.setVersionControlNumber("1"); - return treatmentReporting; - } } diff --git a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java index 03f20b6f..a7cae8f4 100644 --- a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java +++ b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java @@ -96,6 +96,7 @@ void setUp() { investigationService.setInterviewTopic(interviewTopic); investigationService.setContactTopic(contactTopic); investigationService.setTreatmentTopic(treatmentTopic); + investigationService.setTreatmentOutputTopicName(treatmentTopicOutput); transformer.setInvestigationConfirmationOutputTopicName("investigationConfirmation"); transformer.setInvestigationObservationOutputTopicName("investigationObservation"); @@ -107,7 +108,6 @@ void setUp() { transformer.setInterviewAnswerOutputTopicName("interviewAnswer"); transformer.setInterviewNoteOutputTopicName("interviewNote"); transformer.setRdbMetadataColumnsOutputTopicName("metadataColumns"); - transformer.setTreatmentOutputTopicName(treatmentTopicOutput); investigationService.setTreatmentEnable(true); } @@ -586,35 +586,28 @@ void testProcessTreatmentMessage() throws JsonProcessingException { when(treatmentRepository.computeTreatment(String.valueOf(treatmentUid))).thenReturn(Optional.of(treatment)); CompletableFuture> future = new CompletableFuture<>(); - when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(future); - when(kafkaTemplate.send(anyString(), anyString(), isNull())).thenReturn(future); investigationService.processMessage(payload, treatmentTopic, consumer); - future.complete(null); - Awaitility.await() - .atMost(1, TimeUnit.SECONDS) - .untilAsserted(() -> - verify(kafkaTemplate, times(2)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()) - ); - verify(treatmentRepository).computeTreatment(String.valueOf(treatmentUid)); + verify(kafkaTemplate).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); - assertEquals(treatmentTopicOutput, topicCaptor.getAllValues().get(0)); - assertNull(messageCaptor.getAllValues().get(0)); - - assertEquals(treatmentTopicOutput, topicCaptor.getAllValues().get(1)); + assertEquals(treatmentTopicOutput, topicCaptor.getValue()); - String treatmentJson = messageCaptor.getAllValues().get(1); - TreatmentReporting actualTreatment = objectMapper.readValue( + String treatmentJson = messageCaptor.getValue(); + Treatment actualTreatment = objectMapper.readValue( objectMapper.readTree(treatmentJson).path("payload").toString(), - TreatmentReporting.class); - - TreatmentReporting expectedTreatment = constructTreatmentReporting(treatmentUid); - - assertEquals(expectedTreatment, actualTreatment); + Treatment.class); + + String keyJson = keyCaptor.getValue(); + TreatmentReportingKey keyObject = objectMapper.readValue( + objectMapper.readTree(keyJson).path("payload").toString(), + TreatmentReportingKey.class); + assertEquals(treatment.getTreatmentUid(), keyObject.getTreatmentUid()); + + assertEquals(treatment, actualTreatment); } @Test @@ -675,35 +668,5 @@ private Treatment constructTreatment(Long treatmentUid) { return treatment; } - private TreatmentReporting constructTreatmentReporting(Long treatmentUid) { - TreatmentReporting treatmentReporting = new TreatmentReporting(); - treatmentReporting.setTreatmentUid(treatmentUid.toString()); - treatmentReporting.setPublicHealthCaseUid("12345"); - treatmentReporting.setOrganizationUid("67890"); - treatmentReporting.setProviderUid("11111"); - treatmentReporting.setPatientTreatmentUid("22222"); - treatmentReporting.setTreatmentName("Test Treatment"); - treatmentReporting.setTreatmentOid("33333"); - treatmentReporting.setTreatmentComments("Test Comments"); - treatmentReporting.setTreatmentSharedInd("Y"); - treatmentReporting.setCd("TEST_CD"); - treatmentReporting.setTreatmentDate("2024-01-01T10:00:00"); - treatmentReporting.setTreatmentDrug("Drug123"); - treatmentReporting.setTreatmentDrugName("Test Drug"); - treatmentReporting.setTreatmentDosageStrength("100"); - treatmentReporting.setTreatmentDosageStrengthUnit("mg"); - treatmentReporting.setTreatmentFrequency("Daily"); - treatmentReporting.setTreatmentDuration("7"); - treatmentReporting.setTreatmentDurationUnit("days"); - treatmentReporting.setTreatmentRoute("Oral"); - treatmentReporting.setLocalId("LOC123"); - treatmentReporting.setRecordStatusCd("Active"); - treatmentReporting.setAddTime("2024-01-01T10:00:00"); - treatmentReporting.setAddUserId("44444"); - treatmentReporting.setLastChangeTime("2024-01-01T10:00:00"); - treatmentReporting.setLastChangeUserId("55555"); - treatmentReporting.setVersionControlNumber("1"); - return treatmentReporting; - } } \ No newline at end of file diff --git a/liquibase-service/src/main/resources/db/odse/routines/026-sp_treatment_event-001.sql b/liquibase-service/src/main/resources/db/odse/routines/026-sp_treatment_event-001.sql index 169d6dfa..8493c08a 100644 --- a/liquibase-service/src/main/resources/db/odse/routines/026-sp_treatment_event-001.sql +++ b/liquibase-service/src/main/resources/db/odse/routines/026-sp_treatment_event-001.sql @@ -46,12 +46,12 @@ BEGIN par.subject_entity_uid AS organization_uid, par1.subject_entity_uid AS provider_uid, viewPatientKeys.treatment_uid AS patient_treatment_uid, - rx1.LOCAL_ID, - rx1.ADD_TIME, - rx1.ADD_USER_ID, - rx1.LAST_CHG_TIME, - rx1.LAST_CHG_USER_ID, - rx1.VERSION_CTRL_NBR + rx1.local_id, + rx1.add_time, + rx1.add_user_id, + rx1.last_chg_time, + rx1.last_chg_user_id, + rx1.version_ctrl_nbr INTO #TREATMENT_UIDS FROM NBS_ODSE.dbo.treatment AS rx1 WITH (NOLOCK) INNER JOIN NBS_ODSE.dbo.Treatment_administered AS rx2 WITH (NOLOCK) @@ -110,31 +110,31 @@ BEGIN t.organization_uid, t.provider_uid, t.patient_treatment_uid, - rx1.cd_desc_txt AS Treatment_nm, - rx1.program_jurisdiction_oid AS Treatment_oid, - REPLACE(REPLACE(rx1.txt, CHAR(13) + CHAR(10), ' '), CHAR(10), ' ') AS Treatment_comments, - rx1.shared_ind AS Treatment_shared_ind, + rx1.cd_desc_txt AS treatment_name, + rx1.program_jurisdiction_oid AS treatment_oid, + REPLACE(REPLACE(rx1.txt, CHAR(13) + CHAR(10), ' '), CHAR(10), ' ') AS treatment_comments, + rx1.shared_ind AS treatment_shared_ind, rx1.cd, - rx2.effective_from_time AS Treatment_dt, - rx2.cd AS Treatment_drug, - rx2.cd_desc_txt AS Treatment_drug_nm, - rx2.dose_qty AS Treatment_dosage_strength, - rx2.dose_qty_unit_cd AS Treatment_dosage_strength_unit, - rx2.interval_cd AS Treatment_frequency, - rx2.effective_duration_amt AS Treatment_duration, - rx2.effective_duration_unit_cd AS Treatment_duration_unit, - rx2.route_cd AS Treatment_route, - t.LOCAL_ID, + rx2.effective_from_time AS treatment_date, + rx2.cd AS treatment_drug, + rx2.cd_desc_txt AS treatment_drug_name, + rx2.dose_qty AS treatment_dosage_strength, + rx2.dose_qty_unit_cd AS treatment_dosage_strength_unit, + rx2.interval_cd AS treatment_frequency, + rx2.effective_duration_amt AS treatment_duration, + rx2.effective_duration_unit_cd AS treatment_duration_unit, + rx2.route_cd AS treatment_route, + t.local_id, CASE WHEN rx1.record_status_cd = '' THEN 'ACTIVE' WHEN rx1.record_status_cd = 'LOG_DEL' THEN 'INACTIVE' ELSE rx1.record_status_cd END as record_status_cd, - t.ADD_TIME, - t.ADD_USER_ID, - t.LAST_CHG_TIME, - t.LAST_CHG_USER_ID, - t.VERSION_CTRL_NBR + t.add_time, + t.add_user_id, + t.last_chg_time, + t.last_chg_user_id, + t.version_ctrl_nbr INTO #TREATMENT_DETAILS FROM #TREATMENT_UIDS t INNER JOIN NBS_ODSE.dbo.treatment rx1 WITH (NOLOCK) @@ -168,27 +168,27 @@ BEGIN t.organization_uid, t.provider_uid, t.patient_treatment_uid, - t.Treatment_nm, - t.Treatment_oid, - t.Treatment_comments, - t.Treatment_shared_ind, + t.treatment_name, + t.treatment_oid, + t.treatment_comments, + t.treatment_shared_ind, t.cd, - t.Treatment_dt, - t.Treatment_drug, - t.Treatment_drug_nm, - t.Treatment_dosage_strength, - t.Treatment_dosage_strength_unit, - t.Treatment_frequency, - t.Treatment_duration, - t.Treatment_duration_unit, - t.Treatment_route, - t.LOCAL_ID, + t.treatment_date, + t.treatment_drug, + t.treatment_drug_name, + t.treatment_dosage_strength, + t.treatment_dosage_strength_unit, + t.treatment_frequency, + t.treatment_duration, + t.treatment_duration_unit, + t.treatment_route, + t.local_id, t.record_status_cd, - t.ADD_TIME, - t.ADD_USER_ID, - t.LAST_CHG_TIME, - t.LAST_CHG_USER_ID, - t.VERSION_CTRL_NBR + t.add_time, + t.add_user_id, + t.last_chg_time, + t.last_chg_user_id, + t.version_ctrl_nbr FROM #TREATMENT_DETAILS t; @@ -251,4 +251,4 @@ BEGIN return @ErrorMessage; END CATCH -END; +END; \ No newline at end of file diff --git a/liquibase-service/src/main/resources/db/rdb_modern/tables/042-create_nrt_treatment-001.sql b/liquibase-service/src/main/resources/db/rdb_modern/tables/042-create_nrt_treatment-001.sql index afaa9a77..b629cafd 100644 --- a/liquibase-service/src/main/resources/db/rdb_modern/tables/042-create_nrt_treatment-001.sql +++ b/liquibase-service/src/main/resources/db/rdb_modern/tables/042-create_nrt_treatment-001.sql @@ -9,28 +9,28 @@ CREATE TABLE dbo.nrt_treatment organization_uid varchar(100) NULL, provider_uid varchar(100) NULL, patient_treatment_uid varchar(100) NULL, - Treatment_nm varchar(500) NULL, - Treatment_oid varchar(100) NULL, - Treatment_comments varchar(500) NULL, - Treatment_shared_ind varchar(100) NULL, + treatment_name varchar(500) NULL, + treatment_oid varchar(100) NULL, + treatment_comments varchar(500) NULL, + treatment_shared_ind varchar(100) NULL, cd varchar(100) NULL, - Treatment_dt datetime NULL, - Treatment_drug varchar(100) NULL, - Treatment_drug_nm varchar(500) NULL, - Treatment_dosage_strength varchar(100) NULL, - Treatment_dosage_strength_unit varchar(100) NULL, - Treatment_frequency varchar(100) NULL, - Treatment_duration varchar(100) NULL, - Treatment_duration_unit varchar(100) NULL, - Treatment_route varchar(100) NULL, - LOCAL_ID varchar(100) NULL, + treatment_date datetime NULL, + treatment_drug varchar(100) NULL, + treatment_drug_name varchar(500) NULL, + treatment_dosage_strength varchar(100) NULL, + treatment_dosage_strength_unit varchar(100) NULL, + treatment_frequency varchar(100) NULL, + treatment_duration varchar(100) NULL, + treatment_duration_unit varchar(100) NULL, + treatment_route varchar(100) NULL, + local_id varchar(100) NULL, record_status_cd varchar(100) NULL, - ADD_TIME datetime NULL, - ADD_USER_ID varchar(100) NULL, - LAST_CHG_TIME datetime NULL, - LAST_CHG_USER_ID varchar(100) NULL, - VERSION_CTRL_NBR varchar(100) NULL, + add_time datetime NULL, + add_user_id varchar(100) NULL, + last_chg_time datetime NULL, + last_chg_user_id varchar(100) NULL, + version_ctrl_nbr varchar(100) NULL, refresh_datetime datetime2(7) GENERATED ALWAYS AS ROW START NOT NULL, max_datetime datetime2(7) GENERATED ALWAYS AS ROW END HIDDEN NOT NULL, PERIOD FOR SYSTEM_TIME (refresh_datetime, max_datetime) -) +) \ No newline at end of file From 4a71deb2fbd77f6e7a6e40c960475d10dd7a120f Mon Sep 17 00:00:00 2001 From: Jayasudha Sundaram Date: Mon, 3 Mar 2025 14:39:23 -0500 Subject: [PATCH 5/9] updated as per review comments --- db/upgrade/odse/routines/021-sp_treatment_event.sql | 6 +----- .../repository/model/reporting/TreatmentReportingKey.java | 1 - .../investigation/service/InvestigationService.java | 2 +- .../investigation/InvestigationDataProcessingTests.java | 3 --- .../db/odse/routines/026-sp_treatment_event-001.sql | 6 +----- 5 files changed, 3 insertions(+), 15 deletions(-) diff --git a/db/upgrade/odse/routines/021-sp_treatment_event.sql b/db/upgrade/odse/routines/021-sp_treatment_event.sql index 8493c08a..b2f2a67f 100644 --- a/db/upgrade/odse/routines/021-sp_treatment_event.sql +++ b/db/upgrade/odse/routines/021-sp_treatment_event.sql @@ -125,11 +125,7 @@ BEGIN rx2.effective_duration_unit_cd AS treatment_duration_unit, rx2.route_cd AS treatment_route, t.local_id, - CASE - WHEN rx1.record_status_cd = '' THEN 'ACTIVE' - WHEN rx1.record_status_cd = 'LOG_DEL' THEN 'INACTIVE' - ELSE rx1.record_status_cd - END as record_status_cd, + dbo.fn_get_record_status(rx1.record_status_cd) as record_status_cd, t.add_time, t.add_user_id, t.last_chg_time, diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/reporting/TreatmentReportingKey.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/reporting/TreatmentReportingKey.java index 3b043d54..9a14f8a4 100644 --- a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/reporting/TreatmentReportingKey.java +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/repository/model/reporting/TreatmentReportingKey.java @@ -1,6 +1,5 @@ package gov.cdc.etldatapipeline.investigation.repository.model.reporting; -import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.PropertyNamingStrategies; import com.fasterxml.jackson.databind.annotation.JsonNaming; import lombok.Data; diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java index 498c86cf..514d70e2 100644 --- a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java @@ -61,7 +61,7 @@ public class InvestigationService { @Value("${spring.kafka.output.topic-name-reporting}") private String investigationTopicReporting; - @Value("${spring.kafka.output.topic-name-tmt}") + @Value("${spring.kafka.input.topic-name-tmt}") private String treatmentTopic; @Value("${spring.kafka.output.topic-name-treatment}") diff --git a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java index e782bc65..dace685c 100644 --- a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java +++ b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java @@ -22,7 +22,6 @@ import org.slf4j.LoggerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.awaitility.Awaitility; -import org.springframework.kafka.support.SendResult; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -64,11 +63,9 @@ class InvestigationDataProcessingTests { private static final String RDB_METADATA_COLS_TOPIC = "rdbMetadataColsTopic"; private static final String CONTACT_TOPIC = "contactTopic"; private static final String CONTACT_ANSWERS_TOPIC = "contactAnswersTopic"; - private static final String TREATMENT_TOPIC = "treatmentTopic"; private static final Long INVESTIGATION_UID = 234567890L; private static final Long INTERVIEW_UID = 234567890L; private static final Long CONTACT_UID = 12345678L; - private static final Long TREATMENT_UID = 34567890L; private static final String INVALID_JSON = "invalidJSON"; ProcessInvestigationDataUtil transformer; diff --git a/liquibase-service/src/main/resources/db/odse/routines/026-sp_treatment_event-001.sql b/liquibase-service/src/main/resources/db/odse/routines/026-sp_treatment_event-001.sql index 8493c08a..b2f2a67f 100644 --- a/liquibase-service/src/main/resources/db/odse/routines/026-sp_treatment_event-001.sql +++ b/liquibase-service/src/main/resources/db/odse/routines/026-sp_treatment_event-001.sql @@ -125,11 +125,7 @@ BEGIN rx2.effective_duration_unit_cd AS treatment_duration_unit, rx2.route_cd AS treatment_route, t.local_id, - CASE - WHEN rx1.record_status_cd = '' THEN 'ACTIVE' - WHEN rx1.record_status_cd = 'LOG_DEL' THEN 'INACTIVE' - ELSE rx1.record_status_cd - END as record_status_cd, + dbo.fn_get_record_status(rx1.record_status_cd) as record_status_cd, t.add_time, t.add_user_id, t.last_chg_time, From dedd8b5dc6aa8b01a460e03898c119edd874c01d Mon Sep 17 00:00:00 2001 From: Jayasudha Sundaram Date: Mon, 3 Mar 2025 15:13:06 -0500 Subject: [PATCH 6/9] resolved the conficts --- .../service/InvestigationServiceTest.java | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java index 35797d97..942deba0 100644 --- a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java +++ b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java @@ -591,6 +591,7 @@ private ContactReporting constructContactReporting(Long contactUid) { contactReporting.setDispositionedByUid(123L); return contactReporting; } + @Test void testProcessTreatmentMessage() throws JsonProcessingException { Long treatmentUid = 234567890L; @@ -603,7 +604,9 @@ void testProcessTreatmentMessage() throws JsonProcessingException { CompletableFuture> future = new CompletableFuture<>(); when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(future); - investigationService.processMessage(payload, treatmentTopic, consumer); + // Create a ConsumerRecord object + ConsumerRecord rec = getRecord(treatmentTopic, payload); + investigationService.processMessage(rec, consumer); future.complete(null); verify(treatmentRepository).computeTreatment(String.valueOf(treatmentUid)); @@ -621,7 +624,7 @@ void testProcessTreatmentMessage() throws JsonProcessingException { objectMapper.readTree(keyJson).path("payload").toString(), TreatmentReportingKey.class); assertEquals(treatment.getTreatmentUid(), keyObject.getTreatmentUid()); - + assertEquals(treatment, actualTreatment); } @@ -634,22 +637,28 @@ void testProcessTreatmentMessageWhenFeatureDisabled() { when(treatmentRepository.computeTreatment(String.valueOf(treatmentUid))).thenReturn(Optional.of(treatment)); investigationService.setTreatmentEnable(false); - investigationService.processMessage(payload, treatmentTopic, consumer); + // Create a ConsumerRecord object + ConsumerRecord rec = getRecord(treatmentTopic, payload); + investigationService.processMessage(rec, consumer); verify(kafkaTemplate, never()).send(anyString(), anyString(), anyString()); } @Test void testProcessTreatmentException() { String invalidPayload = "{\"payload\": {\"after\": {}}}"; + // Create a ConsumerRecord object + ConsumerRecord rec = getRecord(treatmentTopic, invalidPayload); RuntimeException ex = assertThrows(RuntimeException.class, - () -> investigationService.processMessage(invalidPayload, treatmentTopic, consumer)); + () -> investigationService.processMessage(rec, consumer)); assertEquals(NoSuchElementException.class, ex.getCause().getClass()); } @Test void testProcessTreatmentNoDataException() { String payload = "{\"payload\": {\"after\": {\"treatment_uid\": \"\"}}}"; - assertThrows(NoDataException.class, () -> investigationService.processMessage(payload, treatmentTopic, consumer)); + // Create a ConsumerRecord object + ConsumerRecord rec = getRecord(treatmentTopic, payload); + assertThrows(NoDataException.class, () -> investigationService.processMessage(rec, consumer)); } private Treatment constructTreatment(Long treatmentUid) { @@ -682,8 +691,6 @@ private Treatment constructTreatment(Long treatmentUid) { treatment.setVersionControlNumber("1"); return treatment; } - - private ConsumerRecord getRecord(String topic, String payload) { return new ConsumerRecord<>(topic, 0, 11L, null, payload); } From 743d55a935fb221a2b5e22a2e2b00399d6f95adf Mon Sep 17 00:00:00 2001 From: Jayasudha Sundaram Date: Mon, 3 Mar 2025 15:53:50 -0500 Subject: [PATCH 7/9] resolving conflicts --- .../controller/InvestigationController.java | 8 ++ .../service/InvestigationService.java | 43 +++++++ .../src/main/resources/application.yaml | 3 + .../InvestigationControllerTest.java | 3 +- .../service/InvestigationServiceTest.java | 112 +++++++++++++++++- 5 files changed, 167 insertions(+), 2 deletions(-) diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/controller/InvestigationController.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/controller/InvestigationController.java index 40b73249..2282d04d 100644 --- a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/controller/InvestigationController.java +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/controller/InvestigationController.java @@ -29,6 +29,9 @@ public class InvestigationController { @Value("${spring.kafka.input.topic-name-vac}") private String vaccinationTopic; + @Value("${spring.kafka.input.topic-name-tmt}") + private String treatmentTopic; + @GetMapping("/reporting/investigation-svc/status") public ResponseEntity getDataPipelineStatusHealth() { @@ -60,4 +63,9 @@ public void postContact(@RequestBody String jsonData) { public void postVaccination(@RequestBody String jsonData) { producerService.sendMessage(vaccinationTopic, jsonData); } + + @PostMapping("/reporting/investigation-svc/treatment") + public void postTreatment(@RequestBody String jsonData) + {producerService.sendMessage(treatmentTopic, jsonData);} + } diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java index f5ca2c79..5bba8739 100644 --- a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java @@ -6,6 +6,7 @@ import gov.cdc.etldatapipeline.investigation.repository.model.dto.*; import gov.cdc.etldatapipeline.investigation.repository.model.reporting.InvestigationKey; import gov.cdc.etldatapipeline.investigation.repository.model.reporting.InvestigationReporting; +import gov.cdc.etldatapipeline.investigation.repository.model.reporting.TreatmentReportingKey; import gov.cdc.etldatapipeline.investigation.util.ProcessInvestigationDataUtil; import jakarta.persistence.EntityNotFoundException; import lombok.RequiredArgsConstructor; @@ -60,6 +61,12 @@ public class InvestigationService { @Value("${spring.kafka.input.topic-name-vac}") private String vaccinationTopic; + @Value("${spring.kafka.input.topic-name-tmt}") + private String treatmentTopic; + + @Value("${spring.kafka.output.topic-name-treatment}") + private String treatmentOutputTopicName; + @Value("${spring.kafka.output.topic-name-reporting}") private String investigationTopicReporting; @@ -72,11 +79,15 @@ public class InvestigationService { @Value("${featureFlag.contact-record-enable}") public boolean contactRecordEnable; + @Value("${featureFlag.treatment-enable}") + public boolean treatmentEnable; + private final InvestigationRepository investigationRepository; private final NotificationRepository notificationRepository; private final InterviewRepository interviewRepository; private final ContactRepository contactRepository; private final VaccinationRepository vaccinationRepository; + private final TreatmentRepository treatmentRepository; private final KafkaTemplate kafkaTemplate; private final ProcessInvestigationDataUtil processDataUtil; @@ -132,6 +143,8 @@ public void processMessage(ConsumerRecord rec, processContact(message); } else if (topic.equals(vaccinationTopic)) { processVaccination(message); + } else if (topic.equals(treatmentTopic) && treatmentEnable) { + processTreatment(message); } consumer.commitSync(); } @@ -260,6 +273,36 @@ private void processVaccination(String value) { } } + private void processTreatment(String value) { + String treatmentUid = ""; + try { + treatmentUid = extractUid(value, "treatment_uid"); + + logger.info(topicDebugLog, "Treatment", treatmentUid, treatmentTopic); + Optional treatmentData = treatmentRepository.computeTreatment(treatmentUid); + if(treatmentData.isPresent()) { + Treatment treatment = treatmentData.get(); + + // Using Treatment directly as the reporting object + TreatmentReportingKey treatmentReportingKey = new TreatmentReportingKey(treatment.getTreatmentUid()); + + String jsonKey = jsonGenerator.generateStringJson(treatmentReportingKey); + String jsonValue = jsonGenerator.generateStringJson(treatment); + + kafkaTemplate.send(treatmentOutputTopicName, jsonKey, jsonValue) + .whenComplete((res, e) -> logger.info("Treatment data (uid={}) sent to {}", + treatment.getTreatmentUid(), treatmentOutputTopicName)); + + } else { + throw new EntityNotFoundException("Unable to find treatment with id: " + treatmentUid); + } + } catch (EntityNotFoundException ex) { + throw new NoDataException(ex.getMessage(), ex); + } catch (Exception e) { + throw new RuntimeException(errorMessage("Treatment", treatmentUid, e), e); + } + } + // This same method can be used for elastic search as well and that is why the generic model is present private CompletableFuture> pushKeyValuePairToKafka(InvestigationKey investigationKey, Object model, String topicName) { String jsonKey = jsonGenerator.generateStringJson(investigationKey); diff --git a/investigation-service/src/main/resources/application.yaml b/investigation-service/src/main/resources/application.yaml index c9c62a49..d687a2fe 100644 --- a/investigation-service/src/main/resources/application.yaml +++ b/investigation-service/src/main/resources/application.yaml @@ -6,6 +6,7 @@ spring: topic-name-int: nbs_Interview topic-name-ctr: nbs_CT_contact topic-name-vac: nbs_Intervention + topic-name-tmt: nbs_Treatment output: topic-name-reporting: nrt_investigation topic-name-confirmation: nrt_investigation_confirmation @@ -21,6 +22,7 @@ spring: topic-name-contact-answer: nrt_contact_answer topic-name-vaccination: nrt_vaccination topic-name-vaccination-answer: nrt_vaccination_answer + topic-name-treatment: nrt_treatment dlq: retry-suffix: _retry dlq-suffix: _dlt @@ -50,5 +52,6 @@ featureFlag: phc-datamart-enable: ${FF_PHC_DM_ENABLE:false} bmird-case-enable: ${FF_BMIRD_CASE_ENABLE:false} contact-record-enable: ${FF_CONTACT_RECORD_ENABLE:false} + treatment-enable: ${FF_TREATMENT_ENABLE:false} server: port: '8093' \ No newline at end of file diff --git a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/controller/InvestigationControllerTest.java b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/controller/InvestigationControllerTest.java index 3d38114c..9a0eff94 100644 --- a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/controller/InvestigationControllerTest.java +++ b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/controller/InvestigationControllerTest.java @@ -50,7 +50,8 @@ public void tearDown() throws Exception { "/reporting/investigation-svc/notification", "/reporting/investigation-svc/interview", "/reporting/investigation-svc/contact", - "/reporting/investigation-svc/vaccination" + "/reporting/investigation-svc/vaccination", + "/reporting/investigation-svc/treatment" }) void testControllerMethods(String endpoint) throws Exception { String jsonData = "{\"key\":\"value\"}"; diff --git a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java index 662657ab..fcb60377 100644 --- a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java +++ b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java @@ -15,6 +15,7 @@ import org.junit.jupiter.api.Test; import org.mockito.*; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; import java.util.List; import java.util.NoSuchElementException; @@ -47,6 +48,9 @@ class InvestigationServiceTest { @Mock private ContactRepository contactRepository; + @Mock + private TreatmentRepository treatmentRepository; + @Mock private VaccinationRepository vaccinationRepository; @@ -75,6 +79,7 @@ class InvestigationServiceTest { private final String interviewTopic = "Interview"; private final String contactTopic = "Contact"; private final String vaccinationTopic = "Vaccination"; + private final String treatmentTopic = "Treatment"; //output topics private final String investigationTopicOutput = "InvestigationOutput"; @@ -82,6 +87,8 @@ class InvestigationServiceTest { private final String interviewTopicOutput = "InterviewOutput"; private final String contactTopicOutput = "ContactOutput"; private final String vaccinationTopicOutput = "VaccinationOutput"; + private final String treatmentTopicOutput = "TreatmentOutput"; + @@ -90,7 +97,7 @@ void setUp() { closeable = MockitoAnnotations.openMocks(this); ProcessInvestigationDataUtil transformer = new ProcessInvestigationDataUtil(kafkaTemplate, investigationRepository); - investigationService = new InvestigationService(investigationRepository, notificationRepository, interviewRepository, contactRepository, vaccinationRepository, kafkaTemplate, transformer); + investigationService = new InvestigationService(investigationRepository, notificationRepository, interviewRepository, contactRepository, vaccinationRepository,treatmentRepository, kafkaTemplate, transformer); investigationService.setPhcDatamartEnable(true); investigationService.setBmirdCaseEnable(true); @@ -101,6 +108,8 @@ void setUp() { investigationService.setInterviewTopic(interviewTopic); investigationService.setContactTopic(contactTopic); investigationService.setVaccinationTopic(vaccinationTopic); + investigationService.setTreatmentTopic(treatmentTopic); + investigationService.setTreatmentOutputTopicName(treatmentTopicOutput); transformer.setInvestigationConfirmationOutputTopicName("investigationConfirmation"); transformer.setInvestigationObservationOutputTopicName("investigationObservation"); @@ -115,6 +124,7 @@ void setUp() { transformer.setRdbMetadataColumnsOutputTopicName("metadataColumns"); transformer.setVaccinationOutputTopicName(vaccinationTopicOutput); transformer.setVaccinationAnswerOutputTopicName("VaccinationAnswerOutput"); + investigationService.setTreatmentEnable(true); } @AfterEach @@ -418,6 +428,106 @@ private void validateInvestigationData(String payload, Investigation investigati assertEquals(reportingModel, actualReporting); } + @Test + void testProcessTreatmentMessage() throws JsonProcessingException { + Long treatmentUid = 234567890L; + String payload = "{\"payload\": {\"after\": {\"treatment_uid\": \"" + treatmentUid + "\"}}}"; + + final Treatment treatment = constructTreatment(treatmentUid); + + when(treatmentRepository.computeTreatment(String.valueOf(treatmentUid))).thenReturn(Optional.of(treatment)); + + CompletableFuture> future = new CompletableFuture<>(); + when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(future); + + // Create a ConsumerRecord object + ConsumerRecord rec = getRecord(treatmentTopic, payload); + investigationService.processMessage(rec, consumer); + future.complete(null); + + verify(treatmentRepository).computeTreatment(String.valueOf(treatmentUid)); + verify(kafkaTemplate).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); + + assertEquals(treatmentTopicOutput, topicCaptor.getValue()); + + String treatmentJson = messageCaptor.getValue(); + Treatment actualTreatment = objectMapper.readValue( + objectMapper.readTree(treatmentJson).path("payload").toString(), + Treatment.class); + + String keyJson = keyCaptor.getValue(); + TreatmentReportingKey keyObject = objectMapper.readValue( + objectMapper.readTree(keyJson).path("payload").toString(), + TreatmentReportingKey.class); + assertEquals(treatment.getTreatmentUid(), keyObject.getTreatmentUid()); + + assertEquals(treatment, actualTreatment); + } + + @Test + void testProcessTreatmentMessageWhenFeatureDisabled() { + Long treatmentUid = 234567890L; + String payload = "{\"payload\": {\"after\": {\"treatment_uid\": \"" + treatmentUid + "\"}}}"; + + final Treatment treatment = constructTreatment(treatmentUid); + when(treatmentRepository.computeTreatment(String.valueOf(treatmentUid))).thenReturn(Optional.of(treatment)); + + investigationService.setTreatmentEnable(false); + // Create a ConsumerRecord object + ConsumerRecord rec = getRecord(treatmentTopic, payload); + investigationService.processMessage(rec, consumer); + verify(kafkaTemplate, never()).send(anyString(), anyString(), anyString()); + } + + @Test + void testProcessTreatmentException() { + String invalidPayload = "{\"payload\": {\"after\": {}}}"; + // Create a ConsumerRecord object + ConsumerRecord rec = getRecord(treatmentTopic, invalidPayload); + RuntimeException ex = assertThrows(RuntimeException.class, + () -> investigationService.processMessage(rec, consumer)); + assertEquals(NoSuchElementException.class, ex.getCause().getClass()); + } + + @Test + void testProcessTreatmentNoDataException() { + String payload = "{\"payload\": {\"after\": {\"treatment_uid\": \"\"}}}"; + // Create a ConsumerRecord object + ConsumerRecord rec = getRecord(treatmentTopic, payload); + assertThrows(NoDataException.class, () -> investigationService.processMessage(rec, consumer)); + } + + private Treatment constructTreatment(Long treatmentUid) { + Treatment treatment = new Treatment(); + treatment.setTreatmentUid(String.valueOf(treatmentUid)); + treatment.setPublicHealthCaseUid("12345"); + treatment.setOrganizationUid("67890"); + treatment.setProviderUid("11111"); + treatment.setPatientTreatmentUid("22222"); + treatment.setTreatmentName("Test Treatment"); + treatment.setTreatmentOid("33333"); + treatment.setTreatmentComments("Test Comments"); + treatment.setTreatmentSharedInd("Y"); + treatment.setCd("TEST_CD"); + treatment.setTreatmentDate("2024-01-01T10:00:00"); + treatment.setTreatmentDrug("Drug123"); + treatment.setTreatmentDrugName("Test Drug"); + treatment.setTreatmentDosageStrength("100"); + treatment.setTreatmentDosageStrengthUnit("mg"); + treatment.setTreatmentFrequency("Daily"); + treatment.setTreatmentDuration("7"); + treatment.setTreatmentDurationUnit("days"); + treatment.setTreatmentRoute("Oral"); + treatment.setLocalId("LOC123"); + treatment.setRecordStatusCd("Active"); + treatment.setAddTime("2024-01-01T10:00:00"); + treatment.setAddUserId("44444"); + treatment.setLastChangeTime("2024-01-01T10:00:00"); + treatment.setLastChangeUserId("55555"); + treatment.setVersionControlNumber("1"); + return treatment; + } + private ConsumerRecord getRecord(String topic, String payload) { return new ConsumerRecord<>(topic, 0, 11L, null, payload); } From 27e55d2c4725ff51549349afb52789ba27290002 Mon Sep 17 00:00:00 2001 From: Jayasudha Sundaram Date: Mon, 3 Mar 2025 16:15:54 -0500 Subject: [PATCH 8/9] addressed pr comments --- .../service/InvestigationServiceTest.java | 31 ------------------- .../investigation/utils/TestUtils.java | 31 +++++++++++++++++++ 2 files changed, 31 insertions(+), 31 deletions(-) diff --git a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java index fcb60377..02bb974a 100644 --- a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java +++ b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java @@ -497,37 +497,6 @@ void testProcessTreatmentNoDataException() { assertThrows(NoDataException.class, () -> investigationService.processMessage(rec, consumer)); } - private Treatment constructTreatment(Long treatmentUid) { - Treatment treatment = new Treatment(); - treatment.setTreatmentUid(String.valueOf(treatmentUid)); - treatment.setPublicHealthCaseUid("12345"); - treatment.setOrganizationUid("67890"); - treatment.setProviderUid("11111"); - treatment.setPatientTreatmentUid("22222"); - treatment.setTreatmentName("Test Treatment"); - treatment.setTreatmentOid("33333"); - treatment.setTreatmentComments("Test Comments"); - treatment.setTreatmentSharedInd("Y"); - treatment.setCd("TEST_CD"); - treatment.setTreatmentDate("2024-01-01T10:00:00"); - treatment.setTreatmentDrug("Drug123"); - treatment.setTreatmentDrugName("Test Drug"); - treatment.setTreatmentDosageStrength("100"); - treatment.setTreatmentDosageStrengthUnit("mg"); - treatment.setTreatmentFrequency("Daily"); - treatment.setTreatmentDuration("7"); - treatment.setTreatmentDurationUnit("days"); - treatment.setTreatmentRoute("Oral"); - treatment.setLocalId("LOC123"); - treatment.setRecordStatusCd("Active"); - treatment.setAddTime("2024-01-01T10:00:00"); - treatment.setAddUserId("44444"); - treatment.setLastChangeTime("2024-01-01T10:00:00"); - treatment.setLastChangeUserId("55555"); - treatment.setVersionControlNumber("1"); - return treatment; - } - private ConsumerRecord getRecord(String topic, String payload) { return new ConsumerRecord<>(topic, 0, 11L, null, payload); } diff --git a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/utils/TestUtils.java b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/utils/TestUtils.java index f87051d0..c49ca894 100644 --- a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/utils/TestUtils.java +++ b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/utils/TestUtils.java @@ -367,4 +367,35 @@ public static VaccinationReporting constructVaccinationReporting(Long vaccinatio vaccinationReporting.setVaccineManufacturerNm("test"); return vaccinationReporting; } + + public static Treatment constructTreatment(Long treatmentUid) { + Treatment treatment = new Treatment(); + treatment.setTreatmentUid(String.valueOf(treatmentUid)); + treatment.setPublicHealthCaseUid("12345"); + treatment.setOrganizationUid("67890"); + treatment.setProviderUid("11111"); + treatment.setPatientTreatmentUid("22222"); + treatment.setTreatmentName("Test Treatment"); + treatment.setTreatmentOid("33333"); + treatment.setTreatmentComments("Test Comments"); + treatment.setTreatmentSharedInd("Y"); + treatment.setCd("TEST_CD"); + treatment.setTreatmentDate("2024-01-01T10:00:00"); + treatment.setTreatmentDrug("Drug123"); + treatment.setTreatmentDrugName("Test Drug"); + treatment.setTreatmentDosageStrength("100"); + treatment.setTreatmentDosageStrengthUnit("mg"); + treatment.setTreatmentFrequency("Daily"); + treatment.setTreatmentDuration("7"); + treatment.setTreatmentDurationUnit("days"); + treatment.setTreatmentRoute("Oral"); + treatment.setLocalId("LOC123"); + treatment.setRecordStatusCd("Active"); + treatment.setAddTime("2024-01-01T10:00:00"); + treatment.setAddUserId("44444"); + treatment.setLastChangeTime("2024-01-01T10:00:00"); + treatment.setLastChangeUserId("55555"); + treatment.setVersionControlNumber("1"); + return treatment; + } } From 870f0039bc1834914f6f02839bf6febad8b253f8 Mon Sep 17 00:00:00 2001 From: Jayasudha Sundaram Date: Tue, 11 Mar 2025 13:04:06 -0400 Subject: [PATCH 9/9] removed featureflag for treatment, resolved conflicts --- .../odse/routines/021-sp_treatment_event.sql | 94 +++++++++---------- .../service/InvestigationService.java | 5 +- .../src/main/resources/application.yaml | 1 - .../service/InvestigationServiceTest.java | 16 ---- .../routines/026-sp_treatment_event-001.sql | 94 +++++++++---------- 5 files changed, 91 insertions(+), 119 deletions(-) diff --git a/db/upgrade/odse/routines/021-sp_treatment_event.sql b/db/upgrade/odse/routines/021-sp_treatment_event.sql index 59160171..3bb318f9 100644 --- a/db/upgrade/odse/routines/021-sp_treatment_event.sql +++ b/db/upgrade/odse/routines/021-sp_treatment_event.sql @@ -47,12 +47,12 @@ BEGIN par1.subject_entity_uid AS provider_uid, viewPatientKeys.treatment_uid AS patient_treatment_uid, act2.target_act_uid AS morbidity_uid, - rx1.LOCAL_ID, - rx1.ADD_TIME, - rx1.ADD_USER_ID, - rx1.LAST_CHG_TIME, - rx1.LAST_CHG_USER_ID, - rx1.VERSION_CTRL_NBR + rx1.local_id, + rx1.add_time, + rx1.add_user_id, + rx1.last_chg_time, + rx1.last_chg_user_id, + rx1.version_ctrl_nbr INTO #TREATMENT_UIDS FROM NBS_ODSE.dbo.treatment AS rx1 WITH (NOLOCK) INNER JOIN NBS_ODSE.dbo.Treatment_administered AS rx2 WITH (NOLOCK) @@ -117,31 +117,27 @@ BEGIN t.provider_uid, t.patient_treatment_uid, t.morbidity_uid, - rx1.cd_desc_txt AS Treatment_nm, - rx1.program_jurisdiction_oid AS Treatment_oid, - REPLACE(REPLACE(rx1.txt, CHAR(13) + CHAR(10), ' '), CHAR(10), ' ') AS Treatment_comments, - rx1.shared_ind AS Treatment_shared_ind, + rx1.cd_desc_txt AS treatment_name, + rx1.program_jurisdiction_oid AS treatment_oid, + REPLACE(REPLACE(rx1.txt, CHAR(13) + CHAR(10), ' '), CHAR(10), ' ') AS treatment_comments, + rx1.shared_ind AS treatment_shared_ind, rx1.cd, - rx2.effective_from_time AS Treatment_dt, - rx2.cd AS Treatment_drug, - rx2.cd_desc_txt AS Treatment_drug_nm, - rx2.dose_qty AS Treatment_dosage_strength, - rx2.dose_qty_unit_cd AS Treatment_dosage_strength_unit, - rx2.interval_cd AS Treatment_frequency, - rx2.effective_duration_amt AS Treatment_duration, - rx2.effective_duration_unit_cd AS Treatment_duration_unit, - rx2.route_cd AS Treatment_route, - t.LOCAL_ID, - CASE - WHEN rx1.record_status_cd = '' THEN 'ACTIVE' - WHEN rx1.record_status_cd = 'LOG_DEL' THEN 'INACTIVE' - ELSE rx1.record_status_cd - END as record_status_cd, - t.ADD_TIME, - t.ADD_USER_ID, - t.LAST_CHG_TIME, - t.LAST_CHG_USER_ID, - t.VERSION_CTRL_NBR + rx2.effective_from_time AS treatment_date, + rx2.cd AS treatment_drug, + rx2.cd_desc_txt AS treatment_drug_name, + rx2.dose_qty AS treatment_dosage_strength, + rx2.dose_qty_unit_cd AS treatment_dosage_strength_unit, + rx2.interval_cd AS treatment_frequency, + rx2.effective_duration_amt AS treatment_duration, + rx2.effective_duration_unit_cd AS treatment_duration_unit, + rx2.route_cd AS treatment_route, + t.local_id, + dbo.fn_get_record_status(rx1.record_status_cd) as record_status_cd, + t.add_time, + t.add_user_id, + t.last_chg_time, + t.last_chg_user_id, + t.version_ctrl_nbr INTO #TREATMENT_DETAILS FROM #TREATMENT_UIDS t INNER JOIN NBS_ODSE.dbo.treatment rx1 WITH (NOLOCK) @@ -176,27 +172,27 @@ BEGIN t.provider_uid, t.morbidity_uid, t.patient_treatment_uid, - t.Treatment_nm, - t.Treatment_oid, - t.Treatment_comments, - t.Treatment_shared_ind, + t.treatment_name, + t.treatment_oid, + t.treatment_comments, + t.treatment_shared_ind, t.cd, - t.Treatment_dt, - t.Treatment_drug, - t.Treatment_drug_nm, - t.Treatment_dosage_strength, - t.Treatment_dosage_strength_unit, - t.Treatment_frequency, - t.Treatment_duration, - t.Treatment_duration_unit, - t.Treatment_route, - t.LOCAL_ID, + t.treatment_date, + t.treatment_drug, + t.treatment_drug_name, + t.treatment_dosage_strength, + t.treatment_dosage_strength_unit, + t.treatment_frequency, + t.treatment_duration, + t.treatment_duration_unit, + t.treatment_route, + t.local_id, t.record_status_cd, - t.ADD_TIME, - t.ADD_USER_ID, - t.LAST_CHG_TIME, - t.LAST_CHG_USER_ID, - t.VERSION_CTRL_NBR + t.add_time, + t.add_user_id, + t.last_chg_time, + t.last_chg_user_id, + t.version_ctrl_nbr FROM #TREATMENT_DETAILS t; diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java index 5bba8739..603af6e0 100644 --- a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java @@ -79,9 +79,6 @@ public class InvestigationService { @Value("${featureFlag.contact-record-enable}") public boolean contactRecordEnable; - @Value("${featureFlag.treatment-enable}") - public boolean treatmentEnable; - private final InvestigationRepository investigationRepository; private final NotificationRepository notificationRepository; private final InterviewRepository interviewRepository; @@ -143,7 +140,7 @@ public void processMessage(ConsumerRecord rec, processContact(message); } else if (topic.equals(vaccinationTopic)) { processVaccination(message); - } else if (topic.equals(treatmentTopic) && treatmentEnable) { + } else if (topic.equals(treatmentTopic)) { processTreatment(message); } consumer.commitSync(); diff --git a/investigation-service/src/main/resources/application.yaml b/investigation-service/src/main/resources/application.yaml index d687a2fe..2f454e0b 100644 --- a/investigation-service/src/main/resources/application.yaml +++ b/investigation-service/src/main/resources/application.yaml @@ -52,6 +52,5 @@ featureFlag: phc-datamart-enable: ${FF_PHC_DM_ENABLE:false} bmird-case-enable: ${FF_BMIRD_CASE_ENABLE:false} contact-record-enable: ${FF_CONTACT_RECORD_ENABLE:false} - treatment-enable: ${FF_TREATMENT_ENABLE:false} server: port: '8093' \ No newline at end of file diff --git a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java index 02bb974a..a84e4a17 100644 --- a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java +++ b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java @@ -124,7 +124,6 @@ void setUp() { transformer.setRdbMetadataColumnsOutputTopicName("metadataColumns"); transformer.setVaccinationOutputTopicName(vaccinationTopicOutput); transformer.setVaccinationAnswerOutputTopicName("VaccinationAnswerOutput"); - investigationService.setTreatmentEnable(true); } @AfterEach @@ -464,21 +463,6 @@ void testProcessTreatmentMessage() throws JsonProcessingException { assertEquals(treatment, actualTreatment); } - @Test - void testProcessTreatmentMessageWhenFeatureDisabled() { - Long treatmentUid = 234567890L; - String payload = "{\"payload\": {\"after\": {\"treatment_uid\": \"" + treatmentUid + "\"}}}"; - - final Treatment treatment = constructTreatment(treatmentUid); - when(treatmentRepository.computeTreatment(String.valueOf(treatmentUid))).thenReturn(Optional.of(treatment)); - - investigationService.setTreatmentEnable(false); - // Create a ConsumerRecord object - ConsumerRecord rec = getRecord(treatmentTopic, payload); - investigationService.processMessage(rec, consumer); - verify(kafkaTemplate, never()).send(anyString(), anyString(), anyString()); - } - @Test void testProcessTreatmentException() { String invalidPayload = "{\"payload\": {\"after\": {}}}"; diff --git a/liquibase-service/src/main/resources/db/odse/routines/026-sp_treatment_event-001.sql b/liquibase-service/src/main/resources/db/odse/routines/026-sp_treatment_event-001.sql index 59160171..3bb318f9 100644 --- a/liquibase-service/src/main/resources/db/odse/routines/026-sp_treatment_event-001.sql +++ b/liquibase-service/src/main/resources/db/odse/routines/026-sp_treatment_event-001.sql @@ -47,12 +47,12 @@ BEGIN par1.subject_entity_uid AS provider_uid, viewPatientKeys.treatment_uid AS patient_treatment_uid, act2.target_act_uid AS morbidity_uid, - rx1.LOCAL_ID, - rx1.ADD_TIME, - rx1.ADD_USER_ID, - rx1.LAST_CHG_TIME, - rx1.LAST_CHG_USER_ID, - rx1.VERSION_CTRL_NBR + rx1.local_id, + rx1.add_time, + rx1.add_user_id, + rx1.last_chg_time, + rx1.last_chg_user_id, + rx1.version_ctrl_nbr INTO #TREATMENT_UIDS FROM NBS_ODSE.dbo.treatment AS rx1 WITH (NOLOCK) INNER JOIN NBS_ODSE.dbo.Treatment_administered AS rx2 WITH (NOLOCK) @@ -117,31 +117,27 @@ BEGIN t.provider_uid, t.patient_treatment_uid, t.morbidity_uid, - rx1.cd_desc_txt AS Treatment_nm, - rx1.program_jurisdiction_oid AS Treatment_oid, - REPLACE(REPLACE(rx1.txt, CHAR(13) + CHAR(10), ' '), CHAR(10), ' ') AS Treatment_comments, - rx1.shared_ind AS Treatment_shared_ind, + rx1.cd_desc_txt AS treatment_name, + rx1.program_jurisdiction_oid AS treatment_oid, + REPLACE(REPLACE(rx1.txt, CHAR(13) + CHAR(10), ' '), CHAR(10), ' ') AS treatment_comments, + rx1.shared_ind AS treatment_shared_ind, rx1.cd, - rx2.effective_from_time AS Treatment_dt, - rx2.cd AS Treatment_drug, - rx2.cd_desc_txt AS Treatment_drug_nm, - rx2.dose_qty AS Treatment_dosage_strength, - rx2.dose_qty_unit_cd AS Treatment_dosage_strength_unit, - rx2.interval_cd AS Treatment_frequency, - rx2.effective_duration_amt AS Treatment_duration, - rx2.effective_duration_unit_cd AS Treatment_duration_unit, - rx2.route_cd AS Treatment_route, - t.LOCAL_ID, - CASE - WHEN rx1.record_status_cd = '' THEN 'ACTIVE' - WHEN rx1.record_status_cd = 'LOG_DEL' THEN 'INACTIVE' - ELSE rx1.record_status_cd - END as record_status_cd, - t.ADD_TIME, - t.ADD_USER_ID, - t.LAST_CHG_TIME, - t.LAST_CHG_USER_ID, - t.VERSION_CTRL_NBR + rx2.effective_from_time AS treatment_date, + rx2.cd AS treatment_drug, + rx2.cd_desc_txt AS treatment_drug_name, + rx2.dose_qty AS treatment_dosage_strength, + rx2.dose_qty_unit_cd AS treatment_dosage_strength_unit, + rx2.interval_cd AS treatment_frequency, + rx2.effective_duration_amt AS treatment_duration, + rx2.effective_duration_unit_cd AS treatment_duration_unit, + rx2.route_cd AS treatment_route, + t.local_id, + dbo.fn_get_record_status(rx1.record_status_cd) as record_status_cd, + t.add_time, + t.add_user_id, + t.last_chg_time, + t.last_chg_user_id, + t.version_ctrl_nbr INTO #TREATMENT_DETAILS FROM #TREATMENT_UIDS t INNER JOIN NBS_ODSE.dbo.treatment rx1 WITH (NOLOCK) @@ -176,27 +172,27 @@ BEGIN t.provider_uid, t.morbidity_uid, t.patient_treatment_uid, - t.Treatment_nm, - t.Treatment_oid, - t.Treatment_comments, - t.Treatment_shared_ind, + t.treatment_name, + t.treatment_oid, + t.treatment_comments, + t.treatment_shared_ind, t.cd, - t.Treatment_dt, - t.Treatment_drug, - t.Treatment_drug_nm, - t.Treatment_dosage_strength, - t.Treatment_dosage_strength_unit, - t.Treatment_frequency, - t.Treatment_duration, - t.Treatment_duration_unit, - t.Treatment_route, - t.LOCAL_ID, + t.treatment_date, + t.treatment_drug, + t.treatment_drug_name, + t.treatment_dosage_strength, + t.treatment_dosage_strength_unit, + t.treatment_frequency, + t.treatment_duration, + t.treatment_duration_unit, + t.treatment_route, + t.local_id, t.record_status_cd, - t.ADD_TIME, - t.ADD_USER_ID, - t.LAST_CHG_TIME, - t.LAST_CHG_USER_ID, - t.VERSION_CTRL_NBR + t.add_time, + t.add_user_id, + t.last_chg_time, + t.last_chg_user_id, + t.version_ctrl_nbr FROM #TREATMENT_DETAILS t;