Backport 1.6.1 and 1.6.2 to 1.2 (#1729)
* Multi arch imageSpec (#1630)

Multi arch imageSpec (#1630)

Signed-off-by: Eduardo Apolinario <[email protected]>

* Add executor_path and applications_path to spark config (#1634)

* Add executor_path and applications_path to spark config

Signed-off-by: Kevin Su <[email protected]>

* nit

Signed-off-by: Kevin Su <[email protected]>

---------

Signed-off-by: Kevin Su <[email protected]>

* Add support for env vars to pyflyte run (#1617)

* Add support for env vars to pyflyte run

Signed-off-by: Kevin Su <[email protected]>

* bump idl

Signed-off-by: Kevin Su <[email protected]>

* update doc

Signed-off-by: Kevin Su <[email protected]>

* nit

Signed-off-by: Kevin Su <[email protected]>

* nit

Signed-off-by: Kevin Su <[email protected]>

* nit

Signed-off-by: Kevin Su <[email protected]>

---------

Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Eduardo Apolinario <[email protected]>

* Fetch task executions in dynamic  (#1636)

* fetch task executions in dynamic

Signed-off-by: Kevin Su <[email protected]>

* nit

Signed-off-by: Kevin Su <[email protected]>

* lint

Signed-off-by: Kevin Su <[email protected]>

---------

Signed-off-by: Kevin Su <[email protected]>

* Added metrics command to pyflyte (#1513)

Signed-off-by: Daniel Rammer <[email protected]>

* Add http_proxy to client & Fix deviceflow (#1611)

* Add http_proxy to client & Fix deviceflow

RB=3890720

Signed-off-by: byhsu <[email protected]>

* nit

Signed-off-by: byhsu <[email protected]>

* lint!

Signed-off-by: byhsu <[email protected]>

---------

Signed-off-by: byhsu <[email protected]>
Co-authored-by: byhsu <[email protected]>
Signed-off-by: Eduardo Apolinario <[email protected]>

* Improve variable names (#1642)

Signed-off-by: byhsu <[email protected]>
Co-authored-by: byhsu <[email protected]>

* Address resolution (#1567)

Signed-off-by: Yee Hing Tong <[email protected]>

* pyflyte run supports pickle (#1646)

Signed-off-by: Kevin Su <[email protected]>

* Wait for the pod plugin instead of flytekit (#1647)

Signed-off-by: eduardo apolinario <[email protected]>
Co-authored-by: eduardo apolinario <[email protected]>

* Beautify deviceflow prompt (#1625)

* Beautify deviceflow prompt

Signed-off-by: byhsu <[email protected]>

* lint!

Signed-off-by: byhsu <[email protected]>

* lint

Signed-off-by: byhsu <[email protected]>

---------

Signed-off-by: byhsu <[email protected]>
Co-authored-by: byhsu <[email protected]>

* Improve flytekit register (#1643)

* Fix pyflyte register

Signed-off-by: byhsu <[email protected]>

* revert

Signed-off-by: byhsu <[email protected]>

* lint

Signed-off-by: byhsu <[email protected]>

---------

Signed-off-by: byhsu <[email protected]>
Co-authored-by: byhsu <[email protected]>

* Pass verify flag to all authenticators (#1641)

Signed-off-by: byhsu <[email protected]>

* Allow annotated FlyteFile as task input argument (#1632)

* fix: Allow annotated FlyteFile as task input argument

Using an annotated FlyteFile type as an input to a task was previously impossible due
to an exception being raised in `FlyteFilePathTransformer.to_python_value`.

This commit applies the fix previously used in `FlyteFilePathTransformer.to_literal`
to permit using annotated FlyteFiles as either inputs or outputs of a task.

Issue: #3424
Signed-off-by: Adrian Rumpold <[email protected]>

* refactor: Unified handling of annotated types in type engine

Issue: #3424
Signed-off-by: Adrian Rumpold <[email protected]>

* fix: Use py3.8-compatible types in type engine tests

Issue: #3424
Signed-off-by: Adrian Rumpold <[email protected]>

---------

Signed-off-by: Adrian Rumpold <[email protected]>
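
The "unified handling of annotated types" described above can be illustrated with a minimal, standard-library-only sketch. This is not the flytekit implementation: `FakeFile` and `unwrap_annotated` are hypothetical names introduced here for illustration, and the sketch assumes Python 3.9+ for `typing.Annotated`. The idea is that a transformer strips the `Annotated[...]` wrapper before checking the underlying type.

```python
from typing import Annotated, get_args, get_origin  # Annotated requires Python 3.9+


class FakeFile:
    """Hypothetical stand-in for a file-like type such as FlyteFile."""


def unwrap_annotated(python_type):
    """Return the underlying type of Annotated[...]; return other types unchanged."""
    if get_origin(python_type) is Annotated:
        # get_args yields (underlying_type, *metadata); keep only the underlying type.
        return get_args(python_type)[0]
    return python_type


# An annotated type and a plain type resolve to the same underlying class.
annotated = Annotated[FakeFile, "some-metadata"]
print(unwrap_annotated(annotated) is FakeFile)
print(unwrap_annotated(FakeFile) is FakeFile)
```

Applying one helper like this at the start of both conversion directions is one way to keep input and output handling consistent.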

* Use logger instead of print statement in sqlalchemy plugin (#1651)

* use logging info instead of print

Signed-off-by: wirthual <[email protected]>

* isorted files

Signed-off-by: wirthual <[email protected]>

* import root logger from flytekit

Signed-off-by: wirthual <[email protected]>

---------

Signed-off-by: wirthual <[email protected]>

* Map over notebook task (#1650)

* map over notebook

Signed-off-by: Kevin Su <[email protected]>

* nit

Signed-off-by: Kevin Su <[email protected]>

* nit

Signed-off-by: Kevin Su <[email protected]>

* nit

Signed-off-by: Kevin Su <[email protected]>

* tests

Signed-off-by: Kevin Su <[email protected]>

* nit

Signed-off-by: Kevin Su <[email protected]>

* add a flag

Signed-off-by: Kevin Su <[email protected]>

* nit

Signed-off-by: Kevin Su <[email protected]>

* fix tests

Signed-off-by: Kevin Su <[email protected]>

* nit

Signed-off-by: Kevin Su <[email protected]>

* lint

Signed-off-by: Kevin Su <[email protected]>

* Fix tests

Signed-off-by: Kevin Su <[email protected]>

* lint

Signed-off-by: Kevin Su <[email protected]>

---------

Signed-off-by: Kevin Su <[email protected]>

* Support single literals in tiny url (#1654)

Signed-off-by: Yee Hing Tong <[email protected]>

* Add support overriding image (#1652)

Signed-off-by: Kevin Su <[email protected]>

* Fix ability to pass None to task with Optional kwarg, add test (#1657)

Signed-off-by: Fabio Grätz <[email protected]>
Co-authored-by: Fabio Grätz <[email protected]>

* Regenerate plugins requirements

Signed-off-by: eduardo apolinario <[email protected]>

* Regenerate plugins requirements and linting

Signed-off-by: eduardo apolinario <[email protected]>

* Regenerate whylogs requirements

Signed-off-by: eduardo apolinario <[email protected]>

---------

Signed-off-by: Eduardo Apolinario <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>
Signed-off-by: byhsu <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: eduardo apolinario <[email protected]>
Signed-off-by: Adrian Rumpold <[email protected]>
Signed-off-by: wirthual <[email protected]>
Signed-off-by: Fabio Grätz <[email protected]>
Co-authored-by: Kevin Su <[email protected]>
Co-authored-by: Dan Rammer <[email protected]>
Co-authored-by: ByronHsu <[email protected]>
Co-authored-by: byhsu <[email protected]>
Co-authored-by: Yee Hing Tong <[email protected]>
Co-authored-by: eduardo apolinario <[email protected]>
Co-authored-by: Adrian Rumpold <[email protected]>
Co-authored-by: wirthual <[email protected]>
Co-authored-by: Fabio M. Graetz, Ph.D <[email protected]>
Co-authored-by: Fabio Grätz <[email protected]>
11 people authored Jul 12, 2023
1 parent 7c1a354 commit ff1db34
Showing 101 changed files with 4,567 additions and 1,439 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pythonpublish.yml
@@ -57,7 +57,7 @@ jobs:
echo "No tagged version found, exiting"
exit 1
fi
-LINK="https://pypi.org/project/flytekit/${VERSION}"
+LINK="https://pypi.org/project/flytekitplugins-pod/${VERSION}"
for i in {1..60}; do
if curl -L -I -s -f ${LINK} >/dev/null; then
echo "Found pypi"
9 changes: 4 additions & 5 deletions flytekit/clients/auth/authenticator.py
@@ -4,6 +4,8 @@
from abc import abstractmethod
from dataclasses import dataclass

import click

from . import token_client
from .auth_client import AuthorizationClient
from .exceptions import AccessTokenNotFoundError, AuthenticationError
@@ -255,11 +257,8 @@ def refresh_credentials(self):
resp = token_client.get_device_code(
self._device_auth_endpoint, self._client_id, self._audience, self._scope, self._http_proxy_url, self._verify
)
-print(
-f"""
-To Authenticate navigate in a browser to the following URL: {resp.verification_uri} and enter code: {resp.user_code}
-"""
-)
+text = f"To Authenticate, navigate in a browser to the following URL: {click.style(resp.verification_uri, fg='blue', underline=True)} and enter code: {click.style(resp.user_code, fg='blue')}"
+click.secho(text)
try:
# Currently the refresh token is not retrieved. We may want to add support for refreshTokens so that
# access tokens can be refreshed for once authenticated machines
6 changes: 6 additions & 0 deletions flytekit/clients/friendly.py
@@ -1018,3 +1018,9 @@ def get_download_signed_url(
expires_in=expires_in_pb,
)
)

    def get_data(self, flyte_uri: str) -> _data_proxy_pb2.GetDataResponse:
        req = _data_proxy_pb2.GetDataRequest(flyte_url=flyte_uri)

        resp = self._dataproxy_stub.GetData(req, metadata=self._metadata)
        return resp
19 changes: 14 additions & 5 deletions flytekit/clients/raw.py
@@ -376,6 +376,15 @@ def get_execution_data(self, get_execution_data_request):
"""
return self._stub.GetExecutionData(get_execution_data_request, metadata=self._metadata)

    def get_execution_metrics(self, get_execution_metrics_request):
        """
        Returns metrics partitioning and categorizing the workflow execution time-series.
        :param flyteidl.admin.execution_pb2.WorkflowExecutionGetMetricsRequest get_execution_metrics_request:
        :rtype: flyteidl.admin.execution_pb2.WorkflowExecutionGetMetricsResponse
        """
        return self._stub.GetExecutionMetrics(get_execution_metrics_request, metadata=self._metadata)

def list_executions_paginated(self, resource_list_request):
"""
Lists the executions for a given identifier.
@@ -570,9 +579,9 @@ def create_upload_location(
def create_download_location(
self, create_download_location_request: _dataproxy_pb2.CreateDownloadLocationRequest
) -> _dataproxy_pb2.CreateDownloadLocationResponse:
-"""
-Get a signed url to be used during fast registration
-:param flyteidl.service.dataproxy_pb2.CreateDownloadLocationRequest create_download_location_request:
-:rtype: flyteidl.service.dataproxy_pb2.CreateDownloadLocationResponse
-"""
return self._dataproxy_stub.CreateDownloadLocation(create_download_location_request, metadata=self._metadata)
+
+def create_download_link(
+self, create_download_link_request: _dataproxy_pb2.CreateDownloadLinkRequest
+) -> _dataproxy_pb2.CreateDownloadLinkResponse:
+return self._dataproxy_stub.CreateDownloadLink(create_download_link_request, metadata=self._metadata)
218 changes: 218 additions & 0 deletions flytekit/clis/sdk_in_container/metrics.py
@@ -0,0 +1,218 @@
from datetime import datetime

import rich_click as click
import yaml
from flyteidl.admin.execution_pb2 import WorkflowExecutionGetMetricsRequest
from flyteidl.core.identifier_pb2 import WorkflowExecutionIdentifier

from flytekit.clis.sdk_in_container.constants import CTX_DOMAIN, CTX_PROJECT
from flytekit.clis.sdk_in_container.helpers import get_and_save_remote_with_click_context

CTX_DEPTH = "depth"

_dump_help = """
The dump command aggregates workflow execution metrics and displays them. This aggregation is meant to provide an easy
to understand breakdown of where time is spent in a hierarchical manner.
- execution_id refers to the id of the workflow execution
"""

_explain_help = """
The explain command prints each individual execution span and the associated timestamps and Flyte entity reference.
This breakdown provides precise information into exactly how and when Flyte processes a workflow execution.
- execution_id refers to the id of the workflow execution
"""


@click.group("metrics")
@click.option(
    "-d",
    "--depth",
    required=False,
    type=int,
    default=-1,
    help="The depth of Flyte entity hierarchy to traverse when computing metrics for this execution",
)
@click.option(
    "-p",
    "--project",
    required=False,
    type=str,
    default="flytesnacks",
    help="The project of the workflow execution",
)
@click.option(
    "-d",
    "--domain",
    required=False,
    type=str,
    default="development",
    help="The domain of the workflow execution",
)
@click.pass_context
def metrics(ctx: click.Context, depth, domain, project):
    ctx.obj[CTX_DEPTH] = depth
    ctx.obj[CTX_DOMAIN] = domain
    ctx.obj[CTX_PROJECT] = project


@click.command("dump", help=_dump_help)
@click.argument("execution_id", type=str)
@click.pass_context
def metrics_dump(
    ctx: click.Context,
    execution_id: str,
):
    depth = ctx.obj[CTX_DEPTH]
    domain = ctx.obj[CTX_DOMAIN]
    project = ctx.obj[CTX_PROJECT]

    # retrieve remote
    remote = get_and_save_remote_with_click_context(ctx, project, domain)
    sync_client = remote.client

    # retrieve workflow execution metrics
    workflow_execution_id = WorkflowExecutionIdentifier(project=project, domain=domain, name=execution_id)

    request = WorkflowExecutionGetMetricsRequest(id=workflow_execution_id, depth=depth)
    response = sync_client.get_execution_metrics(request)

    # aggregate spans and print
    id, info = aggregate_reference_span(response.span)
    yaml.emitter.Emitter.process_tag = lambda self, *args, **kw: None
    print(yaml.dump({id: info}, indent=2))


def aggregate_reference_span(span):
    id = ""
    id_type = span.WhichOneof("id")
    if id_type == "workflow_id":
        id = span.workflow_id.name
    elif id_type == "node_id":
        id = span.node_id.node_id
    elif id_type == "task_id":
        id = span.task_id.retry_attempt

    spans = aggregate_spans(span.spans)
    return id, spans


def aggregate_spans(spans):
    breakdown = {}

    tasks = {}
    nodes = {}
    workflows = {}

    for span in spans:
        id_type = span.WhichOneof("id")
        if id_type == "operation_id":
            operation_id = span.operation_id

            start_time = datetime.fromtimestamp(span.start_time.seconds + span.start_time.nanos / 1e9)
            end_time = datetime.fromtimestamp(span.end_time.seconds + span.end_time.nanos / 1e9)
            total_time = (end_time - start_time).total_seconds()

            if operation_id in breakdown:
                breakdown[operation_id] += total_time
            else:
                breakdown[operation_id] = total_time
        else:
            id, underlying_span = aggregate_reference_span(span)

            if id_type == "workflow_id":
                workflows[id] = underlying_span
            elif id_type == "node_id":
                nodes[id] = underlying_span
            elif id_type == "task_id":
                tasks[id] = underlying_span

            for operation_id, total_time in underlying_span["breakdown"].items():
                if operation_id in breakdown:
                    breakdown[operation_id] += total_time
                else:
                    breakdown[operation_id] = total_time

    span = {"breakdown": breakdown}

    if len(tasks) > 0:
        span["task_attempts"] = tasks
    if len(nodes) > 0:
        span["nodes"] = nodes
    if len(workflows) > 0:
        span["workflows"] = workflows

    return span


@click.command("explain", help=_explain_help)
@click.argument("execution_id", type=str)
@click.pass_context
def metrics_explain(
    ctx: click.Context,
    execution_id: str,
):
    depth = ctx.obj[CTX_DEPTH]
    domain = ctx.obj[CTX_DOMAIN]
    project = ctx.obj[CTX_PROJECT]

    # retrieve remote
    remote = get_and_save_remote_with_click_context(ctx, project, domain)
    sync_client = remote.client

    # retrieve workflow execution metrics
    workflow_execution_id = WorkflowExecutionIdentifier(project=project, domain=domain, name=execution_id)

    request = WorkflowExecutionGetMetricsRequest(id=workflow_execution_id, depth=depth)
    response = sync_client.get_execution_metrics(request)

    # print execution spans
    print(
        "{:25s}{:25s}{:25s} {:>8s} {:s}".format(
            "operation", "start_timestamp", "end_timestamp", "duration", "entity"
        )
    )
    print("-" * 140)

    print_span(response.span, -1, "")


def print_span(span, indent, identifier):
    start_time = datetime.fromtimestamp(span.start_time.seconds + span.start_time.nanos / 1e9)
    end_time = datetime.fromtimestamp(span.end_time.seconds + span.end_time.nanos / 1e9)

    id_type = span.WhichOneof("id")
    span_identifier = ""

    if id_type == "operation_id":
        indent_str = ""
        for i in range(indent):
            indent_str += " "

        print(
            "{:25s}{:25s}{:25s} {:7.2f}s {:s}{:s}".format(
                span.operation_id,
                start_time.strftime("%m-%d %H:%M:%S.%f"),
                end_time.strftime("%m-%d %H:%M:%S.%f"),
                (end_time - start_time).total_seconds(),
                indent_str,
                identifier,
            )
        )

        span_identifier = identifier + "/" + span.operation_id
    else:
        if id_type == "workflow_id":
            span_identifier = "workflow/" + span.workflow_id.name
        elif id_type == "node_id":
            span_identifier = "node/" + span.node_id.node_id
        elif id_type == "task_id":
            span_identifier = "task/" + str(span.task_id.retry_attempt)

    for under_span in span.spans:
        print_span(under_span, indent + 1, span_identifier)


metrics.add_command(metrics_dump)
metrics.add_command(metrics_explain)
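
The hierarchical aggregation performed by `aggregate_spans` above can be tried without a Flyte backend. The following is a simplified sketch under stated assumptions: `Timestamp`, `Span`, `to_datetime`, and `aggregate` are hypothetical stand-ins written for this example (plain dataclasses instead of the flyteidl protobuf messages), and only operation spans and node reference spans are modelled. It shows how per-operation durations are summed at each level and rolled up into the parent's breakdown.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional


@dataclass
class Timestamp:
    """Hypothetical stand-in for a protobuf Timestamp (seconds + nanos)."""
    seconds: int
    nanos: int = 0


@dataclass
class Span:
    """Hypothetical stand-in for an execution span; exactly one id is set."""
    start_time: Timestamp
    end_time: Timestamp
    operation_id: Optional[str] = None  # set on leaf operation spans
    node_id: Optional[str] = None       # set on reference spans that contain children
    spans: List["Span"] = field(default_factory=list)


def to_datetime(ts: Timestamp) -> datetime:
    # Same seconds + nanos / 1e9 conversion as in the diff, pinned to UTC here.
    return datetime.fromtimestamp(ts.seconds + ts.nanos / 1e9, tz=timezone.utc)


def aggregate(spans: List[Span]) -> dict:
    breakdown = {}
    nodes = {}
    for span in spans:
        if span.operation_id is not None:
            # Leaf span: add its duration to the total for that operation.
            elapsed = (to_datetime(span.end_time) - to_datetime(span.start_time)).total_seconds()
            breakdown[span.operation_id] = breakdown.get(span.operation_id, 0.0) + elapsed
        else:
            # Reference span: aggregate children, then roll their totals up.
            child = aggregate(span.spans)
            nodes[span.node_id] = child
            for op, elapsed in child["breakdown"].items():
                breakdown[op] = breakdown.get(op, 0.0) + elapsed
    result = {"breakdown": breakdown}
    if nodes:
        result["nodes"] = nodes
    return result


tree = [
    Span(Timestamp(0), Timestamp(2), operation_id="setup"),
    Span(Timestamp(2), Timestamp(10), node_id="n0", spans=[
        Span(Timestamp(2), Timestamp(5), operation_id="execute"),
        Span(Timestamp(5), Timestamp(10), operation_id="setup"),
    ]),
]
summary = aggregate(tree)
print(summary)
```

Each level keeps its own `breakdown`, so the top-level totals include time spent in nested nodes while the per-node dictionaries still show where that time came from.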
2 changes: 2 additions & 0 deletions flytekit/clis/sdk_in_container/pyflyte.py
@@ -12,6 +12,7 @@
from flytekit.clis.sdk_in_container.init import init
from flytekit.clis.sdk_in_container.launchplan import launchplan
from flytekit.clis.sdk_in_container.local_cache import local_cache
from flytekit.clis.sdk_in_container.metrics import metrics
from flytekit.clis.sdk_in_container.package import package
from flytekit.clis.sdk_in_container.register import register
from flytekit.clis.sdk_in_container.run import run
@@ -144,6 +145,7 @@ def main(ctx, pkgs: typing.List[str], config: str, verbose: bool):
main.add_command(backfill)
main.add_command(serve)
main.add_command(build)
main.add_command(metrics)
main.add_command(launchplan)
main.epilog
