Skip to content

Commit

Permalink
Merge pull request #228 from SolaceProducts/DATAGO-89836-public-cema-…
Browse files Browse the repository at this point in the history
…scan

DATAGO-89836: Support Scan for Public CEMA
  • Loading branch information
rudraneel-chakraborty authored Jan 30, 2025
2 parents a25c01b + 8303db5 commit 3b74e50
Show file tree
Hide file tree
Showing 28 changed files with 405 additions and 172 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.solace.maas.ep.event.management.agent.cli;

import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties;
import com.solace.maas.ep.event.management.agent.repository.model.mesagingservice.MessagingServiceEntity;
import com.solace.maas.ep.event.management.agent.scanManager.ScanManager;
import com.solace.maas.ep.event.management.agent.scanManager.model.ScanRequestBO;
Expand All @@ -26,16 +27,18 @@ public class CommandLineScan {
private final MessagingServiceDelegateServiceImpl messagingServiceDelegateService;
private final ImportService importService;
private final CommandLineCommon commandLineCommon;
private final EventPortalProperties eventPortalProperties;

public CommandLineScan(ScanManager scanManager, IDGenerator idGenerator,
MessagingServiceDelegateServiceImpl messagingServiceDelegateService,
ImportService importService,
CommandLineCommon commandLineCommon) {
CommandLineCommon commandLineCommon, EventPortalProperties eventPortalProperties) {
this.scanManager = scanManager;
this.idGenerator = idGenerator;
this.messagingServiceDelegateService = messagingServiceDelegateService;
this.importService = importService;
this.commandLineCommon = commandLineCommon;
this.eventPortalProperties = eventPortalProperties;
}

public void runScan(String messagingServiceId, String filePathAndName) throws InterruptedException, IOException {
Expand All @@ -45,6 +48,7 @@ public void runScan(String messagingServiceId, String filePathAndName) throws In
.messagingServiceId(messagingServiceId)
.scanId(idGenerator.generateRandomUniqueId())
.destinations(List.of("FILE_WRITER"))
.orgId(eventPortalProperties.getOrganizationId())
.build();
setScanType(messagingServiceEntity, scanRequestBO, messagingServiceId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,21 @@ public class StreamingAppender extends AppenderBase<ILoggingEvent> {

@Override
protected void append(ILoggingEvent event) {
String orgId = event.getMDCPropertyMap().get(RouteConstants.ORG_ID);
if (!standalone) {
if (StringUtils.isNotEmpty(event.getMDCPropertyMap().get(RouteConstants.SCAN_ID))) {
sendScanLogsAsync(event,
sendScanLogsAsync(orgId,
event,
event.getMDCPropertyMap().get(RouteConstants.SCAN_ID),
event.getMDCPropertyMap().get(RouteConstants.TRACE_ID),
event.getMDCPropertyMap().get(RouteConstants.ACTOR_ID),
event.getMDCPropertyMap().get(RouteConstants.SCAN_TYPE),
event.getMDCPropertyMap().get(RouteConstants.SCHEDULE_ID),
event.getMDCPropertyMap().get(RouteConstants.MESSAGING_SERVICE_ID));
} else if (StringUtils.isNotEmpty(event.getMDCPropertyMap().get(RouteConstants.COMMAND_CORRELATION_ID))) {
sendCommandLogsAsync(event,
sendCommandLogsAsync(
orgId,
event,
event.getMDCPropertyMap().get(RouteConstants.COMMAND_CORRELATION_ID),
event.getMDCPropertyMap().get(RouteConstants.TRACE_ID),
event.getMDCPropertyMap().get(RouteConstants.ACTOR_ID),
Expand All @@ -45,7 +49,7 @@ protected void append(ILoggingEvent event) {
}
}

private void sendCommandLogsAsync(ILoggingEvent event, String commandCorrelationId, String traceId,
private void sendCommandLogsAsync(String orgId, ILoggingEvent event, String commandCorrelationId, String traceId,
String actorId, String messagingServiceId) {


Expand All @@ -59,6 +63,7 @@ private void sendCommandLogsAsync(ILoggingEvent event, String commandCorrelation
exchange.getIn().setHeader(RouteConstants.COMMAND_CORRELATION_ID, commandCorrelationId);
exchange.getIn().setHeader(RouteConstants.TRACE_ID, traceId);
exchange.getIn().setHeader(RouteConstants.ACTOR_ID, actorId);
exchange.getIn().setHeader(RouteConstants.ORG_ID, orgId);
exchange.getIn().setHeader(RouteConstants.MESSAGING_SERVICE_ID, messagingServiceId);

exchange.getIn().setBody(event);
Expand All @@ -71,7 +76,7 @@ private void sendCommandLogsAsync(ILoggingEvent event, String commandCorrelation
});
}

private void sendScanLogsAsync(ILoggingEvent event, String scanId, String traceId, String actorId,
private void sendScanLogsAsync(String orgId, ILoggingEvent event, String scanId, String traceId, String actorId,
String scanType, String groupId, String messagingServiceId) {
RouteEntity route = creatLoggingRoute(scanType, messagingServiceId);

Expand All @@ -81,6 +86,7 @@ private void sendScanLogsAsync(ILoggingEvent event, String scanId, String traceI
exchange.getIn().setHeader(RouteConstants.TRACE_ID, traceId);
exchange.getIn().setHeader(RouteConstants.ACTOR_ID, actorId);
exchange.getIn().setHeader(RouteConstants.SCAN_TYPE, scanType);
exchange.getIn().setHeader(RouteConstants.ORG_ID, orgId);
exchange.getIn().setHeader(RouteConstants.SCHEDULE_ID, groupId);
exchange.getIn().setHeader(RouteConstants.MESSAGING_SERVICE_ID, messagingServiceId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
@Component
@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false")
public class ScanDataImportPublishImportScanEventProcessor implements Processor {
private final String orgId;
private final String runtimeAgentId;
private final ScanDataPublisher scanDataPublisher;

Expand All @@ -29,7 +28,6 @@ public ScanDataImportPublishImportScanEventProcessor(ScanDataPublisher scanDataP

this.scanDataPublisher = scanDataPublisher;

orgId = eventPortalProperties.getOrganizationId();
runtimeAgentId = eventPortalProperties.getRuntimeAgentId();
}

Expand All @@ -50,6 +48,7 @@ public void process(Exchange exchange) throws Exception {

String messagingServiceId = (String) properties.get(RouteConstants.MESSAGING_SERVICE_ID);
String scanId = (String) properties.get(RouteConstants.SCAN_ID);
String orgId = (String) properties.get(RouteConstants.ORG_ID);
Boolean isImportOp = (Boolean) properties.get(RouteConstants.IS_DATA_IMPORT);

ScanDataImportMessage importDataMessage =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,13 @@
public class ScanDataProcessor implements Processor {

private final ScanDataPublisher scanDataPublisher;
private final String orgId;
private final String runtimeAgentId;

@Autowired
public ScanDataProcessor(ScanDataPublisher scanDataPublisher, EventPortalProperties eventPortalProperties) {
super();

this.scanDataPublisher = scanDataPublisher;

orgId = eventPortalProperties.getOrganizationId();
runtimeAgentId = eventPortalProperties.getRuntimeAgentId();
}

Expand All @@ -48,9 +45,18 @@ public void process(Exchange exchange) throws Exception {
String traceId = (String) properties.get(RouteConstants.TRACE_ID);
String actorId = (String) properties.get(RouteConstants.ACTOR_ID);
String scanType = (String) properties.get(RouteConstants.SCAN_TYPE);
String orgId = (String) properties.get(RouteConstants.ORG_ID);
Boolean isImportOp = (Boolean) properties.get(RouteConstants.IS_DATA_IMPORT);

ScanDataMessage scanDataMessage = new ScanDataMessage(orgId, scanId, traceId, actorId, scanType, body, Instant.now().toString());
ScanDataMessage scanDataMessage = new ScanDataMessage(
orgId,
scanId,
traceId,
actorId,
scanType,
body,
Instant.now().toString()
);

topicDetails.put("orgId", orgId);
topicDetails.put("runtimeAgentId", runtimeAgentId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
@Component
@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false")
public class ScanLogsProcessor implements Processor {
private final String orgId;
private final String runtimeAgentId;

private final ScanLogsPublisher logDataPublisher;
Expand All @@ -30,7 +29,6 @@ public ScanLogsProcessor(ScanLogsPublisher logDataPublisher, EventPortalProperti

this.logDataPublisher = logDataPublisher;

orgId = eventPortalProperties.getOrganizationId();
runtimeAgentId = eventPortalProperties.getRuntimeAgentId();
}

Expand All @@ -42,6 +40,7 @@ public void process(Exchange exchange) throws Exception {
ILoggingEvent event = (ILoggingEvent) exchange.getIn().getBody();
String scanId = (String) properties.get(RouteConstants.SCAN_ID);
String traceId = (String) properties.get(RouteConstants.TRACE_ID);
String orgId = (String) properties.get(RouteConstants.ORG_ID);
String actorId = (String) properties.get(RouteConstants.ACTOR_ID);
String messagingServiceId = (String) properties.get(RouteConstants.MESSAGING_SERVICE_ID);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@
@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false")
public class ScanStatusOverAllProcessor implements Processor {

private final String orgId;
private final String runtimeAgentId;

@Autowired
public ScanStatusOverAllProcessor(EventPortalProperties eventPortalProperties) {
super();

orgId = eventPortalProperties.getOrganizationId();
runtimeAgentId = eventPortalProperties.getRuntimeAgentId();
}

Expand All @@ -46,6 +44,7 @@ public void process(Exchange exchange) throws Exception {
String description = (String) properties.get(RouteConstants.SCAN_STATUS_DESC);

String scanType = (String) properties.get(RouteConstants.SCAN_TYPE);
String orgId = (String) properties.get(RouteConstants.ORG_ID);
List<String> scanTypes = Arrays.asList(scanType.split(","));

topicDetails.put("orgId", orgId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@
@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false")
@SuppressWarnings("unchecked")
public class ScanStatusPerRouteProcessor implements Processor {
private final String orgId;
private final String runtimeAgentId;

@Autowired
public ScanStatusPerRouteProcessor(EventPortalProperties eventPortalProperties) {
super();

orgId = eventPortalProperties.getOrganizationId();
runtimeAgentId = eventPortalProperties.getRuntimeAgentId();
}

Expand All @@ -42,6 +39,7 @@ public void process(Exchange exchange) throws Exception {
String scanType = (String) properties.get(RouteConstants.SCAN_TYPE);
ScanStatus status = (ScanStatus) properties.get(RouteConstants.SCAN_STATUS);
String description = (String) properties.get(RouteConstants.SCAN_STATUS_DESC);
String orgId = (String) properties.get(RouteConstants.ORG_ID);

topicDetails.put("orgId", orgId);
topicDetails.put("runtimeAgentId", runtimeAgentId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class HeartbeatGenerator {
private final String topic;
private final String runtimeAgentVersion;
private final MeterRegistry meterRegistry;
private final EventPortalProperties eventPortalProperties;

public HeartbeatGenerator(SolaceConfiguration solaceConfiguration,
EventPortalProperties eventPortalProperties,
Expand All @@ -43,11 +44,13 @@ public HeartbeatGenerator(SolaceConfiguration solaceConfiguration,
topic = solaceConfiguration.getTopicPrefix() + "heartbeat/v1";
this.runtimeAgentVersion = getFormattedVersion(buildProperties.getVersion());
this.meterRegistry = meterRegistry;
this.eventPortalProperties = eventPortalProperties;
}

@Scheduled(fixedRate = 5000)
public void sendHeartbeat() {
HeartbeatMessage message = new HeartbeatMessage(runtimeAgentId, Instant.now().toString(), runtimeAgentVersion);
message.setOrgId(eventPortalProperties.getOrganizationId());
boolean result = solacePublisher.publish(message, topic);
logHealthMetric(message, result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ public void sendScanData(MOPMessage message, Map<String, String> topicDetails) {

String topicString = isImport ?
String.format("sc/ep/runtime/%s/%s/importScan/v1/%s",
topicDetails.get("orgId"),
message.getOrgId(),
topicDetails.get("runtimeAgentId"),
topicDetails.get("messagingServiceId"))
:
String.format("sc/ep/runtime/%s/%s/scan/data/v1/%s/%s/%s",
topicDetails.get("orgId"),
message.getOrgId(),
topicDetails.get("runtimeAgentId"),
topicDetails.get("messagingServiceId"),
topicDetails.get("scanId"),
Expand All @@ -58,6 +58,6 @@ public void sendScanData(MOPMessage message, Map<String, String> topicDetails) {
meterRegistry.counter(MAAS_EMA_SCAN_EVENT_SENT,
STATUS_TAG, isSuccessful ? ScanStatus.COMPLETE.name() : ScanStatus.FAILED.name(),
SCAN_ID_TAG, topicDetails.get("scanId"),
ORG_ID_TAG, topicDetails.get("orgId")).increment();
ORG_ID_TAG, message.getOrgId()).increment();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public ScanLogsPublisher(SolacePublisher solacePublisher) {

public void sendScanLogData(ScanLogMessage message, Map<String, String> topicDetails) {
String topicString = String.format("sc/ep/runtime/%s/%s/scan/logs/v1/%s/%s",
topicDetails.get("orgId"),
message.getOrgId(),
topicDetails.get("runtimeAgentId"),
topicDetails.get("messagingServiceId"),
topicDetails.get("scanId"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@ public void sendOverallScanStatus(ScanStatusMessage message, Map<String, String>
String scanId = topicDetails.get("scanId");
String scanType = topicDetails.get("scanType");
String status = topicDetails.get("status");

String topicString = String.format("sc/ep/runtime/%s/%s/scan/status/v1/%s/%s",
topicDetails.get("orgId"),
message.getOrgId(),
topicDetails.get("runtimeAgentId"),
topicDetails.get("messagingServiceId"),
scanId);
Expand All @@ -72,7 +71,7 @@ public void sendScanDataStatus(ScanDataStatusMessage message, Map<String, String
String status = topicDetails.get("status");

String topicString = String.format("sc/ep/runtime/%s/%s/scan/status/v1/%s/%s/%s/%s",
topicDetails.get("orgId"),
message.getOrgId(),
topicDetails.get("runtimeAgentId"),
status,
topicDetails.get("messagingServiceId"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
import com.solace.maas.ep.event.management.agent.repository.model.mesagingservice.MessagingServiceEntity;
import com.solace.maas.ep.event.management.agent.scanManager.model.ScanItemBO;
import com.solace.maas.ep.event.management.agent.scanManager.model.ScanRequestBO;
import com.solace.maas.ep.event.management.agent.scanManager.model.SingleScanSpecification;
import com.solace.maas.ep.event.management.agent.service.MessagingServiceDelegateServiceImpl;
import com.solace.maas.ep.event.management.agent.service.ScanService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
Expand All @@ -40,7 +42,6 @@ public class ScanManager {
private final MessagingServiceDelegateServiceImpl messagingServiceDelegateService;
private final ScanService scanService;
private final String runtimeAgentId;
private final String orgId;
// This is an optional dependency since it is not available in standalone mode.
// If the bean is not present, the publisher will not be used.
private final Optional<ScanStatusPublisher> scanStatusPublisherOpt;
Expand All @@ -54,10 +55,10 @@ public ScanManager(MessagingServiceDelegateServiceImpl messagingServiceDelegateS
this.scanService = scanService;
this.scanStatusPublisherOpt = scanStatusPublisher;
runtimeAgentId = eventPortalProperties.getRuntimeAgentId();
orgId = eventPortalProperties.getOrganizationId();
}

public String scan(ScanRequestBO scanRequestBO) {
Validate.notBlank(scanRequestBO.getOrgId(), " Organization ID cannot be null or empty");
String messagingServiceId = scanRequestBO.getMessagingServiceId();
String scanId = scanRequestBO.getScanId();
String traceId = scanRequestBO.getTraceId();
Expand All @@ -68,6 +69,7 @@ public String scan(ScanRequestBO scanRequestBO) {
MDC.put(RouteConstants.TRACE_ID, traceId);
MDC.put(RouteConstants.ACTOR_ID, actorId);
MDC.put(RouteConstants.SCHEDULE_ID, groupId);
MDC.put(RouteConstants.ORG_ID, scanRequestBO.getOrgId());
MDC.put(RouteConstants.MESSAGING_SERVICE_ID, messagingServiceId);

MessagingServiceEntity messagingServiceEntity = retrieveMessagingServiceEntity(messagingServiceId);
Expand Down Expand Up @@ -114,11 +116,23 @@ public String scan(ScanRequestBO scanRequestBO) {
.toList().stream()
).toList().stream().flatMap(List::stream).toList();

return scanService.singleScan(routes, groupId, scanId, traceId, actorId, messagingServiceEntity, runtimeAgentId);
return scanService.singleScan(
SingleScanSpecification.builder()
.routeBundles(routes)
.orgId(scanRequestBO.getOrgId())
.groupId(groupId)
.scanId(scanId)
.traceId(traceId)
.actorId(actorId)
.messagingServiceEntity(messagingServiceEntity)
.runtimeAgentId(runtimeAgentId)
.build()
);
}

public void handleError(Exception e, ScanCommandMessage message) {

Validate.notBlank(message.getOrgId()," Organization ID cannot be null or empty");
if (scanStatusPublisherOpt.isEmpty()) {
return;
}
Expand All @@ -137,7 +151,7 @@ public void handleError(Exception e, ScanCommandMessage message) {
);

Map<String, String> topicVars = Map.of(
"orgId", orgId,
"orgId", message.getOrgId(),
"runtimeAgentId", runtimeAgentId
);
scanStatusPublisher.sendOverallScanStatus(response, topicVars);
Expand All @@ -162,8 +176,8 @@ public Page<ScanItemBO> findByMessagingServiceId(String messagingServiceId, Page

public boolean isScanComplete(String scanId) {
if (ObjectUtils.isEmpty(scanId)) {
throw new IllegalArgumentException("Scan ID cannot be null or empty");
throw new IllegalArgumentException("Scan ID cannot be null or empty");
}
return scanService.isScanComplete(scanId);
return scanService.isScanComplete(scanId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
@SuppressWarnings("PMD")
public class ScanRequestBO extends AbstractBaseBO<String> {

private String orgId;

private String messagingServiceId;

private String scanId;
Expand Down
Loading

0 comments on commit 3b74e50

Please sign in to comment.