From 59c9a1e3567819f41e01c30a9456f635abd7c457 Mon Sep 17 00:00:00 2001 From: Paul Madden Date: Tue, 19 Sep 2023 16:52:53 -0600 Subject: [PATCH] fixes-for-asset-dicts (#2) --- README.md | 212 ++++++++++++++++--------------- recipe/meta.yaml | 2 +- src/iotaa/core.py | 233 +++++++++++++++++++---------------- src/iotaa/demo.py | 16 +-- src/iotaa/tests/test_core.py | 130 +++++++++---------- 5 files changed, 301 insertions(+), 292 deletions(-) diff --git a/README.md b/README.md index ada83bb..97d298e 100644 --- a/README.md +++ b/README.md @@ -10,20 +10,20 @@ Workflows comprise: - Assets (observable external state -- typically files, but sometimes more abstract state, e.g. a time duration) - Requirement relationships between assets -- Means by which assets are made ready (e.g. created) +- Executable logic to make assets ready (e.g. create them) ## Assets -The `asset` has two attributes: +An `asset` object has two attributes: -1. `id`: A value, of any type, that uniquely identifies the observable state this asset represents (e.g. a POSIX filesytem path, an S3 URI, an ISO8601 timestamp) -2. `ready`: A 0-arity (no-argument) function returning a `bool` value indicating whether or not the asset is ready to use +1. `id`: A value, of any type, uniquely identifying the observable state this asset represents (e.g. a POSIX filesytem path, an S3 URI, an ISO8601 timestamp) +2. `ready`: A 0-arity (no-argument) function returning a `bool` indicating whether or not the asset is ready to use Create an `asset` by calling `asset()`. ## Tasks -Task are functions that declare, by `yield`ing values to `iotaa`, one or more of: asset description, requirement relationships between assets, and imperative recipes for readying assets. `iotaa` provides three Python decorators to define tasks: +Task are functions that declare, by `yield`ing values to `iotaa`, a description of the assets represented by the task (aka the task's name), plus -- depending on task type -- one or more of: the `asset`s themselves, other tasks that the task requires, and/or executable logic to make the task's asset ready. `iotaa` provides three Python decorators to define tasks: ### `@task` @@ -31,13 +31,13 @@ The essential workflow function type. A `@task` function `yield`s, in order: 1. A task name describing the assets being readied, for logging 2. An `asset` -- or an `asset` `list`, or a `dict` mapping `str` keys to `asset` values, or `None` -- that the task is responsible for making ready -3. A task-function call (e.g. `t(args)` for task `t`) -- or a `list` of such calls, or `None` -- that this task requires +3. A task-function call (e.g. `t(args)` for task `t`) -- or a `list` or `dict` of such calls, or `None` -- that this task requires before it can ready its own assets -Arbitrary Python statements may appear before and interspersed between the `yield` statements. All statements following the third and final `yield` will be executed -- if and only if the assets of all required tasks are ready -- with the expectation that they will make ready the task's assets. +Arbitrary Python statements may appear before and interspersed between the `yield` statements. If the assets of all required tasks are ready, the statements following the third and final `yield` will be executed, with the expectation that they will make the task's assets ready. ### `@external` -A function type representing a required `asset` that `iotaa` cannot make ready. An `@external` function `yield`s, in order: +A function type representing a required `asset` that `iotaa` cannot make ready, or a `list` or `dict` of such assets. An `@external` function `yield`s, in order: 1. A task name describing the assets being readied, for logging 2. A required `asset` -- or an `asset` `list`, or a `dict` mapping `str` keys to `asset` values, or `None` -- that must become ready via external means not under workflow control. (Specifying `None` may be nonsensical.) @@ -49,7 +49,7 @@ As with `@task` functions, arbitrary Python statements may appear before and int A function type serving as a container for other tasks. A `@tasks` function `yield`s, in order: 1. A task name describing the assets being readied, for logging -2. A task-function call (e.g. `t(args)` for task `t`) -- or a `list` of such calls, or `None` -- this task requires. (Specifying `None` may be nonsensical.) +2. A task-function call (e.g. `t(args)` for task `t`) -- or a `list` or `dict` of such calls, or `None` -- that this task requires. (Specifying `None` may be nonsensical.) As with `@external` tasks, no statements should follow the second and final `yield`, as they will never execute. @@ -132,7 +132,7 @@ The first `@tasks` method defines the end result: A cup of tea, steeped, with su @tasks def a_cup_of_tea(basedir): yield "A cup of steeped tea with sugar" - cupdir = ids(cup(basedir))[0] + cupdir = ids(cup(basedir)) yield [cup(basedir), steeped_tea_with_sugar(cupdir)] ``` @@ -142,7 +142,7 @@ Note that the function could have equivalently ``` python the_cup = cup(basedir) - cupdir = ids(the_cup)[0] + cupdir = ids(the_cup) yield [the_cup, steeped_tea_with_sugar(cupdir)] ``` @@ -170,7 +170,7 @@ The `steeped_tea_with_sugar()` `@task` function is next: ``` python @task def steeped_tea_with_sugar(cupdir): - # Add sugar to the steeped tea. + # Add sugar to the steeped tea. Requires tea to have steeped. for x in ingredient(cupdir, "sugar", "Steeped tea with sugar", steeped_tea): yield x ``` @@ -184,7 +184,7 @@ def ingredient(cupdir, fn, name, req=None): path = Path(cupdir) / fn path.parent.mkdir(parents=True, exist_ok=True) yield f"{name} in {cupdir}" - yield asset(path, path.exists) + yield {fn: asset(path, path.exists)} yield req(cupdir) if req else None path.touch() ``` @@ -201,7 +201,7 @@ def steeped_tea(cupdir): # Give tea time to steep. yield f"Steeped tea in {cupdir}" ready = False - water = ids(steeping_tea(cupdir))[0] + water = ids(steeping_tea(cupdir))["water"] if water.exists(): water_poured_time = dt.datetime.fromtimestamp(water.stat().st_mtime) ready_time = water_poured_time + dt.timedelta(seconds=10) @@ -209,7 +209,7 @@ def steeped_tea(cupdir): ready = now >= ready_time yield asset(None, lambda: ready) if not ready: - logging.info("Tea steeping for %ss more", int((ready_time - now).total_seconds())) + logging.info("Tea needs to steep for %ss", int((ready_time - now).total_seconds())) else: yield asset(None, lambda: False) yield steeping_tea(cupdir) @@ -222,7 +222,7 @@ The `steeping_tea()` and `tea_bad()` functions are again straightforward `@task` ``` python @task def steeping_tea(cupdir): - # Pour boiling water over the tea. + # Pour boiling water over the tea. Requires tea bag in cup. for x in ingredient(cupdir, "water", "Boiling water over the tea", tea_bag): yield x ``` @@ -230,7 +230,7 @@ def steeping_tea(cupdir): ``` python @task def tea_bag(cupdir): - # Place tea bag in the cup. + # Place tea bag in the cup. Requires box of tea bags. for x in ingredient(cupdir, "tea", "Tea bag", box_of_tea_bags): yield x ``` @@ -251,41 +251,40 @@ Let's run this workflow with the `iotaa` command-line tool, requesting that the ``` % iotaa src/iotaa/demo.py a_cup_of_tea ./teatime -[2023-09-18T02:33:02] INFO A cup of steeped tea with sugar: Initial state: Pending -[2023-09-18T02:33:02] INFO A cup of steeped tea with sugar: Checking required tasks -[2023-09-18T02:33:02] INFO The cup: teatime/cup: Initial state: Pending -[2023-09-18T02:33:02] INFO The cup: teatime/cup: Checking required tasks -[2023-09-18T02:33:02] INFO The cup: teatime/cup: Ready -[2023-09-18T02:33:02] INFO The cup: teatime/cup: Executing -[2023-09-18T02:33:02] INFO The cup: teatime/cup: Final state: Ready -[2023-09-18T02:33:02] INFO Steeped tea with sugar in teatime/cup: Initial state: Pending -[2023-09-18T02:33:02] INFO Steeped tea with sugar in teatime/cup: Checking required tasks -[2023-09-18T02:33:02] INFO Boiling water over the tea in teatime/cup: Initial state: Pending -[2023-09-18T02:33:02] INFO Boiling water over the tea in teatime/cup: Checking required tasks -[2023-09-18T02:33:02] INFO Tea bag in teatime/cup: Initial state: Pending -[2023-09-18T02:33:02] INFO Tea bag in teatime/cup: Checking required tasks -[2023-09-18T02:33:02] WARNING Tea from store: teatime/box-of-tea: Final state: Pending (EXTERNAL) -[2023-09-18T02:33:02] INFO Tea bag in teatime/cup: Pending -[2023-09-18T02:33:02] WARNING Tea bag in teatime/cup: Final state: Pending -[2023-09-18T02:33:02] INFO Boiling water over the tea in teatime/cup: Pending -[2023-09-18T02:33:02] WARNING Boiling water over the tea in teatime/cup: Final state: Pending -[2023-09-18T02:33:02] INFO Steeped tea in teatime/cup: Initial state: Pending -[2023-09-18T02:33:02] INFO Steeped tea in teatime/cup: Checking required tasks -[2023-09-18T02:33:02] INFO Steeped tea in teatime/cup: Pending -[2023-09-18T02:33:02] WARNING Steeped tea in teatime/cup: Final state: Pending -[2023-09-18T02:33:02] INFO Steeped tea with sugar in teatime/cup: Pending -[2023-09-18T02:33:02] WARNING Steeped tea with sugar in teatime/cup: Final state: Pending -[2023-09-18T02:33:02] WARNING A cup of steeped tea with sugar: Final state: Pending +[2023-09-19T22:44:21] INFO A cup of steeped tea with sugar: Checking required tasks +[2023-09-19T22:44:21] INFO The cup: teatime/cup: Initial state: Pending +[2023-09-19T22:44:21] INFO The cup: teatime/cup: Checking required tasks +[2023-09-19T22:44:21] INFO The cup: teatime/cup: Ready +[2023-09-19T22:44:21] INFO The cup: teatime/cup: Executing +[2023-09-19T22:44:21] INFO The cup: teatime/cup: Final state: Ready +[2023-09-19T22:44:21] INFO Steeped tea with sugar in teatime/cup: Initial state: Pending +[2023-09-19T22:44:21] INFO Steeped tea with sugar in teatime/cup: Checking required tasks +[2023-09-19T22:44:21] INFO Boiling water over the tea in teatime/cup: Initial state: Pending +[2023-09-19T22:44:21] INFO Boiling water over the tea in teatime/cup: Checking required tasks +[2023-09-19T22:44:21] INFO Tea bag in teatime/cup: Initial state: Pending +[2023-09-19T22:44:21] INFO Tea bag in teatime/cup: Checking required tasks +[2023-09-19T22:44:21] WARNING Tea from store: teatime/box-of-tea: Final state: Pending (EXTERNAL) +[2023-09-19T22:44:21] INFO Tea bag in teatime/cup: Pending +[2023-09-19T22:44:21] WARNING Tea bag in teatime/cup: Final state: Pending +[2023-09-19T22:44:21] INFO Boiling water over the tea in teatime/cup: Pending +[2023-09-19T22:44:21] WARNING Boiling water over the tea in teatime/cup: Final state: Pending +[2023-09-19T22:44:21] INFO Steeped tea in teatime/cup: Initial state: Pending +[2023-09-19T22:44:21] INFO Steeped tea in teatime/cup: Checking required tasks +[2023-09-19T22:44:21] INFO Steeped tea in teatime/cup: Pending +[2023-09-19T22:44:21] WARNING Steeped tea in teatime/cup: Final state: Pending +[2023-09-19T22:44:21] INFO Steeped tea with sugar in teatime/cup: Pending +[2023-09-19T22:44:21] WARNING Steeped tea with sugar in teatime/cup: Final state: Pending +[2023-09-19T22:44:21] WARNING A cup of steeped tea with sugar: Final state: Pending ``` There's lots to see during the first invocation. Most of the tasks start and end in a pending state. Only the `cup()` task makes progress from pending to ready state: ``` -[2023-09-18T02:33:02] INFO The cup: teatime/cup: Initial state: Pending -[2023-09-18T02:33:02] INFO The cup: teatime/cup: Checking required tasks -[2023-09-18T02:33:02] INFO The cup: teatime/cup: Ready -[2023-09-18T02:33:02] INFO The cup: teatime/cup: Executing -[2023-09-18T02:33:02] INFO The cup: teatime/cup: Final state: Ready +[2023-09-19T22:44:21] INFO The cup: teatime/cup: Initial state: Pending +[2023-09-19T22:44:21] INFO The cup: teatime/cup: Checking required tasks +[2023-09-19T22:44:21] INFO The cup: teatime/cup: Ready +[2023-09-19T22:44:21] INFO The cup: teatime/cup: Executing +[2023-09-19T22:44:21] INFO The cup: teatime/cup: Final state: Ready ``` The on-disk workflow state is: @@ -299,7 +298,8 @@ teatime/ Note the blocker: ``` -[2023-09-18T02:33:02] WARNING Tea from store: teatime/box-of-tea: Final state: Pending (EXTERNAL) +[2023-09-19T22:44:21] WARNING Tea from store: teatime/box-of-tea: Final state: Pending (EXTERNAL) + ``` The file `teatime/box-of-tea` cannot be created by the workflow, as it is declared `@external`. Let's create it externally: @@ -316,29 +316,27 @@ Now let's iterate the workflow: ``` % iotaa src/iotaa/demo.py a_cup_of_tea ./teatime -[2023-09-18T02:34:32] INFO A cup of steeped tea with sugar: Initial state: Pending -[2023-09-18T02:34:32] INFO A cup of steeped tea with sugar: Checking required tasks -[2023-09-18T02:34:32] INFO Steeped tea with sugar in teatime/cup: Initial state: Pending -[2023-09-18T02:34:32] INFO Steeped tea with sugar in teatime/cup: Checking required tasks -[2023-09-18T02:34:32] INFO Boiling water over the tea in teatime/cup: Initial state: Pending -[2023-09-18T02:34:32] INFO Boiling water over the tea in teatime/cup: Checking required tasks -[2023-09-18T02:34:32] INFO Tea bag in teatime/cup: Initial state: Pending -[2023-09-18T02:34:32] INFO Tea bag in teatime/cup: Checking required tasks -[2023-09-18T02:34:32] INFO Tea bag in teatime/cup: Ready -[2023-09-18T02:34:32] INFO Tea bag in teatime/cup: Executing -[2023-09-18T02:34:32] INFO Tea bag in teatime/cup: Final state: Ready -[2023-09-18T02:34:32] INFO Boiling water over the tea in teatime/cup: Ready -[2023-09-18T02:34:32] INFO Boiling water over the tea in teatime/cup: Executing -[2023-09-18T02:34:32] INFO Boiling water over the tea in teatime/cup: Final state: Ready -[2023-09-18T02:34:32] INFO Steeped tea in teatime/cup: Initial state: Pending -[2023-09-18T02:34:32] INFO Steeped tea in teatime/cup: Checking required tasks -[2023-09-18T02:34:32] INFO Tea steeping for 9s more -[2023-09-18T02:34:32] INFO Steeped tea in teatime/cup: Ready -[2023-09-18T02:34:32] INFO Steeped tea in teatime/cup: Executing -[2023-09-18T02:34:32] WARNING Steeped tea in teatime/cup: Final state: Pending -[2023-09-18T02:34:32] INFO Steeped tea with sugar in teatime/cup: Pending -[2023-09-18T02:34:32] WARNING Steeped tea with sugar in teatime/cup: Final state: Pending -[2023-09-18T02:34:32] WARNING A cup of steeped tea with sugar: Final state: Pending +[2023-09-19T22:46:33] INFO A cup of steeped tea with sugar: Checking required tasks +[2023-09-19T22:46:33] INFO Steeped tea with sugar in teatime/cup: Initial state: Pending +[2023-09-19T22:46:33] INFO Steeped tea with sugar in teatime/cup: Checking required tasks +[2023-09-19T22:46:33] INFO Boiling water over the tea in teatime/cup: Initial state: Pending +[2023-09-19T22:46:33] INFO Boiling water over the tea in teatime/cup: Checking required tasks +[2023-09-19T22:46:33] INFO Tea bag in teatime/cup: Initial state: Pending +[2023-09-19T22:46:33] INFO Tea bag in teatime/cup: Checking required tasks +[2023-09-19T22:46:33] INFO Tea bag in teatime/cup: Ready +[2023-09-19T22:46:33] INFO Tea bag in teatime/cup: Executing +[2023-09-19T22:46:33] INFO Tea bag in teatime/cup: Final state: Ready +[2023-09-19T22:46:33] INFO Boiling water over the tea in teatime/cup: Ready +[2023-09-19T22:46:33] INFO Boiling water over the tea in teatime/cup: Executing +[2023-09-19T22:46:33] INFO Boiling water over the tea in teatime/cup: Final state: Ready +[2023-09-19T22:46:33] INFO Steeped tea in teatime/cup: Initial state: Pending +[2023-09-19T22:46:33] INFO Steeped tea in teatime/cup: Checking required tasks +[2023-09-19T22:46:33] INFO Tea needs to steep for 9s +[2023-09-19T22:46:33] INFO Steeped tea in teatime/cup: Ready +[2023-09-19T22:46:33] INFO Steeped tea in teatime/cup: Executing +[2023-09-19T22:46:33] INFO Steeped tea with sugar in teatime/cup: Pending +[2023-09-19T22:46:33] WARNING Steeped tea with sugar in teatime/cup: Final state: Pending +[2023-09-19T22:46:33] WARNING A cup of steeped tea with sugar: Final state: Pending ``` On-disk workflow state now: @@ -352,12 +350,12 @@ teatime/ └── water ``` -Since the box of tea became available, the workflow could add tea to the cup and pour boiling water over it. Note the informative message `Tea steeping for 9s more`. If we iterate the workflow again quickly, we can see the steep time decreasing: +Since the box of tea became available, the workflow could add tea to the cup and pour boiling water over it. Note the informative message `Tea needs to steep for 9s`. If we iterate the workflow again quickly, we can see the steep time decreasing: ``` % iotaa src/iotaa/demo.py a_cup_of_tea ./teatime ... -[2023-09-18T02:34:39] INFO Tea steeping for 3s more +[2023-09-19T22:46:37] INFO Tea needs to steep for 5s ... ``` @@ -365,14 +363,13 @@ If we wait a few seconds more and iterate: ``` % iotaa src/iotaa/demo.py a_cup_of_tea ./teatime -[2023-09-18T02:35:36] INFO A cup of steeped tea with sugar: Initial state: Pending -[2023-09-18T02:35:36] INFO A cup of steeped tea with sugar: Checking required tasks -[2023-09-18T02:35:36] INFO Steeped tea with sugar in teatime/cup: Initial state: Pending -[2023-09-18T02:35:36] INFO Steeped tea with sugar in teatime/cup: Checking required tasks -[2023-09-18T02:35:36] INFO Steeped tea with sugar in teatime/cup: Ready -[2023-09-18T02:35:36] INFO Steeped tea with sugar in teatime/cup: Executing -[2023-09-18T02:35:36] INFO Steeped tea with sugar in teatime/cup: Final state: Ready -[2023-09-18T02:35:36] INFO A cup of steeped tea with sugar: Final state: Ready +[2023-09-19T22:47:11] INFO A cup of steeped tea with sugar: Checking required tasks +[2023-09-19T22:47:11] INFO Steeped tea with sugar in teatime/cup: Initial state: Pending +[2023-09-19T22:47:11] INFO Steeped tea with sugar in teatime/cup: Checking required tasks +[2023-09-19T22:47:11] INFO Steeped tea with sugar in teatime/cup: Ready +[2023-09-19T22:47:11] INFO Steeped tea with sugar in teatime/cup: Executing +[2023-09-19T22:47:11] INFO Steeped tea with sugar in teatime/cup: Final state: Ready +[2023-09-19T22:47:11] INFO A cup of steeped tea with sugar: Final state: Ready ``` Now that the tea has steeped long enough, the sugar has been added: @@ -391,9 +388,8 @@ One more iteration and we see that the workflow has reached its final state and ``` % iotaa src/iotaa/demo.py a_cup_of_tea ./teatime -[2023-09-18T02:35:58] INFO A cup of steeped tea with sugar: Initial state: Pending -[2023-09-18T02:35:58] INFO A cup of steeped tea with sugar: Checking required tasks -[2023-09-18T02:35:58] INFO A cup of steeped tea with sugar: Final state: Ready +[2023-09-19T22:48:22] INFO A cup of steeped tea with sugar: Checking required tasks +[2023-09-19T22:48:22] INFO A cup of steeped tea with sugar: Final state: Ready ``` Since `a_cup_of_tea()` is a `@tasks` collection, its state is contingent on that of its required tasks, so its readiness check will always involve checking requirements, unlike a non-collection `@task`, which can just check its own assets. @@ -415,14 +411,13 @@ Note how the workflow detects the change to the readiness of its assets and reco ``` % iotaa src/iotaa/demo.py a_cup_of_tea ./teatime -[2023-09-18T02:36:49] INFO A cup of steeped tea with sugar: Initial state: Pending -[2023-09-18T02:36:49] INFO A cup of steeped tea with sugar: Checking required tasks -[2023-09-18T02:36:49] INFO Steeped tea with sugar in teatime/cup: Initial state: Pending -[2023-09-18T02:36:49] INFO Steeped tea with sugar in teatime/cup: Checking required tasks -[2023-09-18T02:36:49] INFO Steeped tea with sugar in teatime/cup: Ready -[2023-09-18T02:36:49] INFO Steeped tea with sugar in teatime/cup: Executing -[2023-09-18T02:36:49] INFO Steeped tea with sugar in teatime/cup: Final state: Ready -[2023-09-18T02:36:49] INFO A cup of steeped tea with sugar: Final state: Ready +[2023-09-19T22:49:03] INFO A cup of steeped tea with sugar: Checking required tasks +[2023-09-19T22:49:03] INFO Steeped tea with sugar in teatime/cup: Initial state: Pending +[2023-09-19T22:49:03] INFO Steeped tea with sugar in teatime/cup: Checking required tasks +[2023-09-19T22:49:03] INFO Steeped tea with sugar in teatime/cup: Ready +[2023-09-19T22:49:03] INFO Steeped tea with sugar in teatime/cup: Executing +[2023-09-19T22:49:03] INFO Steeped tea with sugar in teatime/cup: Final state: Ready +[2023-09-19T22:49:03] INFO A cup of steeped tea with sugar: Final state: Ready ``` ``` @@ -454,29 +449,28 @@ Now request tea without sugar (note that task `steeped_tea` expects a path to th ``` % iotaa src/iotaa/demo.py steeped_tea ./teatime/cup -[2023-09-18T02:37:41] INFO Boiling water over the tea in ./teatime/cup: Initial state: Pending -[2023-09-18T02:37:41] INFO Boiling water over the tea in ./teatime/cup: Checking required tasks -[2023-09-18T02:37:41] INFO Tea bag in ./teatime/cup: Initial state: Pending -[2023-09-18T02:37:41] INFO Tea bag in ./teatime/cup: Checking required tasks -[2023-09-18T02:37:41] INFO Tea bag in ./teatime/cup: Ready -[2023-09-18T02:37:41] INFO Tea bag in ./teatime/cup: Executing -[2023-09-18T02:37:41] INFO Tea bag in ./teatime/cup: Final state: Ready -[2023-09-18T02:37:41] INFO Boiling water over the tea in ./teatime/cup: Ready -[2023-09-18T02:37:41] INFO Boiling water over the tea in ./teatime/cup: Executing -[2023-09-18T02:37:41] INFO Boiling water over the tea in ./teatime/cup: Final state: Ready -[2023-09-18T02:37:41] INFO Steeped tea in ./teatime/cup: Initial state: Pending -[2023-09-18T02:37:41] INFO Steeped tea in ./teatime/cup: Checking required tasks -[2023-09-18T02:37:41] INFO Tea steeping for 9s more -[2023-09-18T02:37:41] INFO Steeped tea in ./teatime/cup: Ready -[2023-09-18T02:37:41] INFO Steeped tea in ./teatime/cup: Executing -[2023-09-18T02:37:41] WARNING Steeped tea in ./teatime/cup: Final state: Pending +[2023-09-19T22:49:40] INFO Boiling water over the tea in ./teatime/cup: Initial state: Pending +[2023-09-19T22:49:40] INFO Boiling water over the tea in ./teatime/cup: Checking required tasks +[2023-09-19T22:49:40] INFO Tea bag in ./teatime/cup: Initial state: Pending +[2023-09-19T22:49:40] INFO Tea bag in ./teatime/cup: Checking required tasks +[2023-09-19T22:49:40] INFO Tea bag in ./teatime/cup: Ready +[2023-09-19T22:49:40] INFO Tea bag in ./teatime/cup: Executing +[2023-09-19T22:49:40] INFO Tea bag in ./teatime/cup: Final state: Ready +[2023-09-19T22:49:40] INFO Boiling water over the tea in ./teatime/cup: Ready +[2023-09-19T22:49:40] INFO Boiling water over the tea in ./teatime/cup: Executing +[2023-09-19T22:49:40] INFO Boiling water over the tea in ./teatime/cup: Final state: Ready +[2023-09-19T22:49:40] INFO Steeped tea in ./teatime/cup: Initial state: Pending +[2023-09-19T22:49:40] INFO Steeped tea in ./teatime/cup: Checking required tasks +[2023-09-19T22:49:40] INFO Tea needs to steep for 9s +[2023-09-19T22:49:40] INFO Steeped tea in ./teatime/cup: Ready +[2023-09-19T22:49:40] INFO Steeped tea in ./teatime/cup: Executing ``` After waiting for the tea to steep: ``` % iotaa src/iotaa/demo.py steeped_tea ./teatime/cup -[2023-09-18T02:38:01] INFO Steeped tea in ./teatime/cup: Initial state: Ready +[2023-09-19T22:49:56] INFO Steeped tea in ./teatime/cup: Initial state: Ready ``` On-disk state: diff --git a/recipe/meta.yaml b/recipe/meta.yaml index 202f27c..735d042 100644 --- a/recipe/meta.yaml +++ b/recipe/meta.yaml @@ -1,6 +1,6 @@ package: name: iotaa - version: 0.1.0 + version: 0.1.1 source: path: ../src build: diff --git a/src/iotaa/core.py b/src/iotaa/core.py index 2c580df..cd9b5d6 100644 --- a/src/iotaa/core.py +++ b/src/iotaa/core.py @@ -8,6 +8,7 @@ from dataclasses import dataclass from functools import cache from importlib import import_module +from itertools import chain from json import JSONDecodeError, loads from pathlib import Path from subprocess import STDOUT, CalledProcessError, check_output @@ -23,10 +24,10 @@ @dataclass class asset: """ - Description of a workflow asset. + A workflow asset (observable external state). - :ivar id: The asset itself (e.g. a path string or pathlib Path object). :ivar ready: A function - that, when called, indicates whether the asset is ready to use. + :param id: An object uniquely identifying the asset (e.g. a filesystem path). + :param ready: A function that, when called, indicates whether the asset is ready to use. """ id: Any @@ -34,6 +35,7 @@ class asset: _Assets = Union[Dict[str, asset], List[asset]] +_AssetT = Optional[Union[_Assets, asset]] def dryrun() -> None: @@ -44,26 +46,32 @@ def dryrun() -> None: _state.dry_run_enabled = True -def ids(assets: Union[_Assets, asset, None]) -> Dict[Union[int, str], Any]: +def ids(assets: _AssetT) -> Any: """ - Extract and return asset identity objects (e.g. paths to files). + Extract and return asset identity objects. - :param assets: A collection of assets. - :return: A dict of asset identity objects. + :param assets: A collection of assets, one asset, or None. + :return: Identity object(s) for the asset(s), in the same shape (e.g. dict, list, scalar, None) + as the provided assets. """ + # The Any return type is unfortunate, but avoids "not indexible" typechecker complaints when + # scalar types are included in a compound type. + if isinstance(assets, dict): return {k: v.id for k, v in assets.items()} if isinstance(assets, list): return {i: v.id for i, v in enumerate(assets)} if isinstance(assets, asset): - return {0: assets.id} - return {} + return assets.id + return None -def logcfg(verbose: Optional[bool] = False) -> None: +def logcfg(verbose: bool = False) -> None: """ Configure default logging. + + :param bool: Log at the debug level? """ logging.basicConfig( @@ -115,13 +123,14 @@ def run( :return: Did cmd exit with 0 (success) status? """ + indent = " " logging.info("%s: Running: %s", taskname, cmd) if cwd: - logging.info("%s: in %s", taskname, cwd) + logging.info("%s: %sin %s", taskname, indent, cwd) if env: - logging.info("%s: with environment variables:", taskname) + logging.info("%s: %swith environment variables:", taskname, indent) for key, val in env.items(): - logging.info("%s: %s=%s", taskname, key, val) + logging.info("%s: %s%s=%s", taskname, indent * 2, key, val) try: output = check_output( cmd, cwd=cwd, encoding="utf=8", env=env, shell=True, stderr=STDOUT, text=True @@ -130,80 +139,81 @@ def run( success = True except CalledProcessError as e: output = e.output - logging.error("%s: Failed with status: %s", taskname, e.returncode) + logging.error("%s: %sFailed with status: %s", taskname, indent, e.returncode) logfunc = logging.error success = False if output and (log or not success): - logfunc("%s: Output:", taskname) + logfunc("%s: %sOutput:", taskname, indent) for line in output.split("\n"): - logfunc("%s: %s", taskname, line) + logfunc("%s: %s%s", taskname, indent * 2, line) return success # Decorators -def external(f) -> Callable[..., _Assets]: +def external(f) -> Callable[..., _AssetT]: """ The @external decorator for assets that cannot be produced by the workflow. """ @cache - def decorated_external(*args, **kwargs) -> _Assets: + def decorated_external(*args, **kwargs) -> _AssetT: + top = _i_am_top_task() g = f(*args, **kwargs) taskname = next(g) - assets = _assets(next(g)) - for a in _extract(assets): - if not a.ready(): - _readiness(ready=False, taskname=taskname, external_=True) + assets = next(g) + ready = all(a.ready() for a in _listify(assets)) + if not ready or top: + _report_readiness(ready=ready, taskname=taskname, is_external=True) return assets return decorated_external -def task(f) -> Callable[..., _Assets]: +def task(f) -> Callable[..., _AssetT]: """ The @task decorator for assets that the workflow can produce. """ @cache - def decorated_task(*args, **kwargs) -> _Assets: - i_am_top_task = _am_i_top_task() + def decorated_task(*args, **kwargs) -> _AssetT: + top = _i_am_top_task() g = f(*args, **kwargs) taskname = next(g) - assets = _assets(next(g)) - ready = all(a.ready() for a in _extract(assets)) - if i_am_top_task or not ready: - _readiness(ready=ready, taskname=taskname, initial=True) - for a in _extract(assets): - if not a.ready(): - req_assets = _delegate(g, taskname) - if all(req_asset.ready() for req_asset in req_assets): - logging.info("%s: Ready", taskname) - _execute(g, taskname) - else: - logging.info("%s: Pending", taskname) - _readiness(ready=a.ready(), taskname=taskname) + assets = next(g) + ready_initial = all(a.ready() for a in _listify(assets)) + if not ready_initial or top: + _report_readiness(ready=ready_initial, taskname=taskname, initial=True) + if not ready_initial: + if all(req_asset.ready() for req_asset in _delegate(g, taskname)): + logging.info("%s: Ready", taskname) + _execute(g, taskname) + else: + logging.info("%s: Pending", taskname) + _report_readiness(ready=False, taskname=taskname) + ready_final = all(a.ready() for a in _listify(assets)) + if ready_final != ready_initial: + _report_readiness(ready=ready_final, taskname=taskname) return assets return decorated_task -def tasks(f) -> Callable[..., _Assets]: +def tasks(f) -> Callable[..., _AssetT]: """ - The @tasks decorator for collections of @task functions. + The @tasks decorator for collections of @task function calls. """ @cache - def decorated_tasks(*args, **kwargs) -> _Assets: - i_am_top_task = _am_i_top_task() + def decorated_tasks(*args, **kwargs) -> _AssetT: + top = _i_am_top_task() g = f(*args, **kwargs) taskname = next(g) - _readiness(ready=False, taskname=taskname, initial=True) assets = _delegate(g, taskname) - ready = all(a.ready() for a in _extract(assets)) - if i_am_top_task or not ready: - _readiness(ready=ready, taskname=taskname) + ready = all(a.ready() for a in _listify(assets)) + if not ready or top: + _report_readiness(ready=ready, taskname=taskname) return assets return decorated_tasks @@ -212,58 +222,27 @@ def decorated_tasks(*args, **kwargs) -> _Assets: # Private functions -def _am_i_top_task() -> bool: - """ - Is the calling task the first to execute in the workflow? - - :return: Is it? - """ - - if _state.initialized: - return False - _state.initialized = True - return True - - -def _assets(x: Optional[Union[Dict, List, asset]]) -> _Assets: - """ - Create an asset list when the argument is not already itearble. - - :param x: A singe asset, a None object or a dict or list of assets. - :return: A possibly empty iterable collecton of assets. - """ - - if x is None: - return [] - if isinstance(x, asset): - return [x] - return x - - def _delegate(g: Generator, taskname: str) -> List[asset]: """ Delegate execution to the current task's requirement(s). :param g: The current task. :param taskname: The current task's name. - :return: The assets of the required task(s), and the task name. + :return: The assets of the required task(s). """ # The next value of the generator is the collection of requirements of the current task. This - # may be a dict or list of task-function calls, a single task-function call, or None. The VALUES - # of each of those CALLS are asset collections -- also dicts, lists, scalars or None. A flat - # list of all the assets, filetered of None objects, is constructed and returned. + # may be a dict or list of task-function calls, a single task-function call, or None, so convert + # it to a list for iteration. The value of each task-function call is a collection of assets, + # one asset, or None. Convert those values to lists, flatten them, and filter None objects. logging.info("%s: Checking required tasks", taskname) - flat: list = [] - for a in _assets(next(g)): - flat += a.values() if isinstance(a, dict) else a if isinstance(a, list) else [a] - return list(filter(None, flat)) + return list(filter(None, chain(*[_listify(a) for a in _listify(next(g))]))) def _execute(g: Generator, taskname: str) -> None: """ - Execute the body of a decorated function. + Execute the post-yield body of a decorated function. :param g: The current task. :param taskname: The current task's name. @@ -279,25 +258,60 @@ def _execute(g: Generator, taskname: str) -> None: pass -def _extract(assets: _Assets) -> Generator: +def _formatter(prog: str) -> HelpFormatter: """ - Extract and yield individual assets from asset collections. + Help-message formatter. - :param assets: A collection of assets. + :param prog: The program name. + :return: An argparse help formatter. """ - for a in assets if isinstance(assets, list) else assets.values(): - yield a + return HelpFormatter(prog, max_help_position=4) -def _formatter(prog: str) -> HelpFormatter: +def _i_am_top_task() -> bool: """ - Help-message formatter. + Is the calling task the task-tree entry point? - :param prog: The program name. + :return: Is it? """ - return HelpFormatter(prog, max_help_position=4) + if _state.initialized: + return False + _state.initialized = True + return True + + +def _iterable(assets: _AssetT) -> _Assets: + """ + Create an asset list when the argument is not already itearble. + + :param assets: A collection of assets, one asset, or None. + :return: A possibly empty iterable collecton of assets. + """ + + if assets is None: + return [] + if isinstance(assets, asset): + return [assets] + return assets + + +def _listify(assets: _AssetT) -> List[asset]: + """ + Return a list representation of the provided asset(s). + + :param assets: A collection of assets, one asset, or None. + :return: A possibly empty list of assets. + """ + + if assets is None: + return [] + if isinstance(assets, asset): + return [assets] + if isinstance(assets, dict): + return list(assets.values()) + return assets def _parse_args(raw: List[str]) -> Namespace: @@ -319,19 +333,33 @@ def _parse_args(raw: List[str]) -> Namespace: return parser.parse_args(raw) -def _readiness( - ready: bool, taskname: str, external_: Optional[bool] = False, initial: Optional[bool] = False +def _reify(s: str) -> Any: + """ + Convert strings, when possible, to more specifically typed objects. + + :param s: The string to convert. + :return: A more Pythonic represetnation of the input string. + """ + + try: + return loads(s) + except JSONDecodeError: + return loads(f'"{s}"') + + +def _report_readiness( + ready: bool, taskname: str, is_external: Optional[bool] = False, initial: Optional[bool] = False ) -> None: """ Log information about the readiness of an asset. :param ready: Is the asset ready to use? :param taskname: The current task's name. - :param external_: Is this an @external task? + :param is_external: Is this an @external task? :param initial: Is this a initial (i.e. pre-run) readiness report? """ - extmsg = " (EXTERNAL)" if external_ and not ready else "" + extmsg = " (EXTERNAL)" if is_external and not ready else "" logf = logging.info if initial or ready else logging.warning logf( "%s: %s state: %s%s", @@ -340,16 +368,3 @@ def _readiness( "Ready" if ready else "Pending", extmsg, ) - - -def _reify(s: str) -> Any: - """ - Convert strings, when possible, to more specifically types. - - :param s: The string to convert. - """ - - try: - return loads(s) - except JSONDecodeError: - return loads(f'"{s}"') diff --git a/src/iotaa/demo.py b/src/iotaa/demo.py index 07a55d2..099e3db 100644 --- a/src/iotaa/demo.py +++ b/src/iotaa/demo.py @@ -1,5 +1,5 @@ """ -An iotaa demo application. +iotaa.demo. """ # pylint: disable=C0116 @@ -14,7 +14,7 @@ @tasks def a_cup_of_tea(basedir): yield "A cup of steeped tea with sugar" - cupdir = ids(cup(basedir))[0] + cupdir = ids(cup(basedir)) yield [cup(basedir), steeped_tea_with_sugar(cupdir)] @@ -30,7 +30,7 @@ def cup(basedir): @task def steeped_tea_with_sugar(cupdir): - # Add sugar to the steeped tea. + # Add sugar to the steeped tea. Requires tea to have steeped. for x in ingredient(cupdir, "sugar", "Steeped tea with sugar", steeped_tea): yield x @@ -40,7 +40,7 @@ def steeped_tea(cupdir): # Give tea time to steep. yield f"Steeped tea in {cupdir}" ready = False - water = ids(steeping_tea(cupdir))[0] + water = ids(steeping_tea(cupdir))["water"] if water.exists(): water_poured_time = dt.datetime.fromtimestamp(water.stat().st_mtime) ready_time = water_poured_time + dt.timedelta(seconds=10) @@ -48,7 +48,7 @@ def steeped_tea(cupdir): ready = now >= ready_time yield asset(None, lambda: ready) if not ready: - logging.info("Tea steeping for %ss more", int((ready_time - now).total_seconds())) + logging.info("Tea needs to steep for %ss", int((ready_time - now).total_seconds())) else: yield asset(None, lambda: False) yield steeping_tea(cupdir) @@ -56,14 +56,14 @@ def steeped_tea(cupdir): @task def steeping_tea(cupdir): - # Pour boiling water over the tea. + # Pour boiling water over the tea. Requires tea bag in cup. for x in ingredient(cupdir, "water", "Boiling water over the tea", tea_bag): yield x @task def tea_bag(cupdir): - # Place tea bag in the cup. + # Place tea bag in the cup. Requires box of tea bags. for x in ingredient(cupdir, "tea", "Tea bag", box_of_tea_bags): yield x @@ -79,6 +79,6 @@ def ingredient(cupdir, fn, name, req=None): path = Path(cupdir) / fn path.parent.mkdir(parents=True, exist_ok=True) yield f"{name} in {cupdir}" - yield asset(path, path.exists) + yield {fn: asset(path, path.exists)} yield req(cupdir) if req else None path.touch() diff --git a/src/iotaa/tests/test_core.py b/src/iotaa/tests/test_core.py index 4d69824..defd120 100644 --- a/src/iotaa/tests/test_core.py +++ b/src/iotaa/tests/test_core.py @@ -19,7 +19,12 @@ @fixture -def external_foo_scalar_dict(): +def delegate_assets(): + return (ic.asset(id=n, ready=lambda: True) for n in range(4)) + + +@fixture +def external_foo_scalar(): @ic.external def foo(path): f = path / "foo" @@ -29,6 +34,18 @@ def foo(path): return foo +@fixture +def module_for_main(tmp_path): + func = """ +def hi(x): + print(f"hello {x}!") +""".strip() + m = tmp_path / "a.py" + with open(m, "w", encoding="utf-8") as f: + print(func, file=f) + return m + + @fixture def rungen(): ic.logging.getLogger().setLevel(ic.logging.INFO) @@ -42,37 +59,37 @@ def f(): @fixture -def task_bar_list(external_foo_scalar_dict): +def task_bar_list(external_foo_scalar): @ic.task def bar(path): f = path / "bar" yield f"task bar {f}" yield [ic.asset(f, f.is_file)] - yield [external_foo_scalar_dict(path)] + yield [external_foo_scalar(path)] f.touch() return bar @fixture -def task_bar_dict(external_foo_scalar_dict): +def task_bar_dict(external_foo_scalar): @ic.task def bar(path): f = path / "bar" yield f"task bar {f}" yield {"path": ic.asset(f, f.is_file)} - yield [external_foo_scalar_dict(path)] + yield [external_foo_scalar(path)] f.touch() return bar @fixture -def tasks_baz(external_foo_scalar_dict, task_bar_dict): +def tasks_baz(external_foo_scalar, task_bar_dict): @ic.tasks def baz(path): yield "tasks baz" - yield [external_foo_scalar_dict(path), task_bar_dict(path)] + yield [external_foo_scalar(path), task_bar_dict(path)] return baz @@ -104,8 +121,8 @@ def test_ids_dict(): asset = ic.asset(id="bar", ready=lambda: True) assert ic.ids(assets={"foo": asset})["foo"] == expected assert ic.ids(assets=[asset])[0] == expected - assert ic.ids(assets=asset)[0] == expected - assert ic.ids(assets=None) == {} + assert ic.ids(assets=asset) == expected + assert ic.ids(assets=None) is None @pytest.mark.parametrize("vals", [(False, ic.logging.INFO), (True, ic.logging.DEBUG)]) @@ -116,18 +133,6 @@ def test_logcfg(vals): basicConfig.assert_called_once_with(datefmt=ANY, format=ANY, level=level) -@fixture -def module_for_main(tmp_path): - func = """ -def hi(x): - print(f"hello {x}!") -""".strip() - m = tmp_path / "a.py" - with open(m, "w", encoding="utf-8") as f: - print(func, file=f) - return m - - def test_main_live_abspath(capsys, module_for_main): with patch.object(ic.sys, "argv", new=["prog", str(module_for_main), "hi", "world"]): ic.main() @@ -193,28 +198,28 @@ def test_run_success(caplog, tmp_path): # Decorator tests -def test_external_not_ready(external_foo_scalar_dict, tmp_path): +def test_external_not_ready(external_foo_scalar, tmp_path): f = tmp_path / "foo" assert not f.is_file() - assets = list(ic._extract(external_foo_scalar_dict(tmp_path))) + assets = list(ic._listify(external_foo_scalar(tmp_path))) assert ic.ids(assets)[0] == f assert not assets[0].ready() -def test_external_ready(external_foo_scalar_dict, tmp_path): +def test_external_ready(external_foo_scalar, tmp_path): f = tmp_path / "foo" f.touch() assert f.is_file() - assets = list(ic._extract(external_foo_scalar_dict(tmp_path))) - assert ic.ids(assets)[0] == f - assert assets[0].ready() + asset = external_foo_scalar(tmp_path) + assert ic.ids(asset) == f + assert asset.ready() def test_task_not_ready(caplog, task_bar_dict, tmp_path): ic.logging.getLogger().setLevel(ic.logging.INFO) f_foo, f_bar = (tmp_path / x for x in ["foo", "bar"]) assert not any(x.is_file() for x in [f_foo, f_bar]) - assets = list(ic._extract(task_bar_dict(tmp_path))) + assets = list(ic._listify(task_bar_dict(tmp_path))) assert ic.ids(assets)[0] == f_bar assert not assets[0].ready() assert not any(x.is_file() for x in [f_foo, f_bar]) @@ -227,7 +232,7 @@ def test_task_ready(caplog, task_bar_list, tmp_path): f_foo.touch() assert f_foo.is_file() assert not f_bar.is_file() - assets = list(ic._extract(task_bar_list(tmp_path))) + assets = list(ic._listify(task_bar_list(tmp_path))) assert ic.ids(assets)[0] == f_bar assert assets[0].ready() assert all(x.is_file for x in [f_foo, f_bar]) @@ -237,7 +242,7 @@ def test_task_ready(caplog, task_bar_list, tmp_path): def test_tasks_not_ready(tasks_baz, tmp_path): f_foo, f_bar = (tmp_path / x for x in ["foo", "bar"]) assert not any(x.is_file() for x in [f_foo, f_bar]) - assets = list(ic._extract(tasks_baz(tmp_path))) + assets = list(ic._listify(tasks_baz(tmp_path))) assert ic.ids(assets)[0] == f_foo assert ic.ids(assets)[1] == f_bar assert not any(x.ready() for x in assets) @@ -249,7 +254,7 @@ def test_tasks_ready(tasks_baz, tmp_path): f_foo.touch() assert f_foo.is_file() assert not f_bar.is_file() - assets = list(ic._extract(tasks_baz(tmp_path))) + assets = list(ic._listify(tasks_baz(tmp_path))) assert ic.ids(assets)[0] == f_foo assert ic.ids(assets)[1] == f_bar assert all(x.ready() for x in assets) @@ -259,20 +264,6 @@ def test_tasks_ready(tasks_baz, tmp_path): # Private function tests -@pytest.mark.parametrize("val", [True, False]) -def test__am_i_top_task(val): - with patch.object(ic, "_state", new=ic.ns(initialized=not val)): - assert ic._am_i_top_task() == val - - -def test__assets(): - a = ic.asset(id=None, ready=lambda: True) - assert ic._assets(x=None) == [] - assert ic._assets(x=a) == [a] - assert ic._assets(x=[a]) == [a] - assert ic._assets(x={"a": a}) == {"a": a} - - def test__delegate_none(caplog): ic.logging.getLogger().setLevel(ic.logging.INFO) @@ -283,11 +274,6 @@ def f(): assert logged("task: Checking required tasks", caplog) -@fixture -def delegate_assets(): - return (ic.asset(id=n, ready=lambda: True) for n in range(4)) - - def test__delegate_scalar(caplog, delegate_assets): ic.logging.getLogger().setLevel(ic.logging.INFO) a1, *_ = delegate_assets @@ -342,20 +328,34 @@ def test__execute_live(caplog, rungen): assert logged("task: Executing", caplog) -def test__extract(): - expected = {0: "foo", 1: "bar"} - ready = lambda: True - asset_foo, asset_bar = ic.asset("foo", ready), ic.asset("bar", ready) - assert ic.ids(list(ic._extract(assets={"foo": asset_foo, "bar": asset_bar}))) == expected - assert ic.ids(list(ic._extract(assets=[asset_foo, asset_bar]))) == expected - - def test__formatter(): formatter = ic._formatter("foo") assert isinstance(formatter, ic.HelpFormatter) assert formatter._prog == "foo" +@pytest.mark.parametrize("val", [True, False]) +def test__i_am_top_task(val): + with patch.object(ic, "_state", new=ic.ns(initialized=not val)): + assert ic._i_am_top_task() == val + + +def test__iterable(): + a = ic.asset(id=None, ready=lambda: True) + assert ic._iterable(assets=None) == [] + assert ic._iterable(assets=a) == [a] + assert ic._iterable(assets=[a]) == [a] + assert ic._iterable(assets={"a": a}) == {"a": a} + + +def test__listify(): + a = ic.asset(id=None, ready=lambda: True) + assert ic._listify(assets=None) == [] + assert ic._listify(assets=a) == [a] + assert ic._listify(assets=[a]) == [a] + assert ic._listify(assets={"a": a}) == [a] + + def test__parse_args(): # Specifying module, function, and two args (standard logging): a0 = ic._parse_args(raw="a_module a_function arg1 arg2".split(" ")) @@ -381,6 +381,11 @@ def test__parse_args(): assert e.value.code == 2 +def test__reify(): + strs = ["foo", "88", "3.14", "true"] + assert [ic._reify(s) for s in strs] == ["foo", 88, 3.14, True] + + @pytest.mark.parametrize( "vals", [ @@ -388,13 +393,8 @@ def test__parse_args(): (False, True, False, "Final state: Pending (EXTERNAL)"), ], ) -def test__readiness(caplog, vals): +def test__report_readiness(caplog, vals): ready, ext, init, msg = vals ic.logging.getLogger().setLevel(ic.logging.INFO) - ic._readiness(ready=ready, taskname="task", external_=ext, initial=init) + ic._report_readiness(ready=ready, taskname="task", is_external=ext, initial=init) assert logged(f"task: {msg}", caplog) - - -def test__reify(): - strs = ["foo", "88", "3.14", "true"] - assert [ic._reify(s) for s in strs] == ["foo", 88, 3.14, True]