Skip to content

Commit

Permalink
Merge branch 'master' into master+ing-473-redshift-collapse-cll
Browse files Browse the repository at this point in the history
  • Loading branch information
sid-acryl authored Jan 31, 2024
2 parents a3e9b0d + 3f9490d commit 886a513
Show file tree
Hide file tree
Showing 67 changed files with 1,161 additions and 772 deletions.
12 changes: 12 additions & 0 deletions .github/workflows/docker-unified.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,18 @@ jobs:
steps:
- name: Check out the repo
uses: hsheth2/sane-checkout-action@v1
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: Set up JDK 17
uses: actions/setup-java@v3
with:
distribution: "zulu"
java-version: 17
- name: Run lint on smoke test
run: |
./gradlew :smoke-test:lint
- name: Compute Tag
id: tag
run: |
Expand Down
2 changes: 1 addition & 1 deletion docs/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ failure_log:
### init
The init command is used to tell `datahub` about where your DataHub instance is located. The CLI will point to localhost DataHub by default.
Running `datahub init` will allow you to customize the datahub instance you are communicating with.
Running `datahub init` will allow you to customize the datahub instance you are communicating with. It has an optional `--use-password` option which allows you to initialise the config using a username and password. We foresee this mainly being used by admins, as the majority of organisations will be using SSO and there won't be any passwords to use.

**_Note_**: Provide your GMS instance's host when the prompt asks you for the DataHub host.

Expand Down
207 changes: 102 additions & 105 deletions metadata-ingestion/src/datahub/cli/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@

import click
import requests
import yaml
from deprecated import deprecated
from pydantic import BaseModel, ValidationError
from requests.models import Response
from requests.sessions import Session

from datahub.cli import config_utils
from datahub.emitter.aspect import ASPECT_MAP, TIMESERIES_ASPECT_MAP
from datahub.emitter.request_helper import make_curl_command
from datahub.emitter.serialization_helper import post_json_transform
Expand All @@ -23,13 +22,6 @@

log = logging.getLogger(__name__)

DEFAULT_GMS_HOST = "http://localhost:8080"
CONDENSED_DATAHUB_CONFIG_PATH = "~/.datahubenv"
DATAHUB_CONFIG_PATH = os.path.expanduser(CONDENSED_DATAHUB_CONFIG_PATH)

DATAHUB_ROOT_FOLDER = os.path.expanduser("~/.datahub")

ENV_SKIP_CONFIG = "DATAHUB_SKIP_CONFIG"
ENV_METADATA_HOST_URL = "DATAHUB_GMS_URL"
ENV_METADATA_HOST = "DATAHUB_GMS_HOST"
ENV_METADATA_PORT = "DATAHUB_GMS_PORT"
Expand All @@ -45,104 +37,13 @@
# For the methods that aren't duplicates, that logic should be moved to the client.


class GmsConfig(BaseModel):
    # Connection details for a DataHub GMS (metadata service) instance.
    # server: base URL of the GMS endpoint (e.g. "http://localhost:8080").
    # token: optional personal access token used to authenticate requests.
    server: str
    token: Optional[str] = None


class DatahubConfig(BaseModel):
    # Top-level shape of the ~/.datahubenv YAML file: a single `gms` section.
    gms: GmsConfig


def get_boolean_env_variable(key: str, default: bool = False) -> bool:
    """Read environment variable *key* as a boolean.

    "true"/"1" (case-insensitive) are truthy; any other set value is falsy;
    an unset variable yields *default*.
    """
    raw = os.environ.get(key)
    if raw is None:
        return default
    return raw.lower() in ("true", "1")


def set_env_variables_override_config(url: str, token: Optional[str]) -> None:
    """Should be used to override the config when using rest emitter"""
    # Record the URL (and token, when present) in the in-memory override map
    # so later lookups prefer these values over the on-disk config.
    config_override[ENV_METADATA_HOST_URL] = url
    if token is None:
        return
    config_override[ENV_METADATA_TOKEN] = token


def persist_datahub_config(config: dict) -> None:
    """Serialize *config* as YAML to the user's ~/.datahubenv file.

    Overwrites any existing contents; callers that want to merge with the
    previous config must do so before calling (see write_gms_config).
    """
    # "w" truncates and writes, which is all we do here; the previous "w+"
    # read capability was unused. The redundant trailing `return None` is gone.
    with open(DATAHUB_CONFIG_PATH, "w") as outfile:
        yaml.dump(config, outfile, default_flow_style=False)


