Skip to content
This repository has been archived by the owner on Jan 27, 2022. It is now read-only.

Commit

Permalink
Restart WPE on KME restart
Browse files Browse the repository at this point in the history
Signed-off-by: Rajeev Ranjan <[email protected]>
  • Loading branch information
rranjan3 committed Aug 7, 2020
1 parent 6b57754 commit 1f6098a
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 39 deletions.
15 changes: 15 additions & 0 deletions common/python/exception/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright 2020 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

all = []
27 changes: 27 additions & 0 deletions common/python/exception/avalon_exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright 2020 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


class AvalonException(Exception):
"""
Abstract exception class for Avalon exceptions.
"""
pass


class WorkerRestartException(AvalonException):
"""
Exception class which indicates that a worker has restarted.
"""
pass
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from abc import abstractmethod
from error_code.error_status import WorkOrderStatus
from avalon_enclave_manager.base_enclave_manager import EnclaveManager
from exception.avalon_exceptions import WorkerRestartException

logger = logging.getLogger(__name__)

Expand All @@ -36,6 +37,7 @@ class WOProcessorManager(EnclaveManager):
def __init__(self, config):
super().__init__(config)
self._identity = None
self._is_restart_required = False

# -------------------------------------------------------------------------

Expand Down Expand Up @@ -99,14 +101,24 @@ def _process_work_order_by_id(self, wo_id):
Returns :
wo_resp - A JSON response of the executed work order
"""

try:
if self._is_restart_required:
self._kv_helper.csv_prepend("wo-worker-scheduled",
self._worker_id, wo_id)
logger.info("Reinstating work order {} to ".format(wo_id)
+ "wo-worker-scheduled; This worker will restart.")
raise WorkerRestartException("This worker needs to restart.")

# Get JSON workorder request corresponding to wo_id
wo_json_req = self._kv_helper.get("wo-requests", wo_id)
if wo_json_req is None:
logger.error("Received empty work order corresponding " +
"to id %s from wo-requests table", wo_id)
return None

except WorkerRestartException:
raise
except Exception as e:
logger.error("Problem while reading the work order %s "
"from wo-requests table", wo_id)
Expand All @@ -126,9 +138,22 @@ def _process_work_order_by_id(self, wo_id):

# Execute work order request

logger.info("Execute workorder with id %s", wo_id)
wo_json_resp = self._execute_work_order(wo_json_req)
wo_resp = json.loads(wo_json_resp)
try:
logger.info("Execute workorder with id %s", wo_id)
wo_json_resp = self._execute_work_order(wo_json_req)
wo_resp = json.loads(wo_json_resp)
except WorkerRestartException:
self._kv_helper.csv_prepend("wo-worker-scheduled",
self._worker_id, wo_id)
logger.info("Reinstating work order {} to ".format(wo_id)
+ "wo-worker-scheduled as this worker will restart.")
raise
except Exception as e:
logger.error("Exception while processing work order: {}".format(e))
self._persist_wo_response_to_db(
wo_id, WorkOrderStatus.FAILED, None,
"Exception during work order processing")
raise

logger.info("Update workorder receipt for workorder %s", wo_id)
self._wo_kv_delegate.update_receipt(wo_id, wo_resp)
Expand Down Expand Up @@ -175,21 +200,19 @@ def _execute_work_order(self, input_json_str):
"""
try:
wo_response = self._execute_wo_in_trusted_enclave(input_json_str)
try:
json_response = json.dumps(wo_response, indent=4)
except Exception as err:
wo_response["Response"] = dict()
logger.error("ERROR: Failed to serialize JSON; %s", str(err))
wo_response["Response"]["Status"] = WorkOrderStatus.FAILED
wo_response["Response"]["Message"] = "Failed to serialize JSON"
json_response = json.dumps(wo_response)

json_response = json.dumps(wo_response, indent=4)

except TypeError as err:
logger.error("Failed to serialize response JSON - %s", str(err))
return jrpc_utility.create_error_response(
WorkOrderStatus.FAILED, "0",
"Failed to serialize response JSON")
except WorkerRestartException:
raise
except Exception as e:
wo_response["Response"] = dict()
logger.error("failed to execute work order; %s", str(e))
wo_response["Response"]["Status"] = WorkOrderStatus.FAILED
wo_response["Response"]["Message"] = str(e)
json_response = json.dumps(wo_response)
logger.error("Failed to execute work order - %s", str(e))
return jrpc_utility.create_error_response(
WorkOrderStatus.FAILED, "0", "Failed to execute work order")

return json_response

Expand Down Expand Up @@ -297,6 +320,8 @@ def _start_polling_kvstore(self):
logger.info("Enclave manager sleeping for %d secs",
sleep_interval)
time.sleep(sleep_interval)
except WorkerRestartException:
raise
except Exception as inst:
logger.error("Error while processing work-order; " +
"shutting down enclave manager")
Expand Down Expand Up @@ -353,6 +378,8 @@ def _start_zmq_listener(self):
str(wo_id))
else:
socket.send_string("Work order processed: " + str(wo_id))
except WorkerRestartException:
raise
except Exception as inst:
logger.error("Error while processing work-order; " +
"shutting down enclave manager")
Expand Down
85 changes: 67 additions & 18 deletions enclave_manager/avalon_enclave_manager/wpe/wpe_enclave_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import os
import sys

import utility.jrpc_utility as jrpc_utility
import avalon_enclave_manager.sgx_work_order_request as work_order_request
import avalon_enclave_manager.wpe.wpe_enclave as enclave
import avalon_enclave_manager.wpe.wpe_enclave_info as enclave_info
Expand All @@ -29,6 +30,9 @@
from error_code.error_status import WorkOrderStatus
from avalon_enclave_manager.work_order_processor_manager \
import WOProcessorManager
from exception.avalon_exceptions import WorkerRestartException
from multiprocessing import Process


logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -128,25 +132,72 @@ def _execute_wo_in_trusted_enclave(self, input_json_str):
try:
pre_proc_output = self._wpe_requester\
.preprocess_work_order(input_json_str, self.encryption_key)
if "error" in pre_proc_output:
if pre_proc_output is None or "error" in pre_proc_output:
# If error in preprocessing response, skip workorder processing
logger.error("Failed to preprocess at WPE enclave manager.")
logger.error("Failed to preprocess work order request.")
self._check_for_re_register()
if self._is_restart_required:
raise WorkerRestartException(
"This worker needs to restart.")
return pre_proc_output

wo_request = work_order_request.SgxWorkOrderRequest(
"WPE",
input_json_str,
pre_proc_output)
wo_response = wo_request.execute()
except WorkerRestartException:
raise
except Exception as e:
logger.error("failed to execute work order; %s", str(e))
wo_response = dict()
wo_response["error"] = dict()
wo_response["error"]["code"] = WorkOrderStatus.FAILED
wo_response["error"]["message"] = str(e)
logger.info("unknown enclave type response = %s", wo_response)
logger.error("Failed to execute work order; %s", str(e))
wo_response = jrpc_utility.create_error_response(
WorkOrderStatus.FAILED, "0",
"Failed to execute work order : {}".format(str(e)))
return wo_response

# -------------------------------------------------------------------------

def _check_for_re_register(self):
"""
This function verifies worker details in this instance against that
in database to see if a worker update has taken place. If so, it
enables a flag to re-register this WPE with the KME before next work
order is picked up for processing.
"""
wpes_csv = self._kv_helper.get("worker-pool", self._worker_id)
wpes = [] if wpes_csv is None else wpes_csv.split(",")
if self._identity not in wpes:
# If identity is not found in worker-pool, the KME might have
# restarted in which case the WPEs need to register again.
logger.info("WPE needs to restart and register with KME again.")
self._is_restart_required = True

# -------------------------------------------------------------------------


def run_wpe(config):
"""
Delegate method that spawns up as a new process and runs the WPE Enclave
Manager.
Parameters:
@param config - A map of configurations read from file/command line
"""
try:
logger.info("Initialize WorkOrderProcessor enclave_manager")
enclave_manager = WorkOrderProcessorEnclaveManager(config)
logger.info("About to start WorkOrderProcessor Enclave manager")
enclave_manager.start_enclave_manager()
except WorkerRestartException as ex:
logger.error("Exception occurred while processing work orders.")
logger.error(ex)
logger.info("Will try to restart WPE.")
except Exception as e:
logger.error(e)
logger.error("Exception occurred while running WPE, " +
"exiting WPE enclave manager")
sys.exit(1)

# -------------------------------------------------------------------------


Expand Down Expand Up @@ -195,16 +246,14 @@ def main(args=None):
sys.stderr = plogger.stream_to_logger(
logging.getLogger("STDERR"), logging.WARN)

try:
EnclaveManager.parse_command_line(config, remainder)
logger.info("Initialize WorkOrderProcessor enclave_manager")
enclave_manager = WorkOrderProcessorEnclaveManager(config)
logger.info("About to start WorkOrderProcessor Enclave manager")
enclave_manager.start_enclave_manager()
except Exception as e:
logger.error("Exception occurred while running WPE, " +
"exiting WPE enclave manager")
exit(1)
EnclaveManager.parse_command_line(config, remainder)

while True:
wpe_manager = Process(target=run_wpe, args=(config,))
wpe_manager.start()
wpe_manager.join()
if wpe_manager.exitcode == 1:
sys.exit(1)


main()
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def preprocess_work_order(self, wo_request, encryption_key):
@param encryption_key - WPE's public encryption key
Returns :
@returns result - Result from KME that includes the workorder
key info. error response, in case of failure.
key info. Error response, in case of failure.
"""
workload_id = "kme-preprocess"
in_data = [wo_request, encryption_key]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ enum KmeRegistrationStatus {
ERR_MRSIGNER_NOT_MATCH = 4, /// WPE MRSIGNER value not matched
ERR_WPE_VERIFICATION_FAILED = 5, /// WPE attestation report verification failed
ERR_ENCRYPTION_KEY_NOT_MATCH = 6, /// WPE encryption hash value didn't matched
ERR_UNIQUE_ID_NOT_MATCH = 7 /// WPE unique id didn't match
ERR_UNIQUE_ID_NOT_MATCH = 7, /// WPE unique id didn't match
ERR_WPE_KEY_INFO_CREATION_FAILED = 8 /// CreateWorkOrderKeyInfo for WPE failed
};

enum KmePreProcessStatus {
ERR_WPE_MAX_WO_COUNT_REACHED = 1
ERR_WPE_MAX_WO_COUNT_REACHED = 10
};

class ExtWorkOrderInfoKME : public ExtWorkOrderInfoImpl {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,12 @@ void KMEWorkloadProcessor::PreprocessWorkorder(
}
AddOutput(0, wo_key_data, out_work_order_data);
} else {
SetStatus(ERR_WPE_KEY_NOT_FOUND, out_work_order_data);
SetStatus(ERR_WPE_KEY_INFO_CREATION_FAILED, out_work_order_data);
ThrowIf<ValueError>(true, "WPE key info creation failed");
}
} else {
SetStatus(ERR_WPE_KEY_NOT_FOUND, out_work_order_data);
ThrowIf<ValueError>(true, "WPE encryption key not found");
}
} // KMEWorkloadProcessor::PreprocessWorkorder

Expand Down

0 comments on commit 1f6098a

Please sign in to comment.