Requirements
Add the ability to fetch the history of a metric from the SDK.
For the following example flow:
from aqueduct import Client, metric

client = Client()

# Fetch the hotel_reviews table.
db = client.integration("aqueduct_demo")
hotel_reviews = db.sql("select * from hotel_reviews")

# Add a metric for the number of rows in the table.
@metric(output="num rows")
def num_rows(df):
    return len(df)

num_row_metric = num_rows(hotel_reviews)
client.publish_flow("Example Flow", artifacts=num_row_metric)
We can currently fetch the metric value for a particular flow with:
flow = client.flow(flow_name="Example Flow")
flow_run = flow.latest()  # can also use `flow.fetch(<workflow_dag_id>)` for historical flows.
m = flow_run.artifact("num rows")
m.get()  # Should return `len(hotel_reviews)`.
However, we’d like to be able to fetch the history of a metric here with a history() method.
# This should return a list of all computed metric values up until this run.
# Every run will have a corresponding entry in this list, in reverse
# chronological order (latest first).
m.history()
That is to say, if “Example Flow” has run 3 times, then flow.latest().artifact("num rows").history() should return a list of length three, with each entry containing the value of that metric for its corresponding run.
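To make the expected return value concrete, here is a minimal sketch in plain Python. It does not use the Aqueduct SDK: `RunResult` and `metric_history` are hypothetical names, and the dates and row counts are made up for illustration.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List


@dataclass
class RunResult:
    """Hypothetical record of one flow run's metric value (not an SDK type)."""
    run_at: datetime
    value: float


def metric_history(results: List[RunResult]) -> List[float]:
    """Return one value per run, ordered latest run first."""
    ordered = sorted(results, key=lambda r: r.run_at, reverse=True)
    return [r.value for r in ordered]


# Three runs of "Example Flow" with made-up row counts, stored out of order.
runs = [
    RunResult(datetime(2022, 6, 1), 100),
    RunResult(datetime(2022, 6, 3), 120),
    RunResult(datetime(2022, 6, 2), 110),
]
history = metric_history(runs)
print(history)  # [120, 110, 100]
```

Three runs produce a list of length three, and the first entry belongs to the most recent run.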
Implementation Details
SDK
The history() method should be added to the NumericArtifact class here. Note that history() should only work for artifacts fetched from an existing flow, meaning when self._from_flow_run (code) is set to true. You will need to edit and use the APIClient to make requests to the server.
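One possible shape for the SDK change is sketched below. This is not the actual SDK code: `FakeAPIClient`, its `list_artifact_results` method, `NumericArtifactSketch`, and the constructor arguments are all hypothetical stand-ins, and the real `NumericArtifact` and `APIClient` may look quite different. Only the `_from_flow_run` guard and the `history()` name come from the description above.

```python
from typing import Dict, List, Tuple


class FakeAPIClient:
    """Hypothetical stand-in for the SDK's APIClient; makes no network calls."""

    def __init__(self, results: Dict[Tuple[str, str], List[float]]):
        self._results = results

    def list_artifact_results(self, flow_id: str, artifact_id: str) -> List[float]:
        # Assumption: the server returns values already ordered latest first.
        return list(self._results[(flow_id, artifact_id)])


class NumericArtifactSketch:
    """Illustrative only; mirrors the guard described for NumericArtifact."""

    def __init__(self, api_client, flow_id: str, artifact_id: str,
                 from_flow_run: bool = False):
        self._api_client = api_client
        self._flow_id = flow_id
        self._artifact_id = artifact_id
        self._from_flow_run = from_flow_run

    def history(self) -> List[float]:
        # history() only makes sense for artifacts fetched from an existing flow.
        if not self._from_flow_run:
            raise RuntimeError(
                "history() is only available for artifacts fetched from a flow run."
            )
        return self._api_client.list_artifact_results(
            self._flow_id, self._artifact_id
        )


api = FakeAPIClient({("flow-1", "num rows"): [120, 110, 100]})
m = NumericArtifactSketch(api, "flow-1", "num rows", from_flow_run=True)
print(m.history())  # [120, 110, 100]
```

Raising an error for artifacts that were not fetched from a flow run is one design choice; returning an empty list would hide the misuse, so the sketch fails loudly instead.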
Backend
For this task, you shouldn’t need to change the implementation of the REST API endpoint. In case you’d like a reference, it’s located here. Prepare parses the raw HTTP request into a structured listArtifactResultsArgs, and Perform uses the structured args to run the more complex application logic.
If you do want to change the backend implementation, the typical steps to follow are:
1. Update listArtifactResultArgs with new parameters if necessary.
2. Parse any new parameters from the raw request (a parameter can come from the path, the query, or the request body).
3. Consume the parsed parameters in Perform with your updated application logic.
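The three steps above can be sketched as follows. The real endpoint's code is not shown in this issue, so this is only a Python illustration of the Prepare/Perform split, not the backend implementation. The `limit` query parameter, the `workflow_id` and `artifact_id` path parameters, and the function signatures are hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class ListArtifactResultsArgs:
    """Structured args; `limit` is a hypothetical new parameter (step 1)."""
    workflow_id: str
    artifact_id: str
    limit: Optional[int] = None


def prepare(path_params: Dict[str, str],
            query: Dict[str, str]) -> ListArtifactResultsArgs:
    """Step 2: parse and validate raw request values into structured args."""
    limit = None
    raw_limit = query.get("limit")
    if raw_limit is not None:
        limit = int(raw_limit)  # raises ValueError on non-numeric input
        if limit <= 0:
            raise ValueError("limit must be a positive integer")
    return ListArtifactResultsArgs(
        workflow_id=path_params["workflow_id"],
        artifact_id=path_params["artifact_id"],
        limit=limit,
    )


def perform(args: ListArtifactResultsArgs,
            stored_results: List[float]) -> List[float]:
    """Step 3: consume the parsed args in the application logic."""
    if args.limit is None:
        return list(stored_results)
    return stored_results[: args.limit]


args = prepare({"workflow_id": "wf-1", "artifact_id": "art-1"}, {"limit": "2"})
print(perform(args, [120, 110, 100]))  # [120, 110]
```

Keeping validation in `prepare` means `perform` can assume well-formed args and focus on the application logic.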
Running and Testing
Please refer to our CONTRIBUTING.md
References:
flow_run.artifact() is here