From ec03b83ad5a2ec1a8384ae6e1d38bb693bf2ada4 Mon Sep 17 00:00:00 2001 From: "Jens W. Klein" Date: Thu, 16 Jan 2025 17:00:21 +0100 Subject: [PATCH] example callback and related changes --- .dockerignore | 1 + .gitignore | 1 + .pre-commit-config.yaml | 2 +- .../Dockerfile | 30 ++ .../README.md | 84 +++++ .../edutap_wallet_google_example_callback.py | 35 ++ .../pyproject.toml | 10 + .../requirements.txt | 2 + .../swarm.yml | 47 +++ pyproject.toml | 7 +- .../_vendor/google_pay_token_decryption.py | 14 +- src/edutap/wallet_google/handlers/fastapi.py | 4 +- src/edutap/wallet_google/handlers/validate.py | 153 +++++++- src/edutap/wallet_google/models/handlers.py | 18 +- src/edutap/wallet_google/settings.py | 32 +- tests/test_handler_fastapi.py | 30 +- tests/test_handler_validate.py | 54 ++- tests/test_settings.py | 14 - ...est_vendor_google_pay_token_encryptions.py | 345 ------------------ 19 files changed, 444 insertions(+), 439 deletions(-) create mode 100644 .dockerignore create mode 100644 examples/edutap_wallet_google_example_callback/Dockerfile create mode 100644 examples/edutap_wallet_google_example_callback/README.md create mode 100644 examples/edutap_wallet_google_example_callback/edutap_wallet_google_example_callback.py create mode 100644 examples/edutap_wallet_google_example_callback/pyproject.toml create mode 100644 examples/edutap_wallet_google_example_callback/requirements.txt create mode 100644 examples/edutap_wallet_google_example_callback/swarm.yml delete mode 100644 tests/test_vendor_google_pay_token_encryptions.py diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..8d98f9d --- /dev/null +++ b/.dockerignore @@ -0,0 +1 @@ +.* diff --git a/.gitignore b/.gitignore index 2bdf9ad..96f7eab 100644 --- a/.gitignore +++ b/.gitignore @@ -19,6 +19,7 @@ __pycache__/ node_modules/ sandbox/ htmlcov/ +.ipynb_checkpoints # venv related bin/ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 0aa3109..8e832bf 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -36,12 +36,12 @@ repos: - id: mypy args: ['--explicit-package-bases'] additional_dependencies: - - "types-requests" - "pytest-stub" - "google-auth-stubs" - "pydantic" - "pydantic_settings" - "fastapi" + - "httpx" - repo: https://github.com/codespell-project/codespell rev: v2.3.0 hooks: diff --git a/examples/edutap_wallet_google_example_callback/Dockerfile b/examples/edutap_wallet_google_example_callback/Dockerfile new file mode 100644 index 0000000..e73ee95 --- /dev/null +++ b/examples/edutap_wallet_google_example_callback/Dockerfile @@ -0,0 +1,30 @@ +# Use a Python image with uv pre-installed +FROM ghcr.io/astral-sh/uv:python3.13-bookworm-slim + +# Enable bytecode compilation +ENV UV_COMPILE_BYTECODE=1 + +# Copy from the cache instead of linking since it's a mounted volume +ENV UV_LINK_MODE=copy + +# copy project into container +COPY . /code + +# install Python packages +RUN \ + uv venv &&\ + uv pip install --no-cache-dir --upgrade -e /code &&\ + uv pip install --no-cache-dir --upgrade -e /code/examples/edutap_wallet_google_example_callback + +# Place executables in the environment at the front of the path +ENV PATH="/.venv/bin:$PATH" + +# Create a directory for logs +RUN mkdir /logs +ENV EDUTAP_WALLET_GOOGLE_EXAMPLE_CALLBACK_LOG_FILE=/logs/callback.log + +# set working directory +WORKDIR /code/examples/edutap_wallet_google_example_callback + +# run fastapi +CMD ["fastapi", "run", "edutap_wallet_google_example_callback.py", "--host", "0.0.0.0", "--port", "8080", "--proxy-headers"] diff --git a/examples/edutap_wallet_google_example_callback/README.md b/examples/edutap_wallet_google_example_callback/README.md new file mode 100644 index 0000000..f356a2b --- /dev/null +++ b/examples/edutap_wallet_google_example_callback/README.md @@ -0,0 +1,84 @@ +# Example callback service for Google + +This service logs received data to a file and to stdout. + +It logs space separated: +- `class_id` +- `object_id` +- `event_type` +- `exp_time_millis` +- `count` +- `nonce` + +# Configuration environment + +Environment variables are used for configuration. + +- `EDUTAP_WALLET_GOOGLE_EXAMPLE_CALLBACK_LOG_FILE` + + The name and location of the log file. + Default is relative to current working directory: `./callback_log.txt`. + +From `edutap.wallet_google`: + +- `EDUTAP_WALLET_GOOGLE_HANDLER_PREFIX_CALLBACK` + + The path prefix of the callback in the browser. + Default: Empty string (no prefix) + +- `EDUTAP_WALLET_GOOGLE_HANDLER_CALLBACK_VERIFY_SIGNATURE` + + Whether to verify the signature (`1`) in the callback or not (`0`). + Default: `1` + + +## Local usage (dev) + +Installation (execute in this folder) + +```shell +uv venv +uv pip install -r requirements.txt +source .venv/bin/activate +``` + +Run with +```shell +fastapi dev edutap_wallet_google_example_callback.py +``` + +## As Docker container + +### Build image and check container + +In the root of the repository (in `../..` relative to the location of this `README.md`), run: + +```shell +docker buildx build --progress=plain --no-cache -f examples/edutap_wallet_google_example_callback/Dockerfile -t edutap_wallet_google_example_callback . +``` + +Then run the container interactive to verify its working: +```shell +docker run -it edutap_wallet_google_example_callback +``` +Watch out for errors. Stop with Ctrl-c. + + +### Run on a server + +To get actual callbacks from Google the application has to be accessible from the internet and it need to serve on `https` with a valid TLS certificate. +No self-signed certificates are allowed! + +A kind of simple way to get an environment up and running is with Docker Swarm, Traefik Web-Proxy with Lets-Encrypt and our container running in there. +Since this is out of scope of this README we point the dear reader to the tutorial website [Docker Swarm Rocks](https://dockerswarm.rocks/traefik/). +The following examples are meant to run in such a cluster, or to be adapted to different environment. +We hope you get the idea. + +There is an example swarm deployment in here in `swarm.yml`. +It can be deployed on the cluster. +The public domain is configured using the environment variable `EDUTAP_WALLET_GOOGLE_EXAMPLE_DOMAIN`. +A TLS certificate will be issued automatically using Lets Encrypt. + +```shell +docker stack deploy swarm.yml -c swarm.yml edutap_wallet_google_example_callback +``` diff --git a/examples/edutap_wallet_google_example_callback/edutap_wallet_google_example_callback.py b/examples/edutap_wallet_google_example_callback/edutap_wallet_google_example_callback.py new file mode 100644 index 0000000..028d976 --- /dev/null +++ b/examples/edutap_wallet_google_example_callback/edutap_wallet_google_example_callback.py @@ -0,0 +1,35 @@ +from edutap.wallet_google.handlers.fastapi import router_callback +from fastapi import FastAPI +from fastapi.logger import logger + +import os +import pathlib + + +app = FastAPI() +app.include_router(router_callback) + + +class LoggingCallbackHandler: + """ + Implementation of edutap.wallet_google.protocols.CallbackHandler + """ + + async def handle( + self, + class_id: str, + object_id: str, + event_type: str, + exp_time_millis: int, + count: int, + nonce: str, + ) -> None: + pathlib.Path( + os.environ.get( + "EDUTAP_WALLET_GOOGLE_EXAMPLE_CALLBACK_LOG_FILE", "./callback_log.txt" + ) + ) + line = f'"{class_id}", "{object_id}", "{event_type}", "{exp_time_millis}", "{count}", "{nonce}"\n' + logger.info(line) + with open("callback_log.txt", "a") as file: + file.write(line) diff --git a/examples/edutap_wallet_google_example_callback/pyproject.toml b/examples/edutap_wallet_google_example_callback/pyproject.toml new file mode 100644 index 0000000..3ecdae2 --- /dev/null +++ b/examples/edutap_wallet_google_example_callback/pyproject.toml @@ -0,0 +1,10 @@ +[project] +name = "edutap_wallet_google_example_callback" +version = "1.0" +dependencies = [ + "edutap.wallet_google[fastapi]", + "fastapi[standard]", +] + +[project.entry-points.'edutap.wallet_google.plugins'] +CallbackHandler = 'edutap_wallet_google_example_callback:LoggingCallbackHandler' diff --git a/examples/edutap_wallet_google_example_callback/requirements.txt b/examples/edutap_wallet_google_example_callback/requirements.txt new file mode 100644 index 0000000..b17667d --- /dev/null +++ b/examples/edutap_wallet_google_example_callback/requirements.txt @@ -0,0 +1,2 @@ +-e ../.. +-e . diff --git a/examples/edutap_wallet_google_example_callback/swarm.yml b/examples/edutap_wallet_google_example_callback/swarm.yml new file mode 100644 index 0000000..efb2434 --- /dev/null +++ b/examples/edutap_wallet_google_example_callback/swarm.yml @@ -0,0 +1,47 @@ +version: '3.7' + +volumes: + logs: + driver_opts: + type: none + device: /data/wallet_google_example_callback + o: bind + +networks: + traefik-public: + external: true + driver: overlay + +services: + callback: + image: 'edutap_wallet_google_example_callback:latest' + volumes: + - logs:/logs + networks: + - traefik-public + environment: + EDUTAP_WALLET_GOOGLE_HANDLER_CALLBACK_VERIFY_SIGNATURE: "0" + deploy: + replicas: 1 + resources: + limits: + cpus: '1' + memory: 128M + labels: + - traefik.enable=true + - traefik.docker.network=traefik-public + - traefik.constraint-label=traefik-public + # SERVICE + - traefik.http.services.wallet_google_example_callback.loadbalancer.server.port=8080 + # DOMAIN TLS + - traefik.http.routers.wallet_google_example_callback-domain.rule=Host(`${EDUTAP_WALLET_GOOGLE_EXAMPLE_DOMAIN?Unset}`) + - traefik.http.routers.wallet_google_example_callback-domain.entrypoints=https + - traefik.http.routers.wallet_google_example_callback-domain.tls=true + - traefik.http.routers.wallet_google_example_callback-domain.tls.certresolver=le + - traefik.http.routers.wallet_google_example_callback-domain.service=wallet_google_example_callback + - traefik.http.routers.wallet_google_example_callback-domain.middlewares=gzip + # DOMAIN insecure + - traefik.http.routers.wallet_google_example_callback-domain-ins.rule=Host(`${EDUTAP_WALLET_GOOGLE_EXAMPLE_DOMAIN?Unset}`) + - traefik.http.routers.wallet_google_example_callback-domain-ins.entrypoints=http + - traefik.http.routers.wallet_google_example_callback-domain-ins.service=wallet_google_example_callback + - traefik.http.routers.wallet_google_example_callback-domain-ins.middlewares=gzip diff --git a/pyproject.toml b/pyproject.toml index 3e805ac..9b47b55 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,8 +42,9 @@ Issues = "https://github.com/edutap-eu/edutap.wallet_google/issues" Documentation = "https://docs.edutap.eu/packages/edutap_wallet_google/index.html" [project.optional-dependencies] -fastapi = [ +callback = [ "fastapi", + "httpx", ] test = [ "pytest", @@ -51,15 +52,13 @@ test = [ "pytest-explicit", "requests-mock", "tox", - "fastapi", - "httpx", + "edutap.wallet_google[callback]", ] typecheck = [ "google-auth-stubs", "mypy", "pytest-stub", "types-cryptography", - "types-requests", ] develop = [ "pdbpp", diff --git a/src/edutap/wallet_google/_vendor/google_pay_token_decryption.py b/src/edutap/wallet_google/_vendor/google_pay_token_decryption.py index 9a49b82..ddcc673 100644 --- a/src/edutap/wallet_google/_vendor/google_pay_token_decryption.py +++ b/src/edutap/wallet_google/_vendor/google_pay_token_decryption.py @@ -28,6 +28,7 @@ ECv2_PROTOCOL_VERSION = "ECv2" +ECv2_PROTOCOL_VERSION_SIGNING = "ECv2SigningOnly" class GooglePayError(Exception): @@ -208,9 +209,12 @@ def verify_signature(self, data: dict) -> None: :raises: An exception if the signatures could not be verified. """ - if data["protocolVersion"] != ECv2_PROTOCOL_VERSION: + if data["protocolVersion"] not in [ + ECv2_PROTOCOL_VERSION, + ECv2_PROTOCOL_VERSION_SIGNING, + ]: raise GooglePayError( - f"Only {ECv2_PROTOCOL_VERSION}-signed tokens are supported, but token is {data['protocolVersion']}-signed." + f"Only {ECv2_PROTOCOL_VERSION} or {ECv2_PROTOCOL_VERSION_SIGNING}-signed tokens are supported, but token is {data['protocolVersion']}-signed." ) self._verify_intermediate_signing_key(data) @@ -284,7 +288,7 @@ def _verify_message_signature(self, signed_key: dict, data: dict) -> None: except Exception: raise GooglePayError("Could not verify message signature") - def _filter_root_signing_keys(self) -> None: + def _filter_root_signing_keys(self, protocol=ECv2_PROTOCOL_VERSION) -> None: """ Filter the root signing keys to get only the keys that use ECv2 protocol and that either doesn't expire or has an expiry date in the future. @@ -292,7 +296,7 @@ def _filter_root_signing_keys(self) -> None: self.root_signing_keys = [ key for key in self.root_signing_keys - if key["protocolVersion"] == ECv2_PROTOCOL_VERSION + if key["protocolVersion"] == protocol and ( "keyExpiration" not in key or check_expiration_date_is_valid(key["keyExpiration"]) @@ -300,5 +304,5 @@ def _filter_root_signing_keys(self) -> None: ] if len(self.root_signing_keys) == 0: raise GooglePayError( - f"At least one root signing key must be {ECv2_PROTOCOL_VERSION}-signed and have a valid expiration date." + f"At least one root signing key must be {protocol}-signed and have a valid expiration date." ) diff --git a/src/edutap/wallet_google/handlers/fastapi.py b/src/edutap/wallet_google/handlers/fastapi.py index cccc08a..b91d854 100644 --- a/src/edutap/wallet_google/handlers/fastapi.py +++ b/src/edutap/wallet_google/handlers/fastapi.py @@ -17,11 +17,11 @@ # define routers for all use cases: callback, images, and the combined router (at bottom of file) router_callback = APIRouter( prefix=session_manager.settings.handler_prefix_callback, - tags=["edutap", "google_wallet"], + tags=["edutap.wallet_google"], ) router_images = APIRouter( prefix=session_manager.settings.handler_prefix_images, - tags=["edutap", "google_wallet"], + tags=["edutap.wallet_google"], ) diff --git a/src/edutap/wallet_google/handlers/validate.py b/src/edutap/wallet_google/handlers/validate.py index 26c512f..e84e35c 100644 --- a/src/edutap/wallet_google/handlers/validate.py +++ b/src/edutap/wallet_google/handlers/validate.py @@ -1,7 +1,47 @@ -from .._vendor.google_pay_token_decryption import GooglePayTokenDecryptor from ..models.handlers import CallbackData +from ..models.handlers import IntermediateSigningKey +from ..models.handlers import RootSigningPublicKeys +from ..models.handlers import SignedKey from ..models.handlers import SignedMessage from ..session import session_manager +from cryptography.exceptions import InvalidSignature +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric.ec import ECDSA +from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey +from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey +from cryptography.hazmat.primitives.serialization import load_der_private_key +from cryptography.hazmat.primitives.serialization import load_der_public_key +from typing import cast + +import base64 +import httpx +import time + + +PROTOCOL_VERSION = "ECv2SigningOnly" +ALGORITHM = ECDSA(hashes.SHA256()) +GOOGLE_ROOT_SIGNING_PUBLIC_KEYS_URL = { + # see https://developers.google.com/pay/api/android/guides/resources/payment-data-cryptography#root-signing-keys + "testing": "https://payments.developers.google.com/paymentmethodtoken/test/keys.json", + "production": "https://payments.developers.google.com/paymentmethodtoken/keys.json", +} +GOOGLE_ROOT_SIGNING_PUBLIC_KEYS_VALUE: dict[str, RootSigningPublicKeys] = {} + + +def google_root_signing_public_keys(google_environment: str) -> RootSigningPublicKeys: + """ + Fetch Googles root signing keys once for the configured environment and return them or the cached value. + """ + if GOOGLE_ROOT_SIGNING_PUBLIC_KEYS_VALUE.get(google_environment, None) is not None: + return GOOGLE_ROOT_SIGNING_PUBLIC_KEYS_VALUE[google_environment] + # fetch once + resp = httpx.get(GOOGLE_ROOT_SIGNING_PUBLIC_KEYS_URL[google_environment]) + resp.raise_for_status() + GOOGLE_ROOT_SIGNING_PUBLIC_KEYS_VALUE[google_environment] = ( + RootSigningPublicKeys.model_validate_json(resp.text) + ) + return GOOGLE_ROOT_SIGNING_PUBLIC_KEYS_VALUE[google_environment] def _raw_private_key(in_key: str) -> str: @@ -18,17 +58,114 @@ def _raw_private_key(in_key: str) -> str: return result +def _construct_signed_data(*args: str) -> bytes: + """ + Construct the signed message from the list of its components by concatenating the + byte length of each component in 4 bytes little-endian format plus the UTF-8 encoded + component. + + See https://developers.google.com/pay/api/android/guides/resources/payment-data-cryptography#verify-signature + or https://developers.google.com/pay/api/android/guides/resources/payment-data-cryptography#how-to-construct-the-byte-string-for-intermediate-signing-key-signature + for an example. + """ + signed = b"" + for a in args: + signed += len(a).to_bytes(4, byteorder="little") + signed += bytes(a, "utf-8") + return signed + + +def _load_public_key(key: str) -> EllipticCurvePublicKey: + derdata = base64.b64decode(key) + return cast( + EllipticCurvePublicKey, + load_der_public_key(derdata, default_backend()), + ) + + +def _load_private_key(key: str) -> EllipticCurvePrivateKey: + derdata = base64.b64decode(key) + return cast( + EllipticCurvePrivateKey, + load_der_private_key(derdata, None, default_backend()), + ) + + +def _verify_intermediate_signing_key( + public_keys: RootSigningPublicKeys, + intermediate_signing_key: IntermediateSigningKey, +) -> bool: + """Check the intermediate signing keys signature against the Google root public keys. + + see https://developers.google.com/pay/api/android/guides/resources/payment-data-cryptography#verify-signature + """ + signatures = [ + base64.decodebytes(bytes(sig, "utf-8")) + for sig in intermediate_signing_key.signatures + ] + signed_data = _construct_signed_data( + "Google", + PROTOCOL_VERSION, + intermediate_signing_key.signedKey, + ) + for pkey in public_keys.keys: + + if pkey.protocolVersion != PROTOCOL_VERSION: + continue + + public_key = _load_public_key(pkey.keyValue) + for signature in signatures: + try: + public_key.verify(signature, signed_data, ALGORITHM) + except (ValueError, InvalidSignature): + # Invalid signature. Try the other signatures. + ... + else: + # Valid signature was found + return True + return False + + def verified_signed_message(data: CallbackData) -> SignedMessage: """ Verifies the signature of the callback data. and returns the parsed SignedMessage """ + settings = session_manager.settings - if settings.handler_callback_verify_signature: - decryptor = GooglePayTokenDecryptor( - settings.google_root_signing_public_keys.model_dump()["keys"], - settings.issuer_id, - _raw_private_key(settings.credentials_info["private_key"]), - ) - decryptor.verify_signature(data.model_dump(mode="json")) + if settings.handler_callback_verify_signature == "0": + # shortcut if signature validation is disabled + return SignedMessage.model_validate_json(data.signedMessage) + + if data.protocolVersion != PROTOCOL_VERSION: + raise ValueError("Invalid protocolVersion") + + # check intermediate signing keys signature + if not _verify_intermediate_signing_key( + google_root_signing_public_keys(settings.google_environment), + data.intermediateSigningKey, + ): + raise ValueError("Invalid intermediate signing key") + + # check intermediate signing keys expriration date + intermediate_signing_key = SignedKey.model_validate_json( + data.intermediateSigningKey.signedKey + ) + if int(time.time() * 1000) > int(intermediate_signing_key.keyExpiration): + raise ValueError("Expired intermediate signing key") + + # check signed message's signature + intermediate_public_key = _load_public_key(intermediate_signing_key.keyValue) + signature = base64.decodebytes(bytes(data.signature, "utf-8")) + signed_data = _construct_signed_data( + "GooglePayWallet", + settings.issuer_id, + PROTOCOL_VERSION, + data.signedMessage, + ) + try: + intermediate_public_key.verify(signature, signed_data, ALGORITHM) + except (ValueError, InvalidSignature): + raise ValueError("Invalid signature") + return SignedMessage.model_validate_json(data.signedMessage) diff --git a/src/edutap/wallet_google/models/handlers.py b/src/edutap/wallet_google/models/handlers.py index 30dcced..ae8b9f4 100644 --- a/src/edutap/wallet_google/models/handlers.py +++ b/src/edutap/wallet_google/models/handlers.py @@ -2,14 +2,14 @@ see https://developers.google.com/wallet/generic/use-cases/use-callbacks-for-saves-and-deletions """ -from .bases import Model from .datatypes.enums import CamelCaseAliasEnum +from pydantic import BaseModel # image fetching from data provider -class ImageData(Model): +class ImageData(BaseModel): mimetype: str data: bytes @@ -22,17 +22,17 @@ class EventType(CamelCaseAliasEnum): DEL = "DEL" -class SignedKey(Model): +class SignedKey(BaseModel): keyValue: str keyExpiration: int -class IntermediateSigningKey(Model): - signedKey: SignedKey | str +class IntermediateSigningKey(BaseModel): + signedKey: str signatures: list[str] -class SignedMessage(Model): +class SignedMessage(BaseModel): classId: str objectId: str eventType: EventType @@ -41,7 +41,7 @@ class SignedMessage(Model): nonce: str -class CallbackData(Model): +class CallbackData(BaseModel): signature: str intermediateSigningKey: IntermediateSigningKey protocolVersion: str @@ -51,7 +51,7 @@ class CallbackData(Model): # keys for callback data validation -class RootSigningPublicKey(Model): +class RootSigningPublicKey(BaseModel): """ see https://developers.google.com/pay/api/android/guides/resources/payment-data-cryptography#root-signing-keys """ @@ -61,7 +61,7 @@ class RootSigningPublicKey(Model): keyExpiration: str | None = None -class RootSigningPublicKeys(Model): +class RootSigningPublicKeys(BaseModel): """ see https://developers.google.com/pay/api/android/guides/resources/payment-data-cryptography#root-signing-keys """ diff --git a/src/edutap/wallet_google/settings.py b/src/edutap/wallet_google/settings.py index 0d679ea..ee384b7 100644 --- a/src/edutap/wallet_google/settings.py +++ b/src/edutap/wallet_google/settings.py @@ -1,4 +1,3 @@ -from .models.handlers import RootSigningPublicKeys from pathlib import Path from pydantic import EmailStr from pydantic import Field @@ -8,7 +7,6 @@ from typing import Literal import json -import requests ENV_PREFIX = "EDUTAP_WALLET_GOOGLE_" @@ -16,12 +14,6 @@ API_URL = "https://walletobjects.googleapis.com/walletobjects/v1" SAVE_URL = "https://pay.google.com/gp/v/save" SCOPES = ["https://www.googleapis.com/auth/wallet_object.issuer"] -GOOGLE_ROOT_SIGNING_PUBLIC_KEYS_URL = { - # see https://developers.google.com/pay/api/android/guides/resources/payment-data-cryptography#root-signing-keys - "testing": "https://payments.developers.google.com/paymentmethodtoken/test/keys.json", - "production": "https://payments.developers.google.com/paymentmethodtoken/keys.json", -} -GOOGLE_ROOT_SIGNING_PUBLIC_KEYS_VALUE: dict[str, RootSigningPublicKeys] = {} class Settings(BaseSettings): @@ -45,10 +37,10 @@ class Settings(BaseSettings): api_url: HttpUrl = HttpUrl(API_URL) save_url: HttpUrl = HttpUrl(SAVE_URL) - handler_prefix: str = "/googlewallet" + handler_prefix: str = "/wallet/google" handler_prefix_callback: str = "" handler_prefix_images: str = "" - handler_callback_verify_signature: bool = True + handler_callback_verify_signature: str = "1" handler_image_cache_control: str = "no-cache" handlers_callback_timeout: float = 5.0 handlers_image_timeout: float = 5.0 @@ -64,26 +56,6 @@ class Settings(BaseSettings): cached_credentials_info: dict[str, str] = {} - @property - def google_root_signing_public_keys(self) -> RootSigningPublicKeys: - """ - Fetch Googles root signing keys once for the configured environment and return them or the cached value. - """ - if ( - GOOGLE_ROOT_SIGNING_PUBLIC_KEYS_VALUE.get(self.google_environment, None) - is not None - ): - return GOOGLE_ROOT_SIGNING_PUBLIC_KEYS_VALUE[self.google_environment] - # fetch once - resp = requests.get( - GOOGLE_ROOT_SIGNING_PUBLIC_KEYS_URL[self.google_environment] - ) - resp.raise_for_status() - GOOGLE_ROOT_SIGNING_PUBLIC_KEYS_VALUE[self.google_environment] = ( - RootSigningPublicKeys.model_validate_json(resp.text) - ) - return GOOGLE_ROOT_SIGNING_PUBLIC_KEYS_VALUE[self.google_environment] - @property def credentials_info(self) -> dict[str, str]: if credentials_info := self.cached_credentials_info: diff --git a/tests/test_handler_fastapi.py b/tests/test_handler_fastapi.py index 3e4704a..1454860 100644 --- a/tests/test_handler_fastapi.py +++ b/tests/test_handler_fastapi.py @@ -20,7 +20,7 @@ def test_callback_disabled_signature_check_OK(mock_settings): from edutap.wallet_google.models.handlers import CallbackData # test callback handler without signature check - mock_settings.handler_callback_verify_signature = False + mock_settings.handler_callback_verify_signature = "0" callback_data = CallbackData(**real_callback_data) @@ -30,7 +30,7 @@ def test_callback_disabled_signature_check_OK(mock_settings): app.include_router(router) client = TestClient(app) resp = client.post( - "/googlewallet/callback", json=callback_data.model_dump(mode="json") + "/wallet/google/callback", json=callback_data.model_dump(mode="json") ) assert resp.status_code == 200 assert resp.json() == {"status": "success"} @@ -40,7 +40,7 @@ def test_callback_disabled_signature_check_ERROR(mock_settings): from edutap.wallet_google.models.handlers import CallbackData # test callback handler without signature check - mock_settings.handler_callback_verify_signature = False + mock_settings.handler_callback_verify_signature = "0" callback_data = CallbackData(**real_callback_data) callback_data.signedMessage = '{"classId":"","objectId":"","eventType":"save","expTimeMillis":1734366082269,"count":1,"nonce":""}' @@ -51,7 +51,7 @@ def test_callback_disabled_signature_check_ERROR(mock_settings): app.include_router(router) client = TestClient(app) resp = client.post( - "/googlewallet/callback", json=callback_data.model_dump(mode="json") + "/wallet/google/callback", json=callback_data.model_dump(mode="json") ) assert resp.status_code == 500 assert resp.text == '{"detail":"Error while handling the callbacks (exception)."}' @@ -61,7 +61,7 @@ def test_callback_disabled_signature_check_NOTIMPLEMENTED(monkeypatch, mock_sett from edutap.wallet_google.models.handlers import CallbackData # test callback handler without signature check - mock_settings.handler_callback_verify_signature = False + mock_settings.handler_callback_verify_signature = "0" def raise_not_implemented(): raise NotImplementedError @@ -81,7 +81,7 @@ def raise_not_implemented(): app.include_router(router) client = TestClient(app) resp = client.post( - "/googlewallet/callback", json=callback_data.model_dump(mode="json") + "/wallet/google/callback", json=callback_data.model_dump(mode="json") ) assert resp.status_code == 500 assert resp.text == '{"detail":"No callback handlers were registered."}' @@ -91,7 +91,7 @@ def test_callback_disabled_signature_check_TIMEOUT(mock_settings): from edutap.wallet_google.models.handlers import CallbackData # test callback handler without signature check - mock_settings.handler_callback_verify_signature = False + mock_settings.handler_callback_verify_signature = "0" # set low timeout to trigger a timeout cancellation mock_settings.handlers_callback_timeout = 0.1 @@ -104,7 +104,7 @@ def test_callback_disabled_signature_check_TIMEOUT(mock_settings): app.include_router(router) client = TestClient(app) resp = client.post( - "/googlewallet/callback", json=callback_data.model_dump(mode="json") + "/wallet/google/callback", json=callback_data.model_dump(mode="json") ) assert resp.status_code == 500 assert resp.text == '{"detail":"Error while handling the callbacks (timeout)."}' @@ -117,7 +117,7 @@ def test_image_OK(mock_fernet_encryption_key): app = FastAPI() app.include_router(router) client = TestClient(app) - resp = client.get(f"/googlewallet/images/{encrypt_data('OK')}") + resp = client.get(f"/wallet/google/images/{encrypt_data('OK')}") assert resp.status_code == 200 assert resp.text == "mock-a-jepg" @@ -129,7 +129,7 @@ def test_image_ERROR(mock_fernet_encryption_key): app = FastAPI() app.include_router(router) client = TestClient(app) - resp = client.get(f"/googlewallet/images/{encrypt_data('ERROR')}") + resp = client.get(f"/wallet/google/images/{encrypt_data('ERROR')}") assert resp.status_code == 404 assert resp.text == '{"detail":"Image not found."}' @@ -143,7 +143,7 @@ def test_image_TIMEOUT(mock_settings, mock_fernet_encryption_key): app = FastAPI() app.include_router(router) client = TestClient(app) - resp = client.get(f"/googlewallet/images/{encrypt_data('TIMEOUT')}") + resp = client.get(f"/wallet/google/images/{encrypt_data('TIMEOUT')}") assert resp.status_code == 500 assert resp.text == '{"detail":"Error while handling the image (timeout)."}' @@ -155,7 +155,7 @@ def test_image_CANCEL(mock_fernet_encryption_key): app = FastAPI() app.include_router(router) client = TestClient(app) - resp = client.get(f"/googlewallet/images/{encrypt_data('CANCEL')}") + resp = client.get(f"/wallet/google/images/{encrypt_data('CANCEL')}") assert resp.status_code == 500 assert resp.text == '{"detail":"Error while handling the image (cancel)."}' @@ -167,7 +167,7 @@ def test_image_UNEXPECTED(mock_fernet_encryption_key): app = FastAPI() app.include_router(router) client = TestClient(app) - resp = client.get(f"/googlewallet/images/{encrypt_data('UNEXPECTED')}") + resp = client.get(f"/wallet/google/images/{encrypt_data('UNEXPECTED')}") assert resp.status_code == 500 assert resp.text == '{"detail":"Error while handling the image (exception)."}' @@ -188,7 +188,7 @@ def raise_not_implemented(): app = FastAPI() app.include_router(router) client = TestClient(app) - resp = client.get(f"/googlewallet/images/{encrypt_data('ANYWAY')}") + resp = client.get(f"/wallet/google/images/{encrypt_data('ANYWAY')}") assert resp.status_code == 500 assert resp.text == '{"detail":"No image providers were registered."}' @@ -205,6 +205,6 @@ def test_image_TO_MANY_REGISTERED(monkeypatch, mock_fernet_encryption_key): app = FastAPI() app.include_router(router) client = TestClient(app) - resp = client.get(f"/googlewallet/images/{encrypt_data('ANYWAY')}") + resp = client.get(f"/wallet/google/images/{encrypt_data('ANYWAY')}") assert resp.status_code == 500 assert resp.text == '{"detail":"Multiple image providers found, abort."}' diff --git a/tests/test_handler_validate.py b/tests/test_handler_validate.py index a3e9bcb..6ece99c 100644 --- a/tests/test_handler_validate.py +++ b/tests/test_handler_validate.py @@ -1,15 +1,34 @@ -from edutap.wallet_google._vendor.google_pay_token_decryption import GooglePayError from edutap.wallet_google.models.handlers import CallbackData import json import pytest -callbackdata_for_test_failure = { +def test_google_public_key_cached_empty(mock_settings): + from edutap.wallet_google.handlers.validate import google_root_signing_public_keys + + assert google_root_signing_public_keys(mock_settings.google_environment) is not None + + from edutap.wallet_google.handlers.validate import ( + GOOGLE_ROOT_SIGNING_PUBLIC_KEYS_VALUE, + ) + + assert ( + GOOGLE_ROOT_SIGNING_PUBLIC_KEYS_VALUE.get( + mock_settings.google_environment, None + ) + is not None + ) + assert google_root_signing_public_keys(mock_settings.google_environment) is not None + + +callback_data_for_test_failure = { "signature": "foo", "intermediateSigningKey": { "signedKey": {"keyValue": "baz", "keyExpiration": 0}, - "signatures": ["bar"], + "signatures": [ + "MEUCIQD3IATpRM45gpno9Remtx/FiDCOJUp45+C+Qzw6IrgphwIgJijXISc+Ft8Sj9eXNowzuYyXyWlgKAE+tVnN24Sek5M=" + ], }, "protocolVersion": "ECv2SigningOnly", "signedMessage": json.dumps( @@ -25,11 +44,34 @@ } +@pytest.mark.skip(reason="Not implemented") def test_handler_validate_invalid(mock_settings): from edutap.wallet_google.handlers.validate import verified_signed_message - mock_settings.handler_callback_verify_signature = True + mock_settings.handler_callback_verify_signature = "1" - data = CallbackData.model_validate(callbackdata_for_test_failure) - with pytest.raises(GooglePayError): + data = CallbackData.model_validate(callback_data_for_test_failure) + with pytest.raises(Exception): verified_signed_message(data) + + +callback_data = { + "signature": "MEYCIQCyuBQo/Dao7yUBDUWK12ATFBDkUfJUnropjOaPbPiKEwIhAKXNiVrbNmydpEVIxXRz5z36f8HV2Meq/Td6tqt2+DYO", + "intermediateSigningKey": { + "signedKey": '{"keyValue":"MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE3JpSX3AU53vH+IpWBdsbqrL7Ey67QSERsDUztFt8q7t7PzVkh14SeYeokI1zSZiVAWnx4tXD1tCPbrvfFGB8OA\u003d\u003d","keyExpiration":"1735023986000"}', + "signatures": [ + "MEUCIQD3IBTpRM45gpno9Remtx/FiDCOJUp45+C+Qzw6IrgphwIgJijXISc+Ft8Sj9eXNowzuYyXyWlgKAE+tVnN24Sek5M=" + ], + }, + "protocolVersion": "ECv2SigningOnly", + "signedMessage": '{"classId":"3388000000022141777.pass.gift.dev.edutap.eu","objectId":"3388000000022141777.9fd4e525-777c-4e0d-878a-b7993e211997","eventType":"save","expTimeMillis":1734366082269,"count":1,"nonce":"c1359b53-f2bb-4e8f-b392-9a560a21a9a0"}', +} + + +def test_handler_validate_ok(mock_settings): + from edutap.wallet_google.handlers.validate import verified_signed_message + + mock_settings.handler_callback_verify_signature = "0" + + data = CallbackData.model_validate(callback_data) + verified_signed_message(data) diff --git a/tests/test_settings.py b/tests/test_settings.py index 1e358be..51da492 100644 --- a/tests/test_settings.py +++ b/tests/test_settings.py @@ -38,20 +38,6 @@ def test_settings_cached(mock_settings): assert mock_settings.credentials_info == "test" -def test_settings_cached_empty(mock_settings): - assert mock_settings.google_root_signing_public_keys is not None - - from edutap.wallet_google.settings import GOOGLE_ROOT_SIGNING_PUBLIC_KEYS_VALUE - - assert ( - GOOGLE_ROOT_SIGNING_PUBLIC_KEYS_VALUE.get( - mock_settings.google_environment, None - ) - is not None - ) - assert mock_settings.google_root_signing_public_keys is not None - - def test_settings_no_credentials_file(mock_settings): mock_settings.credentials_file = pathlib.Path("nonexistent.json") with pytest.raises(FileNotFoundError): diff --git a/tests/test_vendor_google_pay_token_encryptions.py b/tests/test_vendor_google_pay_token_encryptions.py deleted file mode 100644 index 2df411e..0000000 --- a/tests/test_vendor_google_pay_token_encryptions.py +++ /dev/null @@ -1,345 +0,0 @@ -""" -This module was vendored from https://github.com/yoyowallet/google-pay-token-decryption -Copyright is by its original authors at Yoyo Wallet -It is under the MIT License, as found here https://github.com/yoyowallet/google-pay-token-decryption/blob/5cd006da9687171c1e35b55507b671c6e4eb513d/pyproject.toml#L8 -""" - -from contextlib import contextmanager -from edutap.wallet_google._vendor.google_pay_token_decryption import ( - check_expiration_date_is_valid, -) -from edutap.wallet_google._vendor.google_pay_token_decryption import ( - ECv2_PROTOCOL_VERSION, -) -from edutap.wallet_google._vendor.google_pay_token_decryption import GooglePayError -from edutap.wallet_google._vendor.google_pay_token_decryption import ( - GooglePayTokenDecryptor, -) - -import datetime -import pytest - - -def datetime_to_milliseconds(input_date: datetime.datetime): - return str(int(input_date.timestamp() * 1000)) - - -valid_signature = "MEQCIFBle+JsfsovRBeoFEYKWFAeBYFAhq0S+GtusiosjV4lAiAGcK9qfVpnqG6Hw8cbGBQ79beiAs6IIkBxBfeKDBR+kA==" -invalid_signature = "invalid_signature" - -valid_root_signing_key = { - "keyValue": "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE/1+3HBVSbdv+j7NaArdgMyoSAM43yRydzqdg1TxodSzA96Dj4Mc1EiKroxxunavVIvdxGnJeFViTzFvzFRxyCw==", - "keyExpiration": "32506264800000", - "protocolVersion": "ECv2", -} - -_valid_signature = "MEQCIFBle+JsfsovRBeoFEYKWFAeBYFAhq0S+GtusiosjV4lAiAGcK9qfVpnqG6Hw8cbGBQ79beiAs6IIkBxBfeKDBR+kA==" - - -@pytest.fixture -def encrypted_token(): - """ - Test tokens generated using the Tink test code: - https://github.com/google/tink/blob/06aa21432e1985fea4ab26c26f6038895b22cce0/apps/paymentmethodtoken/src/test/java/com/google/crypto/tink/apps/paymentmethodtoken/PaymentMethodTokenRecipientTest.java#L1042-L1059 - """ - return { - "signature": "MEYCIQCbtFh9UIf1Ty3NKZ2z0ZmL0SHwR30uiRGuRXk9ghpyrwIhANiZQ0Df6noxkQ6M652PcIPkk2m1PQhqiq4UhzvPQOYf", - "intermediateSigningKey": { - "signedKey": '{"keyValue":"MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE/1+3HBVSbdv+j7NaArdgMyoSAM43yRydzqdg1TxodSzA96Dj4Mc1EiKroxxunavVIvdxGnJeFViTzFvzFRxyCw==","keyExpiration":"1879409613939"}', - "signatures": [_valid_signature], - }, - "protocolVersion": "ECv2", - "signedMessage": '{"encryptedMessage":"PeYi+ZnJs1Gei1dSOkItdfFG8Y81FvEI7dHE0sSrSU6OPnndftV/qDbbmXHmppoyP/2lhF+XsH93qzD3u46BRnxxPtetzGT0533rIraskTj8SZ6FVYY1Opfo7FECGk57FfF8aDaCSOoyTh1k0v6wdxVwEVvWqG1T/ij+u2KWOw5G1WSB/RVicni0Az13ModYb0KMdMws1USKlWxBfKU5PtxibVx4fZ95HYQ82qgHlV4ToKaUY7YWud1iEspmFsBMk0nh4t1hVxRzsxKUjMV1915qD5yq7k5n9YPao2mR9NJgLPDktsc4uf9bszzvnqhz3T1YID43QwX16yCyn/YxNVe3dJ1+S+BGyJ+vyKXp+Zh4SlIua2NFLwnR06Es3Kvl6LlOGasoPC/tMAWYLQlGsl+vHK3mrMZjC6KbOsXg+2mrlZwL+QOt3ih2jIPe","ephemeralPublicKey":"BD6pQKpy7yDebAX4qV0u/AfMYNQhOD+teyoa/5SsxwTGCoC1ZKHxNMb5BXvRmBcYGPNTx8+fAkEwzJ8GqbX/Q7E=","tag":"8gFteCvCuamX1RmL7ORdHqleyBf0N55OfAs80RYGgwc="}', - } - - -@pytest.fixture -def encrypted_token_with_invalid_signature(encrypted_token): - encrypted_token["intermediateSigningKey"]["signatures"] = [invalid_signature] - return encrypted_token - - -encrypted_expired_token = { - "signature": "MEUCIQCv+gDxUajhYqBcI2tt6zMCekinJsaYL31/aBtS74YN4QIgIZGFztAVTgyV2CB51NIfTtSzQBxNA52P4R8H7K5N/jE=", - "intermediateSigningKey": { - "signedKey": '{"keyValue":"MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE/1+3HBVSbdv+j7NaArdgMyoSAM43yRydzqdg1TxodSzA96Dj4Mc1EiKroxxunavVIvdxGnJeFViTzFvzFRxyCw==","keyExpiration":"1880104348293"}', - "signatures": [ - "MEUCIQDpPpV60rY2VLeDKcM3YNmAs+07Qsr8ZVoj8ZDNFcTCAQIgeVH3C/zUVWSIyd+/nO+AlMAemtcfUCX+71VWEF3T4yI=" - ], - }, - "protocolVersion": "ECv2", - "signedMessage": '{"encryptedMessage":"eYKTj3VRk9b4EbvYped0pHvw8ZZTMqRtK6xZ1wzuvNHB3vQHrClYA5wZeujpzwM17SV9HpEzk1r2cKzZC+fpQpoLXk+XWjrPwKUGnPNDLW9aGcNVMqQZrspduLYnZ+SljnIxIBPS13APS70FUaEIj+WZ6HTe7rjhtPOFRJhs0LO5Q4PsGgkpK5d7hw6GNcpaFaVIolaFvHyg7spC9+U7F9fbOcDu1lCx67DjsKoiRPbToXfdu9mgPro0UT5RdTZGdydeijlKBwlv3xToq3M5w7xI2GyaPfvRrQmguADTlCLUL/g0IEYxWXDz/SvpKzU5olLP1lZ+Jpvu+Ah9HYhwyOAetMiCVSEolYjveKDJM0tRnixTOvWtQ1c9ezBkIjyl/iC3Kc+uDliswZeC7E6FHqb0sVs/IfImwO5kEdRgi138t6Ztl2Mvz1muMJ22avYw1wlhU6+46k+b8iirTD8WifVd2rEj19o6kMikFlo43rgz7aYtWNZnIFE//BSBy+eNvG0/aFrFdevrNiboxn++B1A=","ephemeralPublicKey":"BDMecFYEc1K+22fvcZImwfrRTa0r4Tiay/fFH3W+Ktnd/Zl1uiq4XwfvTfJwJB50elBmmyY43MdlyZjqiZjWJsw=","tag":"CW+gKSA9fvg4zX+9QBZwtmYcEvLGlTywopwWZIVZwhQ="}', -} - -encrypted_plaintext = { - "signature": "MEUCIE9yZoaWuT+xS8GRAeFMhox/FYmmHaZqSjD/g4fJBBAeAiEAiuQzXoPWRSB6AhSD81q2bUOTLK4k+MntQx2UERk67fE=", - "intermediateSigningKey": { - "signedKey": '{"keyValue":"MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE/1+3HBVSbdv+j7NaArdgMyoSAM43yRydzqdg1TxodSzA96Dj4Mc1EiKroxxunavVIvdxGnJeFViTzFvzFRxyCw==","keyExpiration":"1880104972775"}', - "signatures": [ - "MEYCIQC70Y6VnxvXnwQ5HgJCD6HxPa2EMDCU6AVk6gAFi/A2TwIhAIvAbw5VShuO6uf8N0qJ8l8oVT3TMBX2Zv28+FKlvnRL" - ], - }, - "protocolVersion": "ECv2", - "signedMessage": '{"encryptedMessage":"ssCh5UO4l4Xp","ephemeralPublicKey":"BKg85WIik76zjAKEFehNZ4seOh2/RX8WNPpX1gBzgQlsZCNYCj/2O8TsbDxYI2c9L5yBSpJOLy6AOE3q+5m3idg=","tag":"yRYHWOnuDsx6/KjS7axdL8YIPXuMnExw6FDldolRl+M="}', -} - -encrypted_invalid_tag = { - "signature": "MEQCIGZJz6qTyiMeVzlwJ2FQtUUh5bfORrT7/ZW7iqtAOa7JAiBBOTPXLNr/gfgEmlAfUW6c7OpwriuYzW8SUMWNz6Qy3g==", - "intermediateSigningKey": { - "signedKey": '{"keyValue":"MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE/1+3HBVSbdv+j7NaArdgMyoSAM43yRydzqdg1TxodSzA96Dj4Mc1EiKroxxunavVIvdxGnJeFViTzFvzFRxyCw==","keyExpiration":"1880106295412"}', - "signatures": [ - "MEUCIQCJihoRFfDp170pXqtUn0u9eDX8pU5r06HzcSisttR+fwIgQH9ROgfNj69y3HSWOiSVGbSYQ/XD6kvWDHI43Qo9CpQ=" - ], - }, - "protocolVersion": "ECv2", - "signedMessage": '{"encryptedMessage":"wFnzlFi/al6Kwk8/VFZp5R25WqN8eWQzRBZpjHbzDYMYeI97DeMEewI0BevsAgbQkJG03ktxOvUcFGjWVc/tePDBJUKYZDWwNosvRV4LgUkC7n7ByGntSJ/ekqXlm48UDAOPYchRTG6KvMdtYAfZQ5TbggbmLiJkvO55eHwyBHbgYiERJMQLh50QTp2peL7ZC7f8lBZ0Z3VWSEDLyFza93uadlzMEdhjXcgWwxIWAtxmMFvdLaXn0Crp7m0b90CfbcBSzsf4OeVbbUA8PoFhYJ9qjRErA+4MO+cXvtl3+AwiudVs3MtsvZX2Be/LniKHZoGNz34gn2+MTd9tGeiw3kRBYgwOVMciVcXtiq47XCcj5ttszfXctbNzuCS+MTDp0V3W0T/i9B2Vj6ocFFTOqvTz8AtMDfphXoSGAV+txv4S8iclFjKiMXHJw6dmI/deabisdXPoqAhpdfO3DGlO48NQF4Fwo0qUCwTdVeZ3HNYD0NYYKt6eKghdrXZFcDaE9FPs1xVFYUzJeupV2+Kpr9J9vA==","ephemeralPublicKey":"BC7Gy3uiwBWRh1qYRoJ8DhD05XSr2RjW2NcNIKXhxx1gfsp1BuET9uxaFvujKFTFSAPSYdYSRYB4mDzb7rO+12I=","tag":"AYcBmZRnlcPtU/t/CWUQJMDM6BjNEH3klAE1HxlvXPc="}', -} - -decrypted_google_pay_token = { - "messageExpiration": "32506264800000", - "messageId": "AH2EjtfkY514K5lmPF4NOP9lMR5tPedsjQR719hIzI-zB1g0A-TBlYInGQuEVQeIWGlajqEpvSyrl3r_iN0RxoV9RYjxqnzG-kXmcBNkferp4NfNjVqxYrVT0e5JRzU3dQjkb0tQWOxN", - "paymentMethod": "CARD", - "paymentMethodDetails": { - "expirationYear": 2026, - "expirationMonth": 12, - "pan": "4111111111111111", - "authMethod": "PAN_ONLY", - }, -} - - -@contextmanager -def does_not_raise(): - yield - - -@pytest.fixture -def encrypted_google_pay_token_with_expired_intermediate_signature(): - """ - Generated using the sealECV2 method in the Java Tink library: - https://github.com/google/tink/blob/06aa21432e1985fea4ab26c26f6038895b22cce0/apps/paymentmethodtoken/src/test/java/com/google/crypto/tink/apps/paymentmethodtoken/PaymentMethodTokenRecipientTest.java#L1042-L1059 - """ - return { - "signature": "MEQCIADUoxj1TKGFieh3aPn4rShKyM6bGtHi+SabRnvAlB33AiAIgjQIfZ7hMDOuxMXC/lrm4COrqH/PJ4vRtmBZn9438Q==", - "intermediateSigningKey": { - "signedKey": '{"keyValue":"MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE/1+3HBVSbdv+j7NaArdgMyoSAM43yRydzqdg1TxodSzA96Dj4Mc1EiKroxxunavVIvdxGnJeFViTzFvzFRxyCw==","keyExpiration":"1361023457681"}', - "signatures": [ - "MEYCIQD3fluX1fLwuuo/oCuyifmGM1xisLNylRZ5902dbjabSQIhAOuDBulLTtF5vuo6TxFBD/s3J4PqVzBC5y5d28Y4adsN" - ], - }, - "protocolVersion": "ECv2", - "signedMessage": '{"encryptedMessage":"2aZvfZU49a0/Pi+Jl/1qFWK5faeDQvpWeXW+BcWDRVaYeqV4oAgfzp0Z0v6KPZMu/9bWeuHVRlymQruiqPHJlggCb3syo0HOz2ls59YEpTWfSZsWEGwhoiIbrrcZ953IQ1gxzaahYt6mAXIlHwAujhyqBcw8QdkzPgnr2PJhDHIGioy2u4iHnWHhvJkdcwbmtifd+pS/KDzN40ipFhaYwFPikRi9Br5vT1SEEbOH4UCY5ceJH3ZQJeaWCBYYrJU8ZpJjFXvOerRxLB995lNMHYDHV5jh5i3CMv7Sb4CjrVUy3ld7zhXOlDtRSbwze5aaFUqGjEIoIlwO56pB5qPCXAj/zjT67K5HGfx0/yc8hzT+RAc0lLCrIjBW9SWxPzq8hzhTWbsmI9hp8UcEO/H+EdlL0i8ENVXSRehCl7/LEEJKS3EBRR3h7W1ojZtl","ephemeralPublicKey":"BPxDq7BdXa7TBuY4PdlQVqfLjpgSNvC5TJgWZ6WetuR269iQHZVxohMbgUlHl1Hbs2JpXwPNpDLgzHOizvi+aAw=","tag":"IiMYvCc3gEvR4P3xhF5DaN/cyjOc++NJmqxthrbdc0U="}', - } - - -@pytest.fixture -def root_signing_keys(): - return [valid_root_signing_key] - - -@pytest.fixture -def private_key(): - return "MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgCPSuFr4iSIaQprjjchHPyDu2NXFe0vDBoTpPkYaK9dehRANCAATnaFz/vQKuO90pxsINyVNWojabHfbx9qIJ6uD7Q7ZSxmtyo/Ez3/o2kDT8g0pIdyVIYktCsq65VoQIDWSh2Bdm" - - -@pytest.fixture -def recipient_id(): - return "someRecipient" - - -@pytest.fixture -def google_pay_token_decryptor(root_signing_keys, recipient_id, private_key): - return GooglePayTokenDecryptor(root_signing_keys, recipient_id, private_key) - - -class TestGooglePayTokenDecryptor: - @pytest.mark.parametrize( - ("invalid_root_signing_keys", "error_message"), - [ - ("keys", "root_signing_keys must be a list"), - ( - [ - { - "keyValue": "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE/1+3HBVSbdv+j7NaArdgMyoSAM43yRydzqdg1TxodSzA96Dj4Mc1EiKroxxunavVIvdxGnJeFViTzFvzFRxyCw==", - "keyExpiration": datetime_to_milliseconds( - datetime.datetime(2000, 1, 1) - ), - "protocolVersion": "ECv2", - } - ], - f"At least one root signing key must be {ECv2_PROTOCOL_VERSION}-signed and have a valid expiration date.", - ), - ], - ) - def test_init_root_signing_keys_validation( - self, recipient_id, invalid_root_signing_keys, private_key, error_message - ): - with pytest.raises(GooglePayError, match=error_message): - GooglePayTokenDecryptor( - invalid_root_signing_keys, recipient_id, private_key - ) - - def test_init_filter_root_signing_keys(self, recipient_id, private_key): - # GIVEN a list of keys - non_expired_ecv2_key = { - "keyValue": "abcd", - "keyExpiration": datetime_to_milliseconds(datetime.datetime(2040, 1, 1)), - "protocolVersion": "ECv2", - } - expired_key = { - "keyValue": "abcd", - "keyExpiration": datetime_to_milliseconds(datetime.datetime(2010, 1, 1)), - "protocolVersion": "ECv2", - } - non_ecv2_key = { - "keyValue": "abcd", - "keyExpiration": datetime_to_milliseconds(datetime.datetime(2040, 1, 1)), - "protocolVersion": "ECv1", - } - - keys = [non_expired_ecv2_key, expired_key, non_ecv2_key] - - # WHEN the GooglePayTokenDecryptor is created - decryptor = GooglePayTokenDecryptor(keys, recipient_id, private_key) - - # THEN the invalid keys are filtered out - assert decryptor.root_signing_keys == [non_expired_ecv2_key] - - @pytest.mark.parametrize( - ("intermediate_signing_key_signatures"), - [ - ( - [ - valid_signature, - ] - ), - ( - [ - "invalid-signature", # Only one signature needs to be valid - valid_signature, - ] - ), - ], - ) - def test_verify_signature__success( - self, - encrypted_token, - google_pay_token_decryptor, - intermediate_signing_key_signatures, - ): - encrypted_token["intermediateSigningKey"][ - "signatures" - ] = intermediate_signing_key_signatures - google_pay_token_decryptor.verify_signature(encrypted_token) - # No exceptions will be raised if at least one signature is valid - assert True - - def test_verify_signature__intermediate_key_expired( - self, - encrypted_google_pay_token_with_expired_intermediate_signature, - google_pay_token_decryptor, - ): - with pytest.raises( - GooglePayError, match="Intermediate signing key has expired" - ): - google_pay_token_decryptor.verify_signature( - encrypted_google_pay_token_with_expired_intermediate_signature - ) - - def test_verify_signature__invalid_intermediate_signature( - self, encrypted_token, google_pay_token_decryptor - ): - encrypted_token["intermediateSigningKey"]["signatures"][0] = "invalid-signature" - with pytest.raises( - GooglePayError, match="Could not verify intermediate signing key signature" - ): - google_pay_token_decryptor.verify_signature(encrypted_token) - - def test_verify_signature__invalid_signature( - self, encrypted_token, google_pay_token_decryptor - ): - encrypted_token["signature"] = "invalid-signature" - with pytest.raises(GooglePayError, match="Could not verify message signature"): - google_pay_token_decryptor.verify_signature(encrypted_token) - - def test_verify_signature__invalid_protocol( - self, encrypted_token, google_pay_token_decryptor - ): - encrypted_token["protocolVersion"] = "ECv1" - with pytest.raises( - GooglePayError, - match=f"Only {ECv2_PROTOCOL_VERSION}-signed tokens are supported, but token is {encrypted_token['protocolVersion']}-signed.", - ): - google_pay_token_decryptor.verify_signature(encrypted_token) - - @pytest.mark.parametrize( - ("verify"), - [True, False], - ) - def test_decrypt_token__success( - self, - encrypted_token, - google_pay_token_decryptor, - verify, - ): - assert ( - google_pay_token_decryptor.decrypt_token(encrypted_token, verify) - == decrypted_google_pay_token - ) - - @pytest.mark.parametrize( - ("verify", "expectation"), - [ - (False, does_not_raise()), - ( - True, - pytest.raises( - Exception, - match="Could not verify intermediate signing key signature", - ), - ), - ], - ) - def test_decrypt_token_with_invalid_signature( - self, - encrypted_token_with_invalid_signature, - google_pay_token_decryptor, - verify, - expectation, - ): - with expectation: - assert ( - google_pay_token_decryptor.decrypt_token( - encrypted_token_with_invalid_signature, verify - ) - == decrypted_google_pay_token - ) - - @pytest.mark.parametrize( - ("encrypted_token", "reason"), - [ - (encrypted_expired_token, "Token message has expired."), - ( - encrypted_plaintext, - "Token payload does not contain valid JSON. Payload: 'plaintext'", - ), - (encrypted_invalid_tag, "Tag is not a valid MAC for the encrypted message"), - ], - ) - def test_decrypt_token__fail( - self, google_pay_token_decryptor, encrypted_token, reason - ): - with pytest.raises(GooglePayError, match=reason): - google_pay_token_decryptor.decrypt_token(encrypted_token) - - -@pytest.mark.parametrize( - ("expiration", "expired"), - [ - (datetime_to_milliseconds(datetime.datetime(2040, 1, 1)), True), - (datetime_to_milliseconds(datetime.datetime(2000, 1, 1)), False), - (datetime_to_milliseconds(datetime.datetime(2019, 12, 1)), False), - ], -) -def test_check_expiration_date_is_valid(expiration, expired): - assert check_expiration_date_is_valid(expiration) == expired