Skip to content

Commit

Permalink
API test for manage secrets privilege
Browse files Browse the repository at this point in the history
  • Loading branch information
kkorchak committed Oct 27, 2023
1 parent 1ac831f commit 2ae9f7c
Show file tree
Hide file tree
Showing 2 changed files with 352 additions and 0 deletions.
181 changes: 181 additions & 0 deletions smoke-test/tests/privileges/test_manage_secrets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import pytest
import tenacity

from tests.utils import (get_frontend_session, wait_for_writes_to_sync, wait_for_healthcheck_util,
get_frontend_url, get_admin_credentials,get_sleep_info)
from tests.privileges.utils import (base_privileges_set_status, sensitive_info_prifileges_set_status,
view_entity_prifileges_set_status, create_user, remove_user,login_as)

sleep_sec, sleep_times = get_sleep_info()

@pytest.fixture(scope="session")
def wait_for_healthchecks():
wait_for_healthcheck_util()
yield


@pytest.mark.dependency()
def test_healthchecks(wait_for_healthchecks):
# Call to wait_for_healthchecks fixture will do the actual functionality.
pass


@pytest.fixture(scope="session")
def admin_session(wait_for_healthchecks):
yield get_frontend_session()


@pytest.mark.dependency(depends=["test_healthchecks"])
@pytest.fixture(scope="module", autouse=True)
def privileges_and_test_user_setup(admin_session):
"""Fixture to execute setup before and tear down after all tests are run"""
# Disable 'All users' privileges
base_privileges_set_status("INACTIVE", admin_session)
sensitive_info_prifileges_set_status("INACTIVE", admin_session)
view_entity_prifileges_set_status("INACTIVE", admin_session)
# Sleep for eventual consistency
wait_for_writes_to_sync()

# Create a new user
admin_session = create_user(admin_session, "user", "user")

yield

# Remove test user
remove_user(admin_session, "urn:li:corpuser:user")

# Restore All users vrivileges
base_privileges_set_status("ACTIVE", admin_session)
sensitive_info_prifileges_set_status("ACTIVE", admin_session)
view_entity_prifileges_set_status("ACTIVE", admin_session)

# Sleep for eventual consistency
wait_for_writes_to_sync()


@tenacity.retry(
stop=tenacity.stop_after_attempt(10), wait=tenacity.wait_fixed(sleep_sec)
)
def _ensure_can_create_secret(session, json):
create_secret_success = session.post(
f"{get_frontend_url()}/api/v2/graphql", json=json)
create_secret_success.raise_for_status()
secret_data = create_secret_success.json()

assert secret_data
assert secret_data["data"]
assert secret_data["data"]["createSecret"]
assert secret_data["data"]["createSecret"] == "urn:li:dataHubSecret:TestSecretName"


@tenacity.retry(
stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
)
def _ensure_cant_create_secret(session, json):
create_secret_response = session.post(
f"{get_frontend_url()}/api/v2/graphql", json=json)
create_secret_response.raise_for_status()
create_secret_data = create_secret_response.json()

assert create_secret_data["errors"][0]["extensions"]["code"] == 403
assert create_secret_data["errors"][0]["extensions"]["type"] == "UNAUTHORIZED"
assert create_secret_data["data"]["createSecret"] == None


@pytest.mark.dependency(depends=["test_healthchecks"])
def test_add_and_verify_privileges_to_manage_secrets():

(admin_user, admin_pass) = get_admin_credentials()
admin_session = login_as(admin_user, admin_pass)
user_session = login_as("user", "user")

# Verify new user can't create secrets
create_secret = {
"query": """mutation createSecret($input: CreateSecretInput!) {\n
createSecret(input: $input)\n}""",
"variables": {
"input":{
"name":"TestSecretName",
"value":"Test Secret Value",
"description":"Test Secret Description"
}
},
}
_ensure_cant_create_secret(user_session, create_secret)


