Skip to content

Commit

Permalink
Merge branch 'main' into support/3.2
Browse files Browse the repository at this point in the history
  • Loading branch information
dsuch committed Oct 21, 2023
2 parents c4ef563 + 30d9094 commit feab81b
Show file tree
Hide file tree
Showing 9 changed files with 216 additions and 132 deletions.
275 changes: 174 additions & 101 deletions code/zato-cli/src/zato/cli/enmasse.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
from collections import namedtuple, OrderedDict
from copy import deepcopy
from dataclasses import dataclass
from datetime import datetime, timedelta
from itertools import chain
from time import sleep

# Zato
from zato.cli import ManageCommand
Expand All @@ -24,7 +27,8 @@

from logging import Logger
from zato.client import APIClient
from zato.common.typing_ import any_, anylist, dictlist, list_, strdict, strstrdict, strlist, strlistdict, strnone
from zato.common.typing_ import any_, anylist, dictlist, list_, stranydict, strdict, strstrdict, strlist, \
strlistdict, strnone

APIClient = APIClient
Logger = Logger
Expand Down Expand Up @@ -91,6 +95,9 @@ class Include_Type:
# How to sort attributes of a given object
Enmasse_Attr_List_Sort_Order = cast_('strlistdict', None)

# How many seconds to wait for missing objects
Missing_Wait_Time = 120

# ################################################################################################################################

ModuleCtx.Enmasse_Type = {
Expand Down Expand Up @@ -1921,6 +1928,7 @@ class Enmasse(ManageCommand):
{'name':'--replace', 'help':'Force replacing already server objects during import', 'action':'store_true'},
{'name':'--replace-odb-objects', 'help':'Same as --replace', 'action':'store_true'},
{'name':'--input', 'help':'Path to input file with objects to import'},
{'name':'--missing-wait-time', 'help':'How many seconds to wait for missing objects', 'default':ModuleCtx.Missing_Wait_Time},
{'name':'--env-file', 'help':'Path to an .ini file with environment variables'},
{'name':'--rbac-sleep', 'help':'How many seconds to sleep for after creating an RBAC object', 'default':'1'},
{'name':'--cols-width', 'help':'A list of columns width to use for the table output, default: {}'.format(DEFAULT_COLS_WIDTH), 'action':'store_true'},
Expand All @@ -1932,102 +1940,6 @@ class Enmasse(ManageCommand):
'yml': YamlCodec,
}

# ################################################################################################################################

def load_input(self, input_path):

# stdlib
import sys

_, _, ext = self.args.input.rpartition('.')
codec_class = self.CODEC_BY_EXTENSION.get(ext.lower())
if codec_class is None:
exts = ', '.join(sorted(self.CODEC_BY_EXTENSION))
self.logger.error('Unrecognized file extension "{}": must be one of {}'.format(ext.lower(), exts))
sys.exit(self.SYS_ERROR.INVALID_INPUT)

parser = InputParser(input_path, self.logger, codec_class())
results = parser.parse()
if not results.ok:
self.logger.error('JSON parsing failed')
_ = self.report_warnings_errors([results])
sys.exit(self.SYS_ERROR.INVALID_INPUT)
self.json = parser.json

# ################################################################################################################################

def normalize_path(
self,
arg_name, # type: str
exit_if_missing, # type: bool
*,
needs_parent_dir=False, # type: bool
log_if_missing=False, # type: bool
) -> 'str':

# Local aliases
path_to_check:'str' = ''
arg_param = getattr(self.args, arg_name, None) or ''

# Potentially, expand the path to our home directory ..
arg_name = os.path.expanduser(arg_param)

# Turn the name into a full path unless it already is one ..
if os.path.isabs(arg_name):
arg_path = arg_name
else:
arg_path = os.path.join(self.curdir, arg_param)
arg_path = os.path.abspath(arg_path)

# .. we need for a directory to exist ..
if needs_parent_dir:
path_to_check = os.path.join(arg_path, '..')
path_to_check = os.path.abspath(path_to_check)

