Skip to content

Commit

Permalink
Merge branch 'rickyhuo.enhance.javasubstring'
Browse files Browse the repository at this point in the history
  • Loading branch information
RickyHuo committed Jan 28, 2019
2 parents 96d1495 + 2a3f8a2 commit c85e099
Showing 1 changed file with 23 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.substring;
import static org.apache.spark.sql.functions.lit;

import io.github.interestinglab.waterdrop.apis.BaseFilter;

import com.typesafe.config.Config;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.api.java.UDF3;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import scala.Tuple2;

import java.util.HashMap;
Expand Down Expand Up @@ -54,8 +59,22 @@ public void prepare(SparkSession spark) {
/**
 * Derives a substring column from an existing column of {@code df}.
 *
 * <p>The source column name is read from the {@code source_field} setting, the output
 * column name from {@code target_field}, and the window from the integer settings
 * {@code pos} and {@code len}. The result is attached with {@code Dataset.withColumn}.
 *
 * @param spark session used to register the substring UDF
 * @param df    input dataset
 * @return {@code df} with the {@code target_field} column set to the substring
 */
public Dataset<Row> process(SparkSession spark, Dataset<Row> df) {
String srcField = config.getString("source_field");
String targetField = config.getString("target_field");
// NOTE(review): this listing looks like a rendered diff without +/- markers. The three
// statements below use Spark's built-in substring() and appear to be the pre-change
// implementation; the UDF-based statements further down declare pos/len again and appear
// to be its replacement. Confirm against the actual file before relying on either path.
int pos = config.getInt("pos");
int len = config.getInt("len");
return df.withColumn(targetField, substring(col(srcField), pos, len));

// Wrap defineSubstring as a three-argument UDF: (source string, pos, len) -> substring.
UDF3<String, Integer, Integer, String> func = (String src, Integer pos, Integer len) -> (defineSubstring(src, pos, len));
// Anonymous-class form of the same UDF, left commented out:
// UDF3 func = new UDF3<String, Integer, Integer, String>() {
// @Override
// public String call(String src, Integer pos, Integer len) throws Exception {
// return defineSubstring(src, pos, len);
// }
// };

int pos = this.config.getInt("pos");
int len = this.config.getInt("len");
// The UDF is registered under the fixed name "func" on every call to process().
spark.udf().register("func", func, DataTypes.StringType);
// pos and len are passed to the UDF as literal columns alongside the source column.
// NOTE(review): defineSubstring indexes with String.substring (0-based), whereas Spark's
// built-in substring is presumably 1-based — confirm which meaning of "pos" is intended.
return df.withColumn(targetField, callUDF("func", col(srcField), lit(pos), lit(len)));
}

/**
 * Returns the part of {@code src} covered by the window {@code [pos, pos + len)},
 * using 0-based character positions.
 *
 * <p>The window is clamped to the bounds of {@code src} instead of throwing, so a
 * single short or null value cannot fail the whole job when this runs as a UDF:
 * a null input yields null, and any part of the window that lies outside the string
 * is ignored (an entirely out-of-range or non-positive-length window yields "").
 * For windows fully inside the string the result equals
 * {@code src.substring(pos, pos + len)}.
 *
 * @param src the source string; may be null
 * @param pos 0-based start position of the window
 * @param len number of characters in the window
 * @return the clamped substring, or null when {@code src} is null
 */
private String defineSubstring(String src, int pos, int len) {
    if (src == null) {
        return null;
    }
    final int length = src.length();
    final int begin = Math.max(0, Math.min(pos, length));
    // Compute the requested end in long arithmetic so pos + len cannot overflow.
    final long requestedEnd = (long) pos + len;
    final int end = (int) Math.max(begin, Math.min(requestedEnd, length));
    return src.substring(begin, end);
}
}

0 comments on commit c85e099

Please sign in to comment.