Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

run window checkpoint, integration #474

Draft
wants to merge 34 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
2255fc2
Upgrade great_expectations to v1.0.0 and fix unit tests
TrangPham Sep 19, 2024
0b19d11
Skip integration tests and update to alpha version
TrangPham Sep 19, 2024
c1a9291
Update other gx version
TrangPham Sep 20, 2024
93302fd
Updated for type check
TrangPham Sep 20, 2024
4a73cdb
Merge branch 'main' into v1-unit-tests
TrangPham Sep 22, 2024
c1db7c5
WIP v0 version (no tests)
anthonyburdi Oct 2, 2024
d200de2
Pre-release version bump
tiny-tim-bot Oct 2, 2024
b5e51f8
Update integration tests (#452)
TrangPham Oct 3, 2024
5796392
Upgrade draft data source config action for v1 (#455)
rreinoldsc Oct 3, 2024
71984e5
add RunWindowCheckpointAction to available actions __init__
shiplet Oct 3, 2024
726969f
add leading slash to api url
shiplet Oct 3, 2024
d0f4c99
update url to retrieve cloud config
joshua-stauffer Oct 4, 2024
ce44e62
update core a
joshua-stauffer Oct 4, 2024
e6f7574
update tests
joshua-stauffer Oct 4, 2024
522d108
update checkpoint action for v1
joshua-stauffer Oct 4, 2024
2481766
fix tests & tweak
joshua-stauffer Oct 4, 2024
1eb0946
register all checkpoint events as v1, remove outdated todos
joshua-stauffer Oct 4, 2024
e377381
update create scheduled event payload
joshua-stauffer Oct 4, 2024
560382b
Merge branch 'v1-unit-tests' into integrate-with-mercury-fastapi-endp…
rreinoldsc Oct 4, 2024
9fa70ef
Merge branch 'integrate-with-mercury-fastapi-endpoints' of github.com…
rreinoldsc Oct 4, 2024
a1624b5
rename action, and fix url
rreinoldsc Oct 7, 2024
71f566d
add unit test
rreinoldsc Oct 7, 2024
28e2d38
lint
rreinoldsc Oct 7, 2024
91f66ef
remove window from parametrization, needs diff test setup
rreinoldsc Oct 7, 2024
b4b7367
update sig
rreinoldsc Oct 7, 2024
52a435d
Merge branch 'main' of github.com:great-expectations/cloud into v1-un…
rreinoldsc Oct 7, 2024
7007cf4
Merge branch 'v1-unit-tests' of github.com:great-expectations/cloud i…
rreinoldsc Oct 7, 2024
9fb5d7d
fix kwarg
rreinoldsc Oct 7, 2024
1f11047
fix test kwarg
rreinoldsc Oct 7, 2024
9ce4f2a
fix unit assert
rreinoldsc Oct 7, 2024
d7989f2
rm note
rreinoldsc Oct 7, 2024
df42c28
api updates
rreinoldsc Oct 7, 2024
1a714f8
include WIP integration test
rreinoldsc Oct 7, 2024
c2e3f45
cleanup
rreinoldsc Oct 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@ services:
- mq

mercury-service-api:
image: 258143015559.dkr.ecr.us-east-1.amazonaws.com/mercury/api
platform: linux/amd64
image: 258143015559.dkr.ecr.us-east-1.amazonaws.com/mercury/api:v1api-latest
restart: always
volumes:
- ./services/ge_cloud/:/code
ports:
- 5000:5000
# V1 API listens on container port 7000; it is published on host port 5000
- 5000:7000
environment:
LOGGING_LEVEL: ${LOGGING_LEVEL}
ENVIRONMENT: ${ENVIRONMENT}
Expand Down
1 change: 1 addition & 0 deletions great_expectations_cloud/agent/actions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@
from great_expectations_cloud.agent.actions.run_scheduled_checkpoint import (
RunScheduledCheckpointAction,
)
from great_expectations_cloud.agent.actions.run_window_checkpoint import RunWindowCheckpointAction
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def check_draft_datasource_config(
"fluent-style Data Source."
)
try:
datasource_cls = self._context.sources.type_lookup[datasource_type]
datasource_cls = self._context.data_sources.type_lookup[datasource_type]
except LookupError as exc:
raise TypeError( # noqa: TRY003 # one off error
"DraftDatasourceConfigAction received an unknown Data Source type."
Expand All @@ -75,10 +75,10 @@ def _get_table_names(self, datasource: Datasource) -> list[str]:

def _update_table_names_list(self, config_id: UUID, table_names: list[str]) -> None:
with create_session(access_token=self._auth_key) as session:
response = session.patch(
url=f"{self._base_url}/organizations/"
f"{self._organization_id}/datasources/drafts/{config_id}",
json={"table_names": table_names},
url = f"{self._base_url}/api/v1/organizations/{self._organization_id}/draft-table-names/{config_id}"
response = session.put(
url=url,
json={"data": {"table_names": table_names}},
)
if not response.ok:
raise RuntimeError( # noqa: TRY003 # one off error
Expand All @@ -90,8 +90,8 @@ def _update_table_names_list(self, config_id: UUID, table_names: list[str]) -> N

def get_draft_config(self, config_id: UUID) -> dict[str, Any]:
resource_url = (
f"{self._base_url}/organizations/"
f"{self._organization_id}/datasources/drafts/{config_id}"
f"{self._base_url}/api/v1/organizations/"
f"{self._organization_id}/draft-datasources/{config_id}"
)
with create_session(access_token=self._auth_key) as session:
response = session.get(resource_url)
Expand All @@ -102,11 +102,11 @@ def get_draft_config(self, config_id: UUID) -> dict[str, Any]:
)
data = response.json()
try:
return data["data"]["attributes"]["draft_config"] # type: ignore[no-any-return]
return data["data"]["config"] # type: ignore[no-any-return]
except KeyError as e:
raise RuntimeError( # noqa: TRY003 # one off error
"Malformed response received from GX Cloud"
) from e


register_event_action("0", DraftDatasourceConfigEvent, DraftDatasourceConfigAction)
register_event_action("1", DraftDatasourceConfigEvent, DraftDatasourceConfigAction)
30 changes: 20 additions & 10 deletions great_expectations_cloud/agent/actions/run_checkpoint.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

from typing_extensions import override

Expand All @@ -21,33 +21,43 @@


class RunCheckpointAction(AgentAction[RunCheckpointEvent]):
# TODO: New actions need to be created that are compatible with GX v1 and registered for v1.
# This action is registered for v0, see register_event_action()

@override
def run(self, event: RunCheckpointEvent, id: str) -> ActionResult:
return run_checkpoint(self._context, event, id)


class MissingCheckpointNameError(ValueError):
"""Property checkpoint_name is required but not present."""


def run_checkpoint(
context: CloudDataContext,
event: RunCheckpointEvent | RunScheduledCheckpointEvent | RunWindowCheckpointEvent,
id: str,
expectation_parameters: dict[str, Any] | None = None,
) -> ActionResult:
"""Note: the logic for this action is broken out into this function so that
the same logic can be used for RunCheckpointEvent, RunScheduledCheckpointEvent, and RunWindowCheckpointEvent."""
# TODO: move connection testing into OSS; there isn't really a reason it can't be done there

# the checkpoint_name property on possible events is optional for backwards compatibility,
# but this action requires it in order to run:
if not event.checkpoint_name:
raise MissingCheckpointNameError

# test connection to data source and any assets used by checkpoint
for datasource_name, data_asset_names in event.datasource_names_to_asset_names.items():
datasource = context.get_datasource(datasource_name)
datasource = context.data_sources.get(name=datasource_name)
datasource.test_connection(test_assets=False) # raises `TestConnectionError` on failure
for (
data_asset_name
) in data_asset_names: # only test connection for assets that are validated in checkpoint
asset = datasource.get_asset(data_asset_name)
asset.test_connection() # raises `TestConnectionError` on failure
checkpoint_run_result = context.run_checkpoint(
ge_cloud_id=event.checkpoint_id,
batch_request={"options": event.splitter_options} if event.splitter_options else None,

# run checkpoint
checkpoint = context.checkpoints.get(name=event.checkpoint_name)
checkpoint_run_result = checkpoint.run(
batch_parameters=event.splitter_options, expectation_parameters=expectation_parameters
)

validation_results = checkpoint_run_result.run_results
Expand All @@ -66,4 +76,4 @@ def run_checkpoint(
)


register_event_action("0", RunCheckpointEvent, RunCheckpointAction)
register_event_action("1", RunCheckpointEvent, RunCheckpointAction)
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,4 @@ def _raise_on_any_metric_exception(self, metric_run: MetricRun) -> None:


register_event_action("0", RunMetricsListEvent, MetricListAction)
register_event_action("1", RunMetricsListEvent, MetricListAction)
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,9 @@


class RunScheduledCheckpointAction(AgentAction[RunScheduledCheckpointEvent]):
# TODO: New actions need to be created that are compatible with GX v1 and registered for v1.
# This action is registered for v0, see register_event_action()

@override
def run(self, event: RunScheduledCheckpointEvent, id: str) -> ActionResult:
return run_checkpoint(self._context, event, id)


register_event_action("0", RunScheduledCheckpointEvent, RunScheduledCheckpointAction)
register_event_action("1", RunScheduledCheckpointEvent, RunScheduledCheckpointAction)
32 changes: 25 additions & 7 deletions great_expectations_cloud/agent/actions/run_window_checkpoint.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

from great_expectations.core.http import create_session
from great_expectations.exceptions import GXCloudError
from typing_extensions import override

from great_expectations_cloud.agent.actions.agent_action import (
Expand All @@ -14,14 +16,30 @@


class RunWindowCheckpointAction(AgentAction[RunWindowCheckpointEvent]):
# TODO: New actions need to be created that are compatible with GX v1 and registered for v1.
# This action is registered for v0, see register_event_action()

@override
def run(self, event: RunWindowCheckpointEvent, id: str) -> ActionResult:
# TODO: https://greatexpectations.atlassian.net/browse/ZELDA-922
# This currently only runs a normal checkpoint. Logic for window checkpoints needs to be added (e.g. call the backend to get the params and then construct the evaluation_parameters before passing them into context.run_checkpoint()) One way we can do this via a param in `run_checkpoint()` that takes a function to build the evaluation_parameters, defaulting to a noop for the other checkpoint action types.
return run_checkpoint(self._context, event, id)
with create_session(access_token=self._auth_key) as session:
expectation_parameters_for_checkpoint_url = f"{self._base_url}/api/v1/organizations/{self._organization_id}/checkpoints/{event.checkpoint_id}/expectation-parameters"
response = session.get(url=expectation_parameters_for_checkpoint_url)

if not response.ok:
raise GXCloudError(
message=f"RunWindowCheckpointAction encountered an error while connecting to GX Cloud. "
f"Unable to retrieve expectation_parameters for Checkpoint with ID={event.checkpoint_id}.",
response=response,
)
data = response.json()
try:
expectation_parameters = data["data"]["expectation_parameters"]
except KeyError as e:
raise GXCloudError(
message="Malformed response received from GX Cloud",
response=response,
) from e

return run_checkpoint(
self._context, event, id, expectation_parameters=expectation_parameters
)


register_event_action("0", RunWindowCheckpointEvent, RunWindowCheckpointAction)
register_event_action("1", RunWindowCheckpointEvent, RunWindowCheckpointAction)
16 changes: 9 additions & 7 deletions great_expectations_cloud/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
JobStatus,
ScheduledEventBase,
UnknownEvent,
UpdateJobStatusRequest,
build_failed_job_completed_status,
)

Expand All @@ -74,7 +75,7 @@ class GXAgentConfig(AgentBaseExtraForbid):
queue: str
connection_string: AmqpDsn
# pydantic will coerce this string to AnyUrl type
gx_cloud_base_url: AnyUrl = CLOUD_DEFAULT_BASE_URL # type: ignore[assignment] # pydantic will coerce
gx_cloud_base_url: AnyUrl = CLOUD_DEFAULT_BASE_URL
gx_cloud_organization_id: str
gx_cloud_access_token: str

Expand Down Expand Up @@ -389,7 +390,7 @@ def _get_config(cls) -> GXAgentConfig:

# obtain the broker url and queue name from Cloud
agent_sessions_url = (
f"{env_vars.gx_cloud_base_url}/organizations/"
f"{env_vars.gx_cloud_base_url}/api/v1/organizations/"
f"{env_vars.gx_cloud_organization_id}/agent-sessions"
)

Expand Down Expand Up @@ -431,10 +432,11 @@ def _update_status(self, job_id: str, status: JobStatus, org_id: UUID) -> None:
extra={"job_id": job_id, "status": str(status), "organization_id": str(org_id)},
)
agent_sessions_url = (
f"{self._config.gx_cloud_base_url}/organizations/{org_id}" + f"/agent-jobs/{job_id}"
f"{self._config.gx_cloud_base_url}/api/v1/organizations/{org_id}"
+ f"/agent-jobs/{job_id}"
)
with create_session(access_token=self.get_auth_key()) as session:
data = status.json()
data = UpdateJobStatusRequest(data=status).json()
session.patch(agent_sessions_url, data=data)
LOGGER.info(
"Status updated",
Expand All @@ -454,16 +456,16 @@ def _create_scheduled_job_and_set_started(
event_context: event with related properties and actions.
"""
data = {
**event_context.event.dict(),
"correlation_id": event_context.correlation_id,
"event": event_context.event.dict(),
}
LOGGER.info(
"Creating scheduled job and setting started",
extra={**data, "organization_id": str(org_id)},
)

agent_sessions_url = (
f"{self._config.gx_cloud_base_url}/organizations/{org_id}" + "/agent-jobs"
f"{self._config.gx_cloud_base_url}/api/v1/organizations/{org_id}" + "/agent-jobs"
)
with create_session(access_token=self.get_auth_key()) as session:
payload = Payload(data=data)
Expand Down Expand Up @@ -493,7 +495,7 @@ def _set_http_session_headers(
Note: the Agent-Job-Id header value will be set for all GX Cloud requests until this method is
called again.
"""
from great_expectations import __version__ # type: ignore[attr-defined] # TODO: fix this
from great_expectations import __version__
from great_expectations.core import http
from great_expectations.data_context.store.gx_cloud_store_backend import GXCloudStoreBackend

Expand Down
2 changes: 1 addition & 1 deletion great_expectations_cloud/agent/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

class GxAgentEnvVars(BaseSettings):
# pydantic will coerce this string to AnyUrl type
gx_cloud_base_url: AnyUrl = CLOUD_DEFAULT_BASE_URL # type: ignore[assignment]
gx_cloud_base_url: AnyUrl = CLOUD_DEFAULT_BASE_URL
gx_cloud_organization_id: str
gx_cloud_access_token: str

Expand Down
2 changes: 1 addition & 1 deletion great_expectations_cloud/agent/event_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def _get_major_version(version: str) -> str:
return str(parsed.major)


version = gx.__version__ # type: ignore[attr-defined] # TODO: fix this
version = gx.__version__
_GX_MAJOR_VERSION = _get_major_version(str(version))


Expand Down
4 changes: 4 additions & 0 deletions great_expectations_cloud/agent/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ class JobCompleted(AgentBaseExtraForbid):
JobStatus = Union[JobStarted, JobCompleted]


class UpdateJobStatusRequest(AgentBaseExtraForbid):
data: JobStatus


def build_failed_job_completed_status(error: BaseException) -> JobCompleted:
if isinstance(error, GXCoreError):
status = JobCompleted(
Expand Down
Loading
Loading