Skip to content

Commit

Permalink
Add new process for stopping a PeerCommunicator
Browse files Browse the repository at this point in the history
  • Loading branch information
tkilias committed Aug 25, 2023
1 parent 3e9ea1b commit 6cc7a33
Show file tree
Hide file tree
Showing 23 changed files with 641 additions and 735 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ class Ping(BaseModel, frozen=True):
class Stop(BaseModel, frozen=True):
message_type: Literal["Stop"] = "Stop"

class PrepareToStop(BaseModel, frozen=True):
message_type: Literal["PrepareToStop"] = "PrepareToStop"

class IsReadyToStop(BaseModel, frozen=True):
message_type: Literal["IsReadyToStop"] = "IsReadyToStop"

class Payload(BaseModel, frozen=True):
message_type: Literal["Payload"] = "Payload"
Expand Down Expand Up @@ -69,6 +74,8 @@ class Message(BaseModel, frozen=True):
AcknowledgeRegisterPeer,
RegisterPeerComplete,
Stop,
PrepareToStop,
IsReadyToStop,
Payload,
MyConnectionInfo,
PeerIsReadyToReceive,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ def _should_we_send(self):
result = is_time and not self._finished
return result

def is_ready_to_stop(self):
result = (self._finished and self._needs_to_send_for_peer) or not self._needs_to_send_for_peer
self._logger.debug("is_ready_to_stop", finished=self._finished, is_ready_to_stop=result)
return result


