Skip to content

Commit

Permalink
CNDIT-1671: Add Kafka listener for Notification changing event (#36)
Browse files Browse the repository at this point in the history
  • Loading branch information
sveselev authored Sep 16, 2024
1 parent 765d58c commit 7171ba6
Show file tree
Hide file tree
Showing 35 changed files with 748 additions and 442 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,8 @@ public class NoDataException extends RuntimeException {
public NoDataException(String message) {
super(message);
}

public NoDataException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package gov.cdc.etldatapipeline.commonutil;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import lombok.extern.slf4j.Slf4j;

import java.util.NoSuchElementException;

@Slf4j
public class UtilHelper {
private static final ObjectMapper objectMapper = new ObjectMapper()
.registerModule(new JavaTimeModule());

private UtilHelper() {
throw new IllegalStateException("Utility class");
}

public static <T> T deserializePayload(String jsonString, Class<T> type) {
try {
if (jsonString == null) return null;
return objectMapper.readValue(jsonString, type);
} catch (JsonProcessingException e) {
log.error("JsonProcessingException: ", e);
}
return null;
}

public static String extractUid(String value, String uidName) throws Exception {
JsonNode jsonNode = objectMapper.readTree(value);
JsonNode payloadNode = jsonNode.get("payload").path("after");
if (!payloadNode.isMissingNode() && payloadNode.has(uidName)) {
return payloadNode.get(uidName).asText();
} else {
throw new NoSuchElementException("The " + uidName + " field is missing in the message payload.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,32 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.common.base.CaseFormat;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.Field;

@Slf4j
public class CustomJsonGeneratorImpl {

private static final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());

public String generateStringJson(Object model) {
try {
ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
ObjectNode root = objectMapper.createObjectNode();
ObjectNode schemaNode = root.putObject("schema");
schemaNode.put("type", "struct");
schemaNode.set("fields", generateFieldsArray(model));
ObjectNode payloadNode = root.putObject("payload");
payloadNode = generatePayloadNode(payloadNode, model);
generatePayloadNode(payloadNode, model);
return objectMapper.writeValueAsString(root);
} catch (Exception e) {
e.printStackTrace();
log.error("Failed to generate JSON string for model: {}", model.getClass().getName(), e);
return null;
}
}

private static ArrayNode generateFieldsArray(Object model) {
ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
ArrayNode fieldsArray = objectMapper.createArrayNode();

try {
Class<?> modelClass = model.getClass();
for (Field field : modelClass.getDeclaredFields()) {
Expand All @@ -46,25 +47,23 @@ private static ArrayNode generateFieldsArray(Object model) {
fieldsArray.add(fieldNode);
}
} catch (Exception e) {
e.printStackTrace();
log.error("Failed to generate JSON array node for model: {}", model.getClass().getName(), e);
}

return fieldsArray;
}

private static ObjectNode generatePayloadNode(ObjectNode payloadNode, Object model) {
ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());

try {
Class<?> modelClass = model.getClass();
for (java.lang.reflect.Field field : modelClass.getDeclaredFields()) {
field.setAccessible(true);
String fieldName = getFieldName(field);

payloadNode.put(fieldName, objectMapper.valueToTree(field.get(model)));
payloadNode.set(fieldName, objectMapper.valueToTree(field.get(model)));
}
} catch (Exception e) {
e.printStackTrace();
log.error("Failed to generate JSON payload node for model: {}", model.getClass().getName(), e);
}

return payloadNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,25 @@
public class InvestigationController {
private final KafkaProducerService producerService;

@Value("${spring.kafka.input.topic-name}")
private String topicName;
@Value("${spring.kafka.input.topic-name-phc}")
private String investigationTopic;

@Value("${spring.kafka.input.topic-name-ntf}")
private String notificationTopic;

@GetMapping("/reporting/investigation-svc/status")
@ResponseBody
public ResponseEntity<String> getDataPipelineStatusHealth() {
log.info("Investigation Service Status OK");
return ResponseEntity.status(HttpStatus.OK).body("Investigation Service Status OK");
}

@PostMapping("/publish")
public void publishMessageToKafka(@RequestBody String jsonData) {
producerService.sendMessage(topicName, jsonData);
@PostMapping("/reporting/investigation-svc/investigation")
public void postInvestigation(@RequestBody String jsonData) {
producerService.sendMessage(investigationTopic, jsonData);
}

@PostMapping("/reporting/investigation-svc/notification")
public void postNotification(@RequestBody String jsonData) {
producerService.sendMessage(notificationTopic, jsonData);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package gov.cdc.etldatapipeline.investigation.repository.model.dto;

import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import lombok.Data;

@Entity
@Data
public class NotificationUpdate {

@Id
@Column(name = "notification_uid")
private Long notificationUid;

@Column(name = "investigation_notifications")
private String investigationNotifications;

@Column(name = "notification_history")
private String notificationHistory;
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package gov.cdc.etldatapipeline.investigation.repository.model.dto;
package gov.cdc.etldatapipeline.investigation.repository.model.reporting;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package gov.cdc.etldatapipeline.investigation.repository.model.dto;
package gov.cdc.etldatapipeline.investigation.repository.model.reporting;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.persistence.Column;
Expand All @@ -8,7 +8,7 @@

@Entity
@Data
public class InvestigationNotifications {
public class InvestigationNotification {
@JsonProperty("source_act_uid")
@Id
@Column(name = "source_act_uid")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package gov.cdc.etldatapipeline.investigation.repository.model.dto;
package gov.cdc.etldatapipeline.investigation.repository.model.reporting;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
Expand All @@ -8,7 +8,7 @@

@Data
@NoArgsConstructor
public class InvestigationNotificationsKey {
public class InvestigationNotificationKey {

@NonNull
@JsonProperty("notification_uid")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ public interface InvestigationRepository extends JpaRepository<Investigation, St
Optional<Investigation> computeInvestigations(@Param("investigation_uids") String investigationUids);

@Query(nativeQuery = true, value = "exec sp_public_health_case_fact_datamart_event :investigation_uids")
Optional<String> populatePhcFact(@Param("investigation_uids") String phcIds);
void populatePhcFact(@Param("investigation_uids") String phcIds);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package gov.cdc.etldatapipeline.investigation.repository.odse;

import gov.cdc.etldatapipeline.investigation.repository.model.dto.NotificationUpdate;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;

import java.util.Optional;

public interface NotificationRepository extends JpaRepository<NotificationUpdate, String> {

@Query(nativeQuery = true, value = "execute sp_notification_event :notification_uids")
Optional<NotificationUpdate> computeNotifications(@Param("notification_uids") String notificationUids);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

import java.util.List;

@Repository
public interface InvestigationCaseAnswerRepository extends JpaRepository<InvestigationCaseAnswer, Long> {
List<InvestigationCaseAnswer> findByActUid(Long actUid);

void deleteByActUid(Long actUid);
}
Loading

0 comments on commit 7171ba6

Please sign in to comment.