diff --git a/code/zato-cli/src/zato/cli/enmasse.py b/code/zato-cli/src/zato/cli/enmasse.py index 39b107a74b..6612ad1749 100644 --- a/code/zato-cli/src/zato/cli/enmasse.py +++ b/code/zato-cli/src/zato/cli/enmasse.py @@ -11,6 +11,9 @@ from collections import namedtuple, OrderedDict from copy import deepcopy from dataclasses import dataclass +from datetime import datetime, timedelta +from itertools import chain +from time import sleep # Zato from zato.cli import ManageCommand @@ -24,7 +27,8 @@ from logging import Logger from zato.client import APIClient - from zato.common.typing_ import any_, anylist, dictlist, list_, strdict, strstrdict, strlist, strlistdict, strnone + from zato.common.typing_ import any_, anylist, dictlist, list_, stranydict, strdict, strstrdict, strlist, \ + strlistdict, strnone APIClient = APIClient Logger = Logger @@ -91,6 +95,9 @@ class Include_Type: # How to sort attributes of a given object Enmasse_Attr_List_Sort_Order = cast_('strlistdict', None) + # How many seconds to wait for missing objects + Missing_Wait_Time = 120 + # ################################################################################################################################ ModuleCtx.Enmasse_Type = { @@ -1921,6 +1928,7 @@ class Enmasse(ManageCommand): {'name':'--replace', 'help':'Force replacing already server objects during import', 'action':'store_true'}, {'name':'--replace-odb-objects', 'help':'Same as --replace', 'action':'store_true'}, {'name':'--input', 'help':'Path to input file with objects to import'}, + {'name':'--missing-wait-time', 'help':'How many seconds to wait for missing objects', 'default':ModuleCtx.Missing_Wait_Time}, {'name':'--env-file', 'help':'Path to an .ini file with environment variables'}, {'name':'--rbac-sleep', 'help':'How many seconds to sleep for after creating an RBAC object', 'default':'1'}, {'name':'--cols-width', 'help':'A list of columns width to use for the table output, default: {}'.format(DEFAULT_COLS_WIDTH), 'action':'store_true'}, @@ -1932,102 +1940,6 @@ class Enmasse(ManageCommand): 'yml': YamlCodec, } -# ################################################################################################################################ - - def load_input(self, input_path): - - # stdlib - import sys - - _, _, ext = self.args.input.rpartition('.') - codec_class = self.CODEC_BY_EXTENSION.get(ext.lower()) - if codec_class is None: - exts = ', '.join(sorted(self.CODEC_BY_EXTENSION)) - self.logger.error('Unrecognized file extension "{}": must be one of {}'.format(ext.lower(), exts)) - sys.exit(self.SYS_ERROR.INVALID_INPUT) - - parser = InputParser(input_path, self.logger, codec_class()) - results = parser.parse() - if not results.ok: - self.logger.error('JSON parsing failed') - _ = self.report_warnings_errors([results]) - sys.exit(self.SYS_ERROR.INVALID_INPUT) - self.json = parser.json - -# ################################################################################################################################ - - def normalize_path( - self, - arg_name, # type: str - exit_if_missing, # type: bool - *, - needs_parent_dir=False, # type: bool - log_if_missing=False, # type: bool - ) -> 'str': - - # Local aliases - path_to_check:'str' = '' - arg_param = getattr(self.args, arg_name, None) or '' - - # Potentially, expand the path to our home directory .. - arg_name = os.path.expanduser(arg_param) - - # Turn the name into a full path unless it already is one .. - if os.path.isabs(arg_name): - arg_path = arg_name - else: - arg_path = os.path.join(self.curdir, arg_param) - arg_path = os.path.abspath(arg_path) - - # .. we need for a directory to exist .. - if needs_parent_dir: - path_to_check = os.path.join(arg_path, '..') - path_to_check = os.path.abspath(path_to_check) - - # .. or for the actual file to exist .. - else: - path_to_check = arg_path - - # .. make sure that it does exist .. - if not os.path.exists(path_to_check): - - # .. optionally, exit the process if it does not .. - if exit_if_missing: - - if log_if_missing: - self.logger.info(f'Path not found: `{path_to_check}`') - - # Zato - import sys - sys.exit() - - # .. if we are here, it means that we have a valid, absolute path to return .. - return arg_path - -# ################################################################################################################################ - - def _extract_include(self, include_type:'str') -> 'strlist': # type: ignore - - # Local aliases - out:'strlist' = [] - - # Turn the string into a list of items that we will process .. - include_type:'strlist' = include_type.split(',') - include_type = [item.strip().lower() for item in include_type] - - # .. ignore explicit types if all types are to be returned .. - if ModuleCtx.Include_Type.All in include_type: - include_type = [ModuleCtx.Include_Type.All] - else: - out[:] = include_type - - # .. if we do not have anything, it means that we are including all types .. - if not out: - out = [ModuleCtx.Include_Type.All] - - # .. now, we are ready to return our response. - return out - # ################################################################################################################################ def _on_server(self, args:'any_') -> 'None': @@ -2056,6 +1968,9 @@ def _on_server(self, args:'any_') -> 'None': self.json = {} has_import = getattr(args, 'import') + # For type hints + self.missing_wait_time:'int' = getattr(self.args, 'missing_wait_time', None) or ModuleCtx.Missing_Wait_Time + # Initialize environment variables .. env_path = self.normalize_path('env_file', False) populate_environment_from_file(env_path) @@ -2159,7 +2074,104 @@ def _on_server(self, args:'any_') -> 'None': # 4) a/b elif has_import: - _ = self.report_warnings_errors(self.run_import()) + warnings_errors_list = self.run_import() + _ = self.report_warnings_errors(warnings_errors_list) + +# ################################################################################################################################ + + def load_input(self, input_path): + + # stdlib + import sys + + _, _, ext = self.args.input.rpartition('.') + codec_class = self.CODEC_BY_EXTENSION.get(ext.lower()) + if codec_class is None: + exts = ', '.join(sorted(self.CODEC_BY_EXTENSION)) + self.logger.error('Unrecognized file extension "{}": must be one of {}'.format(ext.lower(), exts)) + sys.exit(self.SYS_ERROR.INVALID_INPUT) + + parser = InputParser(input_path, self.logger, codec_class()) + results = parser.parse() + if not results.ok: + self.logger.error('JSON parsing failed') + _ = self.report_warnings_errors([results]) + sys.exit(self.SYS_ERROR.INVALID_INPUT) + self.json = parser.json + +# ################################################################################################################################ + + def normalize_path( + self, + arg_name, # type: str + exit_if_missing, # type: bool + *, + needs_parent_dir=False, # type: bool + log_if_missing=False, # type: bool + ) -> 'str': + + # Local aliases + path_to_check:'str' = '' + arg_param = getattr(self.args, arg_name, None) or '' + + # Potentially, expand the path to our home directory .. + arg_name = os.path.expanduser(arg_param) + + # Turn the name into a full path unless it already is one .. + if os.path.isabs(arg_name): + arg_path = arg_name + else: + arg_path = os.path.join(self.curdir, arg_param) + arg_path = os.path.abspath(arg_path) + + # .. we need for a directory to exist .. + if needs_parent_dir: + path_to_check = os.path.join(arg_path, '..') + path_to_check = os.path.abspath(path_to_check) + + # .. or for the actual file to exist .. + else: + path_to_check = arg_path + + # .. make sure that it does exist .. + if not os.path.exists(path_to_check): + + # .. optionally, exit the process if it does not .. + if exit_if_missing: + + if log_if_missing: + self.logger.info(f'Path not found: `{path_to_check}`') + + # Zato + import sys + sys.exit() + + # .. if we are here, it means that we have a valid, absolute path to return .. + return arg_path + +# ################################################################################################################################ + + def _extract_include(self, include_type:'str') -> 'strlist': # type: ignore + + # Local aliases + out:'strlist' = [] + + # Turn the string into a list of items that we will process .. + include_type:'strlist' = include_type.split(',') + include_type = [item.strip().lower() for item in include_type] + + # .. ignore explicit types if all types are to be returned .. + if ModuleCtx.Include_Type.All in include_type: + include_type = [ModuleCtx.Include_Type.All] + else: + out[:] = include_type + + # .. if we do not have anything, it means that we are including all types .. + if not out: + out = [ModuleCtx.Include_Type.All] + + # .. now, we are ready to return our response. + return out # ################################################################################################################################ @@ -2634,10 +2646,69 @@ def export_local_odb(self, needs_local=True): def run_odb_export(self): return self.export_local_odb(False) +# ################################################################################################################################ + + def _get_missing_objects(self, warnings_errors:'list_[Results]') -> 'strlist': + + # Our response to produce + out:'strlist' = [] + + for item in warnings_errors: + + for elem in chain(item.warnings, item.errors): # type: ignore + elem = cast_('Notice', elem) + if elem.code == ERROR_SERVICE_MISSING: + enmasse_elem:'stranydict' = elem.value_raw[1] + service_name = enmasse_elem['service'] + out.append(service_name) + + # Return everything we have found to our caller, sorted alphabetically + return sorted(out) + # ################################################################################################################################ def run_import(self) -> 'anylist': + # Local variables + start_time = datetime.utcnow() + wait_until = start_time + timedelta(seconds=1) + + # Run the initial import .. + warnings_errors = self._run_import() + + while warnings_errors: + + # Loop variables + now = datetime.utcnow() + + # .. if we have already waited enough, we can already return .. + if now > wait_until: + return warnings_errors + + # .. if there is anything that we need to wait for, .. + # .. such as missing services, we will keep running .. + missing = self._get_missing_objects(warnings_errors) + + # .. for reporting purposes, get information on how much longer are to wait .. + wait_delta = wait_until - now + + # .. report what we are waiting for .. + msg = f'Enmasse waiting; timeout -> {wait_delta}; missing -> {missing}' + self.logger.info(msg) + + # .. do wait now .. + sleep(2) + + # .. re-run the import .. + warnings_errors = self._run_import() + + # .. either we run out of time or we have succeed, in either case, we can return. + return warnings_errors + +# ################################################################################################################################ + + def _run_import(self) -> 'anylist': + # Make sure we have the latest state of information .. self.object_mgr.refresh() @@ -2664,6 +2735,7 @@ def run_import(self) -> 'anylist': if __name__ == '__main__': + """ # stdlib import sys @@ -2682,11 +2754,12 @@ def run_import(self) -> 'anylist': args.output = None args.rbac_sleep = 1 - # args['replace'] = True - args['import'] = False + args['replace'] = True + args['import'] = True - args.path = sys.argv[1] + # args.path = sys.argv[1] # args.input = sys.argv[2] enmasse = Enmasse(args) enmasse.run(args) + """ diff --git a/code/zato-common/src/zato/common/exception.py b/code/zato-common/src/zato/common/exception.py index 73b5913f74..27401a6249 100644 --- a/code/zato-common/src/zato/common/exception.py +++ b/code/zato-common/src/zato/common/exception.py @@ -38,6 +38,11 @@ def __repr__(self): # ################################################################################################################################ +class ServiceMissingException(ZatoException): + pass + +# ################################################################################################################################ + class RuntimeInvocationError(ZatoException): pass diff --git a/code/zato-common/src/zato/common/util/api.py b/code/zato-common/src/zato/common/util/api.py index 90a610b95a..07d0bf3102 100644 --- a/code/zato-common/src/zato/common/util/api.py +++ b/code/zato-common/src/zato/common/util/api.py @@ -1950,8 +1950,15 @@ def slugify(value, allow_unicode=False): # ################################################################################################################################ -def wait_for_predicate(predicate_func, timeout, interval, log_msg_details=None, needs_log=True, *args, **kwargs): - # type: (object, int, float, *object, **object) -> bool +def wait_for_predicate( + predicate_func, # type: callable_ + timeout, # type: int + interval, # type: float + log_msg_details=None, # type: strnone + needs_log=True, # type: bool + *args, # type: any_ + **kwargs # type: any_ +) -> 'bool': # Try out first, perhaps it already fulfilled is_fulfilled = bool(predicate_func(*args, **kwargs)) diff --git a/code/zato-server/src/zato/server/connection/http_soap/channel.py b/code/zato-server/src/zato/server/connection/http_soap/channel.py index 1aaae12832..84ce85b33a 100644 --- a/code/zato-server/src/zato/server/connection/http_soap/channel.py +++ b/code/zato-server/src/zato/server/connection/http_soap/channel.py @@ -23,7 +23,7 @@ SSO, TRACE1, URL_PARAMS_PRIORITY, ZATO_NONE from zato.common.audit_log import DataReceived, DataSent from zato.common.const import ServiceConst -from zato.common.exception import HTTP_RESPONSES +from zato.common.exception import HTTP_RESPONSES, ServiceMissingException from zato.common.hl7 import HL7Exception from zato.common.json_internal import dumps, loads from zato.common.json_schema import DictError as JSONSchemaDictError, ValidationException as JSONSchemaValidationException @@ -486,12 +486,17 @@ def dispatch( if channel_item['data_format'] == DATA_FORMAT.JSON: wsgi_environ['zato.http.response.headers']['Content-Type'] = CONTENT_TYPE['JSON'] - _exc = stack_format(e, style='color', show_vals='like_source', truncate_vals=5000, - add_summary=True, source_lines=20) if stack_format else _format_exc # type: str + # We need a traceback unless we merely report information about a missing service, + # which may happen if enmasse runs before such a service has been deployed. + needs_traceback = not isinstance(e, ServiceMissingException) - # Log what happened - logger.info( - 'Caught an exception, cid:`%s`, status_code:`%s`, `%s`', cid, status_code, _exc) + if needs_traceback: + _exc_string = stack_format(e, style='color', show_vals='like_source', truncate_vals=5000, + add_summary=True, source_lines=20) if stack_format else _format_exc # type: str + + # Log what happened + logger.info( + 'Caught an exception, cid:`%s`, status_code:`%s`, `%s`', cid, status_code, _exc_string) try: error_wrapper = get_client_error_wrapper(channel_item['transport'], channel_item['data_format']) @@ -736,8 +741,6 @@ def handle( self.set_response_in_cache(channel_item, cache_key, response) # Having used the cache or not, we can return the response now - response - response return response # ################################################################################################################################ diff --git a/code/zato-server/src/zato/server/service/internal/channel/amqp_.py b/code/zato-server/src/zato/server/service/internal/channel/amqp_.py index 90941bd4fd..d1f0b70d81 100644 --- a/code/zato-server/src/zato/server/service/internal/channel/amqp_.py +++ b/code/zato-server/src/zato/server/service/internal/channel/amqp_.py @@ -1,19 +1,18 @@ # -*- coding: utf-8 -*- """ -Copyright (C) 2019, Zato Source s.r.o. https://zato.io +Copyright (C) 2023, Zato Source s.r.o. https://zato.io Licensed under LGPLv3, see LICENSE.txt for terms and conditions. """ -from __future__ import absolute_import, division, print_function, unicode_literals - # stdlib from contextlib import closing from traceback import format_exc # Zato from zato.common.broker_message import CHANNEL +from zato.common.exception import ServiceMissingException from zato.common.odb.model import ChannelAMQP, Cluster, ConnDefAMQP, Service from zato.common.odb.query import channel_amqp_list from zato.server.service.internal import AdminService, AdminSIO, GetListAdminSIO @@ -80,7 +79,7 @@ def handle(self): if not service: msg = 'Service `{}` does not exist in this cluster'.format(input.service) - raise Exception(msg) + raise ServiceMissingException(self.cid, msg) try: item = ChannelAMQP() diff --git a/code/zato-server/src/zato/server/service/internal/channel/jms_wmq.py b/code/zato-server/src/zato/server/service/internal/channel/jms_wmq.py index 1a77b9073c..c32cabd20a 100644 --- a/code/zato-server/src/zato/server/service/internal/channel/jms_wmq.py +++ b/code/zato-server/src/zato/server/service/internal/channel/jms_wmq.py @@ -1,13 +1,11 @@ # -*- coding: utf-8 -*- """ -Copyright (C) 2019, Zato Source s.r.o. https://zato.io +Copyright (C) 2023, Zato Source s.r.o. https://zato.io Licensed under LGPLv3, see LICENSE.txt for terms and conditions. """ -from __future__ import absolute_import, division, print_function, unicode_literals - # stdlib from base64 import b64decode from binascii import unhexlify @@ -24,6 +22,7 @@ from zato.common.api import CHANNEL from zato.common.broker_message import CHANNEL as BROKER_MSG_CHANNEL from zato.common.ccsid_ import CCSIDConfig +from zato.common.exception import ServiceMissingException from zato.common.json_internal import loads from zato.common.odb.model import ChannelWMQ, Cluster, ConnDefWMQ, Service as ModelService from zato.common.odb.query import channel_wmq_list @@ -88,8 +87,8 @@ def handle(self): if not service: msg = 'Service `{}` does not exist in this cluster'.format(input.service) - self.logger.error(msg) - raise Exception(msg) + self.logger.info(msg) + raise ServiceMissingException(msg) try: @@ -154,6 +153,7 @@ def handle(self): if not service: msg = 'Service `{}` does not exist in this cluster'.format(input.service) + self.logger.info(msg) raise Exception(msg) try: diff --git a/code/zato-server/src/zato/server/service/internal/http_soap.py b/code/zato-server/src/zato/server/service/internal/http_soap.py index 6559011bdf..c9cac1e07e 100644 --- a/code/zato-server/src/zato/server/service/internal/http_soap.py +++ b/code/zato-server/src/zato/server/service/internal/http_soap.py @@ -18,7 +18,7 @@ HL7, HTTP_SOAP_SERIALIZATION_TYPE, MISC, PARAMS_PRIORITY, SEC_DEF_TYPE, URL_PARAMS_PRIORITY, URL_TYPE, \ ZATO_DEFAULT, ZATO_NONE, ZATO_SEC_USE_RBAC from zato.common.broker_message import CHANNEL, OUTGOING -from zato.common.exception import ZatoException +from zato.common.exception import ServiceMissingException, ZatoException from zato.common.json_internal import dumps from zato.common.odb.model import Cluster, HTTPSOAP, SecurityBase, Service, TLSCACert, to_json from zato.common.odb.query import cache_by_id, http_soap, http_soap_list @@ -357,8 +357,8 @@ def handle(self): if input.connection == CONNECTION.CHANNEL and not service: msg = 'Service `{}` does not exist in this cluster'.format(input.service) - self.logger.error(msg) - raise Exception(msg) + self.logger.info(msg) + raise ServiceMissingException(msg) # Will raise exception if the security type doesn't match connection # type and transport @@ -562,8 +562,8 @@ def handle(self): if input.connection == CONNECTION.CHANNEL and not service: msg = 'Service `{}` does not exist in this cluster'.format(input.service) - self.logger.error(msg) - raise Exception(msg) + self.logger.info(msg) + raise ServiceMissingException(msg) # Will raise exception if the security type doesn't match connection # type and transport diff --git a/code/zato-server/src/zato/server/service/internal/scheduler.py b/code/zato-server/src/zato/server/service/internal/scheduler.py index b07299cc7b..06b871e5ee 100644 --- a/code/zato-server/src/zato/server/service/internal/scheduler.py +++ b/code/zato-server/src/zato/server/service/internal/scheduler.py @@ -1,13 +1,11 @@ # -*- coding: utf-8 -*- """ -Copyright (C) Zato Source s.r.o. https://zato.io +Copyright (C) 2023, Zato Source s.r.o. https://zato.io Licensed under LGPLv3, see LICENSE.txt for terms and conditions. """ -from __future__ import absolute_import, division, print_function, unicode_literals - # stdlib from contextlib import closing from traceback import format_exc @@ -24,7 +22,7 @@ # Zato from zato.common.api import scheduler_date_time_format, SCHEDULER, ZATO_NONE from zato.common.broker_message import MESSAGE_TYPE, SCHEDULER as SCHEDULER_MSG -from zato.common.exception import ZatoException +from zato.common.exception import ServiceMissingException, ZatoException from zato.common.odb.model import Cluster, Job, CronStyleJob, IntervalBasedJob,\ Service from zato.common.odb.query import job_by_id, job_by_name, job_list @@ -81,8 +79,8 @@ def _create_edit(action, cid, input, payload, logger, session, broker_client, re if not service: msg = 'Service `{}` does not exist in this cluster'.format(service_name) - logger.error(msg) - raise ZatoException(cid, msg) + logger.info(msg) + raise ServiceMissingException(cid, msg) # We can create/edit a base Job object now and - optionally - another one # if the job type's is either interval-based or Cron-style. The base diff --git a/code/zato-web-admin/src/zato/admin/settings.py b/code/zato-web-admin/src/zato/admin/settings.py index d05deedbea..566000e74d 100644 --- a/code/zato-web-admin/src/zato/admin/settings.py +++ b/code/zato-web-admin/src/zato/admin/settings.py @@ -10,7 +10,6 @@ import logging import logging.config import os -import sys from uuid import uuid4 # Zato