Skip to content

Commit

Permalink
chore: linting
Browse files Browse the repository at this point in the history
  • Loading branch information
peppelinux committed Jul 23, 2023
1 parent 2ccfb87 commit c75e12a
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 73 deletions.
121 changes: 61 additions & 60 deletions pyeudiw/satosa/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import satosa.logging_util as lu
from satosa.backends.base import BackendModule
from satosa.exception import (
SATOSABadRequestError,
SATOSABadRequestError,
SATOSANoBoundEndpointError
)
from satosa.internal import InternalData
Expand Down Expand Up @@ -74,7 +74,7 @@ def __init__(self, auth_callback_func, internal_attributes, config, base_url, na

logger.debug(
lu.LOG_FMT.format(
id="OpenID4VP init",
id="OpenID4VP init",
message=f"Loaded configuration:\n{json.dumps(config)}"
)
)
Expand All @@ -97,7 +97,7 @@ def register_endpoints(self):

logger.debug(
lu.LOG_FMT.format(
id="OpenID4VP endpoint registration",
id="OpenID4VP endpoint registration",
message=f"[OpenID4VP] Loaded endpoint: '{k}'"
)
)
Expand Down Expand Up @@ -184,124 +184,125 @@ def _translate_response(self, response, issuer):
:return: A SATOSA internal response.
"""
timestamp = response.get(
"auth_time",
"auth_time",
response.get('iat', int(datetime.utcnow().timestamp()))
)

# it may depends by credential type and attested security context evaluated
# if WIA was previously submitted by the Wallet

# auth_class_ref = response.get("acr", response.get("amr", UNSPECIFIED))
# auth_info = AuthenticationInformation(auth_class_ref, timestamp, issuer)
#internal_resp = InternalData(auth_info=auth_info)
# internal_resp = InternalData(auth_info=auth_info)
internal_resp = InternalData()
internal_resp.attributes = self.converter.to_internal("openid4vp", response)
internal_resp.subject_id = "take the subject id from the digital credential" # response["sub"]
internal_resp.attributes = self.converter.to_internal(
"openid4vp", response)
# response["sub"]
internal_resp.subject_id = "take the subject id from the digital credential"
return internal_resp

def redirect_endpoint(self, context, *args):
jwk = self.metadata_jwk
self.metadata_jwk

if context.request_method.lower() != 'post':
raise SATOSABadRequestError("HTTP Method not supported")
if context.request_uri not in self.config["metadata"]['redirect_uris']:

if context.request_uri not in self.config["metadata"]['redirect_uris']:
raise SATOSANoBoundEndpointError("request_uri not valid")

# take the encrypted jwt, decrypt with my public key (one of the metadata) -> if not -> exception

# get state and nonce, do lookup on the db -> if not -> exception

# check with pydantic on the JWT schema

# check if vp_token is string or array, if array iter all the elements

# take the single vp token, take the credential within it, use cnf.jwk to validate the vp token signature -> if not exception

# establish the trust with the issuer of the credential by checking it to the revocation

# check the revocation of the credential

# for all the valid credentials, take the payload and the disclosure and discose the user attributes

# returns the user attributes .. something like the ...

all_user_claims = dict()

logger.debug(
lu.LOG_FMT.format(
id=lu.get_session_id(context.state),
id=lu.get_session_id(context.state),
message=f"Wallet disclosure: {all_user_claims}"
)
)

_info = {"issuer": {}}
internal_resp = self._translate_response(
all_user_claims, _info["issuer"]
)
return self.auth_callback_func(context, internal_resp)


def _request_endpoint_dpop(self, context, *args):
""" This validates, if any, the DPoP http request header """

if context.http_headers and 'HTTP_AUTHORIZATION' in context.http_headers:
# the wallet instance MAY use the endpoint authentication to give its WIA

# TODO - validate the trust to the Wallet Provider
# using the TA public key validate trust_chain and or x5c

# take WIA
wia = unpad_jwt_payload(context.http_headers['HTTP_AUTHORIZATION'])
dpop = DPoPVerifier(
public_jwk = wia['cnf']['jwk'],
http_header_authz = context.http_headers['HTTP_AUTHORIZATION'],
http_header_dpop = context.http_headers['HTTP_DPOP']
public_jwk=wia['cnf']['jwk'],
http_header_authz=context.http_headers['HTTP_AUTHORIZATION'],
http_header_dpop=context.http_headers['HTTP_DPOP']
)

if not dpop.is_valid:
#
#
logger.error("MESSAGE HERE")
raise Exception(
"return an HTTP response application/json with "
"the error and error_description "
"according to the UX design"
)

# TODO
# assert and configure the wallet capabilities?
# assert and configure the wallet Attested Security Context?

else:
# TODO - check that this logging system works ...
logger.warning(
"The Wallet Instance didn't provide its Wallet Instance Attestation "
"a default set of capabilities and a low security level are accorded."

)

def request_endpoint(self, context, *args):
jwk = self.metadata_jwk

# check DPOP for WIA if any
self._request_endpoint_dpop(context)

# TODO
# take decision, do customization if the WIA is available

helper = JWSHelper(jwk)
data = {
"scope": ' '.join(self.config['authorization']['scopes']),
"client_id_scheme": "entity_id", # that's federation.
"client_id": self.client_id,
"response_mode": "direct_post.jwt", # only HTTP POST is allowed.
"response_type": "vp_token",
"response_uri": self.config["metadata"]["redirect_uris"][0],
"nonce": str(uuid.uuid4()),
"state": str(uuid.uuid4()),
"iss": self.client_id,
"iat": iat_now(),
"exp": iat_now() + (self.default_exp * 60) # in seconds
"scope": ' '.join(self.config['authorization']['scopes']),
"client_id_scheme": "entity_id", # that's federation.
"client_id": self.client_id,
"response_mode": "direct_post.jwt", # only HTTP POST is allowed.
"response_type": "vp_token",
"response_uri": self.config["metadata"]["redirect_uris"][0],
"nonce": str(uuid.uuid4()),
"state": str(uuid.uuid4()),
"iss": self.client_id,
"iat": iat_now(),
"exp": iat_now() + (self.default_exp * 60) # in seconds
}
jwt = helper.sign(data)

Expand All @@ -315,32 +316,32 @@ def request_endpoint(self, context, *args):

def handle_error(
self,
context :dict,
context: dict,
message: str,
troubleshoot: str = "",
err="",
err_code="500",
template_path="templates",
error_template="error.html",
):
# TODO: evaluate with UX designers if Jinja2 template

# TODO: evaluate with UX designers if Jinja2 template
# loader and rendering is required, it seems not.
logger.error(
lu.LOG_FMT.format(
id=lu.get_session_id(context.state),
id=lu.get_session_id(context.state),
message=f"{message}: {err}. {troubleshoot}"
)
)

result = json.dumps(
{
"message": message,
"message": message,
"troubleshoot": troubleshoot
}
)
return Response(
result,
content = "text/json; charset=utf8",
status = err_code
result,
content="text/json; charset=utf8",
status=err_code
)
5 changes: 2 additions & 3 deletions pyeudiw/tests/oauth2/test_dpop.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ def wia_jws(jwshelper):

def test_create_validate_dpop_http_headers(wia_jws, private_jwk=PRIVATE_JWK):
# create
headers = unpad_jwt_header(wia_jws)
unpad_jwt_header(wia_jws)
payload = unpad_jwt_payload(wia_jws)
# TODO assertions for upadded headers and payload
# TODO assertions for upadded headers and payload

new_dpop = DPoPIssuer(
htu='https://example.org/redirect',
Expand All @@ -87,4 +87,3 @@ def test_create_validate_dpop_http_headers(wia_jws, private_jwk=PRIVATE_JWK):

assert dpop.is_valid
# TODO assertions

19 changes: 9 additions & 10 deletions pyeudiw/tests/satosa/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from unittest.mock import Mock



BASE_URL = "https://example.com"
AUTHZ_PAGE = "example.com"
AUTH_ENDPOINT = "https://example.com/auth"
Expand Down Expand Up @@ -50,7 +49,7 @@
"default_exp": 6
},
"authorization": {
"url_scheme": "eudiw", # eudiw://
"url_scheme": "eudiw", # eudiw://
"scopes": ["pid-sd-jwt:unique_id+given_name+family_name"],
},
"federation": {
Expand Down Expand Up @@ -362,25 +361,25 @@ def test_redirect_endpoint(self, context):
# TODO any additional checks after the backend returned the user attributes to satosa core

def test_request_endpoint(self, context):

jwshelper = JWSHelper(PRIVATE_JWK)
wia = jwshelper.sign(
WALLET_INSTANCE_ATTESTATION,
protected={'trust_chain': [], 'x5c': []}
)

dpop_wia = wia
dpop_proof = DPoPIssuer(
htu=CONFIG['metadata']['request_uris'][0],
token = dpop_wia,
private_jwk = PRIVATE_JWK
token=dpop_wia,
private_jwk=PRIVATE_JWK
).proof

context.http_headers = dict(
HTTP_AUTHORIZATION = f"DPoP {dpop_wia}",
HTTP_DPOP = dpop_proof
HTTP_AUTHORIZATION=f"DPoP {dpop_wia}",
HTTP_DPOP=dpop_proof
)

request_endpoint = self.backend.request_endpoint(context)

assert request_endpoint
Expand All @@ -389,7 +388,7 @@ def test_request_endpoint(self, context):

msg = json.loads(request_endpoint.message)
assert msg["response"]

# TODO assertion su JWS decodificato
# ...

Expand Down

0 comments on commit c75e12a

Please sign in to comment.