Skip to content

Commit

Permalink
Issue #2655 - Removing closed WebSocket Session's from WebSocketClient
Browse files Browse the repository at this point in the history
Signed-off-by: Joakim Erdfelt <[email protected]>
  • Loading branch information
joakime committed Jun 12, 2018
1 parent 71dc6d9 commit fe2d969
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,12 @@ public void assertValidTextMessageSize(int requestedSize)

public WebSocketPolicy clonePolicy()
{
WebSocketPolicy clone = new WebSocketPolicy(this.behavior);
return clonePolicy(this.behavior);
}

public WebSocketPolicy clonePolicy(WebSocketBehavior behavior)
{
WebSocketPolicy clone = new WebSocketPolicy(behavior);
clone.idleTimeout = this.idleTimeout;
clone.maxTextMessageSize = this.maxTextMessageSize;
clone.maxTextMessageBufferSize = this.maxTextMessageBufferSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@
import org.eclipse.jetty.websocket.common.WebSocketSessionFactory;
import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
import org.eclipse.jetty.websocket.common.scopes.DelegatedContainerScope;
import org.eclipse.jetty.websocket.common.scopes.SimpleContainerScope;
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;

/**
Expand All @@ -70,12 +68,16 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
// From HttpClient
private final HttpClient httpClient;

//
private final WebSocketContainerScope containerScope;
// CDI layer
private final DecoratedObjectFactory objectFactory;

// WebSocket Specifics
private final WebSocketPolicy policy;
private final WebSocketExtensionFactory extensionRegistry;
private final EventDriverFactory eventDriverFactory;
private final SessionFactory sessionFactory;

// ID Generator
private final int id = ThreadLocalRandom.current().nextInt();

// defaults to true for backwards compatibility
Expand Down Expand Up @@ -112,11 +114,13 @@ public WebSocketClient(HttpClient httpClient)
*/
public WebSocketClient(HttpClient httpClient, DecoratedObjectFactory objectFactory)
{
this.containerScope = new SimpleContainerScope(WebSocketPolicy.newClientPolicy(),new MappedByteBufferPool(),objectFactory);

this.httpClient = httpClient;
this.extensionRegistry = new WebSocketExtensionFactory(containerScope);
this.eventDriverFactory = new EventDriverFactory(containerScope);
this.sessionFactory = new WebSocketSessionFactory(containerScope);
this.policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
this.objectFactory = objectFactory;
this.extensionRegistry = new WebSocketExtensionFactory(this);
this.eventDriverFactory = new EventDriverFactory(this);
this.sessionFactory = new WebSocketSessionFactory(this);
}

/**
Expand Down Expand Up @@ -231,13 +235,14 @@ private WebSocketClient(SslContextFactory sslContextFactory, Executor executor,
this.httpClient.setExecutor(executor);
this.httpClient.setByteBufferPool(bufferPool);
addBean(this.httpClient);

this.containerScope = new SimpleContainerScope(WebSocketPolicy.newClientPolicy(), bufferPool, objectFactory);

this.extensionRegistry = new WebSocketExtensionFactory(containerScope);
this.policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
this.objectFactory = objectFactory;

this.extensionRegistry = new WebSocketExtensionFactory(this);

this.eventDriverFactory = new EventDriverFactory(containerScope);
this.sessionFactory = new WebSocketSessionFactory(containerScope);
this.eventDriverFactory = new EventDriverFactory(this);
this.sessionFactory = new WebSocketSessionFactory(this);
}

/**
Expand Down Expand Up @@ -271,19 +276,6 @@ public WebSocketClient(final WebSocketContainerScope scope, EventDriverFactory e
*/
public WebSocketClient(final WebSocketContainerScope scope, EventDriverFactory eventDriverFactory, SessionFactory sessionFactory, HttpClient httpClient)
{
WebSocketContainerScope clientScope;
if (scope.getPolicy().getBehavior() == WebSocketBehavior.CLIENT)
{
clientScope = scope;
}
else
{
// We need to wrap the scope
clientScope = new DelegatedContainerScope(WebSocketPolicy.newClientPolicy(), scope);
}

this.containerScope = clientScope;

if(httpClient == null)
{
this.httpClient = HttpClientProvider.get(scope);
Expand All @@ -293,8 +285,10 @@ public WebSocketClient(final WebSocketContainerScope scope, EventDriverFactory e
{
this.httpClient = httpClient;
}

this.extensionRegistry = new WebSocketExtensionFactory(containerScope);

this.policy = scope.getPolicy().clonePolicy(WebSocketBehavior.CLIENT);
this.objectFactory = getObjectFactory();
this.extensionRegistry = new WebSocketExtensionFactory(this);

this.eventDriverFactory = eventDriverFactory;
this.sessionFactory = sessionFactory;
Expand Down Expand Up @@ -439,7 +433,7 @@ public boolean isDispatchIO()
*/
public long getAsyncWriteTimeout()
{
return this.containerScope.getPolicy().getAsyncWriteTimeout();
return getPolicy().getAsyncWriteTimeout();
}

public SocketAddress getBindAddress()
Expand Down Expand Up @@ -548,7 +542,7 @@ public long getMaxTextMessageSize()
@Override
public DecoratedObjectFactory getObjectFactory()
{
return this.containerScope.getObjectFactory();
return this.objectFactory;
}

public Set<WebSocketSession> getOpenSessions()
Expand All @@ -559,7 +553,7 @@ public Set<WebSocketSession> getOpenSessions()
@Override
public WebSocketPolicy getPolicy()
{
return this.containerScope.getPolicy();
return this.policy;
}

public Scheduler getScheduler()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@

package org.eclipse.jetty.websocket.client;

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.SocketTimeoutException;
Expand Down Expand Up @@ -73,15 +83,6 @@
import org.junit.Ignore;
import org.junit.Test;

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;

public class ClientCloseTest
{
private static final Logger LOG = Log.getLogger(ClientCloseTest.class);
Expand Down Expand Up @@ -351,6 +352,8 @@ public void testHalfClose() throws Exception
// client close event on ws-endpoint
clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.NORMAL), containsString("From Server"));
}

assertThat("Client Open Sessions", client.getOpenSessions(), empty());
}

@Ignore("Need sbordet's help here")
Expand Down Expand Up @@ -452,6 +455,7 @@ public void testProtocolException() throws Exception

// client triggers close event on client ws-endpoint
clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.PROTOCOL),allOf(containsString("Invalid control frame"),containsString("length")));
assertThat("Client Open Sessions", client.getOpenSessions(), empty());
}

