Skip to content

Commit

Permalink
CNDE-1861: Fix multivalued data for coded, reason and txt (#74)
Browse files Browse the repository at this point in the history
  • Loading branch information
sveselev authored Nov 1, 2024
1 parent 6e6afc7 commit 8555977
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public ObservationTransformed transformObservationData(Observation observation)
observationTransformed.setObservationUid(observation.getObservationUid());
observationTransformed.setReportObservationUid(observation.getObservationUid());

observationKey.setObservationUid(observation.getObservationUid());
String obsDomainCdSt1 = observation.getObsDomainCdSt1();

transformPersonParticipations(observation.getPersonParticipations(), obsDomainCdSt1, observationTransformed);
Expand Down Expand Up @@ -320,6 +321,9 @@ private void transformActIds(String actIds, ObservationTransformed observationTr
}

private void transformObservationCoded(String observationCoded) {
// Tombstone message to delete previous observation coded data for specified uid
sendToKafka(observationKey, null, codedTopicName, observationKey.getObservationUid(), null);

try {
JsonNode observationCodedJsonArray = parseJsonArray(observationCoded);

Expand All @@ -342,9 +346,8 @@ private void transformObservationDate(String observationDate) {
JsonNode observationDateJsonArray = parseJsonArray(observationDate);

for (JsonNode jsonNode : observationDateJsonArray) {
ObservationDate coded = objectMapper.treeToValue(jsonNode, ObservationDate.class);
observationKey.setObservationUid(coded.getObservationUid());
sendToKafka(observationKey, coded, dateTopicName, coded.getObservationUid(), "Observation Date data (uid={}) sent to {}");
ObservationDate obsDate = objectMapper.treeToValue(jsonNode, ObservationDate.class);
sendToKafka(observationKey, obsDate, dateTopicName, obsDate.getObservationUid(), "Observation Date data (uid={}) sent to {}");
}
} catch (IllegalArgumentException ex) {
logger.info(ex.getMessage(), "ObservationDate");
Expand Down Expand Up @@ -376,7 +379,6 @@ private void transformObservationNumeric(String observationNumeric) {

for (JsonNode jsonNode : observationNumericJsonArray) {
ObservationNumeric numeric = objectMapper.treeToValue(jsonNode, ObservationNumeric.class);
observationKey.setObservationUid(numeric.getObservationUid());
sendToKafka(observationKey, numeric, numericTopicName, numeric.getObservationUid(), "Observation Numeric data (uid={}) sent to {}");
}
} catch (IllegalArgumentException ex) {
Expand All @@ -388,6 +390,9 @@ private void transformObservationNumeric(String observationNumeric) {

private void transformObservationReasons(String observationReasons) {
try {
// Tombstone message to delete previous observation reason data for specified uid
sendToKafka(observationKey, null, reasonTopicName, observationKey.getObservationUid(), null);

JsonNode observationReasonsJsonArray = parseJsonArray(observationReasons);

ObservationReasonKey reasonKey = new ObservationReasonKey();
Expand All @@ -405,6 +410,9 @@ private void transformObservationReasons(String observationReasons) {
}

private void transformObservationTxt(String observationTxt) {
// Tombstone message to delete previous observation txt data for specified uid
sendToKafka(observationKey, null, txtTopicName, observationKey.getObservationUid(), null);

try {
JsonNode observationTxtJsonArray = parseJsonArray(observationTxt);

Expand All @@ -424,9 +432,13 @@ private void transformObservationTxt(String observationTxt) {

private void sendToKafka(Object key, Object value, String topicName, Long uid, String message) {
String jsonKey = jsonGenerator.generateStringJson(key);
String jsonValue = jsonGenerator.generateStringJson(value);
String jsonValue = Optional.ofNullable(value).map(jsonGenerator::generateStringJson).orElse(null);
kafkaTemplate.send(topicName, jsonKey, jsonValue)
.whenComplete((res, e) -> logger.info(message, uid, topicName));
.whenComplete((res, e) -> {
if (message != null) {
logger.info(message, uid, topicName);
}
});
}

private JsonNode parseJsonArray(String jsonString) throws JsonProcessingException, IllegalArgumentException {
Expand Down
Loading

0 comments on commit 8555977

Please sign in to comment.