Skip to content

Commit

Permalink
Issue #2655 - Removing closed WebSocket Session's from WebSocketClient
Browse files Browse the repository at this point in the history
+ Correcting Native WebSocketConfiguration impact.
+ CDI requires a customized DecoratedObjectFactory, which is bound
  later in the lifecycle, which means we cannot rely on it being
  provided directly in the constructors, but rather have to look
  for it in the ServletContext.

Signed-off-by: Joakim Erdfelt <[email protected]>
  • Loading branch information
joakime committed Jun 13, 2018
1 parent 395f439 commit 870c87f
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public void testHttpClientThreads_AfterClientConnectTo() throws Exception
}
}

@Test
@Test(timeout = 5000)
public void testHttpClientThreads_AfterServerConnectTo() throws Exception
{
Server server = new Server(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,12 @@ public void assertValidTextMessageSize(int requestedSize)

public WebSocketPolicy clonePolicy()
{
WebSocketPolicy clone = new WebSocketPolicy(this.behavior);
return clonePolicy(this.behavior);
}

public WebSocketPolicy clonePolicy(WebSocketBehavior behavior)
{
WebSocketPolicy clone = new WebSocketPolicy(behavior);
clone.idleTimeout = this.idleTimeout;
clone.maxTextMessageSize = this.maxTextMessageSize;
clone.maxTextMessageBufferSize = this.maxTextMessageBufferSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.DeprecationWarning;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
Expand All @@ -56,8 +58,6 @@
import org.eclipse.jetty.websocket.common.WebSocketSessionFactory;
import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
import org.eclipse.jetty.websocket.common.scopes.DelegatedContainerScope;
import org.eclipse.jetty.websocket.common.scopes.SimpleContainerScope;
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;

/**
Expand All @@ -70,12 +70,16 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
// From HttpClient
private final HttpClient httpClient;

//
private final WebSocketContainerScope containerScope;
// CDI layer
private final Supplier<DecoratedObjectFactory> objectFactorySupplier;

// WebSocket Specifics
private final WebSocketPolicy policy;
private final WebSocketExtensionFactory extensionRegistry;
private final EventDriverFactory eventDriverFactory;
private final SessionFactory sessionFactory;

// ID Generator
private final int id = ThreadLocalRandom.current().nextInt();

// defaults to true for backwards compatibility
Expand Down Expand Up @@ -112,11 +116,13 @@ public WebSocketClient(HttpClient httpClient)
*/
public WebSocketClient(HttpClient httpClient, DecoratedObjectFactory objectFactory)
{
this.containerScope = new SimpleContainerScope(WebSocketPolicy.newClientPolicy(),new MappedByteBufferPool(),objectFactory);
this.httpClient = httpClient;
this.extensionRegistry = new WebSocketExtensionFactory(containerScope);
this.eventDriverFactory = new EventDriverFactory(containerScope);
this.sessionFactory = new WebSocketSessionFactory(containerScope);
this.httpClient = Objects.requireNonNull(httpClient, "HttpClient");
this.policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
final DecoratedObjectFactory decoratedObjectFactory = objectFactory != null ? objectFactory : newDecoratedObjectFactory();
this.objectFactorySupplier = () -> decoratedObjectFactory;
this.extensionRegistry = new WebSocketExtensionFactory(this);
this.eventDriverFactory = new EventDriverFactory(this);
this.sessionFactory = new WebSocketSessionFactory(this);
}

/**
Expand Down Expand Up @@ -231,13 +237,15 @@ private WebSocketClient(SslContextFactory sslContextFactory, Executor executor,
this.httpClient.setExecutor(executor);
this.httpClient.setByteBufferPool(bufferPool);
addBean(this.httpClient);

this.containerScope = new SimpleContainerScope(WebSocketPolicy.newClientPolicy(), bufferPool, objectFactory);

this.extensionRegistry = new WebSocketExtensionFactory(containerScope);
this.policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
final DecoratedObjectFactory decoratedObjectFactory = objectFactory != null ? objectFactory : newDecoratedObjectFactory();
this.objectFactorySupplier = ()-> decoratedObjectFactory;

this.extensionRegistry = new WebSocketExtensionFactory(this);

this.eventDriverFactory = new EventDriverFactory(containerScope);
this.sessionFactory = new WebSocketSessionFactory(containerScope);
this.eventDriverFactory = new EventDriverFactory(this);
this.sessionFactory = new WebSocketSessionFactory(this);
}

/**
Expand Down Expand Up @@ -271,19 +279,6 @@ public WebSocketClient(final WebSocketContainerScope scope, EventDriverFactory e
*/
public WebSocketClient(final WebSocketContainerScope scope, EventDriverFactory eventDriverFactory, SessionFactory sessionFactory, HttpClient httpClient)
{
WebSocketContainerScope clientScope;
if (scope.getPolicy().getBehavior() == WebSocketBehavior.CLIENT)
{
clientScope = scope;
}
else
{
// We need to wrap the scope
clientScope = new DelegatedContainerScope(WebSocketPolicy.newClientPolicy(), scope);
}

this.containerScope = clientScope;

if(httpClient == null)
{
this.httpClient = HttpClientProvider.get(scope);
Expand All @@ -293,13 +288,23 @@ public WebSocketClient(final WebSocketContainerScope scope, EventDriverFactory e
{
this.httpClient = httpClient;
}

this.extensionRegistry = new WebSocketExtensionFactory(containerScope);

this.policy = scope.getPolicy().clonePolicy(WebSocketBehavior.CLIENT);
// Support Late Binding of Object Factory (for CDI)
this.objectFactorySupplier = () -> scope.getObjectFactory();
this.extensionRegistry = new WebSocketExtensionFactory(this);

this.eventDriverFactory = eventDriverFactory;
this.sessionFactory = sessionFactory;
}

private DecoratedObjectFactory newDecoratedObjectFactory()
{
DecoratedObjectFactory objectFactory = new DecoratedObjectFactory();
objectFactory.addDecorator(new DeprecationWarning());
return objectFactory;
}

public Future<Session> connect(Object websocket, URI toUri) throws IOException
{
ClientUpgradeRequest request = new ClientUpgradeRequest(toUri);
Expand Down Expand Up @@ -439,7 +444,7 @@ public boolean isDispatchIO()
*/
public long getAsyncWriteTimeout()
{
return this.containerScope.getPolicy().getAsyncWriteTimeout();
return getPolicy().getAsyncWriteTimeout();
}

public SocketAddress getBindAddress()
Expand Down Expand Up @@ -548,7 +553,7 @@ public long getMaxTextMessageSize()
@Override
public DecoratedObjectFactory getObjectFactory()
{
return this.containerScope.getObjectFactory();
return this.objectFactorySupplier.get();
}

public Set<WebSocketSession> getOpenSessions()
Expand All @@ -559,7 +564,7 @@ public Set<WebSocketSession> getOpenSessions()
@Override
public WebSocketPolicy getPolicy()
{
return this.containerScope.getPolicy();
return this.policy;
}

public Scheduler getScheduler()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@

package org.eclipse.jetty.websocket.client;

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.SocketTimeoutException;
Expand Down Expand Up @@ -73,15 +83,6 @@
import org.junit.Ignore;
import org.junit.Test;

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;

public class ClientCloseTest
{
private static final Logger LOG = Log.getLogger(ClientCloseTest.class);
Expand Down Expand Up @@ -351,6 +352,8 @@ public void testHalfClose() throws Exception
// client close event on ws-endpoint
clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.NORMAL), containsString("From Server"));
}

assertThat("Client Open Sessions", client.getOpenSessions(), empty());
}

@Ignore("Need sbordet's help here")
Expand Down Expand Up @@ -452,6 +455,7 @@ public void testProtocolException() throws Exception

// client triggers close event on client ws-endpoint
clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.PROTOCOL),allOf(containsString("Invalid control frame"),containsString("length")));
assertThat("Client Open Sessions", client.getOpenSessions(), empty());
}

