Skip to content

Commit

Permalink
Rename GlobalDiscovery and LocalDiscovery into MultiNodeDiscovery and…
Browse files Browse the repository at this point in the history
… LocalhostDiscovery and fix some names after applying review suggestion
  • Loading branch information
tkilias committed Jul 14, 2023
1 parent dfaa06d commit 9ce2bf8
Show file tree
Hide file tree
Showing 12 changed files with 25 additions and 26 deletions.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .discovery_socket import DiscoverySocket
from .discovery_strategy import DiscoveryStrategy
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from exasol_advanced_analytics_framework.udf_communication.ip_address import IPAddress, Port


class LocalDiscoverySocket:
class DiscoverySocket:

def __init__(self, port: Port):
self._port = port
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import time
from typing import cast, Optional

from exasol_advanced_analytics_framework.udf_communication.local_discovery_socket import LocalDiscoverySocket
from exasol_advanced_analytics_framework.udf_communication.discovery.localhost import DiscoverySocket
from exasol_advanced_analytics_framework.udf_communication.messages import PingMessage
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.peer_communicator import PeerCommunicator
from exasol_advanced_analytics_framework.udf_communication.serialization import serialize_message, deserialize_message
Expand All @@ -15,16 +15,16 @@ def _convert_to_ping_message(serialized_message: bytes) -> PingMessage:
return ping_message


class LocalDiscoveryStrategy:
class DiscoveryStrategy:

def __init__(self,
discovery_timeout_in_seconds: int,
time_between_ping_messages_in_seconds: float,
peer_communicator: PeerCommunicator,
local_discovery_socket: LocalDiscoverySocket
discovery_socket: DiscoverySocket
):
self._time_between_ping_messages_in_seconds = float(time_between_ping_messages_in_seconds)
self._local_discovery_socket = local_discovery_socket
self._local_discovery_socket = discovery_socket
self._peer_communicator = peer_communicator
self._discovery_timeout_in_ns = discovery_timeout_in_seconds * NANOSECONDS_PER_SECOND
self._discover_peers()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .discovery_socket import DiscoverySocket
from .discovery_strategy import DiscoveryStrategy
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
LOGGER: FilteringBoundLogger = structlog.getLogger()


class GlobalDiscoverySocket:
class DiscoverySocket:

def __init__(self, ip_address: IPAddress, port: Port):
self._port = port
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import time
from typing import cast, Optional

from exasol_advanced_analytics_framework.udf_communication.global_discovery_socket import GlobalDiscoverySocket
from exasol_advanced_analytics_framework.udf_communication.discovery.multi_node.discovery_socket import DiscoverySocket
from exasol_advanced_analytics_framework.udf_communication.messages import PingMessage
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.peer_communicator import PeerCommunicator
from exasol_advanced_analytics_framework.udf_communication.serialization import serialize_message, deserialize_message
Expand All @@ -15,15 +15,15 @@ def _to_ping_message(serialized_message: bytes) -> PingMessage:
return ping_message


class GlobalDiscoveryStrategy:
class DiscoveryStrategy:

def __init__(self,
discovery_timeout_in_seconds: int,
time_between_ping_messages_in_seconds: float,
peer_communicator: PeerCommunicator,
global_discovery_socket: GlobalDiscoverySocket):
discovery_socket: DiscoverySocket):
self._time_between_ping_messages_in_seconds = float(time_between_ping_messages_in_seconds)
self._global_discovery_socket = global_discovery_socket
self._global_discovery_socket = discovery_socket
self._peer_communicator = peer_communicator
self._discovery_timeout_in_ns = discovery_timeout_in_seconds * NANOSECONDS_PER_SECOND
if not self._peer_communicator.forward_enabled:
Expand Down Expand Up @@ -73,7 +73,7 @@ def _compute_receive_timeout_in_seconds(self, begin_time_ns: int) -> float:
return timeout_in_seconds

def _handle_serialized_message(self, serialized_message) -> float:
ping_message = _convert_to_ping_message(serialized_message)
ping_message = _to_ping_message(serialized_message)
timeout_in_seconds = 0.0
if ping_message is not None:
self._peer_communicator.register_peer(ping_message.source)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def resend_if_necessary(self):
self._peer_is_ready_sender,
self._acknowledge_register_peer_sender
]
for sender in sender:
for sender in senders:
try_send = getattr(sender, "try_send")
try_send()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
LOGGER: FilteringBoundLogger = structlog.get_logger()


