Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #12266 - InvocationType improvements and cleanups. #12551

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -508,12 +508,6 @@ public void failed(Throwable x)
promise.failed(x);
}

@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
sbordet marked this conversation as resolved.
Show resolved Hide resolved

@Override
public void onFillable()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.Attributes;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.slf4j.Logger;
Expand All @@ -43,6 +44,7 @@ public class ServerFCGIConnection extends AbstractMetaDataConnection implements
{
private static final Logger LOG = LoggerFactory.getLogger(ServerFCGIConnection.class);

private final Callback fillableCallback = new FillableCallback();
private final HttpChannel.Factory httpChannelFactory = new HttpChannel.DefaultFactory();
private final Attributes attributes = new Lazy();
private final Connector connector;
Expand Down Expand Up @@ -161,7 +163,7 @@ public void clearAttributes()
public void onOpen()
{
super.onOpen();
fillInterested();
fillInterested(fillableCallback);
}

@Override
Expand Down Expand Up @@ -189,7 +191,7 @@ public void onFillable()
else if (read == 0)
{
releaseInputBuffer();
fillInterested();
fillInterested(fillableCallback);
return;
}
else
Expand Down Expand Up @@ -305,7 +307,7 @@ void onCompleted(Throwable failure)
{
releaseInputBuffer();
if (failure == null)
fillInterested();
fillInterested(fillableCallback);
else
getFlusher().shutdown();
}
Expand Down Expand Up @@ -407,25 +409,41 @@ public void onFailure(int request, Throwable failure)
@Override
public void close()
{
if (stream != null)
try
{
Runnable task = stream.getHttpChannel().onClose();
if (task != null)
if (stream != null)
{
ThreadPool.executeImmediately(getExecutor(), () ->
{
try
{
task.run();
}
finally
{
super.close();
}
});
return;
Runnable task = stream.getHttpChannel().onClose();
if (task != null)
task.run();
}
}
super.close();
finally
{
super.close();
}
}

private class FillableCallback implements Callback
{
private final InvocationType invocationType = getConnector().getServer().getInvocationType();

@Override
public void succeeded()
{
onFillable();
}

@Override
public void failed(Throwable x)
{
onFillInterestedFailed(x);
}

@Override
public InvocationType getInvocationType()
{
return invocationType;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,6 @@ public void failed(Throwable x)
close();
promise.failed(x);
}

@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
}

private static class ConnectionListener implements Connection.Listener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ else if (filled == 0)
}
networkBuffer = null;
if (interested)
getEndPoint().fillInterested(fillableCallback);
fillInterested(fillableCallback);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,18 @@ public void onStreamTimeout(Stream stream, TimeoutException timeout, Promise<Boo
{
channel.onTimeout(timeout, (task, timedOut) ->
{
if (task != null)
offerTask(task, true);
promise.succeeded(timedOut);
ThreadPool.executeImmediately(getExecutor(), () ->
{
try
{
task.run();
gregw marked this conversation as resolved.
Show resolved Hide resolved
promise.succeeded(timedOut);
}
catch (Throwable x)
{
promise.failed(x);
}
});
gregw marked this conversation as resolved.
Show resolved Hide resolved
});
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.eclipse.jetty.http3.server;

import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;

import org.eclipse.jetty.http.HttpFields;
Expand All @@ -35,6 +36,7 @@
import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -153,27 +155,31 @@ public void onTrailer(Stream.Server stream, HeadersFrame frame)
public void onIdleTimeout(Stream.Server stream, TimeoutException timeout, Promise<Boolean> promise)
{
HTTP3Stream http3Stream = (HTTP3Stream)stream;
getConnection().onIdleTimeout((HTTP3Stream)stream, timeout, (task, timedOut) ->
getConnection().onIdleTimeout(http3Stream, timeout, (task, timedOut) ->
{
if (task != null)
Executor executor = http3Stream.getSession().getProtocolSession().getQuicSession().getExecutor();
ThreadPool.executeImmediately(executor, () ->
{
ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession();
protocolSession.offer(task, true);
}
promise.succeeded(timedOut);
try
{
task.run();
gregw marked this conversation as resolved.
Show resolved Hide resolved
promise.succeeded(timedOut);
}
catch (Throwable x)
{
promise.failed(x);
}
});
});
}

@Override
public void onFailure(Stream.Server stream, long error, Throwable failure)
{
HTTP3Stream http3Stream = (HTTP3Stream)stream;
Runnable task = getConnection().onFailure((HTTP3Stream)stream, failure);
if (task != null)
{
ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession();
protocolSession.offer(task, true);
}
Runnable task = getConnection().onFailure(http3Stream, failure);
Executor executor = http3Stream.getSession().getProtocolSession().getQuicSession().getExecutor();
ThreadPool.executeImmediately(executor, task);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public abstract class AbstractConnection implements Connection, Invocable
private final EndPoint _endPoint;
private final Executor _executor;
private final Callback _readCallback;
private final Callback _nonBlockingReadCallback;
private int _inputBufferSize = 2048;

protected AbstractConnection(EndPoint endPoint, Executor executor)
Expand All @@ -48,16 +49,8 @@ protected AbstractConnection(EndPoint endPoint, Executor executor)
throw new IllegalArgumentException("Executor must not be null!");
_endPoint = endPoint;
_executor = executor;
_readCallback = new ReadCallback();
}

@Deprecated
@Override
public InvocationType getInvocationType()
{
// TODO consider removing the #fillInterested method from the connection and only use #fillInterestedCallback
// so a connection need not be Invocable
return Invocable.super.getInvocationType();
_readCallback = new FillableCallback();
_nonBlockingReadCallback = new NonBlockingFillableCallback();
}

@Override
Expand Down Expand Up @@ -90,25 +83,32 @@ protected Executor getExecutor()
}

/**
* <p>Utility method to be called to register read interest.</p>
* <p>After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
* will be called back as appropriate.</p>
* <p>Registers read interest using the default {@link Callback} with {@link Invocable.InvocationType#BLOCKING}.</p>
* <p>When read readiness is signaled, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
* will be invoked.</p>
* <p>This method should be used sparingly, mainly from {@link #onOpen()}, and {@link #fillInterested(Callback)}
* should be preferred instead, passing a {@link Callback} that specifies the {@link Invocable.InvocationType}
* for each specific case where read interest needs to be registered.</p>
*
* @see #fillInterested(Callback)
* @see #onFillable()
* @see #onFillInterestedFailed(Throwable)
*/
public void fillInterested()
{
if (LOG.isDebugEnabled())
LOG.debug("fillInterested {}", this);
getEndPoint().fillInterested(_readCallback);
fillInterested(_readCallback);
}

public void nonBlockingFillInterested()
gregw marked this conversation as resolved.
Show resolved Hide resolved
{
fillInterested(_nonBlockingReadCallback);
}

/**
* <p>Utility method to be called to register read interest.</p>
* <p>After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
* will be called back as appropriate.</p>
* <p>Registers read interest with the given callback.</p>
* <p>When read readiness is signaled, the callback will be completed.</p>
*
* @see #onFillable()
* @param callback the callback to complete when read readiness is signaled
*/
public void fillInterested(Callback callback)
{
Expand All @@ -130,7 +130,7 @@ public boolean isFillInterested()
/**
* <p>Callback method invoked when the endpoint is ready to be read.</p>
*
* @see #fillInterested()
* @see #fillInterested(Callback)
*/
public abstract void onFillable();

Expand All @@ -139,7 +139,7 @@ public boolean isFillInterested()
*
* @param cause the exception that caused the failure
*/
protected void onFillInterestedFailed(Throwable cause)
public void onFillInterestedFailed(Throwable cause)
{
if (LOG.isDebugEnabled())
LOG.debug("onFillInterestedFailed {}", this, cause);
Expand Down Expand Up @@ -286,7 +286,12 @@ public String toConnectionString()
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
}

private class ReadCallback implements Callback, Invocable
/**
* <p>The default {@link Callback} for read interest, typically used from {@link #onOpen()}.</p>
* <p>In other cases, use {@link #fillInterested(Callback)} with a {@link Callback} that
* reports a more specific {@link Invocable.InvocationType}.</p>
*/
private class FillableCallback implements Callback
{
@Override
public void succeeded()
Expand All @@ -305,11 +310,19 @@ public String toString()
{
return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), AbstractConnection.this);
}
}