def write_gms_config(
    host: str, token: Optional[str], merge_with_previous: bool = True
) -> None:
    """Write the GMS server/token to ~/.datahubenv.

    When *merge_with_previous* is set, keys from the existing config file are
    preserved (new values win on conflict); a missing or unreadable previous
    config is tolerated and treated as empty.
    """
    new_config = DatahubConfig(gms=GmsConfig(server=host, token=token)).dict()
    if not merge_with_previous:
        persist_datahub_config(new_config)
        return
    previous: dict = {}
    try:
        loaded = get_client_config(as_dict=True)
        assert isinstance(loaded, dict)
        previous = loaded
    except Exception as e:
        # ok to fail on this
        log.debug(
            f"Failed to retrieve config from file {DATAHUB_CONFIG_PATH}: {e}. This isn't fatal."
        )
    persist_datahub_config({**previous, **new_config})


def should_skip_config() -> bool:
    """True when DATAHUB_SKIP_CONFIG is set truthily, i.e. the user asked the
    CLI not to read or write ~/.datahubenv."""
    return get_boolean_env_variable(ENV_SKIP_CONFIG, default=False)


def ensure_datahub_config() -> None:
    """Create a default ~/.datahubenv (pointing at the localhost GMS) if the
    file does not exist yet."""
    if os.path.isfile(DATAHUB_CONFIG_PATH):
        return
    click.secho(
        f"No {CONDENSED_DATAHUB_CONFIG_PATH} file found, generating one for you...",
        bold=True,
    )
    write_gms_config(DEFAULT_GMS_HOST, None)


def get_client_config(as_dict: bool = False) -> Union[Optional[DatahubConfig], dict]:
    """Load ~/.datahubenv.

    Returns the raw YAML mapping when *as_dict* is set, otherwise a parsed
    DatahubConfig. Returns None on malformed YAML; exits the process (status 1)
    on a schema validation error.
    """
    with open(DATAHUB_CONFIG_PATH, "r") as stream:
        try:
            raw = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            click.secho(f"{DATAHUB_CONFIG_PATH} malformed, error: {exc}", bold=True)
            return None
    if as_dict:
        return raw
    try:
        return DatahubConfig.parse_obj(raw)
    except ValidationError as e:
        click.echo(
            f"Received error, please check your {CONDENSED_DATAHUB_CONFIG_PATH}"
        )
        click.echo(e, err=True)
        sys.exit(1)


def get_details_from_config() -> Tuple[Optional[str], Optional[str]]:
    """Return (gms_host, gms_token) from ~/.datahubenv, or (None, None).

    Fix: the previous implementation asserted the parsed config was a
    DatahubConfig *before* checking it for None, which made the
    `return None, None` branch unreachable — a malformed config raised
    AssertionError instead of falling back. Check the type explicitly instead.
    """
    datahub_config = get_client_config(as_dict=False)
    if isinstance(datahub_config, DatahubConfig):
        gms_config = datahub_config.gms
        return gms_config.server, gms_config.token
    # get_client_config returns None when the YAML file is malformed.
    return None, None


def get_details_from_env() -> Tuple[Optional[str], Optional[str]]:
host = os.environ.get(ENV_METADATA_HOST)
port = os.environ.get(ENV_METADATA_PORT)
Expand Down Expand Up @@ -178,12 +79,12 @@ def get_url_and_token():
if len(config_override.keys()) > 0:
gms_host = config_override.get(ENV_METADATA_HOST_URL)
gms_token = config_override.get(ENV_METADATA_TOKEN)
elif should_skip_config():
elif config_utils.should_skip_config():
gms_host = gms_host_env
gms_token = gms_token_env
else:
ensure_datahub_config()
gms_host_conf, gms_token_conf = get_details_from_config()
config_utils.ensure_datahub_config()
gms_host_conf, gms_token_conf = config_utils.get_details_from_config()
gms_host = first_non_null([gms_host_env, gms_host_conf])
gms_token = first_non_null([gms_token_env, gms_token_conf])
return gms_host, gms_token
Expand Down Expand Up @@ -253,14 +154,18 @@ def parse_run_restli_response(response: requests.Response) -> dict:
exit()

if not isinstance(response_json, dict):
click.echo(f"Received error, please check your {CONDENSED_DATAHUB_CONFIG_PATH}")
click.echo(
f"Received error, please check your {config_utils.CONDENSED_DATAHUB_CONFIG_PATH}"
)
click.echo()
click.echo(response_json)
exit()

summary = response_json.get("value")
if not isinstance(summary, dict):
click.echo(f"Received error, please check your {CONDENSED_DATAHUB_CONFIG_PATH}")
click.echo(
f"Received error, please check your {config_utils.CONDENSED_DATAHUB_CONFIG_PATH}"
)
click.echo()
click.echo(response_json)
exit()
Expand Down Expand Up @@ -686,3 +591,95 @@ def command(ctx: click.Context) -> None:
ctx.exit(1)

