Skip to content

Commit

Permalink
#98: Added more robust connection protocol (#108)
Browse files Browse the repository at this point in the history
* Introduced a tcp-like protocol with a two-side synchronize and ack. When a peer receives synchronize it can finish for its peer, but needs to wait, if it receives an ack it can finish immediately for the peer.
*  Also added a log analyzer
* Install itde as python dependency and pin it to 1.7.1

Co-authored-by: Nicola Coretti <[email protected]>
  • Loading branch information
tkilias and Nicoretti authored Jul 5, 2023
1 parent 0caf3b7 commit 19e6d6e
Show file tree
Hide file tree
Showing 50 changed files with 2,325 additions and 462 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ doc/_build

.luarocks

# vim
.*.swp
.*.swo

# vagrant

.vagrant
15 changes: 15 additions & 0 deletions doc/design/establish_connection/sequence/base.puml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
@startuml

!include ../../legend.puml

box "Peer1"
participant "Frontend" as Peer1Frontend
participant "Backend" as Peer1Backend
end box

box "Peer2"
participant "Backend" as Peer2Backend
participant "Frontend" as Peer2Frontend
end box

@enduml
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
@startuml

'https://plantuml.com/sequence-diagram

!include base.puml

par
[-[$method_call]> Peer1Frontend: register_peer()
Peer1Frontend -[$zmq_inproc]>> Peer1Backend: RegisterPeerMessage
Peer1Backend -[$zmq_tcp_rd_no_con]>> Peer2Backend: SynchronizeConnectionMessage
else
Peer2Frontend <[$method_call]-] : register_peer()
Peer2Backend <<[$zmq_inproc]- Peer2Frontend: RegisterPeerMessage
Peer1Backend <<[$zmq_tcp_rd_no_con]- Peer2Backend : SynchronizeConnectionMessage
end

par
Peer1Backend <<[$zmq_tcp_rd_no_con]- Peer2Backend: AcknowledgeConnectionMessage
Peer1Backend -[$zmq_inproc]>> Peer1Frontend: PeerIsReadyMessage(Peer2)
else
Peer1Backend -[$zmq_tcp_rd_no_con]>> Peer2Backend: AcknowledgeConnectionMessage
Peer2Backend -[$zmq_inproc]>> Peer2Frontend: PeerIsReadyMessage(Peer1)
end

@enduml
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
@startuml
'https://plantuml.com/sequence-diagram

!include base.puml

par
[-[$method_call]> Peer1Frontend: register_peer()
Peer1Frontend -[$zmq_inproc]>> Peer1Backend: RegisterPeerMessage
Peer1Backend -[$zmq_tcp_rd_no_con]x Peer2Backend: SynchronizeConnectionMessage
else
Peer2Frontend <[$method_call]-] : register_peer()
Peer2Backend <<[$zmq_inproc]- Peer2Frontend: RegisterPeerMessage
Peer1Backend x[$zmq_tcp_rd_no_con]- Peer2Backend : SynchronizeConnectionMessage
end

loop until abort timeout
par
...synchronize_timeout...
Peer1Backend -[$zmq_tcp_rd_no_con]x Peer2Backend: SynchronizeConnectionMessage
else
...synchronize_timeout...
Peer1Backend x[$zmq_tcp_rd_no_con]- Peer2Backend : SynchronizeConnectionMessage
end
end

par
Peer2Backend -[$zmq_inproc]>> Peer2Frontend: TimeoutMessage
else
Peer1Frontend <<[$zmq_inproc]- Peer1Backend: TimeoutMessage
end

@enduml
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
@startuml
'https://plantuml.com/sequence-diagram

!include base.puml

par
[-[$method_call]> Peer1Frontend: register_peer()
Peer1Frontend -[$zmq_inproc]>> Peer1Backend: RegisterPeerMessage
Peer1Backend -[$zmq_tcp_rd_no_con]>> Peer2Backend: SynchronizeConnectionMessage
else
Peer2Frontend <[$method_call]-] : register_peer()
Peer2Backend <<[$zmq_inproc]- Peer2Frontend: RegisterPeerMessage
Peer1Backend x[$zmq_tcp_rd_no_con]- Peer2Backend : SynchronizeConnectionMessage
end

