Closes openaire#1099: Reduce IIS execution time - TARA project reference extraction caching

Adds a new implementation of TARA project reference extraction that runs the extraction through the Spark 'pipe' method and caches its results. Also adds support for dataframes and datasets using the spark-avro package.

Adds one new workflow parameter:
* tara_cache_root_dir - TARA reference extraction cache root directory
przemyslawjacewicz authored Jul 20, 2020
1 parent add943b commit 4323a80
Showing 47 changed files with 2,189 additions and 807 deletions.
5 changes: 5 additions & 0 deletions iis-common/pom.xml
@@ -70,6 +70,11 @@
<artifactId>spark-sql_2.11</artifactId>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.11</artifactId>
</dependency>

<dependency>
<groupId>pl.edu.icm.spark-utils</groupId>
<artifactId>spark-utils_2.11</artifactId>
@@ -0,0 +1,19 @@
package eu.dnetlib.iis.common.lock;

import org.apache.hadoop.conf.Configuration;

import java.lang.reflect.Constructor;

/**
 * Utility class for instantiating a {@link LockManager} from the class name of its {@link LockManagerFactory}.
 */
public class LockManagerUtils {

private LockManagerUtils() {
}

public static LockManager instantiateLockManager(String lockManagerFactoryClassName,
Configuration config) throws Exception {
Class<?> clazz = Class.forName(lockManagerFactoryClassName);
Constructor<?> constructor = clazz.getConstructor();
LockManagerFactory lockManagerFactory = (LockManagerFactory) constructor.newInstance();
return lockManagerFactory.instantiate(config);
}
}
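A minimal usage sketch for the helper above; the factory class name is a hypothetical placeholder, since no concrete LockManagerFactory implementation appears in this diff:

import org.apache.hadoop.conf.Configuration
import eu.dnetlib.iis.common.lock.{LockManager, LockManagerUtils}

object LockManagerUsageSketch {
  def main(args: Array[String]): Unit = {
    val config = new Configuration()
    // "com.example.ZkLockManagerFactory" is a made-up class name standing in for
    // whatever factory the workflow supplies as a parameter.
    val lockManager: LockManager = LockManagerUtils.instantiateLockManager(
      "com.example.ZkLockManagerFactory", config)
    // lockManager can now be used to serialize access to a shared resource,
    // e.g. the TARA reference extraction cache directory.
  }
}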
@@ -0,0 +1,23 @@
package eu.dnetlib.iis.common.spark.pipe;

import java.io.IOException;

/**
* Abstraction of the execution environment for scripts and commands run using the 'pipe' method on an RDD.
* <p>
* Classes implementing this interface should propagate any necessary files and directories to cluster nodes and define
* the command to be run by the 'pipe' method. This interface makes it possible to separate production code from test code
* by providing a different implementation for each execution environment.
*/
public interface PipeExecutionEnvironment {

/**
* Returns the string command to be run by the RDD 'pipe' method. The command executes in the context of each node's
* local worker dir. It must not reference files or directories by absolute path; instead, use a relative path within
* the worker dir or retrieve the file path explicitly using the 'SparkFiles.get' method.
*
* @return String representing the command to be run by the RDD 'pipe' method.
* @throws IOException
*/
String pipeCommand() throws IOException;
}
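A minimal sketch of one possible implementation, assuming the script is distributed to cluster nodes with SparkContext.addFile so that it is available in each task's working directory; the class name and script handling are illustrative, not part of this commit:

import org.apache.spark.SparkContext
import eu.dnetlib.iis.common.spark.pipe.PipeExecutionEnvironment

// Hypothetical implementation: ships a local shell script to the cluster and
// runs it by relative path from the worker dir, as the interface javadoc recommends.
class ScriptPipeExecutionEnvironment(sc: SparkContext, scriptLocalPath: String)
  extends PipeExecutionEnvironment {

  // Distribute the script; executors download added files into their working directory.
  sc.addFile(scriptLocalPath)
  private val scriptName = new java.io.File(scriptLocalPath).getName

  override def pipeCommand(): String = s"bash $scriptName"
}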
@@ -0,0 +1,118 @@
package eu.dnetlib.iis.common.spark.avro

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import org.apache.avro.Schema
import org.apache.avro.specific.SpecificRecordBase
import org.apache.spark.sql._
import org.apache.spark.sql.avro._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType

import scala.collection.JavaConverters._

/**
* Support functions for dataframes backed by the Spark avro data source.
*
* @param spark SparkSession instance.
*/
class AvroDataFrameSupport(val spark: SparkSession) extends Serializable {

/**
* Creates a dataframe from a given collection.
*
* @param data List with elements for the dataframe.
* @param avroSchema Avro schema of the elements.
* @tparam T Type of elements.
* @return DataFrame containing data from the given list.
*/
def createDataFrame[T](data: java.util.List[T], avroSchema: Schema): DataFrame = {
createDataFrame(data.asScala, avroSchema)
}

/**
* Creates a dataframe from a given collection.
*
* @param data Seq with elements for the dataframe.
* @param avroSchema Avro schema of the elements.
* @tparam T Type of elements.
* @return DataFrame containing data from the given seq.
*/
def createDataFrame[T](data: Seq[T], avroSchema: Schema): DataFrame = {
val rowSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
val encoder = RowEncoder.apply(rowSchema).resolveAndBind()
val deserializer = new AvroDeserializer(avroSchema, rowSchema)
val rows = data.map(record => encoder.fromRow(deserializer.deserialize(record).asInstanceOf[InternalRow]))
spark.createDataFrame(spark.sparkContext.parallelize(rows), rowSchema)
}

/**
* Reads data as a dataframe from an avro data store using an SQL schema.
*
* @param path Path to the data store.
* @param schema SQL schema of the records.
* @return DataFrame with data read from given path.
*/
def read(path: String, schema: StructType): DataFrame = {
read(path, SchemaConverters.toAvroType(schema))
}

/**
* Reads data as a dataframe from an avro data store using an avro schema.
*
* @param path Path to the data store.
* @param avroSchema Avro schema of the records.
* @return DataFrame with data read from given path.
*/
def read(path: String, avroSchema: Schema): DataFrame = {
spark.read
.format("avro")
.option("avroSchema", avroSchema.toString)
.load(path)
}

/**
* Writes a dataframe as an avro data store using an avro schema generated from the dataframe's SQL schema.
*
* @param df DataFrame to be saved as avro data store.
* @param path Path to the data store.
*/
def write(df: DataFrame, path: String): Unit = {
write(df, path, SchemaConverters.toAvroType(df.schema))
}

/**
* Writes a dataframe as an avro data store using a given avro schema.
*
* @param df DataFrame to be saved as avro data store.
* @param path Path to the data store.
* @param avroSchema Avro schema of the records.
*/
def write(df: DataFrame, path: String, avroSchema: Schema): Unit = {
df
.write
.format("avro")
.option("avroSchema", avroSchema.toString)
.save(path)
}

/**
* Creates a dataset from a given dataframe using a kryo encoder.
*
* NOTE: because a bean-based encoder cannot be used for avro types, this method uses a kryo encoder;
* objects are created by mapping rows to JSON strings and JSON strings to object instances.
*
* @param df DataFrame to be converted to a dataset.
* @param clazz Class of objects in the dataset.
* @tparam T Type of objects in the dataset.
* @return Dataset of objects corresponding to records in the given dataframe.
*/
def toDS[T <: SpecificRecordBase](df: DataFrame, clazz: Class[T]): Dataset[T] = {
implicit val encoder: Encoder[T] = Encoders.kryo(clazz)
val mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
df
.toJSON
.map(json => mapper.readValue(json, clazz))
}
}
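A hedged usage sketch for AvroDataFrameSupport; Person is a made-up Avro-generated class (assumed to extend SpecificRecordBase) and the paths are placeholders, none of which appear in this commit:

import org.apache.spark.sql.SparkSession
import eu.dnetlib.iis.common.spark.avro.AvroDataFrameSupport
// import com.example.avro.Person  // hypothetical Avro-generated class

object AvroDataFrameSupportSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("avro-dataframe-sketch").getOrCreate()
    val support = new AvroDataFrameSupport(spark)

    // Read an avro data store into a DataFrame using the schema of the generated class.
    val df = support.read("path/to/input", Person.getClassSchema)

    // Convert the untyped DataFrame into a typed Dataset via the kryo-based helper.
    val ds = support.toDS(df, classOf[Person])

    // Write the DataFrame back out as an avro data store with the same schema.
    support.write(df, "path/to/output", Person.getClassSchema)

    spark.stop()
  }
}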
@@ -0,0 +1,82 @@
package eu.dnetlib.iis.common.spark.avro

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import org.apache.avro.Schema
import org.apache.avro.specific.SpecificRecordBase
import org.apache.spark.sql._
import org.apache.spark.sql.avro._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType

/**
* Support functions for datasets backed by the Spark avro data source.
*
* @param spark SparkSession instance.
*/
class AvroDatasetSupport(val spark: SparkSession) extends Serializable {

/**
* Reads data as a dataset from an avro data store using a kryo encoder.
*
* NOTE: because a bean-based encoder cannot be used for avro types, this method uses a kryo encoder;
* objects are created by mapping rows to JSON strings and JSON strings to object instances.
*
* @param path Path to the data store.
* @param avroSchema Avro schema of the records.
* @param clazz Class of objects in the dataset.
* @tparam T Type of objects in the dataset.
* @return Dataset with data read from given path.
*/
def read[T <: SpecificRecordBase](path: String, avroSchema: Schema, clazz: Class[T]): Dataset[T] = {
implicit val encoder: Encoder[T] = Encoders.kryo(clazz)
val mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
spark.read
.format("avro")
.option("avroSchema", avroSchema.toString)
.load(path)
.toJSON
.map(json => mapper.readValue(json, clazz))
}

/**
* Writes a dataset as an avro data store using an avro schema.
*
* @param ds Dataset to be saved as avro data store.
* @param path Path to the data store.
* @param avroSchema Avro schema of the records.
* @tparam T Type of objects in the dataset.
*/
def write[T <: SpecificRecordBase](ds: Dataset[T], path: String, avroSchema: Schema): Unit = {
toDF(ds, avroSchema)
.write
.format("avro")
.option("avroSchema", avroSchema.toString)
.save(path)
}

/**
* Creates a dataframe from a given dataset.
*
* @param ds Dataset to be converted to a dataframe.
* @param avroSchema Avro schema of the records.
* @tparam T Type of objects in the dataset.
* @return DataFrame of objects corresponding to records in the given dataset.
*/
def toDF[T <: SpecificRecordBase](ds: Dataset[T], avroSchema: Schema): DataFrame = {
val avroSchemaStr = avroSchema.toString
val rowSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
val encoder = RowEncoder(rowSchema).resolveAndBind()

object SerializationSupport extends Serializable {
@transient private lazy val deserializer = new AvroDeserializer(new Schema.Parser().parse(avroSchemaStr), rowSchema)
private val rows = ds.rdd.map(record => encoder.fromRow(deserializer.deserialize(record).asInstanceOf[InternalRow]))

def doToDF(): DataFrame = {
spark.createDataFrame(rows, rowSchema)
}
}

SerializationSupport.doToDF()
}
}
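A similar hedged sketch for AvroDatasetSupport; as above, Person is a hypothetical Avro-generated class and the paths are placeholders:

import org.apache.spark.sql.SparkSession
import eu.dnetlib.iis.common.spark.avro.AvroDatasetSupport
// import com.example.avro.Person  // hypothetical Avro-generated class

object AvroDatasetSupportSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("avro-dataset-sketch").getOrCreate()
    val support = new AvroDatasetSupport(spark)

    // Read a typed Dataset directly from an avro data store (kryo encoder under the hood).
    val ds = support.read("path/to/input", Person.getClassSchema, classOf[Person])

    // Convert the Dataset back to a DataFrame whose row schema is derived from the avro schema.
    val df = support.toDF(ds, Person.getClassSchema)

    // Persist the Dataset as an avro data store.
    support.write(ds, "path/to/output", Person.getClassSchema)

    spark.stop()
  }
}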