@Test
Expand Down Expand Up @@ -495,6 +499,7 @@ public void testReadEOF() throws Exception
containsString("Disconnected")
));
}
assertThat("Client Open Sessions", client.getOpenSessions(), empty());
}

@Test
Expand Down Expand Up @@ -533,10 +538,17 @@ public void testServerNoCloseHandshake() throws Exception
// client idle timeout triggers close event on client ws-endpoint
assertThat("OnError Latch", clientSocket.errorLatch.await(2, TimeUnit.SECONDS), is(true));
assertThat("OnError", clientSocket.error.get(), instanceOf(TimeoutException.class));

// client close should occur
clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.ABNORMAL),
anyOf(
containsString("Timeout"),
containsString("Disconnected")
));
}
assertThat("Client Open Sessions", client.getOpenSessions(), empty());
}

@Ignore("Issue #2625")
@Test(timeout = 5000L)
public void testStopLifecycle() throws Exception
{
Expand Down Expand Up @@ -585,6 +597,7 @@ public void testStopLifecycle() throws Exception
{
clientSockets.get(i).assertReceivedCloseEvent(timeout, is(StatusCode.SHUTDOWN), containsString("Shutdown"));
}
assertThat("Client Open Sessions", client.getOpenSessions(), empty());
}
finally
{
Expand Down Expand Up @@ -638,5 +651,6 @@ public void testWriteException() throws Exception
// assert - close reason message contains (write failure)
clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.ABNORMAL), containsString("EOF"));
}
assertThat("Client Open Sessions", client.getOpenSessions(), empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,6 @@ public SimpleContainerScope(WebSocketPolicy policy, ByteBufferPool bufferPool, E
}
}

@Override
protected void doStart() throws Exception
{
super.doStart();
}

@Override
protected void doStop() throws Exception
{
super.doStop();
}

@Override
public ByteBufferPool getBufferPool()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@

import javax.servlet.ServletContainerInitializer;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;

import org.eclipse.jetty.server.handler.ContextHandler;

public class NativeWebSocketServletContainerInitializer implements ServletContainerInitializer
{
Expand All @@ -33,14 +34,23 @@ public static NativeWebSocketConfiguration getDefaultFrom(ServletContext context
NativeWebSocketConfiguration configuration = (NativeWebSocketConfiguration) context.getAttribute(KEY);
if (configuration == null)
{
// Not provided to us, create a new default one.
configuration = new NativeWebSocketConfiguration(context);
context.setAttribute(KEY, configuration);

// Attach default configuration to context lifecycle
if (context instanceof ContextHandler.Context)
{
ContextHandler handler = ((ContextHandler.Context)context).getContextHandler();
// Let ContextHandler handle configuration lifecycle
handler.addBean(configuration);
}
}
return configuration;
}

@Override
public void onStartup(Set<Class<?>> c, ServletContext ctx) throws ServletException
public void onStartup(Set<Class<?>> c, ServletContext ctx)
{
// initialize
getDefaultFrom(ctx);
Expand Down
Loading

0 comments on commit 870c87f

Please sign in to comment.