Skip to content

Commit

Permalink
CNDIT-1702: RTR performance investigate and fix for investigation ser…
Browse files Browse the repository at this point in the history
…vice topic duplication (#35)
  • Loading branch information
sveselev authored Sep 10, 2024
1 parent 14a7129 commit 765d58c
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class KafkaConsumerConfig {
private String bootstrapServers = "";

// Higher value for more intensive operation, also increase latency
// default is 30000, equivalent to 5 min
// default is 300000, equivalent to 5 min
@Value("${spring.kafka.consumer.maxPollIntervalMs}")
private String maxPollInterval = "";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import gov.cdc.etldatapipeline.investigation.util.ProcessInvestigationDataUtil;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.errors.SerializationException;
import org.modelmapper.ModelMapper;
import org.slf4j.Logger;
Expand Down Expand Up @@ -78,9 +79,11 @@ public class InvestigationService {
topics = "${spring.kafka.input.topic-name}"
)
public void processMessage(String message,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
Consumer<?,?> consumer) {
logger.debug(topicDebugLog, message, topic);
processInvestigation(message);
consumer.commitSync();
}

public void processInvestigation(String value) {
Expand Down
4 changes: 2 additions & 2 deletions investigation-service/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ spring:
topic-name: nbs_Public_health_case
output:
topic-name-reporting: nrt_investigation
topic-name-es: elastic_search_investigation
topic-name-confirmation: nrt_investigation_confirmation
topic-name-observation: nrt_investigation_observation
topic-name-notifications: nrt_investigation_notification
Expand All @@ -16,7 +15,8 @@ spring:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER:localhost:9092}
consumer:
max-retry: 3
maxPollIntervalMs: 30000
maxPollIntervalMs: 300000
enable-auto-commit: false
admin:
auto-create: true
application:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import gov.cdc.etldatapipeline.investigation.repository.model.reporting.InvestigationReporting;
import gov.cdc.etldatapipeline.investigation.repository.rdb.InvestigationCaseAnswerRepository;
import gov.cdc.etldatapipeline.investigation.util.ProcessInvestigationDataUtil;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -36,6 +37,9 @@ class InvestigationServiceTest {
@Mock
KafkaTemplate<String, String> kafkaTemplate;

@Mock
MockConsumer<String, String> consumer;

@Captor
private ArgumentCaptor<String> topicCaptor;

Expand Down Expand Up @@ -89,7 +93,7 @@ void testProcessInvestigationException() {
String invalidPayload = "{\"payload\": {\"after\": }}";

final var investigationService = getInvestigationService(investigationTopic, investigationTopicOutput);
assertThrows(RuntimeException.class, () -> investigationService.processMessage(invalidPayload, investigationTopic));
assertThrows(RuntimeException.class, () -> investigationService.processMessage(invalidPayload, investigationTopic, consumer));
}

@Test
Expand All @@ -102,14 +106,14 @@ void testProcessInvestigationNoDataException() {
when(investigationRepository.computeInvestigations(String.valueOf(investigationUid))).thenReturn(Optional.empty());

final var investigationService = getInvestigationService(investigationTopic, investigationTopicOutput);
assertThrows(NoDataException.class, () -> investigationService.processMessage(payload, investigationTopic));
assertThrows(NoDataException.class, () -> investigationService.processMessage(payload, investigationTopic, consumer));
}

private void validateData(String inputTopicName, String outputTopicName,
String payload, Investigation investigation) throws JsonProcessingException {

final var investigationService = getInvestigationService(inputTopicName, outputTopicName);
investigationService.processMessage(payload, inputTopicName);
investigationService.processMessage(payload, inputTopicName, consumer);

InvestigationKey investigationKey = new InvestigationKey();
investigationKey.setPublicHealthCaseUid(investigation.getPublicHealthCaseUid());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public <T> void processOrgName(String name, T org) {
Arrays.stream(utilHelper.deserializePayload(name, Name[].class))
.filter(oName -> !ObjectUtils.isEmpty(oName.getOnOrgUid()))
.max(Comparator.comparing(Name::getOnOrgUid))
.map(n -> n.updateOrg(org));
.ifPresent(n -> n.updateOrg(org));
}
}

Expand All @@ -33,7 +33,7 @@ public <T> void processOrgEntity(String entity, T org) {
Arrays.stream(utilHelper.deserializePayload(entity, Entity[].class))
.filter(oEntity -> !ObjectUtils.isEmpty(oEntity.getEntityIdSeq()))
.max(Comparator.comparing(Entity::getEntityIdSeq))
.map(n -> n.updateOrg(org));
.ifPresent(n -> n.updateOrg(org));
} else {
Function<String, T> entityFn =
(String typeCd) ->
Expand All @@ -55,7 +55,7 @@ public <T> void processOrgAddress(String address, T org) {
Arrays.stream(utilHelper.deserializePayload(address, Address[].class))
.filter(oAddr -> !ObjectUtils.isEmpty(oAddr.getAddrPlUid()))
.max(Comparator.comparing(Address::getAddrPlUid))
.map(n -> n.updateOrg(org));
.ifPresent(n -> n.updateOrg(org));
}
}

Expand All @@ -64,7 +64,7 @@ public <T> void processOrgPhone(String phone, T org) {
Arrays.stream(utilHelper.deserializePayload(phone, Phone[].class))
.filter(oPhone -> !ObjectUtils.isEmpty(oPhone.getPhTlUid()))
.max(Comparator.comparing(Phone::getPhTlUid))
.map(n -> n.updateOrg(org));
.ifPresent(n -> n.updateOrg(org));
}
}

Expand All @@ -73,7 +73,7 @@ public <T> void processOrgFax(String fax, T org) {
Arrays.stream(utilHelper.deserializePayload(fax, Fax[].class))
.filter(oPhone -> !ObjectUtils.isEmpty(oPhone.getFaxTlUid()))
.max(Comparator.comparing(Fax::getFaxTlUid))
.map(n -> n.updateOrg(org));
.ifPresent(n -> n.updateOrg(org));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public <T extends PersonExtendedProps> void processPersonName(String name, T pf)
.filter(pName -> !ObjectUtils.isEmpty(pName.getPersonNmSeq()))
// Get the entry with the max Person Name Sequence
.max(Comparator.comparing(Name::getPersonNmSeq))
.map(n -> n.updatePerson(pf, cd.getVal()));
.ifPresent(n -> n.updatePerson(pf, cd.getVal()));
}
});
}
Expand All @@ -70,18 +70,18 @@ public <T extends PersonExtendedProps> void processPersonAddress(String address,
.filter(pa -> !ObjectUtils.isEmpty(pa.getPostalLocatorUid())
&& (pa.getUseCd().equalsIgnoreCase("H")))
.max(Comparator.comparing(Address::getPostalLocatorUid))
.map(n -> n.updatePerson(pf));
.ifPresent(n -> n.updatePerson(pf));
Arrays.stream(utilHelper.deserializePayload(address, Address[].class))
.filter(pa -> !ObjectUtils.isEmpty(pa.getPostalLocatorUid())
&& pa.getUseCd().equalsIgnoreCase("BIR"))
.max(Comparator.comparing(Address::getPostalLocatorUid))
.map(n -> n.updatePerson(pf));
.ifPresent(n -> n.updatePerson(pf));
} else if (pf.getClass() == ProviderReporting.class || pf.getClass() == ProviderElasticSearch.class) {
Arrays.stream(utilHelper.deserializePayload(address, Address[].class))
.filter(pa -> !ObjectUtils.isEmpty(pa.getPostalLocatorUid())
&& pa.getUseCd().equalsIgnoreCase("WP"))
.max(Comparator.comparing(Address::getPostalLocatorUid))
.map(n -> n.updatePerson(pf));
.ifPresent(n -> n.updatePerson(pf));
}
}
}
Expand All @@ -91,14 +91,14 @@ public <T extends PersonExtendedProps> void processPersonRace(String race, T pf)
Arrays.stream(utilHelper.deserializePayload(race, Race[].class))
.filter(pRace -> !ObjectUtils.isEmpty(pRace.getPersonUid()))
.max(Comparator.comparing(Race::getPersonUid))
.map(n -> n.updatePerson(pf));
.ifPresent(n -> n.updatePerson(pf));
}
}

public <T extends PersonExtendedProps> void processPersonTelephone(String telephone, T pf) {
if (!ObjectUtils.isEmpty(telephone)) {
Function<String, T> personPhoneFn =
(code) -> Arrays.stream(utilHelper.deserializePayload(telephone, Phone[].class))
code -> Arrays.stream(utilHelper.deserializePayload(telephone, Phone[].class))
.filter(phone -> (StringUtils.hasText(phone.getUseCd())
&& phone.getUseCd().equalsIgnoreCase(code)) ||
(StringUtils.hasText(phone.getCd())
Expand Down Expand Up @@ -138,7 +138,7 @@ public <T extends PersonExtendedProps> void processPersonEmail(String email, T p
Arrays.stream(utilHelper.deserializePayload(email, Email[].class))
.filter(pEmail -> !ObjectUtils.isEmpty(pEmail.getTeleLocatorUid()))
.max(Comparator.comparing(Email::getTeleLocatorUid))
.map(n -> n.updatePerson(pf));
.ifPresent(n -> n.updatePerson(pf));
}
}
}

0 comments on commit 765d58c

Please sign in to comment.