Skip to content

Commit

Permalink
feat(#3282): Add new rule and rule description for regex rule
Browse files Browse the repository at this point in the history
  • Loading branch information
tenthe committed Oct 4, 2024
1 parent d4331d5 commit ffd5e7c
Show file tree
Hide file tree
Showing 13 changed files with 372 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.ChangeDatatypeTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.CorrectionValueTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.RegexTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.TimestampTranfsformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.UnitTransformRuleDescription;
import org.apache.streampipes.model.schema.EventProperty;
Expand Down Expand Up @@ -90,6 +91,11 @@ public void visit(RenameRuleDescription rule) {
property.setRuntimeName(rule.getOldRuntimeKey());
}

@Override
public void visit(RegexTransformationRuleDescription rule) {
// does not affect schema
}

@Override
public void visit(EventRateTransformationRuleDescription rule) {
// does not affect schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.ChangeDatatypeTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.CorrectionValueTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.RegexTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.TimestampTranfsformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.UnitTransformRuleDescription;
import org.apache.streampipes.model.schema.EventProperty;
Expand Down Expand Up @@ -95,6 +96,7 @@ public void visit(RenameRuleDescription rule) {
property.setRuntimeName(rule.getNewRuntimeKey());
}


@Override
public void visit(EventRateTransformationRuleDescription rule) {
// does not affect schema
Expand Down Expand Up @@ -149,6 +151,16 @@ public void visit(CorrectionValueTransformationRuleDescription rule) {
metadata.put("correctionValue", rule.getCorrectionValue());
}

@Override
public void visit(RegexTransformationRuleDescription rule) {
var property = findPrimitiveProperty(properties, rule.getRuntimeKey());
var metadata = property.getAdditionalMetadata();

metadata.put("regex", rule.getRegex());
metadata.put("replaceWith", rule.getReplaceWith());
metadata.put("replaceAll", rule.isReplaceAll());
}

@Override
public void visit(TimestampTranfsformationRuleDescription rule) {
var property = findPrimitiveProperty(properties, rule.getRuntimeKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.ChangeDatatypeTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.CorrectionValueTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.RegexTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.TimestampTranfsformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.UnitTransformRuleDescription;

Expand All @@ -55,6 +56,11 @@ public void visit(RenameRuleDescription rule) {
// skip (not a stateful transformation)
}

@Override
public void visit(RegexTransformationRuleDescription rule) {
// skip (not a stateful transformation)
}

@Override
public void visit(EventRateTransformationRuleDescription ruleDesc) {
rules.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.streampipes.connect.shared.preprocessing.transform.value.AddTimestampTransformationRule;
import org.apache.streampipes.connect.shared.preprocessing.transform.value.CorrectionValueTransformationRule;
import org.apache.streampipes.connect.shared.preprocessing.transform.value.DatatypeTransformationRule;
import org.apache.streampipes.connect.shared.preprocessing.transform.value.RegexTransformationRule;
import org.apache.streampipes.connect.shared.preprocessing.transform.value.TimestampTranformationRuleMode;
import org.apache.streampipes.connect.shared.preprocessing.transform.value.TimestampTransformationRule;
import org.apache.streampipes.connect.shared.preprocessing.transform.value.UnitTransformationRule;
Expand All @@ -40,6 +41,7 @@
import org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.ChangeDatatypeTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.CorrectionValueTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.RegexTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.TimestampTranfsformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.UnitTransformRuleDescription;

Expand Down Expand Up @@ -71,6 +73,15 @@ public void visit(RenameRuleDescription ruleDesc) {
Utils.getLastKey(ruleDesc.getNewRuntimeKey())));
}

@Override
public void visit(RegexTransformationRuleDescription rule) {
rules.add(new RegexTransformationRule(
Utils.toKeyArray(rule.getRuntimeKey()),
rule.getRegex(),
rule.getReplaceWith(),
rule.isReplaceAll()));
}

@Override
public void visit(EventRateTransformationRuleDescription ruleDesc) {
// Do nothing
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.connect.shared.preprocessing.transform.value;

import org.apache.streampipes.connect.shared.preprocessing.SupportsNestedTransformationRule;

import java.util.List;
import java.util.Map;

public class RegexTransformationRule extends SupportsNestedTransformationRule {

private final List<String> eventKeys;
private final String regex;
private final String replaceWith;
private final boolean replaceAll;

public RegexTransformationRule(
List<String> eventKeys,
String regex,
String replaceWith,
boolean replaceAll
) {
this.eventKeys = eventKeys;
this.regex = regex;
this.replaceWith = replaceWith;
this.replaceAll = replaceAll;
}

@Override
protected List<String> getEventKeys() {
return eventKeys;
}

@Override
protected void applyTransformation(Map<String, Object> event, List<String> eventKey) {
var oldValue = event.get(eventKey.get(0));

var newValue = "";

if (oldValue instanceof String oldStringValue) {

if (this.replaceAll) {
newValue = oldStringValue.replaceAll(regex, this.replaceWith);
} else {
newValue = oldStringValue.replaceFirst(regex, this.replaceWith);
}

event.put(eventKey.get(0), newValue);
} else {
// add empty string if key is not present or wrong data type
event.put(eventKey.get(0), newValue);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.streampipes.model.connect.rules.schema.DeleteRuleDescription;
import org.apache.streampipes.model.connect.rules.schema.MoveRuleDescription;
import org.apache.streampipes.model.connect.rules.schema.RenameRuleDescription;
import org.apache.streampipes.model.connect.rules.value.RegexTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.UnitTransformRuleDescription;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyNested;
Expand All @@ -34,7 +35,8 @@
public class Helpers {

public static String getUnit(EventProperty eventProperty) {
return ((EventPropertyPrimitive) eventProperty).getMeasurementUnit().toString();
return ((EventPropertyPrimitive) eventProperty).getMeasurementUnit()
.toString();
}

public static List<EventProperty> makeSimpleProperties(boolean addTimestamp) {
Expand Down Expand Up @@ -67,16 +69,35 @@ public static UnitTransformRuleDescription makeUnitTransformationRule(String run
return rule;
}

public static MoveRuleDescription makeMoveTransformationRule(String runtimeKey,
String newRuntimeKey) {
public static RegexTransformationRuleDescription makeRegexTransformationRule(
String runtimeKey,
String regex,
String replaceWith,
boolean replaceAll
) {
var rule = new RegexTransformationRuleDescription();
rule.setRuntimeKey(runtimeKey);
rule.setRegex(regex);
rule.setReplaceWith(replaceWith);
rule.setReplaceAll(replaceAll);

return rule;
}

public static MoveRuleDescription makeMoveTransformationRule(
String runtimeKey,
String newRuntimeKey
) {
var rule = new MoveRuleDescription();
rule.setOldRuntimeKey(runtimeKey);
rule.setNewRuntimeKey(newRuntimeKey);
return rule;
}

public static RenameRuleDescription makeRenameTransformationRule(String runtimeKey,
String newRuntimeName) {
public static RenameRuleDescription makeRenameTransformationRule(
String runtimeKey,
String newRuntimeName
) {
var rule = new RenameRuleDescription();
rule.setOldRuntimeKey(runtimeKey);
rule.setNewRuntimeKey(newRuntimeName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.Labels;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
Expand All @@ -37,6 +36,7 @@
import static org.apache.streampipes.connect.shared.preprocessing.convert.Helpers.makeNestedProperties;
import static org.apache.streampipes.connect.shared.preprocessing.convert.Helpers.makeSimpleProperties;
import static org.apache.streampipes.connect.shared.preprocessing.convert.Helpers.makeUnitTransformationRule;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class ToOriginalSchemaConverterTest {

Expand All @@ -52,8 +52,8 @@ public void testSimpleUnitConversion() {

var resultProperties = executeAndReturnResult(properties, rules);

Assertions.assertEquals(3, resultProperties.size());
Assertions.assertEquals("originalUnit", getUnit(resultProperties.get(0)));
assertEquals(3, resultProperties.size());
assertEquals("originalUnit", getUnit(resultProperties.get(0)));
}

@Test
Expand All @@ -68,8 +68,8 @@ public void testNestedUnitConversion() {
var nestedResultProperty = ((EventPropertyNested) resultProperties.get(1)).getEventProperties()
.get(0);

Assertions.assertEquals(2, resultProperties.size());
Assertions.assertEquals("originalUnit", getUnit(nestedResultProperty));
assertEquals(2, resultProperties.size());
assertEquals("originalUnit", getUnit(nestedResultProperty));
}

@Test
Expand All @@ -84,13 +84,13 @@ public void testSimpleMoveConversion() {
rules.add(makeMoveTransformationRule("epToBeMoved", "nested"));

var result = executeAndReturnResult(properties, rules);
Assertions.assertEquals(3, result.size());
Assertions.assertEquals(
assertEquals(3, result.size());
assertEquals(
"timestamp",
result.get(0)
.getRuntimeName()
);
Assertions.assertEquals(
assertEquals(
2,
((EventPropertyNested) result.get(1)).getEventProperties()
.size()
Expand All @@ -104,8 +104,8 @@ public void testDeleteRule() {

rules.add(makeDeleteTransformationRule("epToBeRestored"));
var result = executeAndReturnResult(properties, rules);
Assertions.assertEquals(4, result.size());
Assertions.assertEquals(
assertEquals(4, result.size());
assertEquals(
"epToBeRestored",
result.get(3)
.getRuntimeName()
Expand Down
Loading

0 comments on commit ffd5e7c

Please sign in to comment.