par
Peer1Backend <<[$zmq_tcp_rd_no_con]- Peer2Backend: AcknowledgeConnectionMessage
Peer1Backend -[$zmq_inproc]>> Peer1Frontend: PeerIsReadyMessage(Peer2)
else
loop until peer_is_ready_wait_time
...synchronize_timeout...
Peer1Backend x[$zmq_tcp_rd_no_con]- Peer2Backend : SynchronizeConnectionMessage
end
end

Peer2Backend -[$zmq_inproc]>> Peer2Frontend: PeerIsReadyMessage(Peer1)

@enduml
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
@startuml
'https://plantuml.com/sequence-diagram

!include base.puml

[-[$method_call]> Peer1Frontend: register_peer()
Peer1Frontend -[$zmq_inproc]>> Peer1Backend: RegisterPeerMessage
Peer1Backend -[$zmq_tcp_rd_no_con]>> Peer2Backend: SynchronizeConnectionMessage
Peer1Backend <<[$zmq_tcp_rd_no_con]- Peer2Backend : SynchronizeConnectionMessage
par
Peer1Backend -[$zmq_tcp_rd_no_con]>> Peer2Backend: AcknowledgeConnectionMessage
Peer2Backend -[$zmq_inproc]>> Peer2Frontend: PeerIsReadyMessage(Peer1)
else
Peer1Backend <<[$zmq_tcp_rd_no_con]- Peer2Backend: AcknowledgeConnectionMessage
Peer1Frontend <<[$zmq_inproc]- Peer1Backend: PeerIsReadyMessage(Peer2)
end

@enduml
Binary file added doc/design/establish_connection/state_machine.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
18 changes: 18 additions & 0 deletions doc/design/establish_connection/state_machine.puml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
@startuml
'https://plantuml.com/state-diagram

[*] --> PeerIsRegistered : received RegisterPeerMessage / \n send SynchronizeConnectionMessage
[*] --> ReceivedSynchronizeConnectionMessage : received SynchronizeConnectionMessage / \n send SynchronizeConnectionMessage \n send AcknowledgeConnectionMessage
PeerIsRegistered --> ReceivedSynchronizeConnectionMessage : received SynchronizeConnectionMessage \n send AcknowledgeConnectionMessage
PeerIsRegistered --> ReceivedAcknowledgeConnectionMessage : received AcknowledgeConnectionMessage

ReceivedSynchronizeConnectionMessage --> PeerIsReady : received AcknowledgeConnectionMessage

PeerIsRegistered -> PeerIsRegistered : Timeout / \n send SynchronizeConnectionMessage

ReceivedSynchronizeConnectionMessage --> ReceivedSynchronizeConnectionMessage : Timeout / \n send SynchronizeConnectionMessage
ReceivedSynchronizeConnectionMessage --> PeerIsReady : Waited a while

ReceivedAcknowledgeConnectionMessage --> PeerIsReady : received SynchronizeConnectionMessage \n send AcknowledgeConnectionMessage
ReceivedAcknowledgeConnectionMessage --> PeerIsReady : Waited a while
@enduml
14 changes: 14 additions & 0 deletions doc/design/legend.puml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
@startuml
!$udp = '#red'
!$method_call = '#green'
!$zmq_inproc = '#blue'
!$zmq_tcp_rd_no_con = '#darkviolet'

legend
|Color| Protocol |
|<$udp>| UDP |
|<$method_call>| method calls |
|<$zmq_inproc>| ZMQ inproc |
|<$zmq_tcp_rd_no_con>| ZMQ TCP Dealer/Router Connection Less |
end legend
@enduml
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
36 changes: 36 additions & 0 deletions doc/design/local_discovery_strategy/both_peer_receive_ping.puml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
@startuml
'https://plantuml.com/sequence-diagram

!include ../legend.puml

box "Peer1"
participant "Frontend" as Peer1Frontend
participant "DiscoveryStrategy" as Peer1DiscoveryStrategy
end box

box "Peer2"
participant "DiscoveryStrategy" as Peer2DiscoveryStrategy
participant "Frontend" as Peer2Frontend
end box

par
loop are_all_peers_connected()
Peer1DiscoveryStrategy -[$udp]>> Peer2DiscoveryStrategy: PingMessage
Peer2DiscoveryStrategy -[$method_call]> Peer2Frontend: register_peer()
return
ref over Peer1Frontend, Peer2Frontend: Establish Connection
Peer2DiscoveryStrategy -[$method_call]> Peer2Frontend: are_all_peers_connected()
return
end
else
loop are_all_peers_connected()
Peer1DiscoveryStrategy <<[$udp]- Peer2DiscoveryStrategy: PingMessage
Peer1DiscoveryStrategy -[$method_call]> Peer1Frontend: register_peer()
return
ref over Peer1Frontend, Peer2Frontend: Establish Connection
Peer1DiscoveryStrategy -[$method_call]> Peer1Frontend: are_all_peers_connected()
return
end
end

@enduml
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
31 changes: 31 additions & 0 deletions doc/design/local_discovery_strategy/one_peer_receives_ping.puml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
@startuml
'https://plantuml.com/sequence-diagram

!include ../legend.puml

box "Peer1"
participant "Frontend" as Peer1Frontend
participant "DiscoveryStrategy" as Peer1DiscoveryStrategy
end box

box "Peer2"
participant "DiscoveryStrategy" as Peer2DiscoveryStrategy
participant "Frontend" as Peer2Frontend
end box

par
loop are_all_peers_connected()
Peer1DiscoveryStrategy -[$udp]>> Peer2DiscoveryStrategy: PingMessage
Peer2DiscoveryStrategy -[$method_call]> Peer2Frontend: register_peer()
return
ref over Peer1Frontend, Peer2Frontend: Establish Connection
Peer2DiscoveryStrategy -[$method_call]> Peer2Frontend: are_all_peers_connected()
return
end
else
loop are_all_peers_connected()
Peer1DiscoveryStrategy x[$udp]- Peer2DiscoveryStrategy: PingMessage
end
end

@enduml
89 changes: 34 additions & 55 deletions doc/design/udf_discovery_and_communication.rst
Original file line number Diff line number Diff line change
@@ -1,80 +1,59 @@
UDF Discovery and Communication
===============================

===============
Local Discovery
===============
===================
Establish Connection
===================

*********
Overview:
*********
* We use a protocol similar to the TCP Handshake for establishing the connection
* However, our protocol has two different requirements compared to TCP:

- UDP Broadcast for the initial `PingMessage` with connection information for the receiving socket of the reliable network
- After receiving `PingMessage`:
* Two peers can establish the connection at the same time
* In case of lost messages, one of the peers in a connection can successful terminate

- Create sending socket for peer to its receiving socket of the reliable network
- Send a `ReadyToReceiveMessage` with our connection information including the receiving socket port
over our sending socket for it to inform the peer that we are ready to receive from it
* To handle, these two requirements, we add the following modification:

- After receiving `ReadyToReceiveMessage` on our receiving socket:
* We allow both peers to send a synchronize at the same time
* When a peer receives the `SynchronizeConnectionMessage` from the second peer

- If not yet discovered, we create a sending socket for the peer to its receiving socket of the reliable network
- Send a `ReadyToReceiveMessage` with our connection information including the receiving socket port
over our sending socket for it, in case we didn't get its `PingMessage`.
- If we didn't get a `ReadyToReceiveMessage` message after a certain time ,
yet, we send a `AreYouReadyToReceiveMessage` to a discovered peer
* It sends first a `SynchronizeConnectionMessage` and `AcknowledgeConnectionMessage` back
* It can mark the second peer as ready, after it waited for the peer_is_ready_wait_time

- This should prevent a stuck handshake if we should lose a `ReadyToReceiveMessage` for whatever reason
* Both peers register each other:

- After receiving `AreYouReadyToReceiveMessage`:
.. image:: establish_connection/sequence/both_peers_receive_register_peer.png

- If not yet discovered, we create a sending socket for the peer to its receiving socket of the reliable network
- Send a `ReadyToReceiveMessage` with our connection information including the receiving socket port
over our sending socket for it, in case we didn't get its `PingMessage`.
* One peer registers the other peer:

.. image:: udf_communication_simple_overview.drawio.png
.. image:: establish_connection/sequence/one_peer_receives_register_peer.png

********
Details:
********
* Both peers register each other, one peer loses the `SynchronizeConnectionMessage`:

We separated the Local Discovery into two components. The component `LocalDiscoveryStrategy` implements
the discovery via UDP Broadcast. The component `PeerCommunicator` handles the reliable network.
.. image:: establish_connection/sequence/both_peers_receive_register_peer_one_peer_loses_synchonize.png

If the `LocalDiscoveryStrategy` receives a `PingMessage` via UDP it registers the connection info for
the reliable network of the peer with the `PeerCommunicator`.
* Both peers register each other, both lose the `SynchronizeConnectionMessage`:

The `PeerCommunicator` then handles the reliable network communication.
This includes sending and receiving the `ReadyToReceiveMessage` or `AreYouReadyToReceiveMessage`.
It also provides an interface for the user of the library to check if all peers are connected, which peers are there
and to send and receive message from these peers.
.. image:: establish_connection/sequence/both_peers_receive_register_peer_both_peers_lose_synchonize.png

The `PeerCommunicator` can be used with different discovery strategies.
The `LocalDiscoveryStrategy` is one, but the `GlobalDiscoveryStrategy` can use it as well
to form the reliable networks between the leaders.
* State diagram:

The current implementation of the `PeerCommunicator` use `ZMQ` for the reliable communication,
because it abstracts away the low-level network. It provides:
.. image:: establish_connection/state_diagram.png

- A message-based interface, instead the stream-based interface of TCP.
- Asynchronous message queue, instead of synchronous TCP socket
========================
Local Discovery Strategy
========================

- Being asynchronous means that the timings of the physical connection setup and tear down,
reconnect and effective delivery are transparent to the user and organized by ZeroMQ itself.
- Further, messages may be queued in the event that a peer is unavailable to receive them.
- The Local Discovery Strategy sends `PingMessage` with connection information
for establishing the connection via UDP Broadcast.
- When a peer receives a `PingMessage` from another peer.
it registers the other peer and treis to establish a connection
- The strategy sends and receives UDP Broadcast messages until all other peers are connected

We further split up the `PeerCommunicator` into a frontend which is called `PeerCommunicator`
and a `BackgroundListener` which runs in a thread. The `BackgroundListener` is also split into the
`BackgroundListenerInterface` and the `BackgroundListenerThread` to simplify the interaction between it
and the `PeerCommunicator`
* Both peers receive `PingMessage`:

The `BackgroundListenerThread` listens for incoming messages from other peers or the frontend and
forwards messages to the frontend.
.. image:: establish_connection/both_peer_receive_ping.png

Here a detailed overview of the information flow:
* One peer receive `PingMessage`:

.. image:: udf_communication_detail_overview.drawio.png
.. image:: establish_connection/one_peer_receives_ping.png

Here the state machines for the BackgroundListener and Frontend

.. image:: peer_communicator_state_machine.drawio.png
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import dataclasses

from pydantic import BaseModel


Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import socket
import time
from typing import Optional

from exasol_advanced_analytics_framework.udf_communication.ip_address import IPAddress, Port

Expand Down
Loading

0 comments on commit 19e6d6e

Please sign in to comment.