Skip to content

Commit

Permalink
wip: integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
elisanp committed Oct 15, 2024
1 parent ea7069d commit 4c1511d
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 370 deletions.
56 changes: 30 additions & 26 deletions example/satosa/integration_test/commons.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
import base64
from bs4 import BeautifulSoup
import datetime
import requests
from typing import Any, Literal
from bs4 import BeautifulSoup
from sd_jwt.holder import SDJWTHolder

from pyeudiw.jwk import JWK
from pyeudiw.jwt import DEFAULT_SIG_KTY_MAP, JWEHelper
from pyeudiw.jwt.utils import decode_jwt_payload
from pyeudiw.presentation_exchange.schemas.oid4vc_presentation_definition import PresentationDefinition
from pyeudiw.sd_jwt import (
# _adapt_keys,
import_ec,
issue_sd_jwt,
load_specification_from_yaml_string
)
from pyeudiw.storage.base_storage import TrustType
from pyeudiw.storage.db_engine import DBEngine
from pyeudiw.tests.federation.base import (
EXP,
Expand All @@ -29,9 +27,11 @@
leaf_wallet_signed,
trust_chain_issuer
)
from sd_jwt.holder import SDJWTHolder
from saml2_sp import saml2_request

from saml2_sp import IDP_BASEURL
from settings import (
IDP_BASEURL,
CONFIG_DB,
RP_EID,
its_trust_chain
Expand All @@ -53,7 +53,6 @@
"default_exp": 1024,
"key_binding": True
}
ISSUER_PRIVATE_JWK = JWK(leaf_cred_jwk.serialize(private=True))
WALLET_PRIVATE_JWK = JWK(leaf_wallet_jwk.serialize(private=True))
WALLET_PUBLIC_JWK = JWK(leaf_wallet_jwk.serialize())

Expand All @@ -64,7 +63,7 @@ def setup_test_db_engine() -> DBEngine:

def apply_trust_settings(db_engine_inst: DBEngine) -> DBEngine:
db_engine_inst.add_trust_anchor(
entity_id=ta_ec['iss'],
entity_id=ta_ec["iss"],
entity_configuration=ta_ec_signed,
exp=EXP
)
Expand All @@ -76,24 +75,32 @@ def apply_trust_settings(db_engine_inst: DBEngine) -> DBEngine:
)

db_engine_inst.add_or_update_trust_attestation(
entity_id=leaf_wallet['iss'],
entity_id=leaf_wallet["iss"],
attestation=leaf_wallet_signed,
exp=datetime.datetime.now().isoformat()
)

db_engine_inst.add_or_update_trust_attestation(
entity_id=leaf_cred['iss'],
entity_id=leaf_cred["iss"],
attestation=leaf_cred_signed,
exp=datetime.datetime.now().isoformat()
)

settings = ISSUER_CONF
db_engine_inst.add_or_update_trust_attestation(
entity_id=settings["issuer"],
trust_type=TrustType.DIRECT_TRUST_SD_JWT_VC,
jwks=leaf_cred_jwk_prot.serialize())
return db_engine_inst

def create_saml_auth_request() -> str:
auth_req_url = f"{saml2_request["headers"][0][1]}&idp_hinting=wallet"
return auth_req_url

def create_issuer_test_data() -> dict[Literal["jws"] | Literal["issuance"], str]:
# create a SD-JWT signed by a trusted credential issuer
settings = ISSUER_CONF
settings['issuer'] = leaf_cred['iss']
settings['default_exp'] = 33
settings["default_exp"] = 33
sd_specification = load_specification_from_yaml_string(
settings["sd_specification"]
)
Expand All @@ -103,13 +110,12 @@ def create_issuer_test_data() -> dict[Literal["jws"] | Literal["issuance"], str]
settings,
CREDENTIAL_ISSUER_JWK,
WALLET_PUBLIC_JWK,
trust_chain=trust_chain_issuer,
additional_headers={"typ": "vc+sd-jwt"}
)
return issued_jwt


def create_holder_test_data(issued_jwt: dict[Literal["jws"] | Literal["issuance"], str], request_nonce: str, verifier_id: str) -> str:
def create_holder_test_data(issued_jwt: dict[Literal["jws"] | Literal["issuance"], str], request_nonce: str, request_aud: str) -> str:
settings = ISSUER_CONF

