From 97bfb501f967ac37861c858dd24e35eb517958d2 Mon Sep 17 00:00:00 2001 From: Sergei Veselev Date: Wed, 30 Oct 2024 18:58:26 -0400 Subject: [PATCH] CNDE-1403: Post-processing integration for LAB100 and LAB101 datamarts (#65) --- .../repository/PostProcRepository.java | 6 ++++++ .../service/PostProcessingService.java | 10 ++++++++-- .../service/PostProcessingServiceTest.java | 13 ++++++++----- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/repository/PostProcRepository.java b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/repository/PostProcRepository.java index e4ba633..dfb5e23 100644 --- a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/repository/PostProcRepository.java +++ b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/repository/PostProcRepository.java @@ -31,4 +31,10 @@ public interface PostProcRepository extends JpaRepository { @Procedure("sp_d_labtest_result_postprocessing") void executeStoredProcForLabTestResult(@Param("observationUids") String observationUids); + + @Procedure("sp_lab100_datamart_postprocessing") + void executeStoredProcForLab100Datamart(@Param("observationUids") String observationUids); + + @Procedure("sp_lab101_datamart_postprocessing") + void executeStoredProcForLab101Datamart(@Param("observationUids") String observationUids); } diff --git a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingService.java b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingService.java index 7258544..0c64d37 100644 --- a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingService.java +++ b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingService.java @@ -260,6 +260,11 @@ protected void processCachedIds() { postProcRepository::executeStoredProcForLabTest, "sp_d_lab_test_postprocessing"); processTopic(keyTopic, entity.getName(), labIds, postProcRepository::executeStoredProcForLabTestResult, "sp_d_labtest_result_postprocessing"); + + processTopic(keyTopic, entity.getName(), labIds, + postProcRepository::executeStoredProcForLab100Datamart, "sp_lab100_datamart_postprocessing"); + processTopic(keyTopic, entity.getName(), labIds, + postProcRepository::executeStoredProcForLab101Datamart, "sp_lab101_datamart_postprocessing"); } break; default: @@ -312,13 +317,14 @@ private String extractValFromMessage(String topic, String payload) { } } else if (topic.endsWith(Entity.OBSERVATION.getName())) { String domainCd = objectMapper.readTree(payload).get(PAYLOAD).path("obs_domain_cd_st_1").asText(); - String ctrlCd = objectMapper.readTree(payload).get(PAYLOAD).path("ctrl_cd_display_form").asText(); + String ctrlCd = Optional.ofNullable(objectMapper.readTree(payload).get(PAYLOAD).get("ctrl_cd_display_form")) + .filter(node -> !node.isNull()).map(JsonNode::asText).orElse(null); if (MORB_REPORT.equals(ctrlCd)) { if ("Order".equals(domainCd)) { return ctrlCd; } - } else if (assertMatches(ctrlCd, LAB_REPORT, LAB_REPORT_MORB) && + } else if (assertMatches(ctrlCd, LAB_REPORT, LAB_REPORT_MORB, null) && assertMatches(domainCd, "Order", "Result", "R_Order", "R_Result", "I_Order", "I_Result", "Order_rslt")) { return LAB_REPORT; } diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingServiceTest.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingServiceTest.java index 2ca4631..b1ef1e2 100644 --- a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingServiceTest.java +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingServiceTest.java @@ -233,6 +233,8 @@ void testPostProcessObservationMorb() { "'{\"payload\":{\"observation_uid\":123, \"obs_domain_cd_st_1\": \"Result\",\"ctrl_cd_display_form\": \"LabReport\"}}'", "'{\"payload\":{\"observation_uid\":123, \"obs_domain_cd_st_1\": \"R_Order\",\"ctrl_cd_display_form\": \"LabReportMorb\"}}'", "'{\"payload\":{\"observation_uid\":123, \"obs_domain_cd_st_1\": \"R_Result\",\"ctrl_cd_display_form\": \"LabReport\"}}'", + "'{\"payload\":{\"observation_uid\":123, \"obs_domain_cd_st_1\": \"I_Order\",\"ctrl_cd_display_form\": null}}'", + "'{\"payload\":{\"observation_uid\":123, \"obs_domain_cd_st_1\": \"I_Result\",\"ctrl_cd_display_form\": null}}'" }) void testPostProcessObservationLab(String payload) { String topic = "dummy_observation"; @@ -250,11 +252,12 @@ void testPostProcessObservationLab(String payload) { verify(postProcRepositoryMock).executeStoredProcForLabTest(expectedObsIdsString); verify(postProcRepositoryMock).executeStoredProcForLabTestResult(expectedObsIdsString); List logs = listAppender.list; - assertEquals(6, logs.size()); + assertEquals(10, logs.size()); assertTrue(logs.get(2).getFormattedMessage().contains("sp_d_lab_test_postprocessing")); - assertTrue(logs.get(3).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED)); assertTrue(logs.get(4).getFormattedMessage().contains("sp_d_labtest_result_postprocessing")); - assertTrue(logs.get(5).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED)); + assertTrue(logs.get(6).getFormattedMessage().contains("sp_lab100_datamart_postprocessing")); + assertTrue(logs.get(8).getFormattedMessage().contains("sp_lab101_datamart_postprocessing")); + assertTrue(logs.get(9).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED)); } @ParameterizedTest @@ -263,8 +266,7 @@ void testPostProcessObservationLab(String payload) { "'{\"payload\":{\"observation_uid\":123, \"obs_domain_cd_st_1\": \"Order\",\"ctrl_cd_display_form\": \"NoReport\"}}'", "'{\"payload\":{\"observation_uid\":123, \"obs_domain_cd_st_1\": \"NoOrderOrResult\",\"ctrl_cd_display_form\": \"LabReport\"}}'", "'{\"payload\":{\"observation_uid\":123, \"obs_domain_cd_st_1\": null,\"ctrl_cd_display_form\": \"LabReport\"}}'", - "'{\"payload\":{\"observation_uid\":123, \"obs_domain_cd_st_1\": \"C_Result\",\"ctrl_cd_display_form\": \"LabComment\"}}'", - "'{\"payload\":{\"observation_uid\":123, \"obs_domain_cd_st_1\": \"Result\",\"ctrl_cd_display_form\": null}}'" + "'{\"payload\":{\"observation_uid\":123, \"obs_domain_cd_st_1\": \"C_Result\",\"ctrl_cd_display_form\": \"LabComment\"}}'" }) void testPostProcessObservationNoReport(String payload) { String topic = "dummy_observation"; @@ -406,6 +408,7 @@ void testProduceDatamartTopicWithNoPatient() { @ParameterizedTest @CsvSource({ + "'{\"payload\":{\"public_health_case_uid\":123,\"rdb_table_name_list\":null}}'", "'{\"payload\":{\"patient_uid\":123}}'", "'{\"payload\":{invalid}'" })