# .. or for the actual file to exist ..
else:
path_to_check = arg_path

# .. make sure that it does exist ..
if not os.path.exists(path_to_check):

# .. optionally, exit the process if it does not ..
if exit_if_missing:

if log_if_missing:
self.logger.info(f'Path not found: `{path_to_check}`')

# Zato
import sys
sys.exit()

# .. if we are here, it means that we have a valid, absolute path to return ..
return arg_path

# ################################################################################################################################

def _extract_include(self, include_type:'str') -> 'strlist': # type: ignore

# Local aliases
out:'strlist' = []

# Turn the string into a list of items that we will process ..
include_type:'strlist' = include_type.split(',')
include_type = [item.strip().lower() for item in include_type]

# .. ignore explicit types if all types are to be returned ..
if ModuleCtx.Include_Type.All in include_type:
include_type = [ModuleCtx.Include_Type.All]
else:
out[:] = include_type

# .. if we do not have anything, it means that we are including all types ..
if not out:
out = [ModuleCtx.Include_Type.All]

# .. now, we are ready to return our response.
return out

# ################################################################################################################################

def _on_server(self, args:'any_') -> 'None':
Expand Down Expand Up @@ -2056,6 +1968,9 @@ def _on_server(self, args:'any_') -> 'None':
self.json = {}
has_import = getattr(args, 'import')

# For type hints
self.missing_wait_time:'int' = getattr(self.args, 'missing_wait_time', None) or ModuleCtx.Missing_Wait_Time

# Initialize environment variables ..
env_path = self.normalize_path('env_file', False)
populate_environment_from_file(env_path)
Expand Down Expand Up @@ -2159,7 +2074,104 @@ def _on_server(self, args:'any_') -> 'None':

# 4) a/b
elif has_import:
_ = self.report_warnings_errors(self.run_import())
warnings_errors_list = self.run_import()
_ = self.report_warnings_errors(warnings_errors_list)

# ################################################################################################################################

def load_input(self, input_path):

# stdlib
import sys

_, _, ext = self.args.input.rpartition('.')
codec_class = self.CODEC_BY_EXTENSION.get(ext.lower())
if codec_class is None:
exts = ', '.join(sorted(self.CODEC_BY_EXTENSION))
self.logger.error('Unrecognized file extension "{}": must be one of {}'.format(ext.lower(), exts))
sys.exit(self.SYS_ERROR.INVALID_INPUT)

parser = InputParser(input_path, self.logger, codec_class())
results = parser.parse()
if not results.ok:
self.logger.error('JSON parsing failed')
_ = self.report_warnings_errors([results])
sys.exit(self.SYS_ERROR.INVALID_INPUT)
self.json = parser.json

# ################################################################################################################################

def normalize_path(
self,
arg_name, # type: str
exit_if_missing, # type: bool
*,
needs_parent_dir=False, # type: bool
log_if_missing=False, # type: bool
) -> 'str':

# Local aliases
path_to_check:'str' = ''
arg_param = getattr(self.args, arg_name, None) or ''

# Potentially, expand the path to our home directory ..
arg_name = os.path.expanduser(arg_param)

# Turn the name into a full path unless it already is one ..
if os.path.isabs(arg_name):
arg_path = arg_name
else:
arg_path = os.path.join(self.curdir, arg_param)
arg_path = os.path.abspath(arg_path)

# .. we need for a directory to exist ..
if needs_parent_dir:
path_to_check = os.path.join(arg_path, '..')
path_to_check = os.path.abspath(path_to_check)

# .. or for the actual file to exist ..
else:
path_to_check = arg_path

# .. make sure that it does exist ..
if not os.path.exists(path_to_check):

# .. optionally, exit the process if it does not ..
if exit_if_missing:

if log_if_missing:
self.logger.info(f'Path not found: `{path_to_check}`')

# Zato
import sys
sys.exit()

# .. if we are here, it means that we have a valid, absolute path to return ..
return arg_path

# ################################################################################################################################

def _extract_include(self, include_type:'str') -> 'strlist': # type: ignore

# Local aliases
out:'strlist' = []