sdjwt_at_holder = SDJWTHolder(
Expand All @@ -118,12 +124,12 @@ def create_holder_test_data(issued_jwt: dict[Literal["jws"] | Literal["issuance"
)
sdjwt_at_holder.create_presentation(
claims_to_disclose={
'tax_id_code': "TINIT-XXXXXXXXXXXXXXXX",
'given_name': 'Mario',
'family_name': 'Rossi'
"tax_id_code": True,
"given_name": True,
"family_name": True
},
nonce=request_nonce,
aud=verifier_id,
aud=request_aud,
sign_alg=DEFAULT_SIG_KTY_MAP[WALLET_PRIVATE_JWK.key.kty],
holder_key=(
import_ec(
Expand All @@ -138,19 +144,17 @@ def create_holder_test_data(issued_jwt: dict[Literal["jws"] | Literal["issuance"
vp_token = sdjwt_at_holder.sd_jwt_presentation
return vp_token


def create_authorize_response(vp_token: str, state: str, nonce: str, response_uri: str) -> str:
# take relevant information from RP's entity configuration
def create_authorize_response(vp_token: str, state: str, response_uri: str) -> str:
# Extract public key from RP's entity configuration
client = requests.Session()
rp_ec_jwt = client.get(
f'{IDP_BASEURL}/OpenID4VP/.well-known/openid-federation',
f"{IDP_BASEURL}/OpenID4VP/.well-known/openid-federation",
verify=False
).content.decode()
rp_ec = decode_jwt_payload(rp_ec_jwt)

presentation_definition = rp_ec["metadata"]["wallet_relying_party"]["presentation_definition"]
PresentationDefinition(**presentation_definition)
assert response_uri == rp_ec["metadata"]['wallet_relying_party']["response_uris_supported"][0]
assert response_uri == rp_ec["metadata"]["wallet_relying_party"]["response_uris_supported"][0]
encryption_key = rp_ec["metadata"]["wallet_relying_party"]["jwks"]["keys"][1]

response = {
"state": state,
Expand All @@ -169,8 +173,8 @@ def create_authorize_response(vp_token: str, state: str, nonce: str, response_ur
}
}
encrypted_response = JWEHelper(
# RSA (EC is not fully supported todate)
JWK(rp_ec["metadata"]['wallet_relying_party']['jwks']['keys'][1])
# RSA (EC is not fully supported to date)
JWK(encryption_key)
).encrypt(response)
return encrypted_response

Expand Down
59 changes: 28 additions & 31 deletions example/satosa/integration_test/cross_device_integration_test.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,27 @@
import urllib.parse
import requests
import urllib
from bs4 import BeautifulSoup
import re
import requests
import urllib.parse

from pyeudiw.jwt.utils import decode_jwt_payload

from commons import (
ISSUER_CONF,
setup_test_db_engine,
apply_trust_settings,
create_saml_auth_request,
create_authorize_response,
create_holder_test_data,
create_issuer_test_data,
extract_saml_attributes,
setup_test_db_engine,
extract_saml_attributes
)
from saml2_sp import saml2_request
from settings import TIMEOUT_S

# put a trust attestation related itself into the storage
# this is then used as trust_chain header parameter in the signed request object
db_engine_inst = setup_test_db_engine()
db_engine_inst = apply_trust_settings(db_engine_inst)

headers_browser = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36"
}


def _verify_status(status_uri: str, expected_code: int):
status_check = http_user_agent.get(
status_uri,
Expand All @@ -38,16 +34,16 @@ def _verify_status(status_uri: str, expected_code: int):
def _extract_request_uri(bs: BeautifulSoup) -> str:
# Request URI is extracted by parsing the QR code in the response page
qrcode_element = list(bs.find(id="content-qrcode-payload").children)[1]
qrcode_text = qrcode_element.get('contents')
request_uri = urllib.parse.parse_qs(qrcode_text)['request_uri'][0]
qrcode_text = qrcode_element.get("contents")
request_uri = urllib.parse.parse_qs(qrcode_text)["request_uri"][0]
return request_uri


def _extract_status_uri(bs: BeautifulSoup) -> str:
# Status uri is extracted by parsing a matching regexp in the <script> portion of the HTML.
# This funciton is somewhat unstable as it supposes that "qr_code.html" has certain properties
# which might not be true.
qrcode_script_element: str = bs.find_all('script')[-1].string
# which might not be true.
qrcode_script_element: str = bs.find_all("script")[-1].string
qrcode_script_element_formatted = [item.strip() for item in qrcode_script_element.splitlines()]
qrcode_script_element_formatted = str.join("", qrcode_script_element_formatted)

Expand All @@ -61,16 +57,18 @@ def _extract_status_uri(bs: BeautifulSoup) -> str:
http_user_agent = requests.Session()
wallet_user_agent = requests.Session()

initiating_saml_request_uri = f"{saml2_request['headers'][0][1]}&idp_hinting=wallet"
auth_req_url = create_saml_auth_request()
headers_browser = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36"
}
request_uri = ""
authn_response = http_user_agent.get(
url=initiating_saml_request_uri,
url=auth_req_url,
verify=False,
headers=headers_browser,
timeout=TIMEOUT_S
)


# Extract request URI and status endpoint by parsing the response page
qrcode_page = BeautifulSoup(authn_response.content.decode(), features="html.parser")
request_uri = _extract_request_uri(qrcode_page)
Expand All @@ -85,7 +83,7 @@ def _extract_status_uri(bs: BeautifulSoup) -> str:
timeout=TIMEOUT_S)

request_object_claims = decode_jwt_payload(sign_request_obj.text)
response_uri = request_object_claims['response_uri']
response_uri = request_object_claims["response_uri"]

# Wallet obtained the Request Object; verify that status is 202
_verify_status(status_uri, expected_code=202)
Expand All @@ -94,25 +92,24 @@ def _extract_status_uri(bs: BeautifulSoup) -> str:
verifiable_credential = create_issuer_test_data()
verifiable_presentations = create_holder_test_data(
verifiable_credential,
request_object_claims['nonce'],
request_object_claims['client_id']
request_object_claims["nonce"],
request_object_claims["client_id"]
)
wallet_response_data = create_authorize_response(
verifiable_presentations,
request_object_claims["state"],
request_object_claims["nonce"],
response_uri
)

authz_response_feedback = wallet_user_agent.post(
authz_response = wallet_user_agent.post(
response_uri,
verify=False,
data={'response': wallet_response_data},
data={"response": wallet_response_data},
timeout=TIMEOUT_S
)

assert authz_response_feedback.status_code == 200
assert authz_response_feedback.json().get("redirect_uri", None) is None
assert authz_response.status_code == 200
assert authz_response.json().get("redirect_uri", None) is None

status_check = http_user_agent.get(
status_uri,
Expand All @@ -122,15 +119,15 @@ def _extract_status_uri(bs: BeautifulSoup) -> str:
assert status_check.status_code == 200
assert status_check.json().get("redirect_uri", None) is not None

callback_uri = status_check.json().get("redirect_uri", None)
# TODO: this test does not check that the login page is properly updated with a login button linkint to the redirect uri
callback_uri = status_check.json().get("redirect_uri", None)
satosa_authn_response = http_user_agent.get(
callback_uri,
verify=False,
timeout=TIMEOUT_S
)

assert 'SAMLResponse' in satosa_authn_response.content.decode()
assert "SAMLResponse" in satosa_authn_response.content.decode()
print(satosa_authn_response.content.decode())

attributes = extract_saml_attributes(satosa_authn_response.content.decode())
Expand All @@ -139,9 +136,9 @@ def _extract_status_uri(bs: BeautifulSoup) -> str:

expected = {
# https://oidref.com/2.5.4.42
"urn:oid:2.5.4.42": ISSUER_CONF['sd_specification'].split('!sd given_name:')[1].split('"')[1].lower(),
"urn:oid:2.5.4.42": ISSUER_CONF["sd_specification"].split("!sd given_name:")[1].split('"')[1].lower(),
# https://oidref.com/2.5.4.4
"urn:oid:2.5.4.4": ISSUER_CONF['sd_specification'].split('!sd family_name:')[1].split('"')[1].lower()
"urn:oid:2.5.4.4": ISSUER_CONF["sd_specification"].split("!sd family_name:")[1].split('"')[1].lower()
}

for exp_att_name, exp_att_value in expected.items():
Expand All @@ -155,4 +152,4 @@ def _extract_status_uri(bs: BeautifulSoup) -> str:
assert exp_att_value == obt_att_value, f"wrong attrirbute parsing expected {exp_att_value}, obtained {obt_att_value}"


print('TEST PASSED')
print("TEST PASSED")
Loading

0 comments on commit 4c1511d

Please sign in to comment.