Skip to content

Commit

Permalink
Extract ConnectionEstablisher from BackgroundPeerState. Introduce Par…
Browse files Browse the repository at this point in the history
…ameter object. And implement Builder for BackgroundPeerState and ConnectionEstablisher
  • Loading branch information
tkilias committed Aug 2, 2023
1 parent 73027cc commit fb72c81
Show file tree
Hide file tree
Showing 22 changed files with 923 additions and 476 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from exasol_advanced_analytics_framework.udf_communication.discovery import localhost
from exasol_advanced_analytics_framework.udf_communication.ip_address import IPAddress, Port
from exasol_advanced_analytics_framework.udf_communication.peer_communicator import PeerCommunicator
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.forward_register_peer_config import \
ForwardRegisterPeerConfig
from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import SocketFactory


Expand All @@ -20,8 +22,10 @@ def create(
number_of_peers=number_of_instances,
listen_ip=listen_ip,
group_identifier=group_identifier,
is_forward_register_peer_leader=False,
is_forward_register_peer_enabled=False,
forward_register_peer_config=ForwardRegisterPeerConfig(
is_leader=False,
is_enabled=False,
),
socket_factory=socket_factory
)
discovery = localhost.DiscoveryStrategy(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from exasol_advanced_analytics_framework.udf_communication.discovery import multi_node
from exasol_advanced_analytics_framework.udf_communication.ip_address import IPAddress, Port
from exasol_advanced_analytics_framework.udf_communication.peer_communicator import PeerCommunicator
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.forward_register_peer_config import \
ForwardRegisterPeerConfig
from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import SocketFactory


Expand All @@ -22,8 +24,10 @@ def create(
number_of_peers=number_of_instances,
listen_ip=listen_ip,
group_identifier=group_identifier,
is_forward_register_peer_leader=is_discovery_leader,
is_forward_register_peer_enabled=True,
forward_register_peer_config=ForwardRegisterPeerConfig(
is_leader=is_discovery_leader,
is_enabled=True,
),
socket_factory=socket_factory
)
discovery = multi_node.DiscoveryStrategy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ def _time_left_until_discovery_timeout_in_ns(self, begin_time_ns: int) -> int:
return max(0, time_left_until_timeout)

def discover_peers(self):
if not self._peer_communicator.is_forward_register_peer_enabled:
if not self._peer_communicator.forward_register_peer_config.is_enabled:
raise ValueError("PeerCommunicator.is_forward_register_peer_enabled needs to be true")
if self._peer_communicator.is_forward_register_peer_leader:
if self._peer_communicator.forward_register_peer_config.is_leader:
self._global_discovery_socket.bind()
self._send_ping()
begin_time_ns = time.monotonic_ns()
while not self._should_discovery_end(begin_time_ns):
if self._peer_communicator.is_forward_register_peer_leader:
if self._peer_communicator.forward_register_peer_config.is_leader:
self._receive_pings(begin_time_ns)
self._send_ping()
if not self._peer_communicator.are_all_peers_connected():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
from exasol_advanced_analytics_framework.udf_communication.peer import Peer
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.background_listener_thread import \
BackgroundListenerThread
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.forward_register_peer_config import \
ForwardRegisterPeerConfig
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.clock import Clock
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher_timeout_config import \
ConnectionEstablisherTimeoutConfig
from exasol_advanced_analytics_framework.udf_communication.serialization import deserialize_message, serialize_message
from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import SocketFactory, \
SocketType, Socket, PollerFlag
Expand All @@ -25,14 +29,11 @@ def __init__(self,
socket_factory: SocketFactory,
listen_ip: IPAddress,
group_identifier: str,
is_forward_register_peer_leader: bool,
is_forward_register_peer_enabled: bool,
forward_register_peer_config: ForwardRegisterPeerConfig,
clock: Clock,
poll_timeout_in_ms: int,
synchronize_timeout_in_ms: int,
abort_timeout_in_ms: int,
peer_is_ready_wait_time_in_ms: int,
send_socket_linger_time_in_ms: int,
connection_establisher_timeout_config: ConnectionEstablisherTimeoutConfig,
trace_logging: bool):

self._name = name
Expand All @@ -50,15 +51,12 @@ def __init__(self,
group_identifier=group_identifier,
out_control_socket_address=out_control_socket_address,
in_control_socket_address=in_control_socket_address,
is_forward_register_peer_leader=is_forward_register_peer_leader,
is_forward_register_peer_enabled=is_forward_register_peer_enabled,
forward_register_peer_config=forward_register_peer_config,
clock=clock,
poll_timeout_in_ms=poll_timeout_in_ms,
synchronize_timeout_in_ms=synchronize_timeout_in_ms,
abort_timeout_in_ms=abort_timeout_in_ms,
peer_is_ready_wait_time_in_ms=peer_is_ready_wait_time_in_ms,
send_socket_linger_time_in_ms=send_socket_linger_time_in_ms,
trace_logging=trace_logging
trace_logging=trace_logging,
connection_establisher_timeout_config=connection_establisher_timeout_config
)
self._thread = threading.Thread(target=self._background_listener_run.run)
self._thread.daemon = True
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import dataclasses
import enum
from typing import Dict, List, Optional

Expand All @@ -8,20 +9,67 @@
from exasol_advanced_analytics_framework.udf_communication.connection_info import ConnectionInfo
from exasol_advanced_analytics_framework.udf_communication.ip_address import IPAddress, Port
from exasol_advanced_analytics_framework.udf_communication.peer import Peer
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.abort_timeout_sender import \
AbortTimeoutSenderFactory
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.acknowledge_register_peer_sender import \
AcknowledgeRegisterPeerSenderFactory
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.background_peer_state import \
BackgroundPeerState
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.background_peer_state_builder import \
BackgroundPeerStateBuilder
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.clock import Clock
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher_behavior_config import \
ConnectionEstablisherBehaviorConfig
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher_builder import \
ConnectionEstablisherBuilder
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher_builder_parameter import \
ConnectionEstablisherBuilderParameter
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher_timeout_config import \
ConnectionEstablisherTimeoutConfig
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.forward_register_peer_config import \
ForwardRegisterPeerConfig
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.peer_is_ready_sender import \
PeerIsReadySenderFactory
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.register_peer_connection import \
RegisterPeerConnection
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.register_peer_sender import \
RegisterPeerSenderFactory
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.send_socket_factory import \
SendSocketFactory
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.sender import SenderFactory
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.synchronize_connection_sender import \
SynchronizeConnectionSenderFactory
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.timer import TimerFactory
from exasol_advanced_analytics_framework.udf_communication.serialization import deserialize_message, serialize_message
from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import SocketFactory, \
SocketType, Socket, PollerFlag, Frame

LOGGER: FilteringBoundLogger = structlog.get_logger()


def create_background_peer_state_builder() -> BackgroundPeerStateBuilder:
timer_factory = TimerFactory()
abort_timeout_sender_factory = AbortTimeoutSenderFactory()
acknowledge_register_peer_sender_factory = AcknowledgeRegisterPeerSenderFactory()
peer_is_ready_sender_factory = PeerIsReadySenderFactory()
register_peer_sender_factory = RegisterPeerSenderFactory()
synchronize_connection_sender_factory = SynchronizeConnectionSenderFactory()
connection_establisher_factory = ConnectionEstablisherBuilder(
abort_timeout_sender_factory=abort_timeout_sender_factory,
register_peer_sender_factory=register_peer_sender_factory,
synchronize_connection_sender_factory=synchronize_connection_sender_factory,
acknowledge_register_peer_sender_factory=acknowledge_register_peer_sender_factory,
peer_is_ready_sender_factory=peer_is_ready_sender_factory,
timer_factory=timer_factory,
)
sender_factory = SenderFactory()
background_peer_state_factory = BackgroundPeerStateBuilder(
sender_factory=sender_factory,
connection_establisher_factory=connection_establisher_factory,
)
return background_peer_state_factory


class BackgroundListenerThread:
class Status(enum.Enum):
RUNNING = enum.auto()
Expand All @@ -32,32 +80,27 @@ def __init__(self,
socket_factory: SocketFactory,
listen_ip: IPAddress,
group_identifier: str,
is_forward_register_peer_leader: bool,
is_forward_register_peer_enabled: bool,
forward_register_peer_config: ForwardRegisterPeerConfig,
out_control_socket_address: str,
in_control_socket_address: str,
clock: Clock,
poll_timeout_in_ms: int,
synchronize_timeout_in_ms: int,
abort_timeout_in_ms: int,
peer_is_ready_wait_time_in_ms: int,
send_socket_linger_time_in_ms: int,
trace_logging: bool):
connection_establisher_timeout_config: ConnectionEstablisherTimeoutConfig,
trace_logging: bool,
background_peer_state_factory: BackgroundPeerStateBuilder = create_background_peer_state_builder()):
self._forward_register_peer_config = forward_register_peer_config
self._background_peer_state_factory = background_peer_state_factory
self._connection_establisher_timeout_config = connection_establisher_timeout_config
self._register_peer_connection: Optional[RegisterPeerConnection] = None
self._is_forward_register_peer_enabled = is_forward_register_peer_enabled
self._is_forward_register_peer_leader = is_forward_register_peer_leader
self._send_socket_linger_time_in_ms = send_socket_linger_time_in_ms
self._trace_logging = trace_logging
self._clock = clock
self._peer_is_ready_wait_time_in_ms = peer_is_ready_wait_time_in_ms
self._abort_timeout_in_ms = abort_timeout_in_ms
self._synchronize_timeout_in_ms = synchronize_timeout_in_ms
self._name = name
self._logger = LOGGER.bind(
name=self._name,
group_identifier=group_identifier,
is_forward_register_peer_leader=self._is_forward_register_peer_leader,
is_forward_register_peer_enabled=self._is_forward_register_peer_enabled
forward_register_peer_config=dataclasses.asdict(self._forward_register_peer_config)
)
self._group_identifier = group_identifier
self._listen_ip = listen_ip
Expand Down Expand Up @@ -132,7 +175,7 @@ def _handle_control_message(self, message: bytes) -> Status:
if isinstance(specific_message_obj, messages.Stop):
return BackgroundListenerThread.Status.STOPPED
elif isinstance(specific_message_obj, messages.RegisterPeer):
if self._is_forward_register_peer_enabled and self._is_forward_register_peer_leader or not self._is_forward_register_peer_enabled:
if self._is_register_peer_message_allowed_as_control_message():
self._handle_register_peer_message(specific_message_obj)
else:
self._logger.error("RegisterPeerMessage message not allowed",
Expand All @@ -143,31 +186,32 @@ def _handle_control_message(self, message: bytes) -> Status:
self._logger.exception("Exception during handling message", message=message)
return BackgroundListenerThread.Status.RUNNING

def _is_register_peer_message_allowed_as_control_message(self):
return self._forward_register_peer_config.is_enabled and self._forward_register_peer_config.is_leader \
or not self._forward_register_peer_config.is_enabled

def _add_peer(self,
peer: Peer,
forward_register_peer: bool = False,
acknowledge_register_peer: bool = False,
needs_register_peer_complete: bool = False):
connection_establisher_behavior_config: ConnectionEstablisherBehaviorConfig =
ConnectionEstablisherBehaviorConfig()):
if peer.connection_info.group_identifier != self._my_connection_info.group_identifier:
self._logger.error("Peer belongs to a different group",
my_connection_info=self._my_connection_info.dict(),
peer=peer.dict())
raise ValueError("Peer belongs to a different group")
if peer not in self._peer_state:
self._peer_state[peer] = BackgroundPeerState.create(
self._peer_state[peer] = self._background_peer_state_factory.create(
my_connection_info=self._my_connection_info,
peer=peer,
out_control_socket=self._out_control_socket,
socket_factory=self._socket_factory,
peer=peer,
register_peer_connection=self._register_peer_connection,
forward_register_peer=forward_register_peer,
acknowledge_register_peer=acknowledge_register_peer,
needs_register_peer_complete=needs_register_peer_complete,
clock=self._clock,
peer_is_ready_wait_time_in_ms=self._peer_is_ready_wait_time_in_ms,
abort_timeout_in_ms=self._abort_timeout_in_ms,
synchronize_timeout_in_ms=self._synchronize_timeout_in_ms,
send_socket_linger_time_in_ms=self._send_socket_linger_time_in_ms
send_socket_linger_time_in_ms=self._send_socket_linger_time_in_ms,
connection_establisher_builder_parameter=ConnectionEstablisherBuilderParameter(
register_peer_connection=self._register_peer_connection,
timeout_config=self._connection_establisher_timeout_config,
behavior_config=connection_establisher_behavior_config
),
)

def _handle_listener_message(self, message: List[Frame]):
Expand All @@ -183,7 +227,7 @@ def _handle_listener_message(self, message: List[Frame]):
elif isinstance(specific_message_obj, messages.AcknowledgeConnection):
self._handle_acknowledge_connection(specific_message_obj)
elif isinstance(specific_message_obj, messages.RegisterPeer):
if not self._is_forward_register_peer_leader and self._is_forward_register_peer_enabled:
if self.is_register_peer_message_allowed_as_listener_message():
self._handle_register_peer_message(specific_message_obj)
else:
logger.error("RegisterPeerMessage message not allowed", message_obj=specific_message_obj.dict())
Expand All @@ -198,6 +242,9 @@ def _handle_listener_message(self, message: List[Frame]):
except Exception as e:
logger.exception("Exception during handling message", message_content=message_content_bytes)

def is_register_peer_message_allowed_as_listener_message(self):
return not self._forward_register_peer_config.is_leader and self._forward_register_peer_config.is_enabled

def _handle_payload_message(self, message: messages.Payload, frames: List[Frame]):
peer = Peer(connection_info=message.source)
self._peer_state[peer].forward_payload(frames[2:])
Expand All @@ -222,24 +269,26 @@ def _set_my_connection_info(self, port: int):
self._out_control_socket.send(serialize_message(message))

def _handle_register_peer_message(self, message: messages.RegisterPeer):
if not self._is_forward_register_peer_enabled:
if not self._forward_register_peer_config.is_enabled:
self._add_peer(message.peer)
return

if self._register_peer_connection is None:
self._create_register_peer_connection(message)
self._add_peer(
message.peer,
acknowledge_register_peer=True,
needs_register_peer_complete=True
connection_establisher_behavior_config=ConnectionEstablisherBehaviorConfig(
acknowledge_register_peer=True,
needs_register_peer_complete=True)
)
return

self._add_peer(
message.peer,
forward_register_peer=True,
acknowledge_register_peer=True,
needs_register_peer_complete=True
connection_establisher_behavior_config=ConnectionEstablisherBehaviorConfig(
forward_register_peer=True,
acknowledge_register_peer=True,
needs_register_peer_complete=True)
)

def _create_register_peer_connection(self, message: messages.RegisterPeer):
Expand Down
Loading

0 comments on commit fb72c81

Please sign in to comment.