Skip to content

Commit

Permalink
rework internal com based on XML-RPC
Browse files Browse the repository at this point in the history
  • Loading branch information
julien6387 committed Apr 18, 2024
1 parent 8bf57f1 commit c0fcc7a
Show file tree
Hide file tree
Showing 15 changed files with 740 additions and 486 deletions.
2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
The `internal_port` of the **Supvisors** section in the Supervisor configuration file is no longer needed.
As a consequence, the `supvisors_list` option is simplified as follows: `<identifier>host_name:http_port`.
The transitional `SupvisorsInstanceStates.ISOLATING` state has been removed.
The `rpc_failure` information has been added to the result of the `get_instance_info` XML-RPC.
The remote **Supvisors** instance becomes `SILENT` as soon as the published events fails due to a transport issue.

* Implement [Issue #50](https://github.com/julien6387/supvisors/issues/50).
A new tag `operational_status` in the Application rules allows to declare the formula applicable to evaluate the
Expand Down
99 changes: 67 additions & 32 deletions supvisors/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,12 @@ def invalid(self, status: SupvisorsInstanceStatus, fence=None) -> None:
The local Supvisors instance is never set to ISOLATED, whatever the option is set or not.
Always give it a chance to restart.
@param: fence: True when the remote Supvisors instance has isolated the local Supvisors instance
:param status: the Supvisors instance to invalid.
:param fence: True when the remote Supvisors instance has isolated the local Supvisors instance.
"""
if status.identifier == self.local_identifier:
# A very few events can cause this situation:
# 1. a network failure
# 1. a network failure (XML-RPC request)
# 2. a discrepancy has been detected between the internal context and the process events received
# a new CHECKING phase is required
# NOTE: the local Supvisors instance cannot be ISOLATED from itself
Expand Down Expand Up @@ -425,31 +426,38 @@ def setdefault_process(self, identifier: str, info: Payload) -> Optional[Process
application.add_process(process)
return process

def load_processes(self, identifier: str, all_info: PayloadList) -> None:
def load_processes(self, identifier: str, all_info: Optional[PayloadList]) -> None:
""" Load application dictionary from the process information received from the remote Supvisors.
:param identifier: the identifier of the Supvisors instance
:param all_info: the process information got from the node
:return: None
:param identifier: the identifier of the Supvisors instance.
:param all_info: the process information got from the node.
:return: None.
"""
self.logger.trace(f'Context.load_processes: identifier={identifier} all_info={all_info}')
# get SupvisorsInstanceStatus corresponding to identifier
status = self.instances[identifier]
# store processes into their application entry
for info in all_info:
# get or create process
process = self.setdefault_process(identifier, info)
if process:
# share the instance to the Supervisor instance that holds it
status.add_process(process)
# re-evaluate application sequences and status
for application in self.applications.values():
application.update_sequences()
application.update()
# Write the disabilities file when all local information is made available,
# so that an example exists whatever enable / disable has been called or not
if identifier == self.local_identifier:
self.supvisors.server_options.write_disabilities(False)
if all_info is None:
# the check call in SupervisorProxy failed
# the remote Supvisors instance is likely starting, restarting or shutting down so defer
self.logger.warn(f'Context.load_processes: failed to get all process info from Supvisors={identifier}')
# go back to UNKNOWN to give it a chance at next TICK
status.state = SupvisorsInstanceStates.UNKNOWN
else:
# store processes into their application entry
for info in all_info:
# get or create process
process = self.setdefault_process(identifier, info)
if process:
# share the instance to the Supervisor instance that holds it
status.add_process(process)
# re-evaluate application sequences and status
for application in self.applications.values():
application.update_sequences()
application.update()
# Write the disabilities file when all local information is made available,
# so that an example exists whatever enable / disable has been called or not
if identifier == self.local_identifier:
self.supvisors.server_options.write_disabilities(False)

# methods on events
def on_instance_state_event(self, identifier: str, event: Payload) -> None:
Expand Down Expand Up @@ -506,7 +514,7 @@ def on_authorization(self, identifier: str, authorized: Optional[bool]) -> bool:
return False
# process authorization status
if authorized is None:
# the check call in SupvisorsMainLoop failed
# the check call in SupervisorProxy failed
# the remote Supvisors instance is likely starting, restarting or shutting down so defer
self.logger.warn(f'Context.on_authorization: failed to get auth status from Supvisors={identifier}')
# go back to UNKNOWN to give it a chance at next TICK
Expand Down Expand Up @@ -597,7 +605,7 @@ def on_timer_event(self, event: Payload) -> Tuple[NameList, Set[ProcessStatus]]:
:return: the identifiers of the invalidated Supvisors instances and the processes in failure.
"""
invalidated_identifiers: List[str] = []
process_failures: Set[ProcessStatus] = set({}) # strange but avoids IDE warning on annotations
failed_processes: Set[ProcessStatus] = set()
# use the local TICK counter received as a reference
sequence_counter = event['sequence_counter']
# check all Supvisors instances
Expand All @@ -608,25 +616,52 @@ def on_timer_event(self, event: Payload) -> Tuple[NameList, Set[ProcessStatus]]:
invalidated_identifiers.append(status.identifier)
# for processes that were running on node, invalidate node in process
# WARN: Decision is made NOT to remove the node payload from the ProcessStatus and NOT to remove
# the ProcessStatus from the Context if no more node payload left.
# The aim is to keep a trace in the Web UI about the application processes that have been lost
# and their related description.
process_failures.update({process for process in status.running_processes()
# the ProcessStatus from the Context if no more node payload left.
# The aim is to keep a trace in the Web UI about the application processes that have been lost
# and their related description.
failed_processes.update({process for process in status.running_processes()
if process.invalidate_identifier(status.identifier)})
# trigger the corresponding Supvisors events
self.publish_process_failures(failed_processes)
# return the identifiers of all invalidated Supvisors instances and the processes declared in failure
return invalidated_identifiers, failed_processes

def publish_process_failures(self, failed_processes: Set[ProcessStatus]) -> None:
""" Publish the Supvisors events related with the processes failures.
:param failed_processes: the processes in failure.
:return: None.
"""
# publish process status in failure
if self.external_publisher:
for process in process_failures:
for process in failed_processes:
self.external_publisher.send_process_status(process.serial())
# update all application sequences and status
for application_name in {process.application_name for process in process_failures}:
for application_name in {process.application_name for process in failed_processes}:
application = self.applications[application_name]
# update sequence useless as long as the application.process map is not impacted (see decision above)
# the application sequences update is useless as long as the application.process map is not impacted
# (see decision comment above in on_timer_event)
# application.update_sequences()
application.update()
if self.external_publisher:
self.external_publisher.send_application_status(application.serial())
# return the identifiers of all invalidated Supvisors instances and the processes declared in failure
return invalidated_identifiers, process_failures

def on_instance_failure(self, identifier: str) -> Set[ProcessStatus]:
""" Invalid a Supvisors instance that had an XML-RPC failure.
:param identifier: the identifier of the Supvisors instance that sent the event.
:return: the identifiers of the invalidated Supvisors instances and the processes in failure.
"""
failed_processes: Set[ProcessStatus] = set()
# invalid silent Supvisors instances
status = self.instances[identifier]
self.invalid(status)
# for processes that were running on node, invalidate node in process
failed_processes.update({process for process in status.running_processes()
if process.invalidate_identifier(status.identifier)})
# trigger the corresponding Supvisors events
self.publish_process_failures(failed_processes)
return failed_processes

def check_process(self, instance_status: SupvisorsInstanceStatus,
event: Payload, check_source=True) -> Optional[Tuple[ApplicationStatus, ProcessStatus]]:
Expand Down
6 changes: 3 additions & 3 deletions supvisors/internal_com/multicast.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

from supervisor.loggers import Logger

from supvisors.ttypes import Ipv4Address, PublicationHeaders, Payload
from supvisors.ttypes import Ipv4Address, NotificationHeaders, Payload
from .internalinterface import payload_to_bytes, bytes_to_payload
from .mapper import SupvisorsInstanceId

Expand Down Expand Up @@ -58,7 +58,7 @@ def send_discovery_event(self, payload: Payload):
""" Multicast the discovery event.
It is not necessary to add a message size. """
self.logger.info('MulticastSender.emit_message')
message = payload_to_bytes((PublicationHeaders.DISCOVERY.value, (self.identifier, payload)))
message = payload_to_bytes((NotificationHeaders.DISCOVERY.value, (self.identifier, payload)))
try:
self.socket.sendto(message, self.mc_group)
except OSError:
Expand Down Expand Up @@ -171,7 +171,7 @@ def __init__(self, supvisors: Any) -> None:
local_instance: SupvisorsInstanceId = supvisors.mapper.local_instance
self.mc_sender: MulticastSender = MulticastSender(local_instance.identifier, mc_group, mc_ttl, supvisors.logger)
# create the Multicast message receiver
callback = supvisors.rpc_handler.post_discovery
callback = supvisors.rpc_handler.push_notification
self.mc_receiver = MulticastReceiver(mc_group, mc_interface, callback, supvisors.logger)
self.mc_receiver.start()

Expand Down
26 changes: 13 additions & 13 deletions supvisors/internal_com/rpchandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,32 +48,32 @@ def stop(self):
self.proxy_server.stop()

# Discovery events
def post_discovery(self, discovery_event) -> None:
def push_notification(self, discovery_event) -> None:
""" Post the discovery event to the Supervisor proxy server.
:param discovery_event: the Supvisors discovery event received on the Multicast Group.
:return: None.
"""
self.proxy_server.post_discovery(discovery_event)
self.proxy_server.push_notification(discovery_event)

# Deferred XML-RPC requests
def post_request(self, identifier: str, request_type: RequestHeaders, request_body: Optional[Tuple] = None) -> None:
def push_request(self, identifier: str, request_type: RequestHeaders, request_body: Optional[Tuple] = None) -> None:
""" Push the request to the Supervisor proxy server.
:param identifier: the target identifier of the request.
:param request_type: the type of the request to send.
:param request_body: the request_body.
:return: None.
"""
self.proxy_server.post_request(identifier, (request_type.value, request_body))
self.proxy_server.push_request(identifier, (request_type.value, request_body))

def send_check_instance(self, identifier: str) -> None:
""" Send request to check authorization to deal with the Supvisors instance.
:param identifier: the identifier of the Supvisors instance to check.
:return: None.
"""
self.post_request(identifier, RequestHeaders.CHECK_INSTANCE)
self.push_request(identifier, RequestHeaders.CHECK_INSTANCE)

def send_start_process(self, identifier: str, namespec: str, extra_args: str) -> None:
""" Send request to start process.
Expand All @@ -83,7 +83,7 @@ def send_start_process(self, identifier: str, namespec: str, extra_args: str) ->
:param extra_args: the additional arguments to be passed to the command line.
:return: None.
"""
self.post_request(identifier, RequestHeaders.START_PROCESS, (namespec, extra_args))
self.push_request(identifier, RequestHeaders.START_PROCESS, (namespec, extra_args))

def send_stop_process(self, identifier: str, namespec: str) -> None:
""" Send request to stop process.
Expand All @@ -92,47 +92,47 @@ def send_stop_process(self, identifier: str, namespec: str) -> None:
:param namespec: the process namespec.
:return: None.
"""
self.post_request(identifier, RequestHeaders.STOP_PROCESS, (namespec,))
self.push_request(identifier, RequestHeaders.STOP_PROCESS, (namespec,))

def send_restart(self, identifier: str):
""" Send request to restart a Supervisor.
:param identifier: the identifier of the Supvisors instance where Supvisors has to be restarted.
:return: None.
"""
self.post_request(identifier, RequestHeaders.RESTART)
self.push_request(identifier, RequestHeaders.RESTART)

def send_shutdown(self, identifier: str):
""" Send request to shut down a Supervisor.
:param identifier: the identifier of the Supvisors instance where Supvisors has to be shut down.
:return: None.
"""
self.post_request(identifier, RequestHeaders.SHUTDOWN)
self.push_request(identifier, RequestHeaders.SHUTDOWN)

def send_restart_sequence(self, identifier: str):
""" Send request to trigger the DEPLOYMENT phase.
:param identifier: the Master Supvisors instance.
:return: None.
"""
self.post_request(identifier, RequestHeaders.RESTART_SEQUENCE)
self.push_request(identifier, RequestHeaders.RESTART_SEQUENCE)

def send_restart_all(self, identifier: str):
""" Send request to restart Supvisors.
:param identifier: the Master Supvisors instance.
:return: None.
"""
self.post_request(identifier, RequestHeaders.RESTART_ALL)
self.push_request(identifier, RequestHeaders.RESTART_ALL)

def send_shutdown_all(self, identifier: str):
""" Send request to shut down Supvisors.
:param identifier: the Master Supvisors instance.
:return: None.
"""
self.post_request(identifier, RequestHeaders.SHUTDOWN_ALL)
self.push_request(identifier, RequestHeaders.SHUTDOWN_ALL)

# Publications
def push_publication(self, publication_type: PublicationHeaders, publication_body: Payload) -> None:
Expand All @@ -142,7 +142,7 @@ def push_publication(self, publication_type: PublicationHeaders, publication_bod
:param publication_body: the data to publish.
:return: None.
"""
self.proxy_server.publish((publication_type.value, publication_body))
self.proxy_server.push_publication((publication_type.value, publication_body))

def send_tick_event(self, payload: Payload) -> None:
""" Send the tick event.
Expand Down
Loading

0 comments on commit c0fcc7a

Please sign in to comment.