Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CNDE 2087: Preprocessing service treatment #195

Merged
merged 13 commits into from
Mar 11, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ public class InvestigationController {
@Value("${spring.kafka.input.topic-name-ctr}")
private String contactTopic;

@Value("${spring.kafka.input.topic-name-tmt}")
private String treatmentTopic;


@GetMapping("/reporting/investigation-svc/status")
public ResponseEntity<String> getDataPipelineStatusHealth() {
Expand All @@ -52,4 +55,9 @@ public void postInterview(@RequestBody String jsonData) {
public void postContact(@RequestBody String jsonData) {
producerService.sendMessage(contactTopic, jsonData);
}

@PostMapping("/reporting/investigation-svc/treatment")
public void postTreatment(@RequestBody String jsonData) {producerService.sendMessage(treatmentTopic, jsonData);}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package gov.cdc.etldatapipeline.investigation.repository;

import gov.cdc.etldatapipeline.investigation.repository.model.dto.Treatment;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;

import java.util.Optional;

public interface TreatmentRepository extends JpaRepository<Treatment, String> {

@Query(nativeQuery = true, value = "exec sp_treatment_event :treatment_uid")
Optional<Treatment> computeTreatment(@Param("treatment_uid") String treatmentUid);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package gov.cdc.etldatapipeline.investigation.repository.model.dto;

import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import lombok.Data;

@Entity
@Data
public class Treatment {
@Id
@Column(name = "treatment_uid")
private String treatmentUid;

@Column(name = "public_health_case_uid")
private String publicHealthCaseUid;

@Column(name = "organization_uid")
private String organizationUid;

@Column(name = "provider_uid")
private String providerUid;

@Column(name = "patient_treatment_uid")
private String patientTreatmentUid;

@Column(name = "Treatment_nm")
private String treatmentName;

@Column(name = "Treatment_oid")
private String treatmentOid;

@Column(name = "Treatment_comments")
private String treatmentComments;

@Column(name = "Treatment_shared_ind")
private String treatmentSharedInd;

@Column(name = "cd")
private String cd;

@Column(name = "Treatment_dt")
private String treatmentDate;

@Column(name = "Treatment_drug")
private String treatmentDrug;

@Column(name = "Treatment_drug_nm")
private String treatmentDrugName;

@Column(name = "Treatment_dosage_strength")
private String treatmentDosageStrength;

@Column(name = "Treatment_dosage_strength_unit")
private String treatmentDosageStrengthUnit;

@Column(name = "Treatment_frequency")
private String treatmentFrequency;

@Column(name = "Treatment_duration")
private String treatmentDuration;

@Column(name = "Treatment_duration_unit")
private String treatmentDurationUnit;

@Column(name = "Treatment_route")
private String treatmentRoute;

@Column(name = "LOCAL_ID")
private String localId;

@Column(name = "record_status_cd")
private String recordStatusCd;

@Column(name = "ADD_TIME")
private String addTime;

@Column(name = "ADD_USER_ID")
private String addUserId;

@Column(name = "LAST_CHG_TIME")
private String lastChangeTime;

@Column(name = "LAST_CHG_USER_ID")
private String lastChangeUserId;

@Column(name = "VERSION_CTRL_NBR")
private String versionControlNumber;

@Column(name = "refresh_datetime")
private String refreshDatetime;

@Column(name = "max_datetime")
private String maxDatetime;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package gov.cdc.etldatapipeline.investigation.repository.model.reporting;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.Data;
import java.time.LocalDateTime;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public class TreatmentReporting {
private String treatmentUid;
private String publicHealthCaseUid;
private String organizationUid;
private String providerUid;
private String patientTreatmentUid;
private String treatmentName;
private String treatmentOid;
private String treatmentComments;
private String treatmentSharedInd;
private String cd;
private String treatmentDate;
private String treatmentDrug;
private String treatmentDrugName;
private String treatmentDosageStrength;
private String treatmentDosageStrengthUnit;
private String treatmentFrequency;
private String treatmentDuration;
private String treatmentDurationUnit;
private String treatmentRoute;
private String localId;
private String recordStatusCd;
private String addTime;
private String addUserId;
private String lastChangeTime;
private String lastChangeUserId;
private String versionControlNumber;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package gov.cdc.etldatapipeline.investigation.repository.model.reporting;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.AllArgsConstructor;
import lombok.NonNull;

import java.time.LocalDateTime;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class TreatmentReportingKey {
@NonNull
@JsonProperty("treatment_uid")
private String treatmentUid;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@

import gov.cdc.etldatapipeline.commonutil.NoDataException;
import gov.cdc.etldatapipeline.commonutil.json.CustomJsonGeneratorImpl;
import gov.cdc.etldatapipeline.investigation.repository.ContactRepository;
import gov.cdc.etldatapipeline.investigation.repository.InterviewRepository;
import gov.cdc.etldatapipeline.investigation.repository.*;
import gov.cdc.etldatapipeline.investigation.repository.model.dto.*;
import gov.cdc.etldatapipeline.investigation.repository.InvestigationRepository;
import gov.cdc.etldatapipeline.investigation.repository.model.reporting.InvestigationKey;
import gov.cdc.etldatapipeline.investigation.repository.model.reporting.InvestigationReporting;
import gov.cdc.etldatapipeline.investigation.repository.NotificationRepository;
import gov.cdc.etldatapipeline.investigation.util.ProcessInvestigationDataUtil;
import jakarta.persistence.EntityNotFoundException;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -63,6 +60,9 @@ public class InvestigationService {
@Value("${spring.kafka.output.topic-name-reporting}")
private String investigationTopicReporting;

@Value("${spring.kafka.output.topic-name-tmt}")
private String treatmentTopic;

@Value("${featureFlag.phc-datamart-enable}")
private boolean phcDatamartEnable;

Expand All @@ -72,10 +72,14 @@ public class InvestigationService {
@Value("${featureFlag.contact-record-enable}")
public boolean contactRecordEnable;

@Value("${featureFlag.treatment-enable}")
public boolean treatmentEnable;

private final InvestigationRepository investigationRepository;
private final NotificationRepository notificationRepository;
private final InterviewRepository interviewRepository;
private final ContactRepository contactRepository;
private final TreatmentRepository treatmentRepository;

private final KafkaTemplate<String, String> kafkaTemplate;
private final ProcessInvestigationDataUtil processDataUtil;
Expand Down Expand Up @@ -107,7 +111,8 @@ public class InvestigationService {
"${spring.kafka.input.topic-name-phc}",
"${spring.kafka.input.topic-name-ntf}",
"${spring.kafka.input.topic-name-int}",
"${spring.kafka.input.topic-name-ctr}"
"${spring.kafka.input.topic-name-ctr}",
"${spring.kafka.input.topic-name-tmt}"
}
)
public void processMessage(String message,
Expand All @@ -122,6 +127,8 @@ public void processMessage(String message,
processInterview(message);
} else if (topic.equals(contactTopic) && contactRecordEnable) {
processContact(message);
} else if (topic.equals(treatmentTopic) && treatmentEnable) {
processTreatment(message);
}
consumer.commitSync();
}
Expand Down Expand Up @@ -229,6 +236,26 @@ private void processContact(String value) {
}
}

private void processTreatment(String value) {
String treatmentUid = "";
try {
treatmentUid = extractUid(value, "treatment_uid");

logger.info(topicDebugLog, "Treatment", treatmentUid, treatmentTopic);
Optional<Treatment> treatmentData = treatmentRepository.computeTreatment(treatmentUid);
if(treatmentData.isPresent()) {
Treatment treatment = treatmentData.get();
processDataUtil.processTreatment(treatment);
} else {
throw new EntityNotFoundException("Unable to find treatment with id: " + treatmentUid);
}
} catch (EntityNotFoundException ex) {
throw new NoDataException(ex.getMessage(), ex);
} catch (Exception e) {
throw new RuntimeException(errorMessage("Treatment", treatmentUid, e), e);
}
}

// This same method can be used for elastic search as well and that is why the generic model is present
private CompletableFuture<SendResult<String, String>> pushKeyValuePairToKafka(InvestigationKey investigationKey, Object model, String topicName) {
String jsonKey = jsonGenerator.generateStringJson(investigationKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ public class ProcessInvestigationDataUtil {
@Value("${spring.kafka.output.topic-name-interview-note}")
private String interviewNoteOutputTopicName;

@Value("${spring.kafka.output.topic-name-treatment}")
private String treatmentOutputTopicName;

@Value("${spring.kafka.output.topic-name-rdb-metadata-columns}")
private String rdbMetadataColumnsOutputTopicName;

Expand Down Expand Up @@ -658,6 +661,79 @@ private ContactReporting transformContact(Contact contact) {
return contactReporting;
}

/**
* Process treatment data and send to Kafka topic
* @param treatment Entity bean returned from stored procedures
*/
public void processTreatment(Treatment treatment) {
try {
// Tombstone message to delete all treatment records for specified treatment uid
TreatmentReportingKey treatmentReportingKey = new TreatmentReportingKey(treatment.getTreatmentUid());
String jsonKeyDel = jsonGenerator.generateStringJson(treatmentReportingKey);
logger.info(TOMBSTONE_MSG_SENT, "treatment", "treatment", treatment.getTreatmentUid());

kafkaTemplate.send(treatmentOutputTopicName, jsonKeyDel, null)
.whenComplete((res, e) -> logger.info(TOMBSTONE_MSG_ACCEPTED, "treatment"))
.thenRunAsync(() -> {
try {
// Transform and send the treatment data
TreatmentReporting treatmentReporting = transformTreatment(treatment);

String jsonKey = jsonGenerator.generateStringJson(treatmentReportingKey);
String jsonValue = jsonGenerator.generateStringJson(treatmentReporting);

kafkaTemplate.send(treatmentOutputTopicName, jsonKey, jsonValue)
.whenComplete((res, e) -> logger.info("Treatment data (uid={}) sent to {}",
treatment.getTreatmentUid(), treatmentOutputTopicName));

} catch (Exception e) {
logger.error("Error processing Treatment data: {}", e.getMessage());
throw new RuntimeException(e);
}
});

} catch (IllegalArgumentException ex) {
logger.info(ex.getMessage(), "Treatment");
} catch (Exception e) {
logger.error("Error processing Treatment data: {}", e.getMessage());
throw e;
}
}

private TreatmentReporting transformTreatment(Treatment treatment) {
TreatmentReporting treatmentReporting = new TreatmentReporting();

treatmentReporting.setTreatmentUid(treatment.getTreatmentUid());
treatmentReporting.setPublicHealthCaseUid(treatment.getPublicHealthCaseUid());
treatmentReporting.setOrganizationUid(treatment.getOrganizationUid());
treatmentReporting.setProviderUid(treatment.getProviderUid());
treatmentReporting.setPatientTreatmentUid(treatment.getPatientTreatmentUid());
treatmentReporting.setTreatmentName(treatment.getTreatmentName());
treatmentReporting.setTreatmentOid(treatment.getTreatmentOid());
treatmentReporting.setTreatmentComments(treatment.getTreatmentComments());
treatmentReporting.setTreatmentSharedInd(treatment.getTreatmentSharedInd());
treatmentReporting.setCd(treatment.getCd());
treatmentReporting.setTreatmentDate(treatment.getTreatmentDate());
treatmentReporting.setTreatmentDrug(treatment.getTreatmentDrug());
treatmentReporting.setTreatmentDrugName(treatment.getTreatmentDrugName());
treatmentReporting.setTreatmentDosageStrength(treatment.getTreatmentDosageStrength());
treatmentReporting.setTreatmentDosageStrengthUnit(treatment.getTreatmentDosageStrengthUnit());
treatmentReporting.setTreatmentFrequency(treatment.getTreatmentFrequency());
treatmentReporting.setTreatmentDuration(treatment.getTreatmentDuration());
treatmentReporting.setTreatmentDurationUnit(treatment.getTreatmentDurationUnit());
treatmentReporting.setTreatmentRoute(treatment.getTreatmentRoute());
treatmentReporting.setLocalId(treatment.getLocalId());
treatmentReporting.setRecordStatusCd(treatment.getRecordStatusCd());
treatmentReporting.setAddTime(treatment.getAddTime());
treatmentReporting.setAddUserId(treatment.getAddUserId());
treatmentReporting.setLastChangeTime(treatment.getLastChangeTime());
treatmentReporting.setLastChangeUserId(treatment.getLastChangeUserId());
treatmentReporting.setVersionControlNumber(treatment.getVersionControlNumber());

return treatmentReporting;
}


/**
* Parse and send RDB metadata column information sourced from the odse nbs_rdb_metadata
* To a generic kafka topic to handle all types of rdb column metadata
Expand Down
3 changes: 3 additions & 0 deletions investigation-service/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ spring:
topic-name-ntf: nbs_Notification
topic-name-int: nbs_Interview
topic-name-ctr: nbs_CT_contact
topic-name-tmt: nbs_Treatment
output:
topic-name-reporting: nrt_investigation
topic-name-confirmation: nrt_investigation_confirmation
Expand All @@ -18,6 +19,7 @@ spring:
topic-name-rdb-metadata-columns: nrt_metadata_columns
topic-name-contact: nrt_contact
topic-name-contact-answer: nrt_contact_answer
topic-name-treatment: nrt_treatment
dlq:
retry-suffix: _retry
dlq-suffix: _dlt
Expand Down Expand Up @@ -47,5 +49,6 @@ featureFlag:
phc-datamart-enable: ${FF_PHC_DM_ENABLE:false}
bmird-case-enable: ${FF_BMIRD_CASE_ENABLE:false}
contact-record-enable: ${FF_CONTACT_RECORD_ENABLE:false}
treatment-enable: ${FF_TREATMENT_ENABLE:false}
server:
port: '8093'
Loading