diff --git a/jina/__init__.py b/jina/__init__.py index 275e99b92465b..523a24abfd3e1 100644 --- a/jina/__init__.py +++ b/jina/__init__.py @@ -14,7 +14,7 @@ import warnings as _warnings import faulthandler -faulthandler.enable(all_threads=True) +faulthandler.enable(file=_sys.stdout, all_threads=True) import docarray as _docarray diff --git a/jina/orchestrate/deployments/__init__.py b/jina/orchestrate/deployments/__init__.py index d22ec03c20b03..3e653808c0154 100644 --- a/jina/orchestrate/deployments/__init__.py +++ b/jina/orchestrate/deployments/__init__.py @@ -66,12 +66,17 @@ class DeploymentType(type(ExitStack), type(JAMLCompatible)): def _call_add_voters(leader, voters, replica_ids, name, event_signal=None): # this method needs to be run in multiprocess, importing jraft in main process # makes it impossible to do tests sequentially + import faulthandler + + faulthandler.enable(all_threads=True) + import signal + def _cancel(signum, frame): print(f'\n\n\n\n\n HEY HERE SEGFAULT {signum}, frame {frame}') signal.signal(signal.SIGSEGV, _cancel) - + import jraft logger = JinaLogger(context=f'add_voter-{name}', name=f'add_voter-{name}') @@ -115,11 +120,11 @@ class Deployment(JAMLCompatible, PostMixin, BaseOrchestrator, metaclass=Deployme class _ReplicaSet: def __init__( - self, - deployment_args: Namespace, - args: List[Namespace], - head_pod, - name, + self, + deployment_args: Namespace, + args: List[Namespace], + head_pod, + name, ): self.deployment_args = copy.copy(deployment_args) self.args = args @@ -251,76 +256,76 @@ def __exit__(self, exc_type, exc_val, exc_tb): # overload_inject_start_deployment @overload def __init__( - self, - *, - allow_concurrent: Optional[bool] = False, - compression: Optional[str] = None, - connection_list: Optional[str] = None, - cors: Optional[bool] = False, - description: Optional[str] = None, - disable_auto_volume: Optional[bool] = False, - docker_kwargs: Optional[dict] = None, - entrypoint: Optional[str] = None, - env: Optional[dict] = None, - env_from_secret: Optional[dict] = None, - exit_on_exceptions: Optional[List[str]] = [], - external: Optional[bool] = False, - floating: Optional[bool] = False, - force_update: Optional[bool] = False, - gpus: Optional[str] = None, - grpc_channel_options: Optional[dict] = None, - grpc_metadata: Optional[dict] = None, - grpc_server_options: Optional[dict] = None, - host: Optional[List[str]] = ['0.0.0.0'], - install_requirements: Optional[bool] = False, - log_config: Optional[str] = None, - metrics: Optional[bool] = False, - metrics_exporter_host: Optional[str] = None, - metrics_exporter_port: Optional[int] = None, - monitoring: Optional[bool] = False, - name: Optional[str] = 'executor', - native: Optional[bool] = False, - no_reduce: Optional[bool] = False, - output_array_type: Optional[str] = None, - polling: Optional[str] = 'ANY', - port: Optional[int] = None, - port_monitoring: Optional[int] = None, - prefer_platform: Optional[str] = None, - protocol: Optional[Union[str, List[str]]] = ['GRPC'], - py_modules: Optional[List[str]] = None, - quiet: Optional[bool] = False, - quiet_error: Optional[bool] = False, - raft_configuration: Optional[dict] = None, - reload: Optional[bool] = False, - replicas: Optional[int] = 1, - retries: Optional[int] = -1, - runtime_cls: Optional[str] = 'WorkerRuntime', - shards: Optional[int] = 1, - ssl_certfile: Optional[str] = None, - ssl_keyfile: Optional[str] = None, - stateful: Optional[bool] = False, - timeout_ctrl: Optional[int] = 60, - timeout_ready: Optional[int] = 600000, - timeout_send: Optional[int] = None, - title: Optional[str] = None, - tls: Optional[bool] = False, - traces_exporter_host: Optional[str] = None, - traces_exporter_port: Optional[int] = None, - tracing: Optional[bool] = False, - uses: Optional[Union[str, Type['BaseExecutor'], dict]] = 'BaseExecutor', - uses_after: Optional[Union[str, Type['BaseExecutor'], dict]] = None, - uses_after_address: Optional[str] = None, - uses_before: Optional[Union[str, Type['BaseExecutor'], dict]] = None, - uses_before_address: Optional[str] = None, - uses_dynamic_batching: Optional[dict] = None, - uses_metas: Optional[dict] = None, - uses_requests: Optional[dict] = None, - uses_with: Optional[dict] = None, - uvicorn_kwargs: Optional[dict] = None, - volumes: Optional[List[str]] = None, - when: Optional[dict] = None, - workspace: Optional[str] = None, - **kwargs, + self, + *, + allow_concurrent: Optional[bool] = False, + compression: Optional[str] = None, + connection_list: Optional[str] = None, + cors: Optional[bool] = False, + description: Optional[str] = None, + disable_auto_volume: Optional[bool] = False, + docker_kwargs: Optional[dict] = None, + entrypoint: Optional[str] = None, + env: Optional[dict] = None, + env_from_secret: Optional[dict] = None, + exit_on_exceptions: Optional[List[str]] = [], + external: Optional[bool] = False, + floating: Optional[bool] = False, + force_update: Optional[bool] = False, + gpus: Optional[str] = None, + grpc_channel_options: Optional[dict] = None, + grpc_metadata: Optional[dict] = None, + grpc_server_options: Optional[dict] = None, + host: Optional[List[str]] = ['0.0.0.0'], + install_requirements: Optional[bool] = False, + log_config: Optional[str] = None, + metrics: Optional[bool] = False, + metrics_exporter_host: Optional[str] = None, + metrics_exporter_port: Optional[int] = None, + monitoring: Optional[bool] = False, + name: Optional[str] = 'executor', + native: Optional[bool] = False, + no_reduce: Optional[bool] = False, + output_array_type: Optional[str] = None, + polling: Optional[str] = 'ANY', + port: Optional[int] = None, + port_monitoring: Optional[int] = None, + prefer_platform: Optional[str] = None, + protocol: Optional[Union[str, List[str]]] = ['GRPC'], + py_modules: Optional[List[str]] = None, + quiet: Optional[bool] = False, + quiet_error: Optional[bool] = False, + raft_configuration: Optional[dict] = None, + reload: Optional[bool] = False, + replicas: Optional[int] = 1, + retries: Optional[int] = -1, + runtime_cls: Optional[str] = 'WorkerRuntime', + shards: Optional[int] = 1, + ssl_certfile: Optional[str] = None, + ssl_keyfile: Optional[str] = None, + stateful: Optional[bool] = False, + timeout_ctrl: Optional[int] = 60, + timeout_ready: Optional[int] = 600000, + timeout_send: Optional[int] = None, + title: Optional[str] = None, + tls: Optional[bool] = False, + traces_exporter_host: Optional[str] = None, + traces_exporter_port: Optional[int] = None, + tracing: Optional[bool] = False, + uses: Optional[Union[str, Type['BaseExecutor'], dict]] = 'BaseExecutor', + uses_after: Optional[Union[str, Type['BaseExecutor'], dict]] = None, + uses_after_address: Optional[str] = None, + uses_before: Optional[Union[str, Type['BaseExecutor'], dict]] = None, + uses_before_address: Optional[str] = None, + uses_dynamic_batching: Optional[dict] = None, + uses_metas: Optional[dict] = None, + uses_requests: Optional[dict] = None, + uses_with: Optional[dict] = None, + uvicorn_kwargs: Optional[dict] = None, + volumes: Optional[List[str]] = None, + when: Optional[dict] = None, + workspace: Optional[str] = None, + **kwargs, ): """Create a Deployment to serve or deploy and Executor or Gateway @@ -451,11 +456,11 @@ def __init__( # overload_inject_end_deployment def __init__( - self, - args: Union['Namespace', Dict, None] = None, - needs: Optional[Set[str]] = None, - include_gateway: bool = True, - **kwargs, + self, + args: Union['Namespace', Dict, None] = None, + needs: Optional[Set[str]] = None, + include_gateway: bool = True, + **kwargs, ): super().__init__() self._gateway_kwargs = {} @@ -489,9 +494,9 @@ def __init__( self.args.polling = PollingType.ANY if ( - getattr(args, 'shards', 1) > 1 - and ProtocolType.HTTP in self.args.protocol - and self.args.deployment_role != DeploymentRoleType.GATEWAY + getattr(args, 'shards', 1) > 1 + and ProtocolType.HTTP in self.args.protocol + and self.args.deployment_role != DeploymentRoleType.GATEWAY ): raise RuntimeError( f'It is not supported to have {ProtocolType.HTTP.to_string()} deployment for ' @@ -499,8 +504,8 @@ def __init__( ) if ( - ProtocolType.WEBSOCKET in self.args.protocol - and self.args.deployment_role != DeploymentRoleType.GATEWAY + ProtocolType.WEBSOCKET in self.args.protocol + and self.args.deployment_role != DeploymentRoleType.GATEWAY ): raise RuntimeError( f'It is not supported to have {ProtocolType.WEBSOCKET.to_string()} deployment for ' @@ -518,15 +523,15 @@ def __init__( f'Stateful feature when running on MacOS requires Python3.8 or newer version' ) if self.args.stateful and ( - ProtocolType.WEBSOCKET in self.args.protocol - or ProtocolType.HTTP in self.args.protocol - or len(self.args.protocol) > 1 + ProtocolType.WEBSOCKET in self.args.protocol + or ProtocolType.HTTP in self.args.protocol + or len(self.args.protocol) > 1 ): raise RuntimeError( f'Stateful feature is only available for Deployments using a single {ProtocolType.GRPC.to_string()} protocol. {self.args.protocol} were requested' ) self.needs = ( - needs or set() + needs or set() ) #: used in the :class:`jina.flow.Flow` to build the graph # parse addresses for distributed replicas @@ -541,7 +546,7 @@ def __init__( self._parse_addresses_into_host_and_port() if len(self.ext_repl_ports) > 1: if self.args.replicas != 1 and self.args.replicas != len( - self.ext_repl_ports + self.ext_repl_ports ): raise ValueError( f'Number of hosts ({len(self.args.host)}) does not match the number of replicas ({self.args.replicas})' @@ -600,7 +605,7 @@ def _get_connection_list_for_flow(self) -> List[str]: ] def _get_connection_list_for_single_executor( - self, + self, ) -> Union[List[str], Dict[str, List[str]]]: if self.head_args: # add head information @@ -610,7 +615,7 @@ def _get_connection_list_for_single_executor( ports_dict = defaultdict(list) for replica_pod_arg in self.pod_args['pods'][0]: for protocol, port in zip( - replica_pod_arg.protocol, replica_pod_arg.port + replica_pod_arg.protocol, replica_pod_arg.port ): ports_dict[str(protocol)].append(port) @@ -654,7 +659,7 @@ def _parse_addresses_into_host_and_port(self): for i, repl_host in enumerate(self.ext_repl_hosts): _hostname, port, scheme, tls = parse_host_scheme(repl_host) if ( - _hostname != self.ext_repl_hosts[i] + _hostname != self.ext_repl_hosts[i] ): # more than just hostname was passed to `host` self.ext_repl_hosts[i] = _hostname self.ext_repl_ports[i] = port @@ -667,13 +672,13 @@ def _parse_external_replica_hosts_and_ports(self): ext_repl_hosts: List = self.args.host.copy() if len(ext_repl_hosts) < len(ext_repl_ports): if ( - len(ext_repl_hosts) == 1 + len(ext_repl_hosts) == 1 ): # only one host given, assume replicas are on the same host ext_repl_hosts = ext_repl_hosts * len(ext_repl_ports) self.args.host = self.args.host * len(ext_repl_ports) elif len(ext_repl_hosts) > len(ext_repl_ports): if ( - len(ext_repl_ports) == 1 + len(ext_repl_ports) == 1 ): # only one port given, assume replicas are on the same port ext_repl_ports = ext_repl_ports * len(ext_repl_hosts) self.args.port = self.args.port * len(ext_repl_hosts) @@ -959,7 +964,7 @@ def hosts(self) -> List[str]: return [replica.host for replica in self.pod_args['pods'][0]] def _parse_args( - self, args: Namespace + self, args: Namespace ) -> Dict[str, Optional[Union[List[Namespace], Namespace]]]: return self._parse_base_deployment_args(args) @@ -1024,10 +1029,10 @@ def all_args(self) -> List[Namespace]: .. # noqa: DAR201 """ all_args = ( - ([self.pod_args['uses_before']] if self.pod_args['uses_before'] else []) - + ([self.pod_args['uses_after']] if self.pod_args['uses_after'] else []) - + ([self.pod_args['head']] if self.pod_args['head'] else []) - + ([self.pod_args['gateway']] if self._include_gateway else []) + ([self.pod_args['uses_before']] if self.pod_args['uses_before'] else []) + + ([self.pod_args['uses_after']] if self.pod_args['uses_after'] else []) + + ([self.pod_args['head']] if self.pod_args['head'] else []) + + ([self.pod_args['gateway']] if self._include_gateway else []) ) for shard_id in self.pod_args['pods']: all_args += self.pod_args['pods'][shard_id] @@ -1072,7 +1077,7 @@ def get_worker_host(pod_args, pod_is_container, head_is_container): worker_host = ( __docker_host__ if (pod_is_container and (head_is_container or in_docker())) - and host_is_local(pod_args.host) + and host_is_local(pod_args.host) else pod_args.host ) return worker_host @@ -1322,10 +1327,10 @@ def _roundrobin_cuda_device(device_str: Optional[str], replicas: int): :return: a map from replica ID to device ID """ if ( - device_str - and isinstance(device_str, str) - and device_str.startswith('RR') - and replicas >= 1 + device_str + and isinstance(device_str, str) + and device_str.startswith('RR') + and replicas >= 1 ): try: num_devices = str(subprocess.check_output(['nvidia-smi', '-L'])).count( @@ -1449,12 +1454,12 @@ def _set_pod_args(self) -> Dict[int, List[Namespace]]: _args.port[0] = peer_port elif shards > 1: port_monitoring_index = ( - replica_id + replicas * shard_id + 1 + replica_id + replicas * shard_id + 1 ) # the first index is for the head _args.port_monitoring = ( random_port() if port_monitoring_index - >= len(self.args.all_port_monitoring) + >= len(self.args.all_port_monitoring) else self.args.all_port_monitoring[ port_monitoring_index ] # we skip the head port here @@ -1530,8 +1535,8 @@ def _parse_base_deployment_args(self, args): # also there a no heads created, if there are no shards if self.role != DeploymentRoleType.GATEWAY and getattr(args, 'shards', 1) > 1: if ( - getattr(args, 'uses_before', None) - and args.uses_before != __default_executor__ + getattr(args, 'uses_before', None) + and args.uses_before != __default_executor__ ): uses_before_args = self._set_uses_before_after_args( args, entity_type='uses_before' @@ -1541,8 +1546,8 @@ def _parse_base_deployment_args(self, args): f'{uses_before_args.host}:{uses_before_args.port[0]}' ) if ( - getattr(args, 'uses_after', None) - and args.uses_after != __default_executor__ + getattr(args, 'uses_after', None) + and args.uses_after != __default_executor__ ): uses_after_args = self._set_uses_before_after_args( args, entity_type='uses_after' @@ -1658,8 +1663,8 @@ def _mermaid_str(self) -> List[str]: return mermaid_graph def block( - self, - stop_event: Optional[Union['threading.Event', 'multiprocessing.Event']] = None, + self, + stop_event: Optional[Union['threading.Event', 'multiprocessing.Event']] = None, ): """Block the Deployment until `stop_event` is set or user hits KeyboardInterrupt @@ -1684,8 +1689,8 @@ def _reload_deployment(changed_file): if watch_changes and self._is_executor_from_yaml: with ImportExtensions( - required=True, - help_text='''reload requires watchfiles dependency to be installed. You can run `pip install + required=True, + help_text='''reload requires watchfiles dependency to be installed. You can run `pip install watchfiles''', ): from watchfiles import watch @@ -1848,9 +1853,9 @@ def _inner_gateway_to_docker_compose_config(self): return docker_compose_deployment.to_docker_compose_config() def to_docker_compose_yaml( - self, - output_path: Optional[str] = None, - network_name: Optional[str] = None, + self, + output_path: Optional[str] = None, + network_name: Optional[str] = None, ): """ Converts a Jina Deployment into a Docker compose YAML file @@ -1900,11 +1905,11 @@ def to_docker_compose_yaml( ) def _to_kubernetes_yaml( - self, - output_base_path: str, - k8s_namespace: Optional[str] = None, - k8s_deployments_addresses: Optional[Dict] = None, - k8s_port: Optional[int] = GrpcConnectionPool.K8S_PORT, + self, + output_base_path: str, + k8s_namespace: Optional[str] = None, + k8s_deployments_addresses: Optional[Dict] = None, + k8s_port: Optional[int] = GrpcConnectionPool.K8S_PORT, ): import yaml @@ -1947,9 +1952,9 @@ def _to_kubernetes_yaml( fp.write('---\n') def to_kubernetes_yaml( - self, - output_base_path: str, - k8s_namespace: Optional[str] = None, + self, + output_base_path: str, + k8s_namespace: Optional[str] = None, ): """ Convert a Jina Deployment into a set of YAML deployments to deploy in Kubernetes. diff --git a/jina/serve/executors/run.py b/jina/serve/executors/run.py index 2fdda0ae162f0..a48db7fcbe613 100644 --- a/jina/serve/executors/run.py +++ b/jina/serve/executors/run.py @@ -25,6 +25,10 @@ def run_raft( :param is_ready: concurrency event to communicate Executor runtime is ready to receive messages """ + print(f'RUN RAFT') + logger = JinaLogger('run_raft', **vars(args)) + logger.debug(f'Start process to run RAFT node') + import jraft def pascal_case_dict(d): @@ -48,7 +52,6 @@ def pascal_case_dict(d): executor_target = f'{args.host}:{port + RAFT_TO_EXECUTOR_PORT}' # if the Executor was already persisted, retrieve its port and host configuration - logger = JinaLogger('run_raft', **vars(args)) persisted_address = jraft.get_configuration(raft_id, raft_dir) if persisted_address: logger.debug(f'Configuration found on the node: Address {persisted_address}')