class AcknowledgeRegisterPeerSenderFactory():
def create(self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from exasol_advanced_analytics_framework.udf_communication.connection_info import ConnectionInfo
from exasol_advanced_analytics_framework.udf_communication.ip_address import IPAddress
from exasol_advanced_analytics_framework.udf_communication.messages import Message, IsReadyToStop, Stop, PrepareToStop
from exasol_advanced_analytics_framework.udf_communication.peer import Peer
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.background_listener_thread import \
BackgroundListenerThread
Expand All @@ -25,6 +26,7 @@ class BackgroundListenerInterface:

def __init__(self,
name: str,
number_of_peers: int,
socket_factory: SocketFactory,
listen_ip: IPAddress,
group_identifier: str,
Expand All @@ -42,8 +44,10 @@ def __init__(self,
out_control_socket_address = self._create_out_control_socket(socket_factory)
in_control_socket_address = self._create_in_control_socket(socket_factory)
self._my_connection_info: Optional[ConnectionInfo] = None
self._is_ready_to_stop = False
self._background_listener_run = BackgroundListenerThread(
name=self._name,
number_of_peers=number_of_peers,
socket_factory=socket_factory,
listen_ip=listen_ip,
group_identifier=group_identifier,
Expand Down Expand Up @@ -95,15 +99,24 @@ def receive_messages(self, timeout_in_milliseconds: Optional[int] = 0) -> Iterat
timeout_in_ms=timeout_in_milliseconds):
message = None
try:
message = self._out_control_socket.receive()
message_obj: messages.Message = deserialize_message(message, messages.Message)
specific_message_obj = message_obj.__root__
timeout_in_milliseconds = 0
yield specific_message_obj
message = self._out_control_socket.receive()
message_obj: Message = deserialize_message(message, Message)
yield from self._handle_message(message_obj)
except Exception as e:
self._logger.exception("Exception", raw_message=message)

def close(self):
def _handle_message(self, message_obj: Message) -> Message:
specific_message_obj = message_obj.__root__
if isinstance(specific_message_obj, IsReadyToStop):
self._is_ready_to_stop = True
else:
yield message_obj

def is_ready_to_stop(self):
return self._is_ready_to_stop

def stop(self):
self._logger.info("start")
self._send_stop()
self._thread.join()
Expand All @@ -112,6 +125,12 @@ def close(self):
self._logger.info("end")

def _send_stop(self):
self._logger.info("_send_stop")
stop_message = messages.Stop()
self._in_control_socket.send(serialize_message(stop_message))
self._in_control_socket.send(serialize_message(Stop()))

def prepare_to_stop(self):
self._logger.info("start")
self._send_prepare_to_stop()
self._logger.info("end")

def _send_prepare_to_stop(self):
self._in_control_socket.send(serialize_message(PrepareToStop()))
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from exasol_advanced_analytics_framework.udf_communication import messages
from exasol_advanced_analytics_framework.udf_communication.connection_info import ConnectionInfo
from exasol_advanced_analytics_framework.udf_communication.ip_address import IPAddress, Port
from exasol_advanced_analytics_framework.udf_communication.messages import IsReadyToStop, PrepareToStop
from exasol_advanced_analytics_framework.udf_communication.peer import Peer
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.abort_timeout_sender import \
AbortTimeoutSenderFactory
Expand Down Expand Up @@ -71,10 +72,12 @@ def create_background_peer_state_builder() -> BackgroundPeerStateBuilder:
class BackgroundListenerThread:
class Status(enum.Enum):
RUNNING = enum.auto()
PREPARE_TO_STOP = enum.auto()
STOPPED = enum.auto()

def __init__(self,
name: str,
number_of_peers: int,
socket_factory: SocketFactory,
listen_ip: IPAddress,
group_identifier: str,
Expand All @@ -84,6 +87,7 @@ def __init__(self,
config: PeerCommunicatorConfig,
trace_logging: bool,
background_peer_state_factory: BackgroundPeerStateBuilder = create_background_peer_state_builder()):
self._number_of_peers = number_of_peers
self._config = config
self._background_peer_state_factory = background_peer_state_factory
self._register_peer_connection: Optional[RegisterPeerConnection] = None
Expand Down Expand Up @@ -112,16 +116,16 @@ def run(self):
try:
self._run_message_loop()
finally:
self._close()
self._stop()

def _close(self):
def _stop(self):
self._logger.info("start")
if self._register_peer_connection is not None:
self._register_peer_connection.close()
self._out_control_socket.close(linger=0)
self._in_control_socket.close(linger=0)
for peer_state in self._peer_state.values():
peer_state.close()
peer_state.stop()
self._listener_socket.close(linger=0)
self._logger.info("end")

Expand All @@ -146,31 +150,51 @@ def _create_poller(self):

def _run_message_loop(self):
try:
while self._status == BackgroundListenerThread.Status.RUNNING:
poll = self.poller.poll(timeout_in_ms=self._config.poll_timeout_in_ms)
if self._in_control_socket in poll and PollerFlag.POLLIN in poll[self._in_control_socket]:
message = self._in_control_socket.receive()
self._status = self._handle_control_message(message)
if self._listener_socket in poll and PollerFlag.POLLIN in poll[self._listener_socket]:
message = self._listener_socket.receive_multipart()
self._handle_listener_message(message)
if self._status == BackgroundListenerThread.Status.RUNNING:
for peer_state in self._peer_state.values():
peer_state.resend_if_necessary()
while self._status != BackgroundListenerThread.Status.STOPPED:
self._handle_message()
self._try_send()
self._check_is_ready_to_stop()
except Exception as e:
self._logger.exception("Exception in message loop")

def _check_is_ready_to_stop(self):
if self._status == BackgroundListenerThread.Status.PREPARE_TO_STOP:
if self._is_ready_to_stop():
self._out_control_socket.send(serialize_message(IsReadyToStop()))

def _is_ready_to_stop(self):
peers_status = [peer_state.is_ready_to_stop()
for peer_state in self._peer_state.values()]
is_ready_to_stop = all(peers_status) and len(peers_status) == self._number_of_peers - 1
return is_ready_to_stop

def _try_send(self):
if self._status != BackgroundListenerThread.Status.STOPPED:
for peer_state in self._peer_state.values():
peer_state.try_send()

def _handle_message(self):
poll = self.poller.poll(timeout_in_ms=self._config.poll_timeout_in_ms)
if self._in_control_socket in poll and PollerFlag.POLLIN in poll[self._in_control_socket]:
message = self._in_control_socket.receive()
self._status = self._handle_control_message(message)
if self._listener_socket in poll and PollerFlag.POLLIN in poll[self._listener_socket]:
message = self._listener_socket.receive_multipart()
self._handle_listener_message(message)

def _handle_control_message(self, message: bytes) -> Status:
try:
message_obj: messages.Message = deserialize_message(message, messages.Message)
specific_message_obj = message_obj.__root__
if isinstance(specific_message_obj, messages.Stop):
return BackgroundListenerThread.Status.STOPPED
elif isinstance(specific_message_obj, PrepareToStop):
return BackgroundListenerThread.Status.PREPARE_TO_STOP
elif isinstance(specific_message_obj, messages.RegisterPeer):
if self._is_register_peer_message_allowed_as_control_message():
self._handle_register_peer_message(specific_message_obj)
else:
self._logger.error("RegisterPeerMessage message not allowed",
self._logger.error("RegisterPeer message not allowed",
message_obj=specific_message_obj.dict())
else:
self._logger.error("Unknown message type", message_obj=specific_message_obj.dict())
Expand All @@ -180,11 +204,11 @@ def _handle_control_message(self, message: bytes) -> Status:

def _is_register_peer_message_allowed_as_control_message(self):
return (
(
self._config.forward_register_peer_config.is_enabled
and self._config.forward_register_peer_config.is_leader
)
or not self._config.forward_register_peer_config.is_enabled
(
self._config.forward_register_peer_config.is_enabled
and self._config.forward_register_peer_config.is_leader
)
or not self._config.forward_register_peer_config.is_enabled
)

def _add_peer(self,
Expand Down Expand Up @@ -227,7 +251,7 @@ def _handle_listener_message(self, message: List[Frame]):
if self.is_register_peer_message_allowed_as_listener_message():
self._handle_register_peer_message(specific_message_obj)
else:
logger.error("RegisterPeerMessage message not allowed", message_obj=specific_message_obj.dict())
logger.error("RegisterPeer message not allowed", message_obj=specific_message_obj.dict())
elif isinstance(specific_message_obj, messages.AcknowledgeRegisterPeer):
self._handle_acknowledge_register_peer_message(specific_message_obj)
elif isinstance(specific_message_obj, messages.RegisterPeerComplete):
Expand Down Expand Up @@ -276,7 +300,7 @@ def _handle_register_peer_message(self, message: messages.RegisterPeer):
self._add_peer(
message.peer,
connection_establisher_behavior_config=ConnectionEstablisherBehaviorConfig(
acknowledge_register_peer=True,
acknowledge_register_peer=not self._config.forward_register_peer_config.is_leader,
needs_register_peer_complete=True)
)
return
Expand All @@ -285,7 +309,7 @@ def _handle_register_peer_message(self, message: messages.RegisterPeer):
message.peer,
connection_establisher_behavior_config=ConnectionEstablisherBehaviorConfig(
forward_register_peer=True,
acknowledge_register_peer=True,
acknowledge_register_peer=not self._config.forward_register_peer_config.is_leader,
needs_register_peer_complete=True)
)

Expand Down Expand Up @@ -313,12 +337,12 @@ def _create_register_peer_connection(self, message: messages.RegisterPeer):

def _handle_acknowledge_register_peer_message(self, message: messages.AcknowledgeRegisterPeer):
if self._register_peer_connection.successor != message.source:
self._logger.error("AcknowledgeRegisterPeerMessage message not from successor", message_obj=message.dict())
self._logger.error("AcknowledgeRegisterPeer message not from successor", message_obj=message.dict())
peer = message.peer
self._peer_state[peer].received_acknowledge_register_peer()

def _handle_register_peer_complete_message(self, message: messages.RegisterPeerComplete):
if self._register_peer_connection.predecessor != message.source:
self._logger.error("RegisterPeerCompleteMessage message not from predecssor", message_obj=message.dict())
self._logger.error("RegisterPeerComplete message not from predecssor", message_obj=message.dict())
peer = message.peer
self._peer_state[peer].received_register_peer_complete()
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def _create_receive_socket(self):
receive_socket_address = get_peer_receive_socket_name(self._peer)
self._receive_socket.bind(receive_socket_address)

def resend_if_necessary(self):
def try_send(self):
self._logger.debug("resend_if_necessary")
self._connection_establisher.try_send()

Expand All @@ -61,5 +61,8 @@ def received_register_peer_complete(self):
def forward_payload(self, frames: List[Frame]):
self._receive_socket.send_multipart(frames)

def close(self):
def stop(self):
self._receive_socket.close(linger=0)

def is_ready_to_stop(self):
return self._connection_establisher.is_ready_to_stop()
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ def __init__(self,
register_peer_sender: RegisterPeerSender,
synchronize_connection_sender: SynchronizeConnectionSender):
self._synchronize_connection_sender = synchronize_connection_sender
self._register_peer_sender = register_peer_sender
self._peer_is_ready_sender = peer_is_ready_sender
self._register_peer_sender = register_peer_sender
self._acknowledge_register_peer_sender = acknowledge_register_peer_sender
self._abort_timeout_sender = abort_timeout_sender
self._register_peer_connection = register_peer_connection
Expand Down Expand Up @@ -84,3 +84,15 @@ def try_send(self):
self._abort_timeout_sender.try_send()
self._peer_is_ready_sender.try_send()
self._acknowledge_register_peer_sender.try_send()

def is_ready_to_stop(self):
peer_is_ready_sender = self._peer_is_ready_sender.is_ready_to_stop()
register_peer_sender = self._register_peer_sender.is_ready_to_stop()
self._logger.debug("is_ready_to_stop",
peer_is_ready_sender=peer_is_ready_sender,
register_peer_sender=register_peer_sender,
)
return (
peer_is_ready_sender
and register_peer_sender
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
from typing import Optional, Generator, List

from exasol_advanced_analytics_framework.udf_communication import messages
import structlog
from structlog.typing import FilteringBoundLogger

from exasol_advanced_analytics_framework.udf_communication.connection_info import ConnectionInfo
from exasol_advanced_analytics_framework.udf_communication.peer import Peer
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.get_peer_receive_socket_name import \
Expand All @@ -10,6 +13,8 @@
from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import SocketFactory, \
SocketType, Socket, Frame, PollerFlag

LOGGER: FilteringBoundLogger = structlog.getLogger()


class FrontendPeerState:

Expand Down Expand Up @@ -54,10 +59,11 @@ def send(self, payload: List[Frame]):
serialized_message = serialize_message(message)
frame = self._socket_factory.create_frame(serialized_message)
send_socket.send_multipart([frame] + payload)
send_socket.close(linger=100)

def recv(self, timeout_in_milliseconds: Optional[int] = None) -> List[Frame]:
if self._receive_socket.poll(flags=PollerFlag.POLLIN, timeout_in_ms=timeout_in_milliseconds) != 0:
return self._receive_socket.receive_multipart()

def close(self):
def stop(self):
self._receive_socket.close(linger=0)
Loading

0 comments on commit 6cc7a33

Please sign in to comment.