return command


def get_session_login_as(
    username: str, password: str, frontend_url: str
) -> requests.Session:
    """Return a requests.Session authenticated against the DataHub frontend.

    If a system-auth header is available (see get_system_auth) it is attached
    to the session directly; otherwise we log in via the frontend's /logIn
    endpoint so the session carries the auth cookie.

    Raises:
        requests.HTTPError: if the login request fails.
    """
    # Local import: keeps this fix self-contained (json is only needed here).
    import json

    session = requests.Session()
    headers = {
        "Content-Type": "application/json",
    }
    system_auth = get_system_auth()
    if system_auth is not None:
        session.headers.update({"Authorization": system_auth})
    else:
        # Fix: serialize with json.dumps instead of string concatenation so
        # that credentials containing quotes or backslashes are escaped
        # correctly rather than producing malformed (or injectable) JSON.
        data = json.dumps({"username": username, "password": password})
        response = session.post(f"{frontend_url}/logIn", headers=headers, data=data)
        response.raise_for_status()
    return session


def _ensure_valid_gms_url_acryl_cloud(url: str) -> str:
if "acryl.io" not in url:
return url
if url.startswith("http://"):
url = url.replace("http://", "https://")
if url.endswith("acryl.io"):
url = f"{url}/gms"
return url


def fixup_gms_url(url: str) -> str:
    """Canonicalize a user-supplied GMS URL: map None to "", strip trailing
    slashes, and apply Acryl-cloud-specific normalization."""
    if url is None:
        return ""
    trimmed = url.rstrip("/") if url.endswith("/") else url
    return _ensure_valid_gms_url_acryl_cloud(trimmed)


def guess_frontend_url_from_gms_url(gms_url: str) -> str:
    """Best-effort guess of the DataHub frontend URL from a GMS URL.

    Drops a trailing "/gms" path component and maps the default GMS port 8080
    to the default frontend port 9002.
    """
    url = fixup_gms_url(gms_url)
    if url.endswith("/gms"):
        # Fix: the previous `url.rstrip("/gms")` strips any trailing run of
        # the characters '/', 'g', 'm', 's' -- e.g. "http://mygms/gms" became
        # "http://my". Slice the literal suffix off instead.
        url = url[: -len("/gms")]
    if url.endswith("8080"):
        url = url[:-4] + "9002"
    return url


def generate_access_token(
    username: str,
    password: str,
    gms_url: str,
    token_name: Optional[str] = None,
    validity: str = "ONE_HOUR",
) -> Tuple[str, str]:
    """Create a PERSONAL access token for *username* via the frontend GraphQL API.

    Logs in with the given credentials, then issues a createAccessToken
    mutation. *token_name* defaults to "cli token <timestamp>"; *validity* is
    a token-duration enum value (e.g. "ONE_HOUR").

    Returns:
        (token_name, access_token) — access_token is None if the response
        lacks the expected fields.

    Raises:
        requests.HTTPError: if the login or GraphQL request fails.
    """
    frontend_url = guess_frontend_url_from_gms_url(gms_url)
    session = get_session_login_as(
        username=username,
        password=password,
        frontend_url=frontend_url,
    )
    now = datetime.now()
    timestamp = now.astimezone().isoformat()
    if token_name is None:
        token_name = f"cli token {timestamp}"
    # Renamed from `json` to `payload` to avoid shadowing the stdlib module name.
    payload = {
        "query": """mutation createAccessToken($input: CreateAccessTokenInput!) {
            createAccessToken(input: $input) {
              accessToken
              metadata {
                id
                actorUrn
                ownerUrn
                name
                description
              }
            }
          }""",
        "variables": {
            "input": {
                "type": "PERSONAL",
                "actorUrn": f"urn:li:corpuser:{username}",
                "duration": validity,
                "name": token_name,
            }
        },
    }
    response = session.post(f"{frontend_url}/api/v2/graphql", json=payload)
    response.raise_for_status()
    token = (
        response.json().get("data", {}).get("createAccessToken", {}).get(
            "accessToken", None
        )
    )
    return token_name, token
102 changes: 102 additions & 0 deletions metadata-ingestion/src/datahub/cli/config_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import logging
import os
import sys
from typing import Optional, Tuple, Union

import click
import yaml
from pydantic import BaseModel, ValidationError

from datahub.cli.env_utils import get_boolean_env_variable

