Skip to content

Commit

Permalink
Merge pull request #27 from discursus-data/26_improve_connectors
Browse files Browse the repository at this point in the history
Renaming orchestrator connector + Changing how we handle default connector profiles
  • Loading branch information
olivierdupuis authored Jul 17, 2023
2 parents 008b95a + 613e0cd commit b6e1f87
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 132 deletions.
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ The DAMN tool leverages various connectors to interact with different data syste
The configuration file uses the following structure:

```yaml
connector_name:
connector_type:
profile_name:
param1: value1
param2: value2
```
- connector_name: The name of the connector (e.g., dagster, dbt, s3, etc.).
- connector_type: The type of the connector (e.g., orchestrator, io-manager, data-warehouse, etc.).
- profile_name: The name of the profile for the connector. You can have multiple profiles per connector (e.g., prod, dev, test, etc.).
- param1, param2, etc.: The parameters needed for each connector. The required parameters will depend on the specific connector. For example, a Dagster connector might require endpoint and api_token.
### Switching Between Profiles
Expand All @@ -74,12 +74,12 @@ damn ls --profile dev


## Connectors
### Dagster
This is currently the default connector supported by the DAMN tool. Here's an example configuration for a dagster connector with prod and dev profiles:
### Orchestrator
This is the default connector required by the DAMN tool. For now, we only support Dagster as the service provider for this connector. Here's an example configuration for an orchestrator connector with a dagster profile:

```yaml
dagster:
prod:
orchestrator:
dagster:
endpoint: https://your-dagster-instance.com/prod/graphql
api_token: your-api-token
```
Expand Down
4 changes: 2 additions & 2 deletions connectors.yml.REPLACE
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
dagster:
prod:
orchestrator:
dagster:
endpoint: https://ACCOUNT.dagster.cloud/prod/graphql
api_token: user:123ABC

Expand Down
12 changes: 6 additions & 6 deletions damn_tool/ls.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
from .utils.helpers import load_config, run_and_capture


def get_dagster_assets(prefix, profile):
def get_orchestrator_assets(prefix, profile):
# Get connector configs
dagster_config = load_config('dagster', profile)
orchestrator_config = load_config('orchestrator', profile)

# Set headers
headers = {
"Content-Type": "application/json",
"Dagster-Cloud-Api-Token": dagster_config['api_token'],
"Dagster-Cloud-Api-Token": orchestrator_config['api_token'],
}

# Get data
Expand Down Expand Up @@ -49,7 +49,7 @@ def get_dagster_assets(prefix, profile):
"""

response = requests.post(
dagster_config['endpoint'], # type: ignore
orchestrator_config['endpoint'], # type: ignore
headers=headers, # type: ignore
json={"query": query}
)
Expand All @@ -68,11 +68,11 @@ def display_assets(data):

@click.command()
@click.option('--prefix', default=None, help='Get list of assets with a given prefix')
@click.option('--profile', default='prod', help='Profile to use')
@click.option('--profile', default=None, help='Profile to use')
@click.option('--copy-output', is_flag=True, help='Copy command output to clipboard')
def ls(prefix, profile, copy_output):
"""List your platform's data assets"""
data = get_dagster_assets(prefix, profile)
data = get_orchestrator_assets(prefix, profile)

if copy_output:
output = run_and_capture(display_assets, data)
Expand Down
40 changes: 20 additions & 20 deletions damn_tool/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@
from .utils.aws import list_objects_and_folders


def get_dagster_metrics(asset, profile):
# Getting and processing Dagster metrics...
def get_orchestrator_metrics(asset, profile):
# Getting and processing orchestrator metrics...
# Get connector configs
dagster_config = load_config('dagster', profile)
orchestrator_config = load_config('orchestrator', profile)

# Set headers
headers = {
"Content-Type": "application/json",
"Dagster-Cloud-Api-Token": dagster_config['api_token'],
"Dagster-Cloud-Api-Token": orchestrator_config['api_token'],
}

