Skip to content

Commit

Permalink
LDEV-5102 - Handle ResultSet.close() in a separate thread to prevent …
Browse files Browse the repository at this point in the history
…blocking in QueryLazy with MySQL
  • Loading branch information
michaeloffner committed Oct 8, 2024
1 parent e5fba89 commit 2c53a97
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 8 deletions.
41 changes: 35 additions & 6 deletions core/src/main/java/lucee/runtime/functions/query/QueryLazy.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import lucee.commons.db.DBUtil;
import lucee.commons.lang.ExceptionUtil;
import lucee.commons.lang.StringUtil;
import lucee.commons.lang.types.RefBoolean;
import lucee.commons.lang.types.RefBooleanImpl;
import lucee.runtime.PageContext;
import lucee.runtime.config.Constants;
import lucee.runtime.config.NullSupportHelper;
Expand All @@ -24,6 +26,7 @@
import lucee.runtime.db.SQLCaster;
import lucee.runtime.db.SQLImpl;
import lucee.runtime.db.SQLItem;
import lucee.runtime.engine.ThreadLocalPageContext;
import lucee.runtime.exp.ApplicationException;
import lucee.runtime.exp.DatabaseException;
import lucee.runtime.exp.FunctionException;
Expand All @@ -49,9 +52,12 @@
import lucee.runtime.type.scope.Argument;
import lucee.runtime.type.util.KeyConstants;
import lucee.runtime.type.util.QueryUtil;
import lucee.runtime.util.threading.Closer;
import lucee.runtime.util.threading.StatmentClose;

