Skip to content

Commit

Permalink
Merge pull request vlingo#56 from vlingo/feature/dynamic-app-channels
Browse files Browse the repository at this point in the history
On demand creation of channels
  • Loading branch information
pflueras authored Jun 29, 2022
2 parents b1f50a9 + 7487597 commit ed5ad81
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,51 @@
// one at https://mozilla.org/MPL/2.0/.
package io.vlingo.xoom.wire.fdx.outbound;

import io.vlingo.xoom.actors.Logger;
import io.vlingo.xoom.wire.node.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public abstract class AbstractManagedOutboundChannelProvider implements ManagedOutboundChannelProvider {

private static final Logger logger = LoggerFactory.getLogger(AbstractManagedOutboundChannelProvider.class);
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(AbstractManagedOutboundChannelProvider.class);

protected static Address addressOf(final Node node, final AddressType type) {
logger.debug("addressOf {}", node);
return (type == AddressType.OP ? node.operationalAddress() : node.applicationAddress());
}

private final Configuration configuration;
private final Node node;
private final Map<Id, ManagedOutboundChannel> nodeChannels = new HashMap<>();
private final AddressType type;
private final Node localNode;
private final Map<Id, Address> allAddresses; // all addresses provided by configuration
private final Map<Id, ManagedOutboundChannel> nodeChannels = new HashMap<>(); // live channels created on demand

protected AbstractManagedOutboundChannelProvider(final Node node, final AddressType type, final Configuration configuration) {
protected AbstractManagedOutboundChannelProvider(final Node localNode, final AddressType type, final Configuration configuration) {
this.configuration = configuration;
this.node = node;
this.type = type;
this.localNode = localNode;
this.allAddresses = new HashMap<>();
for (final Node node : configuration.allNodes()) { // exclude local node?
allAddresses.put(node.id(), addressOf(node, type));
}
}

protected AbstractManagedOutboundChannelProvider(final Node localNode, final AddressType type, final Configuration configuration, boolean createEagerly) {
this(localNode, type, configuration);
if (createEagerly) {
allAddresses.keySet().forEach(this::channelFor);
}
}

@Override
public Map<Id, ManagedOutboundChannel> allOtherNodeChannels() {
return channelsFor(configuration.allOtherNodes(node.id()));
return nodeChannels.entrySet().stream()
.filter(entry -> !entry.getKey().equals(localNode.id()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

@Override
Expand All @@ -48,7 +61,8 @@ public ManagedOutboundChannel channelFor(final Id id) {
return channel;
}

final ManagedOutboundChannel unopenedChannel = unopenedChannelFor(configuration.nodeMatching(id), configuration, type);
final Address nodeAddress = this.allAddresses.get(id);
final ManagedOutboundChannel unopenedChannel = unopenedChannelFor(configuration.nodeMatching(id), nodeAddress, configuration.logger());

nodeChannels.put(id, unopenedChannel);

Expand All @@ -63,7 +77,8 @@ public Map<Id, ManagedOutboundChannel> channelsFor(final Collection<Node> nodes)
ManagedOutboundChannel channel = nodeChannels.get(node.id());

if (channel == null) {
channel = unopenedChannelFor(node, configuration, type);
final Address nodeAddress = allAddresses.get(node.id());
channel = unopenedChannelFor(node, nodeAddress, configuration.logger());
nodeChannels.put(node.id(), channel);
}

Expand Down Expand Up @@ -91,13 +106,5 @@ public void close(final Id id) {
}
}

@Override
public void configureKnownChannels() {
for (final Node node : configuration.allOtherNodes(node.id())) {
nodeChannels.put(node.id(), unopenedChannelFor(node, configuration, type));
}
}

protected abstract ManagedOutboundChannel unopenedChannelFor(final Node node, final Configuration configuration, final AddressType type);

protected abstract ManagedOutboundChannel unopenedChannelFor(final Node node, final Address nodeAddress, final Logger logger);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,4 @@ public interface ManagedOutboundChannelProvider {
Map<Id, ManagedOutboundChannel> channelsFor(final Collection<Node> nodes);
void close();
void close(final Id id);
void configureKnownChannels();
}
2 changes: 0 additions & 2 deletions src/main/java/io/vlingo/xoom/wire/fdx/outbound/Outbound.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ public Outbound(

this.provider = provider;
this.pool = byteBufferPool;

this.provider.configureKnownChannels();
}

public void broadcast(final RawMessage message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.vlingo.xoom.actors.Logger;
import io.vlingo.xoom.wire.fdx.outbound.AbstractManagedOutboundChannelProvider;
import io.vlingo.xoom.wire.fdx.outbound.ManagedOutboundChannel;
import io.vlingo.xoom.wire.node.Address;
Expand All @@ -29,7 +30,7 @@ private ClientTransport transportFor(final Address address) {
}

/**
* Build a instance of provider that will create {@link RSocketOutboundChannel} using the default RSocket client transport, {@link TcpClientTransport}.
* Build an instance of provider that will create {@link RSocketOutboundChannel} using the default RSocket client transport, {@link TcpClientTransport}.
*
* @param node the outbound node to connect to
* @param type the address type
Expand Down Expand Up @@ -59,9 +60,8 @@ public ManagedOutboundRSocketChannelProvider(final Node node, final AddressType
}

@Override
protected ManagedOutboundChannel unopenedChannelFor(final Node node, final Configuration configuration, final AddressType type) {
final Address address = addressOf(node, type);
return new RSocketOutboundChannel(address, transportFor(address), this.connectionTimeout, configuration.logger());
protected ManagedOutboundChannel unopenedChannelFor(final Node node, final Address nodeAddress, final Logger logger) {
return new RSocketOutboundChannel(nodeAddress, transportFor(nodeAddress), this.connectionTimeout, logger);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@

package io.vlingo.xoom.wire.fdx.outbound.tcp;

import io.vlingo.xoom.actors.Logger;
import io.vlingo.xoom.wire.fdx.outbound.AbstractManagedOutboundChannelProvider;
import io.vlingo.xoom.wire.fdx.outbound.ManagedOutboundChannel;
import io.vlingo.xoom.wire.node.Address;
import io.vlingo.xoom.wire.node.AddressType;
import io.vlingo.xoom.wire.node.Configuration;
import io.vlingo.xoom.wire.node.Node;
Expand All @@ -19,7 +21,7 @@ public ManagedOutboundSocketChannelProvider(final Node node, final AddressType t
}

@Override
protected ManagedOutboundChannel unopenedChannelFor(final Node node, final Configuration configuration, final AddressType type) {
return new ManagedOutboundSocketChannel(node, addressOf(node, type), configuration.logger());
protected ManagedOutboundChannel unopenedChannelFor(final Node node, final Address nodeAddress, final Logger logger) {
return new ManagedOutboundSocketChannel(node, nodeAddress, logger);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,17 @@ public abstract class AbstractManagedOutboundProviderTest extends AbstractMessag
private ManagedOutboundChannelProvider provider;

@Test
public void testProviderProvides() throws Exception {
assertEquals(2, provider.allOtherNodeChannels().size());
public void testProviderProvides() {
assertEquals(0, provider.allOtherNodeChannels().size()); // channels are lazily created

assertNotNull(provider.channelFor(Id.of(2)));

assertNotNull(provider.channelFor(Id.of(3)));

assertEquals(2, provider.channelsFor(allOtherNodes).size());
}

@Test
public void testProviderCloseAllReopen() throws Exception {
public void testProviderCloseAllReopen() {
provider.close();

assertNotNull(provider.channelFor(Id.of(3)));
Expand All @@ -46,16 +45,19 @@ public void testProviderCloseAllReopen() throws Exception {
}

@Test
public void testProviderCloseOneChannelReopen() throws Exception {
public void testProviderCloseOneChannelReopen() {
assertNotNull(provider.channelFor(Id.of(3))); // channels are created on demand; create the channel
provider.close(Id.of(3));

assertNotNull(provider.channelFor(Id.of(3)));
assertEquals(1, provider.allOtherNodeChannels().size());

assertNotNull(provider.channelFor(Id.of(2))); // create the channel
assertEquals(2, provider.allOtherNodeChannels().size());
}

@Before
public void setUp() throws Exception {
public void setUp() {
allOtherNodes = config.allOtherNodes(Id.of(1));

provider = getProvider(config.nodeMatching(Id.of(1)), AddressType.OP, config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,4 @@ public void close() {
public void close(final Id id) {

}

@Override
public void configureKnownChannels() {

}
}

0 comments on commit ed5ad81

Please sign in to comment.