From 2f3d0b85f42d4f0aec8bb6b2824fd9618c8f128d Mon Sep 17 00:00:00 2001 From: Yogi Pandey <20666257+ynpandey@users.noreply.github.com> Date: Mon, 11 Sep 2023 16:35:14 -0500 Subject: [PATCH] Online feature store samples (#2640) * Added online feature store sample notebook. * Updated conda YAML files to use azureml-featurestore[online]==0.1.0b4, changed online FS sample notebook and scoring script for pyarrow compatibility. * Updated scoring.py used in online batch inferencing to handle NaN feature values. * Fixed code formatting, added code cell names, and some clean-up. * Removed redundant json metadata from the online feature store sample notebook. * Updated for Redis cache capacity nomenclature. --- ...nline store and run online inference.ipynb | 1387 +++++++++++++++++ .../project/env/online.yml | 16 + .../fraud_model/online_inference/conda.yml | 11 + .../online_inference/src/scoring.py | 73 + .../fraud_model/online_inference/test.json | 10 + 5 files changed, 1497 insertions(+) create mode 100644 sdk/python/featurestore_sample/notebooks/sdk_only/5. Enable online store and run online inference.ipynb create mode 100644 sdk/python/featurestore_sample/project/env/online.yml create mode 100644 sdk/python/featurestore_sample/project/fraud_model/online_inference/conda.yml create mode 100644 sdk/python/featurestore_sample/project/fraud_model/online_inference/src/scoring.py create mode 100644 sdk/python/featurestore_sample/project/fraud_model/online_inference/test.json diff --git a/sdk/python/featurestore_sample/notebooks/sdk_only/5. Enable online store and run online inference.ipynb b/sdk/python/featurestore_sample/notebooks/sdk_only/5. Enable online store and run online inference.ipynb new file mode 100644 index 0000000000..848afaa33a --- /dev/null +++ b/sdk/python/featurestore_sample/notebooks/sdk_only/5. 
Enable online store and run online inference.ipynb @@ -0,0 +1,1387 @@ +{ + "cells": [ +  { +   "attachments": {}, +   "cell_type": "markdown", +   "metadata": {}, +   "source": [ +    "# Tutorial #5: Enable online materialization and run online inference\n", +    "So far you have learned how to develop features, materialize them to offline materialization store, perform training, and perform batch inference. In this tutorial you will learn how to use feature store for online/realtime inference use cases.\n", +    "\n", +    "You will perform the following steps:\n", +    "\n", +    "1. Setup Azure Cache for Redis.\n", +    "1. Attach the cache to feature store as the online materialization store and grant necessary permissions.\n", +    "1. Materialize feature sets to the online store.\n", +    "1. Test online deployment with mock data.\n" +   ] +  }, +  { +   "attachments": {}, +   "cell_type": "markdown", +   "metadata": {}, +   "source": [ +    "# Prerequisites\n", +    "1. Before proceeding, please ensure that you have already completed the previous four tutorials of this tutorial series. We will be reusing feature store and some other resources created in the previous tutorials." +   ] +  }, +  { +   "attachments": {}, +   "cell_type": "markdown", +   "metadata": { +    "nteract": { +     "transient": { +      "deleting": false +     } +    } +   }, +   "source": [ +    "## Prepare the notebook environment for development\n", +    "Note: This tutorial uses Azure Machine Learning notebook with **Serverless Spark Compute**.\n", +    "\n", +    "1. Clone the examples repository to your local machine: To run the tutorial, first clone the [examples repository - (azureml-examples)](https://github.com/azure/azureml-examples) with this command:\n", +    "\n", +    " `git clone --depth 1 https://github.com/Azure/azureml-examples`\n", +    "\n", +    " You can also download a zip file from the [examples repository (azureml-examples)](https://github.com/azure/azureml-examples). At this page, first select the `code` dropdown, and then select `Download ZIP`. 
Then, unzip the contents into a folder on your local device.\n", +    "\n", +    "2. Running the tutorial:\n", +    "* Option 1: Create a new notebook, and execute the instructions in this document step by step. \n", +    "* Option 2: Open the existing notebook `featurestore_sample/notebooks/sdk_only/5. Enable online store and run online inference.ipynb`. You may keep this document open and refer to it for additional explanation and documentation links.\n", +    "\n", +    " 1. Select **Serverless Spark Compute** in the top navigation **Compute** dropdown. This operation might take one to two minutes. Wait for a status bar in the top to display **Configure session**.\n", +    " 2. Select **Configure session** in the top status bar.\n", +    " 3. Select **Python packages**.\n", +    " 4. Select **Upload conda file**.\n", +    " 5. Select file `azureml-examples/sdk/python/featurestore_sample/project/env/online.yml` located on your local device.\n", +    " 6. (Optional) Increase the session time-out (idle time in minutes) to reduce the serverless spark cluster startup time." +   ] +  }, +  { +   "attachments": {}, +   "cell_type": "markdown", +   "metadata": { +    "nteract": { +     "transient": { +      "deleting": false +     } +    } +   }, +   "source": [ +    "# Set up" +   ] +  }, +  { +   "attachments": {}, +   "cell_type": "markdown", +   "metadata": {}, +   "source": [ +    "### Start Spark session\n", +    "Execute the following code cell to start the Spark session. It will take approximately 10 minutes to install all dependencies and start the Spark session." +   ] +  }, +  { +   "cell_type": "code", +   "execution_count": null, +   "metadata": { +    "gather": { +     "logged": 1685571267325 +    }, +    "name": "start-spark-session" +   }, +   "outputs": [], +   "source": [ +    "# Run this cell to start the spark session (any code block will start the session ). 
This can take approximately 10 mins.\n", +    "print(\"start spark session\")" +   ] +  }, +  { +   "attachments": {}, +   "cell_type": "markdown", +   "metadata": {}, +   "source": [ +    "### Setup root directory for the samples" +   ] +  }, +  { +   "cell_type": "code", +   "execution_count": null, +   "metadata": { +    "gather": { +     "logged": 1685571691922 +    }, +    "jupyter": { +     "outputs_hidden": false, +     "source_hidden": false +    }, +    "name": "root-dir", +    "nteract": { +     "transient": { +      "deleting": false +     } +    } +   }, +   "outputs": [], +   "source": [ +    "import os\n", +    "\n", +    "# Please update the dir to ./Users/{your-alias} (or any custom directory you uploaded the samples to).\n", +    "# You can find the name from the directory structure in the left navigation panel.\n", +    "root_dir = \"./Users//featurestore_sample\"\n", +    "\n", +    "if os.path.isdir(root_dir):\n", +    "    print(\"The folder exists.\")\n", +    "else:\n", +    "    print(\"The folder does not exist. Please create or fix the path\")" +   ] +  }, +  { +   "attachments": {}, +   "cell_type": "markdown", +   "metadata": {}, +   "source": [ +    "### Initialize the project workspace CRUD client\n", +    "The `MLClient` for the current workspace, where you are running this tutorial notebook, will be used for create, read, update, and delete (CRUD) operations."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "gather": { + "logged": 1685571719727 + }, + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "name": "init-prj-ws-client", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "import os\n", + "from azure.ai.ml import MLClient\n", + "from azure.ai.ml.identity import AzureMLOnBehalfOfCredential\n", + "\n", + "project_ws_sub_id = os.environ[\"AZUREML_ARM_SUBSCRIPTION\"]\n", + "project_ws_rg = os.environ[\"AZUREML_ARM_RESOURCEGROUP\"]\n", + "project_ws_name = os.environ[\"AZUREML_ARM_WORKSPACE_NAME\"]\n", + "\n", + "# Connect to the project workspace\n", + "ws_client = MLClient(\n", + " AzureMLOnBehalfOfCredential(), project_ws_sub_id, project_ws_rg, project_ws_name\n", + ")" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Initialize the CRUD client of the feature store workspace\n", + "The `MLClient` for the feature store workspace for create, read, update, and delete (CRUD) operations on feature store workspace." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "name": "init-fs-ws-client" + }, + "outputs": [], + "source": [ + "from azure.ai.ml import MLClient\n", + "from azure.ai.ml.identity import AzureMLOnBehalfOfCredential\n", + "\n", + "# Feature store\n", + "featurestore_name = \"featurestore\" # use the same name from part #1 of the tutorial\n", + "featurestore_subscription_id = os.environ[\"AZUREML_ARM_SUBSCRIPTION\"]\n", + "featurestore_resource_group_name = os.environ[\"AZUREML_ARM_RESOURCEGROUP\"]\n", + "\n", + "# Feature store MLClient\n", + "fs_client = MLClient(\n", + " AzureMLOnBehalfOfCredential(),\n", + " featurestore_subscription_id,\n", + " featurestore_resource_group_name,\n", + " featurestore_name,\n", + ")" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Initialize the feature store core SDK client\n", + "This tutorial uses the Python feature store core SDK (`azureml-featurestore`). The SDK client initialized here is used for create, read, update, and delete (CRUD) operations, on feature stores, feature sets, and feature store entities." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "name": "init-fs-core-sdk" + }, + "outputs": [], + "source": [ + "from azureml.featurestore import FeatureStoreClient\n", + "from azure.ai.ml.identity import AzureMLOnBehalfOfCredential\n", + "\n", + "featurestore = FeatureStoreClient(\n", + " credential=AzureMLOnBehalfOfCredential(),\n", + " subscription_id=featurestore_subscription_id,\n", + " resource_group_name=featurestore_resource_group_name,\n", + " name=featurestore_name,\n", + ")" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Setup Azure cache for Redis\n", + "This tutorial uses Azure Cache for Redis as the online materialization store. You can either create a new Redis instance or reuse an existing one." 
+ ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Set values for the Azure Cache for Redis that will be used as online materialization store\n", + "In the following code cell, define the name of the Azure Cache for Redis that you want to create or reuse. Optionally, you can override other default settings." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "gather": { + "logged": 1685571729309 + }, + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "name": "redis-settings", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "ws_location = ws_client.workspaces.get(ws_client.workspace_name).location\n", + "\n", + "redis_subscription_id = os.environ[\"AZUREML_ARM_SUBSCRIPTION\"]\n", + "redis_resource_group_name = os.environ[\"AZUREML_ARM_RESOURCEGROUP\"]\n", + "redis_name = \"redis1\"\n", + "redis_location = ws_location" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Azure Cache for Redis (option 1): create new Redis instance\n", + "You can select the Redis cache tier (basic, standard, premium, or enterprise). You should choose a SKU family that is available for the selected cache tier. See this documentation page to learn more about [how selecting different tiers may affect cache performance](https://learn.microsoft.com/azure/azure-cache-for-redis/cache-best-practices-performance). See this link learn more about [pricing for different SKU tiers and families of Azure Cache for Redis](https://azure.microsoft.com/en-us/pricing/details/cache/).\n", + "\n", + "Execute the following code cell to create an Azure Cache for Redis with premium tier, SKU family `P` and cache capacity 2. It may take approximately 5-10 minutes to provision the Redis instance." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "gather": { + "logged": 1685571734517 + }, + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "name": "provision-redis", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "from azure.mgmt.redis import RedisManagementClient\n", + "from azure.mgmt.redis.models import RedisCreateParameters, Sku, SkuFamily, SkuName\n", + "\n", + "management_client = RedisManagementClient(\n", + " AzureMLOnBehalfOfCredential(), redis_subscription_id\n", + ")\n", + "\n", + "# It usually takes about 5 - 10 min to finish the provision of the Redis instance.\n", + "# If the following begin_create() call still hangs for longer than that,\n", + "# please check the status of the Redis instance on the Azure portal and cancel the cell if the provision has completed.\n", + "# This sample uses a PREMIUM tier Redis SKU from family P, which may cost more than a STANDARD tier SKU from family C.\n", + "# Please choose the SKU tier and family according to your performance and pricing requirements.\n", + "\n", + "redis_arm_id = (\n", + " management_client.redis.begin_create(\n", + " resource_group_name=redis_resource_group_name,\n", + " name=redis_name,\n", + " parameters=RedisCreateParameters(\n", + " location=redis_location,\n", + " sku=Sku(name=SkuName.PREMIUM, family=SkuFamily.P, capacity=2),\n", + " ),\n", + " )\n", + " .result()\n", + " .id\n", + ")\n", + "\n", + "print(redis_arm_id)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Azure Cache for Redis (option 2): use existing Redis instance\n", + "Optionally, you can reuse an existing Redis instance with the previously defined name by executing the following code." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "gather": { + "logged": 1684886695973 + }, + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "name": "reuse-redis", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "redis_arm_id = \"/subscriptions/{sub_id}/resourceGroups/{rg}/providers/Microsoft.Cache/Redis/{name}\".format(\n", + " sub_id=redis_subscription_id,\n", + " rg=redis_resource_group_name,\n", + " name=redis_name,\n", + ")" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Retrieve the user-assigned managed identity (UAI) used for feature store for materialization\n", + "This code cell retrieves the principal ID, client ID, and ARM ID property values for the UAI that will be used by the feature store for data materialization." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "gather": { + "logged": 1685571749076 + }, + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "name": "retrieve-uai", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "from azure.mgmt.msi import ManagedServiceIdentityClient\n", + "\n", + "uai_arm_id = fs_client.feature_stores.get(\n", + " featurestore_name\n", + ").materialization_identity.resource_id\n", + "uai_principal_id = fs_client.feature_stores.get(\n", + " featurestore_name\n", + ").materialization_identity.principal_id\n", + "uai_client_id = fs_client.feature_stores.get(\n", + " featurestore_name\n", + ").materialization_identity.client_id\n", + "\n", + "print(\"uai_principal_id:\" + uai_principal_id)\n", + "print(\"uai_client_id:\" + uai_client_id)\n", + "print(\"uai_arm_id:\" + uai_arm_id)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Grant `Contributor` role to the UAI on the Azure Cache for Redis\n", + "The 
following code grant `Contributor` role to the UAI on the Azure Cache for Redis. This is required to write data into Redis during materialization." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "gather": { + "logged": 1684886991067 + }, + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "name": "uai-redis-rbac", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "from azure.core.exceptions import ResourceExistsError\n", + "from azure.mgmt.msi import ManagedServiceIdentityClient\n", + "from azure.mgmt.msi.models import Identity\n", + "from azure.mgmt.authorization import AuthorizationManagementClient\n", + "from azure.mgmt.authorization.models import RoleAssignmentCreateParameters\n", + "from uuid import uuid4\n", + "\n", + "auth_client = AuthorizationManagementClient(\n", + " AzureMLOnBehalfOfCredential(), redis_subscription_id\n", + ")\n", + "\n", + "scope = f\"/subscriptions/{redis_subscription_id}/resourceGroups/{redis_resource_group_name}/providers/Microsoft.Cache/Redis/{redis_name}\"\n", + "\n", + "\n", + "# The role definition ID for the \"contributor\" role on the redis cache\n", + "# You can find other built-in role definition IDs in the Azure documentation\n", + "role_definition_id = f\"/subscriptions/{redis_subscription_id}/providers/Microsoft.Authorization/roleDefinitions/b24988ac-6180-42a0-ab88-20f7382dd24c\"\n", + "\n", + "# Generate a random UUID for the role assignment name\n", + "role_assignment_name = str(uuid4())\n", + "\n", + "# Set up the role assignment creation parameters\n", + "role_assignment_params = RoleAssignmentCreateParameters(\n", + " principal_id=uai_principal_id,\n", + " role_definition_id=role_definition_id,\n", + " principal_type=\"ServicePrincipal\",\n", + ")\n", + "\n", + "# Create the role assignment\n", + "try:\n", + " # Create the role assignment\n", + " result = auth_client.role_assignments.create(\n", + " scope, 
role_assignment_name, role_assignment_params\n", + " )\n", + " print(f\"redis RBAC granted to managed identity '{uai_principal_id}'.\")\n", + "except ResourceExistsError:\n", + " print(f\"redis RBAC already exists for managed identity '{uai_principal_id}'.\")" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Step 1: Attach online materialization store to the feature store\n", + "Attach the Azure Cache for Redis to the feature store to be used as the online materialization store." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "gather": { + "logged": 1684887054700 + }, + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "name": "attach-online-store", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "from azure.ai.ml.entities import (\n", + " ManagedIdentityConfiguration,\n", + " FeatureStore,\n", + " MaterializationStore,\n", + ")\n", + "\n", + "online_store = MaterializationStore(type=\"redis\", target=redis_arm_id)\n", + "\n", + "materialization_identity1 = ManagedIdentityConfiguration(\n", + " client_id=uai_client_id, principal_id=uai_principal_id, resource_id=uai_arm_id\n", + ")\n", + "\n", + "\n", + "ml_client = MLClient(\n", + " AzureMLOnBehalfOfCredential(),\n", + " subscription_id=featurestore_subscription_id,\n", + " resource_group_name=featurestore_resource_group_name,\n", + ")\n", + "\n", + "fs = FeatureStore(\n", + " name=featurestore_name,\n", + " online_store=online_store,\n", + " materialization_identity=materialization_identity1,\n", + ")\n", + "\n", + "fs_poller = ml_client.feature_stores.begin_create(fs, update_dependent_resources=True)\n", + "print(fs_poller.result())" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Step 2: Materialize `accounts` feature set data to online store" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + 
"metadata": {}, + "source": [ + "### Enable materialization on the `accounts` feature set\n", + "In the previous parts of the tutorial series, we did **not** materialize the accounts feature set since it had precomputed features and was used only for batch inference scenarios. In this step we will enable online materialization so that the features are available in the online store with low latency access. We will also enable offline materialization for consistency. Enabling offline materialization is optional." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "gather": { + "logged": 1685121352342 + }, + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "name": "enable-accounts-material", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "from azure.ai.ml.entities import (\n", + " MaterializationSettings,\n", + " MaterializationComputeResource,\n", + ")\n", + "\n", + "# Turn on both offline and online materialization on the \"accounts\" featureset.\n", + "\n", + "accounts_fset_config = fs_client._featuresets.get(name=\"accounts\", version=\"1\")\n", + "\n", + "accounts_fset_config.materialization_settings = MaterializationSettings(\n", + " offline_enabled=True,\n", + " online_enabled=True,\n", + " resource=MaterializationComputeResource(instance_type=\"standard_e8s_v3\"),\n", + " spark_configuration={\n", + " \"spark.driver.cores\": 4,\n", + " \"spark.driver.memory\": \"36g\",\n", + " \"spark.executor.cores\": 4,\n", + " \"spark.executor.memory\": \"36g\",\n", + " \"spark.executor.instances\": 2,\n", + " },\n", + " schedule=None,\n", + ")\n", + "\n", + "fs_poller = fs_client.feature_sets.begin_create_or_update(accounts_fset_config)\n", + "print(fs_poller.result())" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": { + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "### Backfill the `account` feature set\n", + 
"`backfill` command backfills data to all the materialization stores that are enabled for this feature set. In this case both offline and online materialization is enabled. Therefore `backfill` will be performed on both offline and online materialization stores." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "name": "start-accounts-backfill" + }, + "outputs": [], + "source": [ + "from datetime import datetime, timedelta\n", + "\n", + "# Trigger backfill on the \"accounts\" feature set.\n", + "# Backfill from 01/01/2023 to all the way to 3 hours ago.\n", + "\n", + "st = datetime(2020, 1, 1, 0, 0, 0, 0)\n", + "ed = datetime.now() - timedelta(hours=3)\n", + "\n", + "poller = fs_client.feature_sets.begin_backfill(\n", + " name=\"accounts\",\n", + " version=\"1\",\n", + " feature_window_start_time=st,\n", + " feature_window_end_time=ed,\n", + ")\n", + "print(poller.result().job_id)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": { + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "The following code cell tracks completion of the backfill job. Using the premium tier Azure Cache for Redis provisioned earlier, this step may take approximately 10 minutes to complete." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "name": "track-accounts-backfill" + }, + "outputs": [], + "source": [ + "# Get the job URL, and stream the job logs.\n", + "# With PREMIUM Redis SKU, SKU family \"P\", and cache capacity 2,\n", + "# it takes approximately 10 minutes to complete.\n", + "fs_client.jobs.stream(poller.result().job_id)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Step 3: Materialize `transactions` feature set data to the online store\n", + "\n", + "In the previous tutorials, we materialized data of the `transactions` feature set to the offline materialization store. In this step we will:\n", + "\n", + "1. 
Enable online materialization for the `transactions` feature set." +   ] +  }, +  { +   "cell_type": "code", +   "execution_count": null, +   "metadata": { +    "gather": { +     "logged": 1684887083625 +    }, +    "jupyter": { +     "outputs_hidden": false, +     "source_hidden": false +    }, +    "name": "enable-transact-material", +    "nteract": { +     "transient": { +      "deleting": false +     } +    } +   }, +   "outputs": [], +   "source": [ +    "# Enable materialization to online store for the \"transactions\" feature set.\n", +    "\n", +    "transactions_fset_config = fs_client._featuresets.get(name=\"transactions\", version=\"1\")\n", +    "transactions_fset_config.materialization_settings.online_enabled = True\n", +    "\n", +    "fs_poller = fs_client.feature_sets.begin_create_or_update(transactions_fset_config)\n", +    "print(fs_poller.result())" +   ] +  }, +  { +   "cell_type": "markdown", +   "metadata": {}, +   "source": [ +    "2. Backfill the data to both the online and offline materialization store to ensure that both have the latest data. Note that recurrent materialization job, which was setup earlier in the tutorial 2 of this series, will now materialize data to both online and offline materialization stores."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "gather": { + "logged": 1685571817460 + }, + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "name": "start-transact-material", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "# Trigger backfill on the \"transactions\" feature set to fill in the online/offline store.\n", + "# Backfill from 01/01/2023 to all the way to 3 hours ago.\n", + "\n", + "from datetime import datetime, timedelta\n", + "\n", + "st = datetime(2020, 1, 1, 0, 0, 0, 0)\n", + "ed = datetime.now() - timedelta(hours=3)\n", + "\n", + "\n", + "poller = fs_client.feature_sets.begin_backfill(\n", + " name=\"transactions\",\n", + " version=\"1\",\n", + " feature_window_start_time=st,\n", + " feature_window_end_time=ed,\n", + ")\n", + "print(poller.result().job_id)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The following code cell tracks completion of the backfill job. Using the premium tier Azure Cache for Redis provisioned earlier, this step may take approximately 3-4 minutes to complete." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "gather": { + "logged": 1685572796715 + }, + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "name": "track-transact-material", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "# Get the job URL, and stream the job logs.\n", + "# With PREMIUM Redis SKU, SKU family \"P\", and cache capacity 2,\n", + "# it takes approximately 3-4 minutes to complete.\n", + "fs_client.jobs.stream(poller.result().job_id)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Step 4: Test locally\n", + "In this step we will use our development environment (i.e. this notebook) to lookup features from online materialization store. 
" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "First, we will parse the list of features from the existing feature retrieval specification:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "gather": { + "logged": 1685132938320 + }, + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "name": "parse-feat-list", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "# Parse the list of features from the existing feature retrieval specification.\n", + "feature_retrieval_spec_folder = root_dir + \"/project/fraud_model/feature_retrieval_spec\"\n", + "\n", + "features = featurestore.resolve_feature_retrieval_spec(feature_retrieval_spec_folder)\n", + "\n", + "features" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Next, we will get feature values from the online materialization store:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "gather": { + "logged": 1685132960042 + }, + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "name": "init-online-lookup", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "from azureml.featurestore import init_online_lookup\n", + "import time\n", + "\n", + "# Initialize the online store client.\n", + "init_online_lookup(features, AzureMLOnBehalfOfCredential())\n", + "\n", + "# It may take 5 - 10 sec the local RPC server of online store client to be ready.\n", + "time.sleep(5)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now, we will prepare some observation data for testing and use it to lookup features from the online materialization store. During online lookup, it may happen that the keys (`accountID`) defined in the observation sample data do not exist in the Redis (due to `TTL`). If this happens:\n", + "1. 
Open Azure portal.\n", + "2. Navigate to the Redis instance. \n", + "3. Open console for the Redis instance and check for existing keys using command `KEYS *`.\n", + "4. Replace `accountID` values in the sample observation data with the existing keys." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "gather": { + "logged": 1685132964129 + }, + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "name": "online-feat-loockup", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "import pyarrow\n", + "from azureml.featurestore import get_online_features\n", + "\n", + "# Prepare test observation data\n", + "obs = pyarrow.Table.from_pydict(\n", + " {\"accountID\": [\"A985156952816816\", \"A1055521248929430\", \"A914800935560176\"]}\n", + ")\n", + "\n", + "# Online lookup:\n", + "# It may happen that the keys defined in the observation sample data above does not exist in the Redis (due to TTL).\n", + "# If this happens, go to Azure portal and navigate to the Redis instance, open its console and check for existing keys using command \"KEYS *\"\n", + "# and replace the sample observation data with the existing keys.\n", + "df = get_online_features(features, obs)\n", + "df" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": { + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "Now that we have successfully looked up features from the online store, we will test online features using Azure Machine Learning managed online endpoint." + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Step 5: Test online features from Azure Machine Learning managed online endpoint\n", + "Managed online endpoint provide capability for deploying and scoring models for online/realtime inference. 
Optionally, you can use any inference technology of your choice (like kubernetes).\n", + "\n", + "As a part of this step, we will perform the following actions:\n", + "\n", + "1. Create an Azure Machine Learning managed online endpoint.\n", + "1. Grant required role-based access control (RBAC) permissions.\n", + "1. Deploy the model that we trained in the tutorial 3 of this tutorial series. The scoring script used in this step will have the code to lookup online features.\n", + "2. Perform scoring of the model with sample data. You will see that the online features are looked up and model scoring is completed successfully." + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Create Azure Machine Learning managed online endpoint\n", + "You can learn more about managed online endpoints [here](https://learn.microsoft.com/azure/machine-learning/how-to-deploy-online-endpoints?view=azureml-api-2&tabs=azure-cli). Note that using managed feature store API, you can also lookup online features from other inference platforms based on your need.\n", + "\n", + "Following code defines a managed online endpoint with name `fraud-model`." 
+   ] +  }, +  { +   "cell_type": "code", +   "execution_count": null, +   "metadata": { +    "gather": { +     "logged": 1685155956798 +    }, +    "jupyter": { +     "outputs_hidden": false, +     "source_hidden": false +    }, +    "name": "define-endpoint", +    "nteract": { +     "transient": { +      "deleting": false +     } +    } +   }, +   "outputs": [], +   "source": [ +    "from azure.ai.ml.entities import (\n", +    "    ManagedOnlineDeployment,\n", +    "    ManagedOnlineEndpoint,\n", +    "    Model,\n", +    "    CodeConfiguration,\n", +    "    Environment,\n", +    ")\n", +    "\n", +    "\n", +    "endpoint_name = \"fraud-model\"\n", +    "\n", +    "endpoint = ManagedOnlineEndpoint(name=endpoint_name, auth_mode=\"key\")" +   ] +  }, +  { +   "cell_type": "markdown", +   "metadata": {}, +   "source": [ +    "Execute the following code cell to create the managed online endpoint defined in the previous code cell." +   ] +  }, +  { +   "cell_type": "code", +   "execution_count": null, +   "metadata": { +    "gather": { +     "logged": 1685156110582 +    }, +    "jupyter": { +     "outputs_hidden": false, +     "source_hidden": false +    }, +    "name": "create-endpoint", +    "nteract": { +     "transient": { +      "deleting": false +     } +    } +   }, +   "outputs": [], +   "source": [ +    "ws_client.online_endpoints.begin_create_or_update(endpoint).result()" +   ] +  }, +  { +   "attachments": {}, +   "cell_type": "markdown", +   "metadata": { +    "nteract": { +     "transient": { +      "deleting": false +     } +    } +   }, +   "source": [ +    "## Grant required RBAC permissions\n", +    "In this step, we will grant required RBAC permissions to the managed online endpoint on the Redis instance and feature store. The scoring code in the model deployment will need these RBAC permissions to successfully lookup features from the online store using the managed feature store API."
+ ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Get managed identity of the managed online endpoint" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "gather": { + "logged": 1685156114744 + }, + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "name": "get-endpoint-identity", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "# Get managed identity of the managed online endpoint.\n", + "endpoint = ws_client.online_endpoints.get(endpoint_name)\n", + "\n", + "model_endpoint_msi_principal_id = endpoint.identity.principal_id\n", + "model_endpoint_msi_principal_id" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Grant `Contributor` role to the online endpoint managed identity on the Azure Cache for Redis \n", + "We will grant `Contributor` role to the online endpoint managed identity on the Redis instance. This RBAC permission is needed to materialize data into the Redis online store." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "gather": { + "logged": 1685156142743 + }, + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "name": "endpoint-redis-rbac", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "from azure.core.exceptions import ResourceExistsError\n", + "from azure.mgmt.msi import ManagedServiceIdentityClient\n", + "from azure.mgmt.msi.models import Identity\n", + "from azure.mgmt.authorization import AuthorizationManagementClient\n", + "from azure.mgmt.authorization.models import RoleAssignmentCreateParameters\n", + "from uuid import uuid4\n", + "\n", + "auth_client = AuthorizationManagementClient(\n", + " AzureMLOnBehalfOfCredential(), redis_subscription_id\n", + ")\n", + "\n", + "scope = f\"/subscriptions/{redis_subscription_id}/resourceGroups/{redis_resource_group_name}/providers/Microsoft.Cache/Redis/{redis_name}\"\n", + "\n", + "\n", + "# The role definition ID for the \"contributor\" role on the redis cache\n", + "# You can find other built-in role definition IDs in the Azure documentation\n", + "role_definition_id = f\"/subscriptions/{redis_subscription_id}/providers/Microsoft.Authorization/roleDefinitions/b24988ac-6180-42a0-ab88-20f7382dd24c\"\n", + "\n", + "# Generate a random UUID for the role assignment name\n", + "role_assignment_name = str(uuid4())\n", + "\n", + "# Set up the role assignment creation parameters\n", + "role_assignment_params = RoleAssignmentCreateParameters(\n", + " principal_id=model_endpoint_msi_principal_id,\n", + " role_definition_id=role_definition_id,\n", + " principal_type=\"ServicePrincipal\",\n", + ")\n", + "\n", + "# Create the role assignment\n", + "try:\n", + " # Create the role assignment\n", + " result = auth_client.role_assignments.create(\n", + " scope, role_assignment_name, role_assignment_params\n", + " )\n", + " print(\n", + " f\"Redis RBAC granted to managed identity 
'{model_endpoint_msi_principal_id}'.\"\n", + " )\n", + "except ResourceExistsError:\n", + " print(\n", + " f\"Redis RBAC already exists for managed identity '{model_endpoint_msi_principal_id}'.\"\n", + " )" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Grant `AzureML Data Scientist` role to the online endpoint managed identity on the feature store\n", + "We will grant `AzureML Data Scientist` role to the online endpoint managed identity on the feature store. This RBAC permission is required for successful deployment of the model to the online endpoint." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "gather": { + "logged": 1685156168113 + }, + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "name": "endpoint-fs-rbac", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "auth_client = AuthorizationManagementClient(\n", + " AzureMLOnBehalfOfCredential(), featurestore_subscription_id\n", + ")\n", + "\n", + "scope = f\"/subscriptions/{featurestore_subscription_id}/resourceGroups/{featurestore_resource_group_name}/providers/Microsoft.MachineLearningServices/workspaces/{featurestore_name}\"\n", + "\n", + "# The role definition ID for the \"AzureML Data Scientist\" role.\n", + "# You can find other built-in role definition IDs in the Azure documentation.\n", + "role_definition_id = f\"/subscriptions/{featurestore_subscription_id}/providers/Microsoft.Authorization/roleDefinitions/f6c7c914-8db3-469d-8ca1-694a8f32e121\"\n", + "\n", + "# Generate a random UUID for the role assignment name.\n", + "role_assignment_name = str(uuid4())\n", + "\n", + "# Set up the role assignment creation parameters.\n", + "role_assignment_params = RoleAssignmentCreateParameters(\n", + " principal_id=model_endpoint_msi_principal_id,\n", + " role_definition_id=role_definition_id,\n", + " principal_type=\"ServicePrincipal\",\n", + ")\n", + 
"\n", + "# Create the role assignment\n", + "try:\n", + " # Create the role assignment\n", + " result = auth_client.role_assignments.create(\n", + " scope, role_assignment_name, role_assignment_params\n", + " )\n", + " print(\n", + " f\"Feature store RBAC granted to managed identity '{model_endpoint_msi_principal_id}'.\"\n", + " )\n", + "except ResourceExistsError:\n", + " print(\n", + " f\"Feature store RBAC already exists for managed identity '{model_endpoint_msi_principal_id}'.\"\n", + " )" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Deploy the model to the online endpoint\n", + "First, inspect the scoring script `project/fraud_model/online_inference/src/scoring.py`. The scoring script performs the following tasks:\n", + "\n", + "1. Load the feature metadata from the feature retrieval specification that was packaged along with the model during model training (tutorial 3 of this tutorial series). This specification has features from both `transactions` and `accounts` feature sets.\n", + "2. When an input inference request is received, the scoring code looks up the online features using the index keys from the request. In this case for both feature sets, the index column is the `accountID`.\n", + "3. 
Passes the features to the model to perform inference and returns the response, a boolean value representing the variable `is_fraud`.\n", + "\n", + "First, create a managed online deployment definition for model deployment by executing the following code cell:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "gather": { + "logged": 1685156215970 + }, + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "name": "define-online-deployment", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "deployment = ManagedOnlineDeployment(\n", + " name=\"green\",\n", + " endpoint_name=endpoint_name,\n", + " model=\"azureml:fraud_model:1\",\n", + " code_configuration=CodeConfiguration(\n", + " code=root_dir + \"/project/fraud_model/online_inference/src/\",\n", + " scoring_script=\"scoring.py\",\n", + " ),\n", + " environment=Environment(\n", + " conda_file=root_dir + \"/project/fraud_model/online_inference/conda.yml\",\n", + " image=\"mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04\",\n", + " ),\n", + " instance_type=\"Standard_DS3_v2\",\n", + " instance_count=1,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Next, deploy the model to the online endpoint by executing the following code cell. Note that it may take approximately 4-5 minutes to deploy the model."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "gather": { + "logged": 1685156672789 + }, + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "name": "begin-online-deployment", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "# Model deployment to online endpoint may take 4-5 minutes.\n", + "ws_client.online_deployments.begin_create_or_update(deployment).result()" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Test online deployment with mock data\n", + "Finally, execute the following code to test the online deployment using the mock data. You should see `0` or `1` as the output of this cell." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "gather": { + "logged": 1685157485313 + }, + "jupyter": { + "outputs_hidden": false, + "source_hidden": false + }, + "name": "test-online-deployment", + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "# Test the online deployment using the mock data.\n", + "sample_data = root_dir + \"/project/fraud_model/online_inference/test.json\"\n", + "ws_client.online_endpoints.invoke(\n", + " endpoint_name=endpoint_name, request_file=sample_data, deployment_name=\"green\"\n", + ")" + ] + } + ], + "metadata": { + "celltoolbar": "Edit Metadata", + "kernel_info": { + "name": "synapse_pyspark" + }, + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.13" + }, + "microsoft": { + "host": { + "AzureML": { + "notebookHasBeenCompleted": true + } + }, + "ms_spell_check": { + "ms_spell_check_language": "en" + } + }, + 
"nteract": { + "version": "nteract-front-end@1.0.0" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/sdk/python/featurestore_sample/project/env/online.yml b/sdk/python/featurestore_sample/project/env/online.yml new file mode 100644 index 0000000000..5e00591f98 --- /dev/null +++ b/sdk/python/featurestore_sample/project/env/online.yml @@ -0,0 +1,16 @@ +dependencies: + - python=3.8 + - pip: + # Protobuf is needed to avoid conflict with managed spark + - protobuf==3.19.6 + # For asynchronous HTTP requests + - aiohttp==3.8.4 + # Online feature store core SDK + - azureml-featurestore[online]==0.1.0b4 + # Azure management libraries + - azure-mgmt-msi + - azure-mgmt-redis + - azure-mgmt-authorization==3.0.0 + # Required for working with observation data in online feature set samples + - pandas==1.5.3 +name: fs_online_env \ No newline at end of file diff --git a/sdk/python/featurestore_sample/project/fraud_model/online_inference/conda.yml b/sdk/python/featurestore_sample/project/fraud_model/online_inference/conda.yml new file mode 100644 index 0000000000..d175537de6 --- /dev/null +++ b/sdk/python/featurestore_sample/project/fraud_model/online_inference/conda.yml @@ -0,0 +1,11 @@ +dependencies: + - python=3.8 + - pip: + - protobuf==3.19.6 + - azureml-featurestore[online]==0.1.0b4 + - aiohttp==3.8.4 + - azureml-inference-server-http + - scikit-learn + - pandas + - azure-identity +name: fs_env \ No newline at end of file diff --git a/sdk/python/featurestore_sample/project/fraud_model/online_inference/src/scoring.py b/sdk/python/featurestore_sample/project/fraud_model/online_inference/src/scoring.py new file mode 100644 index 0000000000..22eb90bdc3 --- /dev/null +++ b/sdk/python/featurestore_sample/project/fraud_model/online_inference/src/scoring.py @@ -0,0 +1,73 @@ +import os +import logging +import json +import time +import pyarrow +import pickle + +from azure.identity import ManagedIdentityCredential +from azureml.featurestore import FeatureStoreClient +from 
azureml.featurestore import init_online_lookup +from azureml.featurestore import get_online_features + + +def init(): + """ + This function is called when the container is initialized/started, typically after create/update of the deployment. + You can write the logic here to perform init operations like caching the model in memory + """ + + global model + + # load the model + print("check model path") + + model_path = os.path.join(os.getenv("AZUREML_MODEL_DIR"), "model_output/clf.pkl") + + with open(model_path, "rb") as pickle_file: + model = pickle.load(pickle_file) + # AZUREML_MODEL_DIR is an environment variable created during deployment. + # It is the path to the model folder (./azureml-models/$MODEL_NAME/$VERSION) + # Please provide your model's folder name if there is one + + # load feature retrieval spec + print("load feature spec") + + credential = ManagedIdentityCredential() + + spec_path = os.path.join(os.getenv("AZUREML_MODEL_DIR"), "model_output") + + global features + + featurestore = FeatureStoreClient(credential=credential) + + features = featurestore.resolve_feature_retrieval_spec(spec_path) + + init_online_lookup(features, credential) + + time.sleep(20) + + logging.info("Init complete") + + +def run(raw_data): + + logging.info("model 1: request received") + logging.info(raw_data) + print(raw_data) + + data = json.loads(raw_data)["data"] + + obs = pyarrow.Table.from_pydict(data) + feat_table = get_online_features(features, obs) + df = feat_table.to_pandas() + df.fillna(0, inplace=True) + print("feature retrieved") + print(df) + + logging.info("model 1: feature joined") + + data = df.drop(["accountID"], axis="columns").to_numpy() + result = model.predict(data) + logging.info("Request processed") + return result.tolist() diff --git a/sdk/python/featurestore_sample/project/fraud_model/online_inference/test.json b/sdk/python/featurestore_sample/project/fraud_model/online_inference/test.json new file mode 100644 index 0000000000..3e5c74f9e5 --- /dev/null 
+++ b/sdk/python/featurestore_sample/project/fraud_model/online_inference/test.json @@ -0,0 +1,10 @@ +{ + "data": { + "accountID": ["A1055521510328860"], + "transactionAmount": [299], + "localHour": [21], + "transactionAmountUSD": [21], + "digitalItemCount": [1], + "physicalItemCount": [1] + } +}