Skip to content

Commit

Permalink
api: deprecate Helper.updateSubchannelAddresses() and add equivalent …
Browse files Browse the repository at this point in the history
…on Subchannel (grpc#5802)

Resolves grpc#5676
  • Loading branch information
zhangkun83 authored May 30, 2019
1 parent 9b4c958 commit af2c16d
Show file tree
Hide file tree
Showing 11 changed files with 66 additions and 11 deletions.
17 changes: 17 additions & 0 deletions api/src/main/java/io/grpc/LoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,9 @@ public Subchannel createSubchannel(CreateSubchannelArgs args) {
* href="https://github.com/grpc/grpc-java/issues/5015">#5015</a> for the background.
*
* @since 1.4.0
* @deprecated use {@link Subchannel#updateAddresses} instead
*/
@Deprecated
public final void updateSubchannelAddresses(
Subchannel subchannel, EquivalentAddressGroup addrs) {
checkNotNull(addrs, "addrs");
Expand All @@ -982,7 +984,9 @@ public final void updateSubchannelAddresses(
* @throws IllegalArgumentException if {@code subchannel} was not returned from {@link
* #createSubchannel} or {@code addrs} is empty
* @since 1.14.0
* @deprecated use {@link Subchannel#updateAddresses} instead
*/
@Deprecated
public void updateSubchannelAddresses(
Subchannel subchannel, List<EquivalentAddressGroup> addrs) {
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -1301,6 +1305,19 @@ public ChannelLogger getChannelLogger() {
throw new UnsupportedOperationException();
}

/**
* Replaces the existing addresses used with this {@code Subchannel}. If the new and old
* addresses overlap, the Subchannel can continue using an existing connection.
*
* <p>It must be called from the Synchronization Context or will throw.
*
* @throws IllegalArgumentException if {@code addrs} is empty
* @since 1.22.0
*/
public void updateAddresses(List<EquivalentAddressGroup> addrs) {
throw new UnsupportedOperationException();
}

/**
* (Internal use only) returns an object that represents the underlying subchannel that is used
* by the Channel for sending RPCs when this {@link Subchannel} is picked. This is an opaque
Expand Down
2 changes: 2 additions & 0 deletions api/src/test/java/io/grpc/LoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ public void helper_createSubchannelList_throws() {
}
}

@Deprecated
@Test
public void helper_updateSubchannelAddresses_delegates() {
class OverrideUpdateSubchannel extends NoopHelper {
Expand All @@ -187,6 +188,7 @@ public void updateSubchannelAddresses(
assertThat(helper.ran).isTrue();
}

@Deprecated
@Test(expected = UnsupportedOperationException.class)
public void helper_updateSubchannelAddressesList_throws() {
new NoopHelper().updateSubchannelAddresses(null, Arrays.asList(eag));
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -1127,6 +1127,7 @@ public void run() {
syncContext.execute(new LoadBalancerRefreshNameResolution());
}

@Deprecated
@Override
public void updateSubchannelAddresses(
LoadBalancer.Subchannel subchannel, List<EquivalentAddressGroup> addrs) {
Expand Down Expand Up @@ -1621,6 +1622,12 @@ public Object getInternalSubchannel() {
public ChannelLogger getChannelLogger() {
return subchannelLogger;
}

@Override
public void updateAddresses(List<EquivalentAddressGroup> addrs) {
syncContext.throwIfNotInThisSynchronizationContext();
subchannel.updateAddresses(addrs);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) {
helper.updateBalancingState(CONNECTING, new Picker(PickResult.withSubchannel(subchannel)));
subchannel.requestConnection();
} else {
helper.updateSubchannelAddresses(subchannel, servers);
subchannel.updateAddresses(servers);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public Subchannel createSubchannel(CreateSubchannelArgs args) {
return delegate().createSubchannel(args);
}

@Deprecated
@Override
public void updateSubchannelAddresses(
Subchannel subchannel, List<EquivalentAddressGroup> addrs) {
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/io/grpc/util/ForwardingSubchannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ public Object getInternalSubchannel() {
return delegate().getInternalSubchannel();
}

@Override
public void updateAddresses(List<EquivalentAddressGroup> addrs) {
delegate().updateAddresses(addrs);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ private static void updateSubchannelAddressesSafely(
new Runnable() {
@Override
public void run() {
helper.updateSubchannelAddresses(subchannel, addrs);
subchannel.updateAddresses(Collections.singletonList(addrs));
}
});
}
Expand Down
25 changes: 25 additions & 0 deletions core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,16 @@ public String toString() {
return "test-addr";
}
};
private final SocketAddress socketAddress2 =
new SocketAddress() {
@Override
public String toString() {
return "test-addr";
}
};
private final EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress);
private final EquivalentAddressGroup addressGroup2 =
new EquivalentAddressGroup(Arrays.asList(socketAddress, socketAddress2));
private final FakeClock timer = new FakeClock();
private final FakeClock executor = new FakeClock();
private final FakeClock balancerRpcExecutor = new FakeClock();
Expand Down Expand Up @@ -1322,6 +1331,11 @@ public void run() {
.newClientTransport(
eq(socketAddress), eq(clientTransportOptions), isA(TransportLogger.class));

// updateAddresses()
updateAddressesSafely(helper, sub1, Collections.singletonList(addressGroup2));
assertThat(((InternalSubchannel) sub1.getInternalSubchannel()).getAddressGroups())
.isEqualTo(Collections.singletonList(addressGroup2));

// shutdown() has a delay
shutdownSafely(helper, sub1);
timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS - 1, TimeUnit.SECONDS);
Expand Down Expand Up @@ -4116,6 +4130,17 @@ public void run() {
});
}

private static void updateAddressesSafely(
Helper helper, final Subchannel subchannel, final List<EquivalentAddressGroup> addrs) {
helper.getSynchronizationContext().execute(
new Runnable() {
@Override
public void run() {
subchannel.updateAddresses(addrs);
}
});
}

private static void shutdownSafely(
final Helper helper, final Subchannel subchannel) {
helper.getSynchronizationContext().execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,15 @@ public void pickAfterResolvedAndUnchanged() throws Exception {
verify(mockSubchannel).requestConnection();
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
verify(mockSubchannel).updateAddresses(eq(servers));
verifyNoMoreInteractions(mockSubchannel);

verify(mockHelper).createSubchannel(createArgsCaptor.capture());
assertThat(createArgsCaptor.getValue()).isNotNull();
verify(mockHelper)
.updateBalancingState(isA(ConnectivityState.class), isA(SubchannelPicker.class));
// Updating the subchannel addresses is unnecessary, but doesn't hurt anything
verify(mockHelper).updateSubchannelAddresses(
eq(mockSubchannel), ArgumentMatchers.<EquivalentAddressGroup>anyList());
verify(mockSubchannel).updateAddresses(ArgumentMatchers.<EquivalentAddressGroup>anyList());

verifyNoMoreInteractions(mockHelper);
}
Expand All @@ -191,7 +191,7 @@ public void pickAfterResolvedAndChanged() throws Exception {
List<EquivalentAddressGroup> newServers =
Lists.newArrayList(new EquivalentAddressGroup(socketAddr));

InOrder inOrder = inOrder(mockHelper);
InOrder inOrder = inOrder(mockHelper, mockSubchannel);

loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
Expand All @@ -205,7 +205,7 @@ public void pickAfterResolvedAndChanged() throws Exception {

loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build());
inOrder.verify(mockHelper).updateSubchannelAddresses(eq(mockSubchannel), eq(newServers));
inOrder.verify(mockSubchannel).updateAddresses(eq(newServers));

verifyNoMoreInteractions(mockSubchannel);
verifyNoMoreInteractions(mockHelper);
Expand Down
2 changes: 1 addition & 1 deletion grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ private void useRoundRobinLists(
} else {
checkState(subchannels.size() == 1, "Unexpected Subchannel count: %s", subchannels);
subchannel = subchannels.values().iterator().next();
helper.updateSubchannelAddresses(subchannel, eagList);
subchannel.updateAddresses(eagList);
}
subchannels = Collections.singletonMap(eagList, subchannel);
newBackendList.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1776,8 +1776,7 @@ public void grpclbWorking_pickFirstMode() throws Exception {
// createSubchannel() has ever been called only once
verify(helper, times(1)).createSubchannel(any(List.class), any(Attributes.class));
assertThat(mockSubchannels).isEmpty();
inOrder.verify(helper).updateSubchannelAddresses(
same(subchannel),
verify(subchannel).updateAddresses(
eq(Arrays.asList(
new EquivalentAddressGroup(backends2.get(0).addr, eagAttrsWithToken("token0001")),
new EquivalentAddressGroup(backends2.get(2).addr,
Expand Down Expand Up @@ -1874,8 +1873,7 @@ public void pickFirstMode_fallback() throws Exception {
// createSubchannel() has ever been called only once
verify(helper, times(1)).createSubchannel(any(List.class), any(Attributes.class));
assertThat(mockSubchannels).isEmpty();
inOrder.verify(helper).updateSubchannelAddresses(
same(subchannel),
verify(subchannel).updateAddresses(
eq(Arrays.asList(
new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")),
new EquivalentAddressGroup(backends1.get(1).addr,
Expand Down

0 comments on commit af2c16d

Please sign in to comment.