Skip to content

Commit

Permalink
Merge pull request vlingo#57 from vlingo/improvement/dynamic-cluster-…
Browse files Browse the repository at this point in the history
…nodes

Dynamic cluster nodes
  • Loading branch information
pflueras authored Jul 26, 2022
2 parents 18310b4 + 631d2b1 commit 9c4ae93
Show file tree
Hide file tree
Showing 20 changed files with 129 additions and 261 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import io.vlingo.xoom.actors.Logger;
import io.vlingo.xoom.wire.node.*;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.HashMap;
Expand All @@ -17,33 +16,16 @@
import java.util.stream.Collectors;

public abstract class AbstractManagedOutboundChannelProvider implements ManagedOutboundChannelProvider {

private static final org.slf4j.Logger logger = LoggerFactory.getLogger(AbstractManagedOutboundChannelProvider.class);

protected static Address addressOf(final Node node, final AddressType type) {
logger.debug("addressOf {}", node);
return (type == AddressType.OP ? node.operationalAddress() : node.applicationAddress());
}

private final Configuration configuration;
private final Node localNode;
private final Map<Id, Address> allAddresses; // all addresses provided by configuration
private final AddressType addressType;
private final Logger logger;

private final Map<Id, ManagedOutboundChannel> nodeChannels = new HashMap<>(); // live channels created on demand

protected AbstractManagedOutboundChannelProvider(final Node localNode, final AddressType type, final Configuration configuration) {
this.configuration = configuration;
protected AbstractManagedOutboundChannelProvider(final Node localNode, final AddressType type, final Logger logger) {
this.localNode = localNode;
this.allAddresses = new HashMap<>();
for (final Node node : configuration.allNodes()) { // exclude local node?
allAddresses.put(node.id(), addressOf(node, type));
}
}

protected AbstractManagedOutboundChannelProvider(final Node localNode, final AddressType type, final Configuration configuration, boolean createEagerly) {
this(localNode, type, configuration);
if (createEagerly) {
allAddresses.keySet().forEach(this::channelFor);
}
this.addressType = type;
this.logger = logger;
}

@Override
Expand All @@ -54,17 +36,17 @@ public Map<Id, ManagedOutboundChannel> allOtherNodeChannels() {
}

@Override
public ManagedOutboundChannel channelFor(final Id id) {
final ManagedOutboundChannel channel = nodeChannels.get(id);
public ManagedOutboundChannel channelFor(final Node node) {
final ManagedOutboundChannel channel = nodeChannels.get(node.id());

if (channel != null) {
return channel;
}

final Address nodeAddress = this.allAddresses.get(id);
final ManagedOutboundChannel unopenedChannel = unopenedChannelFor(configuration.nodeMatching(id), nodeAddress, configuration.logger());
final Address nodeAddress = addressOf(node, addressType);
final ManagedOutboundChannel unopenedChannel = unopenedChannelFor(node, nodeAddress, logger);

nodeChannels.put(id, unopenedChannel);
nodeChannels.put(node.id(), unopenedChannel);

return unopenedChannel;
}
Expand All @@ -77,8 +59,8 @@ public Map<Id, ManagedOutboundChannel> channelsFor(final Collection<Node> nodes)
ManagedOutboundChannel channel = nodeChannels.get(node.id());

if (channel == null) {
final Address nodeAddress = allAddresses.get(node.id());
channel = unopenedChannelFor(node, nodeAddress, configuration.logger());
final Address nodeAddress = addressOf(node, addressType);
channel = unopenedChannelFor(node, nodeAddress, logger);
nodeChannels.put(node.id(), channel);
}

Expand Down Expand Up @@ -106,5 +88,10 @@ public void close(final Id id) {
}
}

protected Address addressOf(final Node node, final AddressType type) {
logger.debug("addressOf {}", node);
return (type == AddressType.OP ? node.operationalAddress() : node.applicationAddress());
}

protected abstract ManagedOutboundChannel unopenedChannelFor(final Node node, final Address nodeAddress, final Logger logger);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
import io.vlingo.xoom.wire.message.ConsumerByteBuffer;
import io.vlingo.xoom.wire.message.RawMessage;
import io.vlingo.xoom.wire.node.Id;
import io.vlingo.xoom.wire.node.Node;

public interface ApplicationOutboundStream extends Stoppable {
public static ApplicationOutboundStream instance(
static ApplicationOutboundStream instance(
final Stage stage,
final ManagedOutboundChannelProvider provider,
final ResourcePool<ConsumerByteBuffer, String> byteBufferPool) {
Expand All @@ -35,9 +36,9 @@ public static ApplicationOutboundStream instance(
}

void broadcast(final RawMessage message);
void sendTo(final RawMessage message, final Id targetId);
void sendTo(final RawMessage message, final Node targetNode);

static class ApplicationOutboundStreamInstantiator implements ActorInstantiator<ApplicationOutboundStreamActor> {
class ApplicationOutboundStreamInstantiator implements ActorInstantiator<ApplicationOutboundStreamActor> {
private static final long serialVersionUID = 3996997791426073111L;

private final ResourcePool<ConsumerByteBuffer, String> byteBufferPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.vlingo.xoom.wire.message.ConsumerByteBuffer;
import io.vlingo.xoom.wire.message.RawMessage;
import io.vlingo.xoom.wire.node.Id;
import io.vlingo.xoom.wire.node.Node;

public class ApplicationOutboundStreamActor extends Actor
implements ApplicationOutboundStream {
Expand All @@ -35,8 +36,8 @@ public void broadcast(final RawMessage message) {
}

@Override
public void sendTo(final RawMessage message, final Id targetId) {
outbound.sendTo(message, targetId);
public void sendTo(final RawMessage message, final Node targetNode) {
outbound.sendTo(message, targetNode);
}

//===================================
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

public interface ManagedOutboundChannelProvider {
Map<Id, ManagedOutboundChannel> allOtherNodeChannels();
ManagedOutboundChannel channelFor(final Id id);
ManagedOutboundChannel channelFor(final Node node);
Map<Id, ManagedOutboundChannel> channelsFor(final Collection<Node> nodes);
void close();
void close(final Id id);
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/io/vlingo/xoom/wire/fdx/outbound/Outbound.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,17 @@ public ConsumerByteBuffer lendByteBuffer() {
return pool.acquire("Outbound#lendByteBuffer");
}

public void open(final Id id) {
provider.channelFor(id);
public void open(final Node node) {
provider.channelFor(node);
}

public void sendTo(final RawMessage message, final Id id) {
sendTo(bytesFrom(message, pool.acquire("Outbound#sendTo")), id);
public void sendTo(final RawMessage message, final Node targetNode) {
sendTo(bytesFrom(message, pool.acquire("Outbound#sendTo")), targetNode);
}

public void sendTo(final ConsumerByteBuffer buffer, final Id id) {
open(id);
provider.channelFor(id).writeAsync(buffer.asByteBuffer())
public void sendTo(final ConsumerByteBuffer buffer, final Node targetNode) {
open(targetNode);
provider.channelFor(targetNode).writeAsync(buffer.asByteBuffer())
.andFinallyConsume((aVoid) -> buffer.release());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import io.vlingo.xoom.wire.fdx.outbound.ManagedOutboundChannel;
import io.vlingo.xoom.wire.node.Address;
import io.vlingo.xoom.wire.node.AddressType;
import io.vlingo.xoom.wire.node.Configuration;
import io.vlingo.xoom.wire.node.Node;

import java.time.Duration;
Expand All @@ -34,10 +33,10 @@ private ClientTransport transportFor(final Address address) {
*
* @param node the outbound node to connect to
* @param type the address type
* @param configuration current node configuration
* @param logger the logger
*/
public ManagedOutboundRSocketChannelProvider(final Node node, final AddressType type, final Configuration configuration) {
super(node, type, configuration);
public ManagedOutboundRSocketChannelProvider(final Node node, final AddressType type, final Logger logger) {
super(node, type, logger);
this.clientTransportProvider = address -> TcpClientTransport.create(address.hostName(), address.port());
this.connectionTimeout = Duration.ofMillis(100);
}
Expand All @@ -47,14 +46,14 @@ public ManagedOutboundRSocketChannelProvider(final Node node, final AddressType
*
* @param node the outbound node to connect to
* @param type the address type
* @param configuration current node configuration
* @param logger the logger
* @param connectionTimeout connection timeout duration
* @param clientTransportProvider function that given a remote node address, returns a instance of {@link ClientTransport}
*/
public ManagedOutboundRSocketChannelProvider(final Node node, final AddressType type, final Configuration configuration,
public ManagedOutboundRSocketChannelProvider(final Node node, final AddressType type, final Logger logger,
final Duration connectionTimeout,
final Function<Address, ClientTransport> clientTransportProvider) {
super(node, type, configuration);
super(node, type, logger);
this.connectionTimeout = connectionTimeout;
this.clientTransportProvider = clientTransportProvider;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@
import io.vlingo.xoom.wire.fdx.outbound.ManagedOutboundChannel;
import io.vlingo.xoom.wire.node.Address;
import io.vlingo.xoom.wire.node.AddressType;
import io.vlingo.xoom.wire.node.Configuration;
import io.vlingo.xoom.wire.node.Node;

public class ManagedOutboundSocketChannelProvider extends AbstractManagedOutboundChannelProvider {
public ManagedOutboundSocketChannelProvider(final Node node, final AddressType type, final Configuration configuration) {
super(node, type, configuration);
public ManagedOutboundSocketChannelProvider(final Node node, final AddressType type, final Logger logger) {
super(node, type, logger);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public static Node nodeFromRecord(final String record) {
final Address opAddress = addressFromRecord(record, AddressType.OP);
final Address appAddress = addressFromRecord(record, AddressType.APP);

return new Node(id, name, opAddress, appAddress, false);
return new Node(id, name, false, opAddress, appAddress);
}

public static Id idFrom(final String content) {
Expand Down
27 changes: 0 additions & 27 deletions src/main/java/io/vlingo/xoom/wire/node/Configuration.java

This file was deleted.

Loading

0 comments on commit 9c4ae93

Please sign in to comment.