/**
* <p>The default {@link Callback} for read interest, typically used from {@link #onOpen()}.</p>
* <p>In other cases, use {@link #fillInterested(Callback)} with a {@link Callback} that
* reports a more specific {@link Invocable.InvocationType}.</p>
*/
private class NonBlockingFillableCallback extends FillableCallback
{
@Override
public InvocationType getInvocationType()
{
return AbstractConnection.this.getInvocationType();
return InvocationType.NON_BLOCKING;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void onOpen()
if (completed)
replaceConnection();
else
fillInterested();
nonBlockingFillInterested();
}
catch (Throwable x)
{
Expand All @@ -89,7 +89,7 @@ public void onFillable()
}
if (filled == 0)
{
fillInterested();
nonBlockingFillInterested();
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public void onOpen()
if (upgraded)
_buffer.release();
else
fillInterested();
nonBlockingFillInterested();
}
catch (Throwable x)
{
Expand Down Expand Up @@ -211,7 +211,7 @@ public void onFillable()
}
if (fill == 0)
{
fillInterested();
nonBlockingFillInterested();
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ protected void setProtocol(String protocol)
public void onOpen()
{
super.onOpen();
fillInterested();
nonBlockingFillInterested();
}

@Override
Expand All @@ -106,7 +106,7 @@ public void onFillable()
{
// Here the SSL handshake is not finished yet but we filled 0 bytes,
// so we need to read more.
fillInterested();
nonBlockingFillInterested();
}
}
else
Expand Down
Loading
Loading