From 2a3f8a299a32d9084dac4618317af9804752c1e4 Mon Sep 17 00:00:00 2001 From: RickyHuo Date: Wed, 19 Sep 2018 10:23:46 +0800 Subject: [PATCH] Update filter/JavaSubstring.java --- .../waterdrop/filter/JavaSubstring.java | 27 ++++++++++++++++--- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/interestinglab/waterdrop/filter/JavaSubstring.java b/src/main/java/org/interestinglab/waterdrop/filter/JavaSubstring.java index 262657d..088cbd7 100644 --- a/src/main/java/org/interestinglab/waterdrop/filter/JavaSubstring.java +++ b/src/main/java/org/interestinglab/waterdrop/filter/JavaSubstring.java @@ -5,12 +5,17 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; +import static org.apache.spark.sql.functions.callUDF; import static org.apache.spark.sql.functions.col; -import static org.apache.spark.sql.functions.substring; +import static org.apache.spark.sql.functions.lit; import io.github.interestinglab.waterdrop.apis.BaseFilter; import com.typesafe.config.Config; +import org.apache.spark.sql.api.java.UDF1; +import org.apache.spark.sql.api.java.UDF3; +import org.apache.spark.sql.expressions.UserDefinedFunction; +import org.apache.spark.sql.types.DataTypes; import scala.Tuple2; import java.util.HashMap; @@ -54,8 +59,22 @@ public void prepare(SparkSession spark) { public Dataset process(SparkSession spark, Dataset df) { String srcField = config.getString("source_field"); String targetField = config.getString("target_field"); - int pos = config.getInt("pos"); - int len = config.getInt("len"); - return df.withColumn(targetField, substring(col(srcField), pos, len)); + + UDF3 func = (String src, Integer pos, Integer len) -> (defineSubstring(src, pos, len)); +// UDF3 func = new UDF3() { +// @Override +// public String call(String src, Integer pos, Integer len) throws Exception { +// return defineSubstring(src, pos, len); +// } +// }; + + int pos = this.config.getInt("pos"); + int len = this.config.getInt("len"); + spark.udf().register("func", func, DataTypes.StringType); + return df.withColumn(targetField, callUDF("func", col(srcField), lit(pos), lit(len))); + } + + private String defineSubstring(String src, int pos, int len) { + return src.substring(pos, pos + len); } }