diff --git a/sparql-anything-csv/src/main/java/io/github/sparqlanything/csv/CSVTriplifier.java b/sparql-anything-csv/src/main/java/io/github/sparqlanything/csv/CSVTriplifier.java index b8064d51..3999c899 100644 --- a/sparql-anything-csv/src/main/java/io/github/sparqlanything/csv/CSVTriplifier.java +++ b/sparql-anything-csv/src/main/java/io/github/sparqlanything/csv/CSVTriplifier.java @@ -224,7 +224,7 @@ public Set getExtensions() { } @Override - public Iterable slice(Properties properties) throws IOException, TriplifierHTTPException { + public CloseableIterable slice(Properties properties) throws IOException, TriplifierHTTPException { CSVFormat format = buildFormat(properties); Charset charset = Triplifier.getCharsetArgument(properties); @@ -239,7 +239,13 @@ public Iterable slice(Properties properties) throws IOException, Triplifi final Iterator recordIterator = records.iterator(); final LinkedHashMap headers_map = makeHeadersMapFromOpenIterator(recordIterator, properties, format, charset); - return new Iterable() { + return new CloseableIterable() { + + @Override + public void close() throws IOException { + // The InputStream is closed by hasNext method of the iterator + } + @Override public Iterator iterator() { log.debug("Iterating slices"); diff --git a/sparql-anything-engine/src/main/java/io/github/sparqlanything/engine/QueryIterSlicer.java b/sparql-anything-engine/src/main/java/io/github/sparqlanything/engine/QueryIterSlicer.java index a1e237c2..8453cbc8 100644 --- a/sparql-anything-engine/src/main/java/io/github/sparqlanything/engine/QueryIterSlicer.java +++ b/sparql-anything-engine/src/main/java/io/github/sparqlanything/engine/QueryIterSlicer.java @@ -50,11 +50,13 @@ public class QueryIterSlicer extends QueryIter { private QueryIterator current = null; private final Properties p; + private final CloseableIterable it; + public QueryIterSlicer(ExecutionContext execCxt, QueryIterator input, Triplifier t, Properties properties, Op op) throws TriplifierHTTPException, IOException { super(execCxt); - slicer = (Slicer) t; + this.slicer = (Slicer) t; this.p = properties; - final Iterable it = slicer.slice(p); + this.it = slicer.slice(p); this.input = input; elements = new ArrayList<>(); @@ -102,7 +104,7 @@ protected boolean hasNextBinding() { FacadeXExecutionContext ec = Utils.getFacadeXExecutionContext(execCxt, p, dg); logger.trace("Op {}", op); logger.trace("OpName {}", op.getName()); - /** + /* * input needs to be reset before each execution, otherwise the executor will skip subsequent executions * since input bindings have been flushed! */ @@ -116,14 +118,19 @@ protected boolean hasNextBinding() { } } else { logger.trace("Slices finished"); - /** + /* * Input iterator can be closed */ input.cancel(); // Make sure the original Op is executed - // XXX Maybe there is a better qay of doing it? + // XXX Maybe there is a better way of doing it? ExecutionContext exc = new ExecutionContext(DatasetGraphFactory.create()); QC.execute(op, QueryIterNullIterator.create(exc), exc); + try { + this.it.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } return false; } } diff --git a/sparql-anything-json/src/main/java/io/github/sparqlanything/json/JSONTriplifier.java b/sparql-anything-json/src/main/java/io/github/sparqlanything/json/JSONTriplifier.java index 0356efa4..6a70a7bd 100644 --- a/sparql-anything-json/src/main/java/io/github/sparqlanything/json/JSONTriplifier.java +++ b/sparql-anything-json/src/main/java/io/github/sparqlanything/json/JSONTriplifier.java @@ -24,6 +24,7 @@ import io.github.sparqlanything.model.annotations.Option; import org.apache.commons.lang3.StringUtils; import com.google.common.collect.Sets; +import org.apache.jena.atlas.iterator.IteratorCloseable; import org.jsfr.json.Collector; import org.jsfr.json.JacksonParser; import org.jsfr.json.JsonSurfer; @@ -321,8 +322,9 @@ private void transformFromJSONPath(Properties properties, FacadeXGraphBuilder bu } @Override - public Iterable slice(Properties properties) throws IOException, TriplifierHTTPException { + public CloseableIterable slice(Properties properties) throws IOException, TriplifierHTTPException { List jsonPaths = PropertyUtils.getPropertyValues(properties, PROPERTY_JSONPATH.toString()); + Iterable r = sliceFromJSONPath(properties); if (!jsonPaths.isEmpty()) { return sliceFromJSONPath(properties); } else { @@ -331,7 +333,7 @@ public Iterable slice(Properties properties) throws IOException, Triplifi } - private Iterable sliceFromJSONPath(Properties properties) throws TriplifierHTTPException, IOException { + private CloseableIterable sliceFromJSONPath(Properties properties) throws TriplifierHTTPException, IOException { JsonSurfer surfer = new JsonSurfer(JacksonParser.INSTANCE, JacksonProvider.INSTANCE); final InputStream us = Triplifier.getInputStream(properties); Collector collector = surfer.collector(us); @@ -347,12 +349,19 @@ private Iterable sliceFromJSONPath(Properties properties) throws Triplifi collector.exec(); Iterator>> matchesIterator = matches.iterator(); // Only 1 data source expected - return new Iterable() { + return new CloseableIterable() { + + @Override + public void close() throws IOException { + us.close(); + } + @Override public Iterator iterator() { log.debug("Iterating slices"); return new Iterator() { + int sln = 0; Object next = null; Iterator objectIterator = null; @@ -403,7 +412,7 @@ public Slice next() { } } - private Iterable sliceFromArray(Properties properties) throws IOException, TriplifierHTTPException { + private CloseableIterable sliceFromArray(Properties properties) throws IOException, TriplifierHTTPException { // XXX How do we close the input stream? final InputStream us = Triplifier.getInputStream(properties); JsonFactory factory = JsonFactory.builder().build(); @@ -417,7 +426,13 @@ private Iterable sliceFromArray(Properties properties) throws IOException } // Only 1 data source expected - return new Iterable() { + return new CloseableIterable() { + + @Override + public void close() throws IOException { + us.close(); + } + JsonToken next = null; @Override diff --git a/sparql-anything-model/src/main/java/io/github/sparqlanything/model/CloseableIterable.java b/sparql-anything-model/src/main/java/io/github/sparqlanything/model/CloseableIterable.java new file mode 100644 index 00000000..44acb61b --- /dev/null +++ b/sparql-anything-model/src/main/java/io/github/sparqlanything/model/CloseableIterable.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2024 SPARQL Anything Contributors @ http://github.com/sparql-anything + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.github.sparqlanything.model; + + +import java.io.Closeable; +import java.util.Iterator; + +public interface CloseableIterable extends Iterable, Closeable { + + Iterator iterator(); +} diff --git a/sparql-anything-model/src/main/java/io/github/sparqlanything/model/Slicer.java b/sparql-anything-model/src/main/java/io/github/sparqlanything/model/Slicer.java index 591e2768..5de66d4f 100644 --- a/sparql-anything-model/src/main/java/io/github/sparqlanything/model/Slicer.java +++ b/sparql-anything-model/src/main/java/io/github/sparqlanything/model/Slicer.java @@ -21,7 +21,7 @@ public interface Slicer { - Iterable slice(Properties p) throws IOException, TriplifierHTTPException; + CloseableIterable slice(Properties p) throws IOException, TriplifierHTTPException; void triplify(Slice slice, Properties p, FacadeXGraphBuilder builder); } diff --git a/sparql-anything-xml/src/main/java/io/github/sparqlanything/xml/XMLTriplifier.java b/sparql-anything-xml/src/main/java/io/github/sparqlanything/xml/XMLTriplifier.java index 9bb516cc..56bca459 100644 --- a/sparql-anything-xml/src/main/java/io/github/sparqlanything/xml/XMLTriplifier.java +++ b/sparql-anything-xml/src/main/java/io/github/sparqlanything/xml/XMLTriplifier.java @@ -358,27 +358,37 @@ public Set getExtensions() { } @Override - public Iterable slice(Properties properties) throws IOException, TriplifierHTTPException { + public CloseableIterable slice(Properties properties) throws IOException, TriplifierHTTPException { final String dataSourceId = SPARQLAnythingConstants.DATA_SOURCE_ID; List xpaths = PropertyUtils.getPropertyValues(properties, PROPERTY_XPATH); try { VTDNav vn = buildVTDNav(properties); final Iterator> it = evaluateXPaths(vn, xpaths); - return () -> new Iterator<>() { - int theCount = 1; - + return new CloseableIterable() { @Override - public boolean hasNext() { - return it.hasNext(); + public Iterator iterator() { + return new Iterator<>() { + int theCount = 1; + + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public Slice next() { + Pair pair = it.next(); + int c = theCount; + theCount++; + return XPathSlice.make(pair.getKey(), pair.getValue(), c, dataSourceId); + } + }; } @Override - public Slice next() { - Pair pair = it.next(); - int c = theCount; - theCount++; - return XPathSlice.make(pair.getKey(), pair.getValue(), c, dataSourceId); + public void close() throws IOException { + // Input stream is already closed as evaluateXPaths reads it all and close it } }; } catch (Exception e) { @@ -388,7 +398,9 @@ public Slice next() { private VTDNav buildVTDNav(Properties properties) throws TriplifierHTTPException, IOException, ParseException { VTDGen vg = new VTDGen(); - byte[] bytes = IOUtils.toByteArray(Triplifier.getInputStream(properties)); + InputStream is = Triplifier.getInputStream(properties); + byte[] bytes = IOUtils.toByteArray(is); + is.close(); vg.setDoc(bytes); // TODO Support namespaces vg.parse(false);