import enum

class PeerIsReadySender:


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.send_socket_factory import \
SendSocketFactory
from exasol_advanced_analytics_framework.udf_communication.serialization import serialize_message
from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import SocketFactory, \
Socket, SocketType
from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import SocketFactory

LOGGER: FilteringBoundLogger = structlog.get_logger(__name__)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@
from structlog.types import FilteringBoundLogger

from exasol_advanced_analytics_framework.udf_communication.connection_info import ConnectionInfo
from exasol_advanced_analytics_framework.udf_communication.discovery import localhost
from exasol_advanced_analytics_framework.udf_communication.ip_address import Port, IPAddress
from exasol_advanced_analytics_framework.udf_communication.local_discovery_socket import LocalDiscoverySocket
from exasol_advanced_analytics_framework.udf_communication.local_discovery_strategy import LocalDiscoveryStrategy
from exasol_advanced_analytics_framework.udf_communication.peer import Peer
from exasol_advanced_analytics_framework.udf_communication.peer_communicator import PeerCommunicator
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.peer_communicator import key_for_peer
Expand All @@ -38,7 +37,7 @@


def run(name: str, group_identifier: str, number_of_instances: int, queue: BidirectionalQueue, seed: int = 0):
local_discovery_socket = LocalDiscoverySocket(Port(port=44444))
local_discovery_socket = localhost.DiscoverySocket(Port(port=44444))
listen_ip = IPAddress(ip_address="127.1.0.1")
context = zmq.Context()
socker_factory = ZMQSocketFactory(context)
Expand All @@ -49,10 +48,10 @@ def run(name: str, group_identifier: str, number_of_instances: int, queue: Bidir
group_identifier=group_identifier,
socket_factory=socker_factory)
queue.put(peer_communicator.my_connection_info)
discovery = LocalDiscoveryStrategy(
discovery = localhost.DiscoveryStrategy(
discovery_timeout_in_seconds=120,
time_between_ping_messages_in_seconds=1,
local_discovery_socket=local_discovery_socket,
discovery_socket=local_discovery_socket,
peer_communicator=peer_communicator
)
if peer_communicator.are_all_peers_connected():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
from structlog.types import FilteringBoundLogger

from exasol_advanced_analytics_framework.udf_communication.connection_info import ConnectionInfo
from exasol_advanced_analytics_framework.udf_communication.global_discovery_socket import GlobalDiscoverySocket
from exasol_advanced_analytics_framework.udf_communication.global_discovery_strategy import GlobalDiscoveryStrategy
from exasol_advanced_analytics_framework.udf_communication.discovery import multi_node
from exasol_advanced_analytics_framework.udf_communication.ip_address import Port, IPAddress
from exasol_advanced_analytics_framework.udf_communication.peer import Peer
from exasol_advanced_analytics_framework.udf_communication.peer_communicator import PeerCommunicator
Expand Down Expand Up @@ -39,7 +38,7 @@

def run(name: str, group_identifier: str, number_of_instances: int, queue: BidirectionalQueue, seed: int = 0):
listen_ip = IPAddress(ip_address=f"127.1.0.1")
global_discovery_socket = GlobalDiscoverySocket(ip_address=listen_ip, port=Port(port=44444))
global_discovery_socket = multi_node.DiscoverySocket(ip_address=listen_ip, port=Port(port=44444))
context = zmq.Context()
socket_factory = ZMQSocketFactory(context)
leader = False
Expand All @@ -56,10 +55,10 @@ def run(name: str, group_identifier: str, number_of_instances: int, queue: Bidir
socket_factory=socket_factory
)
queue.put(peer_communicator.my_connection_info)
discovery = GlobalDiscoveryStrategy(
discovery = multi_node.DiscoveryStrategy(
discovery_timeout_in_seconds=120,
time_between_ping_messages_in_seconds=1,
global_discovery_socket=global_discovery_socket,
discovery_socket=global_discovery_socket,
peer_communicator=peer_communicator
)
if peer_communicator.are_all_peers_connected():
Expand Down

0 comments on commit 9ce2bf8

Please sign in to comment.