Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Faststream docs #403

Draft
wants to merge 9 commits into
base: fastapi-poc
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions asyncapi.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"asyncapi": "2.6.0",
"defaultContentType": "application/json",
"info": {
"title": "FastStream",
"version": "0.1.0",
"description": ""
},
"servers": {
"development": {
"url": "amqps://org-agent-a7ba9504-a02f-40c8-8c46-145b4ff5765b:B-8Eflf%7Bd5y_%23k_-Lcs%5CVYcN8Osf,!%40l%3CB=dE9j515Yj%3Ec%5Bb4POG2%5D*_ja%60%5BV%[email protected]:5671/",
"protocol": "amqps",
"protocolVersion": "0.9.1"
}
},
"channels": {},
"components": {
"messages": {},
"schemas": {}
}
}
15 changes: 15 additions & 0 deletions asyncapi.yaml
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To see what these docs would look like live, go to the asyncapi studio and paste them into the editor.

https://studio.asyncapi.com/

Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
asyncapi: 2.6.0
defaultContentType: application/json
info:
title: FastStream
version: 0.1.0
description: ""
servers:
development:
url: amqps://org-agent-a7ba9504-a02f-40c8-8c46-145b4ff5765b:XI,IT)U5%5Bd%5C%23%22$%40%3F0SwhhdF0gp%60z(1-WHE%3Fu%230tTm6N%3C%5DQLjg;*v6%22%7DU%7D4%5D%7CM%[email protected]:5671/
protocol: amqps
protocolVersion: 0.9.1
channels: {}
components:
messages: {}
schemas: {}
14 changes: 8 additions & 6 deletions great_expectations_cloud/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,9 @@ class GXAgent:
_PYPI_GX_AGENT_PACKAGE_NAME = "great_expectations_cloud"
_PYPI_GREAT_EXPECTATIONS_PACKAGE_NAME = "great_expectations"

def __init__(self: Self):
def __init__(self: Self, app: FastStream, broker: RabbitBroker) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking change for runner.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just discovered the Faststream app exposes a .broker attribute, so we don't need to inject app + broker, app by itself is enough.

Copy link
Member Author

@Kilo59 Kilo59 Aug 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TrangPham
Also, I think I see how to make this not breaking, but I didn't have a chance to implement it today.
I'll try to push it in the morning.

self.app = app
self.broker = broker
agent_version: str = self.get_current_gx_agent_version()
great_expectations_version: str = self._get_current_great_expectations_version()
print(f"GX Agent version: {agent_version}")
Expand Down Expand Up @@ -239,15 +241,11 @@ def run(self) -> None:
async def _listen(self) -> None:
"""Manage connection lifecycle."""
try:
broker = RabbitBroker(
url=str(self._config.connection_string),
)
app = FastStream(broker)
queue = RabbitQueue(name=self._config.queue, durable=True, passive=True)
print("Queue is valid.")

# FastStream declares default exchange if not provided
@broker.subscriber(queue, retry=MAX_DELIVERY)
@self.broker.subscriber(queue, retry=MAX_DELIVERY)
async def handle_me(
msg: dict[str, Any], correlation_id: str = Context("message.correlation_id")
) -> None:
Expand Down Expand Up @@ -542,3 +540,7 @@ def _update_headers_agent_patch(
# TODO: this is relying on a private implementation detail
# use a public API once it is available
http._update_headers = _update_headers_agent_patch


broker = RabbitBroker(str(GXAgent._get_config().connection_string))
app: Final[FastStream] = FastStream(broker)
25 changes: 25 additions & 0 deletions tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@
PYPROJECT_TOML: Final[pathlib.Path] = PROJECT_ROOT / "pyproject.toml"
DOCKERFILE_PATH: Final[str] = "./Dockerfile"

ASYNCAPI_YAML: Final = PROJECT_ROOT / "asyncapi.yaml"
ASNYCAPI_JSON: Final = PROJECT_ROOT / "asyncapi.json"

# TODO: need a module level constant for the runner app to generate docs
AGENT_APP_PATH_ID: Final = "great_expectations_cloud.agent.agent:app"


@functools.lru_cache(maxsize=8)
def _get_pyproject_tool_dict(
Expand Down Expand Up @@ -78,6 +84,25 @@ def lint(ctx: Context, check: bool = False, unsafe_fixes: bool = False) -> None:
ctx.run(" ".join(cmds), echo=True, pty=True)


@invoke.task(aliases=["sync-asyncapi"])
def gen_asyncapi(ctx: Context) -> None:
"""
Generate asyncapi docs
TODO: this will not work until AGENT_APP_PATH_ID points to a FastStream app instance
https://faststream.airt.ai/latest/faststream/#running-the-application
"""
core_cmds = ["faststream", "docs", "gen"]
yaml_cmds = [*core_cmds, "--yaml", AGENT_APP_PATH_ID]
ctx.run(" ".join(yaml_cmds), echo=True, pty=True)
assert ASYNCAPI_YAML.exists(), f"{ASYNCAPI_YAML} not found"
print(f"Updated {ASYNCAPI_YAML}")

json_cmds = [*core_cmds, AGENT_APP_PATH_ID]
ctx.run(" ".join(json_cmds), echo=True, pty=True)
assert ASNYCAPI_JSON.exists(), f"{ASNYCAPI_JSON} not found"
print(f"Updated {ASNYCAPI_JSON}")


@invoke.task(
aliases=("build",),
help={
Expand Down
Loading