# Turn the string into a list of items that we will process ..
include_type:'strlist' = include_type.split(',')
include_type = [item.strip().lower() for item in include_type]

# .. ignore explicit types if all types are to be returned ..
if ModuleCtx.Include_Type.All in include_type:
include_type = [ModuleCtx.Include_Type.All]
else:
out[:] = include_type

# .. if we do not have anything, it means that we are including all types ..
if not out:
out = [ModuleCtx.Include_Type.All]

# .. now, we are ready to return our response.
return out

# ################################################################################################################################

Expand Down Expand Up @@ -2634,10 +2646,69 @@ def export_local_odb(self, needs_local=True):
def run_odb_export(self):
return self.export_local_odb(False)

# ################################################################################################################################

def _get_missing_objects(self, warnings_errors:'list_[Results]') -> 'strlist':

# Our response to produce
out:'strlist' = []

for item in warnings_errors:

for elem in chain(item.warnings, item.errors): # type: ignore
elem = cast_('Notice', elem)
if elem.code == ERROR_SERVICE_MISSING:
enmasse_elem:'stranydict' = elem.value_raw[1]
service_name = enmasse_elem['service']
out.append(service_name)

# Return everything we have found to our caller, sorted alphabetically
return sorted(out)

# ################################################################################################################################

def run_import(self) -> 'anylist':

# Local variables
start_time = datetime.utcnow()
wait_until = start_time + timedelta(seconds=1)

# Run the initial import ..
warnings_errors = self._run_import()

while warnings_errors:

# Loop variables
now = datetime.utcnow()

# .. if we have already waited enough, we can already return ..
if now > wait_until:
return warnings_errors

# .. if there is anything that we need to wait for, ..
# .. such as missing services, we will keep running ..
missing = self._get_missing_objects(warnings_errors)

# .. for reporting purposes, get information on how much longer are to wait ..
wait_delta = wait_until - now

# .. report what we are waiting for ..
msg = f'Enmasse waiting; timeout -> {wait_delta}; missing -> {missing}'
self.logger.info(msg)

# .. do wait now ..
sleep(2)

# .. re-run the import ..
warnings_errors = self._run_import()

# .. either we run out of time or we have succeed, in either case, we can return.
return warnings_errors

# ################################################################################################################################

def _run_import(self) -> 'anylist':

# Make sure we have the latest state of information ..
self.object_mgr.refresh()

Expand All @@ -2664,6 +2735,7 @@ def run_import(self) -> 'anylist':

if __name__ == '__main__':

"""
# stdlib
import sys
Expand All @@ -2682,11 +2754,12 @@ def run_import(self) -> 'anylist':
args.output = None
args.rbac_sleep = 1
# args['replace'] = True
args['import'] = False
args['replace'] = True
args['import'] = True
args.path = sys.argv[1]
# args.path = sys.argv[1]
# args.input = sys.argv[2]
enmasse = Enmasse(args)
enmasse.run(args)
"""
5 changes: 5 additions & 0 deletions code/zato-common/src/zato/common/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ def __repr__(self):

# ################################################################################################################################

class ServiceMissingException(ZatoException):
pass

# ################################################################################################################################

class RuntimeInvocationError(ZatoException):
pass

Expand Down
11 changes: 9 additions & 2 deletions code/zato-common/src/zato/common/util/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1950,8 +1950,15 @@ def slugify(value, allow_unicode=False):

# ################################################################################################################################

def wait_for_predicate(predicate_func, timeout, interval, log_msg_details=None, needs_log=True, *args, **kwargs):
# type: (object, int, float, *object, **object) -> bool
def wait_for_predicate(
predicate_func, # type: callable_
timeout, # type: int
interval, # type: float
log_msg_details=None, # type: strnone
needs_log=True, # type: bool
*args, # type: any_
**kwargs # type: any_
) -> 'bool':

# Try out first, perhaps it already fulfilled
is_fulfilled = bool(predicate_func(*args, **kwargs))
Expand Down
Loading

0 comments on commit feab81b

Please sign in to comment.