# Assign privileges to the new user to manage secretes
manage_secrets = {
"query": """mutation createPolicy($input: PolicyUpdateInput!) {\n
createPolicy(input: $input) }""",
"variables": {
"input": {
"type": "PLATFORM",
"name": "Manage Secrets",
"description": "Manage Secrets Policy",
"state": "ACTIVE",
"resources": {"filter":{"criteria":[]}},
"privileges": ["MANAGE_SECRETS"],
"actors": {
"users": ["urn:li:corpuser:user"],
"resourceOwners": False,
"allUsers": False,
"allGroups": False,
},
}
},
}

response = admin_session.post(f"{get_frontend_url()}/api/v2/graphql", json=manage_secrets)
response.raise_for_status()
res_data = response.json()

assert res_data
assert res_data["data"]
assert res_data["data"]["createPolicy"]
policy_urn = res_data["data"]["createPolicy"]


# Verify new user can create and manage secrets
# Create a secret
_ensure_can_create_secret(user_session, create_secret)


# Remove a secret
remove_secret = {
"query": """mutation deleteSecret($urn: String!) {\n
deleteSecret(urn: $urn)\n}""",
"variables": {
"urn": "urn:li:dataHubSecret:TestSecretName"
},
}

remove_secret_response = user_session.post(f"{get_frontend_url()}/api/v2/graphql", json=remove_secret)
remove_secret_response.raise_for_status()
secret_data = remove_secret_response.json()

assert secret_data
assert secret_data["data"]
assert secret_data["data"]["deleteSecret"]
assert secret_data["data"]["deleteSecret"] == "urn:li:dataHubSecret:TestSecretName"


# Remove the policy
remove_policy = {
"query": """mutation deletePolicy($urn: String!) {\n
deletePolicy(urn: $urn) }""",
"variables": {"urn": policy_urn},
}

response = admin_session.post(f"{get_frontend_url()}/api/v2/graphql", json=remove_policy)
response.raise_for_status()
res_data = response.json()

assert res_data
assert res_data["data"]
assert res_data["data"]["deletePolicy"]
assert res_data["data"]["deletePolicy"] == policy_urn


# Ensure user can't create secret after policy is removed
_ensure_cant_create_secret(user_session, create_secret)
171 changes: 171 additions & 0 deletions smoke-test/tests/privileges/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
import requests_wrapper as requests
from tests.consistency_utils import wait_for_writes_to_sync
from tests.utils import (get_frontend_url, wait_for_writes_to_sync, get_admin_credentials)


def base_privileges_set_status(status, session):
base_platform_privileges = {
"query": """mutation updatePolicy($urn: String!, $input: PolicyUpdateInput!) {\n
updatePolicy(urn: $urn, input: $input) }""",
"variables": {
"urn": "urn:li:dataHubPolicy:7",
"input": {
"type": "PLATFORM",
"state": status,
"name": "All Users - Base Platform Privileges",
"description": "Grants base platform privileges to ALL users of DataHub. Change this policy to alter that behavior.",
"privileges": ["MANAGE_INGESTION",
"MANAGE_SECRETS",
"MANAGE_USERS_AND_GROUPS",
"VIEW_ANALYTICS",
"GENERATE_PERSONAL_ACCESS_TOKENS",
"MANAGE_DOMAINS",
"MANAGE_GLOBAL_ANNOUNCEMENTS",
"MANAGE_TESTS",
"MANAGE_GLOSSARIES",
"MANAGE_TAGS",
"MANAGE_GLOBAL_VIEWS",
"MANAGE_GLOBAL_OWNERSHIP_TYPES"],
"actors": {
"users": [],
"groups": None,
"resourceOwners": False,
"allUsers": True,
"allGroups": False,
"resourceOwnersTypes": None,
},
},
},
}
base_privileges_response = session.post(
f"{get_frontend_url()}/api/v2/graphql", json=base_platform_privileges)
base_privileges_response.raise_for_status()
base_res_data = base_privileges_response.json()
assert base_res_data["data"]["updatePolicy"] == "urn:li:dataHubPolicy:7"

