Skip to content

Commit

Permalink
Fix after rebase and use comanndline parameter in analyze_log.py
Browse files Browse the repository at this point in the history
  • Loading branch information
tkilias committed Jul 4, 2023
1 parent fc1a3da commit 5ab7646
Show file tree
Hide file tree
Showing 9 changed files with 18 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Literal, Union
from typing import Literal, Union, Optional

from pydantic import BaseModel

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.send_socket_factory import \
SendSocketFactory
from exasol_advanced_analytics_framework.udf_communication.serialization import serialize_message
from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract_socket_factory import Socket
from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import Socket

LOGGER: FilteringBoundLogger = structlog.get_logger()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from exasol_advanced_analytics_framework.udf_communication.connection_info import ConnectionInfo
from exasol_advanced_analytics_framework.udf_communication.peer import Peer
from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract_socket_factory import SocketFactory, \
from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import SocketFactory, \
Socket, SocketType

LOGGER: FilteringBoundLogger = structlog.get_logger(__name__)
Expand Down
10 changes: 5 additions & 5 deletions tests/udf_communication/peer_communication/analyze_log.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import sys
from collections import defaultdict, Counter
from pathlib import Path
from typing import Dict, List, Callable
Expand All @@ -23,7 +24,7 @@ def is_connection_acknowledged(line: Dict[str, str]):
def is_connection_synchronized(line: Dict[str, str]):
return line["module"] == "background_peer_state" and line["event"] == "received_synchronize_connection"

def is__register_peer_acknowledged(line: Dict[str, str]):
def is_register_peer_acknowledged(line: Dict[str, str]):
return line["module"] == "background_peer_state" and line["event"] == "received_acknowledge_register_peer"

def is_register_peer_complete(line: Dict[str, str]):
Expand All @@ -43,7 +44,7 @@ def print_source_target_interaction(group_source_target_map):
"is_peer_ready": is_peer_ready,
"is_connection_acknowledged": is_connection_acknowledged,
"is_connection_synchronized": is_connection_synchronized,
"is__register_peer_acknowledged": is__register_peer_acknowledged,
"is_register_peer_acknowledged": is_register_peer_acknowledged,
"is_register_peer_complete": is_register_peer_complete
}
for group, sources in group_source_target_map.items():
Expand All @@ -54,7 +55,7 @@ def print_source_target_interaction(group_source_target_map):
for predicate_name, predicate in predicates.items():
if not is_log_sequence_ok(lines, predicate):
not_ok[predicate_name] += 1
if predicate_name == "is_peer_is_ready_send":
if predicate_name == "is_peer_ready":
print(f"========== {predicate_name}-{group}-{source}-{target} ============")
else:
# if predicate_name == "is_received_acknowledge_register_peer":
Expand Down Expand Up @@ -122,7 +123,6 @@ def analyze_close(log_file_path: Path):


