Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Migrate to Marshmallow 3 #57

Open
wants to merge 8 commits into
base: v1.8
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [2.7, 3.5, 3.6, 3.7, 3.8]
python-version: [3.5, 3.6, 3.7, 3.8]

steps:
- uses: actions/checkout@v2
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ coverage.xml
.tox
.*.sw[op]
.idea/
.eggs
build
12 changes: 6 additions & 6 deletions boundary_layer/builders/primary.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.

import datetime
from marshmallow import ValidationError
import boundary_layer
from boundary_layer.builders.base import DagBuilderBase
from boundary_layer.schemas.dag import DagArgsSchema
Expand All @@ -28,16 +29,15 @@ def preamble(self):
self.reference_path)

template = self.get_jinja_template('primary_preamble.j2')

dag_args_dumped = DagArgsSchema(context={'for_dag_output': True}).dump(
self.dag.get('dag_args', {}))
if dag_args_dumped.errors:
try:
dag_args = DagArgsSchema(context={'for_dag_output': True}).dump(
self.dag.get('dag_args', {}))
except ValidationError as err:
# should not happen because the schema was validated upon load,
# but we should check
raise Exception('Error serializing dag_args: {}'.format(
dag_args_dumped.errors))
err.messages))

dag_args = dag_args_dumped.data
dag_args['dag_id'] = self.build_dag_id()

default_task_args = self.dag.get('default_task_args', {})
Expand Down
8 changes: 3 additions & 5 deletions boundary_layer/oozier/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,9 @@ def schema(self):
pass

def __init__(self, context, action_metadata, data):
loaded = self.schema(context=context).load(data)
if loaded.errors:
raise ma.ValidationError(loaded.errors)
loaded_data = self.schema(context=context).load(data)

super(OozieActionBuilderWithSchema, self).__init__(context, action_metadata, loaded.data)
super(OozieActionBuilderWithSchema, self).__init__(context, action_metadata, loaded_data)

def get_action(self):
result = self.action_metadata.copy()
Expand All @@ -67,7 +65,7 @@ def get_action(self):


class OozieSubWorkflowActionSchema(OozieBaseSchema):
app_path = ma.fields.String(required=True, load_from='app-path')
app_path = ma.fields.String(required=True, data_key='app-path')
propagate_configuration = ma.fields.Dict(allow_none=True)


Expand Down
23 changes: 12 additions & 11 deletions boundary_layer/oozier/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import networkx as nx
import six

from marshmallow import ValidationError
from boundary_layer.graph import _GraphUtil
from boundary_layer.logger import logger
from boundary_layer import exceptions, VERSION_STRING, plugins
Expand Down Expand Up @@ -120,20 +121,20 @@ def _parse_workflow(self, filename, cluster_config, oozie_config):
parsed = xmltodict.parse(
self.file_fetcher.fetch_file_content(filename))

loaded = OozieWorkflowSchema(context={
'cluster_config': cluster_config,
'oozie_plugin': oozie_config,
'macro_translator': JspMacroTranslator(oozie_config.jsp_macros()),
'production': self.production,
}).load(parsed)

if loaded.errors:
try:
data = OozieWorkflowSchema(context={
'cluster_config': cluster_config,
'oozie_plugin': oozie_config,
'macro_translator': JspMacroTranslator(oozie_config.jsp_macros()),
'production': self.production,
}).load(parsed)
except ValidationError as err:
raise Exception('Errors parsing file {}: {}'.format(
filename,
loaded.errors))
err.messages))

data_copy = loaded.data.copy()
data_copy.update(self.partition_actions(loaded.data))
data_copy = data.copy()
data_copy.update(self.partition_actions(data))

return data_copy

Expand Down
27 changes: 15 additions & 12 deletions boundary_layer/oozier/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@


class OozieBaseSchema(ma.Schema):
class Meta:
unknown = ma.INCLUDE

singletons_to_lists = []

@ma.pre_load
def _convert_singletons_to_lists(self, data):
def _convert_singletons_to_lists(self, data, **kwargs):
if all(isinstance(data.get(key, []), list) for key in self.singletons_to_lists):
return data

Expand All @@ -34,15 +37,15 @@ def _convert_singletons_to_lists(self, data):


class OozieNamedObjectSchema(OozieBaseSchema):
name = ma.fields.String(required=True, load_from='@name')
name = ma.fields.String(required=True, data_key='@name')


class OozieFlowControlSchema(OozieBaseSchema):
to = ma.fields.String(required=True, load_from='@to')
to = ma.fields.String(required=True, data_key='@to')