__help__ = (
"For helper methods to contain manipulation of the config file in local system."
)
log = logging.getLogger(__name__)

DEFAULT_GMS_HOST = "http://localhost:8080"
CONDENSED_DATAHUB_CONFIG_PATH = "~/.datahubenv"
DATAHUB_CONFIG_PATH = os.path.expanduser(CONDENSED_DATAHUB_CONFIG_PATH)
DATAHUB_ROOT_FOLDER = os.path.expanduser("~/.datahub")
ENV_SKIP_CONFIG = "DATAHUB_SKIP_CONFIG"


class GmsConfig(BaseModel):
    # Connection details for a DataHub GMS (metadata service) instance.
    # server: base URL of the GMS endpoint (e.g. "http://localhost:8080").
    # token: optional personal access token used to authenticate requests.
    server: str
    token: Optional[str] = None


class DatahubConfig(BaseModel):
    # Top-level shape of the ~/.datahubenv YAML file: a single `gms` section.
    gms: GmsConfig


def persist_datahub_config(config: dict) -> None:
    """Serialize *config* as YAML to the user's ~/.datahubenv file.

    Overwrites any existing contents; callers that want to merge with the
    previous config must do so before calling (see write_gms_config).
    """
    # "w" truncates and writes, which is all we do here; the previous "w+"
    # read capability was unused. The redundant trailing `return None` is gone.
    with open(DATAHUB_CONFIG_PATH, "w") as outfile:
        yaml.dump(config, outfile, default_flow_style=False)


def write_gms_config(
    host: str, token: Optional[str], merge_with_previous: bool = True
) -> None:
    """Write the GMS server/token to ~/.datahubenv.

    When *merge_with_previous* is set, keys from the existing config file are
    preserved (new values win on conflict); a missing or unreadable previous
    config is tolerated and treated as empty.
    """
    new_config = DatahubConfig(gms=GmsConfig(server=host, token=token)).dict()
    if not merge_with_previous:
        persist_datahub_config(new_config)
        return
    previous: dict = {}
    try:
        loaded = get_client_config(as_dict=True)
        assert isinstance(loaded, dict)
        previous = loaded
    except Exception as e:
        # ok to fail on this
        log.debug(
            f"Failed to retrieve config from file {DATAHUB_CONFIG_PATH}: {e}. This isn't fatal."
        )
    persist_datahub_config({**previous, **new_config})


def get_details_from_config() -> Tuple[Optional[str], Optional[str]]:
    """Return (gms_host, gms_token) from ~/.datahubenv, or (None, None).

    Fix: the previous implementation asserted the parsed config was a
    DatahubConfig *before* checking it for None, which made the
    `return None, None` branch unreachable — a malformed config raised
    AssertionError instead of falling back. Check the type explicitly instead.
    """
    datahub_config = get_client_config(as_dict=False)
    if isinstance(datahub_config, DatahubConfig):
        gms_config = datahub_config.gms
        return gms_config.server, gms_config.token
    # get_client_config returns None when the YAML file is malformed.
    return None, None


def should_skip_config() -> bool:
    """True when DATAHUB_SKIP_CONFIG is set truthily, i.e. the user asked the
    CLI not to read or write ~/.datahubenv."""
    return get_boolean_env_variable(ENV_SKIP_CONFIG, default=False)


def ensure_datahub_config() -> None:
    """Create a default ~/.datahubenv (pointing at the localhost GMS) if the
    file does not exist yet."""
    if os.path.isfile(DATAHUB_CONFIG_PATH):
        return
    click.secho(
        f"No {CONDENSED_DATAHUB_CONFIG_PATH} file found, generating one for you...",
        bold=True,
    )
    write_gms_config(DEFAULT_GMS_HOST, None)


def get_client_config(as_dict: bool = False) -> Union[Optional[DatahubConfig], dict]:
    """Load ~/.datahubenv.

    Returns the raw YAML mapping when *as_dict* is set, otherwise a parsed
    DatahubConfig. Returns None on malformed YAML; exits the process (status 1)
    on a schema validation error.
    """
    with open(DATAHUB_CONFIG_PATH, "r") as stream:
        try:
            raw = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            click.secho(f"{DATAHUB_CONFIG_PATH} malformed, error: {exc}", bold=True)
            return None
    if as_dict:
        return raw
    try:
        return DatahubConfig.parse_obj(raw)
    except ValidationError as e:
        click.echo(
            f"Received error, please check your {CONDENSED_DATAHUB_CONFIG_PATH}"
        )
        click.echo(e, err=True)
        sys.exit(1)
Loading

0 comments on commit 886a513

Please sign in to comment.