public class QueryLazy extends BIF {

private static Closer closer;
private static int RETURN_TYPE_QUERY = 1;
private static int RETURN_TYPE_ARRAY = 2;
private static int RETURN_TYPE_STRUCT = 3;
Expand Down Expand Up @@ -80,6 +86,15 @@ public Object invoke(PageContext pc, Object[] args) throws PageException {

// name is set by evaluator
public static String call(PageContext pc, String strSQL, UDF listener, Object params, Struct options) throws PageException {

if (closer == null) {
synchronized (options) {
if (closer == null) {
closer = new Closer(ThreadLocalPageContext.getLog(pc, "datasource"));
}
}
}

DataSource ds = getDatasource(pc, options);
// credentials
String user = getString(pc, options, KeyConstants._username, null);
Expand Down Expand Up @@ -115,6 +130,7 @@ public static String call(PageContext pc, String strSQL, UDF listener, Object pa
Statement stat = null;
ResultSet res = null;
boolean hasResult = false;
final RefBoolean aborted = new RefBooleanImpl(false);
try {
SQLItem[] items = sql.getItems();
if (items.length == 0) {
Expand All @@ -136,7 +152,7 @@ public static String call(PageContext pc, String strSQL, UDF listener, Object pa
do {
if (hasResult) {
res = stat.getResultSet();
exe(pc, res, tz, listener, blockfactor, returntype, columnKey);
exe(pc, res, tz, listener, blockfactor, returntype, columnKey, aborted);
break;
}
throw new ApplicationException("the function QueryLazy can only be used for queries returning a resultset");
Expand All @@ -151,15 +167,22 @@ public static String call(PageContext pc, String strSQL, UDF listener, Object pa
throw Caster.toPageException(e);
}
finally {
DBUtil.closeEL(res);
DBUtil.closeEL(stat);
manager.releaseConnection(pc, dc);
// because MySQL will loop to all the remaing records, we close the resultset in a separate thread,
// so we not have to wait for it
// TODO does this make sense for other datasource types as well?
if (isMySQL && aborted.toBooleanValue()) {
closer.add(new StatmentClose(manager, dc, stat, ThreadLocalPageContext.getLog(pc, "datasource")));
}
else {
DBUtil.closeEL(stat);
manager.releaseConnection(pc, dc);
}
}

return null;
}

private static void exe(PageContext pc, ResultSet res, TimeZone tz, UDF listener, int blockfactor, int returntype, Collection.Key columnKey)
private static void exe(PageContext pc, ResultSet res, TimeZone tz, UDF listener, int blockfactor, int returntype, Collection.Key columnKey, RefBoolean aborted)
throws SQLException, PageException, IOException {
ResultSetMetaData meta = res.getMetaData();

Expand Down Expand Up @@ -229,6 +252,7 @@ else if (returntype == RETURN_TYPE_STRUCT) {
if (blockfactor == rownbr) {
if (!Caster.toBooleanValue(listener.call(pc, new Object[] { _arrRows }, true), true)) {
rownbr = 0;
aborted.setValue(true);
break;
}
_arrRows = new ArrayImpl();
Expand All @@ -240,6 +264,7 @@ else if (isStruct) {
if (blockfactor == rownbr) {
if (!Caster.toBooleanValue(listener.call(pc, new Object[] { sctRows }, true), true)) {
rownbr = 0;
aborted.setValue(true);
break;
}
sctRows = new StructImpl();
Expand All @@ -250,6 +275,7 @@ else if (isQuery) {
if (blockfactor == rownbr) {
if (!Caster.toBooleanValue(listener.call(pc, new Object[] { qryRows }, true), true)) {
rownbr = 0;
aborted.setValue(true);
break;
}
qryRows = new QueryImpl(tmpKeys.toArray(new Collection.Key[tmpKeys.size()]), blockfactor, "queryLazy");
Expand All @@ -258,7 +284,10 @@ else if (isQuery) {
}
}
else {
if (!Caster.toBooleanValue(listener.call(pc, new Object[] { row }, true), true)) break;
if (!Caster.toBooleanValue(listener.call(pc, new Object[] { row }, true), true)) {
aborted.setValue(true);
break;
}
}

}
Expand Down
82 changes: 82 additions & 0 deletions core/src/main/java/lucee/runtime/util/threading/Closer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package lucee.runtime.util.threading;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import lucee.commons.io.log.Log;

public class Closer {
private CloserThread thread;
private Log log;
private final BlockingQueue<CloserJob> jobs = new LinkedBlockingQueue<>();

public Closer(Log log) {
this.log = log;
}

public void touch() {
if (thread == null || !thread.isAlive()) {
synchronized (this) {
if (thread == null || !thread.isAlive()) {
thread = new CloserThread(jobs, log);
thread.start();
}
}
}
}

public void add(CloserJob job) {
jobs.add(job);
touch();
}

private static class CloserThread extends Thread {
private static final long IDLE_TIMEOUT = 10000;
private static final long INTERVALL = 1000;
private long lastMod;
private Log log;
private BlockingQueue<CloserJob> jobs;

public CloserThread(BlockingQueue<CloserJob> jobs, Log log) {
this.jobs = jobs;
this.log = log;
}

@Override
public void run() {
while (true) {
CloserJob job = jobs.poll();

if (job != null) {
if (log != null) log.debug("Closer", "executing job: " + job.getLablel());

long now = System.currentTimeMillis();
try {
job.execute();
}
catch (Exception e) {
if (log != null) log.error("Closer", e);
}
lastMod = now;
}
// nothing to do ATM
else {
long now = System.currentTimeMillis();
if (lastMod + IDLE_TIMEOUT < now) {
if (log != null) log.debug("Closer", "nothing to do, idle timeout reached, stoping observer ");
break;
}
else if (log != null) log.debug("Closer", "nothing to do, remaining idle for another " + ((lastMod + IDLE_TIMEOUT) - now) + "ms");
}
if (log != null) log.debug("Closer", "sleep for " + INTERVALL + "ms");
try {
sleep(INTERVALL);
}
catch (InterruptedException e) {
if (log != null) log.error("Closer", e);
}
}
}
}

}
11 changes: 11 additions & 0 deletions core/src/main/java/lucee/runtime/util/threading/CloserJob.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package lucee.runtime.util.threading;

import lucee.runtime.exp.PageException;

public interface CloserJob {

public String getLablel();

public void execute() throws PageException;

}
45 changes: 45 additions & 0 deletions core/src/main/java/lucee/runtime/util/threading/StatmentClose.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package lucee.runtime.util.threading;

import java.sql.Statement;

import lucee.commons.db.DBUtil;
import lucee.commons.io.log.Log;
import lucee.runtime.db.DatasourceConnection;
import lucee.runtime.db.DatasourceManagerImpl;
import lucee.runtime.exp.PageException;

public class StatmentClose implements CloserJob {

private DatasourceManagerImpl manager;
private DatasourceConnection dc;
private Statement stat;
private Log log;

public StatmentClose(DatasourceManagerImpl manager, DatasourceConnection dc, Statement stat, Log log) {
this.manager = manager;
this.dc = dc;
this.stat = stat;
this.log = log;
}

@Override
public String getLablel() {
return "closing datasource connection resultset";
}

@Override
public void execute() throws PageException {
// TODO add virtual threads
new Thread(() -> {
try {
DBUtil.closeEL(stat);
manager.releaseConnection(null, dc);
if (log != null) log.debug("StatmentClose", "sucessfully closed resultset");
}
catch (Exception e) {
if (log != null) log.error("StatmentClose", e);
}
}).start();
}

}
2 changes: 1 addition & 1 deletion loader/build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<project default="core" basedir="." name="Lucee"
xmlns:resolver="antlib:org.apache.maven.resolver.ant">

<property name="version" value="6.1.1.91-SNAPSHOT"/>
<property name="version" value="6.1.1.92-SNAPSHOT"/>

<taskdef uri="antlib:org.apache.maven.resolver.ant" resource="org/apache/maven/resolver/ant/antlib.xml">
<classpath>
Expand Down
2 changes: 1 addition & 1 deletion loader/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

<groupId>org.lucee</groupId>
<artifactId>lucee</artifactId>
<version>6.1.1.91-SNAPSHOT</version>
<version>6.1.1.92-SNAPSHOT</version>
<packaging>jar</packaging>

<name>Lucee Loader Build</name>
Expand Down

0 comments on commit 2c53a97

Please sign in to comment.