Skip to content

Commit

Permalink
UIS subscription changes
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Mar 7, 2025
1 parent c9f9aa0 commit aa7279c
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 76 deletions.
2 changes: 1 addition & 1 deletion conda-environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ dependencies:
- ansimarkup >=1.0.0
- async-timeout>=3.0.0 # [py<3.11]
- colorama >=0.4,<1.0
- graphene >=3.4.3,<4
- graphene ==3.4.3
- graphviz # for static graphing
# Note: can't pin jinja2 any higher than this until we give up on Cylc 7 back-compat
- jinja2 >=3.0,<3.1
Expand Down
35 changes: 16 additions & 19 deletions cylc/flow/network/graphql.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
visit,
get_argument_values,
get_named_type,
introspection_types,
is_introspection_type,
)
from graphql.pyutils import AwaitableOrValue, is_awaitable

Expand All @@ -47,10 +47,6 @@
NULL_VALUE = None
EMPTY_VALUES: Tuple[list, dict] = ([], {})
STRIP_OPS = {'query', 'subscription'}
INTROSPECTS = {
k.lower()
for k in introspection_types
}

T = TypeVar("T")
U = TypeVar("U")
Expand Down Expand Up @@ -82,6 +78,17 @@ def grow_tree(tree, path, leaves=None):
tree_loc[len(path) % 2].update({'leaves': leaves})


def execution_result_to_dict(execution_result):
result = {}
if execution_result.data:
result["data"] = execution_result.data
if execution_result.errors:
result["errors"] = [
error.formatted for error in execution_result.errors
]
return result


def instantiate_middleware(middlewares):
"""Take iterable of middlewares and instantiate.
Expand Down Expand Up @@ -154,21 +161,11 @@ def strip_null(data):
return data


def attr_strip_null(result):
"""Work on the attribute/data of ExecutionResult if present."""
if hasattr(result, 'data'):
result.data = strip_null(result.data)
return result
return strip_null(result)


def null_stripper(exe_result):
"""Strip nulls in accordance with type of execution result."""
if is_awaitable(exe_result):
return async_next(attr_strip_null, exe_result)
if getattr(exe_result, 'errors', None) is None:
return attr_strip_null(exe_result)
return exe_result
return async_next(strip_null, exe_result)

Check warning on line 167 in cylc/flow/network/graphql.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/network/graphql.py#L167

Added line #L167 was not covered by tests
return strip_null(exe_result)


class CylcVisitor(Visitor):
Expand Down Expand Up @@ -260,12 +257,13 @@ def __init__(self):
def resolve(self, next_, root, info, **args):
"""Middleware resolver; handles field according to operation."""
# GraphiQL introspection is 'query' but not async
if INTROSPECTS.intersection({f'{p}' for p in info.path.as_list()}):
if is_introspection_type(get_named_type(info.return_type)):
return next_(root, info, **args)

if info.operation.operation.value in STRIP_OPS:
path_list = info.path.as_list()
path_string = f'{path_list}'
parent_path_string = f'{path_list[:-1:]}'
# Needed for child fields that resolve without args.
# Store arguments of parents as leaves of schema tree from path
# to respective field.
Expand Down Expand Up @@ -301,7 +299,6 @@ def resolve(self, next_, root, info, **args):
):

# Gather fields set in root
parent_path_string = f'{path_list[:-1:]}'
stamp = getattr(root, 'stamp', '')
if (
parent_path_string not in self.field_sets
Expand Down
3 changes: 2 additions & 1 deletion cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,8 +557,9 @@ async def subscribe_delta(
# NOTE: we don't expect workflows to be returned in definition order
# so it is ok to use `set` here
workflow_ids = set(args.get('workflows', args.get('ids', ())))

sub_id = uuid4()
info.variable_values['backend_sub_id'] = sub_id
info.context['sub_id'] = sub_id

Check warning on line 562 in cylc/flow/network/resolvers.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/network/resolvers.py#L562

Added line #L562 was not covered by tests
self.delta_store[sub_id] = {}

op_id = root
Expand Down
100 changes: 68 additions & 32 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
)
from graphene.types.generic import GenericScalar
from graphene.utils.str_converters import to_snake_case
from graphene.types.schema import identity_resolve

from graphql.type.definition import get_named_type

from cylc.flow import LOG_LEVELS
Expand All @@ -70,8 +72,7 @@
FLOW_NONE,
)
from cylc.flow.id import Tokens
from cylc.flow.run_modes import (
TASK_CONFIG_RUN_MODES, WORKFLOW_RUN_MODES, RunMode)
from cylc.flow.run_modes import RunMode
from cylc.flow.task_outputs import SORT_ORDERS
from cylc.flow.task_state import (
TASK_STATUS_DESC,
Expand Down Expand Up @@ -309,8 +310,8 @@ def process_resolver_info(
"""Set and gather info for resolver."""
# Add the subscription id to the resolver context
# to know which delta-store to use."""
if 'backend_sub_id' in info.variable_values:
args['sub_id'] = info.variable_values['backend_sub_id']
if 'sub_id' in info.context:
args['sub_id'] = info.context['sub_id']

