diff --git a/exasol_advanced_analytics_framework/udf_communication/discovery/localhost/communicator.py b/exasol_advanced_analytics_framework/udf_communication/discovery/localhost/communicator.py index 0e3fa4db..f2e73107 100644 --- a/exasol_advanced_analytics_framework/udf_communication/discovery/localhost/communicator.py +++ b/exasol_advanced_analytics_framework/udf_communication/discovery/localhost/communicator.py @@ -1,6 +1,8 @@ from exasol_advanced_analytics_framework.udf_communication.discovery import localhost from exasol_advanced_analytics_framework.udf_communication.ip_address import IPAddress, Port from exasol_advanced_analytics_framework.udf_communication.peer_communicator import PeerCommunicator +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.forward_register_peer_config import \ + ForwardRegisterPeerConfig from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import SocketFactory @@ -20,8 +22,10 @@ def create( number_of_peers=number_of_instances, listen_ip=listen_ip, group_identifier=group_identifier, - is_forward_register_peer_leader=False, - is_forward_register_peer_enabled=False, + forward_register_peer_config=ForwardRegisterPeerConfig( + is_leader=False, + is_enabled=False, + ), socket_factory=socket_factory ) discovery = localhost.DiscoveryStrategy( diff --git a/exasol_advanced_analytics_framework/udf_communication/discovery/multi_node/communicator.py b/exasol_advanced_analytics_framework/udf_communication/discovery/multi_node/communicator.py index b7412af8..ea776063 100644 --- a/exasol_advanced_analytics_framework/udf_communication/discovery/multi_node/communicator.py +++ b/exasol_advanced_analytics_framework/udf_communication/discovery/multi_node/communicator.py @@ -1,6 +1,8 @@ from exasol_advanced_analytics_framework.udf_communication.discovery import multi_node from exasol_advanced_analytics_framework.udf_communication.ip_address import IPAddress, Port from exasol_advanced_analytics_framework.udf_communication.peer_communicator import PeerCommunicator +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.forward_register_peer_config import \ + ForwardRegisterPeerConfig from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import SocketFactory @@ -22,8 +24,10 @@ def create( number_of_peers=number_of_instances, listen_ip=listen_ip, group_identifier=group_identifier, - is_forward_register_peer_leader=is_discovery_leader, - is_forward_register_peer_enabled=True, + forward_register_peer_config=ForwardRegisterPeerConfig( + is_leader=is_discovery_leader, + is_enabled=True, + ), socket_factory=socket_factory ) discovery = multi_node.DiscoveryStrategy( diff --git a/exasol_advanced_analytics_framework/udf_communication/discovery/multi_node/discovery_strategy.py b/exasol_advanced_analytics_framework/udf_communication/discovery/multi_node/discovery_strategy.py index 302a97b6..a7cf91f4 100644 --- a/exasol_advanced_analytics_framework/udf_communication/discovery/multi_node/discovery_strategy.py +++ b/exasol_advanced_analytics_framework/udf_communication/discovery/multi_node/discovery_strategy.py @@ -41,14 +41,14 @@ def _time_left_until_discovery_timeout_in_ns(self, begin_time_ns: int) -> int: return max(0, time_left_until_timeout) def discover_peers(self): - if not self._peer_communicator.is_forward_register_peer_enabled: + if not self._peer_communicator.forward_register_peer_config.is_enabled: raise ValueError("PeerCommunicator.is_forward_register_peer_enabled needs to be true") - if self._peer_communicator.is_forward_register_peer_leader: + if self._peer_communicator.forward_register_peer_config.is_leader: self._global_discovery_socket.bind() self._send_ping() begin_time_ns = time.monotonic_ns() while not self._should_discovery_end(begin_time_ns): - if self._peer_communicator.is_forward_register_peer_leader: + if self._peer_communicator.forward_register_peer_config.is_leader: self._receive_pings(begin_time_ns) self._send_ping() if not self._peer_communicator.are_all_peers_connected(): diff --git a/exasol_advanced_analytics_framework/udf_communication/peer_communicator/background_listener_interface.py b/exasol_advanced_analytics_framework/udf_communication/peer_communicator/background_listener_interface.py index 89c2ab45..96b8c48b 100644 --- a/exasol_advanced_analytics_framework/udf_communication/peer_communicator/background_listener_interface.py +++ b/exasol_advanced_analytics_framework/udf_communication/peer_communicator/background_listener_interface.py @@ -9,7 +9,11 @@ from exasol_advanced_analytics_framework.udf_communication.peer import Peer from exasol_advanced_analytics_framework.udf_communication.peer_communicator.background_listener_thread import \ BackgroundListenerThread +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.forward_register_peer_config import \ + ForwardRegisterPeerConfig from exasol_advanced_analytics_framework.udf_communication.peer_communicator.clock import Clock +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher_timeout_config import \ + ConnectionEstablisherTimeoutConfig from exasol_advanced_analytics_framework.udf_communication.serialization import deserialize_message, serialize_message from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import SocketFactory, \ SocketType, Socket, PollerFlag @@ -25,14 +29,11 @@ def __init__(self, socket_factory: SocketFactory, listen_ip: IPAddress, group_identifier: str, - is_forward_register_peer_leader: bool, - is_forward_register_peer_enabled: bool, + forward_register_peer_config: ForwardRegisterPeerConfig, clock: Clock, poll_timeout_in_ms: int, - synchronize_timeout_in_ms: int, - abort_timeout_in_ms: int, - peer_is_ready_wait_time_in_ms: int, send_socket_linger_time_in_ms: int, + connection_establisher_timeout_config: ConnectionEstablisherTimeoutConfig, trace_logging: bool): self._name = name @@ -50,15 +51,12 @@ def __init__(self, group_identifier=group_identifier, out_control_socket_address=out_control_socket_address, in_control_socket_address=in_control_socket_address, - is_forward_register_peer_leader=is_forward_register_peer_leader, - is_forward_register_peer_enabled=is_forward_register_peer_enabled, + forward_register_peer_config=forward_register_peer_config, clock=clock, poll_timeout_in_ms=poll_timeout_in_ms, - synchronize_timeout_in_ms=synchronize_timeout_in_ms, - abort_timeout_in_ms=abort_timeout_in_ms, - peer_is_ready_wait_time_in_ms=peer_is_ready_wait_time_in_ms, send_socket_linger_time_in_ms=send_socket_linger_time_in_ms, - trace_logging=trace_logging + trace_logging=trace_logging, + connection_establisher_timeout_config=connection_establisher_timeout_config ) self._thread = threading.Thread(target=self._background_listener_run.run) self._thread.daemon = True diff --git a/exasol_advanced_analytics_framework/udf_communication/peer_communicator/background_listener_thread.py b/exasol_advanced_analytics_framework/udf_communication/peer_communicator/background_listener_thread.py index 87f80a97..5563520a 100644 --- a/exasol_advanced_analytics_framework/udf_communication/peer_communicator/background_listener_thread.py +++ b/exasol_advanced_analytics_framework/udf_communication/peer_communicator/background_listener_thread.py @@ -1,3 +1,4 @@ +import dataclasses import enum from typing import Dict, List, Optional @@ -8,13 +9,37 @@ from exasol_advanced_analytics_framework.udf_communication.connection_info import ConnectionInfo from exasol_advanced_analytics_framework.udf_communication.ip_address import IPAddress, Port from exasol_advanced_analytics_framework.udf_communication.peer import Peer +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.abort_timeout_sender import \ + AbortTimeoutSenderFactory +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.acknowledge_register_peer_sender import \ + AcknowledgeRegisterPeerSenderFactory from exasol_advanced_analytics_framework.udf_communication.peer_communicator.background_peer_state import \ BackgroundPeerState +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.background_peer_state_builder import \ + BackgroundPeerStateBuilder from exasol_advanced_analytics_framework.udf_communication.peer_communicator.clock import Clock +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher_behavior_config import \ + ConnectionEstablisherBehaviorConfig +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher_builder import \ + ConnectionEstablisherBuilder +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher_builder_parameter import \ + ConnectionEstablisherBuilderParameter +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher_timeout_config import \ + ConnectionEstablisherTimeoutConfig +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.forward_register_peer_config import \ + ForwardRegisterPeerConfig +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.peer_is_ready_sender import \ + PeerIsReadySenderFactory from exasol_advanced_analytics_framework.udf_communication.peer_communicator.register_peer_connection import \ RegisterPeerConnection +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.register_peer_sender import \ + RegisterPeerSenderFactory from exasol_advanced_analytics_framework.udf_communication.peer_communicator.send_socket_factory import \ SendSocketFactory +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.sender import SenderFactory +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.synchronize_connection_sender import \ + SynchronizeConnectionSenderFactory +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.timer import TimerFactory from exasol_advanced_analytics_framework.udf_communication.serialization import deserialize_message, serialize_message from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import SocketFactory, \ SocketType, Socket, PollerFlag, Frame @@ -22,6 +47,29 @@ LOGGER: FilteringBoundLogger = structlog.get_logger() +def create_background_peer_state_builder() -> BackgroundPeerStateBuilder: + timer_factory = TimerFactory() + abort_timeout_sender_factory = AbortTimeoutSenderFactory() + acknowledge_register_peer_sender_factory = AcknowledgeRegisterPeerSenderFactory() + peer_is_ready_sender_factory = PeerIsReadySenderFactory() + register_peer_sender_factory = RegisterPeerSenderFactory() + synchronize_connection_sender_factory = SynchronizeConnectionSenderFactory() + connection_establisher_factory = ConnectionEstablisherBuilder( + abort_timeout_sender_factory=abort_timeout_sender_factory, + register_peer_sender_factory=register_peer_sender_factory, + synchronize_connection_sender_factory=synchronize_connection_sender_factory, + acknowledge_register_peer_sender_factory=acknowledge_register_peer_sender_factory, + peer_is_ready_sender_factory=peer_is_ready_sender_factory, + timer_factory=timer_factory, + ) + sender_factory = SenderFactory() + background_peer_state_factory = BackgroundPeerStateBuilder( + sender_factory=sender_factory, + connection_establisher_factory=connection_establisher_factory, + ) + return background_peer_state_factory + + class BackgroundListenerThread: class Status(enum.Enum): RUNNING = enum.auto() @@ -32,32 +80,27 @@ def __init__(self, socket_factory: SocketFactory, listen_ip: IPAddress, group_identifier: str, - is_forward_register_peer_leader: bool, - is_forward_register_peer_enabled: bool, + forward_register_peer_config: ForwardRegisterPeerConfig, out_control_socket_address: str, in_control_socket_address: str, clock: Clock, poll_timeout_in_ms: int, - synchronize_timeout_in_ms: int, - abort_timeout_in_ms: int, - peer_is_ready_wait_time_in_ms: int, send_socket_linger_time_in_ms: int, - trace_logging: bool): + connection_establisher_timeout_config: ConnectionEstablisherTimeoutConfig, + trace_logging: bool, + background_peer_state_factory: BackgroundPeerStateBuilder = create_background_peer_state_builder()): + self._forward_register_peer_config = forward_register_peer_config + self._background_peer_state_factory = background_peer_state_factory + self._connection_establisher_timeout_config = connection_establisher_timeout_config self._register_peer_connection: Optional[RegisterPeerConnection] = None - self._is_forward_register_peer_enabled = is_forward_register_peer_enabled - self._is_forward_register_peer_leader = is_forward_register_peer_leader self._send_socket_linger_time_in_ms = send_socket_linger_time_in_ms self._trace_logging = trace_logging self._clock = clock - self._peer_is_ready_wait_time_in_ms = peer_is_ready_wait_time_in_ms - self._abort_timeout_in_ms = abort_timeout_in_ms - self._synchronize_timeout_in_ms = synchronize_timeout_in_ms self._name = name self._logger = LOGGER.bind( name=self._name, group_identifier=group_identifier, - is_forward_register_peer_leader=self._is_forward_register_peer_leader, - is_forward_register_peer_enabled=self._is_forward_register_peer_enabled + forward_register_peer_config=dataclasses.asdict(self._forward_register_peer_config) ) self._group_identifier = group_identifier self._listen_ip = listen_ip @@ -132,7 +175,7 @@ def _handle_control_message(self, message: bytes) -> Status: if isinstance(specific_message_obj, messages.Stop): return BackgroundListenerThread.Status.STOPPED elif isinstance(specific_message_obj, messages.RegisterPeer): - if self._is_forward_register_peer_enabled and self._is_forward_register_peer_leader or not self._is_forward_register_peer_enabled: + if self._is_register_peer_message_allowed_as_control_message(): self._handle_register_peer_message(specific_message_obj) else: self._logger.error("RegisterPeerMessage message not allowed", @@ -143,31 +186,32 @@ def _handle_control_message(self, message: bytes) -> Status: self._logger.exception("Exception during handling message", message=message) return BackgroundListenerThread.Status.RUNNING + def _is_register_peer_message_allowed_as_control_message(self): + return self._forward_register_peer_config.is_enabled and self._forward_register_peer_config.is_leader \ + or not self._forward_register_peer_config.is_enabled + def _add_peer(self, peer: Peer, - forward_register_peer: bool = False, - acknowledge_register_peer: bool = False, - needs_register_peer_complete: bool = False): + connection_establisher_behavior_config: ConnectionEstablisherBehaviorConfig = + ConnectionEstablisherBehaviorConfig()): if peer.connection_info.group_identifier != self._my_connection_info.group_identifier: self._logger.error("Peer belongs to a different group", my_connection_info=self._my_connection_info.dict(), peer=peer.dict()) raise ValueError("Peer belongs to a different group") if peer not in self._peer_state: - self._peer_state[peer] = BackgroundPeerState.create( + self._peer_state[peer] = self._background_peer_state_factory.create( my_connection_info=self._my_connection_info, + peer=peer, out_control_socket=self._out_control_socket, socket_factory=self._socket_factory, - peer=peer, - register_peer_connection=self._register_peer_connection, - forward_register_peer=forward_register_peer, - acknowledge_register_peer=acknowledge_register_peer, - needs_register_peer_complete=needs_register_peer_complete, clock=self._clock, - peer_is_ready_wait_time_in_ms=self._peer_is_ready_wait_time_in_ms, - abort_timeout_in_ms=self._abort_timeout_in_ms, - synchronize_timeout_in_ms=self._synchronize_timeout_in_ms, - send_socket_linger_time_in_ms=self._send_socket_linger_time_in_ms + send_socket_linger_time_in_ms=self._send_socket_linger_time_in_ms, + connection_establisher_builder_parameter=ConnectionEstablisherBuilderParameter( + register_peer_connection=self._register_peer_connection, + timeout_config=self._connection_establisher_timeout_config, + behavior_config=connection_establisher_behavior_config + ), ) def _handle_listener_message(self, message: List[Frame]): @@ -183,7 +227,7 @@ def _handle_listener_message(self, message: List[Frame]): elif isinstance(specific_message_obj, messages.AcknowledgeConnection): self._handle_acknowledge_connection(specific_message_obj) elif isinstance(specific_message_obj, messages.RegisterPeer): - if not self._is_forward_register_peer_leader and self._is_forward_register_peer_enabled: + if self.is_register_peer_message_allowed_as_listener_message(): self._handle_register_peer_message(specific_message_obj) else: logger.error("RegisterPeerMessage message not allowed", message_obj=specific_message_obj.dict()) @@ -198,6 +242,9 @@ def _handle_listener_message(self, message: List[Frame]): except Exception as e: logger.exception("Exception during handling message", message_content=message_content_bytes) + def is_register_peer_message_allowed_as_listener_message(self): + return not self._forward_register_peer_config.is_leader and self._forward_register_peer_config.is_enabled + def _handle_payload_message(self, message: messages.Payload, frames: List[Frame]): peer = Peer(connection_info=message.source) self._peer_state[peer].forward_payload(frames[2:]) @@ -222,7 +269,7 @@ def _set_my_connection_info(self, port: int): self._out_control_socket.send(serialize_message(message)) def _handle_register_peer_message(self, message: messages.RegisterPeer): - if not self._is_forward_register_peer_enabled: + if not self._forward_register_peer_config.is_enabled: self._add_peer(message.peer) return @@ -230,16 +277,18 @@ def _handle_register_peer_message(self, message: messages.RegisterPeer): self._create_register_peer_connection(message) self._add_peer( message.peer, - acknowledge_register_peer=True, - needs_register_peer_complete=True + connection_establisher_behavior_config=ConnectionEstablisherBehaviorConfig( + acknowledge_register_peer=True, + needs_register_peer_complete=True) ) return self._add_peer( message.peer, - forward_register_peer=True, - acknowledge_register_peer=True, - needs_register_peer_complete=True + connection_establisher_behavior_config=ConnectionEstablisherBehaviorConfig( + forward_register_peer=True, + acknowledge_register_peer=True, + needs_register_peer_complete=True) ) def _create_register_peer_connection(self, message: messages.RegisterPeer): diff --git a/exasol_advanced_analytics_framework/udf_communication/peer_communicator/background_peer_state.py b/exasol_advanced_analytics_framework/udf_communication/peer_communicator/background_peer_state.py index 56d72a22..7eb093ce 100644 --- a/exasol_advanced_analytics_framework/udf_communication/peer_communicator/background_peer_state.py +++ b/exasol_advanced_analytics_framework/udf_communication/peer_communicator/background_peer_state.py @@ -1,4 +1,4 @@ -from typing import List, Optional +from typing import List import structlog from structlog.typing import FilteringBoundLogger @@ -6,194 +6,36 @@ from exasol_advanced_analytics_framework.udf_communication import messages from exasol_advanced_analytics_framework.udf_communication.connection_info import ConnectionInfo from exasol_advanced_analytics_framework.udf_communication.peer import Peer -from exasol_advanced_analytics_framework.udf_communication.peer_communicator.abort_timeout_sender import \ - AbortTimeoutSender, AbortTimeoutSenderFactory -from exasol_advanced_analytics_framework.udf_communication.peer_communicator.acknowledge_register_peer_sender import \ - AcknowledgeRegisterPeerSender, AcknowledgeRegisterPeerSenderFactory -from exasol_advanced_analytics_framework.udf_communication.peer_communicator.clock import Clock +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher import \ + ConnectionEstablisher from exasol_advanced_analytics_framework.udf_communication.peer_communicator.get_peer_receive_socket_name import \ get_peer_receive_socket_name -from exasol_advanced_analytics_framework.udf_communication.peer_communicator.peer_is_ready_sender import \ - PeerIsReadySender, PeerIsReadySenderFactory -from exasol_advanced_analytics_framework.udf_communication.peer_communicator.register_peer_connection import \ - RegisterPeerConnection -from exasol_advanced_analytics_framework.udf_communication.peer_communicator.register_peer_sender import \ - RegisterPeerSender, RegisterPeerSenderFactory -from exasol_advanced_analytics_framework.udf_communication.peer_communicator.sender import Sender, SenderFactory -from exasol_advanced_analytics_framework.udf_communication.peer_communicator.synchronize_connection_sender import \ - SynchronizeConnectionSender, SynchronizeConnectionSenderFactory -from exasol_advanced_analytics_framework.udf_communication.peer_communicator.timer import TimerFactory -from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import SocketFactory, \ - SocketType, Socket, Frame +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.sender import Sender +from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract \ + import SocketFactory, Frame, SocketType LOGGER: FilteringBoundLogger = structlog.get_logger() -class BackgroundPeerStateFactory: - def create(self, - my_connection_info: ConnectionInfo, - socket_factory: SocketFactory, - peer: Peer, - forward_register_peer: bool, - acknowledge_register_peer: bool, - needs_register_peer_complete: bool, - register_peer_connection: Optional[RegisterPeerConnection], - sender: Sender, - synchronize_connection_sender: SynchronizeConnectionSender, - abort_timeout_sender: AbortTimeoutSender, - peer_is_ready_sender: PeerIsReadySender, - register_peer_sender: RegisterPeerSender, - acknowledge_register_peer_sender: AcknowledgeRegisterPeerSender) -> "BackgroundPeerState": - return BackgroundPeerState( - my_connection_info=my_connection_info, - socket_factory=socket_factory, - peer=peer, - forward_register_peer=forward_register_peer, - acknowledge_register_peer=acknowledge_register_peer, - needs_register_peer_complete=needs_register_peer_complete, - register_peer_connection=register_peer_connection, - sender=sender, - synchronize_connection_sender=synchronize_connection_sender, - abort_timeout_sender=abort_timeout_sender, - peer_is_ready_sender=peer_is_ready_sender, - register_peer_sender=register_peer_sender, - acknowledge_register_peer_sender=acknowledge_register_peer_sender - ) - - class BackgroundPeerState: - @classmethod - def create( - cls, - my_connection_info: ConnectionInfo, - out_control_socket: Socket, - socket_factory: SocketFactory, - peer: Peer, - register_peer_connection: Optional[RegisterPeerConnection], - forward_register_peer: bool, - acknowledge_register_peer: bool, - needs_register_peer_complete: bool, - clock: Clock, - synchronize_timeout_in_ms: int, - abort_timeout_in_ms: int, - peer_is_ready_wait_time_in_ms: int, - send_socket_linger_time_in_ms: int, - sender_factory: SenderFactory = SenderFactory(), - synchronize_connection_sender_factory: SynchronizeConnectionSenderFactory = - SynchronizeConnectionSenderFactory(), - abort_timeout_sender_factory: AbortTimeoutSenderFactory = AbortTimeoutSenderFactory(), - peer_is_ready_sender_factory: PeerIsReadySenderFactory = PeerIsReadySenderFactory(), - register_peer_sender_factory: RegisterPeerSenderFactory = RegisterPeerSenderFactory(), - acknowledge_register_peer_sender_factory: AcknowledgeRegisterPeerSenderFactory = - AcknowledgeRegisterPeerSenderFactory(), - timer_factory: TimerFactory = TimerFactory(), - background_peer_state_factory: BackgroundPeerStateFactory = BackgroundPeerStateFactory() - ): - sender = sender_factory.create( - my_connection_info=my_connection_info, - socket_factory=socket_factory, - peer=peer, - send_socket_linger_time_in_ms=send_socket_linger_time_in_ms) - synchronize_connection_sender_timer = timer_factory.create(clock=clock, timeout_in_ms=synchronize_timeout_in_ms) - synchronize_connection_sender = synchronize_connection_sender_factory.create( - my_connection_info=my_connection_info, - peer=peer, - sender=sender, - timer=synchronize_connection_sender_timer - ) - needs_acknowledge_register_peer = (register_peer_connection is not None - and forward_register_peer) - abort_timeout_sender_timer = timer_factory.create(clock=clock, timeout_in_ms=abort_timeout_in_ms) - abort_timeout_sender = abort_timeout_sender_factory.create( - out_control_socket=out_control_socket, - timer=abort_timeout_sender_timer, - my_connection_info=my_connection_info, - peer=peer, - needs_acknowledge_register_peer=needs_acknowledge_register_peer - ) - needs_register_peer_complete_and_predecessor_exists = (register_peer_connection is not None - and register_peer_connection.predecessor is not None - and needs_register_peer_complete) - peer_is_ready_sender_timer = timer_factory.create(clock=clock, timeout_in_ms=peer_is_ready_wait_time_in_ms) - peer_is_ready_sender = peer_is_ready_sender_factory.create( - out_control_socket=out_control_socket, - timer=peer_is_ready_sender_timer, - peer=peer, - my_connection_info=my_connection_info, - needs_acknowledge_register_peer=needs_acknowledge_register_peer, - needs_register_peer_complete=needs_register_peer_complete_and_predecessor_exists, - ) - register_peer_timer = timer_factory.create(clock=clock, timeout_in_ms=synchronize_timeout_in_ms) - register_peer_sender = register_peer_sender_factory.create( - register_peer_connection=register_peer_connection, - needs_to_send_for_peer=forward_register_peer, - my_connection_info=my_connection_info, - peer=peer, - timer=register_peer_timer, - ) - acknowledge_register_peer_sender_timer = timer_factory.create(clock=clock, - timeout_in_ms=synchronize_timeout_in_ms) - acknowledge_register_peer_sender = acknowledge_register_peer_sender_factory.create( - register_peer_connection=register_peer_connection, - needs_to_send_for_peer=acknowledge_register_peer, - my_connection_info=my_connection_info, - peer=peer, - timer=acknowledge_register_peer_sender_timer, - ) - peer_state = background_peer_state_factory.create( - my_connection_info=my_connection_info, - socket_factory=socket_factory, - peer=peer, - forward_register_peer=forward_register_peer, - acknowledge_register_peer=acknowledge_register_peer, - needs_register_peer_complete=needs_register_peer_complete, - register_peer_connection=register_peer_connection, - sender=sender, - synchronize_connection_sender=synchronize_connection_sender, - abort_timeout_sender=abort_timeout_sender, - peer_is_ready_sender=peer_is_ready_sender, - register_peer_sender=register_peer_sender, - acknowledge_register_peer_sender=acknowledge_register_peer_sender - ) - return peer_state - def __init__(self, my_connection_info: ConnectionInfo, socket_factory: SocketFactory, peer: Peer, - forward_register_peer: bool, - acknowledge_register_peer: bool, - needs_register_peer_complete: bool, - register_peer_connection: Optional[RegisterPeerConnection], sender: Sender, - synchronize_connection_sender: SynchronizeConnectionSender, - abort_timeout_sender: AbortTimeoutSender, - peer_is_ready_sender: PeerIsReadySender, - register_peer_sender: RegisterPeerSender, - acknowledge_register_peer_sender: AcknowledgeRegisterPeerSender): - self._acknowledge_register_peer_sender = acknowledge_register_peer_sender - self._register_peer_sender = register_peer_sender - self._register_peer_connection = register_peer_connection + connection_establisher: ConnectionEstablisher): + self._connection_establisher = connection_establisher self._my_connection_info = my_connection_info self._peer = peer self._socket_factory = socket_factory self._create_receive_socket() self._sender = sender - self._synchronize_connection_sender = synchronize_connection_sender - self._abort_timeout_sender = abort_timeout_sender - self._peer_is_ready_sender = peer_is_ready_sender self._logger = LOGGER.bind( peer=self._peer.dict(), my_connection_info=self._my_connection_info.dict(), - forward_register_peer=forward_register_peer, - acknowledge_register_peer=acknowledge_register_peer, - needs_register_peer_complete=needs_register_peer_complete ) self._logger.debug("__init__") - self._register_peer_sender.try_send(force=True) - self._acknowledge_register_peer_sender.try_send(force=True) - self._synchronize_connection_sender.try_send(force=True) def _create_receive_socket(self): self._receive_socket = self._socket_factory.create_socket(SocketType.PAIR) @@ -202,42 +44,19 @@ def _create_receive_socket(self): def resend_if_necessary(self): self._logger.debug("resend_if_necessary") - senders = [ - self._register_peer_sender, - self._synchronize_connection_sender, - self._abort_timeout_sender, - self._peer_is_ready_sender, - self._acknowledge_register_peer_sender - ] - for sender in senders: - try_send = getattr(sender, "try_send") - try_send() + self._connection_establisher.try_send() def received_synchronize_connection(self): - self._logger.debug("received_synchronize_connection") - self._peer_is_ready_sender.received_synchronize_connection() - self._peer_is_ready_sender.reset_timer() - self._abort_timeout_sender.received_synchronize_connection() - self._sender.send(messages.Message(__root__=messages.AcknowledgeConnection(source=self._my_connection_info))) + self._connection_establisher.received_synchronize_connection() def received_acknowledge_connection(self): - self._logger.debug("received_acknowledge_connection") - self._abort_timeout_sender.received_acknowledge_connection() - self._peer_is_ready_sender.received_acknowledge_connection() - self._synchronize_connection_sender.stop() + self._connection_establisher.received_acknowledge_connection() def received_acknowledge_register_peer(self): - self._logger.debug("received_acknowledge_register_peer") - self._register_peer_connection.complete(self._peer) - self._peer_is_ready_sender.received_acknowledge_register_peer() - self._peer_is_ready_sender.reset_timer() - self._abort_timeout_sender.received_acknowledge_register_peer() - self._register_peer_sender.stop() + self._connection_establisher.received_acknowledge_register_peer() def received_register_peer_complete(self): - self._logger.debug("received_register_peer_complete") - self._peer_is_ready_sender.received_register_peer_complete() - self._acknowledge_register_peer_sender.stop() + self._connection_establisher.received_register_peer_complete() def forward_payload(self, frames: List[Frame]): self._receive_socket.send_multipart(frames) diff --git a/exasol_advanced_analytics_framework/udf_communication/peer_communicator/background_peer_state_builder.py b/exasol_advanced_analytics_framework/udf_communication/peer_communicator/background_peer_state_builder.py new file mode 100644 index 00000000..5a335df4 --- /dev/null +++ b/exasol_advanced_analytics_framework/udf_communication/peer_communicator/background_peer_state_builder.py @@ -0,0 +1,59 @@ +from typing import Callable + +from exasol_advanced_analytics_framework.udf_communication.connection_info import ConnectionInfo +from exasol_advanced_analytics_framework.udf_communication.peer import Peer +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.background_peer_state import \ + BackgroundPeerState +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.background_peer_state_factory import \ + BackgroundPeerStateFactory +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.clock import Clock +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher_builder import \ + ConnectionEstablisherBuilder +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher_builder_parameter import \ + ConnectionEstablisherBuilderParameter +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.sender import SenderFactory +from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import Socket, \ + SocketFactory + + +class BackgroundPeerStateBuilder: + + def __init__(self, + connection_establisher_factory: ConnectionEstablisherBuilder, + sender_factory: SenderFactory, + background_peer_state_factory: BackgroundPeerStateFactory = BackgroundPeerStateFactory()): + self._background_peer_state_factory = background_peer_state_factory + self._sender_factory = sender_factory + self._connection_establisher_factory = connection_establisher_factory + + def create( + self, + my_connection_info: ConnectionInfo, + out_control_socket: Socket, + socket_factory: SocketFactory, + peer: Peer, + clock: Clock, + send_socket_linger_time_in_ms: int, + connection_establisher_builder_parameter: ConnectionEstablisherBuilderParameter, + background_peer_state_factory: Callable = BackgroundPeerState) -> BackgroundPeerState: + sender = self._sender_factory.create( + my_connection_info=my_connection_info, + socket_factory=socket_factory, + peer=peer, + send_socket_linger_time_in_ms=send_socket_linger_time_in_ms) + connection_establisher = self._connection_establisher_factory.create( + peer=peer, + my_connection_info=my_connection_info, + out_control_socket=out_control_socket, + clock=clock, + sender=sender, + parameter=connection_establisher_builder_parameter + ) + peer_state = self._background_peer_state_factory.create( + my_connection_info=my_connection_info, + socket_factory=socket_factory, + peer=peer, + sender=sender, + connection_establisher=connection_establisher, + ) + return peer_state diff --git a/exasol_advanced_analytics_framework/udf_communication/peer_communicator/background_peer_state_factory.py b/exasol_advanced_analytics_framework/udf_communication/peer_communicator/background_peer_state_factory.py new file mode 100644 index 00000000..e976fab4 --- /dev/null +++ b/exasol_advanced_analytics_framework/udf_communication/peer_communicator/background_peer_state_factory.py @@ -0,0 +1,25 @@ +from exasol_advanced_analytics_framework.udf_communication.connection_info import ConnectionInfo +from exasol_advanced_analytics_framework.udf_communication.peer import Peer +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.background_peer_state import \ + BackgroundPeerState +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher import \ + ConnectionEstablisher +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.sender import Sender +from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import SocketFactory + + +class BackgroundPeerStateFactory: + + def create(self, + my_connection_info: ConnectionInfo, + socket_factory: SocketFactory, + peer: Peer, + sender: Sender, + connection_establisher: ConnectionEstablisher) -> BackgroundPeerState: + return BackgroundPeerState( + my_connection_info=my_connection_info, + socket_factory=socket_factory, + peer=peer, + sender=sender, + connection_establisher=connection_establisher + ) diff --git a/exasol_advanced_analytics_framework/udf_communication/peer_communicator/connection_establisher.py b/exasol_advanced_analytics_framework/udf_communication/peer_communicator/connection_establisher.py new file mode 100644 index 00000000..de794869 --- /dev/null +++ b/exasol_advanced_analytics_framework/udf_communication/peer_communicator/connection_establisher.py @@ -0,0 +1,86 @@ +import structlog +from structlog.typing import FilteringBoundLogger + +from exasol_advanced_analytics_framework.udf_communication.connection_info import ConnectionInfo +from exasol_advanced_analytics_framework.udf_communication.messages import AcknowledgeConnection, Message +from exasol_advanced_analytics_framework.udf_communication.peer import Peer +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.abort_timeout_sender import \ + AbortTimeoutSender +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.acknowledge_register_peer_sender import \ + AcknowledgeRegisterPeerSender +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.peer_is_ready_sender import \ + PeerIsReadySender +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.register_peer_connection import \ + RegisterPeerConnection +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.register_peer_sender import \ + RegisterPeerSender +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.sender import Sender +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.synchronize_connection_sender import \ + SynchronizeConnectionSender + +LOGGER: FilteringBoundLogger = structlog.get_logger() + + +class ConnectionEstablisher: + def __init__(self, + peer: Peer, + my_connection_info: ConnectionInfo, + sender: Sender, + register_peer_connection: RegisterPeerConnection, + abort_timeout_sender: AbortTimeoutSender, + acknowledge_register_peer_sender: AcknowledgeRegisterPeerSender, + peer_is_ready_sender: PeerIsReadySender, + register_peer_sender: RegisterPeerSender, + synchronize_connection_sender: SynchronizeConnectionSender): + self._synchronize_connection_sender = synchronize_connection_sender + self._register_peer_sender = register_peer_sender + self._peer_is_ready_sender = peer_is_ready_sender + self._acknowledge_register_peer_sender = acknowledge_register_peer_sender + self._abort_timeout_sender = abort_timeout_sender + self._register_peer_connection = register_peer_connection + self._my_connection_info = my_connection_info + self._peer = peer + self._sender = sender + self._logger = LOGGER.bind( + peer=self._peer.dict(), + my_connection_info=self._my_connection_info.dict(), + ) + self._send_initial_messages() + + def _send_initial_messages(self): + self._register_peer_sender.try_send(force=True) + self._acknowledge_register_peer_sender.try_send(force=True) + self._synchronize_connection_sender.try_send(force=True) + + def received_synchronize_connection(self): + self._logger.debug("received_synchronize_connection") + self._peer_is_ready_sender.received_synchronize_connection() + self._peer_is_ready_sender.reset_timer() + self._abort_timeout_sender.received_synchronize_connection() + self._sender.send(Message(__root__=AcknowledgeConnection(source=self._my_connection_info))) + + def received_acknowledge_connection(self): + self._logger.debug("received_acknowledge_connection") + self._abort_timeout_sender.received_acknowledge_connection() + self._peer_is_ready_sender.received_acknowledge_connection() + self._synchronize_connection_sender.stop() + + def received_acknowledge_register_peer(self): + self._logger.debug("received_acknowledge_register_peer") + self._register_peer_connection.complete(self._peer) + self._peer_is_ready_sender.received_acknowledge_register_peer() + self._peer_is_ready_sender.reset_timer() + self._abort_timeout_sender.received_acknowledge_register_peer() + self._register_peer_sender.stop() + + def received_register_peer_complete(self): + self._logger.debug("received_register_peer_complete") + self._peer_is_ready_sender.received_register_peer_complete() + self._acknowledge_register_peer_sender.stop() + + def try_send(self): + self._register_peer_sender.try_send() + self._synchronize_connection_sender.try_send() + self._abort_timeout_sender.try_send() + self._peer_is_ready_sender.try_send() + self._acknowledge_register_peer_sender.try_send() diff --git a/exasol_advanced_analytics_framework/udf_communication/peer_communicator/connection_establisher_behavior_config.py b/exasol_advanced_analytics_framework/udf_communication/peer_communicator/connection_establisher_behavior_config.py new file mode 100644 index 00000000..938b9a02 --- /dev/null +++ b/exasol_advanced_analytics_framework/udf_communication/peer_communicator/connection_establisher_behavior_config.py @@ -0,0 +1,8 @@ +import dataclasses + + +@dataclasses.dataclass +class ConnectionEstablisherBehaviorConfig: + forward_register_peer: bool = False + acknowledge_register_peer: bool = False + needs_register_peer_complete: bool = False diff --git a/exasol_advanced_analytics_framework/udf_communication/peer_communicator/connection_establisher_builder.py b/exasol_advanced_analytics_framework/udf_communication/peer_communicator/connection_establisher_builder.py new file mode 100644 index 00000000..348018c3 --- /dev/null +++ b/exasol_advanced_analytics_framework/udf_communication/peer_communicator/connection_establisher_builder.py @@ -0,0 +1,176 @@ +from exasol_advanced_analytics_framework.udf_communication.connection_info import ConnectionInfo +from exasol_advanced_analytics_framework.udf_communication.peer import Peer +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.abort_timeout_sender import \ + AbortTimeoutSenderFactory +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.acknowledge_register_peer_sender import \ + AcknowledgeRegisterPeerSenderFactory +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.clock import Clock +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher import \ + ConnectionEstablisher +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher_builder_parameter import \ + ConnectionEstablisherBuilderParameter +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher_factory import \ + ConnectionEstablisherFactory +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.peer_is_ready_sender import \ + PeerIsReadySenderFactory +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.register_peer_sender import \ + RegisterPeerSenderFactory +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.sender import Sender +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.synchronize_connection_sender import \ + SynchronizeConnectionSenderFactory +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.timer import TimerFactory +from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import Socket + + +def _needs_acknowledge_register_peer(parameter: ConnectionEstablisherBuilderParameter): + needs_acknowledge_register_peer = \ + (parameter.register_peer_connection is not None + and parameter.behavior_config.forward_register_peer) + return needs_acknowledge_register_peer + + +class ConnectionEstablisherBuilder: + + def __init__(self, + abort_timeout_sender_factory: AbortTimeoutSenderFactory, + acknowledge_register_peer_sender_factory: AcknowledgeRegisterPeerSenderFactory, + peer_is_ready_sender_factory: PeerIsReadySenderFactory, + register_peer_sender_factory: RegisterPeerSenderFactory, + synchronize_connection_sender_factory: SynchronizeConnectionSenderFactory, + timer_factory: TimerFactory, + connection_establisher_factory: ConnectionEstablisherFactory = + ConnectionEstablisherFactory()): + self._connection_establisher_factory = connection_establisher_factory + self._timer_factory = timer_factory + self._synchronize_connection_sender_factory = synchronize_connection_sender_factory + self._register_peer_sender_factory = register_peer_sender_factory + self._peer_is_ready_sender_factory = peer_is_ready_sender_factory + self._acknowledge_register_peer_sender_factory = acknowledge_register_peer_sender_factory + self._abort_timeout_sender_factory = abort_timeout_sender_factory + + def create(self, + peer: Peer, + my_connection_info: ConnectionInfo, + out_control_socket: Socket, + clock: Clock, + sender: Sender, + parameter: ConnectionEstablisherBuilderParameter) -> ConnectionEstablisher: + synchronize_connection_sender = self._create_synchronize_connection_sender( + my_connection_info=my_connection_info, + peer=peer, + sender=sender, + clock=clock, + parameter=parameter) + abort_timeout_sender = self._create_abort_timeout_sender( + my_connection_info=my_connection_info, + peer=peer, + out_control_socket=out_control_socket, + clock=clock, + parameter=parameter) + peer_is_ready_sender = self._create_peer_is_ready_sender( + my_connection_info=my_connection_info, + peer=peer, + clock=clock, + out_control_socket=out_control_socket, + parameter=parameter + ) + register_peer_sender = self.create_register_peer_sender( + peer=peer, + my_connection_info=my_connection_info, + clock=clock, + parameter=parameter) + acknowledge_register_peer_sender = \ + self.create_acknowledge_register_peer_sender( + my_connection_info=my_connection_info, + peer=peer, + clock=clock, + parameter=parameter) + return self._connection_establisher_factory.create( + peer=peer, + my_connection_info=my_connection_info, + sender=sender, + register_peer_connection=parameter.register_peer_connection, + acknowledge_register_peer_sender=acknowledge_register_peer_sender, + abort_timeout_sender=abort_timeout_sender, + peer_is_ready_sender=peer_is_ready_sender, + register_peer_sender=register_peer_sender, + synchronize_connection_sender=synchronize_connection_sender + ) + + def create_acknowledge_register_peer_sender(self, + my_connection_info: ConnectionInfo, peer: Peer, + clock: Clock, parameter: ConnectionEstablisherBuilderParameter): + acknowledge_register_peer_sender_timer = self._timer_factory.create( + clock=clock, + timeout_in_ms=parameter.timeout_config.acknowledge_register_peer_retry_timeout_in_ms) + acknowledge_register_peer_sender = self._acknowledge_register_peer_sender_factory.create( + register_peer_connection=parameter.register_peer_connection, + needs_to_send_for_peer=parameter.behavior_config.acknowledge_register_peer, + my_connection_info=my_connection_info, + peer=peer, + timer=acknowledge_register_peer_sender_timer, + ) + return acknowledge_register_peer_sender + + def create_register_peer_sender(self, + my_connection_info: ConnectionInfo, peer: Peer, + clock: Clock, parameter: ConnectionEstablisherBuilderParameter): + register_peer_sender_timer = self._timer_factory.create( + clock=clock, timeout_in_ms=parameter.timeout_config.register_peer_retry_timeout_in_ms) + register_peer_sender = self._register_peer_sender_factory.create( + register_peer_connection=parameter.register_peer_connection, + needs_to_send_for_peer=parameter.behavior_config.forward_register_peer, + my_connection_info=my_connection_info, + peer=peer, + timer=register_peer_sender_timer, + ) + return register_peer_sender + + def _create_peer_is_ready_sender(self, + my_connection_info: ConnectionInfo, peer: Peer, + clock: Clock, out_control_socket: Socket, + parameter: ConnectionEstablisherBuilderParameter): + needs_register_peer_complete_and_predecessor_exists = \ + (parameter.register_peer_connection is not None + and parameter.register_peer_connection.predecessor is not None + and parameter.behavior_config.needs_register_peer_complete) + peer_is_ready_sender_timer = self._timer_factory.create( + clock=clock, timeout_in_ms=parameter.timeout_config.peer_is_ready_wait_time_in_ms) + peer_is_ready_sender = self._peer_is_ready_sender_factory.create( + out_control_socket=out_control_socket, + timer=peer_is_ready_sender_timer, + peer=peer, + my_connection_info=my_connection_info, + needs_acknowledge_register_peer=_needs_acknowledge_register_peer(parameter), + needs_register_peer_complete=needs_register_peer_complete_and_predecessor_exists, + ) + return peer_is_ready_sender + + def _create_abort_timeout_sender(self, + my_connection_info: ConnectionInfo, peer: Peer, + out_control_socket: Socket, clock: Clock, + parameter: ConnectionEstablisherBuilderParameter): + abort_timeout_sender_timer = self._timer_factory.create( + clock=clock, timeout_in_ms=parameter.timeout_config.abort_timeout_in_ms) + abort_timeout_sender = self._abort_timeout_sender_factory.create( + out_control_socket=out_control_socket, + timer=abort_timeout_sender_timer, + my_connection_info=my_connection_info, + peer=peer, + needs_acknowledge_register_peer=_needs_acknowledge_register_peer(parameter) + ) + return abort_timeout_sender + + def _create_synchronize_connection_sender(self, + my_connection_info: ConnectionInfo, peer: Peer, + sender: Sender, clock: Clock, + parameter: ConnectionEstablisherBuilderParameter): + synchronize_connection_sender_timer = self._timer_factory.create( + clock=clock, timeout_in_ms=parameter.timeout_config.synchronize_retry_timeout_in_ms) + synchronize_connection_sender = self._synchronize_connection_sender_factory.create( + my_connection_info=my_connection_info, + peer=peer, + sender=sender, + timer=synchronize_connection_sender_timer + ) + return synchronize_connection_sender diff --git a/exasol_advanced_analytics_framework/udf_communication/peer_communicator/connection_establisher_builder_parameter.py b/exasol_advanced_analytics_framework/udf_communication/peer_communicator/connection_establisher_builder_parameter.py new file mode 100644 index 00000000..55814d8e --- /dev/null +++ b/exasol_advanced_analytics_framework/udf_communication/peer_communicator/connection_establisher_builder_parameter.py @@ -0,0 +1,15 @@ +import dataclasses + +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher_behavior_config import \ + ConnectionEstablisherBehaviorConfig +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher_timeout_config import \ + ConnectionEstablisherTimeoutConfig +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.register_peer_connection import \ + RegisterPeerConnection + + +@dataclasses.dataclass(frozen=True) +class ConnectionEstablisherBuilderParameter: + register_peer_connection: RegisterPeerConnection + behavior_config: ConnectionEstablisherBehaviorConfig + timeout_config: ConnectionEstablisherTimeoutConfig diff --git a/exasol_advanced_analytics_framework/udf_communication/peer_communicator/connection_establisher_factory.py b/exasol_advanced_analytics_framework/udf_communication/peer_communicator/connection_establisher_factory.py new file mode 100644 index 00000000..06d4052b --- /dev/null +++ b/exasol_advanced_analytics_framework/udf_communication/peer_communicator/connection_establisher_factory.py @@ -0,0 +1,43 @@ +from exasol_advanced_analytics_framework.udf_communication.connection_info import ConnectionInfo +from exasol_advanced_analytics_framework.udf_communication.peer import Peer +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.abort_timeout_sender import \ + AbortTimeoutSender +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.acknowledge_register_peer_sender import \ + AcknowledgeRegisterPeerSender +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher import \ + ConnectionEstablisher +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.peer_is_ready_sender import \ + PeerIsReadySender +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.register_peer_connection import \ + RegisterPeerConnection +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.register_peer_sender import \ + RegisterPeerSender +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.sender import Sender +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.synchronize_connection_sender import \ + SynchronizeConnectionSender + + +class ConnectionEstablisherFactory: + + def create(self, + peer: Peer, + my_connection_info: ConnectionInfo, + sender: Sender, + register_peer_connection: RegisterPeerConnection, + abort_timeout_sender: AbortTimeoutSender, + acknowledge_register_peer_sender: AcknowledgeRegisterPeerSender, + peer_is_ready_sender: PeerIsReadySender, + register_peer_sender: RegisterPeerSender, + synchronize_connection_sender: SynchronizeConnectionSender + ) -> ConnectionEstablisher: + return ConnectionEstablisher( + peer=peer, + my_connection_info=my_connection_info, + sender=sender, + register_peer_connection=register_peer_connection, + abort_timeout_sender=abort_timeout_sender, + acknowledge_register_peer_sender=acknowledge_register_peer_sender, + peer_is_ready_sender=peer_is_ready_sender, + register_peer_sender=register_peer_sender, + synchronize_connection_sender=synchronize_connection_sender + ) diff --git a/exasol_advanced_analytics_framework/udf_communication/peer_communicator/connection_establisher_timeout_config.py b/exasol_advanced_analytics_framework/udf_communication/peer_communicator/connection_establisher_timeout_config.py new file mode 100644 index 00000000..6e242c37 --- /dev/null +++ b/exasol_advanced_analytics_framework/udf_communication/peer_communicator/connection_establisher_timeout_config.py @@ -0,0 +1,10 @@ +import dataclasses + + +@dataclasses.dataclass +class ConnectionEstablisherTimeoutConfig: + synchronize_retry_timeout_in_ms: int = 1000 + abort_timeout_in_ms: int = 100000 + peer_is_ready_wait_time_in_ms: int = 10000 + register_peer_retry_timeout_in_ms: int = 1000 + acknowledge_register_peer_retry_timeout_in_ms: int = 1000 diff --git a/exasol_advanced_analytics_framework/udf_communication/peer_communicator/forward_register_peer_config.py b/exasol_advanced_analytics_framework/udf_communication/peer_communicator/forward_register_peer_config.py new file mode 100644 index 00000000..d6b5a85e --- /dev/null +++ b/exasol_advanced_analytics_framework/udf_communication/peer_communicator/forward_register_peer_config.py @@ -0,0 +1,7 @@ +import dataclasses + + +@dataclasses.dataclass(frozen=True) +class ForwardRegisterPeerConfig: + is_leader: bool = False + is_enabled: bool = False diff --git a/exasol_advanced_analytics_framework/udf_communication/peer_communicator/peer_communicator.py b/exasol_advanced_analytics_framework/udf_communication/peer_communicator/peer_communicator.py index b7d0134d..67776731 100644 --- a/exasol_advanced_analytics_framework/udf_communication/peer_communicator/peer_communicator.py +++ b/exasol_advanced_analytics_framework/udf_communication/peer_communicator/peer_communicator.py @@ -1,4 +1,5 @@ import time +from dataclasses import asdict from typing import Optional, Dict, List import structlog @@ -10,7 +11,11 @@ from exasol_advanced_analytics_framework.udf_communication.peer import Peer from exasol_advanced_analytics_framework.udf_communication.peer_communicator.background_listener_interface import \ BackgroundListenerInterface +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.forward_register_peer_config import \ + ForwardRegisterPeerConfig from exasol_advanced_analytics_framework.udf_communication.peer_communicator.clock import Clock +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher_timeout_config import \ + ConnectionEstablisherTimeoutConfig from exasol_advanced_analytics_framework.udf_communication.peer_communicator.frontend_peer_state import \ FrontendPeerState from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import SocketFactory, \ @@ -38,17 +43,15 @@ def __init__(self, listen_ip: IPAddress, group_identifier: str, socket_factory: SocketFactory, - is_forward_register_peer_leader: bool = False, - is_forward_register_peer_enabled: bool = False, - poll_timeout_in_ms: int = 500, - synchronize_timeout_in_ms: int = 1000, - abort_timeout_in_ms: int = 240000, - peer_is_ready_wait_time_in_ms: int = 10000, + connection_establisher_timeout_config: ConnectionEstablisherTimeoutConfig = + ConnectionEstablisherTimeoutConfig(), + forward_register_peer_config: ForwardRegisterPeerConfig = ForwardRegisterPeerConfig(), + poll_timeout_in_ms: int = 200, send_socket_linger_time_in_ms: int = 100, clock: Clock = Clock(), trace_logging: bool = False): - self._is_forward_register_peer_leader = is_forward_register_peer_leader - self._is_forward_register_peer_enabled = is_forward_register_peer_enabled + self._connection_establisher_settings = connection_establisher_timeout_config + self._forward_register_peer_config = forward_register_peer_config self._socket_factory = socket_factory self._name = name self._group_identifier = group_identifier @@ -56,8 +59,7 @@ def __init__(self, self._logger = LOGGER.bind( name=self._name, group_identifier=self._group_identifier, - is_forward_register_peer_leader=self._is_forward_register_peer_leader, - is_forward_register_peer_enabled=self._is_forward_register_peer_enabled, + forward_register_peer_config=asdict(forward_register_peer_config), number_of_peers=self._number_of_peers, ) self._logger.info("init") @@ -66,15 +68,12 @@ def __init__(self, socket_factory=self._socket_factory, listen_ip=listen_ip, group_identifier=self._group_identifier, - is_forward_register_peer_leader=is_forward_register_peer_leader, - is_forward_register_peer_enabled=is_forward_register_peer_enabled, + forward_register_peer_config=forward_register_peer_config, clock=clock, poll_timeout_in_ms=poll_timeout_in_ms, - synchronize_timeout_in_ms=synchronize_timeout_in_ms, - abort_timeout_in_ms=abort_timeout_in_ms, - peer_is_ready_wait_time_in_ms=peer_is_ready_wait_time_in_ms, send_socket_linger_time_in_ms=send_socket_linger_time_in_ms, - trace_logging=trace_logging + trace_logging=trace_logging, + connection_establisher_timeout_config=connection_establisher_timeout_config ) self._my_connection_info = self._background_listener.my_connection_info self._logger = self._logger.bind(my_connection_info=self._my_connection_info.dict()) @@ -95,7 +94,7 @@ def _handle_messages(self, timeout_in_milliseconds: Optional[int] = 0): else: self._logger.error("Unknown message", message=message.dict()) - def _add_peer_state(self, peer): + def _add_peer_state(self, peer: Peer): if peer not in self._peer_states: self._peer_states[peer] = FrontendPeerState( my_connection_info=self.my_connection_info, @@ -151,12 +150,8 @@ def rank(self) -> int: return self.peers().index(self.peer) @property - def is_forward_register_peer_leader(self) -> bool: - return self._is_forward_register_peer_leader - - @property - def is_forward_register_peer_enabled(self) -> bool: - return self._is_forward_register_peer_enabled + def forward_register_peer_config(self) -> ForwardRegisterPeerConfig: + return self._forward_register_peer_config def are_all_peers_connected(self) -> bool: self._handle_messages() diff --git a/tests/udf_communication/peer_communication/test_add_peer.py b/tests/udf_communication/peer_communication/test_add_peer.py index 44b31f6f..3b424a90 100644 --- a/tests/udf_communication/peer_communication/test_add_peer.py +++ b/tests/udf_communication/peer_communication/test_add_peer.py @@ -16,6 +16,8 @@ from exasol_advanced_analytics_framework.udf_communication.ip_address import IPAddress from exasol_advanced_analytics_framework.udf_communication.peer import Peer from exasol_advanced_analytics_framework.udf_communication.peer_communicator import PeerCommunicator +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.forward_register_peer_config import \ + ForwardRegisterPeerConfig from exasol_advanced_analytics_framework.udf_communication.peer_communicator.peer_communicator import key_for_peer from exasol_advanced_analytics_framework.udf_communication.socket_factory.fault_injection import \ FaultInjectionSocketFactory @@ -54,7 +56,12 @@ def run(parameter: PeerCommunicatorTestProcessParameter, queue: BidirectionalQue number_of_peers=parameter.number_of_instances, listen_ip=listen_ip, group_identifier=parameter.group_identifier, - socket_factory=socket_factory) + socket_factory=socket_factory, + forward_register_peer_config=ForwardRegisterPeerConfig( + is_leader=False, + is_enabled=False + ) + ) try: queue.put(com.my_connection_info) peer_connection_infos = queue.get() diff --git a/tests/udf_communication/peer_communication/test_add_peer_forward.py b/tests/udf_communication/peer_communication/test_add_peer_forward.py index 8e8c271a..b919c258 100644 --- a/tests/udf_communication/peer_communication/test_add_peer_forward.py +++ b/tests/udf_communication/peer_communication/test_add_peer_forward.py @@ -15,6 +15,8 @@ from exasol_advanced_analytics_framework.udf_communication.ip_address import IPAddress from exasol_advanced_analytics_framework.udf_communication.peer import Peer from exasol_advanced_analytics_framework.udf_communication.peer_communicator import PeerCommunicator +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.forward_register_peer_config import \ + ForwardRegisterPeerConfig from exasol_advanced_analytics_framework.udf_communication.peer_communicator.peer_communicator import key_for_peer from exasol_advanced_analytics_framework.udf_communication.socket_factory.fault_injection import \ FaultInjectionSocketFactory @@ -56,8 +58,10 @@ def run(parameter: PeerCommunicatorTestProcessParameter, queue: BidirectionalQue number_of_peers=parameter.number_of_instances, listen_ip=listen_ip, group_identifier=parameter.group_identifier, - is_forward_register_peer_leader=leader, - is_forward_register_peer_enabled=True, + forward_register_peer_config=ForwardRegisterPeerConfig( + is_leader=leader, + is_enabled=True + ), socket_factory=socket_factory ) try: @@ -81,7 +85,7 @@ def test_reliability(number_of_instances: int, repetitions: int): run_test_with_repetitions(number_of_instances, repetitions) -REPETITIONS_FOR_FUNCTIONALITY = 1 +REPETITIONS_FOR_FUNCTIONALITY = 3 def test_functionality_2(): diff --git a/tests/udf_communication/peer_communication/test_background_peer_state.py b/tests/udf_communication/peer_communication/test_background_peer_state.py index 6c712198..5c5750cd 100644 --- a/tests/udf_communication/peer_communication/test_background_peer_state.py +++ b/tests/udf_communication/peer_communication/test_background_peer_state.py @@ -6,23 +6,13 @@ from exasol_advanced_analytics_framework.udf_communication.connection_info import ConnectionInfo from exasol_advanced_analytics_framework.udf_communication.ip_address import IPAddress, Port from exasol_advanced_analytics_framework.udf_communication.peer import Peer -from exasol_advanced_analytics_framework.udf_communication.peer_communicator.abort_timeout_sender import \ - AbortTimeoutSender -from exasol_advanced_analytics_framework.udf_communication.peer_communicator.acknowledge_register_peer_sender import \ - AcknowledgeRegisterPeerSender from exasol_advanced_analytics_framework.udf_communication.peer_communicator.background_peer_state import \ BackgroundPeerState -from exasol_advanced_analytics_framework.udf_communication.peer_communicator.peer_is_ready_sender import \ - PeerIsReadySender -from exasol_advanced_analytics_framework.udf_communication.peer_communicator.register_peer_connection import \ - RegisterPeerConnection -from exasol_advanced_analytics_framework.udf_communication.peer_communicator.register_peer_sender import \ - RegisterPeerSender +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher import \ + ConnectionEstablisher from exasol_advanced_analytics_framework.udf_communication.peer_communicator.sender import Sender -from exasol_advanced_analytics_framework.udf_communication.peer_communicator.synchronize_connection_sender import \ - SynchronizeConnectionSender from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import Socket, \ - SocketFactory, SocketType + SocketFactory, SocketType, Frame def mock_cast(obj: Any) -> Mock: @@ -36,28 +26,14 @@ class TestSetup: socket_factory_mock: Union[MagicMock, SocketFactory] receive_socket_mock: Union[MagicMock, Socket] sender_mock: Union[MagicMock, Sender] - abort_timeout_sender_mock: Union[MagicMock, AbortTimeoutSender] - peer_is_ready_sender_mock: Union[MagicMock, PeerIsReadySender] - synchronize_connection_sender_mock: Union[MagicMock, SynchronizeConnectionSender] - register_peer_sender: Union[MagicMock, RegisterPeerSender] - register_peer_connection: Union[MagicMock, RegisterPeerConnection] - acknowledge_register_peer_sender: Union[MagicMock, AcknowledgeRegisterPeerSender] + connection_establisher_mock: Union[MagicMock, ConnectionEstablisher] background_peer_state: BackgroundPeerState def reset_mocks(self): - mocks = ( - self.abort_timeout_sender_mock, - self.synchronize_connection_sender_mock, - self.peer_is_ready_sender_mock, - self.sender_mock, - self.receive_socket_mock, - self.socket_factory_mock, - self.acknowledge_register_peer_sender, - self.register_peer_connection, - self.register_peer_sender - ) - for mock in mocks: - mock.reset_mock() + self.sender_mock.reset_mock() + self.receive_socket_mock.reset_mock() + self.socket_factory_mock.reset_mock() + self.connection_establisher_mock.reset_mock() def create_test_setup() -> TestSetup: @@ -74,57 +50,35 @@ def create_test_setup() -> TestSetup: port=Port(port=10), group_identifier="g" ) - receive_socket_mock = create_autospec(Socket) + receive_socket_mock: Union[MagicMock, Socket] = create_autospec(Socket) socket_factory_mock: Union[MagicMock, SocketFactory] = create_autospec(SocketFactory) mock_cast(socket_factory_mock.create_socket).side_effect = [receive_socket_mock] - sender_mock = create_autospec(Sender) - abort_timeout_sender_mock = create_autospec(AbortTimeoutSender) - peer_is_ready_sender_mock = create_autospec(PeerIsReadySender) - synchronize_connection_sender_mock = create_autospec(SynchronizeConnectionSender) - register_peer_sender_mock = create_autospec(RegisterPeerSender) - register_peer_connection_mock = create_autospec(RegisterPeerConnection) - acknowledge_register_peer_sender_mock = create_autospec(AcknowledgeRegisterPeerSender) + sender_mock: Union[MagicMock, Sender] = create_autospec(Sender) + connection_establisher_mock: Union[MagicMock, ConnectionEstablisher] = create_autospec(ConnectionEstablisher) + background_peer_state = BackgroundPeerState( my_connection_info=my_connection_info, peer=peer, socket_factory=socket_factory_mock, sender=sender_mock, - abort_timeout_sender=abort_timeout_sender_mock, - peer_is_ready_sender=peer_is_ready_sender_mock, - synchronize_connection_sender=synchronize_connection_sender_mock, - register_peer_sender=register_peer_sender_mock, - register_peer_connection=register_peer_connection_mock, - acknowledge_register_peer_sender=acknowledge_register_peer_sender_mock, - acknowledge_register_peer=False, - forward_register_peer=False, - needs_register_peer_complete=False + connection_establisher=connection_establisher_mock ) return TestSetup( peer=peer, my_connection_info=my_connection_info, socket_factory_mock=socket_factory_mock, sender_mock=sender_mock, - abort_timeout_sender_mock=abort_timeout_sender_mock, - peer_is_ready_sender_mock=peer_is_ready_sender_mock, - synchronize_connection_sender_mock=synchronize_connection_sender_mock, background_peer_state=background_peer_state, receive_socket_mock=receive_socket_mock, - register_peer_sender=register_peer_sender_mock, - register_peer_connection=register_peer_connection_mock, - acknowledge_register_peer_sender=acknowledge_register_peer_sender_mock + connection_establisher_mock=connection_establisher_mock ) def test_init(): test_setup = create_test_setup() assert ( - test_setup.synchronize_connection_sender_mock.mock_calls == [call.try_send(force=True)] - and test_setup.peer_is_ready_sender_mock.mock_calls == [] - and test_setup.abort_timeout_sender_mock.mock_calls == [] - and test_setup.sender_mock.mock_calls == [] - and test_setup.register_peer_sender.mock_calls == [call.try_send(force=True)] - and test_setup.register_peer_connection.mock_calls == [] - and test_setup.acknowledge_register_peer_sender.mock_calls == [call.try_send(force=True)] + test_setup.sender_mock.mock_calls == [] + and test_setup.connection_establisher_mock.mock_calls == [] and mock_cast(test_setup.socket_factory_mock.create_socket).mock_calls == [call(SocketType.PAIR)] and test_setup.receive_socket_mock.mock_calls == [ call.bind('inproc://peer/g/127.0.0.1/11') @@ -132,18 +86,13 @@ def test_init(): ) -def test_resend(): +def test_resend_if_necessary(): test_setup = create_test_setup() test_setup.reset_mocks() test_setup.background_peer_state.resend_if_necessary() assert ( - test_setup.synchronize_connection_sender_mock.mock_calls == [call.try_send()] - and test_setup.peer_is_ready_sender_mock.mock_calls == [call.try_send()] - and test_setup.abort_timeout_sender_mock.mock_calls == [call.try_send()] + test_setup.connection_establisher_mock.mock_calls == [call.try_send()] and test_setup.sender_mock.mock_calls == [] - and test_setup.register_peer_sender.mock_calls == [call.try_send()] - and test_setup.register_peer_connection.mock_calls == [] - and test_setup.acknowledge_register_peer_sender.mock_calls == [call.try_send()] and mock_cast(test_setup.socket_factory_mock.create_socket).mock_calls == [] and test_setup.receive_socket_mock.mock_calls == [] ) @@ -154,16 +103,8 @@ def test_received_synchronize_connection(): test_setup.reset_mocks() test_setup.background_peer_state.received_synchronize_connection() assert ( - test_setup.synchronize_connection_sender_mock.mock_calls == [] - and test_setup.peer_is_ready_sender_mock.mock_calls == [call.received_synchronize_connection(), - call.reset_timer()] - and test_setup.abort_timeout_sender_mock.mock_calls == [call.received_synchronize_connection()] - and test_setup.register_peer_sender.mock_calls == [] - and test_setup.register_peer_connection.mock_calls == [] - and test_setup.acknowledge_register_peer_sender.mock_calls == [] - and test_setup.sender_mock.mock_calls == [ - call.send( - messages.Message(__root__=messages.AcknowledgeConnection(source=test_setup.my_connection_info)))] + test_setup.connection_establisher_mock.mock_calls == [call.received_synchronize_connection()] + and test_setup.sender_mock.mock_calls == [] and mock_cast(test_setup.socket_factory_mock.create_socket).mock_calls == [] and test_setup.receive_socket_mock.mock_calls == [] ) @@ -174,12 +115,7 @@ def test_received_acknowledge_connection(): test_setup.reset_mocks() test_setup.background_peer_state.received_acknowledge_connection() assert ( - test_setup.synchronize_connection_sender_mock.mock_calls == [call.stop()] - and test_setup.peer_is_ready_sender_mock.mock_calls == [call.received_acknowledge_connection()] - and test_setup.abort_timeout_sender_mock.mock_calls == [call.received_acknowledge_connection()] - and test_setup.register_peer_sender.mock_calls == [] - and test_setup.register_peer_connection.mock_calls == [] - and test_setup.acknowledge_register_peer_sender.mock_calls == [] + test_setup.connection_establisher_mock.mock_calls == [call.received_acknowledge_connection()] and test_setup.sender_mock.mock_calls == [] and mock_cast(test_setup.socket_factory_mock.create_socket).mock_calls == [] and test_setup.receive_socket_mock.mock_calls == [] @@ -191,13 +127,7 @@ def test_received_acknowledge_register_peer(): test_setup.reset_mocks() test_setup.background_peer_state.received_acknowledge_register_peer() assert ( - test_setup.synchronize_connection_sender_mock.mock_calls == [] - and test_setup.peer_is_ready_sender_mock.mock_calls == [call.received_acknowledge_register_peer(), - call.reset_timer()] - and test_setup.abort_timeout_sender_mock.mock_calls == [call.received_acknowledge_register_peer()] - and test_setup.register_peer_sender.mock_calls == [call.stop()] - and test_setup.register_peer_connection.mock_calls == [call.complete(test_setup.peer)] - and test_setup.acknowledge_register_peer_sender.mock_calls == [] + test_setup.connection_establisher_mock.mock_calls == [call.received_acknowledge_register_peer()] and test_setup.sender_mock.mock_calls == [] and mock_cast(test_setup.socket_factory_mock.create_socket).mock_calls == [] and test_setup.receive_socket_mock.mock_calls == [] @@ -209,13 +139,33 @@ def test_received_register_peer_complete(): test_setup.reset_mocks() test_setup.background_peer_state.received_register_peer_complete() assert ( - test_setup.synchronize_connection_sender_mock.mock_calls == [] - and test_setup.peer_is_ready_sender_mock.mock_calls == [call.received_register_peer_complete()] - and test_setup.abort_timeout_sender_mock.mock_calls == [] - and test_setup.register_peer_sender.mock_calls == [] - and test_setup.register_peer_connection.mock_calls == [] - and test_setup.acknowledge_register_peer_sender.mock_calls == [call.stop()] + test_setup.connection_establisher_mock.mock_calls == [call.received_register_peer_complete()] and test_setup.sender_mock.mock_calls == [] and mock_cast(test_setup.socket_factory_mock.create_socket).mock_calls == [] and test_setup.receive_socket_mock.mock_calls == [] ) + + +def test_forward_payload(): + test_setup = create_test_setup() + test_setup.reset_mocks() + frames = [create_autospec(Frame)] + test_setup.background_peer_state.forward_payload(frames=frames) + assert ( + test_setup.connection_establisher_mock.mock_calls == [] + and test_setup.sender_mock.mock_calls == [] + and mock_cast(test_setup.socket_factory_mock.create_socket).mock_calls == [] + and test_setup.receive_socket_mock.mock_calls == [call.send_multipart(frames)] + ) + + +def test_close(): + test_setup = create_test_setup() + test_setup.reset_mocks() + test_setup.background_peer_state.close() + assert ( + test_setup.connection_establisher_mock.mock_calls == [] + and test_setup.sender_mock.mock_calls == [] + and mock_cast(test_setup.socket_factory_mock.create_socket).mock_calls == [] + and test_setup.receive_socket_mock.mock_calls == [call.close(linger=0)] + ) diff --git a/tests/udf_communication/peer_communication/test_connection_establisher.py b/tests/udf_communication/peer_communication/test_connection_establisher.py new file mode 100644 index 00000000..452a058e --- /dev/null +++ b/tests/udf_communication/peer_communication/test_connection_establisher.py @@ -0,0 +1,185 @@ +import dataclasses +from typing import Union +from unittest.mock import MagicMock, create_autospec, call + +from exasol_advanced_analytics_framework.udf_communication.connection_info import ConnectionInfo +from exasol_advanced_analytics_framework.udf_communication.ip_address import IPAddress, Port +from exasol_advanced_analytics_framework.udf_communication.messages import AcknowledgeConnection, Message +from exasol_advanced_analytics_framework.udf_communication.peer import Peer +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.abort_timeout_sender import \ + AbortTimeoutSender +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.acknowledge_register_peer_sender import \ + AcknowledgeRegisterPeerSender +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher import \ + ConnectionEstablisher +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.peer_is_ready_sender import \ + PeerIsReadySender +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.register_peer_connection import \ + RegisterPeerConnection +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.register_peer_sender import \ + RegisterPeerSender +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.sender import Sender +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.synchronize_connection_sender import \ + SynchronizeConnectionSender + + +@dataclasses.dataclass() +class TestSetup: + peer: Peer + my_connection_info: ConnectionInfo + sender_mock: Union[MagicMock, Sender] + abort_timeout_sender_mock: Union[MagicMock, AbortTimeoutSender] + peer_is_ready_sender_mock: Union[MagicMock, PeerIsReadySender] + synchronize_connection_sender_mock: Union[MagicMock, SynchronizeConnectionSender] + register_peer_sender: Union[MagicMock, RegisterPeerSender] + register_peer_connection: Union[MagicMock, RegisterPeerConnection] + acknowledge_register_peer_sender: Union[MagicMock, AcknowledgeRegisterPeerSender] + connection_establisher: ConnectionEstablisher + + def reset_mock(self): + self.abort_timeout_sender_mock.reset_mock() + self.synchronize_connection_sender_mock.reset_mock() + self.peer_is_ready_sender_mock.reset_mock() + self.sender_mock.reset_mock() + self.register_peer_sender.reset_mock() + self.acknowledge_register_peer_sender.reset_mock() + self.register_peer_connection.reset_mock() + + +def create_test_setup() -> TestSetup: + peer = Peer( + connection_info=ConnectionInfo( + name="t1", + ipaddress=IPAddress(ip_address="127.0.0.1"), + port=Port(port=11), + group_identifier="g" + )) + my_connection_info = ConnectionInfo( + name="t0", + ipaddress=IPAddress(ip_address="127.0.0.1"), + port=Port(port=10), + group_identifier="g" + ) + sender_mock: Union[MagicMock, Sender] = create_autospec(Sender) + abort_timeout_sender_mock: Union[MagicMock, AbortTimeoutSender] = create_autospec(AbortTimeoutSender) + peer_is_ready_sender_mock: Union[MagicMock, PeerIsReadySender] = create_autospec(PeerIsReadySender) + synchronize_connection_sender_mock: Union[MagicMock, SynchronizeConnectionSender] = \ + create_autospec(SynchronizeConnectionSender) + register_peer_sender_mock: Union[MagicMock, RegisterPeerSender] = create_autospec(RegisterPeerSender) + register_peer_connection_mock: Union[MagicMock, RegisterPeerConnection] = create_autospec(RegisterPeerConnection) + acknowledge_register_peer_sender_mock: Union[MagicMock, AcknowledgeRegisterPeerSender] = \ + create_autospec(AcknowledgeRegisterPeerSender) + connection_establisher = ConnectionEstablisher( + my_connection_info=my_connection_info, + peer=peer, + sender=sender_mock, + abort_timeout_sender=abort_timeout_sender_mock, + peer_is_ready_sender=peer_is_ready_sender_mock, + synchronize_connection_sender=synchronize_connection_sender_mock, + register_peer_sender=register_peer_sender_mock, + register_peer_connection=register_peer_connection_mock, + acknowledge_register_peer_sender=acknowledge_register_peer_sender_mock, + ) + return TestSetup( + peer=peer, + my_connection_info=my_connection_info, + sender_mock=sender_mock, + abort_timeout_sender_mock=abort_timeout_sender_mock, + peer_is_ready_sender_mock=peer_is_ready_sender_mock, + synchronize_connection_sender_mock=synchronize_connection_sender_mock, + connection_establisher=connection_establisher, + register_peer_sender=register_peer_sender_mock, + register_peer_connection=register_peer_connection_mock, + acknowledge_register_peer_sender=acknowledge_register_peer_sender_mock + ) + + +def test_init(): + test_setup = create_test_setup() + assert ( + test_setup.synchronize_connection_sender_mock.mock_calls == [call.try_send(force=True)] + and test_setup.peer_is_ready_sender_mock.mock_calls == [] + and test_setup.abort_timeout_sender_mock.mock_calls == [] + and test_setup.sender_mock.mock_calls == [] + and test_setup.register_peer_sender.mock_calls == [call.try_send(force=True)] + and test_setup.register_peer_connection.mock_calls == [] + and test_setup.acknowledge_register_peer_sender.mock_calls == [call.try_send(force=True)] + ) + + +def test_try_send(): + test_setup = create_test_setup() + test_setup.reset_mock() + test_setup.connection_establisher.try_send() + assert ( + test_setup.synchronize_connection_sender_mock.mock_calls == [call.try_send()] + and test_setup.peer_is_ready_sender_mock.mock_calls == [call.try_send()] + and test_setup.abort_timeout_sender_mock.mock_calls == [call.try_send()] + and test_setup.sender_mock.mock_calls == [] + and test_setup.register_peer_sender.mock_calls == [call.try_send()] + and test_setup.register_peer_connection.mock_calls == [] + and test_setup.acknowledge_register_peer_sender.mock_calls == [call.try_send()] + ) + + +def test_received_synchronize_connection(): + test_setup = create_test_setup() + test_setup.reset_mock() + test_setup.connection_establisher.received_synchronize_connection() + assert ( + test_setup.synchronize_connection_sender_mock.mock_calls == [] + and test_setup.peer_is_ready_sender_mock.mock_calls == [call.received_synchronize_connection(), + call.reset_timer()] + and test_setup.abort_timeout_sender_mock.mock_calls == [call.received_synchronize_connection()] + and test_setup.register_peer_sender.mock_calls == [] + and test_setup.register_peer_connection.mock_calls == [] + and test_setup.acknowledge_register_peer_sender.mock_calls == [] + and test_setup.sender_mock.mock_calls == [ + call.send(Message(__root__=AcknowledgeConnection(source=test_setup.my_connection_info)))] + ) + + +def test_received_acknowledge_connection(): + test_setup = create_test_setup() + test_setup.reset_mock() + test_setup.connection_establisher.received_acknowledge_connection() + assert ( + test_setup.synchronize_connection_sender_mock.mock_calls == [call.stop()] + and test_setup.peer_is_ready_sender_mock.mock_calls == [call.received_acknowledge_connection()] + and test_setup.abort_timeout_sender_mock.mock_calls == [call.received_acknowledge_connection()] + and test_setup.register_peer_sender.mock_calls == [] + and test_setup.register_peer_connection.mock_calls == [] + and test_setup.acknowledge_register_peer_sender.mock_calls == [] + and test_setup.sender_mock.mock_calls == [] + ) + + +def test_received_acknowledge_register_peer(): + test_setup = create_test_setup() + test_setup.reset_mock() + test_setup.connection_establisher.received_acknowledge_register_peer() + assert ( + test_setup.synchronize_connection_sender_mock.mock_calls == [] + and test_setup.peer_is_ready_sender_mock.mock_calls == [call.received_acknowledge_register_peer(), + call.reset_timer()] + and test_setup.abort_timeout_sender_mock.mock_calls == [call.received_acknowledge_register_peer()] + and test_setup.register_peer_sender.mock_calls == [call.stop()] + and test_setup.register_peer_connection.mock_calls == [call.complete(test_setup.peer)] + and test_setup.acknowledge_register_peer_sender.mock_calls == [] + and test_setup.sender_mock.mock_calls == [] + ) + + +def test_received_register_peer_complete(): + test_setup = create_test_setup() + test_setup.reset_mock() + test_setup.connection_establisher.received_register_peer_complete() + assert ( + test_setup.synchronize_connection_sender_mock.mock_calls == [] + and test_setup.peer_is_ready_sender_mock.mock_calls == [call.received_register_peer_complete()] + and test_setup.abort_timeout_sender_mock.mock_calls == [] + and test_setup.register_peer_sender.mock_calls == [] + and test_setup.register_peer_connection.mock_calls == [] + and test_setup.acknowledge_register_peer_sender.mock_calls == [call.stop()] + and test_setup.sender_mock.mock_calls == [] + ) diff --git a/tests/udf_communication/peer_communication/test_create_background_peer_state.py b/tests/udf_communication/peer_communication/test_connection_establisher_builder.py similarity index 63% rename from tests/udf_communication/peer_communication/test_create_background_peer_state.py rename to tests/udf_communication/peer_communication/test_connection_establisher_builder.py index 0b053cb9..d4a72dc2 100644 --- a/tests/udf_communication/peer_communication/test_create_background_peer_state.py +++ b/tests/udf_communication/peer_communication/test_connection_establisher_builder.py @@ -1,5 +1,5 @@ import dataclasses -from typing import Union, cast, Any, List +from typing import Union, List from unittest.mock import MagicMock, Mock, create_autospec, call import pytest @@ -11,43 +11,39 @@ AbortTimeoutSenderFactory from exasol_advanced_analytics_framework.udf_communication.peer_communicator.acknowledge_register_peer_sender import \ AcknowledgeRegisterPeerSenderFactory -from exasol_advanced_analytics_framework.udf_communication.peer_communicator.background_peer_state import \ - BackgroundPeerState, BackgroundPeerStateFactory from exasol_advanced_analytics_framework.udf_communication.peer_communicator.clock import Clock +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher_behavior_config import \ + ConnectionEstablisherBehaviorConfig +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher_builder import \ + ConnectionEstablisherBuilder +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher_builder_parameter import \ + ConnectionEstablisherBuilderParameter +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher_factory import \ + ConnectionEstablisherFactory +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.connection_establisher_timeout_config import \ + ConnectionEstablisherTimeoutConfig from exasol_advanced_analytics_framework.udf_communication.peer_communicator.peer_is_ready_sender import \ PeerIsReadySenderFactory from exasol_advanced_analytics_framework.udf_communication.peer_communicator.register_peer_connection import \ RegisterPeerConnection from exasol_advanced_analytics_framework.udf_communication.peer_communicator.register_peer_sender import \ RegisterPeerSenderFactory -from exasol_advanced_analytics_framework.udf_communication.peer_communicator.sender import SenderFactory +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.sender import Sender from exasol_advanced_analytics_framework.udf_communication.peer_communicator.synchronize_connection_sender import \ SynchronizeConnectionSenderFactory from exasol_advanced_analytics_framework.udf_communication.peer_communicator.timer import TimerFactory -from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import Socket, \ - SocketFactory - - -def mock_cast(obj: Any) -> Mock: - return cast(Mock, obj) +from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import Socket +from tests.mock_cast import mock_cast @dataclasses.dataclass() class TestSetup: peer: Peer my_connection_info: ConnectionInfo - socket_factory_mock: Union[MagicMock, SocketFactory] out_control_socket_mock: Union[MagicMock, Socket] clock_mock: Union[MagicMock, Clock] - acknowledge_register_peer: bool - forward_register_peer: bool - needs_register_peer_complete: bool + parameter: ConnectionEstablisherBuilderParameter register_peer_connection_predecessor_is_none: bool - abort_timeout_in_ms: int - synchronize_timeout_in_ms: int - peer_is_ready_wait_time_in_ms: int - send_socket_linger_time_in_ms: int - sender_factory_mock: Union[MagicMock, SenderFactory] abort_timeout_sender_factory_mock: Union[MagicMock, AbortTimeoutSenderFactory] peer_is_ready_sender_factory_mock: Union[MagicMock, PeerIsReadySenderFactory] synchronize_connection_sender_factory_mock: Union[MagicMock, SynchronizeConnectionSenderFactory] @@ -56,28 +52,26 @@ class TestSetup: acknowledge_register_peer_sender_factory_mock: Union[MagicMock, AcknowledgeRegisterPeerSenderFactory] timer_factory_mock: Union[MagicMock, TimerFactory] timer_mocks: List[Mock] - background_peer_state_factory: Union[MagicMock, BackgroundPeerStateFactory] - background_peer_state: BackgroundPeerState + sender_mock: Union[MagicMock, Sender] + connection_establisher_builder: ConnectionEstablisherBuilder + connection_establisher_factory_mock: Union[MagicMock, ConnectionEstablisherFactory] def reset_mock(self): self.peer_is_ready_sender_factory_mock.reset_mock() - self.socket_factory_mock.reset_mock() self.register_peer_connection_mock.reset_mock() self.out_control_socket_mock.reset_mock() self.clock_mock.reset_mock() self.abort_timeout_sender_factory_mock.reset_mock() self.synchronize_connection_sender_factory_mock.reset_mock() - self.sender_factory_mock.reset_mock() self.register_peer_sender_factory_mock.reset_mock() self.acknowledge_register_peer_sender_factory_mock.reset_mock() self.timer_factory_mock.reset_mock() - self.background_peer_state_factory.reset_mock() + self.sender_mock.reset_mock() + self.connection_establisher_factory_mock.reset_mock() -def create_test_setup(acknowledge_register_peer: bool, - forward_register_peer: bool, - needs_register_peer_complete: bool, - register_peer_connection_predecessor_is_none: bool) -> TestSetup: +def create_test_setup(register_peer_connection_predecessor_is_none: bool, + behavior_config: ConnectionEstablisherBehaviorConfig) -> TestSetup: peer = Peer( connection_info=ConnectionInfo( name="t1", @@ -91,8 +85,6 @@ def create_test_setup(acknowledge_register_peer: bool, port=Port(port=10), group_identifier="g" ) - socket_factory_mock: Union[MagicMock, SocketFactory] = create_autospec(SocketFactory) - sender_factory_mock: Union[MagicMock, SenderFactory] = create_autospec(SenderFactory) abort_timeout_sender_factory_mock: Union[MagicMock, AbortTimeoutSenderFactory] = create_autospec( AbortTimeoutSenderFactory) peer_is_ready_sender_factory_mock: Union[MagicMock, PeerIsReadySenderFactory] = create_autospec( @@ -110,53 +102,40 @@ def create_test_setup(acknowledge_register_peer: bool, timer_factory_mock: Union[MagicMock, TimerFactory] = create_autospec(TimerFactory) timer_mocks = [Mock(), Mock(), Mock(), Mock(), Mock()] mock_cast(timer_factory_mock.create).side_effect = timer_mocks - background_peer_state_factory_mock: Union[MagicMock, BackgroundPeerStateFactory] = create_autospec( - BackgroundPeerStateFactory) out_control_socket_mock: Union[MagicMock, Socket] = create_autospec(Socket) clock_mock: Union[MagicMock, Clock] = create_autospec(Clock) - abort_timeout_in_ms = 1 - synchronize_timeout_in_ms = 2 - peer_is_ready_wait_time_in_ms = 3 - send_socket_linger_time_in_ms = 4 - background_peer_state = BackgroundPeerState.create( - my_connection_info=my_connection_info, - peer=peer, - socket_factory=socket_factory_mock, - acknowledge_register_peer=acknowledge_register_peer, - forward_register_peer=forward_register_peer, - needs_register_peer_complete=needs_register_peer_complete, - out_control_socket=out_control_socket_mock, - clock=clock_mock, - abort_timeout_in_ms=abort_timeout_in_ms, - synchronize_timeout_in_ms=synchronize_timeout_in_ms, - peer_is_ready_wait_time_in_ms=peer_is_ready_wait_time_in_ms, - send_socket_linger_time_in_ms=send_socket_linger_time_in_ms, - sender_factory=sender_factory_mock, + sender_mock: Union[MagicMock, Sender] = create_autospec(Sender) + connection_establisher_timeout_config = ConnectionEstablisherTimeoutConfig( + abort_timeout_in_ms=1, + synchronize_retry_timeout_in_ms=2, + peer_is_ready_wait_time_in_ms=3, + acknowledge_register_peer_retry_timeout_in_ms=4, + register_peer_retry_timeout_in_ms=5 + ) + connection_establisher_factory_mock: Union[MagicMock, ConnectionEstablisherFactory] = \ + create_autospec(ConnectionEstablisherFactory) + connection_establisher_builder = ConnectionEstablisherBuilder( abort_timeout_sender_factory=abort_timeout_sender_factory_mock, peer_is_ready_sender_factory=peer_is_ready_sender_factory_mock, synchronize_connection_sender_factory=synchronize_connection_sender_factory_mock, register_peer_sender_factory=register_peer_sender_factory_mock, - register_peer_connection=register_peer_connection_mock, acknowledge_register_peer_sender_factory=acknowledge_register_peer_sender_factory_mock, timer_factory=timer_factory_mock, - background_peer_state_factory=background_peer_state_factory_mock + connection_establisher_factory=connection_establisher_factory_mock + ) + parameter = ConnectionEstablisherBuilderParameter( + register_peer_connection=register_peer_connection_mock, + behavior_config=behavior_config, + timeout_config=connection_establisher_timeout_config, ) return TestSetup( - background_peer_state=background_peer_state, + connection_establisher_builder=connection_establisher_builder, + connection_establisher_factory_mock=connection_establisher_factory_mock, peer=peer, my_connection_info=my_connection_info, - socket_factory_mock=socket_factory_mock, out_control_socket_mock=out_control_socket_mock, clock_mock=clock_mock, - acknowledge_register_peer=acknowledge_register_peer, - forward_register_peer=forward_register_peer, - needs_register_peer_complete=needs_register_peer_complete, register_peer_connection_predecessor_is_none=register_peer_connection_predecessor_is_none, - abort_timeout_in_ms=abort_timeout_in_ms, - synchronize_timeout_in_ms=synchronize_timeout_in_ms, - peer_is_ready_wait_time_in_ms=peer_is_ready_wait_time_in_ms, - send_socket_linger_time_in_ms=send_socket_linger_time_in_ms, - sender_factory_mock=sender_factory_mock, abort_timeout_sender_factory_mock=abort_timeout_sender_factory_mock, peer_is_ready_sender_factory_mock=peer_is_ready_sender_factory_mock, synchronize_connection_sender_factory_mock=synchronize_connection_sender_factory_mock, @@ -165,12 +144,31 @@ def create_test_setup(acknowledge_register_peer: bool, acknowledge_register_peer_sender_factory_mock=acknowledge_register_peer_sender_factory_mock, timer_factory_mock=timer_factory_mock, timer_mocks=timer_mocks, - background_peer_state_factory=background_peer_state_factory_mock + sender_mock=sender_mock, + parameter=parameter ) +def test_init(): + test_setup = create_test_setup( + behavior_config=ConnectionEstablisherBehaviorConfig( + acknowledge_register_peer=True, + forward_register_peer=True, + needs_register_peer_complete=True + ), + register_peer_connection_predecessor_is_none=True) + mock_cast(test_setup.timer_factory_mock.create).assert_not_called() + mock_cast(test_setup.register_peer_sender_factory_mock.create).assert_not_called() + mock_cast(test_setup.abort_timeout_sender_factory_mock.create).assert_not_called() + mock_cast(test_setup.synchronize_connection_sender_factory_mock.create).assert_not_called() + mock_cast(test_setup.acknowledge_register_peer_sender_factory_mock.create).assert_not_called() + mock_cast(test_setup.peer_is_ready_sender_factory_mock.create).assert_not_called() + mock_cast(test_setup.connection_establisher_factory_mock.create).assert_not_called() + + @pytest.mark.parametrize( - "acknowledge_register_peer,forward_register_peer,needs_register_peer_complete,register_peer_connection_predecessor_is_none", + ["acknowledge_register_peer", "forward_register_peer", "needs_register_peer_complete", + "register_peer_connection_predecessor_is_none"], [ (True, True, True, True), (True, True, True, False), @@ -189,20 +187,29 @@ def create_test_setup(acknowledge_register_peer: bool, (False, False, False, True), (False, False, False, False), ]) -def test( +def test_create( acknowledge_register_peer: bool, forward_register_peer: bool, needs_register_peer_complete: bool, register_peer_connection_predecessor_is_none: bool ): test_setup = create_test_setup( - acknowledge_register_peer=acknowledge_register_peer, - forward_register_peer=forward_register_peer, - needs_register_peer_complete=needs_register_peer_complete, + behavior_config=ConnectionEstablisherBehaviorConfig( + acknowledge_register_peer=acknowledge_register_peer, + forward_register_peer=forward_register_peer, + needs_register_peer_complete=needs_register_peer_complete), register_peer_connection_predecessor_is_none=register_peer_connection_predecessor_is_none) + test_setup.reset_mock() + test_setup.connection_establisher_builder.create( + my_connection_info=test_setup.my_connection_info, + sender=test_setup.sender_mock, + clock=test_setup.clock_mock, + out_control_socket=test_setup.out_control_socket_mock, + peer=test_setup.peer, + parameter=test_setup.parameter + ) assert_timer_factory(test_setup) - assert_sender_factory(test_setup) - test_setup.socket_factory_mock.assert_not_called() + test_setup.sender_mock.assert_not_called() test_setup.clock_mock.assert_not_called() test_setup.out_control_socket_mock.assert_not_called() test_setup.register_peer_connection_mock.assert_not_called() @@ -212,35 +219,14 @@ def test( assert_peer_is_ready_sender_factory_mock(test_setup) assert_register_peer_sender_factory_mock(test_setup) assert_acknowledge_register_peer_sender_factory_mock(test_setup) - assert_background_peer_state_factory(test_setup) - - -def assert_background_peer_state_factory(test_setup): - mock_cast(test_setup.background_peer_state_factory.create).assert_called_once_with( - my_connection_info=test_setup.my_connection_info, - socket_factory=test_setup.socket_factory_mock, - peer=test_setup.peer, - forward_register_peer=test_setup.forward_register_peer, - acknowledge_register_peer=test_setup.acknowledge_register_peer, - needs_register_peer_complete=test_setup.needs_register_peer_complete, - register_peer_connection=test_setup.register_peer_connection_mock, - sender=mock_cast(test_setup.sender_factory_mock.create).return_value, - synchronize_connection_sender=mock_cast( - test_setup.synchronize_connection_sender_factory_mock.create).return_value, - abort_timeout_sender=mock_cast(test_setup.abort_timeout_sender_factory_mock.create).return_value, - peer_is_ready_sender=mock_cast(test_setup.peer_is_ready_sender_factory_mock.create).return_value, - register_peer_sender=mock_cast(test_setup.register_peer_sender_factory_mock.create).return_value, - acknowledge_register_peer_sender=mock_cast( - test_setup.acknowledge_register_peer_sender_factory_mock.create).return_value - ) def assert_acknowledge_register_peer_sender_factory_mock(test_setup): mock_cast(test_setup.acknowledge_register_peer_sender_factory_mock.create).assert_called_once_with( my_connection_info=test_setup.my_connection_info, peer=test_setup.peer, - register_peer_connection=test_setup.register_peer_connection_mock, - needs_to_send_for_peer=test_setup.acknowledge_register_peer, + register_peer_connection=test_setup.parameter.register_peer_connection, + needs_to_send_for_peer=test_setup.parameter.behavior_config.acknowledge_register_peer, timer=test_setup.timer_mocks[4] ) @@ -249,8 +235,8 @@ def assert_register_peer_sender_factory_mock(test_setup): mock_cast(test_setup.register_peer_sender_factory_mock.create).assert_called_once_with( my_connection_info=test_setup.my_connection_info, peer=test_setup.peer, - register_peer_connection=test_setup.register_peer_connection_mock, - needs_to_send_for_peer=test_setup.forward_register_peer, + register_peer_connection=test_setup.parameter.register_peer_connection, + needs_to_send_for_peer=test_setup.parameter.behavior_config.forward_register_peer, timer=test_setup.timer_mocks[3] ) @@ -261,8 +247,8 @@ def assert_peer_is_ready_sender_factory_mock(test_setup): peer=test_setup.peer, out_control_socket=test_setup.out_control_socket_mock, timer=test_setup.timer_mocks[2], - needs_acknowledge_register_peer=test_setup.forward_register_peer, - needs_register_peer_complete=(test_setup.needs_register_peer_complete + needs_acknowledge_register_peer=test_setup.parameter.behavior_config.forward_register_peer, + needs_register_peer_complete=(test_setup.parameter.behavior_config.needs_register_peer_complete and not test_setup.register_peer_connection_predecessor_is_none) ) @@ -273,7 +259,7 @@ def assert_abort_timeout_sender_factory_mock(test_setup): peer=test_setup.peer, out_control_socket=test_setup.out_control_socket_mock, timer=test_setup.timer_mocks[1], - needs_acknowledge_register_peer=test_setup.forward_register_peer + needs_acknowledge_register_peer=test_setup.parameter.behavior_config.forward_register_peer ) @@ -281,7 +267,7 @@ def assert_synchronize_connection_sender_factory_mock(test_setup): mock_cast(test_setup.synchronize_connection_sender_factory_mock.create).assert_called_once_with( my_connection_info=test_setup.my_connection_info, peer=test_setup.peer, - sender=mock_cast(test_setup.sender_factory_mock.create).return_value, + sender=test_setup.sender_mock, timer=test_setup.timer_mocks[0] ) @@ -302,9 +288,19 @@ def assert_sender_factory(test_setup): def assert_timer_factory(test_setup): test_setup.timer_factory_mock.assert_has_calls([ - call.create(clock=test_setup.clock_mock, timeout_in_ms=test_setup.synchronize_timeout_in_ms), - call.create(clock=test_setup.clock_mock, timeout_in_ms=test_setup.abort_timeout_in_ms), - call.create(clock=test_setup.clock_mock, timeout_in_ms=test_setup.peer_is_ready_wait_time_in_ms), - call.create(clock=test_setup.clock_mock, timeout_in_ms=test_setup.synchronize_timeout_in_ms), - call.create(clock=test_setup.clock_mock, timeout_in_ms=test_setup.synchronize_timeout_in_ms), + call.create( + clock=test_setup.clock_mock, + timeout_in_ms=test_setup.parameter.timeout_config.synchronize_retry_timeout_in_ms), + call.create( + clock=test_setup.clock_mock, + timeout_in_ms=test_setup.parameter.timeout_config.abort_timeout_in_ms), + call.create( + clock=test_setup.clock_mock, + timeout_in_ms=test_setup.parameter.timeout_config.peer_is_ready_wait_time_in_ms), + call.create( + clock=test_setup.clock_mock, + timeout_in_ms=test_setup.parameter.timeout_config.register_peer_retry_timeout_in_ms), + call.create( + clock=test_setup.clock_mock, + timeout_in_ms=test_setup.parameter.timeout_config.acknowledge_register_peer_retry_timeout_in_ms), ]) diff --git a/tests/udf_communication/peer_communication/test_send_recv.py b/tests/udf_communication/peer_communication/test_send_recv.py index f16d1770..9c260e08 100644 --- a/tests/udf_communication/peer_communication/test_send_recv.py +++ b/tests/udf_communication/peer_communication/test_send_recv.py @@ -13,6 +13,8 @@ from exasol_advanced_analytics_framework.udf_communication.ip_address import IPAddress from exasol_advanced_analytics_framework.udf_communication.peer import Peer from exasol_advanced_analytics_framework.udf_communication.peer_communicator import PeerCommunicator +from exasol_advanced_analytics_framework.udf_communication.peer_communicator.forward_register_peer_config import \ + ForwardRegisterPeerConfig from exasol_advanced_analytics_framework.udf_communication.socket_factory.zmq_wrapper import ZMQSocketFactory from tests.udf_communication.peer_communication.conditional_method_dropper import ConditionalMethodDropper from tests.udf_communication.peer_communication.utils import TestProcess, BidirectionalQueue, assert_processes_finish, \ @@ -45,7 +47,12 @@ def run(parameter: PeerCommunicatorTestProcessParameter, queue: BidirectionalQue number_of_peers=parameter.number_of_instances, listen_ip=listen_ip, group_identifier=parameter.group_identifier, - socket_factory=socker_factory) + socket_factory=socker_factory, + forward_register_peer_config=ForwardRegisterPeerConfig( + is_leader=False, + is_enabled=False + ), + ) queue.put(com.my_connection_info) peer_connection_infos = queue.get() for index, connection_infos in peer_connection_infos.items():