class OozieForkStartSchema(OozieBaseSchema):
start = ma.fields.String(required=True, load_from='@start')
start = ma.fields.String(required=True, data_key='@start')


class OozieForkSchema(OozieNamedObjectSchema):
Expand All @@ -51,7 +54,7 @@ class OozieForkSchema(OozieNamedObjectSchema):
singletons_to_lists = ['path']

@ma.post_load
def add_base_operator(self, data):
def add_base_operator(self, data, **kwargs):
operator = {
'name': data['name'],
'type': 'flow_control',
Expand All @@ -64,7 +67,7 @@ def add_base_operator(self, data):

class OozieJoinSchema(OozieNamedObjectSchema, OozieFlowControlSchema):
@ma.post_load
def add_base_operator(self, data):
def add_base_operator(self, data, **kwargs):
operator = {
'name': data['name'],
'type': 'flow_control',
Expand Down Expand Up @@ -101,15 +104,15 @@ def _get_action_builder(self, data):
return keyed_action_builders[keys_present[0]]

@ma.validates_schema(pass_original=True)
def one_action_type(self, _, original):
def one_action_type(self, _, original, **kwargs):
""" Runs the validation checks to make sure that exactly one
known action is present, and prints an error message based on
the content of the original, unparsed input
"""
self._get_action_builder(original)

@ma.post_load(pass_original=True)
def fetch_base_action(self, data, original):
def fetch_base_action(self, data, original, **kwargs):
builder_cls = self._get_action_builder(original)

return builder_cls(self.context, data, original[builder_cls.key])
Expand All @@ -120,7 +123,7 @@ class OozieKillSchema(OozieNamedObjectSchema):


class OozieCaseSchema(OozieFlowControlSchema):
text = ma.fields.String(required=True, load_from='#text')
text = ma.fields.String(required=True, data_key='#text')


class OozieSwitchSchema(OozieBaseSchema):
Expand All @@ -135,7 +138,7 @@ class OozieDecisionSchema(OozieNamedObjectSchema):


class OozieWorkflowAppSchema(OozieNamedObjectSchema):
name = ma.fields.String(required=True, load_from='@name')
name = ma.fields.String(required=True, data_key='@name')
action = ma.fields.List(ma.fields.Nested(OozieActionSchema), missing=[])
join = ma.fields.List(ma.fields.Nested(OozieJoinSchema), missing=[])
fork = ma.fields.List(ma.fields.Nested(OozieForkSchema), missing=[])
Expand All @@ -150,8 +153,8 @@ class OozieWorkflowAppSchema(OozieNamedObjectSchema):

class OozieWorkflowSchema(OozieBaseSchema):
workflow_app = ma.fields.Nested(
OozieWorkflowAppSchema, load_from='workflow-app', required=True)
OozieWorkflowAppSchema, data_key='workflow-app', required=True)

@ma.post_load
def return_workflow(self, data):
def return_workflow(self, data, **kwargs):
return data['workflow_app']
9 changes: 5 additions & 4 deletions boundary_layer/plugins/plugin_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,15 @@ def parse_plugin_config(self, plugin, config):
'Config schema for plugin {} is not a marshmallow Schema. '
'Found: {}'.format(plugin.name, plugin.config_schema_cls))

parsed_config = plugin.config_schema_cls().load(config or {})
if parsed_config.errors:
try:
parsed_config_data = plugin.config_schema_cls().load(config or {})
except ValidationError as err:
raise Exception(
'Errors parsing configuration for plugin {}: {}'.format(
plugin.name,
parsed_config.errors))
err.messages))

return parsed_config.data
return parsed_config_data

def insert_imports(self, plugin_config):
objects = []
Expand Down
11 changes: 6 additions & 5 deletions boundary_layer/registry/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import os
from enum import Enum
import yaml
from marshmallow import ValidationError

from boundary_layer.logger import logger
from boundary_layer import util
Expand Down Expand Up @@ -177,13 +178,13 @@ def load_from_file(self, filename):

logger.debug('validating item %s against schema %s',
item, self.spec_schema_cls.__name__)

loaded = self.spec_schema_cls().load(item)
if loaded.errors:
try:
data = self.spec_schema_cls().load(item)
except ValidationError as err:
raise InvalidConfig('Invalid config spec in file {}: {}'.format(
filename, loaded.errors))
filename, err.messages))

return loaded.data
return data

def load_configs(self, config_paths):
if not isinstance(config_paths, list):
Expand Down
16 changes: 8 additions & 8 deletions boundary_layer/registry/types/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.

