From 2c53a973ee26e5315923e7e19e5aa7210134855f Mon Sep 17 00:00:00 2001 From: michaeloffner Date: Tue, 8 Oct 2024 15:43:50 +0200 Subject: [PATCH] LDEV-5102 - Handle ResultSet.close() in a separate thread to prevent blocking in QueryLazy with MySQL --- .../runtime/functions/query/QueryLazy.java | 41 ++++++++-- .../lucee/runtime/util/threading/Closer.java | 82 +++++++++++++++++++ .../runtime/util/threading/CloserJob.java | 11 +++ .../runtime/util/threading/StatmentClose.java | 45 ++++++++++ loader/build.xml | 2 +- loader/pom.xml | 2 +- 6 files changed, 175 insertions(+), 8 deletions(-) create mode 100644 core/src/main/java/lucee/runtime/util/threading/Closer.java create mode 100644 core/src/main/java/lucee/runtime/util/threading/CloserJob.java create mode 100644 core/src/main/java/lucee/runtime/util/threading/StatmentClose.java diff --git a/core/src/main/java/lucee/runtime/functions/query/QueryLazy.java b/core/src/main/java/lucee/runtime/functions/query/QueryLazy.java index 8ab1a0a2ce..c74ba731b7 100644 --- a/core/src/main/java/lucee/runtime/functions/query/QueryLazy.java +++ b/core/src/main/java/lucee/runtime/functions/query/QueryLazy.java @@ -13,6 +13,8 @@ import lucee.commons.db.DBUtil; import lucee.commons.lang.ExceptionUtil; import lucee.commons.lang.StringUtil; +import lucee.commons.lang.types.RefBoolean; +import lucee.commons.lang.types.RefBooleanImpl; import lucee.runtime.PageContext; import lucee.runtime.config.Constants; import lucee.runtime.config.NullSupportHelper; @@ -24,6 +26,7 @@ import lucee.runtime.db.SQLCaster; import lucee.runtime.db.SQLImpl; import lucee.runtime.db.SQLItem; +import lucee.runtime.engine.ThreadLocalPageContext; import lucee.runtime.exp.ApplicationException; import lucee.runtime.exp.DatabaseException; import lucee.runtime.exp.FunctionException; @@ -49,9 +52,12 @@ import lucee.runtime.type.scope.Argument; import lucee.runtime.type.util.KeyConstants; import lucee.runtime.type.util.QueryUtil; +import lucee.runtime.util.threading.Closer; +import lucee.runtime.util.threading.StatmentClose; public class QueryLazy extends BIF { + private static Closer closer; private static int RETURN_TYPE_QUERY = 1; private static int RETURN_TYPE_ARRAY = 2; private static int RETURN_TYPE_STRUCT = 3; @@ -80,6 +86,15 @@ public Object invoke(PageContext pc, Object[] args) throws PageException { // name is set by evaluator public static String call(PageContext pc, String strSQL, UDF listener, Object params, Struct options) throws PageException { + + if (closer == null) { + synchronized (options) { + if (closer == null) { + closer = new Closer(ThreadLocalPageContext.getLog(pc, "datasource")); + } + } + } + DataSource ds = getDatasource(pc, options); // credentials String user = getString(pc, options, KeyConstants._username, null); @@ -115,6 +130,7 @@ public static String call(PageContext pc, String strSQL, UDF listener, Object pa Statement stat = null; ResultSet res = null; boolean hasResult = false; + final RefBoolean aborted = new RefBooleanImpl(false); try { SQLItem[] items = sql.getItems(); if (items.length == 0) { @@ -136,7 +152,7 @@ public static String call(PageContext pc, String strSQL, UDF listener, Object pa do { if (hasResult) { res = stat.getResultSet(); - exe(pc, res, tz, listener, blockfactor, returntype, columnKey); + exe(pc, res, tz, listener, blockfactor, returntype, columnKey, aborted); break; } throw new ApplicationException("the function QueryLazy can only be used for queries returning a resultset"); @@ -151,15 +167,22 @@ public static String call(PageContext pc, String strSQL, UDF listener, Object pa throw Caster.toPageException(e); } finally { - DBUtil.closeEL(res); - DBUtil.closeEL(stat); - manager.releaseConnection(pc, dc); + // because MySQL will loop to all the remaing records, we close the resultset in a separate thread, + // so we not have to wait for it + // TODO does this make sense for other datasource types as well? + if (isMySQL && aborted.toBooleanValue()) { + closer.add(new StatmentClose(manager, dc, stat, ThreadLocalPageContext.getLog(pc, "datasource"))); + } + else { + DBUtil.closeEL(stat); + manager.releaseConnection(pc, dc); + } } return null; } - private static void exe(PageContext pc, ResultSet res, TimeZone tz, UDF listener, int blockfactor, int returntype, Collection.Key columnKey) + private static void exe(PageContext pc, ResultSet res, TimeZone tz, UDF listener, int blockfactor, int returntype, Collection.Key columnKey, RefBoolean aborted) throws SQLException, PageException, IOException { ResultSetMetaData meta = res.getMetaData(); @@ -229,6 +252,7 @@ else if (returntype == RETURN_TYPE_STRUCT) { if (blockfactor == rownbr) { if (!Caster.toBooleanValue(listener.call(pc, new Object[] { _arrRows }, true), true)) { rownbr = 0; + aborted.setValue(true); break; } _arrRows = new ArrayImpl(); @@ -240,6 +264,7 @@ else if (isStruct) { if (blockfactor == rownbr) { if (!Caster.toBooleanValue(listener.call(pc, new Object[] { sctRows }, true), true)) { rownbr = 0; + aborted.setValue(true); break; } sctRows = new StructImpl(); @@ -250,6 +275,7 @@ else if (isQuery) { if (blockfactor == rownbr) { if (!Caster.toBooleanValue(listener.call(pc, new Object[] { qryRows }, true), true)) { rownbr = 0; + aborted.setValue(true); break; } qryRows = new QueryImpl(tmpKeys.toArray(new Collection.Key[tmpKeys.size()]), blockfactor, "queryLazy"); @@ -258,7 +284,10 @@ else if (isQuery) { } } else { - if (!Caster.toBooleanValue(listener.call(pc, new Object[] { row }, true), true)) break; + if (!Caster.toBooleanValue(listener.call(pc, new Object[] { row }, true), true)) { + aborted.setValue(true); + break; + } } } diff --git a/core/src/main/java/lucee/runtime/util/threading/Closer.java b/core/src/main/java/lucee/runtime/util/threading/Closer.java new file mode 100644 index 0000000000..4124b723f5 --- /dev/null +++ b/core/src/main/java/lucee/runtime/util/threading/Closer.java @@ -0,0 +1,82 @@ +package lucee.runtime.util.threading; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import lucee.commons.io.log.Log; + +public class Closer { + private CloserThread thread; + private Log log; + private final BlockingQueue jobs = new LinkedBlockingQueue<>(); + + public Closer(Log log) { + this.log = log; + } + + public void touch() { + if (thread == null || !thread.isAlive()) { + synchronized (this) { + if (thread == null || !thread.isAlive()) { + thread = new CloserThread(jobs, log); + thread.start(); + } + } + } + } + + public void add(CloserJob job) { + jobs.add(job); + touch(); + } + + private static class CloserThread extends Thread { + private static final long IDLE_TIMEOUT = 10000; + private static final long INTERVALL = 1000; + private long lastMod; + private Log log; + private BlockingQueue jobs; + + public CloserThread(BlockingQueue jobs, Log log) { + this.jobs = jobs; + this.log = log; + } + + @Override + public void run() { + while (true) { + CloserJob job = jobs.poll(); + + if (job != null) { + if (log != null) log.debug("Closer", "executing job: " + job.getLablel()); + + long now = System.currentTimeMillis(); + try { + job.execute(); + } + catch (Exception e) { + if (log != null) log.error("Closer", e); + } + lastMod = now; + } + // nothing to do ATM + else { + long now = System.currentTimeMillis(); + if (lastMod + IDLE_TIMEOUT < now) { + if (log != null) log.debug("Closer", "nothing to do, idle timeout reached, stoping observer "); + break; + } + else if (log != null) log.debug("Closer", "nothing to do, remaining idle for another " + ((lastMod + IDLE_TIMEOUT) - now) + "ms"); + } + if (log != null) log.debug("Closer", "sleep for " + INTERVALL + "ms"); + try { + sleep(INTERVALL); + } + catch (InterruptedException e) { + if (log != null) log.error("Closer", e); + } + } + } + } + +} diff --git a/core/src/main/java/lucee/runtime/util/threading/CloserJob.java b/core/src/main/java/lucee/runtime/util/threading/CloserJob.java new file mode 100644 index 0000000000..5ae11a83a0 --- /dev/null +++ b/core/src/main/java/lucee/runtime/util/threading/CloserJob.java @@ -0,0 +1,11 @@ +package lucee.runtime.util.threading; + +import lucee.runtime.exp.PageException; + +public interface CloserJob { + + public String getLablel(); + + public void execute() throws PageException; + +} diff --git a/core/src/main/java/lucee/runtime/util/threading/StatmentClose.java b/core/src/main/java/lucee/runtime/util/threading/StatmentClose.java new file mode 100644 index 0000000000..a5caf71eb5 --- /dev/null +++ b/core/src/main/java/lucee/runtime/util/threading/StatmentClose.java @@ -0,0 +1,45 @@ +package lucee.runtime.util.threading; + +import java.sql.Statement; + +import lucee.commons.db.DBUtil; +import lucee.commons.io.log.Log; +import lucee.runtime.db.DatasourceConnection; +import lucee.runtime.db.DatasourceManagerImpl; +import lucee.runtime.exp.PageException; + +public class StatmentClose implements CloserJob { + + private DatasourceManagerImpl manager; + private DatasourceConnection dc; + private Statement stat; + private Log log; + + public StatmentClose(DatasourceManagerImpl manager, DatasourceConnection dc, Statement stat, Log log) { + this.manager = manager; + this.dc = dc; + this.stat = stat; + this.log = log; + } + + @Override + public String getLablel() { + return "closing datasource connection resultset"; + } + + @Override + public void execute() throws PageException { + // TODO add virtual threads + new Thread(() -> { + try { + DBUtil.closeEL(stat); + manager.releaseConnection(null, dc); + if (log != null) log.debug("StatmentClose", "sucessfully closed resultset"); + } + catch (Exception e) { + if (log != null) log.error("StatmentClose", e); + } + }).start(); + } + +} \ No newline at end of file diff --git a/loader/build.xml b/loader/build.xml index 2b5c3c56f6..efbf28137a 100644 --- a/loader/build.xml +++ b/loader/build.xml @@ -2,7 +2,7 @@ - + diff --git a/loader/pom.xml b/loader/pom.xml index 6b35a5e050..a184b46e4a 100644 --- a/loader/pom.xml +++ b/loader/pom.xml @@ -3,7 +3,7 @@ org.lucee lucee - 6.1.1.91-SNAPSHOT + 6.1.1.92-SNAPSHOT jar Lucee Loader Build