if __name__ == "__main__":
root = Path(__file__).parent
log_file_path = root / "test_add_peer_forward.log"
log_file_path=Path(sys.argv[1]).absolute()
analyze_source_target_interaction(log_file_path)
analyze_close(log_file_path)
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
logger_factory=WriteLoggerFactory(file=Path(__file__).with_suffix(".log").open("wt")),
processors=[
structlog.contextvars.merge_contextvars,
ConditionalMethodDropper(method_name="debug"),
#ConditionalMethodDropper(method_name="debug"),
structlog.processors.add_log_level,
structlog.processors.TimeStamper(),
structlog.processors.ExceptionRenderer(exception_formatter=ExceptionDictTransformer(locals_max_string=320)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@
from exasol_advanced_analytics_framework.udf_communication.peer import Peer
from exasol_advanced_analytics_framework.udf_communication.peer_communicator import PeerCommunicator
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.peer_communicator import key_for_peer
from exasol_advanced_analytics_framework.udf_communication.socket_factory.fault_injection_socket_factory import \
FISocketFactory
from exasol_advanced_analytics_framework.udf_communication.socket_factory.zmq_socket_factory import ZMQSocketFactory
from exasol_advanced_analytics_framework.udf_communication.socket_factory.fault_injection import FaultInjectionSocketFactory
from exasol_advanced_analytics_framework.udf_communication.socket_factory.zmq_wrapper import ZMQSocketFactory
from tests.udf_communication.peer_communication.conditional_method_dropper import ConditionalMethodDropper
from tests.udf_communication.peer_communication.utils import TestProcess, BidirectionalQueue, assert_processes_finish

Expand All @@ -45,7 +44,7 @@ def run(name: str, group_identifier: str, number_of_instances: int, queue: Bidir
listen_ip = IPAddress(ip_address=f"127.1.0.1")
context = zmq.Context()
socket_factory = ZMQSocketFactory(context)
socket_factory = FISocketFactory(socket_factory, 0.01, RandomState(seed))
socket_factory = FaultInjectionSocketFactory(socket_factory, 0.01, RandomState(seed))
leader = False
leader_name = "t0"
if name == leader_name:
Expand Down Expand Up @@ -75,7 +74,7 @@ def run(name: str, group_identifier: str, number_of_instances: int, queue: Bidir
logger.exception("Exception during test", exception=e)


@pytest.mark.parametrize("number_of_instances, repetitions", [(2, 1000), (10, 100), (50, 10)])
@pytest.mark.parametrize("number_of_instances, repetitions", [(2, 1000), (10, 100)])
def test_reliability(number_of_instances: int, repetitions: int):
run_test_with_repetitions(number_of_instances, repetitions)

Expand Down Expand Up @@ -106,10 +105,6 @@ def test_functionality_25():
run_test_with_repetitions(25, REPETITIONS_FOR_FUNCTIONALITY)


def test_functionality_50():
run_test_with_repetitions(50, REPETITIONS_FOR_FUNCTIONALITY)


def run_test_with_repetitions(number_of_instances: int, repetitions: int):
for i in range(repetitions):
LOGGER.info(f"Start iteration",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
from typing import Union, cast, Any
from unittest.mock import MagicMock, Mock, create_autospec, call


from exasol_advanced_analytics_framework.udf_communication.connection_info import ConnectionInfo
from exasol_advanced_analytics_framework.udf_communication.ip_address import IPAddress, Port
from exasol_advanced_analytics_framework.udf_communication.messages import AcknowledgeConnectionMessage, Message
from exasol_advanced_analytics_framework.udf_communication.peer import Peer
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.abort_timeout_sender import \
AbortTimeoutSender
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.acknowledge_register_peer_sender import \
AcknowledgeRegisterPeerSender
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.background_peer_state import \
BackgroundPeerState
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.peer_is_ready_sender import \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.synchronize_connection_sender import \
SynchronizeConnectionSender, SynchronizeConnectionSenderFactory
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.timer import TimerFactory
from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract_socket_factory import Socket, \
from exasol_advanced_analytics_framework.udf_communication.socket_factory.abstract import Socket, \
SocketFactory, SocketType


Expand Down
10 changes: 2 additions & 8 deletions tests/udf_communication/test_global_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@
from exasol_advanced_analytics_framework.udf_communication.global_discovery_socket import GlobalDiscoverySocket
from exasol_advanced_analytics_framework.udf_communication.global_discovery_strategy import GlobalDiscoveryStrategy
from exasol_advanced_analytics_framework.udf_communication.ip_address import Port, IPAddress
from exasol_advanced_analytics_framework.udf_communication.local_discovery_socket import LocalDiscoverySocket
from exasol_advanced_analytics_framework.udf_communication.local_discovery_strategy import LocalDiscoveryStrategy
from exasol_advanced_analytics_framework.udf_communication.peer import Peer
from exasol_advanced_analytics_framework.udf_communication.peer_communicator import PeerCommunicator
from exasol_advanced_analytics_framework.udf_communication.peer_communicator.peer_communicator import key_for_peer
from exasol_advanced_analytics_framework.udf_communication.socket_factory.zmq_socket_factory import ZMQSocketFactory
from exasol_advanced_analytics_framework.udf_communication.socket_factory.zmq_wrapper import ZMQSocketFactory
from tests.udf_communication.peer_communication.conditional_method_dropper import ConditionalMethodDropper
from tests.udf_communication.peer_communication.utils import TestProcess, BidirectionalQueue, assert_processes_finish

Expand Down Expand Up @@ -71,7 +69,7 @@ def run(name: str, group_identifier: str, number_of_instances: int, queue: Bidir
queue.put([])


@pytest.mark.parametrize("number_of_instances, repetitions", [(2, 1000), (10, 100), (50, 10)])
@pytest.mark.parametrize("number_of_instances, repetitions", [(2, 1000), (10, 100)])
def test_reliability(number_of_instances: int, repetitions: int):
run_test_with_repetitions(number_of_instances, repetitions)

Expand Down Expand Up @@ -99,10 +97,6 @@ def test_functionality_25():
run_test_with_repetitions(25, REPETITIONS_FOR_FUNCTIONALITY)


def test_functionality_50():
run_test_with_repetitions(50, REPETITIONS_FOR_FUNCTIONALITY)


def run_test_with_repetitions(number_of_instances: int, repetitions: int):
for i in range(repetitions):
LOGGER.info(f"Start iteration",
Expand Down

0 comments on commit 5ab7646

Please sign in to comment.