diff --git a/notebooks/DAX Performance Testing.ipynb b/notebooks/DAX Performance Testing.ipynb new file mode 100644 index 00000000..29ab3038 --- /dev/null +++ b/notebooks/DAX Performance Testing.ipynb @@ -0,0 +1,1265 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "20b29d19", + "metadata": {}, + "source": [ + "# DAX Performance Testing\n", + "\n", + "## Summary\n", + "\n", + "This notebook is designed to measure DAX query timings under different cache states (cold, warm, and hot). Specifically:\n", + "\n", + "1. **DAX Queries from Excel** \n", + " - You must provide an Excel file containing the DAX queries in a table you wish to test. \n", + " - For each query, a column needs align with the `runQueryType` used for a given `queryId`. \n", + " - This notebook reads those queries and executes them on one or more Power BI/Fabric models.\n", + "\n", + "2. **Lakehouse Logging** \n", + " - You also must attach the appropriate Lakehouse in Fabric so that logs can be saved (both in a table and as files if you choose). \n", + "\n", + "3. **Capacity Pause/Resume** \n", + " - In some scenarios (e.g., simulating a \"cold\" cache on DirectQuery or Import models), the code pauses and resumes capacities. \n", + " - **Warning**: Pausing a capacity will interrupt any running workloads on that capacity. Resuming will take time and resources, and can affect other workspaces assigned to the same capacity. \n", + "\n", + "Overall, the purpose is to capture performance metrics (timings, CPU usage, etc.) for DAX queries under different cache states.\n" + ] + }, + { + "cell_type": "markdown", + "id": "5c27dfd1-4fe0-4a97-92e6-ddf78889aa93", + "metadata": { + "nteract": { + "transient": { + "deleting": false + } + } + }, + "source": [ + "### Install the latest .whl package\n", + "\n", + "Check [here](https://pypi.org/project/semantic-link-labs/) to see the latest version." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d5cae9db-cef9-48a8-a351-9c5fcc99645c", + "metadata": { + "jupyter": { + "outputs_hidden": true, + "source_hidden": false + }, + "nteract": { + "transient": { + "deleting": false + } + } + }, + "outputs": [], + "source": [ + "%pip install semantic-link-labs" + ] + }, + { + "cell_type": "markdown", + "id": "b195eae8", + "metadata": {}, + "source": [ + "### Import the library and necessary packages" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1344e286", + "metadata": {}, + "outputs": [], + "source": [ + "import sempy.fabric as fabric\n", + "import sempy_labs as labs\n", + "import pandas as pd\n", + "import time\n", + "import itertools\n", + "import random\n", + "import requests\n", + "import functools\n", + "import builtins\n", + "from threading import local\n", + "from contextlib import contextmanager\n", + "from uuid import uuid4\n", + "from pyspark.sql.functions import col, sum as _sum, when, countDistinct\n", + "from datetime import datetime" + ] + }, + { + "cell_type": "markdown", + "id": "437e5935", + "metadata": {}, + "source": [ + "### Global configurations & variables" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "31200f84", + "metadata": {}, + "outputs": [], + "source": [ + "# Generate a unique run ID for this test run\n", + "run_id = str(uuid4())\n", + "\n", + "# Define models and their configurations for testing\n", + "models = [\n", + " {\n", + " \"name\": \"Model Name\", # The name of the semantic model\n", + " \"storageMode\": \"DirectLake\", # Import, DirectQuery, or DirectLake\n", + " \"cache_types\": [\"cold\", \"warm\", \"hot\"], # List of cache types to be run (hot, warm, and cold)\n", + " \"model_workspace_name\": \"Model Workspace Name\", # The workspace name of the semantic model\n", + " \"database_name\": \"Lakehouse Name\", # Only needed for cold cache queries for Import and DirectQuery\n", + " 
\"database_workspace_name\": \"Lakehouse Workspace Name\", # Only needed for cold cache queries for Import and DirectQuery\n", + " \"runQueryType\": \"query\", # The name of the column in your DAX Excel file contains the query to be run\n", + " },\n", + "]\n", + "\n", + "# Only needed for cold cache queries for Import and DirectQuery\n", + "workspace_capacities = {\n", + " \"Workspace Name\": {\n", + " \"capacity_name\": \"Testing Capacity Name\",\n", + " \"alt_capacity_name\": \"Alternate Capacity Name\",\n", + " }\n", + "}\n", + "\n", + "# Read DAX queries from the Excel file uploaded to the attached lakehouse\n", + "# The first column must be 'queryId' and additional columns should contain variants of the DAX query.\n", + "dax_queries = pd.read_excel(\n", + " \"/lakehouse/default/Files/DAXExcelFileName.xlsx\", \"DAXTableName\"\n", + ")\n", + "\n", + "# Additional arguments controlling the behavior of query execution and logging\n", + "additional_arguments = {\n", + " \"roundNumber\": 1, # The current round of DAX testing. Will be considered when determine if maxNumberPerQuery is met or not\n", + " \"onlyRunNewQueries\": True, # Will determine if queries will stop being tested after maxNumberPerQuery is met\n", + " \"maxNumberPerQuery\": 1, # The max number of queries to capture per round, queryId, model and cache type\n", + " \"maxFailuresBeforeSkipping\": 5, # The number of failed query attempts per round, queryId, model and cache type before skipping\n", + " \"numberOfRunsPerQueryId\": 15, # The number of times to loop over each queryId. 
If all combos have met maxNumberPerQuery, the loop will break\n", + " \"stopQueryIdsAt\": 99, # Allows you to stop the queryId loop at a certain number, even if there are more queries present, i.e., there are queryIds 1-20 but stop at 5\n", + " \"forceStartQueriesAt1\": False, # If set to False, testing will stop at the first incomplete queryId instead of starting at queryId 1 \n", + " \"logTableName\": \"DAXTestingLogTableName\", # The name of the table in the attached lakehouse to save the performance logs to\n", + " \"clearAllLogs\": False, # Will drop the existing logs table before starting testing\n", + " \"clearCurrentRoundLogs\": False, # Will delete the logs associated with the current roundNumber before starting testing\n", + " \"randomizeRuns\": True, # Will randomize the model and cache type combos when testing\n", + " \"skipSettingHotCache\": False, # Should be False if randomizing the runs. If the runs are randomized, the previous warm cache run will set the hot cache\n", + " \"pauseAfterSettingCache\": 5, # The number of seconds to wait after setting the cache\n", + " \"pauseAfterRunningQuery\": 5, # The number of second to wait before writing the logs to the log table\n", + " \"pauseBetweenRuns\": 30, # The number of seconds to wait before starting the next query\n", + "}\n", + "\n", + "# Define the expected schema for DAX trace log events\n", + "event_schema = {\n", + " \"DirectQueryBegin\": [\n", + " \"EventClass\",\n", + " \"CurrentTime\",\n", + " \"TextData\",\n", + " \"StartTime\",\n", + " \"EndTime\",\n", + " \"Duration\",\n", + " \"CpuTime\",\n", + " \"Success\",\n", + " ],\n", + " \"DirectQueryEnd\": [\n", + " \"EventClass\",\n", + " \"CurrentTime\",\n", + " \"TextData\",\n", + " \"StartTime\",\n", + " \"EndTime\",\n", + " \"Duration\",\n", + " \"CpuTime\",\n", + " \"Success\",\n", + " ],\n", + " \"VertiPaqSEQueryBegin\": [\n", + " \"EventClass\",\n", + " \"EventSubclass\",\n", + " \"CurrentTime\",\n", + " \"TextData\",\n", + " 
\"StartTime\",\n", + " ],\n", + " \"VertiPaqSEQueryEnd\": [\n", + " \"EventClass\",\n", + " \"EventSubclass\",\n", + " \"CurrentTime\",\n", + " \"TextData\",\n", + " \"StartTime\",\n", + " \"EndTime\",\n", + " \"Duration\",\n", + " \"CpuTime\",\n", + " \"Success\",\n", + " ],\n", + " \"VertiPaqSEQueryCacheMatch\": [\n", + " \"EventClass\",\n", + " \"EventSubclass\",\n", + " \"CurrentTime\",\n", + " \"TextData\",\n", + " ],\n", + " \"QueryBegin\": [\n", + " \"EventClass\",\n", + " \"EventSubclass\",\n", + " \"CurrentTime\",\n", + " \"TextData\",\n", + " \"StartTime\",\n", + " \"ConnectionID\",\n", + " \"SessionID\",\n", + " \"RequestProperties\"\n", + " ],\n", + " \"QueryEnd\": [\n", + " \"EventClass\",\n", + " \"EventSubclass\",\n", + " \"CurrentTime\",\n", + " \"TextData\",\n", + " \"StartTime\",\n", + " \"EndTime\",\n", + " \"Duration\",\n", + " \"CpuTime\",\n", + " \"Success\",\n", + " \"ConnectionID\",\n", + " \"SessionID\",\n", + " ],\n", + "}\n", + "\n", + "# Dictionary to track if a capacity pause is needed for each model during testing\n", + "model_pause_capacity_needed = {}\n", + "\n", + "# Variables for Pausing/Resuming Capacities: credentials and configuration parameters for Azure Key Vault and resource management\n", + "resource_group_name = \"\"\n", + "subscription_id = \"\"\n", + "key_vault_uri = \"\"\n", + "key_vault_client_id = \"\"\n", + "key_vault_tenant_id = \"\"\n", + "key_vault_client_secret = \"\"\n", + "\n", + "# Enforce case-sensitivity in Spark to ensure column name matching is exact\n", + "spark.conf.set(\"spark.sql.caseSensitive\", True)" + ] + }, + { + "cell_type": "markdown", + "id": "ea5a9edc", + "metadata": {}, + "source": [ + "### Logging & Retry Decorators, Basic Helpers" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b0024e3b", + "metadata": {}, + "outputs": [], + "source": [ + "# Thread-local storage for tracking the call depth (used for indented printing)\n", + "_thread_local = local()\n", + "\n", + 
"@contextmanager\n", + "def indented_print(indent_level: int):\n", + " \"\"\"\n", + " Temporarily replaces the built-in print function with an indented version.\n", + " This helps in visually distinguishing nested function calls in the logs.\n", + " \n", + " Parameters:\n", + " indent_level (int): The indentation depth to apply.\n", + " \"\"\"\n", + " original_print = builtins.print\n", + "\n", + " def custom_print(*args, **kwargs):\n", + " indent = \" \" * indent_level\n", + " original_print(indent + \" \".join(map(str, args)), **kwargs)\n", + "\n", + " builtins.print = custom_print\n", + " try:\n", + " yield\n", + " finally:\n", + " builtins.print = original_print\n", + "\n", + "def log_function_calls(func):\n", + " \"\"\"\n", + " Decorator that logs the start and end of a function call with indented printing.\n", + " This is useful for tracking nested function calls in the execution logs.\n", + " \"\"\"\n", + " @functools.wraps(func)\n", + " def wrapper(*args, **kwargs):\n", + " if not hasattr(_thread_local, \"call_depth\"):\n", + " _thread_local.call_depth = 0\n", + "\n", + " indent = _thread_local.call_depth\n", + "\n", + " with indented_print(indent):\n", + " print(f\"✅ {func.__name__} - Starting\")\n", + "\n", + " _thread_local.call_depth += 1\n", + " try:\n", + " with indented_print(_thread_local.call_depth):\n", + " result = func(*args, **kwargs)\n", + " finally:\n", + " _thread_local.call_depth -= 1\n", + " with indented_print(_thread_local.call_depth):\n", + " print(f\"✅ {func.__name__} - Ending\")\n", + "\n", + " return result\n", + "\n", + " return wrapper\n", + "\n", + "def retry(exceptions, tries=3, delay=5, backoff=2, logger=None):\n", + " \"\"\"\n", + " Decorator for retrying a function call with exponential backoff.\n", + " It will attempt to call the function and, if specified exceptions occur, wait and retry.\n", + "\n", + " Parameters:\n", + " exceptions (tuple): Exception classes to catch.\n", + " tries (int): Number of attempts.\n", + " delay 
(int): Initial delay between attempts in seconds.\n", + " backoff (int): Multiplier applied to the delay between attempts.\n", + " logger (callable): Logging function to use for printing warnings.\n", + "\n", + " Returns:\n", + " The decorated function.\n", + " \"\"\"\n", + " def decorator_retry(func):\n", + " @functools.wraps(func)\n", + " def wrapper_retry(*args, **kwargs):\n", + " if not hasattr(_thread_local, \"call_depth\"):\n", + " _thread_local.call_depth = 0\n", + "\n", + " _tries, _delay = tries, delay\n", + " first_fail = True # Track whether we've failed before\n", + "\n", + " while _tries > 1:\n", + " try:\n", + " return func(*args, **kwargs)\n", + " except exceptions as e:\n", + " # Skip printing on the first failure; print on subsequent failures\n", + " if not first_fail:\n", + " with indented_print(_thread_local.call_depth):\n", + " print(f\"⚠️ {func.__name__} failed with {e}, retrying in {_delay} seconds...\")\n", + " else:\n", + " first_fail = False\n", + "\n", + " time.sleep(_delay)\n", + " _tries -= 1\n", + " _delay *= backoff\n", + "\n", + " # Last attempt (no retries left); let the exception bubble if it fails\n", + " return func(*args, **kwargs)\n", + "\n", + " return wrapper_retry\n", + "\n", + " return decorator_retry" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "810b0555", + "metadata": {}, + "outputs": [], + "source": [ + "def trace_started(_traces, _trace_name):\n", + " # Check if a specific trace (by name) has started by looking into the traces DataFrame.\n", + " return _traces.loc[_traces[\"Name\"] == _trace_name].shape[0] > 0\n", + "\n", + "def query_end_event_collected(_trace_events):\n", + " # Verify if the 'QueryEnd' event is present in the collected trace events.\n", + " return _trace_events.loc[_trace_events[\"Event Class\"] == \"QueryEnd\"].shape[0] > 0\n", + "\n", + "@log_function_calls\n", + "@retry(Exception, tries=10, delay=2, backoff=2, logger=print)\n", + "def 
wait_for_trace_start(trace_connection, trace_name):\n", + " # Wait until the trace with the specified name is detected as started.\n", + " if not trace_started(trace_connection.list_traces(), trace_name):\n", + " raise Exception(\"Trace has not started yet\")\n", + " return True\n", + "\n", + "@log_function_calls\n", + "@retry(Exception, tries=60, delay=3, backoff=1, logger=print)\n", + "def wait_for_query_end_event(trace):\n", + " # Poll until the trace logs indicate that the query end event has been collected.\n", + " logs = trace.get_trace_logs()\n", + " if not query_end_event_collected(logs):\n", + " raise Exception(\"Query end event not collected yet\")\n", + " return logs\n", + "\n", + "@log_function_calls\n", + "@retry(Exception, tries=30, delay=5, backoff=1, logger=print)\n", + "def check_model_online(_model):\n", + " # Check if the model is online by executing a simple DAX query.\n", + " dax_query_eval_1(_model)\n", + "\n", + "@log_function_calls\n", + "@retry(Exception, tries=30, delay=2, backoff=2, logger=print)\n", + "def wait_for_capacity_status(_capacity_name, target_status):\n", + " # Check the current status of a capacity; wait until it matches the target status.\n", + " current_status = labs.list_capacities().loc[\n", + " labs.list_capacities()[\"Display Name\"] == _capacity_name, \"State\"\n", + " ].iloc[0]\n", + " \n", + " if current_status != target_status:\n", + " raise Exception(\"Capacity status not updated yet\")\n", + " return current_status\n", + "\n", + "def dax_query_eval_1(_model):\n", + " # Execute a simple DAX query to verify connectivity and evaluate model responsiveness.\n", + " fabric.evaluate_dax(\n", + " _model[\"name\"], \"EVALUATE {1}\", workspace=_model[\"model_workspace_name\"]\n", + " )\n", + "\n", + "@log_function_calls\n", + "def wait_for_model_to_come_online(_model):\n", + " # Wait until the model is confirmed to be online; raise an exception if it fails.\n", + " try:\n", + " check_model_online(_model)\n", + " print(\"✅ 
Model is online\")\n", + " except Exception as e:\n", + " raise Exception(\"❌ Model failed to come online\") from e" + ] + }, + { + "cell_type": "markdown", + "id": "ab8bb86c", + "metadata": {}, + "source": [ + "### Pause & Resume Capacity" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5e0c2a30", + "metadata": {}, + "outputs": [], + "source": [ + "@log_function_calls\n", + "def update_model_pause_status(event, model=None, workspace=None):\n", + " \"\"\"\n", + " Updates the model_pause_capacity_needed dictionary based on specific events.\n", + "\n", + " Parameters:\n", + " event (str): The event type. Allowed values are:\n", + " - \"initialize\": Initializes the dictionary so that every model is set to True,\n", + " except DirectLake models which are always False.\n", + " - \"model_queried\": A model has been queried. For Import models, mark it as True.\n", + " For DirectQuery models, mark all models sharing the same database_name and\n", + " database_workspace_name as True.\n", + " - \"capacity_paused\": After a capacity pause, for the given workspace,\n", + " mark as False all Import models in that workspace and all DirectQuery models whose\n", + " database_workspace_name matches that workspace.\n", + " model (dict, optional): The model dictionary (required for \"model_queried\").\n", + " workspace (str, optional): The workspace name (required for \"capacity_paused\").\n", + " \"\"\"\n", + " global model_pause_capacity_needed\n", + "\n", + " if event == \"initialize\":\n", + " # Set every model to True except for DirectLake models, which are always False.\n", + " for m in models:\n", + " if m[\"storageMode\"] == \"DirectLake\":\n", + " model_pause_capacity_needed[m[\"name\"]] = False\n", + " else:\n", + " model_pause_capacity_needed[m[\"name\"]] = True\n", + "\n", + " elif event == \"model_queried\" and model is not None:\n", + " if model[\"storageMode\"] == \"Import\":\n", + " model_pause_capacity_needed[model[\"name\"]] = True\n", + " 
elif model[\"storageMode\"] == \"DirectQuery\":\n",
+    "            # Mark all DirectQuery models sharing the same database settings as True.\n",
+    "            target_db = model[\"database_name\"]\n",
+    "            target_db_workspace = model[\"database_workspace_name\"]\n",
+    "            for m in models:\n",
+    "                if (\n",
+    "                    m[\"storageMode\"] == \"DirectQuery\"\n",
+    "                    and m[\"database_name\"] == target_db\n",
+    "                    and m[\"database_workspace_name\"] == target_db_workspace\n",
+    "                ):\n",
+    "                    model_pause_capacity_needed[m[\"name\"]] = True\n",
+    "\n",
+    "    elif event == \"capacity_paused\" and workspace is not None:\n",
+    "        # For the given workspace, mark as False all Import models and all DirectQuery models\n",
+    "        # whose database_workspace_name matches the workspace.\n",
+    "        for m in models:\n",
+    "            if m[\"model_workspace_name\"] == workspace and m[\"storageMode\"] == \"Import\":\n",
+    "                model_pause_capacity_needed[m[\"name\"]] = False\n",
+    "        for m in models:\n",
+    "            if (\n",
+    "                m[\"storageMode\"] == \"DirectQuery\"\n",
+    "                and m[\"database_workspace_name\"] == workspace\n",
+    "            ):\n",
+    "                model_pause_capacity_needed[m[\"name\"]] = False\n",
+    "\n",
+    "    else:\n",
+    "        print(f\"⚠️ Unknown event '{event}' or missing required parameter(s).\")\n",
+    "\n",
+    "    # Debug output (optional)\n",
+    "    print(\"📝 Updated model_pause_capacity_needed\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "6a4307ee",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# FIX: was '@labs.log_function_calls' — the decorator is defined locally in this\n",
+    "# notebook, not in sempy_labs, so the qualified form raised AttributeError.\n",
+    "@log_function_calls\n",
+    "def pause_resume_capacity(_capacity_name, _action, _simplify_logs=False):\n",
+    "    \"\"\"\n",
+    "    Pauses or resumes a given capacity using Semantic Link Labs functions.\n",
+    "\n",
+    "    Parameters:\n",
+    "        _capacity_name: The name of the capacity to be paused or resumed.\n",
+    "        _action: The action to perform, either \"pause\" or \"resume\".\n",
+    "        _simplify_logs: Optional flag to simplify logging output.\n",
+    "    \"\"\"\n",
+    "    print(f\"🔄 {_action.title()} capacity '{_capacity_name}': Attempting\")\n",
+    
"\n",
+    "    # Check current status using the new labs function.\n",
+    "    current_status = labs.list_capacities().loc[\n",
+    "        labs.list_capacities()[\"Display Name\"] == _capacity_name, \"State\"\n",
+    "    ].iloc[0]\n",
+    "\n",
+    "    # Define mapping options for pause and resume actions.\n",
+    "    action_options = {\n",
+    "        \"pause\": {\n",
+    "            \"expected_status\": \"Active\",  # Current status needed to perform a pause.\n",
+    "            \"target_status\": \"Paused\",\n",
+    "            \"action_function\": labs.suspend_fabric_capacity,\n",
+    "        },\n",
+    "        \"resume\": {\n",
+    "            \"expected_status\": \"Paused\",  # Current status needed to perform a resume.\n",
+    "            \"target_status\": \"Active\",\n",
+    "            \"action_function\": labs.resume_fabric_capacity,\n",
+    "        },\n",
+    "    }\n",
+    "\n",
+    "    if current_status == action_options[_action][\"expected_status\"]:\n",
+    "        print(f\"🛠️ {_action.title()} capacity '{_capacity_name}': Requesting action\")\n",
+    "\n",
+    "        # Call the appropriate labs function for pausing or resuming the capacity.\n",
+    "        action_options[_action][\"action_function\"](\n",
+    "            capacity_name=_capacity_name,\n",
+    "            azure_subscription_id=subscription_id,\n",
+    "            resource_group=resource_group_name,\n",
+    "            key_vault_uri=key_vault_uri,\n",
+    "            key_vault_tenant_id=key_vault_tenant_id,\n",
+    "            key_vault_client_id=key_vault_client_id,\n",
+    "            key_vault_client_secret=key_vault_client_secret  # Use the provided secret\n",
+    "        )\n",
+    "\n",
+    "        # FIX: removed dead code that built a management-API URL and request headers\n",
+    "        # via undefined names ('auth', 'ServicePrincipalTokenProvider', '_get_headers');\n",
+    "        # it raised NameError at runtime and its results ('get_url', 'headers') were never used.\n",
+    "\n",
+    "        # Wait for the capacity status to change to the target status.\n",
+    "        try:\n",
+    "            wait_for_capacity_status(_capacity_name, action_options[_action][\"target_status\"])\n",
+    "            print(f\"✅ {_action.title()} capacity '{_capacity_name}': Action successful\")\n",
+    "        except Exception as e:\n",
+    "            print(f\"⚠️ {_action.title()} capacity '{_capacity_name}': Timeout waiting for target status. Error: {e}\")\n",
+    "    else:\n",
+    "        print(f\"ℹ️ {_action.title()} capacity '{_capacity_name}': Already '{current_status}'\")\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "3af750ca",
+   "metadata": {},
+   "source": [
+    "### Cache-setting functions"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "370e2ae8",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "@log_function_calls\n",
+    "@retry(exceptions=(Exception,), tries=5, delay=5, backoff=2)\n",
+    "def clear_vertipaq_cache(_model):\n",
+    "    \"\"\"\n",
+    "    Clears the VertiPaq cache by calling labs.clear_cache.\n",
+    "    Retries automatically up to 'tries' times if there's an error.\n",
+    "    \"\"\"\n",
+    "    print(\"🧹 Clearing VertiPaq cache\")\n",
+    "    wait_for_model_to_come_online(_model)\n",
+    "\n",
+    "    # Attempt the clearing and verify by executing a simple DAX query.\n",
+    "    try:\n",
+    "        labs.clear_cache(_model[\"name\"], workspace=_model[\"model_workspace_name\"])\n",
+    "        dax_query_eval_1(_model)\n",
+    "        print(\"✅ Clear VertiPaq cache successful\")\n",
+    "    except Exception as e:\n",
+    "        # If clearing fails, refresh the TOM cache before retrying.\n",
+    "        print(\"🔄 Clearing VertiPaq cache failed; retrying...\")\n",
+    "        
fabric.refresh_tom_cache(_model[\"model_workspace_name\"])\n", + " raise e # Re-raise exception to trigger the retry mechanism\n", + "\n", + " # Small buffer after clearing to allow processes to settle.\n", + " time.sleep(5)\n", + "\n", + "@log_function_calls\n", + "def set_hot_cache(_model, _expression, successful_query_count_goal=2):\n", + " \"\"\"\n", + " Executes the same query multiple times to prime the cache (hot cache).\n", + " The goal is to have a specified number of successful queries to confirm the cache is set.\n", + " \"\"\"\n", + " print(\"🔥 Setting Hot Cache\")\n", + " successful_query_count = 0\n", + " number_of_query_attempts = (\n", + " successful_query_count_goal * 5 if successful_query_count_goal > 1 else 1\n", + " )\n", + "\n", + " if additional_arguments[\"skipSettingHotCache\"]:\n", + " successful_query_count = successful_query_count_goal\n", + " else:\n", + " for _ in range(number_of_query_attempts):\n", + " # Drop any existing traces before starting a new trace for hot cache priming.\n", + " fabric.create_trace_connection(\n", + " _model[\"name\"], _model[\"model_workspace_name\"]\n", + " ).drop_traces()\n", + " trace_name = f\"Cache Trace {str(uuid4())}\"\n", + " with fabric.create_trace_connection(\n", + " _model[\"name\"], _model[\"model_workspace_name\"]\n", + " ) as trace_connection:\n", + " with trace_connection.create_trace(event_schema, trace_name) as trace:\n", + " print(\"🔍 Starting trace for hot cache\")\n", + " trace.start()\n", + " wait_for_trace_start(trace_connection, trace_name)\n", + " try:\n", + " print(\"⚡ Executing DAX query for hot cache\")\n", + " fabric.evaluate_dax(\n", + " _model[\"name\"],\n", + " _expression,\n", + " workspace=_model[\"model_workspace_name\"],\n", + " )\n", + " successful_query_count += 1\n", + " print(\"✅ DAX query succeeded for hot cache\")\n", + " except Exception as e:\n", + " print(\"❌ DAX query failed for hot cache:\", e)\n", + "\n", + " print(\"📜 Collecting trace logs for hot cache\")\n", 
+ " wait_for_query_end_event(trace)\n", + " trace.stop()\n", + "\n", + " if successful_query_count == successful_query_count_goal:\n", + " break\n", + "\n", + " print(f\"✅ Hot cache set; goal: {successful_query_count_goal} successful queries\")\n", + " return successful_query_count == successful_query_count_goal\n", + "\n", + "@log_function_calls\n", + "def set_warm_cache(_model, _expression):\n", + " \"\"\"\n", + " Sets a warm cache for the model.\n", + " - For DirectLake models, performs a hot cache query then clears the VertiPaq cache.\n", + " - For DirectQuery models, ensures a cold state first if needed.\n", + " \"\"\"\n", + " print(\"🔥 Setting Warm Cache\")\n", + "\n", + " if _model[\"storageMode\"] == \"DirectQuery\":\n", + " # For DirectQuery, simulate a cold cache before warming.\n", + " set_cold_cache(_model)\n", + "\n", + " # Prime the cache with a hot query and then clear the VertiPaq cache.\n", + " hot_cache_successful = set_hot_cache(\n", + " _model, _expression, successful_query_count_goal=1\n", + " )\n", + " clear_vertipaq_cache(_model)\n", + "\n", + " print(\"✅ Warm cache set\")\n", + " return hot_cache_successful\n", + "\n", + "@retry(exceptions=(Exception,), tries=5, delay=5, backoff=2)\n", + "def _refresh_tom_cache(workspace_name):\n", + " \"\"\"\n", + " Calls fabric.refresh_tom_cache(workspace_name) exactly once,\n", + " raising an exception if it fails. The '@retry' decorator will call it again on failure.\n", + " \"\"\"\n", + " print(f\"⌛ Refreshing TOM cache for workspace '{workspace_name}'\")\n", + " fabric.refresh_tom_cache(workspace_name)\n", + " print(\"✅ TOM cache refreshed successfully\")\n", + "\n", + "@retry(exceptions=(Exception,), tries=30, delay=3, backoff=2)\n", + "def _wait_for_refresh_to_complete(_model, refresh_id):\n", + " \"\"\"\n", + " Checks the status of a dataset refresh once. 
If it is still 'InProgress' or not in the expected terminal states,\n", + " raises an exception to trigger a retry.\n", + " \"\"\"\n", + " status = fabric.get_refresh_execution_details(\n", + " _model[\"name\"],\n", + " refresh_id,\n", + " workspace=_model[\"model_workspace_name\"],\n", + " ).status\n", + "\n", + " if status not in [\"Completed\", \"Failed\"]:\n", + " # If refresh is still in progress, raise an exception to trigger another retry attempt.\n", + " raise Exception(f\"Refresh status is '{status}'; not done yet.\")\n", + "\n", + " # Log completion status for the refresh process.\n", + " print(f\"✅ Refresh status: '{status}' - finishing polling.\")\n", + "\n", + "@log_function_calls\n", + "def set_cold_cache(_model):\n", + " \"\"\"\n", + " Sets a cold cache for the model:\n", + " - For DirectLake: perform a clearValues refresh, then a full refresh, and finally clear the VertiPaq cache.\n", + " - For Import/DirectQuery: perform capacity reassignment and pause/resume operations, followed by a refresh.\n", + " \"\"\"\n", + " print(\"❄️ Setting Cold Cache\")\n", + " if _model[\"storageMode\"] != \"DirectLake\":\n", + " if model_pause_capacity_needed[_model[\"name\"]]:\n", + " ws_caps = workspace_capacities[_model[\"model_workspace_name\"]]\n", + " print(f\"🔄 Assigning alternate capacity for workspace '{_model['model_workspace_name']}'\")\n", + " labs.assign_workspace_to_capacity(ws_caps[\"alt_capacity_name\"], _model[\"model_workspace_name\"])\n", + " print(f\"✅ Alternate capacity assigned: {ws_caps['alt_capacity_name']}\")\n", + "\n", + " pause_resume_capacity(ws_caps[\"capacity_name\"], \"pause\")\n", + " pause_resume_capacity(ws_caps[\"capacity_name\"], \"resume\")\n", + "\n", + " print(f\"🔄 Reassigning primary capacity for workspace '{_model['model_workspace_name']}'\")\n", + " labs.assign_workspace_to_capacity(ws_caps[\"capacity_name\"], _model[\"model_workspace_name\"])\n", + " print(f\"✅ Primary capacity assigned: {ws_caps['capacity_name']}\")\n", + 
@log_function_calls
def get_log_table(_table_name):
    """
    Return a Spark DataFrame of the existing log table, filtered to the
    current round's QueryEnd events.

    If the table doesn't exist (or can't be read), returns None so callers
    can treat the run as having no prior logs.
    """
    try:
        raw_table = spark.table(_table_name)
        # Only QueryEnd rows from the current round matter for the
        # skip/continue decisions made by the callers below.
        base_filters = (
            (col("roundNumber") == additional_arguments["roundNumber"])
            & (col("Event_Class") == "QueryEnd")
        )
        return raw_table.filter(base_filters)
    except Exception:
        # A missing table on the first run is expected; treat as "no logs yet".
        return None

@log_function_calls
def max_queries_met(_check_logs, _log_table, _model_cache_combo, _queryId):
    """
    Checks if the maximum number of queries for a given
    model/cache/queryId combo has been met.

    The query is skipped if either:
    - The count of successful queries is greater than or equal to additional_arguments["maxNumberPerQuery"], or
    - The count of failed queries is greater than or equal to additional_arguments["maxFailuresBeforeSkipping"].

    Returns False when log checking is disabled or no log table exists.
    """
    if _check_logs and _log_table is not None:
        base_filters = (
            (col("modelName") == _model_cache_combo["model"]["name"])
            & (col("queryId") == _queryId)
            & (col("cacheType") == _model_cache_combo["cache_type"])
        )

        success_count = _log_table.filter(
            base_filters & (col("Success") == "Success")
        ).count()

        failure_count = _log_table.filter(
            base_filters & (col("Success") == "Failure")
        ).count()

        result = (
            success_count >= additional_arguments["maxNumberPerQuery"]
            or failure_count >= additional_arguments["maxFailuresBeforeSkipping"]
        )

        print(
            f"📊 {'Skipping' if result else 'Continuing'} queries (Success: {success_count}, Failure: {failure_count})"
        )
        return result
    else:
        return False

@log_function_calls
def get_starting_query_id(_log_table, additional_arguments):
    """
    Determines the next queryId to start from by checking how many queries
    have fully met the maxNumberPerQuery across all model/cache combos.

    A queryId is considered "done" only when every (modelName, cacheType)
    combination present in the logs has hit the success or failure ceiling
    for it; the starting id is one past the longest gap-free prefix 1..k
    of done ids.
    """
    print("🔍 Determining starting query ID")
    if _log_table is not None:
        success_failure_counts = _log_table.groupBy(
            "modelName", "cacheType", "queryId"
        ).agg(
            _sum(when(col("Success") == "Success", 1).otherwise(0)).alias(
                "success_count"
            ),
            _sum(when(col("Success") == "Failure", 1).otherwise(0)).alias(
                "failure_count"
            ),
        )

        # BUG FIX: the original filtered on the bare Python names
        # `success_count` / `failure_count` (undefined -> NameError) joined
        # with `or`, which cannot combine Spark Column conditions. Use
        # col(...) expressions combined with `|` instead.
        valid_queries = success_failure_counts.filter(
            (col("success_count") >= additional_arguments["maxNumberPerQuery"])
            | (col("failure_count") >= additional_arguments["maxFailuresBeforeSkipping"])
        )

        distinct_combos_count = (
            valid_queries.select("modelName", "cacheType").distinct().count()
        )

        # Keep only queryIds that are "done" for EVERY observed combo.
        valid_query_ids = (
            valid_queries.groupBy("queryId")
            .agg(countDistinct("modelName", "cacheType").alias("valid_combinations"))
            .filter(col("valid_combinations") == distinct_combos_count)
        )

        query_id_list = [
            row["queryId"] for row in valid_query_ids.select("queryId").collect()
        ]
        query_id_list.sort()

        # Find the longest contiguous prefix 1..k of completed ids; any gap
        # means we must resume at the first missing id.
        max_query_id = 0
        for i, qid in enumerate(query_id_list):
            if qid != i + 1:
                break
            max_query_id = qid

        starting_query_id = 1 if max_query_id == 0 else max_query_id + 1
    else:
        print(f"ℹ️ Log table {additional_arguments['logTableName']} does not exist")
        starting_query_id = 1

    print(f"✅ Starting query ID set to {starting_query_id}")
    return starting_query_id
@log_function_calls
def run_dax_query_and_collect_logs(_model_cache_combo, _dax_query, _log_table):
    """
    Runs a single DAX query (given model + cache type + queryId),
    collects trace logs, and appends them to the Delta log table.

    Returns "Ran" when the query was executed (successfully or not) and its
    logs were written, or "Skipped" when the max-queries threshold was
    already met or the cache type isn't configured for this model.
    """
    _model = _model_cache_combo["model"]
    used_dax_expression = _dax_query[_model["runQueryType"]]
    query_run_name = f"Model: {_model['name']}, QueryId: {_dax_query['queryId']}, Cache Type: {_model_cache_combo['cache_type']}"
    valid_cache_type_for_model = (
        _model_cache_combo["cache_type"] in _model["cache_types"]
    )

    print(f"🚀 Starting query: {query_run_name}")

    if (
        not max_queries_met(
            additional_arguments["onlyRunNewQueries"],
            _log_table,
            _model_cache_combo,
            _dax_query["queryId"],
        )
        and valid_cache_type_for_model
    ):

        # Record the time before cache setup begins.
        set_cache_start_time = datetime.now().isoformat()

        wait_for_model_to_come_online(_model)

        # Set the desired cache state before running the query.
        if _model_cache_combo["cache_type"] == "cold":
            set_cold_cache(_model)
            cache_set = True
        elif _model_cache_combo["cache_type"] == "warm":
            cache_set = set_warm_cache(_model, used_dax_expression)
        else:  # 'hot'
            cache_set = set_hot_cache(_model, used_dax_expression)

        # Record the time after cache setup completes.
        time.sleep(additional_arguments["pauseAfterSettingCache"])
        set_cache_end_time = datetime.now().isoformat()

        # Mark that the model has been queried for pause/resume tracking.
        update_model_pause_status("model_queried", model=_model)

        query_start_time = datetime.now().isoformat()

        # Start a new trace for the DAX query execution; drop any stale
        # traces first so we only capture this run's events.
        fabric.create_trace_connection(
            _model["name"], _model["model_workspace_name"]
        ).drop_traces()
        trace_name = f"Simple DAX Trace {uuid4()}"

        with fabric.create_trace_connection(
            _model["name"], _model["model_workspace_name"]
        ) as trace_connection:
            with trace_connection.create_trace(event_schema, trace_name) as trace:
                print("🔍 Starting trace for DAX query")
                trace.start()
                wait_for_trace_start(trace_connection, trace_name)

                dax_query_result = "Success"
                try:
                    print("⚡ Executing DAX query")
                    fabric.evaluate_dax(
                        _model["name"],
                        used_dax_expression,
                        workspace=_model["model_workspace_name"],
                    )
                    print("✅ DAX query executed successfully")
                except Exception as e:
                    # Record the failure text instead of aborting the run;
                    # it is written to the log table as the query result.
                    dax_query_result = str(e)
                    print("❌ DAX query execution failed:", e)

                print("📜 Collecting trace logs")
                wait_for_query_end_event(trace)
                current_query_trace_logs = trace.stop()

                # Extract RequestProperties from the QueryBegin event if available.
                if "Request Properties" in current_query_trace_logs.columns:
                    query_begin_rows = current_query_trace_logs[current_query_trace_logs["Event Class"] == "QueryBegin"]
                    if not query_begin_rows.empty:
                        request_properties_value = query_begin_rows.iloc[0]["Request Properties"]
                    else:
                        request_properties_value = None
                    current_query_trace_logs["Request Properties"] = request_properties_value
                else:
                    current_query_trace_logs["Request Properties"] = None

                time.sleep(additional_arguments["pauseAfterRunningQuery"])
                query_end_time = datetime.now().isoformat()

                # Append metadata columns to the trace logs DataFrame.
                current_query_trace_logs = current_query_trace_logs.assign(
                    runId=run_id,
                    setCacheStartTime=set_cache_start_time,
                    setCacheEndTime=set_cache_end_time,
                    queryStartTime=query_start_time,
                    queryEndTime=query_end_time,
                    modelName=_model["name"],
                    queryId=_dax_query["queryId"],
                    runQueryType=_model["runQueryType"],
                    queryUUID=str(uuid4()),
                    cacheType=_model_cache_combo["cache_type"],
                    queryResult=dax_query_result,
                    storageMode=_model["storageMode"],
                    sentExpression=used_dax_expression,
                    roundNumber=additional_arguments["roundNumber"],
                )

                # If cache was not set properly, mark the query as failed.
                if not cache_set:
                    print("❌ Cache was not set properly; marking query as failed")
                    current_query_trace_logs = current_query_trace_logs.assign(
                        Success="Failure"
                    )

                # Format column names for Spark by replacing spaces with underscores.
                current_query_trace_logs.columns = current_query_trace_logs.columns.str.replace(
                    " ", "_"
                )
                current_query_trace_logs = spark.createDataFrame(current_query_trace_logs)

                print("💾 Appending logs to table")
                # mergeSchema tolerates new metadata columns appearing over time.
                current_query_trace_logs.write.format("delta").mode("append").option(
                    "mergeSchema", "true"
                ).saveAsTable(additional_arguments["logTableName"])

                print("✅ Logs appended to table")

        print("ℹ️ Pausing between runs")
        time.sleep(additional_arguments["pauseBetweenRuns"])

        return "Ran"

    else:
        print("⏭️ Query skipped (logs exist or invalid cache type)")
        return "Skipped"

@log_function_calls
def run_dax_queries():
    """
    Primary entry point: runs all queries from the loaded DAX Excel file.
    Handles log table management, capacity checks, and iteration over queries.
    """
    print("🚀 Starting all DAX queries")

    # Initialize the pause status for each model.
    update_model_pause_status("initialize")

    # Handle log table clearing or retrieval based on additional arguments.
    if additional_arguments["clearCurrentRoundLogs"]:
        print(
            f"🗑️ Dropping round {additional_arguments['roundNumber']} logs from {additional_arguments['logTableName']}"
        )
        spark.sql(
            f"DELETE FROM {additional_arguments['logTableName']} WHERE roundNumber = {additional_arguments['roundNumber']}"
        )
    if additional_arguments["clearAllLogs"]:
        print(f"🗑️ Dropping entire table {additional_arguments['logTableName']}")
        spark.sql(f"DROP TABLE IF EXISTS {additional_arguments['logTableName']}")
        # BUG FIX: log_table must be defined on this path as well; the
        # original left it unset, raising NameError when it was later passed
        # to run_dax_query_and_collect_logs.
        log_table = None
        startQueryIdsAt = 1
    else:
        print(f"🔍 Retrieving table {additional_arguments['logTableName']}")
        log_table = get_log_table(additional_arguments["logTableName"])
        startQueryIdsAt = (
            1
            if additional_arguments["clearCurrentRoundLogs"]
            or additional_arguments["forceStartQueriesAt1"]
            or log_table is None
            else get_starting_query_id(log_table, additional_arguments)
        )

    # Check if capacity pause/resume logic is required based on model storage modes.
    include_pause_resume_logic = any(
        model["storageMode"] in ["Import", "DirectQuery"]
        and "cold" in model["cache_types"]
        for model in models
    )

    if include_pause_resume_logic:
        # Validate that all required workspace capacities are defined.
        for m in models:
            ws = m["model_workspace_name"]
            if ws not in workspace_capacities:
                raise ValueError(
                    f"The workspace '{ws}' (in model '{m['name']}') is not found "
                    "in the 'workspace_capacities' dictionary. Please add it."
                )

        # Resume both primary and alternate capacities and reassign the primary capacity.
        for ws, caps in workspace_capacities.items():
            pause_resume_capacity(caps["capacity_name"], "resume", _simplify_logs=True)
            pause_resume_capacity(
                caps["alt_capacity_name"], "resume", _simplify_logs=True
            )
            print(
                f"✅ Assigning primary capacity '{caps['capacity_name']}' to workspace '{ws}'"
            )
            labs.assign_workspace_to_capacity(caps["capacity_name"], ws)

    # Loop over each DAX query from the Excel file.
    for _, dax_query in dax_queries.iterrows():
        if (
            dax_query["queryId"] <= additional_arguments["stopQueryIdsAt"]
            and dax_query["queryId"] >= startQueryIdsAt
        ):
            for _ in range(additional_arguments["numberOfRunsPerQueryId"]):
                total_query_count = 0
                skipped_query_count = 0
                if additional_arguments["randomizeRuns"]:
                    print("🔀 Randomizing run order of (model, cache_type)")
                    model_cache_combo = (
                        pd.DataFrame(
                            itertools.product(models, ["cold", "warm", "hot"]),
                            columns=["model", "cache_type"],
                        )
                        .sample(frac=1)
                        .reset_index(drop=True)
                    )
                else:
                    # Deterministic order: by model name, then cold -> warm -> hot.
                    df = pd.DataFrame(
                        itertools.product(models, ["cold", "warm", "hot"]),
                        columns=["model", "cache_type"],
                    )
                    df["model_name"] = df["model"].apply(lambda m: m["name"])
                    df["cache_order"] = pd.Categorical(
                        df["cache_type"],
                        categories=["cold", "warm", "hot"],
                        ordered=True,
                    )
                    model_cache_combo = (
                        df.sort_values(by=["model_name", "cache_order"])
                        .drop(["model_name", "cache_order"], axis=1)
                        .reset_index(drop=True)
                    )

                for _, current_combo in model_cache_combo.iterrows():
                    total_query_count += 1
                    # Update the pause status when a model is queried.
                    if include_pause_resume_logic:
                        update_model_pause_status(
                            "model_queried", model=current_combo["model"]
                        )
                    run_status = run_dax_query_and_collect_logs(
                        current_combo, dax_query, log_table
                    )
                    if run_status == "Skipped":
                        skipped_query_count += 1

                print(
                    f"🔄 Refreshing log table from {additional_arguments['logTableName']}"
                )
                log_table = get_log_table(additional_arguments["logTableName"])

                if total_query_count == skipped_query_count:
                    print(
                        "ℹ️ No new queries; skipping additional runs for this query group"
                    )
                    break

    if include_pause_resume_logic:
        for ws, caps in workspace_capacities.items():
            pause_resume_capacity(
                caps["alt_capacity_name"], "pause", _simplify_logs=True
            )
    print("✅ All queries complete")
"/trident/default" + }, + "synapse_widget": { + "state": {}, + "version": "0.1" + }, + "widgets": {} + }, + "nbformat": 4, + "nbformat_minor": 5 +}