Skip to content

Commit

Permalink
[sql-functions] add ExpandJiraTicketComponents2 variant with POJO
Browse files Browse the repository at this point in the history
  • Loading branch information
NicoK committed May 17, 2021
1 parent 9edf7ac commit 14e2045
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.ververica.platform.sql.functions;

import com.ververica.platform.PatternUtils;
import java.util.regex.Matcher;
import org.apache.flink.table.functions.TableFunction;

/**
* Table SQL function to extract the Jira components of a ticket created/change email from the text
* body field sent by Jira.
*
* @see GetJiraTicketComponents for a Scalar function version of this
*/
@SuppressWarnings("unused")
public class ExpandJiraTicketComponents2
extends TableFunction<ExpandJiraTicketComponents2.Component> {

public static class Component {
public String component;
public Integer componentCount;

public Component(String component, Integer componentCount) {
this.component = component;
this.componentCount = componentCount;
}
}

public void eval(String textBody) {
if (textBody != null) {
Matcher matcher = PatternUtils.EMAIL_BODY_JIRA_TICKET_COMPONENTS_PATTERN.matcher(textBody);
if (matcher.find()) {
String[] components =
PatternUtils.EMAIL_BODY_JIRA_TICKET_COMPONENTS_SPLIT_PATTERN.split(
matcher.group("components"));
for (String component : components) {
collect(new Component(component, components.length));
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package com.ververica.platform.sql.functions;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;

import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.junit.Before;
import org.junit.Test;

/** Integration test for {@link ExpandJiraTicketComponents}. */
public class ExpandJiraTicketComponents2ITCase extends AbstractTableTestBase {

protected StreamExecutionEnvironment env;
protected StreamTableEnvironment tEnv;

@Before
public void setUp() {
env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
tEnv =
StreamTableEnvironment.create(
env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.setStateBackend(new RocksDBStateBackend((StateBackend) new MemoryStateBackend()));

tEnv.createTemporaryFunction("ExpandJiraTicketComponents", ExpandJiraTicketComponents2.class);
}

private void createSource(Row... inputData) {
final String createSource =
String.format(
"CREATE TABLE input ( \n"
+ " `textBody` STRING\n"
+ ") WITH (\n"
+ " 'connector' = 'values',\n"
+ " 'data-id' = '%s'\n"
+ ")",
TestValuesTableFactory.registerData(Arrays.asList(inputData)));
tEnv.executeSql(createSource);
}

private List<Row> executeSql() throws Exception {
TableResult resultTable =
tEnv.executeSql(
"SELECT component, componentCount FROM input LEFT JOIN LATERAL TABLE(ExpandJiraTicketComponents(textBody)) ON TRUE");
return getRowsFromTable(resultTable);
}

@Test
public void testWithString1() throws Exception {
createSource(Row.of(""));

List<Row> rawResult = executeSql();

assertThat(rawResult, containsInAnyOrder(Row.ofKind(RowKind.INSERT, null, null)));
}

@Test
public void testWithString2() throws Exception {
createSource(Row.of(" Components: Tests"));

List<Row> rawResult = executeSql();

assertThat(rawResult, containsInAnyOrder(Row.ofKind(RowKind.INSERT, "Tests", 1)));
}

@Test
public void testWithString3() throws Exception {
createSource(
Row.of(" Components: Tests, Formats (JSON, Avro, Parquet, ORC, SequenceFile)"));

List<Row> rawResult = executeSql();

assertThat(
rawResult,
containsInAnyOrder(
Row.ofKind(RowKind.INSERT, "Tests", 2),
Row.ofKind(RowKind.INSERT, "Formats (JSON, Avro, Parquet, ORC, SequenceFile)", 2)));
}

@Test
public void testWithString4() throws Exception {
createSource(
Row.of(" Components: Tests"),
Row.of(" Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)"));

List<Row> rawResult = executeSql();

assertThat(
rawResult,
containsInAnyOrder(
Row.ofKind(RowKind.INSERT, "Tests", 1),
Row.ofKind(RowKind.INSERT, "Formats (JSON, Avro, Parquet, ORC, SequenceFile)", 1)));
}
}

0 comments on commit 14e2045

Please sign in to comment.