diff --git a/core/src/main/java/io/grpc/internal/AbstractServerStream.java b/core/src/main/java/io/grpc/internal/AbstractServerStream.java index a71fd58d87f..887b29ba4a2 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerStream.java @@ -85,7 +85,7 @@ protected interface Sink { * Tears down the stream, typically in the event of a timeout. This method may be called * multiple times and from any thread. * - *
This is a clone of {@link ServerStream#cancel()}. + *
This is a clone of {@link ServerStream#cancel(Status)}. */ void cancel(Status status); } @@ -177,11 +177,13 @@ protected TransportState(int maxMessageSize) { * thread. */ public final void setListener(ServerStreamListener listener) { - this.listener = Preconditions.checkNotNull(listener); + Preconditions.checkState(this.listener == null, "setListener should be called only once"); + this.listener = Preconditions.checkNotNull(listener, "listener"); + } - // Now that the stream has actually been initialized, call the listener's onReady callback if - // appropriate. - onStreamAllocated(); + @Override + public final void onStreamAllocated() { + super.onStreamAllocated(); } @Override diff --git a/core/src/main/java/io/grpc/internal/AbstractStream2.java b/core/src/main/java/io/grpc/internal/AbstractStream2.java index c346f998455..64f23f909fc 100644 --- a/core/src/main/java/io/grpc/internal/AbstractStream2.java +++ b/core/src/main/java/io/grpc/internal/AbstractStream2.java @@ -224,7 +224,7 @@ private boolean isReady() { * StreamListener#onReady()} handler if appropriate. This must be called from the transport * thread, since the listener may be called back directly. */ - protected final void onStreamAllocated() { + protected void onStreamAllocated() { checkState(listener() != null); synchronized (onReadyLock) { checkState(!allocated, "Already allocated"); diff --git a/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java index dd8d8f86cdd..a3865792945 100644 --- a/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java @@ -107,7 +107,6 @@ public void completeWithoutClose() { @Test public void setListener_setOnlyOnce() { - stream.transportState().setListener(new ServerStreamListenerBase()); thrown.expect(IllegalStateException.class); @@ -115,9 +114,20 @@ public void setListener_setOnlyOnce() { } @Test - public void setListener_readyCalled() { + public void listenerReady_onlyOnce() { + stream.transportState().setListener(new ServerStreamListenerBase()); + stream.transportState().onStreamAllocated(); + thrown.expect(IllegalStateException.class); + + stream.transportState().onStreamAllocated(); + } + + + @Test + public void listenerReady_readyCalled() { ServerStreamListener streamListener = mock(ServerStreamListener.class); stream.transportState().setListener(streamListener); + stream.transportState().onStreamAllocated(); verify(streamListener).onReady(); } diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index 8397277d5d6..6f4e6bec47f 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -194,6 +194,7 @@ private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers Metadata metadata = Utils.convertHeaders(headers); transportListener.streamCreated(stream, method, metadata); + state.onStreamAllocated(); http2Stream.setProperty(streamKey, state); } catch (Http2Exception e) { throw e; diff --git a/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java b/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java index 2f30ae1e4aa..9744108e305 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java @@ -294,6 +294,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable { new NettyServerStream.TransportState(handler, http2Stream, DEFAULT_MAX_MESSAGE_SIZE); NettyServerStream stream = new NettyServerStream(channel, state); stream.transportState().setListener(serverListener); + state.onStreamAllocated(); verify(serverListener, atLeastOnce()).onReady(); verifyNoMoreInteractions(serverListener); return stream;