Skip to content

Commit

Permalink
eclipse-rdf4jGH-5058: added parser code (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
barthanssens committed Jul 8, 2024
1 parent 67e1965 commit ce43022
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class CSVW {
public static final IRI BASE;

/** csvw:columns */
public static final IRI COLUMNS;
public static final IRI COLUMN;

/** csvw:datatype */
public static final IRI DATATYPE;
Expand All @@ -60,17 +60,23 @@ public class CSVW {
/** csvw:lang */
public static final IRI LANG;

/** csvw:name */
public static final IRI NAME;

/** csvw:propertyUrl */
public static final IRI PROPERTY_URL;

/** csvw:required */
public static final IRI REQUIRED;

/** csvw:tableSchema */
public static final IRI TABLE_SCHEMA;

/** csvw:tables */
public static final IRI TABLES;

/** csvw:titles */
public static final IRI TITLES;
/** csvw:title */
public static final IRI TITLE;

/** csvw:url */
public static final IRI URL;
Expand All @@ -80,16 +86,18 @@ public class CSVW {

// Bind each vocabulary constant to the IRI created by Vocabularies.createIRI from NAMESPACE and the term's local name.
static {
BASE = Vocabularies.createIRI(NAMESPACE, "base");
COLUMNS = Vocabularies.createIRI(NAMESPACE, "columns");
COLUMN = Vocabularies.createIRI(NAMESPACE, "column");
DATATYPE = Vocabularies.createIRI(NAMESPACE, "datatype");
DEFAULT = Vocabularies.createIRI(NAMESPACE, "default");
DIALECT = Vocabularies.createIRI(NAMESPACE, "dialect");
HEADER = Vocabularies.createIRI(NAMESPACE, "header");
LANG = Vocabularies.createIRI(NAMESPACE, "lang");
NAME = Vocabularies.createIRI(NAMESPACE, "name");
PROPERTY_URL = Vocabularies.createIRI(NAMESPACE, "propertyUrl");
REQUIRED = Vocabularies.createIRI(NAMESPACE, "required");
TABLE_SCHEMA = Vocabularies.createIRI(NAMESPACE, "tableSchema");
TABLES = Vocabularies.createIRI(NAMESPACE, "tables");
TITLES = Vocabularies.createIRI(NAMESPACE, "titles");
TITLE = Vocabularies.createIRI(NAMESPACE, "title");
URL = Vocabularies.createIRI(NAMESPACE, "url");
VALUE_URL = Vocabularies.createIRI(NAMESPACE, "valueUrl");
}
Expand Down
171 changes: 154 additions & 17 deletions core/rio/csvw/src/main/java/org/eclipse/rdf4j/rio/csvw/CSVWParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,35 @@
*******************************************************************************/
package org.eclipse.rdf4j.rio.csvw;

import com.opencsv.CSVParser;
import com.opencsv.CSVParserBuilder;
import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;
import com.opencsv.exceptions.CsvValidationException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Model;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.base.CoreDatatype.XSD;
import org.eclipse.rdf4j.model.util.Models;
import org.eclipse.rdf4j.model.util.RDFCollections;
import org.eclipse.rdf4j.model.vocabulary.CSVW;
import org.eclipse.rdf4j.rio.ParserConfig;
Expand Down Expand Up @@ -54,12 +71,24 @@ public synchronized void parse(InputStream in, String baseURI)
clear();

Model metadata = parseMetadata(in, null, baseURI);

Iterable<Statement> tables = metadata.getStatements(null, CSVW.TABLE_SCHEMA, null);
for (Statement table : tables) {
getCellParsers(metadata, table.getObject());
if (metadata == null || metadata.isEmpty()) {
throw new RDFParseException("No metadata found");
}

List<Value> tables = getTables(metadata);
for (Value table : tables) {
URI csvFile = getURL(metadata, (Resource) table, baseURI);
if (csvFile == null) {
throw new RDFParseException("Could not find URL");
}
Resource tableSchema = getTableSchema(metadata, (Resource) table);
List<Value> columns = getColumns(metadata, tableSchema);
Parser[] cellParsers = columns.stream()
.map(c -> getCellParser(metadata, (Resource) c))
.collect(Collectors.toList())
.toArray(new Parser[columns.size()]);
parseCSV(csvFile, cellParsers);
}
clear();
}

Expand Down Expand Up @@ -94,32 +123,140 @@ private Model parseMetadata(InputStream in, Reader reader, String baseURI) throw
return metadata;
}

/**
 * Get the subject(s) of the table(s) described in the metadata.
 *
 * If a csvw:tables statement is present, its object is converted to a list of values with
 * {@link RDFCollections#asValues}. Otherwise the metadata is treated as the simplified single-table structure and
 * the subject of the first csvw:tableSchema statement is returned as the only table.
 *
 * @param metadata parsed metadata model
 * @return list of table subjects
 * @throws RDFParseException if the metadata has neither tables nor a tableSchema, or if the value of tables is not
 *                           a resource
 */
private List<Value> getTables(Model metadata) throws RDFParseException {
	Iterator<Statement> it = metadata.getStatements(null, CSVW.TABLES, null).iterator();
	if (it.hasNext()) {
		Value head = it.next().getObject();
		// a literal here would otherwise surface as an uninformative ClassCastException
		if (!(head instanceof Resource)) {
			throw new RDFParseException("Metadata file has a tables value that is not a resource: " + head);
		}
		return RDFCollections.asValues(metadata, (Resource) head, new ArrayList<>());
	}

	// only one table, simplified structure
	it = metadata.getStatements(null, CSVW.TABLE_SCHEMA, null).iterator();
	if (!it.hasNext()) {
		throw new RDFParseException("Metadata file has no tables and no tableSchema");
	}
	return List.of(it.next().getSubject());
}

/**
 * Get the URL of the CSV file belonging to a table.
 *
 * The csvw:url value is returned as-is when it is an absolute URI (any scheme), otherwise it is resolved against
 * the base URI.
 *
 * @param metadata parsed metadata model
 * @param subject  table resource
 * @param baseURI  base URI used to resolve a relative csvw:url
 * @return location of the CSV file, or null when the table has no csvw:url
 * @throws RDFParseException if the csvw:url is relative and no base URI is available
 */
private URI getURL(Model metadata, Resource subject, String baseURI) {
	Optional<String> val = Models.getPropertyString(metadata, subject, CSVW.URL);
	if (!val.isPresent()) {
		return null;
	}

	// decide on the parsed URI instead of a "http" string prefix, so that relative names such as
	// "httplog.csv" are still resolved against the base
	URI location = URI.create(val.get());
	if (location.isAbsolute()) {
		return location;
	}
	if (baseURI == null) {
		throw new RDFParseException("Cannot resolve relative URL " + location + " without a base URI");
	}
	return URI.create(baseURI).resolve(location);
}

/**
 * Look up the table schema resource attached to a table.
 *
 * @param metadata parsed metadata model
 * @param subject  table resource
 * @return the resource that is the value of csvw:tableSchema for the table
 * @throws RDFParseException if the table has no csvw:tableSchema resource
 */
private Resource getTableSchema(Model metadata, Resource subject) throws RDFParseException {
	Optional<Resource> schema = Models.getPropertyResource(metadata, subject, CSVW.TABLE_SCHEMA);
	if (!schema.isPresent()) {
		throw new RDFParseException("Metadata file does not contain tableSchema for " + subject);
	}
	return schema.get();
}

/**
 * Collect the column resources of a table schema.
 *
 * @param metadata parsed metadata model
 * @param subject  table schema resource
 * @return the values of the RDF collection that csvw:column points to
 * @throws RDFParseException if the table schema has no csvw:column resource
 */
private List<Value> getColumns(Model metadata, Resource subject) throws RDFParseException {
	Resource listHead = Models.getPropertyResource(metadata, subject, CSVW.COLUMN)
			.orElseThrow(() -> new RDFParseException("Metadata file does not contain columns for " + subject));
	return RDFCollections.asValues(metadata, listHead, new ArrayList<>());
}

/**
 * Build the cell parser for one column from the column's metadata properties.
 *
 * The csvw:name is mandatory; csvw:default, csvw:propertyUrl and csvw:valueUrl are copied when present, and the
 * datatype falls back to xsd:string when csvw:datatype is absent.
 *
 * @param metadata parsed metadata model
 * @param subject  column resource whose properties are read
 * @return the configured parser
 * @throws RDFParseException if the column has no csvw:name
 */
private List<Parser> getCellParsers(Model metadata, Value table) {
List<Parser> parsers = new ArrayList<>();
private Parser getCellParser(Model metadata, Resource subject) {
Parser parser = new Parser();

Iterable<Statement> columns = metadata.getStatements((Resource) table, CSVW.COLUMNS, null);
Statement s = columns.iterator().next();
// the name is required
Optional<Value> name = Models.getProperty(metadata, subject, CSVW.NAME);
if (!name.isPresent()) {
throw new RDFParseException("Metadata file does not contain name for column " + subject);
}
parser.setName(name.get().stringValue());

// the columns must be retrieved in the exact same order as they appear in the JSON metadata file,
// especially when the CSV does not have a header row
if (s != null) {
List<Value> cols = RDFCollections.asValues(metadata, (Resource) s.getObject(), new ArrayList());
for (Value col : cols) {
Parser p = new Parser();
p.setDataType(getDataType(metadata, col));
Optional<Value> defaultVal = Models.getProperty(metadata, subject, CSVW.DEFAULT);
if (defaultVal.isPresent()) {
parser.setDefaultValue(defaultVal.get().stringValue());
}

}
// NOTE(review): the cast assumes the csvw:datatype value is an IRI - confirm for non-IRI datatype descriptions
Optional<Value> dataType = Models.getProperty(metadata, subject, CSVW.DATATYPE);
parser.setDataType((IRI) dataType.orElse(XSD.STRING.getIri()));

// NOTE(review): name and default above use stringValue(), while the two URL properties below use toString();
// presumably stringValue() is intended here as well - confirm what toString() yields for literal values
Optional<Value> propertyURL = Models.getProperty(metadata, subject, CSVW.PROPERTY_URL);
if (propertyURL.isPresent()) {
parser.setPropertyURL(propertyURL.get().toString());
}

Optional<Value> valueURL = Models.getProperty(metadata, subject, CSVW.VALUE_URL);
if (valueURL.isPresent()) {
parser.setValueURL(valueURL.get().toString());
}
return parsers;
return parser;
}

/**
 * Get the datatype of a column.
 *
 * Both arguments are currently ignored: the method always returns the IRI of XSD.STRING.
 *
 * @param metadata metadata model (unused)
 * @param col      column value (unused)
 * @return the IRI of XSD.STRING
 */
private IRI getDataType(Model metadata, Value col) {
return XSD.STRING.getIri();
}

/**
 * Parse a CSV file, handing every cell to the cell parser of its column.
 *
 * The file is read as UTF-8 and the first line is skipped. Cells are matched to parsers by position, so a row may
 * have fewer cells than there are parsers, but not more.
 *
 * @param csvFile     URI of CSV file
 * @param cellParsers cell parsers, one per column, in column order
 * @throws RDFParseException if the file cannot be read or validated, or if a row has more cells than there are cell
 *                           parsers
 */
private void parseCSV(URI csvFile, Parser[] cellParsers) {
	CSVParser parser = new CSVParserBuilder().build();

	// explicit charset: without it the reader silently falls back to the platform default encoding
	try (InputStream is = csvFile.toURL().openStream();
			BufferedReader buf = new BufferedReader(
					new InputStreamReader(is, java.nio.charset.StandardCharsets.UTF_8));
			CSVReader csv = new CSVReaderBuilder(buf).withSkipLines(1).withCSVParser(parser).build()) {

		String[] cells;
		while ((cells = csv.readNext()) != null) {
			// report surplus cells as a parse error instead of an ArrayIndexOutOfBoundsException
			if (cells.length > cellParsers.length) {
				throw new RDFParseException("Row has " + cells.length + " cells but only " + cellParsers.length
						+ " columns are defined for " + csvFile);
			}
			for (int i = 0; i < cells.length; i++) {
				cellParsers[i].parse(cells[i]);
			}
		}
	} catch (IOException | CsvValidationException ex) {
		throw new RDFParseException("Error parsing " + csvFile, ex);
	}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
* @author Bart.Hanssens
*/
public class Parser {
private String name;
private IRI dataType;
private String defaultValue;
private boolean isRequired;
Expand All @@ -29,6 +30,13 @@ public class Parser {
private String valueUrl;
private String separator;

/**
 * Set the name; parse() replaces the "{name}" placeholder in valueUrl with the cell value.
 *
 * @param name the name to set
 */
public void setName(String name) {
this.name = name;
}

/**
* @param dataType
*/
Expand Down Expand Up @@ -60,28 +68,28 @@ public void setFormat(String format) {
/**
* @return the propertyUrl
*/
public String getPropertyUrl() {
public String getPropertyURL() {
return propertyUrl;
}

/**
* @param propertyUrl the propertyUrl to set
*/
public void setPropertyUrl(String propertyUrl) {
public void setPropertyURL(String propertyUrl) {
this.propertyUrl = propertyUrl;
}

/**
* @return the valueUrl
*/
public String getValueUrl() {
public String getValueURL() {
return valueUrl;
}

/**
* @param valueUrl the valueUrl to set
*/
public void setValueUrl(String valueUrl) {
public void setValueURL(String valueUrl) {
this.valueUrl = valueUrl;
}

Expand All @@ -99,11 +107,22 @@ public void setSeparator(String separator) {
this.separator = separator;
}

/**
 * Get the value from a cell.
 *
 * A null or empty cell is replaced by the default value when one is set. When a valueUrl is set and a cell value
 * is available, the result is an IRI built by substituting the cell value for the "{name}" placeholder in
 * valueUrl; otherwise the result is a literal with the configured datatype.
 *
 * @param cell raw cell content, may be null or empty
 * @return IRI or literal for the cell
 */
public Value parse(String cell) {
if (cell == null || cell.isEmpty()) {
return Values.literal(defaultValue);
String s = cell;
// fall back to the default only when one was configured
if ((s == null || s.isEmpty()) && (defaultValue != null)) {
s = defaultValue;
}
return Values.literal(cell);
if (valueUrl != null && s != null) {
return Values.iri(valueUrl.replace("{" + name + "}", s));
}

// NOTE(review): s is still null here when cell is null and no default is set - confirm Values.literal accepts that
return Values.literal(s, dataType);
}

}

0 comments on commit ce43022

Please sign in to comment.