diff --git a/iis-common/pom.xml b/iis-common/pom.xml index 04bc85c1a..cbab15013 100644 --- a/iis-common/pom.xml +++ b/iis-common/pom.xml @@ -70,6 +70,11 @@ spark-sql_2.11 + + org.apache.spark + spark-avro_2.11 + + pl.edu.icm.spark-utils spark-utils_2.11 diff --git a/iis-common/src/main/java/eu/dnetlib/iis/common/lock/LockManagerUtils.java b/iis-common/src/main/java/eu/dnetlib/iis/common/lock/LockManagerUtils.java new file mode 100644 index 000000000..d16be4fce --- /dev/null +++ b/iis-common/src/main/java/eu/dnetlib/iis/common/lock/LockManagerUtils.java @@ -0,0 +1,19 @@ +package eu.dnetlib.iis.common.lock; + +import org.apache.hadoop.conf.Configuration; + +import java.lang.reflect.Constructor; + +public class LockManagerUtils { + + private LockManagerUtils() { + } + + public static LockManager instantiateLockManager(String lockManagerFactoryClassName, + Configuration config) throws Exception { + Class clazz = Class.forName(lockManagerFactoryClassName); + Constructor constructor = clazz.getConstructor(); + LockManagerFactory lockManagerFactory = (LockManagerFactory) constructor.newInstance(); + return lockManagerFactory.instantiate(config); + } +} diff --git a/iis-common/src/main/java/eu/dnetlib/iis/common/spark/pipe/PipeExecutionEnvironment.java b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/pipe/PipeExecutionEnvironment.java new file mode 100644 index 000000000..eb1c904f7 --- /dev/null +++ b/iis-common/src/main/java/eu/dnetlib/iis/common/spark/pipe/PipeExecutionEnvironment.java @@ -0,0 +1,23 @@ +package eu.dnetlib.iis.common.spark.pipe; + +import java.io.IOException; + +/** + * Abstraction of execution environment for scripts and commands run using 'pipe' method on RDD. + *
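LockManagerUtils above resolves a LockManagerFactory by its class name via reflection and returns the LockManager that the factory produces. A minimal usage sketch, assuming the LockManagerFactoryMock wired into the sample test workflow further down in this change; the obtain/release pattern mirrors how CachedTaraReferenceExtractionJob guards its cache directory:

import eu.dnetlib.iis.common.lock.LockManager;
import eu.dnetlib.iis.common.lock.LockManagerUtils;
import org.apache.hadoop.conf.Configuration;

public class LockManagerUsageSketch {

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // the factory class name normally arrives as a workflow parameter (tara_lock_manager_factory_class_name)
        LockManager lockManager = LockManagerUtils.instantiateLockManager(
                "eu.dnetlib.iis.wf.primary.processing.LockManagerFactoryMock", config);

        lockManager.obtain("/cache/tara");      // block until the lock for this cache dir is held
        try {
            // critical section: read or update the cached data store
        } finally {
            lockManager.release("/cache/tara"); // always release, even when the update fails
        }
    }
}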

+ * Classes implementing this interface should propagate any necessary files and directories to cluster nodes and define + * the command to be run by 'pipe' method. This interface allows to separate production code and test code by providing + * different implementation for each execution environment. + */ +public interface PipeExecutionEnvironment { + + /** + * Returns the string command to be run by RDD 'pipe' method. The command executes in the context of each node's + * local worker dir. Must not reference files or dir by absolute paths. Instead a relative path within worker dir + * should be used or explicit file path retrieval using 'SparkFiles.get' method. + * + * @return String representing the command to be run by RDD 'pipe' method. + * @throws IOException + */ + String pipeCommand() throws IOException; +} diff --git a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupport.scala b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupport.scala new file mode 100644 index 000000000..b6d608a7f --- /dev/null +++ b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupport.scala @@ -0,0 +1,118 @@ +package eu.dnetlib.iis.common.spark.avro + +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import org.apache.avro.Schema +import org.apache.avro.specific.SpecificRecordBase +import org.apache.spark.sql._ +import org.apache.spark.sql.avro._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.types.StructType + +import scala.collection.JavaConverters._ + +/** + * Spark avro datasource supporting functions for dataframes. + * + * @param spark SparkSession instance. + */ +class AvroDataFrameSupport(val spark: SparkSession) extends Serializable { + + /** + * Creates a dataframe from a given collection. + * + * @param data List with elements for the dataframe. + * @param avroSchema Avro schema of the elements. + * @tparam T Type of elements. + * @return DataFrame containing data from the given list. + */ + def createDataFrame[T](data: java.util.List[T], avroSchema: Schema): DataFrame = { + createDataFrame(data.asScala, avroSchema) + } + + /** + * Creates a dataframe from a given collection. + * + * @param data Seq with elements for the dataframe. + * @param avroSchema Avro schema of the elements. + * @tparam T Type of elements. + * @return DataFrame containing data from the given seq. + */ + def createDataFrame[T](data: Seq[T], avroSchema: Schema): DataFrame = { + val rowSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType] + val encoder = RowEncoder.apply(rowSchema).resolveAndBind() + val deserializer = new AvroDeserializer(avroSchema, rowSchema) + val rows = data.map(record => encoder.fromRow(deserializer.deserialize(record).asInstanceOf[InternalRow])) + spark.createDataFrame(spark.sparkContext.parallelize(rows), rowSchema) + } + + /** + * Reads data as a dataframe from an avro data store using sql schema. + * + * @param path Path to the data store. + * @param schema SQL schema of the records. + * @return DataFrame with data read from given path. + */ + def read(path: String, schema: StructType): DataFrame = { + read(path, SchemaConverters.toAvroType(schema)) + } + + /** + * Reads data as a dataframe from an avro data store using avro schema. + * + * @param path Path to the data store. + * @param avroSchema Avro schema of the records. + * @return DataFrame with data read from given path. 
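A condensed Java sketch of the intended usage of AvroDataFrameSupport, mirroring the AvroDataFrameSupportTest added later in this change; Person is the test Avro record (id, name, age) and the local path is a placeholder:

import eu.dnetlib.iis.common.avro.Person;
import eu.dnetlib.iis.common.spark.avro.AvroDataFrameSupport;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Collections;

public class AvroDataFrameSupportSketch {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local").getOrCreate();
        AvroDataFrameSupport support = new AvroDataFrameSupport(spark);

        // build a dataframe directly from avro-generated objects
        Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build();
        Dataset<Row> df = support.createDataFrame(Collections.singletonList(person), Person.SCHEMA$);

        // write the dataframe as an avro data store and read it back using the avro schema
        support.write(df, "/tmp/person_store", Person.SCHEMA$);
        Dataset<Row> readBack = support.read("/tmp/person_store", Person.SCHEMA$);

        // switch to a typed view backed by the kryo encoder
        Dataset<Person> ds = support.toDS(readBack, Person.class);
        ds.collectAsList().forEach(p -> System.out.println(p.getName()));

        spark.stop();
    }
}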
+ */ + def read(path: String, avroSchema: Schema): DataFrame = { + spark.read + .format("avro") + .option("avroSchema", avroSchema.toString) + .load(path) + } + + /** + * Writes a dataframe as an avro data store using an avro schema generated from sql schema. + * + * @param df DataFrame to be saved as avro data store. + * @param path Path to the data store. + * @return + */ + def write(df: DataFrame, path: String): Unit = { + write(df, path, SchemaConverters.toAvroType(df.schema)) + } + + /** + * Writes a dataframe as an avro data store using given avro schema. + * + * @param df DataFrame to be saved as avro data store. + * @param path Path to the data store. + * @param avroSchema Avro schema of the records. + */ + def write(df: DataFrame, path: String, avroSchema: Schema): Unit = { + df + .write + .format("avro") + .option("avroSchema", avroSchema.toString) + .save(path) + } + + /** + * Creates a dataset from given dataframe using kryo encoder. + * + * NOTE: due to inability to use bean based encoder for avro types this method uses kryo encoder; + * for this reason this method creates objects by mapping rows to jsons and jsons to instances of objects. + * + * @param df DataFrame to be converted to a dataset. + * @param clazz Class of objects in the dataset. + * @tparam T Type of objects in the dataset. + * @return Dataset of objects corresponding to records in the given dataframe. + */ + def toDS[T <: SpecificRecordBase](df: DataFrame, clazz: Class[T]): Dataset[T] = { + implicit val encoder: Encoder[T] = Encoders.kryo(clazz) + val mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + df + .toJSON + .map(json => mapper.readValue(json, clazz)) + } +} diff --git a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupport.scala b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupport.scala new file mode 100644 index 000000000..f2e842a4e --- /dev/null +++ b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupport.scala @@ -0,0 +1,82 @@ +package eu.dnetlib.iis.common.spark.avro + +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import org.apache.avro.Schema +import org.apache.avro.specific.SpecificRecordBase +import org.apache.spark.sql._ +import org.apache.spark.sql.avro._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.types.StructType + +/** + * Spark avro datasource supporting functions for datasets. + * + * @param spark SparkSession instance. + */ +class AvroDatasetSupport(val spark: SparkSession) extends Serializable { + + /** + * Reads data as a dataset from an avro data store using kryo encoder. + * + * NOTE: due to inability to use bean based encoder for avro types this method uses kryo encoder; + * for this reason this method creates objects by mapping rows to jsons and jsons to instances of objects. + * + * @param path Path to the data store. + * @param avroSchema Avro schema of the records. + * @param clazz Class of objects in the dataset. + * @tparam T Type of objects in the dataset. + * @return Dataset with data read from given path. 
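A companion Java sketch for AvroDatasetSupport, mirroring AvroDatasetSupportTest; the path is again a placeholder, and the kryo encoder is needed because avro-generated classes do not work with the bean encoder:

import eu.dnetlib.iis.common.avro.Person;
import eu.dnetlib.iis.common.spark.avro.AvroDatasetSupport;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Collections;

public class AvroDatasetSupportSketch {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local").getOrCreate();
        AvroDatasetSupport support = new AvroDatasetSupport(spark);

        // a typed dataset of avro records requires the kryo encoder
        Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build();
        Dataset<Person> ds = spark.createDataset(Collections.singletonList(person), Encoders.kryo(Person.class));

        // persist as an avro data store and read it back as a typed dataset
        support.write(ds, "/tmp/person_ds", Person.SCHEMA$);
        Dataset<Person> readBack = support.read("/tmp/person_ds", Person.SCHEMA$, Person.class);

        // or expose the same records as an untyped dataframe whose schema is derived from the avro schema
        Dataset<Row> df = support.toDF(readBack, Person.SCHEMA$);
        df.show();

        spark.stop();
    }
}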
+ */ + def read[T <: SpecificRecordBase](path: String, avroSchema: Schema, clazz: Class[T]): Dataset[T] = { + implicit val encoder: Encoder[T] = Encoders.kryo(clazz) + val mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + spark.read + .format("avro") + .option("avroSchema", avroSchema.toString) + .load(path) + .toJSON + .map(json => mapper.readValue(json, clazz)) + } + + /** + * Writes a dataset as an avro data store using an avro schema. + * + * @param ds Dataset to be saved as avro data store. + * @param path Path to the data store. + * @param avroSchema Avro schema of the records. + * @tparam T Type of objects in the dataset. + */ + def write[T <: SpecificRecordBase](ds: Dataset[T], path: String, avroSchema: Schema): Unit = { + toDF(ds, avroSchema) + .write + .format("avro") + .option("avroSchema", avroSchema.toString) + .save(path) + } + + /** + * Creates a dataframe from given dataset. + * + * @param ds Dataset to be converted to a dataframe. + * @param avroSchema Avro schema of the records. + * @tparam T Type of objects in the dataset. + * @return DataFrame of objects corresponding to records in the given dataset. + */ + def toDF[T <: SpecificRecordBase](ds: Dataset[T], avroSchema: Schema): DataFrame = { + val avroSchemaStr = avroSchema.toString + val rowSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType] + val encoder = RowEncoder(rowSchema).resolveAndBind() + + object SerializationSupport extends Serializable { + @transient private lazy val deserializer = new AvroDeserializer(new Schema.Parser().parse(avroSchemaStr), rowSchema) + private val rows = ds.rdd.map(record => encoder.fromRow(deserializer.deserialize(record).asInstanceOf[InternalRow])) + + def doToDF(): DataFrame = { + spark.createDataFrame(rows, rowSchema) + } + } + + SerializationSupport.doToDF() + } +} diff --git a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroSaver.scala b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroSaver.scala deleted file mode 100644 index 1e3ffd650..000000000 --- a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroSaver.scala +++ /dev/null @@ -1,140 +0,0 @@ -package eu.dnetlib.iis.common.spark.avro - -import java.sql.Timestamp -import java.util -import java.util.HashMap - -import org.apache.avro.generic.GenericData.Record -import org.apache.avro.generic.GenericRecord -import org.apache.avro.mapred.{AvroJob, AvroKey, AvroOutputFormat, AvroWrapper} -import org.apache.avro.{Schema, SchemaBuilder} -import org.apache.hadoop.io.NullWritable -import org.apache.hadoop.mapred.JobConf -import org.apache.spark.rdd.RDD.rddToPairRDDFunctions -import org.apache.spark.sql.types._ -import org.apache.spark.sql.{DataFrame, Encoder, Encoders, Row} - -import scala.collection.immutable.Map - -/** - * Based on com.databricks.spark.avro.AvroSaver. This version takes avroSchema as a parameter - * to the save method - thanks to that the schema is not a generic one ([[org.apache.avro.generic.GenericRecord]]) - * generated from a data frame schema - * - * This object provides a save() method that is used to save DataFrame as avro file. - * To do this, we first convert the schema and then convert each row of the RDD to corresponding - * avro types. One remark worth mentioning is the structName parameter that functions have. Avro - * records have a name associated with them, which must be unique. 
Since SturctType in sparkSQL - * doesn't have a name associated with it, we are taking the name of the last structure field that - * the current structure is a child of. For example if the row at the top level had a field called - * "X", which happens to be a structure, we would call that structure "X". When we process original - * rows, they get a name "topLevelRecord". - */ -object AvroSaver { - - def save(dataFrame: DataFrame, avroSchema: Schema, location: String): Unit = { - val jobConf = new JobConf(dataFrame.sqlContext.sparkContext.hadoopConfiguration) - val builder = SchemaBuilder.record("topLevelRecord") - val schema = dataFrame.schema - AvroJob.setOutputSchema(jobConf, avroSchema) - - implicit val encoder: Encoder[(AvroKey[GenericRecord], NullWritable)] = - Encoders.tuple(Encoders.kryo(classOf[AvroKey[GenericRecord]]), Encoders.kryo(classOf[NullWritable])) - - dataFrame - .mapPartitions(rowsToAvro(_, schema)) - .rdd - .saveAsHadoopFile(location, - classOf[AvroWrapper[GenericRecord]], - classOf[NullWritable], - classOf[AvroOutputFormat[GenericRecord]], - jobConf) - } - - private def rowsToAvro(rows: Iterator[Row], - schema: StructType): Iterator[(AvroKey[GenericRecord], NullWritable)] = { - val converter = createConverter(schema, "topLevelRecord") - rows.map(x => (new AvroKey(converter(x).asInstanceOf[GenericRecord]), - NullWritable.get())) - } - - /** - * This function constructs converter function for a given sparkSQL datatype. These functions - * will be used to convert dataFrame to avro format. - */ - def createConverter(dataType: DataType, structName: String): Any => Any = { - dataType match { - case ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | StringType | - BinaryType | BooleanType => - (item: Any) => item - - case _: DecimalType => - (item: Any) => if (item == null) null else item.toString - - case TimestampType => - (item: Any) => { - if (item == null) null else item.asInstanceOf[Timestamp].getTime - } - - case ArrayType(elementType, _) => - val elementConverter = createConverter(elementType, structName) - - (item: Any) => { - if (item == null) { - null - } else { - val sourceArray = item.asInstanceOf[Seq[Any]] - val sourceArraySize = sourceArray.size - val targetArray = new Array[Any](sourceArraySize) - var idx = 0 - - while (idx < sourceArraySize) { - targetArray(idx) = elementConverter(sourceArray(idx)) - idx += 1 - } - - targetArray - } - } - - case MapType(StringType, valueType, _) => - val valueConverter = createConverter(valueType, structName) - - (item: Any) => { - if (item == null) { - null - } else { - val javaMap = new util.HashMap[String, Any]() - item.asInstanceOf[Map[String, Any]].foreach { case (key, value) => - javaMap.put(key, valueConverter(value)) - } - javaMap - } - } - - case structType: StructType => - val builder = SchemaBuilder.record(structName) - val schema: Schema = SchemaConverters.convertStructToAvro( - structType, builder) - val fieldConverters = structType.fields.map(field => - createConverter(field.dataType, field.name)) - - (item: Any) => { - if (item == null) { - null - } else { - val record = new Record(schema) - val convertersIterator = fieldConverters.iterator - val fieldNamesIterator = dataType.asInstanceOf[StructType].fieldNames.iterator - val rowIterator = item.asInstanceOf[Row].toSeq.iterator - - while (convertersIterator.hasNext) { - val converter = convertersIterator.next - record.put(fieldNamesIterator.next, converter(rowIterator.next)) - } - record - } - } - } - } -} diff --git 
a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/SchemaConverters.scala b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/SchemaConverters.scala deleted file mode 100644 index e170cd09d..000000000 --- a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/SchemaConverters.scala +++ /dev/null @@ -1,178 +0,0 @@ -package eu.dnetlib.iis.common.spark.avro - -import org.apache.avro.Schema.Type._ -import org.apache.avro.SchemaBuilder._ -import org.apache.avro.{Schema, SchemaBuilder} -import org.apache.spark.sql.types._ - -import scala.collection.JavaConversions._ - -/** - * This object contains method that are used to convert sparkSQL schemas to avro schemas and vice - * versa. - */ -private object SchemaConverters { - - case class SchemaType(dataType: DataType, nullable: Boolean) - - /** - * This function takes an avro schema and returns a sql schema. - */ - private[avro] def toSqlType(avroSchema: Schema): SchemaType = { - avroSchema.getType match { - case INT => SchemaType(IntegerType, nullable = false) - case STRING => SchemaType(StringType, nullable = false) - case BOOLEAN => SchemaType(BooleanType, nullable = false) - case BYTES => SchemaType(BinaryType, nullable = false) - case DOUBLE => SchemaType(DoubleType, nullable = false) - case FLOAT => SchemaType(FloatType, nullable = false) - case LONG => SchemaType(LongType, nullable = false) - case FIXED => SchemaType(BinaryType, nullable = false) - case ENUM => SchemaType(StringType, nullable = false) - - case RECORD => - val fields = avroSchema.getFields.map { f => - val schemaType = toSqlType(f.schema()) - StructField(f.name, schemaType.dataType, schemaType.nullable) - } - - SchemaType(StructType(fields), nullable = false) - - case ARRAY => - val schemaType = toSqlType(avroSchema.getElementType) - SchemaType( - ArrayType(schemaType.dataType, containsNull = schemaType.nullable), - nullable = false) - - case MAP => - val schemaType = toSqlType(avroSchema.getValueType) - SchemaType( - MapType(StringType, schemaType.dataType, valueContainsNull = schemaType.nullable), - nullable = false) - - case UNION => - if (avroSchema.getTypes.exists(_.getType == NULL)) { - // In case of a union with null, eliminate it and make a recursive call - val remainingUnionTypes = avroSchema.getTypes.filterNot(_.getType == NULL) - if (remainingUnionTypes.size == 1) { - toSqlType(remainingUnionTypes.get(0)).copy(nullable = true) - } else { - toSqlType(Schema.createUnion(remainingUnionTypes)).copy(nullable = true) - } - } else avroSchema.getTypes.map(_.getType) match { - case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) => - SchemaType(LongType, nullable = false) - case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) => - SchemaType(DoubleType, nullable = false) - case other => - sys.error(s"This mix of union types is not supported (see README): $other") - } - - case other => sys.error(s"Unsupported type $other") - } - } - - /** - * This function converts sparkSQL StructType into avro schema. This method uses two other - * converter methods in order to do the conversion. 
- */ - private[avro] def convertStructToAvro[T](structType: StructType, - schemaBuilder: RecordBuilder[T]): T = { - val fieldsAssembler: FieldAssembler[T] = schemaBuilder.fields() - structType.fields.foreach { field => - val newField = fieldsAssembler.name(field.name).`type`() - - if (field.nullable) { - convertFieldTypeToAvro(field.dataType, newField.nullable(), field.name).noDefault - } else { - convertFieldTypeToAvro(field.dataType, newField, field.name).noDefault - } - } - fieldsAssembler.endRecord() - } - - /** - * This function is used to convert some sparkSQL type to avro type. Note that this function won't - * be used to construct fields of avro record (convertFieldTypeToAvro is used for that). - */ - private def convertTypeToAvro[T]( - dataType: DataType, - schemaBuilder: BaseTypeBuilder[T], - structName: String): T = { - dataType match { - case ByteType => schemaBuilder.intType() - case ShortType => schemaBuilder.intType() - case IntegerType => schemaBuilder.intType() - case LongType => schemaBuilder.longType() - case FloatType => schemaBuilder.floatType() - case DoubleType => schemaBuilder.doubleType() - case _: DecimalType => schemaBuilder.stringType() - case StringType => schemaBuilder.stringType() - case BinaryType => schemaBuilder.bytesType() - case BooleanType => schemaBuilder.booleanType() - case TimestampType => schemaBuilder.longType() - - case ArrayType(elementType, _) => - val builder = getSchemaBuilder(dataType.asInstanceOf[ArrayType].containsNull) - val elementSchema = convertTypeToAvro(elementType, builder, structName) - schemaBuilder.array().items(elementSchema) - - case MapType(StringType, valueType, _) => - val builder = getSchemaBuilder(dataType.asInstanceOf[MapType].valueContainsNull) - val valueSchema = convertTypeToAvro(valueType, builder, structName) - schemaBuilder.map().values(valueSchema) - - case structType: StructType => - convertStructToAvro(structType, schemaBuilder.record(structName)) - - case other => throw new IllegalArgumentException(s"Unexpected type $dataType.") - } - } - - /** - * This function is used to construct fields of the avro record, where schema of the field is - * specified by avro representation of dataType. Since builders for record fields are different - * from those for everything else, we have to use a separate method. 
- */ - private def convertFieldTypeToAvro[T]( - dataType: DataType, - newFieldBuilder: BaseFieldTypeBuilder[T], - structName: String): FieldDefault[T, _] = { - dataType match { - case ByteType => newFieldBuilder.intType() - case ShortType => newFieldBuilder.intType() - case IntegerType => newFieldBuilder.intType() - case LongType => newFieldBuilder.longType() - case FloatType => newFieldBuilder.floatType() - case DoubleType => newFieldBuilder.doubleType() - case _: DecimalType => newFieldBuilder.stringType() - case StringType => newFieldBuilder.stringType() - case BinaryType => newFieldBuilder.bytesType() - case BooleanType => newFieldBuilder.booleanType() - case TimestampType => newFieldBuilder.longType() - - case ArrayType(elementType, _) => - val builder = getSchemaBuilder(dataType.asInstanceOf[ArrayType].containsNull) - val elementSchema = convertTypeToAvro(elementType, builder, structName) - newFieldBuilder.array().items(elementSchema) - - case MapType(StringType, valueType, _) => - val builder = getSchemaBuilder(dataType.asInstanceOf[MapType].valueContainsNull) - val valueSchema = convertTypeToAvro(valueType, builder, structName) - newFieldBuilder.map().values(valueSchema) - - case structType: StructType => - convertStructToAvro(structType, newFieldBuilder.record(structName)) - - case other => throw new IllegalArgumentException(s"Unexpected type $dataType.") - } - } - - private def getSchemaBuilder(isNullable: Boolean): BaseTypeBuilder[Schema] = { - if (isNullable) { - SchemaBuilder.builder().nullable() - } else { - SchemaBuilder.builder() - } - } -} diff --git a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupportTest.java b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupportTest.java new file mode 100644 index 000000000..e60d7102a --- /dev/null +++ b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupportTest.java @@ -0,0 +1,179 @@ +package eu.dnetlib.iis.common.spark.avro; + +import eu.dnetlib.iis.common.avro.Person; +import eu.dnetlib.iis.common.utils.AvroTestUtils; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.io.FileUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.avro.SchemaConverters; +import org.apache.spark.sql.types.StructType; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class AvroDataFrameSupportTest { + + private static SparkSession spark; + private static AvroDataFrameSupport avroDataFrameSupport; + private static Path workingDir; + + @BeforeClass + public static void beforeClass() throws IOException { + SparkConf conf = new SparkConf(); + conf.setMaster("local"); + conf.set("spark.driver.host", "localhost"); + conf.setAppName(AvroDataFrameSupportTest.class.getSimpleName()); + spark = SparkSession.builder().config(conf).getOrCreate(); + avroDataFrameSupport = new AvroDataFrameSupport(spark); + workingDir = Files.createTempDirectory(AvroDataFrameSupportTest.class.getSimpleName()); + } + + @AfterClass + public static void afterClass() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + } + + @Test + 
public void createDataFrameShouldRunProperly() { + // given + Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build(); + List data = Collections.singletonList(person); + + // when + Dataset result = avroDataFrameSupport.createDataFrame(data, Person.SCHEMA$); + + // then + assertSchemasEqualIgnoringNullability(Person.SCHEMA$, result.schema()); + List rows = result.collectAsList(); + assertEquals(1, rows.size()); + Row row = rows.get(0); + assertEquals(person.getId(), row.getAs("id")); + assertEquals(person.getName(), row.getAs("name")); + assertEquals(person.getAge(), row.getAs("age")); + } + + @Test + public void readShouldReadProperlyUsingSqlSchema() throws IOException { + // given + Path inputDir = workingDir.resolve("input"); + Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build(); + List data = Collections.singletonList(person); + AvroTestUtils.createLocalAvroDataStore(data, inputDir.toString(), Person.class); + + // when + Dataset result = avroDataFrameSupport.read(inputDir.toString(), + (StructType) SchemaConverters.toSqlType(Person.SCHEMA$).dataType()); + + // then + assertSchemasEqualIgnoringNullability(Person.SCHEMA$, result.schema()); + List rows = result.collectAsList(); + assertEquals(1, rows.size()); + Row row = rows.get(0); + assertEquals(person.getId(), row.getAs("id")); + assertEquals(person.getName(), row.getAs("name")); + assertEquals(person.getAge(), row.getAs("age")); + } + + @Test + public void readShouldReadProperlyUsingAvroSchema() throws IOException { + // given + Path inputDir = workingDir.resolve("input"); + Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build(); + List data = Collections.singletonList(person); + AvroTestUtils.createLocalAvroDataStore(data, inputDir.toString(), Person.class); + + // when + Dataset result = avroDataFrameSupport.read(inputDir.toString(), Person.SCHEMA$); + + // then + assertSchemasEqualIgnoringNullability(Person.SCHEMA$, result.schema()); + List rows = result.collectAsList(); + assertEquals(1, rows.size()); + Row row = rows.get(0); + assertEquals(person.getId(), row.getAs("id")); + assertEquals(person.getName(), row.getAs("name")); + assertEquals(person.getAge(), row.getAs("age")); + } + + @Test + public void writeShouldRunProperlyUsingSqlSchema() throws IOException { + // given + Path outputDir = workingDir.resolve("output1"); + Row personRow = RowFactory.create(1, "name", 2); + Dataset df = spark.createDataFrame( + Collections.singletonList(personRow), + (StructType) SchemaConverters.toSqlType(Person.SCHEMA$).dataType() + ); + + // when + avroDataFrameSupport.write(df, outputDir.toString()); + + // then + List genericRecordList = AvroTestUtils.readLocalAvroDataStore(outputDir.toString()); + assertEquals(1, genericRecordList.size()); + GenericRecord genericRecord = genericRecordList.get(0); + assertEquals(personRow.getAs(0), genericRecord.get(0)); + assertEquals(personRow.getAs(1).toString(), genericRecord.get(1).toString()); + assertEquals(personRow.getAs(2), genericRecord.get(2)); + } + + @Test + public void writeShouldRunProperlyUsingAvroSchema() throws IOException { + // given + Path outputDir = workingDir.resolve("output2"); + Row personRow = RowFactory.create(1, "name", 2); + Dataset df = spark.createDataFrame( + Collections.singletonList(personRow), + (StructType) SchemaConverters.toSqlType(Person.SCHEMA$).dataType() + ); + + // when + avroDataFrameSupport.write(df, outputDir.toString(), Person.SCHEMA$); + + // then + List personList = 
AvroTestUtils.readLocalAvroDataStore(outputDir.toString()); + assertEquals(1, personList.size()); + Person person = personList.get(0); + assertEquals(personRow.getAs(0), person.getId()); + assertEquals(personRow.getAs(1), person.getName()); + assertEquals(personRow.getAs(2), person.getAge()); + } + + @Test + public void toDSShouldRunProperly() { + // given + Row personRow = RowFactory.create(1, "name", 2); + List data = Collections.singletonList(personRow); + Dataset df = spark.createDataFrame( + data, (StructType) SchemaConverters.toSqlType(Person.SCHEMA$).dataType() + ); + + // when + Dataset result = avroDataFrameSupport.toDS(df, Person.class); + + // then + List personList = result.collectAsList(); + assertEquals(1, personList.size()); + Person person = personList.get(0); + assertEquals(personRow.getAs(0), person.getId()); + assertEquals(personRow.getAs(1), person.getName()); + assertEquals(personRow.getAs(2), person.getAge()); + } + + public static void assertSchemasEqualIgnoringNullability(Schema avroSchema, StructType sqlSchema) { + assertEquals(SchemaConverters.toSqlType(avroSchema).dataType().asNullable(), sqlSchema.asNullable()); + } +} \ No newline at end of file diff --git a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupportTest.java b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupportTest.java new file mode 100644 index 000000000..164576be9 --- /dev/null +++ b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupportTest.java @@ -0,0 +1,110 @@ +package eu.dnetlib.iis.common.spark.avro; + +import eu.dnetlib.iis.common.avro.Person; +import eu.dnetlib.iis.common.utils.AvroTestUtils; +import org.apache.avro.Schema; +import org.apache.commons.io.FileUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.avro.SchemaConverters; +import org.apache.spark.sql.types.StructType; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class AvroDatasetSupportTest { + private static SparkSession spark; + private static AvroDatasetSupport avroDatasetSupport; + private static Path workingDir; + + @BeforeClass + public static void beforeClass() throws IOException { + SparkConf conf = new SparkConf(); + conf.setMaster("local"); + conf.set("spark.driver.host", "localhost"); + conf.setAppName(AvroDatasetSupportTest.class.getSimpleName()); + spark = SparkSession.builder().config(conf).getOrCreate(); + avroDatasetSupport = new AvroDatasetSupport(spark); + workingDir = Files.createTempDirectory(AvroDatasetSupportTest.class.getSimpleName()); + } + + @AfterClass + public static void afterClass() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + } + + @Test + public void readShouldRunProperly() throws IOException { + // given + Path inputDir = workingDir.resolve("input"); + Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build(); + List data = Collections.singletonList(person); + AvroTestUtils.createLocalAvroDataStore(data, inputDir.toString(), Person.class); + + // when + Dataset result = avroDatasetSupport.read(inputDir.toString(), Person.SCHEMA$, Person.class); + + // then + List personList 
= result.collectAsList(); + assertEquals(1, personList.size()); + Person personRead = personList.get(0); + assertEquals(person, personRead); + } + + @Test + public void writeShouldRunProperly() throws IOException { + // given + Path outputDir = workingDir.resolve("output"); + Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build(); + Dataset ds = spark.createDataset( + Collections.singletonList(person), + Encoders.kryo(Person.class) + ); + + // when + avroDatasetSupport.write(ds, outputDir.toString(), Person.SCHEMA$); + + // then + List personList = AvroTestUtils.readLocalAvroDataStore(outputDir.toString()); + assertEquals(1, personList.size()); + Person personRead = personList.get(0); + assertEquals(person, personRead); + } + + @Test + public void toDFShouldRunProperly() { + // given + Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build(); + Dataset ds = spark.createDataset( + Collections.singletonList(person), + Encoders.kryo(Person.class) + ); + + // when + Dataset result = avroDatasetSupport.toDF(ds, Person.SCHEMA$); + + // then + assertSchemasEqualIgnoringNullability(Person.SCHEMA$, result.schema()); + List rows = result.collectAsList(); + assertEquals(1, rows.size()); + Row row = rows.get(0); + assertEquals(person.getId(), row.getAs("id")); + assertEquals(person.getName(), row.getAs("name")); + assertEquals(person.getAge(), row.getAs("age")); + } + + public static void assertSchemasEqualIgnoringNullability(Schema avroSchema, StructType sqlSchema) { + assertEquals(SchemaConverters.toSqlType(avroSchema).dataType().asNullable(), sqlSchema.asNullable()); + } +} \ No newline at end of file diff --git a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroSaverTest.java b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroSaverTest.java deleted file mode 100644 index 49dd3d085..000000000 --- a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroSaverTest.java +++ /dev/null @@ -1,90 +0,0 @@ -package eu.dnetlib.iis.common.spark.avro; - -import eu.dnetlib.iis.common.IntegrationTest; -import eu.dnetlib.iis.common.avro.Country; -import eu.dnetlib.iis.common.spark.SparkSessionFactory; -import eu.dnetlib.iis.common.utils.AvroTestUtils; -import org.apache.commons.io.FileUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import pl.edu.icm.sparkutils.test.SparkJob; -import pl.edu.icm.sparkutils.test.SparkJobBuilder; -import pl.edu.icm.sparkutils.test.SparkJobExecutor; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.util.List; - -import static org.junit.Assert.assertEquals; - -/** - * @author Ɓukasz Dumiszewski - */ -@Category(IntegrationTest.class) -public class AvroSaverTest { - - private SparkJobExecutor executor = new SparkJobExecutor(); - - private static File workingDir; - - private static String outputDirPath; - - @Before - public void before() throws IOException { - workingDir = Files.createTempDirectory(AvroSaverTest.class.getSimpleName() + "_").toFile(); - outputDirPath = workingDir + "/spark_sql_avro_cloner/output"; - } - - @After - public void after() throws IOException { - FileUtils.deleteDirectory(workingDir); - } - - //------------------------ TESTS -------------------------- - - @Test - public void test() throws IOException { - // 
given - SparkJob sparkJob = SparkJobBuilder - .create() - .setAppName("Spark Avro Saver Test") - .addJobProperty("spark.driver.host", "localhost") - .setMainClass(AvroSaverTest.class) - .build(); - - // execute - executor.execute(sparkJob); - - // assert - List countries = AvroTestUtils.readLocalAvroDataStore(outputDirPath); - assertEquals(4, countries.size()); - assertEquals(1, countries.stream().filter(c -> c.getIso().equals("PL")).count()); - } - - //------------------------ LOGIC -------------------------- - - public static void main(String[] args) { - SparkConf conf = new SparkConf(); - conf.set("spark.driver.host", "localhost"); - - try (SparkSession spark = SparkSessionFactory.withConfAndKryo(conf)) { - Dataset countries = spark.read() - .json("src/test/resources/eu/dnetlib/iis/common/avro/countries.json"); - - // without these 2 lines below there is no guarantee as to the field order and then - // they can be saved not in accordance with avro schema - countries.registerTempTable("countries"); - countries = spark - .sql("select id, name, iso from countries"); - - AvroSaver.save(countries, Country.SCHEMA$, outputDirPath); - } - } -} diff --git a/iis-schemas/src/main/avro/eu/dnetlib/iis/referenceextraction/project/DocumentHash.avdl b/iis-schemas/src/main/avro/eu/dnetlib/iis/referenceextraction/project/DocumentHash.avdl new file mode 100644 index 000000000..1e2da671d --- /dev/null +++ b/iis-schemas/src/main/avro/eu/dnetlib/iis/referenceextraction/project/DocumentHash.avdl @@ -0,0 +1,8 @@ +@namespace("eu.dnetlib.iis.referenceextraction.project.schemas") +protocol IIS{ + + record DocumentHash { +// document hash value calculated as sha1 hash of 'title|abstract|text' + string hashValue; + } +} \ No newline at end of file diff --git a/iis-schemas/src/main/avro/eu/dnetlib/iis/referenceextraction/project/DocumentHashToProject.avdl b/iis-schemas/src/main/avro/eu/dnetlib/iis/referenceextraction/project/DocumentHashToProject.avdl new file mode 100644 index 000000000..490828b9e --- /dev/null +++ b/iis-schemas/src/main/avro/eu/dnetlib/iis/referenceextraction/project/DocumentHashToProject.avdl @@ -0,0 +1,18 @@ +@namespace("eu.dnetlib.iis.referenceextraction.project.schemas") +protocol IIS{ + + record DocumentHashToProject { +// document hash value calculated as sha1 hash of 'title|abstract|text' + string hashValue; + +// identifier of project being funding source of this document, +// foreign key: Project.id + string projectId; + +// Find more details on `confidenceLevel` constraints in eu/dnetlib/iis/README.markdown file. 
+ float confidenceLevel; + +// text snippet surrounding the matched reference, required mostly for internal debugging and analytics + union { null , string } textsnippet = null; + } +} \ No newline at end of file diff --git a/iis-schemas/src/main/avro/eu/dnetlib/iis/referenceextraction/project/DocumentMetadata.avdl b/iis-schemas/src/main/avro/eu/dnetlib/iis/referenceextraction/project/DocumentMetadata.avdl deleted file mode 100644 index db43f61c4..000000000 --- a/iis-schemas/src/main/avro/eu/dnetlib/iis/referenceextraction/project/DocumentMetadata.avdl +++ /dev/null @@ -1,18 +0,0 @@ -@namespace("eu.dnetlib.iis.referenceextraction.project.schemas") -protocol IIS{ - - record DocumentMetadata { -// ID of the document, foreign key: Document.id ("document" data store) - string id; - -// document title - union { null , string } title = null; - -// abstract or description - union { null , string } abstract = null; - -// document plaintext - union { null , string } text = null; - } - -} diff --git a/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/main/oozie_app/workflow.xml b/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/main/oozie_app/workflow.xml index 17965ee25..39342c395 100644 --- a/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/main/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/main/oozie_app/workflow.xml @@ -199,6 +199,10 @@ Set to 'a^' by default to guarantee nothing will be matched. + + tara_cache_root_dir + tara reference extraction cache root directory + export_action_set_id_document_similarities_standard diff --git a/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/processing/oozie_app/workflow.xml b/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/processing/oozie_app/workflow.xml index 414a8b676..1cc5a9c28 100644 --- a/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/processing/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/processing/oozie_app/workflow.xml @@ -124,6 +124,10 @@ Set to 'a^' by default to guarantee nothing will be matched. 
+ + tara_cache_root_dir + tara reference extraction cache root directory + software_webcrawl_cache_location diff --git a/iis-wf/iis-wf-primary/src/test/resources/eu/dnetlib/iis/wf/primary/processing/sampledataproducer/oozie_app/workflow.xml b/iis-wf/iis-wf-primary/src/test/resources/eu/dnetlib/iis/wf/primary/processing/sampledataproducer/oozie_app/workflow.xml index 9988af9ee..d48b21732 100644 --- a/iis-wf/iis-wf-primary/src/test/resources/eu/dnetlib/iis/wf/primary/processing/sampledataproducer/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-primary/src/test/resources/eu/dnetlib/iis/wf/primary/processing/sampledataproducer/oozie_app/workflow.xml @@ -406,7 +406,15 @@ dataset_datacite_sqlite_builder_java_opts -Xmx1g - + + tara_lock_manager_factory_class_name + eu.dnetlib.iis.wf.primary.processing.LockManagerFactoryMock + + + tara_cache_root_dir + ${workingDir}/cache/tara + + webcrawlContentRetrieverClassName eu.dnetlib.iis.wf.primary.processing.ClasspathContentRetriever @@ -416,7 +424,7 @@ software_webcrawl_cache_location - ${workingDir} + ${workingDir}/cache/software_webcrawl metric_pusher_creator_class_name diff --git a/iis-wf/iis-wf-referenceextraction/pom.xml b/iis-wf/iis-wf-referenceextraction/pom.xml index 52aa06876..febb0a7b0 100644 --- a/iis-wf/iis-wf-referenceextraction/pom.xml +++ b/iis-wf/iis-wf-referenceextraction/pom.xml @@ -74,6 +74,11 @@ spark-sql_2.11 + + org.apache.spark + spark-avro_2.11 + + pl.edu.icm.spark-utils spark-utils_2.11 diff --git a/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/project/input/TaraReferenceExtractionInputTransformerJob.java b/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/project/input/TaraReferenceExtractionInputTransformerJob.java deleted file mode 100644 index 25d2a473c..000000000 --- a/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/project/input/TaraReferenceExtractionInputTransformerJob.java +++ /dev/null @@ -1,90 +0,0 @@ -package eu.dnetlib.iis.wf.referenceextraction.project.input; - -import com.beust.jcommander.JCommander; -import com.beust.jcommander.Parameter; -import com.beust.jcommander.Parameters; -import eu.dnetlib.iis.common.java.io.HdfsUtils; -import eu.dnetlib.iis.common.spark.JavaSparkContextFactory; -import eu.dnetlib.iis.metadataextraction.schemas.DocumentText; -import eu.dnetlib.iis.referenceextraction.project.schemas.DocumentMetadata; -import eu.dnetlib.iis.transformers.metadatamerger.schemas.ExtractedDocumentMetadataMergedWithOriginal; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.Optional; -import pl.edu.icm.sparkutils.avro.SparkAvroLoader; -import pl.edu.icm.sparkutils.avro.SparkAvroSaver; -import scala.Tuple2; - -import java.io.IOException; - -/** - * - * @author mhorst - * - */ -public class TaraReferenceExtractionInputTransformerJob { - - private static SparkAvroLoader avroLoader = new SparkAvroLoader(); - private static SparkAvroSaver avroSaver = new SparkAvroSaver(); - - //------------------------ LOGIC -------------------------- - - public static void main(String[] args) throws IOException { - TaraReferenceExtractionInputTransformerJobParameters params = new TaraReferenceExtractionInputTransformerJobParameters(); - JCommander jcommander = new JCommander(params); - jcommander.parse(args); - - try (JavaSparkContext sc = 
JavaSparkContextFactory.withConfAndKryo(new SparkConf())) { - HdfsUtils.remove(sc.hadoopConfiguration(), params.output); - - JavaRDD inputMeta = avroLoader - .loadJavaRDD(sc, params.inputMetadata, ExtractedDocumentMetadataMergedWithOriginal.class); - - JavaRDD inputText = avroLoader - .loadJavaRDD(sc, params.inputText, DocumentText.class); - - JavaPairRDD> idToTitleAndAbstract = inputMeta - .mapToPair(x -> new Tuple2<>(x.getId(), new Tuple2<>(x.getTitle(), x.getAbstract$()))); - JavaPairRDD idToText = inputText - .mapToPair(x -> new Tuple2<>(x.getId(), x.getText())); - JavaPairRDD>>> joined = idToText - .leftOuterJoin(idToTitleAndAbstract); - - JavaRDD output = joined.map(x -> buildMetadata(x._1, x._2)); - - avroSaver.saveJavaRDD(output, DocumentMetadata.SCHEMA$, params.output); - } - - } - - //------------------------ PRIVATE -------------------------- - - private static DocumentMetadata buildMetadata(CharSequence id, Tuple2>> rddRecord) { - DocumentMetadata.Builder metaBuilder = DocumentMetadata.newBuilder(); - metaBuilder.setId(id); - metaBuilder.setText(rddRecord._1); - - if (rddRecord._2.isPresent()) { - Tuple2 tuple = rddRecord._2.get(); - metaBuilder.setTitle(tuple._1); - metaBuilder.setAbstract$(tuple._2); - } - return metaBuilder.build(); - } - - @Parameters(separators = "=") - private static class TaraReferenceExtractionInputTransformerJobParameters { - - @Parameter(names = "-inputMetadata", required = true) - private String inputMetadata; - - @Parameter(names = "-inputText", required = true) - private String inputText; - - @Parameter(names = "-output", required = true) - private String output; - - } -} diff --git a/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/CachedTaraReferenceExtractionJob.java b/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/CachedTaraReferenceExtractionJob.java new file mode 100644 index 000000000..e94079187 --- /dev/null +++ b/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/CachedTaraReferenceExtractionJob.java @@ -0,0 +1,146 @@ +package eu.dnetlib.iis.wf.referenceextraction.project.tara; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameters; +import eu.dnetlib.iis.common.cache.CacheMetadataManagingProcess; +import eu.dnetlib.iis.common.lock.LockManager; +import eu.dnetlib.iis.common.lock.LockManagerUtils; +import eu.dnetlib.iis.common.spark.SparkSessionFactory; +import eu.dnetlib.iis.common.spark.avro.AvroDataFrameSupport; +import eu.dnetlib.iis.metadataextraction.schemas.DocumentText; +import eu.dnetlib.iis.transformers.metadatamerger.schemas.ExtractedDocumentMetadataMergedWithOriginal; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.*; + +import java.util.Arrays; + +import static eu.dnetlib.iis.wf.referenceextraction.project.tara.TaraReferenceExtractionIOUtils.*; +import static eu.dnetlib.iis.wf.referenceextraction.project.tara.TaraReferenceExtractionUtils.*; + +public class CachedTaraReferenceExtractionJob { + + public enum CacheRecordType { + documentHashToProject, documentHash + } + + public static final StructType DOCUMENT_METADATA_BY_ID_SCHEMA = StructType$.MODULE$.apply( + Arrays.asList( + StructField$.MODULE$.apply("id", DataTypes.StringType, false, Metadata.empty()), + 
StructField$.MODULE$.apply("title", DataTypes.StringType, true, Metadata.empty()), + StructField$.MODULE$.apply("abstract", DataTypes.StringType, true, Metadata.empty()), + StructField$.MODULE$.apply("text", DataTypes.StringType, false, Metadata.empty()) + ) + ); + + public static final StructType DOCUMENT_METADATA_BY_HASH_SCHEMA = StructType$.MODULE$.apply( + Arrays.asList( + StructField$.MODULE$.apply("hashValue", DataTypes.StringType, false, Metadata.empty()), + StructField$.MODULE$.apply("title", DataTypes.StringType, true, Metadata.empty()), + StructField$.MODULE$.apply("abstract", DataTypes.StringType, true, Metadata.empty()), + StructField$.MODULE$.apply("text", DataTypes.StringType, false, Metadata.empty()) + ) + ); + + public static final StructType DOCUMENT_METADATA_SCHEMA = StructType$.MODULE$.apply( + Arrays.asList( + StructField$.MODULE$.apply("id", DataTypes.StringType, false, Metadata.empty()), + StructField$.MODULE$.apply("title", DataTypes.StringType, true, Metadata.empty()), + StructField$.MODULE$.apply("abstract", DataTypes.StringType, true, Metadata.empty()), + StructField$.MODULE$.apply("text", DataTypes.StringType, false, Metadata.empty()), + StructField$.MODULE$.apply("hashValue", DataTypes.StringType, false, Metadata.empty()) + ) + ); + + public static void main(String[] args) throws Exception { + JobParameters params = new JobParameters(); + JCommander jcommander = new JCommander(params); + jcommander.parse(args); + + CacheMetadataManagingProcess cacheManager = new CacheMetadataManagingProcess(); + + try (SparkSession spark = SparkSessionFactory.withConfAndKryo(new SparkConf())) { + clearOutput(spark, params.outputDocumentToProject); + + AvroDataFrameSupport avroDataFrameSupport = new AvroDataFrameSupport(spark); + + Dataset inputExtractedDocumentMetadataMergedWithOriginalDF = avroDataFrameSupport.read( + params.inputExtractedDocumentMetadataMergedWithOriginal, + ExtractedDocumentMetadataMergedWithOriginal.SCHEMA$); + Dataset inputDocumentTextDF = avroDataFrameSupport.read(params.inputDocumentText, + DocumentText.SCHEMA$); + Dataset documentMetadataByIdDF = buildDocumentMetadataById(inputDocumentTextDF, + inputExtractedDocumentMetadataMergedWithOriginalDF); + Dataset documentMetadataDF = buildDocumentMetadata(documentMetadataByIdDF); + + String existingCacheId = cacheManager.getExistingCacheId(spark.sparkContext().hadoopConfiguration(), + params.cacheRootDir); + Dataset documentHashToProjectFromCacheDF = readDocumentHashToProjectFromCacheOrEmpty(spark, + params.cacheRootDir, + existingCacheId); + Dataset documentHashFromCacheDF = readDocumentHashFromCacheOrEmpty(spark, + params.cacheRootDir, + existingCacheId); + + Dataset documentMetadataByHashToBeProcessedDF = documentMetadataByHashToBeProcessed(documentMetadataDF, + documentHashFromCacheDF); + + TaraPipeExecutionEnvironment environment = new TaraPipeExecutionEnvironment(spark.sparkContext(), + params.scriptsDir, params.projectDbFile); + Dataset documentHashToProjectDF = runReferenceExtraction(spark, + documentMetadataByHashToBeProcessedDF, + environment) + .cache(); + + Dataset documentHashToProjectToBeCachedDF = documentHashToProjectToBeCached(spark, + documentHashToProjectDF, + documentHashToProjectFromCacheDF); + Dataset documentHashToBeCachedDF = documentHashToBeCached(spark, + documentHashFromCacheDF, + documentMetadataDF); + LockManager lockManager = LockManagerUtils.instantiateLockManager(params.lockManagerFactoryClassName, + spark.sparkContext().hadoopConfiguration()); + storeInCache(spark, + 
documentHashToProjectToBeCachedDF, + documentHashToBeCachedDF, + params.cacheRootDir, + lockManager, + cacheManager); + + Dataset documentToProjectToOutputDF = documentToProjectToOutput(spark, + documentHashToProjectToBeCachedDF, + documentMetadataDF); + storeInOutput(spark, + documentToProjectToOutputDF, + params.outputDocumentToProject); + } + } + + @Parameters(separators = "=") + public static class JobParameters { + + @Parameter(names = "-inputExtractedDocumentMetadataMergedWithOriginal", required = true) + private String inputExtractedDocumentMetadataMergedWithOriginal; + + @Parameter(names = "-inputDocumentText", required = true) + private String inputDocumentText; + + @Parameter(names = "-lockManagerFactoryClassName", required = true) + private String lockManagerFactoryClassName; + + @Parameter(names = "-cacheRootDir", required = true) + private String cacheRootDir; + + @Parameter(names = "-scriptsDir", required = true) + private String scriptsDir; + + @Parameter(names = "-projectDbFile", required = true) + private String projectDbFile; + + @Parameter(names = "-outputDocumentToProject", required = true) + private String outputDocumentToProject; + } +} diff --git a/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraPipeExecutionEnvironment.java b/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraPipeExecutionEnvironment.java new file mode 100644 index 000000000..98b3188bf --- /dev/null +++ b/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraPipeExecutionEnvironment.java @@ -0,0 +1,24 @@ +package eu.dnetlib.iis.wf.referenceextraction.project.tara; + +import eu.dnetlib.iis.common.spark.pipe.PipeExecutionEnvironment; +import org.apache.spark.SparkContext; + +/** + * Abstraction of cluster execution environment for TARA project reference extraction. + *
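Alongside the TaraPipeExecutionEnvironment described here, a minimal sketch of the PipeExecutionEnvironment contract it fulfils, using a hypothetical echo-style environment; 'cat' merely copies each JSON record from stdin to stdout, standing in for the real run_tara_referenceextraction.sh invocation:

import eu.dnetlib.iis.common.spark.pipe.PipeExecutionEnvironment;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

public class PipeEnvironmentSketch {

    // hypothetical test environment: nothing to distribute to worker nodes, the command is a plain shell filter
    static class EchoPipeEnvironment implements PipeExecutionEnvironment {
        @Override
        public String pipeCommand() {
            return "cat";
        }
    }

    public static void main(String[] args) throws Exception {
        try (JavaSparkContext sc = new JavaSparkContext("local", "pipe-sketch")) {
            PipeExecutionEnvironment environment = new EchoPipeEnvironment();
            JavaRDD<String> input = sc.parallelize(Arrays.asList("{\"id\":\"doc1\"}", "{\"id\":\"doc2\"}"));
            // each record is written to the command's stdin; its stdout lines become the resulting RDD
            JavaRDD<String> piped = input.pipe(environment.pipeCommand());
            piped.collect().forEach(System.out::println);
        }
    }
}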

+ * Madis scripts dir and SQLite projects db file will be propagated to cluster worker nodes upon creation of this + * class. String command to be run by RDD 'pipe' method executes TARA reference extraction in the context of each + * node's worker dir. + */ +public class TaraPipeExecutionEnvironment implements PipeExecutionEnvironment { + + public TaraPipeExecutionEnvironment(SparkContext sc, String scriptsDir, String projectDbFile) { + sc.addFile(scriptsDir, true); + sc.addFile(projectDbFile); + } + + @Override + public String pipeCommand() { + return "bash scripts/run_tara_referenceextraction.sh"; + } +} diff --git a/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionIOUtils.java b/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionIOUtils.java new file mode 100644 index 000000000..faf079807 --- /dev/null +++ b/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionIOUtils.java @@ -0,0 +1,213 @@ +package eu.dnetlib.iis.wf.referenceextraction.project.tara; + +import eu.dnetlib.iis.common.cache.CacheMetadataManagingProcess; +import eu.dnetlib.iis.common.java.io.HdfsUtils; +import eu.dnetlib.iis.common.lock.LockManager; +import eu.dnetlib.iis.common.spark.avro.AvroDataFrameSupport; +import eu.dnetlib.iis.referenceextraction.project.schemas.DocumentHash; +import eu.dnetlib.iis.referenceextraction.project.schemas.DocumentHashToProject; +import eu.dnetlib.iis.referenceextraction.project.schemas.DocumentToProject; +import org.apache.avro.Schema; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; + +public class TaraReferenceExtractionIOUtils { + + private static final Logger logger = LoggerFactory.getLogger(TaraReferenceExtractionIOUtils.class); + + private TaraReferenceExtractionIOUtils() { + } + + public static void clearOutput(SparkSession spark, + String outputDocumentToProject) throws IOException { + clearOutput(outputDocumentToProject, new OutputCleaner(spark)); + } + + public static void clearOutput(String outputDocumentToProject, + OutputCleaner cleaner) throws IOException { + logger.info("Clearing output location {}.", outputDocumentToProject); + cleaner.clearOutput(outputDocumentToProject); + } + + public static Dataset readDocumentHashToProjectFromCacheOrEmpty(SparkSession spark, + String cacheRootDir, + String existingCacheId) { + return readDocumentHashToProjectFromCacheOrEmpty( + spark, + cacheRootDir, + existingCacheId, + new AvroDataStoreReader(spark)); + } + + public static Dataset readDocumentHashToProjectFromCacheOrEmpty(SparkSession spark, + String cacheRootDir, + String existingCacheId, + AvroDataStoreReader reader) { + if (!cacheExist(existingCacheId)) { + logger.info("Existing cache id is {}. Returning empty datastore.", existingCacheId); + return new AvroDataFrameSupport(spark).createDataFrame(Collections.emptyList(), + DocumentHashToProject.SCHEMA$); + } + Path existingCache = new Path(cacheRootDir, existingCacheId); + logger.info("Existing cache id is {}. 
Reading datastore from cache path {}.", existingCacheId, existingCache); + Path path = new Path(existingCache, CachedTaraReferenceExtractionJob.CacheRecordType.documentHashToProject.name()); + return reader.read(path.toString(), DocumentHashToProject.SCHEMA$); + } + + public static Dataset readDocumentHashFromCacheOrEmpty(SparkSession spark, + String cacheRootDir, + String existingCacheId) { + return readDocumentHashFromCacheOrEmpty(spark, + cacheRootDir, + existingCacheId, + new AvroDataStoreReader(spark)); + } + + public static Dataset readDocumentHashFromCacheOrEmpty(SparkSession spark, + String cacheRootDir, + String existingCacheId, + AvroDataStoreReader reader) { + if (!cacheExist(existingCacheId)) { + return new AvroDataFrameSupport(spark).createDataFrame(Collections.emptyList(), DocumentHash.SCHEMA$); + } + Path existingCache = new Path(cacheRootDir, existingCacheId); + logger.info("Existing cache id is {}. Reading datastore from cache path {}.", existingCacheId, existingCache); + Path path = new Path(existingCache, CachedTaraReferenceExtractionJob.CacheRecordType.documentHash.name()); + return reader.read(path.toString(), DocumentHash.SCHEMA$); + } + + private static Boolean cacheExist(String existingCacheId) { + return !CacheMetadataManagingProcess.UNDEFINED.equals(existingCacheId); + } + + public static void storeInCache(SparkSession spark, + Dataset documentHashToProjectToBeCachedDF, + Dataset documentHashToBeCachedDF, + String cacheRootDir, + LockManager lockManager, + CacheMetadataManagingProcess cacheManager) throws Exception { + storeInCache(spark, + documentHashToProjectToBeCachedDF, + documentHashToBeCachedDF, + cacheRootDir, + lockManager, + cacheManager, + new AvroDataStoreWriter(spark)); + } + + public static void storeInCache(SparkSession spark, + Dataset documentHashToProjectToBeCachedDF, + Dataset documentHashToBeCachedDF, + String cacheRootDir, + LockManager lockManager, + CacheMetadataManagingProcess cacheManager, + AvroDataStoreWriter writer) throws Exception { + lockManager.obtain(cacheRootDir); + try { + String newCacheId = cacheManager.generateNewCacheId(spark.sparkContext().hadoopConfiguration(), cacheRootDir); + Path newCachePath = new Path(cacheRootDir, newCacheId); + FileSystem fileSystem = FileSystem.get(spark.sparkContext().hadoopConfiguration()); + logger.info("Storing cached data in path {}.", newCachePath.toString()); + try { + storeDocumentHashToProjectInCache(documentHashToProjectToBeCachedDF, + newCachePath, + writer); + storeDocumentHashInCache(documentHashToBeCachedDF, + newCachePath, + writer); + storeMetaInCache(cacheManager, + cacheRootDir, + newCacheId, + spark.sparkContext().hadoopConfiguration()); + } catch (Exception e) { + fileSystem.delete(newCachePath, true); + throw e; + } + } finally { + lockManager.release(cacheRootDir); + } + } + + private static void storeDocumentHashToProjectInCache(Dataset documentHashToProjectToBeCachedDF, + Path newCachePath, + AvroDataStoreWriter writer) { + writer.write(documentHashToProjectToBeCachedDF, + new Path(newCachePath, + CachedTaraReferenceExtractionJob.CacheRecordType.documentHashToProject.name()).toString(), + DocumentHashToProject.SCHEMA$); + } + + private static void storeDocumentHashInCache(Dataset documentHashToBeCachedDF, + Path newCachePath, + AvroDataStoreWriter writer) { + writer.write(documentHashToBeCachedDF, + new Path(newCachePath, + CachedTaraReferenceExtractionJob.CacheRecordType.documentHash.name()).toString(), + DocumentHash.SCHEMA$); + } + + private static void 
storeMetaInCache(CacheMetadataManagingProcess cacheManager, + String cacheRootDir, + String newCacheId, + Configuration conf) throws IOException { + cacheManager.writeCacheId(conf, cacheRootDir, newCacheId); + } + + public static void storeInOutput(SparkSession spark, + Dataset resultsToOutputDF, + String outputDocumentToProject) { + storeInOutput(resultsToOutputDF, outputDocumentToProject, new AvroDataStoreWriter(spark)); + } + + public static void storeInOutput(Dataset resultsToOutputDF, + String outputDocumentToProject, + AvroDataStoreWriter writer) { + logger.info("Storing output data in path {}.", outputDocumentToProject); + writer.write(resultsToOutputDF, outputDocumentToProject, DocumentToProject.SCHEMA$); + } + + public static class OutputCleaner { + private Configuration conf; + + public OutputCleaner(SparkSession spark) { + this.conf = spark.sparkContext().hadoopConfiguration(); + } + + public void clearOutput(String output) throws IOException { + HdfsUtils.remove(conf, output); + } + } + + public static class AvroDataStoreReader { + private AvroDataFrameSupport avroDataFrameSupport; + + private AvroDataStoreReader(SparkSession spark) { + this.avroDataFrameSupport = new AvroDataFrameSupport(spark); + } + + public Dataset read(String path, Schema schema) { + return avroDataFrameSupport.read(path, schema); + } + } + + public static class AvroDataStoreWriter { + private AvroDataFrameSupport avroDataFrameSupport; + + public AvroDataStoreWriter(SparkSession spark) { + this.avroDataFrameSupport = new AvroDataFrameSupport(spark); + } + + public void write(Dataset df, String path, Schema avroSchema) { + avroDataFrameSupport.write(df, path, avroSchema); + } + } +} diff --git a/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionUtils.java b/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionUtils.java new file mode 100644 index 000000000..07ef767fc --- /dev/null +++ b/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionUtils.java @@ -0,0 +1,148 @@ +package eu.dnetlib.iis.wf.referenceextraction.project.tara; + +import eu.dnetlib.iis.common.spark.pipe.PipeExecutionEnvironment; +import eu.dnetlib.iis.referenceextraction.project.schemas.DocumentHash; +import eu.dnetlib.iis.referenceextraction.project.schemas.DocumentHashToProject; +import eu.dnetlib.iis.referenceextraction.project.schemas.DocumentToProject; +import org.apache.avro.Schema; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.*; +import org.apache.spark.sql.avro.SchemaConverters; +import org.apache.spark.sql.types.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; + +import static org.apache.spark.sql.functions.*; + +public class TaraReferenceExtractionUtils { + + private static final Logger logger = LoggerFactory.getLogger(TaraReferenceExtractionUtils.class); + + private static final String SEP = "|"; + + private static final StructType PIPE_RESULT_SCHEMA = StructType$.MODULE$.apply( + Collections.singletonList( + StructField$.MODULE$.apply("value", DataTypes.StringType, true, Metadata.empty()) + ) + ); + + private static final StructType REFERENCE_EXTRACTION_RESULT_SCHEMA = StructType$.MODULE$.apply( + Arrays.asList( + StructField$.MODULE$.apply("documentId", DataTypes.StringType, true, 
Metadata.empty()), + StructField$.MODULE$.apply("projectId", DataTypes.StringType, true, Metadata.empty()), + StructField$.MODULE$.apply("confidenceLevel", DataTypes.FloatType, true, Metadata.empty()), + StructField$.MODULE$.apply("textsnippet", DataTypes.StringType, true, Metadata.empty()) + ) + ); + + public static Dataset buildDocumentMetadataById(Dataset documentTextDF, + Dataset extractedDocumentMetadataMergedWithOriginalDF) { + logger.info("Building document metadata by id for input data."); + Column joinExprs = documentTextDF.col("id").equalTo( + extractedDocumentMetadataMergedWithOriginalDF.col("id")); + return documentTextDF + .join(extractedDocumentMetadataMergedWithOriginalDF, joinExprs, "left_outer") + .select( + documentTextDF.col("id"), + extractedDocumentMetadataMergedWithOriginalDF.col("title"), + extractedDocumentMetadataMergedWithOriginalDF.col("abstract"), + documentTextDF.col("text") + ); + } + + public static Dataset buildDocumentMetadata(Dataset documentMetadataByIdDF) { + return buildDocumentMetadata(documentMetadataByIdDF, new DocumentMetadataHashColumnCreator()); + } + + public static Dataset buildDocumentMetadata(Dataset documentMetadataByIdDF, + DocumentMetadataHashColumnCreator hashColumnCreator) { + logger.info("Building document metadata with hash column."); + return documentMetadataByIdDF + .withColumn("hashValue", + hashColumnCreator.hashCol("title", "abstract", "text")); + } + + public static Dataset documentMetadataByHashToBeProcessed(Dataset documentMetadataDF, + Dataset documentHashFromCacheDF) { + logger.info("Finding document metadata to be processed."); + Column joinExprs = documentMetadataDF.col("hashValue").equalTo( + documentHashFromCacheDF.col("hashValue")); + return documentMetadataDF + .join(documentHashFromCacheDF, joinExprs, "left_anti") + .select( + col("hashValue"), + col("title"), + col("abstract"), + col("text") + ) + .distinct(); + } + + public static Dataset runReferenceExtraction(SparkSession spark, + Dataset documentMetadataByHashDF, + PipeExecutionEnvironment environment) throws IOException { + logger.info("Running reference extraction for input document metadata."); + String pipeCommandStr = environment.pipeCommand(); + JavaRDD piped = documentMetadataByHashDF + .withColumnRenamed("hashValue", "id") + .toJSON() + .javaRDD() + .pipe(pipeCommandStr) + .map(RowFactory::create); + return spark.createDataFrame(piped, PIPE_RESULT_SCHEMA) + .withColumn("json_struct", from_json(col("value"), REFERENCE_EXTRACTION_RESULT_SCHEMA)) + .select(expr("json_struct.*")) + .withColumnRenamed("documentId", "hashValue"); + } + + public static Dataset documentHashToProjectToBeCached(SparkSession spark, + Dataset documentHashToProjectDF, + Dataset documentHashToProjectFromCacheDF) { + logger.info("Finding reference extraction results to be cached."); + Dataset toBeCached = documentHashToProjectFromCacheDF.union(documentHashToProjectDF); + return dataFrameWithSchema(spark, toBeCached, DocumentHashToProject.SCHEMA$); + } + + public static Dataset documentHashToBeCached(SparkSession spark, + Dataset documentHashFromCacheDF, + Dataset documentMetadataDF) { + logger.info("Finding processed documents to be cached."); + Dataset documentHashDF = documentMetadataDF + .select("hashValue"); + Dataset toBeCached = documentHashFromCacheDF.union(documentHashDF).distinct(); + return dataFrameWithSchema(spark, toBeCached, DocumentHash.SCHEMA$); + } + + public static Dataset documentToProjectToOutput(SparkSession spark, + Dataset documentHashToProjectDF, + Dataset 
documentMetadataDF) { + logger.info("Finding reference extraction results to be saved to output."); + Column joinExprs = documentHashToProjectDF.col("hashValue").equalTo( + documentMetadataDF.col("hashValue")); + Dataset toBeOutput = documentHashToProjectDF + .join(documentMetadataDF, joinExprs, "inner") + .select( + col("id").as("documentId"), + col("projectId"), + col("confidenceLevel"), + col("textsnippet") + ); + return dataFrameWithSchema(spark, toBeOutput, DocumentToProject.SCHEMA$); + } + + private static Dataset dataFrameWithSchema(SparkSession spark, + Dataset df, + Schema avroSchema) { + return spark.createDataFrame(df.javaRDD(), (StructType) SchemaConverters.toSqlType(avroSchema).dataType()); + } + + public static class DocumentMetadataHashColumnCreator { + public Column hashCol(String titleColName, String abstractColName, String textColName) { + return sha1(concat_ws(SEP, col(titleColName), col(abstractColName), col(textColName))); + } + } +} diff --git a/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/softwareurl/CachedWebCrawlerJob.java b/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/softwareurl/CachedWebCrawlerJob.java index 947d86c6f..7e2692ec6 100644 --- a/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/softwareurl/CachedWebCrawlerJob.java +++ b/iis-wf/iis-wf-referenceextraction/src/main/java/eu/dnetlib/iis/wf/referenceextraction/softwareurl/CachedWebCrawlerJob.java @@ -10,6 +10,7 @@ import eu.dnetlib.iis.common.java.io.HdfsUtils; import eu.dnetlib.iis.common.lock.LockManager; import eu.dnetlib.iis.common.lock.LockManagerFactory; +import eu.dnetlib.iis.common.lock.LockManagerUtils; import eu.dnetlib.iis.common.report.ReportEntryFactory; import eu.dnetlib.iis.common.schemas.ReportEntry; import eu.dnetlib.iis.common.spark.JavaSparkContextFactory; @@ -72,7 +73,8 @@ public static void main(String[] args) throws Exception { HdfsUtils.remove(sc.hadoopConfiguration(), params.outputFaultPath); HdfsUtils.remove(sc.hadoopConfiguration(), params.outputReportPath); - LockManager lockManager = instantiateLockManager(params.lockManagerFactoryClassName, sc.hadoopConfiguration()); + LockManager lockManager = LockManagerUtils.instantiateLockManager(params.lockManagerFactoryClassName, + sc.hadoopConfiguration()); OutputPaths outputPaths = new OutputPaths(params); @@ -102,15 +104,7 @@ public static void main(String[] args) throws Exception { } //------------------------ PRIVATE -------------------------- - - private static LockManager instantiateLockManager(String lockManagerFactoryClassName, - Configuration config) throws ClassNotFoundException, NoSuchMethodException, SecurityException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException { - Class clazz = Class.forName(lockManagerFactoryClassName); - Constructor constructor = clazz.getConstructor(); - LockManagerFactory lockManagerFactory = (LockManagerFactory) constructor.newInstance(); - return lockManagerFactory.instantiate(config); - } - + private static String normalizePath(String cacheRootDir) { // omitting last separator when defined if (File.separatorChar == cacheRootDir.charAt(cacheRootDir.length()-1)) { @@ -142,7 +136,6 @@ private static void createCache(JavaRDD documentToSoftwar // store final results JavaRDD entitiesToBeStored = produceEntitiesToBeStored(documentToSoftwareUrl, returnedFromWebcrawlTuple._1); entitiesToBeStored.cache(); - 
produceFaultToBeStored(documentToSoftwareUrl, returnedFromWebcrawlTuple._2); JavaRDD faultsToBeStored = produceFaultToBeStored(documentToSoftwareUrl, returnedFromWebcrawlTuple._2); faultsToBeStored.cache(); diff --git a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/community/main_sqlite/oozie_app/workflow.xml b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/community/main_sqlite/oozie_app/workflow.xml index 8a3f9c89a..3fa36b2c1 100644 --- a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/community/main_sqlite/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/community/main_sqlite/oozie_app/workflow.xml @@ -104,7 +104,7 @@ by process storing plaintexts into the database --> mapreduce.task.timeout - 14400000 + 43200000 oozie.action.external.stats.write diff --git a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/covid19/main/oozie_app/workflow.xml b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/covid19/main/oozie_app/workflow.xml index d8fcff89c..87a1e3c36 100644 --- a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/covid19/main/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/covid19/main/oozie_app/workflow.xml @@ -167,7 +167,7 @@ by process storing plaintexts into the database --> mapreduce.task.timeout - 7200000 + 43200000 oozie.action.external.stats.write diff --git a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/dataset/datacite/main_sqlite/oozie_app/workflow.xml b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/dataset/datacite/main_sqlite/oozie_app/workflow.xml index 11c00a242..9b640f8f4 100644 --- a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/dataset/datacite/main_sqlite/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/dataset/datacite/main_sqlite/oozie_app/workflow.xml @@ -115,7 +115,7 @@ by process storing plaintexts into the database --> mapreduce.task.timeout - 14400000 + 43200000 diff --git a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/dataset/opentrials/main_sqlite/oozie_app/workflow.xml b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/dataset/opentrials/main_sqlite/oozie_app/workflow.xml index b21a384e5..e8d08842c 100644 --- a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/dataset/opentrials/main_sqlite/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/dataset/opentrials/main_sqlite/oozie_app/workflow.xml @@ -115,7 +115,7 @@ by process storing plaintexts into the database --> mapreduce.task.timeout - 14400000 + 43200000 diff --git a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/patent/main_sqlite/oozie_app/workflow.xml b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/patent/main_sqlite/oozie_app/workflow.xml index 615d6050c..a1e1bc5cf 100644 --- 
a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/patent/main_sqlite/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/patent/main_sqlite/oozie_app/workflow.xml @@ -93,7 +93,7 @@ mapreduce.task.timeout - 14400000 + 43200000 ${input_patent_db}#patent.db diff --git a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/pdb/main/oozie_app/workflow.xml b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/pdb/main/oozie_app/workflow.xml index 5c677035f..58309e410 100644 --- a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/pdb/main/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/pdb/main/oozie_app/workflow.xml @@ -102,7 +102,7 @@ by process storing plaintexts into the database --> mapreduce.task.timeout - 7200000 + 43200000 diff --git a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/oozie_app/import.txt b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/oozie_app/import.txt index 3797b82eb..62180ad86 100644 --- a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/oozie_app/import.txt +++ b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/oozie_app/import.txt @@ -1,6 +1,6 @@ ## This is a classpath-based import file (this header is required) sqlite_builder classpath eu/dnetlib/iis/wf/referenceextraction/project/sqlite_builder/oozie_app main_sqlite classpath eu/dnetlib/iis/wf/referenceextraction/project/main_sqlite/oozie_app -tara_sqlite classpath eu/dnetlib/iis/wf/referenceextraction/project/tara_sqlite/oozie_app +tara_main classpath eu/dnetlib/iis/wf/referenceextraction/project/tara/oozie_app funder_report classpath eu/dnetlib/iis/wf/referenceextraction/project/funder_report/oozie_app transformers_common_union_dedup classpath eu/dnetlib/iis/wf/transformers/common/uniondedup/oozie_app \ No newline at end of file diff --git a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/oozie_app/workflow.xml b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/oozie_app/workflow.xml index bfaebb992..36f25d1bc 100644 --- a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/oozie_app/workflow.xml @@ -18,6 +18,17 @@ output_document_to_project output document to project + + tara_lock_manager_factory_class_name + eu.dnetlib.iis.common.lock.ZookeeperLockManagerFactory + + lock manager factory class name, to be used for synchronizing access to cache directory + + + + tara_cache_root_dir + tara reference extraction cache root directory + sparkDriverMemory memory for driver process @@ -81,7 +92,7 @@ - + @@ -110,67 +121,38 @@ - - - - - - - - - oozie.action.sharelib.for.spark - ${oozieActionShareLibForSpark2} - - - yarn-cluster - cluster - tara-referenceextraction-input-transformer - eu.dnetlib.iis.wf.referenceextraction.project.input.TaraReferenceExtractionInputTransformerJob - 
${oozieTopWfApplicationPath}/lib/iis-wf-referenceextraction-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - -inputMetadata = ${input_document_metadata} - -inputText = ${input_document_text} - -output = ${workingDir}/meta_with_text - - - - - - + + - ${wf:appPath()}/tara_sqlite + ${wf:appPath()}/tara_main workingDir - ${workingDir}/tara_sqlite/working_dir + ${workingDir}/tara_main/working_dir - input_document_metadata - ${workingDir}/meta_with_text + lock_manager_factory_class_name + ${tara_lock_manager_factory_class_name} - input_project_db + cache_root_dir + ${tara_cache_root_dir} + + + project_db_file ${workingDir}/projects.db output_document_to_project - ${workingDir}/tara_sqlite/output + ${workingDir}/tara_main/output - - + + @@ -188,7 +170,7 @@ input_b - ${workingDir}/tara_sqlite/output + ${workingDir}/tara_main/output group_by_field_1 diff --git a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/main_sqlite/oozie_app/workflow.xml b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/main_sqlite/oozie_app/workflow.xml index 890636c3b..2b484f9e2 100644 --- a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/main_sqlite/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/main_sqlite/oozie_app/workflow.xml @@ -106,7 +106,7 @@ by process storing plaintexts into the database --> mapreduce.task.timeout - 28800000 + 43200000 oozie.action.external.stats.write diff --git a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/tara_sqlite/oozie_app/lib/scripts/import.txt b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/tara/oozie_app/lib/scripts/import.txt similarity index 100% rename from iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/tara_sqlite/oozie_app/lib/scripts/import.txt rename to iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/tara/oozie_app/lib/scripts/import.txt diff --git a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/tara/oozie_app/lib/scripts/run_tara_referenceextraction.sh b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/tara/oozie_app/lib/scripts/run_tara_referenceextraction.sh new file mode 100644 index 000000000..0571805fe --- /dev/null +++ b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/tara/oozie_app/lib/scripts/run_tara_referenceextraction.sh @@ -0,0 +1,7 @@ +#!/bin/bash +# NOTE: this script is intended to be run with RDD 'pipe' method using bash; +# commands in this script are executed by spark within spark worker dir on each node; +# the dir should contain madis scripts in 'scripts' dir and projects db file as 'projects.db'; + +set -o pipefail +python scripts/madis/mexec.py -d projects.db -f scripts/taraextract.sql \ No newline at end of 
file diff --git a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/tara_sqlite/oozie_app/lib/scripts/taraextract.sql b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/tara/oozie_app/lib/scripts/taraextract.sql similarity index 100% rename from iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/tara_sqlite/oozie_app/lib/scripts/taraextract.sql rename to iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/tara/oozie_app/lib/scripts/taraextract.sql diff --git a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/tara/oozie_app/workflow.xml b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/tara/oozie_app/workflow.xml new file mode 100644 index 000000000..2ab1befa4 --- /dev/null +++ b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/tara/oozie_app/workflow.xml @@ -0,0 +1,127 @@ + + + + + input_document_metadata + + input document metadata with + eu.dnetlib.iis.transformers.metadatamerger.schemas.ExtractedDocumentMetadataMergedWithOriginal avro + records + + + + input_document_text + + input document text with eu.dnetlib.iis.metadataextraction.schemas.DocumentText avro records + + + + lock_manager_factory_class_name + eu.dnetlib.iis.common.lock.ZookeeperLockManagerFactory + + lock manager factory class name, to be used for synchronizing access to cache directory + + + + cache_root_dir + tara reference extraction cache root directory + + + project_db_file + input project SQLite DB path + + + output_document_to_project + output document to project + + + sparkSqlShufflePartitions + 2880 + number of partitions after shuffle, 3 times the number of cores in the cluster + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + + yarn + cluster + tara-referenceextraction + eu.dnetlib.iis.wf.referenceextraction.project.tara.CachedTaraReferenceExtractionJob + ${oozieTopWfApplicationPath}/lib/iis-wf-referenceextraction-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=${sparkSqlShufflePartitions} + + -inputExtractedDocumentMetadataMergedWithOriginal=${input_document_metadata} + -inputDocumentText=${input_document_text} + 
-lockManagerFactoryClassName=${lock_manager_factory_class_name} + -cacheRootDir=${cache_root_dir} + -scriptsDir=${wf:conf('oozie.wf.application.path')}/lib/scripts + -projectDbFile=${nameNode}${project_db_file} + -outputDocumentToProject=${output_document_to_project} + + + + + + + Unfortunately, the process failed -- error message: + [${wf:errorMessage(wf:lastErrorNode())}] + + + + + diff --git a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/tara_sqlite/oozie_app/workflow.xml b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/tara_sqlite/oozie_app/workflow.xml deleted file mode 100644 index 117aa43a9..000000000 --- a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/project/tara_sqlite/oozie_app/workflow.xml +++ /dev/null @@ -1,128 +0,0 @@ - - - - - - input_document_metadata - input document metadata with eu.dnetlib.iis.referenceextraction.project.schemas.DocumentMetadata avro records - - - input_project_db - input project SQLite DB path - - - output_document_to_project - output document to project - - - - - ${jobTracker} - ${nameNode} - - - mapreduce.job.queuename - ${queueName} - - - oozie.launcher.mapred.job.queue.name - ${oozieLauncherQueueName} - - - - - - - - - - eu.dnetlib.iis.common.javamapreduce.hack.AvroSchemaGenerator - eu.dnetlib.iis.referenceextraction.project.schemas.DocumentMetadata - eu.dnetlib.iis.referenceextraction.project.schemas.DocumentToProject - - - - - - - - - - - - - - scripts/madis/mexec.py -d project.db -f scripts/taraextract.sql - - - - - mapred.output.format.class - com.cloudera.science.avro.streaming.AvroAsJSONOutputFormat - - - mapred.input.format.class - com.cloudera.science.avro.streaming.AvroAsJSONInputFormat - - - - - mapreduce.job.reduces - 0 - - - - - mapreduce.input.fileinputformat.inputdir - ${input_document_metadata} - - - - input.schema.literal - ${wf:actionData('generate-schema')['eu.dnetlib.iis.referenceextraction.project.schemas.DocumentMetadata']} - - - - - mapreduce.output.fileoutputformat.outputdir - ${output_document_to_project} - - - - output.schema.literal - ${wf:actionData('generate-schema')['eu.dnetlib.iis.referenceextraction.project.schemas.DocumentToProject']} - - - - - mapreduce.task.timeout - 28800000 - - - oozie.action.external.stats.write - true - - - ${input_project_db}#project.db - - - - - - - - Unfortunately, the process failed -- error message: - [${wf:errorMessage(wf:lastErrorNode())}] - - - - - diff --git a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/researchinitiative/main_sqlite/oozie_app/workflow.xml b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/researchinitiative/main_sqlite/oozie_app/workflow.xml index 1226dbbeb..c636f5a8c 100644 --- a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/researchinitiative/main_sqlite/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/researchinitiative/main_sqlite/oozie_app/workflow.xml @@ -106,7 +106,7 @@ by process storing plaintexts into the database --> mapreduce.task.timeout - 14400000 + 43200000 ${input_researchinitiative_db}#researchinitiative.db diff --git a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/softwareurl/main/oozie_app/workflow.xml 
b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/softwareurl/main/oozie_app/workflow.xml index e6740f500..68d75190c 100644 --- a/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/softwareurl/main/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-referenceextraction/src/main/resources/eu/dnetlib/iis/wf/referenceextraction/softwareurl/main/oozie_app/workflow.xml @@ -181,7 +181,7 @@ mapreduce.task.timeout - 7200000 + 43200000 @@ -283,7 +283,7 @@ mapreduce.task.timeout - 7200000 + 43200000 ${input_softwareheritage_origins_db}#origins.db diff --git a/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/softwareurl/LockManagerFactoryMock.java b/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/LockManagerFactoryMock.java similarity index 91% rename from iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/softwareurl/LockManagerFactoryMock.java rename to iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/LockManagerFactoryMock.java index 22b573760..a5aec44cf 100644 --- a/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/softwareurl/LockManagerFactoryMock.java +++ b/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/LockManagerFactoryMock.java @@ -1,4 +1,4 @@ -package eu.dnetlib.iis.wf.referenceextraction.softwareurl; +package eu.dnetlib.iis.wf.referenceextraction; import org.apache.hadoop.conf.Configuration; diff --git a/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/CachedTaraReferenceExtractionJobTest.java b/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/CachedTaraReferenceExtractionJobTest.java new file mode 100644 index 000000000..633675b03 --- /dev/null +++ b/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/CachedTaraReferenceExtractionJobTest.java @@ -0,0 +1,61 @@ +package eu.dnetlib.iis.wf.referenceextraction.project.tara; + +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class CachedTaraReferenceExtractionJobTest { + + @Test + public void documentMetadataByIdShouldHaveProperSchema() { + // then + assertForField(CachedTaraReferenceExtractionJob.DOCUMENT_METADATA_BY_ID_SCHEMA.fields()[0], + "id", DataTypes.StringType, false); + assertForField(CachedTaraReferenceExtractionJob.DOCUMENT_METADATA_BY_ID_SCHEMA.fields()[1], + "title", DataTypes.StringType, true); + assertForField(CachedTaraReferenceExtractionJob.DOCUMENT_METADATA_BY_ID_SCHEMA.fields()[2], + "abstract", DataTypes.StringType, true); + assertForField(CachedTaraReferenceExtractionJob.DOCUMENT_METADATA_BY_ID_SCHEMA.fields()[3], + "text", DataTypes.StringType, false); + } + + @Test + public void documentMetadataByHashShouldHaveProperSchema() { + // then + assertForField(CachedTaraReferenceExtractionJob.DOCUMENT_METADATA_BY_HASH_SCHEMA.fields()[0], + "hashValue", DataTypes.StringType, false); + assertForField(CachedTaraReferenceExtractionJob.DOCUMENT_METADATA_BY_HASH_SCHEMA.fields()[1], + "title", DataTypes.StringType, true); + assertForField(CachedTaraReferenceExtractionJob.DOCUMENT_METADATA_BY_HASH_SCHEMA.fields()[2], + "abstract", 
DataTypes.StringType, true); + assertForField(CachedTaraReferenceExtractionJob.DOCUMENT_METADATA_BY_HASH_SCHEMA.fields()[3], + "text", DataTypes.StringType, false); + } + + @Test + public void documentMetadataWithHashShouldHaveProperSchema() { + // then + assertForField(CachedTaraReferenceExtractionJob.DOCUMENT_METADATA_SCHEMA.fields()[0], + "id", DataTypes.StringType, false); + assertForField(CachedTaraReferenceExtractionJob.DOCUMENT_METADATA_SCHEMA.fields()[1], + "title", DataTypes.StringType, true); + assertForField(CachedTaraReferenceExtractionJob.DOCUMENT_METADATA_SCHEMA.fields()[2], + "abstract", DataTypes.StringType, true); + assertForField(CachedTaraReferenceExtractionJob.DOCUMENT_METADATA_SCHEMA.fields()[3], + "text", DataTypes.StringType, false); + assertForField(CachedTaraReferenceExtractionJob.DOCUMENT_METADATA_SCHEMA.fields()[4], + "hashValue", DataTypes.StringType, false); + } + + private static void assertForField(StructField field, + String name, + DataType dataType, + Boolean nullable) { + assertEquals(name, field.name()); + assertEquals(dataType, field.dataType()); + assertEquals(nullable, field.nullable()); + } +} \ No newline at end of file diff --git a/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionIOUtilsTest.java b/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionIOUtilsTest.java new file mode 100644 index 000000000..187714c9e --- /dev/null +++ b/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionIOUtilsTest.java @@ -0,0 +1,259 @@ +package eu.dnetlib.iis.wf.referenceextraction.project.tara; + +import eu.dnetlib.iis.common.cache.CacheMetadataManagingProcess; +import eu.dnetlib.iis.common.lock.LockManager; +import eu.dnetlib.iis.common.spark.avro.AvroDataFrameSupport; +import eu.dnetlib.iis.referenceextraction.project.schemas.DocumentHash; +import eu.dnetlib.iis.referenceextraction.project.schemas.DocumentHashToProject; +import eu.dnetlib.iis.referenceextraction.project.schemas.DocumentToProject; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.avro.SchemaConverters; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static eu.dnetlib.iis.wf.referenceextraction.project.tara.TaraReferenceExtractionIOUtils.*; +import static org.junit.Assert.*; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + +public class TaraReferenceExtractionIOUtilsTest { + + private static SparkSession spark; + + @BeforeClass + public static void beforeClass() { + SparkConf conf = new SparkConf(); + conf.setMaster("local"); + conf.set("spark.driver.host", "localhost"); + conf.setAppName(TaraReferenceExtractionIOUtilsTest.class.getSimpleName()); + spark = SparkSession.builder().config(conf).getOrCreate(); + } + + @AfterClass + public static void afterClass() { + spark.stop(); + } + + @Test + public void clearOutputShouldRunProperly() throws IOException { + // given + OutputCleaner cleaner = mock(OutputCleaner.class); + + // when + clearOutput("path/to/output", cleaner); + + // then + 
verify(cleaner, atLeastOnce()).clearOutput("path/to/output"); + } + + @Test + public void readDocumentHashToProjectFromCacheOrEmptyShouldReadEmptyDataFrameIfExistingCacheIdIsUndefined() { + // given + AvroDataStoreReader reader = mock(AvroDataStoreReader.class); + + // when + Dataset resultDF = readDocumentHashToProjectFromCacheOrEmpty(spark, + "path/to/cache", + CacheMetadataManagingProcess.UNDEFINED, + reader); + + // then + assertEquals(SchemaConverters.toSqlType(DocumentHashToProject.SCHEMA$).dataType(), resultDF.schema()); + + List results = resultDF.collectAsList(); + assertTrue(results.isEmpty()); + + verify(reader, never()).read(any(), any()); + } + + @Test + public void readDocumentHashToProjectFromCacheOrEmptyShouldReadNonEmptyDataFraneIfExistingCacheIdIsDefined() { + // given + DocumentHashToProject documentHashToProject = DocumentHashToProject.newBuilder() + .setHashValue("a1") + .setProjectId("projectId") + .setConfidenceLevel(1.0f) + .build(); + List documentHashToProjectList = Collections.singletonList(documentHashToProject); + AvroDataStoreReader reader = mock(AvroDataStoreReader.class); + when(reader.read(new Path("path/to/cache/01", + CachedTaraReferenceExtractionJob.CacheRecordType.documentHashToProject.name()).toString(), + DocumentHashToProject.SCHEMA$)) + .thenReturn(new AvroDataFrameSupport(spark).createDataFrame(documentHashToProjectList, + DocumentHashToProject.SCHEMA$)); + + // when + Dataset resultDF = readDocumentHashToProjectFromCacheOrEmpty(spark, + "path/to/cache", + "01", + reader); + + // then + assertEquals(SchemaConverters.toSqlType(DocumentHashToProject.SCHEMA$).dataType(), resultDF.schema()); + + List results = resultDF.collectAsList(); + assertEquals(1, results.size()); + Row row = results.get(0); + assertEquals(documentHashToProject.getHashValue(), row.getAs("hashValue")); + assertEquals(documentHashToProject.getProjectId(), row.getAs("projectId")); + assertEquals(documentHashToProject.getConfidenceLevel(), row.getAs("confidenceLevel"), 1e-3); + assertNull(row.getAs("textsnippet")); + } + + @Test + public void readDocumentHashFromCacheOrEmptyShouldReadEmptyDataFrameIfExistingCacheIdIsUndefined() { + // given + AvroDataStoreReader reader = mock(AvroDataStoreReader.class); + + // when + Dataset resultDF = readDocumentHashFromCacheOrEmpty(spark, + "path/to/cache", + CacheMetadataManagingProcess.UNDEFINED, + reader); + + // then + assertEquals(SchemaConverters.toSqlType(DocumentHash.SCHEMA$).dataType(), resultDF.schema()); + + List results = resultDF.collectAsList(); + assertTrue(results.isEmpty()); + + verify(reader, never()).read(any(), any()); + } + + @Test + public void readDocumentHashFromCacheOrEmptyShouldReadNonEmptyDataFrameIfExistingCacheIdIsDefined() { + // given + DocumentHash documentHash = DocumentHash.newBuilder().setHashValue("a1").build(); + List documentHashList = Collections.singletonList(documentHash); + AvroDataStoreReader reader = mock(AvroDataStoreReader.class); + when(reader.read(new Path("path/to/cache/01", CachedTaraReferenceExtractionJob.CacheRecordType.documentHash.name()).toString(), + DocumentHash.SCHEMA$)) + .thenReturn(new AvroDataFrameSupport(spark).createDataFrame(documentHashList, DocumentHash.SCHEMA$)); + + // when + Dataset resultDF = readDocumentHashFromCacheOrEmpty(spark, + "path/to/cache", + "01", + reader); + + // then + assertEquals(SchemaConverters.toSqlType(DocumentHash.SCHEMA$).dataType(), resultDF.schema()); + + List results = resultDF.collectAsList(); + assertEquals(1, results.size()); + Row row = 
results.get(0); + assertEquals(documentHash.getHashValue(), row.getAs("hashValue")); + } + + @Test + public void storeInCacheShouldRunProperly() throws Exception { + // given + AvroDataFrameSupport avroDataFrameSupport = new AvroDataFrameSupport(spark); + DocumentHashToProject documentHashToProject = DocumentHashToProject.newBuilder() + .setHashValue("a1") + .setProjectId("projId-1") + .setConfidenceLevel(1.0f) + .build(); + List documentHashToProjectList = Collections.singletonList(documentHashToProject); + Dataset documentHashToProjectDF = avroDataFrameSupport.createDataFrame( + documentHashToProjectList, + DocumentHashToProject.SCHEMA$); + DocumentHash documentHash = DocumentHash.newBuilder().setHashValue("a1").build(); + List documentHashList = Collections.singletonList(documentHash); + Dataset documentHashDF = avroDataFrameSupport.createDataFrame( + documentHashList, + DocumentHash.SCHEMA$); + LockManager lockManager = mock(LockManager.class); + CacheMetadataManagingProcess cacheManager = mock(CacheMetadataManagingProcess.class); + when(cacheManager.generateNewCacheId(any(Configuration.class), eq("path/to/cache"))).thenReturn("01"); + AvroDataStoreWriter writer = mock(AvroDataStoreWriter.class); + + // when + storeInCache(spark, + documentHashToProjectDF, + documentHashDF, + "path/to/cache", + lockManager, + cacheManager, + writer); + + // then + ArgumentCaptor> dataFrameCaptor = new ArgumentCaptor<>(); + + verify(lockManager, atLeastOnce()).obtain("path/to/cache"); + + verify(writer, atLeastOnce()).write(dataFrameCaptor.capture(), + eq(String.format("path/to/cache/01/%s", + CachedTaraReferenceExtractionJob.CacheRecordType.documentHashToProject.name())), + eq(DocumentHashToProject.SCHEMA$)); + + Dataset documentHashToProjectStoredDF = dataFrameCaptor.getValue(); + assertEquals(SchemaConverters.toSqlType(DocumentHashToProject.SCHEMA$).dataType(), documentHashToProjectStoredDF.schema()); + List documentHashToProjectRows = documentHashToProjectStoredDF.collectAsList(); + assertEquals(1, documentHashToProjectRows.size()); + Row documentHashToProjectRow = documentHashToProjectRows.get(0); + assertEquals(documentHashToProject.getHashValue(), documentHashToProjectRow.getAs("hashValue")); + assertEquals(documentHashToProject.getProjectId(), documentHashToProjectRow.getAs("projectId")); + assertEquals(documentHashToProject.getConfidenceLevel(), documentHashToProjectRow.getAs("confidenceLevel"), 1e-3); + assertNull(documentHashToProjectRow.getAs("textsnippet")); + + verify(writer, atLeastOnce()).write(dataFrameCaptor.capture(), + eq(String.format("path/to/cache/01/%s", + CachedTaraReferenceExtractionJob.CacheRecordType.documentHash.name())), + eq(DocumentHash.SCHEMA$)); + + Dataset documentHashStoredDF = dataFrameCaptor.getValue(); + assertEquals(SchemaConverters.toSqlType(DocumentHash.SCHEMA$).dataType(), documentHashStoredDF.schema()); + List documentHashRows = documentHashStoredDF.collectAsList(); + assertEquals(1, documentHashRows.size()); + Row documentHashRow = documentHashRows.get(0); + assertEquals(documentHash.getHashValue(), documentHashRow.getAs("hashValue")); + + verify(cacheManager, atLeastOnce()).writeCacheId(any(Configuration.class), eq("path/to/cache"), eq("01")); + verify(lockManager, atLeastOnce()).release("path/to/cache"); + } + + @Test + public void storeInOutputShouldRunProperly() { + // given + DocumentToProject documentToProject = DocumentToProject.newBuilder() + .setDocumentId("docId-1") + .setProjectId("projId-1") + .setConfidenceLevel(1.0f) + .build(); + List 
documentToProjectList = Collections.singletonList(documentToProject); + Dataset documentToProjectDF = new AvroDataFrameSupport(spark).createDataFrame( + documentToProjectList, + DocumentToProject.SCHEMA$); + AvroDataStoreWriter writer = mock(AvroDataStoreWriter.class); + + // when + storeInOutput(documentToProjectDF, "path/to/output", writer); + + // then + ArgumentCaptor> dataFrameCaptor = new ArgumentCaptor<>(); + verify(writer, atLeastOnce()).write(dataFrameCaptor.capture(), + eq("path/to/output"), + eq(DocumentToProject.SCHEMA$)); + Dataset documentToProjectStoredDF = dataFrameCaptor.getValue(); + assertEquals(SchemaConverters.toSqlType(DocumentToProject.SCHEMA$).dataType(), documentToProjectStoredDF.schema()); + List documentToProjectRows = documentToProjectStoredDF.collectAsList(); + assertEquals(1, documentToProjectRows.size()); + Row documentToProjecRow = documentToProjectRows.get(0); + assertEquals(documentToProject.getDocumentId(), documentToProjecRow.getAs("documentId")); + assertEquals(documentToProject.getProjectId(), documentToProjecRow.getAs("projectId")); + assertEquals(documentToProject.getConfidenceLevel(), documentToProjecRow.getAs("confidenceLevel"), 1e-3); + assertNull(documentToProjecRow.getAs("textsnippet")); + } +} \ No newline at end of file diff --git a/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionUtilsTest.java b/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionUtilsTest.java new file mode 100644 index 000000000..4dc1b04a4 --- /dev/null +++ b/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionUtilsTest.java @@ -0,0 +1,354 @@ +package eu.dnetlib.iis.wf.referenceextraction.project.tara; + +import eu.dnetlib.iis.common.spark.avro.AvroDataFrameSupport; +import eu.dnetlib.iis.common.spark.pipe.PipeExecutionEnvironment; +import eu.dnetlib.iis.metadataextraction.schemas.DocumentText; +import eu.dnetlib.iis.referenceextraction.project.schemas.DocumentHash; +import eu.dnetlib.iis.referenceextraction.project.schemas.DocumentHashToProject; +import eu.dnetlib.iis.referenceextraction.project.schemas.DocumentToProject; +import eu.dnetlib.iis.transformers.metadatamerger.schemas.ExtractedDocumentMetadataMergedWithOriginal; +import eu.dnetlib.iis.transformers.metadatamerger.schemas.PublicationType; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkFiles; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.avro.SchemaConverters; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; + +import static eu.dnetlib.iis.wf.referenceextraction.project.tara.TaraReferenceExtractionUtils.*; +import static org.apache.spark.sql.functions.lit; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TaraReferenceExtractionUtilsTest { + + private static SparkSession spark; + + @BeforeClass + public static void beforeClass() { + SparkConf conf 
= new SparkConf(); + conf.setMaster("local"); + conf.set("spark.driver.host", "localhost"); + conf.setAppName(TaraReferenceExtractionUtilsTest.class.getSimpleName()); + spark = SparkSession.builder().config(conf).getOrCreate(); + } + + @AfterClass + public static void afterClass() { + spark.stop(); + } + + @Test + public void buildDocumentMetadataByIdShouldRunProperly() { + // given + AvroDataFrameSupport avroDataFrameSupport = new AvroDataFrameSupport(spark); + Dataset documentTextDF = avroDataFrameSupport.createDataFrame( + Arrays.asList( + createDocumentText("docId-1", "text-1"), + createDocumentText("docId-2", "text-2") + ), + DocumentText.SCHEMA$); + Dataset extractedDocumentMetadataMergedWithOriginalDF = avroDataFrameSupport.createDataFrame( + Arrays.asList( + createExtractedDocumentMetadataMergedWithOriginal("docId-1"), + createExtractedDocumentMetadataMergedWithOriginal("docId-a") + ), + ExtractedDocumentMetadataMergedWithOriginal.SCHEMA$); + + // when + Dataset resultDF = buildDocumentMetadataById(documentTextDF, extractedDocumentMetadataMergedWithOriginalDF); + + // then + assertEquals(CachedTaraReferenceExtractionJob.DOCUMENT_METADATA_BY_ID_SCHEMA, resultDF.schema()); + + List results = resultDF.collectAsList().stream() + .sorted(Comparator.comparing(o -> o.getAs("id"))) + .collect(Collectors.toList()); + assertEquals(2, results.size()); + assertForDocumentMetadataByIdRow(results.get(0), "docId-1", null, null, "text-1"); + assertForDocumentMetadataByIdRow(results.get(1), "docId-2", null, null, "text-2"); + } + + @Test + public void buildDocumentMetadataShouldRunProperly() { + // given + Dataset documentMetadataByIdDF = spark.createDataFrame( + Collections.singletonList( + createDocumentMetadataById("docId-1", "text-1") + ), + CachedTaraReferenceExtractionJob.DOCUMENT_METADATA_SCHEMA); + DocumentMetadataHashColumnCreator hashColumnCreator = mock(DocumentMetadataHashColumnCreator.class); + when(hashColumnCreator.hashCol("title", "abstract", "text")) + .thenReturn(lit("a1")); + + // when + Dataset resultDF = buildDocumentMetadata(documentMetadataByIdDF, hashColumnCreator); + + // then + assertEquals(CachedTaraReferenceExtractionJob.DOCUMENT_METADATA_SCHEMA, resultDF.schema()); + + List results = resultDF.collectAsList(); + assertEquals(1, results.size()); + Row row = results.get(0); + assertEquals("docId-1", row.getAs("id")); + assertEquals("a1", row.getAs("hashValue")); + } + + @Test + public void documentMetadataByHashToBeProcessedShouldFilterOutPreviouslyProcessedDocuments() { + // given + Dataset documentMetadataDF = spark.createDataFrame( + Arrays.asList( + createDocumentMetadata("docId-1", "text-1", "a1"), + createDocumentMetadata("docId-1", "text-1", "a1"), + createDocumentMetadata("docId-2", "text-2", "b2"), + createDocumentMetadata("docId-2", "text-2", "b2") + ), + CachedTaraReferenceExtractionJob.DOCUMENT_METADATA_SCHEMA); + Dataset documentHashFromCacheDF = new AvroDataFrameSupport(spark).createDataFrame( + Arrays.asList( + createDocumentHash("b2"), + createDocumentHash("c3") + ), + DocumentHash.SCHEMA$); + + // when + Dataset resultDF = documentMetadataByHashToBeProcessed(documentMetadataDF, documentHashFromCacheDF); + + // then + assertEquals(CachedTaraReferenceExtractionJob.DOCUMENT_METADATA_BY_HASH_SCHEMA, resultDF.schema()); + + List results = resultDF.collectAsList(); + assertEquals(1, results.size()); + assertForDocumentMetadataByHashRow(results.get(0), "a1", null, null, "text-1"); + } + + @Test + public void shouldRunReferenceExtraction() throws IOException { + 
// given + Dataset documentMetadataByHashDF = spark.createDataFrame( + Collections.singletonList( + createDocumentMetadataByHash("a1", "text-1") + ), + CachedTaraReferenceExtractionJob.DOCUMENT_METADATA_BY_HASH_SCHEMA); + PipeExecutionEnvironment pipeExecutionEnvironment = () -> { + Path scriptWithInputCheck = createTestScriptWithInputCheck(); + spark.sparkContext().addFile(scriptWithInputCheck.toString()); + return String.format("bash %s/%s", SparkFiles.getRootDirectory(), + scriptWithInputCheck.getFileName().toString()); + }; + + // when + Dataset resultDF = runReferenceExtraction(spark, documentMetadataByHashDF, pipeExecutionEnvironment); + + // then + assertEquals(SchemaConverters.toSqlType(DocumentHashToProject.SCHEMA$).dataType().asNullable(), + resultDF.schema().asNullable()); + + List results = resultDF.collectAsList(); + assertEquals(1, results.size()); + Row row = results.get(0); + assertForDocumentHashToProject(row, "a1", "projId-1", 1.0f); + } + + @Test + public void documentHashToProjectToBeCachedShouldRunProperly() { + // given + AvroDataFrameSupport avroDataFrameSupport = new AvroDataFrameSupport(spark); + Dataset documentHashToProjectDF = avroDataFrameSupport.createDataFrame( + Collections.singletonList( + createDocumentHashToProject("a1", "projId-1", 1f) + ), + DocumentHashToProject.SCHEMA$); + Dataset documentHashToProjectFromCacheDF = new AvroDataFrameSupport(spark).createDataFrame( + Collections.singletonList( + createDocumentHashToProject("b2", "projId-2", 2f) + ), + DocumentHashToProject.SCHEMA$); + + // when + Dataset resultDF = documentHashToProjectToBeCached(spark, + documentHashToProjectDF, + documentHashToProjectFromCacheDF); + + // then + assertEquals(SchemaConverters.toSqlType(DocumentHashToProject.SCHEMA$).dataType(), resultDF.schema()); + + List results = resultDF.collectAsList().stream() + .sorted(Comparator.comparing(o -> o.getAs("hashValue"))) + .collect(Collectors.toList()); + assertEquals(2, results.size()); + + assertForDocumentHashToProject(results.get(0), "a1", "projId-1", 1f); + assertForDocumentHashToProject(results.get(1), "b2", "projId-2", 2f); + } + + @Test + public void documentHashToBeCachedShouldRunProperly() { + // given + Dataset documentHashFromCacheDF = new AvroDataFrameSupport(spark).createDataFrame( + Arrays.asList( + createDocumentHash("b2"), + createDocumentHash("c3") + ), + DocumentHash.SCHEMA$); + Dataset documentMetadataDF = spark.createDataFrame( + Arrays.asList( + createDocumentMetadata("docId-1", "text-1", "a1"), + createDocumentMetadata("docId-2", "text-2", "b2") + ), + CachedTaraReferenceExtractionJob.DOCUMENT_METADATA_SCHEMA); + + // when + Dataset resultDF = documentHashToBeCached(spark, + documentHashFromCacheDF, + documentMetadataDF); + + // then + assertEquals(SchemaConverters.toSqlType(DocumentHash.SCHEMA$).dataType(), resultDF.schema()); + + List results = resultDF.collectAsList().stream() + .sorted(Comparator.comparing(o -> o.getAs("hashValue"))) + .collect(Collectors.toList()); + assertEquals(3, results.size()); + + assertEquals("a1", results.get(0).getAs("hashValue")); + assertEquals("b2", results.get(1).getAs("hashValue")); + assertEquals("c3", results.get(2).getAs("hashValue")); + } + + @Test + public void documentToProjectToOutputShouldRunProperly() { + // given + Dataset documentHashToProjectDF = new AvroDataFrameSupport(spark).createDataFrame( + Collections.singletonList( + createDocumentHashToProject("a1", "projId-1", 1f) + ), + DocumentHashToProject.SCHEMA$); + Dataset documentMetadataDF = 
spark.createDataFrame( + Arrays.asList( + createDocumentMetadata("docId-1", "text-1", "a1"), + createDocumentMetadata("docId-2", "text-2", "b2") + ), + CachedTaraReferenceExtractionJob.DOCUMENT_METADATA_SCHEMA); + + // when + Dataset resultDF = documentToProjectToOutput(spark, + documentHashToProjectDF, + documentMetadataDF); + + // then + assertEquals(SchemaConverters.toSqlType(DocumentToProject.SCHEMA$).dataType(), resultDF.schema()); + + List results = resultDF.collectAsList().stream() + .sorted(Comparator.comparing(o -> o.getAs("documentId"))) + .collect(Collectors.toList()); + assertEquals(1, results.size()); + + Row row = results.get(0); + assertEquals("docId-1", row.getAs("documentId")); + assertEquals("projId-1", row.getAs("projectId")); + assertEquals(1f, row.getAs("confidenceLevel"), 1e-3); + assertNull(row.getAs("textsnippet")); + } + + private static DocumentText createDocumentText(String id, String text) { + return DocumentText.newBuilder() + .setId(id) + .setText(text) + .build(); + } + + private static ExtractedDocumentMetadataMergedWithOriginal createExtractedDocumentMetadataMergedWithOriginal(String id) { + return ExtractedDocumentMetadataMergedWithOriginal.newBuilder() + .setId(id) + .setPublicationType(PublicationType.newBuilder().build()) + .build(); + } + + private static Row createDocumentMetadataById(String id, + String text) { + return RowFactory.create(id, null, null, text); + } + + private static Row createDocumentMetadataByHash(String hashCode, + String text) { + return RowFactory.create(hashCode, null, null, text); + } + + private static Row createDocumentMetadata(String id, + String text, + String hashValue) { + return RowFactory.create(id, null, null, text, hashValue); + } + + private static DocumentHash createDocumentHash(String hashValue) { + return DocumentHash.newBuilder().setHashValue(hashValue).build(); + } + + private static DocumentHashToProject createDocumentHashToProject(String hashValue, + String projectId, + Float confidenceLevel) { + return DocumentHashToProject.newBuilder() + .setHashValue(hashValue) + .setProjectId(projectId) + .setConfidenceLevel(confidenceLevel) + .build(); + } + + private static Path createTestScriptWithInputCheck() throws IOException { + String content = String.join(System.getProperty("line.separator"), + "#!/bin/bash", + "read in", + "test ${in:0:1} == '{' -a ${in: -1} == '}' && echo '{\"documentId\":\"a1\",\"projectId\":\"projId-1\",\"confidenceLevel\":1,\"textsnippet\":null}'" + ); + return Files.write(Files.createTempFile(null, "sh"), content.getBytes()); + } + + private static void assertForDocumentMetadataByIdRow(Row row, + String id, + String title, + String abstract$, + String text) { + assertEquals(id, row.getAs("id")); + assertEquals(title, row.getAs("title")); + assertEquals(abstract$, row.getAs("abstract")); + assertEquals(text, row.getAs("text")); + } + + private static void assertForDocumentMetadataByHashRow(Row row, + String hashValue, + String title, + String abstract$, + String text) { + assertEquals(hashValue, row.getAs("hashValue")); + assertEquals(title, row.getAs("title")); + assertEquals(abstract$, row.getAs("abstract")); + assertEquals(text, row.getAs("text")); + } + + private static void assertForDocumentHashToProject(Row row, + String hashValue, + String projectId, + Float confidenceLevel) { + assertEquals(hashValue, row.getAs("hashValue")); + assertEquals(projectId, row.getAs("projectId")); + assertEquals(confidenceLevel, row.getAs("confidenceLevel"), 1e-3); + assertNull(row.getAs("textsnippet")); + 
} +} \ No newline at end of file diff --git a/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/data/document_to_project.json b/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/data/document_to_project.json index d4c4cb1b0..8d4f61dc4 100644 --- a/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/data/document_to_project.json +++ b/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/data/document_to_project.json @@ -104,13 +104,13 @@ "documentId": "PMC3386204", "projectId": "40|taraexp_____::c81e728d9d4c2f636f067f89cc14862c", "confidenceLevel": 0.68313, - "textsnippet": "null" + "textsnippet": null } { "documentId": "PMC3625178", "projectId": "40|taraexp_____::6413b29c08e6d71a9cf6c4d50d7dc6f6", "confidenceLevel": 0.68313, - "textsnippet": "null" + "textsnippet": null } { "documentId": "WOS:000298601300043", diff --git a/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest/data/tara_cache_output/document_hash.json b/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest/data/tara_cache_output/document_hash.json new file mode 100644 index 000000000..d7fa6e728 --- /dev/null +++ b/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest/data/tara_cache_output/document_hash.json @@ -0,0 +1,43 @@ +{"hashValue":"bd056318149eadd7369cfb433a2996860b9bcd71"} +{"hashValue":"07ea9793b2f6342fe5f3e6bf4800fdf52e0f2f36"} +{"hashValue":"622182b68bd91d8943d6d5219e2c9f093d57c8b2"} +{"hashValue":"22aef60cdad76c55b71b1cfa52639b81f0b39987"} +{"hashValue":"009b8eb3d27b659eba54d9db886311e61d0530ea"} +{"hashValue":"ca333e73b6aa32e747299156120d697c09267697"} +{"hashValue":"c94b09b65e8d84f91b327ca00cb101bdcadb84c1"} +{"hashValue":"3764dbc1863c149f86ecf07ffefae9f244fd095a"} +{"hashValue":"207365e9fb42f0442fa91d939a0b6759f3cd80a2"} +{"hashValue":"559fc3906607251e82a04989645296d13f641ff9"} +{"hashValue":"f1ad0d7ff1737b144dc9881b6bc5db1be9b8f603"} +{"hashValue":"4ffe1c7d29e06a8abcaaa6212dca2aa38200a470"} +{"hashValue":"1f023b2bb30c5a8b4931f7d44623b6e0f29981e5"} +{"hashValue":"5aa188eb1972b8773b59df8047a57503118459fd"} +{"hashValue":"47ae144819d7cfa2c1a71285a7024a5749a72024"} +{"hashValue":"f77b0073de9bf1d4bb125d12d7164eb457d4a923"} +{"hashValue":"3d2fbe6fb174f6afa2537e9ce4ccdf1be121aaf9"} +{"hashValue":"678c87f43c8e6cacf950abe5be18abb1fbd0f1ae"} +{"hashValue":"fb522a0f623a21d88ac233aeab1e059afe12eae6"} +{"hashValue":"4bd3264244ba34e0e87efa9e795d7f4b2c6a9aa9"} +{"hashValue":"b13d2ff5f2e242fa124ea519f4a8e276a8ec100f"} +{"hashValue":"aa3ea206dea5a38488203bcddd0dae7d2b6fe243"} +{"hashValue":"1ef4001a3991149a005630033173e2a37cfc2f3b"} +{"hashValue":"9a750b4497e7039e26496151de484ab65af5fad6"} +{"hashValue":"5fc963afa0a40eb104b0df6b1496be391e8223ae"} +{"hashValue":"c0b5de49a709299e95acbe6588bf79a7ec036ed9"} +{"hashValue":"b297f7ce686e955114db74c957a635d39823230a"} +{"hashValue":"1290566257d9b0114986a9af40d67726da986e8e"} +{"hashValue":"2090187637da95b119e8ea7c60b5831736545bc8"} +{"hashValue":"dff153f67c72e345e94720bc4d45df107977d63b"} +{"hashValue":"f08602af3a14c7ef13275a4a14e2ef421c7f9fc3"} +{"hashValue":"bb898244c60a9f4669dca97f646cb98f80f3166a"} +{"hashValue":"f7b2143fe7628de62311ad66b10651602b933e72"} 
+{"hashValue":"78d526f902d6519901b54ac7ea1a8050ff8c9a22"} +{"hashValue":"e8e6fd1273eb639e7cf6c7f7a62dff442ba8ed1f"} +{"hashValue":"ad0df1750e62f3a62fe657412fd5e76724c9104a"} +{"hashValue":"a6e4142c14ebccc5b787aced63573002d2ef103e"} +{"hashValue":"6017e71926276898a330eef4703621fc1a044297"} +{"hashValue":"585f6941a1f4422d3a6c38688e927a1eec7e0e9d"} +{"hashValue":"e7f226ca4b0b176ab42c4fad5d9b6dad04bd6df8"} +{"hashValue":"bcfef12b15ca910445e76a585b94e387140435e9"} +{"hashValue":"b40d602bc41c0fde8dc7faf531649f5f40a163a7"} +{"hashValue":"f642e2866a150312c7ba6b57a3c3824125c357c9"} diff --git a/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest/data/tara_cache_output/document_hash_to_project.json b/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest/data/tara_cache_output/document_hash_to_project.json new file mode 100644 index 000000000..1cfbab191 --- /dev/null +++ b/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest/data/tara_cache_output/document_hash_to_project.json @@ -0,0 +1,2 @@ +{"hashValue":"f77b0073de9bf1d4bb125d12d7164eb457d4a923","projectId":"40|taraexp_____::c81e728d9d4c2f636f067f89cc14862c","confidenceLevel":0.68313,"textsnippet":null} +{"hashValue":"1290566257d9b0114986a9af40d67726da986e8e","projectId":"40|taraexp_____::6413b29c08e6d71a9cf6c4d50d7dc6f6","confidenceLevel":0.68313,"textsnippet":null} diff --git a/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest/oozie_app/workflow.xml b/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest/oozie_app/workflow.xml index 0fbeba78b..56763abdc 100644 --- a/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest/oozie_app/workflow.xml @@ -1,6 +1,4 @@ - @@ -17,23 +15,18 @@ - - + - - eu.dnetlib.iis.common.java.ProcessWrapper - eu.dnetlib.iis.common.java.jsonworkflownodes.Producer - - -C{document_text, + -C{document_text, eu.dnetlib.iis.metadataextraction.schemas.DocumentText, eu/dnetlib/iis/wf/referenceextraction/project/data/document_text.json} -C{project, @@ -42,7 +35,6 @@ -C{document_metadata, eu.dnetlib.iis.transformers.metadatamerger.schemas.ExtractedDocumentMetadataMergedWithOriginal, eu/dnetlib/iis/wf/referenceextraction/project/data/document_metadata.json} - -Odocument_text=${workingDir}/producer/document_text -Oproject=${workingDir}/producer/project -Odocument_metadata=${workingDir}/producer/document_metadata @@ -56,7 +48,6 @@ ${wf:appPath()}/referenceextraction_project - workingDir ${workingDir}/referenceextraction_project/working_dir @@ -73,6 +64,14 @@ input_document_metadata ${workingDir}/producer/document_metadata + + tara_lock_manager_factory_class_name + eu.dnetlib.iis.wf.referenceextraction.LockManagerFactoryMock + + + tara_cache_root_dir + ${workingDir}/cache/tara + output_document_to_project ${workingDir}/referenceextraction_project/document_to_project @@ -87,30 +86,39 @@ - - - - eu.dnetlib.iis.common.java.ProcessWrapper - - eu.dnetlib.iis.common.java.jsonworkflownodes.TestingConsumer - - -C{document_to_project, - eu.dnetlib.iis.referenceextraction.project.schemas.DocumentToProject, - 
eu/dnetlib/iis/wf/referenceextraction/project/data/document_to_project.json} - -C{report_funder,eu.dnetlib.iis.common.schemas.ReportEntry, - eu/dnetlib/iis/wf/referenceextraction/project/data/report_funder.json} - - -Idocument_to_project=${workingDir}/referenceextraction_project/document_to_project + + + eu.dnetlib.iis.common.java.ProcessWrapper + eu.dnetlib.iis.common.java.jsonworkflownodes.TestingConsumer + -C{document_to_project, + eu.dnetlib.iis.referenceextraction.project.schemas.DocumentToProject, + eu/dnetlib/iis/wf/referenceextraction/project/data/document_to_project.json} + + -C{report_funder, + eu.dnetlib.iis.common.schemas.ReportEntry, + eu/dnetlib/iis/wf/referenceextraction/project/data/report_funder.json} + + -C{tara_document_hash_to_project, + eu.dnetlib.iis.referenceextraction.project.schemas.DocumentHashToProject, + eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest/data/tara_cache_output/document_hash_to_project.json} + + -C{tara_document_hash, + eu.dnetlib.iis.referenceextraction.project.schemas.DocumentHash, + eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest/data/tara_cache_output/document_hash.json} + + -Idocument_to_project=${workingDir}/referenceextraction_project/document_to_project -Ireport_funder=${workingDir}/report/document_to_project_by_funder - - - - + -Itara_document_hash_to_project=${workingDir}/cache/tara/000001/documentHashToProject + -Itara_document_hash=${workingDir}/cache/tara/000001/documentHash + + + + Unfortunately, the process failed -- error message: - [${wf:errorMessage(wf:lastErrorNode())}] - + [${wf:errorMessage(wf:lastErrorNode())}] + diff --git a/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest_empty_input/oozie_app/workflow.xml b/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest_empty_input/oozie_app/workflow.xml index 6113646e8..9a304ef63 100644 --- a/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest_empty_input/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest_empty_input/oozie_app/workflow.xml @@ -1,6 +1,4 @@ - @@ -17,23 +15,18 @@ - - + - - eu.dnetlib.iis.common.java.ProcessWrapper - eu.dnetlib.iis.common.java.jsonworkflownodes.Producer - - -C{document_text, + -C{document_text, eu.dnetlib.iis.metadataextraction.schemas.DocumentText, eu/dnetlib/iis/wf/referenceextraction/project/data/empty.json} -C{project, @@ -42,7 +35,6 @@ -C{document_metadata, eu.dnetlib.iis.transformers.metadatamerger.schemas.ExtractedDocumentMetadataMergedWithOriginal, eu/dnetlib/iis/wf/referenceextraction/project/data/empty.json} - -Odocument_text=${workingDir}/producer/document_text -Oproject=${workingDir}/producer/project -Odocument_metadata=${workingDir}/producer/document_metadata @@ -56,7 +48,6 @@ ${wf:appPath()}/referenceextraction_project - workingDir ${workingDir}/referenceextraction_project/working_dir @@ -73,6 +64,14 @@ input_document_metadata ${workingDir}/producer/document_metadata + + tara_lock_manager_factory_class_name + eu.dnetlib.iis.wf.referenceextraction.LockManagerFactoryMock + + + tara_cache_root_dir + ${workingDir}/cache/tara + output_document_to_project ${workingDir}/referenceextraction_project/document_to_project @@ -89,28 +88,37 @@ - eu.dnetlib.iis.common.java.ProcessWrapper - eu.dnetlib.iis.common.java.jsonworkflownodes.TestingConsumer 
- -C{document_to_project, eu.dnetlib.iis.referenceextraction.project.schemas.DocumentToProject, - eu/dnetlib/iis/wf/referenceextraction/project/data/empty.json} - -C{report_funder,eu.dnetlib.iis.common.schemas.ReportEntry, - eu/dnetlib/iis/wf/referenceextraction/project/data/report_empty.json} - - -Idocument_to_project=${workingDir}/referenceextraction_project/document_to_project + eu/dnetlib/iis/wf/referenceextraction/project/data/empty.json} + + -C{report_funder, + eu.dnetlib.iis.common.schemas.ReportEntry, + eu/dnetlib/iis/wf/referenceextraction/project/data/report_empty.json} + + -C{tara_document_hash_to_project, + eu.dnetlib.iis.referenceextraction.project.schemas.DocumentHashToProject, + eu/dnetlib/iis/wf/referenceextraction/project/data/empty.json} + + -C{tara_document_hash, + eu.dnetlib.iis.referenceextraction.project.schemas.DocumentHash, + eu/dnetlib/iis/wf/referenceextraction/project/data/empty.json} + + -Idocument_to_project=${workingDir}/referenceextraction_project/document_to_project -Ireport_funder=${workingDir}/report/document_to_project_by_funder - + -Itara_document_hash_to_project=${workingDir}/cache/tara/000001/documentHashToProject + -Itara_document_hash=${workingDir}/cache/tara/000001/documentHash + Unfortunately, the process failed -- error message: - [${wf:errorMessage(wf:lastErrorNode())}] - + [${wf:errorMessage(wf:lastErrorNode())}] + diff --git a/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest_without_projects/data/tara_cache_output/document_hash.json b/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest_without_projects/data/tara_cache_output/document_hash.json new file mode 100644 index 000000000..d7fa6e728 --- /dev/null +++ b/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest_without_projects/data/tara_cache_output/document_hash.json @@ -0,0 +1,43 @@ +{"hashValue":"bd056318149eadd7369cfb433a2996860b9bcd71"} +{"hashValue":"07ea9793b2f6342fe5f3e6bf4800fdf52e0f2f36"} +{"hashValue":"622182b68bd91d8943d6d5219e2c9f093d57c8b2"} +{"hashValue":"22aef60cdad76c55b71b1cfa52639b81f0b39987"} +{"hashValue":"009b8eb3d27b659eba54d9db886311e61d0530ea"} +{"hashValue":"ca333e73b6aa32e747299156120d697c09267697"} +{"hashValue":"c94b09b65e8d84f91b327ca00cb101bdcadb84c1"} +{"hashValue":"3764dbc1863c149f86ecf07ffefae9f244fd095a"} +{"hashValue":"207365e9fb42f0442fa91d939a0b6759f3cd80a2"} +{"hashValue":"559fc3906607251e82a04989645296d13f641ff9"} +{"hashValue":"f1ad0d7ff1737b144dc9881b6bc5db1be9b8f603"} +{"hashValue":"4ffe1c7d29e06a8abcaaa6212dca2aa38200a470"} +{"hashValue":"1f023b2bb30c5a8b4931f7d44623b6e0f29981e5"} +{"hashValue":"5aa188eb1972b8773b59df8047a57503118459fd"} +{"hashValue":"47ae144819d7cfa2c1a71285a7024a5749a72024"} +{"hashValue":"f77b0073de9bf1d4bb125d12d7164eb457d4a923"} +{"hashValue":"3d2fbe6fb174f6afa2537e9ce4ccdf1be121aaf9"} +{"hashValue":"678c87f43c8e6cacf950abe5be18abb1fbd0f1ae"} +{"hashValue":"fb522a0f623a21d88ac233aeab1e059afe12eae6"} +{"hashValue":"4bd3264244ba34e0e87efa9e795d7f4b2c6a9aa9"} +{"hashValue":"b13d2ff5f2e242fa124ea519f4a8e276a8ec100f"} +{"hashValue":"aa3ea206dea5a38488203bcddd0dae7d2b6fe243"} +{"hashValue":"1ef4001a3991149a005630033173e2a37cfc2f3b"} +{"hashValue":"9a750b4497e7039e26496151de484ab65af5fad6"} +{"hashValue":"5fc963afa0a40eb104b0df6b1496be391e8223ae"} +{"hashValue":"c0b5de49a709299e95acbe6588bf79a7ec036ed9"} 
+{"hashValue":"b297f7ce686e955114db74c957a635d39823230a"} +{"hashValue":"1290566257d9b0114986a9af40d67726da986e8e"} +{"hashValue":"2090187637da95b119e8ea7c60b5831736545bc8"} +{"hashValue":"dff153f67c72e345e94720bc4d45df107977d63b"} +{"hashValue":"f08602af3a14c7ef13275a4a14e2ef421c7f9fc3"} +{"hashValue":"bb898244c60a9f4669dca97f646cb98f80f3166a"} +{"hashValue":"f7b2143fe7628de62311ad66b10651602b933e72"} +{"hashValue":"78d526f902d6519901b54ac7ea1a8050ff8c9a22"} +{"hashValue":"e8e6fd1273eb639e7cf6c7f7a62dff442ba8ed1f"} +{"hashValue":"ad0df1750e62f3a62fe657412fd5e76724c9104a"} +{"hashValue":"a6e4142c14ebccc5b787aced63573002d2ef103e"} +{"hashValue":"6017e71926276898a330eef4703621fc1a044297"} +{"hashValue":"585f6941a1f4422d3a6c38688e927a1eec7e0e9d"} +{"hashValue":"e7f226ca4b0b176ab42c4fad5d9b6dad04bd6df8"} +{"hashValue":"bcfef12b15ca910445e76a585b94e387140435e9"} +{"hashValue":"b40d602bc41c0fde8dc7faf531649f5f40a163a7"} +{"hashValue":"f642e2866a150312c7ba6b57a3c3824125c357c9"} diff --git a/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest_without_projects/oozie_app/workflow.xml b/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest_without_projects/oozie_app/workflow.xml index a11d01036..1c55a3745 100644 --- a/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest_without_projects/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest_without_projects/oozie_app/workflow.xml @@ -1,6 +1,4 @@ - @@ -17,23 +15,18 @@ - - + - - eu.dnetlib.iis.common.java.ProcessWrapper - eu.dnetlib.iis.common.java.jsonworkflownodes.Producer - - -C{document_text, + -C{document_text, eu.dnetlib.iis.metadataextraction.schemas.DocumentText, eu/dnetlib/iis/wf/referenceextraction/project/data/document_text.json} -C{project, @@ -41,8 +34,7 @@ eu/dnetlib/iis/wf/referenceextraction/project/data/empty.json} -C{document_metadata, eu.dnetlib.iis.transformers.metadatamerger.schemas.ExtractedDocumentMetadataMergedWithOriginal, - eu/dnetlib/iis/wf/referenceextraction/project/data/empty.json} - + eu/dnetlib/iis/wf/referenceextraction/project/data/document_metadata.json} -Odocument_text=${workingDir}/producer/document_text -Oproject=${workingDir}/producer/project -Odocument_metadata=${workingDir}/producer/document_metadata @@ -56,7 +48,6 @@ ${wf:appPath()}/referenceextraction_project - workingDir ${workingDir}/referenceextraction_project/working_dir @@ -73,6 +64,14 @@ input_document_metadata ${workingDir}/producer/document_metadata + + tara_lock_manager_factory_class_name + eu.dnetlib.iis.wf.referenceextraction.LockManagerFactoryMock + + + tara_cache_root_dir + ${workingDir}/cache/tara + output_document_to_project ${workingDir}/referenceextraction_project/document_to_project @@ -89,28 +88,37 @@ - eu.dnetlib.iis.common.java.ProcessWrapper - eu.dnetlib.iis.common.java.jsonworkflownodes.TestingConsumer - -C{document_to_project, eu.dnetlib.iis.referenceextraction.project.schemas.DocumentToProject, - eu/dnetlib/iis/wf/referenceextraction/project/data/empty.json} - -C{report_funder,eu.dnetlib.iis.common.schemas.ReportEntry, - eu/dnetlib/iis/wf/referenceextraction/project/data/report_empty.json} - - -Idocument_to_project=${workingDir}/referenceextraction_project/document_to_project + eu/dnetlib/iis/wf/referenceextraction/project/data/empty.json} + + -C{report_funder, + 
eu.dnetlib.iis.common.schemas.ReportEntry, + eu/dnetlib/iis/wf/referenceextraction/project/data/report_empty.json} + + -C{tara_document_hash_to_project, + eu.dnetlib.iis.referenceextraction.project.schemas.DocumentHashToProject, + eu/dnetlib/iis/wf/referenceextraction/project/data/empty.json} + + -C{tara_document_hash, + eu.dnetlib.iis.referenceextraction.project.schemas.DocumentHash, + eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest_without_projects/data/tara_cache_output/document_hash.json} + + -Idocument_to_project=${workingDir}/referenceextraction_project/document_to_project -Ireport_funder=${workingDir}/report/document_to_project_by_funder - + -Itara_document_hash_to_project=${workingDir}/cache/tara/000001/documentHashToProject + -Itara_document_hash=${workingDir}/cache/tara/000001/documentHash + Unfortunately, the process failed -- error message: - [${wf:errorMessage(wf:lastErrorNode())}] - + [${wf:errorMessage(wf:lastErrorNode())}] + diff --git a/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest_without_references/data/tara_cache_output/document_hash.json b/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest_without_references/data/tara_cache_output/document_hash.json new file mode 100644 index 000000000..264d7515b --- /dev/null +++ b/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest_without_references/data/tara_cache_output/document_hash.json @@ -0,0 +1,2 @@ +{"hashValue":"fff1bcd932f3d647cd8739c9e0c638a64690e707"} +{"hashValue":"7a54470fa03545654a54bc66e1910bc54e49199d"} \ No newline at end of file diff --git a/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest_without_references/oozie_app/workflow.xml b/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest_without_references/oozie_app/workflow.xml index 1e2ce6c69..96d8a3c1c 100644 --- a/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest_without_references/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest_without_references/oozie_app/workflow.xml @@ -1,6 +1,4 @@ - @@ -17,23 +15,18 @@ - - + - - eu.dnetlib.iis.common.java.ProcessWrapper - eu.dnetlib.iis.common.java.jsonworkflownodes.Producer - - -C{document_text, + -C{document_text, eu.dnetlib.iis.metadataextraction.schemas.DocumentText, eu/dnetlib/iis/wf/referenceextraction/project/data/document_text_without_refs.json} -C{project, @@ -42,7 +35,6 @@ -C{document_metadata, eu.dnetlib.iis.transformers.metadatamerger.schemas.ExtractedDocumentMetadataMergedWithOriginal, eu/dnetlib/iis/wf/referenceextraction/project/data/empty.json} - -Odocument_text=${workingDir}/producer/document_text -Oproject=${workingDir}/producer/project -Odocument_metadata=${workingDir}/producer/document_metadata @@ -56,7 +48,6 @@ ${wf:appPath()}/referenceextraction_project - workingDir ${workingDir}/referenceextraction_project/working_dir @@ -73,6 +64,14 @@ input_document_metadata ${workingDir}/producer/document_metadata + + tara_lock_manager_factory_class_name + eu.dnetlib.iis.wf.referenceextraction.LockManagerFactoryMock + + + tara_cache_root_dir + ${workingDir}/cache/tara + output_document_to_project 
${workingDir}/referenceextraction_project/document_to_project @@ -89,28 +88,37 @@ - eu.dnetlib.iis.common.java.ProcessWrapper - eu.dnetlib.iis.common.java.jsonworkflownodes.TestingConsumer - -C{document_to_project, eu.dnetlib.iis.referenceextraction.project.schemas.DocumentToProject, - eu/dnetlib/iis/wf/referenceextraction/project/data/empty.json} - -C{report_funder,eu.dnetlib.iis.common.schemas.ReportEntry, - eu/dnetlib/iis/wf/referenceextraction/project/data/report_empty.json} - - -Idocument_to_project=${workingDir}/referenceextraction_project/document_to_project + eu/dnetlib/iis/wf/referenceextraction/project/data/empty.json} + + -C{report_funder, + eu.dnetlib.iis.common.schemas.ReportEntry, + eu/dnetlib/iis/wf/referenceextraction/project/data/report_empty.json} + + -C{tara_document_hash_to_project, + eu.dnetlib.iis.referenceextraction.project.schemas.DocumentHashToProject, + eu/dnetlib/iis/wf/referenceextraction/project/data/empty.json} + + -C{tara_document_hash, + eu.dnetlib.iis.referenceextraction.project.schemas.DocumentHash, + eu/dnetlib/iis/wf/referenceextraction/project/main/sampletest_without_references/data/tara_cache_output/document_hash.json} + + -Idocument_to_project=${workingDir}/referenceextraction_project/document_to_project -Ireport_funder=${workingDir}/report/document_to_project_by_funder - + -Itara_document_hash_to_project=${workingDir}/cache/tara/000001/documentHashToProject + -Itara_document_hash=${workingDir}/cache/tara/000001/documentHash + Unfortunately, the process failed -- error message: - [${wf:errorMessage(wf:lastErrorNode())}] - + [${wf:errorMessage(wf:lastErrorNode())}] + diff --git a/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/softwareurl/main/sampletest/oozie_app/workflow.xml b/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/softwareurl/main/sampletest/oozie_app/workflow.xml index 45931d596..2d27afbae 100644 --- a/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/softwareurl/main/sampletest/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/softwareurl/main/sampletest/oozie_app/workflow.xml @@ -77,7 +77,7 @@ webcrawlLockManagerFactoryClassName - eu.dnetlib.iis.wf.referenceextraction.softwareurl.LockManagerFactoryMock + eu.dnetlib.iis.wf.referenceextraction.LockManagerFactoryMock diff --git a/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/softwareurl/main/sampletest_empty_input/oozie_app/workflow.xml b/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/softwareurl/main/sampletest_empty_input/oozie_app/workflow.xml index 7aaebd35c..378889608 100644 --- a/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/softwareurl/main/sampletest_empty_input/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/softwareurl/main/sampletest_empty_input/oozie_app/workflow.xml @@ -77,7 +77,7 @@ webcrawlLockManagerFactoryClassName - eu.dnetlib.iis.wf.referenceextraction.softwareurl.LockManagerFactoryMock + eu.dnetlib.iis.wf.referenceextraction.LockManagerFactoryMock diff --git a/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/softwareurl/main/sampletest_without_references/oozie_app/workflow.xml 
b/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/softwareurl/main/sampletest_without_references/oozie_app/workflow.xml index cd2a55fb2..bb75d2229 100644 --- a/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/softwareurl/main/sampletest_without_references/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-referenceextraction/src/test/resources/eu/dnetlib/iis/wf/referenceextraction/softwareurl/main/sampletest_without_references/oozie_app/workflow.xml @@ -77,7 +77,7 @@ webcrawlLockManagerFactoryClassName - eu.dnetlib.iis.wf.referenceextraction.softwareurl.LockManagerFactoryMock + eu.dnetlib.iis.wf.referenceextraction.LockManagerFactoryMock diff --git a/pom.xml b/pom.xml index 05b4c6e97..17f81ff78 100644 --- a/pom.xml +++ b/pom.xml @@ -306,6 +306,13 @@ provided + + org.apache.spark + spark-avro_2.11 + ${iis.spark.version} + provided + + pl.edu.icm.spark-utils spark-utils_2.11
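
For context on the test helper above: the script produced by createTestScriptWithInputCheck reads one JSON line from stdin, checks that it is wrapped in braces, and echoes a fixed DocumentToProject record. One way such a script can be exercised is through Spark's RDD pipe in local mode; the sketch below is illustrative only (the master setting, temp-file handling and sample input are assumptions, not part of the patch).

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

public class PipeScriptSketch {

    public static void main(String[] args) throws IOException {
        // Local session only for illustration; the real tests obtain their SparkSession elsewhere.
        SparkSession spark = SparkSession.builder()
                .master("local[1]")
                .appName("pipe-script-sketch")
                .getOrCreate();
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        // Same idea as createTestScriptWithInputCheck: accept a JSON line, emit a fixed record.
        String script = String.join(System.getProperty("line.separator"),
                "#!/bin/bash",
                "read in",
                "test ${in:0:1} == '{' -a ${in: -1} == '}' && echo '{\"documentId\":\"a1\",\"projectId\":\"projId-1\",\"confidenceLevel\":1,\"textsnippet\":null}'");
        Path scriptPath = Files.write(Files.createTempFile(null, ".sh"), script.getBytes());

        // Each partition pipes its lines through the script; invoking via bash avoids chmod +x.
        JavaRDD<String> input = jsc.parallelize(Arrays.asList("{\"documentId\":\"docId-1\"}"));
        List<String> output = input.pipe("bash " + scriptPath).collect();

        output.forEach(System.out::println);
        spark.stop();
    }
}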
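
The spark-avro_2.11 dependency added to the parent pom is what allows the cached tara outputs referenced in the workflows above (e.g. ${workingDir}/cache/tara/000001/documentHash) to be written and read back as Avro data stores. A rough sketch of that round trip with the stock spark-avro datasource follows; the paths and the cache-version handling here are simplified assumptions, not the project's actual cache code.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class AvroCacheRoundTripSketch {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[1]")
                .appName("avro-cache-sketch")
                .getOrCreate();

        // Illustrative path modelled on the test workflow's cache layout.
        String cacheDir = "/tmp/cache/tara/000001/documentHash";

        // Read an existing Avro data store into a DataFrame (requires spark-avro on the classpath).
        Dataset<Row> documentHash = spark.read().format("avro").load(cacheDir);

        // Write it back out as Avro, e.g. when materializing a new cache version.
        documentHash.write().format("avro").save("/tmp/cache/tara/000002/documentHash");

        spark.stop();
    }
}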