From 7efe689f8bf83f6fb3c4fa60b1f89019d4e5b216 Mon Sep 17 00:00:00 2001 From: Laurin Brandner Date: Tue, 26 Apr 2022 11:25:42 +0200 Subject: [PATCH] Clean up azure platform --- sebs/azure/__init__.py | 2 +- sebs/azure/azure.py | 151 +++++--------------- sebs/azure/{function.py => function_app.py} | 8 +- sebs/azure/workflow.py | 36 ----- 4 files changed, 41 insertions(+), 156 deletions(-) rename sebs/azure/{function.py => function_app.py} (90%) delete mode 100644 sebs/azure/workflow.py diff --git a/sebs/azure/__init__.py b/sebs/azure/__init__.py index 499b1372..5736abdc 100644 --- a/sebs/azure/__init__.py +++ b/sebs/azure/__init__.py @@ -1,4 +1,4 @@ from .azure import Azure # noqa -from .function import AzureFunction # noqa +from .function_app import AzureFunction, AzureWorkflow # noqa from .config import AzureConfig # noqa from .blob_storage import BlobStorage # noqa diff --git a/sebs/azure/azure.py b/sebs/azure/azure.py index 663f3715..659f9439 100644 --- a/sebs/azure/azure.py +++ b/sebs/azure/azure.py @@ -2,19 +2,15 @@ import json import glob import os -import io import shutil import time -from typing import cast, Dict, List, Optional, Set, Tuple, Type # noqa +from typing import cast, Dict, List, Optional, Set, Tuple, Type, TypeVar # noqa import docker -import pandas as pd -from azure.storage.blob import BlobServiceClient from sebs.azure.blob_storage import BlobStorage from sebs.azure.cli import AzureCLI -from sebs.azure.function import AzureFunction -from sebs.azure.workflow import AzureWorkflow +from sebs.azure.function_app import FunctionApp, AzureFunction, AzureWorkflow from sebs.azure.config import AzureConfig, AzureResources from sebs.azure.triggers import AzureTrigger, HTTPTrigger from sebs.code_package import CodePackage @@ -232,32 +228,25 @@ def package_code(self, code_package: CodePackage, directory: str, is_workflow: b shell=True, cwd=directory) return directory, code_size - def publish_function( + def publish_benchmark( self, - function: Function, + benchmark: Benchmark, code_package: CodePackage, repeat_on_failure: bool = False, ) -> str: success = False url = "" self.logging.info( - "Attempting publish of function {}".format(function.name)) + "Attempting publish of {}".format(benchmark.name)) while not success: try: ret = self.cli_instance.execute( "bash -c 'cd /mnt/function " "&& func azure functionapp publish {} --{} --no-build'".format( - function.name, self.AZURE_RUNTIMES[code_package.language_name] + benchmark.name, self.AZURE_RUNTIMES[code_package.language_name] ) ) - # ret = self.cli_instance.execute( - # "bash -c 'cd /mnt/function " - # "&& az functionapp deployment source config-zip " - # "--src {}.zip -g {} -n {} --build-remote false '".format( - # code_package.name, resource_group, function.name - # ) - # ) - # print(ret) + url = "" for line in ret.split(b"\n"): line = line.decode("utf-8") @@ -277,7 +266,7 @@ def publish_function( time.sleep(30) self.logging.info( "Sleep 30 seconds for Azure to register function app {}".format( - function.name + benchmark.name ) ) # escape loop. we failed! @@ -296,11 +285,11 @@ def publish_function( :return: URL to reach HTTP-triggered function """ - def update_function(self, function: Function, code_package: CodePackage): + def update_benchmark(self, benchmark: Benchmark, code_package: CodePackage): # Mount code package in Docker instance self._mount_function_code(code_package) - url = self.publish_function(function, code_package, True) + url = self.publish_benchmark(benchmark, code_package, True) trigger = HTTPTrigger( url, self.config.resources.data_storage_account(self.cli_instance)) @@ -326,8 +315,8 @@ def default_benchmark_name(self, code_package: CodePackage) -> str: ) return func_name - def create_function(self, code_package: CodePackage, func_name: str) -> AzureFunction: - + B = TypeVar("B", bound=FunctionApp) + def create_benchmark(self, code_package: CodePackage, name: str, benchmark_cls: B) -> B: language = code_package.language_name language_runtime = code_package.language_version resource_group = self.config.resources.resource_group( @@ -336,7 +325,7 @@ def create_function(self, code_package: CodePackage, func_name: str) -> AzureFun config = { "resource_group": resource_group, - "func_name": func_name, + "name": name, "region": region, "runtime": self.AZURE_RUNTIMES[language], "runtime_version": language_runtime, @@ -349,7 +338,7 @@ def create_function(self, code_package: CodePackage, func_name: str) -> AzureFun ( " az functionapp config appsettings list " " --resource-group {resource_group} " - " --name {func_name} " + " --name {name} " ).format(**config) ) for setting in json.loads(ret.decode()): @@ -362,7 +351,7 @@ def create_function(self, code_package: CodePackage, func_name: str) -> AzureFun account_name, connection_string ) self.logging.info( - "Azure: Selected {} function app".format(func_name)) + "Azure: Selected {} function app".format(name)) except RuntimeError: function_storage_account = self.config.resources.add_storage_account( self.cli_instance) @@ -373,35 +362,36 @@ def create_function(self, code_package: CodePackage, func_name: str) -> AzureFun # create function app self.cli_instance.execute( ( - " az functionapp create --resource-group {resource_group} " - " --os-type Linux --consumption-plan-location {region} " + " az functionapp create --functions-version 3 " + " --resource-group {resource_group} --os-type Linux" + " --consumption-plan-location {region} " " --runtime {runtime} --runtime-version {runtime_version} " - " --name {func_name} --storage-account {storage_account}" + " --name {name} --storage-account {storage_account}" ).format(**config) ) self.logging.info( - "Azure: Created function app {}".format(func_name)) + "Azure: Created function app {}".format(name)) break except RuntimeError as e: # Azure does not allow some concurrent operations if "another operation is in progress" in str(e): self.logging.info( - f"Repeat {func_name} creation, another operation in progress" + f"Repeat {name} creation, another operation in progress" ) # Rethrow -> another error else: raise - function = AzureFunction( - name=func_name, + benchmark = benchmark_cls( + name=name, benchmark=code_package.name, code_hash=code_package.hash, function_storage=function_storage_account, ) # update existing function app - self.update_function(function, code_package) + self.update_benchmark(benchmark, code_package) - return function + return benchmark def cached_benchmark(self, benchmark: Benchmark): @@ -412,92 +402,17 @@ def cached_benchmark(self, benchmark: Benchmark): azure_trigger.logging_handlers = self.logging_handlers azure_trigger.data_storage_account = data_storage_account - def create_workflow(self, code_package: CodePackage, workflow_name: str) -> AzureFunction: - language = code_package.language_name - language_runtime = code_package.language_version - resource_group = self.config.resources.resource_group( - self.cli_instance) - region = self.config.region - - config = { - "resource_group": resource_group, - "workflow_name": workflow_name, - "region": region, - "runtime": self.AZURE_RUNTIMES[language], - "runtime_version": language_runtime, - } - - # check if function does not exist - # no API to verify existence - try: - ret = self.cli_instance.execute( - ( - " az functionapp config appsettings list " - " --resource-group {resource_group} " - " --name {workflow_name} " - ).format(**config) - ) - for setting in json.loads(ret.decode()): - if setting["name"] == "AzureWebJobsStorage": - connection_string = setting["value"] - elems = [z for y in connection_string.split( - ";") for z in y.split("=")] - account_name = elems[elems.index("AccountName") + 1] - function_storage_account = AzureResources.Storage.from_cache( - account_name, connection_string - ) - self.logging.info( - "Azure: Selected {} function app".format(workflow_name)) - except RuntimeError: - function_storage_account = self.config.resources.add_storage_account( - self.cli_instance) - config["storage_account"] = function_storage_account.account_name - - # FIXME: only Linux type is supported - while True: - try: - # create function app - self.cli_instance.execute( - ( - " az functionapp create --resource-group {resource_group} " - " --os-type Linux --consumption-plan-location {region} " - " --runtime {runtime} --runtime-version {runtime_version} " - " --name {workflow_name} --storage-account {storage_account}" - ).format(**config) - ) - self.logging.info( - "Azure: Created workflow app {}".format(workflow_name)) - break - except RuntimeError as e: - # Azure does not allow some concurrent operations - if "another operation is in progress" in str(e): - self.logging.info( - f"Repeat {workflow_name} creation, another operation in progress" - ) - # Rethrow -> another error - else: - raise - workflow = AzureWorkflow( - name=workflow_name, - benchmark=code_package.name, - code_hash=code_package.hash, - function_storage=function_storage_account, - ) + def create_function(self, code_package: CodePackage, func_name: str) -> AzureFunction: + return self.create_benchmark(code_package, func_name, AzureFunction) - # update existing function app - self.update_function(workflow, code_package) + def update_function(self, function: Function, code_package: CodePackage): + self.update_benchmark(function, code_package) - return workflow + def create_workflow(self, code_package: CodePackage, workflow_name: str) -> AzureWorkflow: + return self.create_benchmark(code_package, workflow_name, AzureWorkflow) def update_workflow(self, workflow: Workflow, code_package: CodePackage): - # Mount code package in Docker instance - self._mount_function_code(code_package) - url = self.publish_function(workflow, code_package, True) - - trigger = HTTPTrigger( - url, self.config.resources.data_storage_account(self.cli_instance)) - trigger.logging_handlers = self.logging_handlers - workflow.add_trigger(trigger) + self.update_benchmark(workflow, code_package) """ @@ -607,7 +522,7 @@ def _enforce_cold_start(self, function: Function, code_package: CodePackage): f" --settings ForceColdStart={self.cold_start_counter}" ) - self.update_function(function, code_package) + self.update_benchmark(function, code_package) def enforce_cold_start(self, functions: List[Function], code_package: CodePackage): self.cold_start_counter += 1 diff --git a/sebs/azure/function.py b/sebs/azure/function_app.py similarity index 90% rename from sebs/azure/function.py rename to sebs/azure/function_app.py index 4f0a9671..bf86df8e 100644 --- a/sebs/azure/function.py +++ b/sebs/azure/function_app.py @@ -2,7 +2,7 @@ from sebs.faas.benchmark import Function -class AzureFunction(Function): +class FunctionApp(Function): def __init__( self, name: str, @@ -34,3 +34,9 @@ def deserialize(cached_config: dict) -> Function: assert trigger_type, "Unknown trigger type {}".format(trigger["type"]) ret.add_trigger(trigger_type.deserialize(trigger)) return ret + +class AzureFunction(FunctionApp): + pass + +class AzureWorkflow(FunctionApp): + pass \ No newline at end of file diff --git a/sebs/azure/workflow.py b/sebs/azure/workflow.py deleted file mode 100644 index 353fb3c4..00000000 --- a/sebs/azure/workflow.py +++ /dev/null @@ -1,36 +0,0 @@ -from sebs.azure.config import AzureResources -from sebs.faas.benchmark import Workflow - - -class AzureWorkflow(Workflow): - def __init__( - self, - name: str, - benchmark: str, - code_hash: str, - function_storage: AzureResources.Storage, - ): - super().__init__(benchmark, name, code_hash) - self.function_storage = function_storage - - def serialize(self) -> dict: - return { - **super().serialize(), - "function_storage": self.function_storage.serialize(), - } - - @staticmethod - def deserialize(cached_config: dict) -> Workflow: - ret = AzureWorkflow( - cached_config["name"], - cached_config["code_package"], - cached_config["hash"], - AzureResources.Storage.deserialize(cached_config["function_storage"]), - ) - from sebs.azure.triggers import HTTPTrigger - - for trigger in cached_config["triggers"]: - trigger_type = {"HTTP": HTTPTrigger}.get(trigger["type"]) - assert trigger_type, "Unknown trigger type {}".format(trigger["type"]) - ret.add_trigger(trigger_type.deserialize(trigger)) - return ret