Skip to content

Commit

Permalink
core,netty: quick patch for setListener regression
Browse files Browse the repository at this point in the history
resolves grpc/grpc#8715
now that setListener is called prior to
`JumpToApplicationThreadServerStreamListener` being completely ready to
use. We should not call `AbstractStream2#onStreamAllocated()` inside
`setListener()` anymore, but call it after `ServerImpl#streamCreated()`
is completed.
  • Loading branch information
dapengzhang0 authored and carl-mastrangelo committed Nov 23, 2016
1 parent 872c239 commit 08a2106
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 8 deletions.
12 changes: 7 additions & 5 deletions core/src/main/java/io/grpc/internal/AbstractServerStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ protected interface Sink {
* Tears down the stream, typically in the event of a timeout. This method may be called
* multiple times and from any thread.
*
* <p>This is a clone of {@link ServerStream#cancel()}.
* <p>This is a clone of {@link ServerStream#cancel(Status)}.
*/
void cancel(Status status);
}
Expand Down Expand Up @@ -177,11 +177,13 @@ protected TransportState(int maxMessageSize) {
* thread.
*/
public final void setListener(ServerStreamListener listener) {
this.listener = Preconditions.checkNotNull(listener);
Preconditions.checkState(this.listener == null, "setListener should be called only once");
this.listener = Preconditions.checkNotNull(listener, "listener");
}

// Now that the stream has actually been initialized, call the listener's onReady callback if
// appropriate.
onStreamAllocated();
@Override
public final void onStreamAllocated() {
super.onStreamAllocated();
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/io/grpc/internal/AbstractStream2.java
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ private boolean isReady() {
* StreamListener#onReady()} handler if appropriate. This must be called from the transport
* thread, since the listener may be called back directly.
*/
protected final void onStreamAllocated() {
protected void onStreamAllocated() {
checkState(listener() != null);
synchronized (onReadyLock) {
checkState(!allocated, "Already allocated");
Expand Down
14 changes: 12 additions & 2 deletions core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,27 @@ public void completeWithoutClose() {

@Test
public void setListener_setOnlyOnce() {

stream.transportState().setListener(new ServerStreamListenerBase());
thrown.expect(IllegalStateException.class);

stream.transportState().setListener(new ServerStreamListenerBase());
}

@Test
public void setListener_readyCalled() {
public void listenerReady_onlyOnce() {
stream.transportState().setListener(new ServerStreamListenerBase());
stream.transportState().onStreamAllocated();
thrown.expect(IllegalStateException.class);

stream.transportState().onStreamAllocated();
}


@Test
public void listenerReady_readyCalled() {
ServerStreamListener streamListener = mock(ServerStreamListener.class);
stream.transportState().setListener(streamListener);
stream.transportState().onStreamAllocated();

verify(streamListener).onReady();
}
Expand Down
1 change: 1 addition & 0 deletions netty/src/main/java/io/grpc/netty/NettyServerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers

Metadata metadata = Utils.convertHeaders(headers);
transportListener.streamCreated(stream, method, metadata);
state.onStreamAllocated();
http2Stream.setProperty(streamKey, state);
} catch (Http2Exception e) {
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
new NettyServerStream.TransportState(handler, http2Stream, DEFAULT_MAX_MESSAGE_SIZE);
NettyServerStream stream = new NettyServerStream(channel, state);
stream.transportState().setListener(serverListener);
state.onStreamAllocated();
verify(serverListener, atLeastOnce()).onReady();
verifyNoMoreInteractions(serverListener);
return stream;
Expand Down

0 comments on commit 08a2106

Please sign in to comment.