diff --git a/common/python/connectors/common/__init__.py b/common/python/connectors/common/__init__.py new file mode 100644 index 000000000..c71b194f4 --- /dev/null +++ b/common/python/connectors/common/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2020 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__all__ = [] diff --git a/common/python/connectors/common/db_helper/__init__.py b/common/python/connectors/common/db_helper/__init__.py new file mode 100644 index 000000000..c71b194f4 --- /dev/null +++ b/common/python/connectors/common/db_helper/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2020 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__all__ = [] diff --git a/common/python/connectors/common/db_helper/work_order_lmdb_helper.py b/common/python/connectors/common/db_helper/work_order_lmdb_helper.py new file mode 100644 index 000000000..265b19165 --- /dev/null +++ b/common/python/connectors/common/db_helper/work_order_lmdb_helper.py @@ -0,0 +1,164 @@ +# Copyright 2020 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import json +import logging +from error_code.error_status import WorkOrderStatus +from error_code.enclave_error import EnclaveError + +from jsonrpc.exceptions import JSONRPCDispatchException + +logger = logging.getLogger(__name__) + + +class WorkOrderLmdbHelper: + """ + WorkOrderDBHelper helps listener or other client facing modules + to interact with the kv storage for queries related to work + order processing. + """ +# ------------------------------------------------------------------------------------------------ + + def __init__(self, kv_helper): + """ + Function to perform init activity + Parameters: + - kv_helper is a object of shared kv database + """ + + self.kv_helper = kv_helper + +# --------------------------------------------------------------------------------------------- + def get_wo_result(self, wo_id): + """ + Function to get work-order result from shared kv database + Parameters: + - wo_id is the id of the work-order for which result is + requested. + Returns jrpc response as defined in EEA spec 6.1.2 + """ + + # Work order is processed if it is in wo-response table + value = self.kv_helper.get("wo-responses", wo_id) + if value: + response = json.loads(value) + if 'result' in response: + return response['result'] + + # response without a result should have an error + err_code = response["error"]["code"] + err_msg = response["error"]["message"] + if err_code == EnclaveError.ENCLAVE_ERR_VALUE: + err_code = WorkOrderStatus.INVALID_PARAMETER_FORMAT_OR_VALUE + elif err_code == EnclaveError.ENCLAVE_ERR_UNKNOWN: + err_code = WorkOrderStatus.UNKNOWN_ERROR + else: + err_code = WorkOrderStatus.FAILED + raise JSONRPCDispatchException(err_code, err_msg) + + if(self.kv_helper.get("wo-timestamps", wo_id) is not None): + # work order is yet to be processed + raise JSONRPCDispatchException( + WorkOrderStatus.PENDING, + "Work order result is yet to be updated") + + # work order not in 'wo-timestamps' table + raise JSONRPCDispatchException( + WorkOrderStatus.INVALID_PARAMETER_FORMAT_OR_VALUE, + "Work order Id not found in the database. Hence invalid parameter") + +# --------------------------------------------------------------------------------------------- + def submit_wo(self, wo_id, input_json_str): + """ + Function to submit and store a new work-order + Parameters: + - wo_id: id of work-order being submitted + - input_json_str: The actual work-order as received + from the requester + """ + + if(self.kv_helper.get("wo-timestamps", wo_id) is None): + + # Create a new work order entry. + # Don't change the order of table updation. + # The order is important for clean up if the TCS is restarted in + # the middle. + epoch_time = str(time.time()) + + # Update the tables + self.kv_helper.set("wo-timestamps", wo_id, epoch_time) + self.kv_helper.set("wo-requests", wo_id, input_json_str) + self.kv_helper.set("wo-scheduled", wo_id, input_json_str) + + raise JSONRPCDispatchException( + WorkOrderStatus.PENDING, + "Work order is computing. Please query for WorkOrderGetResult \ + to view the result") + + # Workorder id already exists + raise JSONRPCDispatchException( + WorkOrderStatus.INVALID_PARAMETER_FORMAT_OR_VALUE, + "Work order id already exists in the database. \ + Hence invalid parameter") + + def clear_a_processed_wo(self): + """ + Function to clears one(least recently added) processed + work-order from wo-processed table + """ + work_orders = self.kv_helper.lookup("wo-timestamps") + for id in work_orders: + + # If work order is processed then remove from table + if (self.kv_helper.get("wo-processed", id) is not None): + self.kv_helper.remove("wo-processed", id) + self.kv_helper.remove("wo-requests", id) + self.kv_helper.remove("wo-responses", id) + self.kv_helper.remove("wo-receipts", id) + self.kv_helper.remove("wo-timestamps", id) + return id + + def cleanup_hindered_wo(self): + """ + This function is meant to do a boot time cleanup + Returns the list of work-order to be processed and the count + """ + workorder_count = 0 + workorder_list = [] + work_orders = self.kv_helper.lookup("wo-timestamps") + for wo_id in work_orders: + + if (self.kv_helper.get("wo-scheduled", wo_id) is None and + self.kv_helper.get("wo-processing", wo_id) is None and + self.kv_helper.get("wo-processed", wo_id) is None): + + if (self.kv_helper.get("wo-requests", wo_id) is not None): + self.kv_helper.remove("wo-requests", wo_id) + + if (self.kv_helper.get("wo-responses", wo_id) is not None): + self.kv_helper.remove("wo-responses", wo_id) + + if (self.kv_helper.get("wo-receipts", wo_id) is not None): + self.kv_helper.remove("wo-receipts", wo_id) + + self.kv_helper.remove("wo-timestamps", wo_id) + + else: + # Add to the internal FIFO + workorder_list.append(wo_id) + workorder_count += 1 + + return workorder_list, workorder_count +# --------------------------------------------------------------------------------------------- diff --git a/common/python/connectors/common/db_helper/worker_encryption_key_lmdb_helper.py b/common/python/connectors/common/db_helper/worker_encryption_key_lmdb_helper.py new file mode 100644 index 000000000..36100f56a --- /dev/null +++ b/common/python/connectors/common/db_helper/worker_encryption_key_lmdb_helper.py @@ -0,0 +1,47 @@ +# Copyright 2020 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +logger = logging.getLogger(__name__) + + +class WorkerEncryptionKeyLmdbHelper: + """ + WorkerEncryptionKeyDBHelper helps listener or other client + facing modules to interact with the kv storage for queries + related to encryption key. It implements all low level db + calls which aides in the logical flows for getting/setting + encryption keys. + """ +# ------------------------------------------------------------------------------------------------ + + def __init__(self, kv_helper): + """ + Function to perform init activity + Parameters: + - kv_helper is a object of lmdb database + """ + self.kv_helper = kv_helper + +# --------------------------------------------------------------------------------------------- + def get_worker_with_id(self, worker_id): + """ + Function to get worker corresponding to supplied worker id + Parameters: + - worker_id: id of worker being looked for + Returns worker corresponding to key + """ + + return self.kv_helper.get_worker("workers", worker_id) diff --git a/common/python/connectors/common/db_helper/worker_registry_lmdb_helper.py b/common/python/connectors/common/db_helper/worker_registry_lmdb_helper.py new file mode 100644 index 000000000..8f782e5e0 --- /dev/null +++ b/common/python/connectors/common/db_helper/worker_registry_lmdb_helper.py @@ -0,0 +1,82 @@ +# Copyright 2020 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +logger = logging.getLogger(__name__) + + +class WorkerRegistryLmdbHelper: + """ + WorkerRegistryDBHelper helps listener or other client facing modules + to interact with the kv storage for queries related to worker registry. + """ +# ------------------------------------------------------------------------------------------------ + + def __init__(self, kv_helper): + """ + Function to perform init activity + Parameters: + - kv_helper is a object of lmdb database + """ + + self.kv_helper = kv_helper +# ------------------------------------------------------------------------------------------------ + + def get_worker_with_id(self, worker_id): + """ + Function to get worker corresponding to supplied worker id + Parameters: + - worker_id: id of worker being looked for + Returns worker corresponding to key + """ + return self.kv_helper.get("workers", worker_id) + +# ------------------------------------------------------------------------------------------------ + + def save_worker(self, worker_id, worker_details): + """ + Function to save a worker with given id and details + Parameters: + - worker_id: id of worker to be saved + - worker_details: Details of worker to be saved + """ + self.kv_helper.set("workers", worker_id, worker_details) + +# ------------------------------------------------------------------------------------------------ + def get_all_workers(self): + """ + Function to retrieve all workers from database + Returns a list of all workers in the 'workers' table + """ + return self.kv_helper.lookup("workers") +# ------------------------------------------------------------------------------------------------ + + def cleanup_registries(self): + """ + Function to clean up all registries from the database + """ + organisation_id = self.kv_helper.lookup("registries") + for o_id in organisation_id: + self.kv_helper.remove("registries", o_id) +# ------------------------------------------------------------------------------------------------ + + def save_registry(self, reg_id, registry_details): + """ + Function to save/create a new registry in the database + Parameters: + - reg_id: Id of registry to be saved/updated + - registry_details: Details of new/updated registry + """ + return self.kv_helper.set("registries", reg_id, registry_details) diff --git a/common/python/connectors/common/db_helper/workorder_receipt_lmdb_helper.py b/common/python/connectors/common/db_helper/workorder_receipt_lmdb_helper.py new file mode 100644 index 000000000..7d40a8390 --- /dev/null +++ b/common/python/connectors/common/db_helper/workorder_receipt_lmdb_helper.py @@ -0,0 +1,180 @@ +# Copyright 2020 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import logging +from error_code.error_status import ReceiptCreateStatus, SignatureStatus,\ + JRPCErrorCodes +from jsonrpc.exceptions import JSONRPCDispatchException + +logger = logging.getLogger(__name__) + + +class WorkOrderReceiptLmdbHelper: + """ + WorkOrderReceiptDBHelper helps listener or other client facing modules + to interact with the kv storage for queries related to work order + receipts' processing. + """ +# ----------------------------------------------------------------------------- + + def __init__(self, kv_helper): + """ + Function to perform init activity + Parameters: + - kv_helper is a object of lmdb database + """ + # Special index 0xFFFFFFFF value to fetch last update to receipt + self.LAST_RECEIPT_INDEX = 1 << 32 + self.kv_helper = kv_helper + +# ----------------------------------------------------------------------------- + def get_wo_req(self, wo_id): + """ + Function to get a work-order request + Parameters: + - wo_id: work-order id + Return the work-order request corresponding to the id + """ + return self.kv_helper.get("wo-requests", wo_id) + +# ----------------------------------------------------------------------------- + + def get_wo_receipt(self, wo_id): + """ + Function to get a work-order receipt + Parameters: + - wo_id: work-order id + Return the work-order receipt corresponding to the id + """ + return self.kv_helper.get("wo-receipts", wo_id) + +# ----------------------------------------------------------------------------- + + def save_wo_receipt(self, wo_id, input_str): + """ + Function to save a work-order receipt in the database + Parameters: + - wo_id: id of work-order + - input_str: the actual receipt + """ + self.kv_helper.set("wo-receipts", wo_id, input_str) + +# ----------------------------------------------------------------------------- + + def get_wo_receipt_update(self, wo_id): + """ + Function to get work-order receipt updates + Parameters: + - wo_id: id of work-order for which receipt updates are required + Returns the receipt update for corresponding id + """ + return self.kv_helper.get("wo-receipt-updates", wo_id) + +# ----------------------------------------------------------------------------- + + def save_receipt_update(self, wo_id, updated_receipt): + """ + Function to save work-order receipt updates + Parameters: + - wo_id: id of work-order for which receipt updates are intended + - updated_receipt: the content of the updated receipt to be stored + """ + self.kv_helper.set("wo-receipt-updates", wo_id, updated_receipt) + +# ----------------------------------------------------------------------------- + + def retrieve_wo_receipt(self, wo_id): + """ + Function to retrieve details of work-order receipt + Parameters: + - wo_id: id of work-order for which receipt details + are required + Returns details of a receipt + """ + value = self.kv_helper.get("wo-receipts", wo_id) + if value: + receipt = json.loads(value) + receipt_updates = self.kv_helper.get("wo-receipt-updates", wo_id) + if receipt_updates is None: + receipt["params"]["receiptCurrentStatus"] = \ + receipt["params"]["receiptCreateStatus"] + else: + receipt_updates_json = json.loads(receipt_updates) + # Get the recent update to receipt + last_receipt = receipt_updates_json[len(receipt_updates_json) + - 1] + receipt["params"]["receiptCurrentStatus"] = \ + last_receipt["updateType"] + return receipt["params"] + else: + raise JSONRPCDispatchException( + JRPCErrorCodes.INVALID_PARAMETER_FORMAT_OR_VALUE, + "Work order receipt for work order id {} not found in the " + "database. Hence invalid parameter".format( + wo_id + )) + +# ----------------------------------------------------------------------------- + + def retrieve_wo_receipt_update(self, wo_id, update_index, updater_id): + + """ + Function to retrieve a specific(by index) update for a specific + updater-id for a work-order id + Parameters: + - wo_id: id of work-order for which the receipt is being + queried + - update_index: index of update for this specific receipt + - updater_id: id of updater + Returns + """ + receipt_updates = self.kv_helper.get("wo-receipt-updates", wo_id) + + if receipt_updates: + receipt_updates_json = json.loads(receipt_updates) + total_updates = len(receipt_updates_json) + if update_index <= 0: + raise JSONRPCDispatchException( + JRPCErrorCodes.INVALID_PARAMETER_FORMAT_OR_VALUE, + "Update index should be positive non-zero number." + " Hence invalid parameter") + elif update_index > total_updates: + if update_index == self.LAST_RECEIPT_INDEX: + # set to the index of last update to receipt + update_index = total_updates - 1 + else: + raise JSONRPCDispatchException( + JRPCErrorCodes.INVALID_PARAMETER_FORMAT_OR_VALUE, + "Update index is larger than total update count." + " Hence invalid parameter") + else: + # If the index is less than total updates + # then decrement by one since it is zero based array + update_index = update_index - 1 + update_to_receipt = receipt_updates_json[update_index] + # If updater id is present then check whether it matches + if updater_id: + if update_to_receipt["updaterId"] != updater_id: + raise JSONRPCDispatchException( + JRPCErrorCodes.INVALID_PARAMETER_FORMAT_OR_VALUE, + "Update index and updater id doesn't match." + " Hence invalid parameter") + update_to_receipt["updateCount"] = total_updates + return update_to_receipt + else: + raise JSONRPCDispatchException( + JRPCErrorCodes.INVALID_PARAMETER_FORMAT_OR_VALUE, + "There is no updates available to this receipt." + " Hence invalid parameter") diff --git a/listener/avalon_listener/tcs_work_order_handler.py b/listener/avalon_listener/tcs_work_order_handler.py index 7e4f88f68..8dbd5a04a 100644 --- a/listener/avalon_listener/tcs_work_order_handler.py +++ b/listener/avalon_listener/tcs_work_order_handler.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import time import json import logging from error_code.error_status import WorkOrderStatus @@ -21,6 +20,8 @@ from avalon_sdk.work_order.work_order_request_validator \ import WorkOrderRequestValidator from utility.hex_utils import is_valid_hex_str +from connectors.common.db_helper.work_order_lmdb_helper \ + import WorkOrderLmdbHelper from jsonrpc.exceptions import JSONRPCDispatchException @@ -51,7 +52,7 @@ def __init__(self, kv_helper, max_wo_count): - kv_helper is a object of lmdb database """ - self.kv_helper = kv_helper + self.db_helper = WorkOrderLmdbHelper(kv_helper) self.workorder_count = 0 self.max_workorder_count = max_wo_count self.workorder_list = [] @@ -64,31 +65,10 @@ def __work_order_handler_on_boot(self): Function to perform on-boot process of work order handler """ - work_orders = self.kv_helper.lookup("wo-timestamps") - for wo_id in work_orders: + self.workorder_list, self.workorder_count = \ + self.db_helper.cleanup_hindered_wo() - if(self.kv_helper.get("wo-scheduled", wo_id) is None and - self.kv_helper.get("wo-processing", wo_id) is None and - self.kv_helper.get("wo-processed", wo_id) is None): - - if(self.kv_helper.get("wo-requests", wo_id) is not None): - self.kv_helper.remove("wo-requests", wo_id) - - if(self.kv_helper.get("wo-responses", wo_id) is not None): - self.kv_helper.remove("wo-responses", wo_id) - - if(self.kv_helper.get("wo-receipts", wo_id) is not None): - self.kv_helper.remove("wo-receipts", wo_id) - - # TODO: uncomment after fixing lmbd error - self.kv_helper.remove("wo-timestamps", wo_id) - - else: - # Add to the internal FIFO - self.workorder_list.append(wo_id) - self.workorder_count += 1 - -# --------------------------------------------------------------------------------------------- + # --------------------------------------------------------------------------------------------- def WorkOrderGetResult(self, **params): """ Function to process work order get result @@ -105,35 +85,7 @@ def WorkOrderGetResult(self, **params): JsonRpcErrorCode.INVALID_PARAMETER, "Invalid work order Id" ) - - # Work order is processed if it is in wo-response table - value = self.kv_helper.get("wo-responses", wo_id) - if value: - response = json.loads(value) - if 'result' in response: - return response['result'] - - # response without a result should have an error - err_code = response["error"]["code"] - err_msg = response["error"]["message"] - if err_code == EnclaveError.ENCLAVE_ERR_VALUE: - err_code = WorkOrderStatus.INVALID_PARAMETER_FORMAT_OR_VALUE - elif err_code == EnclaveError.ENCLAVE_ERR_UNKNOWN: - err_code = WorkOrderStatus.UNKNOWN_ERROR - else: - err_code = WorkOrderStatus.FAILED - raise JSONRPCDispatchException(err_code, err_msg) - - if(self.kv_helper.get("wo-timestamps", wo_id) is not None): - # work order is yet to be processed - raise JSONRPCDispatchException( - WorkOrderStatus.PENDING, - "Work order result is yet to be updated") - - # work order not in 'wo-timestamps' table - raise JSONRPCDispatchException( - WorkOrderStatus.INVALID_PARAMETER_FORMAT_OR_VALUE, - "Work order Id not found in the database. Hence invalid parameter") + return self.db_helper.get_wo_result(wo_id) # --------------------------------------------------------------------------------------------- def WorkOrderSubmit(self, **params): @@ -182,20 +134,10 @@ def WorkOrderSubmit(self, **params): if((self.workorder_count + 1) > self.max_workorder_count): # if max count reached clear a processed entry - work_orders = self.kv_helper.lookup("wo-timestamps") - for id in work_orders: - - # If work order is processed then remove from table - if(self.kv_helper.get("wo-processed", id) is not None): - self.kv_helper.remove("wo-processed", id) - self.kv_helper.remove("wo-requests", id) - self.kv_helper.remove("wo-responses", id) - self.kv_helper.remove("wo-receipts", id) - self.kv_helper.remove("wo-timestamps", id) - - self.workorder_list.remove(id) - self.workorder_count -= 1 - break + removed_wo = self.db_helper.clear_a_processed_wo() + if(removed_wo is not None): + self.workorder_list.remove() + self.workorder_count -= 1 # If no work order is processed then return busy if((self.workorder_count + 1) > self.max_workorder_count): @@ -203,31 +145,6 @@ def WorkOrderSubmit(self, **params): WorkOrderStatus.BUSY, "Work order handler is busy updating the result") - if(self.kv_helper.get("wo-timestamps", wo_id) is None): - - # Create a new work order entry. - # Don't change the order of table updation. - # The order is important for clean up if the TCS is restarted in - # the middle. - epoch_time = str(time.time()) - - # Update the tables - self.kv_helper.set("wo-timestamps", wo_id, epoch_time) - self.kv_helper.set("wo-requests", wo_id, input_json_str) - self.kv_helper.set("wo-scheduled", wo_id, input_json_str) - # Add to the internal FIFO - self.workorder_list.append(wo_id) - self.workorder_count += 1 - - raise JSONRPCDispatchException( - WorkOrderStatus.PENDING, - "Work order is computing. Please query for WorkOrderGetResult \ - to view the result") - - # Workorder id already exists - raise JSONRPCDispatchException( - WorkOrderStatus.INVALID_PARAMETER_FORMAT_OR_VALUE, - "Work order id already exists in the database. \ - Hence invalid parameter") + self.db_helper.submit_wo(wo_id, input_json_str) # --------------------------------------------------------------------------------------------- diff --git a/listener/avalon_listener/tcs_worker_encryption_key_handler.py b/listener/avalon_listener/tcs_worker_encryption_key_handler.py index bf5cb7e32..7fdf18ed2 100644 --- a/listener/avalon_listener/tcs_worker_encryption_key_handler.py +++ b/listener/avalon_listener/tcs_worker_encryption_key_handler.py @@ -18,6 +18,8 @@ from error_code.error_status import WorkerError from jsonrpc.exceptions import JSONRPCDispatchException +from connectors.common.db_helper.worker_encryption_key_lmdb_helper \ + import WorkerEncryptionKeyLmdbHelper logger = logging.getLogger(__name__) # No of bytes of encryptionKeyNonce to encrypt data @@ -48,7 +50,7 @@ def __init__(self, kv_helper): - kv_helper is a object of lmdb database """ - self.kv_helper = kv_helper + self.db_helper = WorkerEncryptionKeyLmdbHelper(kv_helper) # --------------------------------------------------------------------------------------------- def EncryptionKeySet(self, **params): @@ -73,7 +75,7 @@ def EncryptionKeyGet(self, **params): """ worker_id = str(params['workerId']) - value = self.kv_helper.get("workers", worker_id) + value = self.db_helper.get_worker_with_id(worker_id) if value is None: raise JSONRPCDispatchException( diff --git a/listener/avalon_listener/tcs_worker_registry_handler.py b/listener/avalon_listener/tcs_worker_registry_handler.py index 1db2b50e8..221d86a8a 100644 --- a/listener/avalon_listener/tcs_worker_registry_handler.py +++ b/listener/avalon_listener/tcs_worker_registry_handler.py @@ -19,6 +19,9 @@ from jsonrpc.exceptions import JSONRPCDispatchException +from connectors.common.db_helper.worker_registry_lmdb_helper \ + import WorkerRegistryLmdbHelper + logger = logging.getLogger(__name__) @@ -51,7 +54,7 @@ def __init__(self, kv_helper): - kv_helper is a object of lmdb database """ - self.kv_helper = kv_helper + self.db_helper = WorkerRegistryLmdbHelper(kv_helper) self.worker_pool = [] self.__worker_registry_handler_on_boot() # ------------------------------------------------------------------------------------------------ @@ -62,13 +65,11 @@ def __worker_registry_handler_on_boot(self): """ worker_list = [] - worker_list = self.kv_helper.lookup("workers") + worker_list = self.db_helper.get_all_workers() # Initial Worker details are loaded self.worker_pool = worker_list - organisation_id = self.kv_helper.lookup("registries") - for o_id in organisation_id: - self.kv_helper.remove("registries", o_id) + self.db_helper.cleanup_registries() # Adding a new entry that corresponds to this handler, its URI, etc. # json with byte32 orgID, string uri, byte32 scAddr, @@ -83,7 +84,7 @@ def __worker_registry_handler_on_boot(self): response = {} response['id'] = "regid" - set_response = self.kv_helper.set("registries", "regid", value) + set_response = self.db_helper.save_registry("regid", value) if set_response: response['result'] = "WorkerRegistryHandleronBoot Successful" response['error'] = {} @@ -92,12 +93,12 @@ def __worker_registry_handler_on_boot(self): response['error'] = {} response['error']['code'] = WorkerError.UNKNOWN_ERROR response['error']['message'] = 'Unknown Error occurred during' + \ - 'worker registry handler boot up' + 'worker registry handler boot up' return response -# ------------------------------------------------------------------------------------------------ # ------------------------------------------------------------------------------------------------ + def WorkerRegister(self, **params): """ Function to register a new worker to the enclave @@ -107,8 +108,8 @@ def WorkerRegister(self, **params): """ worker_id = must_get_worker_id(params) - - if(self.kv_helper.get("workers", worker_id) is not None): + worker = self.db_helper.get_worker_with_id(worker_id) + if (worker is not None): raise JSONRPCDispatchException( WorkerError.INVALID_PARAMETER_FORMAT_OR_VALUE, "Worker Id already exists in the database." + @@ -118,7 +119,7 @@ def WorkerRegister(self, **params): params["status"] = WorkerStatus.ACTIVE input_json_str = json.dumps(params) - self.kv_helper.set("workers", worker_id, input_json_str) + self.db_helper.save_worker(worker_id, input_json_str) raise JSONRPCDispatchException( WorkerError.SUCCESS, "Successfully Registered") @@ -154,7 +155,7 @@ def WorkerSetStatus(self, **params): worker_id = must_get_worker_id(params) - value = self.kv_helper.get("workers", worker_id) + value = self.db_helper.get_worker_with_id(worker_id) if value: json_dict = json.loads(value) if not self.__validate_input_worker_status(params): @@ -164,7 +165,7 @@ def WorkerSetStatus(self, **params): json_dict['status'] = params['status'] value = json.dumps(json_dict) - self.kv_helper.set("workers", worker_id, value) + self.db_helper.save_worker(worker_id, value) raise JSONRPCDispatchException( WorkerError.SUCCESS, @@ -177,7 +178,7 @@ def WorkerSetStatus(self, **params): # ------------------------------------------------------------------------------------------------ def __lookup_basic(self, is_lookup_next, params): # sync the work pool to that of DB - self.worker_pool = self.kv_helper.lookup("workers") + self.worker_pool = self.db_helper.get_all_workers() total_count = 0 ids = [] @@ -190,7 +191,7 @@ def __lookup_basic(self, is_lookup_next, params): continue matched = True - value = self.kv_helper.get("workers", worker_id) + value = self.db_helper.get_worker_with_id(worker_id) if value: worker = json.loads(value) criteria = ["workerType", "organizationId", @@ -250,7 +251,7 @@ def WorkerRetrieve(self, **params): worker_id = must_get_worker_id(params) # value retrieved is 'result' field as per Spec 5.3.8 Worker Retrieve # Response Payload - value = self.kv_helper.get("workers", worker_id) + value = self.db_helper.get_worker_with_id(worker_id) if value is None: raise JSONRPCDispatchException( @@ -282,7 +283,7 @@ def WorkerUpdate(self, **params): # value retrieved is 'result' field as per Spec 5.3.8 Worker Retrieve # Response Payload - value = self.kv_helper.get("workers", worker_id) + value = self.db_helper.get_worker_with_id(worker_id) if value is None: raise JSONRPCDispatchException( @@ -295,7 +296,7 @@ def WorkerUpdate(self, **params): json_dict["details"][item] = worker_details[item] value = json.dumps(json_dict) - self.kv_helper.set("workers", worker_id, value) + self.db_helper.save_worker(worker_id, value) raise JSONRPCDispatchException( WorkerError.SUCCESS, "Successfully Updated") diff --git a/listener/avalon_listener/tcs_workorder_receipt_handler.py b/listener/avalon_listener/tcs_workorder_receipt_handler.py index 4f49c5c73..eb62e85b8 100644 --- a/listener/avalon_listener/tcs_workorder_receipt_handler.py +++ b/listener/avalon_listener/tcs_workorder_receipt_handler.py @@ -22,6 +22,9 @@ import crypto_utils.signature as signature from jsonrpc.exceptions import JSONRPCDispatchException +from connectors.common.db_helper.workorder_receipt_lmdb_helper \ + import WorkOrderReceiptLmdbHelper + logger = logging.getLogger(__name__) @@ -42,10 +45,8 @@ def __init__(self, kv_helper): - kv_helper is a object of lmdb database """ - self.kv_helper = kv_helper + self.db_helper = WorkOrderReceiptLmdbHelper(kv_helper) self.__workorder_receipt_on_boot() - # Special index 0xFFFFFFFF value to fetch last update to receipt - self.LAST_RECEIPT_INDEX = 1 << 32 # Supported hashing and signing algorithms self.SIGNING_ALGORITHM = "SECP256K1" self.HASHING_ALGORITHM = "SHA-256" @@ -73,7 +74,7 @@ def WorkOrderReceiptCreate(self, **params): input_json_str = params["raw"] input_value = json.loads(input_json_str) - wo_request = self.kv_helper.get("wo-requests", wo_id) + wo_request = self.db_helper.get_wo_req(wo_id) if wo_request is None: raise JSONRPCDispatchException( JRPCErrorCodes.INVALID_PARAMETER_FORMAT_OR_VALUE, @@ -81,13 +82,13 @@ def WorkOrderReceiptCreate(self, **params): "hence invalid request" ) else: - wo_receipt = self.kv_helper.get("wo-receipts", wo_id) + wo_receipt = self.db_helper.get_wo_receipt(wo_id) if wo_receipt is None: status, err_msg = \ self.__validate_work_order_receipt_create_req( input_value, wo_request) if status is True: - self.kv_helper.set("wo-receipts", wo_id, input_json_str) + self.db_helper.save_wo_receipt(wo_id, input_json_str) raise JSONRPCDispatchException( JRPCErrorCodes.SUCCESS, "Receipt created successfully" @@ -104,7 +105,7 @@ def WorkOrderReceiptCreate(self, **params): "Hence invalid parameter" ) -# ----------------------------------------------------------------------------- + # ----------------------------------------------------------------------------- def __validate_work_order_receipt_create_req(self, wo_receipt_req, wo_request): @@ -179,7 +180,7 @@ def WorkOrderReceiptUpdate(self, **params): input_value = json.loads(input_json_str) # Check if receipt for work order id is created or not - value = self.kv_helper.get("wo-receipts", wo_id) + value = self.db_helper.get_wo_receipt(wo_id) if value: # Receipt is created, validate update receipt request @@ -188,7 +189,7 @@ def WorkOrderReceiptUpdate(self, **params): if status is True: # Load previous updates to receipt updates_to_receipt = \ - self.kv_helper.get("wo-receipt-updates", wo_id) + self.db_helper.get_wo_receipt_update(wo_id) # If it is first update to receipt if updates_to_receipt is None: updated_receipt = [] @@ -220,8 +221,8 @@ def WorkOrderReceiptUpdate(self, **params): " is not allowed" ) updated_receipt.append(input_value) - self.kv_helper.set("wo-receipt-updates", wo_id, - json.dumps(updated_receipt)) + self.db_helper.save_receipt_update(wo_id, + json.dumps(updated_receipt)) raise JSONRPCDispatchException( JRPCErrorCodes.SUCCESS, "Receipt updated successfully" @@ -366,36 +367,17 @@ def WorkOrderReceiptLookUpNext(self, **params): def WorkOrderReceiptRetrieve(self, **params): """ - Function to retrieve the details of worker + Function to retrieve the details of work-order receipt Parameters: - - params is variable-length arugment list containing work order - receipt request request as defined in EEA spec 7.2.4 + - params is variable-length argument list containing work order + receipt request as defined in EEA spec 7.2.4 Returns jrpc response as defined in 7.2.5 """ wo_id = params["workOrderId"] - - value = self.kv_helper.get("wo-receipts", wo_id) - if value: - receipt = json.loads(value) - receipt_updates = self.kv_helper.get("wo-receipt-updates", wo_id) - if receipt_updates is None: - receipt["params"]["receiptCurrentStatus"] = \ - receipt["params"]["receiptCreateStatus"] - else: - receipt_updates_json = json.loads(receipt_updates) - # Get the recent update to receipt - last_receipt = receipt_updates_json[len(receipt_updates_json) - - 1] - receipt["params"]["receiptCurrentStatus"] = \ - last_receipt["updateType"] - return receipt["params"] - else: - raise JSONRPCDispatchException( - JRPCErrorCodes.INVALID_PARAMETER_FORMAT_OR_VALUE, - "Work order receipt for work order id {} not found in the " - "database. Hence invalid parameter".format( - wo_id - )) + try: + return self.db_helper.retrieve_wo_receipt(wo_id) + except JSONRPCDispatchException: + raise # ----------------------------------------------------------------------------- @@ -403,7 +385,7 @@ def WorkOrderReceiptUpdateRetrieve(self, **params): """ Function to retrieve the update to work order receipt Parameters: - - params is variable-length arugment list containing work order + - params is variable-length argument list containing work order update retrieve request as defined in EEA spec 7.2.6 Returns: Jrpc response as defined in EEA spec 7.2.7 @@ -420,41 +402,9 @@ def WorkOrderReceiptUpdateRetrieve(self, **params): # starts from 1 update_index = input_params["updateIndex"] # Load list of updates to the receipt - receipt_updates = self.kv_helper.get("wo-receipt-updates", wo_id) - - if receipt_updates: - receipt_updates_json = json.loads(receipt_updates) - total_updates = len(receipt_updates_json) - if update_index <= 0: - raise JSONRPCDispatchException( - JRPCErrorCodes.INVALID_PARAMETER_FORMAT_OR_VALUE, - "Update index should be positive non-zero number." - " Hence invalid parameter") - elif update_index > total_updates: - if update_index == self.LAST_RECEIPT_INDEX: - # set to the index of last update to receipt - update_index = total_updates - 1 - else: - raise JSONRPCDispatchException( - JRPCErrorCodes.INVALID_PARAMETER_FORMAT_OR_VALUE, - "Update index is larger than total update count." - " Hence invalid parameter") - else: - # If the index is less than total updates - # then decrement by one since it is zero based array - update_index = update_index - 1 - update_to_receipt = receipt_updates_json[update_index] - # If updater id is present then check whether it matches - if updater_id: - if update_to_receipt["updaterId"] != updater_id: - raise JSONRPCDispatchException( - JRPCErrorCodes.INVALID_PARAMETER_FORMAT_OR_VALUE, - "Update index and updater id doesn't match" - " Hence invalid parameter") - update_to_receipt["updateCount"] = total_updates - return update_to_receipt - else: - raise JSONRPCDispatchException( - JRPCErrorCodes.INVALID_PARAMETER_FORMAT_OR_VALUE, - "There is no updates available to this receipt" - " Hence invalid parameter") + try: + return self.db_helper.retrieve_wo_receipt_update(wo_id, + update_index, + updater_id) + except JSONRPCDispatchException: + raise