asset_list = asset.split('/')
Expand Down Expand Up @@ -59,7 +59,7 @@ def get_dagster_metrics(asset, profile):
"""

response = requests.post(
dagster_config['endpoint'], # type: ignore
orchestrator_config['endpoint'], # type: ignore
headers=headers, # type: ignore
json={"query": query}
)
Expand Down Expand Up @@ -140,20 +140,20 @@ def get_io_manager_metrics(asset, io_manager):
}


def display_metrics(dagster_metrics, io_manager_metrics):
click.echo(colored("Latest Dagster materialization metrics:", 'magenta'))
click.echo(colored(f"- Latest run ID: ", 'yellow') + colored(f"{dagster_metrics['run_id']}", 'green'))
click.echo(colored(f"- Status: ", 'yellow') + colored(f"{dagster_metrics['status']}", 'green'))
click.echo(colored(f"- Start time: ", 'yellow') + colored(f"{dagster_metrics['start_time']}", 'green'))
click.echo(colored(f"- End time: ", 'yellow') + colored(f"{dagster_metrics['end_time']}", 'green'))
click.echo(colored(f"- Elapsed time: ", 'yellow') + colored(f"{dagster_metrics['elapsed_time']}", 'green'))
def display_metrics(orchestrator_metrics, io_manager_metrics):
click.echo(colored("Latest Orchestrator materialization metrics:", 'magenta'))
click.echo(colored(f"- Latest run ID: ", 'yellow') + colored(f"{orchestrator_metrics['run_id']}", 'green'))
click.echo(colored(f"- Status: ", 'yellow') + colored(f"{orchestrator_metrics['status']}", 'green'))
click.echo(colored(f"- Start time: ", 'yellow') + colored(f"{orchestrator_metrics['start_time']}", 'green'))
click.echo(colored(f"- End time: ", 'yellow') + colored(f"{orchestrator_metrics['end_time']}", 'green'))
click.echo(colored(f"- Elapsed time: ", 'yellow') + colored(f"{orchestrator_metrics['elapsed_time']}", 'green'))

click.echo('\n')

click.echo(colored("Dagster partitions:", 'magenta'))
click.echo(colored(f"- Number of partitions: ", 'yellow') + colored(f"{dagster_metrics['num_partitions']}", 'green'))
click.echo(colored(f"- Materialized partitions: ", 'yellow') + colored(f"{dagster_metrics['num_materialized']}", 'green'))
click.echo(colored(f"- Failed partitions: ", 'yellow') + colored(f"{dagster_metrics['num_failed']}", 'green'))
click.echo(colored("Orchestrator partitions:", 'magenta'))
click.echo(colored(f"- Number of partitions: ", 'yellow') + colored(f"{orchestrator_metrics['num_partitions']}", 'green'))
click.echo(colored(f"- Materialized partitions: ", 'yellow') + colored(f"{orchestrator_metrics['num_materialized']}", 'green'))
click.echo(colored(f"- Failed partitions: ", 'yellow') + colored(f"{orchestrator_metrics['num_failed']}", 'green'))

click.echo('\n')

Expand All @@ -165,17 +165,17 @@ def display_metrics(dagster_metrics, io_manager_metrics):

@click.command()
@click.argument('asset', type=str)
@click.option('--profile', default='prod', help='Profile to use')
@click.option('--profile', default=None, help='Profile to use')
@click.option('--io_manager', default='aws', help='IO manager storage system to use')
@click.option('--copy-output', is_flag=True, help='Copy command output to clipboard')
def metrics(asset, profile, io_manager, copy_output):
"""List your asset's metrics"""
dagster_metrics = get_dagster_metrics(asset, profile)
orchestrator_metrics = get_orchestrator_metrics(asset, profile)
io_manager_metrics = get_io_manager_metrics(asset, io_manager)

if copy_output:
output = run_and_capture(display_metrics, dagster_metrics, io_manager_metrics)
output = run_and_capture(display_metrics, orchestrator_metrics, io_manager_metrics)
markdown_output = output.replace('\x1b[36m- ', '- ').replace('\x1b[0m', '') # Removing the color codes
pyperclip.copy(markdown_output)
else:
display_metrics(dagster_metrics, io_manager_metrics)
display_metrics(orchestrator_metrics, io_manager_metrics)
12 changes: 6 additions & 6 deletions damn_tool/show.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
from .utils.helpers import load_config, run_and_capture


def get_dagster_asset_info(asset, profile):
def get_orchestrator_asset_info(asset, profile):
# Get connector configs
dagster_config = load_config('dagster', profile)
orchestrator_config = load_config('orchestrator', profile)

# Set headers
headers = {
"Content-Type": "application/json",
"Dagster-Cloud-Api-Token": dagster_config['api_token'],
"Dagster-Cloud-Api-Token": orchestrator_config['api_token'],
}

# Split the asset key into a list of strings
Expand Down Expand Up @@ -126,7 +126,7 @@ def get_dagster_asset_info(asset, profile):
"""

response = requests.post(
dagster_config['endpoint'], # type: ignore
orchestrator_config['endpoint'], # type: ignore
headers=headers,
json={"query": query}
)
Expand Down Expand Up @@ -232,11 +232,11 @@ def display_asset_info(asset, data):

@click.command()
@click.argument('asset', required=True)
@click.option('--profile', default='prod', help='Profile to use')
@click.option('--profile', default=None, help='Profile to use')
@click.option('--copy-output', is_flag=True, help='Copy command output to clipboard')
def show(asset, profile, copy_output):
"""Show details for a specific asset"""
data = get_dagster_asset_info(asset, profile)
data = get_orchestrator_asset_info(asset, profile)

if copy_output:
output = run_and_capture(display_asset_info, asset, data)
Expand Down
6 changes: 5 additions & 1 deletion damn_tool/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ def load_config(connector, profile):
config = yaml.safe_load(rendered_template)

try:
return config[connector][profile]
if not profile:
first_key = list(config[connector].keys())[0]
return config[connector][first_key]
else:
return config[connector][profile]
except KeyError:
raise ValueError(f"No configuration found for connector '{connector}' with profile '{profile}'")

Expand Down
6 changes: 3 additions & 3 deletions performance_profiler.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from line_profiler import LineProfiler
from damn_tool.metrics import get_dagster_metrics
from damn_tool.metrics import get_orchestrator_metrics

def do_profile():
lp = LineProfiler()
lp.add_function(get_dagster_metrics)
lp.runcall(get_dagster_metrics, "gdelt/gdelt_gkg_articles", "prod")
lp.add_function(get_orchestrator_metrics)
lp.runcall(get_orchestrator_metrics, "gdelt/gdelt_gkg_articles", "prod")
lp.print_stats()

do_profile()
Loading

0 comments on commit b6e1f87

Please sign in to comment.