Skip to content

Commit

Permalink
Replaced bools with IntFlag in AbortTimeoutSender and fix messages mo…
Browse files Browse the repository at this point in the history
…dule import
  • Loading branch information
tkilias committed Jul 14, 2023
1 parent 9577c54 commit 5f1e24f
Show file tree
Hide file tree
Showing 12 changed files with 40 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import time
from typing import cast, Optional

from exasol_advanced_analytics_framework.udf_communication import messages
from exasol_advanced_analytics_framework.udf_communication.discovery.localhost import DiscoverySocket
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.peer_communicator import PeerCommunicator
from exasol_advanced_analytics_framework.udf_communication.serialization import serialize_message, deserialize_message
from tests.udf_communication.test_messages import messages

NANOSECONDS_PER_SECOND = 10 ** 9

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import time
from typing import cast, Optional

from exasol_advanced_analytics_framework.udf_communication import messages
from exasol_advanced_analytics_framework.udf_communication.discovery.multi_node.discovery_socket import DiscoverySocket
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.peer_communicator import PeerCommunicator
from exasol_advanced_analytics_framework.udf_communication.serialization import serialize_message, deserialize_message
from tests.udf_communication.test_messages import messages

NANOSECONDS_PER_SECOND = 10 ** 9

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,26 @@
from enum import IntFlag, auto

import structlog
from structlog.typing import FilteringBoundLogger

from exasol_advanced_analytics_framework.udf_communication import messages
from exasol_advanced_analytics_framework.udf_communication.connection_info import ConnectionInfo
from exasol_advanced_analytics_framework.udf_communication.peer import Peer
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.timer import Timer
from exasol_advanced_analytics_framework.udf_communication.serialization import serialize_message
from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import Socket
from tests.udf_communication.test_messages import messages

LOGGER: FilteringBoundLogger = structlog.get_logger()


class _States(IntFlag):
INIT = auto()
CONNECTION_SYNCHRONIZED = auto()
CONNECTION_ACKNOWLEDGED = auto()
REGISTER_PEER_ACKNOWLEDGED = auto()
FINISHED = auto()


