Skip to content

Commit

Permalink
Merge pull request #37 from rgdoliveira/sync_main
Browse files Browse the repository at this point in the history
Sync main branch with Apache main branch
  • Loading branch information
rgdoliveira authored May 7, 2024
2 parents 588456a + 1bf2feb commit 6cea3ee
Show file tree
Hide file tree
Showing 594 changed files with 6,284 additions and 2,896 deletions.
9 changes: 9 additions & 0 deletions NOTICE
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Apache KIE
Copyright 2023 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).

The Initial Developer of some parts of the framework, which are copied from, derived from, or
inspired by KIE (Knowledge Is Everthing) group, is Red Hat, Inc (https://www.redhat.com/).
Copyright Red Hat, Inc. and/or its affiliates.. All Rights Reserved.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
*/
package org.kie.kogito.monitoring.core.common.process;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.kie.api.event.process.ErrorEvent;
import org.kie.api.event.process.ProcessCompletedEvent;
import org.kie.api.event.process.ProcessNodeLeftEvent;
import org.kie.api.event.process.ProcessStartedEvent;
Expand All @@ -31,12 +32,15 @@
import org.kie.kogito.KogitoGAV;
import org.kie.kogito.internal.process.event.DefaultKogitoProcessEventListener;
import org.kie.kogito.internal.process.runtime.KogitoNodeInstance;
import org.kie.kogito.internal.process.runtime.KogitoProcessInstance;
import org.kie.kogito.internal.process.runtime.KogitoWorkItemNodeInstance;
import org.kie.kogito.internal.process.runtime.KogitoWorkflowProcessInstance;
import org.kie.kogito.internal.utils.KogitoTags;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Counter.Builder;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
Expand All @@ -56,102 +60,129 @@ public MetricsProcessEventListener(String identifier, KogitoGAV gav, MeterRegist
this.meterRegistry = meterRegistry;
}

private Counter getNumberOfProcessInstancesStartedCounter(String appId, String processId) {
return Counter
.builder("kogito_process_instance_started_total")
.description("Started Process Instances")
.tags(Arrays.asList(Tag.of("app_id", appId), Tag.of("process_id", processId), Tag.of("artifactId", gav.getArtifactId()), Tag.of("version", gav.getVersion())))
.register(meterRegistry);
protected Counter buildCounter(String name, String description, String processId, Tag... tags) {
Builder builder = Counter.builder(name)
.description(description)
.tag("app_id", identifier).tag("process_id", processId).tag("artifactId", gav.getArtifactId()).tag("version", gav.getVersion());
for (Tag tag : tags) {
builder.tag(tag.getKey(), tag.getValue());
}
return builder.register(meterRegistry);
}

private Counter getNumberOfSLAsViolatedCounter(String appId, String processId, String nodeName) {
return Counter
.builder("kogito_process_instance_sla_violated_total")
.description("Process Instances SLA Violated")
.tags(Arrays.asList(Tag.of("app_id", appId), Tag.of("process_id", processId), Tag.of("node_name", nodeName), Tag.of("artifactId", gav.getArtifactId()),
Tag.of("version", gav.getVersion())))
.register(meterRegistry);
protected AtomicInteger buildGauge(String name, String description, String processId, Tag... tags) {
AtomicInteger atomicInteger = new AtomicInteger(0);
io.micrometer.core.instrument.Gauge.Builder<AtomicInteger> builder = Gauge.builder(name, atomicInteger, AtomicInteger::doubleValue)
.description(description)
.tag("app_id", identifier).tag("process_id", processId).tag("artifactId", gav.getArtifactId()).tag("version", gav.getVersion());
for (Tag tag : tags) {
builder.tag(tag.getKey(), tag.getValue());
}
builder.register(meterRegistry);
return atomicInteger;
}

private Counter getNumberOfProcessInstancesCompletedCounter(String appId, String processId, String nodeName) {
return Counter
.builder("kogito_process_instance_completed_total")
.description("Completed Process Instances")
.tags(Arrays.asList(Tag.of("app_id", appId), Tag.of("process_id", processId), Tag.of("node_name", nodeName), Tag.of("artifactId", gav.getArtifactId()),
Tag.of("version", gav.getVersion())))
.register(meterRegistry);
protected DistributionSummary buildDistributionSummary(String name, String description, Tag... tags) {
io.micrometer.core.instrument.DistributionSummary.Builder builder = DistributionSummary.builder(name)
.description(description).tag("artifactId", gav.getArtifactId()).tag("version", gav.getVersion());
for (Tag tag : tags) {
builder.tag(tag.getKey(), tag.getValue());
}
return builder.register(meterRegistry);
}

private AtomicInteger getRunningProcessInstancesGauge(String appId, String processId) {
if (gaugeMap.containsKey(appId + processId)) {
return gaugeMap.get(appId + processId);
}
AtomicInteger atomicInteger = new AtomicInteger(0);
Gauge.builder("kogito_process_instance_running_total", atomicInteger, AtomicInteger::doubleValue)
.description("Running Process Instances")
.tags(Arrays.asList(Tag.of("app_id", appId), Tag.of("process_id", processId), Tag.of("artifactId", gav.getArtifactId()), Tag.of("version", gav.getVersion())))
.register(meterRegistry);
gaugeMap.put(appId + processId, atomicInteger);
return atomicInteger;
private Counter getNumberOfProcessInstancesStartedCounter(String processId) {
return buildCounter("kogito_process_instance_started_total", "Started Process Instances", processId);
}

private Counter getErrorCounter(String processId, String errorMessage) {
return buildCounter("kogito_process_instance_error", "Number of errors that has occurred", processId, Tag.of("error_message", errorMessage));
}

private Counter getNumberOfSLAsViolatedCounter(String processId, String nodeName) {
return buildCounter("kogito_process_instance_sla_violated_total", "Number of SLA violations that has ocurred", processId, Tag.of("node_name", nodeName));
}

private Counter getNumberOfProcessInstancesCompletedCounter(String processId, String state) {
return buildCounter("kogito_process_instance_completed_total", "Completed Process Instances", processId, Tag.of("process_state", state));
}

private AtomicInteger getRunningProcessInstancesGauge(String processId) {
return gaugeMap.computeIfAbsent(identifier + processId, k -> buildGauge("kogito_process_instance_running_total", "Running Process Instances", processId));
}

private DistributionSummary getProcessInstancesDurationSummary(String appId, String processId) {
return DistributionSummary.builder("kogito_process_instance_duration_seconds")
.description("Process Instances Duration")
.tags(Arrays.asList(Tag.of("app_id", appId), Tag.of("process_id", processId), Tag.of("artifactId", gav.getArtifactId()), Tag.of("version", gav.getVersion())))
.register(meterRegistry);
private DistributionSummary getProcessInstancesDurationSummary(String processId) {
return buildDistributionSummary("kogito_process_instance_duration_seconds",
"Process Instances Duration", Tag.of("process_id", processId), Tag.of("app_id", identifier));
}

private DistributionSummary getWorkItemsDurationSummary(String name) {
return DistributionSummary.builder("kogito_work_item_duration_seconds")
.description("Work Items Duration")
.tags(Arrays.asList(Tag.of("name", name), Tag.of("artifactId", gav.getArtifactId()), Tag.of("version", gav.getVersion())))
.register(meterRegistry);
return buildDistributionSummary("kogito_work_item_duration_seconds",
"Work Items Duration", Tag.of("name", name));
}

private DistributionSummary getNodeInstancesDurationSummary(String processId, String nodeName) {
return buildDistributionSummary("kogito_node_instance_duration_milliseconds", "Relevant nodes duration in milliseconds", Tag.of("process_id", processId),
Tag.of("node_name", nodeName));
}

protected void recordRunningProcessInstance(String containerId, String processId) {
getRunningProcessInstancesGauge(containerId, processId).incrementAndGet();
protected void recordRunningProcessInstance(String processId) {
getRunningProcessInstancesGauge(processId).incrementAndGet();
}

protected static double millisToSeconds(long millis) {
return millis / 1000.0;
return TimeUnit.MILLISECONDS.toSeconds(millis);
}

@Override
public void afterProcessStarted(ProcessStartedEvent event) {
LOGGER.debug("After process started event: {}", event);
final ProcessInstance processInstance = event.getProcessInstance();
getNumberOfProcessInstancesStartedCounter(identifier, processInstance.getProcessId()).increment();
recordRunningProcessInstance(identifier, processInstance.getProcessId());
getNumberOfProcessInstancesStartedCounter(processInstance.getProcessId()).increment();
recordRunningProcessInstance(processInstance.getProcessId());
}

@Override
public void afterProcessCompleted(ProcessCompletedEvent event) {
LOGGER.debug("After process completed event: {}", event);
final KogitoWorkflowProcessInstance processInstance = (KogitoWorkflowProcessInstance) event.getProcessInstance();
getRunningProcessInstancesGauge(identifier, processInstance.getProcessId()).decrementAndGet();
getRunningProcessInstancesGauge(processInstance.getProcessId()).decrementAndGet();

getNumberOfProcessInstancesCompletedCounter(identifier, processInstance.getProcessId(), String.valueOf(processInstance.getState())).increment();
getNumberOfProcessInstancesCompletedCounter(processInstance.getProcessId(), fromState(processInstance.getState())).increment();

if (processInstance.getStartDate() != null) {
final double duration = millisToSeconds(processInstance.getEndDate().getTime() - processInstance.getStartDate().getTime());
getProcessInstancesDurationSummary(identifier, processInstance.getProcessId()).record(duration);
getProcessInstancesDurationSummary(processInstance.getProcessId()).record(duration);
LOGGER.debug("Process Instance duration: {}s", duration);
}
}

@Override
public void onError(ErrorEvent event) {
LOGGER.debug("After Error event: {}", event);
final KogitoWorkflowProcessInstance processInstance = (KogitoWorkflowProcessInstance) event.getProcessInstance();
getErrorCounter(processInstance.getProcessId(), processInstance.getErrorMessage()).increment();
}

@Override
public void beforeNodeLeft(ProcessNodeLeftEvent event) {
LOGGER.debug("Before Node left event: {}", event);
final KogitoNodeInstance nodeInstance = (KogitoNodeInstance) event.getNodeInstance();
if (nodeInstance instanceof KogitoWorkItemNodeInstance) {
KogitoWorkItemNodeInstance wi = (KogitoWorkItemNodeInstance) nodeInstance;
if (wi.getTriggerTime() != null) {
final String name = (String) wi.getWorkItem().getParameters().getOrDefault("TaskName", wi.getWorkItem().getName());
final double duration = millisToSeconds(wi.getLeaveTime().getTime() - wi.getTriggerTime().getTime());
getWorkItemsDurationSummary(name).record(duration);
LOGGER.debug("Work Item {}, duration: {}s", name, duration);
}
recordNodeDuration(getWorkItemsDurationSummary((String) wi.getWorkItem().getParameters().getOrDefault("TaskName", wi.getWorkItem().getName())), nodeInstance, TimeUnit.SECONDS);
}
String nodeName = (String) nodeInstance.getNode().getMetaData().get(KogitoTags.METRIC_NAME_METADATA);
if (nodeName != null) {
recordNodeDuration(getNodeInstancesDurationSummary(event.getProcessInstance().getProcessId(), nodeName), nodeInstance, TimeUnit.MILLISECONDS);
}
}

private void recordNodeDuration(DistributionSummary summary, KogitoNodeInstance instance, TimeUnit target) {
if (instance.getTriggerTime() != null) {
double duration = target.convert(instance.getLeaveTime().getTime() - instance.getTriggerTime().getTime(), TimeUnit.MILLISECONDS);
summary.record(duration);
LOGGER.debug("Recorded {} {} because of node {} for summary {}", duration, target, instance.getNode().getName(), summary.getId().getName());
}
}

Expand All @@ -160,7 +191,22 @@ public void afterSLAViolated(SLAViolatedEvent event) {
LOGGER.debug("After SLA violated event: {}", event);
final ProcessInstance processInstance = event.getProcessInstance();
if (processInstance != null && event.getNodeInstance() != null) {
getNumberOfSLAsViolatedCounter(identifier, processInstance.getProcessId(), event.getNodeInstance().getNodeName()).increment();
getNumberOfSLAsViolatedCounter(processInstance.getProcessId(), event.getNodeInstance().getNodeName()).increment();
}
}

private static String fromState(int state) {
switch (state) {
case KogitoProcessInstance.STATE_ABORTED:
return "Aborted";
case KogitoProcessInstance.STATE_COMPLETED:
return "Completed";
case KogitoProcessInstance.STATE_ERROR:
return "Error";
default:
case KogitoProcessInstance.STATE_ACTIVE:
return "Active";
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,17 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;

public class PostgreSQLCorrelationRepository {
public class JDBCCorrelationRepository {

static final String INSERT = "INSERT INTO correlation_instances (id, encoded_correlation_id, correlated_id, correlation) VALUES (?, ?, ?, ?::json)";
static final String INSERT = "INSERT INTO correlation_instances (id, encoded_correlation_id, correlated_id, correlation) VALUES (?, ?, ?, ?)";
static final String DELETE = "DELETE FROM correlation_instances WHERE encoded_correlation_id = ?";
private static final String FIND_BY_ENCODED_ID = "SELECT correlated_id, correlation FROM correlation_instances WHERE encoded_correlation_id = ?";
private static final String FIND_BY_CORRELATED_ID = "SELECT encoded_correlation_id, correlation FROM correlation_instances WHERE correlated_id = ?";

private DataSource dataSource;
private ObjectMapper objectMapper;

public PostgreSQLCorrelationRepository(DataSource dataSource) {
public JDBCCorrelationRepository(DataSource dataSource) {
this.dataSource = dataSource;
this.objectMapper = ObjectMapperFactory.get().copy();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
import org.kie.kogito.correlation.CorrelationService;
import org.kie.kogito.event.correlation.MD5CorrelationEncoder;

public class PostgreSQLCorrelationService implements CorrelationService {
public class JDBCCorrelationService implements CorrelationService {

private PostgreSQLCorrelationRepository repository;
private JDBCCorrelationRepository repository;
private CorrelationEncoder correlationEncoder;

public PostgreSQLCorrelationService(DataSource dataSource) {
this.repository = new PostgreSQLCorrelationRepository(dataSource);
public JDBCCorrelationService(DataSource dataSource) {
this.repository = new JDBCCorrelationRepository(dataSource);
this.correlationEncoder = new MD5CorrelationEncoder();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
CREATE TABLE process_instances
(
id CHAR(36) NOT NULL,
payload BLOB NOT NULL,
process_id VARCHAR(4000) NOT NULL,
version BIGINT(19),
process_version VARCHAR(4000),
id character(36) NOT NULL,
payload varbinary(1000000) NOT NULL,
process_id character varying(4000) NOT NULL,
version bigint,
process_version character varying(4000),
CONSTRAINT process_instances_pkey PRIMARY KEY (id)
);
CREATE INDEX idx_process_instances_process_id ON process_instances (process_id, id, process_version);
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE TABLE correlation_instances
(
id character(36) NOT NULL,
encoded_correlation_id character varying(36) NOT NULL UNIQUE,
correlated_id character varying(36) NOT NULL,
correlation character varying(8000) NOT NULL,
version bigint,
CONSTRAINT correlation_instances_pkey PRIMARY KEY (id)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE correlation_instances
ALTER COLUMN correlation TYPE character varying;
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.kie.kogito.correlation.CompositeCorrelation;
import org.kie.kogito.correlation.CorrelationInstance;
import org.kie.kogito.correlation.SimpleCorrelation;
import org.kie.kogito.persistence.jdbc.correlation.PostgreSQLCorrelationService;
import org.kie.kogito.persistence.jdbc.correlation.JDBCCorrelationService;
import org.kie.kogito.testcontainers.KogitoPostgreSqlContainer;
import org.postgresql.ds.PGSimpleDataSource;
import org.testcontainers.containers.JdbcDatabaseContainer;
Expand All @@ -42,15 +42,15 @@ public class JDBCCorrelationServiceIT {
@Container
private static final KogitoPostgreSqlContainer PG_CONTAINER = new KogitoPostgreSqlContainer();
private static PGSimpleDataSource dataSource;
private static PostgreSQLCorrelationService correlationService;
private static JDBCCorrelationService correlationService;

@BeforeAll
public static void setUp() {
dataSource = new PGSimpleDataSource();
dataSource.setUrl(PG_CONTAINER.getJdbcUrl());
dataSource.setUser(PG_CONTAINER.getUsername());
dataSource.setPassword(PG_CONTAINER.getPassword());
correlationService = new PostgreSQLCorrelationService(dataSource);
correlationService = new JDBCCorrelationService(dataSource);
//create table
// DDLRunner.init(new GenericRepository(dataSource), true);
initMigration(PG_CONTAINER, "postgresql");
Expand Down
1 change: 0 additions & 1 deletion api/kogito-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
<groupId>jakarta.xml.bind</groupId>
<artifactId>jakarta.xml.bind-api</artifactId>
<scope>provided</scope>
<version>4.0.1</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ public static boolean isNotEmpty(String value) {
return !isEmpty(value);
}

public static String sanitizeString(String string) {
return string.replaceAll("\"", "\\\\\"");
}

public static String sanitizeClassName(String className) {
return sanitizeJavaName(className, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ public class KogitoTags {
public static final String INPUT_TAG = "input";
public static final String OUTPUT_TAG = "output";

public static final String METRIC_NAME_METADATA = "MetricName";

}
Loading

0 comments on commit 6cea3ee

Please sign in to comment.