Check warning on line 314 in cylc/flow/network/schema.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/network/schema.py#L314

Added line #L314 was not covered by tests

field_name: str = to_snake_case(info.field_name)
# root is the parent data object.
Expand Down Expand Up @@ -620,19 +621,18 @@ class Meta:
string_extended = String()


# The run mode for the workflow.
WorkflowRunMode = graphene.Enum(
'WorkflowRunMode',
[(m.capitalize(), m) for m in WORKFLOW_RUN_MODES],
description=lambda x: RunMode(x.value).describe() if x else None,
)
class TaskRunMode(graphene.Enum):
"""The mode used to run a task."""

# The run mode for the task.
TaskRunMode = graphene.Enum(
'TaskRunMode',
[(m.capitalize(), m) for m in TASK_CONFIG_RUN_MODES],
description=lambda x: RunMode(x.value).describe() if x else None,
)
# NOTE: using a different enum because:
# * We only want to offer a subset of run modes (REQUEST_* only).

Live = cast('Enum', RunMode.LIVE) # type: graphene.Enum
Skip = cast('Enum', RunMode.SKIP) # type: graphene.Enum

@property
def description(self):
return RunMode(self.value).describe()


class Workflow(ObjectType):
Expand Down Expand Up @@ -862,7 +862,7 @@ class Meta:
directives = graphene.List(RuntimeSetting, resolver=resolve_json_dump)
environment = graphene.List(RuntimeSetting, resolver=resolve_json_dump)
outputs = graphene.List(RuntimeSetting, resolver=resolve_json_dump)
run_mode = TaskRunMode(default_value=TaskRunMode.Live.name)
run_mode = TaskRunMode(default_value=TaskRunMode.Live)


RUNTIME_FIELD_TO_CFG_MAP = {
Expand Down Expand Up @@ -1694,6 +1694,21 @@ class TimePoint(String):
)


class WorkflowRunMode(graphene.Enum):
"""The mode used to run a workflow."""

# NOTE: using a different enum because:
# * We only want to offer a subset of run modes (REQUEST_* only).

Live = cast('Enum', RunMode.LIVE) # type: graphene.Enum
Dummy = cast('Enum', RunMode.DUMMY) # type: graphene.Enum
Simulation = cast('Enum', RunMode.SIMULATION) # type: graphene.Enum

@property
def description(self):
return RunMode(self.value).describe()

Check warning on line 1709 in cylc/flow/network/schema.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/network/schema.py#L1709

Added line #L1709 was not covered by tests


class WorkflowStopMode(graphene.Enum):
"""The mode used to stop a running workflow."""

