Commit f5d286a
Merge branch 'master' of https://github.com/openaire/iis
LSmyrnaios committed Jul 24, 2020
2 parents 4f36049 + 4323a80 commit f5d286a
Showing 56 changed files with 2,199 additions and 817 deletions.
5 changes: 5 additions & 0 deletions iis-common/pom.xml
@@ -70,6 +70,11 @@
<artifactId>spark-sql_2.11</artifactId>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.11</artifactId>
</dependency>

<dependency>
<groupId>pl.edu.icm.spark-utils</groupId>
<artifactId>spark-utils_2.11</artifactId>
@@ -0,0 +1,19 @@
package eu.dnetlib.iis.common.lock;

import org.apache.hadoop.conf.Configuration;

import java.lang.reflect.Constructor;

public class LockManagerUtils {

private LockManagerUtils() {
}

public static LockManager instantiateLockManager(String lockManagerFactoryClassName,
Configuration config) throws Exception {
Class<?> clazz = Class.forName(lockManagerFactoryClassName);
Constructor<?> constructor = clazz.getConstructor();
LockManagerFactory lockManagerFactory = (LockManagerFactory) constructor.newInstance();
return lockManagerFactory.instantiate(config);
}
}
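
For orientation, a minimal usage sketch (not part of this commit) of the reflective helper above; the object name and the idea of taking the factory class name from a program argument are assumptions made only for this example.

import org.apache.hadoop.conf.Configuration

import eu.dnetlib.iis.common.lock.LockManagerUtils

object LockManagerUsageSketch {
  def main(args: Array[String]): Unit = {
    // Hadoop configuration handed to the factory.
    val config = new Configuration()

    // The factory class name would normally come from a job property; here it is
    // simply taken from the command line, so nothing project-specific is assumed.
    val factoryClassName = args(0)

    // Reflection-based instantiation exactly as implemented above:
    // Class.forName -> no-arg constructor -> LockManagerFactory.instantiate(config).
    val lockManager = LockManagerUtils.instantiateLockManager(factoryClassName, config)
    println(s"Instantiated lock manager: ${lockManager.getClass.getName}")
  }
}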
@@ -0,0 +1,23 @@
package eu.dnetlib.iis.common.spark.pipe;

import java.io.IOException;

/**
* Abstraction of the execution environment for scripts and commands run using the RDD 'pipe' method.
* <p>
* Classes implementing this interface should propagate any necessary files and directories to cluster nodes and define
* the command to be run by the 'pipe' method. This interface allows production code and test code to be separated by
* providing a different implementation for each execution environment.
*/
public interface PipeExecutionEnvironment {

/**
* Returns the string command to be run by the RDD 'pipe' method. The command executes in the context of each node's
* local worker dir and must not reference files or directories by absolute path; instead, use a relative path within
* the worker dir or retrieve the file location explicitly with the 'SparkFiles.get' method.
*
* @return String representing the command to be run by the RDD 'pipe' method.
* @throws IOException
*/
String pipeCommand() throws IOException;
}
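
As a non-authoritative illustration, one possible implementation of the interface above; the class name, the script name, and the assumption that the script has already been distributed to each node's local worker dir are placeholders for this sketch, not taken from the commit.

import eu.dnetlib.iis.common.spark.pipe.PipeExecutionEnvironment

// Hypothetical implementation: assumes the named script has already been shipped
// to each node's local worker dir (e.g. via the job's file distribution mechanism),
// so the command references it by a relative name only, as the contract requires.
class ScriptPipeExecutionEnvironment(scriptName: String) extends PipeExecutionEnvironment {
  override def pipeCommand(): String = s"python $scriptName"
}

// Usage sketch: rdd.pipe(new ScriptPipeExecutionEnvironment("transform.py").pipeCommand())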
@@ -0,0 +1,118 @@
package eu.dnetlib.iis.common.spark.avro

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import org.apache.avro.Schema
import org.apache.avro.specific.SpecificRecordBase
import org.apache.spark.sql._
import org.apache.spark.sql.avro._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType

import scala.collection.JavaConverters._

/**
* Support functions for the Spark avro data source, operating on dataframes.
*
* @param spark SparkSession instance.
*/
class AvroDataFrameSupport(val spark: SparkSession) extends Serializable {

/**
* Creates a dataframe from a given collection.
*
* @param data List with elements for the dataframe.
* @param avroSchema Avro schema of the elements.
* @tparam T Type of elements.
* @return DataFrame containing data from the given list.
*/
def createDataFrame[T](data: java.util.List[T], avroSchema: Schema): DataFrame = {
createDataFrame(data.asScala, avroSchema)
}

/**
* Creates a dataframe from a given collection.
*
* @param data Seq with elements for the dataframe.
* @param avroSchema Avro schema of the elements.
* @tparam T Type of elements.
* @return DataFrame containing data from the given seq.
*/
def createDataFrame[T](data: Seq[T], avroSchema: Schema): DataFrame = {
val rowSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
val encoder = RowEncoder.apply(rowSchema).resolveAndBind()
val deserializer = new AvroDeserializer(avroSchema, rowSchema)
val rows = data.map(record => encoder.fromRow(deserializer.deserialize(record).asInstanceOf[InternalRow]))
spark.createDataFrame(spark.sparkContext.parallelize(rows), rowSchema)
}

/**
* Reads data as a dataframe from an avro data store using an SQL schema.
*
* @param path Path to the data store.
* @param schema SQL schema of the records.
* @return DataFrame with data read from given path.
*/
def read(path: String, schema: StructType): DataFrame = {
read(path, SchemaConverters.toAvroType(schema))
}

/**
* Reads data as a dataframe from an avro data store using an avro schema.
*
* @param path Path to the data store.
* @param avroSchema Avro schema of the records.
* @return DataFrame with data read from given path.
*/
def read(path: String, avroSchema: Schema): DataFrame = {
spark.read
.format("avro")
.option("avroSchema", avroSchema.toString)
.load(path)
}

/**
* Writes a dataframe as an avro data store using an avro schema generated from the dataframe's SQL schema.
*
* @param df DataFrame to be saved as avro data store.
* @param path Path to the data store.
*/
def write(df: DataFrame, path: String): Unit = {
write(df, path, SchemaConverters.toAvroType(df.schema))
}

/**
* Writes a dataframe as an avro data store using given avro schema.
*
* @param df DataFrame to be saved as avro data store.
* @param path Path to the data store.
* @param avroSchema Avro schema of the records.
*/
def write(df: DataFrame, path: String, avroSchema: Schema): Unit = {
df
.write
.format("avro")
.option("avroSchema", avroSchema.toString)
.save(path)
}

/**
* Creates a dataset from a given dataframe using a kryo encoder.
*
* NOTE: because a bean-based encoder cannot be used for avro types, this method uses a kryo encoder;
* objects are therefore created by mapping rows to JSON and deserializing the JSON back into instances of the given class.
*
* @param df DataFrame to be converted to a dataset.
* @param clazz Class of objects in the dataset.
* @tparam T Type of objects in the dataset.
* @return Dataset of objects corresponding to records in the given dataframe.
*/
def toDS[T <: SpecificRecordBase](df: DataFrame, clazz: Class[T]): Dataset[T] = {
implicit val encoder: Encoder[T] = Encoders.kryo(clazz)
val mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
df
.toJSON
.map(json => mapper.readValue(json, clazz))
}
}
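
A brief usage sketch (not from the commit) of AvroDataFrameSupport; 'Document' is a hypothetical stand-in for an Avro-generated SpecificRecordBase class, and the paths are illustrative only.

import org.apache.spark.sql.SparkSession

import eu.dnetlib.iis.common.spark.avro.AvroDataFrameSupport

object AvroDataFrameSupportUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("avro-dataframe-support-example").getOrCreate()
    val support = new AvroDataFrameSupport(spark)

    // 'Document' stands in for any Avro-generated SpecificRecordBase class;
    // its schema is exposed by the generated getClassSchema() method.
    val avroSchema = Document.getClassSchema()

    // Read an existing avro data store into a DataFrame using the avro schema.
    val df = support.read("/path/to/input", avroSchema)

    // ... regular DataFrame transformations on df ...

    // Write the dataframe back out; here the avro schema is derived from df.schema.
    support.write(df, "/path/to/output")

    // Or obtain a typed, kryo-encoded Dataset of the avro class.
    val ds = support.toDS(df, classOf[Document])
  }
}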
@@ -0,0 +1,82 @@
package eu.dnetlib.iis.common.spark.avro

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import org.apache.avro.Schema
import org.apache.avro.specific.SpecificRecordBase
import org.apache.spark.sql._
import org.apache.spark.sql.avro._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType

/**
* Support functions for the Spark avro data source, operating on datasets.
*
* @param spark SparkSession instance.
*/
class AvroDatasetSupport(val spark: SparkSession) extends Serializable {

/**
* Reads data as a dataset from an avro data store using a kryo encoder.
*
* NOTE: because a bean-based encoder cannot be used for avro types, this method uses a kryo encoder;
* objects are therefore created by mapping rows to JSON and deserializing the JSON back into instances of the given class.
*
* @param path Path to the data store.
* @param avroSchema Avro schema of the records.
* @param clazz Class of objects in the dataset.
* @tparam T Type of objects in the dataset.
* @return Dataset with data read from given path.
*/
def read[T <: SpecificRecordBase](path: String, avroSchema: Schema, clazz: Class[T]): Dataset[T] = {
implicit val encoder: Encoder[T] = Encoders.kryo(clazz)
val mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
spark.read
.format("avro")
.option("avroSchema", avroSchema.toString)
.load(path)
.toJSON
.map(json => mapper.readValue(json, clazz))
}

/**
* Writes a dataset as an avro data store using an avro schema.
*
* @param ds Dataset to be saved as avro data store.
* @param path Path to the data store.
* @param avroSchema Avro schema of the records.
* @tparam T Type of objects in the dataset.
*/
def write[T <: SpecificRecordBase](ds: Dataset[T], path: String, avroSchema: Schema): Unit = {
toDF(ds, avroSchema)
.write
.format("avro")
.option("avroSchema", avroSchema.toString)
.save(path)
}

/**
* Creates a dataframe from a given dataset.
*
* @param ds Dataset to be converted to a dataframe.
* @param avroSchema Avro schema of the records.
* @tparam T Type of objects in the dataset.
* @return DataFrame of objects corresponding to records in the given dataset.
*/
def toDF[T <: SpecificRecordBase](ds: Dataset[T], avroSchema: Schema): DataFrame = {
val avroSchemaStr = avroSchema.toString
val rowSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
val encoder = RowEncoder(rowSchema).resolveAndBind()

object SerializationSupport extends Serializable {
@transient private lazy val deserializer = new AvroDeserializer(new Schema.Parser().parse(avroSchemaStr), rowSchema)
private val rows = ds.rdd.map(record => encoder.fromRow(deserializer.deserialize(record).asInstanceOf[InternalRow]))

def doToDF(): DataFrame = {
spark.createDataFrame(rows, rowSchema)
}
}

SerializationSupport.doToDF()
}
}
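
Similarly, a hedged usage sketch for AvroDatasetSupport; again 'Document' is a placeholder for an Avro-generated SpecificRecordBase class and the paths are illustrative.

import org.apache.spark.sql.SparkSession

import eu.dnetlib.iis.common.spark.avro.AvroDatasetSupport

object AvroDatasetSupportUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("avro-dataset-support-example").getOrCreate()
    val support = new AvroDatasetSupport(spark)

    // 'Document' is a placeholder for an Avro-generated SpecificRecordBase class.
    val avroSchema = Document.getClassSchema()

    // Read records into a typed, kryo-encoded Dataset[Document].
    val ds = support.read("/path/to/input", avroSchema, classOf[Document])

    // ... typed transformations on ds ...

    // Write the dataset back to an avro data store with the same schema.
    support.write(ds, "/path/to/output", avroSchema)
  }
}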

This file was deleted.

