Skip to content

Commit

Permalink
#239 Close Input streams for sliced triplifiers
Browse files Browse the repository at this point in the history
  • Loading branch information
luigi-asprino committed Aug 18, 2024
1 parent f192921 commit 6892c0c
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public Set<String> getExtensions() {
}

@Override
public Iterable<Slice> slice(Properties properties) throws IOException, TriplifierHTTPException {
public CloseableIterable<Slice> slice(Properties properties) throws IOException, TriplifierHTTPException {

CSVFormat format = buildFormat(properties);
Charset charset = Triplifier.getCharsetArgument(properties);
Expand All @@ -239,7 +239,13 @@ public Iterable<Slice> slice(Properties properties) throws IOException, Triplifi
final Iterator<CSVRecord> recordIterator = records.iterator();
final LinkedHashMap<Integer, String> headers_map = makeHeadersMapFromOpenIterator(recordIterator, properties, format, charset);

return new Iterable<Slice>() {
return new CloseableIterable<Slice>() {

@Override
public void close() throws IOException {
// Nothing to do here: the InputStream is closed by the iterator's hasNext() method
}

@Override
public Iterator<Slice> iterator() {
log.debug("Iterating slices");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ public class QueryIterSlicer extends QueryIter {
private QueryIterator current = null;
private final Properties p;

private final CloseableIterable<Slice> it;

public QueryIterSlicer(ExecutionContext execCxt, QueryIterator input, Triplifier t, Properties properties, Op op) throws TriplifierHTTPException, IOException {
super(execCxt);
slicer = (Slicer) t;
this.slicer = (Slicer) t;
this.p = properties;
final Iterable<Slice> it = slicer.slice(p);
this.it = slicer.slice(p);
this.input = input;

elements = new ArrayList<>();
Expand Down Expand Up @@ -102,7 +104,7 @@ protected boolean hasNextBinding() {
FacadeXExecutionContext ec = Utils.getFacadeXExecutionContext(execCxt, p, dg);
logger.trace("Op {}", op);
logger.trace("OpName {}", op.getName());
/**
/*
* input needs to be reset before each execution, otherwise the executor will skip subsequent executions
* since input bindings have been flushed!
*/
Expand All @@ -116,14 +118,19 @@ protected boolean hasNextBinding() {
}
} else {
logger.trace("Slices finished");
/**
/*
* Input iterator can be closed
*/
input.cancel();
// Make sure the original Op is executed
// XXX Maybe there is a better qay of doing it?
// XXX Maybe there is a better way of doing it?
ExecutionContext exc = new ExecutionContext(DatasetGraphFactory.create());
QC.execute(op, QueryIterNullIterator.create(exc), exc);
try {
this.it.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.github.sparqlanything.model.annotations.Option;
import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.Sets;
import org.apache.jena.atlas.iterator.IteratorCloseable;
import org.jsfr.json.Collector;
import org.jsfr.json.JacksonParser;
import org.jsfr.json.JsonSurfer;
Expand Down Expand Up @@ -321,8 +322,9 @@ private void transformFromJSONPath(Properties properties, FacadeXGraphBuilder bu
}

@Override
public Iterable<Slice> slice(Properties properties) throws IOException, TriplifierHTTPException {
public CloseableIterable<Slice> slice(Properties properties) throws IOException, TriplifierHTTPException {
List<String> jsonPaths = PropertyUtils.getPropertyValues(properties, PROPERTY_JSONPATH.toString());
Iterable<Slice> r = sliceFromJSONPath(properties);
if (!jsonPaths.isEmpty()) {
return sliceFromJSONPath(properties);
} else {
Expand All @@ -331,7 +333,7 @@ public Iterable<Slice> slice(Properties properties) throws IOException, Triplifi

}

private Iterable<Slice> sliceFromJSONPath(Properties properties) throws TriplifierHTTPException, IOException {
private CloseableIterable<Slice> sliceFromJSONPath(Properties properties) throws TriplifierHTTPException, IOException {
JsonSurfer surfer = new JsonSurfer(JacksonParser.INSTANCE, JacksonProvider.INSTANCE);
final InputStream us = Triplifier.getInputStream(properties);
Collector collector = surfer.collector(us);
Expand All @@ -347,12 +349,19 @@ private Iterable<Slice> sliceFromJSONPath(Properties properties) throws Triplifi
collector.exec();
Iterator<ValueBox<Collection<Object>>> matchesIterator = matches.iterator();
// Only 1 data source expected
return new Iterable<Slice>() {
return new CloseableIterable<Slice>() {

@Override
public void close() throws IOException {
us.close();
}

@Override
public Iterator<Slice> iterator() {

log.debug("Iterating slices");
return new Iterator<Slice>() {

int sln = 0;
Object next = null;
Iterator<Object> objectIterator = null;
Expand Down Expand Up @@ -403,7 +412,7 @@ public Slice next() {
}
}

private Iterable<Slice> sliceFromArray(Properties properties) throws IOException, TriplifierHTTPException {
private CloseableIterable<Slice> sliceFromArray(Properties properties) throws IOException, TriplifierHTTPException {
// The input stream is closed by the close() method of the returned CloseableIterable
final InputStream us = Triplifier.getInputStream(properties);
JsonFactory factory = JsonFactory.builder().build();
Expand All @@ -417,7 +426,13 @@ private Iterable<Slice> sliceFromArray(Properties properties) throws IOException
}

// Only 1 data source expected
return new Iterable<Slice>() {
return new CloseableIterable<Slice>() {

@Override
public void close() throws IOException {
us.close();
}

JsonToken next = null;

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2024 SPARQL Anything Contributors @ http://github.com/sparql-anything
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.github.sparqlanything.model;


import java.io.Closeable;
import java.util.Iterator;

/**
 * An {@link Iterable} that can also be closed.
 *
 * <p>Implementations release whatever underlying resources they hold (if any) in
 * {@link #close()}. Because this type is {@link Closeable}, instances can be managed by a
 * try-with-resources statement.
 *
 * @param <T> the type of the elements returned by the iterator
 */
public interface CloseableIterable<T> extends Iterable<T>, Closeable {

    /**
     * Returns an iterator over the elements of type {@code T}.
     *
     * @return an iterator over the elements
     */
    @Override
    Iterator<T> iterator();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

/**
 * Contract for components that expose their input as a sequence of {@link Slice} objects
 * and process each slice individually.
 */
public interface Slicer {

// NOTE(review): the two slice declarations below differ only in their return type and look
// like the removed and added sides of a diff; confirm that only the CloseableIterable
// variant remains in the actual file.
Iterable<Slice> slice(Properties p) throws IOException, TriplifierHTTPException;
/**
 * Returns the slices obtained with the given properties.
 *
 * The result is closeable; callers should close it when they have finished iterating.
 *
 * @param p the properties passed to the slicer
 * @return a closeable iterable over the slices
 * @throws IOException declared by the signature; presumably raised when the input cannot be read
 * @throws TriplifierHTTPException declared by the signature; presumably raised when an HTTP input fails (TODO confirm)
 */
CloseableIterable<Slice> slice(Properties p) throws IOException, TriplifierHTTPException;

/**
 * Processes a single slice with the given builder.
 *
 * @param slice the slice to process
 * @param p the properties passed to the slicer
 * @param builder the graph builder; presumably receives the triples generated for the slice (TODO confirm)
 */
void triplify(Slice slice, Properties p, FacadeXGraphBuilder builder);
}
Original file line number Diff line number Diff line change
Expand Up @@ -358,27 +358,37 @@ public Set<String> getExtensions() {
}

@Override
public Iterable<Slice> slice(Properties properties) throws IOException, TriplifierHTTPException {
public CloseableIterable<Slice> slice(Properties properties) throws IOException, TriplifierHTTPException {
final String dataSourceId = SPARQLAnythingConstants.DATA_SOURCE_ID;
List<String> xpaths = PropertyUtils.getPropertyValues(properties, PROPERTY_XPATH);

try {
VTDNav vn = buildVTDNav(properties);
final Iterator<Pair<VTDNav, Integer>> it = evaluateXPaths(vn, xpaths);
return () -> new Iterator<>() {
int theCount = 1;

return new CloseableIterable<Slice>() {
@Override
public boolean hasNext() {
return it.hasNext();
public Iterator<Slice> iterator() {
return new Iterator<>() {
int theCount = 1;

@Override
public boolean hasNext() {
return it.hasNext();
}

@Override
public Slice next() {
Pair<VTDNav, Integer> pair = it.next();
int c = theCount;
theCount++;
return XPathSlice.make(pair.getKey(), pair.getValue(), c, dataSourceId);
}
};
}

@Override
public Slice next() {
Pair<VTDNav, Integer> pair = it.next();
int c = theCount;
theCount++;
return XPathSlice.make(pair.getKey(), pair.getValue(), c, dataSourceId);
public void close() throws IOException {
// Nothing to do: the input stream has already been fully read and closed by buildVTDNav
}
};
} catch (Exception e) {
Expand All @@ -388,7 +398,9 @@ public Slice next() {

private VTDNav buildVTDNav(Properties properties) throws TriplifierHTTPException, IOException, ParseException {
VTDGen vg = new VTDGen();
byte[] bytes = IOUtils.toByteArray(Triplifier.getInputStream(properties));
InputStream is = Triplifier.getInputStream(properties);
byte[] bytes = IOUtils.toByteArray(is);
is.close();
vg.setDoc(bytes);
// TODO Support namespaces
vg.parse(false);
Expand Down

0 comments on commit 6892c0c

Please sign in to comment.