def sensitive_info_prifileges_set_status(status, session):
dataset_sensitive_information = {
"query": """mutation updatePolicy($urn: String!, $input: PolicyUpdateInput!) {\n
updatePolicy(urn: $urn, input: $input) }""",
"variables": {
"urn": "urn:li:dataHubPolicy:view-dataset-sensitive",
"input": {
"type": "METADATA",
"state": status,
"name": "All Users - View Dataset Sensitive Information",
"description": "Grants viewing privileges of usage and profile information of all datasets for all users",
"privileges": ["VIEW_DATASET_USAGE","VIEW_DATASET_PROFILE"],
"actors": {
"users": [],
"groups": None,
"resourceOwners": False,
"allUsers": True,
"allGroups": False,
"resourceOwnersTypes": None,
},
},
},
}
sensitive_info_response = session.post(
f"{get_frontend_url()}/api/v2/graphql", json=dataset_sensitive_information)
sensitive_info_response.raise_for_status()
sens_info_data = sensitive_info_response.json()
assert sens_info_data["data"]["updatePolicy"] == "urn:li:dataHubPolicy:view-dataset-sensitive"

def view_entity_prifileges_set_status(status, session):
view_entity_page = {
"query": """mutation updatePolicy($urn: String!, $input: PolicyUpdateInput!) {\n
updatePolicy(urn: $urn, input: $input) }""",
"variables": {
"urn": "urn:li:dataHubPolicy:view-entity-page-all",
"input": {
"type": "METADATA",
"state": status,
"name": "All Users - View Entity Page",
"description": "Grants entity view to all users",
"privileges": ["VIEW_ENTITY_PAGE",
"SEARCH_PRIVILEGE",
"GET_COUNTS_PRIVILEGE",
"GET_TIMESERIES_ASPECT_PRIVILEGE",
"GET_ENTITY_PRIVILEGE",
"GET_TIMELINE_PRIVILEGE"],
"actors": {
"users": [],
"groups": None,
"resourceOwners": False,
"allUsers": True,
"allGroups": False,
"resourceOwnersTypes": None,
},
},
},
}
view_entity_response = session.post(
f"{get_frontend_url()}/api/v2/graphql", json=view_entity_page)
view_entity_response.raise_for_status()
view_entity_data = view_entity_response.json()
assert view_entity_data["data"]["updatePolicy"] == "urn:li:dataHubPolicy:view-entity-page-all"

def create_user(session, email, password):
# Remove user if exists
res_data = remove_user(session, f"urn:li:corpuser:{email}")
assert res_data
assert "error" not in res_data
# Get the invite token
get_invite_token_json = {
"query": """query getInviteToken($input: GetInviteTokenInput!) {\n
getInviteToken(input: $input){\n
inviteToken\n
}\n
}""",
"variables": {"input": {}},
}
get_invite_token_response = session.post(
f"{get_frontend_url()}/api/v2/graphql", json=get_invite_token_json
)
get_invite_token_response.raise_for_status()
get_invite_token_res_data = get_invite_token_response.json()
invite_token = get_invite_token_res_data["data"]["getInviteToken"]["inviteToken"]
assert invite_token is not None
assert "error" not in invite_token
# Create a new user using the invite token
sign_up_json = {
"fullName": "Test User",
"email": email,
"password": password,
"title": "Data Engineer",
"inviteToken": invite_token,
}
sign_up_response = session.post(
f"{get_frontend_url()}/signUp", json=sign_up_json
)
sign_up_response.raise_for_status()
assert sign_up_response
assert "error" not in sign_up_response
wait_for_writes_to_sync()
session.cookies.clear()
(admin_user, admin_pass) = get_admin_credentials()
admin_session = login_as(admin_user, admin_pass)
return admin_session


def login_as(username, password):
session = requests.Session()
headers = {
"Content-Type": "application/json",
}
data = '{"username":"' + username + '", "password":"' + password + '"}'
response = session.post(f"{get_frontend_url()}/logIn", headers=headers, data=data)
response.raise_for_status()
return session

def remove_user(session, urn):
json = {
"query": """mutation removeUser($urn: String!) {\n
removeUser(urn: $urn)
}""",
"variables": {"urn": urn},
}
response = session.post(f"{get_frontend_url()}/api/v2/graphql", json=json)
response.raise_for_status()
return response.json()

0 comments on commit 2ae9f7c

Please sign in to comment.