diff --git a/common/python/exception/__init__.py b/common/python/exception/__init__.py
new file mode 100644
index 000000000..f046bc81c
--- /dev/null
+++ b/common/python/exception/__init__.py
@@ -0,0 +1,15 @@
+# Copyright 2020 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+all = []
diff --git a/common/python/exception/avalon_exceptions.py b/common/python/exception/avalon_exceptions.py
new file mode 100644
index 000000000..514467c48
--- /dev/null
+++ b/common/python/exception/avalon_exceptions.py
@@ -0,0 +1,27 @@
+# Copyright 2020 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class AvalonException(Exception):
+    """
+    Abstract exception class for Avalon exceptions.
+    """
+    pass
+
+
+class WorkerRestartException(AvalonException):
+    """
+    Exception class which indicates that a worker has restarted.
+ """ + pass diff --git a/enclave_manager/avalon_enclave_manager/work_order_processor_manager.py b/enclave_manager/avalon_enclave_manager/work_order_processor_manager.py index 554c6e1a1..48004bcfe 100644 --- a/enclave_manager/avalon_enclave_manager/work_order_processor_manager.py +++ b/enclave_manager/avalon_enclave_manager/work_order_processor_manager.py @@ -24,6 +24,7 @@ from abc import abstractmethod from error_code.error_status import WorkOrderStatus from avalon_enclave_manager.base_enclave_manager import EnclaveManager +from exception.avalon_exceptions import WorkerRestartException logger = logging.getLogger(__name__) @@ -36,6 +37,7 @@ class WOProcessorManager(EnclaveManager): def __init__(self, config): super().__init__(config) self._identity = None + self._is_restart_required = False # ------------------------------------------------------------------------- @@ -99,7 +101,15 @@ def _process_work_order_by_id(self, wo_id): Returns : wo_resp - A JSON response of the executed work order """ + try: + if self._is_restart_required: + self._kv_helper.csv_prepend("wo-worker-scheduled", + self._worker_id, wo_id) + logger.info("Reinstating work order {} to ".format(wo_id) + + "wo-worker-scheduled; This worker will restart.") + raise WorkerRestartException("This worker needs to restart.") + # Get JSON workorder request corresponding to wo_id wo_json_req = self._kv_helper.get("wo-requests", wo_id) if wo_json_req is None: @@ -107,6 +117,8 @@ def _process_work_order_by_id(self, wo_id): "to id %s from wo-requests table", wo_id) return None + except WorkerRestartException: + raise except Exception as e: logger.error("Problem while reading the work order %s " "from wo-requests table", wo_id) @@ -126,9 +138,22 @@ def _process_work_order_by_id(self, wo_id): # Execute work order request - logger.info("Execute workorder with id %s", wo_id) - wo_json_resp = self._execute_work_order(wo_json_req) - wo_resp = json.loads(wo_json_resp) + try: + logger.info("Execute workorder with id %s", wo_id) + wo_json_resp = self._execute_work_order(wo_json_req) + wo_resp = json.loads(wo_json_resp) + except WorkerRestartException: + self._kv_helper.csv_prepend("wo-worker-scheduled", + self._worker_id, wo_id) + logger.info("Reinstating work order {} to ".format(wo_id) + + "wo-worker-scheduled as this worker will restart.") + raise + except Exception as e: + logger.error("Exception while processing work order: {}".format(e)) + self._persist_wo_response_to_db( + wo_id, WorkOrderStatus.FAILED, None, + "Exception during work order processing") + raise logger.info("Update workorder receipt for workorder %s", wo_id) self._wo_kv_delegate.update_receipt(wo_id, wo_resp) @@ -175,21 +200,19 @@ def _execute_work_order(self, input_json_str): """ try: wo_response = self._execute_wo_in_trusted_enclave(input_json_str) - try: - json_response = json.dumps(wo_response, indent=4) - except Exception as err: - wo_response["Response"] = dict() - logger.error("ERROR: Failed to serialize JSON; %s", str(err)) - wo_response["Response"]["Status"] = WorkOrderStatus.FAILED - wo_response["Response"]["Message"] = "Failed to serialize JSON" - json_response = json.dumps(wo_response) - + json_response = json.dumps(wo_response, indent=4) + + except TypeError as err: + logger.error("Failed to serialize response JSON - %s", str(err)) + return jrpc_utility.create_error_response( + WorkOrderStatus.FAILED, "0", + "Failed to serialize response JSON") + except WorkerRestartException: + raise except Exception as e: - wo_response["Response"] = dict() - 
logger.error("failed to execute work order; %s", str(e)) - wo_response["Response"]["Status"] = WorkOrderStatus.FAILED - wo_response["Response"]["Message"] = str(e) - json_response = json.dumps(wo_response) + logger.error("Failed to execute work order - %s", str(e)) + return jrpc_utility.create_error_response( + WorkOrderStatus.FAILED, "0", "Failed to execute work order") return json_response @@ -297,6 +320,8 @@ def _start_polling_kvstore(self): logger.info("Enclave manager sleeping for %d secs", sleep_interval) time.sleep(sleep_interval) + except WorkerRestartException: + raise except Exception as inst: logger.error("Error while processing work-order; " + "shutting down enclave manager") @@ -353,6 +378,8 @@ def _start_zmq_listener(self): str(wo_id)) else: socket.send_string("Work order processed: " + str(wo_id)) + except WorkerRestartException: + raise except Exception as inst: logger.error("Error while processing work-order; " + "shutting down enclave manager") diff --git a/enclave_manager/avalon_enclave_manager/wpe/wpe_enclave_manager.py b/enclave_manager/avalon_enclave_manager/wpe/wpe_enclave_manager.py index ff238ba6d..062f8e110 100644 --- a/enclave_manager/avalon_enclave_manager/wpe/wpe_enclave_manager.py +++ b/enclave_manager/avalon_enclave_manager/wpe/wpe_enclave_manager.py @@ -21,6 +21,7 @@ import os import sys +import utility.jrpc_utility as jrpc_utility import avalon_enclave_manager.sgx_work_order_request as work_order_request import avalon_enclave_manager.wpe.wpe_enclave as enclave import avalon_enclave_manager.wpe.wpe_enclave_info as enclave_info @@ -29,6 +30,9 @@ from error_code.error_status import WorkOrderStatus from avalon_enclave_manager.work_order_processor_manager \ import WOProcessorManager +from exception.avalon_exceptions import WorkerRestartException +from multiprocessing import Process + logger = logging.getLogger(__name__) @@ -128,9 +132,13 @@ def _execute_wo_in_trusted_enclave(self, input_json_str): try: pre_proc_output = self._wpe_requester\ .preprocess_work_order(input_json_str, self.encryption_key) - if "error" in pre_proc_output: + if pre_proc_output is None or "error" in pre_proc_output: # If error in preprocessing response, skip workorder processing - logger.error("Failed to preprocess at WPE enclave manager.") + logger.error("Failed to preprocess work order request.") + self._check_for_re_register() + if self._is_restart_required: + raise WorkerRestartException( + "This worker needs to restart.") return pre_proc_output wo_request = work_order_request.SgxWorkOrderRequest( @@ -138,15 +146,58 @@ def _execute_wo_in_trusted_enclave(self, input_json_str): input_json_str, pre_proc_output) wo_response = wo_request.execute() + except WorkerRestartException: + raise except Exception as e: - logger.error("failed to execute work order; %s", str(e)) - wo_response = dict() - wo_response["error"] = dict() - wo_response["error"]["code"] = WorkOrderStatus.FAILED - wo_response["error"]["message"] = str(e) - logger.info("unknown enclave type response = %s", wo_response) + logger.error("Failed to execute work order; %s", str(e)) + wo_response = jrpc_utility.create_error_response( + WorkOrderStatus.FAILED, "0", + "Failed to execute work order : {}".format(str(e))) return wo_response +# ------------------------------------------------------------------------- + + def _check_for_re_register(self): + """ + This function verifies worker details in this instance against that + in database to see if a worker update has taken place. 
+        enables a flag to re-register this WPE with the KME before the next
+        work order is picked up for processing.
+        """
+        wpes_csv = self._kv_helper.get("worker-pool", self._worker_id)
+        wpes = [] if wpes_csv is None else wpes_csv.split(",")
+        if self._identity not in wpes:
+            # If identity is not found in worker-pool, the KME might have
+            # restarted in which case the WPEs need to register again.
+            logger.info("WPE needs to restart and register with KME again.")
+            self._is_restart_required = True
+
+# -------------------------------------------------------------------------
+
+
+def run_wpe(config):
+    """
+    Delegate method that is spawned as a new process and runs the WPE
+    Enclave Manager.
+
+    Parameters:
+        @param config - A map of configurations read from file/command line
+    """
+    try:
+        logger.info("Initialize WorkOrderProcessor enclave_manager")
+        enclave_manager = WorkOrderProcessorEnclaveManager(config)
+        logger.info("About to start WorkOrderProcessor Enclave manager")
+        enclave_manager.start_enclave_manager()
+    except WorkerRestartException as ex:
+        logger.error("Exception occurred while processing work orders.")
+        logger.error(ex)
+        logger.info("Will try to restart WPE.")
+    except Exception as e:
+        logger.error(e)
+        logger.error("Exception occurred while running WPE, " +
+                     "exiting WPE enclave manager")
+        sys.exit(1)
+
 # -------------------------------------------------------------------------
 
 
@@ -195,16 +246,14 @@ def main(args=None):
     sys.stderr = plogger.stream_to_logger(
         logging.getLogger("STDERR"), logging.WARN)
 
-    try:
-        EnclaveManager.parse_command_line(config, remainder)
-        logger.info("Initialize WorkOrderProcessor enclave_manager")
-        enclave_manager = WorkOrderProcessorEnclaveManager(config)
-        logger.info("About to start WorkOrderProcessor Enclave manager")
-        enclave_manager.start_enclave_manager()
-    except Exception as e:
-        logger.error("Exception occurred while running WPE, " +
-                     "exiting WPE enclave manager")
-        exit(1)
+    EnclaveManager.parse_command_line(config, remainder)
+
+    while True:
+        wpe_manager = Process(target=run_wpe, args=(config,))
+        wpe_manager.start()
+        wpe_manager.join()
+        if wpe_manager.exitcode == 1:
+            sys.exit(1)
 
 
 main()
diff --git a/enclave_manager/avalon_enclave_manager/wpe/wpe_requester.py b/enclave_manager/avalon_enclave_manager/wpe/wpe_requester.py
index 4c6b1752c..6b9bc0a31 100644
--- a/enclave_manager/avalon_enclave_manager/wpe/wpe_requester.py
+++ b/enclave_manager/avalon_enclave_manager/wpe/wpe_requester.py
@@ -178,7 +178,7 @@ def preprocess_work_order(self, wo_request, encryption_key):
         @param encryption_key - WPE's public encryption key
         Returns :
         @returns result - Result from KME that includes the workorder
-                          key info. error response, in case of failure.
+                          key info. Error response, in case of failure.
""" workload_id = "kme-preprocess" in_data = [wo_request, encryption_key] diff --git a/tc/sgx/trusted_worker_manager/enclave/kme/ext_work_order_info_kme.h b/tc/sgx/trusted_worker_manager/enclave/kme/ext_work_order_info_kme.h index ec22dea24..0e511a557 100644 --- a/tc/sgx/trusted_worker_manager/enclave/kme/ext_work_order_info_kme.h +++ b/tc/sgx/trusted_worker_manager/enclave/kme/ext_work_order_info_kme.h @@ -34,11 +34,12 @@ enum KmeRegistrationStatus { ERR_MRSIGNER_NOT_MATCH = 4, /// WPE MRSIGNER value not matched ERR_WPE_VERIFICATION_FAILED = 5, /// WPE attestation report verification failed ERR_ENCRYPTION_KEY_NOT_MATCH = 6, /// WPE encryption hash value didn't matched - ERR_UNIQUE_ID_NOT_MATCH = 7 /// WPE unique id didn't match + ERR_UNIQUE_ID_NOT_MATCH = 7, /// WPE unique id didn't match + ERR_WPE_KEY_INFO_CREATION_FAILED = 8 /// CreateWorkOrderKeyInfo for WPE failed }; enum KmePreProcessStatus { - ERR_WPE_MAX_WO_COUNT_REACHED = 1 + ERR_WPE_MAX_WO_COUNT_REACHED = 10 }; class ExtWorkOrderInfoKME : public ExtWorkOrderInfoImpl { diff --git a/tc/sgx/trusted_worker_manager/enclave/kme/workload/kme_workload_plug-in.cpp b/tc/sgx/trusted_worker_manager/enclave/kme/workload/kme_workload_plug-in.cpp index 7a327cb84..52fd132ff 100644 --- a/tc/sgx/trusted_worker_manager/enclave/kme/workload/kme_workload_plug-in.cpp +++ b/tc/sgx/trusted_worker_manager/enclave/kme/workload/kme_workload_plug-in.cpp @@ -303,8 +303,12 @@ void KMEWorkloadProcessor::PreprocessWorkorder( } AddOutput(0, wo_key_data, out_work_order_data); } else { - SetStatus(ERR_WPE_KEY_NOT_FOUND, out_work_order_data); + SetStatus(ERR_WPE_KEY_INFO_CREATION_FAILED, out_work_order_data); + ThrowIf(true, "WPE key info creation failed"); } + } else { + SetStatus(ERR_WPE_KEY_NOT_FOUND, out_work_order_data); + ThrowIf(true, "WPE encryption key not found"); } } // KMEWorkloadProcessor::PreprocessWorkorder