From 89bf270eb94146ff149d82ceae0c932134977f10 Mon Sep 17 00:00:00 2001 From: Hamidreza Ghafghazi 10054225 Date: Tue, 25 May 2021 14:26:02 -0400 Subject: [PATCH] SH-5754 Adding CPV4 CA_ extension functions Adding CA_MigrateKeys --- docs/conf.py | 2 +- pycryptoki/ca_extensions/cpv4.py | 59 +++++ pycryptoki/cryptoki/__init__.py | 4 + pycryptoki/cryptoki/ck_defs.py | 16 ++ pycryptoki/cryptoki/func_defs.py | 8 + pycryptoki/daemon/rpyc_pycryptoki.py | 3 + pycryptoki/defines.py | 1 + setup.py | 2 +- tests/functional/conftest.py | 9 + tests/functional/test_cpv4.py | 357 +++++++++++++++++++++++++++ 10 files changed, 459 insertions(+), 2 deletions(-) create mode 100644 pycryptoki/ca_extensions/cpv4.py create mode 100755 tests/functional/test_cpv4.py diff --git a/docs/conf.py b/docs/conf.py index 5b12c9f..4fadba4 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -61,7 +61,7 @@ # The short X.Y version. version = "2.5" # The full version, including alpha/beta/rc tags. -release = "2.5.22" +release = "2.5.23" # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/pycryptoki/ca_extensions/cpv4.py b/pycryptoki/ca_extensions/cpv4.py new file mode 100644 index 0000000..254303b --- /dev/null +++ b/pycryptoki/ca_extensions/cpv4.py @@ -0,0 +1,59 @@ +""" +cpv4 ca extensions +""" +import logging +from collections import namedtuple +from copy import deepcopy + +from pycryptoki.defines import CKR_OK +from pycryptoki.cryptoki import ( + CA_MigrateKeys, + CK_ULONG, + CK_SESSION_HANDLE, + CK_OBJECT_MIGRATION_DATA, +) +from pycryptoki.exceptions import make_error_handle_function + + +LOG = logging.getLogger(__name__) +MIGRATION_KEYS = ["object_type", "source_handle"] +MIGRATION_DATA = namedtuple("MIGRATION_DATA", deepcopy(MIGRATION_KEYS)) + + +def get_mig_data_c_struct(mig_data_list): + """ + Build an array of :class:`~pycryptoki.cryptoki.CK_OBJECT_MIGRATION_DATA` Structs & return it. + + :return: :class:`~pycryptoki.cryptoki.CK_OBJECT_MIGRATION_DATA` array + """ + ret_struct = (CK_OBJECT_MIGRATION_DATA * len(mig_data_list))() + for index, mig_data in enumerate(mig_data_list): + object_type, source_handle = mig_data + ret_struct[index] = CK_OBJECT_MIGRATION_DATA( + objectType=object_type, sourceHandle=source_handle + ) + return ret_struct + + +def ca_migrate_keys( + source_session, target_session, migration_flags, num_objects, objects_to_migrate +): + """ + Runs CA_MigrateKeys command + + :param objects_to_migrate: a list of tuples (objectType, sourceHandle) or list of MIGRATION_DATA + """ + objects_to_migrate = ( + objects_to_migrate if isinstance(objects_to_migrate, list) else [objects_to_migrate] + ) + c_mig_data = get_mig_data_c_struct(objects_to_migrate) + + ret = CA_MigrateKeys(source_session, target_session, migration_flags, num_objects, c_mig_data) + + if ret != CKR_OK: + return ret, None + + return ret, [(data.rv, data.targetHandle) for data in c_mig_data] + + +ca_migrate_keys_ex = make_error_handle_function(ca_migrate_keys) diff --git a/pycryptoki/cryptoki/__init__.py b/pycryptoki/cryptoki/__init__.py index 60e8515..7e1c0d5 100644 --- a/pycryptoki/cryptoki/__init__.py +++ b/pycryptoki/cryptoki/__init__.py @@ -138,6 +138,8 @@ "CA_InitSlotRolePIN", "CA_InitializeRemotePEDVector", "CA_Insert", + "CA_MigrateKeys", + "CA_MigrationStartSessionNegotiation", "CA_InsertMaskedObject", "CA_InvokeService", "CA_InvokeServiceAsynch", @@ -476,6 +478,8 @@ "CK_CPV4_EXTRACT_PARAMS_PTR", "CK_CPV4_INSERT_PARAMS", "CK_CPV4_INSERT_PARAMS_PTR", + "CK_OBJECT_MIGRATION_DATA", + "CK_OBJECT_MIGRATION_DATA_PTR", "C_CancelFunction", "C_CloseAllSessions", "C_CloseSession", diff --git a/pycryptoki/cryptoki/ck_defs.py b/pycryptoki/cryptoki/ck_defs.py index a8bb876..4178dec 100644 --- a/pycryptoki/cryptoki/ck_defs.py +++ b/pycryptoki/cryptoki/ck_defs.py @@ -1126,6 +1126,22 @@ def __init__(self, aid=None): struct_def(CK_APPLICATION_ID, [("id", CK_BYTE * 16)]) +class CK_OBJECT_MIGRATION_DATA(Structure): + pass + + +struct_def( + CK_OBJECT_MIGRATION_DATA, + [ + ("objectType", CK_ULONG), + ("sourceHandle", CK_OBJECT_HANDLE), + ("targetHandle", CK_OBJECT_HANDLE), + ("rv", CK_RV), + ], +) +CK_OBJECT_MIGRATION_DATA_PTR = POINTER(CK_OBJECT_MIGRATION_DATA) + + class CK_CPV4_EXTRACT_PARAMS(Structure): pass diff --git a/pycryptoki/cryptoki/func_defs.py b/pycryptoki/cryptoki/func_defs.py index 7dd7cde..7d32ab3 100644 --- a/pycryptoki/cryptoki/func_defs.py +++ b/pycryptoki/cryptoki/func_defs.py @@ -578,6 +578,14 @@ ) CA_Extract = make_late_binding_function("CA_Extract", [CK_SESSION_HANDLE, CK_MECHANISM_PTR]) CA_Insert = make_late_binding_function("CA_Insert", [CK_SESSION_HANDLE, CK_MECHANISM_PTR]) +CA_MigrateKeys = make_late_binding_function( + "CA_MigrateKeys", + [CK_SESSION_HANDLE, CK_SESSION_HANDLE, CK_ULONG, CK_ULONG, CK_OBJECT_MIGRATION_DATA_PTR], +) +CA_MigrationStartSessionNegotiation = make_late_binding_function( + "CA_MigrationStartSessionNegotiation", + [CK_SESSION_HANDLE, CK_ULONG, CK_BYTE_PTR, CK_ULONG_PTR, CK_ULONG_PTR, CK_BYTE_PTR], +) CA_GetTokenObjectUID = make_late_binding_function( "CA_GetTokenObjectUID", [CK_SLOT_ID, CK_ULONG, CK_ULONG, POINTER(CK_BYTE)] ) diff --git a/pycryptoki/daemon/rpyc_pycryptoki.py b/pycryptoki/daemon/rpyc_pycryptoki.py index c96c4a4..0ce7f6d 100755 --- a/pycryptoki/daemon/rpyc_pycryptoki.py +++ b/pycryptoki/daemon/rpyc_pycryptoki.py @@ -187,6 +187,7 @@ ca_get_cv_firmware_version, ca_get_cv_firmware_version_ex, ) +from pycryptoki.ca_extensions.cpv4 import ca_migrate_keys, ca_migrate_keys_ex from pycryptoki.cryptoki import CK_ULONG from pycryptoki.encryption import ( c_encrypt, @@ -754,6 +755,8 @@ def test_attrs(attributes): ca_stc_get_digest_ids_ex = staticmethod(ca_stc_get_digest_ids_ex) ca_stc_get_digest_name_by_id = staticmethod(ca_stc_get_digest_name_by_id) ca_stc_get_digest_name_by_id_ex = staticmethod(ca_stc_get_digest_name_by_id_ex) + ca_migrate_keys = staticmethod(ca_migrate_keys) + ca_migrate_keys_ex = staticmethod(ca_migrate_keys_ex) def server_launch(service, ip, port, config): diff --git a/pycryptoki/defines.py b/pycryptoki/defines.py index ad20531..d3474e7 100755 --- a/pycryptoki/defines.py +++ b/pycryptoki/defines.py @@ -1507,6 +1507,7 @@ CKF_EC_UNCOMPRESS = 0x01000000 CKF_EC_COMPRESS = 0x02000000 CKF_EXTENSION = 0x80000000 +CKF_CPV4_CONTINUE_ON_ERR = 0x01 CKR_ARGUMENTS_BAD = 0x00000007 CKR_ATTRIBUTE_READ_ONLY = 0x00000010 CKR_ATTRIBUTE_SENSITIVE = 0x00000011 diff --git a/setup.py b/setup.py index a279d73..1b033ee 100755 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ description="A python wrapper around the C cryptoki library.", author="Ashley Straw", url="https://github.com/gemalto/pycryptoki", - version="2.5.22", + version="2.5.23", packages=[ "pycryptoki", "pycryptoki.cryptoki", diff --git a/tests/functional/conftest.py b/tests/functional/conftest.py index 78f05e1..783d08f 100644 --- a/tests/functional/conftest.py +++ b/tests/functional/conftest.py @@ -50,6 +50,14 @@ def pytest_addoption(parser): default=os.environ.get("SLOT", 1), dest="test_slot", ) + optiongroup.addoption( + "--clo-slot", + help="Specify the slot as the target cloning slot", + type=int, + default=os.environ.get("CLONE_SLOT", 2), + dest="test_clone_slot", + required=False, + ) optiongroup.addoption( "--reset", help="Reset the HSM back to its default settings with a factory" " reset.", @@ -97,6 +105,7 @@ def pytest_configure(config): logger.setLevel(config.getoption("loglevel").upper()) hsm_config["test_slot"] = config.getoption("test_slot") + hsm_config["test_clone_slot"] = config.getoption("test_clone_slot") hsm_config["user"] = config.getoption("user") hsm_config["reset"] = config.getoption("reset") diff --git a/tests/functional/test_cpv4.py b/tests/functional/test_cpv4.py new file mode 100755 index 0000000..eab47a5 --- /dev/null +++ b/tests/functional/test_cpv4.py @@ -0,0 +1,357 @@ +""" +Testcases for wrapping/unwrapping keys. +""" +import logging +from distutils.version import LooseVersion +from contextlib import contextmanager +import random + +import pytest + +from pycryptoki.session_management import c_open_session_ex, login_ex, c_close_session, c_logout +from pycryptoki.cryptoki import CK_ULONG +from pycryptoki.ca_extensions.cpv4 import ca_migrate_keys, MIGRATION_DATA +from pycryptoki.default_templates import ( + get_default_key_template, + CERTIFICATE_TEMPLATE, + DATA_TEMPLATE, + get_default_key_pair_template, +) +from pycryptoki.defines import ( + CKF_RW_SESSION, + CKF_SERIAL_SESSION, + CKF_CPV4_CONTINUE_ON_ERR, + LUNA_CRYPTOKI_ELEMENT, + CKM_DES_ECB, + CKM_DES_CBC, + CKM_DES_CBC_PAD, + CKM_DES_KEY_GEN, + CKM_DES3_ECB, + CKM_DES3_CBC, + CKM_DES3_CBC_PAD, + CKM_DES3_KEY_GEN, + CKM_AES_ECB, + CKM_AES_CBC, + CKM_AES_CBC_PAD, + CKM_AES_KEY_GEN, + CKM_CAST3_ECB, + CKM_CAST3_CBC, + CKM_CAST3_CBC_PAD, + CKM_CAST3_KEY_GEN, + CKM_CAST5_ECB, + CKM_CAST5_CBC, + CKM_CAST5_CBC_PAD, + CKM_CAST5_KEY_GEN, + CKM_RC4_KEY_GEN, + CKM_RC2_KEY_GEN, + CKM_RSA_PKCS_KEY_PAIR_GEN, + CKM_RSA_X9_31_KEY_PAIR_GEN, + CKM_SEED_ECB, + CKM_SEED_CBC, + CKM_SEED_KEY_GEN, + CKR_OK, + CKA_DECRYPT, + CKA_VERIFY, + CKA_UNWRAP, + CKM_AES_KWP, + CKA_VALUE_LEN, + CKA_EXTRACTABLE, + CKA_OUID, + LUNA_NULL_ELEMENT, + LUNA_PARAM_ELEMENT, + LUNA_CONTAINER_ACTIVATION_ELEMENT, + LUNA_MOFN_ACTIVATION_ELEMENT, + LUNA_CONTAINER_ELEMENT, + LUNA_UNKNOWN_ELEMENT, + CKA_TOKEN, + CKR_INTEGER_OVERFLOW, +) +from pycryptoki.encryption import c_wrap_key, c_unwrap_key, c_encrypt, c_decrypt +from pycryptoki.key_generator import ( + c_destroy_object, + c_generate_key, + c_generate_key_ex, + c_generate_key_pair, +) +from pycryptoki.misc import c_create_object_ex +from pycryptoki.lookup_dicts import ret_vals_dictionary +from pycryptoki.object_attr_lookup import c_get_attribute_value_ex, c_find_objects_ex +from pycryptoki.test_functions import verify_object_attributes +from . import config as hsm_config +from .util import get_session_template + +logger = logging.getLogger(__name__) + +SYM_KEYS = [ + CKM_DES_KEY_GEN, + CKM_DES3_KEY_GEN, + CKM_AES_KEY_GEN, + CKM_CAST3_KEY_GEN, + CKM_CAST5_KEY_GEN, + CKM_SEED_KEY_GEN, + CKM_RC4_KEY_GEN, + CKM_RC2_KEY_GEN, +] +ASYM_KEYS = [CKM_RSA_PKCS_KEY_PAIR_GEN] +SYM = "sym" +ASYM = "asym" +DATA = "data" +CERT_DATA = "cert_data" +USER_TYPE = 1 +SESSION_FLAGS = CKF_SERIAL_SESSION | CKF_RW_SESSION +INVALID_OBJECTS_TYPES = [ + LUNA_NULL_ELEMENT, + LUNA_PARAM_ELEMENT, + LUNA_CONTAINER_ACTIVATION_ELEMENT, + LUNA_MOFN_ACTIVATION_ELEMENT, + LUNA_CONTAINER_ELEMENT, + LUNA_UNKNOWN_ELEMENT, +] + + +@contextmanager +def get_co_auth_session(slot, user_pwd): + """opens a session and logs in as CO""" + h_session = None + try: + h_session = c_open_session_ex(slot, SESSION_FLAGS) + login_ex(h_session, slot, user_pwd, USER_TYPE) + yield h_session + finally: + if h_session: + c_logout(h_session) + c_close_session(slot) + + +@pytest.fixture(scope="class") +def source_session(hsm_configured): + """Opens a session for the source partition and logs in""" + _ = hsm_configured + slot = hsm_config["test_slot"] + with get_co_auth_session(slot, hsm_config["password"]) as session: + yield session + + +@pytest.fixture(scope="class") +def target_session(hsm_configured): + """opens a session for target partition and logs in""" + _ = hsm_configured + slot = hsm_config["test_clone_slot"] + with get_co_auth_session(slot, hsm_config["password"]) as session: + yield session + + +@contextmanager +def gen_sym_keys(source_session, key_type, num_obj=1): + """ Fixture containing keys""" + keys = [] + try: + for _ in range(num_obj): + template = get_default_key_template(key_type) + ret, key_handle = c_generate_key(source_session, key_type, template) + if ret == CKR_OK: + keys.append(key_handle) + else: + logger.info("Failed to generate key: {}\nReturn code: {}".format(key_type, ret)) + yield keys + finally: + destroy_objects(source_session, keys) + + +@contextmanager +def gen_asym_keys(source_session, key_type, num_obj=1): + """ Fixture containing all asym. keys """ + keys = [] + try: + for _ in range(num_obj): + pub_temp, prv_temp = get_default_key_pair_template(key_type) + + ret, pub_key, prv_key = c_generate_key_pair( + source_session, key_type, pub_temp, prv_temp + ) + if ret == CKR_OK: + keys += [pub_key, prv_key] + else: + logger.info("Failed to generate key: %s\nReturn code: %s", key_type, ret) + yield keys + + finally: + destroy_objects(source_session, keys) + + +@contextmanager +def create_data_object(session, data_type, num_obj=1): + template = get_session_template(data_type) + data_objects = [] + try: + for _ in range(num_obj): + h_obj = c_create_object_ex(session, template) + data_objects.append(h_obj) + yield data_objects + finally: + destroy_objects(session, data_objects) + + +OBJ_FUNC_MAP = { + SYM: (gen_sym_keys, SYM_KEYS), + ASYM: (gen_asym_keys, ASYM_KEYS), + DATA: (create_data_object, [DATA_TEMPLATE]), + CERT_DATA: (create_data_object, [CERTIFICATE_TEMPLATE]), +} + + +def destroy_objects(session, obj_list): + """common function to destroy objects""" + for obj in obj_list: + if obj is not None: + c_destroy_object(session, obj) + + +@pytest.fixture(scope="class") +def migration_flags(): + """The value of the migration flag""" + yield CKF_CPV4_CONTINUE_ON_ERR + + +def get_migration_data(objects_to_clone, object_type=LUNA_CRYPTOKI_ELEMENT): + """Prepares the migration data""" + mig_data_list = [] + for obj in objects_to_clone: + mig_data_list.append(MIGRATION_DATA(object_type=object_type, source_handle=obj)) + + return mig_data_list + + +@pytest.fixture(scope="function", params=[CERT_DATA, DATA, SYM, ASYM]) +def one_object(request, source_session): + """generates one object (pair of objects)""" + func, param = OBJ_FUNC_MAP[request.param] + with func(source_session, param[0]) as obj: + yield get_migration_data(obj) + + +@pytest.fixture(scope="function") +def invalid_object_handle(): + """invalid object handle""" + yield [MIGRATION_DATA(object_type=LUNA_CRYPTOKI_ELEMENT, source_handle=11111)] + + +@pytest.fixture(scope="function", params=INVALID_OBJECTS_TYPES) +def invalid_object_type(request, source_session): + """Create invalid object types MIGRATION_DATA""" + with create_data_object(source_session, DATA_TEMPLATE) as data_obj: + yield [MIGRATION_DATA(object_type=request.param, source_handle=data_obj[0])] + + +@pytest.fixture(scope="function") +def hundred_objects(source_session): + """generates 100 token objects""" + with gen_asym_keys(source_session, ASYM_KEYS[0], 30) as asym_keys, gen_sym_keys( + source_session, random.choice(SYM_KEYS), 40 + ) as sym_keys: + yield get_migration_data(asym_keys + sym_keys) + + +@pytest.fixture(scope="function") +def mix_objects(source_session): + """generates a mix of sym/asym keys and data and certificate objects""" + with gen_asym_keys(source_session, ASYM_KEYS[0], 5) as asym_keys, gen_sym_keys( + source_session, random.choice(SYM_KEYS), 20 + ) as sym_keys, create_data_object( + source_session, DATA_TEMPLATE, 20 + ) as data_obj, create_data_object( + source_session, CERTIFICATE_TEMPLATE, 20 + ) as cert_obj: + yield get_migration_data(asym_keys + sym_keys + data_obj + cert_obj) + + +def verify_migrated_objects( + source_session, target_session, source_objs, migrated_objs, expected_retcode=CKR_OK +): + """verifies CA_MigrateKeys result""" + src_ouids = [ + c_get_attribute_value_ex(source_session, obj.source_handle, {CKA_OUID: None})[CKA_OUID] + for obj in source_objs + ] + for rv, obj_h in migrated_objs: + + assert rv == expected_retcode + if expected_retcode == CKR_OK: + target_uid = c_get_attribute_value_ex(target_session, obj_h, {CKA_OUID: None})[CKA_OUID] + assert target_uid in src_ouids + + +@pytest.fixture(scope="function", autouse=True) +def clean_target_partition(target_session): + """finds objects in target partition and cleans them""" + yield + objs = c_find_objects_ex(target_session, {CKA_TOKEN: True}, 100) + destroy_objects(target_session, objs) + + +class TestMigrateKeys(object): + """ + Testcases for migrating keys. + """ + + def test_migrate_one_obj(self, source_session, target_session, migration_flags, one_object): + """ + Test migrating keys + """ + ret, mig_data = ca_migrate_keys( + source_session, target_session, migration_flags, len(one_object), one_object + ) + assert ret == CKR_OK + verify_migrated_objects(source_session, target_session, one_object, mig_data) + + def test_migrate_invalid_obj_handle( + self, source_session, target_session, migration_flags, invalid_object_handle + ): + """ + Test migrating keys + """ + ret, mig_data = ca_migrate_keys( + source_session, + target_session, + migration_flags, + len(invalid_object_handle), + invalid_object_handle, + ) + assert ret == 7 + + def test_migrate_invalid_obj_type( + self, source_session, target_session, migration_flags, invalid_object_type + ): + """ + Test migrating keys + """ + ret, mig_data = ca_migrate_keys( + source_session, + target_session, + migration_flags, + len(invalid_object_type), + invalid_object_type, + ) + assert ret == 5 + + def test_migrate_hundred_obj( + self, source_session, target_session, migration_flags, hundred_objects + ): + """ + Test migrating keys + """ + ret, mig_data = ca_migrate_keys( + source_session, target_session, migration_flags, len(hundred_objects), hundred_objects + ) + assert ret == CKR_OK + verify_migrated_objects(source_session, target_session, hundred_objects, mig_data) + + def test_migrate_mix_obj(self, source_session, target_session, migration_flags, mix_objects): + """ + Test migrating keys + """ + ret, mig_data = ca_migrate_keys( + source_session, target_session, migration_flags, len(mix_objects), mix_objects + ) + assert ret == CKR_OK + verify_migrated_objects(source_session, target_session, mix_objects, mig_data)