Skip to content

Commit

Permalink
Added automatic retry after a delay for SPARQL endpoints after HTTP 429
Browse files Browse the repository at this point in the history
  • Loading branch information
becker-al committed Aug 14, 2024
1 parent b8d95fd commit 04c69c0
Showing 1 changed file with 32 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import org.aksw.limes.core.io.cache.ACache;
import org.aksw.limes.core.io.config.KBInfo;
import org.apache.http.HttpException;
import org.apache.jena.query.*;
import org.apache.jena.rdf.model.Model;
import org.apache.jena.sparql.engine.http.QueryExceptionHTTP;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -41,9 +43,8 @@ public SparqlQueryModule(KBInfo kbinfo) {
/**
* Reads from a SPARQL endpoint and writes the results in a cache
*
* @param cache
* The cache in which the content on the SPARQL endpoint is to be
* written
* @param cache The cache in which the content on the SPARQL endpoint is to be
* written
*/
public void fillCache(ACache cache) {
fillCache(cache, true);
Expand All @@ -52,12 +53,10 @@ public void fillCache(ACache cache) {
/**
* Reads from a SPARQL endpoint or a file and writes the results in a cache
*
* @param cache
* The cache in which the content on the SPARQL endpoint is to be
* written
* @param isSparql
* True if the endpoint is a remote SPARQL endpoint, else assume
* that is is a Jena model
* @param cache The cache in which the content on the SPARQL endpoint is to be
* written
* @param isSparql True if the endpoint is a remote SPARQL endpoint, else assume
* that is is a Jena model
*/
public void fillCache(ACache cache, boolean isSparql) {
long startTime = System.currentTimeMillis();
Expand All @@ -75,21 +74,21 @@ public void fillCache(ACache cache, boolean isSparql) {
String basicQuery = query;
do {
int nextOffset = offset + kb.getPageSize();
if(kb.getMaxOffset() > 0) {
if (kb.getMaxOffset() > 0) {
nextOffset = Math.min(kb.getMaxOffset(), nextOffset);
}

logger.info("Getting statements " + offset + " to " + nextOffset);

if (kb.getPageSize() > 0) {
int limit = kb.getPageSize();
if(kb.getMaxOffset() > 0) {
if (kb.getMaxOffset() > 0) {
limit = nextOffset - offset;
}
query = basicQuery + " LIMIT " + limit + " OFFSET " + offset;
} else {
query = basicQuery;
if(kb.getMaxOffset() > 0) {
if (kb.getMaxOffset() > 0) {
query = query + " LIMIT " + kb.getMaxOffset();
}
}
Expand All @@ -114,7 +113,26 @@ public void fillCache(ACache cache, boolean isSparql) {
qexec = QueryExecutionFactory.sparqlService(kb.getEndpoint(), sparqlQuery);
}
}
ResultSet results = qexec.execSelect();

ResultSet results = null;
long retryAfter = 120000;
while (results == null) {
try {
results = qexec.execSelect();
} catch (QueryExceptionHTTP httpException) {
if (httpException.getStatusCode() == 429 || httpException.getStatusCode() == 500) {
logger.error("Received HTTP " + httpException.getStatusCode() + " - Waiting " + retryAfter / 1000 + " seconds before retrying");
try {
Thread.sleep(retryAfter);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
retryAfter *= 2;
} else {
throw httpException;
}
}
}

// write
String uri, value;
Expand All @@ -138,7 +156,7 @@ public void fillCache(ACache cache, boolean isSparql) {
}
i++;
}
if(kb.getOptionalProperties() != null){
if (kb.getOptionalProperties() != null) {
for (String propertyLabel : kb.getOptionalProperties()) {
if (soln.contains("v" + i)) {
value = soln.get("v" + i).toString();
Expand Down

0 comments on commit 04c69c0

Please sign in to comment.