@Test
Expand Down Expand Up @@ -495,6 +499,7 @@ public void testReadEOF() throws Exception
containsString("Disconnected")
));
}
assertThat("Client Open Sessions", client.getOpenSessions(), empty());
}

@Test
Expand Down Expand Up @@ -533,10 +538,17 @@ public void testServerNoCloseHandshake() throws Exception
// client idle timeout triggers close event on client ws-endpoint
assertThat("OnError Latch", clientSocket.errorLatch.await(2, TimeUnit.SECONDS), is(true));
assertThat("OnError", clientSocket.error.get(), instanceOf(TimeoutException.class));

// client close should occur
clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.ABNORMAL),
anyOf(
containsString("Timeout"),
containsString("Disconnected")
));
}
assertThat("Client Open Sessions", client.getOpenSessions(), empty());
}

@Ignore("Issue #2625")
@Test(timeout = 5000L)
public void testStopLifecycle() throws Exception
{
Expand Down Expand Up @@ -585,6 +597,7 @@ public void testStopLifecycle() throws Exception
{
clientSockets.get(i).assertReceivedCloseEvent(timeout, is(StatusCode.SHUTDOWN), containsString("Shutdown"));
}
assertThat("Client Open Sessions", client.getOpenSessions(), empty());
}
finally
{
Expand Down Expand Up @@ -638,5 +651,6 @@ public void testWriteException() throws Exception
// assert - close reason message contains (write failure)
clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.ABNORMAL), containsString("EOF"));
}
assertThat("Client Open Sessions", client.getOpenSessions(), empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,6 @@ public SimpleContainerScope(WebSocketPolicy policy, ByteBufferPool bufferPool, E
}
}

@Override
protected void doStart() throws Exception
{
super.doStart();
}

@Override
protected void doStop() throws Exception
{
super.doStop();
}

@Override
public ByteBufferPool getBufferPool()
{
Expand Down

0 comments on commit fe2d969

Please sign in to comment.