From 73ba9cc2a1d8584823d27553a0e10dfffade49bc Mon Sep 17 00:00:00 2001 From: rudraneel-chakraborty Date: Mon, 27 Jan 2025 17:40:56 -0500 Subject: [PATCH 1/5] DATAGO-89836 implemented functionality --- .../management/agent/cli/CommandLineScan.java | 6 +- .../agent/config/SolaceConfiguration.java | 2 +- .../agent/logging/StreamingAppender.java | 14 ++- ...ImportPublishImportScanEventProcessor.java | 3 +- .../agent/processor/ScanDataProcessor.java | 4 +- .../agent/processor/ScanLogsProcessor.java | 3 +- .../processor/ScanStatusOverAllProcessor.java | 3 +- .../ScanStatusPerRouteProcessor.java | 4 +- .../agent/publisher/HeartbeatGenerator.java | 3 + .../agent/publisher/ScanDataPublisher.java | 6 +- .../agent/publisher/ScanLogsPublisher.java | 2 +- .../agent/publisher/ScanStatusPublisher.java | 5 +- .../agent/scanManager/ScanManager.java | 23 +++-- .../scanManager/model/ScanRequestBO.java | 2 + .../model/SingleScanSpecification.java | 82 +++++++++++++++++ .../scanManager/rest/EMAControllerImpl.java | 2 +- .../management/agent/service/ScanService.java | 91 ++++++------------- .../ScanCommandMessageProcessor.java | 1 + .../management/agent/cli/CliScanTest.java | 5 +- .../agent/scanManager/ScanManagerTest.java | 35 +++++-- .../mapper/ScanRequestMapperTest.java | 2 + .../scanManager/rest/EMAControllerTest.java | 3 + .../agent/service/ScanServiceTests.java | 36 +++++--- .../plugin/constants/RouteConstants.java | 1 + .../processor/logging/MDCProcessor.java | 3 + 25 files changed, 224 insertions(+), 117 deletions(-) create mode 100644 service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/model/SingleScanSpecification.java diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/cli/CommandLineScan.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/cli/CommandLineScan.java index d7dcce569..142cc940e 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/cli/CommandLineScan.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/cli/CommandLineScan.java @@ -1,5 +1,6 @@ package com.solace.maas.ep.event.management.agent.cli; +import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties; import com.solace.maas.ep.event.management.agent.repository.model.mesagingservice.MessagingServiceEntity; import com.solace.maas.ep.event.management.agent.scanManager.ScanManager; import com.solace.maas.ep.event.management.agent.scanManager.model.ScanRequestBO; @@ -26,16 +27,18 @@ public class CommandLineScan { private final MessagingServiceDelegateServiceImpl messagingServiceDelegateService; private final ImportService importService; private final CommandLineCommon commandLineCommon; + private final EventPortalProperties eventPortalProperties; public CommandLineScan(ScanManager scanManager, IDGenerator idGenerator, MessagingServiceDelegateServiceImpl messagingServiceDelegateService, ImportService importService, - CommandLineCommon commandLineCommon) { + CommandLineCommon commandLineCommon, EventPortalProperties eventPortalProperties) { this.scanManager = scanManager; this.idGenerator = idGenerator; this.messagingServiceDelegateService = messagingServiceDelegateService; this.importService = importService; this.commandLineCommon = commandLineCommon; + this.eventPortalProperties = eventPortalProperties; } public void runScan(String messagingServiceId, String filePathAndName) throws InterruptedException, IOException { @@ -45,6 +48,7 @@ public void runScan(String messagingServiceId, String filePathAndName) throws In .messagingServiceId(messagingServiceId) .scanId(idGenerator.generateRandomUniqueId()) .destinations(List.of("FILE_WRITER")) + .orgId(eventPortalProperties.getOrganizationId()) .build(); setScanType(messagingServiceEntity, scanRequestBO, messagingServiceId); diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/SolaceConfiguration.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/SolaceConfiguration.java index 70f0500e7..0100e3f63 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/SolaceConfiguration.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/SolaceConfiguration.java @@ -52,7 +52,7 @@ public String getTopicPrefix() { if (topicPrefix == null) { topicPrefix = String.format(TOPIC_PREFIX_FORMAT, - eventPortalProperties.getOrganizationId(), + Boolean.TRUE.equals(eventPortalProperties.getManaged()) ? "*" : eventPortalProperties.getOrganizationId(), eventPortalProperties.getRuntimeAgentId()); } return topicPrefix; diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/logging/StreamingAppender.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/logging/StreamingAppender.java index dace7f2e4..53a84571d 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/logging/StreamingAppender.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/logging/StreamingAppender.java @@ -26,9 +26,11 @@ public class StreamingAppender extends AppenderBase { @Override protected void append(ILoggingEvent event) { + String orgId = event.getMDCPropertyMap().get(RouteConstants.ORG_ID); if (!standalone) { if (StringUtils.isNotEmpty(event.getMDCPropertyMap().get(RouteConstants.SCAN_ID))) { - sendScanLogsAsync(event, + sendScanLogsAsync(orgId, + event, event.getMDCPropertyMap().get(RouteConstants.SCAN_ID), event.getMDCPropertyMap().get(RouteConstants.TRACE_ID), event.getMDCPropertyMap().get(RouteConstants.ACTOR_ID), @@ -36,7 +38,9 @@ protected void append(ILoggingEvent event) { event.getMDCPropertyMap().get(RouteConstants.SCHEDULE_ID), event.getMDCPropertyMap().get(RouteConstants.MESSAGING_SERVICE_ID)); } else if (StringUtils.isNotEmpty(event.getMDCPropertyMap().get(RouteConstants.COMMAND_CORRELATION_ID))) { - sendCommandLogsAsync(event, + sendCommandLogsAsync( + orgId, + event, event.getMDCPropertyMap().get(RouteConstants.COMMAND_CORRELATION_ID), event.getMDCPropertyMap().get(RouteConstants.TRACE_ID), event.getMDCPropertyMap().get(RouteConstants.ACTOR_ID), @@ -45,7 +49,7 @@ protected void append(ILoggingEvent event) { } } - private void sendCommandLogsAsync(ILoggingEvent event, String commandCorrelationId, String traceId, + private void sendCommandLogsAsync(String orgId, ILoggingEvent event, String commandCorrelationId, String traceId, String actorId, String messagingServiceId) { @@ -59,6 +63,7 @@ private void sendCommandLogsAsync(ILoggingEvent event, String commandCorrelation exchange.getIn().setHeader(RouteConstants.COMMAND_CORRELATION_ID, commandCorrelationId); exchange.getIn().setHeader(RouteConstants.TRACE_ID, traceId); exchange.getIn().setHeader(RouteConstants.ACTOR_ID, actorId); + exchange.getIn().setHeader(RouteConstants.ORG_ID, orgId); exchange.getIn().setHeader(RouteConstants.MESSAGING_SERVICE_ID, messagingServiceId); exchange.getIn().setBody(event); @@ -71,7 +76,7 @@ private void sendCommandLogsAsync(ILoggingEvent event, String commandCorrelation }); } - private void sendScanLogsAsync(ILoggingEvent event, String scanId, String traceId, String actorId, + private void sendScanLogsAsync(String orgId, ILoggingEvent event, String scanId, String traceId, String actorId, String scanType, String groupId, String messagingServiceId) { RouteEntity route = creatLoggingRoute(scanType, messagingServiceId); @@ -81,6 +86,7 @@ private void sendScanLogsAsync(ILoggingEvent event, String scanId, String traceI exchange.getIn().setHeader(RouteConstants.TRACE_ID, traceId); exchange.getIn().setHeader(RouteConstants.ACTOR_ID, actorId); exchange.getIn().setHeader(RouteConstants.SCAN_TYPE, scanType); + exchange.getIn().setHeader(RouteConstants.ORG_ID, orgId); exchange.getIn().setHeader(RouteConstants.SCHEDULE_ID, groupId); exchange.getIn().setHeader(RouteConstants.MESSAGING_SERVICE_ID, messagingServiceId); diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/ScanDataImportPublishImportScanEventProcessor.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/ScanDataImportPublishImportScanEventProcessor.java index 54da33bd9..53c6809cd 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/ScanDataImportPublishImportScanEventProcessor.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/ScanDataImportPublishImportScanEventProcessor.java @@ -20,7 +20,6 @@ @Component @ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false") public class ScanDataImportPublishImportScanEventProcessor implements Processor { - private final String orgId; private final String runtimeAgentId; private final ScanDataPublisher scanDataPublisher; @@ -29,7 +28,6 @@ public ScanDataImportPublishImportScanEventProcessor(ScanDataPublisher scanDataP this.scanDataPublisher = scanDataPublisher; - orgId = eventPortalProperties.getOrganizationId(); runtimeAgentId = eventPortalProperties.getRuntimeAgentId(); } @@ -50,6 +48,7 @@ public void process(Exchange exchange) throws Exception { String messagingServiceId = (String) properties.get(RouteConstants.MESSAGING_SERVICE_ID); String scanId = (String) properties.get(RouteConstants.SCAN_ID); + String orgId = (String) properties.get(RouteConstants.ORG_ID); Boolean isImportOp = (Boolean) properties.get(RouteConstants.IS_DATA_IMPORT); ScanDataImportMessage importDataMessage = diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/ScanDataProcessor.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/ScanDataProcessor.java index 88268ad92..20c294c2e 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/ScanDataProcessor.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/ScanDataProcessor.java @@ -23,7 +23,6 @@ public class ScanDataProcessor implements Processor { private final ScanDataPublisher scanDataPublisher; - private final String orgId; private final String runtimeAgentId; @Autowired @@ -31,8 +30,6 @@ public ScanDataProcessor(ScanDataPublisher scanDataPublisher, EventPortalPropert super(); this.scanDataPublisher = scanDataPublisher; - - orgId = eventPortalProperties.getOrganizationId(); runtimeAgentId = eventPortalProperties.getRuntimeAgentId(); } @@ -48,6 +45,7 @@ public void process(Exchange exchange) throws Exception { String traceId = (String) properties.get(RouteConstants.TRACE_ID); String actorId = (String) properties.get(RouteConstants.ACTOR_ID); String scanType = (String) properties.get(RouteConstants.SCAN_TYPE); + String orgId = (String) properties.get(RouteConstants.ORG_ID); Boolean isImportOp = (Boolean) properties.get(RouteConstants.IS_DATA_IMPORT); ScanDataMessage scanDataMessage = new ScanDataMessage(orgId, scanId, traceId, actorId, scanType, body, Instant.now().toString()); diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/ScanLogsProcessor.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/ScanLogsProcessor.java index c4f381fda..196926b3e 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/ScanLogsProcessor.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/ScanLogsProcessor.java @@ -19,7 +19,6 @@ @Component @ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false") public class ScanLogsProcessor implements Processor { - private final String orgId; private final String runtimeAgentId; private final ScanLogsPublisher logDataPublisher; @@ -30,7 +29,6 @@ public ScanLogsProcessor(ScanLogsPublisher logDataPublisher, EventPortalProperti this.logDataPublisher = logDataPublisher; - orgId = eventPortalProperties.getOrganizationId(); runtimeAgentId = eventPortalProperties.getRuntimeAgentId(); } @@ -42,6 +40,7 @@ public void process(Exchange exchange) throws Exception { ILoggingEvent event = (ILoggingEvent) exchange.getIn().getBody(); String scanId = (String) properties.get(RouteConstants.SCAN_ID); String traceId = (String) properties.get(RouteConstants.TRACE_ID); + String orgId = (String) properties.get(RouteConstants.ORG_ID); String actorId = (String) properties.get(RouteConstants.ACTOR_ID); String messagingServiceId = (String) properties.get(RouteConstants.MESSAGING_SERVICE_ID); diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/ScanStatusOverAllProcessor.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/ScanStatusOverAllProcessor.java index a378d9a5d..ade8ca02c 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/ScanStatusOverAllProcessor.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/ScanStatusOverAllProcessor.java @@ -22,14 +22,12 @@ @ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false") public class ScanStatusOverAllProcessor implements Processor { - private final String orgId; private final String runtimeAgentId; @Autowired public ScanStatusOverAllProcessor(EventPortalProperties eventPortalProperties) { super(); - orgId = eventPortalProperties.getOrganizationId(); runtimeAgentId = eventPortalProperties.getRuntimeAgentId(); } @@ -46,6 +44,7 @@ public void process(Exchange exchange) throws Exception { String description = (String) properties.get(RouteConstants.SCAN_STATUS_DESC); String scanType = (String) properties.get(RouteConstants.SCAN_TYPE); + String orgId = (String) properties.get(RouteConstants.ORG_ID); List scanTypes = Arrays.asList(scanType.split(",")); topicDetails.put("orgId", orgId); diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/ScanStatusPerRouteProcessor.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/ScanStatusPerRouteProcessor.java index 30b551ce7..b495a10ae 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/ScanStatusPerRouteProcessor.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/ScanStatusPerRouteProcessor.java @@ -19,14 +19,11 @@ @ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false") @SuppressWarnings("unchecked") public class ScanStatusPerRouteProcessor implements Processor { - private final String orgId; private final String runtimeAgentId; @Autowired public ScanStatusPerRouteProcessor(EventPortalProperties eventPortalProperties) { super(); - - orgId = eventPortalProperties.getOrganizationId(); runtimeAgentId = eventPortalProperties.getRuntimeAgentId(); } @@ -42,6 +39,7 @@ public void process(Exchange exchange) throws Exception { String scanType = (String) properties.get(RouteConstants.SCAN_TYPE); ScanStatus status = (ScanStatus) properties.get(RouteConstants.SCAN_STATUS); String description = (String) properties.get(RouteConstants.SCAN_STATUS_DESC); + String orgId = (String) properties.get(RouteConstants.ORG_ID); topicDetails.put("orgId", orgId); topicDetails.put("runtimeAgentId", runtimeAgentId); diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/HeartbeatGenerator.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/HeartbeatGenerator.java index 06901b617..96e53a8d8 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/HeartbeatGenerator.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/HeartbeatGenerator.java @@ -32,6 +32,7 @@ public class HeartbeatGenerator { private final String topic; private final String runtimeAgentVersion; private final MeterRegistry meterRegistry; + private final EventPortalProperties eventPortalProperties; public HeartbeatGenerator(SolaceConfiguration solaceConfiguration, EventPortalProperties eventPortalProperties, @@ -43,11 +44,13 @@ public HeartbeatGenerator(SolaceConfiguration solaceConfiguration, topic = solaceConfiguration.getTopicPrefix() + "heartbeat/v1"; this.runtimeAgentVersion = getFormattedVersion(buildProperties.getVersion()); this.meterRegistry = meterRegistry; + this.eventPortalProperties = eventPortalProperties; } @Scheduled(fixedRate = 5000) public void sendHeartbeat() { HeartbeatMessage message = new HeartbeatMessage(runtimeAgentId, Instant.now().toString(), runtimeAgentVersion); + message.setOrgId(eventPortalProperties.getOrganizationId()); boolean result = solacePublisher.publish(message, topic); logHealthMetric(message, result); } diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/ScanDataPublisher.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/ScanDataPublisher.java index 0b1e35b1a..2ce82060d 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/ScanDataPublisher.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/ScanDataPublisher.java @@ -42,12 +42,12 @@ public void sendScanData(MOPMessage message, Map topicDetails) { String topicString = isImport ? String.format("sc/ep/runtime/%s/%s/importScan/v1/%s", - topicDetails.get("orgId"), + message.getOrgId(), topicDetails.get("runtimeAgentId"), topicDetails.get("messagingServiceId")) : String.format("sc/ep/runtime/%s/%s/scan/data/v1/%s/%s/%s", - topicDetails.get("orgId"), + message.getOrgId(), topicDetails.get("runtimeAgentId"), topicDetails.get("messagingServiceId"), topicDetails.get("scanId"), @@ -58,6 +58,6 @@ public void sendScanData(MOPMessage message, Map topicDetails) { meterRegistry.counter(MAAS_EMA_SCAN_EVENT_SENT, STATUS_TAG, isSuccessful ? ScanStatus.COMPLETE.name() : ScanStatus.FAILED.name(), SCAN_ID_TAG, topicDetails.get("scanId"), - ORG_ID_TAG, topicDetails.get("orgId")).increment(); + ORG_ID_TAG, message.getOrgId()).increment(); } } diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/ScanLogsPublisher.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/ScanLogsPublisher.java index e9826e716..3991e9879 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/ScanLogsPublisher.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/ScanLogsPublisher.java @@ -22,7 +22,7 @@ public ScanLogsPublisher(SolacePublisher solacePublisher) { public void sendScanLogData(ScanLogMessage message, Map topicDetails) { String topicString = String.format("sc/ep/runtime/%s/%s/scan/logs/v1/%s/%s", - topicDetails.get("orgId"), + message.getOrgId(), topicDetails.get("runtimeAgentId"), topicDetails.get("messagingServiceId"), topicDetails.get("scanId")); diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/ScanStatusPublisher.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/ScanStatusPublisher.java index 077a63fc7..ba2cdd159 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/ScanStatusPublisher.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/ScanStatusPublisher.java @@ -43,9 +43,8 @@ public void sendOverallScanStatus(ScanStatusMessage message, Map String scanId = topicDetails.get("scanId"); String scanType = topicDetails.get("scanType"); String status = topicDetails.get("status"); - String topicString = String.format("sc/ep/runtime/%s/%s/scan/status/v1/%s/%s", - topicDetails.get("orgId"), + message.getOrgId(), topicDetails.get("runtimeAgentId"), topicDetails.get("messagingServiceId"), scanId); @@ -72,7 +71,7 @@ public void sendScanDataStatus(ScanDataStatusMessage message, Map scanStatusPublisherOpt; @@ -54,7 +54,6 @@ public ScanManager(MessagingServiceDelegateServiceImpl messagingServiceDelegateS this.scanService = scanService; this.scanStatusPublisherOpt = scanStatusPublisher; runtimeAgentId = eventPortalProperties.getRuntimeAgentId(); - orgId = eventPortalProperties.getOrganizationId(); } public String scan(ScanRequestBO scanRequestBO) { @@ -68,6 +67,7 @@ public String scan(ScanRequestBO scanRequestBO) { MDC.put(RouteConstants.TRACE_ID, traceId); MDC.put(RouteConstants.ACTOR_ID, actorId); MDC.put(RouteConstants.SCHEDULE_ID, groupId); + MDC.put(RouteConstants.ORG_ID, scanRequestBO.getOrgId()); MDC.put(RouteConstants.MESSAGING_SERVICE_ID, messagingServiceId); MessagingServiceEntity messagingServiceEntity = retrieveMessagingServiceEntity(messagingServiceId); @@ -114,7 +114,18 @@ public String scan(ScanRequestBO scanRequestBO) { .toList().stream() ).toList().stream().flatMap(List::stream).toList(); - return scanService.singleScan(routes, groupId, scanId, traceId, actorId, messagingServiceEntity, runtimeAgentId); + return scanService.singleScan( + SingleScanSpecification.builder() + .routeBundles(routes) + .orgId(scanRequestBO.getOrgId()) + .groupId(groupId) + .scanId(scanId) + .traceId(traceId) + .actorId(actorId) + .messagingServiceEntity(messagingServiceEntity) + .runtimeAgentId(runtimeAgentId) + .build() + ); } public void handleError(Exception e, ScanCommandMessage message) { @@ -137,7 +148,7 @@ public void handleError(Exception e, ScanCommandMessage message) { ); Map topicVars = Map.of( - "orgId", orgId, + "orgId", message.getOrgId(), "runtimeAgentId", runtimeAgentId ); scanStatusPublisher.sendOverallScanStatus(response, topicVars); @@ -162,8 +173,8 @@ public Page findByMessagingServiceId(String messagingServiceId, Page public boolean isScanComplete(String scanId) { if (ObjectUtils.isEmpty(scanId)) { - throw new IllegalArgumentException("Scan ID cannot be null or empty"); + throw new IllegalArgumentException("Scan ID cannot be null or empty"); } - return scanService.isScanComplete(scanId); + return scanService.isScanComplete(scanId); } } diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/model/ScanRequestBO.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/model/ScanRequestBO.java index cae964a50..a16eaa9fe 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/model/ScanRequestBO.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/model/ScanRequestBO.java @@ -17,6 +17,8 @@ @SuppressWarnings("PMD") public class ScanRequestBO extends AbstractBaseBO { + private String orgId; + private String messagingServiceId; private String scanId; diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/model/SingleScanSpecification.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/model/SingleScanSpecification.java new file mode 100644 index 000000000..b83a63312 --- /dev/null +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/model/SingleScanSpecification.java @@ -0,0 +1,82 @@ +package com.solace.maas.ep.event.management.agent.scanManager.model; + +import com.solace.maas.ep.event.management.agent.plugin.route.RouteBundle; +import com.solace.maas.ep.event.management.agent.repository.model.mesagingservice.MessagingServiceEntity; +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; + +import java.util.List; + +@Getter +@Setter +@Builder +public class SingleScanSpecification { + private String orgId; + /** + * The concept of a RouteBundle is introduced to make chaining routes easier + *

+ * Each RouteBundle contains a routeId and scanType for a route to be executed, + * plus a list of destinations and recipients. + *

+ * A destination is another route that is called after the route described by the routeId + * and scanId is completed. Destination routes cannot be chained. + *

+ * A recipient is another RouteBundle and is the mechanism by which routes are chained + * together into a scan. + *

+ * The following code is an example of how 3 routes can be chained together for a single scan. + *

+ * RouteBundle fileWriterDestination = RouteBundle.builder() + * .destinations(List.of()) + * .scanType("") + * .routeId("seda:dataCollectionFileWrite") + * .recipients(List.of()) + * .messagingServiceId(myMessagingService.getId()) + * .firstRouteInChain(false) + * .build(); + *

+ * RouteBundle loggingDestination = RouteBundle.builder() + * .destinations(List.of()) + * .scanType("") + * .routeId("log:test") + * .recipients(List.of()) + * .messagingServiceId(myMessagingService.getId()) + * .firstRouteInChain(false) + * .build(); + *

+ * RouteBundle solaceSubscriptionConfiguration = RouteBundle.builder() + * .destinations(List.of(fileWriterDestination)) + * .scanType("subscriptionConfiguration") + * .routeId("solaceSubscriptionConfiguration") + * .recipients(List.of()) + * .messagingServiceId(myMessagingService.getId()) + * .firstRouteInChain(false) + * .build(); + *

+ * RouteBundle solaceQueueListing = RouteBundle.builder() + * .destinations(List.of(loggingDestination)) + * .scanType("queueListing") + * .routeId("solaceDataPublisher") + * .recipients(List.of(solaceSubscriptionConfiguration)) + * .messagingServiceId(myMessagingService.getId()) + * .firstRouteInChain(true) + * .build(); + *

+ * RouteBundle solaceQueueConfiguration = RouteBundle.builder() + * .destinations(List.of(fileWriterDestination)) + * .scanType("queueConfiguration") + * .routeId("solaceQueueConfiguration") + * .recipients(List.of()) + * .messagingServiceId(myMessagingService.getId()) + * .firstRouteInChain(true) + * .build(); + */ + private List routeBundles; + private String groupId; + private String scanId; + private String traceId; + private String actorId; + private MessagingServiceEntity messagingServiceEntity; + private String runtimeAgentId; +} \ No newline at end of file diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/rest/EMAControllerImpl.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/rest/EMAControllerImpl.java index 92b6383b7..11fb74649 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/rest/EMAControllerImpl.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/rest/EMAControllerImpl.java @@ -50,7 +50,7 @@ public ResponseEntity scan(@PathVariable(value = "resourceId") String me ScanRequestBO scanRequestBO = scanRequestMapper.map(body); scanRequestBO.setMessagingServiceId(messagingServiceId); scanRequestBO.setScanId(idGenerator.generateRandomUniqueId()); - + scanRequestBO.setOrgId(eventPortalProperties.getOrganizationId()); boolean isEMAStandalone = eventPortalProperties.getGateway().getMessaging().isStandalone(); List destinations = scanRequestBO.getDestinations(); diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/service/ScanService.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/service/ScanService.java index c6d1e9919..19e21576b 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/service/ScanService.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/service/ScanService.java @@ -20,6 +20,7 @@ import com.solace.maas.ep.event.management.agent.repository.scan.ScanTypeRepository; import com.solace.maas.ep.event.management.agent.scanManager.model.ScanItemBO; import com.solace.maas.ep.event.management.agent.scanManager.model.ScanTypeBO; +import com.solace.maas.ep.event.management.agent.scanManager.model.SingleScanSpecification; import com.solace.maas.ep.event.management.agent.util.IDGenerator; import io.micrometer.core.instrument.MeterRegistry; import lombok.extern.slf4j.Slf4j; @@ -93,69 +94,22 @@ public ScanService(ScanRepository repository, } /** - * The concept of a RouteBundle is introduced to make chaining routes easier - *

- * Each RouteBundle contains a routeId and scanType for a route to be executed, - * plus a list of destinations and recipients. - *

- * A destination is another route that is called after the route described by the routeId - * and scanId is completed. Destination routes cannot be chained. - *

- * A recipient is another RouteBundle and is the mechanism by which routes are chained - * together into a scan. - *

- * The following code is an example of how 3 routes can be chained together for a single scan. - *

- * RouteBundle fileWriterDestination = RouteBundle.builder() - * .destinations(List.of()) - * .scanType("") - * .routeId("seda:dataCollectionFileWrite") - * .recipients(List.of()) - * .messagingServiceId(myMessagingService.getId()) - * .firstRouteInChain(false) - * .build(); - *

- * RouteBundle loggingDestination = RouteBundle.builder() - * .destinations(List.of()) - * .scanType("") - * .routeId("log:test") - * .recipients(List.of()) - * .messagingServiceId(myMessagingService.getId()) - * .firstRouteInChain(false) - * .build(); - *

- * RouteBundle solaceSubscriptionConfiguration = RouteBundle.builder() - * .destinations(List.of(fileWriterDestination)) - * .scanType("subscriptionConfiguration") - * .routeId("solaceSubscriptionConfiguration") - * .recipients(List.of()) - * .messagingServiceId(myMessagingService.getId()) - * .firstRouteInChain(false) - * .build(); - *

- * RouteBundle solaceQueueListing = RouteBundle.builder() - * .destinations(List.of(loggingDestination)) - * .scanType("queueListing") - * .routeId("solaceDataPublisher") - * .recipients(List.of(solaceSubscriptionConfiguration)) - * .messagingServiceId(myMessagingService.getId()) - * .firstRouteInChain(true) - * .build(); - *

- * RouteBundle solaceQueueConfiguration = RouteBundle.builder() - * .destinations(List.of(fileWriterDestination)) - * .scanType("queueConfiguration") - * .routeId("solaceQueueConfiguration") - * .recipients(List.of()) - * .messagingServiceId(myMessagingService.getId()) - * .firstRouteInChain(true) - * .build(); + * Initiates a single scan for a Messaging Service. * - * @param routeBundles - see description above + * @param singleScanSpecification The specification for the single scan. * @return The id of the scan. */ - public String singleScan(List routeBundles, String groupId, String scanId, String traceId, String actorId, - MessagingServiceEntity messagingServiceEntity, String runtimeAgentId) { + public String singleScan(SingleScanSpecification singleScanSpecification) { + + String scanId = singleScanSpecification.getScanId(); + String traceId = singleScanSpecification.getTraceId(); + String orgId = singleScanSpecification.getOrgId(); + List routeBundles = singleScanSpecification.getRouteBundles(); + String actorId = singleScanSpecification.getActorId(); + MessagingServiceEntity messagingServiceEntity = singleScanSpecification.getMessagingServiceEntity(); + String runtimeAgentId = singleScanSpecification.getRuntimeAgentId(); + String groupId = singleScanSpecification.getGroupId(); + log.info("Scan request [{}], trace ID [{}]: Starting a single scan.", scanId, traceId); List scanTypes = parseRouteBundle(routeBundles, new ArrayList<>()); @@ -168,7 +122,7 @@ public String singleScan(List routeBundles, String groupId, String log.info("Scan request [{}], trace ID [{}]: Total of {} scan types to be retrieved: [{}].", scanId, traceId, scanTypes.size(), StringUtils.join(scanTypes, ", ")); - sendScanStatus(groupId, scanId, traceId, actorId, routeBundles.stream().findFirst().orElseThrow().getMessagingServiceId(), + sendScanStatus(orgId, groupId, scanId, traceId, actorId, routeBundles.stream().findFirst().orElseThrow().getMessagingServiceId(), StringUtils.join(scanTypes, ","), ScanStatus.IN_PROGRESS); log.trace("RouteBundles to be processed: {}", routeBundles); @@ -185,7 +139,7 @@ public String singleScan(List routeBundles, String groupId, String updateScan(route, routeBundle, returnedScanEntity); - scanAsync(groupId, scanEntityId, traceId, actorId, route, routeBundle.getMessagingServiceId()); + scanAsync(orgId, groupId, scanEntityId, traceId, actorId, route, routeBundle.getMessagingServiceId()); } return scanId; @@ -287,12 +241,19 @@ public Optional findById(String scanId) { * @param scanTypes The scan types included in the scan request. * @param status The status of scan. */ - public void sendScanStatus(String groupId, String scanId, String traceId, String actorId, String messagingServiceId, String scanTypes, + public void sendScanStatus(String orgId, + String groupId, + String scanId, + String traceId, + String actorId, + String messagingServiceId, + String scanTypes, ScanStatus status) { producerTemplate.send("direct:overallScanStatusPublisher?block=false&failIfNoConsumers=false", exchange -> { exchange.getIn().setHeader(RouteConstants.SCHEDULE_ID, groupId); exchange.getIn().setHeader(RouteConstants.SCAN_ID, scanId); exchange.getIn().setHeader(RouteConstants.TRACE_ID, traceId); + exchange.getIn().setHeader(RouteConstants.ORG_ID, orgId); exchange.getIn().setHeader(RouteConstants.ACTOR_ID, actorId); exchange.getIn().setHeader(RouteConstants.MESSAGING_SERVICE_ID, messagingServiceId); exchange.getIn().setHeader(RouteConstants.SCAN_TYPE, scanTypes); @@ -302,13 +263,14 @@ public void sendScanStatus(String groupId, String scanId, String traceId, String meterRegistry.counter(MAAS_EMA_SCAN_EVENT_SENT, STATUS_TAG, status.name(), SCAN_ID_TAG, scanId).increment(); } - protected CompletableFuture scanAsync(String groupId, String scanId, String traceId, String actorId, + protected CompletableFuture scanAsync(String orgId, String groupId, String scanId, String traceId, String actorId, RouteEntity route, String messagingServiceId) { return producerTemplate.asyncSend("seda:" + route.getId(), exchange -> { // Need to set headers to let the Route have access to the Scan ID, Group ID, and Messaging Service ID. exchange.getIn().setHeader(RouteConstants.SCHEDULE_ID, groupId); exchange.getIn().setHeader(RouteConstants.SCAN_ID, scanId); exchange.getIn().setHeader(RouteConstants.TRACE_ID, traceId); + exchange.getIn().setHeader(RouteConstants.ORG_ID, orgId); exchange.getIn().setHeader(RouteConstants.ACTOR_ID, actorId); exchange.getIn().setHeader(RouteConstants.MESSAGING_SERVICE_ID, messagingServiceId); exchange.getIn().setHeader(RouteConstants.SCAN_STATUS_DESC, ""); @@ -322,6 +284,7 @@ protected CompletableFuture scanAsync(String groupId, String scanId, S MDC.put(RouteConstants.SCHEDULE_ID, groupId); MDC.put(RouteConstants.SCAN_ID, scanId); + MDC.put(RouteConstants.ORG_ID, orgId); MDC.put(RouteConstants.TRACE_ID, traceId); MDC.put(RouteConstants.ACTOR_ID, actorId); MDC.put(RouteConstants.MESSAGING_SERVICE_ID, messagingServiceId); diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java index 6d7f80307..c5a1180cb 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java @@ -75,6 +75,7 @@ public void processMessage(ScanCommandMessage message) { ScanRequestBO scanRequestBO = ScanRequestBO.builder() .messagingServiceId(message.getMessagingServiceId()) .scanId(scanId) + .orgId(message.getOrgId()) .traceId(message.getTraceId()) .actorId(message.getActorId()) .scanTypes(entityTypes) diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/cli/CliScanTest.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/cli/CliScanTest.java index 5e9022412..69eb3cf89 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/cli/CliScanTest.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/cli/CliScanTest.java @@ -65,7 +65,10 @@ public void setUp() { @Test public void testScanCLICommand() throws Exception { - when(scanService.singleScan(any(), any(), any(), any(), any(), any(MessagingServiceEntity.class), anyString())).thenReturn("xyz"); + + when(scanService.singleScan( + any() + )).thenReturn("xyz"); when(scanService.findById(anyString())).thenReturn(Optional.of(ScanEntity.builder() .id("abcdef") .messagingService(MessagingServiceEntity.builder() diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManagerTest.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManagerTest.java index cb9803608..720f591d7 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManagerTest.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManagerTest.java @@ -9,6 +9,7 @@ import com.solace.maas.ep.event.management.agent.plugin.route.RouteBundle; import com.solace.maas.ep.event.management.agent.repository.model.mesagingservice.MessagingServiceEntity; import com.solace.maas.ep.event.management.agent.scanManager.model.ScanRequestBO; +import com.solace.maas.ep.event.management.agent.scanManager.model.SingleScanSpecification; import com.solace.maas.ep.event.management.agent.service.MessagingServiceDelegateServiceImpl; import com.solace.maas.ep.event.management.agent.service.ScanService; import lombok.SneakyThrows; @@ -63,11 +64,21 @@ void testScanManagerExceptions() { when(messagingServiceDelegateService.getMessagingServiceById("id")) .thenReturn(messagingServiceEntity); - when(scanService.singleScan(List.of(), "groupId", "scanId", "traceId", "actorId", - mock(MessagingServiceEntity.class), "runtimeAgent1")) - .thenReturn(Mockito.anyString()); + when(scanService.singleScan( + SingleScanSpecification + .builder() + .groupId("groupId") + .scanId("scanId") + .traceId("traceId") + .actorId("actorId") + .messagingServiceEntity(mock(MessagingServiceEntity.class)) + .runtimeAgentId("runtimeAgent1") + .routeBundles(List.of()) + .build() + )).thenReturn(Mockito.anyString()); ScanRequestBO scanRequestBO = new ScanRequestBO( + "orgId", "id", "scanId", "traceId", @@ -78,6 +89,7 @@ void testScanManagerExceptions() { Assertions.assertThrows(NullPointerException.class, () -> scanManager.scan(scanRequestBO)); ScanRequestBO scanRequestBOTopics = new ScanRequestBO( + "orgId", "id", "scanId", "traceId", @@ -88,6 +100,7 @@ void testScanManagerExceptions() { Assertions.assertThrows(NullPointerException.class, () -> scanManager.scan(scanRequestBOTopics)); ScanRequestBO scanRequestBOConsumerGroups = new ScanRequestBO( + "orgId", "id", "scanId", "traceId", @@ -105,6 +118,7 @@ void testScanManager() { String confluentSchemaRegistryId = "confluentId"; ScanRequestBO scanRequestBO = new ScanRequestBO( + "orgId", messagingServiceId, "scanId", "traceId", "actorId", List.of("KAFKA_ALL", "CONFLUENT_SCHEMA_REGISTRY_SCHEMA"), List.of("FILE_WRITER")); @@ -151,10 +165,19 @@ void testScanManager() { .thenReturn(destinations); when(kafkaRouteDelegate.generateRouteList(destinations, List.of(), "KAFKA_ALL", messagingServiceId)) .thenReturn(routes); - when(scanService.singleScan(List.of(), "groupId", "scanId", "traceId", "actorId", - mock(MessagingServiceEntity.class), "runtimeAgent1")) - .thenReturn(Mockito.anyString()); + when(scanService.singleScan( + SingleScanSpecification + .builder() + .groupId("groupId") + .scanId("scanId") + .traceId("traceId") + .actorId("actorId") + .messagingServiceEntity(mock(MessagingServiceEntity.class)) + .runtimeAgentId("runtimeAgent1") + .routeBundles(List.of()) + .build() + )).thenReturn(Mockito.anyString()); scanManager.scan(scanRequestBO); assertThatNoException(); diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/mapper/ScanRequestMapperTest.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/mapper/ScanRequestMapperTest.java index 05d1bb4f5..790b6eff1 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/mapper/ScanRequestMapperTest.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/mapper/ScanRequestMapperTest.java @@ -28,6 +28,7 @@ public void testMapper() { scanRequestMapper.map((ScanRequestDTO) null); ScanRequestBO scanRequestBO = new ScanRequestBO( + "orgId", "id", "scanId", "traceId", @@ -45,6 +46,7 @@ public void testMapper() { public void testMapperWithUser() { User user = new User("orgId", "userId"); ScanRequestBO scanRequestBO = new ScanRequestBO( + "orgId", "id", "scanId", "traceId", diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/rest/EMAControllerTest.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/rest/EMAControllerTest.java index a66892820..1a9ccf755 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/rest/EMAControllerTest.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/rest/EMAControllerTest.java @@ -42,6 +42,7 @@ public void testEMAControllerInConnectedMode() { ScanRequestDTO scanRequestDTO = new ScanRequestDTO(List.of("topics"), List.of()); ScanRequestBO scanRequestBO = new ScanRequestBO( + "orgId", "id", "scanConnected", "traceId", @@ -73,6 +74,7 @@ public void testEMAControllerInStandAloneModeWithEventPortalDestination() { ScanRequestDTO scanRequestDTO = new ScanRequestDTO(List.of("topics"), List.of("EVENT_PORTAL")); ScanRequestBO scanRequestBO = new ScanRequestBO( + "orgId", "id", "scanId", "traceId", @@ -103,6 +105,7 @@ public void testEMAControllerInStandAloneMode() { ScanRequestDTO scanRequestDTO = new ScanRequestDTO(List.of("topics"), List.of()); ScanRequestBO scanRequestBO = new ScanRequestBO( + "orgId", "id", "scanId", "traceId", diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java index fe84eba03..900459138 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java @@ -16,6 +16,7 @@ import com.solace.maas.ep.event.management.agent.repository.scan.ScanRepository; import com.solace.maas.ep.event.management.agent.repository.scan.ScanStatusRepository; import com.solace.maas.ep.event.management.agent.repository.scan.ScanTypeRepository; +import com.solace.maas.ep.event.management.agent.scanManager.model.SingleScanSpecification; import com.solace.maas.ep.event.management.agent.service.logging.LoggingService; import com.solace.maas.ep.event.management.agent.util.IDGenerator; import io.micrometer.core.instrument.Meter; @@ -156,13 +157,21 @@ public void testSingleScanWithRouteBundle() { when(meterRegistry.counter(any(), any(String[].class))) .thenReturn(new NoopCounter(new Meter.Id("noop", null, null, null, null))); - scanService.singleScan(List.of(topicListing, consumerGroups, additionalConsumerGroupConfigBundle), - "groupId", - "scanId", - "traceId", - "actorId", - mock(MessagingServiceEntity.class), - "runtimeAgent1"); + + scanService.singleScan( + SingleScanSpecification + .builder() + .orgId("orgId") + .traceId("traceId") + .groupId("groupId") + .scanId("scanId") + .actorId("actorId") + .messagingServiceEntity(mock(MessagingServiceEntity.class)) + .runtimeAgentId("runtimeAgent1") + .routeBundles(List.of(topicListing, consumerGroups, additionalConsumerGroupConfigBundle)) + .build() + + ); ArgumentCaptor scanCaptor = ArgumentCaptor.forClass(ScanStatusEntity.class); verify(scanStatusRepository, times(8)).save(scanCaptor.capture()); @@ -325,7 +334,7 @@ public void testSendScanStatus() { mock(ScanTypeRepository.class), mock(ScanStatusRepository.class), mock(ScanRouteService.class), mock(RouteService.class), template, idGenerator, meterRegistry); - service.sendScanStatus("scanId", "groupId", "messagingServiceId", "traceId", "actorId", + service.sendScanStatus("orgId", "scanId", "groupId", "messagingServiceId", "traceId", "actorId", "queueListing", ScanStatus.IN_PROGRESS); assertThatNoException(); @@ -373,10 +382,9 @@ public void testScanStatusIncompleteWithSingleScan(String scanStatus) { } - - @Test - @SneakyThrows - public void testScanStatusCompletedWithMultipleScans(){ + @Test + @SneakyThrows + public void testScanStatusCompletedWithMultipleScans() { ScanStatusEntity scanStatusA = scanServiceHelper.buildScanStatusEntity("status1", "COMPLETE"); ScanTypeEntity scanTypeA = scanServiceHelper.buildScanTypeEntity("123", "queueListing", null, scanStatusA); @@ -384,7 +392,7 @@ public void testScanStatusCompletedWithMultipleScans(){ ScanTypeEntity scanTypeB = scanServiceHelper.buildScanTypeEntity("124", "queueListing", null, scanStatusB); when(scanTypeRepository.findAllByScanId(any(String.class))) - .thenReturn(List.of(scanTypeA,scanTypeB)); + .thenReturn(List.of(scanTypeA, scanTypeB)); when(scanStatusRepository.findByScanType(scanTypeA)) .thenReturn(scanStatusA); when(scanStatusRepository.findByScanType(scanTypeB)) @@ -403,7 +411,7 @@ public void testScanStatusNotCompleteWithMultipleScans() { ScanTypeEntity scanTypeB = scanServiceHelper.buildScanTypeEntity("124", "queueConfiguration", null, scanStatusB); when(scanTypeRepository.findAllByScanId(any(String.class))) - .thenReturn(List.of(scanTypeA,scanTypeB)); + .thenReturn(List.of(scanTypeA, scanTypeB)); when(scanStatusRepository.findByScanType(scanTypeA)) .thenReturn(scanStatusA); when(scanStatusRepository.findByScanType(scanTypeB)) diff --git a/service/plugin/src/main/java/com/solace/maas/ep/event/management/agent/plugin/constants/RouteConstants.java b/service/plugin/src/main/java/com/solace/maas/ep/event/management/agent/plugin/constants/RouteConstants.java index a78ea2b29..d31d1571e 100644 --- a/service/plugin/src/main/java/com/solace/maas/ep/event/management/agent/plugin/constants/RouteConstants.java +++ b/service/plugin/src/main/java/com/solace/maas/ep/event/management/agent/plugin/constants/RouteConstants.java @@ -8,6 +8,7 @@ public class RouteConstants { public static final String SCHEDULE_ID = "SCHEDULE_ID"; public static final String SCAN_ID = "SCAN_ID"; + public static final String ORG_ID = "ORG_ID"; public static final String TRACE_ID = "traceId"; diff --git a/service/plugin/src/main/java/com/solace/maas/ep/event/management/agent/plugin/processor/logging/MDCProcessor.java b/service/plugin/src/main/java/com/solace/maas/ep/event/management/agent/plugin/processor/logging/MDCProcessor.java index e30269eed..f8c9e7542 100644 --- a/service/plugin/src/main/java/com/solace/maas/ep/event/management/agent/plugin/processor/logging/MDCProcessor.java +++ b/service/plugin/src/main/java/com/solace/maas/ep/event/management/agent/plugin/processor/logging/MDCProcessor.java @@ -19,6 +19,9 @@ public void process(Exchange exchange) throws Exception { MDC.put(RouteConstants.TRACE_ID, exchange.getIn().getHeader(RouteConstants.TRACE_ID, String.class)); + MDC.put(RouteConstants.ORG_ID, + exchange.getIn().getHeader(RouteConstants.ORG_ID, String.class)); + MDC.put(RouteConstants.ACTOR_ID, exchange.getIn().getHeader(RouteConstants.ACTOR_ID, String.class)); From 3822d1badfb6d8f097766e6683466174a519799b Mon Sep 17 00:00:00 2001 From: rudraneel-chakraborty Date: Tue, 28 Jan 2025 15:54:53 -0500 Subject: [PATCH 2/5] DATAGO-89836 code complete --- .../agent/processor/ScanDataProcessor.java | 10 +- .../agent/scanManager/ScanManager.java | 3 + .../management/agent/service/ScanService.java | 7 +- .../ScanCommandMessageProcessor.java | 3 + .../ScanManagerHandleErrorTest.java | 4 +- .../agent/scanManager/ScanManagerTest.java | 4 - .../agent/service/ScanServiceTests.java | 8 -- .../PersistentMessageHandlerTests.java | 1 + .../ScanJobPersistentMessageHandlerTests.java | 117 ++++++++++++------ .../ScanCommandMessageProcessorTests.java | 80 ++++++++++-- 10 files changed, 178 insertions(+), 59 deletions(-) diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/ScanDataProcessor.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/ScanDataProcessor.java index 20c294c2e..f3f0616ed 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/ScanDataProcessor.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/processor/ScanDataProcessor.java @@ -48,7 +48,15 @@ public void process(Exchange exchange) throws Exception { String orgId = (String) properties.get(RouteConstants.ORG_ID); Boolean isImportOp = (Boolean) properties.get(RouteConstants.IS_DATA_IMPORT); - ScanDataMessage scanDataMessage = new ScanDataMessage(orgId, scanId, traceId, actorId, scanType, body, Instant.now().toString()); + ScanDataMessage scanDataMessage = new ScanDataMessage( + orgId, + scanId, + traceId, + actorId, + scanType, + body, + Instant.now().toString() + ); topicDetails.put("orgId", orgId); topicDetails.put("runtimeAgentId", runtimeAgentId); diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManager.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManager.java index 1976b9b43..7123e21d0 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManager.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManager.java @@ -18,6 +18,7 @@ import com.solace.maas.ep.event.management.agent.service.ScanService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ObjectUtils; +import org.apache.commons.lang3.Validate; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.Page; @@ -57,6 +58,7 @@ public ScanManager(MessagingServiceDelegateServiceImpl messagingServiceDelegateS } public String scan(ScanRequestBO scanRequestBO) { + Validate.notBlank(scanRequestBO.getOrgId(), " Organization ID cannot be null or empty"); String messagingServiceId = scanRequestBO.getMessagingServiceId(); String scanId = scanRequestBO.getScanId(); String traceId = scanRequestBO.getTraceId(); @@ -130,6 +132,7 @@ public String scan(ScanRequestBO scanRequestBO) { public void handleError(Exception e, ScanCommandMessage message) { + Validate.notBlank(message.getOrgId()," Organization ID cannot be null or empty"); if (scanStatusPublisherOpt.isEmpty()) { return; } diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/service/ScanService.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/service/ScanService.java index 19e21576b..f4ba4f55c 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/service/ScanService.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/service/ScanService.java @@ -28,6 +28,7 @@ import org.apache.camel.Exchange; import org.apache.camel.ProducerTemplate; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.Validate; import org.slf4j.MDC; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; @@ -100,7 +101,7 @@ public ScanService(ScanRepository repository, * @return The id of the scan. */ public String singleScan(SingleScanSpecification singleScanSpecification) { - + Validate.notBlank(singleScanSpecification.getOrgId(), "Organization ID cannot be null or empty"); String scanId = singleScanSpecification.getScanId(); String traceId = singleScanSpecification.getTraceId(); String orgId = singleScanSpecification.getOrgId(); @@ -249,6 +250,8 @@ public void sendScanStatus(String orgId, String messagingServiceId, String scanTypes, ScanStatus status) { + + Validate.notBlank(orgId, "Organization ID cannot be null or empty"); producerTemplate.send("direct:overallScanStatusPublisher?block=false&failIfNoConsumers=false", exchange -> { exchange.getIn().setHeader(RouteConstants.SCHEDULE_ID, groupId); exchange.getIn().setHeader(RouteConstants.SCAN_ID, scanId); @@ -265,6 +268,8 @@ public void sendScanStatus(String orgId, protected CompletableFuture scanAsync(String orgId, String groupId, String scanId, String traceId, String actorId, RouteEntity route, String messagingServiceId) { + + Validate.notBlank(orgId, "Organization ID cannot be null or empty"); return producerTemplate.asyncSend("seda:" + route.getId(), exchange -> { // Need to set headers to let the Route have access to the Scan ID, Group ID, and Messaging Service ID. exchange.getIn().setHeader(RouteConstants.SCHEDULE_ID, groupId); diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java index c5a1180cb..3e8229977 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java @@ -8,6 +8,7 @@ import lombok.extern.slf4j.Slf4j; import net.logstash.logback.encoder.org.apache.commons.lang3.StringUtils; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.Validate; import org.awaitility.core.ConditionTimeoutException; import org.slf4j.MDC; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -16,6 +17,7 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; + import static com.solace.maas.ep.common.metrics.ObservabilityConstants.MAAS_EMA_SCAN_EVENT_RECEIVED; import static com.solace.maas.ep.common.metrics.ObservabilityConstants.SCAN_ID_TAG; import static java.util.concurrent.TimeUnit.SECONDS; @@ -45,6 +47,7 @@ public ScanCommandMessageProcessor(ScanManager scanManager, @Override public void processMessage(ScanCommandMessage message) { MDC.clear(); + Validate.notBlank(message.getOrgId(), "Organization ID cannot be null or empty"); String scanId = StringUtils.isEmpty(message.getScanId()) ? UUID.randomUUID().toString() : message.getScanId(); meterRegistry.counter(MAAS_EMA_SCAN_EVENT_RECEIVED, SCAN_ID_TAG, scanId).increment(); diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManagerHandleErrorTest.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManagerHandleErrorTest.java index 932f76c3c..f98dea399 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManagerHandleErrorTest.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManagerHandleErrorTest.java @@ -75,11 +75,13 @@ void testScanManagerStandaloneHandleError(){ } private ScanCommandMessage createScanCommandMessage(){ - return new ScanCommandMessage( + ScanCommandMessage msg = new ScanCommandMessage( "messageServiceId", "scanId", List.of(ScanType.SOLACE_ALL), List.of(ScanDestination.EVENT_PORTAL), null); + msg.setOrgId("orgId"); + return msg; } } diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManagerTest.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManagerTest.java index 720f591d7..5b908139f 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManagerTest.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManagerTest.java @@ -1,7 +1,6 @@ package com.solace.maas.ep.event.management.agent.scanManager; import com.solace.maas.ep.event.management.agent.TestConfig; -import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties; import com.solace.maas.ep.event.management.agent.plugin.confluentSchemaRegistry.route.delegate.ConfluentSchemaRegistryRouteDelegateImpl; import com.solace.maas.ep.event.management.agent.plugin.kafka.route.delegate.KafkaRouteDelegateImpl; import com.solace.maas.ep.event.management.agent.plugin.localstorage.route.delegate.DataCollectionFileWriterDelegateImpl; @@ -35,9 +34,6 @@ @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = TestConfig.class) class ScanManagerTest { - @Mock - EventPortalProperties eventPortalProperties; - @Mock MessagingServiceDelegateServiceImpl messagingServiceDelegateService; diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java index 900459138..220e40c8a 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java @@ -2,7 +2,6 @@ import com.solace.maas.ep.event.management.agent.TestConfig; import com.solace.maas.ep.event.management.agent.config.plugin.enumeration.MessagingServiceType; -import com.solace.maas.ep.event.management.agent.logging.StreamingAppender; import com.solace.maas.ep.event.management.agent.plugin.constants.ScanStatus; import com.solace.maas.ep.event.management.agent.plugin.route.RouteBundle; import com.solace.maas.ep.event.management.agent.plugin.route.RouteBundleHierarchyStore; @@ -17,7 +16,6 @@ import com.solace.maas.ep.event.management.agent.repository.scan.ScanStatusRepository; import com.solace.maas.ep.event.management.agent.repository.scan.ScanTypeRepository; import com.solace.maas.ep.event.management.agent.scanManager.model.SingleScanSpecification; -import com.solace.maas.ep.event.management.agent.service.logging.LoggingService; import com.solace.maas.ep.event.management.agent.util.IDGenerator; import io.micrometer.core.instrument.Meter; import io.micrometer.core.instrument.MeterRegistry; @@ -62,12 +60,6 @@ public class ScanServiceTests { private final String routeId = UUID.randomUUID().toString(); - @Mock - LoggingService loggingService; - - @Mock - StreamingAppender streamingAppender; - @Mock private ScanRepository scanRepository; diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java index 38185f78b..1b689db69 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java @@ -142,6 +142,7 @@ void testScanCommandMessageHandler() { ScanCommandMessage scanCommandMessage = new ScanCommandMessage("messagingServiceId", "scanId", List.of(SOLACE_ALL), List.of(EVENT_PORTAL)); + scanCommandMessage.setOrgId(eventPortalProperties.getOrganizationId()); when(inboundMessage.getPayloadAsString()).thenReturn(jsonString(scanCommandMessage)); when(inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER)).thenReturn( ScanCommandMessage.class.getCanonicalName() diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java index 377352f1a..bdd859d1f 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java @@ -9,13 +9,16 @@ import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties; import com.solace.maas.ep.event.management.agent.plugin.mop.MOPConstants; import com.solace.maas.ep.event.management.agent.scanManager.ScanManager; +import com.solace.maas.ep.event.management.agent.scanManager.model.ScanRequestBO; import com.solace.maas.ep.event.management.agent.subscriber.messageProcessors.ScanCommandMessageProcessor; import com.solace.messaging.MessagingService; import com.solace.messaging.receiver.InboundMessage; import com.solace.messaging.receiver.PersistentMessageReceiver; import lombok.extern.slf4j.Slf4j; +import org.assertj.core.api.SoftAssertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @@ -26,6 +29,7 @@ import java.util.List; import static com.solace.maas.ep.common.model.ScanDestination.EVENT_PORTAL; +import static com.solace.maas.ep.common.model.ScanDestination.FILE_WRITER; import static com.solace.maas.ep.common.model.ScanType.SOLACE_ALL; import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; @@ -95,26 +99,62 @@ void setup() { solacePersistentMessageHandler.setMessageHandlerObserver(messageHandlerObserver); } + @Test + void testScanManagerIsInvokedWithCorrectScanRequest() { + ArgumentCaptor captor = ArgumentCaptor.forClass(ScanRequestBO.class); + setupMocks(true); + solacePersistentMessageHandler.onMessage(inboundMessage); + await().atMost(5, SECONDS).until(() -> messageHandlerObserver.hasAcknowledgedMessage(inboundMessage)); + verify(scanCommandMessageProcessor, times(1)).processMessage(any()); + verify(scanManager).scan(captor.capture()); + ScanRequestBO capturedArgument = captor.getValue(); + SoftAssertions softly = new SoftAssertions(); + softly.assertThat(capturedArgument.getOrgId()).isEqualTo("orgId"); + softly.assertThat(capturedArgument.getMessagingServiceId()).isEqualTo("messagingServiceId"); + softly.assertThat(capturedArgument.getScanId()).isEqualTo("scanId"); + softly.assertThat(capturedArgument.getScanTypes()).containsExactly(SOLACE_ALL.name()); + softly.assertThat(capturedArgument.getDestinations()).contains(EVENT_PORTAL.name(), FILE_WRITER.name()); + softly.assertAll(); + + verify(scanCommandMessageProcessor, times(1)).processMessage(any()); + + + } + + + @Test + void testOnFailureIsInvokedWithCorrectScanCommandMessage() { + ArgumentCaptor captor = ArgumentCaptor.forClass(ScanCommandMessage.class); + setupMocksForScanFailure(); + solacePersistentMessageHandler.onMessage(inboundMessage); + await().atMost(5, SECONDS).until(() -> messageHandlerObserver.hasAcknowledgedMessage(inboundMessage)); + verify(scanCommandMessageProcessor).onFailure(any(), captor.capture()); + ScanCommandMessage capturedArgument = captor.getValue(); + SoftAssertions softly = new SoftAssertions(); + softly.assertThat(capturedArgument.getOrgId()).isEqualTo("orgId"); + softly.assertThat(capturedArgument.getMessagingServiceId()).isEqualTo("messagingServiceId"); + softly.assertThat(capturedArgument.getScanId()).isEqualTo("scanId"); + softly.assertThat(capturedArgument.getScanTypes()).containsExactly(SOLACE_ALL); + softly.assertAll(); + + verify(scanCommandMessageProcessor, times(1)).processMessage(any()); + verify(scanCommandMessageProcessor, times(1)).onFailure(any(), any()); + + + } + // Test that the message handler is able to process a scan command message without an observer, // which will be the case when EMA is executed and not as unit / it test @Test void testPersistentMessageHandlerScanCommandMsgAckedWithoutObserver() { solacePersistentMessageHandler.setMessageHandlerObserver(null); - ScanCommandMessage scanCommandMessage = - new ScanCommandMessage("messagingServiceId", - "scanId", List.of(SOLACE_ALL), List.of(EVENT_PORTAL)); - when(inboundMessage.getPayloadAsString()).thenReturn(jsonString(scanCommandMessage)); - when(inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER)).thenReturn( - ScanCommandMessage.class.getCanonicalName() - ); - when(scanManager.scan(any())).thenReturn("scanId"); - when(scanManager.isScanComplete("scanId")).thenReturn(true); + setupMocks(true); solacePersistentMessageHandler.onMessage(inboundMessage); // we have to wait now for a second as there is no observer being notified try { Thread.sleep(1000); } catch (InterruptedException e) { - throw new RuntimeException(e); + throw new RuntimeException(e); } verify(scanCommandMessageProcessor, times(1)).processMessage(any()); @@ -123,15 +163,7 @@ void testPersistentMessageHandlerScanCommandMsgAckedWithoutObserver() { @Test void testPersistentMessageHandlerScanCommandMsgAcked() { - ScanCommandMessage scanCommandMessage = - new ScanCommandMessage("messagingServiceId", - "scanId", List.of(SOLACE_ALL), List.of(EVENT_PORTAL)); - when(inboundMessage.getPayloadAsString()).thenReturn(jsonString(scanCommandMessage)); - when(inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER)).thenReturn( - ScanCommandMessage.class.getCanonicalName() - ); - when(scanManager.scan(any())).thenReturn("scanId"); - when(scanManager.isScanComplete("scanId")).thenReturn(true); + setupMocks(true); solacePersistentMessageHandler.onMessage(inboundMessage); //happy path - the message should be processed and acked @@ -143,30 +175,20 @@ void testPersistentMessageHandlerScanCommandMsgAcked() { assertThat(messageHandlerObserver.hasFailedMessage(inboundMessage)).isFalse(); // the scan command message processor should be called once by the persistent message handler - verify(scanCommandMessageProcessor, times(1)).processMessage(any()); - // if the EMA is managed, the waitForScanCompletion method should be called + verify(scanCommandMessageProcessor, times(1)).processMessage(any()); + // if the EMA is managed, the waitForScanCompletion method should be called verify(scanCommandMessageProcessor, atLeastOnce()).waitForScanCompletion(any()); // the message should be acked after the scan is complete - verify(solacePersistentMessageHandler.getPersistentMessageReceiver(), times(1)).ack(inboundMessage); + verify(solacePersistentMessageHandler.getPersistentMessageReceiver(), times(1)).ack(inboundMessage); } @Test void testPersistentMessageHandlerScanCommandTimeoutMsgAcked() { - ScanCommandMessage scanCommandMessage = - new ScanCommandMessage("messagingServiceId", - "scanId", List.of(SOLACE_ALL), List.of(EVENT_PORTAL)); - when(inboundMessage.getPayloadAsString()).thenReturn(jsonString(scanCommandMessage)); - when(inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER)).thenReturn( - ScanCommandMessage.class.getCanonicalName() - ); - when(scanManager.scan(any())).thenReturn("scanId"); - // the scan is not complete and will never be ;-) - // the waitForScanCompletion method will throw an exception after the timeout - when(scanManager.isScanComplete("scanId")).thenReturn(false); + setupMocks(false); solacePersistentMessageHandler.onMessage(inboundMessage); // sleep for a while to allow the scan complete poll interval to pass - await().atMost(eventPortalProperties.getWaitAckScanCompleteTimeoutSec()+5, SECONDS).until(() + await().atMost(eventPortalProperties.getWaitAckScanCompleteTimeoutSec() + 5, SECONDS).until(() -> messageHandlerObserver.hasAcknowledgedMessage(inboundMessage)); assertThat(messageHandlerObserver.hasReceivedMessage(inboundMessage)).isTrue(); assertThat(messageHandlerObserver.hasInitiatedMessageProcessing(inboundMessage)).isTrue(); @@ -191,6 +213,7 @@ void testPersistentMessageHandlerScanCommandExceptionThrownMsgAcked() { ScanCommandMessage scanCommandMessage = new ScanCommandMessage("messagingServiceId", "scanId", List.of(SOLACE_ALL), List.of(EVENT_PORTAL)); + scanCommandMessage.setOrgId("orgId"); when(inboundMessage.getPayloadAsString()).thenReturn(jsonString(scanCommandMessage)); when(inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER)).thenReturn( ScanCommandMessage.class.getCanonicalName() @@ -198,7 +221,7 @@ void testPersistentMessageHandlerScanCommandExceptionThrownMsgAcked() { when(scanManager.scan(any())).thenThrow(new RuntimeException("Test exception thrown on purpose")); solacePersistentMessageHandler.onMessage(inboundMessage); // sleep for a while to allow the scan complete poll interval to pass - await().atMost(eventPortalProperties.getWaitAckScanCompleteTimeoutSec()+2, SECONDS).until(() + await().atMost(eventPortalProperties.getWaitAckScanCompleteTimeoutSec() + 2, SECONDS).until(() -> messageHandlerObserver.hasAcknowledgedMessage(inboundMessage)); assertThat(messageHandlerObserver.hasReceivedMessage(inboundMessage)).isTrue(); assertThat(messageHandlerObserver.hasInitiatedMessageProcessing(inboundMessage)).isTrue(); @@ -220,6 +243,30 @@ void testPersistentMessageHandlerScanCommandExceptionThrownMsgAcked() { } + private void setupMocks(boolean isScanComplete) { + ScanCommandMessage scanCommandMessage = + new ScanCommandMessage("messagingServiceId", + "scanId", List.of(SOLACE_ALL), List.of(EVENT_PORTAL)); + scanCommandMessage.setOrgId("orgId"); + when(inboundMessage.getPayloadAsString()).thenReturn(jsonString(scanCommandMessage)); + when(inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER)).thenReturn( + ScanCommandMessage.class.getCanonicalName() + ); + when(scanManager.scan(any())).thenReturn("scanId"); + when(scanManager.isScanComplete("scanId")).thenReturn(isScanComplete); + } + + private void setupMocksForScanFailure() { + ScanCommandMessage scanCommandMessage = + new ScanCommandMessage("messagingServiceId", + "scanId", List.of(SOLACE_ALL), List.of(EVENT_PORTAL)); + scanCommandMessage.setOrgId("orgId"); + when(inboundMessage.getPayloadAsString()).thenReturn(jsonString(scanCommandMessage)); + when(inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER)).thenReturn( + ScanCommandMessage.class.getCanonicalName() + ); + when(scanManager.scan(any())).thenThrow(new RuntimeException("Test exception thrown on purpose")); + } private String jsonString(Object object) { try { diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessorTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessorTests.java index bbb954652..f1a24f091 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessorTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessorTests.java @@ -6,58 +6,120 @@ import com.solace.maas.ep.common.model.ScanDestination; import com.solace.maas.ep.common.model.ScanType; import com.solace.maas.ep.event.management.agent.TestConfig; +import com.solace.maas.ep.event.management.agent.config.plugin.enumeration.MessagingServiceType; +import com.solace.maas.ep.event.management.agent.repository.messagingservice.MessagingServiceRepository; +import com.solace.maas.ep.event.management.agent.repository.model.mesagingservice.ConnectionDetailsEntity; +import com.solace.maas.ep.event.management.agent.repository.model.mesagingservice.MessagingServiceEntity; import com.solace.maas.ep.event.management.agent.scanManager.ScanManager; +import com.solace.maas.ep.event.management.agent.scanManager.model.SingleScanSpecification; +import com.solace.maas.ep.event.management.agent.service.ScanService; +import org.assertj.core.api.SoftAssertions; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.boot.test.mock.mockito.SpyBean; import org.springframework.test.context.ActiveProfiles; import java.util.List; +import java.util.Optional; +import java.util.UUID; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = TestConfig.class) @ActiveProfiles("TEST") class ScanCommandMessageProcessorTests { - @MockBean + @SpyBean private ScanManager scanManager; + @MockBean + private ScanService scanService; + @SpyBean private ScanCommandMessageProcessor scanCommandMessageProcessor; + @Autowired + private MessagingServiceRepository repository; + @MockBean private DynamicResourceConfigurationHelper dynamicResourceConfigurationHelper; + private ArgumentCaptor captor; + + @BeforeEach + void setUp() { + reset(repository, scanManager, scanService, dynamicResourceConfigurationHelper); + captor = ArgumentCaptor.forClass(SingleScanSpecification.class); + ConnectionDetailsEntity connectionDetailsEntity = ConnectionDetailsEntity.builder() + .id(UUID.randomUUID().toString()) + .url("localhost:9090") + .build(); + when(repository.findById(any(String.class))) + .thenReturn(Optional.of(MessagingServiceEntity.builder() + .type(MessagingServiceType.SOLACE.name()) + .name("service1") + .id("messagingServiceId") + .connections(List.of(connectionDetailsEntity)) + .build())); + } + @Test - void processMessageWithoutResourceConfiguration(){ + void processMessageWithoutResourceConfiguration() { ScanCommandMessage message = buildScanCommandMessage(null); + when(scanService.singleScan(any())).thenReturn("scanId"); scanCommandMessageProcessor.processMessage(message); + verify(scanService).singleScan(captor.capture()); + SingleScanSpecification capturedSpec = captor.getValue(); + verifySingleScanSpec(capturedSpec, message); verifyNoInteractions(dynamicResourceConfigurationHelper); - verify(scanManager,times(1)).scan(any()); + verify(scanManager, times(1)).scan(any()); } @Test - void processMessageWithResourceConfiguration(){ + void processMessageWithResourceConfiguration() { ScanCommandMessage message = buildScanCommandMessage(List.of( EventBrokerResourceConfigTestHelper.buildResourceConfiguration(ResourceConfigurationType.SOLACE)) ); + when(scanService.singleScan(any())).thenReturn("scanId"); scanCommandMessageProcessor.processMessage(message); + verify(scanService).singleScan(captor.capture()); + SingleScanSpecification capturedSpec = captor.getValue(); + verifySingleScanSpec(capturedSpec, message); verify(dynamicResourceConfigurationHelper, times(1)).loadSolaceBrokerResourceConfigurations(any()); - verify(scanManager,times(1)).scan(any()); + verify(scanManager, times(1)).scan(any()); } - - private ScanCommandMessage buildScanCommandMessage(List resources){ - return new ScanCommandMessage( - "messageServiceId", + private ScanCommandMessage buildScanCommandMessage(List resources) { + ScanCommandMessage msg = new ScanCommandMessage( + "messagingServiceId", "scanId", List.of(ScanType.SOLACE_ALL), List.of(ScanDestination.EVENT_PORTAL), resources); + msg.setOrgId("orgId"); + msg.setTraceId("traceId"); + msg.setActorId("actorId"); + return msg; + } + + private static void verifySingleScanSpec(SingleScanSpecification capturedSpec, ScanCommandMessage msg) { + SoftAssertions.assertSoftly(softly -> { + softly.assertThat(capturedSpec.getScanId()).isEqualTo(msg.getScanId()); + softly.assertThat(capturedSpec.getTraceId()).isEqualTo(msg.getTraceId()); + softly.assertThat(capturedSpec.getOrgId()).isEqualTo(msg.getOrgId()); + softly.assertThat(capturedSpec.getMessagingServiceEntity().getId()).isEqualTo(msg.getMessagingServiceId()); + softly.assertThat(capturedSpec.getGroupId()).isNotBlank(); + softly.assertThat(capturedSpec.getRouteBundles()).hasSize(2); + + }); } } From 294df6944c7fbfcdd93fe2efc6bd71d8f359ce46 Mon Sep 17 00:00:00 2001 From: rudraneel-chakraborty Date: Tue, 28 Jan 2025 16:26:37 -0500 Subject: [PATCH 3/5] DATAGO-92933 code complete --- .../event/management/agent/scanManager/ScanManagerTest.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManagerTest.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManagerTest.java index 5b908139f..593a0f0a2 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManagerTest.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManagerTest.java @@ -1,6 +1,7 @@ package com.solace.maas.ep.event.management.agent.scanManager; import com.solace.maas.ep.event.management.agent.TestConfig; +import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties; import com.solace.maas.ep.event.management.agent.plugin.confluentSchemaRegistry.route.delegate.ConfluentSchemaRegistryRouteDelegateImpl; import com.solace.maas.ep.event.management.agent.plugin.kafka.route.delegate.KafkaRouteDelegateImpl; import com.solace.maas.ep.event.management.agent.plugin.localstorage.route.delegate.DataCollectionFileWriterDelegateImpl; @@ -37,6 +38,9 @@ class ScanManagerTest { @Mock MessagingServiceDelegateServiceImpl messagingServiceDelegateService; + @Mock + private EventPortalProperties eventPortalProperties; + @InjectMocks ScanManager scanManager; @@ -165,6 +169,7 @@ void testScanManager() { when(scanService.singleScan( SingleScanSpecification .builder() + .orgId("orgId") .groupId("groupId") .scanId("scanId") .traceId("traceId") From d194d72a65e907d16a71d8555a712815b3857f54 Mon Sep 17 00:00:00 2001 From: rudraneel-chakraborty Date: Tue, 28 Jan 2025 18:24:20 -0500 Subject: [PATCH 4/5] DATAGO-92933 code complete --- .../ep/event/management/agent/config/SolaceConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/SolaceConfiguration.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/SolaceConfiguration.java index 0100e3f63..70f0500e7 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/SolaceConfiguration.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/SolaceConfiguration.java @@ -52,7 +52,7 @@ public String getTopicPrefix() { if (topicPrefix == null) { topicPrefix = String.format(TOPIC_PREFIX_FORMAT, - Boolean.TRUE.equals(eventPortalProperties.getManaged()) ? "*" : eventPortalProperties.getOrganizationId(), + eventPortalProperties.getOrganizationId(), eventPortalProperties.getRuntimeAgentId()); } return topicPrefix; From 8303db5aac24f8d97889f9ebebb110635372108c Mon Sep 17 00:00:00 2001 From: rudraneel-chakraborty Date: Thu, 30 Jan 2025 13:57:35 -0500 Subject: [PATCH 5/5] DATAGO-89836 PR comment --- .../ep/event/management/agent/service/ScanService.java | 7 ++++--- .../messageProcessors/ScanCommandMessageProcessor.java | 3 ++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/service/ScanService.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/service/ScanService.java index f4ba4f55c..b1465439e 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/service/ScanService.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/service/ScanService.java @@ -56,6 +56,7 @@ @Slf4j @Service public class ScanService { + private static final String NULL_ORG_ID_ERROR_MSG = "Organization ID cannot be null or empty"; private final ScanRepository repository; private final ScanRecipientHierarchyRepository scanRecipientHierarchyRepository; @@ -101,7 +102,7 @@ public ScanService(ScanRepository repository, * @return The id of the scan. */ public String singleScan(SingleScanSpecification singleScanSpecification) { - Validate.notBlank(singleScanSpecification.getOrgId(), "Organization ID cannot be null or empty"); + Validate.notBlank(singleScanSpecification.getOrgId(), NULL_ORG_ID_ERROR_MSG); String scanId = singleScanSpecification.getScanId(); String traceId = singleScanSpecification.getTraceId(); String orgId = singleScanSpecification.getOrgId(); @@ -251,7 +252,7 @@ public void sendScanStatus(String orgId, String scanTypes, ScanStatus status) { - Validate.notBlank(orgId, "Organization ID cannot be null or empty"); + Validate.notBlank(orgId, NULL_ORG_ID_ERROR_MSG); producerTemplate.send("direct:overallScanStatusPublisher?block=false&failIfNoConsumers=false", exchange -> { exchange.getIn().setHeader(RouteConstants.SCHEDULE_ID, groupId); exchange.getIn().setHeader(RouteConstants.SCAN_ID, scanId); @@ -269,7 +270,7 @@ public void sendScanStatus(String orgId, protected CompletableFuture scanAsync(String orgId, String groupId, String scanId, String traceId, String actorId, RouteEntity route, String messagingServiceId) { - Validate.notBlank(orgId, "Organization ID cannot be null or empty"); + Validate.notBlank(orgId, NULL_ORG_ID_ERROR_MSG); return producerTemplate.asyncSend("seda:" + route.getId(), exchange -> { // Need to set headers to let the Route have access to the Scan ID, Group ID, and Messaging Service ID. exchange.getIn().setHeader(RouteConstants.SCHEDULE_ID, groupId); diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java index 3e8229977..e02d45ab0 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java @@ -29,6 +29,7 @@ public class ScanCommandMessageProcessor implements MessageProcessor { private static final String DEFAULT_DESTINATION = "FILE_WRITER"; + private static final String NULL_ORG_ID_ERROR_MSG = "Organization ID cannot be null or empty"; private final ScanManager scanManager; private final DynamicResourceConfigurationHelper dynamicResourceConfigurationHelper; private final MeterRegistry meterRegistry; @@ -47,7 +48,7 @@ public ScanCommandMessageProcessor(ScanManager scanManager, @Override public void processMessage(ScanCommandMessage message) { MDC.clear(); - Validate.notBlank(message.getOrgId(), "Organization ID cannot be null or empty"); + Validate.notBlank(message.getOrgId(), NULL_ORG_ID_ERROR_MSG); String scanId = StringUtils.isEmpty(message.getScanId()) ? UUID.randomUUID().toString() : message.getScanId(); meterRegistry.counter(MAAS_EMA_SCAN_EVENT_RECEIVED, SCAN_ID_TAG, scanId).increment();