Expand Down Expand Up @@ -1984,7 +1999,7 @@ class Meta:
submitted immediately if the workflow is restarted.
Remaining task event handlers, job poll and kill commands, will
be executed prior to shutdown, unless
the stop mode is `{WorkflowStopMode.Now.name}`.
the stop mode is `{WorkflowStopMode.Now}`.
Valid for: paused, running, stopping workflows.
''')
Expand Down Expand Up @@ -2509,6 +2524,27 @@ class Meta:
)


# TODO: Change to use subscribe arg/default graphql-core has a subscribe field
# for both Meta and Field, graphene at v3.4.3 does not.. As a workaround
# the subscribe function is looked up via the following mapping:
SUB_RESOLVER_MAPPING = {
'deltas': delta_subs,
'workflows': delta_subs,
'job': delta_subs,
'jobs': delta_subs,
'task': delta_subs,
'tasks': delta_subs,
'taskProxy': delta_subs,
'taskProxies': delta_subs,
'family': delta_subs,
'families': delta_subs,
'familyProxy': delta_subs,
'familyProxies': delta_subs,
'edges': delta_subs,
'nodesEdges': delta_subs,
}


class Subscriptions(ObjectType):
"""Defines the subscriptions available in the schema."""
class Meta:
Expand All @@ -2523,7 +2559,7 @@ class Meta:
strip_null=Boolean(default_value=False),
initial_burst=Boolean(default_value=True),
ignore_interval=Float(default_value=0.0),
resolver=delta_subs
resolver=identity_resolve
)
workflows = graphene.List(
Workflow,
Expand All @@ -2536,7 +2572,7 @@ class Meta:
delta_type=String(default_value=DELTA_ADDED),
initial_burst=Boolean(default_value=True),
ignore_interval=Float(default_value=2.5),
resolver=delta_subs
resolver=identity_resolve
)
job = Field(
Job,
Expand All @@ -2547,7 +2583,7 @@ class Meta:
delta_type=String(default_value=DELTA_ADDED),
initial_burst=Boolean(default_value=True),
ignore_interval=Float(default_value=0.0),
resolver=delta_subs
resolver=identity_resolve
)
jobs = graphene.List(
Job,
Expand All @@ -2558,7 +2594,7 @@ class Meta:
delta_type=String(default_value=DELTA_ADDED),
initial_burst=Boolean(default_value=True),
ignore_interval=Float(default_value=0.0),
resolver=delta_subs
resolver=identity_resolve
)
task = Field(
Task,
Expand All @@ -2569,7 +2605,7 @@ class Meta:
delta_type=String(default_value=DELTA_ADDED),
initial_burst=Boolean(default_value=True),
ignore_interval=Float(default_value=0.0),
resolver=delta_subs
resolver=identity_resolve
)
tasks = graphene.List(
Task,
Expand All @@ -2580,7 +2616,7 @@ class Meta:
delta_type=String(default_value=DELTA_ADDED),
initial_burst=Boolean(default_value=True),
ignore_interval=Float(default_value=0.0),
resolver=delta_subs
resolver=identity_resolve
)
task_proxy = Field(
TaskProxy,
Expand All @@ -2591,7 +2627,7 @@ class Meta:
delta_type=String(default_value=DELTA_ADDED),
initial_burst=Boolean(default_value=True),
ignore_interval=Float(default_value=0.0),
resolver=delta_subs
resolver=identity_resolve
)
task_proxies = graphene.List(
TaskProxy,
Expand All @@ -2602,7 +2638,7 @@ class Meta:
delta_type=String(default_value=DELTA_ADDED),
initial_burst=Boolean(default_value=True),
ignore_interval=Float(default_value=0.0),
resolver=delta_subs
resolver=identity_resolve
)
family = Field(
Family,
Expand All @@ -2613,7 +2649,7 @@ class Meta:
delta_type=String(default_value=DELTA_ADDED),
initial_burst=Boolean(default_value=True),
ignore_interval=Float(default_value=0.0),
resolver=delta_subs
resolver=identity_resolve
)
families = graphene.List(
Family,
Expand All @@ -2624,7 +2660,7 @@ class Meta:
delta_type=String(default_value=DELTA_ADDED),
initial_burst=Boolean(default_value=True),
ignore_interval=Float(default_value=0.0),
resolver=delta_subs
resolver=identity_resolve
)
family_proxy = Field(
FamilyProxy,
Expand All @@ -2635,7 +2671,7 @@ class Meta:
delta_type=String(default_value=DELTA_ADDED),
initial_burst=Boolean(default_value=True),
ignore_interval=Float(default_value=0.0),
resolver=delta_subs
resolver=identity_resolve
)
family_proxies = graphene.List(
FamilyProxy,
Expand All @@ -2646,7 +2682,7 @@ class Meta:
delta_type=String(default_value=DELTA_ADDED),
initial_burst=Boolean(default_value=True),
ignore_interval=Float(default_value=0.0),
resolver=delta_subs
resolver=identity_resolve
)
edges = graphene.List(
Edge,
Expand All @@ -2657,7 +2693,7 @@ class Meta:
delta_type=String(default_value=DELTA_ADDED),
initial_burst=Boolean(default_value=True),
ignore_interval=Float(default_value=0.0),
resolver=delta_subs
resolver=identity_resolve
)
nodes_edges = Field(
NodesEdges,
Expand All @@ -2668,7 +2704,7 @@ class Meta:
delta_type=String(default_value=DELTA_ADDED),
initial_burst=Boolean(default_value=True),
ignore_interval=Float(default_value=0.0),
resolver=delta_subs
resolver=identity_resolve
)


Expand Down
18 changes: 10 additions & 8 deletions cylc/flow/network/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
from cylc.flow.network.graphql import (
CylcExecutionContext,
IgnoreFieldMiddleware,
instantiate_middleware
execution_result_to_dict,
instantiate_middleware,
)
from cylc.flow.network.publisher import WorkflowPublisher
from cylc.flow.network.replier import WorkflowReplier
Expand Down Expand Up @@ -414,14 +415,15 @@ def graphql(
execution_context_class=CylcExecutionContext,
)
if is_awaitable(executed):
result = self.loop.run_until_complete(executed)
if result.errors:
for error in result.errors:
executed = self.loop.run_until_complete(executed)
result = execution_result_to_dict(executed)
if result.get('errors'):
# If there are execution errors log and return the errors,
# don't raise them and end the server over a bad query.
for error in result['errors']:
LOG.warning(f"GraphQL: {error}")
# If there are execution errors, it means there was an unexpected
# error, so fail the command.
raise Exception(*result.errors)
return result.data
return result['errors']
return result.get('data')

# UIServer Data Commands
@expose
Expand Down
Loading

0 comments on commit aa7279c

Please sign in to comment.