Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DATAGO-89836: Support Scan for Public CEMA #228

Merged
merged 5 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.solace.maas.ep.event.management.agent.cli;

import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties;
import com.solace.maas.ep.event.management.agent.repository.model.mesagingservice.MessagingServiceEntity;
import com.solace.maas.ep.event.management.agent.scanManager.ScanManager;
import com.solace.maas.ep.event.management.agent.scanManager.model.ScanRequestBO;
Expand All @@ -26,16 +27,18 @@ public class CommandLineScan {
private final MessagingServiceDelegateServiceImpl messagingServiceDelegateService;
private final ImportService importService;
private final CommandLineCommon commandLineCommon;
private final EventPortalProperties eventPortalProperties;

public CommandLineScan(ScanManager scanManager, IDGenerator idGenerator,
MessagingServiceDelegateServiceImpl messagingServiceDelegateService,
ImportService importService,
CommandLineCommon commandLineCommon) {
CommandLineCommon commandLineCommon, EventPortalProperties eventPortalProperties) {
this.scanManager = scanManager;
this.idGenerator = idGenerator;
this.messagingServiceDelegateService = messagingServiceDelegateService;
this.importService = importService;
this.commandLineCommon = commandLineCommon;
this.eventPortalProperties = eventPortalProperties;
}

public void runScan(String messagingServiceId, String filePathAndName) throws InterruptedException, IOException {
Expand All @@ -45,6 +48,7 @@ public void runScan(String messagingServiceId, String filePathAndName) throws In
.messagingServiceId(messagingServiceId)
.scanId(idGenerator.generateRandomUniqueId())
.destinations(List.of("FILE_WRITER"))
.orgId(eventPortalProperties.getOrganizationId())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for command line scan, pass in the org_id from application.yaml

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

.build();
setScanType(messagingServiceEntity, scanRequestBO, messagingServiceId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,21 @@ public class StreamingAppender extends AppenderBase<ILoggingEvent> {

@Override
protected void append(ILoggingEvent event) {
String orgId = event.getMDCPropertyMap().get(RouteConstants.ORG_ID);
if (!standalone) {
if (StringUtils.isNotEmpty(event.getMDCPropertyMap().get(RouteConstants.SCAN_ID))) {
sendScanLogsAsync(event,
sendScanLogsAsync(orgId,
event,
event.getMDCPropertyMap().get(RouteConstants.SCAN_ID),
event.getMDCPropertyMap().get(RouteConstants.TRACE_ID),
event.getMDCPropertyMap().get(RouteConstants.ACTOR_ID),
event.getMDCPropertyMap().get(RouteConstants.SCAN_TYPE),
event.getMDCPropertyMap().get(RouteConstants.SCHEDULE_ID),
event.getMDCPropertyMap().get(RouteConstants.MESSAGING_SERVICE_ID));
} else if (StringUtils.isNotEmpty(event.getMDCPropertyMap().get(RouteConstants.COMMAND_CORRELATION_ID))) {
sendCommandLogsAsync(event,
sendCommandLogsAsync(
orgId,
event,
event.getMDCPropertyMap().get(RouteConstants.COMMAND_CORRELATION_ID),
event.getMDCPropertyMap().get(RouteConstants.TRACE_ID),
event.getMDCPropertyMap().get(RouteConstants.ACTOR_ID),
Expand All @@ -45,7 +49,7 @@ protected void append(ILoggingEvent event) {
}
}

private void sendCommandLogsAsync(ILoggingEvent event, String commandCorrelationId, String traceId,
private void sendCommandLogsAsync(String orgId, ILoggingEvent event, String commandCorrelationId, String traceId,
String actorId, String messagingServiceId) {


Expand All @@ -59,6 +63,7 @@ private void sendCommandLogsAsync(ILoggingEvent event, String commandCorrelation
exchange.getIn().setHeader(RouteConstants.COMMAND_CORRELATION_ID, commandCorrelationId);
exchange.getIn().setHeader(RouteConstants.TRACE_ID, traceId);
exchange.getIn().setHeader(RouteConstants.ACTOR_ID, actorId);
exchange.getIn().setHeader(RouteConstants.ORG_ID, orgId);
exchange.getIn().setHeader(RouteConstants.MESSAGING_SERVICE_ID, messagingServiceId);

exchange.getIn().setBody(event);
Expand All @@ -71,7 +76,7 @@ private void sendCommandLogsAsync(ILoggingEvent event, String commandCorrelation
});
}

private void sendScanLogsAsync(ILoggingEvent event, String scanId, String traceId, String actorId,
private void sendScanLogsAsync(String orgId, ILoggingEvent event, String scanId, String traceId, String actorId,
String scanType, String groupId, String messagingServiceId) {
RouteEntity route = creatLoggingRoute(scanType, messagingServiceId);

Expand All @@ -81,6 +86,7 @@ private void sendScanLogsAsync(ILoggingEvent event, String scanId, String traceI
exchange.getIn().setHeader(RouteConstants.TRACE_ID, traceId);
exchange.getIn().setHeader(RouteConstants.ACTOR_ID, actorId);
exchange.getIn().setHeader(RouteConstants.SCAN_TYPE, scanType);
exchange.getIn().setHeader(RouteConstants.ORG_ID, orgId);
exchange.getIn().setHeader(RouteConstants.SCHEDULE_ID, groupId);
exchange.getIn().setHeader(RouteConstants.MESSAGING_SERVICE_ID, messagingServiceId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
@Component
@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false")
public class ScanDataImportPublishImportScanEventProcessor implements Processor {
private final String orgId;
private final String runtimeAgentId;
private final ScanDataPublisher scanDataPublisher;

Expand All @@ -29,7 +28,6 @@ public ScanDataImportPublishImportScanEventProcessor(ScanDataPublisher scanDataP

this.scanDataPublisher = scanDataPublisher;

orgId = eventPortalProperties.getOrganizationId();
runtimeAgentId = eventPortalProperties.getRuntimeAgentId();
}

Expand All @@ -50,6 +48,7 @@ public void process(Exchange exchange) throws Exception {

String messagingServiceId = (String) properties.get(RouteConstants.MESSAGING_SERVICE_ID);
String scanId = (String) properties.get(RouteConstants.SCAN_ID);
String orgId = (String) properties.get(RouteConstants.ORG_ID);
Boolean isImportOp = (Boolean) properties.get(RouteConstants.IS_DATA_IMPORT);

ScanDataImportMessage importDataMessage =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,13 @@
public class ScanDataProcessor implements Processor {

private final ScanDataPublisher scanDataPublisher;
private final String orgId;
private final String runtimeAgentId;

@Autowired
public ScanDataProcessor(ScanDataPublisher scanDataPublisher, EventPortalProperties eventPortalProperties) {
super();

this.scanDataPublisher = scanDataPublisher;

orgId = eventPortalProperties.getOrganizationId();
runtimeAgentId = eventPortalProperties.getRuntimeAgentId();
}

Expand All @@ -48,9 +45,18 @@ public void process(Exchange exchange) throws Exception {
String traceId = (String) properties.get(RouteConstants.TRACE_ID);
String actorId = (String) properties.get(RouteConstants.ACTOR_ID);
String scanType = (String) properties.get(RouteConstants.SCAN_TYPE);
String orgId = (String) properties.get(RouteConstants.ORG_ID);
Boolean isImportOp = (Boolean) properties.get(RouteConstants.IS_DATA_IMPORT);

ScanDataMessage scanDataMessage = new ScanDataMessage(orgId, scanId, traceId, actorId, scanType, body, Instant.now().toString());
ScanDataMessage scanDataMessage = new ScanDataMessage(
orgId,
scanId,
traceId,
actorId,
scanType,
body,
Instant.now().toString()
);

topicDetails.put("orgId", orgId);
topicDetails.put("runtimeAgentId", runtimeAgentId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
@Component
@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false")
public class ScanLogsProcessor implements Processor {
private final String orgId;
private final String runtimeAgentId;

private final ScanLogsPublisher logDataPublisher;
Expand All @@ -30,7 +29,6 @@ public ScanLogsProcessor(ScanLogsPublisher logDataPublisher, EventPortalProperti

this.logDataPublisher = logDataPublisher;

orgId = eventPortalProperties.getOrganizationId();
runtimeAgentId = eventPortalProperties.getRuntimeAgentId();
}

Expand All @@ -42,6 +40,7 @@ public void process(Exchange exchange) throws Exception {
ILoggingEvent event = (ILoggingEvent) exchange.getIn().getBody();
String scanId = (String) properties.get(RouteConstants.SCAN_ID);
String traceId = (String) properties.get(RouteConstants.TRACE_ID);
String orgId = (String) properties.get(RouteConstants.ORG_ID);
String actorId = (String) properties.get(RouteConstants.ACTOR_ID);
String messagingServiceId = (String) properties.get(RouteConstants.MESSAGING_SERVICE_ID);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@
@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false")
public class ScanStatusOverAllProcessor implements Processor {

private final String orgId;
private final String runtimeAgentId;

@Autowired
public ScanStatusOverAllProcessor(EventPortalProperties eventPortalProperties) {
super();

orgId = eventPortalProperties.getOrganizationId();
runtimeAgentId = eventPortalProperties.getRuntimeAgentId();
}

Expand All @@ -46,6 +44,7 @@ public void process(Exchange exchange) throws Exception {
String description = (String) properties.get(RouteConstants.SCAN_STATUS_DESC);

String scanType = (String) properties.get(RouteConstants.SCAN_TYPE);
String orgId = (String) properties.get(RouteConstants.ORG_ID);
List<String> scanTypes = Arrays.asList(scanType.split(","));

topicDetails.put("orgId", orgId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@
@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false")
@SuppressWarnings("unchecked")
public class ScanStatusPerRouteProcessor implements Processor {
private final String orgId;
private final String runtimeAgentId;

@Autowired
public ScanStatusPerRouteProcessor(EventPortalProperties eventPortalProperties) {
super();

orgId = eventPortalProperties.getOrganizationId();
runtimeAgentId = eventPortalProperties.getRuntimeAgentId();
}

Expand All @@ -42,6 +39,7 @@ public void process(Exchange exchange) throws Exception {
String scanType = (String) properties.get(RouteConstants.SCAN_TYPE);
ScanStatus status = (ScanStatus) properties.get(RouteConstants.SCAN_STATUS);
String description = (String) properties.get(RouteConstants.SCAN_STATUS_DESC);
String orgId = (String) properties.get(RouteConstants.ORG_ID);

topicDetails.put("orgId", orgId);
topicDetails.put("runtimeAgentId", runtimeAgentId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class HeartbeatGenerator {
private final String topic;
private final String runtimeAgentVersion;
private final MeterRegistry meterRegistry;
private final EventPortalProperties eventPortalProperties;

public HeartbeatGenerator(SolaceConfiguration solaceConfiguration,
EventPortalProperties eventPortalProperties,
Expand All @@ -43,11 +44,13 @@ public HeartbeatGenerator(SolaceConfiguration solaceConfiguration,
topic = solaceConfiguration.getTopicPrefix() + "heartbeat/v1";
this.runtimeAgentVersion = getFormattedVersion(buildProperties.getVersion());
this.meterRegistry = meterRegistry;
this.eventPortalProperties = eventPortalProperties;
}

@Scheduled(fixedRate = 5000)
public void sendHeartbeat() {
HeartbeatMessage message = new HeartbeatMessage(runtimeAgentId, Instant.now().toString(), runtimeAgentVersion);
message.setOrgId(eventPortalProperties.getOrganizationId());
boolean result = solacePublisher.publish(message, topic);
logHealthMetric(message, result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ public void sendScanData(MOPMessage message, Map<String, String> topicDetails) {

String topicString = isImport ?
String.format("sc/ep/runtime/%s/%s/importScan/v1/%s",
topicDetails.get("orgId"),
message.getOrgId(),
topicDetails.get("runtimeAgentId"),
topicDetails.get("messagingServiceId"))
:
String.format("sc/ep/runtime/%s/%s/scan/data/v1/%s/%s/%s",
topicDetails.get("orgId"),
message.getOrgId(),
topicDetails.get("runtimeAgentId"),
topicDetails.get("messagingServiceId"),
topicDetails.get("scanId"),
Expand All @@ -58,6 +58,6 @@ public void sendScanData(MOPMessage message, Map<String, String> topicDetails) {
meterRegistry.counter(MAAS_EMA_SCAN_EVENT_SENT,
STATUS_TAG, isSuccessful ? ScanStatus.COMPLETE.name() : ScanStatus.FAILED.name(),
SCAN_ID_TAG, topicDetails.get("scanId"),
ORG_ID_TAG, topicDetails.get("orgId")).increment();
ORG_ID_TAG, message.getOrgId()).increment();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public ScanLogsPublisher(SolacePublisher solacePublisher) {

public void sendScanLogData(ScanLogMessage message, Map<String, String> topicDetails) {
String topicString = String.format("sc/ep/runtime/%s/%s/scan/logs/v1/%s/%s",
topicDetails.get("orgId"),
message.getOrgId(),
topicDetails.get("runtimeAgentId"),
topicDetails.get("messagingServiceId"),
topicDetails.get("scanId"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@ public void sendOverallScanStatus(ScanStatusMessage message, Map<String, String>
String scanId = topicDetails.get("scanId");
String scanType = topicDetails.get("scanType");
String status = topicDetails.get("status");

String topicString = String.format("sc/ep/runtime/%s/%s/scan/status/v1/%s/%s",
topicDetails.get("orgId"),
message.getOrgId(),
topicDetails.get("runtimeAgentId"),
topicDetails.get("messagingServiceId"),
scanId);
Expand All @@ -72,7 +71,7 @@ public void sendScanDataStatus(ScanDataStatusMessage message, Map<String, String
String status = topicDetails.get("status");

String topicString = String.format("sc/ep/runtime/%s/%s/scan/status/v1/%s/%s/%s/%s",
topicDetails.get("orgId"),
message.getOrgId(),
topicDetails.get("runtimeAgentId"),
status,
topicDetails.get("messagingServiceId"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
import com.solace.maas.ep.event.management.agent.repository.model.mesagingservice.MessagingServiceEntity;
import com.solace.maas.ep.event.management.agent.scanManager.model.ScanItemBO;
import com.solace.maas.ep.event.management.agent.scanManager.model.ScanRequestBO;
import com.solace.maas.ep.event.management.agent.scanManager.model.SingleScanSpecification;
import com.solace.maas.ep.event.management.agent.service.MessagingServiceDelegateServiceImpl;
import com.solace.maas.ep.event.management.agent.service.ScanService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
Expand All @@ -40,7 +42,6 @@ public class ScanManager {
private final MessagingServiceDelegateServiceImpl messagingServiceDelegateService;
private final ScanService scanService;
private final String runtimeAgentId;
private final String orgId;
// This is an optional dependency since it is not available in standalone mode.
// If the bean is not present, the publisher will not be used.
private final Optional<ScanStatusPublisher> scanStatusPublisherOpt;
Expand All @@ -54,10 +55,10 @@ public ScanManager(MessagingServiceDelegateServiceImpl messagingServiceDelegateS
this.scanService = scanService;
this.scanStatusPublisherOpt = scanStatusPublisher;
runtimeAgentId = eventPortalProperties.getRuntimeAgentId();
orgId = eventPortalProperties.getOrganizationId();
}

public String scan(ScanRequestBO scanRequestBO) {
Validate.notBlank(scanRequestBO.getOrgId(), " Organization ID cannot be null or empty");
String messagingServiceId = scanRequestBO.getMessagingServiceId();
String scanId = scanRequestBO.getScanId();
String traceId = scanRequestBO.getTraceId();
Expand All @@ -68,6 +69,7 @@ public String scan(ScanRequestBO scanRequestBO) {
MDC.put(RouteConstants.TRACE_ID, traceId);
MDC.put(RouteConstants.ACTOR_ID, actorId);
MDC.put(RouteConstants.SCHEDULE_ID, groupId);
MDC.put(RouteConstants.ORG_ID, scanRequestBO.getOrgId());
MDC.put(RouteConstants.MESSAGING_SERVICE_ID, messagingServiceId);

MessagingServiceEntity messagingServiceEntity = retrieveMessagingServiceEntity(messagingServiceId);
Expand Down Expand Up @@ -114,11 +116,23 @@ public String scan(ScanRequestBO scanRequestBO) {
.toList().stream()
).toList().stream().flatMap(List::stream).toList();

return scanService.singleScan(routes, groupId, scanId, traceId, actorId, messagingServiceEntity, runtimeAgentId);
return scanService.singleScan(
SingleScanSpecification.builder()
.routeBundles(routes)
.orgId(scanRequestBO.getOrgId())
.groupId(groupId)
.scanId(scanId)
.traceId(traceId)
.actorId(actorId)
.messagingServiceEntity(messagingServiceEntity)
.runtimeAgentId(runtimeAgentId)
.build()
);
}

public void handleError(Exception e, ScanCommandMessage message) {

Validate.notBlank(message.getOrgId()," Organization ID cannot be null or empty");
if (scanStatusPublisherOpt.isEmpty()) {
return;
}
Expand All @@ -137,7 +151,7 @@ public void handleError(Exception e, ScanCommandMessage message) {
);

Map<String, String> topicVars = Map.of(
"orgId", orgId,
"orgId", message.getOrgId(),
"runtimeAgentId", runtimeAgentId
);
scanStatusPublisher.sendOverallScanStatus(response, topicVars);
Expand All @@ -162,8 +176,8 @@ public Page<ScanItemBO> findByMessagingServiceId(String messagingServiceId, Page

public boolean isScanComplete(String scanId) {
if (ObjectUtils.isEmpty(scanId)) {
throw new IllegalArgumentException("Scan ID cannot be null or empty");
throw new IllegalArgumentException("Scan ID cannot be null or empty");
}
return scanService.isScanComplete(scanId);
return scanService.isScanComplete(scanId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
@SuppressWarnings("PMD")
public class ScanRequestBO extends AbstractBaseBO<String> {

private String orgId;

private String messagingServiceId;

private String scanId;
Expand Down
Loading