diff --git a/sql-functions/src/main/java/com/ververica/platform/sql/functions/ExpandJiraTicketComponents2.java b/sql-functions/src/main/java/com/ververica/platform/sql/functions/ExpandJiraTicketComponents2.java
new file mode 100644
index 0000000..c53e4aa
--- /dev/null
+++ b/sql-functions/src/main/java/com/ververica/platform/sql/functions/ExpandJiraTicketComponents2.java
@@ -0,0 +1,59 @@
+package com.ververica.platform.sql.functions;
+
+import com.ververica.platform.PatternUtils;
+import java.util.regex.Matcher;
+import org.apache.flink.table.functions.TableFunction;
+
+/**
+ * Table SQL function to extract the Jira components of a ticket created/change email from the text
+ * body field sent by Jira.
+ *
+ * <p>Emits one {@link Component} row per component found in the text body; each row also carries
+ * the total number of components found in that body.
+ *
+ * @see GetJiraTicketComponents for a Scalar function version of this
+ */
+@SuppressWarnings("unused")
+public class ExpandJiraTicketComponents2
+    extends TableFunction<ExpandJiraTicketComponents2.Component> {
+
+  /** Output row type: one component together with the component count of its text body. */
+  public static class Component {
+    /** One component, as returned by the split pattern. */
+    public String component;
+
+    /** Number of components extracted from the same text body. */
+    public Integer componentCount;
+
+    public Component(String component, Integer componentCount) {
+      this.component = component;
+      this.componentCount = componentCount;
+    }
+  }
+
+  /**
+   * Extracts the components from the given email text body and collects one row per component.
+   *
+   * <p>Nothing is collected if {@code textBody} is {@code null} or the components pattern does not
+   * match.
+   *
+   * @param textBody text body of the Jira email, may be {@code null}
+   */
+  public void eval(String textBody) {
+    if (textBody == null) {
+      return;
+    }
+
+    Matcher matcher = PatternUtils.EMAIL_BODY_JIRA_TICKET_COMPONENTS_PATTERN.matcher(textBody);
+    if (!matcher.find()) {
+      return;
+    }
+
+    String[] components =
+        PatternUtils.EMAIL_BODY_JIRA_TICKET_COMPONENTS_SPLIT_PATTERN.split(
+            matcher.group("components"));
+    for (String component : components) {
+      collect(new Component(component, components.length));
+    }
+  }
+}
diff --git a/sql-functions/src/test/java/com/ververica/platform/sql/functions/ExpandJiraTicketComponents2ITCase.java b/sql-functions/src/test/java/com/ververica/platform/sql/functions/ExpandJiraTicketComponents2ITCase.java
new file mode 100644
index 0000000..32c6c69
--- /dev/null
+++ b/sql-functions/src/test/java/com/ververica/platform/sql/functions/ExpandJiraTicketComponents2ITCase.java
@@ -0,0 +1,114 @@
+package com.ververica.platform.sql.functions;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Integration test for {@link ExpandJiraTicketComponents2}. */
+public class ExpandJiraTicketComponents2ITCase extends AbstractTableTestBase {
+
+  protected StreamExecutionEnvironment env;
+  protected StreamTableEnvironment tEnv;
+
+  /** Creates the streaming table environment and registers the function under test. */
+  @Before
+  public void setUp() {
+    env = StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(4);
+    tEnv =
+        StreamTableEnvironment.create(
+            env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
+    env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+    env.setStateBackend(new RocksDBStateBackend((StateBackend) new MemoryStateBackend()));
+
+    tEnv.createTemporaryFunction("ExpandJiraTicketComponents", ExpandJiraTicketComponents2.class);
+  }
+
+  /** Creates the {@code input} table (single {@code textBody} column) from the given rows. */
+  private void createSource(Row... inputData) {
+    final String createSource =
+        String.format(
+            "CREATE TABLE input ( \n"
+                + " `textBody` STRING\n"
+                + ") WITH (\n"
+                + " 'connector' = 'values',\n"
+                + " 'data-id' = '%s'\n"
+                + ")",
+            TestValuesTableFactory.registerData(Arrays.asList(inputData)));
+    tEnv.executeSql(createSource);
+  }
+
+  /** Runs the lateral join against {@code input} and returns the rows it produced. */
+  private List<Row> executeSql() throws Exception {
+    TableResult resultTable =
+        tEnv.executeSql(
+            "SELECT component, componentCount FROM input LEFT JOIN LATERAL TABLE(ExpandJiraTicketComponents(textBody)) ON TRUE");
+    return getRowsFromTable(resultTable);
+  }
+
+  /** Expects an empty text body to yield a single row with both columns {@code NULL}. */
+  @Test
+  public void testWithString1() throws Exception {
+    createSource(Row.of(""));
+
+    List<Row> rawResult = executeSql();
+
+    assertThat(rawResult, containsInAnyOrder(Row.ofKind(RowKind.INSERT, null, null)));
+  }
+
+  /** Expects a single component to yield one row with a component count of 1. */
+  @Test
+  public void testWithString2() throws Exception {
+    createSource(Row.of(" Components: Tests"));
+
+    List<Row> rawResult = executeSql();
+
+    assertThat(rawResult, containsInAnyOrder(Row.ofKind(RowKind.INSERT, "Tests", 1)));
+  }
+
+  /** Expects commas inside parentheses not to split a component: two rows, each with count 2. */
+  @Test
+  public void testWithString3() throws Exception {
+    createSource(
+        Row.of(" Components: Tests, Formats (JSON, Avro, Parquet, ORC, SequenceFile)"));
+
+    List<Row> rawResult = executeSql();
+
+    assertThat(
+        rawResult,
+        containsInAnyOrder(
+            Row.ofKind(RowKind.INSERT, "Tests", 2),
+            Row.ofKind(RowKind.INSERT, "Formats (JSON, Avro, Parquet, ORC, SequenceFile)", 2)));
+  }
+
+  /** Expects each input row to be expanded on its own, with its own component count. */
+  @Test
+  public void testWithString4() throws Exception {
+    createSource(
+        Row.of(" Components: Tests"),
+        Row.of(" Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)"));
+
+    List<Row> rawResult = executeSql();
+
+    assertThat(
+        rawResult,
+        containsInAnyOrder(
+            Row.ofKind(RowKind.INSERT, "Tests", 1),
+            Row.ofKind(RowKind.INSERT, "Formats (JSON, Avro, Parquet, ORC, SequenceFile)", 1)));
+  }
+}