import six
from marshmallow import ValidationError
from boundary_layer.logger import logger
from boundary_layer.registry import ConfigFileRegistry, RegistryNode, NodeTypes
from boundary_layer.schemas.internal.operators import OperatorSpecSchema
Expand Down Expand Up @@ -79,15 +80,14 @@ def imports(self):
'resolve_properties() has not been called yet!'.format(
self))

loaded = ImportSchema().load(self.config.get('imports', {}))

assert not loaded.errors, \
('Internal error: processing `imports` config {} for '
'operator `{}`').format(
try:
result = ImportSchema().load(self.config.get('imports', {}))
except ValidationError as err:
raise Exception(
('Internal error: processing `imports` config {} for '
'operator `{}`').format(
self.config.get('imports', {}),
self.name)

result = loaded.data
self.name))

if self.operator_class:
result.setdefault('objects', [])
Expand Down
12 changes: 7 additions & 5 deletions boundary_layer/registry/types/preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.

import abc
from marshmallow import ValidationError
from boundary_layer.registry import Registry, RegistryNode, NodeTypes


Expand Down Expand Up @@ -43,14 +44,15 @@ def __init__(self, properties):
self.properties = None
return

loaded = self.properties_schema_cls().load(properties)
if loaded.errors:
try:
data = self.properties_schema_cls().load(properties)
except ValidationError as err:
raise Exception(
'Error parsing properties for preprocessor `{}`: {}'.format(
'Error parsing properties for preprocessor {}: {}'.format(
self.type,
loaded.errors))
err.messages))

self.properties = loaded.data
self.properties = data

@property
def properties_schema_cls(self):
Expand Down
2 changes: 1 addition & 1 deletion boundary_layer/schemas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

class StrictSchema(ma.Schema):
@ma.validates_schema(pass_original=True)
def check_no_unknowns(self, _, original_data):
def check_no_unknowns(self, _, original_data, **kwargs):
def check_single_datum(datum):
unknown = set(datum) - set(self.fields)
if unknown:
Expand Down
16 changes: 8 additions & 8 deletions boundary_layer/schemas/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class GeneratorSchema(ReferenceSchema):
regex_blocklist = fields.List(fields.String())

@validates_schema
def check_task_id_mode(self, data):
def check_task_id_mode(self, data, **kwargs):
if 'auto_task_id_mode' not in data:
return

Expand Down Expand Up @@ -133,7 +133,7 @@ class DagArgsSchema(StrictSchema):
access_control = fields.Dict()

@validates_schema
def validate_callbacks(self, data):
def validate_callbacks(self, data, **kwargs):
callbacks = ['sla_miss_callback', 'on_success_callback',
'on_failure_callback']
for cb in callbacks:
Expand All @@ -145,7 +145,7 @@ def validate_callbacks(self, data):
[cb])

@validates_schema
def validate_default_view(self, data):
def validate_default_view(self, data, **kwargs):
if 'default_view' not in data:
return

Expand All @@ -157,7 +157,7 @@ def validate_default_view(self, data):
['default_view'])

@validates_schema
def validate_orientation(self, data):
def validate_orientation(self, data, **kwargs):
if 'orientation' not in data:
return

Expand All @@ -168,7 +168,7 @@ def validate_orientation(self, data):
['orientation'])

@validates_schema
def validate_template_undefined(self, data):
def validate_template_undefined(self, data, **kwargs):
if 'template_undefined' not in data:
return
if not re.compile('<<.+>>').match(data['template_undefined']):
Expand All @@ -177,7 +177,7 @@ def validate_template_undefined(self, data):
['template_undefined'])

@post_dump
def dagrun_timeout_to_timedelta(self, data):
def dagrun_timeout_to_timedelta(self, data, **kwargs):
if not self.context.get('for_dag_output'):
return data
if 'dagrun_timeout' in data:
Expand All @@ -199,7 +199,7 @@ class PrimaryDagSchema(BaseDagSchema):
default_task_args = fields.Dict()

@validates_schema
def validate_compatibility_version(self, data):
def validate_compatibility_version(self, data, **kwargs):
if not data.get('compatibility_version'):
return

Expand All @@ -222,4 +222,4 @@ def validate_compatibility_version(self, data):
'Incompatible boundary_layer version: This workflow '
'is for the incompatible prior version {}. Use the '
'migrate-workflow script to update it.'.format(version),
['compatibility_version'])
['compatibility_version'])
Loading