class AbortTimeoutSender:
def __init__(self,
my_connection_info: ConnectionInfo,
Expand All @@ -21,55 +31,53 @@ def __init__(self,
self._needs_acknowledge_register_peer = needs_acknowledge_register_peer
self._timer = timer
self._out_control_socket = out_control_socket
self._received_synchronize_connection = False
self._received_acknowledge_connection = False
self._received_acknowledge_register_peer = False
self._finished = False
self._states = _States.INIT
self._logger = LOGGER.bind(
peer=peer.dict(),
my_connection_info=my_connection_info.dict())

def reset_timer(self):
self._logger.info("reset_timer")
self._logger.info("reset_timer", states=self._states)
self._timer.reset_timer()

def try_send(self):
self._logger.debug("try_send")
self._logger.debug("try_send", states=self._states)
should_we_send = self._should_we_send()
if should_we_send:
self._finished = True
self._states |= _States.FINISHED
self._send_timeout_to_frontend()

def _should_we_send(self):
is_time = self._timer.is_time()
abort_stopped = self._abort_stopped()
result = is_time and not self._finished and not abort_stopped
result = is_time and not _States.FINISHED in self._states and not abort_stopped
return result

def _abort_stopped(self):
connection_ok = self._received_synchronize_connection or self._received_acknowledge_connection
connection_ok = _States.CONNECTION_SYNCHRONIZED in self._states \
or _States.CONNECTION_ACKNOWLEDGED in self._states
received_acknowledge_register_peer = self._needs_acknowledge_register_peer \
or self._received_acknowledge_register_peer
or _States.REGISTER_PEER_ACKNOWLEDGED in self._states
abort_stopped = connection_ok and received_acknowledge_register_peer
return abort_stopped

def _send_timeout_to_frontend(self):
self._logger.debug("send")
self._logger.debug("send", states=self._states)
message = messages.Timeout()
serialized_message = serialize_message(message)
self._out_control_socket.send(serialized_message)

def received_synchronize_connection(self):
self._logger.info("received_synchronize_connection")
self._received_synchronize_connection = True
self._states |= _States.CONNECTION_SYNCHRONIZED
self._logger.debug("received_synchronize_connection", states=self._states)

def received_acknowledge_connection(self):
self._logger.info("received_acknowledge_connection")
self._received_acknowledge_connection = True
self._states |= _States.CONNECTION_ACKNOWLEDGED
self._logger.debug("received_acknowledge_connection", states=self._states)

def received_acknowledge_register_peer(self):
self._logger.info("received_acknowledge_register_peer")
self._received_acknowledge_register_peer = True
self._states |= _States.REGISTER_PEER_ACKNOWLEDGED
self._logger.debug("received_acknowledge_register_peer", states=self._states)


class AbortTimeoutSenderFactory:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from exasol_advanced_analytics_framework.udf_communication.serialization import deserialize_message, serialize_message
from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import SocketFactory, \
SocketType, Socket, PollerFlag
from tests.udf_communication.test_messages import messages
from exasol_advanced_analytics_framework.udf_communication import messages

LOGGER: FilteringBoundLogger = structlog.get_logger()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import structlog
from structlog.types import FilteringBoundLogger

from exasol_advanced_analytics_framework.udf_communication import messages
from exasol_advanced_analytics_framework.udf_communication.connection_info import ConnectionInfo
from exasol_advanced_analytics_framework.udf_communication.ip_address import IPAddress, Port
from exasol_advanced_analytics_framework.udf_communication.peer import Peer
Expand All @@ -17,7 +18,6 @@
from exasol_advanced_analytics_framework.udf_communication.serialization import deserialize_message, serialize_message
from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import SocketFactory, \
SocketType, Socket, PollerFlag, Frame
from tests.udf_communication.test_messages import messages

LOGGER: FilteringBoundLogger = structlog.get_logger()

Expand Down Expand Up @@ -125,7 +125,6 @@ def _run_message_loop(self):
except Exception as e:
self._logger.exception("Exception in message loop")


def _handle_control_message(self, message: bytes) -> Status:
try:
message_obj: messages.Message = deserialize_message(message, messages.Message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import structlog
from structlog.typing import FilteringBoundLogger

from exasol_advanced_analytics_framework.udf_communication import messages
from exasol_advanced_analytics_framework.udf_communication.connection_info import ConnectionInfo
from exasol_advanced_analytics_framework.udf_communication.peer import Peer
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.abort_timeout_sender import \
Expand All @@ -24,7 +25,6 @@
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.timer import TimerFactory
from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import SocketFactory, \
SocketType, Socket, Frame
from tests.udf_communication.test_messages import messages

LOGGER: FilteringBoundLogger = structlog.get_logger()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import contextlib
from typing import Optional, Generator, List

from exasol_advanced_analytics_framework.udf_communication import messages
from exasol_advanced_analytics_framework.udf_communication.connection_info import ConnectionInfo
from exasol_advanced_analytics_framework.udf_communication.peer import Peer
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.get_peer_receive_socket_name import \
get_peer_receive_socket_name
from exasol_advanced_analytics_framework.udf_communication.serialization import serialize_message
from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import SocketFactory, \
SocketType, Socket, Frame, PollerFlag
from tests.udf_communication.test_messages import messages


class FrontendPeerState:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import structlog
from structlog.types import FilteringBoundLogger

from exasol_advanced_analytics_framework.udf_communication import messages
from exasol_advanced_analytics_framework.udf_communication.connection_info import ConnectionInfo
from exasol_advanced_analytics_framework.udf_communication.ip_address import IPAddress
from exasol_advanced_analytics_framework.udf_communication.peer import Peer
Expand All @@ -14,7 +15,6 @@
FrontendPeerState
from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import SocketFactory, \
Frame
from tests.udf_communication.test_messages import messages

LOGGER: FilteringBoundLogger = structlog.getLogger()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import structlog
from structlog.typing import FilteringBoundLogger

from exasol_advanced_analytics_framework.udf_communication import messages
from exasol_advanced_analytics_framework.udf_communication.connection_info import ConnectionInfo
from exasol_advanced_analytics_framework.udf_communication.peer import Peer
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.timer import Timer
from exasol_advanced_analytics_framework.udf_communication.serialization import serialize_message
from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import Socket
from tests.udf_communication.test_messages import messages

LOGGER: FilteringBoundLogger = structlog.get_logger()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
import structlog
from structlog.typing import FilteringBoundLogger

from exasol_advanced_analytics_framework.udf_communication import messages
from exasol_advanced_analytics_framework.udf_communication.connection_info import ConnectionInfo
from exasol_advanced_analytics_framework.udf_communication.peer import Peer
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.send_socket_factory import \
SendSocketFactory
from exasol_advanced_analytics_framework.udf_communication.serialization import serialize_message
from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import Socket
from tests.udf_communication.test_messages import messages

LOGGER: FilteringBoundLogger = structlog.get_logger()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import structlog
from structlog.typing import FilteringBoundLogger

from exasol_advanced_analytics_framework.udf_communication import messages
from exasol_advanced_analytics_framework.udf_communication.connection_info import ConnectionInfo
from exasol_advanced_analytics_framework.udf_communication.peer import Peer
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.sender import Sender
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.timer import Timer
from tests.udf_communication.test_messages import messages

LOGGER: FilteringBoundLogger = structlog.get_logger()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@

import pytest

from exasol_advanced_analytics_framework.udf_communication import messages
from exasol_advanced_analytics_framework.udf_communication.connection_info import ConnectionInfo
from exasol_advanced_analytics_framework.udf_communication.ip_address import IPAddress, Port
from exasol_advanced_analytics_framework.udf_communication.messages import Timeout
from exasol_advanced_analytics_framework.udf_communication.peer import Peer
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.abort_timeout_sender import \
AbortTimeoutSender
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.timer import Timer
from exasol_advanced_analytics_framework.udf_communication.serialization import serialize_message
from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import Socket
from tests.udf_communication.test_messages import messages


def mock_cast(obj: Any) -> Mock:
Expand Down Expand Up @@ -155,7 +156,7 @@ def test_try_send_after_acknowledge_register_peer_and_is_time_true(needs_acknowl
assert (
test_setup.out_control_socket_mock.mock_calls ==
[
call.send(serialize_message(Timeout()))
call.send(serialize_message(messages.Timeout()))
]
and test_setup.timer_mock.mock_calls == [
call.is_time()
Expand Down Expand Up @@ -203,7 +204,7 @@ def test_try_send_after_synchronize_connection_and_is_time_true_and_needs_acknow
assert (
test_setup.out_control_socket_mock.mock_calls ==
[
call.send(serialize_message(Timeout()))
call.send(serialize_message(messages.Timeout()))
]
and test_setup.timer_mock.mock_calls == [
call.is_time()
Expand Down

0 comments on commit 5f1e24f

Please sign in to comment.